Process synchronization and shared memory

Having worked through the previous articles, I believe you now have a reasonable understanding of processes in Swoole. Whether we use a single process or a process pool, our focus has been on communication between processes. After all, processes are memory-isolated, so communication is a comparatively big problem. Everything we covered before communicates without third-party tools, but a more convenient approach is to use a third-party tool as an intermediate storage medium, so that different processes can read the content stored there and thereby communicate. The most common choice is Redis, but even with Redis, and even with a connection pool, connections still have to be established, so it is not the most efficient option. Today we will learn about the shared memory table, a more efficient data synchronization mechanism provided by Swoole. We will also learn two other very common inter-process synchronization features: a lock-free counter and a process lock.

Process synchronization

We discussed the process synchronization problem very early on, in the context of global variables, when we explained why traditional global variables cannot be used in Swoole. To get similar global functionality across processes, besides Table (covered later) or external third-party tools, there are also some small tools that deserve our attention.

Interprocess lock-free counter (Atomic)

The inter-process lock-free counter is an atomic counting class provided by Swoole's underlying layer, which makes lock-free atomic increment and decrement of integers easy. Does the word atomic sound familiar? That's right, it is the same atomicity as the A in a database's ACID. An atomic operation either succeeds completely or fails completely: it cannot be interrupted by thread scheduling or by other processes, and once it starts it runs to the end.

The atomic counter is simply a counter placed in shared memory that supports atomic operations; it implements simple addition, subtraction, and assignment.

$atomic = new Swoole\Atomic();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   while($atomic->get() < 5){
       $atomic->add();
       echo "Atomic Now: {$atomic->get()}, pid: {$worker->pid}", PHP_EOL;
       sleep(1);
   }
   echo "Shutdown {$worker->pid}", PHP_EOL;

}))->start();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   while($atomic->get() < 10){
       $atomic->add();
       echo "Atomic Now: {$atomic->get()}, pid: {$worker->pid}", PHP_EOL;
       sleep(1);
   }
   echo "Shutdown {$worker->pid}", PHP_EOL;
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

// [root@localhost source]# php 3.6进程同步与共享内存.php
// Atomic Now: 1, pid: 1469
// Atomic Now: 2, pid: 1468
// Atomic Now: 3, pid: 1468
// Atomic Now: 4, pid: 1469
// Atomic Now: 5, pid: 1468
// Atomic Now: 6, pid: 1469
// Shutdown 1468
// Atomic Now: 7, pid: 1469
// Atomic Now: 8, pid: 1469
// Atomic Now: 9, pid: 1469
// Atomic Now: 10, pid: 1469
// Shutdown 1469

Nothing special here. You can think of an Atomic object as an int, but with a smaller range than PHP's int: it is only a 32-bit unsigned integer. If you need larger numbers, you can use Swoole\Atomic\Long, which is a 64-bit signed integer object. Note, however, that Long objects do not support the wait() and wakeup() operations shown below.
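Before moving on to wait() and wakeup(), here is a minimal single-process sketch of the basic counter operations. It assumes the Swoole extension is loaded; the starting values are arbitrary and chosen only for illustration.

```php
<?php
// Sketch only: requires the Swoole extension; the numbers are illustrative.
$atomic = new Swoole\Atomic(10);        // 32-bit unsigned counter starting at 10

$afterAdd = $atomic->add(5);            // add() returns the new value: 15
$afterSub = $atomic->sub(3);            // sub() returns the new value: 12

// cmpset() writes the second argument only if the current value equals the first.
$swapped    = $atomic->cmpset(12, 100); // true, the value is now 100
$notSwapped = $atomic->cmpset(12, 200); // false, the value stays 100

// Atomic\Long is a 64-bit signed counter for values beyond the 32-bit range.
$long = new Swoole\Atomic\Long(4294967296); // 2^32 does not fit in Swoole\Atomic
$long->add(1);

echo $afterAdd, ' ', $afterSub, ' ', $atomic->get(), ' ', $long->get(), PHP_EOL;
```

Because add() and sub() return the updated value, there is no need for a separate get() call right after them.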

$atomic = new Swoole\Atomic();

//$atomic->cmpset(0, 1);

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   $atomic->wait(3);
   echo "Shutdown wait Process: {$worker->pid}", PHP_EOL;

}))->start();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   sleep(2);
   $atomic->wakeup();
//    $atomic->cmpset(0, 1);
   echo "Shutdown other Process: {$worker->pid}", PHP_EOL;
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

//  [root@localhost source]# php 3.6进程同步与共享内存.php
//  Shutdown other Process: 1511
//  Shutdown wait Process: 1510

What do these two methods do? If wait() is called while the value of the atomic is 0, the process enters a waiting state. What is it waiting for? wait() returns when wakeup() is called or when the atomic value is set to 1. If the atomic value is not 0 to begin with, wait() has no effect and returns immediately.

In this test code, the final output shows that the other process finishes first: it sleeps for 2 seconds and calls wakeup(), and only then does the first process, which was blocked in wait(), finish. The parameter of wait() is the timeout in seconds. If it is set to -1, it waits forever; otherwise it stops waiting once the timeout expires and execution continues. You can also try setting the atomic to a non-zero value at the start by uncommenting the cmpset() call near the top of the code. Run it again and you will find that wait() no longer blocks, and the first process runs straight through.

This mechanism can in fact provide a kind of lock, but it is not particularly flexible: we have to coordinate the wait() and wakeup() calls ourselves, and the value may be changed directly by someone else. Swoole also offers a more convenient feature for working with locks directly, the inter-process lock discussed next.
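The two behaviors of wait() described above can also be checked in a single process through its return value. The following is a small sketch, assuming the Swoole extension is loaded and that it runs outside a coroutine; the 0.5 second timeout is an arbitrary choice.

```php
<?php
// Sketch only: requires the Swoole extension; run from the CLI, outside a coroutine.
$atomic = new Swoole\Atomic(0);

// Value is 0 and nobody calls wakeup(), so wait() blocks until the timeout expires.
$start    = microtime(true);
$timedOut = $atomic->wait(0.5);     // expected to return false after about 0.5 s
$elapsed  = microtime(true) - $start;

// Value is 1, which means "no need to wait": wait() should return right away.
$atomic->set(1);
$immediate = $atomic->wait(3);      // expected to return true without blocking

var_dump($timedOut, $immediate);
printf("first wait took about %.1f s\n", $elapsed);
```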

Interprocess lock (Lock)

Locks are very important for multi-process and multi-thread programming. Why? One of the biggest problems with running code in parallel is that the same thing may be modified at the same time, whether it is a database, data in memory, or a file resource. MySQL, which we use all the time, has various lock mechanisms of its own; it handles requests with threads and relies on locks, together with transactions and isolation levels, to solve the problems of reading and writing data under high concurrency. We will study this thoroughly when we get to the MySQL-related content. In program code, the most common case is a conflict between simultaneous memory operations or file operations. For example, two processes modify the same value at the same time, one changing it to 2 and the other to 3. Which is the final result?

That actually depends on the business scenario. Is 3 the result of a cumulative change, or does 3 represent a state? Either way, the two processes must operate in order and not at the same time. The result of a truly simultaneous operation is ambiguous and cannot be predicted by reasoning. In this situation we can lock the operation so that only one process at a time can work on the resource, which makes the result deterministic rather than ambiguous.

$lock = new Swoole\Lock();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($lock) {
    echo "Process: {$worker->pid} Wait", PHP_EOL;
    $lock->lock();
    echo "Process: {$worker->pid} Locked", microtime(true), PHP_EOL;
    sleep(3);
    $lock->unlock();
    echo "Process: {$worker->pid} exit;", PHP_EOL;
}))->start();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($lock) {
    sleep(1);
    echo "Process: {$worker->pid} Wait ", PHP_EOL;
    $lock->lock();
    echo "Process: {$worker->pid} Locked",microtime(true), PHP_EOL;
    $lock->unlock();
    echo "Process: {$worker->pid} exit;", PHP_EOL;
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

//[root@localhost source]# php 3.6进程同步与共享内存.php
//Process: 1611 Wait
//Process: 1611 Locked1640572026.9681
//Process: 1612 Wait
//Process: 1611 exit;
//Process: 1612 Locked1640572029.9771
//Process: 1612 exit;

In this test code, the lock() and unlock() methods of the Swoole\Lock object acquire and release the lock. The first process takes the lock as soon as it starts and then sleeps for 3 seconds. The second process also tries to take the lock, but the first process already holds it, so it has to wait for the first process to release it. The timestamps show that the second process acquired the lock about 3 seconds later.

Readers who have studied C/C++, Java, or Go should find this easy to understand. Whenever IO resources are involved, especially when writing data, a lock is needed to avoid the confusion caused by multiple processes writing at the same time. Also note that inter-process locks should not be used in coroutines: avoid calling coroutine-related APIs while holding the lock, otherwise deadlocks can easily occur.
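To make the point about writing data concrete, here is a minimal sketch in which several processes update a counter stored in a file. It assumes the Swoole extension is loaded; the temporary file, the number of workers, and the number of rounds are all made up for the example. Each read-modify-write cycle happens between lock() and unlock(), so no update should be lost.

```php
<?php
// Sketch only: requires the Swoole extension; file name and sizes are illustrative.
$lock = new Swoole\Lock(SWOOLE_MUTEX);
$file = tempnam(sys_get_temp_dir(), 'cnt');
file_put_contents($file, '0');

$workers = 4;
$rounds  = 50;

for ($i = 0; $i < $workers; $i++) {
    (new Swoole\Process(function () use ($lock, $file, $rounds) {
        for ($j = 0; $j < $rounds; $j++) {
            $lock->lock();                                   // enter the critical section
            $value = (int) file_get_contents($file);         // read
            file_put_contents($file, (string) ($value + 1)); // modify and write back
            $lock->unlock();                                 // leave the critical section
        }
    }))->start();
}

// Reap every child before reading the final value.
for ($i = 0; $i < $workers; $i++) {
    Swoole\Process::wait();
}

$total = (int) file_get_contents($file);
unlink($file);
echo "Final counter: {$total}", PHP_EOL; // 4 workers x 50 rounds = 200
```

If you remove the lock() and unlock() calls and run it again, the final value will usually be lower than 200, or the file may even be read while it is half written, which is exactly the ambiguity described above.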

For more information, you can refer to the official documentation and search for related knowledge for more in-depth learning and understanding.

Shared memory (Table)

The lock-free counter and the lock above are both features for sharing data or communicating between processes. The counter, for instance, can be used for simple accumulation, and a lock lets several processes operate on the same file handle safely so that all of them can read the file's data. These are, in effect, also forms of inter-process communication and data synchronization. Beyond these, Swoole provides the Table tool, an ultra-high-performance in-memory data structure built directly on shared memory and locks. It solves the problems of data sharing and synchronized locking across multiple processes or threads.

Its characteristics are strong performance, built-in row-level spin locks (so no separate locking is needed), and multi-process support, which make it a powerful tool for sharing data and communicating between processes.

// Create a table with at most 1024 rows; columns must be declared before create()
$table = new Swoole\Table(1024);
$table->column('worker_id', Swoole\Table::TYPE_INT);
$table->column('count', Swoole\Table::TYPE_INT);
$table->column('data', Swoole\Table::TYPE_STRING, 64);
$table->create();

// The parent process writes its own row, keyed by its pid ("这里是" means "this is")
$ppid = getmypid();
$table->set($ppid, ['worker_id'=>getmypid(), 'count'=>0, 'data'=>"这里是 " . $ppid]);

// First child: insert its own row, then increment its count
(new \Swoole\Process(function (\Swoole\Process $worker) use ($table) {
    $table->set($worker->pid, ['worker_id'=>$worker->pid, 'count'=>0, 'data'=>"这里是 {$worker->pid}"]);
    sleep(1);
    $table->incr($worker->pid, 'count');
    print_r($table->get($worker->pid));
}))->start();

// Second child: insert its own row, decrement its count, iterate, delete the parent's row
(new \Swoole\Process(function (\Swoole\Process $worker) use ($table, $ppid) {
    $table->set($worker->pid, ['worker_id'=>$worker->pid, 'count'=>3, 'data'=>"这里是 {$worker->pid}"]);
    sleep(1);
    $table->decr($worker->pid, 'count');
    print_r($table->get($worker->pid));
    sleep(1);

    // "内部循环" means "loop inside the child process"
    echo "{$worker->pid} 内部循环:", PHP_EOL;
    foreach($table as $t){
        print_r($t);
    }
    if($table->exist($ppid)){
        $table->del($ppid);
    }
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

// "数量" means "row count", "主进程循环" means "loop in the main process", "状态" means "status"
echo "Talbe 数量:",$table->count(), PHP_EOL;
echo "主进程循环:", PHP_EOL;
foreach($table as $t){
    print_r($t);
}
echo "Table 状态:", PHP_EOL;
print_r($table->stats());

After instantiating the Swoole\Table object, we need to specify the column information. The constructor parameter is the maximum number of rows in the Table. This row count is not exact; the number of rows that can actually be stored depends on the reserved memory size. Note that the Table does not allocate memory dynamically: it reserves a fixed block of memory when it is created, so we have to plan the memory we need in advance. Specifying columns is very much like defining a database table, and it makes it easy to serialize the data in memory.

Then we can set the data of each row with the set() method. The data is shared across processes and can be read from any of them.

Finally, the Table also implements the iterator-related interfaces, so it can be traversed with foreach(); count() returns the number of rows and stats() returns the table's status information.

The final output should look like this.

//  [root@localhost source]# php 3.6进程同步与共享内存.php
//  Array
//  (
//      [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//      [count] => 2
//      [data] => 这里是 1552
//  )
//  1552 内部循环:
//  Array
//  (
//      [worker_id] => 1550
//      [count] => 0
//      [data] => 这里是 1550
//  )
//  Array
//  (
//      [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//      [count] => 2
//      [data] => 这里是 1552
//  )
//  Talbe 数量:2
//  主进程循环:
//  Array
//  (
//      [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//      [count] => 2
//      [data] => 这里是 1552
//  )
//  Table 状态:
//  Array
//  (
//      [num] => 2
//      [conflict_count] => 0
//      [conflict_max_level] => 0
//      [insert_count] => 3
//      [update_count] => 2
//      [delete_count] => 1
//      [available_slice_num] => 204
//      [total_slice_num] => 204
//  )
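The built-in row lock can be seen at work when several processes update the same row. The following is a minimal sketch, assuming the Swoole extension is loaded; the row key 'page', the column 'hits', and the worker and round counts are made up for the example. No Swoole\Lock is used, yet no increment should be lost.

```php
<?php
// Sketch only: requires the Swoole extension; key, column, and sizes are illustrative.
$table = new Swoole\Table(1024);
$table->column('hits', Swoole\Table::TYPE_INT);
$table->create();
$table->set('page', ['hits' => 0]);

$workers = 4;
$rounds  = 1000;

for ($i = 0; $i < $workers; $i++) {
    (new Swoole\Process(function () use ($table, $rounds) {
        for ($j = 0; $j < $rounds; $j++) {
            $table->incr('page', 'hits'); // the Table locks the row internally
        }
    }))->start();
}

// Reap every child before reading the final value.
for ($i = 0; $i < $workers; $i++) {
    Swoole\Process::wait();
}

$hits = $table->get('page', 'hits');   // read a single column of the row
echo "hits: {$hits}", PHP_EOL;         // 4 workers x 1000 rounds = 4000
```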

Summary

Today's content is all about inter-process synchronization. With these tools, communication and synchronization between processes become much more convenient. Note, however, that if Atomic and Lock are used in a server application, they must not be created inside callback functions such as onReceive; otherwise memory may keep growing, which is the notorious memory leak. Why? Because these objects live in memory shared globally across processes and are not reclaimed. If they are created over and over, new shared memory keeps being allocated without end, until it finally exhausts the physical memory of the application and the whole server.

Well, we have now covered the basics of processes. Next we move on to the next big chapter, which is all about coroutines. More exciting content is just ahead, so like, comment, and follow so that you don't miss it!
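As a rough illustration of this advice, the sketch below creates the counter once, before the server starts, and only uses it inside the callback. It assumes the Swoole extension is loaded; the address 127.0.0.1:9501 and the --serve command-line switch are hypothetical and exist only so that the file can be loaded without starting a server.

```php
<?php
// Sketch only: requires the Swoole extension. The address, the port, and the
// --serve switch are assumptions made for this example.

// Right: create the shared counter once, before the server and its workers start.
$requests = new Swoole\Atomic(0);

if (in_array('--serve', $argv ?? [], true)) {
    $server = new Swoole\Server('127.0.0.1', 9501);

    $server->on('receive', function (Swoole\Server $server, int $fd, int $reactorId, string $data) use ($requests) {
        // Wrong: calling new Swoole\Atomic() here would allocate fresh shared
        // memory on every request, and that memory would not be reclaimed.
        $number = $requests->add(); // only use the object that already exists
        $server->send($fd, "request #{$number}" . PHP_EOL);
    });

    $server->start();
}
```

The same pattern applies to Swoole\Lock: construct it next to the counter and pass it into the callback with use.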

Test code:

https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.6%E8%BF%9B%E7%A8%8B%E5%90%8C%E6%AD%A5%E4%B8%8E%E5%85%B1%E4%BA%AB%E5%86%85%E5%AD%98.php

Reference documentation:

https://wiki.swoole.com/#/process/process_pool

https://wiki.swoole.com/#/process/process_manager