Python 官方文档:入门教程 => 点击学习
目录前言说明一、为什么要二次封装1.1 二次封装不同观点1.2 封装的抽离点1.3 设计模式的应用二、二次封装核心要点2.1 二次封装核心点2.1.1 封装主要讨论点2.1.2 发送
前置掌握:SpringBoot基础使用、RocketMQ和springBoot的整合使用
文章难度:四颗星
代码不难,重点是封装的思想需要体会
不同观点欢迎大家在评论区一起讨论学习,没有对错之分,每个系统业务特性不同,适合系统的才是最好的~
源码地址:https://gitee.com/tianxincoder/practice-rocketmq-enterprise文章只会说明核心代码,其他的基础整合配置和多环境自动隔离参考源码即可
为了不产生歧义,文章中提到的二次封装均是基于原始使用方式的封装,而非源码级别的二次封装
换句话说:如果都需要对源码进行封装了,那么说明公司业务规模都到一定程度了,二次封装这种东西已经不需要讨论了,封装已经是一个共识
让我们以一个生活中的蛋炒饭开个头
原始框架好比提供了原材料:厨具、鸡蛋,米饭等食材、菜谱
问题:哪种方案更好? 答案:两种各有各的优势(在说废话,哈哈~)
package com.codecoord.rocketmq.domain;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.UUID;
@Data
public abstract class BaseMqMessage {
protected String key;
protected String source = "";
protected LocalDateTime sendTime = LocalDateTime.now();
protected String traceId = UUID.randomUUID().toString();
protected Integer retryTimes = 0;
}
package com.codecoord.rocketmq.template;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RocketMqTemplate {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
@Resource(name = "rocketMQTemplate")
private RocketMQTemplate template;
public RocketMQTemplate getTemplate() {
return template;
}
public String buildDestination(String topic, String tag) {
return topic + RocketMqSysConstant.DELIMITER + tag;
}
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message) {
// 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理...
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.geTKEy()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如elk采集
LOGGER.info("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
return sendResult;
}
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
return sendResult;
}
}
package com.codecoord.rocketmq.template;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
/// 如果不采用继承也可以直接注入使用
public SendResult sendOrderPaid(@NotNull String orderId, String body) {
RocketMqEntityMessage message = new RocketMqEntityMessage();
message.setKey(orderId);
message.setSource("订单支付");
message.setMessage(body);
// 这两个字段只是为了测试
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
}
}
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private RocketMqTemplate rocketMqTemplate;
protected abstract String consumerName();
protected abstract void handleMessage(T message) throws Exception;
protected abstract void overMaxRetryTimesMessage(T message);
protected boolean isFilter(T message) {
return false;
}
protected abstract boolean isRetry();
protected abstract boolean isThrowException();
protected int maxRetryTimes() {
return 10;
}
protected int retryDelayLevel() {
return -1;
}
public void dispatchMessage(T message) {
MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
// 基础日志记录被父类处理了
logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
if (isFilter(message)) {
logger.info("消息不满足消费条件,已过滤");
return;
}
// 超过最大重试次数时调用子类方法处理
if (message.getRetryTimes() > maxRetryTimes()) {
overMaxRetryTimesMessage(message);
return;
}
try {
long start = Instant.now().toEpochMilli();
handleMessage(message);
long end = Instant.now().toEpochMilli();
logger.info("消息消费成功,耗时[{}ms]", (end - start));
} catch (Exception e) {
logger.error("消息消费异常", e);
// 是捕获异常还是抛出,由子类决定
if (isThrowException()) {
throw new RuntimeException(e);
}
if (isRetry()) {
// 获取子类RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(annotation)) {
message.setSource(message.getSource() + "消息重试");
message.setRetryTimes(message.getRetryTimes() + 1);
SendResult sendResult;
try {
// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
// 发送失败的处理就是不进行ACK,由RocketMQ重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重试消息发送失败");
}
}
}
}
}
}
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_TAG,
// 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
implements RocketMQListener<RocketMqEntityMessage> {
@Override
protected String consumerName() {
return "RocketMQ二次封装消息消费者";
}
@Override
public void onMessage(RocketMqEntityMessage message) {
// 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型
super.dispatchMessage(message);
}
@Override
protected void handleMessage(RocketMqEntityMessage message) throws Exception {
// 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
System.out.println("业务消息处理");
}
@Override
protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
// 当超过指定重试次数消息时此处方法会被调用
// 生产中可以进行回退或其他业务操作
}
@Override
protected boolean isRetry() {
return false;
}
@Override
protected int maxRetryTimes() {
// 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用
return 5;
}
@Override
protected boolean isThrowException() {
// 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常
return false;
}
}
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
// messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
log.info("收到广播消息【{}】", new String(message.getBody()));
}
}
封装测试大家可以直接参考RocketMqController即可
package com.codecoord.rocketmq.controller;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.WEB.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
@Resource
private RocketMqTemplate rocketMqTemplate;
@Resource
private OrderMessageTemplate orderMessageTemplate;
@RequestMapping("/entity/message")
public Object sendMessage() {
RocketMqEntityMessage message = new RocketMqEntityMessage();
// 设置业务key
message.setKey(UUID.randomUUID().toString());
// 设置消息来源,便于查询we年
message.setSource("封装测试");
// 业务消息内容
message.setMessage("当前消息发送时间为:" + LocalDateTime.now());
// Java时间字段需要单独处理,否则会序列化失败
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
}
@RequestMapping("/order/paid")
public Object sendOrderPaidMessage() {
return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客户下单了...,快快备货");
}
@RequestMapping("/messageExt/message")
public SendResult convertAndSend() {
// 生产中不推荐使用jsonObject传递,不看发送者无法知道传递的消息包含什么信息
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", "messageExt");
String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
// 如果要走内部方法发送则必须要按照标准来,否则就使用原生的消息发送
return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
}
}
到此这篇关于RocketMQ整合SpringBoot实现生产级二次封装的文章就介绍到这了,更多相关RocketMQ SpringBoot封装内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: RocketMQ整合SpringBoot实现生产级二次封装
本文链接: https://www.lsjlt.com/news/151950.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0