Inter-Process Communication

Under normal circumstances, the user space of each process is isolated, and processes generally cannot access each other's memory. It is like opening two copies of the same game to log in to two different accounts: each copy runs on its own. System (kernel) space, however, is shared, and two different processes can both access the same external process. For example, two programs accessing the same Redis instance is a form of inter-process communication. Processes can also share data on disk and operate on the same file, but, as many of you will already know, operating on the same data concurrently can cause race conditions and leave the data inconsistent.

Today we will not discuss race conditions. Instead, we will look at how two processes can communicate in Swoole without a third party. These are in fact facilities provided by most operating systems, and many other languages use the same solutions.

Inter-process communication is abbreviated as IPC. We have touched on IPC before; in Swoole it is mainly implemented with UnixSocket.

Pipe

Pipes are a very common approach. For example, if we print the Process object passed to a process callback function, we see something like this.

new Swoole\Process(function (Swoole\Process $worker) {
    var_dump($worker);
});
// object(Swoole\Process)#3 (6) {
//   ["pipe"]=>
//   int(6)
//   ["msgQueueId"]=>
//   NULL
//   ["msgQueueKey"]=>
//   NULL
//   ["pid"]=>
//   int(2081)
//   ["id"]=>
//   NULL
//   ["callback":"Swoole\Process":private]=>
//   object(Closure)#4 (1) {
//     ["parameter"]=>
//     array(1) {
//       ["$worker"]=>
//       string(10) "<required>"
//     }
//   }
// }

Notice the pipe property at the top. It does not hold data; it is a pipe ID. What is a pipe? You are probably familiar with the command ps -ef | grep php: the | in the middle connects the two commands into a pipeline, so the output of the first becomes the input of the second. Our processes can use the same capability to communicate with each other.
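The "output of one becomes input of the other" idea can also be tried from PHP itself. The following is only a minimal sketch in plain PHP (proc_open(), not Swoole), assuming a Unix-like system where the tr command is available: we hold one end of two pipes, and the child command holds the other ends as its stdin and stdout.

```php
<?php
// The child's stdin and stdout are pipes; we hold the opposite ends.
$spec = [
    0 => ['pipe', 'r'], // the child reads its stdin from this pipe
    1 => ['pipe', 'w'], // the child writes its stdout to this pipe
];

// "tr a-z A-Z" upper-cases whatever arrives on its stdin (assumed to be installed).
$proc = proc_open('tr a-z A-Z', $spec, $pipes);
if (!is_resource($proc)) {
    throw new RuntimeException('proc_open() failed');
}

fwrite($pipes[0], "hello pipe\n"); // our output becomes the child's input
fclose($pipes[0]);                 // EOF tells tr there is no more input

$output = stream_get_contents($pipes[1]); // the child's output becomes our input
fclose($pipes[1]);
proc_close($proc);

echo $output; // HELLO PIPE
```

This is the same plumbing the shell sets up for |, just with PHP sitting on one side of the pipe.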

$workers = [];
for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) {
        // Block until the parent writes something into the pipe
        $data = $worker->read();
        echo "Child {$worker->pid}: data from Master - \"{$data}\".", PHP_EOL;
        $worker->write("To the boss: data from {$worker->pid}, via pipe {$worker->pipe}!");
    });
    $process->start();
    $workers[$process->pid] = $process;
}

foreach ($workers as $pid => $process) {
    $process->write("Hello, {$pid}!");
    $data = $process->read();
    echo "Master: data from child {$pid} - \"{$data}\"", PHP_EOL;
}

// [root@localhost source]# php 3.4进程间通信.php
// Child 2076: data from Master - "Hello, 2076!".
// Master: data from child 2076 - "To the boss: data from 2076, via pipe 4!"
// Child 2077: data from Master - "Hello, 2077!".
// Master: data from child 2077 - "To the boss: data from 2077, via pipe 6!"

You can probably see how it works now. Under the hood, the parent and child processes communicate through a UnixSocket. In the parent process, we call write() on the child process object to send data to that particular child; inside the child, the Process object passed to the callback function reads that data with read(). The reverse also works: the child calls write() in its callback function, and the parent reads the data by calling read() on the child process object.
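To make the "UnixSocket underneath" idea more concrete, here is a minimal sketch in plain PHP. It is not how Swoole implements its pipe, only an illustration of a connected pair of Unix sockets where each end can both read and write. For simplicity both ends live in the same process; in a real parent/child setup the pair would be created before forking and each process would keep one end. It assumes a Unix-like system.

```php
<?php
// Create a connected pair of Unix domain sockets (plain PHP, no Swoole).
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($pair === false) {
    throw new RuntimeException('stream_socket_pair() failed');
}
[$parentEnd, $childEnd] = $pair;

// "Parent" writes, "child" reads.
fwrite($parentEnd, 'hello child');
$fromParent = fread($childEnd, 8192);

// The same pair also works in the other direction.
fwrite($childEnd, 'hello parent');
$fromChild = fread($parentEnd, 8192);

echo $fromParent, PHP_EOL;
echo $fromChild, PHP_EOL;

fclose($parentEnd);
fclose($childEnd);
```

Unlike a shell pipe, a socket pair is bidirectional, which is why a single Process object can offer both read() and write().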

Parent and child work fine, so can two child processes talk to each other as well? They can, as long as one of them has the other's Process object, because write() always targets a specific child process object.

$process1 = new Swoole\Process(function (Swoole\Process $worker) {
    while(1){
        $data = $worker->read();
        if($data){
            echo "Child {$worker->pid}: received data - \"{$data}\".", PHP_EOL;
        }
    }
});
$process1->start();

$messages = [
    "Hello World!",
    "Hello Cat!",
    "Hello King",
    "Hello Leon",
    "Hello Rose"
];
$process2 = new Swoole\Process(function (Swoole\Process $worker) use($process1, $messages) {
    foreach($messages as $msg){
        sleep(1);
        $process1->write("From {$worker->pid} {$msg} !");
    }
    sleep(1);
    // process1 loops forever, so terminate it explicitly
    $worker->kill($process1->pid, SIGKILL);
});
$process2->start();
// Reap both child processes
\Swoole\Process::wait(true);
\Swoole\Process::wait(true);

//  [root@localhost source]# php 3.4进程间通信.php
//  Child 2102: received data - "From 2103 Hello World! !".
//  Child 2102: received data - "From 2103 Hello Cat! !".
//  Child 2102: received data - "From 2103 Hello King !".
//  Child 2102: received data - "From 2103 Hello Leon !".
//  Child 2102: received data - "From 2103 Hello Rose !".

In this test code, the callback of process1 loops forever, checking whether read() returned a value. The process1 object is passed into process2 through use, and process2 then write()s one message per second to it.

Note that we want to reap both child processes, so the main process simply blocks in two wait(true) calls. At the end of process2 there is also a kill() call, whose purpose is to terminate process1, which would otherwise stay in its infinite loop forever. Sending a signal like this is itself a form of inter-process communication, which we will come to at the end.

If all goes well, the lines shown in the comments above appear one by one, one second apart.
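Killing the reader with SIGKILL works, but the pipe itself can also carry a "please stop" message. The sketch below is one possible alternative, not the code from this article: it assumes the Swoole extension is loaded, and the "__quit__" sentinel string is a made-up convention for this example. The child upper-cases every message it receives and leaves its loop when it sees the sentinel.

```php
<?php
// Hypothetical sentinel value; any string that never occurs as real data would do.
const QUIT = '__quit__';

$reader = new Swoole\Process(function (Swoole\Process $worker) {
    while (true) {
        $data = $worker->read();
        if ($data === false || $data === QUIT) {
            break; // leave the loop, so the child exits on its own
        }
        $worker->write(strtoupper($data)); // reply through the same pipe
    }
});
$reader->start();

$replies = [];
foreach (['hello', 'world'] as $msg) {
    $reader->write($msg);          // parent -> child
    $replies[] = $reader->read();  // child -> parent
}

$reader->write(QUIT);                  // ask the child to stop instead of killing it
$status = Swoole\Process::wait(true);  // reap it; contains pid, code and signal

print_r($replies);
```

With this approach the child ends through a normal exit, so it gets the chance to clean up, which a SIGKILL never allows.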

Coroutine exportSocket

Besides the ordinary pipe communication above, processes can also communicate in coroutine style. Coroutines are covered in the next chapter, so here is just a brief look.

$workers = [];
for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) {
        $socket = $worker->exportSocket();
        $data = $socket->recv();
        echo "Child {$worker->pid}: data from Master - \"{$data}\".", PHP_EOL;
        $socket->send("To the boss: data from {$worker->pid}, via pipe {$worker->pipe}!");
    }, false, SOCK_DGRAM, true); // last argument: enable coroutines in the callback
    $process->start();
    $workers[$process->pid] = $process;
}
foreach ($workers as $pid => $process) {
    \Swoole\Coroutine\run(function () use ($pid, $process) {
        $socket = $process->exportSocket();
        $socket->send("Hello, {$pid}!");
        $data = $socket->recv();
        echo "Master: data from child {$pid} - \"{$data}\"", PHP_EOL;
    });
}
foreach ($workers as $pid => $worker) {
    \Swoole\Process::wait();
}

//  [root@localhost source]# php 3.4进程间通信.php
//  Child 2062: data from Master - "Hello, 2062!".
//  Master: data from child 2062 - "To the boss: data from 2062, via pipe 4!"
//  Child 2063: data from Master - "Hello, 2063!".
//  Master: data from child 2063 - "To the boss: data from 2063, via pipe 6!"

The difference is actually small, but when instantiating Process we need to set its last parameter to true. This parameter enables coroutines in the callback function of the process; once enabled, coroutine APIs can be used directly inside the child process.

Here we first call exportSocket() to obtain a Swoole\Coroutine\Socket object. You have seen it before: we used it when writing the asynchronous UDP service. As the name suggests, it is a socket programming component. It simply exposes the underlying UnixSocket for us to operate on directly, so it will feel familiar if you have socket programming experience.

In this test code, we exchange data with the send() and recv() methods of the Swoole\Coroutine\Socket object returned by exportSocket(). Otherwise it is no different from the read() and write() we learned above.

Queue

Besides pipes, that is, the UnixSocket approach, there is also the queue, or message queue. It uses another IPC mode, the sysvmsg mode we mentioned earlier. You can think of it as the queue we met when studying data structures, shared between processes.

for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) {
        // pop() blocks until a message arrives and returns false on failure
        while (($msg = $worker->pop()) !== false) {
            echo "Child {$worker->pid}: data from Master - \"{$msg}\".", PHP_EOL;
            sleep(1);
        }
    });
    $process->useQueue(12);
    // $process->useQueue(1, 1);
    // $process->useQueue(1, 1 | \Swoole\Process::IPC_NOWAIT);
    $process->start();
}
$messages = [
    "Hello World!",
    "Hello Cat!",
    "Hello King",
    "Hello Leon",
    "Hello Rose"
];

// $process is the last child created; the queue is shared by all of them
foreach ($messages as $msg){
    $process->push($msg);
}

\Swoole\Process::wait(true);
\Swoole\Process::wait(true);

//  [root@localhost source]# php 3.4进程间通信.php
//  Child 2071: data from Master - "Hello World!".
//  Child 2070: data from Master - "Hello Cat!".
//  Child 2071: data from Master - "Hello King".
//  Child 2070: data from Master - "Hello Leon".
//  Child 2071: data from Master - "Hello Rose".

For the message queue, we only need to call useQueue() after creating the child process. The first parameter is the key that identifies the queue, and the second is the mode. With mode 1, the queue is consumed normally: only one process consumes the data in the queue. With mode 2, all child processes compete for the messages, which is what our output here shows (useQueue(12) above relies on 2 being the default mode).

So can we send a queue message to one specific child process? Probably not: the queue is shared, and anyone can consume from it. This is also why we only need to push() data through one child process object for all child processes to consume it.

In addition, the second parameter can be combined with the \Swoole\Process::IPC_NOWAIT constant to make the queue non-blocking. For example, after the test code above finishes, the main process stays blocked, because our child processes are still waiting in pop() for data. With IPC_NOWAIT set, pop() on an empty queue and push() on a full queue return immediately instead of blocking. You can try this yourself.
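The blocking versus non-blocking behaviour can also be observed without Swoole. The following is only a sketch using PHP's own sysvmsg extension, which exposes the same kind of System V message queue; it assumes that extension is enabled, and the queue key is an arbitrary value picked for this example. Everything happens in one process to keep it short.

```php
<?php
// Arbitrary key chosen for this sketch; any unused integer key works.
$key = 0x51554531;
$queue = msg_get_queue($key, 0600);

// Put two plain-string messages on the queue (message type 1, no serialization).
msg_send($queue, 1, 'Hello World!', false);
msg_send($queue, 1, 'Hello Cat!', false);
$pending = msg_stat_queue($queue)['msg_qnum']; // number of messages waiting

// Drain the queue. Because of MSG_IPC_NOWAIT, msg_receive() returns false
// right away once the queue is empty, instead of blocking like a plain pop().
$received = [];
while (msg_receive($queue, 0, $type, 8192, $message, false, MSG_IPC_NOWAIT, $error)) {
    $received[] = $message;
}
$drained = ($error === MSG_ENOMSG); // "no message" is why the loop stopped

msg_remove_queue($queue); // System V queues outlive the process unless removed

print_r($received);
```

Without the MSG_IPC_NOWAIT flag, the third msg_receive() call would block forever, which is the same situation our child processes are in after the last message has been consumed.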

Signal

Finally, let's talk about signals. They are built into Linux. What are they for? When we want to stop a running program, we usually press Ctrl+C. That actually sends a signal to the program to terminate it. Likewise, kill -9 sends a signal to a program, and the command used to restart php-fpm, kill -USR2 php-fpm, also just sends it a signal.

Generally speaking, signals are sent with the kill command. Most of the time we use it to terminate or restart something; as the name suggests, it kills a process. But different signals carry different meanings. For example, -USR2 stands for user-defined signal 2, which a program can listen for and respond to with its own special behavior.
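Listening for a user-defined signal can be tried in a few lines. This is a minimal sketch with PHP's pcntl and posix extensions rather than Swoole's own signal API, and it assumes both extensions are available. To keep it in one process, the script sends SIGUSR2 to itself.

```php
<?php
$received = [];

// Register a handler for user-defined signal 2.
pcntl_signal(SIGUSR2, function (int $signo) use (&$received) {
    $received[] = $signo;
});

// Equivalent to running "kill -USR2 <our pid>" from a shell.
posix_kill(posix_getpid(), SIGUSR2);

// PHP queues signals; this call runs the handlers for the pending ones.
pcntl_signal_dispatch();

echo 'USR2 handled: ', var_export($received === [SIGUSR2], true), PHP_EOL;
echo 'SIGKILL is signal number ', SIGKILL, PHP_EOL;
```

The number behind SIGUSR2 differs between platforms, which is one reason to use the constant names; SIGKILL is 9, which is where kill -9 comes from.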

Signals are far more complex than what we see here, and they are an important topic in Unix-like operating systems. We have already used them, though: remember the code that wait()s for child processes asynchronously.

Swoole\Process::signal(SIGCHLD, function ($sig) {
    // Must be false: non-blocking mode
    while ($ret = Swoole\Process::wait(false)) {
        echo "PID={$ret['pid']}\n";
    }
});

This listens for the SIGCHLD signal, which is sent when the state of a child process changes. Today, let's make it a little more complicated: a child process sends a signal to the parent process, and the parent process then terminates all child processes and ends the program.

$ppid = getmypid();
$workers = [];
for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) use ($ppid, $i) {
        sleep($i*3);
        echo "PID:{$worker->pid} kill -USR2 Master {$ppid}.", PHP_EOL;
        $worker->kill($ppid, SIGUSR2);
    });

    $process->start();
    $workers[$process->pid] = $process;
}

Swoole\Process::signal(SIGUSR2, function () use ($workers) {
    echo "Received USR2 from a child process, terminating all child processes!", PHP_EOL;
    foreach ($workers as $pid => $w) {
        Swoole\Process::kill($pid, SIGKILL);
    }
});

Swoole\Process::signal(SIGCHLD, function ($sig) {
    // Must be false: non-blocking mode
    while ($ret = Swoole\Process::wait(false)) {
        echo "PID={$ret['pid']}\n";
    }
});
Swoole\Timer::tick(2000, function () {});

//  [root@localhost source]# php 3.4进程间通信.php
//  PID:2044 kill -USR2 Master 2043.
//  Received USR2 from a child process, terminating all child processes!
//  PID=2044
//  PID=2045

Our two child processes sleep() for 3 and 6 seconds respectively, so the one that sleeps 3 seconds naturally finishes first. It then calls kill() on the main process with the signal SIGUSR2, the -USR2 we see most often. The main process's handler prints that the signal was received and uses SIGKILL to terminate all child processes. SIGKILL is the familiar -9.

As a result, once the SIGUSR2 handler has run, both child processes are killed immediately and promptly reaped by the SIGCHLD handler, rather than waiting for the second process to finish its 6-second sleep().

Summary

That was a lot to take in at once; are you still wanting more? Three communication methods: pipes, queues, and signals. Did you follow them all? For my part, I only dare say that I understand them; I am nowhere near mastering them, and I could only write this after consulting a lot of material. To be honest, if you only do traditional PHP development, you will rarely come across these things, because we never have to think about them: php-fpm already handles them for us. Speaking of which, in the next article we will learn about process pools and process management. Yes, that is exactly what php-fpm does.

Test code:

https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.4%E8%BF%9B%E7%A8%8B%E9%97%B4%E9%80%9A%E4%BF%A1.php

Reference documentation:

https://wiki.swoole.com/#/process/process

https://wiki.swoole.com/wiki/page/216.html

https://wiki.swoole.com/wiki/page/290.html