广告
返回顶部
首页 > 资讯 > 后端开发 > PHP编程 >thinkphp6、thinkphp5.0 使用think-queue实现普通队列和延迟队列
  • 790
分享到

thinkphp6、thinkphp5.0 使用think-queue实现普通队列和延迟队列

redisphp 2023-08-31 07:08:28 790人浏览 泡泡鱼
摘要

何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者Redis等等,都可以。 异步队列的作用: 个人认为消息队列的主

  • 何为异步消息队列
    所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组数据库中的表,或者Redis等等,都可以。

  • 异步队列的作用:
    个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列

composer 安装 think-queue

https://GitHub.com/coolseven/notes/blob/master/thinkPHP-queue/README.md

# tp5.0composer require topthink/think-queue=1.1.6# tp5.1.xcomposer require topthink/think-queue 2.0.4# tp6composer require topthink/think-queue

判断是否安装成功

php think queue:work -h

在这里插入图片描述

配置文件

Tp5.0
文件位置:根目录/config/queue.php

return [//    'connector' => 'Sync'    'connector'  => 'Redis',        // Redis 驱动    'expire'     => 60,        // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null    'default'    => 'default',        // 默认的队列名称    'host'       => '127.0.0.1',    // redis 主机ip    'port'       => 6379,        // redis 端口    'passWord'   => '',        // redis 密码    'select'     => 1,        // 使用哪一个 db,默认为 db0    'timeout'    => 0,        // redis连接的超时时间    'persistent' => false,        // 是否是长连接];

Tp5.1.x
文件位置:根目录/config/queue.php

return [//    'connector' => 'Sync'    'connector'  => 'Redis',        // Redis 驱动    'expire'     => 60,        // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null    'default'    => 'default',        // 默认的队列名称    'host'       => '127.0.0.1',    // redis 主机ip    'port'       => 6379,        // redis 端口    'password'   => '',        // redis 密码    'select'     => 1,        // 使用哪一个 db,默认为 db0    'timeout'    => 0,        // redis连接的超时时间    'persistent' => false,        // 是否是长连接];

Tp6
配置文件在统一目录下/config/queue.php

return [    'default'     => 'redis', // 使用redis    'connections' => [        'sync'     => [            'type' => 'sync',        ],        'database' => [            'type'       => 'database',            'queue'      => 'default',            'table'      => 'jobs',            'connection' => null,        ],        'redis'    => [            'type'       => 'redis',            'queue'      => 'default',            'host'       => '127.0.0.1',            'port'       => 6379,            'password'   => '',            'select'     => 1,            'timeout'    => 0,            'persistent' => false,        ],    ],    'failed'      => [        'type'  => 'none',        'table' => 'failed_jobs',    ],];

项目下新建一个Job目录,编写对应的消费者类

在这里插入图片描述

Tp6

namespace app\job;use think\facade\Log;use think\queue\Job;class CronJob{        public function fire(Job $job, $data)    {        Log::channel('job')->info('一条测试日志');        if (empty($data)) {            Log::channel('job')->error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));            return;        }        //有效消息到达消费者时可能已经不再需要执行了        if (!$this->checkJob($data)) {            $job->delete();            Log::channel('job')->record("Job does not need to be executed");            return;        }        //执行业务处理        if ($this->doJob($data)) {            $job->delete();//任务执行成功后删除            Log::channel('job')->record("job has been down and deleted");        } else {            //检查任务重试次数            if ($job->attempts() > 3) {                Log::channel('job')->record("job has been retried more that 3 times");//                $job->release(10); // 10秒后在执行                $job->delete(); // 删除任务            }        }    }        private function checkJob($data)    {        Log::channel('job')->record('验证任务是否需要执行');        return true;    }        private function doJob($data)    {        // 实际业务流程处理        print_r($data['msg'] ?? '实际业务流程处理');        Log::channel('job')->record('实际业务流程处理');        return true;    }    function task1(){        print_r("task 1");    }    public function failed($data)    {        // ...任务达到最大重试次数后,失败了        Log::channel('job')->error('任务达到最大重试次数后,失败了');    }}

上面使用了日志通道 配置文件路径 /config/log.php

// +----------------------------------------------------------------------// | 日志设置// +----------------------------------------------------------------------return [    // 默认日志记录通道    'default'      => env('log.channel', 'file'),    // 日志记录级别    'level'        => [],    // 日志类型记录的通道 ['error'=>'email',...]    'type_channel'    =>    [],    // 关闭全局日志写入    'close'        => false,    // 全局日志处理 支持闭包    'processor'    => null,    // 日志通道列表    'channels'     => [        'file' => [            // 日志记录方式            'type'           => 'File',            // 日志保存目录            'path'           => '',            // 单文件日志写入            'single'         => false,            // 独立日志级别            'apart_level'    => [],            // 最大日志文件数量            'max_files'      => 0,            // 使用JSON格式记录            'json'           => false,            // 日志处理            'processor'      => null,            // 关闭通道日志写入            'close'          => false,            // 日志输出格式化            'fORMat'         => '[%s][%s] %s',            // 是否实时写入            'realtime_write' => false,        ],        // 其它日志通道配置        'job'    =>    [            'type' => 'File',//            'path' => app()->getRootPath() . 'runtime/pay', // 重点这个路径要写            'path' => app()->getRuntimePath() . 'pay', // 重点这个路径要写            'time_format' => 'Y-m-d H:i:s',            'format' => '[%s][%s]:%s'        ],    ],];

控制器编写测试代码

namespace app\api\controller\v1;use app\job\CronJob;use think\facade\Queue;class User{    public function index(): string    {        return 'v1/user/index2';    }        public function push(): string    {        //  queue的 push方法 第一个参数可以接收字符或者对象字符串        $job = 'app\Job\CronJob'; // 当前任务由哪个类负责处理        $queueName = 'cron_job_queue';  // 当前队列归属的队列名称        //  // 当前任务所需的业务数据        $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());        $data['user_id'] = 1;//        $res = Queue::push(CronJob::class, $data, $queueName);  // 可以自动获取        $res = Queue::push($job, $data, $queueName);   // 可以手动指定 -        $data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());        $res = Queue::later(10, $job, $data, $queueName);   // 10秒后执行        $data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());        $res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName);   // 10秒后执行        if ($res == false) {            return '消息投递失败';        } else {            return '消息投递成功';        }    }}

测试

简述下work和listen在thinkphp框架中明显区别

work模式只能跑一条队列数据,如果想持续运行必须加 --daemon 参数;
2、work模式速度和性能更快,每次执行无需再加载框架所有文件,因此如果想修改队列代码必须重启队列,listen模式每次执行都会重新加载thinkphp文件,相当于用户访问一个接口一样;
3、work模式因为无需重复加载框架文件,因此也会带来Mysql链接超时的烦恼,目前thinkphp5.x全系版本无法解决此问题,所以如果队列中有操作数据库的不建议使用此模式;

php think queue:work --queue cron_job_queuephp think queue:listen --daemon --queue helloJobQueue# linux上以守护进程方式运行nohup php think queue:work  --daemon --queue cron_job_queue &

在这里插入图片描述

在这里插入图片描述

来源地址:https://blog.csdn.net/qq_23564667/article/details/127919660

--结束END--

本文标题: thinkphp6、thinkphp5.0 使用think-queue实现普通队列和延迟队列

本文链接: https://www.lsjlt.com/news/384699.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • thinkphp6、thinkphp5.0 使用think-queue实现普通队列和延迟队列
    何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。 异步队列的作用: 个人认为消息队列的主...
    99+
    2023-08-31
    redis php
  • thinkphp6使用think-queue怎么实现普通队列和延迟队列
    本文小编为大家详细介绍“thinkphp6使用think-queue怎么实现普通队列和延迟队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章能帮助大家解决疑惑,下...
    99+
    2023-06-30
  • ThinkPHP怎么使用think-queue实现redis消息队列
    本篇内容主要讲解“ThinkPHP怎么使用think-queue实现redis消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“ThinkPHP怎么使用think-queue实现redis消...
    99+
    2023-07-02
  • 使用Redis怎么实现延迟队列
    本篇文章给大家分享的是有关使用Redis怎么实现延迟队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。方案一:采用通过定时任务采用数据库/非关系型数据库轮询方案。优点: 实现简...
    99+
    2023-06-15
  • ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
    简单介绍: 消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手...
    99+
    2023-09-04
    redis php
  • 简述thinkphp自带队列think-queue的使用以及通过supervisor实现常驻进程
    think-queue是thinkphp官方提供的一个消息队列服务,适用于大并发、返回结果时间较长、需要批量操作等专门支持队列服务的扩展包。例如短信发送、模板消息邮件等推送。可以进行发布、获取、执行、...
    99+
    2023-09-07
    linux php redis
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作