Python’s asyncio is the standard for writing concurrent code — but most developers bounce off it. The syntax looks simple, but the mental model is different. This guide covers everything from basic async/await to building production-ready async systems.
async defcreates coroutines;awaityields control to the event loop- Use
asyncio.gather()for concurrent execution,asyncio.create_task()for background work - Never call
time.sleep()in async code — useasyncio.sleep() aiohttpfor HTTP,asyncpgfor PostgreSQL,aioredisfor Redis- Use
asyncio.Queuefor producer/consumer patterns
Why Async?
Synchronous Python handles one thing at a time per thread. For I/O-bound work (HTTP requests, database queries, file operations), this wastes time waiting.
Synchronous (blocking):
Request 1 → Wait 100ms → Response → Request 2 → Wait 100ms → Response
Total: 200ms Asynchronous (concurrent):
Request 1 → Request 2 → Wait 100ms → Response 1 → Response 2
Total: 100ms The Scenario: Your API makes 5 database calls per request. With sync code, that’s 500ms sequential. With async, it’s ~100ms concurrent.
Basic Syntax
async def and await
import asyncio
async def fetch_data():
print("Starting fetch...")
await asyncio.sleep(1) # Non-blocking sleep
print("Fetch complete!")
return {"data": "value"}
async def main():
result = await fetch_data()
print(result)
# Run the async program
asyncio.run(main()) Key rules:
awaitonly works insideasync deffunctionsasyncio.run()starts the event loop and runs your main coroutine- Regular functions can’t
await— they must be async
The Event Loop
The event loop is the heart of asyncio:
import asyncio
# Get the current event loop
loop = asyncio.get_event_loop()
# Schedule a coroutine
task = loop.create_task(fetch_data())
# Run until complete
result = loop.run_until_complete(task) In practice, use asyncio.run() and let Python manage the loop.
Running Multiple Tasks
Sequential (Slow)
async def sequential():
result1 = await fetch_data() # 1 second
result2 = await fetch_data() # 1 second
result3 = await fetch_data() # 1 second
return [result1, result2, result3]
# Total: 3 seconds Concurrent (Fast)
async def concurrent():
tasks = [
fetch_data(),
fetch_data(),
fetch_data()
]
results = await asyncio.gather(*tasks)
return results
# Total: ~1 second With Error Handling
async def concurrent_with_errors():
tasks = [
asyncio.create_task(fetch_data()),
asyncio.create_task(risky_operation()),
asyncio.create_task(another_fetch())
]
# Return_exceptions=True prevents one failure from stopping everything
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}") asyncio.wait() for Fine Control
async def wait_with_timeout():
tasks = [fetch_data(), slow_operation(), fast_operation()]
# Return when first completes
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel remaining tasks
for task in pending:
task.cancel()
# Get results from completed tasks
for task in done:
try:
result = task.result()
print(f"Completed: {result}")
except Exception as e:
print(f"Failed: {e}") Real HTTP Requests
Use aiohttp instead of requests:
import aiohttp
import asyncio
async def fetch_url(session: aiohttp.ClientSession, url: str):
async with session.get(url) as response:
return await response.text()
async def fetch_all_urls(urls: list[str]):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Usage
urls = [
'https://api.github.com/users/python',
'https://api.github.com/users/google',
'https://api.github.com/users/microsoft'
]
results = asyncio.run(fetch_all_urls(urls)) Connection Pooling
async def fetch_with_pool():
# Limit concurrent connections
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout
) as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks) Database Operations
PostgreSQL with asyncpg
import asyncpg
import asyncio
async def fetch_users():
# Connect to database
conn = await asyncpg.connect(
host='localhost',
database='mydb',
user='user',
password='pass'
)
try:
# Fetch multiple rows
rows = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
# Fetch single row
user = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1',
123
)
# Execute write operation
await conn.execute(
'INSERT INTO users(name, email) VALUES($1, $2)',
'John', 'john@example.com'
)
return rows
finally:
await conn.close()
# Connection pooling
async def fetch_with_pool():
pool = await asyncpg.create_pool(
host='localhost',
database='mydb',
user='user',
password='pass',
min_size=5,
max_size=20
)
async with pool.acquire() as conn:
rows = await conn.fetch('SELECT * FROM users')
return rows
await pool.close() Transactions
async def transfer_funds(from_id: int, to_id: int, amount: float):
conn = await asyncpg.connect(dsn='postgresql://...')
try:
async with conn.transaction():
# Both operations succeed or both fail
await conn.execute(
'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
amount, from_id
)
await conn.execute(
'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
amount, to_id
)
finally:
await conn.close() Producer/Consumer Pattern
Use asyncio.Queue for decoupled processing:
import asyncio
from asyncio import Queue
async def producer(queue: Queue, name: str):
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"{name} produced: {item}")
await asyncio.sleep(0.5)
async def consumer(queue: Queue, name: str):
while True:
item = await queue.get()
if item is None: # Poison pill to stop
break
print(f"{name} consumed: {item}")
await asyncio.sleep(1) # Simulate processing
queue.task_done()
async def main():
queue = Queue(maxsize=10)
# Start producers
producers = [
asyncio.create_task(producer(queue, f"P{i}"))
for i in range(2)
]
# Start consumers
consumers = [
asyncio.create_task(consumer(queue, f"C{i}"))
for i in range(3)
]
# Wait for producers to finish
await asyncio.gather(*producers)
# Signal consumers to stop
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
asyncio.run(main()) Web Frameworks
FastAPI (Native Async)
from fastapi import FastAPI
import asyncpg
app = FastAPI()
# Database pool
pool = None
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(dsn='postgresql://...')
@app.on_event("shutdown")
async def shutdown():
await pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
user = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1',
user_id
)
if user:
return dict(user)
return {"error": "User not found"}
@app.get("/users")
async def list_users():
async with pool.acquire() as conn:
rows = await conn.fetch('SELECT * FROM users')
return [dict(row) for row in rows] Background Tasks
from fastapi import BackgroundTasks
async def send_email(email: str, message: str):
await asyncio.sleep(2) # Simulate sending
print(f"Email sent to {email}")
@app.post("/signup")
async def signup(email: str, background_tasks: BackgroundTasks):
# Save user to database
await save_user(email)
# Don't block response — send email in background
background_tasks.add_task(send_email, email, "Welcome!")
return {"status": "success"} Common Pitfalls
1. Blocking the Event Loop
# WRONG: Blocks entire event loop
async def wrong():
time.sleep(5) # Blocks everything!
return "done"
# RIGHT: Use async version
async def right():
await asyncio.sleep(5) # Yields control
return "done" 2. Running Sync Code in Async
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Create thread pool for CPU-bound work
executor = ThreadPoolExecutor(max_workers=4)
async def cpu_bound_task(data):
# Run in thread pool to avoid blocking event loop
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor,
heavy_computation,
data
)
return result 3. Forgetting to await
# WRONG: Returns coroutine object, not result
async def wrong():
result = fetch_data() # Missing await!
print(result) # <coroutine object fetch_data>
# RIGHT:
async def right():
result = await fetch_data()
print(result) 4. Creating Too Many Tasks
# WRONG: Unbounded concurrency
async def wrong():
tasks = [fetch_url(url) for url in 10000_urls]
return await asyncio.gather(*tasks)
# RIGHT: Use semaphore to limit concurrency
semaphore = asyncio.Semaphore(100)
async def fetch_limited(url):
async with semaphore:
return await fetch_url(url)
async def right():
tasks = [fetch_limited(url) for url in urls]
return await asyncio.gather(*tasks) Advanced Patterns
Context Managers
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_resource():
resource = await acquire_resource()
try:
yield resource
finally:
await release_resource(resource)
async def use_resource():
async with managed_resource() as r:
await r.do_something()
# Automatically released Async Generators
async def stream_large_dataset():
conn = await asyncpg.connect(dsn='postgresql://...')
try:
async with conn.transaction():
async for record in conn.cursor('SELECT * FROM large_table'):
yield record
finally:
await conn.close()
# Usage
async for record in stream_large_dataset():
process(record) Timeouts
async def fetch_with_timeout():
try:
result = await asyncio.wait_for(
fetch_data(),
timeout=5.0
)
return result
except asyncio.TimeoutError:
return {"error": "Request timed out"} Rate Limiting
import asyncio
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, calls_per_second: float):
self.interval = 1.0 / calls_per_second
self.last_call = 0
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
now = asyncio.get_event_loop().time()
elapsed = now - self.last_call
if elapsed < self.interval:
await asyncio.sleep(self.interval - elapsed)
self.last_call = asyncio.get_event_loop().time()
# Usage
limiter = RateLimiter(calls_per_second=10)
async def rate_limited_fetch(url):
await limiter.acquire()
return await fetch_url(url) Testing Async Code
pytest-asyncio
import pytest
@pytest.mark.asyncio
async def test_fetch_data():
result = await fetch_data()
assert result["data"] == "value"
@pytest.mark.asyncio
async def test_concurrent_fetches():
tasks = [fetch_data() for _ in range(5)]
results = await asyncio.gather(*tasks)
assert len(results) == 5 Mocking Async Functions
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_with_mock():
with patch('module.fetch_data', new_callable=AsyncMock) as mock:
mock.return_value = {"mocked": True}
result = await fetch_data()
assert result == {"mocked": True} Testing Time-Based Code
import pytest
from unittest.mock import patch
@pytest.mark.asyncio
async def test_sleep():
with patch('asyncio.sleep') as mock_sleep:
await my_async_function()
# Verify sleep was called with expected duration
mock_sleep.assert_called_once_with(1.0) Performance Comparison
| Approach | 100 HTTP Requests | Memory Usage |
|---|---|---|
| Synchronous (requests) | ~50 seconds | Low |
| Threading | ~10 seconds | High |
| Asyncio (aiohttp) | ~2 seconds | Low |
When NOT to Use Async
| Use Case | Better Alternative |
|---|---|
| CPU-intensive work | Multiprocessing |
| Simple scripts | Regular sync code |
| Database without async driver | Sync with connection pooling |
| Single request | Don’t add complexity |
Migration Strategy
Moving from sync to async:
# Step 1: Identify I/O bottlenecks
# Step 2: Add async versions alongside sync
# Step 3: Migrate incrementally
# Step 4: Remove sync versions
# Bridge pattern
async def async_api():
return await internal_async_impl()
def sync_api():
return asyncio.run(async_api()) Summary
async/await— Core syntax for non-blocking codeasyncio.gather()— Run tasks concurrently- Use async libraries —
aiohttp,asyncpg,aioredis - Avoid blocking — Never use
time.sleep(),requests.get()in async code - Control concurrency — Use
Semaphorefor rate limiting - Test properly — Use
pytest-asyncioandAsyncMock
Async Python unlocks massive performance gains for I/O-bound applications. The learning curve is worth it.
What to Read Next
- Python Cheat Sheet — Python fundamentals reference
- PostgreSQL with Docker — Database setup for async apps
- FastAPI vs Flask vs Django — Choosing web frameworks
Related Articles
Deepen your understanding with these curated continuations.
Python Type Hints: A Practical Guide for Real Codebases
Use Python type hints effectively with annotations, generics, and Pydantic. Learn to avoid common production gotchas and master tools like mypy and pyright.
Python Virtual Environments: venv, uv, and When to Use Each
Keep Python projects isolated with virtual environments. Learn how to set up venv and uv, manage dependencies, and choose the right tool for your 2026 workflow.
Python List Comprehensions: Syntax, Examples & Tips
Master Python list comprehensions with clear examples. Learn when to use them for cleaner code, how to add filters, and when a regular loop is still better.