Python 官方文档:入门教程 => 点击学习
目录简介根据TAG过滤消息生产者消费者测试结果根据sql表达式过滤消息生产者消费者启动程序报错The broker does not support consumer to filt
消息过滤是指消费者一端在消费消息时,对消息进行选择性过滤,只消费符合过滤条件的消息。 RocketMQ的消息过滤机制大致分为两种:标签过滤和类过滤。其中标签过滤又分为Tag过滤和SQL92过滤。
消息发送端只能设置一个tag,消息接收端可以设置多个tag。
public void sendTagMessage()
{
String[] tags = new String[]{"TagA", "TagB", "TaGC", "TagD", "TagE"};
for(int i=0;i<10;i++)
{
String tag = tags[i % tags.length];
logger.info("sendTagMessage tag is :{}",tag);
String msg = "hello, 这是第" + (i + 1) + "条消息";
org.springframework.messaging.Message<String> msg1 = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.convertAndSend("test-tag-rocketmq" + ":" + tag, msg1);
}
}
说明:示例中循环发送了10条消息,每条消息设置了一个tag发送过滤消息的格式为:topic:tag的形式,注意发送端只能设定一个tag。
@Component
@RocketMQMessageListener(consumerGroup="test-tagrocketmq-group",topic="test-tag-rocketmq",selectorExpression="TagA || TagC || TagD",selectorType=SelectorType.TAG, messageModel = MessageModel.CLUSTERING)
public class TagConsumer implements RocketMQListener<Object>
{
private Logger logger =LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Object o)
{
String msg=JSON.tojsONString(o);
logger.info("send TagA || TagC || TagD sucCSS content is:{}", msg);
}
}
说明:
从结果我可以看出第2条为TAGC、第7条为TAGC、第8条为TAGD,第3条为TAGD,第5条为TAGA,第0条为TAGA,而消费端监听的TAG为TAGA、TAGC、TAGD所以对于不符合条件的消息进行了过滤。
SQL表达式方式可以根据发送消息时输入的属性进行一些计算。
RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。
public void sendSQLMessage()
{
String msg = "hello, 这是第1条消息";
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build() ;
Map<String, Object> headers = new HashMap<>() ;
headers.put("i", 5) ;
rocketMQtemplate.convertAndSend("test-sql-rocketmq", message, headers);
}
说明:传递了参数为5进行条件判断。
@Component
@RocketMQMessageListener(consumerGroup="test-sqlrocketmq-group",topic="test-sql-rocketmq",selectorExpression = "i=5",selectorType=SelectorType.SQL92, messageModel = MessageModel.CLUSTERING)
public class SQLConsumer implements RocketMQListener<MessageExt>
{
private Logger logger =LoggerFactory.getLogger(getClass());
@Override
public void onMessage(MessageExt message)
{
String msg=new String(message.getBody());
String paramStr=JSON.toJSONString(message.getProperties());
//消息内容
logger.info("send succss content is:{}", msg);
//消息参数
logger.info("send mssage parma is:{}", paramStr);
}
}
说明:
原因:默认情况下broke没有开启对SQL语法的支持,需要修改配置
1.打开rocketmq服务下的broke.conf文件,添加如下配置即可。
2.重启broke服务即可.
说明:只有满足SQL条件能进行消费。
本文讲解了RocketMQ实现消息过滤,针对不同的业务场景选择合适的方案即可,如果疑问,请随时反馈,
到此这篇关于Spring Boot 整合RocketMq实现消息过滤的文章就介绍到这了,更多相关Spring Boot消息过滤内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: Spring Boot 整合RocketMq实现消息过滤功能
本文链接: https://www.lsjlt.com/news/150774.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0