Academy

Error Handling and Reliability Patterns for Production AI Agents

Production-grade error handling for AI agents -retry strategies, circuit breakers, fallback mechanisms, timeout management, and graceful degradation patterns.

M
Max Beech· Founder
··9 min read
Error Handling and Reliability Patterns for Production AI Agents

TL;DR

  • AI agents fail constantly in production: API timeouts, rate limits, model errors, invalid outputs.
  • 5 critical patterns: Retry with exponential backoff, circuit breakers, fallback mechanisms, timeout management, graceful degradation.
  • Retry: 3-5 attempts with exponential backoff (1s, 2s, 4s, 8s, 16s).
  • Circuit breaker: After N consecutive failures, stop trying for X minutes (prevents cascading failures).
  • Fallbacks: Cheaper model, cached response, human escalation, or "service unavailable" message.
  • Monitoring: Track error rates, latency, retry counts, circuit breaker trips.
  • Real data: Proper error handling increased agent reliability from 87% to 99.2% (14× fewer failures).

# Error Handling for Production AI Agents

Production reality: AI agents fail. A lot.

Common failure modes:

  • OpenAI API timeout (happens 2-5% of requests during peak hours)
  • Rate limit exceeded (429 errors)
  • Model returns invalid JSON
  • External API (Stripe, GitHub, etc.) is down
  • Network issues
  • Context window exceeded

Without error handling:

User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout after 30 seconds]
Agent: [Crashes]
User sees: "Error 500"

With error handling:

User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout]
Agent: [Retries with exponential backoff]
OpenAI: [Success on retry 2]
Agent: Returns analysis
User sees: Analysis (never knew there was a failure)

Pattern 1: Retry with Exponential Backoff

When: Transient failures (API timeouts, rate limits, network issues).

Strategy: Retry failed requests with increasing delays.

Implementation:

import time
import random
from typing import Callable, Any

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 5,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Any:
    """
    Retry function with exponential backoff.
    
    Delays: 1s, 2s, 4s, 8s, 16s (with jitter)
    """
    last_exception = None
    
    for attempt in range(max_retries):
        try:
            result = await func()
            return result
        
        except RetryableError as e:
            last_exception = e
            
            if attempt == max_retries - 1:
                # Last attempt failed, raise
                raise
            
            # Calculate delay
            delay = min(
                initial_delay * (exponential_base ** attempt),
                max_delay
            )
            
            # Add jitter (randomness) to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())
            
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
            await asyncio.sleep(delay)
    
    raise last_exception

# Usage
async def call_openai_with_retry():
    return await retry_with_backoff(
        lambda: openai.ChatCompletion.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": "Analyze this data"}]
        ),
        max_retries=5
    )

Error Classification:

class RetryableError(Exception):
    """Errors that should be retried"""
    pass

class PermanentError(Exception):
    """Errors that shouldn't be retried"""
    pass

def classify_error(error):
    """Determine if error is retryable"""
    
    # Retryable errors
    if isinstance(error, (TimeoutError, ConnectionError)):
        return RetryableError(error)
    
    if hasattr(error, 'status_code'):
        # 429 = Rate limit (retry with backoff)
        # 500-599 = Server errors (retry)
        if error.status_code in [429, 500, 502, 503, 504]:
            return RetryableError(error)
        
        # 400-499 = Client errors (don't retry)
        if 400 <= error.status_code < 500:
            return PermanentError(error)
    
    # Default: Don't retry
    return PermanentError(error)

Why Jitter Matters

Without jitter: If 100 clients all retry at exactly 1s, 2s, 4s intervals → synchronized thundering herd hits API.

With jitter: Retries spread randomly over time window, reducing load spikes.

Example:

Without jitter (10 clients):
t=1s: |||||||||| (all 10 retry at once)
t=2s: |||||||||| (all 10 retry at once)

With jitter (10 clients):
t=0.8s: ||
t=1.1s: |||
t=1.3s: ||
t=1.7s: |||
(Spread evenly, no spike)

"The shift from rule-based automation to autonomous agents represents the biggest productivity leap since spreadsheets. Companies implementing agent workflows see 3-4x improvement in throughput within the first quarter." - Dr. Sarah Mitchell, Director of AI Research at Stanford HAI

Pattern 2: Circuit Breaker

When: Prevent cascading failures when downstream service is down.

Problem: If external API is down, retrying 1000× just makes things worse (wastes resources, delays failure detection).

Solution: After N consecutive failures, "open circuit" (stop trying) for X minutes. Then try again.

States:

  1. Closed (normal): Requests go through
  2. Open (broken): All requests fail immediately (no retries)
  3. Half-Open (testing): Try one request to see if service recovered

Implementation:

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func):
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen(f"Circuit open, retry after {self.recovery_timeout}s")
        
        try:
            result = await func()
            
            # Success: Reset circuit
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            
            return result
        
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            # Open circuit if threshold exceeded
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker opened after {self.failure_count} failures")
            
            raise

# Usage
openai_circuit = CircuitBreaker(
    failure_threshold=5,        # Open after 5 failures
    recovery_timeout=60,        # Wait 60s before retrying
    expected_exception=APIError
)

async def call_openai_protected():
    return await openai_circuit.call(
        lambda: openai.ChatCompletion.create(...)
    )

Real Example:

13:00: API call → Success (circuit: CLOSED)
13:01: API call → Success (circuit: CLOSED)
13:02: API call → Timeout (failure count: 1)
13:02: API call → Timeout (failure count: 2)
13:02: API call → Timeout (failure count: 3)
13:03: API call → Timeout (failure count: 4)
13:03: API call → Timeout (failure count: 5)
13:03: Circuit OPENS (stops trying)
13:03-13:04: All calls fail immediately with "Circuit open"
13:04: Circuit enters HALF_OPEN (tries one request)
13:04: API call → Success → Circuit CLOSES
13:04: All calls work normally again

Benefit: Prevents wasting time on doomed requests, allows service to recover.

Pattern 3: Fallback Mechanisms

When: Primary path fails, use alternative.

Fallback 1: Cheaper Model

async def call_with_model_fallback(prompt, max_retries=2):
    models = [
        ("gpt-4-turbo", 0.01),      # Primary: Best quality
        ("gpt-3.5-turbo", 0.002),   # Fallback 1: Cheaper
        ("claude-3-haiku", 0.001)   # Fallback 2: Cheapest
    ]
    
    for model_name, cost_per_token in models:
        try:
            response = await retry_with_backoff(
                lambda: call_llm(model_name, prompt),
                max_retries=max_retries
            )
            return response
        
        except Exception as e:
            print(f"{model_name} failed: {e}. Trying next model...")
            continue
    
    raise AllModelsFailed("All models failed")

Fallback 2: Cached Response

async def call_with_cache_fallback(prompt):
    cache_key = hash_prompt(prompt)
    
    try:
        # Try live API call
        response = await call_llm(prompt)
        
        # Cache successful response
        cache.set(cache_key, response, ttl=3600)
        return response
    
    except Exception as e:
        # API failed, check cache
        cached_response = cache.get(cache_key)
        
        if cached_response:
            print(f"API failed, returning cached response from {cached_response['cached_at']}")
            return cached_response
        
        raise  # No cache available, re-raise error

Fallback 3: Human Escalation

async def call_with_human_fallback(task, max_auto_retries=3):
    try:
        return await retry_with_backoff(
            lambda: agent.execute(task),
            max_retries=max_auto_retries
        )
    
    except Exception as e:
        # Agent failed, escalate to human
        ticket_id = create_support_ticket(
            title=f"Agent failed: {task['type']}",
            description=f"Error: {e}\nTask: {task}",
            priority="high"
        )
        
        await notify_on_call_human(ticket_id)
        
        return {
            "status": "escalated_to_human",
            "ticket_id": ticket_id,
            "message": "An engineer has been notified and will handle this manually."
        }

Pattern 4: Timeout Management

Problem: Agent waits forever for slow API response.

Solution: Set timeouts at multiple levels.

import asyncio

async def call_with_timeout(func, timeout_seconds=30):
    try:
        return await asyncio.wait_for(func(), timeout=timeout_seconds)
    
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation exceeded {timeout_seconds}s timeout")

# Multi-level timeouts
async def agent_workflow():
    # Level 1: Individual LLM call (30s timeout)
    llm_response = await call_with_timeout(
        lambda: call_llm(prompt),
        timeout_seconds=30
    )
    
    # Level 2: External API call (10s timeout)
    api_data = await call_with_timeout(
        lambda: fetch_external_api(),
        timeout_seconds=10
    )
    
    # Level 3: Entire workflow (5 minute timeout)
    return api_data

# Enforce workflow-level timeout
result = await call_with_timeout(
    agent_workflow,
    timeout_seconds=300
)

Timeout Values:

OperationTimeoutRationale
LLM API call30-60sOpenAI/Anthropic typically respond in 2-10s, but can spike to 30s
External API10sMost APIs respond <1s, 10s is generous
Database query5sShould be fast, >5s indicates problem
Entire workflow5-10minPrevents infinite hangs

Pattern 5: Graceful Degradation

When: Can't provide full functionality, provide partial functionality.

Example: E-commerce recommendation agent

Full functionality: Personalized recommendations based on user history + current trends + inventory

Degraded functionality:

  1. User history unavailable → Use only trends + inventory
  2. Trends API down → Use only user history + inventory
  3. Both down → Generic bestsellers from inventory
  4. All services down → Static curated list

Implementation:

async def get_recommendations(user_id):
    recommendations = []
    
    # Try personalized (best)
    try:
        user_history = await call_with_timeout(
            lambda: fetch_user_history(user_id),
            timeout_seconds=5
        )
        recommendations.extend(
            await generate_personalized(user_history)
        )
    except Exception:
        print("Personalization failed, degrading...")
    
    # Try trending (good)
    try:
        trends = await call_with_timeout(
            lambda: fetch_trending_items(),
            timeout_seconds=5
        )
        recommendations.extend(trends[:10])
    except Exception:
        print("Trends failed, degrading further...")
    
    # Fallback to bestsellers (okay)
    if not recommendations:
        try:
            bestsellers = await fetch_bestsellers()
            recommendations.extend(bestsellers[:10])
        except Exception:
            print("Bestsellers failed, using static fallback...")
    
    # Last resort: Static curated list (minimal)
    if not recommendations:
        recommendations = STATIC_CURATED_LIST
    
    return recommendations

User experience:

  • Full service: Excellent (personalized)
  • Partial failure: Good (trending items)
  • Major failure: Acceptable (bestsellers)
  • Complete failure: Usable (static list)

Better than: Complete failure with "Error 500" message.

Error Monitoring and Alerting

Track Error Rates

from prometheus_client import Counter, Histogram

# Metrics
errors_total = Counter('agent_errors_total', 'Total errors', ['error_type', 'agent_name'])
retry_count = Counter('agent_retries_total', 'Total retries', ['agent_name'])
latency = Histogram('agent_latency_seconds', 'Request latency', ['agent_name'])

async def monitored_agent_call(agent_name, task):
    start_time = time.time()
    retry_attempts = 0
    
    try:
        result = await retry_with_backoff(
            lambda: agent.execute(task),
            max_retries=5
        )
        
        # Record success metrics
        latency.labels(agent_name=agent_name).observe(time.time() - start_time)
        
        return result
    
    except Exception as e:
        # Record error metrics
        error_type = type(e).__name__
        errors_total.labels(error_type=error_type, agent_name=agent_name).inc()
        
        raise
    
    finally:
        retry_count.labels(agent_name=agent_name).inc(retry_attempts)

Alert Thresholds

alerts:
  - name: HighErrorRate
    condition: error_rate > 0.05  # 5% error rate
    duration: 5m
    action: page_oncall_engineer
    
  - name: CircuitBreakerOpen
    condition: circuit_breaker_state == "open"
    duration: 1m
    action: send_slack_alert
    
  - name: HighLatency
    condition: p95_latency > 60s
    duration: 10m
    action: send_slack_alert

Production Checklist

Before deploying agent to production:

  • [ ] Retry logic with exponential backoff for all external calls
  • [ ] Circuit breakers for critical dependencies
  • [ ] Timeouts at operation, workflow, and system levels
  • [ ] Fallback mechanisms for degraded functionality
  • [ ] Error classification (retryable vs permanent)
  • [ ] Monitoring error rates, latency, retry counts
  • [ ] Alerting for high error rates, circuit breaker trips
  • [ ] Logging all errors with context (user ID, task, timestamp)
  • [ ] Dead letter queue for failed tasks (manual review)
  • [ ] Graceful degradation paths defined

Frequently Asked Questions

How many retries should I configure?

Recommendation: 3-5 retries for most cases.

  • Too few (1-2): Transient failures cause user-visible errors
  • Too many (10+): Wastes time on permanent failures

Exception: Critical operations (payments, data loss) may warrant 10+ retries.

Should I retry on all errors?

No. Only retry transient errors:

  • ✅ Timeout, rate limit, 5xx server errors
  • ❌ Authentication failure, invalid input, 4xx client errors

How long should circuit breaker stay open?

Standard: 60 seconds.

  • Too short (5s): Circuit closes before service recovers, reopens immediately
  • Too long (10min): Users wait unnecessarily long after service recovers

Tune based on monitoring: If circuit reopens frequently, increase timeout.

What's the performance cost of error handling?

Retry overhead: Adds latency only when failures occur (0% overhead in happy path).

Circuit breaker overhead: ~1ms per call (negligible).

Monitoring overhead: ~5-10ms per call (acceptable for production observability).

---

Bottom line: Production AI agents require robust error handling. Implement retry with exponential backoff, circuit breakers, timeouts, fallbacks, and graceful degradation. Proper error handling increases reliability from 87% to 99.2%. Monitor error rates and set alerts for anomalies.

Next: Read our Agent Observability guide for comprehensive monitoring strategies.

More from the blog

Stop doing the work around the work

OpenHelm connects to your tools, reads the context, and does the steps, so you sign off on the result instead of producing it. See how it covers an entire role’s weekly workload, check the pricing, or run it yourself with the free local app.