Concurrent Programming in Python, Part 1

Almost every computer today has multiple CPUs or multiple cores, and practically every operating system supports "multitasking". This lets us run several programs at the same time. It also lets us split one program into relatively independent subtasks that run "in parallel" or "concurrently", which shortens the program's execution time and gives users a better experience. As a result, whatever language you develop in, writing "parallel" or "concurrent" code has become a standard skill for programmers. Before we look at how to do this in Python, we need to understand two important concepts: processes and threads.

Threads and Processes

When we run a program, the operating system creates one or more processes for it. A process is a running instance of a program, together with the data and resources it is working on. Put simply, a process is the basic unit to which the operating system allocates resources such as memory. Each process has its own address space, data stack, and other bookkeeping data used to track its execution. The operating system manages the execution of all processes and allocates resources among them. A process can create new processes, for example with fork or spawn, to carry out other tasks. Each new process also has its own separate memory space, however, so two processes that want to share data must use an inter-process communication (IPC) mechanism such as pipes, signals, or sockets.

A process can also contain multiple threads of execution. Each of these execution units can be scheduled onto a CPU independently, and each one is called a thread. All the threads of a process live inside that process, so they share the same address space and context. This makes sharing data and communicating between threads easier than between processes. On a single-core CPU, of course, multiple threads cannot truly execute at the same time, because only one thread can hold the CPU at any instant. Instead, the threads achieve concurrency by taking turns on the CPU.
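Here is a small sketch that makes the shared-context point concrete. The function and variable names are illustrative and do not come from the original text. Several threads write into the same list object, and once they have been joined, the main thread can see every write. Each thread writes only to its own slot, so no locking is needed here. Threads that update the same piece of data do need a lock, as the Resource Contention section explains.

```python
from threading import Thread


def square_into(results, index):
    """Store index squared in the shared list; each thread writes only its own slot."""
    results[index] = index * index


def main():
    results = [None] * 5  # one list object, visible to every thread in this process
    threads = [Thread(target=square_into, args=(results, i)) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # after join() returns, that thread's write is visible here
    print(results)  # [0, 1, 4, 9, 16]
    return results


if __name__ == '__main__':
    main()
```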

Using multiple threads usually brings clear benefits, mainly better program performance and a better user experience. Almost all the software we use today is multithreaded. You can confirm this with the process-monitoring tool that comes with your system, such as Activity Monitor on macOS or Task Manager on Windows.

Two more concepts deserve emphasis here: concurrency and parallelism. Concurrency usually means that only one instruction executes at any given instant, but the instructions of several threads are interleaved by switching between them quickly. For example, a processor runs thread A for a while, then thread B for a while, then switches back to thread A. Execution and switching happen so fast that people do not notice the computer moving between threads. At the macro level the threads therefore appear to run at the same time, but at the micro level only one thread is actually executing at a time. Parallelism means that at the same instant, multiple instructions execute on multiple processors. Parallelism requires multiple processors, and multiple threads really do execute at the same moment, at both the macro and the micro level. In practice we often do not draw a strict line between the two words. As a result, multithreading, multiprocessing, and asynchronous I/O in Python are all commonly described as ways of doing concurrent programming, even though the first two can also achieve parallelism. In CPython, multithreading is further restricted by the Global Interpreter Lock (GIL), which we will discuss later.

Multithreaded Programming

The Thread class in the threading module of Python's standard library makes multithreaded programming easy. We will use a file-download example to compare a program that uses multithreading with one that does not. The code is shown below.

Downloading without multithreading.

import random
import time


def download(*, filename):
    start = time.time()
    print(f'Start downloading {filename}.')
    time.sleep(random.randint(3, 6))
    print(f'{filename} downloaded.')
    end = time.time()
    print(f'Download time: {end - start:.3f}s.')


def main():
    start = time.time()
    download(filename='Python-from-beginner-to-hospital.pdf')
    download(filename='MySQL-from-dropping-db-to-running-away.avi')
    download(filename='Linux-from-mastery-to-giving-up.mp4')
    end = time.time()
    print(f'Total time: {end - start:.3f}s.')


if __name__ == '__main__':
    main()

Note: The code above does not actually download anything over the network. It calls time.sleep() to pause for a random 3 to 6 seconds, which simulates the time a real download would take.

Running the code above produces output like the following. The program has only one thread doing the work, so each download must wait for the previous one to finish before it can start. The total execution time is therefore the sum of the three download times.

Start downloading Python-from-beginner-to-hospital.pdf.
Python-from-beginner-to-hospital.pdf downloaded.
Download time: 3.005s.
Start downloading MySQL-from-dropping-db-to-running-away.avi.
MySQL-from-dropping-db-to-running-away.avi downloaded.
Download time: 5.006s.
Start downloading Linux-from-mastery-to-giving-up.mp4.
Linux-from-mastery-to-giving-up.mp4 downloaded.
Download time: 6.007s.
Total time: 14.018s.

In fact, the three downloads above do not depend on one another. They can run concurrently, and no download needs to wait for the previous one to finish. We can therefore rewrite the code with multiple threads.

import random
import time
from threading import Thread


def download(*, filename):
    start = time.time()
    print(f'Start downloading {filename}.')
    time.sleep(random.randint(3, 6))
    print(f'{filename} downloaded.')
    end = time.time()
    print(f'Download time: {end - start:.3f}s.')


def main():
    threads = [
        Thread(target=download, kwargs={'filename': 'Python-from-beginner-to-hospital.pdf'}),
        Thread(target=download, kwargs={'filename': 'MySQL-from-dropping-db-to-running-away.avi'}),
        Thread(target=download, kwargs={'filename': 'Linux-from-mastery-to-giving-up.mp4'})
    ]
    start = time.time()
    # Start three threads
    for thread in threads:
        thread.start()
    # Wait for the threads to end
    for thread in threads:
        thread.join()
    end = time.time()
    print(f'Total time: {end - start:.3f}s.')


if __name__ == '__main__':
    main()

The output of one run is shown below.

Start downloading Python-from-beginner-to-hospital.pdf.
Start downloading MySQL-from-dropping-db-to-running-away.avi.
Start downloading Linux-from-mastery-to-giving-up.mp4.
MySQL-from-dropping-db-to-running-away.avi downloaded.
Download time: 3.005s.
Python-from-beginner-to-hospital.pdf downloaded.
Download time: 5.006s.
Linux-from-mastery-to-giving-up.mp4 downloaded.
Download time: 6.003s.
Total time: 6.004s.

The output shows that the whole program now takes roughly as long as the slowest download. The three downloads run concurrently instead of waiting for one another, which clearly improves execution efficiency. In general, suppose a program contains time-consuming units of work that do not depend on each other, meaning unit B does not need the result of unit A. Then A and B can be put into two different threads and run concurrently. Besides reducing the total waiting time, this also improves the user experience. When one unit is blocked, the program does not appear "frozen", because other units can keep running.

Creating Thread Objects with the Thread Class

As the code above shows, we can create a thread object by calling the Thread constructor directly, and start the thread by calling the object's start() method. Once started, the thread runs the function given by the target parameter as soon as the scheduler gives it CPU time. If the target function takes positional arguments, pass them as a tuple through the args parameter. Pass keyword arguments as a dictionary through the kwargs parameter. The Thread constructor has several other parameters, which we will introduce when we need them. For now, the ones to master are target, args, and kwargs.
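The sketch below shows how args and kwargs reach the target function. The download function here is a hypothetical stand-in that records its arguments instead of downloading anything. Note the trailing comma in a one-element args tuple.

```python
from threading import Thread


def download(filename, *, retries=0, log):
    """Hypothetical worker: records what it was asked to do instead of downloading."""
    log.append(f'{filename} (retries={retries})')


def main():
    log = []
    threads = [
        # Positional arguments go in a tuple; a one-element tuple needs the trailing comma.
        Thread(target=download, args=('a.pdf',), kwargs={'log': log}),
        # Keyword arguments go in a dict.
        Thread(target=download, args=('b.avi',), kwargs={'retries': 3, 'log': log}),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(log)  # threads may finish in any order, so sort for a stable result


if __name__ == '__main__':
    print(main())  # ['a.pdf (retries=0)', 'b.avi (retries=3)']
```

Without the comma, args=('a.pdf') is just a string. Thread then unpacks it character by character into separate arguments, so the call fails with a TypeError inside the thread.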

Customizing Threads by Inheriting the Thread Class

Besides the way of creating threads shown in the code above, we can also customize threads by inheriting from the Thread class and overriding the run() method. The specific code is shown below.

import random
import time
from threading import Thread


class DownloadThread(Thread):

    def __init__(self, filename):
        # Call the Thread constructor before doing anything else with the thread
        super().__init__()
        self.filename = filename

    def run(self):
        start = time.time()
        print(f'Start downloading {self.filename}.')
        time.sleep(random.randint(3, 6))
        print(f'{self.filename} downloaded.')
        end = time.time()
        print(f'Download time: {end - start:.3f}s.')


def main():
    threads = [
        DownloadThread('Python-from-beginner-to-hospital.pdf'),
        DownloadThread('MySQL-from-dropping-db-to-running-away.avi'),
        DownloadThread('Linux-from-mastery-to-giving-up.mp4')
    ]
    start = time.time()
    # Start three threads
    for thread in threads:
        thread.start()
    # Wait for the threads to end
    for thread in threads:
        thread.join()
    end = time.time()
    print(f'Total time: {end - start:.3f}s.')


if __name__ == '__main__':
    main()
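A common reason to subclass Thread is that Thread discards the return value of a target function, while a subclass can store its result in an attribute. The sketch below is a hypothetical example: SquareThread does not come from the original text. It calls super().__init__() before setting its own attributes, as the threading documentation requires of subclasses that override the constructor.

```python
import time
from threading import Thread


class SquareThread(Thread):
    """Hypothetical example: a Thread subclass that stores its result."""

    def __init__(self, number):
        super().__init__()  # initialise the Thread machinery first
        self.number = number
        self.result = None  # filled in by run()

    def run(self):
        time.sleep(0.1)  # stand-in for real work
        self.result = self.number ** 2


def main():
    threads = [SquareThread(n) for n in range(1, 4)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # result is only reliable after join() returns
    return [thread.result for thread in threads]


if __name__ == '__main__':
    print(main())  # [1, 4, 9]
```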

Using a Thread Pool

We can also run tasks on multiple threads through a thread pool, which is usually the best way to use threads. Creating and destroying threads is relatively expensive, so doing it frequently is usually a bad idea. A thread pool prepares a number of threads in advance and reuses them, so our own code does not have to create and release threads each time. The concurrent.futures module in Python's standard library provides thread pool support. The code is shown below.

import random
import time
from concurrent.futures import ThreadPoolExecutor


def download(*, filename):
    start = time.time()
    print(f'Start downloading {filename}.')
    time.sleep(random.randint(3, 6))
    print(f'{filename} downloaded.')
    end = time.time()
    print(f'Download time: {end - start:.3f}s.')


def main():
    with ThreadPoolExecutor(max_workers=4) as pool:
        filenames = ['Python-from-beginner-to-hospital.pdf', 'MySQL-from-dropping-db-to-running-away.avi', 'Linux-from-mastery-to-giving-up.mp4']
        start = time.time()
        for filename in filenames:
            pool.submit(download, filename=filename)
    end = time.time()
    print(f'Total time: {end - start:.3f}s.')


if __name__ == '__main__':
    main()
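The pool example above ignores the return value of submit(). In fact, submit() returns a concurrent.futures.Future. Leaving the with block calls shutdown(wait=True), so all tasks finish before Total time is printed. The sketch below collects each task's result through its Future and records which pool thread ran it. This shows that 6 tasks are handled by at most 2 reused threads. Here, fake_download is a hypothetical stand-in for download. Ignoring the Future also means that any exception raised inside download goes unnoticed, whereas future.result() re-raises it.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import current_thread


def fake_download(filename):
    """Hypothetical task: pretend to download, then report which worker ran it."""
    time.sleep(0.1)
    return filename, current_thread().name


def main():
    filenames = [f'file-{i}.bin' for i in range(6)]
    with ThreadPoolExecutor(max_workers=2) as pool:
        # submit() returns a Future immediately; the task runs on a pool thread
        futures = [pool.submit(fake_download, name) for name in filenames]
        # as_completed() yields futures as they finish; result() returns the
        # task's return value, or re-raises the exception the task raised
        results = [future.result() for future in as_completed(futures)]
    downloaded = sorted(name for name, _ in results)
    workers = {worker for _, worker in results}
    print(f'{len(downloaded)} files downloaded by {len(workers)} worker thread(s)')
    return downloaded, workers


if __name__ == '__main__':
    main()
```

If you only need the results in input order, pool.map(fake_download, filenames) is a shorter alternative.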

Daemon Threads

A daemon thread is a thread that is not kept alive once the rest of the program has finished. More precisely, a Python program exits as soon as no non-daemon threads are left, and any daemon threads still running at that moment are stopped abruptly. Often the main thread is the only non-daemon thread. In that case, daemon threads end together with the main thread, and the lifetime of the main thread is the lifetime of the process. If this is not clear yet, look at the following simple piece of code.

import time
from threading import Thread


def display(content):
    while True:
        print(content, end='', flush=True)
        time.sleep(0.1)


def main():
    Thread(target=display, args=('Ping', )).start()
    Thread(target=display, args=('Pong', )).start()


if __name__ == '__main__':
    main()

Note: In the code above, we set the flush parameter of print to True. When standard output is a terminal, Python buffers it line by line. These print calls do not end with a newline (end=''), so their output would stay in Python's output buffer until the buffer fills up or a newline is written, and then appear all at once. Buffering reduces the number of system calls and improves efficiency. Here, however, we want to see each piece of output immediately, so flush=True forces the buffer to be flushed after every print.

Once started, the code above never stops on its own, because both child threads run infinite loops, so you have to terminate it manually. Suppose instead that we pass daemon=True when creating the thread objects. The two threads then become daemon threads, and once the main thread ends, they are stopped as well, even though they are running infinite loops. The code is shown below.

import time
from threading import Thread


def display(content):
    while True:
        print(content, end='', flush=True)
        time.sleep(0.1)


def main():
    Thread(target=display, args=('Ping', ), daemon=True).start()
    Thread(target=display, args=('Pong', ), daemon=True).start()
    time.sleep(5)


if __name__ == '__main__':
    main()

In the code above, we added one line, time.sleep(5), to make the main thread sleep for 5 seconds. Meanwhile, the daemon threads keep printing Ping and Pong. After 5 seconds the main thread ends, and the two daemon threads are destroyed along with it.

Think about it: If daemon=True is removed from the first Thread(...) call in main (the one that prints Ping), how will the code behave? Try it yourself and see whether the result matches what you expected.
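Two further rules about the daemon flag are easy to check in code. First, a thread created without a daemon argument inherits the flag from the thread that creates it (the main thread is not a daemon). Second, the flag can only be changed before start() is called. The sketch below is illustrative; spin and create_inner are hypothetical helpers.

```python
import time
from threading import Thread, main_thread


def spin():
    """Loop forever; only acceptable inside a daemon thread."""
    while True:
        time.sleep(0.1)


def create_inner(flags):
    # Created without a daemon argument, so it copies the creating thread's flag.
    inner = Thread(target=spin)
    flags.append(inner.daemon)


def main():
    print('main thread is daemon:', main_thread().daemon)  # False

    flags = []
    outer = Thread(target=create_inner, args=(flags,), daemon=True)
    outer.start()
    outer.join()
    print('thread created by a daemon thread is daemon:', flags[0])  # True

    worker = Thread(target=spin)
    worker.daemon = True  # allowed: the thread has not started yet
    worker.start()
    try:
        worker.daemon = False  # not allowed once the thread is running
    except RuntimeError as error:
        print('RuntimeError:', error)
    return flags


if __name__ == '__main__':
    main()
```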

Resource Contention

When writing multithreaded code, we inevitably meet situations where multiple threads compete for the same resource, that is, the same object. Without a proper mechanism to protect the contended resource, unexpected results can occur. The code below uses a thread pool to run 100 tasks. Each task deposits 1 yuan into the same bank account, whose initial balance is 0 yuan. The final balance should therefore be 100 yuan, but when you run the code you will almost certainly get a smaller number.

import time

from concurrent.futures import ThreadPoolExecutor


class Account(object):
    """Bank account"""

    def __init__(self):
        self.balance = 0.0

    def deposit(self, money):
        """Deposit money"""
        new_balance = self.balance + money
        time.sleep(0.01)
        self.balance = new_balance


def main():
    """Main function"""
    account = Account()
    with ThreadPoolExecutor(max_workers=16) as pool:
        for _ in range(100):
            pool.submit(account.deposit, 1)
    print(account.balance)


if __name__ == '__main__':
    main()

The Account class above represents a bank account. Its deposit method performs a deposit, and the parameter money is the amount to deposit. The method calls time.sleep to simulate the time needed to process a deposit. We submit 100 deposit tasks, all for the same account, to a thread pool with 16 worker threads, but the code does not produce the expected 100. This is the data inconsistency that can occur when multiple threads compete for one resource. Look at the line new_balance = self.balance + money. Several threads can read the same balance there before any of them writes its result back, so each one computes its new balance from the same old value. This causes a "lost update": the result of an earlier modification is overwritten by a later one, and the final result is wrong.
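Whether the lost update shows up in a given run depends on timing. The sketch below makes it reproducible. It uses a stripped-down, hypothetical Account and a threading.Barrier, which is there only to force the bad interleaving. Both threads read the balance before either one writes it back, so one of the two deposits is lost every time.

```python
from threading import Barrier, Thread


class Account:
    """Minimal account for the demonstration."""

    def __init__(self):
        self.balance = 0


def unsafe_deposit(account, money, barrier):
    new_balance = account.balance + money  # step 1: read the old balance
    barrier.wait()  # both threads finish step 1 before either continues
    account.balance = new_balance  # step 2: write back, overwriting the other thread


def main():
    account = Account()
    barrier = Barrier(2)  # releases only when 2 threads are waiting
    threads = [Thread(target=unsafe_deposit, args=(account, 1, barrier)) for _ in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(account.balance)  # 1, although two deposits of 1 were made
    return account.balance


if __name__ == '__main__':
    main()
```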

To solve this problem, we can use a lock to protect the critical code that operates on the data. The threading module in Python's standard library provides the Lock and RLock classes for this. The main difference is that an RLock (reentrant lock) can be acquired again by the thread that already holds it, while a Lock cannot. We will not go deeper here, and we suggest simply using RLock. Next, we add a lock object to the bank account and use it to fix the "lost update" problem we just saw. The code is shown below.

import time

from concurrent.futures import ThreadPoolExecutor
from threading import RLock


class Account(object):
    """Bank account"""

    def __init__(self):
        self.balance = 0.0
        self.lock = RLock()

    def deposit(self, money):
        # Acquire the lock
        self.lock.acquire()
        try:
            new_balance = self.balance + money
            time.sleep(0.01)
            self.balance = new_balance
        finally:
            # Release the lock
            self.lock.release()


def main():
    """Main function"""
    account = Account()
    with ThreadPoolExecutor(max_workers=16) as pool:
        for _ in range(100):
            pool.submit(account.deposit, 1)
    print(account.balance)


if __name__ == '__main__':
    main()

In the code above, acquiring and releasing the lock can also be done with a with statement. Lock objects support the context manager protocol: the lock is acquired on entering the block and released on leaving it, even if an exception is raised. This makes the code shorter and cleaner, and it is the approach we recommend.

import time

from concurrent.futures import ThreadPoolExecutor
from threading import RLock


class Account(object):
    """Bank account"""

    def __init__(self):
        self.balance = 0.0
        self.lock = RLock()

    def deposit(self, money):
        # Acquire and release the lock through context syntax
        with self.lock:
            new_balance = self.balance + money
            time.sleep(0.01)
            self.balance = new_balance


def main():
    """Main function"""
    account = Account()
    with ThreadPoolExecutor(max_workers=16) as pool:
        for _ in range(100):
            pool.submit(account.deposit, 1)
    print(account.balance)


if __name__ == '__main__':
    main()
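The difference between Lock and RLock that we skipped earlier fits in a few lines. In this sketch, a thread that already holds the lock tries to take it again without blocking (can_reacquire is a hypothetical helper). With a plain Lock, the second acquire fails; a blocking call would make the thread deadlock against itself. An RLock, by contrast, lets the owning thread acquire it again. This is why an RLock is safer when one locked method of an object calls another method that takes the same lock.

```python
from threading import Lock, RLock


def can_reacquire(lock):
    """Return True if the thread already holding lock can acquire it again."""
    with lock:
        # blocking=False makes acquire() return False instead of waiting forever
        acquired = lock.acquire(blocking=False)
        if acquired:
            lock.release()  # an RLock must be released as many times as acquired
        return acquired


if __name__ == '__main__':
    print('Lock: ', can_reacquire(Lock()))   # False: a Lock is not reentrant
    print('RLock:', can_reacquire(RLock()))  # True: the owner may re-acquire it
```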

Think about it: Change the bank account code above so that 5 threads deposit money into the account and 5 threads withdraw money from it. When the balance is insufficient, a withdrawing thread should pause and wait until a depositing thread has added money, and then try again. This requires coordinating the threads. Study the Condition class in the threading module on your own and see whether you can complete the task.
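The following minimal sketch of threading.Condition is a starting point for the exercise, not a full solution. The Account below is a simplified, hypothetical version. withdraw() waits until the balance is large enough, and deposit() wakes any waiting threads with notify_all(). wait_for() releases the underlying lock while waiting and re-checks the condition each time the thread wakes up.

```python
import time
from threading import Condition, Thread


class Account:
    """Simplified account whose withdrawals wait for sufficient funds."""

    def __init__(self):
        self.balance = 0
        self.condition = Condition()  # wraps a new RLock by default

    def deposit(self, money):
        with self.condition:
            self.balance += money
            self.condition.notify_all()  # wake withdrawers waiting for funds

    def withdraw(self, money):
        with self.condition:
            # Releases the lock while waiting; returns once the predicate is true
            self.condition.wait_for(lambda: self.balance >= money)
            self.balance -= money


def main():
    account = Account()
    withdrawer = Thread(target=account.withdraw, args=(30,))
    withdrawer.start()
    time.sleep(0.1)  # let the withdrawer start waiting; the result is the same either way
    account.deposit(50)
    withdrawer.join()
    print(account.balance)  # 20
    return account.balance


if __name__ == '__main__':
    main()
```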

The GIL Problem

If we run Python programs with the official interpreter, usually called CPython, multithreading cannot push CPU utilization close to 400% on a 4-core CPU or close to 800% on an 8-core CPU. This is because CPython is constrained by the GIL (Global Interpreter Lock) when executing code. More specifically, a thread must hold the GIL before CPython will execute its Python bytecode. Since Python 3.2, CPython asks the thread holding the GIL to release it after a fixed switch interval (5 milliseconds by default), so that other threads get a chance to run. Older versions (Python 2) instead switched every 100 bytecode instructions. Because of the GIL, no matter how many cores your CPU has, Python code running in multiple threads of the same process never truly executes in parallel. A thread does release the GIL while it waits on blocking I/O, such as time.sleep() or a network read. This is why the multithreaded download example above still ran much faster.
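The switch interval can be inspected with sys.getswitchinterval() and changed with sys.setswitchinterval(), both standard functions in the sys module. The sketch below only reads the value and changes it temporarily. Changing the interval affects how often threads take turns, but it does not let them run Python code in parallel.

```python
import sys

# Seconds a thread may hold the GIL before CPython asks it to let others run.
interval = sys.getswitchinterval()
print(f'switch interval: {interval} s')  # 0.005 unless it has been changed

# The interval is adjustable; this changes how often threads switch,
# but it does not let Python bytecode run in parallel.
sys.setswitchinterval(0.001)
print(f'new switch interval: {sys.getswitchinterval()} s')
sys.setswitchinterval(interval)  # restore the previous value
```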