iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > GO >使用go实现一个超级mini的消息队列的示例代码
  • 146
分享到

使用go实现一个超级mini的消息队列的示例代码

miniGO队列示例消息队列 2022-06-07 20:06:33 146人浏览 泡泡鱼
摘要

目录前言目的设计协议队列broker删除消息生产者消费者启动总结前言 趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了。自己是个javer,这次实现,特意

目录

前言

目的

设计

协议

队列

broker

删除消息

生产者

消费者

启动

总结

前言

趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了。自己是个javer,这次实现,特意换用了Go。没错,是零基础上手go,顺便可以学学go。

前置知识:

go基本语法

消息队列概念,也就三个:生产者、消费者、队列

目的

没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢

使用双向链表数据结构作为队列

有多个topic可供生产者生成消息和消费者消费消息

支持生产者并发

支持消费者读,且ok后,从队列删除

消息不丢失(持久化)

高性能(先这样想)

设计

整体架构

协议

通讯协议底层使用tcpMQ是基于tcp自定义了一个协议,协议如下


type Msg struct {
   Id int64
   TopicLen int64
   Topic string
   // 1-consumer 2-producer 3-comsumer-ack 4-error
   MsgType int64 // 消息类型
   Len int64 // 消息长度
   Payload []byte // 消息
}

Payload使用字节数组,是因为不管数据是什么,只当做字节数组来处理即可。Msg承载着生产者生产的消息,消费者消费的消息,ACK、和错误消息,前两者会有负载,而后两者负载和长度都为空

协议的编解码处理,就是对字节的处理,接下来有从字节转为Msg,和从Msg转为字节两个函数


func BytesToMsg(reader io.Reader) Msg {
   m := Msg{}
   var buf [128]byte
   n, err := reader.Read(buf[:])
   if err != nil {
      fmt.Println("read failed, err:", err)
   }
   fmt.Println("read bytes:", n)
   // id
   buff := bytes.NewBuffer(buf[0:8])
   binary.Read(buff, binary.LittleEndian, &m.Id)
   // topiclen
   buff = bytes.NewBuffer(buf[8:16])
   binary.Read(buff, binary.LittleEndian, &m.TopicLen)
   // topic
   msgLastIndex := 16 + m.TopicLen
   m.Topic = string(buf[16: msgLastIndex])
   // msgtype
   buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
   binary.Read(buff, binary.LittleEndian, &m.MsgType)
   buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
   binary.Read(buff, binary.LittleEndian, &m.Len)
   if m.Len <= 0 {
      return m
   }
   m.Payload = buf[msgLastIndex + 16:]
   return m
}
func MsgToBytes(msg Msg) []byte {
   msg.TopicLen = int64(len([]byte(msg.Topic)))
   msg.Len = int64(len([]byte(msg.Payload)))
   var data []byte
   buf := bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Id)
   data = append(data, buf.Bytes()...)
   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.TopicLen)
   data = append(data, buf.Bytes()...)
   data = append(data, []byte(msg.Topic)...)
   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.MsgType)
   data = append(data, buf.Bytes()...)
   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Len)
   data = append(data, buf.Bytes()...)
   data = append(data, []byte(msg.Payload)...)
   return data
}
队列

使用container/list,实现先入先出,生产者在队尾写,消费者在队头读取


package broker
import (
   "container/list"
   "sync"
)
type Queue struct {
   len int
   data list.List
}
var lock sync.Mutex
func (queue *Queue) offer(msg Msg) {
   queue.data.PushBack(msg)
   queue.len = queue.data.Len()
}
func (queue *Queue) poll() Msg{
   if queue.len == 0 {
      return Msg{}
   }
   msg := queue.data.Front()
   return msg.Value.(Msg)
}
func (queue *Queue) delete(id int64) {
   lock.Lock()
   for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
      if msg.Value.(Msg).Id == id {
         queue.data.Remove(msg)
         queue.len = queue.data.Len()
         break
      }
   }
   lock.Unlock()
}

方法offer往队列里插入数据,poll从队列头读取数据素,delete根据消息ID从队列删除数据。这里使用Queue结构体对List进行封装,其实是有必要的,List作为底层的数据结构,我们希望隐藏更多的底层操作,只给客户提供基本的操作
delete操作是在消费者消费成功且发送ACK后,对消息从队列里移除的,因为消费者可以多个同时消费,所以这里进入临界区时加(em,加锁是否就一定会影响对性能有较大的影响呢)

broker

broker作为服务器角色,负责接收连接,接收和响应请求


package broker
import (
   "bufio"
   "net"
   "os"
   "sync"
   "time"
)
var topics = sync.Map{}
func handleErr(conn net.Conn)  {
   defer func() {
      if err := recover(); err != nil {
         println(err.(string))
         conn.Write(MsgToBytes(Msg{MsgType: 4}))
      }
   }()
}
func Process(conn net.Conn) {
   handleErr(conn)
   reader := bufio.NewReader(conn)
   msg := BytesToMsg(reader)
   queue, ok := topics.Load(msg.Topic)
   var res Msg
   if msg.MsgType == 1 {
      // comsumer
      if queue == nil || queue.(*Queue).len == 0{
         return
      }
      msg = queue.(*Queue).poll()
      msg.MsgType = 1
      res = msg
   } else if msg.MsgType == 2 {
      // producer
      if ! ok {
         queue = &Queue{}
         queue.(*Queue).data.Init()
         topics.Store(msg.Topic, queue)
      }
      queue.(*Queue).offer(msg)
      res = Msg{Id: msg.Id, MsgType: 2}
   } else if msg.MsgType == 3 {
      // consumer ack
      if queue == nil {
         return
      }
      queue.(*Queue).delete(msg.Id)
   }
   conn.Write(MsgToBytes(res))
}

MsgType等于1时,直接消费消息;MsgType等于2时是生产者生产消息,如果队列为空,那么还需创建一个新的队列,放在对应的topic下;MsgType等于3时,代表消费者成功消费,可以

删除消息

我们说消息不丢失,这里实现不完全,我就实现了持久化(持久化也没全部实现)。思路就是该topic对应的队列里的消息,按协议格式进行序列化,当broker启动时,从文件恢复
持久化需要考虑的是增量还是全量,需要保存多久,这些都会影响实现的难度和性能(想想kafkaRedis的持久化),这里表示简单实现就好:定时器定时保存


func Save()  {
   ticker := time.NewTicker(60)
   for {
      select {
      case <-ticker.C:
         topics.Range(func(key, value interface{}) bool {
            if value == nil {
               return false
            }
            file, _ := os.Open(key.(string))
            if file == nil {
               file, _ = os.Create(key.(string))
            }
            for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
               file.Write(MsgToBytes(msg.Value.(Msg)))
            }
            _ := file.Close()
            return false
         })
      default:
         time.Sleep(1)
      }
   }
}

有一个问题是,当上面的delete操作时,这里的file文件需不需要跟着delete掉对应的消息?答案是需要删除的,如果不删除,只能等下一次的全量持久化来覆盖了,中间就有脏数据问题
下面是启动逻辑


package main
import (
   "awesomeProject/broker"
   "fmt"
   "net"
)
func main()  {
   listen, err := net.Listen("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("listen failed, err:", err)
      return
   }
   go broker.Save()
   for {
      conn, err := listen.Accept()
      if err != nil {
         fmt.Print("accept failed, err:", err)
         continue
      }
      go broker.Process(conn)
   }
}
生产者

package main
import (
   "awesomeProject/broker"
   "fmt"
   "net"
)
func produce() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
   }
   defer conn.Close()
   msg := broker.Msg{Id: 1102, Topic: "topic-test",  MsgType: 2,  Payload: []byte("我")}
   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Print("write failed, err:", err)
   }
   fmt.Print(n)
}
消费者

package main
import (
   "awesomeProject/broker"
   "bytes"
   "fmt"
   "net"
)
func comsume() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
   }
   defer conn.Close()
   msg := broker.Msg{Topic: "topic-test",  MsgType: 1}
   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Println("write failed, err:", err)
   }
   fmt.Println("n", n)
   var res [128]byte
   conn.Read(res[:])
   buf := bytes.NewBuffer(res[:])
   receMsg := broker.BytesToMsg(buf)
   fmt.Print(receMsg)
   // ack
   conn, _ = net.Dial("tcp", "127.0.0.1:12345")
   l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
   if e != nil {
      fmt.Println("write failed, err:", err)
   }
   fmt.Println("l:", l)
}

消费者这里ack时重新创建了连接,如果不创建连接的话,那服务端那里就需要一直从conn读取数据,直到结束。思考一下,像RabbitMQ的ack就有自动和手工的ack,如果是手工的ack,必然需要一个新的连接,因为不知道客户端什么时候发送ack,自动的话,当然可以使用同一个连接,but这里就简单创建一条新连接吧

启动

先启动broker,再启动producer,然后启动comsumer,OK,能跑,能实现发送消息到队列,从队列消费消息

总结

整体虽然简单,但毕竟是使用go实现的,就是看似一顿操作猛如虎,实质慌如狗。第一时间就被go的gopath和go mod困扰住,后面语法的使用,比如指针,传值传引用等,最头疼的就是类型转换,作为一个javer,使用go进行类型转换,着实被狠狠得虐了一番。

到此这篇关于使用go实现一个超级mini的消息队列的示例代码的文章就介绍到这了,更多相关go mini消息队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网! 


您可能感兴趣的文档:

--结束END--

本文标题: 使用go实现一个超级mini的消息队列的示例代码

本文链接: https://www.lsjlt.com/news/30725.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • PHP实现RabbitMQ消息列队的示例代码
    目录业务场景1、首先部署好thinkphp6框架2、安装workerman扩展3、生产者4、消费者5、整体测试业务场景 项目公司是主php做开发的,框架为thinkphp。众所周知,...
    99+
    2024-04-02
  • Java利用Redis实现消息队列的示例代码
    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下:应用场景为什么要用redis?二进制存储、java序列化传输、IO连接数高、连接频繁一、序列化这里编写了一个java序列化的工具,主要是将对象转化为byt...
    99+
    2023-05-31
    java redis 消息队列
  • C++实现一个简单消息队列的示例详解
    目录前言一、如何实现1、接口定义2、用到的对象3、基本流程二、完整代码三、使用示例线程通信总结前言 消息队列在多线程的场景有时会用到,尤其是线程通信跨线程调用的时候,就可以使用消息队...
    99+
    2022-12-15
    C++实现消息队列 C++消息队列
  • redis用list做消息队列的实现示例
    目录生产消息服务消费消息服务,定时任务日志测试leftPush消息入队,rightPop对应,消息出队。 rightPop(RedisConstant.MQ_LIST, 0L, Ti...
    99+
    2024-04-02
  • 消息队列 RabbitMQ 与 Spring 整合使用的实例代码
    一、什么是 RabbitMQRabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送...
    99+
    2023-05-31
    rabbitmq spring
  • Java消息队列的简单实现代码
    今天看到我们的招聘信息有对消息队列有要求,然后就思索了一翻,网上一搜一大堆。我可以举个小例子先说明应用场景假设你的服务器每分钟的处理量为200个,但客户端再峰值的时候可能一分钟会发1000个消息给你,这时候你就可以把他做成队列,然后按正常有...
    99+
    2023-05-31
    java 消息队列 ava
  • Laravel实现队列的示例代码
    目录一:队列配置1:队列相关配置2:不同队列依赖二:创建队列任务三:任务分发1:默认分发2:延时分发3:指定队列分发4:指定驱动分发5:指定驱动和队列分发四:任务处理五:失败任务处理...
    99+
    2023-02-10
    Laravel实现队列 Laravel 队列
  • 使用PHP实现消息队列的开发
    随着现代互联网应用对高并发、高吞吐量和高可靠性的要求越来越高,消息队列作为一种异步解耦系统架构方式越来越被应用在互联网领域的各个方面。其原理是先将消息发送到消息队列中,等待异步消费,从而达到解耦的目的,提高系统的可扩展性与可维护性。在目前市...
    99+
    2023-05-25
    PHP 消息队列 开发
  • Java代码实现循环队列的示例代码
    循环队列结构 队列特点 队列为一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受...
    99+
    2024-04-02
  • python使用redis实现消息队列(异步)的实现完整例程
    目录安装相关库消息队列实现及使用创建配置文件代码实现最近在用fastapi框架开发web后端,由于近几年python异步编程大火,fastapi凭借高性能也火了起来。本篇介绍了在异步环境下实现Redis消息队列的方法,代...
    99+
    2023-01-18
    pythonredis消息队列 pythonredis异步
  • Redis 使用 List 实现消息队列的优缺点
    目录什么是消息队列消息队列满足哪些特性消息有序性重复消息处理可靠性List 实现消息队列LPUSHRPOP实时消费问题重复消费消息可靠性需要注意的是Redission 实战添加依赖J...
    99+
    2024-04-02
  • java中用数组实现环形队列的示例代码
    本篇文章主要讲述了使用数组实现环形队列的思路以及具体代码 一、队列是什么 我们先来看下百科的解释: 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,...
    99+
    2024-04-02
  • QT实现制作一个ListView列表的示例代码
    目录1、概述2、代码示例1.自定义QListWidget2.自定义QListWidgetItem3.使用3、图片演示1、概述 案例:使用Qt制作一个ListView。点击ListVi...
    99+
    2023-02-23
    QT制作ListView列表 QT ListView列表 QT ListView
  • 使用Go语言开发一个高效的队列实现
    使用Golang编写高效的队列实现 引言:队列是一种常见的数据结构,可用于实现先进先出(FIFO)的操作。在编程中,队列的实现方式各有优劣,本文将介绍使用Golang编写高效的队列实现,并给出具体的代码示例。...
    99+
    2024-01-24
    高效 (效率)
  • Golang中使用RabbitMQ实现消息队列的原理和实践
    在Golang中使用RabbitMQ实现消息队列的原理和实践主要涉及以下几个方面:1. RabbitMQ介绍:RabbitMQ是一个...
    99+
    2023-10-08
    Golang
  • 使用Vue3实现一个Upload组件的示例代码
    目录通用上传组件开发我们需要实现如下功能自定义模版支持文件上传列表支持一系列生命周期钩子事件,上传事件拖拽支持写在最后通用上传组件开发 开发上传组件前我们需要了解: Fo...
    99+
    2024-04-02
  • 使用JS实现一个Sleep函数的示例代码
    目录前言1.目标分析2.setTimeout 封装3.Promise 封装4.async/await总结前言 我们都是 JavaScript 是一个单线程语言,单线程有它的好处也有它...
    99+
    2024-04-02
  • go语言实现全排列的示例代码
    目录思路:回溯过程:代码:思路: 首先画出全排列的树形结构,以123为例,一开始排列为空列表,第一个位置有三种可能,分别是1、2、3,画出三个分支;由于第一个位置已经被占用,那么第二...
    99+
    2023-03-07
    go语言全排列
  • flutter实现一个列表下拉抽屉的示例代码
    目录使用源码使用 通过监听滚动事件实现DragOpenDrawer 组件,可以给滚动组件添加一个下拉抽屉。其使用方式如下: DragOpenDrawer(   openDuratio...
    99+
    2024-04-02
  • SpringBoot+Redis实现消息的发布与订阅的示例代码
    目录1.什么是redis的发布与订阅2.Redis发布订阅3.命令行实现功能订阅主题模式匹配订阅发布消息取消订阅测试4.SpringBoot实现功能Springboot整合Redis...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作