Data pipelines often spend most of their time waiting on I/O: API responses, database queries, file downloads. Async I/O lets you do useful work during that wait, which can dramatically speed up I/O-bound pipelines. This lesson covers asyncio fundamentals, aiohttp, async database drivers, concurrent API calls, and async generators.
```
SYNCHRONOUS                     ASYNCHRONOUS

┌─────┐ ┌─────┐ ┌─────┐         ┌─────┬─────┬─────┐
│ API │ │ API │ │ API │         │ API │ API │ API │
│  1  │ │  2  │ │  3  │         │  1  │  2  │  3  │
│ 2s  │ │ 2s  │ │ 2s  │         │ 2s  │ 2s  │ 2s  │
└─────┘ └─────┘ └─────┘         └─────┴─────┴─────┘

Total: 6 seconds                Total: ~2 seconds
```
For I/O-bound work (API calls, database queries, file reads), async can provide near-linear speedup.
```python
import asyncio

async def fetch_data(url: str) -> dict:
    """An async function (coroutine)."""
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # Simulates an I/O operation
    return {"url": url, "status": "ok"}

async def main():
    # Run a single coroutine
    result = await fetch_data("https://api.example.com/data")
    print(result)

    # Run multiple coroutines concurrently
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/orders"),
        fetch_data("https://api.example.com/products"),
    )
    print(f"Got {len(results)} results")

# Entry point
asyncio.run(main())
```
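To convince yourself that the diagram above is real, time both approaches. This is a minimal sketch using `asyncio.sleep` as a stand-in for I/O (the `fake_io` helper is purely illustrative):

```python
import asyncio
import time

async def fake_io(n: int) -> int:
    await asyncio.sleep(1)  # stands in for a 1-second API call
    return n

async def sequential() -> None:
    for i in range(3):
        await fake_io(i)  # each call finishes before the next starts

async def concurrent() -> None:
    await asyncio.gather(*(fake_io(i) for i in range(3)))

async def compare() -> None:
    start = time.perf_counter()
    await sequential()
    print(f"Sequential: {time.perf_counter() - start:.1f}s")  # ~3.0s

    start = time.perf_counter()
    await concurrent()
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")  # ~1.0s

asyncio.run(compare())
```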
| Concept | Description |
|---|---|
| `async def` | Defines a coroutine (an async function) |
| `await` | Pauses the coroutine until the awaited operation completes |
| `asyncio.gather` | Runs multiple coroutines concurrently |
| `asyncio.run` | Entry point that runs the top-level coroutine |
| Event loop | Manages scheduling and execution of coroutines |
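The last row is worth dwelling on: every `await` is a point where a coroutine hands control back to the event loop, which then runs whatever else is ready. A small sketch (the `worker` coroutine is illustrative) makes the interleaving visible:

```python
import asyncio

async def worker(name: str) -> None:
    for step in range(2):
        print(f"{name}: step {step}")
        await asyncio.sleep(0)  # yield control back to the event loop

async def main() -> None:
    # The two workers alternate at each await point
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
# Prints: A: step 0, B: step 0, A: step 1, B: step 1
```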
A real pipeline rarely fetches one URL at a time. The pattern below uses aiohttp to pull a paginated API concurrently: one shared `ClientSession`, one task per page, and `asyncio.gather` to run them all.

```python
import aiohttp
import asyncio

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch a single URL."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
        resp.raise_for_status()
        return await resp.json()

async def fetch_all_pages(base_url: str, total_pages: int) -> list[dict]:
    """Fetch all pages concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_url(session, f"{base_url}?page={page}")
            for page in range(1, total_pages + 1)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out errors
    successful = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        print(f"Warning: {len(errors)} requests failed")
    return successful

# Usage
asyncio.run(fetch_all_pages("https://api.example.com/data", 50))
```
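The `errors` list above is only logged. In a real pipeline you would usually retry the failed pages rather than drop them. One possible approach, sketched here reusing the `fetch_url` coroutine and imports from the block above (the retry loop and backoff policy are illustrative, not part of aiohttp):

```python
async def fetch_pages_with_retries(
    base_url: str,
    total_pages: int,
    attempts: int = 3,
) -> list[dict]:
    """Re-fetch failed pages up to `attempts` times with exponential backoff."""
    pending = [f"{base_url}?page={p}" for p in range(1, total_pages + 1)]
    collected: list[dict] = []

    async with aiohttp.ClientSession() as session:
        for attempt in range(attempts):
            results = await asyncio.gather(
                *(fetch_url(session, url) for url in pending),
                return_exceptions=True,
            )
            # Successes are kept; failures go back in the queue
            collected.extend(r for r in results if not isinstance(r, Exception))
            pending = [u for u, r in zip(pending, results) if isinstance(r, Exception)]
            if not pending:
                break
            await asyncio.sleep(2 ** attempt)  # wait 1s, 2s, 4s, ...

    return collected
```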
Launching 50 tasks is harmless; launching 5,000 can exhaust sockets or overwhelm the server. A semaphore bounds how many requests are in flight at once:

```python
async def fetch_with_limit(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[dict | BaseException]:
    """Fetch URLs with a concurrency limit."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_fetch(session: aiohttp.ClientSession, url: str) -> dict:
        # At most max_concurrent coroutines pass this point at a time
        async with semaphore:
            async with session.get(url) as resp:
                return await resp.json()

    async with aiohttp.ClientSession() as session:
        tasks = [bounded_fetch(session, url) for url in urls]
        # With return_exceptions=True, failures come back as values,
        # hence the widened return type
        return await asyncio.gather(*tasks, return_exceptions=True)
```
Tip: Always use a semaphore when making concurrent requests to avoid overwhelming the target server or hitting rate limits.
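A semaphore caps how many requests are in flight, but it does nothing once the server starts rejecting you. Rate-limited APIs commonly answer with HTTP 429 and sometimes a `Retry-After` header; here is a minimal per-request backoff sketch assuming that convention (the status code and header are server-specific, so check your API's docs):

```python
async def fetch_with_backoff(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 5,
) -> dict:
    """Retry on HTTP 429, doubling the wait between attempts."""
    for attempt in range(max_retries):
        async with session.get(url) as resp:
            if resp.status == 429:
                # Honor Retry-After if present, else back off exponentially
                wait = float(resp.headers.get("Retry-After", 2 ** attempt))
                await asyncio.sleep(wait)
                continue
            resp.raise_for_status()
            return await resp.json()
    raise RuntimeError(f"Still rate-limited after {max_retries} attempts: {url}")
```

This composes naturally with the semaphore pattern above: wrap the call in `async with semaphore:` to keep both the concurrency cap and the backoff.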