
Python Async/Await: The Complete Guide for Developers

By Vishnu

Python's asyncio is the standard library's framework for writing concurrent code, but many developers bounce off it. The syntax looks simple; the mental model is what takes getting used to. This guide covers everything from basic async/await to building production-ready async systems.

:::note[TL;DR]

  • async def creates coroutines; await yields control to the event loop
  • Use asyncio.gather() for concurrent execution, asyncio.create_task() for background work
  • Never call time.sleep() in async code — use asyncio.sleep()
  • aiohttp for HTTP, asyncpg for PostgreSQL, redis.asyncio (the successor to aioredis) for Redis
  • Use asyncio.Queue for producer/consumer patterns
:::

Why Async?

Synchronous Python handles one thing at a time per thread. For I/O-bound work (HTTP requests, database queries, file operations), this wastes time waiting.

Synchronous (blocking):

Request 1 → Wait 100ms → Response → Request 2 → Wait 100ms → Response
Total: 200ms

Asynchronous (concurrent):

Request 1 → Request 2 → Wait 100ms → Response 1 → Response 2
Total: 100ms

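The Scenario: Your API makes 5 independent database calls per request, each taking about 100ms. With sync code, that's 500ms because the calls run one after another. With async, the waits overlap, so the total is close to the slowest single call: ~100ms.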
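The arithmetic in the scenario can be checked with a small simulation. This is a minimal sketch: `fake_db_call` is a hypothetical stand-in that sleeps for 100ms instead of querying a database, and the five calls are assumed to be independent of one another.

```python
import asyncio
import time

async def fake_db_call(i: int) -> int:
    # Hypothetical stand-in for a query that takes about 100 ms
    await asyncio.sleep(0.1)
    return i

async def run_sequential() -> float:
    start = time.perf_counter()
    for i in range(5):
        await fake_db_call(i)  # each call waits for the previous one
    return time.perf_counter() - start

async def run_concurrent() -> float:
    start = time.perf_counter()
    # All five calls are started together, so their waits overlap
    await asyncio.gather(*(fake_db_call(i) for i in range(5)))
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"sequential: {asyncio.run(run_sequential()):.2f}s")  # roughly 0.5s
    print(f"concurrent: {asyncio.run(run_concurrent()):.2f}s")  # roughly 0.1s
```

The gain only appears when the calls do not depend on each other. If the second query needs the result of the first, the two still have to run in order.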

Basic Syntax

async def and await

import asyncio

async def fetch_data():
    print("Starting fetch...")
    await asyncio.sleep(1)  # Non-blocking sleep
    print("Fetch complete!")
    return {"data": "value"}

async def main():
    result = await fetch_data()
    print(result)

# Run the async program
asyncio.run(main())

Key rules:

  • await only works inside async def functions
  • asyncio.run() starts the event loop and runs your main coroutine
  • Regular functions can’t await — they must be async

The Event Loop

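The event loop is the heart of asyncio. It runs one coroutine until that coroutine reaches an await that has to wait, then switches to another coroutine that is ready. You can drive it by hand: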

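import asyncio

# Create a loop explicitly; calling asyncio.get_event_loop() with no
# running loop is deprecated in recent Python versions
loop = asyncio.new_event_loop()
try:
    # Schedule a coroutine
    task = loop.create_task(fetch_data())

    # Run until complete
    result = loop.run_until_complete(task)
finally:
    loop.close()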

In practice, use asyncio.run() and let Python manage the loop.

Running Multiple Tasks

Sequential (Slow)

async def sequential():
    result1 = await fetch_data()  # 1 second
    result2 = await fetch_data()  # 1 second
    result3 = await fetch_data()  # 1 second
    return [result1, result2, result3]
# Total: 3 seconds

Concurrent (Fast)

async def concurrent():
    tasks = [
        fetch_data(),
        fetch_data(),
        fetch_data()
    ]
    results = await asyncio.gather(*tasks)
    return results
# Total: ~1 second
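asyncio.gather() fits the case where you want all results together. For work that should run in the background while the caller carries on, asyncio.create_task() schedules the coroutine and returns a Task you can await later. A minimal sketch, where `background_job` and the `events` list are hypothetical names used only for illustration:

```python
import asyncio

events: list[str] = []

async def background_job() -> str:
    events.append("job started")
    await asyncio.sleep(0.1)
    events.append("job finished")
    return "job result"

async def main() -> str:
    # Scheduled now; it starts running the next time main() yields
    task = asyncio.create_task(background_job())
    events.append("main continues")
    await asyncio.sleep(0)            # yield once so the task can start
    events.append("main did other work")
    return await task                 # collect the result when it is needed

if __name__ == "__main__":
    print(asyncio.run(main()))
    print(events)
```

Keep a reference to the Task for as long as it should run. The event loop holds only a weak reference, so a task nobody references can be garbage-collected before it finishes.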

With Error Handling

async def concurrent_with_errors():
    tasks = [
        asyncio.create_task(fetch_data()),
        asyncio.create_task(risky_operation()),
        asyncio.create_task(another_fetch())
    ]
    
    # return_exceptions=True collects exceptions as results instead of raising the first one
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.wait() for Fine Control

async def wait_with_timeout():
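    # asyncio.wait() needs Task objects; bare coroutines raise TypeError on Python 3.11+
    tasks = [
        asyncio.create_task(coro)
        for coro in (fetch_data(), slow_operation(), fast_operation())
    ]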
    
    # Return when first completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
    
    # Get results from completed tasks
    for task in done:
        try:
            result = task.result()
            print(f"Completed: {result}")
        except Exception as e:
            print(f"Failed: {e}")
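asyncio.wait() also accepts a timeout. Unlike asyncio.wait_for(), it does not raise when the deadline passes; it simply returns whatever is unfinished in the pending set. A minimal sketch, with `job` as a hypothetical coroutine that sleeps for a given delay:

```python
import asyncio

async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def wait_with_deadline() -> tuple[list[str], int]:
    tasks = [
        asyncio.create_task(job("fast", 0.05)),
        asyncio.create_task(job("slow", 5.0)),
    ]
    # No exception on timeout: unfinished tasks come back in `pending`
    done, pending = await asyncio.wait(tasks, timeout=0.5)

    for task in pending:
        task.cancel()
    # Let the cancellations finish before returning
    await asyncio.gather(*pending, return_exceptions=True)

    return sorted(t.result() for t in done), len(pending)

if __name__ == "__main__":
    print(asyncio.run(wait_with_deadline()))
```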

Real HTTP Requests

Use aiohttp instead of requests, which is synchronous and would block the event loop:

import aiohttp
import asyncio

async def fetch_url(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all_urls(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Usage
urls = [
    'https://api.github.com/users/python',
    'https://api.github.com/users/google',
    'https://api.github.com/users/microsoft'
]
results = asyncio.run(fetch_all_urls(urls))

Connection Pooling

async def fetch_with_pool(urls: list[str]):
    # Limit concurrent connections
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    timeout = aiohttp.ClientTimeout(total=30)
    
    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

Database Operations

PostgreSQL with asyncpg

import asyncpg
import asyncio

async def fetch_users():
    # Connect to database
    conn = await asyncpg.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='pass'
    )
    
    try:
        # Fetch multiple rows
        rows = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
        
        # Fetch single row
        user = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            123
        )
        
        # Execute write operation
        await conn.execute(
            'INSERT INTO users(name, email) VALUES($1, $2)',
            'John', 'john@example.com'
        )
        
        return rows
    finally:
        await conn.close()

# Connection pooling
async def fetch_with_pool():
    # In a real application, create the pool once at startup and reuse it
    pool = await asyncpg.create_pool(
        host='localhost',
        database='mydb',
        user='user',
        password='pass',
        min_size=5,
        max_size=20
    )
    
    try:
        async with pool.acquire() as conn:
            return await conn.fetch('SELECT * FROM users')
    finally:
        # Runs on return or error, so the pool is always closed
        await pool.close()

Transactions

async def transfer_funds(from_id: int, to_id: int, amount: float):
    conn = await asyncpg.connect(dsn='postgresql://...')
    
    try:
        async with conn.transaction():
            # Both operations succeed or both fail
            await conn.execute(
                'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
                amount, from_id
            )
            await conn.execute(
                'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
                amount, to_id
            )
    finally:
        await conn.close()

Producer/Consumer Pattern

Use asyncio.Queue for decoupled processing:

import asyncio
from asyncio import Queue

async def producer(queue: Queue, name: str):
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name} produced: {item}")
        await asyncio.sleep(0.5)

async def consumer(queue: Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:  # Poison pill to stop
            queue.task_done()  # Keep the queue's unfinished-task count accurate
            break
        
        print(f"{name} consumed: {item}")
        await asyncio.sleep(1)  # Simulate processing
        queue.task_done()

async def main():
    queue = Queue(maxsize=10)
    
    # Start producers
    producers = [
        asyncio.create_task(producer(queue, f"P{i}"))
        for i in range(2)
    ]
    
    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"C{i}"))
        for i in range(3)
    ]
    
    # Wait for producers to finish
    await asyncio.gather(*producers)
    
    # Signal consumers to stop
    for _ in consumers:
        await queue.put(None)
    
    await asyncio.gather(*consumers)

asyncio.run(main())
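An alternative to poison pills is to let the queue itself report completion: queue.join() returns once task_done() has been called for every item that was put. This is a minimal sketch under that approach; `worker` and `process_all` are hypothetical names, and doubling the item stands in for real processing.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)   # stand-in for real processing
        finally:
            queue.task_done()          # exactly once per get()

async def process_all(items: list[int], n_workers: int = 3) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    results: list[int] = []
    workers = [
        asyncio.create_task(worker(queue, results))
        for _ in range(n_workers)
    ]

    for item in items:
        queue.put_nowait(item)

    await queue.join()                 # every item has been marked done

    for w in workers:                  # workers are idle in get(); cancel them
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

if __name__ == "__main__":
    print(asyncio.run(process_all([1, 2, 3, 4, 5])))
```

The trade-off is that workers are stopped by cancellation rather than by a message, so any cleanup they need belongs in a try/finally block.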

Web Frameworks

FastAPI (Native Async)

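from contextlib import asynccontextmanager

from fastapi import FastAPI
import asyncpg

# Database pool, created at startup
pool = None

# on_event("startup"/"shutdown") is deprecated in FastAPI; a lifespan
# handler covers both phases in one place
@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = await asyncpg.create_pool(dsn='postgresql://...')
    yield  # The application serves requests here
    await pool.close()

app = FastAPI(lifespan=lifespan)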

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        user = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
        if user:
            return dict(user)
        return {"error": "User not found"}

@app.get("/users")
async def list_users():
    async with pool.acquire() as conn:
        rows = await conn.fetch('SELECT * FROM users')
        return [dict(row) for row in rows]

Background Tasks

import asyncio

from fastapi import BackgroundTasks

async def send_email(email: str, message: str):
    await asyncio.sleep(2)  # Simulate sending
    print(f"Email sent to {email}")

@app.post("/signup")
async def signup(email: str, background_tasks: BackgroundTasks):
    # Save user to database
    await save_user(email)
    
    # Don't block response — send email in background
    background_tasks.add_task(send_email, email, "Welcome!")
    
    return {"status": "success"}

Common Pitfalls

1. Blocking the Event Loop

import asyncio
import time

# WRONG: Blocks entire event loop
async def wrong():
    time.sleep(5)  # Blocks everything!
    return "done"

# RIGHT: Use async version
async def right():
    await asyncio.sleep(5)  # Yields control
    return "done"
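The effect is easy to measure. In this minimal sketch, a hypothetical `heartbeat` coroutine tries to tick every 50ms while another coroutine either blocks with time.sleep() or yields with asyncio.sleep(); the function returns when the first tick actually happened.

```python
import asyncio
import time

async def heartbeat(ticks: list[float], start: float) -> None:
    # Records a timestamp every 50 ms, but only when the loop is free to run it
    for _ in range(4):
        await asyncio.sleep(0.05)
        ticks.append(time.perf_counter() - start)

async def blocking_work() -> None:
    time.sleep(0.3)            # holds the event loop for 300 ms

async def cooperative_work() -> None:
    await asyncio.sleep(0.3)   # hands control back while waiting

async def first_tick(work) -> float:
    ticks: list[float] = []
    start = time.perf_counter()
    await asyncio.gather(work(), heartbeat(ticks, start))
    return ticks[0]

if __name__ == "__main__":
    print(f"blocking:    first tick at {asyncio.run(first_tick(blocking_work)):.2f}s")
    print(f"cooperative: first tick at {asyncio.run(first_tick(cooperative_work)):.2f}s")
```

With the blocking version, the heartbeat cannot even start until time.sleep() returns, so its first tick lands after the 300ms mark instead of near 50ms.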

2. Running Sync Code in Async

import asyncio
from concurrent.futures import ThreadPoolExecutor

# Thread pool for blocking (sync) calls. Threads do not speed up
# CPU-bound pure-Python code because of the GIL; pass a
# ProcessPoolExecutor instead for that kind of work.
executor = ThreadPoolExecutor(max_workers=4)

async def cpu_bound_task(data):
    # Run in the executor so the event loop stays responsive
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        heavy_computation,
        data
    )
    return result
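On Python 3.9 and later, asyncio.to_thread() is a shorter way to push a blocking call onto the default thread pool. A minimal sketch, where `blocking_io` is a hypothetical stand-in for a synchronous library call:

```python
import asyncio
import time

def blocking_io(n: int) -> int:
    # Hypothetical stand-in for a sync call (file read, legacy DB driver)
    time.sleep(0.2)
    return n * 2

async def main() -> tuple[list[int], float]:
    start = time.perf_counter()
    # Each call runs in a worker thread, so the three waits overlap
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io, n) for n in range(3))
    )
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

This helps with blocking I/O. For CPU-heavy pure-Python functions the threads still contend for the GIL, so a process pool is the better fit.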

3. Forgetting to await

# WRONG: Returns coroutine object, not result
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # <coroutine object fetch_data>

# RIGHT:
async def right():
    result = await fetch_data()
    print(result)

4. Creating Too Many Tasks

# WRONG: Unbounded concurrency
async def wrong(session, urls):  # e.g. urls holds 10,000 entries
    tasks = [fetch_url(session, url) for url in urls]
    return await asyncio.gather(*tasks)

# RIGHT: Use semaphore to limit concurrency
semaphore = asyncio.Semaphore(100)

async def fetch_limited(session, url):
    async with semaphore:
        return await fetch_url(session, url)

async def right(session, urls):
    tasks = [fetch_limited(session, url) for url in urls]
    return await asyncio.gather(*tasks)
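To confirm that a semaphore really caps concurrency, you can count how many jobs are inside the guarded section at once. This is a minimal sketch: `run_limited` is a hypothetical helper, and a 10ms sleep stands in for the network call.

```python
import asyncio

async def run_limited(n_jobs: int, limit: int) -> int:
    semaphore = asyncio.Semaphore(limit)   # created inside the running loop
    active = 0
    peak = 0

    async def job() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)       # highest number of jobs seen at once
            await asyncio.sleep(0.01)      # stand-in for a network call
            active -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak

if __name__ == "__main__":
    print(asyncio.run(run_limited(n_jobs=50, limit=5)))
```

All 50 coroutines are created up front, but only `limit` of them are ever past the `async with` line at the same time.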

Advanced Patterns

Context Managers

from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        await release_resource(resource)

async def use_resource():
    async with managed_resource() as r:
        await r.do_something()
    # Automatically released

Async Generators

async def stream_large_dataset():
    conn = await asyncpg.connect(dsn='postgresql://...')
    try:
        async with conn.transaction():
            async for record in conn.cursor('SELECT * FROM large_table'):
                yield record
    finally:
        await conn.close()

# Usage (async for must run inside a coroutine)
async def consume():
    async for record in stream_large_dataset():
        process(record)
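Async generators are lazy: nothing is fetched until the consumer asks for the next item. A minimal sketch without a database, where `fetch_pages` is a hypothetical paginated source and each page costs one simulated round trip:

```python
import asyncio
from typing import AsyncIterator

async def fetch_pages(n_pages: int, page_size: int = 3) -> AsyncIterator[list[int]]:
    # Hypothetical paginated source; one simulated round trip per page
    for page in range(n_pages):
        await asyncio.sleep(0.01)
        start = page * page_size
        yield list(range(start, start + page_size))

async def collect(limit: int) -> list[int]:
    seen: list[int] = []
    async for page in fetch_pages(n_pages=100):
        seen.extend(page)
        if len(seen) >= limit:
            break                   # later pages are never fetched
    return seen[:limit]

if __name__ == "__main__":
    print(asyncio.run(collect(7)))
```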

Timeouts

async def fetch_with_timeout():
    try:
        result = await asyncio.wait_for(
            fetch_data(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}
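When the timeout fires, asyncio.wait_for() also cancels the coroutine it was waiting on, which gives that coroutine a chance to clean up. A minimal sketch, where `slow_fetch` and the `state` dictionary are hypothetical names used to observe the cancellation:

```python
import asyncio

state = {"cancelled": False}

async def slow_fetch() -> dict:
    try:
        await asyncio.sleep(5)
        return {"data": "value"}
    except asyncio.CancelledError:
        state["cancelled"] = True    # cleanup hook; always re-raise afterwards
        raise

async def fetch_with_timeout(timeout: float) -> dict:
    try:
        return await asyncio.wait_for(slow_fetch(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}

if __name__ == "__main__":
    print(asyncio.run(fetch_with_timeout(0.1)))
    print(state)
```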

Rate Limiting

import asyncio

class RateLimiter:
    def __init__(self, calls_per_second: float):
        self.interval = 1.0 / calls_per_second
        self.last_call = 0.0
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        # Holding the lock while sleeping serializes callers, which spaces calls out
        async with self.lock:
            loop = asyncio.get_running_loop()
            elapsed = loop.time() - self.last_call
            
            if elapsed < self.interval:
                await asyncio.sleep(self.interval - elapsed)
            
            self.last_call = loop.time()

# Usage
limiter = RateLimiter(calls_per_second=10)

async def rate_limited_fetch(session, url):
    await limiter.acquire()
    return await fetch_url(session, url)
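To see the spacing such a limiter produces, here is a self-contained variant that records when each call gets through. It is a sketch under assumptions: it uses time.monotonic() rather than the loop clock, and `timed_calls` is a hypothetical helper that replaces the HTTP request with a timestamp.

```python
import asyncio
import time

class RateLimiter:
    """Spaces calls at least 1 / calls_per_second seconds apart."""

    def __init__(self, calls_per_second: float):
        self.interval = 1.0 / calls_per_second
        self.last_call = float("-inf")   # so the first call never waits
        self.lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self.lock:
            wait = self.interval - (time.monotonic() - self.last_call)
            if wait > 0:
                await asyncio.sleep(wait)
            self.last_call = time.monotonic()

async def timed_calls(n_calls: int, calls_per_second: float) -> list[float]:
    limiter = RateLimiter(calls_per_second)
    start = time.monotonic()
    stamps: list[float] = []

    async def call() -> None:
        await limiter.acquire()
        stamps.append(time.monotonic() - start)  # stand-in for the request

    await asyncio.gather(*(call() for _ in range(n_calls)))
    return stamps

if __name__ == "__main__":
    print([f"{s:.2f}" for s in asyncio.run(timed_calls(5, 20))])
```

Five calls at 20 per second should come out roughly 50ms apart, even though all five were started at the same moment.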

Testing Async Code

pytest-asyncio

import pytest

@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data()
    assert result["data"] == "value"

@pytest.mark.asyncio
async def test_concurrent_fetches():
    tasks = [fetch_data() for _ in range(5)]
    results = await asyncio.gather(*tasks)
    assert len(results) == 5

Mocking Async Functions

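import pytest
from unittest.mock import AsyncMock, patch

import module  # Placeholder for the module that defines fetch_data

@pytest.mark.asyncio
async def test_with_mock():
    with patch('module.fetch_data', new_callable=AsyncMock) as mock:
        mock.return_value = {"mocked": True}
        
        # Look the function up through the module so the patch applies
        result = await module.fetch_data()
        assert result == {"mocked": True}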

Testing Time-Based Code

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_sleep():
    # AsyncMock makes the patched sleep awaitable and return immediately
    with patch('asyncio.sleep', new_callable=AsyncMock) as mock_sleep:
        await my_async_function()
        
        # Verify sleep was awaited with the expected duration
        mock_sleep.assert_awaited_once_with(1.0)
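If adding a pytest plugin is not an option, the standard library's unittest.IsolatedAsyncioTestCase runs async test methods on its own event loop. A minimal sketch, where `fetch_and_double` is a hypothetical function under test that awaits an injected fetcher:

```python
import unittest
from unittest.mock import AsyncMock

async def fetch_and_double(fetch) -> int:
    # Hypothetical function under test: awaits an injected fetcher
    value = await fetch()
    return value * 2

class FetchAndDoubleTest(unittest.IsolatedAsyncioTestCase):
    async def test_doubles_fetched_value(self):
        fake_fetch = AsyncMock(return_value=21)
        self.assertEqual(await fetch_and_double(fake_fetch), 42)
        fake_fetch.assert_awaited_once_with()
```

Run it with `python -m unittest`. Passing the fetcher in as an argument avoids patching altogether, which sidesteps the question of which module name to patch.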

Performance Comparison

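| Approach               | 100 HTTP Requests | Memory Usage |
|------------------------|-------------------|--------------|
| Synchronous (requests) | ~50 seconds       | Low          |
| Threading              | ~10 seconds       | High         |
| Asyncio (aiohttp)      | ~2 seconds        | Low          |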

When NOT to Use Async

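| Use Case                      | Better Alternative           |
|-------------------------------|------------------------------|
| CPU-intensive work            | Multiprocessing              |
| Simple scripts                | Regular sync code            |
| Database without async driver | Sync with connection pooling |
| Single request                | Don't add complexity         |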

Migration Strategy

Moving from sync to async:

# Step 1: Identify I/O bottlenecks
# Step 2: Add async versions alongside sync
# Step 3: Migrate incrementally
# Step 4: Remove sync versions

# Bridge pattern
async def async_api():
    return await internal_async_impl()

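def sync_api():
    # asyncio.run() starts a new loop, so call this only from sync code
    # (it raises RuntimeError if a loop is already running in this thread)
    return asyncio.run(async_api())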
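The bridge has one limitation worth seeing in action: asyncio.run() cannot be called from code that is already running inside an event loop. A minimal sketch, where `call_from_async` is a hypothetical caller that tries the sync route first and falls back to awaiting:

```python
import asyncio

async def async_api() -> str:
    await asyncio.sleep(0)
    return "ok"

def sync_api() -> str:
    return asyncio.run(async_api())

async def call_from_async() -> tuple[str, bool]:
    coro = async_api()
    try:
        # asyncio.run() refuses to start a second loop in this thread
        return asyncio.run(coro), False
    except RuntimeError:
        # Inside async code, await the coroutine directly instead
        return await coro, True

if __name__ == "__main__":
    print(sync_api())                      # works: no loop is running yet
    print(asyncio.run(call_from_async()))  # second element reports the fallback
```

During a migration this means the sync wrapper is safe for scripts and sync entry points, while code that has already moved to async should call the async version directly.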

Summary

  • async/await: core syntax for non-blocking code
  • asyncio.gather(): run tasks concurrently
  • Use async libraries: aiohttp, asyncpg, redis.asyncio (formerly aioredis)
  • Avoid blocking: never call time.sleep() or requests.get() in async code
  • Control concurrency: use a Semaphore to cap the number of in-flight tasks
  • Test properly: use pytest-asyncio and AsyncMock

Async Python can cut wall-clock time substantially for I/O-bound applications, because waits overlap instead of adding up. The learning curve is worth it.