You are viewing a free preview of this lesson.
Subscribe to unlock all 10 lessons in this course and every other course on LearningBro.
Building an agent that works in a notebook is very different from deploying one that serves real users reliably. Production agent systems must handle asynchronous execution, comprehensive observability, graceful error recovery, horizontal scaling, and detailed trace logging. This lesson covers the patterns and practices for deploying agents to production.
┌──────────┐     ┌──────────────┐     ┌────────────────────────────┐
│  Client  │────▶│  API Gateway │────▶│  Agent Orchestrator        │
│          │     │  (Auth, Rate │     │  (Task queue, routing)     │
│          │     │   Limiting)  │     └────────────┬───────────────┘
└──────────┘     └──────────────┘                  │
                                       ┌───────────┼───────────────┐
                                       │           │               │
                                       ▼           ▼               ▼
                                  ┌──────────┐ ┌──────────┐   ┌──────────┐
                                  │  Agent   │ │  Agent   │   │  Agent   │
                                  │ Worker 1 │ │ Worker 2 │   │ Worker N │
                                  └────┬─────┘ └────┬─────┘   └────┬─────┘
                                       │            │              │
                              ┌────────┴────────────┴──────────────┘
                              │
                   ┌──────────┼──────────────┐
                   │          │              │
                   ▼          ▼              ▼
              ┌──────────┐ ┌──────────┐ ┌──────────┐
              │ LLM APIs │ │  Tool    │ │  State   │
              │          │ │ Services │ │  Store   │
              └──────────┘ └──────────┘ │ (Redis/  │
                                        │ Postgres)│
                                        └──────────┘
Agents can take seconds to minutes to complete. Never block the request thread.
import asyncio
import uuid
from enum import Enum
class TaskStatus(Enum):
    """Lifecycle states of an agent task; each value is the string exposed in status payloads."""

    # Initial state assigned when an AgentTask is constructed.
    PENDING = "pending"
    # Set by the queue worker just before it awaits the agent function.
    RUNNING = "running"
    # Set by the queue worker after the agent function returns a result.
    COMPLETED = "completed"
    # Set by the queue worker when the agent function raises an Exception.
    FAILED = "failed"
    # Reserved for tasks that were stopped before finishing.
    CANCELLED = "cancelled"
class AgentTask:
    """One submitted unit of agent work together with its mutable run state."""

    def __init__(self, task_id: str, user_id: str, task: str):
        # Identity and payload: fixed at submission time.
        self.task_id, self.user_id, self.task = task_id, user_id, task

        # Run state: every task starts out pending, with no outcome yet.
        self.status = TaskStatus.PENDING
        self.result: str | None = None  # filled in on success
        self.error: str | None = None  # filled in on failure
        self.steps: list[dict] = []  # per-step records; its length is reported as progress
class AgentTaskQueue:
    """In-process async task queue for agent execution.

    Submitted tasks are recorded in ``self.tasks`` (a plain dict keyed by task
    id) and handed to workers through ``self.queue`` (an ``asyncio.Queue``).
    Both structures live in this object's memory, so task state is local to
    the process that owns the instance.
    """

    def __init__(self):
        # task_id -> AgentTask. Nothing in this class ever removes entries.
        self.tasks: dict[str, AgentTask] = {}
        self.queue: asyncio.Queue = asyncio.Queue()

    async def submit(self, user_id: str, task: str) -> str:
        """Create a task for ``user_id``, enqueue it, and return its new id.

        Args:
            user_id: Identifier stored on the created task.
            task: Task text later passed to the worker's agent function.

        Returns:
            A freshly generated UUID4 string identifying the task.
        """
        task_id = str(uuid.uuid4())
        agent_task = AgentTask(task_id, user_id, task)
        # Register before enqueueing so a status lookup can never miss a task
        # that a worker has already picked up.
        self.tasks[task_id] = agent_task
        await self.queue.put(agent_task)
        return task_id

    async def get_status(self, task_id: str) -> dict:
        """Return a status snapshot for ``task_id``.

        Returns:
            ``{"error": "Task not found"}`` when the id is unknown; otherwise
            a dict with the keys ``task_id``, ``status``, ``result``,
            ``error`` and ``steps_completed``.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return {"error": "Task not found"}
        return {
            "task_id": task.task_id,
            "status": task.status.value,
            "result": task.result,
            "error": task.error,
            "steps_completed": len(task.steps),
        }

    async def worker(self, agent_fn):
        """Process queued tasks forever, one at a time.

        Args:
            agent_fn: Async callable invoked as ``await agent_fn(task.task)``;
                its return value is stored as the task result.

        The loop never returns on its own; stop it by cancelling the asyncio
        task that runs this coroutine. A task that is in flight at that
        moment is marked ``CANCELLED`` instead of being left ``RUNNING``.

        Raises:
            asyncio.CancelledError: Re-raised after bookkeeping so that
                cancellation of the worker is honoured.
        """
        while True:
            task = await self.queue.get()
            task.status = TaskStatus.RUNNING
            try:
                task.result = await agent_fn(task.task)
            except asyncio.CancelledError:
                # CancelledError is not an ``Exception`` subclass on modern
                # Python, so it needs its own clause; it is listed first so
                # the behaviour is the same where it still is one.
                task.status = TaskStatus.CANCELLED
                raise
            except Exception as e:
                # Some exceptions stringify to "", which would be
                # indistinguishable from "no error" for a client.
                task.error = str(e) or type(e).__name__
                task.status = TaskStatus.FAILED
            else:
                task.status = TaskStatus.COMPLETED
            finally:
                # Always balance the get() above so queue.join() can finish.
                self.queue.task_done()
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Application object; the route decorators below register their handlers on it.
app = FastAPI()
# Single module-level queue shared by the route handlers below. Its task table
# is an in-memory dict, so task state is local to this process.
# NOTE(review): nothing in the visible code starts `task_queue.worker(...)`;
# confirm a worker is launched elsewhere, otherwise submitted tasks stay pending.
task_queue = AgentTaskQueue()
# Request body accepted by POST /agent/submit.
# (Documented with comments rather than a docstring so the generated schema
# description is left untouched.)
class SubmitRequest(BaseModel):
    task: str  # task text forwarded to the queue's submit()
    user_id: str  # submitting user's id, forwarded to the queue's submit()
# Response body returned by POST /agent/submit.
# (Documented with comments rather than a docstring so the generated schema
# description is left untouched.)
class SubmitResponse(BaseModel):
    task_id: str  # id generated by the queue for the submitted task
    status: str  # task status string; the submit handler reports "pending"
@app.post("/agent/submit", response_model=SubmitResponse)
async def submit_task(request: SubmitRequest):
    """Enqueue an agent task and return its id without waiting for it to run.

    The caller is expected to poll the status endpoint with the returned id.
    """
    task_id = await task_queue.submit(request.user_id, request.task)
    # Take the status string from the shared enum instead of repeating the
    # literal, so this response cannot drift from the values the status
    # endpoint reports.
    return SubmitResponse(task_id=task_id, status=TaskStatus.PENDING.value)
@app.get("/agent/status/{task_id}")
async def get_status(task_id: str):
    """Return the status snapshot for ``task_id``.

    Raises:
        HTTPException: 404 when the queue does not know the task id.
    """
    status = await task_queue.get_status(task_id)
    # A snapshot of a real task always carries "task_id"; the queue's
    # not-found reply does not. Keying on that instead of comparing the
    # "error" text avoids answering 404 for an existing task whose own failure
    # message happens to be "Task not found".
    if "task_id" not in status:
        raise HTTPException(status_code=404, detail="Task not found")
    return status
Every step of agent execution should be traced:
import time
import logging
import json
logger = logging.getLogger("agent_trace")
class AgentTracer:
"""Trace agent execution for observability."""
def __init__(self, task_id: str, user_id: str):
self.task_id = task_id
self.user_id = user_id
self.steps: list[dict] = []
self.start_time = time.time()
def trace_step(self, step_number: int, thought: str, action: str,
observation: str, tokens: int, latency_ms: float):
step = {
"task_id": self.task_id,
"step": step_number,
"thought": thought,
"action": action,
"observation": observation[:500],
"tokens": tokens,
"latency_ms": latency_ms,
"timestamp": time.time(),
}
self.steps.append(step)
logger.info(json.dumps(step))
def trace_completion(self, success: bool, final_answer: str, error: str | None = None):
summary = {
"task_id": self.task_id,
"user_id": self.user_id,
"total_steps": len(self.steps),
"total_tokens": sum(s["tokens"] for s in self.steps),
"total_latency_ms": (time.time() - self.start_time) * 1000,
"success": success,
"error": error,
}
logger.info(json.dumps({"event": "agent_completed", **summary}))
return summary
Subscribe to continue reading
Get full access to this lesson and all 10 lessons in this course.