广告
返回顶部
首页 > 资讯 > 数据库 >如何使用Redis的streams
  • 678
分享到

如何使用Redis的streams

2024-04-02 19:04:59 678人浏览 独家记忆
摘要

这篇文章主要介绍“如何使用Redis的streams”,在日常操作中,相信很多人在如何使用Redis的streams问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用Re

这篇文章主要介绍“如何使用Redis的streams”,在日常操作中,相信很多人在如何使用Redis的streams问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用Redis的streams”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

起源

在 Redis 4.0 中引入模块之后,用户开始考虑他们自己怎么去修复这些问题。其中一个用户 Timothy Downs 通过 IRC 和我说道:

\<forkfork> 我计划给这个模块增加一个事务日志式的数据类型 &mdash;&mdash; 这意味着大量的订阅者可以在不导致 redis 内存激增的情况下做一些像发布/订阅那样的事情\<forkfork> 订阅者持有他们在消息队列中的位置,而不是让 Redis 必须维护每个消费者的位置和为每个订阅者复制消息

他的思路启发了我。我想了几天,并且意识到这可能是我们马上同时解决上面所有问题的契机。我需要去重新构思 “日志”  的概念是什么。日志是个基本的编程元素,每个人都使用过它,因为它只是简单地以追加模式打开一个文件,并以一定的格式写入数据。然而 Redis  数据结构必须是抽象的。它们在内存中,并且我们使用内存并不是因为我们懒,而是因为使用一些指针,我们可以概念化数据结构并把它们抽象,以使它们摆脱明确的限制。例如,一般来说日志有几个问题:偏移不是逻辑化的,而是真实的字节偏移,如果你想要与条目插入的时间相关的逻辑偏移应该怎么办?我们有范围查询可用。同样,日志通常很难进行垃圾回收:在一个只能进行追加操作的数据结构中怎么去删除旧的元素?好吧,在我们理想的日志中,我们只需要说,我想要数字***的那个条目,而旧的元素一个也不要,等等。

当我从 Timothy 的想法中受到启发,去尝试着写一个规范的时候,我使用了 Redis 集群中的 radix 树去实现,优化了它内部的某些部分。这为实现一个有效利用空间的日志提供了基础,而且仍然有可能在对数时间logarithmic time内访问范围。同时,我开始去读关于 kafka 的流相关的内容以获得另外的灵感,它也非常适合我的设计,***借鉴了 Kafka消费组consumer groups的概念,并且再次针对  Redis 进行优化,以适用于 Redis  在内存中使用的情况。然而,该规范仅停留在纸面上,在一段时间后我几乎把它从头到尾重写了一遍,以便将我与别人讨论的所得到的许多建议一起增加到  Redis 升级中。我希望 Redis 流能成为对于时间序列有用的特性,而不仅是一个常见的事件和消息类的应用程序。

让我们写一些代码吧

从 Redis 大会回来后,整个夏天我都在实现一个叫 listpack 的库。这个库是 ziplist.c 的继任者,那是一个表示在单个分配中的字符串元素列表的数据结构。它是一个非常特殊的序列化格式,其特点在于也能够以逆序(从右到左)解析:以便在各种用例中替代 ziplists。

结合 radix 树和 listpacks 的特性,它可以很容易地去构建一个空间高效的日志,并且还是可索引的,这意味着允许通过 ID  和时间进行随机访问。自从这些就绪后,我开始去写一些代码以实现流数据结构。我还在完成这个实现,不管怎样,现在在 GitHub 上的 Redis 的  streams 分支里它已经可以跑起来了。我并没有声称那个 api 是 100%  的最终版本,但是,这有两个有意思的事实:一,在那时只有消费群组是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已经实现了。二,一旦各个方面比较稳定了之后,我决定大概用两个月的时间将所有的流的特性向后移植backport到  4.0 分支。这意味着 Redis 用户想要使用流,不用等待 Redis 4.2  发布,它们在生产环境马上就可用了。这是可能的,因为作为一个新的数据结构,几乎所有的代码改变都出现在新的代码里面。除了阻塞列表操作之外:该代码被重构了,我们对于流和列表阻塞操作共享了相同的代码,而极大地简化了  Redis 内部实现。

教程:欢迎使用 Redis 的 streams

在某些方面,你可以认为流是 Redis 列表的一个增强版本。流元素不再是一个单一的字符串,而是一个字段fieldvalue组成的对象。范围查询更适用而且更快。在流中,每个条目都有一个 ID,它是一个逻辑偏移量。不同的客户端可以阻塞等待blocking-wait比指定的 ID 更大的元素。Redis 流的一个基本的命令是 XADD。是的,所有的 Redis 流命令都是以一个 X 为前缀的。

> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0

这个 XADD 命令将追加指定的条目作为一个指定的流 &mdash;&mdash; “mystream” 的新元素。上面示例中的这个条目有两个字段:sensor-idtemperature,每个条目在同一个流中可以有不同的字段。使用相同的字段名可以更好地利用内存。有意思的是,字段的排序是可以保证顺序的。XADD 仅返回插入的条目的 ID,因为在第三个参数中是星号(*),表示由命令自动生成 ID。通常这样做就够了,但是也可以去强制指定一个 ID,这种情况用于复制这个命令到服务器slave serverAOFappend-only file文件。

这个 ID 是由两部分组成的:一个毫秒时间和一个序列号。1506871964177 是毫秒时间,它只是一个毫秒级的 UNIX 时间戳。圆点(.)后面的数字 0  是一个序号,它是为了区分相同毫秒数的条目增加上去的。这两个数字都是 64  位的无符号整数。这意味着,我们可以在流中增加所有想要的条目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服务器的当前本地时间生成的  ID 和流中的***一个条目 ID 两者间的***的一个。因此,举例来说,即使是计算机时间回跳,这个 ID  仍然是增加的。在某些情况下,你可以认为流条目的 ID 是完整的 128  位数字。然而,事实上它们与被添加到的实例的本地时间有关,这意味着我们可以在毫秒级的精度的范围随意查询。

正如你想的那样,快速添加两个条目后,结果是仅一个序号递增了。我们可以用一个 MULTI/EXEC 块来简单模拟“快速插入”:

> MULTioK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1

在上面的示例中,也展示了无需指定任何初始模式schema的情况下,对不同的条目使用不同的字段。会发生什么呢?就像前面提到的一样,只有每个块(它通常包含  50-150  个消息内容)的***个消息被使用。并且,相同字段的连续条目都使用了一个标志进行了压缩,这个标志表示与“它们与这个块中的***个条目的字段相同”。因此,使用相同字段的连续消息可以节省许多内存,即使是字段集随着时间发生缓慢变化的情况下也很节省内存。

为了从流中检索数据,这里有两种方法:范围查询,它是通过 XRANGE 命令实现的;流播streaming,它是通过 XREAD 命令实现的。XRANGE 命令仅取得包括从开始到停止范围内的全部条目。因此,举例来说,如果我知道它的 ID,我可以使用如下的命名取得单个条目:

> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"

不管怎样,你都可以使用指定的开始符号 - 和停止符号 + 表示最小和***的 ID。为了限制返回条目的数量,也可以使用 COUNT 选项。下面是一个更复杂的 XRANGE 示例:

> XRANGE mystream - + COUNT 21) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"2) 1) 1506872463535.0   2) 1) "foo"      2) "10"

这里我们讲的是 ID 的范围,然后,为了取得在一个给定时间范围内的特定范围的元素,你可以使用 XRANGE,因为 ID 的“序号” 部分可以省略。因此,你可以只指定“毫秒”时间即可,下面的命令的意思是:“从 UNIX 时间 1506872463 开始给我 10 个条目”:

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0   2) 1) "foo"      2) "10"2) 1) 1506872463535.1   2) 1) "bar"      2) "20"

关于 XRANGE 需要注意的最重要的事情是,假设我们在回复中收到 ID,随后连续的 ID 只是增加了序号部分,所以可以使用 XRANGE 遍历整个流,接收每个调用的指定个数的元素。Redis 中的*SCAN 系列命令允许迭代 Redis 数据结构,尽管事实上它们不是为迭代设计的,但这样可以避免再犯相同的错误。

使用 XREAD 处理流播:阻塞新的数据

当我们想通过 ID 或时间去访问流中的一个范围或者是通过 ID 去获取单个元素时,使用 XRANGE 是非常***的。然而,在使用流的案例中,当数据到达时,它必须由不同的客户端来消费时,这就不是一个很好的解决方案,这需要某种形式的汇聚池pooling。(对于 某些 应用程序来说,这可能是个好主意,因为它们仅是偶尔连接查询的)

XREAD 命令是为读取设计的,在同一个时间,从多个流中仅指定我们从该流中得到的***条目的  ID。此外,如果没有数据可用,我们可以要求阻塞,当数据到达时,就解除阻塞。类似于阻塞列表操作产生的效果,但是这里并没有消费从流中得到的数据,并且多个客户端可以同时访问同一份数据。

这里有一个典型的 XREAD 调用示例:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

它的意思是:从 mystreamotherstream 取得数据。如果没有数据可用,阻塞客户端 5000 毫秒。在 STREAMS 选项之后指定我们想要监听的关键字,***的是指定想要监听的 ID,指定的 ID 为 $ 的意思是:假设我现在需要流中的所有元素,因此,只需要从下一个到达的元素开始给我。

如果我从另一个客户端发送这样的命令:

> XADD otherstream * message “Hi There”

XREAD 侧会出现什么情况呢?

1) 1) "otherstream"   2) 1) 1) 1506935385635.0         2) 1) "message"            2) "Hi There"

与收到的数据一起,我们也得到了数据的关键字。在下次调用中,我们将使用接收到的***消息的 ID:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

依次类推。然而需要注意的是使用方式,客户端有可能在一个非常大的延迟之后再次连接(因为它处理消息需要时间,或者其它什么原因)。在这种情况下,期间会有很多消息堆积,为了确保客户端不被消息淹没,以及服务器不会因为给单个客户端提供大量消息而浪费太多的时间,使用 XREADCOUNT 选项是非常明智的。

流封顶

目前看起来还不错&hellip;&hellip;然而,有些时候,流需要删除一些旧的消息。幸运的是,这可以使用 XADD 命令的 MAXLEN 选项去做:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

它是基本意思是,如果在流中添加新元素后发现消息数量超过了 1000000 个,那么就删除旧的消息,以便于元素总量重新回到 1000000 以内。它很像是在列表中使用的 RPUSH + LTRIM,但是,这次我们是使用了一个内置机制去完成的。然而,需要注意的是,上面的意思是每次我们增加一个新的消息时,我们还需要另外的工作去从流中删除旧的消息。这将消耗一些 CPU 资源,所以在计算 MAXLEN 之前,尽可能使用 ~ 符号,以表明我们不要求非常 精确 的 1000000 个消息,就是稍微多一些也不是大问题:

> XADD mystream MAXLEN ~ 1000000 * foo bar

这种方式的 XADD 仅当它可以删除整个节点的时候才会删除消息。相比普通的 XADD,这种方式几乎可以自由地对流进行封顶。

消费组(开发中)

这是***个 Redis 中尚未实现而在开发中的特性。灵感也是来自 Kafka,尽管在这里是以不同的方式实现的。重点是使用了 XREAD,客户端也可以增加一个 GROUP <name> 选项。相同组的所有客户端将自动得到 不同的 消息。当然,同一个流可以被多个组读取。在这种情况下,所有的组将收到流中到达的消息的相同副本。但是,在每个组内,消息是不会重复的。

当指定组时,能够指定一个 RETRY <milliseconds> 选项去扩展组:在这种情况下,如果消息没有通过 XACK 进行确认,它将在指定的毫秒数后进行再次投递。这将为消息投递提供更佳的可靠性,这种情况下,客户端没有私有的方法将消息标记为已处理。这一部分也正在开发中。

内存使用和节省加载时间

因为用来建模 Redis 流的设计,内存使用率是非常低的。这取决于它们的字段、值的数量和长度,对于简单的消息,每使用 100MB  内存可以有几百万条消息。此外,该格式设想为需要极少的序列化:listpack 块以 radix  树节点方式存储,在磁盘上和内存中都以相同方式表示的,因此它们可以很轻松地存储和读取。例如,Redis 可以在 0.3 秒内从 RDB 文件中读取  500 万个条目。这使流的复制和持久存储非常高效。

我还计划允许从条目中间进行部分删除。现在仅实现了一部分,策略是在条目在标记中标识条目为已删除,并且,当已删除条目占全部条目的比例达到指定值时,这个块将被回收重写,如果需要,它将被连到相邻的另一个块上,以避免碎片化。

到此,关于“如何使用Redis的streams”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

您可能感兴趣的文档:

--结束END--

本文标题: 如何使用Redis的streams

本文链接: https://www.lsjlt.com/news/71670.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何使用Redis的streams
    这篇文章主要介绍“如何使用Redis的streams”,在日常操作中,相信很多人在如何使用Redis的streams问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用Re...
    99+
    2022-10-19
  • c# Async streams的使用解析
    目录本文我将回顾分享异步迭代C#8.0  Asynchronous streams附加思考:产生一个有意思的迭代器本文我将回顾分享 foreach/yield r...
    99+
    2022-11-12
  • Node中的streams流的具体使用
    目录Node中的streams流转换流创建对象模式最后Node中的streams流 streams流是Node中的最好的特性之一。它在我们的开发过程当中可以帮助我们做很多事情。比如通...
    99+
    2022-11-13
  • Redis 中使用 list,streams,pub/sub 几种方式实现消息队列的问题
    目录使用 Redis 实现消息队列基于List的消息队列分析下源码实现基于 Streams 的消息队列分析下源码实现stream 的结构streamCG 消费者组streamCons...
    99+
    2022-11-13
  • 如何使用redis
    这期内容当中小编将会给大家带来有关如何使用redis ,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。安装redis后,在命令行输入“redis-cli"会车输入...
    99+
    2022-10-18
  • Redis中的Bitmap如何使用
    今天小编给大家分享一下Redis中的Bitmap如何使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。在日常开发过程中,经常...
    99+
    2023-06-30
  • Lumen如何使用Redis
    这篇文章主要介绍了Lumen如何使用Redis,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1. 安装扩展要使用redis必须安装两个扩展&...
    99+
    2022-10-18
  • Redis如何使用pipeline
    这篇文章主要介绍Redis如何使用pipeline,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!使用 pipelineRedis 是基于请求-响应模型的 TCP 服务器。意味着单次请求 RTT(往返时间),取决于当前...
    99+
    2023-06-27
  • Redis如何使用HyperLogLog的实现
    目录1. 概述2. 什么是基数3. 命令3.1 PFADD3.2 PFCOUNT3.3 PFMERGE1. 概述 Redis 在 2.8.9 版本添加了 HyperLogLog 数据...
    99+
    2022-11-13
  • redis中的opsForList().range()如何使用
    这篇“redis中的opsForList().range()如何使用”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“redis...
    99+
    2023-07-05
  • redis中zset如何使用
    这篇文章将为大家详细讲解有关redis中zset如何使用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Redis中zset是set的一个升级版本,他在set的基础上增加了...
    99+
    2022-10-18
  • 如何使用Redis协议
    如何使用Redis协议?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。redis协议解析数据的过程主要依赖于redis的协议了。我们写个简单例子...
    99+
    2022-10-18
  • Bump中如何使用Redis
    这篇文章将为大家详细讲解有关Bump中如何使用Redis,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。  Bump的Redis怎么用  1.将Redis用作...
    99+
    2022-10-18
  • Redis内存如何使用
    Redis内存如何使用,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。  Redis内存怎么使用  如果 Redis 使用的内存...
    99+
    2022-10-18
  • Redis如何安装使用
    这篇文章主要介绍了Redis如何安装使用,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。 一、Redis基础部分:  1、red...
    99+
    2022-10-18
  • Redis之Jedis如何使用
    这篇文章主要讲解了“Redis之Jedis如何使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Redis之Jedis如何使用”吧!1.Jedis的介绍Je...
    99+
    2022-10-19
  • redis中如何使用scan
    这篇文章主要为大家展示了“redis中如何使用scan”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“redis中如何使用scan”这篇文章吧。 ...
    99+
    2022-10-19
  • ThinkPHP5中如何使用redis
    目录配置redis使用string(字符串)Hash(哈希)List(列表)Set(集合)zset(有序集合)总结前提:因为本文主要围绕着在thinkPHP5中使用redis的,所以...
    99+
    2023-05-14
    ThinkPHP5使用redis ThinkPHP5 redis
  • springboot中如何使用redis
    这篇文章将为大家详细讲解有关springboot中如何使用redis,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。正文很多时候,我们会在springboot中配置redis,但是就那么几个配置就配好了,没...
    99+
    2023-05-30
    springboot redis
  • redis集合如何使用
    Redis集合是一个无序的、唯一的、字符串集合,它提供了添加、删除、查询、判断元素是否存在等操作。下面是一些常用的Redis集合操作...
    99+
    2023-08-30
    redis
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作