广告
返回顶部
首页 > 资讯 > 后端开发 > Python >python—多进程的消息队列
  • 223
分享到

python—多进程的消息队列

队列进程消息 2023-01-31 07:01:21 223人浏览 薄情痞子

Python 官方文档:入门教程 => 点击学习

摘要

消息队列消息队列是在消息的传输过程中保存消息的容器消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息操作系统提供了很多机制来实现进程间的通信,multipr

消息队列

消息队列是在消息的传输过程中保存消息的容器

消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现


一、使用multiprocessing里面的Queue来实现消息队列

q = Queue() 

q.put(data)  #生产消息

data = q.get() #消费消息


例子:

from multiprocessing import Queue, Process

def write(q):
    for i in ["a","b","c","d"]:
        q.put(i)
        print("put {0} to queue".fORMat(i))
        
def read(q):
    while 1:
        result = q.get()
        print("get {0} from queue".format(result))
        
def main():
    q = Queue()  #定义一个消息队列容器
    pw = Process(target=write,args=(q,)) #定义一个写的进程
    pr = Process(target=read,args=(q,))  #定义一个读的进程
    pw.start()   #启动进程
    pr.start()
    pw.join()    
    pr.terminate()
if __name__ == "__main__":
    main()

运行结果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue



二、通过Multiprocessing里面的Pipe来实现消息队列

1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplux为False,conn1负责接收消息,conn2负责发行消息

2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。


例子:

from multiprocessing import Process,Pipe
import time

def proc1(pipe):
    for i in xrange(1,10):
        pipe.send(i)
        time.sleep(3)
        print("send {0} to pipe".format(i))
        
def proc2(pipe):
    n = 9
    while n>0:
        result = pipe.recv()
        time.sleep(3)
        print("recv {0} from pipe".format(result))
        n -= 1
        
if __name__ == "__main__":
    pipe = Pipe(duplex=False)  #定义并实例化一个管道
    print(type(pipe))
    p1 = Process(target=proc1,args=(pipe[1],))   #pipe[1],管道的右边,表示进入端,发送数据
    p2 = Process(target=proc2,args=(pipe[0],))   #pipe[0],管道的左边,表示出口端,接收数据
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
    
    pipe[0].close()
    pipe[1].close()


运行结果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe


三、Queue模块

python提供了Queue模块来专门实现消息队列:

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

Queue.full():判断消息是否满

Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item,False)

Queue.get(block=True,timeout=None):获取一个消息,其他等同put


以下两个函数用来判断消息对应的任务是否完成:

Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

Queue.join():实际上意味着等到队列为空,再执行别的操作


例子:

from multiprocessing import Process, Pipe, Queue
import time
from threading import Thread

class Proceduer(Thread):
    def __init__(self,queue):
        super(Proceduer,self).__init__() # 超类
        self.queue = queue   #将queue赋给self.queue,便于类中其他函数调用
        
    def run(self):
        try:
            for i in xrange(1,10):
                print("put data is: {0} to queue".format(i))
                self.queue.put(i)
        except Exception as e:
            print("put data error")
            raise e
            
class Consumer_odd(Thread):
    def __init__(self,queue):
        super(Consumer_odd, self).__init__()
        self.queue = queue
        
    def run(self):
        try:
            while self.queue.empty:   #判断消息队列是否为空
                number = self.queue.get()  #取到消息值
                if number%2 != 0:
                    print("get {0} from queue ODD".format(number))
                else:
                    self.queue.put(number)  #将信息放回队列中
            time.sleep(1)
        except Exception as e:
            raise e
            
class Consumer_even(Thread):
    def __init__(self,queue):
        super(Consumer_even,self).__init__()
        self.queue = queue
    def run(self):
        try:
            while self.queue.empty:
                number = self.queue.get()
                if number%2 == 0:
                    print("get {0} from queue Even,thread name is :{1}".format(number,self.getName()))
                else:
                    self.queue.put(number)
                time.sleep(1)
        except Exception as e:
            raise e
            
def main():
    queue = Queue()  #实例化一个消息队列
    p = Proceduer(queue=queue)  #消息队列作为参数赋值给生产者函数,并实例化
    
    p.start()   #启动一个带消息队列的函数
    p.join()    #等待结束
    time.sleep(1)
    
    c1 = Consumer_odd(queue=queue)   #消息队列作为参数赋值给消费者函数,并实例化
    c2 = Consumer_even(queue=queue)    #消息队列作为参数赋值给消费者函数,并实例化
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    print("All threads terminate!")
    
if __name__ == "__main__":
    main()


运行结果:

put data is: 1 to queue

put data is: 2 to queue

put data is: 3 to queue

put data is: 4 to queue

put data is: 5 to queue

put data is: 6 to queue

put data is: 7 to queue

put data is: 8 to queue

put data is: 9 to queue

get 1 from queue ODD

get 3 from queue ODD

get 4 from queue Even,thread name is :Thread-3

get 5 from queue ODD

get 7 from queue ODD

get 9 from queue ODD

get 2 from queue Even,thread name is :Thread-3

get 6 from queue Even,thread name is :Thread-3

get 8 from queue Even,thread name is :Thread-3


例子2:

import Queue

q = Queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get()


运行结果:

0

1

2

3

4

--结束END--

本文标题: python—多进程的消息队列

本文链接: https://www.lsjlt.com/news/191119.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • python—多进程的消息队列
    消息队列消息队列是在消息的传输过程中保存消息的容器消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息操作系统提供了很多机制来实现进程间的通信,multipr...
    99+
    2023-01-31
    队列 进程 消息
  • Python消息队列
    消息中间件 --->就是消息队列异步方式:不需要立马得到结果,需要排队同步方式:需要实时获得数据,坚决不能排队例子:#多进程模块multiprocessingfrom multiprocessing import Processfro...
    99+
    2023-01-31
    队列 消息 Python
  • python消息队列Queue
    实例1:消息队列Queue,不要将文件命名为“queue.py”,否则会报异常“ImportError: cannot import name 'Queue'”#coding=utf-8 from multiprocessing impor...
    99+
    2023-01-31
    队列 消息 python
  • Python中线程的MQ消息队列实现以及消息队列的优点解析
    “消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成...
    99+
    2022-06-04
    队列 消息 线程
  • Java进程间通信之消息队列
    目录消息队列1.消息队列的原理2.消息队列的接口:2.1创建消息队列2.2向消息队列发送消息2.3接收消息:2.4操作消息队列的接口2.5代码测试:信号量:信号量的原理总结消息队列 ...
    99+
    2022-11-13
  • 详解PHP多进程消费队列
    目录引言nginx进程模型进程设计进程信号量设计PHP安装修信号量信号量和系统调用daemon(守护)进程命令设计启动命令强制停止命令强制重启命令平滑停止命令平滑重启命令查看进程状态...
    99+
    2022-11-12
  • redis中的消息队列
    这期内容当中的小编将会给大家带来有关redis中的消息队列介绍,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。一、认识消息队列1.1 消息队列概念“消息”是在两台计算机间传送的数据单位。...
    99+
    2022-11-30
    redis 消息队列 edi
  • 利用Python学习RabbitMQ消息队列
    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综...
    99+
    2022-06-04
    队列 消息 Python
  • python消息队列框架有哪些
    python中的消息队列框架有RabbitMQ、Redis、ZeroMQ、Jafka几种RabbitMQRabbitMQ是一个使用Erlang编写的开源消息队列框架,RabbitMQ支持AMQP、XMPP、SMTP、STOMP等协议,常用于...
    99+
    2022-10-11
  • Spring整合消息队列RabbitMQ流程
    目录搭建生产者工程创建工程添加依赖配置整合发送消息搭建消费者工程创建工程添加依赖配置整合消息监听器搭建生产者工程 创建工程 添加依赖 修改pom.xml文件内容为如下: <...
    99+
    2023-03-20
    Spring RabbitMQ Spring整合消息队列 Spring整合RabbitMQ
  • Python的消息队列包SnakeMQ使用初探
    一、关于snakemq的官方介绍 SnakeMQ的GitHub项目页:https://github.com/dsiroky/snakemq 1.纯python实现,跨平台 2.自动重连接 3.可靠发送--可...
    99+
    2022-06-04
    队列 消息 Python
  • java中的消息队列怎么利用多线程实现
    java中的消息队列怎么利用多线程实现?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。定义一个队列缓存池: //static修饰的成员变量和成员方法独立于该类的任何对象...
    99+
    2023-05-31
    java 多线程 ava
  • Linux消息队列如何实现进程间通信
    这篇文章主要介绍Linux消息队列如何实现进程间通信,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!Linux消息队列实现进程间通信实例详解一、什么是消息队列消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法...
    99+
    2023-06-09
  • 浅谈消息队列的原理
    什么是消息队列 这样的场景你一定不陌生:小王到M记点餐之后,服务员给了他一个号牌,并让他在柜台桌子前方等待叫号取餐。每个人都按照自己付款拿到的号牌顺序排队等叫号。即使店里人再多,也不会显得没有秩序。在上述场...
    99+
    2022-10-18
  • 如何分析Linux消息队列编程
    这期内容当中小编将会给大家带来有关如何分析Linux消息队列编程,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息队列,Unix的通信机制之一,可以理解为是一个存放消息(数据)容器。将消息写入消息队列,然...
    99+
    2023-06-28
  • PHP中的多进程消费队列有什么用
    小编给大家分享一下PHP中的多进程消费队列有什么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!最近开发一个小功能,用到了队列mcq,启动一个进程消费队列数据,后边发现一个进程处理不过来了,又加了一个进程,过了段时间又处理...
    99+
    2023-06-06
  • Linux进程间通信中如何使用消息队列
    本篇文章给大家分享的是有关Linux进程间通信中如何使用消息队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。一、什么是消息队列消息队列提供了一种从一个进程向另一个进程发送一个...
    99+
    2023-06-16
  • Redis在消息队列中的妙用
    Redis在消息队列中的妙用消息队列是一种常见的解耦架构,用于在应用程序之间传递异步消息。通过将消息发送到队列中,发送者可以在不等待接收者响应的情况下继续执行其他任务。而接收者可以在适当的时间从队列中获取消息并进行处理。Redis是一种常用...
    99+
    2023-11-07
    redis 消息队列 妙用
  • 消息队列的作用有哪些
    本篇内容介绍了“消息队列的作用有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!流量削峰消息队列,其实并...
    99+
    2022-10-19
  • redis实现消息队列的方法
    这期内容当中的小编将会给大家带来有关redis实现消息队列的方法,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息队列,Message Queue,常用于解决并发系统中的资源一致性问题...
    99+
    2022-10-18
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作