广告
返回顶部
首页 > 资讯 > 后端开发 > Python >Python之RabbitMQ
  • 823
分享到

Python之RabbitMQ

PythonRabbitMQ 2023-01-31 07:01:02 823人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全,RabbitMQ官网,RabbitM


RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全,RabbitMQ官网,RabbitMQ中文文档。


安装RabbitMQ


安装EPEL源


[root@root ~]# yum -y install epel-release


安装erlang


[root@root ~]# yum -y install erlang


安装RabbitMQ


[root@root ~]# yum -y install rabbitmq-server


启动并设置开机器启动


在启动RabbitMQ之前需要hostname的解析,要不然启动不起来


[root@root ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 anshengme
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6


[root@root ~]# systemctl start rabbitmq-server
[root@root ~]# systemctl enable rabbitmq-server
Created symlink from /etc/systemd/system/multi-user.target.wants/rabbitmq-server.service to /usr/lib/systemd/system/rabbitmq-server.service.


查看启动状态


[root@root ~]# netstat -tulnp |grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      37507/beam.smp      
tcp6       0      0 :::5672                 :::*                    LISTEN      37507/beam.smp


pika


pika模块是官方认可的操作RabbitMQ的api接口。


安装pika


pip3 install pika


pika:https://pypi.python.org/pypi/pika


测试


>>> import pika


Work Queues


如果你启动了多个消费者,那么生产者生产的任务会根据顺序的依次让消费者来执行,这就是Work Queues模式

wKioL1lRo0-R8tCXAAAbRac36VY052.png

rabbitmq-work-queues


生产者代码


# _*_ codin:utf-8 _*_
import pika

# 连接到RabbitMQ 这是一个阻塞的连接
connection = pika.BlockinGConnection(pika.ConnectionParameters('192.168.56.100'))

# 生成一个管道
channel = connection.channel()

# 通过管道创建一个队列
channel.queue_declare(queue='hello')

# 在队列内发送数据,body内容,routing_key队列,exchange交换器,通过交换器往hello队列内发送Hello World!数据
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

# 关闭连接
connection.close()

消费者代码
# _*_ codin:utf-8 _*_
import pika

# 连接到RabbitMQ 这是一个阻塞的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.100'))

# 生成一个管道
channel = connection.channel()

# 如果消费者连接到这个队列的时候,队列没有生成,那么消费者就生成这个队列,如果这个队列已经生成了,那么就忽略它
channel.queue_declare(queue='hello')

# 回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    
# 消费,当收到hello队列的消息的时候就,就调用callback函数,no_ack消费者在处理任务的时候要不需要确认任务已经处理完成,改为False则要确认
channel.basic_consume(callback, queue='hello', no_ack=True)

# 开始接受任务,阻塞
channel.start_consuming()
持久化



队列持久化


试想,如果我们的消费者在执行任务执行到一半时,突然down掉了,我们可以更改no_ack=False来让消费者每次执行完成完成之后确认执行完毕了再把这个任务在队列中移除移除掉,但是如果RabbitMQ的服务器停止我们的任务仍然会丢失。


首先,我们需要确保的RabbitMQ永远不会在我们的队列中失去,为了做到这一点,我们需要把durable=True,声明一个新名称的队列,为task_queue:


channel.queue_declare(queue='task_queue', durable=True)


durable需要在生产者和消费者上面都需要写上,且durable只会让我们的队列持久化,并不能够让消息持久化。


消息持久化


消息持久化只需要在添加消息的时候添加一个delivery_mode=2


channel.basic_publish(exchange='',
                      routing_key='world',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          # 2=消息持久化
                          delivery_mode=2,
                      ))


在消费者的callback函数内添加以下代码:


ch.basic_ack(delivery_tag = method.delivery_tag)


消息公平分发


每一个消费者同时只处理一个任务,比如说现在有三个消费者,刚开始来了三个任务,平均分配给了三个消费者,那么这三个消费者目前都在同时执行任务,当第四个任务到来的时候依旧会分配给第一个消费者,第五个任务到来的时候会分配给第二个消费者,以此类推。


那么以上的状况有什么不妥呢?譬如说不同的消费者执行任务的时间不同,我们现在需要的时候,当三个消费者都在执行任务的时候,比如说第二个消费者任务执行完了,其他消费者都还在执行任务,当第四个任务到来的时候希望交给第二个消费者,若要实现此功能,只需要在消费者加上一下代码即可:


channel.basic_qos(prefetch_count=1)


完整的代码如下


消费者代码


#!/usr/bin/env Python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)
    
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
                      
channel.start_consuming()


生产者代码


#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for n in range(10):
    message = "Hello World! %s" % (n + 1)
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
    print(" [x] Sent %r" % message)
connection.close()


消息传输类型


之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,


Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息


属性        描述
fanout      所有bind到此exchange的queue都可以接收消息
direct      通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic       所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息


fanout(发布订阅)


只要有消费者,那么我生产者发布一条消息的时候所有的消费者都会被收到

wKioL1lRpCiwDpI_AAAO2WHjmrg668.png

rabbitmq-fanout


# 消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.56.100'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=True)
# 获取queue的name
queue_name = result.method.queue
# 把queue绑定到exchange
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()


# 生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.56.100'))
channel = connection.channel()
# fanout发送给所有人
channel.exchange_declare(exchange='logs', type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body="Hello World!")
connection.close()


直接(关键字)


RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

wKioL1lRpKCBzS8HAAAVtogSqW0191.png

rabbitmq-direct


生产者代码
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')
                         
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()


消费者代码
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.56.100'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')
                         
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
                       
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
                      
channel.start_consuming()


topic(模糊匹配)


在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。


表达式符号说明:


符号     描述
#        表示可以匹配0个或多个单词
*        表示只能匹配一个单词


发送者路由值     队列中        是否匹配
yangwen          yangwen.*     不匹配
yangwen          yangwen.#     匹配


消费者代码


#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.56.100'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')
                         
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
    
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
                       
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
                      
channel.start_consuming()


生产者代码


#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.56.100'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')
                         
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()


rpc(远程过程调用)

客户端发送一个任务到服务端,服务端把任务的执行结果再返回给客户端

wKioL1lRpdHy1r-NAAAfr90s8dw854.png


RPC Server

# _*_coding:utf-8_*_
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.56.100'))
channel = connection.channel()
# 声明一个RPC QUEUE
channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
        
def on_request(ch, method, props, body):
    # 接受传过来的值
    n = int(body)
    print(" [.] fib(%s)" % n)
    # 交给fib函数进行斐波那契处理
    response = fib(n)
    # 把结果发回去,此时消费者变成生产者
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     # 客户端传过来的UUID顺便发回去
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    # 持久化
    ch.basic_ack(delivery_tag=method.delivery_tag)
    
# 同时只处理一个任务
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()


RPC Client


# _*_coding:utf-8_*_
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='192.168.56.100'))
            
        self.channel = self.connection.channel()
        
        result = self.channel.queue_declare(exclusive=True)
        # 服务端返回处理完毕的数据新Queue名称
        self.callback_queue = result.method.queue
        
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
                                   
    def on_response(self, ch, method, props, body):
        # corr_id等于刚刚发送过去的ID,就代表这条消息是我的
        if self.corr_id == props.correlation_id:
            self.response = body
            
    def call(self, n):
        self.response = None
        # 生成一个唯一ID,相当于每个任务的ID
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       # 让服务端处理完成之后把数据放到这个Queue里面
                                       reply_to=self.callback_queue,
                                       # 加上一个任务ID
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            # 不断地去Queue接受消息,但不是阻塞的,而是一直循环的去取
            self.connection.process_data_events()
        return int(self.response)
        
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)



--结束END--

本文标题: Python之RabbitMQ

本文链接: https://www.lsjlt.com/news/190714.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Python之RabbitMQ
    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全,RabbitMQ官网,RabbitM...
    99+
    2023-01-31
    Python RabbitMQ
  • Python RabbitMQ
    RabbitMQRabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用...
    99+
    2023-01-31
    Python RabbitMQ
  • python rabbitmq no_
    发送端:import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.chann...
    99+
    2023-01-31
    python rabbitmq
  • python rabbitmq send
    #!/usr/bin/env python#-*- coding: utf8 -*- import pikaimport tracebacktry:    connection = pika.BlockingConnection(pika....
    99+
    2023-01-31
    python rabbitmq send
  • 【Python模块】rabbitMQ
    RabbitMQ介绍:父进程与子进程间,同一父继承可以用multiprocess的Manager模块来实现数据互访。作用:RabbitMQ是为了实现相互独立的两个进程数据互访。应用场景:不需要立即操作的数据。比如:发消息,发通知,发红包等。...
    99+
    2023-01-31
    模块 Python rabbitMQ
  • python中的rabbitmq
    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写...
    99+
    2023-01-30
    python rabbitmq
  • python rabbitmq 队列持久
    发送端:import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.chann...
    99+
    2023-01-31
    队列 持久 python
  • Springboot整合Rabbitmq之Confirm和Return机制
    目录前言为什么会有ConfirmSpringboot 整合 Mq 实现 Confirm 监听机制依赖引入增加配置文件,设定连接信息配置队列、交换机,以及对其进行绑定编写mq消息发送服...
    99+
    2022-11-13
  • python使用pika操作rabbitmq
    python 连接操作rabbitMQ 主要是使用pika库pip3 install pika==1.1.0 官方对于pika有如下介绍Since threads aren’t a...
    99+
    2023-01-31
    操作 python pika
  • python操作rabbitmq 实践笔
    发布/订阅  系统1.基本用法生产者 1 import pika 2 import sys 3 4 username = 'wt' #指定远程rabbitmq的用户名密码 5 pwd = '111111' 6 user_pw...
    99+
    2023-01-31
    操作 python rabbitmq
  • Python介绍RabbitMQ使用篇二
    1. RabbitMQ WorkQueue基本工作模式介绍 上一篇我们使用C#语言讲解了单个消费者从消息队列中处理消息的模型,这一篇我们使用Python语言来讲解多个消费者同时工作从一个Queue处理消息的模型。 工作队列(又称:任务队...
    99+
    2023-01-31
    Python RabbitMQ
  • python测试rabbitmq的消息收
    send.py#!/usr/bin/env python    # -*- coding: UTF-8 -*-  import pika   import random            credentials = pika.Plain...
    99+
    2023-01-31
    消息 测试 python
  • Java开发 - 消息队列之RabbitMQ初体验
    目录 前言 RabbitMQ 什么是RabbitMQ RabbitMQ特点 安装启动 RabbitMQ和Kafka的消息收发区别 RabbitMQ使用案例 添加依赖 添加配置 创建RabbitMQ配置类 RabbitMQ消息的发送 Rab...
    99+
    2023-09-16
    java-rabbitmq rabbitmq java 消息队列 交换机
  • RabbitMQ
    RabbitMQ概述:RabbitMQ是使用最广泛的开源消息代理。RabbitMQ轻量级,易于在集群内部和云平台中部署。它支持多种消息传递协议。 它可以满足企业高规模,高可用性的要求。RabbitMQ使用Erlang语言开发的。MQ概...
    99+
    2023-01-30
    RabbitMQ
  • 【RabbitMQ】RabbitMQ控制台的使用
    一、访问控制台页面 如果在本机上装了RabbitMQ则在浏览器访问127.0.0.1:15672,如果在服务器装了RabbitMQ则通过在浏览器输入urlip:15762来访问 登录后进入主页   二、添加RabbitMQ用户 进入主页...
    99+
    2023-09-18
    java-rabbitmq rabbitmq java
  • 利用Python学习RabbitMQ消息队列
    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综...
    99+
    2022-06-04
    队列 消息 Python
  • 【RabbitMQ】什么是RabbitMQ?RabbitMQ有什么用?应用场景有那些?
    目录 一、什么是RabbitMQ? 二、RabbitMQ是干什么的? 三、RabbitMQ的常见作用有那些? 四、RabbitMQ的应用场景有那些? 场景一:用户订单,库存处理。【服务间解耦】 场景二:用户注册,发送手机短信,邮件。【实现异...
    99+
    2023-08-31
    rabbitmq java 中间件 MQ
  • python操作RabbitMq的三种工作模式
    目录一、简介:二、RabbitMq 生产和消费三、RabbitMq 持久化四、RabbitMq 发布与订阅模式一:fanout模式二:direct模式三:topicd一、简介: Ra...
    99+
    2022-11-10
  • Golang与RabbitMQ实现多服务之间的异步通信
    要使用Golang与RabbitMQ实现多服务之间的异步通信,你需要按照以下步骤进行操作:1. 安装RabbitMQ:首先,你需要安...
    99+
    2023-10-08
    Golang
  • Java面试高频问题之RabbitMQ系列全面解析
    1.RabbitMQ是什么? RabbitMQ是一款开源的,Erlang编写的,基于AMQP(高级消息队列协议)协议的消息中间件。 2.为什么要使用消息队列? 从本质上来说是因为互联...
    99+
    2022-11-12
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作