Skip to content

Python asyncio Reference: Coroutines, Tasks, Queue, Semaphore & run_in_executor

Python asyncio is the standard-library framework for writing concurrent I/O-bound code. The model: a single-threaded event loop multiplexes I/O operations, running coroutines cooperatively. It is not for CPU-bound work (use multiprocessing for that) — it shines when you are waiting on network, disk, or databases.

1. Core Concepts — Coroutines, Tasks & Awaitables

async/await, asyncio.create_task, asyncio.run, and the awaitable protocol
import asyncio

# Coroutine (defined with async def — not executed until awaited):
async def fetch_user(user_id: int) -> dict:
    """Simulate an async lookup: yield to the event loop, then return a stub user."""
    await asyncio.sleep(0.1)          # cooperative yield point (stands in for real I/O)
    return dict(id=user_id, name="Alice")

# Run a coroutine (entry point):
result = asyncio.run(fetch_user(1))   # creates a fresh event loop, runs to completion, closes the loop
 
# asyncio.run() is Python 3.7+ and the right way for top-level entry points.
# Don't call asyncio.get_event_loop() directly in new code (deprecated when no loop is running).

# Task (schedules a coroutine to run concurrently):
async def main():
    """Contrast sequential awaits with concurrently scheduled tasks."""
    # Sequential: the second fetch cannot start until the first finishes.
    first = await fetch_user(1)
    second = await fetch_user(2)

    # Concurrent: both tasks are scheduled immediately, then awaited in turn.
    fetch_a = asyncio.create_task(fetch_user(1))
    fetch_b = asyncio.create_task(fetch_user(2))
    first = await fetch_a                 # suspend until the first task resolves
    second = await fetch_b

asyncio.run(main())

# Three awaitables in Python:
# 1. coroutines (async def functions)
# 2. Tasks (asyncio.create_task wraps a coroutine)
# 3. Futures (low-level — rarely used directly)

# Gather (run multiple coroutines concurrently, collect results):
async def main():
    """Run three fetches concurrently with gather; results come back in argument order."""
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )                                  # returns [result1, result2, result3]
    # Note: by default, if one awaitable raises, gather propagates the FIRST
    # exception to the awaiter — the other awaitables are NOT cancelled and
    # keep running. Use return_exceptions=True to collect exceptions as
    # results instead of raising:
    results = await asyncio.gather(fetch_user(1), fetch_user(999),
                                   return_exceptions=True)

2. Timeouts, Cancellation & Error Handling

asyncio.timeout, TaskGroup, CancelledError, and graceful shutdown
import asyncio

# Timeout (Python 3.11+):
async def fetch_with_timeout(url: str) -> str:
    """Fetch `url`, giving up after 5 seconds (Python 3.11+ asyncio.timeout)."""
    try:
        async with asyncio.timeout(5.0):  # cancels the body and raises TimeoutError at the deadline
            return await slow_fetch(url)
    except TimeoutError:
        return "timeout"

# Python 3.10 and below — use asyncio.wait_for instead of asyncio.timeout.
# (`await` is only valid inside a coroutine, so the example must live in one;
# the original showed it at module level, which is a SyntaxError.)
async def fetch_with_timeout_legacy(url: str) -> str:
    """Pre-3.11 timeout: wait_for cancels slow_fetch(url) after 5 seconds."""
    try:
        return await asyncio.wait_for(slow_fetch(url), timeout=5.0)
    except asyncio.TimeoutError:  # alias of the builtin TimeoutError since 3.11
        return "timeout"

# TaskGroup (Python 3.11+ — structured concurrency, better than gather):
async def main():
    """Spawn three fetches in a TaskGroup; if any raises, the rest are cancelled."""
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in (1, 2, 3)]
    # Exiting the `async with` guarantees every task has completed (or been cancelled).
    results = [t.result() for t in tasks]

# Task cancellation:
async def main():
    """Start a long-running task, let it run one second, then cancel and observe."""
    worker = asyncio.create_task(long_operation())
    await asyncio.sleep(1.0)
    worker.cancel()  # schedules CancelledError to be raised inside the coroutine
    try:
        await worker
    except asyncio.CancelledError:
        print("Task was cancelled")

# Handle CancelledError in long operations (for cleanup):
async def long_operation():
    """Work in 0.1 s slices forever; on cancellation, run cleanup then re-raise."""
    try:
        while True:
            await asyncio.sleep(0.1)
            do_work()
    except asyncio.CancelledError:
        cleanup()
        raise                                   # swallowing CancelledError would break task.cancel() — always re-raise

# Graceful shutdown (handle SIGTERM in servers):
async def main():
    """Block until SIGTERM arrives, then fall through to cleanup (graceful shutdown)."""
    import signal  # fix: `signal` was referenced but never imported in this snippet

    loop = asyncio.get_running_loop()
    stop = loop.create_future()
    # The handler resolves the future; that wakes the `await stop` below.
    loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
    await stop
    # cleanup here

3. Queue, Semaphore & Rate Limiting

Producer/consumer with Queue, Semaphore for concurrency limits, and rate limiting patterns
import asyncio

# asyncio.Queue (producer/consumer pattern):
async def producer(queue: asyncio.Queue, items: list):
    """Feed every item into `queue`, then a trailing None sentinel marking the end."""
    for element in items:
        await queue.put(element)
    await queue.put(None)                         # tells the consumer nothing more is coming

async def consumer(queue: asyncio.Queue, results: list):
    """Drain `queue` into `results` until the None sentinel arrives.

    Every item taken off the queue — including the sentinel — is acknowledged
    with task_done(), so a queue.join() elsewhere can complete.
    """
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()  # fix: original never acknowledged the sentinel, so queue.join() would hang
            break
        result = await process(item)
        results.append(result)
        queue.task_done()

async def main():
    """Wire producer and consumer together through a bounded queue."""
    # Bounded queue → producer blocks when full (backpressure).
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    collected: list = []
    await asyncio.gather(
        producer(queue, [1, 2, 3, 4, 5]),
        consumer(queue, collected),
    )

# Semaphore (limit concurrent operations — rate limiting, connection pooling):
async def fetch_url(session, url: str, semaphore: asyncio.Semaphore) -> str:
    """GET `url` through `session`, holding `semaphore` so at most N requests run at once."""
    async with semaphore, session.get(url) as resp:  # semaphore gates entry; resp is released on exit
        return await resp.text()

async def main():
    """Fetch every URL with at most 10 requests in flight at any moment."""
    limit = asyncio.Semaphore(10)                 # max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        # NOTE(review): `urls` is assumed to be defined elsewhere in this snippet.
        results = await asyncio.gather(*(fetch_url(session, u, limit) for u in urls))

# asyncio.Event (signal between coroutines):
event = asyncio.Event()  # shared flag: wait() blocks until some coroutine calls set()

async def waiter():
    """Suspend until the shared `event` is set, then report."""
    await event.wait()                            # parked here until event.set() fires
    print("Event triggered!")

async def setter():
    """After a one-second delay, set the shared event, releasing every waiter."""
    await asyncio.sleep(1.0)
    event.set()                                   # wakes all coroutines blocked in event.wait()

# Fix: asyncio.run() requires a coroutine, and asyncio.gather() returns a Future
# that needs a running loop — the original one-liner raised at runtime.
async def _demo():
    await asyncio.gather(waiter(), setter())

asyncio.run(_demo())

4. Async Context Managers & Iterators

async with, async for, and writing your own async context managers
import asyncio
from contextlib import asynccontextmanager

# Async context manager (async with):
class AsyncDBConnection:
    """Hand-rolled async context manager around a database connection."""

    async def __aenter__(self):
        # Acquire the resource when the `async with` block is entered.
        self.conn = await connect_to_db()
        return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        # Runs even if the body raised; a falsy return propagates the exception.
        await self.conn.close()

async def main():
    """Run a query inside the connection's managed lifetime."""
    async with AsyncDBConnection() as conn:
        rows = await conn.query("SELECT ...")

# asynccontextmanager decorator (simpler):
@asynccontextmanager
async def managed_connection():
    """Yield a live DB connection; close it no matter how the block exits."""
    conn = await connect_to_db()
    try:
        yield conn  # the body of the caller's `async with` runs here
    finally:
        await conn.close()

async def main():
    """Query through the decorator-based managed connection."""
    async with managed_connection() as conn:
        rows = await conn.query("SELECT ...")

# Async iterator (async for):
class AsyncPageIterator:
    """Async iterator yielding successive pages until fetch_page returns nothing."""

    def __init__(self, url: str):
        self.url = url
        self.page = 1      # next page number to request
        self.done = False  # latched once an empty page is seen

    def __aiter__(self):
        # The object is its own iterator.
        return self

    async def __anext__(self):
        if self.done:
            raise StopAsyncIteration
        payload = await fetch_page(self.url, self.page)
        if not payload:
            # An empty page marks the end of pagination.
            self.done = True
            raise StopAsyncIteration
        self.page += 1
        return payload

async def main():
    """Walk the paginated endpoint lazily with `async for`."""
    async for batch in AsyncPageIterator("https://api.example.com/items"):
        process(batch)

# Async generator (yield inside async def):
async def paginate(url: str):
    """Async generator: yield each non-empty page, stopping at the first empty one."""
    page_number = 1
    while True:
        chunk = await fetch_page(url, page_number)
        if not chunk:
            return  # terminates the async generator (StopAsyncIteration for the consumer)
        yield chunk
        page_number += 1

async def main():
    """Consume the async generator page by page."""
    async for batch in paginate("https://api.example.com/items"):
        process(batch)

5. Running Blocking Code & Integration Patterns

run_in_executor for CPU/blocking I/O, mixing sync and async, aiohttp, aiofiles
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# run_in_executor — run blocking code without blocking the event loop:
async def main():
    """Offload blocking calls to thread/process pools so the event loop stays responsive."""
    loop = asyncio.get_running_loop()

    # Thread pool (blocking I/O — file reads, legacy sync libraries):
    # executor=None → the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, blocking_file_read, "path.txt")

    # Custom thread pool:
    # NOTE(review): the sync `with` briefly blocks the loop during pool shutdown.
    with ThreadPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, sync_api_call, params)

    # Process pool (CPU-bound work — image processing, compression):
    with ProcessPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, cpu_intensive_task, data)

# aiohttp (async HTTP client — use instead of requests in async code):
# pip install aiohttp
import aiohttp

async def fetch_many(urls: list[str]) -> list[str]:
    """Fetch all URLs concurrently over one shared session; bodies in input order.

    Each response is read inside `async with`, so its connection is released
    back to the pool as soon as the body is consumed (the original held every
    response un-released until the session closed).
    """
    async def _read(session, url: str) -> str:
        async with session.get(url) as resp:
            return await resp.text()

    async with aiohttp.ClientSession() as session:   # reuse one session for all requests!
        return await asyncio.gather(*(_read(session, url) for url in urls))

# aiofiles (async file I/O):
# pip install aiofiles
import aiofiles

async def read_file(path: str) -> str:
    """Read a whole text file without blocking the event loop."""
    async with aiofiles.open(path, mode='r') as handle:
        return await handle.read()

# asyncpg (async PostgreSQL — faster than psycopg2 for async code):
# pip install asyncpg
import asyncpg

async def get_users() -> list:
    """Fetch all active users; tear the pool down before returning.

    The original created a fresh pool on every call and never closed it,
    leaking connections. (Better still in real code: create one pool at
    startup and reuse it across calls.)
    """
    pool = await asyncpg.create_pool(dsn="postgresql://...")
    try:
        async with pool.acquire() as conn:
            return await conn.fetch("SELECT * FROM users WHERE active = true")
    finally:
        await pool.close()

# Debug stuck coroutines (slow event loop):
import asyncio

# Prefer enabling debug mode at the entry point — asyncio.get_event_loop()
# is deprecated when no loop is running (and this guide itself advises
# against it), so don't call it at module level:
#   asyncio.run(main(), debug=True)              # logs coroutines running > 100ms
# From inside a coroutine: asyncio.get_running_loop().set_debug(True)
# Or: PYTHONASYNCIODEBUG=1 python main.py

Track Python and async framework releases.
ReleaseRun monitors Python, FastAPI, and 13+ technologies.

Related: Python Reference | FastAPI Reference | Python Packaging Reference | Python EOL Tracker

🔍 Free tool: PyPI Package Health Checker — check aiohttp, aiofiles, and other async Python packages for EOL status and known CVEs.

Founded

2023 in London, UK

Contact

hello@releaserun.com