广告
返回顶部
首页 > 资讯 > 后端开发 > JAVA >【RocketMQ】RocketMQ 5.1.0版本Proxy集群模式部署实践
  • 637
分享到

【RocketMQ】RocketMQ 5.1.0版本Proxy集群模式部署实践

java-rocketmqrocketmqjava云原生 2023-09-08 21:09:27 637人浏览 泡泡鱼
摘要

为了支持长远的云原生发展,RocketMQ引入了一个全新的模块:Proxy,官方对RocketMQ客户端提供了独立的开源项目:https://GitHub.com/apache/rocketmq-cl

为了支持长远的云原生发展,RocketMQ引入了一个全新的模块:Proxy,官方对RocketMQ客户端提供了独立的开源项目https://GitHub.com/apache/rocketmq-clients,如果要使用这个新的客户端,必须要使用Proxy作为endpoint。

Proxy有两种搭建方式:

  • LOCAL:本地模式,顾名思义,通过追加参数,在broker本地启动
  • CLUSTER:集群模式,作为独立的集群启动,搭建完nameserver和broker后,独立部署

本地模式更适合非正式的场景,如调试、开发,线上环境还是推荐集群模式,本文基于集群模式进行部署验证。

启动proxy使用mqproxy命令:

[root@XXGL-T-TjsYZ-Redis-01 bin]# ./mqproxy -helpusage: mqproxy [-bc ] [-h] [-n ] [-pc ] [-pm ] -bc,--brokerConfigPath    Broker config file path for local mode -h,--help                      Print help -n,--namesrvAddr          Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876' -pc,--proxyConfigPath     Proxy config file path -pm,--proxyMode           Proxy run in local or cluster mode[root@XXGL-T-TJSYZ-REDIS-01 bin]# 
  • -bc:使用本地模式时,指定broker的配置文件路径
  • -h:输出帮助信息;
  • -n:nameserver路径,也可以通过在配置文件中配置namesrvAddr指定;
  • -pc:proxy配置文件路径;
  • -pm:代理模式:LOCAL / CLUSTER,默认为CLUSTER(集群模式)

需要编辑的文件有:

  • bin/runserver.sh:修改GC日志目录和JVM参数;(非必须)
  • conf/rmq-proxy.JSON
    主要是设置集群名、自定义端口
{  "rocketMQClusterName": "littleCat",  "remotingListenPort":28080,  "grpcServerPort":28081}

完整参数见源码:org.apache.rocketmq.proxy.config.ProxyConfig

启动脚本:

#!/bin/bash. /etc/profilenohup sh /neworiental/rocketmq-5.1.0/rocketmq-proxy/bin/mqproxy -n '172.24.30.192:19876;172.24.30.193:19876;172.24.30.194:19876' -pc /neworiental/rocketmq-5.1.0/rocketmq-proxy/conf/rmq-proxy.json >/dev/null 2>&1 &echo "deploying rocketmq-proxy..."

停止脚本:

#!/bin/bash. /etc/profilePID=`ps -ef | grep '/neworiental/rocketmq-5.1.0/rocketmq-proxy' | grep -v grep | awk '{print $2}'`if [[ "" !=  "$PID" ]]; then  echo "killing rocketmq-proxy : $PID"  kill $PIDfi

启动成功

客户端测试:

pom依赖:

        <dependency>            <groupId>org.apache.rocketmqgroupId>            <artifactId>rocketmq-client-javaartifactId>            <version>5.0.4version>        dependency>

生产者:

package cn.xdf.xadd.rmq.test.newclient;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.message.Message;import org.apache.rocketmq.client.apis.producer.Producer;import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.io.IOException;import java.NIO.charset.StandardCharsets;public class ProducerNORMalMessageExample {    public static void main(String[] args) throws ClientException, IOException {        final ClientServiceProvider provider = ClientServiceProvider.loadService();        String endpoints = "172.24.30.192:28080";        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()                .setEndpoints(endpoints)                .build();        String topic = "zhurunhua-test";        // In most case, you don't need to create too many producers, singleton pattern is recommended.        final Producer producer = provider.newProducerBuilder()                .setClientConfiguration(clientConfiguration)                // Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic                // route before message publishing.                .setTopics(topic)                // May throw {@link ClientException} if the producer is not initialized.                .build();        // Define your message body.        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);        String tag = "new-client-test";        final Message message = provider.newMessageBuilder()                // Set topic for the current message.                .setTopic(topic)                // Message secondary classifier of message besides topic.                .setTag(tag)                // Key(s) of the message, another way to mark message besides message id.                .seTKEys("test")                .setBody(body)                .build();        try {            final SendReceipt sendReceipt = producer.send(message);            System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());        } catch (Throwable t) {            System.err.println(t);        }        // Close the producer when you don't need it anymore.        producer.close();    }}

消费者:

package cn.xdf.xadd.rmq.test.newclient;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.consumer.ConsumeResult;import org.apache.rocketmq.client.apis.consumer.FilterExpression;import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;import org.apache.rocketmq.client.apis.consumer.PushConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.util.Collections;public class PushConsumerExample {    private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class);    public static void main(String[] args) throws ClientException, IOException, InterruptedException {        final ClientServiceProvider provider = ClientServiceProvider.loadService();        String endpoints = "172.24.30.192:28080";        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()                .setEndpoints(endpoints)                .build();        String tag = "new-client-test";        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);        String consumerGroup = "new-client-test-group";        String topic = "zhurunhua-test";        PushConsumer pushConsumer = provider.newPushConsumerBuilder()                .setClientConfiguration(clientConfiguration)                // Set the consumer group name.                .setConsumerGroup(consumerGroup)                // Set the subscription for the consumer.                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))                .setMessageListener(messageView -> {                    // Handle the received message and return consume result.                    log.info("Consume message={}", messageView);                    return ConsumeResult.SUCCESS;                })                .build();        // Block the main thread, no need for production environment.        Thread.sleep(Long.MAX_VALUE);        // Close the push consumer when you don't need it anymore.        pushConsumer.close();    }}

以上,搭建成功,消息生产消费成功~

遇到的问题:

找到rmq.proxy,logback.xml,批量修改对应的路径:

将${brokerLogDir}批量替换成自定义的路径即可。

dashboard目前还不能采集到客户端的信息:

来源地址:https://blog.csdn.net/sinat_14840559/article/details/129818187

--结束END--

本文标题: 【RocketMQ】RocketMQ 5.1.0版本Proxy集群模式部署实践

本文链接: https://www.lsjlt.com/news/400683.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作