topthink/think-queue 核心操作解析(一)

Queue的核心逻辑–push,later,pop

(1) vendor/topthink/think-queue/src/Queue.php

class Queue
{
    /** @var Connector */
    protected static $connector;

    private static function buildConnector()
    {
        $options = Config::get('queue');
        $type    = !empty($options['connector']) ? $options['connector'] : 'Sync';

        if (!isset(self::$connector)) {

            $class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type);

            self::$connector = new $class($options);
        }
        return self::$connector;
    }

    public static function __callStatic($name, $arguments)
    {
        return call_user_func_array([self::buildConnector(), $name], $arguments);
    }
}

一般都用redis作为队列处理的媒介。

(2) vendor/topthink/think-queue/src/queue/connector/Redis.php

(2.1) Queue:push 的处理逻辑

 public function push($job, $data = '', $queue = null)
 {
    return $this->pushRaw($this->createPayload($job, $data), $queue);
 }

 public function pushRaw($payload, $queue = null)
 {
    $this->redis->rPush($this->getQueue($queue), $payload);

    return json_decode($payload, true)['id'];
}

可以看到 往队列push消息的时候就是往list的最后塞了一个序列化的消息

(2.2)队列中id消息序列化

值的格式如下:

{
  "job": "Demojob",
  "data": "{\"id\":[\"11\"]}",
  "id": "BKW4c2vZv8ncBaE3vamch3Hwrwc92w0o",
  "attempts": 1
}

id的生成规则

public static function random($length = 16)
{
    $pool = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';

    return static::substr(str_shuffle(str_repeat($pool, $length)), 0, $length);
}

(2.3) Queue:later 的处理方式

核心处理1: 进入sorted set的时机

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
}

可以看到延迟队列的实现主要就是 往队列 queues:{queue_name}:delayed 的sorted set 中塞了个序列化的消息

核心处理2: 从sorted set中del的时机

延迟消息合并到正常消息处理list中的总逻辑入口

public function migrateExpiredJobs($from, $to, $attempt = true)
{
    $this->redis->watch($from);

    $jobs = $this->getExpiredJobs(
        $from, $time = time()
    );
    if (count($jobs) > 0) {
        $this->transaction(function () use ($from, $to, $time, $jobs, $attempt) {
            $this->removeExpiredJobs($from, $time);
            $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
        });
    }
    $this->redis->unwatch();
}

使用redis事务进行消息处理

protected function transaction(\Closure $closure)
{
    $this->redis->multi();
    try {
        call_user_func($closure);
        if (!$this->redis->exec()) {
            $this->redis->discard();
        }
    } catch (Exception $e) {
        $this->redis->discard();
    }
}

获取截止到当前时间的延迟消息

protected function getExpiredJobs($from, $time)
{
    return $this->redis->zRangeByScore($from, '-inf', $time);
}

删除截至到当前时间的延时任务

protected function removeExpiredJobs($from, $time)
{
    $this->redis->zRemRangeByScore($from, '-inf', $time);
}

将延迟消息移动到正常处理的消息list中

 protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true)
 {
     if ($attempt) {
        foreach ($jobs as &$job) {
             $attempts = json_decode($job, true)['attempts'];
             $job      = $this->setMeta($job, 'attempts', $attempts + 1);
        }
     }
     call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs));
 }

(2.4) Queue:push的处理方式

 public function pop($queue = null)
 {
    $original = $queue ?: $this->options['default'];

    $queue = $this->getQueue($queue);

    $this->migrateExpiredJobs($queue . ':delayed', $queue, false);

    if (!is_null($this->options['expire'])) {
    $this->migrateExpiredJobs($queue . ':reserved', $queue);
    }

    $job = $this->redis->lPop($queue);

    if ($job !== false) {
        $this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);

        return new RedisJob($this, $job, $original);
    }
}

使用redis做为队列的时候,常见的值

key名称 key类型 作用
queues:{queue_name} list Queue::push时,会往该值塞数据, Queue::pop的时候会出列
queues:{queue_name}:reserved sorted set 出队列失败,会进入到该值
queues:{queue_name}:delayed sorted set Queue::later时,会往该值塞数据,Queue::pop的时候会将延迟队列的数据放到list最后
普人特福的博客cnzz&51la for wordpress,cnzz for wordpress,51la for wordpress