iis服务器助手广告
返回顶部
首页 > 资讯 > 精选 >如何限制生产者和消费者读取消息?
  • 139
分享到

如何限制生产者和消费者读取消息?

2024-02-11 17:02:16 139人浏览 薄情痞子
摘要

PHP小编子墨在软件开发过程中,消息队列是一种常见的通信机制,用于实现生产者和消费者之间的异步通信。然而,有时候我们希望控制生产者和消费者对消息的读取,以便更好地管理系统资源和处理高峰

PHP小编子墨在软件开发过程中,消息队列是一种常见的通信机制,用于实现生产者和消费者之间的异步通信。然而,有时候我们希望控制生产者和消费者对消息的读取,以便更好地管理系统资源和处理高峰时段的请求。本文将介绍一些限制生产者和消费者读取消息的方法,帮助开发优化系统性能和提高应用的稳定性。

问题内容

我想用 Go 获得应用程序生产者-消费者(通过信号关闭)。

生产者不断在队列中生成消息,限制为 10 条。 一些消费者阅读并处理该频道。 如果队列中的消息数为0,生产者再次生成10条消息。 当收到停止信号时,生产者停止生成新消息,消费者处理通道中的所有内容。

我找到了一段代码,但无法理解它是否正常工作,因为发现了奇怪的东西:

  1. 为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。 (在屏幕截图中,发送了 15 条消息,但处理了 5 条消息)
  2. 如何正确地将队列限制为10条消息,即必须写入10条消息,等待队列计数器变为0时处理,然后再写入10条?
  3. 是否可以在停止信号后通知生产者,以便他不再向通道生成新消息? (在屏幕截图中,生产者成功写入队列 - 12,13,14,15)

结果:

代码示例:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

解决方法

为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。

这是因为当 ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 in 通道。

下面的演示解决了这个问题并简化了源代码。

注释

  1. producectx 完成后停止。并且它关闭了 in 通道。

  2. 字段 jobs 已从 consumer 中删除,工作人员直接从 in 通道读取。

  3. 以下要求被忽略,因为它很奇怪。常见的行为是,当作业产生时,如果 in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in 通道读取作业为止。

    如果队列中的消息数为0,生产者再次生成10条消息

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}

以上就是如何限制生产者和消费者读取消息?的详细内容,更多请关注编程网其它相关文章!

--结束END--

本文标题: 如何限制生产者和消费者读取消息?

本文链接: https://www.lsjlt.com/news/563535.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何限制生产者和消费者读取消息?
    php小编子墨在软件开发过程中,消息队列是一种常见的通信机制,用于实现生产者和消费者之间的异步通信。然而,有时候我们希望控制生产者和消费者对消息的读取,以便更好地管理系统资源和处理高峰...
    99+
    2024-02-11
  • 生产者消费者
    1、概念 所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑 生产者,只需要往队列里面丢东西(生产者不需要关心消费者) 消费者,只需要从队列里面拿东西(消费者也不需要关心生产者) 1 # 多线程实现生产者消费者模型 2 ...
    99+
    2023-01-30
    生产者 消费者
  • kafka-3python生产者和消费者
    程序分为productor.py是发送消息端,consumer为消费消息端,启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,productor.py#!/usr/bin/env python2...
    99+
    2023-01-31
    生产者 消费者 kafka
  • 爬虫——生产者消费者
    结构 生产者生成网址并放入队列 多个消费者从队列中取出网址 1 from queue import Queue 2 import time, threading, requests 3 4 url_base = 'ht...
    99+
    2023-01-30
    爬虫 生产者 消费者
  • python 的生产者和消费者模式
    目录python 的生产者和消费者模式一、生产者消费者模式概述二、为什么使用生产者消费者模式三、什么是生产者消费者模式四、代码案例1、定义一个生产者2、定义一个消费者3、定义一个队列...
    99+
    2024-04-02
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者
    1、查看kafka队列中topic信息 1.1、查看所有topic ./kafka-topics.sh --zookeeper 10.128.106.52:2181 --list 1.2、查看kafka中指定topic的详情 ./kafk...
    99+
    2023-08-21
    kafka 分布式 java
  • 生产者、消费者模型---Queue类
    Queue队列在几乎每种编程语言都会有,python的列表隐藏的一个特点就是一个后进先出(LIFO)队列。而本文所讨论的Queue是python标准库queue中的一个类。它的原理与列表相似,但是先进先出(FIFO)队列。而内部实现更为完...
    99+
    2023-01-30
    生产者 模型 消费者
  • java中BlockingQueue如何实现生产者消费者
    这篇文章主要为大家展示了“java中BlockingQueue如何实现生产者消费者”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“java中BlockingQueue如何实现生产者消费者”这篇文章...
    99+
    2023-05-30
    java blockingqueue
  • Golang rabbitMQ生产者和消费者怎么实现
    今天小编给大家分享一下Golang rabbitMQ生产者和消费者怎么实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一...
    99+
    2023-06-30
  • Java中的生产者/消费者模型
    一、什么是生产者/消费者模型 生产者-消费者模型(Producer-Consumer problem)是一个非常经典的多线程并发协作的模型。 比如某个模块负责生产数据,而另一个模块负责处理数据。产生数据的模块就形象地被称为生产者;而处理数据...
    99+
    2023-10-05
    java 多线程 wait notify notifyAll
  • Go 生产者消费者避免死锁
    问题内容 我有一个关于消费者和生产者的代码。虽然我在这里提出了这个问题以进行代码审查,并且这个想法的很大一部分是从这个线程中衍生出来的,这里是操场上的代码。 此代码有多个生产者和消费...
    99+
    2024-02-06
  • Kafka中生产者和消费者指的是什么
    在Kafka中,生产者和消费者是指Kafka消息系统中参与消息传递的两种角色。 生产者是指负责向Kafka集群中的主题(topic)...
    99+
    2024-03-14
    Kafka
  • 生产者与消费者+Queue(线程安全)
    from queue import Queue from lxml import etree import requests from urllib import request from threading import Thread...
    99+
    2023-01-30
    生产者 线程 消费者
  • Golang rabbitMQ生产者消费者实现示例
    目录消费者生产者消费者 package main import ( "fmt" "github.com/streadway/amqp" ) func failOnError(er...
    99+
    2024-04-02
  • java 中怎么实现生产者消费者
    今天就跟大家聊聊有关java 中怎么实现生产者消费者,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。生产者消费者图存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,...
    99+
    2023-06-17
  • java中生产者和消费者问题实例分析
    这篇“java中生产者和消费者问题实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“java中生产者和消费者问题实例分析...
    99+
    2023-06-29
  • java中生产者消费者问题和代码案例
    目录应用场景分析解决方法管程法信号灯法总结应用场景 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费 如果仓库中没有产品,则生产者将产品放入仓库...
    99+
    2024-04-02
  • PHP实现生产者与消费者的案例
    这篇文章主要介绍PHP实现生产者与消费者的案例,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装...
    99+
    2023-06-14
  • Java 生产者/消费者问题实例详解
    生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去...
    99+
    2023-05-31
    java 生产者消费者 ava
  • Java多线程:生产者与消费者案例
    目录前言工具知识点设计思路具体步骤总结前言 想象一下生活中哪些是和线程沾边的?饭店炒菜就是一个很好的例子 首先客人要吃菜,前提是厨师要炒好,也就是说,厨师不炒好的话客人是没有饭菜的。...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作