从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP
或者BZPOPMIN
等等。
请注意,在阅读本页之前,如果你不了解Stream,我们推荐先阅读我们的Redis Streams介绍。
非阻塞使用
如果未提供BLOCK选项,此命令是同步的,并可以认为与XRANGE
有些相关:它将会返回流中的一系列项目,但与XRANGE
相比它有两个基本差异(如果我们只考虑同步使用):
- 如果我们想要从多个键同时读取,则可以使用多个流调用此命令。这是
XREAD
的一个关键特性,因为特别是在使用BLOCK进行阻塞时,能够通过单个连接监听多个键是一个至关重要的特性。 XRANGE
返回一组ID中的项目,XREAD
更适合用于从第一个条目(比我们到目前为止看到的任何其他条目都要大)开始使用流。因此,我们传递给XREAD
的是,对于每个流,我们从该流接收的最后一个条目的ID。
例如,如果我有两个流mystream
和writers
,并且我希望同时从这两个流中读取数据(从它们的第一个元素开始),
我可以像下面这样调用XREAD
:
请注意:我们在例子中使用了COUNT选项,因此对于每一个流,调用将返回每个流最多两个元素。
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
STREAMS选项是强制的,并且必须是最后一个选项,因为此选项以下列格式获取可变长度的参数:
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
所以我们以一组流的key开始,并在后面跟着所有关联的ID,表示我们从该流中获取的最后ID,以便调用仅为我们提供同一流中具有更大ID的条目。
例如,在上面的例子中,我们从流mystream
中接收的最后项目的ID是1526999352406-0
,而对于流writers
,我们接收的最后项目的ID是1526985685298-0
。
要继续迭代这两个流,我将调用:
> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
2) 1) 1) 1526999626221-0
2) 1) "duration"
2) "911"
3) "event-id"
4) "7"
5) "user-id"
6) "9488232"
2) 1) "writers"
2) 1) 1) 1526985691746-0
2) 1) "name"
2) "Toni"
3) "surname"
4) "Morris"
2) 1) 1526985712947-0
2) 1) "name"
2) "Agatha"
3) "surname"
4) "Christie"
以此类推,最终,调用不再返回任何项目,只返回一个空数组,然后我们就知道我们的流中没有更多数据可以获取了(我们必须重试该操作,因此该命令也支持阻塞模式)。
不完全ID
使用不完整的ID是有效的,就像它对XRANGE
一样有效。但是这里ID的序列号部分,如果缺少,将总是被解释为0,所以命令:
> XREAD COUNT 2 STREAMS mystream writers 0 0
完全等同于
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
阻塞数据
在同步形式中,只要有更多可用项,该命令就可以获取新数据。但是,有些时候,我们不得不等待数据生产者使用XADD
向我们消费的流中推送新条目。为了避免使用固定或自适应间隔获取数据,如果命令根据指定的流和ID不能返回任何数据,则该命令能够阻塞,并且一旦请求的key之一接收了数据,就会自动解除阻塞。
重要的是需要理解这个命令是扇形分发到所有正在等待相同ID范围的客户端,因此每个消费者都将得到一份数据副本,这与使用阻塞列表pop操作时发生的情况不同。
为了阻塞,使用BLOCK选项,以及我们希望在超时前阻塞的毫秒数。通常Redis阻塞命令的超时时间单位是秒,但此命令拥有一个毫秒超时时间,虽然通常服务器的超时时间精度大概在0.1秒左右。这可以在某些用例中阻塞更短的时间,并且如果服务器内部结构随着时间的推移而改善,它的超时精度可能也会有提升。
当传递了BLOCK选项,但是在传递的流中没有任何流有数据返回,那么该命令是同步执行的,就像没有指定BLOCK选项一样。
这是阻塞调用的例子,其中命令稍后将返回空回复(nil
),因为超时时间已到但没有新数据到达:
> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)
特殊的ID$
有时阻塞我们只希望接收从我们阻塞的那一刻开始通过XADD
添加到流的条目。在这种情况下,我们对已经添加条目的历史不感兴趣。对于这个用例,我们不得不检查流的最大元素ID,并使在XREAD
命令行中使用这个ID。这不纯粹并且需要调用其他命令,因此可以使用特殊的ID$
来表明我们只想要流中的新条目。
你应该仅在第一次调用XREAD
时使用$
,理解这一点非常重要。后面的ID你应该使用前一次报告的项目中最后一项的ID,否则你将会丢失所有添加到这中间的条目。
这是典型的XREAD
调用(在想要仅消费新条目的消费者的第一次迭代)看起来的样子:
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
一旦我们得到了一些回复,下一次调用会是像这样的:
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
依此类推。
如何在单个流上阻止多个客户端
列表或有序集合上的阻塞列表操作有pop行为。基本上,元素将从列表或有序集合中删除,以便返回给客户端。 在这种场景下,你希望项目以公平的方式被消费,具体取决于客户端在给定key上阻止的时刻到达。通常在这种用例中,Redis使用FIFO语义。
但请注意,对于流这不是问题:当服务客户端时,不会从流中删除条目,因此只要XADD
命令向流提供数据,就会提供给每个等待的客户端。
返回值
该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD
添加它们的顺序完全一致。
当使用BLOCK时,超时时将返回一个空回复(nil
)。
为了更多地了解流的整体行为和语义,强烈建议阅读Redis Streams介绍。