iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >如何进行Redis5新特性中Streams作消息队列的分析
  • 607
分享到

如何进行Redis5新特性中Streams作消息队列的分析

2024-04-02 19:04:59 607人浏览 独家记忆
摘要

这期内容当中小编将会给大家带来有关如何进行Redis5新特性中Streams作消息队列的分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。前言Redis 5 新特性中,S

这期内容当中小编将会给大家带来有关如何进行Redis5新特性中Streams作消息队列的分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

前言

Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。 数据结构中常用 api 进行分析。

准备

本文所使用 Redis 版本为 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,与本文中描述略有不同。

添加消息

Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:

XADD key ID field string [field string ...]

其中 key 为 Streams 的名称,ID 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 person 为名称的流,进行操作。

XADD person * name ytao des https://ytao.top

上面添加案例中,ID 使用 * 号复制,这里代表着服务端自动生成 Id,添加后返回数据 "1578238486193-0"

这里自动生成的 Id 格式为 <millisecondstime>-<sequencenumber> Id 是由两部分组成:

  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2.  millisecondsTime 为当前服务器时间毫秒时间戳。

  3.  sequenceNumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。

比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。

除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下条件限制:

  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2.  Id 中的前后部分必须为数字。

  3.  最小 Id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。

  4.  添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否则,当不满足上述条件时,添加后会抛出异常:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 Streams,则创建 Streams 的名称,再添加消息到 Streams 中。即使添加消息时,由于 Id 异常,也可以在 Redis 中存在以当前 Streams 的名称。 Streams 中 Id 也可作为指针使用,因为它是一个有序的标记。

生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是可以指定 Streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN 参数。

XADD person MAXLEN 5 * name ytao des Https://ytao.top

这样就指定该了 Streams 中的容量为 5 条消息。也可使用 XTRIM 截取消息,从小到大剔除多余的消息:

XTRIM person MAXLEN 8

消息数量

查看消息数量使用 XLEN 指令进行操作。

XLEN key

例:查看 person 流中的消息数量:

> XLEN person  (integer) 5

查询消息

查询 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。

XRANGE

查询数据时,可以按照指定 Id 范围进行查询,XRANGE 查询指令格式:

XRANGE key start end [COUNT count]

参数说明:

  •  key 为 Streams 的名称

  •  start 为范围查询开始 Id,包含本 Id。

  •  start 为范围查询结束 Id,包含本 Id。

  •  Count 为查询返回最大的消息数量,非必填。

这里 start 和 end 有-和+两个非指定值,他们分别表示无穷小和无穷大,所以当使用这个两个值时,会查询出全部的消息。

> XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"

上面查询的消息数据,可以看到是按照先进先出的顺序查询出来的。

使用 COUNT 指定查询返回的数量:

# 查询所有的消息,并且返回一条数据  > XRANGE person - + COUNT 1  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"

在范围查询中,Id 的后半部分可省略,后半部分中的数据会全部查询到。

XREVRANGE

XREVRANGE 的查询和 XRANGE 指令中的使用类似,但查询的 start 和 end 参数顺序进行了调换:

XREVRANGE key end start [COUNT count]

使用案例:

> XREVRANGE person +  -  1) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"

查询后的结果与 XRANGE 的结果顺序刚好相反,其他都一样,这两个指令可进行消息的升序和降序的返回。

删除消息

删除消息使用 XDEL 指令操作,只需指定将要删除的 Streams 名称和 Id 即可,支持一次删除多个消息 。

XDEL key ID [ID ...]

删除案例:

# 查询所有消息  > XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"  # 删除消息        > XDEL person 2-0  (integer) 1  # 再次查询删除后的所有消息  > XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  # 查询删除后的长度        > XLEN person  (integer) 2

从上面可以看到,删除消息后,长度也会减少相应的数量。

消费消息

在 Redis 的 PUB/SUB 中,我们是通过订阅来消费消息,在 Streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。

单独消费

单独消费使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 参数用来指定读取的最大数量,与 XRANGE 的用法一样。

> XREAD COUNT 1 STREAMS person 0  1) 1) "person"     2) 1) 1) "0-1"           2) 1) "name"              2) "ytao"              3) "des"              4) "https://ytao.top"  > XREAD COUNT 2 STREAMS person 0  1) 1) "person"     2) 1) 1) "0-1"           2) 1) "name"              2) "ytao"              3) "des"              4) "https://ytao.top"        2) 1) "0-2"           2) 1) "name"              2) "luffy"              3) "des"              4) "valiant!"

在 XREAD 里面还有个 BLOCK 参数,这个是用来阻塞订阅消息的,BLOCK 携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。

# 窗口 1 开启阻塞,等待新消息的到来  > XREAD BLOCK 0 STREAMS person $  # 另开一个连接窗口 2,添加一条新的消息  > XADD person 2-2 name tao des coder  "2-2"  # 窗口 1,获取到有新的消息来消费,并且带有阻塞的时间  > XREAD BLOCK 0 STREAMS person $  1) 1) "person"     2) 1) 1) "2-2"           2) 1) "name"              2) "tao"              3) "des"              4) "coder"  (60.81s)

当使用 XREAD 进行顺序消费时,需要额外记录下读取到位置的 Id,方便下次继续消费。

群组消费

群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:创建群组,群组读取消息,向服务端确认消息以处理。

群组操作

操作群组使用 XGROUP 指令:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操作有:

  •  CREATE 创建消费组。

  •  SETID 修改下一个处理消息的 Id。

  •  DESTROY 销毁消费组。

  •  DELCONSUMER 删除消费组中指定的消费者。

我们当前需要使用的是创建消费组:

# 以当前存在的最大 Id 作为消费起始   > XGROUP CREATE person group1 $  OK

群组读取消息

群组读取使用 XREADGROUP 指令,COUNT和BLOCK的使用类似 XREAD 的操作,只是多了个群组和消费者的指定:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 Id 也有个特殊值>,表示还未进行消费的消息:

# 窗口 1,消费群组中,taotao 消费者建立阻塞监听  XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  # 窗口 2,消费群组中,yangyang 消费者建立阻塞监听   XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  # 窗口 3,添加消费消息  > XADD person 3-1 name tony des 666  "3-1"  # 窗口 1,读取到新消息,此时 窗口 2 没有任何反应  > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  1) 1) "person"     2) 1) 1) "3-1"           2) 1) "name"              2) "tony"              3) "des"              4) "666"  (77.54s)  # 窗口 3,再次添加消费消息  > XADD person 3-2 name james des abc!  "3-2"  # 窗口 2,读取到新消息,此时 窗口 1 没有任何反应  > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  1) 1) "person"     2) 1) 1) "3-2"           2) 1) "name"              2) "james"              3) "des"              4) "abc!"  (76.36s)

以上执行流程中,group1 群组中有两个消费者,当添加两条消息后,这两个消费者轮流消费。

消息ACK

消息消费后,为避免再次重复消费,这是需要向服务端发送 ACK,确保消息被消费后的标记。 例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0  1) 1) "person"     2) 1) 1) "3-2"           2) 1) "name"              2) "james"              3) "des"              4) "abc!"

这时,我们使用 XACK 指令告诉服务器,我们已处理的消息:

XACK key group ID [ID ...]0

让服务器标记 3-2 已处理:

> XACK person group1 3-2  (integer) 1

再次获取群组读取消息:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0  1) 1) "person"     2) (empty list or set)

队列中没有了可读消息。 除了上面以讲解到的 API 外,查看消费群组信息可使用 XINFO 指令查看。

上面对 Streams 常用 API 进行了分析,我们可以感受到 Redis 在消息队列支持的道路上,也越来越强大。如果使用过它的 PUB/SUB 功能的话,就会感受到 5.x 迭代正是将你的一些痛点进行了优化

上述就是小编为大家分享的如何进行Redis5新特性中Streams作消息队列的分析了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注编程网数据库频道。

您可能感兴趣的文档:

--结束END--

本文标题: 如何进行Redis5新特性中Streams作消息队列的分析

本文链接: https://www.lsjlt.com/news/68319.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • oracle怎么查询当前用户所有的表
    要查询当前用户拥有的所有表,可以使用以下 sql 命令:select * from user_tables; 如何查询当前用户拥有的所有表 要查询当前用户拥有的所有表,可以使...
    99+
    2024-05-14
    oracle
  • oracle怎么备份表中数据
    oracle 表数据备份的方法包括:导出数据 (exp):将表数据导出到外部文件。导入数据 (imp):将导出文件中的数据导入表中。用户管理的备份 (umr):允许用户控制备份和恢复过程...
    99+
    2024-05-14
    oracle
  • oracle怎么做到数据实时备份
    oracle 实时备份通过持续保持数据库和事务日志的副本来实现数据保护,提供快速恢复。实现机制主要包括归档重做日志和 asm 卷管理系统。它最小化数据丢失、加快恢复时间、消除手动备份任务...
    99+
    2024-05-14
    oracle 数据丢失
  • oracle怎么查询所有的表空间
    要查询 oracle 中的所有表空间,可以使用 sql 语句 "select tablespace_name from dba_tablespaces",其中 dba_tabl...
    99+
    2024-05-14
    oracle
  • oracle怎么创建新用户并赋予权限设置
    答案:要创建 oracle 新用户,请执行以下步骤:以具有 create user 权限的用户身份登录;在 sql*plus 窗口中输入 create user identified ...
    99+
    2024-05-14
    oracle
  • oracle怎么建立新用户
    在 oracle 数据库中创建用户的方法:使用 sql*plus 连接数据库;使用 create user 语法创建新用户;根据用户需要授予权限;注销并重新登录以使更改生效。 如何在 ...
    99+
    2024-05-14
    oracle
  • oracle怎么创建新用户并赋予权限密码
    本教程详细介绍了如何使用 oracle 创建一个新用户并授予其权限:创建新用户并设置密码。授予对特定表的读写权限。授予创建序列的权限。根据需要授予其他权限。 如何使用 Oracle 创...
    99+
    2024-05-14
    oracle
  • oracle怎么查询时间段内的数据记录表
    在 oracle 数据库中查询指定时间段内的数据记录表,可以使用 between 操作符,用于比较日期或时间的范围。语法:select * from table_name wh...
    99+
    2024-05-14
    oracle
  • oracle怎么查看表的分区
    问题:如何查看 oracle 表的分区?步骤:查询数据字典视图 all_tab_partitions,指定表名。结果显示分区名称、上边界值和下边界值。 如何查看 Oracle 表的分区...
    99+
    2024-05-14
    oracle
  • oracle怎么导入dump文件
    要导入 dump 文件,请先停止 oracle 服务,然后使用 impdp 命令。步骤包括:停止 oracle 数据库服务。导航到 oracle 数据泵工具目录。使用 impdp 命令导...
    99+
    2024-05-14
    oracle
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作