Python 官方文档:入门教程 => 点击学习
目录简介原理具体实现消费者消费者生产者消息监听器消息事务测试正常测试异常测试代码调整执行结果总结简介 RocketMQ 事务消息(Transactional Message)是指应用
RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
RocketMQ事务消息通过异步确保方式,保证事务的最终一致性。设计的思想可以借鉴两个阶段提交事务。其执行流程图如下:
@Component
public class TransactionProduce
{
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private RocketMQtemplate rocketMQTemplate;
public void sendTransactionMessage(String msg)
{
logger.info("start sendTransMessage hashKey:{}",msg);
Message message =new Message();
message.setBody("this is tx message".getBytes());
TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq",
MessageBuilder.withPayload(message).build(), msg);
//发送状态
String sendStatus = result.getSendStatus().name();
// 本地事务执行状态
String localTxState = result.getLocalTransactionState().name();
logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
}
}
说明:发送事务消息采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等。
@Component
@RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING)
public class TransactionConsumer implements RocketMQListener<String>
{
private Logger logger =LoggerFactory.getLogger(getClass());
@Override
public void onMessage(String message)
{
logger.info("send transaction mssage parma is:{}", message);
}
}
说明:发送事务消息的消费者与普通的消费者一样没有太大的区别。
发送事务消息除了生产者和消费者以外,我们还需要创建生产者的消息监听器,来监听本地事务执行的状态和检查本地事务状态。
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener
{
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object obj)
{
logger.info("start invoke local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
//处理业务
String JSONStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("invoke msg content:{}",jsonStr);
}
catch (Exception e)
{
logger.error("invoke local mq trans error",e);
resultState = RocketMQLocalTransactionState.UNKNOWN;
}
return resultState;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
{
logger.info("start check Local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("check trans msg content:{}",jsonStr);
}
catch (Exception e)
{
resultState = RocketMQLocalTransactionState.ROLLBACK;
}
return resultState;
}
}
说明:RocketMQ本地事务状态由如下几种:
注意:Spring Boot2.0的版本之后,@RocketMQTransactionListener 已经没有了txProducerGroup属性,且sendMessageInTransaction方法也将其移除。所以在同一项目中只能有一个@RocketMQTransactionListener,不能出现多个,否则会报如下错误:
java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener
c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"}
c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction
c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsGoK":true,"buyerId":null}
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE
c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
说明:通过日志我们可以看出,执行的流程与上述的一致,执行成功后,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为COMMIT_MESSAGE。
如果在执行本地消息时出现异常,那么执行结果会是怎样?修改下本地事务执行的方法,让其出现异常。
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object obj)
{
logger.info("start invoke local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
//处理业务
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("invoke local transaction msg content:{}",jsonStr);
int c=1/0;
}
catch (Exception e)
{
logger.error("invoke local mq trans error",e);
resultState = RocketMQLocalTransactionState.UNKNOWN;
}
return resultState;
}
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW
从执行的结果可以看出,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为:UNKNOW.所以消费端无法消费此消息。
到此这篇关于SpringBoot集成RocketMQ发送事务消息的文章就介绍到这了,更多相关springBoot集成RocketMQ事务消息内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: SpringBoot集成RocketMQ发送事务消息的原理解析
本文链接: https://www.lsjlt.com/news/153112.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0