标题:【Swoole系列3.5】进程池与进程管理器

文章目录
    分类:PHP 标签:Swoole

    进程池与进程管理器

    我们已经学习过单个进程相关的内容,也学习了进程间如何进行通信,但是,一个一个地进程还是非常不好管理,这不,Swoole 就为我们直接准备好了进程池以及进程管理相关的工具。

    进程池

    进程池就是管理多个工作进程的一个工具,它本身也是基于后面我们要讲的进程管理器。主要的核心功能就是去管理多个进程,让开发者无需编写过多代码即可实现进程管理功能,同时也可以创建纯协程风格的,能利用多核 CPU 的服务端程序。

    $workNum = 5;
    
    $pool = new \Swoole\Process\Pool($workNum);
    
    $pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
       echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 开始运行!", PHP_EOL;
       while(1);
    });
    
    $pool->on("WorkerStop", function(\Swoole\Process\Pool $pool, $workerId){
       echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 结束运行!", PHP_EOL;
    });
    
    $pool->start();

    上面就是最简单的一个进程池应用。我们只需要实例化一个 \Swoole\Process\Pool 对象,给他一个进程数量的参数。然后监听它的 WorkerStart 方法以及 WorkerStop 方法。大家可以查看 ps ,会发现有 5 个子进程启动了。


    在这里,我们注意到在 WorkerStart 中,使用了一个循环挂起进程。如果不使用这个循环的话,进程在 WorkerStart 的回调函数中执行完成就会结束。然后进程池为了维护进程数量,又会拉起一个新的进程。于是就会一直不停地创建和结束进程。大家可以自己尝试一下哦。


    所有事件的回调函数中的参数都有两个,一个是 pool 对象本身,另一个是一个 WorkerId 。这个 WorkerId 不是进程 ID ,是进程池对于这个进程的编号,从 0 开始。就像我们在上篇文章中用过的 $workers 数组一样。其实进程池的底层也是维护了这样一个数组,并把数组 key 值也就是编号返回到了回调函数中。像这里我们指定了开启 5 个进程,那么它的 WorkerId 就是从 0 到 4 共 5 个进程的 WorkerId 。


    对于 \Swoole\Process\Pool 来说,其实这只是它默认的一种通信模式的表现。关于这个问题就牵涉到了它的第二个参数,默认情况下,这个参数的值是 SWOOLE_IPC_NONE ,我们其实还可以设置为 SWOOLE_IPC_UNIXSOCK/SWOOLE_IPC_MSGQUEUE/SWOOLE_IPC_SOCKET 这几种。这几种模式与 SWOOLE_IPC_NONE 有许多不同,需要强制监听的内容不同,也不需要循环挂起,我们接着往下看。

    通信模式

    对于不同的进程通信模式,我们一个一个的来看一下。首先就是 SWOOLE_IPC_UNIXSOCK 模式。其实很明显了,这就是我们上回讲过的最常用的管道方式的通信。

    $pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_UNIXSOCK);
    
    $pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
       $proc1 = $pool->getProcess(0);
       while(1){
           sleep(1);
           if($workerId == 0){
               echo $proc1->read(), PHP_EOL;
           }else{
               $proc1->write("hello proc1, this is proc" . ($workerId + 1));
           }
       }
    });
    
    $pool->on("Message", function(Swoole\Process\Pool $pool, $data){
    });
    
    $pool->start();

    很简单很熟悉的代码吧,就是上回我们讲过的管道通信方式。我们让第一个子进程读取数据,其它的进程向它发送数据。getProcess() 方法就是用于从进程池中获取单个进程,返回的就是之前我们学习过的 \Swoole\Process 对象。如果不指定它的参数,那么返回的就是当前 WorkerId 的进程对象,如果指定了参数,返回的就是指定的 WorkerId 进程对象。


    除了 SWOOLE_IPC_NONE 模式之外,其它的模式都要监听一个 Message 事件。而且,其它的模式可以不监听 WorkerStart 事件。这个我们后面也会看到应用。


    另一种通信模式就是 SWOOLE_IPC_MSGQUEUE ,也就是消息队列模式。

    $pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_MSGQUEUE, 1);
    
    $pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
       $process = $pool->getProcess();
       $process->useQueue(1, 2 | \Swoole\Process::IPC_NOWAIT);
       while(1){
           sleep(1);
           if($workerId == 0){
               foreach(range(1,4) as $v){
                   $process->push("[{$v}]消息来了" . time());
               }
           }else{
               $data = $process->pop();
               if($data){
                   echo $process->pop(), ' workerid:', $workerId, PHP_EOL;
               }
           }
       }
    });
    
    $pool->on("Message", function(Swoole\Process\Pool $pool, $data){
    });
    
    $pool->start();

    同样地,这个测试代码中我们也是让第一个子进程 push() 数组,其它的子进程消费队列数据。也和我们上回讲过的队列通信方式是一模一样的。但是它还有不同的功能,这里我们就用队列来演示 Message 的作用。

    外部通信

    通过对操作系统的学习,我们知道队列是在系统中共用的,所有进程都可以接收到队列数据,它不是只受限于同一个程序的。因此,我们可以让另一个程序发送队列,而这边的程序接收队列,从而实现跨进程的通信。

    $q = msg_get_queue(1);
    foreach (range(1, 100) as $i) {
        $data = "消息来了" . microtime(true);
        msg_send($q, $i, $data, false);
    }

    我们先准备一个上面的代码文件,用于模拟另外的程序发送消息,然后准备下面的测试代码,用于接收发送来的队列消息。

    $pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_MSGQUEUE, 1);
    
    $pool->on("Message", function(Swoole\Process\Pool $pool, $data){
       var_dump($pool);
       $process = $pool->getProcess();
       echo $process->pid, PHP_EOL;
       var_dump($data);
    });
    
    $pool->start();
    
    //object(Swoole\Process\Pool)#1 (2) {
    //["master_pid"]=>
    //  int(7114)
    //  ["workers"]=>
    //  array(1) {
    //    [4]=>
    //    object(Swoole\Process)#3 (6) {
    //    ["pipe"]=>
    //      NULL
    //      ["msgQueueId"]=>
    //      NULL
    //      ["msgQueueKey"]=>
    //      NULL
    //      ["pid"]=>
    //      int(7119)
    //      ["id"]=>
    //      int(4)
    //      ["callback":"Swoole\Process":private]=>
    //      NULL
    //    }
    //  }
    //}
    //7119
    //string(27) "消息来了1640313653.9647"
    // ………………

    首先运行起下面的代码,然后再开一个命令行执行上面的发送消息的代码,之后就能看到注释中返回的内容。在这里,我们这个进程池只监听了一个 Message 事件,这就说明 WorkerStart 事件确实并不是必须的哦。在这个 Message 事件中,我们可以看到每次处理的进程 ID 都是不同的,说明和之前我们理解的一样,这些进程也是在争抢处理队列数据。


    除了这种消息队列之外,我们还可以使用一种方式,相信你看了以后会非常兴奋,那就是 SWOOLE_IPC_SOCKET 模式。

    $pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_SOCKET);
    
    $pool->listen('0.0.0.0', 8089);
    
    $pool->on("Message", function(Swoole\Process\Pool $pool, $data){
       var_dump($data);
       $pool->write("你发来的数据是:\"{$data}\"");
    });
    
    $pool->start();
    
    // [root@localhost source]# php 3.5进程池与进程管理器.php
    // string(33) "客户端发消息1640318342.8369"
    // string(33) "客户端发消息1640318344.8386"
    // string(33) "客户端发消息1640318346.8397"

    它会启动一个监听服务,所以必须要一个 listen() 方法指定监听 ip 和 端口 。然后客户端就可以直接建立 Socket 连接来与进程通信。除了 ip + 端口 的方式之外,它还可以 listen() 一个 UnixSocket 方式的连接,使用 "unix:/tmp/php.sock" 。

    foreach (range(1, 3) as $i) {
        $fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
        $msg = "客户端发消息" . microtime(true);
        fwrite($fp, pack('N', strlen($msg)) . $msg);
        sleep(2);
        $data = fread($fp, 8192);
        if($data){
            var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
        }
        fclose($fp);
    }
    
    // [root@localhost source]# php 3.52socketclient.php
    // string(59) "你发来的数据是:"客户端发消息1640318342.8369""
    // string(59) "你发来的数据是:"客户端发消息1640318344.8386""
    // string(59) "你发来的数据是:"客户端发消息1640318346.8397""

    熟悉吗?亲切吗?想想 php-fpm 的两种监听方式,再想想我们把上面的端口换成 9000 。这不就是一套 php-fpm 嘛!!

    关闭进程

    关闭进程就是一个 shutdown() 方法。这个方法必须在 start() 之后,在 WorkerStart 或其它回调函数中调用。

    $pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_UNIXSOCK);
    
    $pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
       if($workerId == 0){
           echo "Shutdown Worker:{$workerId}, pid:" . posix_getpid(), PHP_EOL;
           $pool->shutdown();
       }
    });
    
    $pool->on('Message', function(\Swoole\Process\Pool $pool, $workerId){
    });
    
    $pool->start();
    
    // [root@localhost source]# php 3.5进程池与进程管理器.php
    // Shutdown Worker:0, pid:7247
    
    // [root@localhost source]# ps -ef | grep php
    //    root      7246  4402  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
    //    root      7248  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
    //    root      7249  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
    //    root      7250  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
    //    root      7251  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php

    脱离进程

    脱离进程的意思是将进程池当前 Worker 进程脱离管理,底层会立即创建新的进程,老的进程不再处理数据,由应用层代码自行管理它的生命周期。

    $pool = new \Swoole\Process\Pool(2);
    
    $pool->on('WorkerStart', function (\Swoole\Process\Pool $pool, $workerId) {
    
       $i = 0;
       while (1) {
           sleep(1);
           $i++;
           if ($i == 5) {
               echo "Detach Worker:{$workerId}, pid:" . posix_getpid(), PHP_EOL;
               $pool->detach();
           } else if ($i == 10) {
               break;
           }
       }
    
    });
    
    $pool->on("WorkerStop", function (\Swoole\Process\Pool $pool, $workerId) {
       echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 结束运行!", PHP_EOL;
    });
    
    
    $pool->start();
    
    // [root@localhost source]# php 3.5进程池与进程管理器.php
    // Detach Worker:1, pid:16336
    // Detach Worker:0, pid:16335
    // 工作进程:0, pid: 16335 结束运行!
    // 工作进程:1, pid: 16336 结束运行!
    // [2021-12-23 23:33:42 @16334.0]	WARNING	ProcessPool::wait(): [Manager]unknown worker[pid=16335]
    // [2021-12-23 23:33:42 @16334.0]	WARNING	ProcessPool::wait(): [Manager]unknown worker[pid=16336]
    // Detach Worker:1, pid:16337
    // Detach Worker:0, pid:16338

    你可以把上面代码的 sleep() 时间拉长一下,然后就可以通过 ps 看到同时有更多的进程出现,进程池是在脱离触发后就马上拉起新的子进程,然后原来的子进程则是运行完自己的工作之后就自己释放掉了。

    进程管理器

    最后就是进程管理器,其实进程池底层就是基于进程管理器的。

    $pm = new \Swoole\Process\Manager();
    
    for ($i = 0; $i < 2; $i++) {
        $pm->add(function (\Swoole\Process\Pool $pool, $workerId) {
            echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 开始运行!", PHP_EOL;
            while (1) ;
        });
    }
    
    $pm->start();

    使用 \Swoole\Process\Manager 对象的 add 方法直接就可以添加进程,其实就和我们之前去 new \Swoole\Process() 对象,并将对象放到一个 $workers 数组的感觉差不多,只不过在底层它处理的事情更多,比如说帮我们 wait() 进程,进行一些挂起操作等等。


    进程管理器还有一些别的方法,不过其实直接使用进程管理器的地方不多,毕竟我们还是直接使用进程池会更方便一些。大家可以自己去官方文档中进行更深入的了解。

    总结

    今天的内容就是关于 Swoole 中两个进程管理工具的学习,更多情况下我们直接使用进程池就好了。其实从这里,你也就能够想到了,之前我们看到过的各种服务,Http、TCP 这些服务器应用,其实就是在实现了服务接口协议之后,通过进程池拉起 Worker 进程来实现多进程服务处理的。而 TaskWorker 其实也是再分出来的另一套进程池,与主进程中的服务处理进程是分离的,可以实现具体任务的异步并行处理。


    当然,上面的内容是我自己的理解,如有纰漏,欢迎大家指正。


    测试代码:


    https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.5%E8%BF%9B%E7%A8%8B%E6%B1%A0%E4%B8%8E%E8%BF%9B%E7%A8%8B%E7%AE%A1%E7%90%86%E5%99%A8.php


    参考文档:


    https://wiki.swoole.com/#/process/process_pool


    https://wiki.swoole.com/#/process/process_manager

    视频链接

    微信文章地址:https://mp.weixin.qq.com/s/J7dp86U0WrRwfDkQDJ6q0w

    B站视频地址:https://www.bilibili.com/video/BV1De411j7tq/

    微信视频地址:https://mp.weixin.qq.com/s/D8MB0IHDygAJpWZVJzXj1g

    搜索
    关注