标题:【Swoole系列3.4】进程间通信

文章目录
    分类:PHP 标签:Swoole

    进程间通信

    正常情况下,进程的用户空间是互相独立的,一般也是不能互相访问的,就像我们开启了两个一样的游戏,为了登录两个不同的帐号一起挂机。但是,系统空间是公共的区域,我们可以让不同的两个进程同时访问外部的另一个进程,比如说两套程序一起访问 Redis ,这就是一种进程间的通信。也可以访问外部的磁盘数据,共同操作一个文件,不过就像很多人会接触过的一个概念,那就是共同操作相同的数据可能会带来竞争的问题,导致数据不一致的情况发生。


    我们今天先不讨论并发竞争的问题,而是主要看看在 Swoole 中,如何不通过第三方来实现两个进程间的通讯。其实这也是大部分操作系统都提供的功能,也是很多其它语言在使用的解决方案。


    进程间通讯也叫做 IPC ,之前我们已经提到过一些 IPC 相关的内容,主要使用的就是 UnixSocket 来实现的。

    管道 Pipe

    管道方式是非常常用的一种方式,比如说我们打印一个进程回调函数中返回的 Process 对象参数,就可以看到这样的内容。

    new Swoole\Process(function (Swoole\Process $worker) {
        var_dump($worker);
    });
    // object(Swoole\Process)#3 (6) {
    //   ["pipe"]=>
    //   int(6)
    //   ["msgQueueId"]=>
    //   NULL
    //   ["msgQueueKey"]=>
    //   NULL
    //   ["pid"]=>
    //   int(2081)
    //   ["id"]=>
    //   NULL
    //   ["callback":"Swoole\Process":private]=>
    //   object(Closure)#4 (1) {
    //     ["parameter"]=>
    //     array(1) {
    //       ["$worker"]=>
    //       string(10) "<required>"
    //     }
    //   }
    // }

    注意到最上方的 pipe 数据没有,它就是一个管道 ID 。管道是啥?ps -ef | grep php 这个命令熟悉吧,中间的那个 | 就是让这两个命令形成了一个管道调用。前面的输出成为后面的输入。然后呢,我们的进程之间也可以利用这个能力来实现进程间的通讯。

    $workers = [];
    for ($i = 1; $i < 3; $i++) {
        $process = new Swoole\Process(function (Swoole\Process $worker) {
            var_dump($worker);
            $data = $worker->read();
            echo "Child {$worker->pid}:来自 Master 的数据 - \"{$data}\"。", PHP_EOL;
            $worker->write("发送给领导,来自 {$worker->pid} 的数据,通过管道 {$worker->pipe} !");
        });
        $process->start();
        $workers[$process->pid] = $process;
    }
    
    foreach ($workers as $pid => $process) {
        $process->write("{$pid},你好!");
        $data = $process->read();
        echo "Master:来自子进程 {$pid} 的数据 - \"{$data}\"", PHP_EOL;
    }
    
    // [root@localhost source]# php 3.4进程间通信.php
    // Child 2076:来自 Master 的数据 - "2076,你好!"。
    // Master:来自子进程 2076 的数据 - "发送给领导,来自 2076 的数据,通过管道 4 !"
    // Child 2077:来自 Master 的数据 - "2077,你好!"。
    // Master:来自子进程 2077 的数据 - "发送给领导,来自 2077 的数据,通过管道 6 !"

    看出来这里的门道了吧,父子进程之间其实在底层是通过 UnixSocket 进行通讯的,我们可以在父进程使用子进程对象的 write() 方法向这个指定的子进程写数据,然后在子进程的回调函数的 Process 的 read() 方法读取数据。反过来也是可以的,子进程的回调函数中 write() 然后父进程通过子进程对象的 read() 方法读取这条数据。


    父子进程之间没问题,那么同样的两个子进程之间可以吗?拿到对象了就可以嘛,我们的 write() 方法都是针对指定的那个子进程对象的。

    $process1 = new Swoole\Process(function (Swoole\Process $worker) {
        while(1){
            $data = $worker->read();
            if($data){
                echo "Child {$worker->pid}:接收到的数据 - \"{$data}\"。", PHP_EOL;
            }
        }
    });
    $process1->start();
    
    $messages = [
        "Hello World!",
        "Hello Cat!",
        "Hello King",
        "Hello Leon",
        "Hello Rose"
    ];
    $process2 = new Swoole\Process(function (Swoole\Process $worker) use($process1, $messages) {
        foreach($messages as $msg){
            sleep(1);
            $process1->write("来自 {$worker->pid} {$msg} !");
        }
        sleep(1);
        $worker->kill($process1->pid, SIGKILL);
    });
    $process2->start();
    \Swoole\Process::wait(true);
    \Swoole\Process::wait(true);
    
    //  [root@localhost source]# php 3.4进程间通信.php
    //  Child 2102:接收到的数据 - "来自 2103 Hello World! !"。
    //  Child 2102:接收到的数据 - "来自 2103 Hello Cat! !"。
    //  Child 2102:接收到的数据 - "来自 2103 Hello King !"。
    //  Child 2102:接收到的数据 - "来自 2103 Hello Leon !"。
    //  Child 2102:接收到的数据 - "来自 2103 Hello Rose !"。

    在这段测试代码中,第一个 process1 的回调操作中,我们一直循环并查看 read() 有没有值。第二个 process2 中则是通过 use 把 process1 对象传递进来了,然后给这个 process1 对象循环间隔 1 秒 write() 消息。


    注意我们要回收这两个进程,于是直接挂起主进程。在 process2 中,最底下还有一个 kill 操作,目的是关闭上面那个一直在死循环挂起的 process1 进程。这也是一种进程间通信方式,最后我们也会说。


    如果没问题的话,下面注释中的测试效果是一条一条出现的,每条打印间隔 1 秒。

    协程 exportSocket

    除了上述普通的进程间管道通信外,还有一种协程化的进程间通信。协程的内容我们放到下个大章节才会讲,所以这里我们就简单了解一下。

    $workers = [];
    for ($i = 1; $i < 3; $i++) {
        $process = new Swoole\Process(function (Swoole\Process $worker) {
            $socket = $worker->exportSocket();
            $data = $socket->recv();
            echo "Child {$worker->pid}:来自 Master 的数据 - \"{$data}\"。", PHP_EOL;
            $socket->send("发送给领导,来自 {$worker->pid} 的数据,通过管道 {$worker->pipe} !");
        }, false, SOCK_DGRAM, true);
        $process->start();
        $workers[$process->pid] = $process;
    }
    foreach ($workers as $pid => $process) {
        \Swoole\Coroutine\run(function () use ($pid, $process) {
            $socket = $process->exportSocket();
            $socket->send("{$pid},你好!");
            $data = $socket->recv();
            echo "Master:来自子进程 {$pid} 的数据 - \"{$data}\"", PHP_EOL;
        });
    }
    foreach ($workers as $pid => $worker) {
        \Swoole\Process::wait();
    }
    
    //  [root@localhost source]# php 3.4进程间通信.php
    //  Child 2062:来自 Master 的数据 - "2062,你好!"。
    //  Master:来自子进程 2062 的数据 - "发送给领导,来自 2062 的数据,通过管道 4 !"
    //  Child 2063:来自 Master 的数据 - "2063,你好!"。
    //  Master:来自子进程 2063 的数据 - "发送给领导,来自 2063 的数据,通过管道 6 !"

    其实差别不大,只是实例化 Process 的时候,我们需要将它最后一个参数设置为 true 。这个参数的意思在这个进程的回调函数中启用协程能力,开启之后就可以在这个子进程中直接使用协程相关的 API 。


    在这里,我们使用的是 exportSocket() 方法先获得了一个 Swoole\Coroutine\Socket 模块。这东西大家见过的,之前我们写异步 UDP 服务的时候就用过。从名字就能看出,它是一个 Socket 编程组件,其实说白了,就是直接返回底层的 UnixSocket 操作相关接口给我们使用了,因此,如果你有 Socket 相关编程经验的话,那就相当轻松了。


    在这段测试代码中,其实就是变成了使用 exportSocket() 方法返回的 Swoole\Coroutine\Socket 对象中的 send() 和 recv() 方法来操作数据。其它的和我们上面学习的 read() 和 write() 没什么区别。

    队列

    除了管道,或者说就是 UnixSocket 方式之外,还有一种方式就是队列,或者也可以叫做 消息队列 。它使用的是另一种 IPC 模式,也就是我们之前讲过的 sysvmsg 模式。你可以把它当成是进程中的一个数据结构,也就是我们学习数据结构时的那个队列。

    for ($i = 1; $i < 3; $i++) {
        $process = new Swoole\Process(function (Swoole\Process $worker) {
            while($msg = $worker->pop()) {
                if ($msg === false) {
                    break;
                }
                echo "Child {$worker->pid}:来自 Master 的数据 - \"{$msg}\"。", PHP_EOL;
                sleep(1);
            }
        });
        $process->useQueue(1, 2);
        // $process->useQueue(1, 1);
        // $process->useQueue(1, 1 | \Swoole\Process::IPC_NOWAIT);
        $process->start();
    }
    $messages = [
        "Hello World!",
        "Hello Cat!",
        "Hello King",
        "Hello Leon",
        "Hello Rose"
    ];
    
    foreach ($messages as $msg){
        $process->push($msg);
    }
    
    \Swoole\Process::wait(true);
    \Swoole\Process::wait(true);
    
    //  [root@localhost source]# php 3.4进程间通信.php
    //  Child 2071:来自 Master 的数据 - "Hello World!"。
    //  Child 2070:来自 Master 的数据 - "Hello Cat!"。
    //  Child 2071:来自 Master 的数据 - "Hello King"。
    //  Child 2070:来自 Master 的数据 - "Hello Leon"。
    //  Child 2071:来自 Master 的数据 - "Hello Rose"。

    对于消息队列来说,我们只需要在创建子进程后设置一个 useQueue() 方法,它的参数第一个是标识 Key ,第二个是争抢模式,如果设置为 1 则是普通的队列消费,只有一个进程会去消费队列中的数据。如果设置为 2 的话,所有的子进程会进入争抢模式,就像我们这里输出的内容一样。


    那么能指定给一个子进程发送队列信息吗?可能真不行,队列是公共的,谁都可以来消费。因此,我们只需要给一个子进程对象 push() 数据,就可以实现所有子进程对象的消费。


    另外,在设置第二个参数的时候,我们还可以设置一个 \Swoole\Process::IPC_NOWAIT 常量,可以将队列设置为非阻塞的,比如上面这个测试代码运行完成后,主进程还是在挂起状态,因为我们的子进程中还在等待 pop() 数据。如果设置了 IPC_NOWAIT 的话,pop() 为空或 push() 已满的状态下,进程将不在阻塞。这个大家可以自己动手试试一试哦。

    信号

    最后就来说说信号通信。这东西吧,就是 Linux 自带的一个东西。干嘛用的呢?当我们执行一个程序的时候,如果想停止,一般会 Ctrl+c 一下吧,其实这个操作就是给这个程序发送了一个要关闭它的信号。同样地,我们 kill -9 也是在向程序发送信号,另外重启 php-fpm 的命令 kill -USR2 php-fpm 其实也是在向它发送信号。


    一般来说,信号都是通过 kill 命令传达的,大部分情况下,我们是做一些终止、重启操作的。从名字也能看出来嘛,杀死进程。但不同的信号其实又有不同的含义,比如说 -USR2 表示的用户自定义信号2,我们可以监听这个信号并进行一些特殊的操作。


    关于信号的内容其实远比我们看到的要复杂,同时也是 Unix 类操作系统的重要知识。但我们早就已经应用过了,还记得异步 wait() 进程的代码吧。

    Swoole\Process::signal(SIGCHLD, function ($sig) {
        //必须为false,非阻塞模式
        while ($ret = Swoole\Process::wait(false)) {
            echo "PID={$ret['pid']}\n";
        }
    });

    这就是在监听 SIGCHLD 这个信号,表示的是子进程状态改变时的操作。今天我们再弄个复杂一点的内容,让一个子进程发送消息给父进程,然后父进程关闭所有子进程结束程序运行。

    $ppid = getmypid();
    $workers = [];
    for ($i = 1; $i < 3; $i++) {
        $process = new Swoole\Process(function (Swoole\Process $worker) use ($ppid, $i) {
            sleep($i*3);
            echo "PID:{$worker->pid} kill -USR2 Master {$ppid}.", PHP_EOL;
            $worker->kill($ppid, SIGUSR2);
        });
    
        $process->start();
        $workers[$process->pid] = $process;
    }
    
    Swoole\Process::signal(SIGUSR2, function () use ($workers) {
        echo "收到子进程 USR2 信号,结束所有子进程!", PHP_EOL;
        foreach ($workers as $pid => $w) {
            Swoole\Process::kill($pid, SIGKILL);
        }
    });
    
    Swoole\Process::signal(SIGCHLD, function ($sig) {
        //必须为false,非阻塞模式
        while ($ret = Swoole\Process::wait(false)) {
            echo "PID={$ret['pid']}\n";
        }
    });
    Swoole\Timer::tick(2000, function () {});
    
    //  [root@localhost source]# php 3.4进程间通信.php
    //  PID:2044 kill -USR2 Master 2043.
    //  收到子进程 USR2 信号,结束所有子进程!
    //  PID=2044
    //  PID=2045

    我们的两个子进程分别 sleep() 了 3 秒和 6 秒。比较快运行完的肯定是第一个 3 秒的子进程,这时候,我们向主进程发送一个 kill() ,并指定信号为 SIGUSR2 ,也就是我们最常见的 -USR2 这种。然后在主进程的监听中,打印收到信号了,并使用 SIGKILL 结束所有子进程,SIGKILL 其实就是我们最熟悉的 -9 。


    运行结果其实就是当 SIGUSR2 监听完成后,会马上 kill 掉两个子进程,它们都马上被 SIGCHLD 回收,而不是等待第二个进程 sleep() 6 秒之后才结束。

    总结

    一口气又了解了这么多内容,是不是感觉意犹未尽呀。三种通信方式:管道、队列、信号,你都了解了吗?在这里我也只敢说了解,完全达不到掌握精通的程序,能写出来也是查阅了相当多的资料。说实话,如果只是做传统的 PHP 开发,真的很难接触到这些内容,因为我们不用考虑这些事,php-fpm 已经帮我们处理好了。说到这里,下篇我们就将要学习的是进程池与进程管理相关的内容,没错,就是 php-fpm 干的活。


    测试代码:


    https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.4%E8%BF%9B%E7%A8%8B%E9%97%B4%E9%80%9A%E4%BF%A1.php


    参考文档:


    https://wiki.swoole.com/#/process/process


    https://wiki.swoole.com/wiki/page/216.html


    https://wiki.swoole.com/wiki/page/290.html

    视频链接

    微信文章地址:https://mp.weixin.qq.com/s/uftbLUHeQ1hzChpN2D-g7A

    微信视频地址:https://mp.weixin.qq.com/s/sWcTWeWVDrmUJxJatAUaPg

    B站视频地址:https://www.bilibili.com/video/BV1Le411T7xg/

    搜索
    关注