Async/Await

asyncio is Python's library for writing concurrent code using the async/await syntax. It's designed for I/O-bound tasks like network requests, file operations, and inter-process communication.

Basic Concepts

Async code runs on a single thread using an event loop:

import asyncio

async def main():
    """Print "Hello", pause for one second without blocking the loop, then print "World"."""
    pause_seconds = 1
    print("Hello")
    # Awaiting the sleep suspends this coroutine and hands control back to
    # the event loop until the delay has elapsed.
    await asyncio.sleep(pause_seconds)
    print("World")

# Run the async function: asyncio.run() starts an event loop, runs main()
# until it completes, and then closes the loop.
asyncio.run(main())

Coroutines

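Functions defined with async def are coroutine functions; calling one returns a coroutine object, which does nothing until it is awaited or scheduled: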

async def fetch_data():
    """Simulate a slow fetch: announce it, wait two seconds, return a payload.

    Returns:
        dict: ``{"data": 42}``.
    """
    print("Fetching...")
    payload = {"data": 42}
    # The sleep stands in for real I/O.
    await asyncio.sleep(2)
    return payload

async def main():
    """Await fetch_data() and print the value it returns."""
    print(await fetch_data())

# Entry point: run main() to completion on a fresh event loop.
asyncio.run(main())

Running Tasks Concurrently

Use asyncio.gather to run coroutines concurrently:

import asyncio
import time

async def task(name, delay):
    """Announce start, sleep for ``delay`` seconds, announce finish.

    Args:
        name: Label used in the printed messages and returned to the caller.
        delay: Seconds to sleep, passed straight to ``asyncio.sleep``.

    Returns:
        The ``name`` argument, unchanged.
    """
    print(f"{name} started")
    # Only this coroutine is suspended; other scheduled work keeps running.
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return name

async def main():
    """Run three task() coroutines concurrently and report the elapsed time.

    Prints the total elapsed time followed by the list of results, which
    asyncio.gather returns in the same order the awaitables were passed
    (not in completion order).
    """
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way a time.time() difference can.
    start = time.perf_counter()

    # Run all tasks concurrently
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3),
    )

    elapsed = time.perf_counter() - start
    print(f"Total time: {elapsed:.2f}s")
    print(results)

# Expected total time: ~3s (the longest single delay), not 6s (the sum of
# the delays), because the three sleeps overlap.
asyncio.run(main())

Creating Tasks

Use asyncio.create_task to schedule concurrent execution:

async def main():
    """Schedule two some_async_func() calls as tasks, then wait for both."""
    # create_task() schedules each coroutine on the running loop right away;
    # holding the Task objects lets us await them later.
    scheduled = [asyncio.create_task(some_async_func(arg)) for arg in (1, 2)]

    # Do other work here
    print("Tasks created, doing other work...")

    # Wait for both to complete, in creation order
    for pending in scheduled:
        await pending

TaskGroup (Python 3.11+)

TaskGroup is a safer way to manage multiple tasks: the async with block waits for every task on exit, and if one task fails the remaining tasks are cancelled:

async def main():
    """Run two some_async_func() calls inside a TaskGroup."""
    async with asyncio.TaskGroup() as group:
        for arg in (1, 2):
            group.create_task(some_async_func(arg))
    # Leaving the block means all tasks completed or were cancelled

asyncio.Queue

Producer-consumer patterns with async queues:

async def producer(queue, n):
    """Put the integers 0..n-1 on ``queue``, then a ``None`` sentinel.

    Args:
        queue: Queue-like object with an awaitable ``put``.
        n: Number of items to produce.
    """
    i = 0
    while i < n:
        await queue.put(i)
        print(f"Produced {i}")
        i += 1
    # None marks the end of the stream.
    await queue.put(None)

async def consumer(queue, name):
    """Print items taken from ``queue`` until a ``None`` sentinel arrives.

    Every item retrieved, including the sentinel, is acknowledged with
    ``queue.task_done()``.

    Args:
        queue: Queue-like object with an awaitable ``get`` and ``task_done``.
        name: Label used in the printed messages.
    """
    while (item := await queue.get()) is not None:
        print(f"{name} consumed {item}")
        queue.task_done()
    # Acknowledge the sentinel as well before returning.
    queue.task_done()

async def main():
    """Run one producer (5 items) and one consumer ("C1") on a shared queue."""
    queue = asyncio.Queue()
    workers = (
        producer(queue, 5),
        consumer(queue, "C1"),
    )
    await asyncio.gather(*workers)

Async Context Managers

Use async with for async resources:

class AsyncConnection:
    """Skeleton connection usable with ``async with``.

    Entering the context awaits connect(); leaving it awaits disconnect().
    connect(), disconnect() and query() are placeholder stubs.
    """

    async def __aenter__(self):
        """Connect and return this object as the ``as`` target."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Disconnect on exit; returns None, so exceptions are not suppressed."""
        await self.disconnect()

    async def connect(self):
        """Placeholder: open the connection."""
        pass

    async def disconnect(self):
        """Placeholder: close the connection."""
        pass

    async def query(self, sql):
        """Placeholder: run ``sql`` against the connection."""
        pass

async def main():
    """Open an AsyncConnection with ``async with`` and issue one query."""
    connection_cm = AsyncConnection()
    async with connection_cm as conn:
        await conn.query("SELECT * FROM users")

Async Iterators

Iterate over async data sources:

class AsyncRange:
    """Async iterator yielding 0, 1, ..., n-1 with a short pause per item."""

    def __init__(self, n):
        self.n = n  # exclusive upper bound
        self.current = 0  # next value to hand out

    def __aiter__(self):
        # The object is its own iterator.
        return self

    async def __anext__(self):
        """Return the next value, or raise StopAsyncIteration when exhausted."""
        if self.current < self.n:
            value = self.current
            self.current = value + 1
            await asyncio.sleep(0.1)  # Simulate async work
            return value
        raise StopAsyncIteration

async def main():
    """Print each value produced by AsyncRange(5)."""
    numbers = AsyncRange(5)
    async for i in numbers:
        print(i)

# Entry point: drive the async for loop in main() on a fresh event loop.
asyncio.run(main())

Events and Locks

Synchronize async tasks:

import asyncio

# Event - signal between tasks. Defined at module level so that waiter()
# and setter() below operate on the same Event instance.
event = asyncio.Event()

async def waiter():
    """Block until the module-level ``event`` is set, printing before and after."""
    print("Waiting for event...")
    # Suspends here until some other task calls event.set().
    await event.wait()
    print("Event received!")

async def setter():
    """Wait two seconds, then set the module-level ``event``."""
    delay_seconds = 2
    await asyncio.sleep(delay_seconds)
    # Wakes every task currently awaiting event.wait().
    event.set()

async def main():
    """Run waiter() and setter() concurrently until both finish."""
    participants = (waiter(), setter())
    await asyncio.gather(*participants)

# Lock - mutual exclusion. A single module-level instance, so every call to
# critical_section() below contends for the same lock.
lock = asyncio.Lock()

async def critical_section():
    """Hold the module-level ``lock`` while printing and sleeping one second."""
    hold_seconds = 1
    async with lock:
        # Only one task can be inside this block at a time; the lock is
        # released when the block exits.
        print("Locked section")
        await asyncio.sleep(hold_seconds)

Running Sync Code in Async

A blocking call stalls the whole event loop, so hand blocking code to a thread pool with run_in_executor:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_task():
    """Synchronously sleep for two seconds and return ``"Done"``.

    This is an ordinary (non-async) function: time.sleep() blocks the
    calling thread for the full duration.
    """
    outcome = "Done"
    time.sleep(2)
    return outcome

async def main():
    """Run blocking_task() in the default executor and print its result."""
    # Inside a coroutine a loop is always running; get_running_loop() returns
    # exactly that loop and is the documented preference over
    # get_event_loop() in coroutines and callbacks.
    loop = asyncio.get_running_loop()

    # Passing None selects the loop's default executor, so the blocking call
    # runs off the event-loop thread.
    result = await loop.run_in_executor(None, blocking_task)
    print(result)

# Entry point: run main() to completion on a fresh event loop.
asyncio.run(main())

Real World: aiohttp

Async HTTP requests:

import aiohttp
import asyncio

async def fetch_urls(urls):
    """Fetch every URL concurrently and print the size of each response body.

    Args:
        urls: Iterable of URL strings to request with HTTP GET.

    Output lines are printed in the same order as ``urls``, because
    asyncio.gather preserves the order of the awaitables it is given.
    """

    async def fetch_one(session, url):
        # `async with` releases the response (and its connection) even if
        # reading the body fails; a bare `await session.get(url)` leaves the
        # response open until it is explicitly released.
        async with session.get(url) as resp:
            return resp.url, await resp.text()

    async with aiohttp.ClientSession() as session:
        # Each request and its body read run concurrently with the others.
        results = await asyncio.gather(
            *(fetch_one(session, url) for url in urls)
        )

        for final_url, data in results:
            print(f"{final_url}: {len(data)} bytes")

# Entry point: request both pages and print the size of each body.
# Requires network access and the third-party aiohttp package.
asyncio.run(fetch_urls([
    "https://example.com",
    "https://example.org",
]))