Async/Await
asyncio is Python's library for writing concurrent code using the async/await syntax. It's designed for I/O-bound tasks like network requests, file operations, and inter-process communication.
Basic Concepts
Async code runs on a single thread using an event loop:
import asyncio
async def main():
    """Print "Hello", wait one second, then print "World".

    The wait uses ``asyncio.sleep``, so the coroutine is suspended at the
    ``await`` instead of blocking the thread.
    """
    print("Hello")
    await asyncio.sleep(1)
    print("World")
# Run the async function: asyncio.run() drives main() on the event loop
asyncio.run(main())
Coroutines
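Functions defined with async def are coroutine functions; calling one returns a coroutine object, which only runs once it is awaited or scheduled on the event loop: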
async def fetch_data():
    """Simulate a slow fetch and return its payload.

    Prints a progress message, waits two seconds, then returns.

    Returns:
        dict: ``{"data": 42}``.
    """
    print("Fetching...")
    await asyncio.sleep(2)  # stands in for the latency of a real request
    return {"data": 42}
async def main():
    """Await ``fetch_data`` and print the value it returns."""
    result = await fetch_data()
    print(result)
# Run main(), which awaits fetch_data() and prints its result
asyncio.run(main())
Running Tasks Concurrently
Use asyncio.gather to run coroutines concurrently:
import asyncio
import time
async def task(name, delay):
    """Announce start, sleep for *delay* seconds, announce finish.

    Args:
        name: Label used in the printed messages and returned to the caller.
        delay: Seconds to pass to ``asyncio.sleep``.

    Returns:
        The *name* that was passed in.
    """
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return name
async def main():
    """Run three ``task`` coroutines concurrently and print the elapsed time.

    Prints the total wall time followed by the list of results collected by
    ``asyncio.gather``.
    """
    # perf_counter() is monotonic, so the interval cannot be distorted by
    # system clock adjustments the way a time.time() difference can.
    start = time.perf_counter()
    # Run all tasks concurrently
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3),
    )
    print(f"Total time: {time.perf_counter() - start:.2f}s")
    print(results)
    # Total time: ~3s (max of delays), not 6s
# Run main(), which gathers the three tasks and prints the timing and results
asyncio.run(main())
Creating Tasks
Use asyncio.create_task to schedule concurrent execution:
async def main():
    """Start two tasks, do other work, then wait for both to finish.

    NOTE(review): ``some_async_func`` is not defined in this snippet; it is
    assumed to be a coroutine function supplied by the reader.
    """
    task1 = asyncio.create_task(some_async_func(1))
    task2 = asyncio.create_task(some_async_func(2))
    # Do other work here
    print("Tasks created, doing other work...")
    # Wait for both to complete
    await task1
    await task2
TaskGroup (Python 3.11+)
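A safer way to manage multiple tasks: the async with block waits for every task in the group, and if one task fails the remaining tasks are cancelled and the errors are raised together as an ExceptionGroup: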
async def main():
    """Run two tasks inside an ``asyncio.TaskGroup``.

    NOTE(review): ``some_async_func`` is not defined in this snippet; it is
    assumed to be a coroutine function supplied by the reader.
    """
    async with asyncio.TaskGroup() as tg:
        tg.create_task(some_async_func(1))
        tg.create_task(some_async_func(2))
    # All tasks completed or cancelled
asyncio.Queue
Producer-consumer patterns with async queues:
async def producer(queue, n):
    """Put the integers ``0 .. n-1`` on *queue*, then a ``None`` sentinel.

    Args:
        queue: Queue whose ``put`` is awaited for each item.
        n: Number of integers to produce.
    """
    for i in range(n):
        await queue.put(i)
        print(f"Produced {i}")
    # None marks the end of the stream for the consumer.
    await queue.put(None)
async def consumer(queue, name):
    """Print items taken from *queue* until a ``None`` sentinel arrives.

    Every item, including the sentinel, is acknowledged with ``task_done()``.

    Args:
        queue: Queue whose ``get`` is awaited for each item.
        name: Label prefixed to each printed message.
    """
    # Compare with "is not None" so falsy items such as 0 are still consumed.
    while (item := await queue.get()) is not None:
        print(f"{name} consumed {item}")
        queue.task_done()
    # Acknowledge the sentinel as well.
    queue.task_done()
async def main():
    """Run one producer and one consumer concurrently over a shared queue."""
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue, "C1"),
    )
Async Context Managers
Use async with for async resources:
class AsyncConnection:
    """Skeleton connection that can be used with ``async with``.

    Entering the context awaits ``connect()`` and yields the instance;
    leaving it awaits ``disconnect()``. The three I/O methods are empty
    placeholders.
    """

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returns None, so an exception raised inside the block propagates.
        await self.disconnect()

    async def connect(self):
        pass

    async def disconnect(self):
        pass

    async def query(self, sql):
        pass
async def main():
    """Open an ``AsyncConnection`` and run one query inside the context."""
    async with AsyncConnection() as conn:
        await conn.query("SELECT * FROM users")
Async Iterators
Iterate over async data sources:
class AsyncRange:
    """Async iterator yielding ``0 .. n-1`` with a pause before each value.

    The object is its own iterator, so it can be iterated only once.

    Args:
        n: Number of values to yield.
        delay: Seconds to sleep before returning each value. Defaults to 0.1.
    """

    def __init__(self, n, delay=0.1):
        self.n = n
        self.delay = delay
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        value = self.current
        self.current += 1
        await asyncio.sleep(self.delay)  # Simulate async work
        return value
async def main():
    """Print each value produced by ``AsyncRange(5)``."""
    async for i in AsyncRange(5):
        print(i)
# Run main(), which prints each value produced by AsyncRange(5)
asyncio.run(main())
Events and Locks
Synchronize async tasks:
import asyncio
# Event - signal between tasks: waiter() blocks on it until setter() calls set()
# NOTE(review): created at import time, outside any running loop; believed fine
# on Python 3.10+, but confirm the target version before copying this pattern.
event = asyncio.Event()
async def waiter():
    """Wait until the module-level ``event`` is set, printing before and after."""
    print("Waiting for event...")
    await event.wait()
    print("Event received!")
async def setter():
    """Set the module-level ``event`` after a two-second delay."""
    await asyncio.sleep(2)
    event.set()
async def main():
    """Run ``waiter`` and ``setter`` concurrently."""
    await asyncio.gather(waiter(), setter())
# Lock - mutual exclusion: acquired by critical_section() via "async with"
lock = asyncio.Lock()
async def critical_section():
    """Hold the module-level ``lock`` while printing and sleeping one second."""
    async with lock:
        # Only one task can enter at a time
        print("Locked section")
        await asyncio.sleep(1)
Running Sync Code in Async
Use run_in_executor for blocking code:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def blocking_task(seconds=2):
    """Block the calling thread for *seconds*, then return ``"Done"``.

    Stands in for synchronous work that must not run on the event loop
    thread.

    Args:
        seconds: How long to block, passed to ``time.sleep``. Defaults to 2.

    Returns:
        str: The literal ``"Done"``.
    """
    time.sleep(seconds)
    return "Done"
async def main():
    """Run ``blocking_task`` in a thread pool and print its result."""
    # Inside a coroutine a loop is always running; get_running_loop() returns
    # exactly that loop and never creates one implicitly.
    loop = asyncio.get_running_loop()
    # Run blocking code in thread pool (None selects the default executor)
    result = await loop.run_in_executor(None, blocking_task)
    print(result)
# Run main(), which prints the value returned by blocking_task
asyncio.run(main())
Real World: aiohttp
Async HTTP requests:
import aiohttp
import asyncio
async def fetch_urls(urls):
    """Fetch every URL concurrently and print the size of each body.

    One line is printed per URL, in the order given, as
    ``"<url>: <n> bytes"``, where *n* is the length of the decoded text.

    Args:
        urls: Iterable of URL strings to request with GET.
    """

    async def fetch(session, url):
        # "async with" releases the response even if reading the body fails.
        async with session.get(url) as resp:
            return resp.url, await resp.text()

    async with aiohttp.ClientSession() as session:
        # Each request and its body read run concurrently with the others.
        pages = await asyncio.gather(*(fetch(session, url) for url in urls))
        for url, data in pages:
            print(f"{url}: {len(data)} bytes")
# Fetch two example sites and print the size of each response body
asyncio.run(fetch_urls([
"https://example.com",
"https://example.org",
]))