REDIS_LIST (列表) 是LPUSH 、LRANGE 等命令的操作对象, 它使用

REDIS_ENCODING_ZIPLIST 和REDIS_ENCODING_LINKEDLIST 这两种方式编码:

编码的选择

创建新列表时Redis 默认使用REDIS_ENCODING_ZIPLIST 编码,当以下任意一个条件被满足

时,列表会被转换成REDIS_ENCODING_LINKEDLIST 编码:

试图往列表新添加一个字符串值, 且这个字符串的长度超过

server.list_max_ziplist_value (默认值为64 )。

ziplist 包含的节点超过server.list_max_ziplist_entries (默认值为512 )。

列表命令的实现

因为两种底层实现的抽象方式和列表的抽象方式非常接近,所以列表命令几乎就是通过一对一

地映射到底层数据结构的操作来实现的。

我们将焦点放在BLPOP 、BRPOP 和BRPOPLPUSH 这个几个阻塞命令的实现原理上。

阻塞的条件

BLPOP 、BRPOP 和BRPOPLPUSH 三个命令都可能造成客户端被阻塞,以下将这些命令统

称为列表的阻塞原语。

阻塞原语并不是一定会造成客户端阻塞:

只有当这些命令被用于空列表时,它们才会阻塞客户端。

如果被处理的列表不为空的话,它们就执行无阻塞版本的LPOP 、RPOP 或RPOPLPUSH

命令。

作为例子,以下流程图展示了BLPOP 决定是否对客户端进行阻塞过程:

阻塞

当一个阻塞原语的处理目标为空键时,执行该阻塞原语的客户端就会被阻塞。

阻塞一个客户端需要执行以下步骤:

1. 将客户端的状态设为“正在阻塞” ,并记录阻塞这个客户端的各个键,以及阻塞的最长时限

(timeout)等数据。

2. 将客户端的信息记录到server.db[i]->blocking_keys 中(其中i 为客户端所使用的数

据库号码)。

3. 继续维持客户端和服务器之间的网络连接,但不再向客户端传送任何信息,造成客户端

阻塞。

步骤2 是将来解除阻塞的关键,server.db[i]->blocking_keys 是一个字典,字典的键是那

些造成客户端阻塞的键,而字典的值是一个链表,链表里保存了所有因为这个键而被阻塞的客

户端(被同一个键所阻塞的客户端可能不止一个):

在上图展示的blocking_keys 例子中,client2 、client5 和client1 三个客户端就正被

key1 阻塞,而其他几个客户端也正在被别的两个key 阻塞。

当客户端被阻塞之后,脱离阻塞状态有以下三种方法:

1. 被动脱离:有其他客户端为造成阻塞的键推入了新元素。

2. 主动脱离:到达执行阻塞原语时设定的最大阻塞时间。

3. 强制脱离:客户端强制终止和服务器的连接,或者服务器停机。

以下内容将分别介绍被动脱离和主动脱离的实现方式。

阻塞因LPUSH 、RPUSH 、LINSERT 等添加命令而被取消

通过将新元素推入造成客户端阻塞的某个键中,可以让相应的客户端从阻塞状态中脱离出来

(取消阻塞的客户端数量取决于推入元素的数量)。

LPUSH 、RPUSH 和LINSERT 这三个添加新元素到列表的命令, 在底层都由一个

pushGenericCommand 的函数实现,这个函数的运作流程如下图:

当向一个空键推入新元素时,pushGenericCommand 函数执行以下两件事:

1. 检查这个键是否存在于前面提到的server.db[i]->blocking_keys 字典里,如果是的

话,那么说明有至少一个客户端因为这个key 而被阻塞,程序会为这个键创建一个

redis.h/readyList 结构,并将它添加到server.ready_keys 链表中。

2. 将给定的值添加到列表键中。

readyList 结构的定义如下:

typedef struct readyList {redisDb *db;robj *key;} readyList;

readyList 结构的key 属性指向造成阻塞的键,而db 则指向该键所在的数据库。

举个例子,假设某个非阻塞客户端正在使用0 号数据库,而这个数据库当前的blocking_keys

属性的值如下:

如果这时客户端对该数据库执行PUSH key3 value ,那么pushGenericCommand 将创建一个

db 属性指向0 号数据库、key 属性指向key3 键对象的readyList 结构,并将它添加到服务器

server.ready_keys 属性的链表中:

在我们这个例子中,到目前为止,pushGenericCommand 函数完成了以下两件事:

1. 将readyList 添加到服务器。

2. 将新元素value 添加到键key3 。

虽然key3 已经不再是空键,但到目前为止,被key3 阻塞的客户端还没有任何一个被解除阻塞

状态。

为了做到这一点,Redis 的主进程在执行完pushGenericCommand 函数之后,会继续调用

handleClientsBlockedOnLists 函数,这个函数执行以下操作:

1. 如果server.ready_keys 不为空, 那么弹出该链表的表头元素, 并取出元素中的

readyList 值。

2. 根据readyList 值所保存的key 和db ,在server.blocking_keys 中查找所有因为key

而被阻塞的客户端(以链表的形式保存)。

3. 如果key 不为空,那么从key 中弹出一个元素,并弹出客户端链表的第一个客户端,然

后将被弹出元素返回给被弹出客户端作为阻塞原语的返回值。

4. 根据readyList 结构的属性,删除server.blocking_keys 中相应的客户端数据,取消

客户端的阻塞状态。

5. 继续执行步骤3 和4 ,直到key 没有元素可弹出,或者所有因为key 而阻塞的客户端都

取消阻塞为止。

6. 继续执行步骤1 ,直到ready_keys 链表里的所有readyList 结构都被处理完为止。

用一段伪代码描述以上操作可能会更直观一些:

def handleClientsBlockedOnLists():# 执行直到ready_keys 为空while server.ready_keys != NULL:# 弹出链表中的第一个readyListrl = server.ready_keys.pop_first_node()# 遍历所有因为这个键而被阻塞的客户端for client in all_client_blocking_by_key(rl.key, rl.db):# 只要还有客户端被这个键阻塞,就一直从键中弹出元素# 如果被阻塞客户端执行的是BLPOP ,那么对键执行LPOP# 如果执行的是BRPOP ,那么对键执行RPOPelement = rl.key.pop_element()if element == NULL:# 键为空,跳出for 循环# 余下的未解除阻塞的客户端只能等待下次新元素的进入了breakelse:#清除客户端的阻塞信息server.blocking_keys.remove_blocking_info(client)# 将元素返回给客户端,脱离阻塞状态client.reply_list_item(element)

先阻塞先服务(FBFS)策略

值得一提的是,当程序添加一个新的被阻塞客户端到server.blocking_keys 字典的链表中

时,它将该客户端放在链表的最后,而当handleClientsBlockedOnLists 取消客户端的阻塞

时,它从链表的最前面开始取消阻塞:这个链表形成了一个FIFO 队列,最先被阻塞的客户端

总值最先脱离阻塞状态,Redis 文档称这种模式为先阻塞先服务(FBFS,first-block-first-serve)。

举个例子,在下图所示的阻塞状况中,如果客户端对数据库执行PUSH key3 value ,那么只有

client3 会被取消阻塞,client6 和client4 仍然阻塞;如果客户端对数据库执行PUSH key3

value1 value2 ,那么client3 和client4 的阻塞都会被取消,而客户端client6 依然处于

阻塞状态:

阻塞因超过最大等待时间而被取消

前面提到过,当客户端被阻塞时,所有造成它阻塞的键,以及阻塞的最长时限会被记录在客户

端里面,并且该客户端的状态会被设置为“正在阻塞” 。

每次Redis 服务器常规操作函数(server cron job)执行时,程序都会检查所有连接到服务器

的客户端,查看那些处于“正在阻塞”状态的客户端的最大阻塞时限是否已经过期,如果是的话,

就给客户端返回一个空白回复,然后撤销对客户端的阻塞。

可以用一段伪代码来描述这个过程:

def server_cron_job():# 其他操作...# 遍历所有已连接客户端for client in server.all_connected_client:# 如果客户端状态为“正在阻塞”,并且最大阻塞时限已到达if client.state == BLOCKING and \client.max_blocking_timestamp < current_timestamp():# 那么给客户端发送空回复, 脱离阻塞状态client.send_empty_reply()# 并清除客户端在服务器上的阻塞信息server.blocking_keys.remove_blocking_info(client)# 其他操作...