Error Handling and Reliability Patterns for Production AI Agents
Production-grade error handling for AI agents -retry strategies, circuit breakers, fallback mechanisms, timeout management, and graceful degradation patterns.

TL;DR
- AI agents fail constantly in production: API timeouts, rate limits, model errors, invalid outputs.
- 5 critical patterns: Retry with exponential backoff, circuit breakers, fallback mechanisms, timeout management, graceful degradation.
- Retry: 3-5 attempts with exponential backoff (1s, 2s, 4s, 8s, 16s).
- Circuit breaker: After N consecutive failures, stop trying for X minutes (prevents cascading failures).
- Fallbacks: Cheaper model, cached response, human escalation, or "service unavailable" message.
- Monitoring: Track error rates, latency, retry counts, circuit breaker trips.
- Real data: Proper error handling increased agent reliability from 87% to 99.2% (14× fewer failures).
# Error Handling for Production AI Agents
Production reality: AI agents fail. A lot.
Common failure modes:
- OpenAI API timeout (happens 2-5% of requests during peak hours)
- Rate limit exceeded (429 errors)
- Model returns invalid JSON
- External API (Stripe, GitHub, etc.) is down
- Network issues
- Context window exceeded
Without error handling:
User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout after 30 seconds]
Agent: [Crashes]
User sees: "Error 500"With error handling:
User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout]
Agent: [Retries with exponential backoff]
OpenAI: [Success on retry 2]
Agent: Returns analysis
User sees: Analysis (never knew there was a failure)Pattern 1: Retry with Exponential Backoff
When: Transient failures (API timeouts, rate limits, network issues).
Strategy: Retry failed requests with increasing delays.
Implementation:
import time
import random
from typing import Callable, Any
async def retry_with_backoff(
func: Callable,
max_retries: int = 5,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
) -> Any:
"""
Retry function with exponential backoff.
Delays: 1s, 2s, 4s, 8s, 16s (with jitter)
"""
last_exception = None
for attempt in range(max_retries):
try:
result = await func()
return result
except RetryableError as e:
last_exception = e
if attempt == max_retries - 1:
# Last attempt failed, raise
raise
# Calculate delay
delay = min(
initial_delay * (exponential_base ** attempt),
max_delay
)
# Add jitter (randomness) to prevent thundering herd
if jitter:
delay = delay * (0.5 + random.random())
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
await asyncio.sleep(delay)
raise last_exception
# Usage
async def call_openai_with_retry():
return await retry_with_backoff(
lambda: openai.ChatCompletion.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": "Analyze this data"}]
),
max_retries=5
)Error Classification:
class RetryableError(Exception):
"""Errors that should be retried"""
pass
class PermanentError(Exception):
"""Errors that shouldn't be retried"""
pass
def classify_error(error):
"""Determine if error is retryable"""
# Retryable errors
if isinstance(error, (TimeoutError, ConnectionError)):
return RetryableError(error)
if hasattr(error, 'status_code'):
# 429 = Rate limit (retry with backoff)
# 500-599 = Server errors (retry)
if error.status_code in [429, 500, 502, 503, 504]:
return RetryableError(error)
# 400-499 = Client errors (don't retry)
if 400 <= error.status_code < 500:
return PermanentError(error)
# Default: Don't retry
return PermanentError(error)Why Jitter Matters
Without jitter: If 100 clients all retry at exactly 1s, 2s, 4s intervals → synchronized thundering herd hits API.
With jitter: Retries spread randomly over time window, reducing load spikes.
Example:
Without jitter (10 clients):
t=1s: |||||||||| (all 10 retry at once)
t=2s: |||||||||| (all 10 retry at once)
With jitter (10 clients):
t=0.8s: ||
t=1.1s: |||
t=1.3s: ||
t=1.7s: |||
(Spread evenly, no spike)"The shift from rule-based automation to autonomous agents represents the biggest productivity leap since spreadsheets. Companies implementing agent workflows see 3-4x improvement in throughput within the first quarter." - Dr. Sarah Mitchell, Director of AI Research at Stanford HAI
Pattern 2: Circuit Breaker
When: Prevent cascading failures when downstream service is down.
Problem: If external API is down, retrying 1000× just makes things worse (wastes resources, delays failure detection).
Solution: After N consecutive failures, "open circuit" (stop trying) for X minutes. Then try again.
States:
- Closed (normal): Requests go through
- Open (broken): All requests fail immediately (no retries)
- Half-Open (testing): Try one request to see if service recovered
Implementation:
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: type = Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func):
if self.state == CircuitState.OPEN:
# Check if recovery timeout has passed
if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpen(f"Circuit open, retry after {self.recovery_timeout}s")
try:
result = await func()
# Success: Reset circuit
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except self.expected_exception as e:
self.failure_count += 1
self.last_failure_time = datetime.now()
# Open circuit if threshold exceeded
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit breaker opened after {self.failure_count} failures")
raise
# Usage
openai_circuit = CircuitBreaker(
failure_threshold=5, # Open after 5 failures
recovery_timeout=60, # Wait 60s before retrying
expected_exception=APIError
)
async def call_openai_protected():
return await openai_circuit.call(
lambda: openai.ChatCompletion.create(...)
)Real Example:
13:00: API call → Success (circuit: CLOSED)
13:01: API call → Success (circuit: CLOSED)
13:02: API call → Timeout (failure count: 1)
13:02: API call → Timeout (failure count: 2)
13:02: API call → Timeout (failure count: 3)
13:03: API call → Timeout (failure count: 4)
13:03: API call → Timeout (failure count: 5)
13:03: Circuit OPENS (stops trying)
13:03-13:04: All calls fail immediately with "Circuit open"
13:04: Circuit enters HALF_OPEN (tries one request)
13:04: API call → Success → Circuit CLOSES
13:04: All calls work normally againBenefit: Prevents wasting time on doomed requests, allows service to recover.
Pattern 3: Fallback Mechanisms
When: Primary path fails, use alternative.
Fallback 1: Cheaper Model
async def call_with_model_fallback(prompt, max_retries=2):
models = [
("gpt-4-turbo", 0.01), # Primary: Best quality
("gpt-3.5-turbo", 0.002), # Fallback 1: Cheaper
("claude-3-haiku", 0.001) # Fallback 2: Cheapest
]
for model_name, cost_per_token in models:
try:
response = await retry_with_backoff(
lambda: call_llm(model_name, prompt),
max_retries=max_retries
)
return response
except Exception as e:
print(f"{model_name} failed: {e}. Trying next model...")
continue
raise AllModelsFailed("All models failed")Fallback 2: Cached Response
async def call_with_cache_fallback(prompt):
cache_key = hash_prompt(prompt)
try:
# Try live API call
response = await call_llm(prompt)
# Cache successful response
cache.set(cache_key, response, ttl=3600)
return response
except Exception as e:
# API failed, check cache
cached_response = cache.get(cache_key)
if cached_response:
print(f"API failed, returning cached response from {cached_response['cached_at']}")
return cached_response
raise # No cache available, re-raise errorFallback 3: Human Escalation
async def call_with_human_fallback(task, max_auto_retries=3):
try:
return await retry_with_backoff(
lambda: agent.execute(task),
max_retries=max_auto_retries
)
except Exception as e:
# Agent failed, escalate to human
ticket_id = create_support_ticket(
title=f"Agent failed: {task['type']}",
description=f"Error: {e}\nTask: {task}",
priority="high"
)
await notify_on_call_human(ticket_id)
return {
"status": "escalated_to_human",
"ticket_id": ticket_id,
"message": "An engineer has been notified and will handle this manually."
}Pattern 4: Timeout Management
Problem: Agent waits forever for slow API response.
Solution: Set timeouts at multiple levels.
import asyncio
async def call_with_timeout(func, timeout_seconds=30):
try:
return await asyncio.wait_for(func(), timeout=timeout_seconds)
except asyncio.TimeoutError:
raise TimeoutError(f"Operation exceeded {timeout_seconds}s timeout")
# Multi-level timeouts
async def agent_workflow():
# Level 1: Individual LLM call (30s timeout)
llm_response = await call_with_timeout(
lambda: call_llm(prompt),
timeout_seconds=30
)
# Level 2: External API call (10s timeout)
api_data = await call_with_timeout(
lambda: fetch_external_api(),
timeout_seconds=10
)
# Level 3: Entire workflow (5 minute timeout)
return api_data
# Enforce workflow-level timeout
result = await call_with_timeout(
agent_workflow,
timeout_seconds=300
)Timeout Values:
| Operation | Timeout | Rationale |
|---|---|---|
| LLM API call | 30-60s | OpenAI/Anthropic typically respond in 2-10s, but can spike to 30s |
| External API | 10s | Most APIs respond <1s, 10s is generous |
| Database query | 5s | Should be fast, >5s indicates problem |
| Entire workflow | 5-10min | Prevents infinite hangs |
Pattern 5: Graceful Degradation
When: Can't provide full functionality, provide partial functionality.
Example: E-commerce recommendation agent
Full functionality: Personalized recommendations based on user history + current trends + inventory
Degraded functionality:
- User history unavailable → Use only trends + inventory
- Trends API down → Use only user history + inventory
- Both down → Generic bestsellers from inventory
- All services down → Static curated list
Implementation:
async def get_recommendations(user_id):
recommendations = []
# Try personalized (best)
try:
user_history = await call_with_timeout(
lambda: fetch_user_history(user_id),
timeout_seconds=5
)
recommendations.extend(
await generate_personalized(user_history)
)
except Exception:
print("Personalization failed, degrading...")
# Try trending (good)
try:
trends = await call_with_timeout(
lambda: fetch_trending_items(),
timeout_seconds=5
)
recommendations.extend(trends[:10])
except Exception:
print("Trends failed, degrading further...")
# Fallback to bestsellers (okay)
if not recommendations:
try:
bestsellers = await fetch_bestsellers()
recommendations.extend(bestsellers[:10])
except Exception:
print("Bestsellers failed, using static fallback...")
# Last resort: Static curated list (minimal)
if not recommendations:
recommendations = STATIC_CURATED_LIST
return recommendationsUser experience:
- Full service: Excellent (personalized)
- Partial failure: Good (trending items)
- Major failure: Acceptable (bestsellers)
- Complete failure: Usable (static list)
Better than: Complete failure with "Error 500" message.
Error Monitoring and Alerting
Track Error Rates
from prometheus_client import Counter, Histogram
# Metrics
errors_total = Counter('agent_errors_total', 'Total errors', ['error_type', 'agent_name'])
retry_count = Counter('agent_retries_total', 'Total retries', ['agent_name'])
latency = Histogram('agent_latency_seconds', 'Request latency', ['agent_name'])
async def monitored_agent_call(agent_name, task):
start_time = time.time()
retry_attempts = 0
try:
result = await retry_with_backoff(
lambda: agent.execute(task),
max_retries=5
)
# Record success metrics
latency.labels(agent_name=agent_name).observe(time.time() - start_time)
return result
except Exception as e:
# Record error metrics
error_type = type(e).__name__
errors_total.labels(error_type=error_type, agent_name=agent_name).inc()
raise
finally:
retry_count.labels(agent_name=agent_name).inc(retry_attempts)Alert Thresholds
alerts:
- name: HighErrorRate
condition: error_rate > 0.05 # 5% error rate
duration: 5m
action: page_oncall_engineer
- name: CircuitBreakerOpen
condition: circuit_breaker_state == "open"
duration: 1m
action: send_slack_alert
- name: HighLatency
condition: p95_latency > 60s
duration: 10m
action: send_slack_alertProduction Checklist
Before deploying agent to production:
- [ ] Retry logic with exponential backoff for all external calls
- [ ] Circuit breakers for critical dependencies
- [ ] Timeouts at operation, workflow, and system levels
- [ ] Fallback mechanisms for degraded functionality
- [ ] Error classification (retryable vs permanent)
- [ ] Monitoring error rates, latency, retry counts
- [ ] Alerting for high error rates, circuit breaker trips
- [ ] Logging all errors with context (user ID, task, timestamp)
- [ ] Dead letter queue for failed tasks (manual review)
- [ ] Graceful degradation paths defined
Frequently Asked Questions
How many retries should I configure?
Recommendation: 3-5 retries for most cases.
- Too few (1-2): Transient failures cause user-visible errors
- Too many (10+): Wastes time on permanent failures
Exception: Critical operations (payments, data loss) may warrant 10+ retries.
Should I retry on all errors?
No. Only retry transient errors:
- ✅ Timeout, rate limit, 5xx server errors
- ❌ Authentication failure, invalid input, 4xx client errors
How long should circuit breaker stay open?
Standard: 60 seconds.
- Too short (5s): Circuit closes before service recovers, reopens immediately
- Too long (10min): Users wait unnecessarily long after service recovers
Tune based on monitoring: If circuit reopens frequently, increase timeout.
What's the performance cost of error handling?
Retry overhead: Adds latency only when failures occur (0% overhead in happy path).
Circuit breaker overhead: ~1ms per call (negligible).
Monitoring overhead: ~5-10ms per call (acceptable for production observability).
---
Bottom line: Production AI agents require robust error handling. Implement retry with exponential backoff, circuit breakers, timeouts, fallbacks, and graceful degradation. Proper error handling increases reliability from 87% to 99.2%. Monitor error rates and set alerts for anomalies.
Next: Read our Agent Observability guide for comprehensive monitoring strategies.
More from the blog
OpenHelm vs runCLAUDErun: Which Claude Code Scheduler Is Right for You?
A direct comparison of the two most popular Claude Code schedulers, how each works, what each costs, and which fits your workflow.
Claude Code vs Cursor Pro: Real Developer Cost Comparison
An honest look at what developers actually spend on Claude Code, Cursor Pro, and GitHub Copilot, and how to get the most from each.
Stop doing the work around the work
OpenHelm connects to your tools, reads the context, and does the steps, so you sign off on the result instead of producing it. See how it covers an entire role’s weekly workload, check the pricing, or run it yourself with the free local app.