Python’s asyncio is the standard library’s toolkit for concurrent I/O — but most developers bounce off it. The syntax looks simple, but the mental model is different. This guide covers everything from basic async/await to building production-ready async systems.
:::note[TL;DR]
- `async def` creates coroutines; `await` yields control to the event loop
- Use `asyncio.gather()` for concurrent execution, `asyncio.create_task()` for background work
- Never call `time.sleep()` in async code — use `asyncio.sleep()`
- `aiohttp` for HTTP, `asyncpg` for PostgreSQL, `aioredis` for Redis
- Use `asyncio.Queue` for producer/consumer patterns
:::
## Why Async?
Synchronous Python handles one thing at a time per thread. For I/O-bound work (HTTP requests, database queries, file operations), this wastes time waiting.
Synchronous (blocking):

```
Request 1 → Wait 100ms → Response → Request 2 → Wait 100ms → Response
Total: 200ms
```

Asynchronous (concurrent):

```
Request 1 → Request 2 → Wait 100ms → Response 1 → Response 2
Total: 100ms
```
The Scenario: Your API makes 5 database calls per request. With sync code, that’s 500ms sequential. With async, it’s ~100ms concurrent.
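To make that concrete, here's a minimal runnable sketch; `db_call` is a hypothetical stand-in that sleeps 100ms instead of querying a real database:

```python
import asyncio
import time

async def db_call(i: int) -> str:
    # Hypothetical stand-in for a ~100ms database query
    await asyncio.sleep(0.1)
    return f"result-{i}"

async def handle_request() -> list[str]:
    # All five calls run concurrently: total wait is ~100ms, not ~500ms
    return await asyncio.gather(*(db_call(i) for i in range(5)))

start = time.perf_counter()
results = asyncio.run(handle_request())
print(f"{len(results)} results in {time.perf_counter() - start:.2f}s")
```

The five sleeps overlap, so the wall-clock time stays close to the duration of a single call.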
## Basic Syntax

### `async def` and `await`
```python
import asyncio

async def fetch_data():
    print("Starting fetch...")
    await asyncio.sleep(1)  # Non-blocking sleep
    print("Fetch complete!")
    return {"data": "value"}

async def main():
    result = await fetch_data()
    print(result)

# Run the async program
asyncio.run(main())
```
Key rules:

- `await` only works inside `async def` functions
- `asyncio.run()` starts the event loop and runs your main coroutine
- Regular functions can’t `await` — they must be `async`
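The first rule is enforced at compile time, not at runtime: `await` in a plain function (or at module level) is a `SyntaxError` before any code executes. A small demonstration via `compile()`:

```python
# `await` outside an `async def` fails at compile time,
# before any code runs.
try:
    compile("result = await fetch_data()", "<example>", "exec")
except SyntaxError as e:
    print(f"SyntaxError: {e.msg}")
```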
## The Event Loop
The event loop is the heart of asyncio:
```python
import asyncio

# Get the current event loop
loop = asyncio.get_event_loop()

# Schedule a coroutine
task = loop.create_task(fetch_data())

# Run until complete
result = loop.run_until_complete(task)
```

In practice, use `asyncio.run()` and let Python manage the loop; newer Python versions (3.10+) deprecate calling `asyncio.get_event_loop()` when no loop is running.
## Running Multiple Tasks

### Sequential (Slow)
```python
async def sequential():
    result1 = await fetch_data()  # 1 second
    result2 = await fetch_data()  # 1 second
    result3 = await fetch_data()  # 1 second
    return [result1, result2, result3]

# Total: 3 seconds
```
### Concurrent (Fast)
```python
async def concurrent():
    tasks = [
        fetch_data(),
        fetch_data(),
        fetch_data()
    ]
    results = await asyncio.gather(*tasks)
    return results

# Total: ~1 second
```
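The difference is easy to measure. Here's a self-contained comparison, scaled down to 0.1-second sleeps so it runs quickly (`fetch_data` below is a local stand-in, not the earlier 1-second version):

```python
import asyncio
import time

async def fetch_data():
    await asyncio.sleep(0.1)  # Scaled-down stand-in for 1 second of I/O
    return {"data": "value"}

async def sequential():
    return [await fetch_data(), await fetch_data(), await fetch_data()]

async def concurrent():
    return await asyncio.gather(fetch_data(), fetch_data(), fetch_data())

def timed(coro_fn):
    start = time.perf_counter()
    asyncio.run(coro_fn())
    return time.perf_counter() - start

print(f"sequential: {timed(sequential):.2f}s")  # ~0.3s
print(f"concurrent: {timed(concurrent):.2f}s")  # ~0.1s
```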
### With Error Handling
```python
async def concurrent_with_errors():
    tasks = [
        asyncio.create_task(fetch_data()),
        asyncio.create_task(risky_operation()),
        asyncio.create_task(another_fetch())
    ]
    # return_exceptions=True prevents one failure from stopping everything
    results = await asyncio.gather(*tasks, return_exceptions=True)

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
```
### `asyncio.wait()` for Fine Control
```python
async def wait_with_timeout():
    # asyncio.wait() needs Task objects — passing bare coroutines
    # was deprecated in 3.8 and removed in 3.11
    tasks = [
        asyncio.create_task(coro)
        for coro in (fetch_data(), slow_operation(), fast_operation())
    ]

    # Return when the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel remaining tasks
    for task in pending:
        task.cancel()

    # Get results from completed tasks
    for task in done:
        try:
            result = task.result()
            print(f"Completed: {result}")
        except Exception as e:
            print(f"Failed: {e}")
```
## Real HTTP Requests

Use `aiohttp` instead of `requests`:
```python
import aiohttp
import asyncio

async def fetch_url(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all_urls(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Usage
urls = [
    'https://api.github.com/users/python',
    'https://api.github.com/users/google',
    'https://api.github.com/users/microsoft'
]
results = asyncio.run(fetch_all_urls(urls))
```
### Connection Pooling
```python
async def fetch_with_pool():
    # Limit concurrent connections
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)
```
## Database Operations

### PostgreSQL with asyncpg
```python
import asyncpg
import asyncio

async def fetch_users():
    # Connect to the database
    conn = await asyncpg.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='pass'
    )
    try:
        # Fetch multiple rows
        rows = await conn.fetch('SELECT * FROM users WHERE active = $1', True)

        # Fetch a single row
        user = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            123
        )

        # Execute a write operation
        await conn.execute(
            'INSERT INTO users(name, email) VALUES($1, $2)',
            'John', 'john@example.com'
        )
        return rows
    finally:
        await conn.close()
```
```python
# Connection pooling
async def fetch_with_pool():
    pool = await asyncpg.create_pool(
        host='localhost',
        database='mydb',
        user='user',
        password='pass',
        min_size=5,
        max_size=20
    )
    try:
        async with pool.acquire() as conn:
            return await conn.fetch('SELECT * FROM users')
    finally:
        await pool.close()
```
### Transactions
```python
async def transfer_funds(from_id: int, to_id: int, amount: float):
    conn = await asyncpg.connect(dsn='postgresql://...')
    try:
        async with conn.transaction():
            # Both operations succeed or both fail
            await conn.execute(
                'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
                amount, from_id
            )
            await conn.execute(
                'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
                amount, to_id
            )
    finally:
        await conn.close()
```
## Producer/Consumer Pattern

Use `asyncio.Queue` for decoupled processing:
```python
import asyncio
from asyncio import Queue

async def producer(queue: Queue, name: str):
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name} produced: {item}")
        await asyncio.sleep(0.5)

async def consumer(queue: Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:  # Poison pill to stop
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(1)  # Simulate processing
        queue.task_done()

async def main():
    queue = Queue(maxsize=10)

    # Start producers
    producers = [
        asyncio.create_task(producer(queue, f"P{i}"))
        for i in range(2)
    ]

    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"C{i}"))
        for i in range(3)
    ]

    # Wait for producers to finish
    await asyncio.gather(*producers)

    # Signal consumers to stop
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(main())
```
## Web Frameworks

### FastAPI (Native Async)
```python
from fastapi import FastAPI
import asyncpg

app = FastAPI()

# Database pool
pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(dsn='postgresql://...')

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        user = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
    if user:
        return dict(user)
    return {"error": "User not found"}

@app.get("/users")
async def list_users():
    async with pool.acquire() as conn:
        rows = await conn.fetch('SELECT * FROM users')
    return [dict(row) for row in rows]
```
### Background Tasks
```python
import asyncio

from fastapi import BackgroundTasks

async def send_email(email: str, message: str):
    await asyncio.sleep(2)  # Simulate sending
    print(f"Email sent to {email}")

@app.post("/signup")
async def signup(email: str, background_tasks: BackgroundTasks):
    # Save user to database
    await save_user(email)

    # Don't block the response — send the email in the background
    background_tasks.add_task(send_email, email, "Welcome!")
    return {"status": "success"}
```
## Common Pitfalls

### 1. Blocking the Event Loop
```python
import asyncio
import time

# WRONG: Blocks the entire event loop
async def wrong():
    time.sleep(5)  # Blocks everything!
    return "done"

# RIGHT: Use the async version
async def right():
    await asyncio.sleep(5)  # Yields control
    return "done"
```
### 2. Running Sync Code in Async
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Thread pool for blocking work
executor = ThreadPoolExecutor(max_workers=4)

async def cpu_bound_task(data):
    # Run in the thread pool to avoid blocking the event loop
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        heavy_computation,
        data
    )
    return result
```

For truly CPU-bound work, prefer a `ProcessPoolExecutor` (or multiprocessing), since threads are constrained by the GIL.
### 3. Forgetting to `await`
```python
# WRONG: Returns a coroutine object, not the result
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # <coroutine object fetch_data>

# RIGHT:
async def right():
    result = await fetch_data()
    print(result)
```
### 4. Creating Too Many Tasks
```python
# WRONG: Unbounded concurrency (imagine urls holds 10,000 entries)
async def wrong():
    tasks = [fetch_url(url) for url in urls]
    return await asyncio.gather(*tasks)

# RIGHT: Use a semaphore to limit concurrency
semaphore = asyncio.Semaphore(100)

async def fetch_limited(url):
    async with semaphore:
        return await fetch_url(url)

async def right():
    tasks = [fetch_limited(url) for url in urls]
    return await asyncio.gather(*tasks)
```
## Advanced Patterns

### Context Managers
```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        await release_resource(resource)

async def use_resource():
    async with managed_resource() as r:
        await r.do_something()
    # Automatically released
```
### Async Generators
```python
async def stream_large_dataset():
    conn = await asyncpg.connect(dsn='postgresql://...')
    try:
        async with conn.transaction():
            async for record in conn.cursor('SELECT * FROM large_table'):
                yield record
    finally:
        await conn.close()

# Usage (inside another coroutine)
async def consume():
    async for record in stream_large_dataset():
        process(record)
```
### Timeouts
```python
async def fetch_with_timeout():
    try:
        result = await asyncio.wait_for(
            fetch_data(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}
```

On Python 3.11+, the `asyncio.timeout()` context manager is an equivalent alternative to `asyncio.wait_for()`.
### Rate Limiting
```python
import asyncio

class RateLimiter:
    def __init__(self, calls_per_second: float):
        self.interval = 1.0 / calls_per_second
        self.last_call = 0.0
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = asyncio.get_running_loop().time()
            elapsed = now - self.last_call
            if elapsed < self.interval:
                await asyncio.sleep(self.interval - elapsed)
            self.last_call = asyncio.get_running_loop().time()

# Usage
limiter = RateLimiter(calls_per_second=10)

async def rate_limited_fetch(url):
    await limiter.acquire()
    return await fetch_url(url)
```
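To see the spacing the limiter enforces, here's a self-contained check (the same class repeated so the snippet runs on its own). The first call passes immediately, so 3 calls at 10/s take roughly 0.2s:

```python
import asyncio

class RateLimiter:
    def __init__(self, calls_per_second: float):
        self.interval = 1.0 / calls_per_second
        self.last_call = 0.0
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = asyncio.get_running_loop().time()
            elapsed = now - self.last_call
            if elapsed < self.interval:
                await asyncio.sleep(self.interval - elapsed)
            self.last_call = asyncio.get_running_loop().time()

async def demo():
    limiter = RateLimiter(calls_per_second=10)  # one call per 100ms
    loop = asyncio.get_running_loop()
    start = loop.time()
    for _ in range(3):
        await limiter.acquire()
    return loop.time() - start

elapsed = asyncio.run(demo())
print(f"3 calls took {elapsed:.2f}s")  # first is unthrottled, next two wait ~100ms each
```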
## Testing Async Code

### pytest-asyncio
```python
import asyncio
import pytest

@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data()
    assert result["data"] == "value"

@pytest.mark.asyncio
async def test_concurrent_fetches():
    tasks = [fetch_data() for _ in range(5)]
    results = await asyncio.gather(*tasks)
    assert len(results) == 5
```
### Mocking Async Functions
```python
import pytest
from unittest.mock import AsyncMock, patch

import module  # the module under test (placeholder name)

@pytest.mark.asyncio
async def test_with_mock():
    # Patch where the name is looked up: here, module.fetch_data
    with patch('module.fetch_data', new_callable=AsyncMock) as mock:
        mock.return_value = {"mocked": True}
        result = await module.fetch_data()
        assert result == {"mocked": True}
```
### Testing Time-Based Code
```python
import pytest
from unittest.mock import patch

@pytest.mark.asyncio
async def test_sleep():
    # patch() detects async functions and substitutes an AsyncMock automatically
    with patch('asyncio.sleep') as mock_sleep:
        await my_async_function()
        # Verify sleep was called with the expected duration
        mock_sleep.assert_called_once_with(1.0)
```
## Performance Comparison
| Approach | 100 HTTP Requests | Memory Usage |
|---|---|---|
| Synchronous (requests) | ~50 seconds | Low |
| Threading | ~10 seconds | High |
| Asyncio (aiohttp) | ~2 seconds | Low |
## When NOT to Use Async
| Use Case | Better Alternative |
|---|---|
| CPU-intensive work | Multiprocessing |
| Simple scripts | Regular sync code |
| Database without async driver | Sync with connection pooling |
| Single request | Don’t add complexity |
## Migration Strategy

Moving from sync to async:
```python
import asyncio

# Step 1: Identify I/O bottlenecks
# Step 2: Add async versions alongside sync
# Step 3: Migrate incrementally
# Step 4: Remove sync versions

# Bridge pattern
async def async_api():
    return await internal_async_impl()

def sync_api():
    # Only valid when no event loop is already running
    return asyncio.run(async_api())
```
## Summary

- `async`/`await` — Core syntax for non-blocking code
- `asyncio.gather()` — Run tasks concurrently
- Use async libraries — `aiohttp`, `asyncpg`, `aioredis`
- Avoid blocking — Never use `time.sleep()` or `requests.get()` in async code
- Control concurrency — Use `Semaphore` for rate limiting
- Test properly — Use `pytest-asyncio` and `AsyncMock`
Async Python unlocks massive performance gains for I/O-bound applications. The learning curve is worth it.
## What to Read Next
- Python Cheat Sheet — Python fundamentals reference
- PostgreSQL with Docker — Database setup for async apps
- FastAPI vs Flask vs Django — Choosing web frameworks