iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > PHP编程 >PHP实现生产者与消费者的案例
  • 686
分享到

PHP实现生产者与消费者的案例

2023-06-14 07:06:45 686人浏览 安东尼
摘要

这篇文章主要介绍PHP实现生产者与消费者的案例,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!php中使用kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装

这篇文章主要介绍PHP实现生产者与消费者的案例,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

php中使用kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

生产者(测试

创建消费者需要步骤:

  • 生产者配置参数

  • 创建生产者实例

  • 创建主题实例(依赖生产者)

  • 生产主题消息

  • 推送消息

具体代码如下:

        $conf = new \RdKafka\Conf();        // 绑定服务节点        $conf->set('metadata.broker.list', '127.0.0.1:32772');        // 创建生产者        $kafka = new \RdKafka\Producer($conf);        // 创建主题实例        $topic = $kafka->newTopic('p1r1');        // 生产主题数据,此时消息在缓冲区中,并没有真正被推送        $topic->produce(RD_KAFKA_PARTITioN_UA, 0, 'Message');        // 阻塞时间(毫秒), 0为非阻塞        $kafka->poll(0);         // 推送消息,如果不调用此函数,消息不会被发送且会丢失        $result = $kafka->flush(5000);        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {            throw new \RuntimeException('Was unable to flush, messages might be lost!');        }

消费者

创建一个消费者需要几个步骤:

  • 消费者配置参数

  • 应用配置参数创建消费者实例

  • 订阅对应主题

  • 拉取数据

  • 提交位移

具体代码如下:

        $conf = new \RdKafka\Conf();        // 绑定消费者组        $conf->set('group.id', 'ceshi');        // 绑定服务节点,多个用,分隔        $conf->set('metadata.broker.list', '127.0.0.1:32787');        // 设置自动提交为false        $conf->set('enable.auto.commit', 'false');        // 设置当前消费者拉取数据时的偏移量, 可选参数:        // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。        // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。        $conf->set('auto.offset.reset', 'earliest');        // 创建消费者实例        $consumer = new \RdKafka\KafkaConsumer($conf);        // 消费者订阅主题,数组形式        $consumer->subscribe(['topic1','topic2']);        while (true) {            // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)            $message = $consumer->consume(5000);            switch ($message->err) {                case RD_KAFKA_RESP_ERR_NO_ERROR:                    // 业务逻辑                    var_dump($message);                    // 提交位移                    $consumer->commit($message);                    break;                case RD_KAFKA_RESP_ERR__PARTITION_EOF:                    echo "No more messages; will wait for more\n";                    break;                case RD_KAFKA_RESP_ERR__TIMED_OUT:                    echo "Timed out\n";                    break;                default:                    throw new \Exception($message->errstr(), $message->err);                    break;            }        }        // 关闭消费者(一般用在脚本中,不需要关闭)        $conumser->close();

只消费指定分区中的数据:

    // 对消费者指定分区,注意此方式不能与subscribe一同使用    $consumer->assign([        new RdKafka\TopicPartition("topic", 0),        new RdKafka\TopicPartition("topic", 1),    ]);

以上是“PHP实现生产者与消费者的案例”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网PHP编程频道!

--结束END--

本文标题: PHP实现生产者与消费者的案例

本文链接: https://www.lsjlt.com/news/269085.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作