Python asyncio Reference: Coroutines, Tasks, Queue, Semaphore & run_in_executor
Python’s asyncio is the standard-library framework for writing concurrent I/O code. The model: a single-threaded event loop multiplexes I/O operations and runs coroutines cooperatively. It’s not for CPU-bound work (use multiprocessing or a process pool for that) — it shines when you’re waiting on network, disk, or databases.
1. Core Concepts — Coroutines, Tasks & Awaitables
async/await, asyncio.create_task, asyncio.run, and the awaitable protocol
import asyncio
# Coroutine (defined with async def — not executed until awaited):
async def fetch_user(user_id: int, *, delay: float = 0.1) -> dict:
    """Return a stub user record after a simulated I/O wait.

    Args:
        user_id: Identifier echoed back under the "id" key.
        delay: Seconds to sleep to simulate latency. Defaults to 0.1, the
            value that used to be hard-coded.

    Returns:
        A dict with the keys "id" and "name".
    """
    await asyncio.sleep(delay)  # yields control to the event loop
    return {"id": user_id, "name": "Alice"}
# Run a coroutine (entry point). Calling fetch_user(1) only builds a coroutine
# object; asyncio.run() is what actually executes it:
result = asyncio.run(fetch_user(1)) # creates a new event loop, runs the coroutine to completion, closes the loop
# asyncio.run() is available from Python 3.7 and is the right way to start
# async code from a synchronous top-level entry point.
# Don't call asyncio.get_event_loop() directly in new code.
# Task (schedules a coroutine to run concurrently):
async def main():
    """Contrast awaiting coroutines one by one with scheduling them as tasks."""
    # Sequential (slow): the second call does not start until the first returns.
    u1 = await fetch_user(1)
    u2 = await fetch_user(2)

    # Concurrent (fast): create_task() schedules both coroutines up front, so
    # their waits overlap on the event loop (concurrently, not in parallel).
    pending = [asyncio.create_task(fetch_user(uid)) for uid in (1, 2)]
    u1 = await pending[0]  # wait for result
    u2 = await pending[1]


asyncio.run(main())
# Three awaitables in Python:
# 1. coroutines (async def functions)
# 2. Tasks (asyncio.create_task wraps a coroutine)
# 3. Futures (low-level — rarely used directly)
# Gather (run multiple coroutines concurrently, collect results):
async def main():
    """Run several fetch_user() calls concurrently with asyncio.gather()."""
    # The result list follows argument order: [result1, result2, result3].
    lookups = [fetch_user(uid) for uid in (1, 2, 3)]
    results = await asyncio.gather(*lookups)

    # Note: if one awaitable raises, gather() propagates that first exception
    # to the caller; the other awaitables are NOT cancelled and keep running.
    # Use return_exceptions=True to collect exceptions in the results instead:
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(999),
        return_exceptions=True,
    )
2. Timeouts, Cancellation & Error Handling
asyncio.timeout, TaskGroup, CancelledError, and graceful shutdown
import asyncio
# Timeout (Python 3.11+):
async def fetch_with_timeout(url: str) -> str:
    """Fetch *url* with slow_fetch(), giving up after five seconds.

    Returns:
        The value produced by slow_fetch(), or the string "timeout" when the
        asyncio.timeout() block expires.
    """
    try:
        # On expiry the body is cancelled and TimeoutError is raised here.
        async with asyncio.timeout(5.0):
            body = await slow_fetch(url)
    except TimeoutError:
        return "timeout"
    return body
# Python 3.10 and below (no asyncio.timeout()). `await` is only legal inside a
# coroutine, so the pattern is shown as a function:
async def fetch_with_wait_for(url: str) -> str:
    """Fetch *url* with slow_fetch(), bounded to five seconds via wait_for().

    Returns:
        The value produced by slow_fetch(), or the string "timeout" when
        asyncio.wait_for() gives up.
    """
    try:
        result = await asyncio.wait_for(slow_fetch(url), timeout=5.0)
    except asyncio.TimeoutError:
        result = "timeout"
    return result
# TaskGroup (Python 3.11+ — structured concurrency, better than gather):
async def main():
    """Fetch three users inside an asyncio.TaskGroup and collect their results."""
    # If any task raises, the group cancels the remaining tasks.
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in (1, 2, 3)]
    # Leaving the `async with` block means every task has completed (or been
    # cancelled), so result() can be read without awaiting.
    results = [task.result() for task in tasks]
# Task cancellation:
async def main():
    """Start long_operation(), cancel it after one second, and await the outcome."""
    worker = asyncio.create_task(long_operation())
    await asyncio.sleep(1.0)
    # cancel() requests cancellation: CancelledError is thrown into the coroutine.
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        print("Task was cancelled")
# Handle CancelledError in long operations (for cleanup):
async def long_operation():
    """Call do_work() after every 0.1 s sleep until the task is cancelled.

    On cancellation cleanup() runs and CancelledError is re-raised, so the
    awaiting code still sees the task as cancelled.
    """
    try:
        while True:
            await asyncio.sleep(0.1)
            do_work()
    except asyncio.CancelledError:
        cleanup()
        # Always re-raise CancelledError; swallowing it hides the cancellation.
        raise
# Graceful shutdown (handle SIGTERM in servers):
async def main():
    """Block until the process receives SIGTERM, then fall through to cleanup.

    Note: loop.add_signal_handler() is only implemented on Unix event loops.
    """
    import signal  # local import: this snippet's import block only has asyncio

    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    def _request_stop() -> None:
        # A repeated SIGTERM must not call set_result() on an already finished
        # future; that would raise InvalidStateError inside the event loop.
        if not stop.done():
            stop.set_result(None)

    loop.add_signal_handler(signal.SIGTERM, _request_stop)
    await stop
    # cleanup here
3. Queue, Semaphore & Rate Limiting
Producer/consumer with Queue, Semaphore for concurrency limits, and rate limiting patterns
import asyncio
# asyncio.Queue (producer/consumer pattern):
async def producer(queue: asyncio.Queue, items: list):
for item in items:
await queue.put(item)
await queue.put(None) # sentinel to signal done
async def consumer(queue: asyncio.Queue, results: list):
    """Drain *queue* until a None sentinel arrives, appending processed items.

    Args:
        queue: Queue to read from; None marks the end of the stream.
        results: List that receives the value of process(item) for each item.

    Every get() is matched by exactly one task_done(), including for the
    sentinel and when process() raises, so `await queue.join()` cannot hang.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                break
            results.append(await process(item))
        finally:
            queue.task_done()
async def main():
    """Connect one producer and one consumer through a bounded queue."""
    results = []
    # maxsize=10 gives backpressure: put() suspends the producer while full.
    queue = asyncio.Queue(maxsize=10)
    feeding = producer(queue, [1, 2, 3, 4, 5])
    draining = consumer(queue, results)
    await asyncio.gather(feeding, draining)
# Semaphore (limit concurrent operations — rate limiting, connection pooling):
async def fetch_url(session, url: str, semaphore: asyncio.Semaphore) -> str:
    """Return the body text of *url*, holding *semaphore* for the whole request.

    Args:
        session: Object whose get(url) returns an async context manager
            yielding a response with an awaitable text() method.
        url: Address to request.
        semaphore: Caps how many callers may be inside the request at once.
    """
    # The semaphore is acquired first, then the request is opened inside it.
    async with semaphore, session.get(url) as resp:
        return await resp.text()
async def main():
    """Fetch every URL in `urls` with at most ten requests in flight."""
    limit = asyncio.Semaphore(10)  # max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch_url(session, url, limit) for url in urls)
        )
# asyncio.Event (signal between coroutines):
event = asyncio.Event()
async def waiter():
await event.wait() # blocks until event.set()
print("Event triggered!")
async def setter():
await asyncio.sleep(1.0)
event.set() # unblocks all waiters
asyncio.run(asyncio.gather(waiter(), setter()))
4. Async Context Managers & Iterators
async with, async for, and writing your own async context managers
import asyncio
from contextlib import asynccontextmanager
# Async context manager (async with):
class AsyncDBConnection:
    """Async context manager that opens a connection on entry and closes it on exit."""

    async def __aenter__(self):
        """Open a connection via connect_to_db(), store it on self, and return it."""
        connection = await connect_to_db()
        self.conn = connection
        return connection

    async def __aexit__(self, exc_type, exc, tb):
        """Close the stored connection.

        Returns None, so an exception raised inside the `async with` body
        still propagates to the caller.
        """
        await self.conn.close()
async def main():
    """Run one query on a connection managed by AsyncDBConnection."""
    manager = AsyncDBConnection()
    async with manager as conn:
        result = await conn.query("SELECT ...")
# asynccontextmanager decorator (simpler):
@asynccontextmanager
async def managed_connection():
    """Yield a connection from connect_to_db() and close it afterwards.

    The close happens in `finally`, so it also runs when the `async with`
    body raises.
    """
    db = await connect_to_db()
    try:
        yield db
    finally:
        await db.close()
async def main():
    """Run one query on a connection obtained from managed_connection()."""
    connection_scope = managed_connection()
    async with connection_scope as conn:
        result = await conn.query("SELECT ...")
# Async iterator (async for):
class AsyncPageIterator:
    """Async iterator that yields pages from fetch_page() until one is empty."""

    def __init__(self, url: str):
        self.url = url
        self.page = 1  # number of the next page to request
        self.done = False  # set once an empty page has been seen

    def __aiter__(self):
        """Return self: the object is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next page, or raise StopAsyncIteration when exhausted."""
        if not self.done:
            data = await fetch_page(self.url, self.page)
            if data:
                self.page += 1
                return data
            # A falsy page ends the iteration for good.
            self.done = True
        raise StopAsyncIteration
async def main():
    """Process every page produced by AsyncPageIterator."""
    pages = AsyncPageIterator("https://api.example.com/items")
    async for page in pages:
        process(page)
# Async generator (yield inside async def):
async def paginate(url: str):
    """Yield successive pages from fetch_page(), starting at page 1.

    The generator finishes as soon as fetch_page() returns a falsy page.
    """
    page = 1
    while (data := await fetch_page(url, page)):
        yield data
        page += 1
async def main():
    """Process every page produced by the paginate() async generator."""
    pages = paginate("https://api.example.com/items")
    async for page in pages:
        process(page)
5. Running Blocking Code & Integration Patterns
run_in_executor for CPU/blocking I/O, mixing sync and async, aiohttp, aiofiles
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# run_in_executor — run blocking code without blocking the event loop:
async def main():
    """Offload blocking calls to executors so the event loop stays responsive."""
    loop = asyncio.get_running_loop()

    # Passing None uses the loop's default executor (blocking I/O — file
    # reads, legacy sync libraries):
    result = await loop.run_in_executor(None, blocking_file_read, "path.txt")

    # Custom thread pool:
    with ThreadPoolExecutor(max_workers=4) as threads:
        result = await loop.run_in_executor(threads, sync_api_call, params)

    # Process pool (CPU-bound work — image processing, compression):
    with ProcessPoolExecutor(max_workers=4) as procs:
        result = await loop.run_in_executor(procs, cpu_intensive_task, data)
# aiohttp (async HTTP client — use instead of requests in async code):
# pip install aiohttp
import aiohttp
async def fetch_many(urls: list[str]) -> list[str]:
    """Fetch all *urls* concurrently and return their bodies in input order.

    Args:
        urls: Addresses to request; an empty list yields an empty result.

    Returns:
        One body string per URL, in the same order as *urls*.
    """
    async with aiohttp.ClientSession() as session:  # reuse session!

        async def _fetch_text(url: str) -> str:
            # `async with` releases the response even if reading the body
            # fails, and lets each body be read as soon as it arrives.
            async with session.get(url) as resp:
                return await resp.text()

        return await asyncio.gather(*(_fetch_text(url) for url in urls))
# aiofiles (async file I/O):
# pip install aiofiles
import aiofiles
async def read_file(path: str) -> str:
    """Return the full text content of the file at *path* using aiofiles."""
    async with aiofiles.open(path, mode='r') as handle:
        contents = await handle.read()
    return contents
# asyncpg (async PostgreSQL — faster than psycopg2 for async code):
# pip install asyncpg
import asyncpg
async def get_users() -> list:
    """Return the rows of all active users.

    The pool is opened with `async with` so it is closed when the query
    finishes; previously each call created a pool that was never closed.
    In a long-running service, create the pool once at startup and reuse it
    instead of opening one per call.
    """
    async with asyncpg.create_pool(dsn="postgresql://...") as pool:
        async with pool.acquire() as conn:
            return await conn.fetch("SELECT * FROM users WHERE active = true")
# Debug stuck coroutines (slow event loop):
import asyncio
# Enable debug mode on the loop that actually runs your code. asyncio.run()
# always creates a fresh loop, so calling set_debug() on the loop returned by
# asyncio.get_event_loop() would not affect it (and get_event_loop() is
# deprecated when no loop is running).
asyncio.run(main(), debug=True)  # logs callbacks/task steps that take > 100ms
# Or: PYTHONASYNCIODEBUG=1 python main.py
Track Python and async framework releases.
ReleaseRun monitors Python, FastAPI, and 13+ technologies.
Related: Python Reference | FastAPI Reference | Python Packaging Reference | Python EOL Tracker
🔍 Free tool: PyPI Package Health Checker — check aiohttp, aiofiles, and other async Python packages for EOL status and known CVEs.
Founded
2023 in London, UK
Contact
hello@releaserun.com