Concurrent Programming in Python, Part 2

In the previous lesson, we explained that because of the GIL, multithreading in CPython cannot take advantage of multiple CPU cores. To get around this limit, you can use multiprocessing: each process runs its own Python interpreter with its own GIL, so processes are not held back by a single shared GIL. So how do we create and use multiple processes in a Python program?

Creating Processes

In Python, we can create processes with the Process class. Although processes and threads are fundamentally different, the Process class is used in much the same way as the Thread class: when constructing a Process object, we pass a function through the target parameter to specify the code the process will run, and use the args and kwargs parameters to supply that function's positional and keyword arguments.

from multiprocessing import Process, current_process
from time import sleep


def sub_task(content, nums):
    # Get the current process object through the current_process function
    # Get the process ID and name through the pid and name attributes of the process object
    print(f'PID: {current_process().pid}')
    print(f'Name: {current_process().name}')
    # The output below shows that each process works on its own nums list:
    # processes do not share memory, and each child process receives its own
    # copy of nums (inherited via fork, or pickled and sent over under spawn),
    # so pop(0) returns 20 in all three processes
    counter, total = 0, nums.pop(0)
    print(f'Loop count: {total}')
    sleep(0.5)
    while counter < total:
        counter += 1
        print(f'{counter}: {content}')
        sleep(0.01)


def main():
    nums = [20, 30, 40]
    # Create and start processes to execute the specified function
    Process(target=sub_task, args=('Ping', nums)).start()
    Process(target=sub_task, args=('Pong', nums)).start()
    # Execute the sub_task function in the main process
    sub_task('Good', nums)


if __name__ == '__main__':
    main()

Note: The code above obtains the current process object with the current_process function and reads the process ID from its pid attribute. Calling the getpid function of the os module, os.getpid(), gives the same result.
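The example above passes only positional arguments and never waits for its children. The sketch below, built around a hypothetical task function named report, shows the kwargs parameter, join() to wait for a child, exitcode to check how the child ended, and that os.getpid() agrees with current_process().pid.

```python
import os
from multiprocessing import Process, current_process


def report(label, *, repeat=1):
    """Hypothetical task function: print label together with the current PID."""
    # os.getpid() and current_process().pid identify the same process
    assert os.getpid() == current_process().pid
    for _ in range(repeat):
        print(f'{label}: PID {os.getpid()}')


def main():
    # args supplies positional arguments, kwargs supplies keyword arguments
    p = Process(target=report, args=('child',), kwargs={'repeat': 2})
    p.start()
    p.join()  # wait until the child process has finished
    print(f'child exit code: {p.exitcode}')  # 0 means the child exited normally
    report('parent')


if __name__ == '__main__':
    main()
```

Before start() is called, exitcode is None; after join() returns, it holds the child's exit code.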

You can also create processes with the fork function of the os module. When it is called, the operating system creates a child process that is a copy of the current (parent) process. In the parent, fork returns the child's process ID; in the child, it returns 0. In other words, a single call returns twice, with a different value in each process. Note that os.fork is not available on Windows; if you are using Linux or macOS, you can try the code below.

import os

print(f'PID: {os.getpid()}')
pid = os.fork()
if pid == 0:
    print(f'Child process - PID: {os.getpid()}')
    print('Todo: code executed in the child process')
else:
    print(f'Parent process - PID: {os.getpid()}')
    print('Todo: code executed in the parent process')
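In practice, a parent that calls os.fork usually also waits for its child; otherwise the finished child lingers as a zombie process until the parent exits. Below is a minimal Unix-only sketch, where run_in_child is a hypothetical helper name. The child leaves through os._exit so it never falls through into the parent's code, and the parent collects the child's status with os.waitpid (os.waitstatus_to_exitcode requires Python 3.9 or later).

```python
import os


def run_in_child(task):
    """Hypothetical helper: fork, run task() in the child, return the child's exit code (Unix only)."""
    pid = os.fork()
    if pid == 0:
        # Child process: run the task, then leave via os._exit so that the
        # parent's remaining code (and its cleanup handlers) never runs here
        try:
            task()
        except BaseException:
            os._exit(1)
        os._exit(0)
    # Parent process: wait for this specific child and decode its exit status
    _, status = os.waitpid(pid, 0)
    return os.waitstatus_to_exitcode(status)


if __name__ == '__main__':
    # flush=True matters because os._exit does not flush stdout buffers
    code = run_in_child(lambda: print(f'child PID: {os.getpid()}', flush=True))
    print(f'exit code: {code}')  # exit code: 0
    code = run_in_child(lambda: 1 / 0)  # ZeroDivisionError inside the child
    print(f'exit code: {code}')  # exit code: 1
```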

In general, we still recommend one of three approaches to multiprocessing: using the Process class directly, subclassing Process, or using a process pool (ProcessPoolExecutor). Unlike os.fork, all three work on Windows as well as on Unix-like systems, so the code stays portable. They are used in much the same way as the corresponding multithreading approaches covered earlier, so we will not repeat the details here.
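As a rough sketch of the other two approaches (EchoProcess and square are hypothetical names chosen for illustration): subclassing Process means overriding run(), which executes in the child once start() is called, while a process pool hands module-level functions to worker processes and returns their results.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process, current_process


class EchoProcess(Process):
    """Hypothetical Process subclass: the work goes in run()."""

    def __init__(self, content, times):
        super().__init__()  # the base initializer must be called
        self.content = content
        self.times = times

    def run(self):
        # run() executes in the child process after start() is called
        for i in range(self.times):
            print(f'{current_process().name} {i + 1}: {self.content}')


def square(n):
    """Hypothetical pool task; defined at module level so it can be pickled."""
    return n * n


def main():
    # Approach 2: subclass Process and override run()
    workers = [EchoProcess('Ping', 3), EchoProcess('Pong', 3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # wait for each child to finish
    # Approach 3: let a process pool run the function and collect the results
    with ProcessPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]


if __name__ == '__main__':
    main()
```

executor.map returns results in the order of its inputs, regardless of which worker finishes first.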

Comparison Between Multiprocessing and Multithreading

For I/O-bound tasks such as web crawlers, multiprocessing offers little advantage over multithreading. For CPU-bound tasks, however, multiprocessing is clearly more efficient, and the code below demonstrates this. It uses multithreading and then multiprocessing to test whether each integer in a list of large integers is prime, which is clearly a CPU-bound task. We spread the work across multiple threads and then across multiple processes, and compare how the two versions perform.

Let us first implement a multithreaded version. The code is shown below.

import concurrent.futures

PRIMES = [
    1116281,
    1297337,
    104395303,
    472882027,
    533000389,
    817504243,
    982451653,
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419
] * 5


def is_prime(n):
    """Judge whether it is a prime"""
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return n != 1


def main():
    """Main function"""
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))


if __name__ == '__main__':
    main()

Suppose the code above is saved in a file named example09.py. On Linux or macOS, we can run it with the command time python example09.py to have the shell report how long it took. On my macOS machine, the last line of output from one run was:

python example09.py  38.69s user 1.01s system 101% cpu 39.213 total

The result shows that the multithreaded code only drove CPU utilization to about 100%, the equivalent of a single core. This confirms that, for CPU-bound code, multithreading in CPython cannot use multiple cores to speed up execution. Now let us look at the multiprocessing version, in which we simply replace the thread pool (ThreadPoolExecutor) with a process pool (ProcessPoolExecutor).

Multiprocessing version.

import concurrent.futures

PRIMES = [
    1116281,
    1297337,
    104395303,
    472882027,
    533000389,
    817504243,
    982451653,
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419
] * 5


def is_prime(n):
    """Judge whether it is a prime"""
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return n != 1


def main():
    """Main function"""
    with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))


if __name__ == '__main__':
    main()

Tip: While the code above is running, you can open your operating system's task manager or resource monitor to confirm that several Python interpreter processes have been started.

We again run the code with time python example09.py; the last line of output is shown below.

python example09.py 106.63s user 0.57s system 389% cpu 27.497 total

On my computer, the multiprocessing version pushed CPU utilization to almost 400%, and the CPU time spent in user mode (106.63 seconds) is almost 4 times the wall-clock running time (27.497 seconds; 106.63 / 27.497 ≈ 3.9). Both figures suggest that my computer has a 4-core CPU. To find out how many logical CPUs your own computer has, use the code below; os.cpu_count() returns that number, or None if it cannot be determined.

import os

print(os.cpu_count())

To sum up, multiprocessing gets around the GIL and makes full use of multi-core CPUs, which matters a great deal for CPU-bound tasks. Common CPU-bound tasks include scientific computing, image processing, and audio and video encoding and decoding. If such a task can be parallelized, multiprocessing is usually the better choice.
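The time command used above is a Unix shell feature, so one portable alternative is to time both executors from inside Python. This is a sketch under assumptions: NUMBERS is a hypothetical workload smaller than PRIMES, timed_run is a hypothetical helper, is_prime uses math.isqrt instead of n ** 0.5, and time.perf_counter reports wall-clock time only, with no user/system split. Actual timings depend on the machine.

```python
import math
import os
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Hypothetical workload, smaller than PRIMES so the comparison finishes sooner
NUMBERS = [112272535095293, 112582705942171, 115280095190773, 115797848077099] * 2


def is_prime(n):
    """Return True if n is prime (trial division up to the integer square root)."""
    if n < 2:
        return False
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True


def timed_run(executor_cls, numbers, workers=None):
    """Map is_prime over numbers with the given executor class; return (results, seconds)."""
    workers = workers or os.cpu_count() or 1
    start = time.perf_counter()  # wall-clock timer, also works on Windows
    with executor_cls(max_workers=workers) as executor:
        results = list(executor.map(is_prime, numbers))
    return results, time.perf_counter() - start


if __name__ == '__main__':
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        _, elapsed = timed_run(cls, NUMBERS)
        print(f'{cls.__name__}: {elapsed:.2f} s')
```

On a multi-core machine, the ProcessPoolExecutor line should show a noticeably shorter time, in line with the measurements above.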

Inter-Process Communication

Before discussing inter-process communication, consider this task: start two processes, one printing Ping and the other printing Pong, and end the program once the two processes have printed 50 words in total. It sounds simple, but processes cannot share ordinary Python variables through memory the way threads can, so the code below does not produce the result we want.

from multiprocessing import Process
from time import sleep

counter = 0


def sub_task(string):
    global counter
    while counter < 50:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)


def main():
    Process(target=sub_task, args=('Ping', )).start()
    Process(target=sub_task, args=('Pong', )).start()


if __name__ == '__main__':
    main()

The code above looks fine, but in the end Ping and Pong are each printed 50 times. Remember that each child process has its own independent memory space and works on its own copy of the parent's data (inherited through fork, or re-created when the module is re-imported under spawn). So each child has its own counter variable, each counts from 0 to 50, and 100 words are printed in total. A simple fix is to use the Queue class from the multiprocessing module. It is a queue that can be shared among multiple processes, built on operating-system pipes together with locks and semaphores. The code is shown below.

import time
from multiprocessing import Process, Queue


def sub_task(content, queue):
    counter = queue.get()
    while counter < 50:
        print(content, end='', flush=True)
        counter += 1
        queue.put(counter)
        time.sleep(0.01)
        counter = queue.get()


def main():
    queue = Queue()
    queue.put(0)
    p1 = Process(target=sub_task, args=('Ping', queue))
    p1.start()
    p2 = Process(target=sub_task, args=('Pong', queue))
    p2.start()
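    while p1.is_alive() and p2.is_alive():
        # Sleep briefly instead of spinning so the main process does not burn a CPU core
        time.sleep(0.01)
    # Wake up the worker that is still waiting on queue.get()
    queue.put(50)
    p1.join()
    p2.join()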


if __name__ == '__main__':
    main()

Tip: By default, the get method of a multiprocessing.Queue object blocks while the queue is empty and returns only once it gets an item. If you do not want it to block, or want to limit how long it blocks, use the block and timeout parameters; if no item is available in time, get raises the queue.Empty exception.
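A small sketch of both parameters, assuming a hypothetical helper named drain that keeps reading until no item has arrived for a given timeout. The Empty exception lives in the standard queue module, and get(block=False) raises it immediately when nothing is available.

```python
import queue  # standard-library module that defines the Empty exception
from multiprocessing import Queue


def drain(q, timeout=0.1):
    """Hypothetical helper: collect items until none arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            # Block for at most `timeout` seconds waiting for the next item
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == '__main__':
    q = Queue()
    for n in range(3):
        q.put(n)
    print(drain(q))
    try:
        q.get(block=False)  # non-blocking: raises queue.Empty at once if nothing is there
    except queue.Empty:
        print('queue is empty')
```

Because a multiprocessing.Queue hands items to the underlying pipe through a background thread, an item put a moment ago may not be visible to get(block=False) yet; a short timeout is the more forgiving choice.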

The code above uses the get and put methods of Queue so that three processes, p1, p2, and the main process, can share data; this is inter-process communication. Each worker takes the current count out of the queue, prints its word, and puts the incremented count back. Once the value a worker takes out is greater than or equal to 50, it leaves its while loop without putting anything back, and that process ends. The while loop in main waits until either p1 or p2 has ended. At that point the other process is left waiting on an empty queue, so the main process puts a value of 50 into the queue, and that process also exits after reading it.

There are many other ways for processes to communicate. For example, sockets can connect two processes, and those processes do not even need to be on the same host. Interested readers can explore this on their own.
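Besides queues and sockets, the multiprocessing module can also place data in explicitly shared memory. As one alternative, the sketch below solves the same Ping/Pong task with multiprocessing.Value, a C int stored in shared memory. A Value comes with its own lock (get_lock()), which the sketch holds so that checking and incrementing the counter happen as one step; without it, both processes could read the same count.

```python
import time
from multiprocessing import Process, Value


def sub_task(content, counter, limit=50):
    """Print content until the shared counter reaches limit."""
    while True:
        # Holding the Value's lock makes check-and-increment atomic across processes
        with counter.get_lock():
            if counter.value >= limit:
                break
            counter.value += 1
            print(content, end='', flush=True)
        time.sleep(0.01)


def main():
    counter = Value('i', 0)  # a C int placed in memory shared by all processes
    workers = [Process(target=sub_task, args=(word, counter)) for word in ('Ping', 'Pong')]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(f'\ntotal: {counter.value}')  # total: 50


if __name__ == '__main__':
    main()
```

Unlike the Queue version, the main process needs no extra put to release a waiting worker: each worker checks the shared value itself and stops on its own.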

Summary

In Python, we can also use the subprocess module (for example its run function, or the older call function) to run other commands as child processes, which amounts to calling other programs from within our program. We will not cover this here; interested readers can study it on their own.

For Python developers, multithreading is worth considering in the following situations:

  1. The program needs to maintain a lot of shared state, especially mutable state. Threads share one memory space, so they can all work on the same lists, dictionaries, and sets directly. In CPython (with the GIL), individual operations on these built-in containers, such as list.append or a single dictionary assignment, are atomic, but compound operations such as counts[key] += 1 are not, so they still need a lock. Even so, maintaining shared state with threads costs much less than doing so across processes.
  2. The program spends much of its time on I/O operations, has little need for parallel computation, and needs to keep memory usage low.
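To illustrate the first point, here is a minimal sketch with hypothetical helpers tally and parallel_tally: several threads update one shared dictionary, and a threading.Lock guards the read-modify-write on each entry.

```python
import threading


def tally(words, counts, lock):
    """Add each word to the shared counts dictionary."""
    for word in words:
        # counts.get(...) followed by the assignment is a read-modify-write,
        # so it is not atomic even under the GIL; the lock keeps it consistent
        with lock:
            counts[word] = counts.get(word, 0) + 1


def parallel_tally(chunks):
    """Tally words from several chunks, one thread per chunk, all sharing one dict."""
    counts, lock = {}, threading.Lock()
    threads = [threading.Thread(target=tally, args=(chunk, counts, lock)) for chunk in chunks]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts


if __name__ == '__main__':
    chunks = [['ping', 'pong'] * 1000 for _ in range(4)]
    print(parallel_tally(chunks))  # {'ping': 4000, 'pong': 4000}
```

The threads mutate the same dictionary object directly; doing the same across processes would require a Queue, a Manager, or shared memory.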

Conversely, consider multiprocessing in the following situations:

  1. The program runs CPU-bound tasks, such as audio and video encoding and decoding, data compression, or scientific computing.
  2. The program's input can be split into chunks that are processed in parallel, and the partial results can then be merged.
  3. The program is not tightly constrained on memory (each process has its own interpreter and its own copy of its data) and is not dominated by I/O operations such as reading and writing files or sockets.
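Points 1 and 2 often go together. Below is a minimal split-and-merge sketch, assuming hypothetical helpers split, sum_of_squares, and parallel_sum_of_squares: the input is cut into chunks, a process pool handles the chunks in parallel, and the partial results are summed.

```python
from concurrent.futures import ProcessPoolExecutor


def split(data, n_chunks):
    """Split data into at most n_chunks contiguous pieces of roughly equal size."""
    size = max(1, -(-len(data) // n_chunks))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def sum_of_squares(chunk):
    """CPU-bound work applied independently to one chunk."""
    return sum(x * x for x in chunk)


def parallel_sum_of_squares(data, workers=4):
    """Process the chunks in parallel, then merge the partial sums."""
    with ProcessPoolExecutor(max_workers=workers) as executor:
        partials = executor.map(sum_of_squares, split(data, workers))
        return sum(partials)


if __name__ == '__main__':
    data = list(range(1_000_000))
    # The parallel result must equal the sequential one
    print(parallel_sum_of_squares(data) == sum_of_squares(data))  # True
```

Using one chunk per worker keeps the number of pickled messages small; for uneven workloads, more and smaller chunks usually balance the load better.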