Process Pool and Process Manager
We have already learned about a single process and how to communicate between processes. However, it is still very difficult to manage one process one by one. No, Swoole has directly prepared the process pool and process management related tools for us. .
process pool
The process pool is a tool for managing multiple worker processes, which itself is based on the process manager we will talk about later. The main core function is to manage multiple processes, allowing developers to implement process management functions without writing too much code, and can also create pure coroutine-style server programs that can utilize multi-core CPUs.
$workNum = 5;
$pool = new \Swoole\Process\Pool($workNum);
$pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 开始运行!", PHP_EOL;
while(1);
});
$pool->on("WorkerStop", function(\Swoole\Process\Pool $pool, $workerId){
echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 结束运行!", PHP_EOL;
});
$pool->start();
The above is the simplest process pool application. We just need to instantiate a \Swoole\Process\Pool object and give it a parameter for the number of processes. Then listen to its WorkerStart method and WorkerStop method. You can check ps and you will find that 5 child processes are started.
Here, we notice that in WorkerStart, a loop is used to suspend the process. If this loop is not used, the process will end when it finishes executing in the WorkerStart callback function. Then the process pool will start a new process in order to maintain the number of processes. So it will keep creating and ending processes all the time. You can try it yourself.
There are two parameters in the callback function of all events, one is the pool object itself, and the other is a WorkerId. This WorkerId is not a process ID, it is the number of the process pool for this process, starting from 0. Just like the $workers array we used in the last article. In fact, the bottom layer of the process pool also maintains such an array, and returns the key value of the array, that is, the number, to the callback function. Like here we have specified to start 5 processes, then its WorkerId is the WorkerId of 5 processes from 0 to 4.
For \Swoole\Process\Pool, in fact, this is just a manifestation of its default communication mode. Regarding this issue, it involves its second parameter. By default, the value of this parameter is SWOOLE_IPC_NONE. In fact, we can also set it to SWOOLE_IPC_UNIXSOCK/SWOOLE_IPC_MSGQUEUE/SWOOLE_IPC_SOCKET. There are many differences between these modes and SWOOLE_IPC_NONE. The content that needs to be monitored is different, and there is no need to suspend the loop. Let's look down.
communication mode
For different process communication modes, let's take a look at them one by one. The first is the SWOOLE_IPC_UNIXSOCK mode. In fact, it is obvious that this is the most commonly used pipeline communication that we talked about last time.
$pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_UNIXSOCK);
$pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
$proc1 = $pool->getProcess(0);
while(1){
sleep(1);
if($workerId == 0){
echo $proc1->read(), PHP_EOL;
}else{
$proc1->write("hello proc1, this is proc" . ($workerId + 1));
}
}
});
$pool->on("Message", function(Swoole\Process\Pool $pool, $data){
});
$pool->start();
It's a very simple and familiar code, which is the pipeline communication method we talked about last time. We let the first child process read data and the other processes send data to it. The getProcess() method is used to get a single process from the process pool, and it returns the \Swoole\Process object we learned earlier. If no parameter is specified, the process object of the current WorkerId is returned. If the parameter is specified, the process object of the specified WorkerId is returned.
Except for the SWOOLE_IPC_NONE mode, all other modes must listen to a Message event. Also, other modes may not listen to the WorkerStart event. We will see the application of this later.
Another communication mode is SWOOLE_IPC_MSGQUEUE, which is the message queue mode.
$pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_MSGQUEUE, 1);
$pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
$process = $pool->getProcess();
$process->useQueue(1, 2 | \Swoole\Process::IPC_NOWAIT);
while(1){
sleep(1);
if($workerId == 0){
foreach(range(1,4) as $v){
$process->push("[{$v}]消息来了" . time());
}
}else{
$data = $process->pop();
if($data){
echo $process->pop(), ' workerid:', $workerId, PHP_EOL;
}
}
}
});
$pool->on("Message", function(Swoole\Process\Pool $pool, $data){
});
$pool->start();
Similarly, in this test code, we also let the first child process push() the array, and other child processes consume the queue data. It is exactly the same as the queue communication method we talked about last time. But it also has different functions, here we use the queue to demonstrate the role of Message.
external communication
Through the study of the operating system, we know that the queue is shared in the system, all processes can receive queue data, it is not limited to the same program. Therefore, we can let another program send the queue, and the program on this side receives the queue, thus realizing cross-process communication.
$q = msg_get_queue(1);
foreach (range(1, 100) as $i) {
$data = "消息来了" . microtime(true);
msg_send($q, $i, $data, false);
}
We first prepare the above code file to simulate another program to send messages, and then prepare the following test code to receive the sent queue messages.
$pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_MSGQUEUE, 1);
$pool->on("Message", function(Swoole\Process\Pool $pool, $data){
var_dump($pool);
$process = $pool->getProcess();
echo $process->pid, PHP_EOL;
var_dump($data);
});
$pool->start();
//object(Swoole\Process\Pool)#1 (2) {
//["master_pid"]=>
// int(7114)
// ["workers"]=>
// array(1) {
// [4]=>
// object(Swoole\Process)#3 (6) {
// ["pipe"]=>
// NULL
// ["msgQueueId"]=>
// NULL
// ["msgQueueKey"]=>
// NULL
// ["pid"]=>
// int(7119)
// ["id"]=>
// int(4)
// ["callback":"Swoole\Process":private]=>
// NULL
// }
// }
//}
//7119
//string(27) "消息来了1640313653.9647"
// ………………
First run the following code, then open a command line to execute the above code for sending a message, and then you can see the content returned in the comment. Here, our process pool only listens to one Message event, which means that the WorkerStart event is really not necessary. In this Message event, we can see that the process IDs processed each time are different, indicating that, as we understood before, these processes are also competing for processing queue data.
In addition to this kind of message queue, we can also use a method, I believe you will be very excited after reading it, that is the SWOOLE_IPC_SOCKET mode.
$pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_SOCKET);
$pool->listen('0.0.0.0', 8089);
$pool->on("Message", function(Swoole\Process\Pool $pool, $data){
var_dump($data);
$pool->write("你发来的数据是:\"{$data}\"");
});
$pool->start();
// [root@localhost source]# php 3.5进程池与进程管理器.php
// string(33) "客户端发消息1640318342.8369"
// string(33) "客户端发消息1640318344.8386"
// string(33) "客户端发消息1640318346.8397"
It will start a listening service, so there must be a listen() method to specify the listening ip and port. Then the client can directly establish a Socket connection to communicate with the process. In addition to the ip + port method, it can also listen() a UnixSocket connection using "unix:/tmp/php.sock".
foreach (range(1, 3) as $i) {
$fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
$msg = "客户端发消息" . microtime(true);
fwrite($fp, pack('N', strlen($msg)) . $msg);
sleep(2);
$data = fread($fp, 8192);
if($data){
var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
}
fclose($fp);
}
// [root@localhost source]# php 3.52socketclient.php
// string(59) "你发来的数据是:"客户端发消息1640318342.8369""
// string(59) "你发来的数据是:"客户端发消息1640318344.8386""
// string(59) "你发来的数据是:"客户端发消息1640318346.8397""
familiar? Are you kind? Think about the two listening methods of php-fpm, and then think about changing the port above to 9000. Isn't this just a set of php-fpm! !
close process
Shutting down a process is a shutdown() method. This method must be called in WorkerStart or other callback function after start().
$pool = new \Swoole\Process\Pool($workNum, SWOOLE_IPC_UNIXSOCK);
$pool->on('WorkerStart', function(\Swoole\Process\Pool $pool, $workerId){
if($workerId == 0){
echo "Shutdown Worker:{$workerId}, pid:" . posix_getpid(), PHP_EOL;
$pool->shutdown();
}
});
$pool->on('Message', function(\Swoole\Process\Pool $pool, $workerId){
});
$pool->start();
// [root@localhost source]# php 3.5进程池与进程管理器.php
// Shutdown Worker:0, pid:7247
// [root@localhost source]# ps -ef | grep php
// root 7246 4402 0 23:13 pts/1 00:00:00 php 3.5?程池与?程管理器.php
// root 7248 7246 0 23:13 pts/1 00:00:00 php 3.5?程池与?程管理器.php
// root 7249 7246 0 23:13 pts/1 00:00:00 php 3.5?程池与?程管理器.php
// root 7250 7246 0 23:13 pts/1 00:00:00 php 3.5?程池与?程管理器.php
// root 7251 7246 0 23:13 pts/1 00:00:00 php 3.5?程池与?程管理器.php
out of process
Detaching from the process means that the current worker process of the process pool is taken out of management, and the bottom layer will immediately create a new process, the old process will no longer process data, and the application layer code will manage its life cycle by itself.
$pool = new \Swoole\Process\Pool(2);
$pool->on('WorkerStart', function (\Swoole\Process\Pool $pool, $workerId) {
$i = 0;
while (1) {
sleep(1);
$i++;
if ($i == 5) {
echo "Detach Worker:{$workerId}, pid:" . posix_getpid(), PHP_EOL;
$pool->detach();
} else if ($i == 10) {
break;
}
}
});
$pool->on("WorkerStop", function (\Swoole\Process\Pool $pool, $workerId) {
echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 结束运行!", PHP_EOL;
});
$pool->start();
// [root@localhost source]# php 3.5进程池与进程管理器.php
// Detach Worker:1, pid:16336
// Detach Worker:0, pid:16335
// 工作进程:0, pid: 16335 结束运行!
// 工作进程:1, pid: 16336 结束运行!
// [2021-12-23 23:33:42 @16334.0] WARNING ProcessPool::wait(): [Manager]unknown worker[pid=16335]
// [2021-12-23 23:33:42 @16334.0] WARNING ProcessPool::wait(): [Manager]unknown worker[pid=16336]
// Detach Worker:1, pid:16337
// Detach Worker:0, pid:16338
You can lengthen the sleep() time of the above code, and then you can see through ps that there are more processes appearing at the same time. The process pool will start a new child process immediately after it leaves the trigger, and then the original child process It is released by itself after running its own work.
Process manager
The last is the process manager. In fact, the bottom layer of the process pool is based on the process manager.
$pm = new \Swoole\Process\Manager();
for ($i = 0; $i < 2; $i++) {
$pm->add(function (\Swoole\Process\Pool $pool, $workerId) {
echo "工作进程:{$workerId}, pid: " . posix_getpid() . " 开始运行!", PHP_EOL;
while (1) ;
});
}
$pm->start();
Using the add method of the \Swoole\Process\Manager object can directly add a process. In fact, it is similar to the feeling of going to the new \Swoole\Process() object and putting the object in a $workers array, but it is at the bottom. There are more things to deal with, such as helping us wait() the process, performing some suspend operations, and so on.
There are some other methods for the process manager, but there are not many places to use the process manager directly. After all, it is more convenient for us to use the process pool directly. You can go to the official documentation for a more in-depth understanding.
Summarize
Today's content is about the learning of two process management tools in Swoole. In more cases, we can use the process pool directly. In fact, from here, you can also think that the various services we have seen before, such as Http and TCP server applications, are actually after implementing the service interface protocol, pulling up the Worker process through the process pool to achieve multi-process services. processed. The TaskWorker is actually another set of process pools, which is separated from the service processing process in the main process, and can realize asynchronous parallel processing of specific tasks.
Of course, the above content is my own understanding. If there are any mistakes, please correct me.
Test code:
https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.5%E8%BF%9B%E7%A8%8B%E6% B1%A0%E4%B8%8E%E8%BF%9B%E7%A8%8B%E7%AE%A1%E7%90%86%E5%99%A8.php
Reference documentation:
https://wiki.swoole.com/#/process/process_pool
https://wiki.swoole.com/#/process/process_manager