Queue的核心逻辑–push,later,pop
(1) vendor/topthink/think-queue/src/Queue.php
class Queue
{
/** @var Connector */
protected static $connector;
private static function buildConnector()
{
$options = Config::get('queue');
$type = !empty($options['connector']) ? $options['connector'] : 'Sync';
if (!isset(self::$connector)) {
$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type);
self::$connector = new $class($options);
}
return self::$connector;
}
public static function __callStatic($name, $arguments)
{
return call_user_func_array([self::buildConnector(), $name], $arguments);
}
}
一般都用redis作为队列处理的媒介。
(2) vendor/topthink/think-queue/src/queue/connector/Redis.php
(2.1) Queue:push 的处理逻辑
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
public function pushRaw($payload, $queue = null)
{
$this->redis->rPush($this->getQueue($queue), $payload);
return json_decode($payload, true)['id'];
}
可以看到 往队列push消息的时候就是往list的最后塞了一个序列化的消息
(2.2)队列中id消息序列化
值的格式如下:
{
"job": "Demojob",
"data": "{\"id\":[\"11\"]}",
"id": "BKW4c2vZv8ncBaE3vamch3Hwrwc92w0o",
"attempts": 1
}
id的生成规则
public static function random($length = 16)
{
$pool = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
return static::substr(str_shuffle(str_repeat($pool, $length)), 0, $length);
}
(2.3) Queue:later 的处理方式
核心处理1: 进入sorted set的时机
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
}
可以看到延迟队列的实现主要就是 往队列 queues:{queue_name}:delayed 的sorted set 中塞了个序列化的消息
核心处理2: 从sorted set中del的时机
延迟消息合并到正常消息处理list中的总逻辑入口
public function migrateExpiredJobs($from, $to, $attempt = true)
{
$this->redis->watch($from);
$jobs = $this->getExpiredJobs(
$from, $time = time()
);
if (count($jobs) > 0) {
$this->transaction(function () use ($from, $to, $time, $jobs, $attempt) {
$this->removeExpiredJobs($from, $time);
$this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
});
}
$this->redis->unwatch();
}
使用redis事务进行消息处理
protected function transaction(\Closure $closure)
{
$this->redis->multi();
try {
call_user_func($closure);
if (!$this->redis->exec()) {
$this->redis->discard();
}
} catch (Exception $e) {
$this->redis->discard();
}
}
获取截止到当前时间的延迟消息
protected function getExpiredJobs($from, $time)
{
return $this->redis->zRangeByScore($from, '-inf', $time);
}
删除截至到当前时间的延时任务
protected function removeExpiredJobs($from, $time)
{
$this->redis->zRemRangeByScore($from, '-inf', $time);
}
将延迟消息移动到正常处理的消息list中
protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true)
{
if ($attempt) {
foreach ($jobs as &$job) {
$attempts = json_decode($job, true)['attempts'];
$job = $this->setMeta($job, 'attempts', $attempts + 1);
}
}
call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs));
}
(2.4) Queue:push的处理方式
public function pop($queue = null)
{
$original = $queue ?: $this->options['default'];
$queue = $this->getQueue($queue);
$this->migrateExpiredJobs($queue . ':delayed', $queue, false);
if (!is_null($this->options['expire'])) {
$this->migrateExpiredJobs($queue . ':reserved', $queue);
}
$job = $this->redis->lPop($queue);
if ($job !== false) {
$this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);
return new RedisJob($this, $job, $original);
}
}
使用redis做为队列的时候,常见的值
key名称 | key类型 | 作用 |
---|---|---|
queues:{queue_name} | list | Queue::push时,会往该值塞数据, Queue::pop的时候会出列 |
queues:{queue_name}:reserved | sorted set | 出队列失败,会进入到该值 |
queues:{queue_name}:delayed | sorted set | Queue::later时,会往该值塞数据,Queue::pop的时候会将延迟队列的数据放到list最后 |