iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知
  • 268
分享到

RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知

2023-06-29 05:06:01 268人浏览 泡泡鱼
摘要

小编给大家分享一下RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主

小编给大家分享一下RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。

由于互联网通信的不可靠性,例如双方网络服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。
例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台、后台通知的,以后台异步通知结果为准),但由于前台跳转、后台结果通知都可能失效,因此还以定时补单+请求方主动查询接口作为辅助手段。

常见的补单操作,任务调度策略一般设定30秒、60秒、3分钟、6分钟、10分钟调度多次(以自己业务需要),如果调度接收到响应确认报文,补单成功,则中止对应订单的调度任务;如果超过补单上限次数,则停止补单,避免无谓的资源浪费。请求端随时可以发起请求报文查询对应订单的状态。在日常开发中,对于网站前端来说,支付计费中心对于订单请求信息的处理也是通过消息同步返回、异步通知+主动补偿查询相结合的机制,其中对于订单的异步通知,目前的通知策略为3s、30s、60s、120s、180、300s的阶梯性通知。返回成功情况下就不继续通知了,本来打算使用将失败的消息写到数据库等待发送,然后每秒查询数据库获取消息通知前端。但觉得这样的处理方式太粗暴。

存在以下缺点:

1 、每秒请求有点儿浪费资源;

2 、通知方式不稳定;

3 、无法承受大数据量等等

所以最终打算使用rabbitMQ的消息延迟+死信队列来实现。消息模型如下:

RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知

producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是通知前端,如果通知失败,就创建一个延迟队列declareQueue,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。代码如下:DeclareQueue.java

package org.delayQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory; public class DeclareQueue {public static String EXCHANGE_NAME = "notifyExchange";public static void init() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = null;try {connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String routingKey = "AliPaynotify";String message = "Http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T¬ify_id=4ab9bed148d043D0bf75460706f7774a¬ify_time=2014-08-29+16%3A22%3A02¬ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println(" [x] Sent :" + message);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (Exception ignore) {}}}}public static void main(String args[]) {init();}

DeclareConsumer.java

package org.delayQueue; import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Map.Entry;import org.apache.http.HttpResponse;import org.apache.http.client.ClientProtocolException;import org.apache.http.client.HttpClient;import org.apache.http.client.methods.HttpPost;import org.apache.http.impl.client.DefaultHttpClient;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class DeclareConsumer {public static String EXCHANGE_NAME = "notifyExchange";public static String QU_declare_15S = "Qu_declare_15s";public static String EX_declare_15S = "EX_declare_15s";public static String ROUTINGKEY = "AliPaynotify";public static Connection connection = null;public static Channel channel = null;public static Channel DECLARE_15S_CHANNEL = null;public static String declare_queue = "init";public static String originalExpiration = "0";public static void init() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);connection = factory.newConnection();channel = connection.createChannel();DECLARE_15S_CHANNEL = connection.createChannel();}public static void consume() {try {channel.exchangeDeclare(EXCHANGE_NAME, "topic");final String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");Map<String, Object> headers = properties.getHeaders();if (headers != null) {List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");System.out.println("xDeath--- > " + xDeath);if (xDeath != null && !xDeath.isEmpty()) {Map<String, Object> entrys = xDeath.get(0);// for(Entry<String, Object>// entry:entrys.entrySet()){// System.out.println(entry.geTKEy()+":"+entry.getValue());// }originalExpiration = entrys.get("original-expiration").toString();}}System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());HttpClient httpClient = new DefaultHttpClient();HttpPost post = new HttpPost(message);HttpResponse response = httpClient.execute(post);BufferedReader inreader = null;if (response.getStatusLine().getStatusCode() == 200) {inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));StringBuffer responseBody = new StringBuffer();String line = null;while ((line = inreader.readLine()) != null) {responseBody.append(line);if (!responseBody.equals("success")) {// putDeclre15s(message);if (originalExpiration.equals("0")) {putDeclreQueue(message, 3000, QU_declare_15S);}if (originalExpiration.equals("3000")) {putDeclreQueue(message, 30000, QU_declare_15S);if (originalExpiration.equals("30000")) {putDeclreQueue(message, 60000, QU_declare_15S);if (originalExpiration.equals("60000")) {putDeclreQueue(message, 120000, QU_declare_15S);if (originalExpiration.equals("120000")) {putDeclreQueue(message, 180000, QU_declare_15S);if (originalExpiration.equals("180000")) {putDeclreQueue(message, 300000, QU_declare_15S);if (originalExpiration.equals("300000")) {//channel.basicConsume(QU_declare_300S,true, this);System.out.println("finish notify");} else {System.out.println(response.getStatusLine().getStatusCode());}};channel.basicConsume(queueName, true, consumer);} catch (Exception e) {e.printStackTrace();} finally {}static Map<String, Object> xdeathMap = new HashMap<String, Object>();static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>();static Map<String, Object> xdeathParam = new HashMap<String, Object>();public static void putDeclre15s(String message) throws IOException {channel.exchangeDeclare(EX_declare_15S, "topic");Map<String, Object> args = new HashMap<String, Object>();args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchangeAMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.expiration("3000").deliveryMode(2);// 设置消息TTLAMQP.BasicProperties properties = builder.build();channel.queueDeclare(QU_declare_15S, false, false, false, args);channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());public static void putDeclreQueue(String message, int mis, String queue) throws IOException {builder.expiration(String.valueOf(mis)).deliveryMode(2);// 设置消息TTLchannel.queueDeclare(queue, false, false, false, args);channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());public static void main(String args[]) throws Exception {init();consume();}

消息通过dlx转发的情况下,header头部会带有x-death的一个数组,里面包含消息的各项属性,比如说消息成为死信的原因reason,original-expiration这个字段表示消息在原来队列中的过期时间,根据这个值来确定下一次通知的延迟时间应该是多少秒。运行结果如下:

RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知

RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知

RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知

看完了这篇文章,相信你对“RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知”有了一定的了解,如果想了解更多相关知识,欢迎关注编程网精选频道,感谢各位的阅读!

--结束END--

本文标题: RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知

本文链接: https://www.lsjlt.com/news/322782.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • C++ 生态系统中流行库和框架的贡献指南
    作为 c++++ 开发人员,通过遵循以下步骤即可为流行库和框架做出贡献:选择一个项目并熟悉其代码库。在 issue 跟踪器中寻找适合初学者的问题。创建一个新分支,实现修复并添加测试。提交...
    99+
    2024-05-15
    框架 c++ 流行库 git
  • C++ 生态系统中流行库和框架的社区支持情况
    c++++生态系统中流行库和框架的社区支持情况:boost:活跃的社区提供广泛的文档、教程和讨论区,确保持续的维护和更新。qt:庞大的社区提供丰富的文档、示例和论坛,积极参与开发和维护。...
    99+
    2024-05-15
    生态系统 社区支持 c++ overflow 标准库
  • c++中if elseif使用规则
    c++ 中 if-else if 语句的使用规则为:语法:if (条件1) { // 执行代码块 1} else if (条件 2) { // 执行代码块 2}// ...else ...
    99+
    2024-05-15
    c++
  • c++中的继承怎么写
    继承是一种允许类从现有类派生并访问其成员的强大机制。在 c++ 中,继承类型包括:单继承:一个子类从一个基类继承。多继承:一个子类从多个基类继承。层次继承:多个子类从同一个基类继承。多层...
    99+
    2024-05-15
    c++
  • c++中如何使用类和对象掌握目标
    在 c++ 中创建类和对象:使用 class 关键字定义类,包含数据成员和方法。使用对象名称和类名称创建对象。访问权限包括:公有、受保护和私有。数据成员是类的变量,每个对象拥有自己的副本...
    99+
    2024-05-15
    c++
  • c++中优先级是什么意思
    c++ 中的优先级规则:优先级高的操作符先执行,相同优先级的从左到右执行,括号可改变执行顺序。操作符优先级表包含从最高到最低的优先级列表,其中赋值运算符具有最低优先级。通过了解优先级,可...
    99+
    2024-05-15
    c++
  • c++中a+是什么意思
    c++ 中的 a+ 运算符表示自增运算符,用于将变量递增 1 并将结果存储在同一变量中。语法为 a++,用法包括循环和计数器。它可与后置递增运算符 ++a 交换使用,后者在表达式求值后递...
    99+
    2024-05-15
    c++
  • c++中a.b什么意思
    c++kquote>“a.b”表示对象“a”的成员“b”,用于访问对象成员,可用“对象名.成员名”的语法。它还可以用于访问嵌套成员,如“对象名.嵌套成员名.成员名”的语法。 c++...
    99+
    2024-05-15
    c++
  • C++ 并发编程库的优缺点
    c++++ 提供了多种并发编程库,满足不同场景下的需求。线程库 (std::thread) 易于使用但开销大;异步库 (std::async) 可异步执行任务,但 api 复杂;协程库 ...
    99+
    2024-05-15
    c++ 并发编程
  • 如何在 Golang 中备份数据库?
    在 golang 中备份数据库对于保护数据至关重要。可以使用标准库中的 database/sql 包,或第三方包如 github.com/go-sql-driver/mysql。具体步骤...
    99+
    2024-05-15
    golang 数据库备份 mysql git 标准库
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作