标题:【Laravel系列7.7】队列系统

文章目录
    分类:PHP 标签:Laravel,PHP框架

    队列系统

    队列相关的应用对于现在的系统开发来说非常常见,不管你是发消息还是应对大流量,队列都是一个非常常用而且非常好用的解决方案。我们自己写队列去实现很多功能其实已经非常方便了,不过 Laravel 也为我们准备好了一套现成的队列系统,直接配置一下就能够方便地使用了。今天,我们就来学习了解一下 Laravel 中队列系统相关的内容。

    配置

    队列的配置非常简单,在 config 目录下就有一个名为 queue.php 的文件,这个文件就是队列的配置文件。

    'default' => env('QUEUE_CONNECTION', 'sync'),

    第一行的这个 default 就是一个默认队列系统的连接配置,在默认情况下,它使用的是 sync 。意思就是同步的,也就是说,只要调用了队列分发,马上就执行队列的内容。显然,这个和普通的顺序编写代码没什么区别,它也不是我们的重点。我们可以通过修改 .env 配置文件中的 QUEUE_CONNECTION 来修改默认的连接配置,它所能接受的值就是这个配置文件中下方 connections 中的内容。

    'connections' => [
        // ………………
        // ………………
        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => env('REDIS_QUEUE', 'default'),
            'retry_after' => 90,
            'block_for' => null,
            'after_commit' => false,
        ],
    
    ],

    在这些连接配置中,我们可以看到 database、beanstalkd、sqs、redis 等相关队列系统的配置。database 其实就是使用数据库来作为队列系统。相应的你需要建立对应的数据表,不过数据库当队列的效率很一般。另外 beanstalkd 和 sqs 大家可能也接触得不多,所以我们主要还是使用 redis 这个数据连接驱动。现在大家直接把 .env 中的 QUEUE_CONNECTION 改成 redis 就好了。至于更复杂的 RabbitMQ 和 Kafka 之类的队列系统,在 Laravel 框架中并没有给出直接的集成应用方案,这些还是建议大家找找其它的 Composer 包吧。

    使用队列

    在默认情况下,我们所使用的 任务类 都被存放在了 app/Jobs 目录中。没错,在 Laravel 中,队列被表示为一个一个的任务。我们可以使用下面这个命令来创建一个任务类。如果你的目录中没有 Jobs 目录也没关系,命令行会自动创建这个目录。

    php artisan make:job Test

    生成后的任务类有一个构造函数,还有一个 handle() 方法。相信大家对这种类已经不陌生了,handle() 方法肯定是用来处理队列任务的。那么我们就给 handle() 方法中增加一些内容。

    // app/Jobs/Test.php
    // ………………
    public function handle()
    {
        //
        echo date("Y-m-d H:i:s");
        sleep(10);
    }
    // ………………

    打印一下日期,然后再睡个 10 秒钟,这样一会我们测试的时候可以看得更清楚。


    接下来我们定义一个路由,并且实现队列的分发。

    Route::get('queue/test1', function(){
        \App\Jobs\Test::dispatch();
        \App\Jobs\Test::dispatch();
        \App\Jobs\Test::dispatch();
    
        dispatch(function(){
            echo 'callback queue';
            sleep(10);
        });
        dispatch(function(){
            echo 'callback queue';
            sleep(10);
        });
    });

    在这个测试路由中,我们将 Test 任务分发了三次。分发?没错,相信你又发现了一个问题,这是不是和事件有关啊?是的,Laravel 中的队列也是以事件的形式实现的。另外我们还分发了两条回调函数形式的队列任务,也就是说,队列任务是支持两种形式的,要么我们定义的 Jobs 任务类,要么就是回调函数形式的任务。


    好了,访问这个路由,貌似没什么效果,但你可以在 redis 中看到一条 laravel_database_queues:default 数据。type laravel_database_queues:default 可以看到它是一个 list 类型的数据。我们直接 LPOP 弹一条出来看看。

    "{\"uuid\":\"568c9f5f-062f-4f16-b0df-a9711406332d\",\"displayName\":\"App\\\\Jobs\\\\Test\",\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"maxTries\":null,\"maxExceptions\":null,\"backoff\":null,\"timeout\":null,\"retryUntil\":null,\"data\":{\"commandName\":\"App\\\\Jobs\\\\Test\",\"command\":\"O:13:\\\"App\\\\Jobs\\\\Test\\\":10:{s:3:\\\"job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:15:\\\"chainConnection\\\";N;s:10:\\\"chainQueue\\\";N;s:19:\\\"chainCatchCallbacks\\\";N;s:5:\\\"delay\\\";N;s:11:\\\"afterCommit\\\";N;s:10:\\\"middleware\\\";a:0:{}s:7:\\\"chained\\\";a:0:{}}\"},\"id\":\"xLG4ZUsds6uWibV3sxOXxY9ESwWFni8F\",\"attempts\":0}"
    
    {
    	"uuid": "568c9f5f-062f-4f16-b0df-a9711406332d",
    	"displayName": "App\\Jobs\\Test",
    	"job": "Illuminate\\Queue\\CallQueuedHandler@call",
    	"maxTries": null,
    	"maxExceptions": null,
    	"backoff": null,
    	"timeout": null,
    	"retryUntil": null,
    	"data": {
    		"commandName": "App\\Jobs\\Test",
    		"command": "O:13:\"App\\Jobs\\Test\":10:{s:3:\"job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:19:\"chainCatchCallbacks\";N;s:5:\"delay\";N;s:11:\"afterCommit\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"
    	},
    	"id": "xLG4ZUsds6uWibV3sxOXxY9ESwWFni8F",
    	"attempts": 0
    }

    将它格式化之后就看得比较清楚了。在这里面我们看到了 App\Jobs\Test 的存在,也看到了许多其它参数。很明显,从这里我们可以猜测出来 Laravel 也是通过 POP 一条队列数据,然后再去根据这个 json 内容实例化 Test 对象并执行里面的 handle() 方法来实现队列的处理。后面我们在分析源码的时候再深入地看一看是不是这样。


    队列插入是成功了,redis 中也有了数据了,接下来要怎么执行队列里面的内容呢?也就是异步地去执行队列操作。相信不少同学已经想到了,肯定得有一个命令行在后端持续运行嘛。

    队列处理

    对于队列的处理,我们有两个命令可以使用,这两个命令都会挂起一个监听,也就是监听队列内是否有内容,如果有的话就 pop 出来进行处理。

    php artisan queue:work
    
    php artisan queue:listen

    那么他们两个有什么区别呢?work 是工作的意思,也就是让队列开始工作,它比较适合线上使用,效率更高。另外如果修改 job 类或者修改代码,它也是需要先手动停止然后再次启动才能看到效果的。


    而对于我们现在的测试来说,使用 listen 更好一些,它是监听的意思。这种运行方式的效率差一些,但可以实时监听 job 任务类的变化。


    现在你可以随便运行这两个命令中的任何一个,前面我们在路由中添加到队列中应该有 5 条队列任务,但是我们在查看 redis 的时候手动 lpop 出来的一条,那么现在应该是输出四条任务,就像下面一样:

    [2021-11-18 08:49:56][YluIVplfkScY3lqFhCr5bICzzlX0Yx6H] Processing: App\Jobs\Test
    2021-11-18 08:49:56
    [2021-11-18 08:50:06][YluIVplfkScY3lqFhCr5bICzzlX0Yx6H] Processed:  App\Jobs\Test
    
    [2021-11-18 08:50:07][yUZIA8g0xoJGLZd3EML3PTmuWr3VlVgs] Processing: App\Jobs\Test
    2021-11-18 08:50:07
    [2021-11-18 08:50:17][yUZIA8g0xoJGLZd3EML3PTmuWr3VlVgs] Processed:  App\Jobs\Test
    
    [2021-11-18 08:50:17][q1eG5yZfRBLRHJArn9z43jgX9iYe7KNH] Processing: Closure (web.php:834)
    callback queue[2021-11-18 08:50:27][q1eG5yZfRBLRHJArn9z43jgX9iYe7KNH] Processed:  Closure (web.php:834)
    
    [2021-11-18 08:50:28][j7SPjzF0WNxGKsA32icXovT433BWDHCJ] Processing: Closure (web.php:838)
    callback queue[2021-11-18 08:50:38][j7SPjzF0WNxGKsA32icXovT433BWDHCJ] Processed:  Closure (web.php:838)

    可能数据会比较乱,但是应该也能清晰地看出我们输出的结果是四条队列信息中的输出内容。前两个是我们在 app/Jobs/Test 类中输出的时间信息,后两条是打印的回调函数输出的 callback queue 内容。

    队列参数

    队列的定义、分发、执行都没问题了,但是小伙伴们肯定要问了,光这样不行呀,不管是发送短信、邮件还是处理订单信息,我们肯定是要传值给任务处理对象的嘛,就像是手机号、邮箱地址或者订单号之类的,这个要怎么传给任务对象或者回调函数呢?

    class Test2 implements ShouldQueue
    {
        use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    
        private $obj;
    
        /**
         * Create a new job instance.
         *
         * @return void
         */
        public function __construct($obj)
        {
            //
            $this->obj = $obj;
        }
    
        /**
         * Execute the job.
         *
         * @return void
         */
        public function handle()
        {
            //
            echo date("Y-m-d H:i:s");
            print_r($this->obj);
            sleep(10);
        }
    }

    其实非常简单,就像上面的这个新定义的 Test2 类一样,我们直接在构造函数中接参就可以了。

    Route::get('queue/test2', function(){
        $obj = new stdClass();
        $obj->a = 111;
        \App\Jobs\Test2::dispatch($obj);
        \App\Jobs\Test2::dispatch($obj);
        \App\Jobs\Test2::dispatch($obj);
    
        dispatch(function() use ($obj){
            echo 'callback queue';
            print_r($obj);
            sleep(10);
        });
        dispatch(function() use($obj){
            echo 'callback queue';
            print_r($obj);
            sleep(10);
        });
    });

    在路由分发的时候,直接就把参数放到 Test2::dispatch() 方法的参数中就可以了。回调函数形式的则直接使用 use 判断字将参数传递进去就可以了。Test2::dispatch() 方法的实现其实就是实例化自己并将所有接收到的参数传给自己的构造函数。

    // vendor/laravel/framework/src/Illuminate/Foundation/Bus/Dispatchable.php
    public static function dispatch()
    {
        return new PendingDispatch(new static(...func_get_args()));
    }

    这个 trait 是我们生成的任务类都会调用的,它是静态的方法,所以在这个方法中,使用的是 new static() ,也就是实例化当前这个 Test2 类自己,将这个实例化对象再当做参数传递给新 new 出来的 PendingDispatch() 对象。

    任务链

    任务链是个什么鬼?它其实就是让你能够指定一级在主任务成功执行后按顺序运行的排队任务。也就是说,它是一个大队列任务,然后在这个队列任务中我们可以再指定一系列小的队列任务,让他们在这个大任务中有序执行。

    \Illuminate\Support\Facades\Bus::chain([
        function(){
            echo 'first';
        },
        new \App\Jobs\Test(),
        function(){
            echo 'third';
        }
    ])->dispatch();

    执行这段任务链接之后,输出的结果是 first Test third 这样的效果。如果中间有任务出现问题了,那么我们可以通过 catch() 来捕获异常。

    \Illuminate\Support\Facades\Bus::chain([
        function(){
            echo 'first';
            throw new \Exception("第一个就错了");
        },
        new \App\Jobs\Test(),
        function(){
            echo 'third';
        }
    ])->catch(function(Throwable $e){
        echo "Error:", $e->getMessage();
    })->dispatch();

    执行分析

    对于队列的执行分析,我们要从两个方向上来看,一个是分发也就是入队,另一个是脚本 queue:work 或者 queue:listen ,也就是出队。

    分发入队

    前面我们已经看到了,在执行 dispatch() 方法时会 new 一个 PendingDispatch() 对象,然后将 Test 这种 Job 对象当做参数放到它的 job 属性中。通过构造函数赋值完 job 之后,直接会进入它的 __destruct() 析构函数。

    public function __destruct()
    {
        if (! $this->shouldDispatch()) {
            return;
        } elseif ($this->afterResponse) {
            app(Dispatcher::class)->dispatchAfterResponse($this->job);
        } else {
            app(Dispatcher::class)->dispatch($this->job);
        }
    }

    在这个函数中会进行判断,如果我们没有别的操作,那么它会进入到 app(Dispatcher::class)->dispatch($this->job); 中。

    public function dispatch($command)
    {
        return $this->queueResolver && $this->commandShouldBeQueued($command)
                        ? $this->dispatchToQueue($command)
                        : $this->dispatchNow($command);
    }

    接着在判断完成后进入到 dispatchToQueue() 。

    public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;
    
        $queue = call_user_func($this->queueResolver, $connection);
    
        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }
    
        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }
    
        return $this->pushCommandToQueue($queue, $command);
    }

    这里会组织我们的队列连接内容,也就是使用哪种队列驱动。如果你设置了断点调试的话,最后传递到 pushCommandToQueue() 的 $queue 属性实际上已经是一个 Illuminate\Queue\RedisQueue 对象。

    protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }
    
        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }
    
        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }
    
        return $queue->push($command);
    }

    pushCommandToQueue() 方法继续组织数据,最后将命令 push 到我们指定的 redis 队列中。这个 $command 又是什么呢?其实就是我们的 Test 对象。它最终会和其它一些参数组成一个 payload 并进行 json_encode() 之后保存在 redis 中。

    脚本出队执行

    接下来的脚本出队操作就是在命令行了,我们需要找到 queue:work 的位置,这个也比较好找,它就在 vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php 。直接看它的 handle() 方法。

    public function handle()
    {
        if ($this->downForMaintenance() && $this->option('once')) {
            return $this->worker->sleep($this->option('sleep'));
        }
    
        $this->listenForEvents();
    
        $connection = $this->argument('connection')
                        ?: $this->laravel['config']['queue.default'];
    
    
        $queue = $this->getQueue($connection);
    
        return $this->runWorker(
            $connection, $queue
        );
    }

    从这段代码中,我们可以看出,最后返回的 runWorker() 肯定是在运行工作脚本,而前面的 $connection 就是获取使用的队列连接配置,它返回的是 redis ,而 $queue 则是队列的名称配置,也就是在 redis 中的 list 名称的定义,这里返回的是 default 默认的队列名称。

    protected function runWorker($connection, $queue)
    {
        return $this->worker->setName($this->option('name'))
                        ->setCache($this->cache)
                        ->{$this->option('once') ? 'runNextJob' : 'daemon'}(
            $connection, $queue, $this->gatherWorkerOptions()
        );
    }

    在 runWorker() 方法中,我们使用的是当前命令类中的 worker 属性,它是在构造函数中通过服务容器依赖注入进来的一个 vendor/laravel/framework/src/Illuminate/Queue/Worker.php 对象。我们的 work 默认走的是 daemon 模式,所以会进入 Worker 的 daemon() 方法。

    public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }
    
        $lastRestart = $this->getTimestampOfLastQueueRestart();
    
        [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
    
        while (true) {
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $status = $this->pauseWorker($options, $lastRestart);
    
                if (! is_null($status)) {
                    return $this->stop($status);
                }
    
                continue;
            }
    
            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );
    
            if ($this->supportsAsyncSignals()) {
                $this->registerTimeoutHandler($job, $options);
            }
    
            if ($job) {
                $jobsProcessed++;
    
                $this->runJob($job, $connectionName, $options);
    
                if ($options->rest > 0) {
                    $this->sleep($options->rest);
                }
            } else {
                $this->sleep($options->sleep);
            }
    
            if ($this->supportsAsyncSignals()) {
                $this->resetTimeoutHandler();
            }
    
            $status = $this->stopIfNecessary(
                $options, $lastRestart, $startTime, $jobsProcessed, $job
            );
    
            if (! is_null($status)) {
                return $this->stop($status);
            }
        }
    }

    在这个方法的 getNextJob() 中,$this->manager->connection() 是通过 vendor/laravel/framework/src/Illuminate/Queue/QueueManager.php 的 connection() 方法获得一个驱动实例,如果你在断点调试的话,它返回的就是一个 redis 连接实例 vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php ,然后在 getNextJob() 方法中会使用 RedisJob 对象的 pop() 方法弹出队列中的一条信息并封装成 vendor/laravel/framework/src/Illuminate/Queue/Jobs/RedisJob.php 对象。


    接下来就是使用 runJob() 方法来执行这个 RedisJob 对象中的内容。runJob() 方法继续向下调用 process() 方法,在这个方法中会执行 $job->fire() 方法,这个方法是 RedisJob 继承的 vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php 。

    public function fire()
    {
        $payload = $this->payload();
    
        [$class, $method] = JobName::parse($payload['job']);
    
        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }

    payload() 方法用于将队列中的 job 内容取出来,其实也就将我们上面保存在 redis 中的 json 字符串转换为数组。然后再取出这个数组信息中的 job 字段的内容。在我这里,它显示的是一个 Illuminate\Queue\CallQueuedHandler@call 信息,其实也就是框架默认将使用这样一个回调类来处理我们的队列对象信息。而 data 中的信息则是我们的任务类 App\Jobs\Test 。那么我们就再进入到 CallQueuedHandler 的 call() 方法中看一下。

    public function call(Job $job, array $data)
    {
        try {
            $command = $this->setJobInstanceIfNecessary(
                $job, $this->getCommand($data)
            );
        } catch (ModelNotFoundException $e) {
            return $this->handleModelNotFound($job, $e);
        }
    
        if ($command instanceof ShouldBeUniqueUntilProcessing) {
            $this->ensureUniqueJobLockIsReleased($command);
        }
    
        $this->dispatchThroughMiddleware($job, $command);
    
        if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) {
            $this->ensureUniqueJobLockIsReleased($command);
        }
    
        if (! $job->hasFailed() && ! $job->isReleased()) {
            $this->ensureNextJobInChainIsDispatched($command);
            $this->ensureSuccessfulBatchJobIsRecorded($command);
        }
    
        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }

    在这个 call() 方法中,首先获得 $command 对象,它就是通过容器实例化之后的 App\Jobs\Test 对象。然后转入 dispatchThroughMiddleware() 方法中。

    protected function dispatchThroughMiddleware(Job $job, $command)
    {
        return (new Pipeline($this->container))->send($command)
                ->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
                ->then(function ($command) use ($job) {
                    return $this->dispatcher->dispatchNow(
                        $command, $this->resolveHandler($job, $command)
                    );
                });
    }

    dispatchThroughMiddleware() 方法会再封装成一个 管道 继续向下执行到 then() 方法里面的 dispatchNow() 方法。这个方法是实例 vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php 中的方法,

    public function dispatchNow($command, $handler = null)
    {
        $uses = class_uses_recursive($command);
    
        if (in_array(InteractsWithQueue::class, $uses) &&
            in_array(Queueable::class, $uses) &&
            ! $command->job) {
            $command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
        }
    
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                $method = method_exists($handler, 'handle') ? 'handle' : '__invoke';
    
                return $handler->{$method}($command);
            };
        } else {
            $callback = function ($command) {
                $method = method_exists($command, 'handle') ? 'handle' : '__invoke';
    
                return $this->container->call([$command, $method]);
            };
        }
    
        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }

    可以看到在这个方法中,通过不同的判断分别生成了两个回调方法,它们的内容略有不同,但都可以看到 handle 和 __invoke 的影子,而且他们是调用 $command 也就是我们之前已经实例化的 App\Jobs\Test 里面对应的 handle() 或 __invoke() 方法了。这下相信大家就比较清楚了。最后还是返回了一个 管道 操作,不过管道操作的最终都会进入到 then() 方法,其实也就是调用了 $callback 里面的内容。之后就是进入到我们定义的 App\Jobs\Test 对象的 handle() 方法中执行我们写好的队列处理操作了。


    看出来了吧,整个队列的调用执行过程非常长,也非常复杂。这里我们也只是将最核心的步骤摘取了出来。在这其中,我们见到了事件分发使用,也见到了管道操作的使用,至于服务容器更是不用多说了。从这里也可以看出,队列系统就是建立在之前我们已经学习过的这些内容的基础上实现的。更多具体的内容大家可以再继续深入的自行调试,配置好断点,学会断点调试真的非常重要哦。

    总结

    看似一个小小的队列系统,内部实现并没有我们想像中的简单吧。当然,如果只是使用的话这套队列系统还是非常简单方便的。如果不想那么复杂,其实你自己去使用 redis 的 lpop 、lpush 之类的功能也是没问题的。还是那句话,具体业务具体分析。


    另外,整个队列系统还有很多其它的功能,比如说任务中间件、延迟分发、任务批处理、优先队列、Supervisor保活配置等,大家可以继续根据官方文档进行深入的学习哦!


    参考文档:


    https://learnku.com/docs/laravel/8.5/queues/10395

    视频链接

    微信文章地址:http://mp.weixin.qq.com/s?__biz=MzIxODQyNTU1MA==&mid=2247486999&idx=1&sn=3eb186c3c571397def64225e8e036285&chksm=97ebffb6a09c76a06c8671ce2667f47ba23b1ca1449deafbb2d79c6805421ecf25d6c1ed1387&scene=27#wechat_redirect

    B站视频地址:https://www.bilibili.com/video/BV1yS4y1s7Ec/

    微信视频地址:https://mp.weixin.qq.com/s/55-wp3YIQpLSrktIlZKMow

    搜索
    关注