Alexander Schaaf

How to write concurrent Python using asyncio

A procedural program in Python executes one function (i.e. subroutine) at a time. A concurrent program, by contrast, is made of several independently executed activities, called coroutines. This means the program does not need to wait for one coroutine to finish before executing another. Concurrency is increasingly relevant when writing networked software, such as web servers and microservices.

Note that concurrency is not parallelism: a concurrent program does not inherently execute different coroutines at the same time. Rather, it can start processing other coroutines while waiting for one to finish.

In this post I’ll go through the very basics of asyncio, the Python standard library module designed for writing concurrent Python using the async/await syntax. I’ll show you how to build a small dummy program that fetches data from a networked service. We’ll also look at the performance difference between implementing it procedurally and concurrently.

The async/await syntax of asyncio

In Python, a basic coroutine is declared using the async keyword when defining a function. This allows us to use the await keyword inside the function, which suspends the coroutine until the awaited expression has finished. But to run the outermost coroutine of a Python program, we can’t simply call it; we must pass it to asyncio.run instead:

import asyncio

async def main():
    await asyncio.sleep(1)

asyncio.run(main())
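To see why asyncio.run is needed, it helps to know that calling a coroutine function does not run its body. A minimal sketch (the shortened sleep is just for illustration):

```python
import asyncio

async def main():
    await asyncio.sleep(0.1)  # shortened sleep, just for illustration

# Calling the coroutine function does not execute its body; it only
# creates a coroutine object that still needs to be driven by an event loop.
coro = main()
print(type(coro).__name__)  # prints "coroutine"

# asyncio.run starts an event loop and drives the coroutine to completion.
asyncio.run(coro)
```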

Using tasks to execute awaitable coroutines

The above example executes just like procedural code. So how do we write something that actually behaves differently? For that we can use tasks. We create a task using asyncio.create_task by passing it a coroutine call as its argument, just like with asyncio.run. Creating a task schedules the coroutine to run as soon as the event loop gets a chance. The code below creates a task based on the say_hello coroutine, which waits for a second before saying hello. After creating the task we can execute other code (in this case some logging) before waiting for the task to finish.

import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


async def say_hello():
    logging.info("say_hello function started")
    await asyncio.sleep(1)
    logging.info("Hello!")
    logging.info("say_hello function finished")


async def main():
    logging.info("main function started")
    task = asyncio.create_task(say_hello())
    logging.info("task created, now doing something else")
    await task
    logging.info("main function finished")


asyncio.run(main())

In the output below we can see that “task created, now doing something else” is printed before the say_hello coroutine starts executing. This shows how concurrent Python can continue running a script after firing off a coroutine. We then await task to make sure the task finishes before we exit the main function and the program.

14:40:41 main function started
14:40:41 task created, now doing something else
14:40:41 say_hello function started
14:40:42 Hello!
14:40:42 say_hello function finished
14:40:42 main function finished
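The same pattern scales to several tasks: when two tasks are created, their waits overlap, so the total runtime is roughly the longest single wait rather than the sum. A minimal sketch (the names wait_and_say, first, and second as well as the timings are just for illustration):

```python
import asyncio
import time

async def wait_and_say(name: str, seconds: float) -> str:
    await asyncio.sleep(seconds)
    return name

async def main() -> float:
    start = time.perf_counter()
    # Both tasks are scheduled right away and run concurrently.
    first = asyncio.create_task(wait_and_say("first", 0.2))
    second = asyncio.create_task(wait_and_say("second", 0.3))
    # Awaiting them one after the other still lets their sleeps overlap:
    # `second` keeps running while we wait for `first`.
    print(await first, await second)
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"total: {elapsed:.2f}s")  # roughly 0.3s, not 0.5s
```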

Fetching data procedurally

Now let’s simulate a more realistic example: fetching data from an external service. We define a function fetch_data to do this for us. A real implementation could use the requests package to make a GET request to an API endpoint. In this dummy example we simply sleep for a random amount of time to simulate the query time: an actual HTTP request would need to be sent to the API server, be processed there, and finally the response would need to be sent back from the server to our program. In synchronous code this looks like the following:

import logging
from time import sleep
from random import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def fetch_data(t: int) -> dict:
    """Fetch data from a service."""
    time = random() * 2 + 3
    sleep(time)  # simulate I/O delay
    logger.info(f"Task {t} - Completed")
    return {"data": 42}


def main():
    for t in range(3):
        logger.info(f"Task {t} - Created")
        fetch_data(t)

main()

Looking at the output, we see that it takes a while for the fetch_data calls to return. All tasks were executed as expected in a procedural program: one by one, in a predictable order. But handling this I/O-bound work procedurally made the whole program run for roughly 12 seconds.

17:25:28.670 Task 0 - Created
17:25:32.827 Task 0 - Completed
17:25:32.827 Task 1 - Created
17:25:36.498 Task 1 - Completed
17:25:36.498 Task 2 - Created
17:25:40.215 Task 2 - Completed
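You can verify the sequential cost yourself by timing the loop with time.perf_counter. A sketch with shortened sleeps so it runs quickly (the 0.3-second delay stands in for the 3-5 second one above):

```python
import time

def fetch_data(t: int) -> dict:
    time.sleep(0.3)  # shortened stand-in for the 3-5 second I/O delay
    return {"data": 42}

start = time.perf_counter()
for t in range(3):
    fetch_data(t)
elapsed = time.perf_counter() - start
print(f"total: {elapsed:.2f}s")  # the sequential delays add up: ~0.9s here
```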

Fetching data concurrently

To save time we can fire off all requests concurrently, instead of waiting for each response to return before sending the next request. For that we turn fetch_data into a coroutine using the async keyword and again use await asyncio.sleep to simulate the API response time.

async def fetch_data(t: int) -> dict:
    """Fetch data from a service."""
    time = random() * 2 + 3
    await asyncio.sleep(time)  # simulate I/O delay
    logger.info(f"Task {t} - Completed")
    return {"data": 42}

In our main function we create all three tasks using a loop. But we now need a way to wait for all our tasks to finish. When learning about asyncio, I discovered this fantastic StackOverflow answer. It provides a very concise way to wait for all existing tasks to complete before returning. It uses asyncio.all_tasks to get the set of all scheduled tasks and asyncio.current_task to get the task we are currently in. We put the latter into a set and take the difference with the full set, which removes the current task. The remaining tasks are unpacked into asyncio.gather, which we then await, so the main coroutine waits for all of them to finish.

async def main():
    for t in range(3):
        logger.info(f"Task {t} - Created")
        asyncio.create_task(fetch_data(t))
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})

asyncio.run(main())

Looking at the results, we see that all three tasks were fired off essentially simultaneously. Notice that Task 2 returned before Task 1, since each task takes a random amount of time between three and five seconds. Still, all tasks completed within a total of about four and a half seconds: roughly the time it took for the slowest simulated response to return from our fictional external server. A massive speed-up!

17:26:57.722 Task 0 - Created
17:26:57.722 Task 1 - Created
17:26:57.722 Task 2 - Created
17:27:02.002 Task 0 - Completed
17:27:02.086 Task 2 - Completed
17:27:02.173 Task 1 - Completed
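An alternative to the all_tasks trick, when you create the tasks yourself anyway, is to collect them in a list and pass that list to asyncio.gather. A sketch of the same fetch loop in that style (with shortened delays so it runs quickly):

```python
import asyncio
from random import random

async def fetch_data(t: int) -> dict:
    """Simulate fetching data from a service."""
    await asyncio.sleep(random() * 0.2 + 0.3)  # shortened delay for illustration
    return {"data": 42}

async def main() -> list:
    # Keeping our own list of tasks avoids touching the global task set.
    tasks = [asyncio.create_task(fetch_data(t)) for t in range(3)]
    # gather waits for all tasks and returns their results in creation order.
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)  # [{'data': 42}, {'data': 42}, {'data': 42}]
```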

This is of course just scratching the surface of concurrency in Python. If you’re interested in seeing concurrency used in the wild, I recommend having a look at the web framework FastAPI. It uses Starlette as its ASGI framework, which is based on AnyIO, which in turn uses asyncio under the hood.