广告
返回顶部
首页 > 资讯 > 数据库 >详解Redis用链表实现消息队列
  • 858
分享到

详解Redis用链表实现消息队列

队列详解链表 2022-06-04 17:06:27 858人浏览 独家记忆
摘要

前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为Mysql的缓存和mysql自带的缓存一样

前言

Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为Mysql缓存mysql自带的缓存一样。

链表实现消息队列

Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为brpop/blpop list timeout, 当链表为空的时候,brpop/blpop将阻塞,直到设置超时时间到或者list插入一个元素。

用法如下:


charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli
127.0.0.1:6379> lpush list hello
(integer) 1
127.0.0.1:6379> brpop list 0
1) "list"
2) "hello"
127.0.0.1:6379> brpop list 0
//阻塞在这里

//当我在另一个客户端lpush一个元素之后,客户端输出为
127.0.0.1:6379> brpop list 0
1) "list"
2) "world"
(50.60s)//阻塞的时间

当链表为空的时候,brpop是阻塞的,等待超时时间到或者另一个客户端lpush一个元素。接下来,看下源码是如何实现阻塞brpop命令的。要实现客户端阻塞,只需要服务器不给客户端发送消息,那么客户端就会阻塞在read调用中,等待消息到达。这是很好实现的,关键是如何判断这个客户端阻塞的链表有数据到达以及通知客户端解除阻塞?Redis的做法是,将阻塞的键以及阻塞在这个键上的客户端链表存储在一个字典中,然后每当向数据库插入一个链表时,就判断这个新插入的链表是否有客户端阻塞,有的话,就解除这个阻塞的客户端,并且发送刚插入链表元素给客户端,客户端就这样解除阻塞。

先看下有关数据结构,以及server和client有关属性


//阻塞状态
typedef struct blockingState {
 
 mstime_t timeout;  
 
 dict *keys;    
 robj *target;   
 
 int numreplicas;  
 long long reploffset; 
} blockingState;
//继续列表
typedef struct readyList {
 redisDb *db;//就绪键所在的数据库
 robj *key;//就绪键
} readyList;
//客户端有关属性
typedef struct redisClient {
 int btype;    
 blockingState bpop;  
}
//服务器有关属性
struct redisServer {
  
 unsigned int bpop_blocked_clients; 
 list *unblocked_clients; 
 list *ready_keys;  
}
//数据库有关属性
typedef struct redisDb {
  //keys->redisCLient映射
  dict *blocking_keys;  
 dict *ready_keys;   
}redisDB

必须对上述的数据结构足够了解,否则很难看懂下面的代码,因为这些代码需要操作上述的数据结构。先从brpop命令执行函数开始分析,brpop命令执行函数为


void brpopCommand(redisClient *c) {
 blockingPopGenericCommand(c,REDIS_TaiL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
 robj *o;
 mstime_t timeout;
 int j;
 if (getTimeoutFromObjectOrReply(c,c->argv[c->arGC-1],&timeout,UNIT_SECONDS)
  != REDIS_OK) return;//将超时时间保存在timeout中
 for (j = 1; j < c->argc-1; j++) {
  o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表
  if (o != NULL) {//如果不为空
   if (o->type != REDIS_LIST) {//不是链表类型
    addReply(c,shared.wrongtypeerr);//报错
    return;
   } else {
    if (listTypeLength(o) != 0) {//链表不为空
     
     char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
     robj *value = listTypePop(o,where);//从链表中pop出一个元素
     redisAssert(value != NULL);
     //给客户端发送pop出来的元素信息
     addReplyMultiBulkLen(c,2);
     addReplyBulk(c,c->argv[j]);
     addReplyBulk(c,value);
     decrRefCount(value);
     notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
          c->argv[j],c->db->id);
     if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表
      dbDelete(c->db,c->argv[j]);
      notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
           c->argv[j],c->db->id);
     }
     
    }
   }
  }
 }
  
  blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

从源码可以看出,brpop可以操作多个链表变量,例如brpop list1 list2 0,但是只能输出第一个有元素的链表。如果list1没有元素,而list2有元素,则输出list2的元素;如果两个都有元素,则输出list1的元素;如果都没有元素,则等待其中某个链表插入一个元素,之后在2返回。最后调用blockForyKeys阻塞


void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
 dictEntry *de;
 list *l;
 int j;
 c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性
 c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop, //则target为空
 if (target != NULL) incrRefCount(target);//不为空,增加引用计数
 for (j = 0; j < numkeys; j++) {
  
  if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
  incrRefCount(keys[j]);
  
  //将阻塞的key和客户端添加进c->db->blocking_keys
  de = dictFind(c->db->blocking_keys,keys[j]);
  if (de == NULL) {
   int retval;
   
   l = listCreate();
   retval = dictAdd(c->db->blocking_keys,keys[j],l);
   incrRefCount(keys[j]);
   redisAssertWithInfo(c,keys[j],retval == DICT_OK);
  } else {
   l = dictGetVal(de);
  }
  listAddnodeTail(l,c);//添加到阻塞键的客户点链表中
 }
 blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志
}

blockClient函数只是简单的设置客户端属性,如下


void blockClient(redisClient *c, int btype) {
 c->flags |= REDIS_BLOCKED;//设置标志
 c->btype = btype;//阻塞操作类型
 server.bpop_blocked_clients++;
}

由于这个函数之后,brpop命令执行函数就结束了,由于没有给客户端发送消息,所以客户端就阻塞在read调用中。那么如何解开客户端的阻塞了?

插入一个元素解阻塞

任何指令的执行函数都是在processCommand函数中调用call函数,然后在call函数中调用命令执行函数,lpush也一样。当执行完lpush之后,此时链表不为空,回到processCommand调用中,执行以下语句


if (listLength(server.ready_keys))
   handleClientsBlockedOnLists();

这两行代码是先检查server.ready_keys是否为空,如果不为空,说明已经有一些就绪的链表,此时可以判断是否有客户端阻塞在这个键值上,如果有,则唤醒;现在问题又来了,这个server.ready_keys在哪更新链表了?

原来是在dbAdd函数中,当往数据库中添加的值类型为REDIS-LIST时,这时就要调用signalListAsReady函数将链表指针添加进server.ready_keys:


//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
 sds copy = sdsdup(key->ptr);
 int retval = dictAdd(db->dict, copy, val);//将数据添加进数据库
 redisAssertWithInfo(NULL,key,retval == REDIS_OK);
 //判断是否为链表类型,如果是,调用有链表已经ready函数
 if (val->type == REDIS_LIST) signalListAsReady(db, key);
 if (server.cluster_enabled) slotToKeyAdd(key);
 }
//t_list.c
void signalListAsReady(redisDb *db, robj *key) {
 readyList *rl;
 
 if (dictFind(db->blocking_keys,key) == NULL) return;
 
 if (dictFind(db->ready_keys,key) != NULL) return;
 
 rl = zmalloc(sizeof(*rl));
 rl->key = key;
 rl->db = db;
 incrRefCount(key);
 listAddNodeTail(server.ready_keys,rl);//添加链表末尾
 
 incrRefCount(key);
 //同时将这个阻塞键放入db->ready_keys
 redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}

OK,这时server.ready_keys上已经有就绪键了,这时就调用processCommand函数中的handleClientsBlockedOnLists()函数来处理阻塞客户端,在这个函数中,


void handleClientsBlockedOnLists(void) {
 while(listLength(server.ready_keys) != 0) {
  list *l;
  
  l = server.ready_keys;
  server.ready_keys = listCreate();
  
  while(listLength(l) != 0) {
   listNode *ln = listFirst(l);//获取第一个就绪readyList
   readyList *rl = ln->value;
   
   dictDelete(rl->db->ready_keys,rl->key);
   
   robj *o = lookupKeyWrite(rl->db,rl->key);
   if (o != NULL && o->type == REDIS_LIST) {
    dictEntry *de;
    
    de = dictFind(rl->db->blocking_keys,rl->key);
    if (de) {
     list *clients = dictGetVal(de);//转换为客户端链表
     int numclients = listLength(clients);
     while(numclients--) {//给每个客户端发送消息
      listNode *clientnode = listFirst(clients);
      redisClient *receiver = clientnode->value;//阻塞的客户端
      robj *dsTKEy = receiver->bpop.target;//brpoplpush命令目的链表
      int where = (receiver->lastcmd &&
          receiver->lastcmd->proc == blpopCommand) ?
         REDIS_HEAD : REDIS_TAIL;//获取取出的方向
      robj *value = listTypePop(o,where);//取出就绪链表的元素
      if (value) {
       
       if (dstkey) incrRefCount(dstkey);
       unblockClient(receiver);//设置客户端为非阻塞状态
       if (serveClientBlockedOnList(receiver,
        rl->key,dstkey,rl->db,value,
        where) == REDIS_ERR)
       {
        
         listTypePush(o,value,where);
       }//给客户端回复链表中的元素内容
       if (dstkey) decrRefCount(dstkey);
       decrRefCount(value);
      } else {
       break;
      }
     }
    }
    //如果链表为空,则从数据库中删除
    if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
    
   }
   
   decrRefCount(rl->key);
   zfree(rl);
   listDelNode(l,ln);
  }
  listRelease(l); 
 }
}

从这个源码可知,如果有两个客户端,同时阻塞在一个链表上面,那么如果链表插入一个元素之后,只有先阻塞的那个客户端收到消息,后面阻塞的那个客户端继续阻塞,这也是先阻塞先服务的思想。handleClientsBlockedOnLists函数调用了unblockClient(receiver) ,该函数功能为接触客户端阻塞标志,以及找到db阻塞在key上的客户端链表,并将接触阻塞的客户端从链表删除。然后调用serveClientBlockOnList给客户端回复刚在链表插入的元素。


int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
 robj *argv[3];
 if (dstkey == NULL) {
  
  argv[0] = (where == REDIS_HEAD) ? shared.lpop :
           shared.rpop;
  argv[1] = key;
  propagate((where == REDIS_HEAD) ?
   server.lpopCommand : server.rpopCommand,
   db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
  
  addReplyMultiBulkLen(receiver,2);
  addReplyBulk(receiver,key);
  addReplyBulk(receiver,value);
 } else {
  
   
 }
}

propagate函数主要是将命令信息发送给aof和slave。函数中省略部分是brpoplpush list list1 0命令的目的链表list1非空时,将从list链表pop出来的元素插入list1中。当给客户端发送消息之后,客户端就从read函数调用中返回,变为不阻塞。

通过超时时间解阻塞

如果链表一直没有数据插入,那么客户端将会一直阻塞下去,这肯定是不行的,所以brpop还支持超时阻塞,即阻塞时间超过一定值之后,服务器返回一个空值,这样客户端就解脱阻塞了。

对于时间超时,都放在了100ms执行一次的时间事件中;超时解脱阻塞函数也在serverCron中;在serverCron->clientsCron->clientsCronHandleTimeout


int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) {
 time_t now = now_ms/1000;
 //..........
 else if (c->flags & REDIS_BLOCKED) {
  
  if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
   
   replyToBlockedClientTimedOut(c);
   unblockClient(c);
  }
 }
 //.............

把这个函数不相干的代码删除,主要部分先判断这个客户端是否阻塞,如果是,超时时间是否到期,如果是,则调用replyToBlockedClientTimedOut给客户端回复一个空回复,以及接触客户端阻塞。

总结

链表消息队列实现暂时分析到这了,大家都学会了吗?希望这篇文章给大家能带来一定的帮助,如果有疑问可以留言交流。

您可能感兴趣的文档:

--结束END--

本文标题: 详解Redis用链表实现消息队列

本文链接: https://www.lsjlt.com/news/11918.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 详解Redis用链表实现消息队列
    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样...
    99+
    2022-06-04
    队列 详解 链表
  • 详解Redis Stream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入SpringBoot注册Redis s...
    99+
    2022-09-23
  • 详解redis是如何实现队列消息的ack
    前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满...
    99+
    2022-06-04
    队列 如何实现 详解
  • Redis中如何实现消息队列和延时消息队列
    这篇文章将为大家详细讲解有关Redis中如何实现消息队列和延时消息队列,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。list的几个命令lpush (left push)由...
    99+
    2022-10-19
  • redis中队列消息实现应用解耦
    本篇文章给大家分享的是有关redis中队列消息实现应用解耦,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1、如果redis服务器挂掉了怎么办啊...
    99+
    2022-10-18
  • redis怎么实现消息队列
    Redis可以通过以下几种方式实现消息队列:1. List数据结构:使用Redis的List数据结构实现简单的消息队列。生产者将消息...
    99+
    2023-09-14
    redis
  • 如何使用redis实现消息队列
    使用redis实现消息队列的示例:redis的pubsub功能实现发布订阅模式,代码:import redisclass Task(object):def __init__(self):self.rcon = redis.StrictRed...
    99+
    2022-10-24
  • redis实现消息队列的方法
    这期内容当中的小编将会给大家带来有关redis实现消息队列的方法,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息队列,Message Queue,常用于解决并发系统中的资源一致性问题...
    99+
    2022-10-18
  • Java实现Redis延时消息队列
    目录什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型2.RedisMq 消息队列实现类3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以...
    99+
    2022-11-12
  • redis stream 实现消息队列的实践
    目录Redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pending 等待列表消息确认消息转移信息...
    99+
    2022-08-10
    redisstream消息队列 redis消息队列
  • ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
    简单介绍: 消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手...
    99+
    2023-09-04
    redis php
  • Redis怎么使用ZSET实现消息队列
    这篇文章主要介绍了Redis怎么使用ZSET实现消息队列的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Redis怎么使用ZSET实现消息队列文章都会有所收获,下面我们一起来看看吧。1.redis 用zset做消...
    99+
    2023-07-05
  • Redis如何实现消息队列功能
    Redis如何实现消息队列功能随着互联网的发展,消息队列在分布式系统中变得越来越重要。消息队列允许不同的应用程序之间通过异步通信来传递和处理消息,提高了系统的可伸缩性和可靠性。Redis作为一款快速、可靠、灵活的内存数据库,也可以用来实现消...
    99+
    2023-11-07
    redis 实现 消息队列
  • Redis消息队列怎么实现秒杀
    要实现秒杀功能,可以使用Redis的消息队列来进行异步处理。下面是一种基本的实现方法:1. 准备工作:创建一个商品库存键值对,如"s...
    99+
    2023-10-11
    Redis
  • redis用list做消息队列的实现示例
    目录生产消息服务消费消息服务,定时任务日志测试leftPush消息入队,rightPop对应,消息出队。 rightPop(RedisConstant.MQ_LIST, 0L, Ti...
    99+
    2022-11-13
  • 怎么在springboot中用redis实现消息队列
    本篇内容主要讲解“怎么在springboot中用redis实现消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么在springboot中用redis实现消息队列”吧!准备阶段安装redi...
    99+
    2023-06-19
  • Redis使用ZSET实现消息队列使用小结
    目录1.Redis 用zset做消息队列如何处理消息积压2.redis分片并使用zset做消息队列3. redis如何分片4. redis使用Java发送消息到zset队列并对消息进行分片处理5. redis使用zset...
    99+
    2023-03-19
    Redis使用ZSET实现消息队列 Redis消息队列
  • 利用Redis流怎么实现一个消息队列
    利用Redis流怎么实现一个消息队列?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。代码清单 10-1 展示了一个具有基本功能的消息队列实现:代...
    99+
    2022-10-18
  • Redis 使用 List 实现消息队列的优缺点
    目录什么是消息队列消息队列满足哪些特性消息有序性重复消息处理可靠性List 实现消息队列LPUSHRPOP实时消费问题重复消费消息可靠性需要注意的是Redission 实战添加依赖J...
    99+
    2022-11-12
  • ThinkPHP怎么使用think-queue实现redis消息队列
    本篇内容主要讲解“ThinkPHP怎么使用think-queue实现redis消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“ThinkPHP怎么使用think-queue实现redis消...
    99+
    2023-07-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作