标题:【Swoole系列4.2】协程应用与容器

文章目录
    分类:PHP 标签:Swoole

    协程应用与容器

    在了解了协程服务如何启动之后,我们再来继续进行协程核心相关功能的应用学习。另外,我们还会在今天讲一下上一次没有说清楚的协程容器。

    协程应用

    对于协程的应用来说,最主要的其实就是官方文档中的协程核心 API ,其实也就是协程相关的那些方法函数。首先声明,我也是第一次学习协程相关的知识,之前并没有学习 Go ,所以没法做对比,或许在将来学习 Go 的时候,我会拿 Swoole 的协程去和 Go 的进行对比。

    $cid = Swoole\Coroutine::create(function(){
        //    sleep(10);
        Swoole\Coroutine::sleep(10);
        echo "协程1,cid:" . Swoole\Coroutine::getCid() , PHP_EOL;
    });
    go(function(){
        echo "协程2,cid:" . Co::getCid() , PHP_EOL;
    
        $ccid = go(function(){
            co::sleep(10);
            echo "协程2-1,cid:" . co::getCid() . ",pcid:" . co::getPcid(), PHP_EOL;
        });
        echo "协程2-1,pcid:" . Co::getPcid($ccid), PHP_EOL;
    });
    
    echo $cid, PHP_EOL;
    //[root@localhost source]# php 4.2协程应用与容器.php
    //协程2,cid:2
    //协程2-1,pcid:2
    //1
    //协程1,cid:1
    //协程2-1,cid:3,pcid:2

    最简单的一段协程代码,在这里我们演示了几种不同协程 API 使用方式。第一就是 Swoole\Coroutine::create() 和 go() ,这两个方法都是用于创建协程的,go() 函数是一个别名,需要保证 Swoole 配置中开启了 swoole.use_shortname ,这个东西是在 php.ini 文件中配置的。将来讲框架的时候还会讲到这个配置。


    然后在协程中,我们可以用 Swoole\Coroutine ,也可以使用 Co ,甚至是使用小写的 co 三种方式调用协程相关的函数方法。第一个协程中我们使用的是 Swoole\Coroutine ,第二个协程中我们使用了 Co ,第二个协程的子协程中,我们使用了小写的 co 。三种方式都没问题,Co 和 co 都是 Swoole\Coroutine 的别名。


    之前我们讲过,协程,不是并发的多线程,而是在一个线程上的多个函数调度执行。因此,如果我们在第一个协程中使用了注释上的 sleep() 的话,那么当前就会阻塞。注意,我们没有开启子进程,在当前主进程环境下,是单进程单线程的执行方式,后面的协程是需要等待阻塞的协程完成工作后才会运行的。


    我们使用 co::sleep() 方法,实现的是协程调度的休息功能,底层会挂起这个协程,这个我们后面讲 co::yield() 的时候会再说。使用这个方法就可以以用户态的方式让出这段协程的执行,从这里就可以看出,协程是我们可以控制的,不是像多进程和多线程一样是系统自动 IO 调度的。


    协程可以继续开子协程,并且可以一直向下开,这是一个栈式调用。本身协程就可以看做是运行在线程之上的一堆函数。子协程其实就是函数内部再调用函数这样一层一层的函数栈。如果不理解这一块的话,可以看下我们之前学的 PHP数据结构 中关于栈相关的内容。

    协程调度控制

    紧接着我们就来看看协程的调度控制。一直在说协程是用户态的,可以我们自己来控制它的执行,这个能力其实主要就是通过下面两个方法来实现的。

    $cid1 = go(function(){
       echo "协程1,cid:" . Co::getCid() , " start", PHP_EOL;
       co::yield();
       echo "协程1,cid:" . Co::getCid() , " end", PHP_EOL;
    });
    
    go(function() use($cid1){
       echo "协程2,cid:" . Co::getCid() , " start", PHP_EOL;
       co::sleep(5);
       co::resume($cid1);
       echo "协程2,cid:" . Co::getCid() , " end", PHP_EOL;
    });
    // [root@localhost source]# php 4.2协程应用与容器.php
    // 协程1,cid:1 start
    // 协程2,cid:2 start
    // 协程1,cid:1 end
    // 协程2,cid:2 end

    yield() 函数用于让出当前协程的执行权,它不基于 IO 调度,是我们手动调度的,它还有一个别名 suspend() 。而 resume() 则是手动恢复某个协程,让它继续运行。当某一个协程使用 yield() 挂起之后,就需要通过 resume() 来两次唤醒。它们一般都是成对出现的,如果一直没有 resume() ,则可能造成 协程泄露 ,被挂起的协程会一直占用资源,但永远不会向后继续执行。


    上面的测试代码其实可以看出,协程1打印了 start 之后,就通过 yield() 挂起,然后协程2执行,休息 5 秒后重新唤起协程1,协程1完成执行,协程2完成执行。


    co::sleep() 这个函数实际上就是在底层实现了一套 yield()+resume() 的机制,在底层启动另一个协程并设置计时器,当时间到了之后就 resume() 当前的协程,从而实现协程的休眠挂起。

    资源释放

    go(function(){
        defer(function(){
            echo "一定会进来!", PHP_EOL;
        });
        defer(function(){
            echo "一定会进来2!", PHP_EOL;
        });
        throw new Exception("发生异常了");
    });
    //[root@localhost source]# php 4.2协程应用与容器.php
    //一定会进来2!
    //一定会进来!
    //PHP Fatal error:  Uncaught Exception: 发生异常了 in /home/www/4.Swoole协程/source/4.2协程应用与容器.php:52
    //Stack trace:
    //#0 [internal function]: {closure}()
    //#1 {main}
    //  thrown in /home/www/4.Swoole协程/source/4.2协程应用与容器.php on line 52

    defer() 是一个很特别的函数,它一般用于资源的释放。在协程中,要早早的注册这个函数,然后即使是协程发生异常了,也一定会执行它。这样就可以避免一些资源在异常情况下无法释放的问题。它的调用顺序是逆序的(先进后出),也就是先注册 defer 的后执行,符合资源释放的正确逻辑,后申请的资源可能是基于先申请的资源的,所以肯定不能让先申请的资源先释放啦。


    另外一定要先注册哦,如果在异常之后注册,那么肯定是没效果的,毕竟函数都还没有注册就出问题了,那就只能外部去处理异常了。

    打印调用栈

    function test1(){
       test2();
    }
    function test2(){
       while(1) {
           co::sleep(1);
           break;
       }
    }
    
    go(function(){
       $cid = go(function(){test1();});
       var_dump(co::getBackTrace($cid));
    });
    //[root@localhost source]# php 4.2协程应用与容器.php
    //array(4) {
    //    [0]=>
    //  array(6) {
    //        ["file"]=>
    //    string(60) "/home/www/4.Swoole协程/source/4.2协程应用与容器.php"
    //        ["line"]=>
    //    int(59)
    //    ["function"]=>
    //    string(5) "sleep"
    //        ["class"]=>
    //    string(16) "Swoole\Coroutine"
    //        ["type"]=>
    //    string(2) "::"
    //        ["args"]=>
    //    array(1) {
    //            [0]=>
    //      int(1)
    //    }
    //  }
    //  [1]=>
    //  array(4) {
    //        ["file"]=>
    //    string(60) "/home/www/4.Swoole协程/source/4.2协程应用与容器.php"
    //        ["line"]=>
    //    int(55)
    //    ["function"]=>
    //    string(5) "test2"
    //        ["args"]=>
    //    array(0) {
    //        }
    //  }
    //  [2]=>
    //  array(4) {
    //        ["file"]=>
    //    string(60) "/home/www/4.Swoole协程/source/4.2协程应用与容器.php"
    //        ["line"]=>
    //    int(65)
    //    ["function"]=>
    //    string(5) "test1"
    //        ["args"]=>
    //    array(0) {
    //        }
    //  }
    //  [3]=>
    //  array(2) {
    //        ["function"]=>
    //    string(9) "{closure}"
    //        ["args"]=>
    //    array(0) {
    //        }
    //  }
    //}

    getBackTrace() 方法可以获取协程函数的调用栈。在上面的测试代码中,我们可以看到当前协程调用了 test1() ,test1() 中又调用了 test2() ,test2() 里面又使用了 sleep() ,一整个调用路径都可以清晰地看到。

    协程运行时间及一些杂项函数

    $cid = go(function(){
       co::sleep(2.2);
       echo co::getElapsed(), PHP_EOL;
    });
    echo co::getElapsed($cid), PHP_EOL;
    //0
    //2204
    
    var_dump(\Swoole\Coroutine::exists($cid)); // bool(true)
    var_dump(iterator_to_array(\Swoole\Coroutine::list()));
    var_dump(iterator_to_array(\Swoole\Coroutine::listCoroutines()));
    //array(1) {
    //    [0]=>
    //  int(1)
    //}
    var_dump(Swoole\Coroutine::stats());
    //array(9) {
    //    ["event_num"]=>
    //  int(0)
    //  ["signal_listener_num"]=>
    //  int(0)
    //  ["aio_task_num"]=>
    //  int(0)
    //  ["aio_worker_num"]=>
    //  int(0)
    //  ["aio_queue_size"]=>
    //  int(0)
    //  ["c_stack_size"]=>
    //  int(2097152)
    //  ["coroutine_num"]=>
    //  int(1)
    //  ["coroutine_peak_num"]=>
    //  int(1)
    //  ["coroutine_last_cid"]=>
    //  int(1)
    //}

    getElapsed() 方法用于获得协程的运行时间,可以用于调试以及查看是否出现僵尸协程。


    exists() 可以判断指定的协程 ID 是否存在。list() 以及它的别名函数 listCoroutines() 用于查看当前运行或挂起中的协程数量。stats() 查看协程的具体运行状态信息。


    注意上述的内容都是要有在运行的协程才能看到的。毕竟协程都运行完了,exists() 肯定就是不存在了,list() 中也不会有数据了。

    取消执行

    Swoole\Coroutine\run(function(){
       $cid = go(function(){
           echo co::getCid() , " 开始", PHP_EOL;
           co::sleep(10);
           var_dump(co::isCanceled());
           echo co::getCid() , " 结束", PHP_EOL;
       });
       co::cancel($cid);
    });
    // [root@localhost source]# php 4.2协程应用与容器.php
    // 2 开始
    // bool(true)
    // 2 结束

    cancel() 可以取消一个协程的执行,就像上面的代码一样,本身我们要挂起 10 秒钟,但是我们在外面马上就取消了,这个协程也就马上结束运行了,不会再等待 10 秒。注意,这两个方法函数是需要工作在 协程容器 中的。所以我们外面套上了 Swoole\Coroutine\run 方法,后面我们就会讲到 协程容器 。

    并行协程

    前面说过,普通的 sleep() 或者阻塞 IO 相关的操作会让协程排队等待。这是协程的特性,但是,Swoole 其实也为我们提供了一些并行执行协程的方式。

    $start_time = microtime(true);
    Swoole\Coroutine\run(function () {
       Swoole\Coroutine::join([
           go(function () {
               sleep(3);
               echo 1, PHP_EOL;
           }),
           go(function () {
               sleep(4);
               echo 2, PHP_EOL;
           })
       ], 1);
    });
    echo microtime(true) - $start_time, PHP_EOL;
    //[root@localhost source]# php 4.2协程应用与容器.php
    //1
    //2
    //4.0021550655365
    
    $start_time = microtime(true);
    Swoole\Coroutine\run(function () {
       $results = Swoole\Coroutine\batch([
           'func1'=>function(){sleep(1);return 'func1 ok';},
           'func2'=>function(){sleep(2);return 'func2 ok';},
           'func3'=>function(){sleep(3);return 'func3 ok';}
       ],5);
       var_dump($results);
    });
    echo microtime(true) - $start_time, PHP_EOL;
    //[root@localhost source]# php 4.2协程应用与容器.php
    //array(3) {
    //    ["func1"]=>
    //  string(8) "func1 ok"
    //    ["func2"]=>
    //  string(8) "func2 ok"
    //    ["func3"]=>
    //  string(8) "func3 ok"
    //}
    //3.002023935318

    上面两种方式都可以并行地执行协程,我们都是使用原生的 sleep() ,如果是默认情况下,第一个测试需要等待 7 秒,第二个测试需要等待 6 秒,但是在并发的情况下,多个协程是一块执行的,所以都应该是以最大的等待时间为结果,也就是第一条的结果是 4 秒,第二个测试的结果的是 3 秒。


    join() 函数是直接运行协程的内容,batch() 则是批量生成协程并获取运行结果的返回值。另外还有 Swoole\Coroutine\parallel() 方法,它和 join() 的效果是类似的。


    join() 具体是什么原理我不太清楚,而且都是比较新的版本才支持的新方法函数。但 batch() 和 parallel() 是 Swoole 中的 Library 提供的方法,这些方法是用 PHP 实现的一些协程辅助函数,我们是可以懂它们的源码的。

    function batch(array $tasks, float $timeout = -1): array
    {
        $wg = new WaitGroup(count($tasks));
        foreach ($tasks as $id => $task) {
            Coroutine::create(function () use ($wg, &$tasks, $id, $task) {
                $tasks[$id] = null;
                $tasks[$id] = $task();
                $wg->done();
            });
        }
        $wg->wait($timeout);
        return $tasks;
    }
    
    function parallel(int $n, callable $fn): void
    {
        $count = $n;
        $wg = new WaitGroup($n);
        while ($count--) {
            Coroutine::create(function () use ($fn, $wg) {
                $fn();
                $wg->done();
            });
        }
        $wg->wait();
    }

    很明显,这两个方法使用的是 WaitGroup ,这个东西我们在后面的文章中也会学习到,现在先知道它们是用这个实现的就好了。其实从名字也能看出了,这肯定是一个以组的方式管理协程的对象嘛!

    协程容器

    我们已经看到过很多次 协程容器 这四个字了吧。现在就来看看它真正的含义。


    所有的协程必须在协程容器里面创建,Swoole 程序启动的时候大部分情况会自动创建协程容器,用 Swoole 启动程序的方式一共有三种:

    • 调用异步风格服务端程序的 start 方法,此种启动方式会在事件回调中创建协程容器,参考 enable_coroutine。

    • 调用 Swoole 提供的 2 个进程管理模块 Process 和 Process\Pool 的 start 方法,此种启动方式会在进程启动的时候创建协程容器,参考这两个模块构造函数的 enable_coroutine 参数。

    • 其他直接裸写协程的方式启动程序,需要先创建一个协程容器 (Coroutine\run() 函数,可以理解为 java、c 的 main 函数)

    其实在上面的测试代码中,你会发现有一些方法函数是需要在 run() 中运行的,而有一些是不需要的。当然,最好还是都在 run() 中运行。后面我们其它文章中都会标准的放到 run() 中去。


    在之前学习的各种服务应用(TCP、HTTP)中,都可以通过 $server->set() 方法来指定 enable_coroutine 参数来开启或关闭协程支持。当开启了之后,所有的事件监听中都会使用到协程。而在进程中,它们的构造函数上其实就直接有是否开启协程支持的参数,默认都是不开启。这两个部分的设置其实都是在它们的进程的内部开启了一个协程容器。


    协程容器 Coroutine\run() 方法本身是一个 Coroutine\Scheduler 对象的简化操作,或者说是一个语法糖。我们也可以自己手动来创建并操作协程容器。

    $sch = new Swoole\Coroutine\Scheduler;
    $sch->set(['max_coroutine' => 2]);
    for($i=0;$i<3;$i++){
       $sch->add(function () {
           co::sleep(5);
           echo co::getCid(), PHP_EOL;
       });
    }
    $sch->start();
    //PHP Warning:  Swoole\Coroutine\Scheduler::start(): exceed max number of coroutine 2 in /home/www/4.Swoole协程/source/4.2协程应用与容器.php on line 199
    
    $sch = new Swoole\Coroutine\Scheduler;
    $sch->set(['max_coroutine' => 3]);
    for($i=0;$i<3;$i++){
       $sch->add(function () {
           co::sleep(5);
           echo co::getCid(), PHP_EOL;
       });
    }
    $sch->start();
    //[root@localhost source]# php 4.2协程应用与容器.php
    //1
    //3
    //2

    我们需要实例化一个 Swoole\Coroutine\Scheduler 对象,然后可以通过 add() 添加协程回调函数。在设置中可以通过 max_coroutine 来设置在这个协程容器中的最大可以放置的协程数量,超过数量会报错。最后通过 start() 来启动这个协程容器。


    除了 add() 之外 ,我们还可以通过 parallel() 批量地创建协程。

    $sch = new Swoole\Coroutine\Scheduler;
    $sch->parallel(10, function () {
       co::sleep(5);
       echo co::getCid(), PHP_EOL;
    });
    $sch->start();
    //[root@localhost source]# php 4.2协程应用与容器.php
    //1
    //10
    //9
    //8
    //7
    //6
    //5
    //4
    //3
    //2

    start() 启动之后,所有的协程都执行完了它才会返回 true 。协程容器是不能嵌套的,也就是说 run() 里面不能再套 run() 了。

    $ret = Swoole\Coroutine\run(function () {
    //    Swoole\Coroutine\run(function () {
    //
    //    });
        go(function () {
            sleep(4);
            echo 1, PHP_EOL;
        });
    });
    echo $ret, PHP_EOL;
    Swoole\Coroutine\run(function () {
        go(function () {
            echo 2, PHP_EOL;
        });
    });
    // [root@localhost source]# php 4.2协程应用与容器.php
    // 1
    // 1
    // 2

    总结

    今天的内容不少吧,但其实仔细看看的话并没有介绍太多的东西。我们看到的就是核心的那几个协程相关的方法和函数,另外就是协程容器相关的知识。大家只要跟着手码一遍就能明白协程在 Swoole 中最核心的应用了。


    下一篇文章再来看看协程相关的一些扩展函数方法。


    测试代码:


    https://github.com/zhangyue0503/swoole/blob/main/4.Swoole%E5%8D%8F%E7%A8%8B/source/4.2%E5%8D%8F%E7%A8%8B%E5%BA%94%E7%94%A8%E4%B8%8E%E5%AE%B9%E5%99%A8.php


    参考文档:


    https://wiki.swoole.com/#/coroutine/coroutine


    https://wiki.swoole.com/#/coroutine/scheduler

    视频链接

    微信文章地址:https://mp.weixin.qq.com/s/ZEMTl4O19QcopDz0vrupOg

    B站视频地址:https://www.bilibili.com/video/BV1UG4y1p7oU

    微信视频地址:https://mp.weixin.qq.com/s/KDdkViungAKNzwooj8c0aQ

    搜索
    关注