Concurrent Programming in Python, Part 3

Crawlers are typical I/O-intensive tasks. What characterizes an I/O-intensive task is that the program often blocks on I/O operations. For example, when we used requests earlier to fetch page source or binary content, the program had to wait for the website's response after sending each request before it could continue. If the target website is slow or the network conditions are poor, that wait can be very long, and the whole program sits blocked the entire time, doing nothing. From the previous lessons, we already know that multithreading can speed up a crawler. In essence, when one thread is blocked, other threads can keep running, so the program as a whole does not waste much time waiting.

In fact, there is another approach to concurrent programming that suits I/O-intensive tasks very well. It is called asynchronous programming, or asynchronous I/O. It achieves concurrency without starting multiple threads or processes: multiple subprograms cooperate with one another, which raises CPU utilization and solves the low-utilization problem of I/O-intensive tasks. I generally call this approach "cooperative concurrency". I do not plan to discuss the operating system's various I/O models here, because for many readers they are too abstract. We do, however, need to introduce two pairs of concepts first: "blocking" and "non-blocking", and "synchronous" and "asynchronous".

Basic Concepts

Blocking

A program is in a blocking state when it is suspended because it has not obtained the computing resources it needs. If a program cannot go on to handle anything else while it waits for an operation to finish, we say the program is blocked on that operation. Blocking can happen at any time. The most typical cases are I/O waits, including network I/O, disk I/O, and user input, along with sleep operations and waiting for a thread to finish. Even during a CPU context switch, the program cannot actually execute. All of this is blocking.

Non-Blocking

If a program is not itself blocked while waiting for an operation and can go on to handle other things, we say the program is non-blocking on that operation. Non-blocking does not exist at every level of a program or in every situation; it is possible only when the level at which the program is organized can contain independent subprogram units. Because blocking on an operation wastes time and lowers efficiency, we want to make such operations non-blocking.

Synchronous

If different program units must rely on some form of communication during execution in order to coordinate and complete a task, we say these units execute synchronously. For example, in the bank account deposit operation discussed earlier, we used a lock as the communication signal that forced multiple deposit operations to queue up and run one after another. That is synchronization.

Asynchronous

If different program units can complete a task without communicating or coordinating during execution, we call this asynchronous. For example, when a crawler downloads pages, the scheduler can go on to schedule other tasks once it has called the downloader; it does not need to stay in touch with the download task to coordinate its behavior. Downloading and saving different pages are unrelated operations that need not notify or coordinate with one another. It follows that the completion time and order of asynchronous operations cannot be determined in advance.

Many people find these concepts hard to pin down, so here is a short summary. Synchronous versus asynchronous is about the message communication mechanism, and what it ultimately expresses is the difference between "ordered" and "unordered". Blocking versus non-blocking is about the state of the program while it waits for a message, and what it ultimately expresses is whether the program can do something else in the meantime. For a deeper understanding, the classic book UNIX Network Programming is recommended reading.
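
As a small illustration of the blocking and non-blocking distinction, the sketch below tries to acquire a threading.Lock that is already held: once with a timeout, where the caller is stuck until the timeout expires, and once with blocking=False, where the call returns immediately. The helper names try_blocking, try_non_blocking and demo are made up for this example and do not come from the earlier lessons.

```python
import threading
import time

lock = threading.Lock()


def try_blocking(timeout):
    """Blocking attempt: the caller waits up to `timeout` seconds for the lock."""
    start = time.perf_counter()
    acquired = lock.acquire(timeout=timeout)
    if acquired:
        lock.release()
    return acquired, time.perf_counter() - start


def try_non_blocking():
    """Non-blocking attempt: returns at once, whether or not the lock is free."""
    acquired = lock.acquire(blocking=False)
    if acquired:
        lock.release()
    return acquired


def demo():
    # Hold the lock so that every further attempt to acquire it must fail or wait.
    with lock:
        got_it_now = try_non_blocking()
        got_it_later, waited = try_blocking(0.2)
    return got_it_now, got_it_later, waited


if __name__ == '__main__':
    print(demo())
```

In both cases the lock is not obtained; the difference is only in what the caller can do meanwhile. The non-blocking caller gets its answer immediately and is free to do other work, while the blocking caller loses the whole timeout.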

Generators and Coroutines

We said earlier that asynchronous programming is a kind of "cooperative concurrency": multiple subprograms cooperate to raise CPU utilization and cut the time the program wastes blocked and waiting, which in the end yields concurrency. We can call these cooperating subprograms "coroutines"; they are the key to implementing asynchronous programming. Before introducing coroutines, let us first use the code below to see what a generator is.

def fib(max_count):
    a, b = 0, 1
    for _ in range(max_count):
        a, b = b, a + b
        yield a

Above is a generator that produces the Fibonacci sequence. Calling fib does not run the function body and return a value, because the body contains the special keyword yield. This keyword makes fib different from an ordinary function: calling it returns a generator object, which we can verify with the code below.

gen_obj = fib(20)
print(gen_obj)

Output:

<generator object fib at 0x106daee40>

We can use the built-in function next to pull values of the Fibonacci sequence out of the generator object one at a time, or iterate over the values it provides with a for-in loop, as shown below.

for value in gen_obj:
    print(value)
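
The text mentions next but only shows the for-in loop, so here is a minimal sketch of the next variant. It repeats the fib generator from above so that it can run on its own; the variable names first, second, third and leftover are only illustrative.

```python
def fib(max_count):
    a, b = 0, 1
    for _ in range(max_count):
        a, b = b, a + b
        yield a


gen_obj = fib(3)
# Each call to next resumes the generator until it reaches the next yield.
first = next(gen_obj)
second = next(gen_obj)
third = next(gen_obj)
print(first, second, third)  # 1 1 2

# The generator is now exhausted. A bare next(gen_obj) would raise
# StopIteration, so a default value is passed as the second argument.
leftover = next(gen_obj, None)
print(leftover)  # None
```

A for-in loop does the same thing behind the scenes: it keeps calling next and stops quietly when StopIteration is raised.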

Once a generator has been pre-activated (primed), it can act as a coroutine that cooperates with other subprograms.

def calc_average():
    total, counter = 0, 0
    avg_value = None
    while True:
        curr_value = yield avg_value
        total += curr_value
        counter += 1
        avg_value = total / counter


def main():
    obj = calc_average()
    # Pre-activate the generator
    obj.send(None)
    for _ in range(5):
        print(obj.send(float(input())))


if __name__ == '__main__':
    main()

The main function above first sends None through the generator object's send method to activate it as a coroutine; calling next(obj) has the same effect. After that, the coroutine receives each value sent by main and yields the running average of the values received so far. The example shows how two subprograms, main and calc_average, cooperate: each runs until it hands control, together with a value, over to the other.
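
To make the hand-off easier to follow without typing input, the sketch below feeds the same calc_average coroutine a fixed list of values. The wrapper running_averages is a hypothetical helper added for this illustration; it primes the generator with next, which is the alternative to send(None) mentioned above.

```python
def calc_average():
    total, counter = 0, 0
    avg_value = None
    while True:
        curr_value = yield avg_value
        total += curr_value
        counter += 1
        avg_value = total / counter


def running_averages(values):
    obj = calc_average()
    # Prime the generator: run it up to the first yield so it can receive data.
    # Sending a non-None value before this step would raise TypeError.
    next(obj)
    results = [obj.send(value) for value in values]
    # Close the coroutine explicitly, since its loop never ends by itself.
    obj.close()
    return results


if __name__ == '__main__':
    print(running_averages([10, 20, 60]))  # [10.0, 15.0, 30.0]
```

Each send resumes calc_average at the yield expression, and the next yield hands the updated average back to the caller.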

Asynchronous Functions

Python 3.5 introduced two very interesting elements, async and await, which became proper keywords in Python 3.7. These two keywords simplify the writing of coroutine code and let multiple subprograms cooperate in a much simpler way. Let us look at an example, starting with the code below.

import time


def display(num):
    time.sleep(1)
    print(num)


def main():
    start = time.time()
    for i in range(1, 10):
        display(i)
    end = time.time()
    print(f'{end - start:.3f}s')


if __name__ == '__main__':
    main()

Every time it runs, the code above prints the numbers 1 to 9 in order, one second apart, so the whole program takes a little over 9 seconds. Note that this code executes in a synchronous, blocking way. The synchronous part is visible in the ordered output; the blocking part is that while display is sleeping, no other part of the program can continue until the sleep ends.

Next, let us rewrite the code above so that the display function runs asynchronously.

import asyncio
import time


async def display(num):
    await asyncio.sleep(1)
    print(num)


def main():
    start = time.time()
    objs = [display(i) for i in range(1, 10)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(objs))
    loop.close()
    end = time.time()
    print(f'{end - start:.3f}s')


if __name__ == '__main__':
    main()

Python's asyncio module provides support for asynchronous I/O. In the code above, we first put the async keyword in front of the display function to turn it into an asynchronous function. Calling an asynchronous function does not execute its body; it returns a coroutine object. We also changed time.sleep(1) inside display to await asyncio.sleep(1). The difference is that the latter does not block the whole program, because await gives the other cooperating subprograms a chance to get the CPU and run.

For these subprograms to cooperate, they have to be placed on an event loop, a system that dispatches and delivers messages. When a coroutine hits an operation that would block on I/O, it registers its own context and a wake-up callback with the event loop, which watches for the I/O to complete so that execution can be resumed later. The coroutine is then suspended. Line 12 of the code above creates 9 coroutine objects and puts them in a list. Line 13 obtains the event loop through the asyncio module's get_event_loop function. Line 14 places the coroutine objects on the event loop through the loop's run_until_complete method. Note that this code uses the asyncio API as it was when async and await were introduced; since Python 3.11, asyncio.wait no longer accepts bare coroutine objects, so on current versions they have to be wrapped in tasks first.

When you run the code, you will find that the 9 coroutines, each of which sleeps for 1 second, take only about 1 second in total, because a suspended coroutine gives up the CPU instead of leaving it idle, which greatly improves CPU utilization. You will also notice that the numbers are usually not printed in order from 1 to 9. This is the result we want: it shows that they are executed asynchronously.

For I/O-intensive tasks such as crawlers, cooperative concurrency of this kind is in many scenarios a better choice than multithreading, because it avoids the overhead of managing and maintaining multiple threads and of switching between them.
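
On recent Python versions the same experiment is usually written with asyncio.run and asyncio.gather instead of managing the event loop by hand. The sketch below is one possible rewrite and not the only correct form; the helper run_all and the return values are additions made for this illustration. Since gather returns results in the order of its arguments, the sketch makes no claim about the order in which the numbers are printed.

```python
import asyncio
import time


async def display(num):
    await asyncio.sleep(1)
    print(num)
    return num


async def run_all():
    # gather wraps each coroutine in a task and waits for all of them;
    # the returned list follows the order of the arguments.
    return await asyncio.gather(*(display(i) for i in range(1, 10)))


def main():
    start = time.perf_counter()
    # asyncio.run creates an event loop, runs the coroutine, then closes the loop.
    results = asyncio.run(run_all())
    elapsed = time.perf_counter() - start
    print(f'{elapsed:.3f}s')
    return results, elapsed


if __name__ == '__main__':
    main()
```

The total running time should again be about 1 second, because all nine sleeps overlap.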

The aiohttp Library

The third-party library requests that we used earlier does not support asynchronous I/O. To speed up crawler code with asynchronous I/O, we can install and use the third-party library aiohttp.

Install aiohttp:

pip install aiohttp

The code below uses aiohttp to fetch the home pages of 10 websites and parse out their titles.

import asyncio
import re

import aiohttp
from aiohttp import ClientSession

TITLE_PATTERN = re.compile(r'<title.*?>(.*?)</title>', re.DOTALL)


async def fetch_page_title(url):
    async with aiohttp.ClientSession(headers={
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36',
    }) as session:  # type: ClientSession
        async with session.get(url, ssl=False) as resp:
            if resp.status == 200:
                html_code = await resp.text()
                matcher = TITLE_PATTERN.search(html_code)
                title = matcher.group(1).strip() if matcher else ''  # guard: the page may have no <title> tag
                print(title)


def main():
    urls = [
        'https://www.python.org/',
        'https://www.jd.com/',
        'https://www.baidu.com/',
        'https://www.taobao.com/',
        'https://git-scm.com/',
        'https://www.sohu.com/',
        'https://gitee.com/',
        'https://www.amazon.com/',
        'https://www.usa.gov/',
        'https://www.nasa.gov/'
    ]
    objs = [fetch_page_title(url) for url in urls]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(objs))
    loop.close()


if __name__ == '__main__':
    main()

Output:

JD.COM - Genuine low prices, quality guarantee, timely delivery, easy shopping!
Sohu
Taobao - Shop what I like
Baidu
Gitee - Git-based code hosting and collaboration platform
Git
NASA
Official Guide to Government Information and Services | USAGov
Amazon.com. Spend less. Smile more.
Welcome to Python.org

As the output shows, the order of the home page titles has nothing to do with the order of the URLs in the list. Lines 11 to 13 of the code create a ClientSession object, whose get method sends a request to the given URL, as on line 14. It works much like the Session object in requests; the only difference is that asynchronous context managers (async with) are used here. The await on line 16 lets a subprogram that is blocked on I/O give up the CPU so that other subprograms can run and fetch their pages. Lines 17 and 18 parse the page title with a regular expression capturing group. fetch_page_title is an asynchronous function, marked with the async keyword, so calling it returns a coroutine object, as on line 35. The rest of the code is the same as in the previous example.

You can try switching aiohttp back to requests to see how a version that uses neither asynchronous I/O nor multithreading compares with the code above. A comparison like this should give you a deeper understanding of the concepts stressed earlier: synchronous and asynchronous, blocking and non-blocking.
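
The title extraction step can be tried without any network access. The sketch below applies the same TITLE_PATTERN to a hand-written HTML string; the function extract_title and the sample markup are made up for this illustration, and returning None for a page without a title is just one reasonable choice.

```python
import re

TITLE_PATTERN = re.compile(r'<title.*?>(.*?)</title>', re.DOTALL)


def extract_title(html_code):
    """Return the text captured by group 1, or None if there is no <title> tag."""
    matcher = TITLE_PATTERN.search(html_code)
    return matcher.group(1).strip() if matcher else None


sample = '<html><head><title lang="en">\n  Welcome to Python.org\n</title></head></html>'
print(extract_title(sample))           # Welcome to Python.org
print(extract_title('<html></html>'))  # None
```

The re.DOTALL flag lets the dot match newlines, so a title that spans several lines is still captured, and strip removes the surrounding whitespace.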