iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >RocketMQ生产消息与消费消息超详细讲解
  • 262
分享到

RocketMQ生产消息与消费消息超详细讲解

RocketMQ生产消息RocketMQ消费消息 2022-12-27 15:12:31 262人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录1 RocketMQ简介2 MQ的常见产品3 环境搭建4 单生产者单消费者模式5 单生产者多消费者模式5.1默认模式(负载均衡)5.2广播模式6 多生产者多消费者模式1 Rock

1 RocketMQ简介

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

2 MQ的常见产品

ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,

RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强,扩展性强

kafkaScala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

3 环境搭建

创建Maven工程

引入依赖:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
</dependency>

4 单生产者单消费者模式

生产者:

//生产者,产生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3启动发送的服务
        producer.start();
        //4.1创建要发送的消息对象,指定topic,指定内容body
        Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8"));
        //4.2发送消息
        SendResult result = producer.send(msg);
        System.out.println("返回结果:"+result);
        //5.关闭连接
        producer.shutdown();
    }
}

消费者:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //4.开启监听,用于接收消息
        consumer.reGISterMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试

5 单生产者多消费者模式

5.1默认模式(负载均衡)

生产者:

//生产者,产生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3启动发送的服务
        producer.start();
        for (int i = 1; i <= 10; i++) {
            Message msg = new Message("topic1",("生产者2: hello rocketmq "+i).getBytes("UTF-8"));
            SendResult result = producer.send(msg);
            System.out.println("返回结果:"+result);
        }
        //5.关闭连接
        producer.shutdown();
    }
}

消费者:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

5.2广播模式

生产者的代码不变,消费者的代码改动如下:

		//设置当前消费者的消费模式(默认模式:负载均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的
        consumer.setMessageModel(MessageModel.BROADCASTING);

具体消费者代码:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //设置当前消费者的消费模式(默认模式:负载均衡)
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

广播模式的现象

如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次

如果多个消费者先启动(广播模式),后发消息,才有广播的效果

结论: 必须先启动消费者再启动发送者才有广播的效果

6 多生产者多消费者模式

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

运行多个生产者,在启动消费者

测试:

到此这篇关于RocketMQ生产消息与消费消息超详细讲解的文章就介绍到这了,更多相关RocketMQ生产消息与消费消息内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: RocketMQ生产消息与消费消息超详细讲解

本文链接: https://www.lsjlt.com/news/175967.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作