iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > PHP编程 >think\queue 消息队列
  • 957
分享到

think\queue 消息队列

redisphpthinkphp 2023-09-10 22:09:48 957人浏览 安东尼
摘要

简介 TP 中使用 think-queue 可以实现普通队列和延迟队列。 think-queue 是thinkPHP 官方提供的一个消息队列服务,它支持消息队列的一些基本特性: 消息的发布,获取,执行

简介

TP 中使用 think-queue 可以实现普通队列和延迟队列。

think-queue 是thinkPHP 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

  • 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
  • 队列的多队列, 内存限制 ,启动,停止,守护等
  • 消息队列可降级为同步执行

消息队列实现过程

  1. 通过生产者推送消息到消息队列服务中
  2. 消息队列服务将收到的消息存入Redis队列中(zset)
  3. 消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
  4. 处理获取下来的消息调用业务类进行处理相关业务
  5. 业务处理后,需要从队列中删除消息

安装queue

composer 安装 think-queue

composer require topthink/think-queue

配置文件

在 \application\extra 新建queue.php 为 queue 的配置文件
声明启动redis (服务器redis需要先开启安装redis扩展)

return [    'connector'  => 'Redis',          // Redis 驱动    'expire'     => null,             // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null    'default'    => 'default',        // 默认的队列名称    'host'       => '127.0.0.1',      // redis 主机ip    'port'       => 6379,             // redis 端口    'passWord'   => '',               // redis 密码    'select'     => 0,                // 使用哪一个 db,默认为 db0    'timeout'    => 0,                // redis连接的超时时间    'persistent' => false,            // 是否是长连接];

queue服务基类

首先要写一个声明一个服务基类 命名空间及文件夹名 自己定义好 到直接位置
我这里是在 \application\common\library\task\OrderClose.php 这个路径

namespace app\common\library\task;use app\common\controller\Frontend;use think\Lang;use think\Response;use think\queue\Job;class OrderClose{            public function fire(Job $job, $data)    {        // 此处做一些 check,提前判断是否需要执行        // $isJobStillNeedToBeDone = $this->checkJob($data);        // if(! $isJobStillNeedToBeDone){        //     $job->delete();        //     return;        // }        // if ($job->attempts() > 6) {        //     $job->delete();        //     // 也可以重新发布这个任务        //     //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行        // }        // 执行逻辑处理(即:你需要该消息队列做什么)        $isJobDone = $this->doJob($data);        if ($isJobDone) {            // 如果任务执行成功,记得删除任务            $job->delete();            // $job->release(10);        } else {            //删除任务            $job->delete();            // 通过这个方法可以检查这个任务已经重试了几次了                    }    }            private function checkJob($data){        // $order_id = !empty($data['order_id']) ? $data['order_id'] : '';        // if(!$order_id) return false;        // $orderModel = new \app\api\model\order\Order;        // $orderInfo = $orderModel->where(['id'=>$order_id])->find();        // if(!$orderInfo) return false;        // if($orderInfo->status != '1') return false;        // //更新订单数据        // $updateData = [        //     'status'=>'10',        //     'close_time'=>time(),        //     'remarks'=>'未支付订单超时自动关闭'        // ];        // $ret = $orderModel->save($updateData);        // if($ret){        //     return true;        // }else{        //     return false;            // }    }            private function doJob($data)    {        $order_id = !empty($data['order_id']) ? $data['order_id'] : '';        if(!$order_id) return false;        $orderInfo = \app\api\model\order\Order::where(['id'=>$order_id])->find();        // \think\Log::write(['msg'=>'测试队列7878','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error');        if(!$orderInfo) return false;        if($orderInfo->status != '1') return false;        //更新订单数据        $updateData = [            'status'=>'10',            'close_time'=>time(),            'remarks'=>'未支付订单超时自动关闭'        ];        $ret = \app\api\model\order\Order::where(['id'=>$order_id])->update($updateData);        //归还库存        if($orderInfo['activity_id']){            // \think\Log::write(['msg'=>'测试队列返还库存','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error');            $orderProductInfo = \app\api\model\order\OrderProduct::where('order_id',$order_id)->find();            $backStock = \app\api\model\activity\Activity::where('id',$orderInfo['activity_id'])->setInc('stock',$orderProductInfo['number']);        }        if($ret){            return true;        }else{            return false;            }    }    }

$job->release(2); 重新调用任务 2是延迟2秒执行
$job->delete(); 删除任务

调用

在实际应用中调用该基类

//创建订单自动关闭任务$jobHandlerClassName = 'app\common\library\task\OrderClose';$jobData = ['order_id'=>$orderProductDatas['order_id']];$jobQueueName = 'order_close';//延迟执行$isPushed = \think\Queue::later(30 * 60,$jobHandlerClassName, $jobData,$jobQueueName);//立即执行$isPushed = \think\Queue::push($jobHandlerClassName, $jobData,$jobQueueName);

两个方法,前者是(30 * 60)秒后执行,后者立即执行
注意的是
later 延迟执行 无反参 我执行成功 返回了null
push 有反参 返回随机的字符串

开启守护进程

我服务器是用宝塔的守护进程
在这里插入图片描述
在这里安装Supervisor管理器
安装完需要配置一下守护进程的命令行

在这里插入图片描述
名称必须是英文
运行目录要选择项目根目录
命令行如下 注意order_close 是你项目基类名称

php think queue:listen --queue order_close

填写完成之后 保存就可以了

如果不是宝塔环境 请看一下 supervisor—进程管理神器
给我大哥点点关注

结束

这里的配置就基本结束了 基类中的 注释 表明了一下需要参数和方法 需要自己多揣摩一下
还有自己的业务逻辑也要修改 这里只作为参考 大家自行删除就可以

来源地址:https://blog.csdn.net/weixin_43866089/article/details/126436143

--结束END--

本文标题: think\queue 消息队列

本文链接: https://www.lsjlt.com/news/402664.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • think\queue 消息队列
    简介 TP 中使用 think-queue 可以实现普通队列和延迟队列。 think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性: 消息的发布,获取,执行...
    99+
    2023-09-10
    redis php thinkphp
  • ThinkPHP6 think-queue 消息队列(延迟队列)
    安装 composer require topthink/think-queue 配置 配置文件位于 config/queue.php [ 'default'=>'sync' //...
    99+
    2023-09-04
    php 数据库 mysql
  • thinkphp5.0消息队列topthink/think-queue详解
    第一、安装topthink/think-queue composer require topthink/think-queue=1.1.6 第二、配置queue.php信息 找到应用目录下面的extra/queue.php进行配置,队列驱...
    99+
    2023-10-07
    php 开发语言
  • thinkphp6 消息队列think-queue(完整版)
    1.安装队列依赖 如果是在Linux上,进入thinkphp项目的think文件所在目录,执行安装命令 composer require topthink/think-queue 修改queue的配置文件,文件位置config/queue....
    99+
    2023-10-21
    php 服务器 thinkphp queue
  • ThinkPHP怎么使用think-queue实现redis消息队列
    本篇内容主要讲解“ThinkPHP怎么使用think-queue实现redis消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“ThinkPHP怎么使用think-queue实现redis消...
    99+
    2023-07-02
  • ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
    简单介绍: 消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手...
    99+
    2023-09-04
    redis php
  • python消息队列Queue
    实例1:消息队列Queue,不要将文件命名为“queue.py”,否则会报异常“ImportError: cannot import name 'Queue'”#coding=utf-8 from multiprocessing impor...
    99+
    2023-01-31
    队列 消息 python
  • thinkphp6 使用 topthink/think-queue 配置守护进程消息队列
    当前演示使用宝塔面板 安装composer require topthink/think-queue 配置config/queue.php return [ //驱动类型,可选择 sync(默认):同...
    99+
    2023-09-21
    数据库 php redis
  • ThinkPHP基于think-queue的队列插件实现消息推送
    目录前言安装搭建消息队列的存储环境消息的创建与推送消息的消费与删除发布任务处理任务think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。th...
    99+
    2022-12-14
    thinkphp think-queue 消息推送
  • thinkphp6、thinkphp5.0 使用think-queue实现普通队列和延迟队列
    何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。 异步队列的作用: 个人认为消息队列的主...
    99+
    2023-08-31
    redis php
  • thinkphp6使用think-queue怎么实现普通队列和延迟队列
    本文小编为大家详细介绍“thinkphp6使用think-queue怎么实现普通队列和延迟队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章能帮助大家解决疑惑,下...
    99+
    2023-06-30
  • 消息队列 Kafka
    Kafka Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域 为什么使用消息队列MQ 在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many conn...
    99+
    2023-10-23
    kafka 分布式
  • Python消息队列
    消息中间件 --->就是消息队列异步方式:不需要立马得到结果,需要排队同步方式:需要实时获得数据,坚决不能排队例子:#多进程模块multiprocessingfrom multiprocessing import Processfro...
    99+
    2023-01-31
    队列 消息 Python
  • RabbitMQ消息队列
      一、简介   RabbitMQ是一个在AMQP基础上完整的、可复用的企业消息系统,遵循Mozilla Public License开源协议。MQ全称Message Queue(消息队列),它是一种应用程序对应用程序的通信方式。应用程序...
    99+
    2023-01-31
    队列 消息 RabbitMQ
  • RabbitMQ 消息队列
    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写...
    99+
    2023-01-31
    队列 消息 RabbitMQ
  • kafka之消息队列
    大数据工具 kafka 学习 之前需要先了解队列的相关知识 了解万队列就知道kafka的用处 之后再详细了解kafka的具体知识和操作 ...
    99+
    2021-05-31
    kafka之消息队列
  • Redis中如何实现消息队列和延时消息队列
    这篇文章将为大家详细讲解有关Redis中如何实现消息队列和延时消息队列,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。list的几个命令lpush (left push)由...
    99+
    2022-10-19
  • Java自带消息队列Queue的使用教程详细讲解
    目录阻塞队列和非阻塞队列非阻塞队列阻塞队列抛出异常特殊值阻塞超时总结阻塞队列和非阻塞队列 非阻塞队列 ConcurrentLinkedQueue 单向链表结构的无界并发队列, 非阻塞...
    99+
    2023-05-20
    Java自带消息队列Queue Java Queue Java消息队列
  • FreeRTOS-消息队列详解
    ✅作者简介:嵌入式入坑者,与大家一起加油,希望文章能够帮助各位!!!! 📃个人主页:@rivencode的个人主页 🔥系列专栏:玩转FreeRTOS Ὂ...
    99+
    2023-09-29
    java 网络 开发语言
  • redis中的消息队列
    这期内容当中的小编将会给大家带来有关redis中的消息队列介绍,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。一、认识消息队列1.1 消息队列概念“消息”是在两台计算机间传送的数据单位。...
    99+
    2022-11-30
    redis 消息队列 edi
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作