广告
返回顶部
首页 > 资讯 > 数据库 >RocketMQ
  • 915
分享到

RocketMQ

2024-04-02 19:04:59 915人浏览 泡泡鱼
摘要

RocketMQ本文档主要是rocketMQ实际代码使用,常见词语介绍等查看其他文档一 下载Http://rocketmq.apache.org/release_notes/release-notes-4.

RocketMQ

本文档主要是rocketMQ实际代码使用,常见词语介绍等查看其他文档

一 下载

Http://rocketmq.apache.org/release_notes/release-notes-4.3.2/ 二进制文件下载地址,下载后可以直接解压运行

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-source-release.zip 源码方式下载地址, 下载后需要自己打包

二 启动

2.1 启动nameserver

进入rocketmq的bin目录

nohup sh mqnamesrv &

2.2 启动broker server

进入bin目录

nohup sh mqbroker -n localhost:9876  autoCreateTopicEnable=true &
集群方式参考集群配置文件RocketMQ集群

2.3 启动失败

默认情况下,我们的服务器都是单独的独立服务器,不会出现这种情况,但是我们在测试过程中使用的是虚拟机, 配置不够,会导致无法启动

修改runbroker.sh 和 runserver.sh

分别找到下图中的指示位置

修改内存大小即可,大小请自己按照自己虚拟机的配置适当调整,比如我修改为了以下值


RocketMQ

RocketMQ




三 图形化界面

此处非必须,实际开发中使用较少

下载rocketmq-console源码:https://GitHub.com/apache/rocketmq-externals

进入子目录rocketmq-console

执行mvn命令打包

mvn clean package -DskipTests

进入target目录

rocketmq-console-ng-1.0.0.jar即为SpringBoot项目

在该目录下CMD执行命令:

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.89.0.65:9876 
其中
--server.port为运行的这个WEB应用的端口,如果不设置的话默认为8080--rocketmq.config.namesrvAddrRocketMQ命名服务地址,如果不设置的话默认为“”
OK了,访问下http://localhost:12581试试吧。

或者打包成 war 包扔到 Tomcat 中运行

入门案例

此案例中使用的是一个消费者,所以消费者代码只有一个

4.1 pom.xml

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.2</version>
        </dependency>

    </dependencies>

4.2 同步消息模式

原理:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。



RocketMQ

4.2.1 生产者


public class SyncProducer01 {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("group1");//groupname 同一个group代表是集群
        //Launch the instance.
        producer.setNamesrvAddr("192.168.3.8:9876");//设置nameserver地址
        //设置实例名字
        producer.setInstanceName("producer");//默认不需要设置,会以ip@pid作为名字, ip是机器ip,pidJVMpid
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            //topic和tags在消费者那边获取到消息后都可以获取, 可以通过tag区分消息
            Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

4.3 异步消息模式

原理:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理。

应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

RocketMQ


4.3.1 生产者



public class AsyncProducer02 {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        //Launch the instance.
        //Launch the instance.
        producer.setNamesrvAddr("192.168.3.8:9876");//设置nameserver地址
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 100; i++) {
            final int index = i;
            //Create a message instance, specifying topic, tag and message body.
            //消息的keys可以作为标记或者传递其他消息内容,可以在消费者获取到消息后获取keys进行区分
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送异步消息, 通过设置回调来接受服务器给我们返回的消息
            producer.send(msg, new SendCallback() {
                //当发送成功的时候执行的方法
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }
                //当发送失败的时候执行
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        //Shut down once the producer instance is not longer in use.
        //当发送异步消息的时候,producer 不要shutdown,因为回调是异步的,可能在收到回调的时候producer关闭了会出错
      //  producer.shutdown();
    }
}

4.4 单向模式

原理:单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

RocketMQ


4.4.1 生产者



public class OnewayProducer03 {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        //Launch the instance.
        producer.setNamesrvAddr("192.168.3.8:9876");//设置nameserver地址
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);

        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

4.5消费者

此消费者可以接收上面三种不同的消息



public class MqConsumer {

    public static void main(String[] args) {
        //同一个group代表是集群
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll");
        consumer.setNamesrvAddr("192.168.3.8:9876");
        try {
            consumer.subscribe("TopicTest", "TagA||TagB");//可订阅多个tag,但是一个消息只能有一个tag
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.reGISterMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    Message msg = list.get(0);
                    //输出消息内容
                    System.out.println("收到消息了:"+new String(msg.getBody()));
                    //此处可以根据消息的tag或者keys来区分消息
                    if (msg.getTags() != null&&msg.getTags().equals("TagA")) {
                        //执行TagA的逻辑
                        System.out.println("收到的是taga的消息");
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            System.out.println("出错了");
        }
    }
}


五 顺序消费

消息顺序

消息顺序是只可以按照消息发送的顺序进行消费。一个订单产生3条消息,订单创建、付款、订单完成。消费时只有按照顺序消费才有意义,不可能先消费付款消息再消费订单创建消息,这样就乱了。另外,多笔订单又可以并行消费。如何保证呢?

一个订单产生的消息只能发送给同一个MQ服务器中的同一个分区,并且按顺序发送,这样才能在理论上保证消费者消费时是按照顺序消费的,因为一个分区就是一个逻辑队列。生产者虽然按顺序发送,但是第一条消息到达MQ的耗时比第二条多,那么第二条则会被先消费,这样就又导致消费时不是顺序的。那么如何解决呢?可以采取只有第一条被消费者消费成功后再发送第二条。看下图:

RocketMQ


但是如果第一条被发送到消费者后,消费者没有响应(消费者发送响应但是因为网络问题丢失或者消费者就没有收到消息),那么在这种情况下你是继续发送第二条还是重发第一条呢?如果是严格消息顺序,那肯定是重发第一条,但是如果是消费者消费后的响应丢失了,那么重发第一条就会造成重复消费。

从另外一方面看,如果不考虑网络异常,那么要实现严格消息,就必须采取一种一对一关系,生产者A的消息对应到MQ服务器1的X队列,消费者A消费X队列。这样串行结构就会造成系统吞吐量太低;更多异常需要处理比如消费端出现问题,那么整个消息队列就会出现阻塞。RocketMQ通过轮询所有队列来确定消息发送到哪一个队列(负载均衡),比如相同订单号的消息会被先后发送到统一队列中。所以RocketMQ

消息重复

造成消费重复的根本原因是网络不可达,只要有网络,这种网络的不稳定因素就存在你无法规避。所以解决这个问题的最好办法就是绕过它。这就变成了,消费端收到两个一样的消息后如何处理,而不是从发送端解决不发送2个一样的消息。对于消费端的要求就是:

  • 消费端处理业务消息要保持幂等性,也就是同一个东西执行多次会得到相同结果

  • 保证每条消息都有唯一编号切保证消息处理成功与去重表的日志同时出现

第一条好理解,第二条就是利用一张日志表来记录已经处理成功的消息ID,如果新到的消息ID已经存在表中那么就不再处理这个消息。第一条是在消费端实现的,不属于消息系统的功能;第二条可以是消息系统实现也可以是业务端实现,处于对消息系统的吞吐量和高可用考虑最好还是由消费端去处理。所以这也就是RocketMQ不解决消息重复的原因

5.1 生产者



public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        ((DefaultMQProducer) producer).setNamesrvAddr(ServerUtil.SERVERADD);//设置服务器地址,请替换为自己的服务器地址
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TaGC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            int a=i;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ==> " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

                    // arg的值其实就是orderId
                    Integer id = (Integer) arg;

                    // mqs是队列集合,也就是topic所对应的所有队列
                    int index = id % mqs.size();

                    // 这里根据前面的id对队列集合大小求余来返回所对应的队列
                    System.out.println(index+"====>"+a);
                    return mqs.get(index);

                }
            }, orderId);

           // System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}

5.2 消费者

消费者有多个,代码一致



public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr(ServerUtil.SERVERADD);//设置服务器地址,实际开发替换为自己的地址
       
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
        //设置一个Listener,主要进行消息的逻辑处理
        //注意这里使用的是MessageListenerOrderly这个接口
        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                //返回消费状态
                //SUCCESS 消费成功
                //SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费

                context.setAutoCommit(false);//手动提交
                System.out.printf(Thread.currentThread().getName()+"消费者1===>" + msgs.get(0).getQueueId() +  "%n"+new String(msgs.get(0).getBody())+ "%n");
                this.consumeTimes.incrementAndGet();
                //以下内容模拟收消息失败,或者回滚等操作
//                if ((this.consumeTimes.get() % 2) == 0) {
//                    return ConsumeOrderlyStatus.SUCCESS;
//                } else if ((this.consumeTimes.get() % 3) == 0) {
//                    return ConsumeOrderlyStatus.ROLLBACK;
//                } else if ((this.consumeTimes.get() % 4) == 0) {
//                    return ConsumeOrderlyStatus.COMMIT;
//                } else if ((this.consumeTimes.get() % 5) == 0) {
//                    context.setSuspendCurrentQueueTimeMillis(3000);
//                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
//                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

经过测试发现,不同队列的消息收取是无序的,但是同一队列中消息的收取顺序是按照发送顺序收取的

六 广播模式

6.1 生产者


public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(ServerUtil.SERVERADD);//设置服务器地址
        producer.start();
        for (int i = 0; i < 100; i++){
            //发送消息
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    ("Hello world==>"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

6.2 消费者

消费者有多个,代码一致



public class BroadcastConsumer1 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeMessageBatchMaxSize(10);//每次拉取十条
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr(ServerUtil.SERVERADD);
        //set to broadcast mode,设置消费模式为广播
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " 消费者1收到消息 : " + new String(msgs.get(0).getBody()) + "%n");
               

您可能感兴趣的文档:

--结束END--

本文标题: RocketMQ

本文链接: https://www.lsjlt.com/news/41244.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • RocketMQ
    RocketMQ本文档主要是rocketmq实际代码使用,常见词语介绍等查看其他文档一 下载http://rocketmq.apache.org/release_notes/release-notes-4....
    99+
    2022-10-18
  • RocketMQ Docker部署
    1. 镜像制作前准备 1 clone rocketmq-docker项目的代码 #官方的docker地址git clone https://github.com/apache/rocketmq-docker.git复制代码 执行上面命令克隆...
    99+
    2023-09-20
    java servlet 开发语言
  • 【RocketMQ】RocketMQ 5.1.0版本Proxy集群模式部署实践
    为了支持长远的云原生发展,RocketMQ引入了一个全新的模块:Proxy,官方对RocketMQ客户端提供了独立的开源项目:https://github.com/apache/rocketmq-cl...
    99+
    2023-09-08
    java-rocketmq rocketmq java 云原生
  • Docker rocketmq如何部署
    这篇文章给大家分享的是有关Docker rocketmq如何部署的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。准备工作在搭建之前,我们需要做一些准备工作,这里我们需要使用 docker 搭建服务,所以需...
    99+
    2023-06-22
  • php rocketmq怎么调用
    要使用PHP调用RocketMQ,您需要使用RocketMQ的PHP客户端库。以下是一些步骤来使用PHP调用RocketMQ: ...
    99+
    2023-10-23
    php rocketmq
  • docker如何安装RocketMQ
    这篇文章将为大家详细讲解有关docker如何安装RocketMQ,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、检索镜像docker search rocketmq检索具体版本cur...
    99+
    2023-06-25
  • RocketMq常见面试题
    目录 1、RocketMQ Broker中的消息被消费后会立即删除吗?2、RocketMQ消费模式有几种?3、消费消息是push还是pull?4、broker如何处理拉取请求的? ----??...
    99+
    2023-09-03
    java-rocketmq rocketmq java
  • 如何解决Rocketmq停机
    如何解决Rocketmq停机,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1时间追溯到2018年12月的某一天夜晚,那天我正准...
    99+
    2022-10-19
  • RocketMQ producer发送者浅析
    发送者其实比较简单,需要做的就是首先确定往哪里发送,其次怎么让消息发送顺畅。我们就看一下具体的代码吧。 首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务...
    99+
    2023-05-17
    RocketMQ producer RocketMQ发送者
  • RocketMQ 事务消息 详解
    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年4月9日 🍊 个人简介:通信工程本...
    99+
    2023-08-17
    java-rocketmq rocketmq java
  • RocketMQ设计之同步刷盘
    同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消...
    99+
    2022-11-13
  • RocketMQ设计之异步刷盘
    上一篇RocketMQ设计之同步刷盘 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一...
    99+
    2022-11-13
  • Springboot整合Rocketmq系列教程
    Springboot整合Rocketmq系列教程 本教程是基于Springboot2.6.3整合Rocketmq5.0,其中涉及了Rocketmq的安装,消息的发送及消费的代码实现。 本文不会对roc...
    99+
    2023-10-07
    spring boot java-rocketmq rocketmq
  • linux安装RocketMQ实例步骤
    1.安装JDK 1.1 检查当前虚拟机环境有没有JDK   rpm -qa|grep jKaapjtava 1.2 卸载  rpm -e --nodeps xxxxxx(自己的openjdk...
    99+
    2022-06-04
    linux RocketMQ
  • RocketMQ存储文件的实现
    RocketMQ存储路径默认是${ROCKRTMQ_HOME}/store,主要存储消息、主题对应的消息队列的索引等。 1、概述 查看其目录文件 commitlog:消息...
    99+
    2022-11-12
  • Linux系统如何安装RocketMQ
    小编给大家分享一下Linux系统如何安装RocketMQ,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消...
    99+
    2023-06-28
  • RocketMQ 介绍及基本概念
    1 介绍 RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。 1.1 RocketMQ 特点 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型 在一个...
    99+
    2023-08-19
    rabbitmq kafka java
  • docker安装RocketMQ的实现步骤
    目录一、检索镜像二、创建Broker Server三、创建broker四、创建rocketmq console五、测试六、java样例七、其他参考:一、检索镜像 docker s...
    99+
    2022-11-12
  • Docker 部署RocketMQ的详细操作
    拉取镜像 docker search rocketmq docker pull foxiswho/rocketmq:4.8.0 启动NameServer docker run -d...
    99+
    2022-11-13
  • SpringBoot整合RocketMQ的详细过程
    目录1. SpringBoot整合RocketMQ2 使用RocketMQ会遇到的问题2.1 WARN No appenders could be found for logger2...
    99+
    2023-05-15
    SpringBoot整合RocketMQ SpringBoot RocketMQ使用
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作