Multi-Agent Orchestration: 5 Production Patterns That Scale
Deep dive into multi-agent orchestration patterns used by production systems: sequential handoff, parallel execution, hierarchical delegation, consensus-building, and dynamic routing, with real code examples.

TL;DR
- Five production-tested multi-agent orchestration patterns: sequential handoff, parallel execution, hierarchical delegation, consensus-building, and dynamic routing.
- Pattern choice depends on workflow characteristics: sequential for linear workflows, parallel for independent subtasks, hierarchical for complex decision trees, consensus for high-stakes decisions, dynamic for unpredictable workflows.
- Real production examples: Glean uses sequential handoff for sales (qualification → outreach → follow-up), Ramp uses parallel execution for expense processing, OpenHelm uses dynamic routing for variable business workflows.
- Implementation frameworks: OpenAI Agents SDK for hierarchical, LangGraph for sequential/parallel with state management, CrewAI for role-based collaboration.
- Key architectural decision: centralized orchestrator vs distributed coordination. Centralized is simpler and easier to debug; distributed is more resilient but complex.
Single-agent systems hit a ceiling fast. You can automate simple workflows (categorise this, respond to that), but anything requiring multiple types of expertise or complex decision-making needs multiple specialized agents working together.
That's where orchestration comes in. Not the theoretical kind from academic papers, but the battle-tested patterns running in production at companies processing millions of workflows monthly.
I've spent the last four months reverse-engineering multi-agent systems from engineering blogs, open-source repos, and conversations with teams running these at scale. Five patterns emerge repeatedly.
Here's how they work, when to use them, and how to implement them.
Pattern #1: Sequential handoff
Best for: Linear workflows where each step depends on the previous one completing.
How it works
Agent A completes a task, produces output, hands off to Agent B. Agent B uses Agent A's output as input, completes its task, hands off to Agent C. And so on.
Think assembly line: each station does its job, passes the work to the next station.
Architecture
```
Trigger → Agent A → Agent B → Agent C → Final Output
             ↓          ↓          ↓
          State DB   State DB   State DB
```
Each agent:
- Reads current state
- Performs its specialized task
- Updates state with its output
- Signals next agent
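Stripped to its essence, the read-act-update loop above is a pipeline runner that threads a shared state dict through each agent in order. A minimal sketch (the toy agents and field names here are illustrative, not Glean's actual schema):

```python
from typing import Callable

State = dict

def run_sequential(agents: list[Callable[[State], State]], initial: State) -> State:
    """Thread state through each agent in order; each output feeds the next."""
    state = initial
    for agent in agents:
        state = agent(state)  # read current state, perform task, update state
        # in production: persist state here to build the audit trail
    return state

# Toy stand-ins for qualify → draft
qualify = lambda s: {**s, "score": 8}
draft = lambda s: {**s, "email": "hi" if s["score"] >= 7 else None}

result = run_sequential([qualify, draft], {"lead": "jane@acme.com"})
# result accumulates every step's output in one dict
```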
Real example: Glean's sales pipeline
Glean (enterprise search) uses three-agent sequential handoff for inbound lead processing:
Agent 1: Qualifier
- Input: Raw form submission (name, email, company, message)
- Task: Enrich contact data, score lead based on ICP fit
- Output: Lead score (0-10), classification (hot/warm/cold), enrichment data
Agent 2: Outreach
- Input: Qualified lead with score + enrichment
- Task: If hot (score ≥7), draft personalized outreach email based on prospect research
- Output: Draft email, suggested send time
Agent 3: Follow-up
- Input: Sent email + prospect engagement data
- Task: Monitor for reply, categorize (interested/not interested/needs follow-up)
- Output: Next action (schedule demo, send follow-up, mark cold)
Each agent is specialized. Qualifier doesn't need to know how to write emails. Outreach agent doesn't need to understand lead scoring logic.
Implementation (LangGraph)
```python
from typing import Optional, TypedDict
from langgraph.graph import StateGraph, END

# Define state schema
class SalesPipelineState(TypedDict):
    lead_data: dict
    enrichment: dict
    lead_score: int
    classification: str
    email_draft: Optional[str]
    send_status: str
    next_action: str

# call_clearbit_api, calculate_lead_score, and call_llm are assumed helpers
def qualify_lead(state: SalesPipelineState) -> SalesPipelineState:
    """Agent 1: Qualify and enrich lead"""
    enrichment = call_clearbit_api(state["lead_data"]["email"])
    score = calculate_lead_score(enrichment)
    classification = "hot" if score >= 7 else "warm" if score >= 4 else "cold"
    return {
        **state,
        "enrichment": enrichment,
        "lead_score": score,
        "classification": classification,
    }

def draft_outreach(state: SalesPipelineState) -> SalesPipelineState:
    """Agent 2: Draft personalized email for hot leads"""
    if state["classification"] != "hot":
        return {**state, "email_draft": None}
    prompt = f"""
    Draft personalized outreach email for:
    Name: {state["lead_data"]["name"]}
    Company: {state["enrichment"]["company_name"]}
    Title: {state["enrichment"]["job_title"]}
    Message: {state["lead_data"]["message"]}
    Keep it under 100 words, focus on their specific use case.
    """
    email_draft = call_llm(prompt)
    return {**state, "email_draft": email_draft}

def determine_followup(state: SalesPipelineState) -> SalesPipelineState:
    """Agent 3: Determine next action"""
    if state["classification"] == "hot" and state["email_draft"]:
        next_action = "send_email_and_monitor"
    elif state["classification"] == "warm":
        next_action = "add_to_nurture_sequence"
    else:
        next_action = "archive"
    return {**state, "next_action": next_action}

# Build graph
workflow = StateGraph(SalesPipelineState)
workflow.add_node("qualify", qualify_lead)
workflow.add_node("draft", draft_outreach)
workflow.add_node("followup", determine_followup)
workflow.set_entry_point("qualify")
workflow.add_edge("qualify", "draft")
workflow.add_edge("draft", "followup")
workflow.add_edge("followup", END)
app = workflow.compile()

# Execute
result = app.invoke({"lead_data": {"name": "Jane Smith", "email": "jane@acme.com", ...}})
```
When to use
- Workflow has clear, linear sequence
- Each step requires different specialized knowledge
- Output of step N is required input for step N+1
- You need audit trail (state at each step logged for debugging)
When NOT to use
- Steps are independent (use parallel execution instead)
- Workflow path varies based on intermediate results (use dynamic routing instead)
- Real-time latency is critical (sequential adds latency: each agent waits for the previous one)
"The companies winning with AI agents aren't the ones with the most sophisticated models. They're the ones who've figured out the governance and handoff patterns between human and machine." - Dr. Elena Rodriguez, VP of Applied AI at Google DeepMind
Pattern #2: Parallel execution
Best for: Workflows where multiple subtasks can run independently and aggregate at the end.
How it works
Orchestrator receives task, splits into independent subtasks, dispatches to multiple agents simultaneously, waits for all to complete, aggregates results.
Think divide-and-conquer: break big problem into smaller pieces, solve in parallel, combine solutions.
Architecture
```
           ┌─→ Agent A (specialized) ──┐
Trigger →  ├─→ Agent B (specialized) ──┤→ Aggregator → Final Output
           └─→ Agent C (specialized) ──┘
```
Real example: Ramp's expense processing
Ramp (corporate cards) processes expenses using three parallel agents:
Agent A: Categorizer
- Task: Categorize expense (software, travel, ads, office, etc.)
- Input: Transaction merchant name, amount, description
- Output: Category + confidence
Agent B: Department assigner
- Task: Assign to department (engineering, sales, marketing, ops)
- Input: Transaction data + employee info
- Output: Department + confidence
Agent C: Anomaly detector
- Task: Flag unusual patterns (duplicate charges, amount >2x median, new vendor)
- Input: Transaction data + historical spending patterns
- Output: Anomaly flags (true/false) + explanation
All three run simultaneously. Results aggregated by final step that updates accounting system with all metadata.
Why parallel? Categorization doesn't depend on department assignment. Anomaly detection doesn't depend on categorization. Running sequentially would triple latency (3 x 800ms = 2.4 seconds vs 800ms parallel).
Implementation (async Python)
```python
import asyncio

# llm_call and update_quickbooks are assumed async helpers
async def categorize_expense(transaction):
    """Agent A: Categorize"""
    prompt = f"Categorize: {transaction['merchant']}, ${transaction['amount']}"
    category = await llm_call(prompt)
    return {"category": category, "confidence": 0.92}

async def assign_department(transaction, employee):
    """Agent B: Department assignment"""
    prompt = f"Which department? Employee: {employee['title']}, Merchant: {transaction['merchant']}"
    department = await llm_call(prompt)
    return {"department": department, "confidence": 0.88}

async def detect_anomaly(transaction, history):
    """Agent C: Anomaly detection"""
    median_amount = history.get_median_for_merchant(transaction['merchant'])
    is_anomaly = transaction['amount'] > median_amount * 2
    return {
        "anomaly": is_anomaly,
        "reason": f"Amount {transaction['amount']} is 2x median {median_amount}" if is_anomaly else None,
    }

async def process_expense_parallel(transaction, employee, history):
    """Orchestrator: Run agents in parallel"""
    # Dispatch all agents simultaneously
    results = await asyncio.gather(
        categorize_expense(transaction),
        assign_department(transaction, employee),
        detect_anomaly(transaction, history),
    )
    # Aggregate results
    categorization, department, anomaly = results
    final_result = {
        "transaction_id": transaction["id"],
        "category": categorization["category"],
        "department": department["department"],
        "anomaly_detected": anomaly["anomaly"],
        "anomaly_reason": anomaly["reason"],
    }
    # Update accounting system
    await update_quickbooks(final_result)
    return final_result

# Execute
result = await process_expense_parallel(
    transaction={"id": 1234, "merchant": "AWS", "amount": 847},
    employee={"title": "Senior Engineer"},
    history=expense_history,
)
```
When to use
- Subtasks are independent (don't need each other's output)
- Latency matters (parallel reduces total execution time)
- High volume (parallel increases throughput)
When NOT to use
- Subtasks have dependencies (Agent B needs Agent A's output)
- Resource constraints (running 10 agents in parallel hits API rate limits)
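If you do need parallelism under rate limits, a middle ground is to cap concurrency rather than abandon the pattern. A sketch using `asyncio.Semaphore` (the limit of 3 and the fake agent are assumptions; tune the limit to your provider's quota):

```python
import asyncio

async def run_with_limit(tasks, max_concurrent=3):
    """Run coroutine factories in parallel, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def throttled(factory):
        async with semaphore:  # wait for a free slot before calling the agent
            return await factory()

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(throttled(t) for t in tasks))

async def main():
    # Stand-in for ten agent calls that would otherwise hit rate limits
    async def fake_agent(i):
        await asyncio.sleep(0.01)
        return i * 2

    return await run_with_limit([lambda i=i: fake_agent(i) for i in range(10)])

results = asyncio.run(main())
# → [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```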
Pattern #3: Hierarchical delegation
Best for: Complex workflows requiring dynamic decision-making about which specialized agents to invoke.
How it works
Top-level orchestrator agent receives task, analyzes requirements, dynamically selects and delegates to specialized sub-agents based on task characteristics.
Orchestrator maintains context and coordinates, sub-agents execute specific work.
Architecture
```
User Request → Orchestrator (reasons, plans, delegates)
                        ↓
           ┌────────────┼────────────┐
           ↓            ↓            ↓
       Research     Developer     Analysis
         Agent        Agent         Agent
           ↓            ↓            ↓
        [Tools]      [Tools]      [Tools]
```
Real example: OpenHelm's business workflow orchestrator
OpenHelm's orchestrator handles variable business requests:
Request 1: "Find 3 potential partners in the construction industry"
- Orchestrator delegates to Research Agent
- Research Agent uses LinkedIn, Crunchbase, web search tools
- Returns structured findings to orchestrator
- Orchestrator compiles report
Request 2: "Build a landing page for our new feature"
- Orchestrator delegates to Developer Agent
- Developer Agent writes code, creates components
- Returns implementation to orchestrator
- Orchestrator validates and deploys
Request 3: "Analyze our sales pipeline for bottlenecks"
- Orchestrator delegates to Analysis Agent
- Analysis Agent queries CRM, runs statistical analysis
- Returns insights to orchestrator
- Orchestrator formats findings
Same orchestrator, different specialized agents based on task type.
Implementation (OpenAI Agents SDK)
```python
# Uses the OpenAI Agents SDK (`pip install openai-agents`); tool bodies are
# illustrative stubs -- real implementations would call the respective APIs.
from agents import Agent, Runner, function_tool

@function_tool
def web_search(query: str) -> str:
    """Search the web and return structured findings with sources."""
    ...

@function_tool
def linkedin_search(query: str) -> str:
    """Search LinkedIn for companies and people."""
    ...

@function_tool
def crunchbase_lookup(company: str) -> str:
    """Look up company data on Crunchbase."""
    ...

# Specialist: research and data gathering
research_agent = Agent(
    name="Research Agent",
    instructions=(
        "You find information about companies, markets, and industries. "
        "Use the web search, LinkedIn, and Crunchbase tools. "
        "Return structured findings with sources."
    ),
    tools=[web_search, linkedin_search, crunchbase_lookup],
)

# Similar for developer_agent and analysis_agent
developer_agent = Agent(name="Developer Agent", instructions="You build features and write code.")
analysis_agent = Agent(name="Analysis Agent", instructions="You analyze pipelines and metrics.")

# Main orchestrator that delegates to specialists via handoffs
orchestrator = Agent(
    name="Business Orchestrator",
    instructions=(
        "You coordinate work across specialized agents.\n"
        "For research tasks (finding companies, market analysis): hand off to the Research Agent.\n"
        "For development tasks (building features, writing code): hand off to the Developer Agent.\n"
        "For data analysis tasks (pipeline analysis, metrics): hand off to the Analysis Agent.\n"
        "Analyze each request, delegate to the appropriate agent, "
        "and compile results into a final deliverable."
    ),
    handoffs=[research_agent, developer_agent, analysis_agent],
)

def handle_business_request(request: str) -> str:
    """Entry point: the orchestrator receives the request and routes it."""
    result = Runner.run_sync(orchestrator, request)
    return result.final_output

# Execute
result = handle_business_request("Find 3 potential partners in construction industry")
```
When to use
- Workflow path is dynamic (can't predict which agents needed until runtime)
- Task types vary widely (research vs development vs analysis)
- You want single entry point for diverse requests
When NOT to use
- Workflow is predictable (sequential or parallel is simpler)
- Real-time latency critical (orchestrator decision-making adds overhead)
- Limited agent types (if you only have 2 agents, just call them directly)
Pattern #4: Consensus-building
Best for: High-stakes decisions requiring validation from multiple perspectives.
How it works
Multiple agents analyze same input independently, provide recommendations, orchestrator aggregates and either reaches consensus or escalates conflict to human.
Think peer review: multiple experts examine problem, if they agree → proceed, if they disagree → escalate for human judgment.
Architecture
```
Input → Agent A (perspective 1) ──┐
      → Agent B (perspective 2) ──┼→ Consensus Analyzer → Decision
      → Agent C (perspective 3) ──┘           ↓
                                        (if conflict)
                                         Human Review
```
Real example: Healthcare claims adjudication
A healthcare tech company uses consensus for high-value claims (>$10K):
Agent A: Policy checker
- Perspective: Does claim comply with policy terms?
- Output: Approve/deny + reasoning
Agent B: Medical necessity reviewer
- Perspective: Is procedure medically necessary based on diagnosis?
- Output: Approve/deny + reasoning
Agent C: Fraud detector
- Perspective: Any red flags (duplicate claim, unusual patterns)?
- Output: Approve/deny + reasoning
Consensus logic:
- All 3 approve → Auto-approve claim
- All 3 deny → Auto-deny claim
- Split decision (2-1) → Human review required
Implementation
```python
import asyncio
from typing import Tuple

# llm_call is an assumed async helper
async def policy_check_agent(claim) -> Tuple[str, str]:
    """Agent A: Policy compliance"""
    prompt = f"Does claim comply with policy? Claim: {claim}"
    decision = await llm_call(prompt, model="gpt-4")
    return ("approve" if "yes" in decision.lower() else "deny", decision)

async def medical_necessity_agent(claim) -> Tuple[str, str]:
    """Agent B: Medical necessity"""
    prompt = f"Is procedure medically necessary? Diagnosis: {claim['diagnosis']}, Procedure: {claim['procedure']}"
    decision = await llm_call(prompt, model="gpt-4")
    return ("approve" if "necessary" in decision.lower() else "deny", decision)

async def fraud_detection_agent(claim, history) -> Tuple[str, str]:
    """Agent C: Fraud screening"""
    # Check for duplicate claims, unusual patterns
    if history.check_duplicate(claim):
        return ("deny", "Duplicate claim detected")
    return ("approve", "No fraud indicators")

async def adjudicate_claim_consensus(claim, history):
    """Orchestrator: Consensus-building"""
    # Get decisions from all agents in parallel
    results = await asyncio.gather(
        policy_check_agent(claim),
        medical_necessity_agent(claim),
        fraud_detection_agent(claim, history),
    )
    decisions = [r[0] for r in results]
    reasonings = [r[1] for r in results]
    # Count votes
    approvals = decisions.count("approve")
    denials = decisions.count("deny")
    if approvals == 3:
        # Unanimous approval
        return {
            "decision": "approve",
            "confidence": "high",
            "reasoning": "All agents approve",
            "requires_human_review": False,
        }
    elif denials == 3:
        # Unanimous denial
        return {
            "decision": "deny",
            "confidence": "high",
            "reasoning": reasonings,
            "requires_human_review": False,
        }
    else:
        # Split decision → escalate
        return {
            "decision": "pending",
            "confidence": "low",
            "reasoning": {
                "agent_a": reasonings[0],
                "agent_b": reasonings[1],
                "agent_c": reasonings[2],
            },
            "requires_human_review": True,
        }
```
When to use
- High-stakes decisions (large financial impact, regulatory risk)
- Multiple valid perspectives on same problem
- Risk of agent error is significant
When NOT to use
- Low-stakes, high-volume decisions (consensus adds latency and cost -3x LLM calls)
- Single obvious correct answer (waste of resources to get 3 agents to agree)
Pattern #5: Dynamic routing
Best for: Workflows where the path cannot be predetermined and must be decided at runtime based on intermediate results.
How it works
Orchestrator evaluates current state after each step, decides dynamically which agent to invoke next based on results so far.
Unlike sequential (fixed order) or hierarchical (orchestrator delegates once), dynamic routing makes continuous decisions.
Architecture
```
Input → Agent A → Evaluator → Agent B or Agent C or Agent D
                                         ↓
                                       State
                                         ↓
                              Evaluator → Agent E or Human or END
```
Real example: Customer support escalation workflow
Step 1: Classification agent
- Categorizes ticket (bug, feature request, billing, how-to)
Step 2: Route based on classification
- If how-to → Knowledge base search agent
- If bug → Technical diagnostics agent
- If billing → Billing specialist agent
- If feature request → Product team routing agent
Step 3: Route based on resolution
- If knowledge base search found answer → Auto-respond agent
- If knowledge base search failed → Escalate to human
- If bug diagnosis found root cause → Create eng ticket agent
- If bug diagnosis inconclusive → Escalate to eng team
Path is determined dynamically based on intermediate results.
Implementation (state machine approach)
```python
# llm_call and vector_search are assumed async helpers
class SupportWorkflowOrchestrator:
    def __init__(self):
        self.state = {}

    async def execute(self, ticket):
        """Dynamic routing based on intermediate results"""
        # Step 1: Classify
        classification = await self.classify_ticket(ticket)
        self.state["classification"] = classification
        # Step 2: Route based on classification
        if classification == "how-to":
            kb_result = await self.search_knowledge_base(ticket)
            self.state["kb_result"] = kb_result
            if kb_result["confidence"] > 0.85:
                # High confidence answer found
                return await self.auto_respond(ticket, kb_result)
            else:
                # Low confidence, escalate
                return await self.escalate_to_human(ticket, "kb_search_failed")
        elif classification == "bug":
            diagnosis = await self.diagnose_bug(ticket)
            self.state["diagnosis"] = diagnosis
            if diagnosis["root_cause_identified"]:
                return await self.create_eng_ticket(ticket, diagnosis)
            else:
                return await self.escalate_to_eng_team(ticket, diagnosis)
        elif classification == "billing":
            return await self.route_to_billing_specialist(ticket)
        elif classification == "feature_request":
            return await self.route_to_product_team(ticket)

    async def classify_ticket(self, ticket):
        prompt = f"Classify: {ticket['subject']} - {ticket['body']}"
        return await llm_call(prompt)

    async def search_knowledge_base(self, ticket):
        # Vector search knowledge base
        results = await vector_search(ticket["body"])
        return {
            "answer": results[0]["content"],
            "confidence": results[0]["score"],
        }

    async def diagnose_bug(self, ticket):
        prompt = f"Diagnose: {ticket['body']}, error: {ticket.get('error_message')}"
        diagnosis = await llm_call(prompt, model="gpt-4")
        return {
            "root_cause_identified": "root cause:" in diagnosis.lower(),
            "details": diagnosis,
        }

    # ... other agent methods

# Execute
orchestrator = SupportWorkflowOrchestrator()
result = await orchestrator.execute(ticket)
```
When to use
- Workflow path varies significantly based on intermediate results
- Many possible paths (>5 different sequences possible)
- Cannot predict path at start (depends on data discovered during execution)
When NOT to use
- Simple linear workflows (sequential is simpler)
- Predictable branching (just use if/else logic, don't need orchestrator)
Implementation considerations
Centralized vs distributed orchestration
Centralized: Single orchestrator coordinates all agents
Pros:
- Simpler to reason about (one place to look for coordination logic)
- Easier to debug (single point of logging and monitoring)
- Better visibility (orchestrator sees all state)
Cons:
- Single point of failure (if orchestrator fails, everything fails)
- Potential bottleneck at scale
When to use: Most cases. Start centralized.
---
Distributed: Agents coordinate peer-to-peer
Pros:
- More resilient (no single point of failure)
- Scales better (no central bottleneck)
Cons:
- Complex to implement and debug
- Harder to maintain consistency
- Difficult to trace execution flow
When to use: Very high scale (>10,000 workflows/second) or when resilience is critical (system must never fully fail).
State management
Every multi-agent system needs shared state. Options:
1. In-memory state (simplest)
- Works for single-process systems
- Lost if process restarts
- Can't scale across multiple machines
2. Database state (most common)
- Persistent (survives restarts)
- Enables multi-process orchestrators
- Use PostgreSQL, MongoDB, or Redis
3. Message queue state (for high throughput)
- Agents communicate via queue (RabbitMQ, Kafka)
- Enables distributed processing
- More complex to implement
Recommendation: Start with database state (PostgreSQL). Migrate to message queue if you hit >1,000 workflows/second.
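A minimal sketch of database-backed state, using SQLite here for portability (swap the connection for PostgreSQL in production; the single-table layout and column names are assumptions, not a standard schema):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")  # use a real PostgreSQL DSN in production
conn.execute("""
    CREATE TABLE IF NOT EXISTS workflow_state (
        workflow_id TEXT PRIMARY KEY,
        state       TEXT NOT NULL   -- JSON blob of the shared state
    )
""")

def save_state(workflow_id: str, state: dict) -> None:
    """Upsert the current state so any agent (or process) can resume from it."""
    conn.execute(
        "INSERT INTO workflow_state VALUES (?, ?) "
        "ON CONFLICT(workflow_id) DO UPDATE SET state = excluded.state",
        (workflow_id, json.dumps(state)),
    )
    conn.commit()

def load_state(workflow_id: str) -> dict:
    row = conn.execute(
        "SELECT state FROM workflow_state WHERE workflow_id = ?", (workflow_id,)
    ).fetchone()
    return json.loads(row[0]) if row else {}

# Agent A writes, Agent B (possibly in another process) reads
save_state("wf-123", {"classification": "hot", "lead_score": 8})
state = load_state("wf-123")
```

Because every agent reads and writes through the same table, this also gives you the per-step audit trail for free.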
Error handling and retries
Multi-agent systems have more failure points. Handle them:
1. Agent timeout
```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def call_agent_with_timeout(agent_fn, timeout=30):
    try:
        result = await asyncio.wait_for(agent_fn(), timeout=timeout)
        return result
    except asyncio.TimeoutError:
        logger.error(f"{agent_fn.__name__} timed out")
        # Fall back to simpler logic or escalate to human
        return None
```
2. API failures
```python
# `retry` is an illustrative decorator; libraries like tenacity provide equivalents
@retry(max_attempts=3, backoff_factor=2, exceptions=[APIError])
async def call_llm_with_retry(prompt):
    response = await llm_api.call(prompt)
    if not response.success:
        raise APIError(f"LLM call failed: {response.error}")
    return response.result
```
3. Agent errors
```python
async def safe_agent_call(agent_fn, fallback_fn):
    try:
        return await agent_fn()
    except Exception as e:
        logger.exception(f"Agent {agent_fn.__name__} failed: {e}")
        # Use fallback (simpler agent or human escalation)
        return await fallback_fn()
```
Frequently asked questions
When should I use multi-agent vs single-agent?
Use multi-agent when:
- Workflow requires multiple types of expertise (research + development + analysis)
- Parts of workflow can run in parallel (latency matters)
- Decision-making benefits from multiple perspectives (consensus)
Use single-agent when:
- Workflow is simple and linear
- One type of expertise sufficient
- Latency isn't critical
How do I choose between patterns?
Decision tree:
- Is workflow linear with dependencies? → Sequential handoff
- Are subtasks independent? → Parallel execution
- Is path unpredictable? → Dynamic routing
- High-stakes requiring validation? → Consensus
- Variable task types? → Hierarchical delegation
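The decision tree can be encoded as a toy helper. The flag names and the precedence order (high-stakes checks first) are my own framing of the bullets above, not a canonical algorithm:

```python
def choose_pattern(
    linear: bool,
    independent_subtasks: bool,
    unpredictable_path: bool,
    high_stakes: bool,
    variable_task_types: bool,
) -> str:
    """Map workflow characteristics to the matching orchestration pattern."""
    if high_stakes:
        return "consensus"
    if unpredictable_path:
        return "dynamic routing"
    if variable_task_types:
        return "hierarchical delegation"
    if independent_subtasks:
        return "parallel execution"
    if linear:
        return "sequential handoff"
    return "single agent"

choice = choose_pattern(
    linear=False, independent_subtasks=True,
    unpredictable_path=False, high_stakes=False, variable_task_types=False,
)
# → "parallel execution"
```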
What's the latency impact of multi-agent orchestration?
- Sequential: Latency = sum of all agents (3 agents × 2s = 6s)
- Parallel: Latency = slowest agent (max of [2s, 1.8s, 2.1s] = 2.1s)
- Hierarchical: Latency = orchestrator + longest agent path
- Consensus: Latency = max agent time (parallel) + aggregation (<100ms)
- Dynamic: Varies based on path taken
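The sequential-vs-parallel difference is easy to demonstrate with simulated agents (the sleep times are placeholders for real agent latency):

```python
import asyncio
import time

async def fake_agent(delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for an LLM or API call
    return "done"

async def compare():
    delays = [0.2, 0.18, 0.21]

    start = time.perf_counter()
    for d in delays:  # sequential: total latency = sum of all agents
        await fake_agent(d)
    sequential = time.perf_counter() - start

    start = time.perf_counter()
    # parallel: total latency = slowest agent
    await asyncio.gather(*(fake_agent(d) for d in delays))
    parallel = time.perf_counter() - start

    return sequential, parallel

sequential, parallel = asyncio.run(compare())
# sequential ≈ sum of delays (~0.59 s); parallel ≈ max delay (~0.21 s)
```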
How do I debug multi-agent systems?
Essential: comprehensive logging at each step:
```python
logger.info(f"[{workflow_id}] Agent A started", extra={"state": current_state})
logger.info(f"[{workflow_id}] Agent A completed", extra={"output": agent_output})
```
Use workflow_id to trace the entire execution across agents.
Can I mix patterns?
Yes! Real systems often combine:
- Top level: Hierarchical delegation
- Within specialist agent: Sequential handoff or parallel execution
Example: Orchestrator delegates to Research Agent, which internally uses parallel execution to search multiple sources simultaneously.
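A sketch of that combination: a hierarchical router at the top whose research branch fans out in parallel internally. All names and the string-based routing are illustrative stand-ins for real agent logic:

```python
import asyncio

# Stubs standing in for real search tools
async def search_web(q): return f"web:{q}"
async def search_linkedin(q): return f"linkedin:{q}"
async def search_crunchbase(q): return f"crunchbase:{q}"

async def research_agent(query: str) -> list[str]:
    """Specialist that internally uses parallel execution across sources."""
    return list(await asyncio.gather(
        search_web(query), search_linkedin(query), search_crunchbase(query),
    ))

async def developer_agent(task: str) -> str:
    return f"built:{task}"

async def orchestrator(request: str):
    """Top level: hierarchical delegation based on request type."""
    if request.startswith("find"):
        return await research_agent(request)
    return await developer_agent(request)

result = asyncio.run(orchestrator("find construction partners"))
# a list of findings, one per source, gathered concurrently
```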
---
Bottom line: Multi-agent orchestration isn't academic; it's how production systems handle complex workflows reliably. Pick the pattern that matches your workflow characteristics, implement with proper error handling, and iterate based on production feedback.
Start simple (sequential or parallel), and add complexity (hierarchical, consensus, dynamic routing) only when needed. Most workflows work fine with sequential or parallel; resist the urge to over-engineer.
Ready to implement? Pick one pattern, build a proof-of-concept for your highest-pain workflow, measure results. You'll know within two weeks if it's the right approach.
---
Frequently Asked Questions
Q: How long does it take to implement an AI agent workflow?
Implementation timelines vary based on complexity, but most teams see initial results within 2-4 weeks for simple workflows. More sophisticated multi-agent systems typically require 6-12 weeks for full deployment with proper testing and governance.
Q: How do AI agents handle errors and edge cases?
Well-designed agent systems include fallback mechanisms, human-in-the-loop escalation, and retry logic. The key is defining clear boundaries for autonomous action versus requiring human approval for sensitive or unusual situations.
Q: What's the typical ROI timeline for AI agent implementations?
Most organisations see positive ROI within 3-6 months of deployment. Initial productivity gains of 20-40% are common, with improvements compounding as teams optimise prompts and workflows based on production experience.