Queue 介紹
原理
EasySwoole
封裝實(shí)現(xiàn)了一個(gè)輕量級(jí)的隊(duì)列,默認(rèn)使用 Redis
作為隊(duì)列驅(qū)動(dòng)器。
用戶可以自己實(shí)現(xiàn)一個(gè)隊(duì)列驅(qū)動(dòng)器來(lái)實(shí)現(xiàn)隊(duì)列,用 kafka
作為隊(duì)列驅(qū)動(dòng)器或者 其他驅(qū)動(dòng)器方式
作為隊(duì)列驅(qū)動(dòng)器,來(lái)進(jìn)行存儲(chǔ)。
從上可知,Queue
并不是一個(gè)單獨(dú)使用的組件,它更像一個(gè)對(duì)不同驅(qū)動(dòng)的隊(duì)列進(jìn)行統(tǒng)一封裝的門面組件。
Queue 組件當(dāng)前最新穩(wěn)定版本為 3.x。
舊版本 (2.1.x) 的 Queue
組件的使用,請(qǐng)看 Queue 2.1.x
組件要求
- ext-swoole: >=4.4.0
- easyswoole/component: ^2.0
- easyswoole/redis-pool: ~2.2.0
安裝方法
composer require easyswoole/queue 3.x
倉(cāng)庫(kù)地址
基本使用
默認(rèn)自帶的隊(duì)列驅(qū)動(dòng)為 Redis
隊(duì)列。這里簡(jiǎn)單列舉 2 種用戶可使用的方式:
- 在框架的任意位置進(jìn)行生產(chǎn)和消費(fèi)隊(duì)列任務(wù)。
- 在框架的任意位置進(jìn)行生產(chǎn)隊(duì)列任務(wù), 然后在自定義進(jìn)程中進(jìn)行消費(fèi)任務(wù)。
在框架中進(jìn)行生產(chǎn)和消費(fèi)任務(wù)
創(chuàng)建隊(duì)列
use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;
// 配置 Redis 隊(duì)列驅(qū)動(dòng)器
$redisConfig = new RedisConfig([
'host' => '127.0.0.1', // 服務(wù)端地址 默認(rèn)為 '127.0.0.1'
'port' => 6379, // 端口 默認(rèn)為 6379
'auth' => '', // 密碼 默認(rèn)為 不設(shè)置
'db' => 0, // 默認(rèn)為 0 號(hào)庫(kù)
]);
// 創(chuàng)建隊(duì)列
$queue = new Queue(new RedisQueue($redisConfig));
普通生產(chǎn)任務(wù)
$queue
為上述創(chuàng)建隊(duì)列中得到的隊(duì)列對(duì)象。
// 創(chuàng)建任務(wù)
$job = new Job();
// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
// 生產(chǎn)普通任務(wù)
$queue->producer()->push($job);
普通消費(fèi)任務(wù)
$queue
為上述創(chuàng)建隊(duì)列中得到的隊(duì)列對(duì)象。
// 消費(fèi)任務(wù)
$job = $queue->consumer()->pop();
// 或者是自定義進(jìn)程中消費(fèi)任務(wù)(具體使用請(qǐng)看下文自定義進(jìn)程消費(fèi)任務(wù)完整使用示例)
$queue->consumer()->listen(function (Job $job){
var_dump($job);
});
生產(chǎn)延遲任務(wù)
$queue
為上述創(chuàng)建隊(duì)列中得到的隊(duì)列對(duì)象。
// 創(chuàng)建任務(wù)
$job = new Job();
// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
// 設(shè)置任務(wù)延后執(zhí)行時(shí)間
$job->setDelayTime(5);
// 生產(chǎn)延遲任務(wù)
$queue->producer()->push($job);
生產(chǎn)可信任務(wù)
// 創(chuàng)建任務(wù)
$job = new Job();
// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
// 設(shè)置任務(wù)重試次數(shù)為 3 次。任務(wù)如果沒(méi)有確認(rèn),則會(huì)執(zhí)行三次
$job->setRetryTimes(3);
// 如果5秒內(nèi)沒(méi)確認(rèn)任務(wù),會(huì)重新回到隊(duì)列。默認(rèn)為3秒
$job->setWaitConfirmTime(5);
// 投遞任務(wù)
$queue->producer()->push($job);
// 確認(rèn)一個(gè)任務(wù)
$queue->consumer()->confirm($job);
在框架中生產(chǎn)任務(wù)和自定義進(jìn)程中消費(fèi)任務(wù)
- 注冊(cè)隊(duì)列驅(qū)動(dòng)器
- 設(shè)置消費(fèi)進(jìn)程
- 生產(chǎn)者投遞任務(wù)
定義一個(gè)隊(duì)列
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
class MyQueue extends Queue
{
use Singleton;
}
定義消費(fèi)進(jìn)程
<?php
namespace App\Utility;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
go(function (){
MyQueue::getInstance()->consumer()->listen(function (Job $job){
var_dump($job->getJobData());
});
});
}
}
支持多進(jìn)程、多協(xié)程消費(fèi)
注冊(cè)隊(duì)列驅(qū)動(dòng)器、消費(fèi)進(jìn)程及設(shè)置生產(chǎn)者投遞任務(wù)
<?php
namespace EasySwoole\EasySwoole;
use App\Utility\MyQueue;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\Queue\Job;
class EasySwooleEvent implements Event
{
public static function initialize()
{
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// redis pool 使用請(qǐng)看 redis 章節(jié)文檔
$redisConfig = new \EasySwoole\Redis\Config\RedisConfig(
[
'host' => '127.0.0.1', // 服務(wù)端地址 默認(rèn)為 '127.0.0.1'
'port' => 6379, // 端口 默認(rèn)為 6379
'auth' => '', // 密碼 默認(rèn)為 不設(shè)置
'db' => 0, // 默認(rèn)為 0 號(hào)庫(kù)
]
);
// 配置 隊(duì)列驅(qū)動(dòng)器
$driver = new \EasySwoole\Queue\Driver\RedisQueue($redisConfig, 'easyswoole_queue');
MyQueue::getInstance($driver);
// 注冊(cè)一個(gè)消費(fèi)進(jìn)程
$processConfig = new \EasySwoole\Component\Process\Config([
'processName' => 'QueueProcess', // 設(shè)置 自定義進(jìn)程名稱
'processGroup' => 'Queue', // 設(shè)置 自定義進(jìn)程組名稱
'enableCoroutine' => true, // 設(shè)置 自定義進(jìn)程自動(dòng)開(kāi)啟協(xié)程
]);
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new QueueProcess($processConfig));
// 模擬生產(chǎn)者,可以在任意位置投遞
$register->add($register::onWorkerStart, function ($server, $id) {
if ($id == 0) {
Timer::getInstance()->loop(3000, function () {
$job = new Job();
$job->setJobData(['time' => \time()]);
MyQueue::getInstance()->producer()->push($job);
});
}
});
}
}
進(jìn)程安全退出問(wèn)題請(qǐng)看 自定義進(jìn)程 章節(jié)。
控制器使用
以在 http
服務(wù)中為例,使用示例代碼如下:
<?php
namespace App\HttpController;
use App\Utility\MyQueue;
use EasySwoole\Http\AbstractInterface\Controller;
use EasySwoole\Http\Message\Status;
use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;
class Index extends Controller
{
// 生產(chǎn)普通任務(wù)
public function producer1()
{
// 獲取隊(duì)列
$queue = MyQueue::getInstance();
// 創(chuàng)建任務(wù)
$job = new Job();
// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
var_dump('producer1 => ');
var_dump($job->getJobData());
// 生產(chǎn)普通任務(wù)
$produceRes = $queue->producer()->push($job);
if (!$produceRes) {
$this->writeJson(Status::CODE_OK, [], '隊(duì)列生產(chǎn)普通任務(wù)失敗!');
} else {
$this->writeJson(Status::CODE_OK, [], '隊(duì)列生產(chǎn)普通任務(wù)成功!');
}
}
// 生產(chǎn)延遲任務(wù)
public function producer2()
{
// 獲取隊(duì)列
$queue = MyQueue::getInstance();
// 創(chuàng)建任務(wù)
$job = new Job();
// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
// 設(shè)置任務(wù)延后執(zhí)行時(shí)間
$job->setDelayTime(5);
var_dump('producer2 => ');
var_dump($job->getJobData());
// 生產(chǎn)延遲任務(wù)
$produceRes = $queue->producer()->push($job);
if (!$produceRes) {
$this->writeJson(Status::CODE_OK, [], '隊(duì)列生產(chǎn)延遲任務(wù)失敗!');
} else {
$this->writeJson(Status::CODE_OK, [], '隊(duì)列生產(chǎn)延遲任務(wù)成功!');
}
}
// 生產(chǎn)可信任務(wù)
public function producer3()
{
// 獲取隊(duì)列
$queue = MyQueue::getInstance();
// 創(chuàng)建任務(wù)
$job = new Job();
// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
var_dump('producer3 => ');
var_dump($job->getJobData());
// 設(shè)置任務(wù)重試次數(shù)為 3 次。任務(wù)如果沒(méi)有確認(rèn),則會(huì)執(zhí)行三次
$job->setRetryTimes(3);
// 如果5秒內(nèi)沒(méi)確認(rèn)任務(wù),會(huì)重新回到隊(duì)列。默認(rèn)為3秒
$job->setWaitConfirmTime(5);
// 投遞任務(wù)
$queue->producer()->push($job);
// 確認(rèn)一個(gè)任務(wù)
$queue->consumer()->confirm($job);
}
// 消費(fèi)任務(wù)
public function consumer()
{
// 獲取隊(duì)列
$queue = MyQueue::getInstance();
### 消費(fèi)任務(wù)
// 獲取到需要消費(fèi)的任務(wù)
$job = $queue->consumer()->pop();
if (!$job) {
$this->writeJson(Status::CODE_OK, [], '沒(méi)有隊(duì)列任務(wù)需要消費(fèi)了!');
return false;
}
// 獲取需要消費(fèi)的任務(wù)的數(shù)據(jù)
$jobData = $job->getJobData();
var_dump($jobData);
}
}
進(jìn)階使用
我們可以自定義驅(qū)動(dòng),實(shí)現(xiàn) RabbitMQ
、Kafka
等消費(fèi)隊(duì)列軟件的封裝。
用戶需要定義類,并實(shí)現(xiàn) \EasySwoole\Queue\QueueDriverInterface
接口的幾個(gè)方法即可。該接口的詳細(xì)實(shí)現(xiàn)請(qǐng)看下文。
QueueDriverInterface 接口類實(shí)現(xiàn)
<?php
namespace EasySwoole\Queue;
interface QueueDriverInterface
{
public function push(Job $job,float $timeout = 3.0): bool;
public function pop(float $timeout = 3.0, array $params = []): ?Job;
public function info(): ?array;
public function confirm(Job $job,float $timeout = 3.0): bool;
}
相關(guān)倉(cāng)庫(kù)
EasySwoole 中利用 Redis 實(shí)現(xiàn)消息隊(duì)列
如何利用 EasySwoole 多進(jìn)程多協(xié)程 Redis 隊(duì)列實(shí)現(xiàn)爬蟲(chóng)