Stream基本介绍
从redis5.0开始,推出了Stream功能。在Stream中有一个消息链表,所有加入链表中的消息都会被串起来,每一条消息都有自己唯一的ID,还有对应的消息内容,消息内容就是key-value。一个Stream上可以有多个消费者,每个消费者都有一个游标,用于访问消息,在消息链表移动就代表着消费消息,多个消费者之间互不影响。
基本命令
XADD
该命令用于添加消息,返回的是消息的ID,语法:
XADD key ID field string [field string …]
例子:
> XADD mystream * name tom age 13
"1651118123780-0"
> XADD mystream * name lily age 24
"1651118131124-0"
> XADD mystream * name lisa age 47
"1651118219219-0"
mystream
是Stream的key,在这个例子中,我们传入的ID参数值是*
,因为我们希望由Redis服务器为我们自动生成一个新的ID。每一个新的ID都会单调增长,简单来讲就是,每次新添加的条目都会拥有一个比其它所有条目更大的ID。
Stream的一条记录可以有多个字段及值,后面加了name
和age
。
条目 ID
条目ID由XADD
命令返回,并且可以唯一的标识给定Stream中的每一个条目,<millisecondsTime>-<sequenceNumber>
由两部分组成:
millisecondsTime
毫秒时间部分实际是生成Stream ID的Redis节点的服务器本地时间,但是如果当前毫秒时间戳比以前的条目时间戳小的话,那么会使用以前的条目时间,所以即便是服务器时钟向后跳,单调增长ID的特性仍然会保持不变。sequenceNumber
序列号用于以相同毫秒创建的条目。由于序列号是64位的,所以实际上对于在同一毫秒内生成的条目数量是没有限制的。
如果由于某些原因,用户需要与时间无关但实际上与另一个外部系统ID关联的增量ID,就像前面所说的,XADD命令可以带上一个显式的ID,而不是使用通配符*来自动生成,如下所示:
> XADD xdemo 0-1 name AAA
"0-1"
> XADD xdemo 0-2 name EEE
"0-2"
> XADD xdemo 0-2 name BBB
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
注意: 在这种情况下,命令不接受等于或小于前一个ID的ID。
XLEN
该命令用于获取消息长度。语法:
XLEN key
例子:
> XLEN mystream
(integer) 3
XRANGE
该命令用于获取消息列表,两个特殊的ID-
和+
分别表示可能的最小ID和最大ID。语法:
XRANGE key start end [COUNT count]
例子:
> XRANGE mystream - +
1) 1) "1651118123780-0"
2) 1) "name"
2) "tom"
3) "age"
4) "13"
2) 1) "1651118131124-0"
2) 1) "name"
2) "lily"
3) "age"
4) "24"
3) 1) "1651118219219-0"
2) 1) "name"
2) "lisa"
3) "age"
4) "47"
返回的每个条目都是有两个元素的数组:ID和键值对列表。条目ID与时间有关系,因为在字符-
左边的部分是创建Stream条目的本地节点上的Unix毫秒时间,即条目创建的那一刻(请注意:Streams的复制使用的是完全详尽的XADD命令,因此从节点将具有与主节点相同的ID)。
这意味着我可以使用XRANGE查询一个时间范围。然而为了做到这一点,我可能想要省略ID的序列号部分:如果省略,区间范围的开始序列号将默认为0,结束部分的序列号默认是有效的最大序列号。这样一来,仅使用两个Unix毫秒时间去查询,我们就可以得到在那段时间内产生的所有条目(包含开始和结束)。例如,我可能想要查询两毫秒时间,可以这样使用:
> XRANGE mystream 1651118123780 1651118131124
1) 1) "1651118123780-0"
2) 1) "name"
2) "tom"
3) "age"
4) "13"
2) 1) "1651118131124-0"
2) 1) "name"
2) "lily"
3) "age"
4) "24"
如果只想获取2个项目,我从全范围开始,那么count是2。
> XRANGE mystream - + count 2
1) 1) "1651118123780-0"
2) 1) "name"
2) "tom"
3) "age"
4) "13"
2) 1) "1651118131124-0"
2) 1) "name"
2) "lily"
3) "age"
4) "24"
如果想继续迭代下一个项目,必须选择刚刚返回的最后一个ID,即1519073279157-0
,并且在ID序列号部分加1。请注意,序列号是64位的,因此无需检查溢出。在这个例子中,得到的结果ID是1651118131124-1
,现在可以用作下一次XRANGE
调用的新的start
参数:
> XRANGE mystream 1651118131124-1 +
1) 1) "1651118219219-0"
2) 1) "name"
2) "lisa"
3) "age"
4) "47"
依此类推。由于XRANGE的查找复杂度是O(log(N)),因此O(M)返回M个元素,这个命令在count较小时,具有对数时间复杂度,这意味着每一步迭代速度都很快。所以XRANGE也是事实上的流迭代器并且不需要XSCAN命令。
XREVRANGE
XREVRANGE
与XRANGE
相同,是以相反的顺序返回元素,因此XREVRANGE
的实际用途是检查一个Stream中的最后一项是什么:
> XREVRANGE mystream + - count 1
1) 1) "1651118219219-0"
2) 1) "name"
2) "lisa"
3) "age"
4) "47"
XDEL
该命令用于删除消息。语法:
XDEL key ID [ID …]
例子:
> XDEL mystream 1651117525833-0
(integer) 1
> XRANGE mystream - +
1) 1) "1651117411801-0"
2) 1) "name"
2) "tom"
3) "age"
4) "13"
如果想删除整个mystream,就可以使用del
命令:
> DEL xdemo
(integer) 1
零长度Stream
Stream与其他Redis数据结构有一个不同的地方在于,当其他数据结构没有元素的时候,调用删除元素的命令会把key本身删掉。举例来说就是,当调用ZREM
命令将有序集合中的最后一个元素删除时,这个有序集合会被彻底删除。但Stream允许在没有元素的时候仍然存在,不管是因为使用MAXLEN
选项的时候指定了count为零(在XADD
和XTRIM
命令中),或者因为调用了XDEL
命令。
存在这种不对称性的原因是因为,Stream可能具有相关联的消费者组,以及我们不希望因为Stream中没有项目而丢失消费者组定义的状态。当前,即使没有相关联的消费者组,Stream也不会被删除。
XREAD
该命令用于提供监听到达Stream的新消息的能力的命令。语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
从消息链头部开启读取1条消息,count 为 1,0代表头部
> XREAD count 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1651118123780-0"
2) 1) "name"
2) "tom"
3) "age"
4) "13"
2) 1) "1651118131124-0"
2) 1) "name"
2) "lily"
3) "age"
4) "24"
以上是XREAD
的非阻塞形式。注意COUNT
选项并不是必需的,实际上这个命令唯一强制的选项是STREAMS
,指定了一组key以及调用者已经看到的每个Stream相应的最大ID,以便该命令仅向客户端提供ID大于我们指定ID的消息。
在上面的命令中,我们写了STREAMS mystream 0
,所以我们想要消息流mystream中所有ID大于0-0的消息。正如你在上面的例子中所看到的,命令返回了键名,因为实际上可以通过传入多个key来同时从不同的Stream中读取数据。例如:STREAMS mystream otherstream 0 0
。注意在STREAMS
选项后面,我们需要提供键名称,以及之后的ID。
> XADD otherstream * name AAA
"1651124345974-0"
> XREAD streams mystream otherstream 0 0
1) 1) "mystream"
2) 1) 1) "1651118123780-0"
2) 1) "name"
2) "tom"
3) "age"
4) "13"
2) 1) "1651118131124-0"
2) 1) "name"
2) "lily"
3) "age"
4) "24"
3) 1) "1651118219219-0"
2) 1) "name"
2) "lisa"
3) "age"
4) "47"
2) 1) "otherstream"
2) 1) 1) "1651124345974-0"
2) 1) "name"
2) "AAA"
除了XREAD
可以同时访问多个Stream这一事实,以及能够指定我们拥有的最后一个ID来获取之后的新消息,在个简单的形式中,这个命令并没有做什么跟XRANGE
有太大区别的事情。然而,有趣的部分是我们可以通过指定BLOCK
参数,轻松地将XREAD 变成一个 阻塞命令:
> XREAD BLOCK 0 STREAMS mystream $
在另一个终端中,给这个mystream增加消息:
> XADD mystream * name bob age 10
"1651124679102-0"
则在刚刚BLOCK
阻塞的命令中,会看到新增加的消息:
> XREAD BLOCK 0 STREAMS mystream $
1) 1) "mystream"
2) 1) 1) "1651124679102-0"
2) 1) "name"
2) "bob"
3) "age"
4) "10"
(66.55s)
在上面的例子中,除了移除COUNT
以外,指定了新的BLOCK
选项,超时时间为0毫秒(意味着永不超时)。此外,我并没有给流 mystream传入一个常规的ID,而是传入了一个特殊的ID$
。这个特殊的ID意思是XREAD
应该使用流 mystream已经存储的最大ID作为最后一个ID。以便我们仅接收从我们开始监听时间以后的新消息。这在某种程度上相似于Unix命令tail -f。
请注意当使用BLOCK选项时,我们不必使用特殊ID $
。我们可以使用任意有效的ID。如果命令能够立即处理我们的请求而不会阻塞,它将执行此操作,否则它将阻止。
通常如果我们想要从新的条目开始消费Stream,我们以$
开始,接着继续使用接收到的最后一条消息的ID来发起下一次请求。
XREAD
的阻塞形式同样可以监听多个Stream,只需要指定多个键名即可。如果请求可以同步提供,因为至少有一个流的元素大于我们指定的相应ID,则返回结果。否则,该命令将阻塞并将返回获取新数据的第一个流的项目(根据提供的ID)。
XGROUP
该命令用于管理流数据结构关联的消费者组。使用XGROUP你可以:
- 创建与流关联的新消费者组。
- 销毁一个消费者组。
- 从消费者组中移除指定的消费者。
- 将消费者组的最后交付ID设置为其他内容。
要创建一个新的消费者组,语法如下:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
> XGROUP CREATE mystream mygroup $
OK
目前还不能为不存在的Stream创建消费者组,所以需要有一个已经存在的Stream。
当创建一个消费者组的时候,我们必须指定一个ID,在这个例子中ID是$
。这是必要的,因为消费者组在其他状态中必须知道在第一个消费者连接时接下来要服务的消息,即消费者组创建完成时的最后消息ID是什么。我们就像上面例子一样,提供一个$
,那么只有从现在开始到达Stream的新消息才会被传递到消费者组中的消费者。
如果我们指定的消息ID是0,那么消费者组将会开始消费这个Stream中的所有历史消息。当然,你也可以指定任意其他有效的ID。
可以使用以下形式完全销毁消费者:
XGROUP DESTROY mystream mygroup
即使存在活动的消费者和待处理消息,消费者组也将被销毁,因此请确保仅在真正需要时才调用此命令。
要仅从消费者组中移除给定的消费者,使用以下命令格式:
XGROUP DELCONSUMER mystream mygroup myconsumer123
如果你希望消费者组中的消费者重新处理流中的所有消息,你可能希望将其下一个ID设置为0:
XGROUP SETID mystream mygroup 0
最后,如果您不记得语法,请使用HELP子命令:
> XGROUP HELP
1) XGROUP <subcommand> arg arg ... arg. Subcommands are:
2) CREATE <key> <groupname> <id or $> [opt] -- Create a new consumer group.
3) option MKSTREAM: create the empty stream if it does not exist.
4) SETID <key> <groupname> <id or $> -- Set the current group ID.
5) DESTROY <key> <groupname> -- Remove the specified group.
6) DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.
7) HELP -- Prints this help.
XREADGROUP
消费者组就像一个伪消费者,从流中获取数据,实际上为多个消费者提供服务,提供某些保证:
- 每条消息都提供给不同的消费者,因此不可能将相同的消息传递给多个消费者。
- 消费者在消费者组中通过名称来识别,该名称是实施消费者的客户必须选择的区分大小写的字符串。这意味着即便断开连接过后,消费者组仍然保留了所有的状态,因为客户端会重新申请成为相同的消费者。 然而,这也意味着由客户端提供唯一的标识符。
- 每一个消费者组都有一个第一个ID永远不会被消费的概念,这样一来,当消费者请求新消息时,它能提供以前从未传递过的消息。
- 消费消息需要使用特定的命令进行显式确认,表示:这条消息已经被正确处理了,所以可以从消费者组中逐出。
- 消费者组跟踪所有当前所有待处理的消息,也就是,消息被传递到消费者组的一些消费者,但是还没有被确认为已处理。由于这个特性,当访问一个Stream的历史消息的时候,每个消费者将只能看到传递给它的消息。
基本的消费者组所涉及的命令,具体如下:
- XGROUP 用于创建,摧毁或者管理消费者组。
- XREADGROUP 用于通过消费者组从一个Stream中读取。
- XACK 是允许消费者将待处理消息标记为已正确处理的命令。
消费者组创建好了,可以使用XREADGROUP
命令立即开始尝试通过消费者组读取消息。我们会从消费者那里读到,假设指定消费者分别是Alice和Bob,来看看系统会怎样返回不同消息给Alice和Bob。
XREADGROUP
和XREAD
非常相似,并且提供了相同的BLOCK
选项,除此以外还是一个同步命令。但是有一个强制的选项必须始终指定,那就是GROUP
,并且有两个参数:消费者组的名字,以及尝试读取的消费者的名字。选项COUNT
仍然是支持的,并且与XREAD
命令中的用法相同。
在开始从Stream中读取之前,先往里面放一些消息:
> XADD mystream * message AAA
"1651134692706-0"
> XADD mystream * message BBB
"1651134698155-0"
> XADD mystream * message CCC
"1651134707128-0"
> XADD mystream * message DDD
"1651134735969-0"
> XADD mystream * message EEE
"1651134744515-0"
现在是时候尝试使用消费者组读取了:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1651134692706-0"
2) 1) "message"
2) "AAA"
XREADGROUP
的响应内容就像XREAD
一样。但是请注意上面提供的GROUP <group-name> <consumer-name>
,这表示我想要使用消费者组mygroup从Stream中读取,我是消费者Alice。每次消费者使用消费者组中执行操作时,都必须要指定可以这个消费者组中唯一标识它的名字。
在以上命令行中还有另外一个非常重要的细节,在强制选项STREAMS
之后,键mystream请求的ID是特殊的ID >
。这个特殊的ID只在消费者组的上下文中有效,其意思是:消息到目前为止从未传递给其他消费者。
也可以指定一个真实的ID,比如0或者任何其他有效的ID,在这个例子中,我们请求XREADGROUP
只提供给我们历史待处理的消息,在这种情况下,将永远不会在组中看到新消息。所以基本上XREADGROUP
可以根据我们提供的ID有以下行为:
- 如果ID是特殊ID
>
,那么命令将会返回到目前为止从未传递给其他消费者的新消息,这有一个副作用,就是会更新消费者组的最后ID。 - 如果ID是任意其他有效的数字ID,那么命令将会让我们访问我们的历史待处理消息。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用
XACK
进行确认。
XACK
该命令用于命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。 当一条消息交付到某个消费者时,它将被存储在PEL中等待处理, 这通常出现在作为调用XREADGROUP
命令的副作用,或者一个消费者通过调用XCLAIM
命令接管消息的时候。 待处理消息被交付到某些消费者,但是服务器尚不确定它是否至少被处理了一次。 因此对新调用XREADGROUP
来获取消费者的消息历史记录(比如用0作为ID)将返回此类消息。 类似地,待处理的消息将由检查PEL的XPENDING
命令列出。
一旦消费者成功地处理完一条消息,它应该调用XACK
,这样这个消息就不会被再次处理, 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。
语法如下:
XACK key group ID [ID …]
指定ID为0,不带任何COUNT选项:只会看到唯一的待处理消息,即关于AAA的消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1651134692706-0"
2) 1) "message"
2) "AAA"
如果确认这个消息已经处理,它将不再是历史待处理消息的一部分,因此系统将不再报告任何消息:
> XACK mystream mygroup 1651134692706-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
XACK
只是已处理的消息不再是我们可以访问的历史记录的一部分。
现在轮到Bob来读取:
> XREADGROUP GROUP mygroup bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1651134698155-0"
2) 1) "message"
2) "BBB"
2) 1) "1651134707128-0"
2) 1) "message"
2) "CCC"
Bob要求最多两条消息,并通过同一消费者组mygroup读取。所以发生的是Redis仅报告新消息。正如你所看到的,消息AAA未被传递,因为它已经被传递给Alice,所以Bob获取到了BBB和CCC,以此类推。
这样,Alice和Bob以及这个消费者组中的任何其他消费者,都可以从相同的Stream中读取到不同的消息,读取他们尚未处理的历史消息,或者标记消息为已处理。这允许创建不同的拓扑和语义来从Stream中消费消息。
有几件事需要记住:
- 消费者是在他们第一次被提及的时候自动创建的,不需要显式创建。
- 即使使用
XREADGROUP
,你也可以同时从多个key中读取,但是要让其工作,你需要给每一个Stream创建一个名称相同的消费者组。 XREADGROUP
命令是一个写命令,因为当它从Stream中读取消息时,消费者组被修改了,所以这个命令只能在master节点调用。
XPENDING
检查待处理消息列表的接口,因此它是一个非常重要的命令,用于观察和了解消费者组正在发生的事情:哪些客户端是活跃的,哪些消息在等待消费,或者查看是否有空闲的消息。此外,该命令与XCLAIM一起使用,用于实现长时间故障的消费者的恢复,因此不处理某些消息:不同的消费者可以认领该消息并继续处理。语法如下:
XPENDING key group [start end count] [consumer]
上面的例子允许我们编写多个消费者参与同一个消费者组,每个消费者获取消息的一个子集进行处理,并且在故障恢复时重新读取各自的待处理消息。然而在现实世界中,消费者有可能永久地失败并且永远无法恢复。由于任何原因停止后,消费者的待处理消息会发生什么呢?
Redis的消费者组提供了一个专门针对这种场景的特性,用以认领给定消费者的待处理消息,这样一来,这些消息就会改变他们的所有者,并且被重新分配给其他消费者。这个特性是非常明确的,消费者必须检查待处理消息列表,并且必须使用特殊命令来认领特定的消息,否则服务器将把待处理的消息永久分配给旧消费者,这样不同的应用程序就可以选择是否使用这样的特性,以及使用它的方式。
这个过程的第一步是使用一个叫做XPENDING
的命令,这个命令提供消费者组中待处理条目的可观察性。这是一个只读命令,它总是可以安全地调用,不会改变任何消息的所有者。在最简单的形式中,调用这个命令只需要两个参数,即Stream的名称和消费者组的名称。
> XPENDING mystream mygroup
1) (integer) 2
2) "1651134698155-0"
3) "1651134707128-0"
4) 1) 1) "bob"
2) "2"
当以这种方式调用的时候,命令只会输出给定消费者组的待处理消息总数(在本例中是两条消息),所有待处理消息中的最小和最大的ID,最后是消费者列表和每个消费者的待处理消息数量。我们只有Bob有两条待处理消息,因为Alice请求的唯一一条消息已使用XACK确认了。
通过提供一个开始和结束ID(可以只是-和+,就像XRANGE
一样),以及一个控制命令返回的信息量的数字,可以了解有关待处理消息的更多信息。如果我们想要将输出限制为仅针对给定使用者组的待处理消息,可以使用最后一个可选参数,即消费者组的名称,但我们不会在以下示例中使用此功能。
> XPENDING mystream mygroup - + 10
1) 1) "1651134698155-0"
2) "bob"
3) (integer) 612540
4) (integer) 1
2) 1) "1651134707128-0"
2) "bob"
3) (integer) 612540
4) (integer) 1
现在我们有了每一条消息的详细信息:消息ID,消费者名称,空闲时间(单位是毫秒,意思是:自上次将消息传递给某个消费者以来经过了多少毫秒),以及每一条给定的消息被传递了多少次。我们有来自Bob的两条消息,它们空闲了612540毫秒,大概10分钟。因为刚刚Bob只读取了2条消息,所以count为10,也只能获取到2条。
检查第一条消息内容是什么,使用XRANGE
即可。
> XRANGE mystream 1651134698155-0 1651134698155-0
1) 1) "1651134698155-0"
2) 1) "message"
2) "BBB"
只需要在参数中重复两次相同的ID。
现在我们有了一些想法,Alice可能会根据过了20个小时仍然没有处理这些消息,来判断Bob可能无法及时恢复,所以现在是时候认领这些消息,并继续代替Bob处理了。为了做到这一点,我们使用XCLAIM
命令。
XCLAIM
在流的消费者组上下文中,此命令改变待处理消息的所有权, 因此新的所有者是在命令参数中指定的消费者。通常是这样的:
- 假设有一个具有关联消费者组的流。
- 某个消费者A在消费者组的上下文中通过XREADGROUP从流中读取一条消息。
- 作为读取消息的副作用,消费者组的待处理条目列表(PEL)中创建了一个待处理消息条目:这意味着这条消息已传递给给定的消费者,但是尚未通过XACK确认。
- 突然这个消费者出现故障,且永远无法恢复。
- 其他消费者可以使用XPENDING检查已经过时很长时间的待处理消息列表,为了继续处理这些消息,他们使用XCLAIM来获得消息的所有权,并继续处理。
这个命令在其完整形式中有很多选项,因为它用于复制消费者组的更改,但我们只使用我们通常需要的参数。
XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
请注意,消息只有在其空闲时间大于我们通过XCLAIM指定的空闲时间的时才会被认领。 因为作为一个副作用,XCLAIM也会重置消息的空闲时间(因为这是处理消息的一次新尝试), 两个试图同时认领消息的消费者将永远不会成功:只有一个消费者能成功认领消息。 这避免了我们用微不足道的方式多次处理给定的消息(虽然一般情况下无法完全避免多次处理)。
此外,作为副作用,XCLAIM会增加消息的尝试交付次数。通过这种方式, 由于某些原因而无法处理的消息(例如因为消费者在尝试处理期间崩溃),将开始具有更大的计数器, 并可以在系统内部被检测到。
下面是命令执行的结果:
> XCLAIM mystream mygroup Alice 3600000 1651134698155-0
1) 1) "1651134698155-0"
2) 1) "message"
2) "BBB"
在上面的例子中,我们认领ID为1651134698155-0的消息,仅当消息闲置至少一小时且没有原始消费者或其他消费者进行推进(确认或认领它)时,并将所有权分配给消费者Alice。Alice成功认领了该消息,现在可以处理并确认消息,尽管原来的消费者还没有恢复,也能往前推动。
XINFO
缺乏可观察性的消息系统很难处理。不知道谁在消费消息,哪些消息待处理,不知道给定Stream的活跃消费者组的集合,使得一切都不透明。因此,Redis Stream和消费者组都有不同的方式来观察正在发生的事情。XPENDING
它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和传递次数。
这个命令使用子命令来显示有关Stream和消费者组的状态的不同信息,比如使用XINFO STREAM可以报告关于Stream本身的信息。
语法:
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
> XINFO STREAM mystream
1) "length"
2) (integer) 9
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1651134744515-0"
11) "first-entry"
12) 1) "1651118123780-0"
2) 1) "name"
2) "tom"
3) "age"
4) "13"
13) "last-entry"
14) 1) "1651134744515-0"
2) 1) "message"
2) "EEE"
可以看到报告的信息是流的元素的数量,有关表示流的基数树的详细信息(主要用于优化和调试任务),与流关联的消费者组的数量,最后生成的ID(某些条目被删除时,此ID可能与最后一个条目的ID不同),最后显示了流中完整的第一个和最后一个条目,以便了解流的内容是什么。
另一个可用的信息是与这个Stream相关联的消费者组的数量。我们可以进一步挖掘有关消费者组的更多信息。
> XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1651134707128-0"
XINFO
命令输出一系列键值对。对每一个列出的消费者组,该命令还显示该组中已知的消费者数量,以及该组中的待处理消息(已传递但尚未确认)数量。
因为这是一个可观察性命令,允许人类用户立即了解报告的信息,并允许命令通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他更高带宽效率的命令,比如XPENDING
,只报告没有字段名称的信息。
上面例子中的输出(使用了子命令GROUPS
)应该能清楚地观察字段名称。我们可以通过检查在此类消费者组中注册的消费者,来更详细地检查特定消费者组的状态。
> XINFO CONSUMERS mystream mygroup
1) 1) "name"
2) "Alice"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 41964096
2) 1) "name"
2) "bob"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 54558647
可以看到这个消费者的空闲毫秒时间(最后一个字段)以及消费者名称和待处理消息数量。请注意,你不应该依赖字段的确切位置,也不应该依赖字段的数量,因为将来可能会增加新的字段。因此,表现良好的客户端应该获取整个列表,并将其报告给用户,例如,作为字典数据结构。
如果你不记得命令的语法,只需要查看命令本身的帮助:
> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.
3) GROUPS <key> -- Show the stream consumer groups.
4) STREAM <key> -- Show information about the stream.
5) HELP -- Print this help.
XTRIM
设置Streams的上限,许多应用并不希望将数据永久收集到一个Stream。有时在Stream中指定一个最大项目数很有用,之后一旦达到给定的大小,将数据从Redis中移到不那么快的非内存存储是有用的,适合用来记录未来几十年的历史数据。Redis Stream对此有一定的支持。这就是XADD
命令的MAXLEN
选项,这个选项用起来很简单:
127.0.0.1:6379> XADD mystream MAXLEN 2 * value 1
"1651205020962-0"
127.0.0.1:6379> XADD mystream MAXLEN 2 * value 2
"1651205034177-0"
127.0.0.1:6379> XADD mystream MAXLEN 2 * value 3
"1651205040354-0"
127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1651205034177-0"
2) 1) "value"
2) "2"
2) 1) "1651205040354-0"
2) 1) "value"
2) "3"
如果使用MAXLEN
选项,当Stream的达到指定长度后,老的条目会自动被驱逐。
XTRIM命令它做的事情与上面讲到的MAXLEN选项非常相似,但是这个命令不需要添加任何其他参数,可以以独立的方式与Stream一起使用。
语法如下:
XTRIM key MAXLEN [~] count
XTRIM将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。此命令被设想为接受多种修整策略,但目前只实现了一种,即MAXLEN,并且与XADD中的MAXLEN选项完全相同。
例如,下面的命令会将流裁剪到最新的1000个项目:
> XTRIM mystream MAXLEN 1000
可以使用以下特殊形式提供命令,以提高其效率:
> XTRIM mysteam MAXLEN ~ 1000
在选项MAXLEN
和实际计数中间的参数~
的意思是,用户不是真的需要精确的1000个项目。它可以多几十个条目,但决不能少于1000个。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。
与Kafka(TM)分区的差异
Redis Stream的消费者组可能类似于基于Kafka(TM)分区的消费者组,但是要注意Redis Stream实际上非常不同。分区仅仅是逻辑的,并且消息只是放在一个Redis键中,因此不同客户端的服务方式取决于谁准备处理新消息,而不是从哪个分区客户端读取。例如,如果消费者C3在某一点永久故障,Redis会继续服务C1和C2,将新消息送达,就像现在只有两个逻辑分区一样。
类似地,如果一个给定的消费者在处理消息方面比其他消费者快很多,那么这个消费者在相同单位时间内按比例会接收更多的消息。这是有可能的,因为Redis显式地追踪所有未确认的消息,并且记住了谁接收了哪些消息,以及第一条消息的ID从未传递给任何消费者。
但是,这也意味着在Redis中,如果你真的想把同一个Stream的消息分区到不同的Redis实例中,你必须使用多个key和一些分区系统,比如Redis集群或者特定应用程序的分区系统。单个Redis Stream不会自动分区到多个实例上。
我们可以说,以下是正确的:
- 如果你使用一个Stream对应一个消费者,则消息是按顺序处理的。
- 如果你使用N个Stream对应N个消费者,那么只有给定的消费者hits N个Stream的子集,你可以扩展上面的模型来实现。
- 如果你使用一个Stream对应多个消费者,则对N个消费者进行负载平衡,但是在那种情况下,有关同一逻辑项的消息可能会无序消耗,因为给定的消费者处理消息3可能比另一个消费者处理消息4要快。
- 所以基本上Kafka分区更像是使用了N个不同的Redis键。而Redis消费者组是一个将给定Stream的消息负载均衡到N个不同消费者的服务端负载均衡系统。
持久化,复制和消息安全性
与任何其他Redis数据结构一样,Stream会异步复制到从节点,并持久化到AOF和RDB文件中。但可能不那么明显的是,消费者组的完整状态也会传输到AOF,RDB和从节点,因此如果消息在主节点是待处理的状态,在从节点也会是相同的信息。同样,节点重启后,AOF文件会恢复消费者组的状态。
但是请注意,Redis Stream和消费者组使用Redis默认复制来进行持久化和复制,所以:
- 如果消息的持久性在您的应用程序中很重要,则AOF必须与强大的fsync策略一起使用。
- 默认情况下,异步复制不能保证复制
XADD
命令或者消费者组的状态更改:在故障转移后,可能会丢失某些内容,具体取决于从节点从主节点接收数据的能力。 WAIT
命令可以用于强制将更改传输到一组从节点上。但请注意,虽然这使得数据不太可能丢失,但由Sentinel或Redis群集运行的Redis故障转移过程仅执行尽力检查以故障转移到最新的从节点,并且在某些特定故障下可能会选举出缺少一些数据的从节点。 因此,在使用Redis Stream和消费者组设计应用程序时,确保了解你的应用程序在故障期间应具有的语义属性,并进行相应地配置,评估它是否足够安全地用于您的用例。
参考:
http://www.redis.cn/topics/streams-intro.html
http://www.redis.cn/commands/xgroup.html
http://www.redis.cn/commands/xack.html
http://www.redis.cn/commands/xclaim.html
http://www.redis.cn/commands/xinfo.html
http://www.redis.cn/commands/xtrim.html