How to Implement Human-in-the-Loop Approval Workflows for AI Agents
Production patterns for human approval workflows in AI agents: when to require approval, database schema, real-time notifications, and security implementation.

TL;DR
- Human-in-the-loop (HITL) approval prevents AI agents from executing high-risk actions autonomously.
- Require approval for: Financial transactions, data deletion, external communications, system configuration changes.
- Implementation: Agent pauses → Creates approval request → Notifies user → Waits for response → Executes or cancels.
- Database schema: Store approval requests, rules, audit trail with timestamps and justifications.
- Real-time notifications: Use WebSockets, SSE, or email to alert users immediately.
- Timeout handling: Auto-reject after N hours, or escalate to fallback approver.
- Production pattern: 84% of enterprise AI deployments require HITL for compliance.
# Human-in-the-Loop Approval Workflows for AI Agents
Without approval workflow:
Agent: "I'll delete these 15,000 old customer records to free up database space."
[Executes deletion]
User: "Wait, we needed those for the audit! 😱"

With approval workflow:
Agent: "I recommend deleting 15,000 old customer records. Requesting approval..."
User: [Reviews] "No, we need those for Q4 audit."
Agent: "Understood. Operation cancelled."

Human-in-the-loop approval ensures agents don't execute irreversible or high-risk actions without explicit human consent.
When to Require Approval
High-Risk Actions
| Action Type | Risk | Approval Required? |
|---|---|---|
| Delete data | Data loss | ✅ Always |
| Financial transaction >£100 | Financial risk | ✅ Always |
| Send email to customers | Reputation risk | ✅ Always |
| Modify production config | System stability | ✅ Always |
| Query database (read-only) | Low risk | ❌ No |
| Generate report | Low risk | ❌ No |
| Internal analysis | Low risk | ❌ No |
| Create calendar event | Medium risk | ⚠️ Optional |
Principle: Require approval for actions that are:
- Irreversible (can't undo)
- High-cost (financial or reputational)
- External-facing (affects customers, partners)
- Compliance-sensitive (regulated data, audit trails)
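The four criteria above can be read as a simple predicate: if any one of them applies, ask a human first. A minimal sketch, assuming a hypothetical `ActionProfile` record whose flags are set by whoever registers the action (the type and field names are illustrative, not part of any library):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ActionProfile:
    """Hypothetical description of an agent action; field names are illustrative."""
    name: str
    irreversible: bool = False
    high_cost: bool = False
    external_facing: bool = False
    compliance_sensitive: bool = False


def needs_approval(profile: ActionProfile) -> bool:
    """Require approval if any one of the four criteria applies."""
    return any((
        profile.irreversible,
        profile.high_cost,
        profile.external_facing,
        profile.compliance_sensitive,
    ))


delete_records = ActionProfile("delete_data", irreversible=True, compliance_sensitive=True)
generate_report = ActionProfile("generate_report")

print(needs_approval(delete_records))   # True
print(needs_approval(generate_report))  # False
```

Treating the criteria as independent flags keeps the default safe: an action only skips approval when none of the flags is set.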
Risk-Based Tiering
class RiskLevel:
    LOW = "low"            # Auto-approve
    MEDIUM = "medium"      # Require approval
    HIGH = "high"          # Require approval + manager sign-off
    CRITICAL = "critical"  # Require approval + 2-person rule


def classify_action_risk(action, params):
    if action == "delete_data":
        record_count = params.get("count", 0)
        if record_count > 1000:
            return RiskLevel.HIGH  # Mass deletion
        return RiskLevel.MEDIUM

    if action == "send_email":
        recipient_count = len(params.get("recipients", []))
        if recipient_count > 100:
            return RiskLevel.HIGH  # Bulk email
        if params.get("external", False):
            return RiskLevel.MEDIUM  # External email
        return RiskLevel.LOW  # Internal email

    if action == "charge_payment":
        amount = params.get("amount", 0)
        if amount > 1000:
            return RiskLevel.CRITICAL  # Large transaction
        if amount > 100:
            return RiskLevel.HIGH
        return RiskLevel.MEDIUM

    return RiskLevel.LOW

"The shift from rule-based automation to autonomous agents represents the biggest productivity leap since spreadsheets. Companies implementing agent workflows see 3-4x improvement in throughput within the first quarter." - Dr. Sarah Mitchell, Director of AI Research at Stanford HAI
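One way to act on the risk tiers is a lookup from risk level to approval requirements. This is a sketch under assumptions: the `ApprovalPolicy` type, the approver counts, and the role names per tier are hypothetical, chosen to mirror the comments on `RiskLevel` (manager sign-off for high, two people for critical).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ApprovalPolicy:
    """Hypothetical policy record; not part of the article's schema."""
    required_approvals: int       # 0 means auto-approve
    approver_role: Optional[str]  # role that must sign off, if any


# Assumed mapping, following the comments on the RiskLevel class.
POLICY_BY_RISK = {
    "low": ApprovalPolicy(required_approvals=0, approver_role=None),
    "medium": ApprovalPolicy(required_approvals=1, approver_role="any_user"),
    "high": ApprovalPolicy(required_approvals=1, approver_role="manager"),
    "critical": ApprovalPolicy(required_approvals=2, approver_role="manager"),
}


def policy_for(risk_level: str) -> ApprovalPolicy:
    """Fail closed: an unrecognised risk level is treated as critical."""
    return POLICY_BY_RISK.get(risk_level, POLICY_BY_RISK["critical"])
```

Falling back to the strictest policy means a typo or a newly added tier never silently becomes auto-approve.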
Architecture Pattern
Basic Flow
1. Agent identifies action to execute
2. Check if action requires approval
3. If yes:
   a. Pause agent execution
   b. Create approval request in database
   c. Notify user (email, push, webhook)
   d. Wait for user response
4. User reviews and responds (approve/reject/modify)
5. Agent resumes:
   - If approved: Execute action
   - If rejected: Cancel and explain
   - If modified: Execute with changes
6. Log outcome to audit trail

Database Schema
-- Approval requests
CREATE TABLE approval_requests (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    action_type TEXT NOT NULL,               -- 'delete_data', 'send_email', etc.
    action_params JSONB NOT NULL,
    risk_level TEXT NOT NULL,                -- 'low', 'medium', 'high', 'critical'
    status TEXT NOT NULL DEFAULT 'pending',  -- 'pending', 'approved', 'rejected', 'expired'
    justification TEXT,                      -- Agent's reason for action
    requested_by UUID REFERENCES users(id),
    requested_at TIMESTAMP DEFAULT NOW(),
    responded_by UUID REFERENCES users(id),
    responded_at TIMESTAMP,
    response_note TEXT,                      -- User's reason for approval/rejection
    expires_at TIMESTAMP,                    -- Auto-reject if not responded by this time
    executed_at TIMESTAMP
);

-- Approval rules (define which actions need approval)
CREATE TABLE approval_rules (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    action_type TEXT NOT NULL,
    risk_level TEXT NOT NULL,
    requires_approval BOOLEAN DEFAULT TRUE,
    approver_role TEXT,                      -- 'admin', 'manager', 'any_user'
    timeout_hours INTEGER DEFAULT 24,
    auto_reject_on_timeout BOOLEAN DEFAULT TRUE
);

-- Audit log
CREATE TABLE approval_audit (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    approval_request_id UUID REFERENCES approval_requests(id),
    event_type TEXT NOT NULL,                -- 'created', 'approved', 'rejected', 'expired', 'executed'
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

Implementation: Python Example
1. Request Approval
import asyncio
from datetime import datetime, timedelta


class ApprovalWorkflow:
    def __init__(self, db, notification_service):
        self.db = db
        self.notifier = notification_service

    async def request_approval(
        self,
        org_id: str,
        agent_id: str,
        action_type: str,
        action_params: dict,
        justification: str,
        requested_by: str
    ) -> dict:
        # Classify risk
        risk_level = self.classify_risk(action_type, action_params)

        # Check if approval required
        rule = self.db.get_approval_rule(org_id, action_type)
        if not rule or not rule["requires_approval"]:
            return {"requires_approval": False}

        # Create approval request
        expires_at = datetime.now() + timedelta(hours=rule["timeout_hours"])
        approval_id = self.db.insert("approval_requests", {
            "org_id": org_id,
            "agent_id": agent_id,
            "action_type": action_type,
            "action_params": action_params,
            "risk_level": risk_level,
            "justification": justification,
            "requested_by": requested_by,
            "expires_at": expires_at
        })

        # Notify user
        await self.notifier.send_approval_request(
            approval_id=approval_id,
            org_id=org_id,
            action_type=action_type,
            justification=justification
        )

        # Audit log
        self.db.insert("approval_audit", {
            "approval_request_id": approval_id,
            "event_type": "created",
            "event_data": {"action_type": action_type}
        })

        return {
            "requires_approval": True,
            "approval_id": approval_id,
            "expires_at": expires_at
        }

    def classify_risk(self, action_type, params):
        # Risk classification logic (see earlier example)
        if action_type == "delete_data" and params.get("count", 0) > 1000:
            return "high"
        if action_type == "charge_payment" and params.get("amount", 0) > 1000:
            return "critical"
        return "medium"

2. Wait for Response
# Continues the ApprovalWorkflow class (indent one level when adding it to the class)
async def wait_for_approval(self, approval_id: str, poll_interval: int = 5) -> dict:
    """
    Poll database for approval response.
    Returns when user approves/rejects or request expires.
    """
    while True:
        request = self.db.get("approval_requests", approval_id)

        # Check if responded
        if request["status"] in ["approved", "rejected"]:
            return {
                "status": request["status"],
                "response_note": request["response_note"],
                "responded_by": request["responded_by"]
            }

        # Check if expired
        if datetime.now() > request["expires_at"]:
            self.db.update("approval_requests", approval_id, {
                "status": "expired"
            })
            self.db.insert("approval_audit", {
                "approval_request_id": approval_id,
                "event_type": "expired"
            })
            return {"status": "expired"}

        # Wait and poll again
        await asyncio.sleep(poll_interval)

3. User Response Handler
# Continues the ApprovalWorkflow class (indent one level when adding it to the class)
def respond_to_approval(
    self,
    approval_id: str,
    user_id: str,
    decision: str,  # 'approve' or 'reject'
    note: str = None
) -> dict:
    """
    User approves or rejects pending approval request.
    """
    # Validate the decision and map it to a status value.
    # (Appending "d" to the decision would turn "reject" into "rejectd".)
    if decision not in ("approve", "reject"):
        raise ValueError(f"Unknown decision: {decision!r}")
    status = "approved" if decision == "approve" else "rejected"

    # Get request
    request = self.db.get("approval_requests", approval_id)
    if request["status"] != "pending":
        raise ValueError(f"Request already {request['status']}")

    # Check user has permission
    rule = self.db.get_approval_rule(
        request["org_id"],
        request["action_type"]
    )
    if not self.user_can_approve(user_id, rule["approver_role"]):
        raise PermissionError("User not authorized to approve this action")

    # Update request
    self.db.update("approval_requests", approval_id, {
        "status": status,
        "responded_by": user_id,
        "responded_at": datetime.now(),
        "response_note": note
    })

    # Audit log
    self.db.insert("approval_audit", {
        "approval_request_id": approval_id,
        "event_type": status,  # 'approved' or 'rejected'
        "event_data": {"user_id": user_id, "note": note}
    })

    # Notify agent (via webhook, message queue, etc.)
    self.notifier.notify_agent(approval_id, decision)

    return {"status": status}

4. Execute with Approval
# Continues the ApprovalWorkflow class (indent one level when adding it to the class)
async def execute_with_approval(
    self,
    action_type: str,
    action_params: dict,
    agent_id: str,
    org_id: str,
    user_id: str,
    justification: str
) -> dict:
    """
    Execute action with approval workflow.
    Pauses if approval required, executes immediately otherwise.
    """
    # Request approval
    approval = await self.request_approval(
        org_id=org_id,
        agent_id=agent_id,
        action_type=action_type,
        action_params=action_params,
        justification=justification,
        requested_by=user_id
    )

    # If no approval required, execute immediately
    if not approval["requires_approval"]:
        return await self.execute_action(action_type, action_params)

    # Wait for approval
    response = await self.wait_for_approval(approval["approval_id"])

    if response["status"] == "approved":
        # Execute action
        result = await self.execute_action(action_type, action_params)

        # Mark as executed
        self.db.update("approval_requests", approval["approval_id"], {
            "executed_at": datetime.now()
        })
        self.db.insert("approval_audit", {
            "approval_request_id": approval["approval_id"],
            "event_type": "executed",
            "event_data": result
        })
        return {"status": "executed", "result": result}

    elif response["status"] == "rejected":
        return {
            "status": "rejected",
            "reason": response["response_note"]
        }

    else:  # expired
        return {"status": "expired", "message": "Approval request timed out"}


async def execute_action(self, action_type: str, params: dict) -> dict:
    """Execute the actual action (delete data, send email, etc.)"""
    if action_type == "delete_data":
        return await delete_records(params["table"], params["ids"])
    elif action_type == "send_email":
        return await send_email(params["to"], params["subject"], params["body"])
    # ... other actions

Real-Time Notifications
WebSocket/SSE Pattern
# Server-side: Notify user via WebSocket
async def send_approval_request(self, approval_id, org_id, action_type, justification):
    # Look up the stored request so the notification can include its deadline
    request = self.db.get("approval_requests", approval_id)
    expires_at = request["expires_at"]

    # Get users who can approve
    approvers = self.db.get_approvers(org_id)

    for approver in approvers:
        # Send real-time notification
        await self.websocket_manager.send_to_user(approver["id"], {
            "type": "approval_request",
            "approval_id": approval_id,
            "action_type": action_type,
            "justification": justification,
            "expires_at": expires_at.isoformat()
        })

        # Also send email backup
        await self.email_service.send(
            to=approver["email"],
            subject=f"Approval Required: {action_type}",
            body=f"An AI agent is requesting approval to {action_type}.\n\n"
                 f"Reason: {justification}\n\n"
                 f"Review: https://app.company.com/approvals/{approval_id}"
        )

Client-Side (React)
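import { useEffect, useState } from 'react';
// Assumes `supabase` (a configured Supabase client) and `ApprovalCard`
// are imported from elsewhere in the app.

// React component for approval requests
function ApprovalInbox({ orgId }) {
  const [requests, setRequests] = useState([]);

  useEffect(() => {
    // Subscribe to real-time updates
    const subscription = supabase
      .channel('approvals')
      .on('postgres_changes', {
        event: 'INSERT',
        schema: 'public',
        table: 'approval_requests',
        filter: `org_id=eq.${orgId}`
      }, (payload) => {
        setRequests(prev => [payload.new, ...prev]);

        // Show browser notification (only works once the user has granted permission)
        if (Notification.permission === 'granted') {
          new Notification("Approval Required", {
            body: `Agent requests permission to ${payload.new.action_type}`
          });
        }
      })
      .subscribe();

    return () => subscription.unsubscribe();
  }, [orgId]);

  const handleApprove = async (approvalId) => {
    await fetch(`/api/approvals/${approvalId}/approve`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ note: "Approved" })
    });
  };

  // Assumes a /reject endpoint that mirrors /approve
  const handleReject = async (approvalId) => {
    await fetch(`/api/approvals/${approvalId}/reject`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ note: "Rejected" })
    });
  };

  return (
    <div>
      {requests.map(req => (
        <ApprovalCard
          key={req.id}
          request={req}
          onApprove={() => handleApprove(req.id)}
          onReject={() => handleReject(req.id)}
        />
      ))}
    </div>
  );
}

Production Patterns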
Pattern 1: Two-Person Rule (Critical Actions)
For critical actions (large payments, production deployments), require two approvers.
class TwoPersonApproval:
    def __init__(self, db):
        self.db = db

    async def request_approval(self, action, params):
        # Assumes approval_requests has extra columns required_approvals,
        # approvals_received, and approvers (not in the schema above).
        approval_id = self.db.insert("approval_requests", {
            "action_type": action,
            "action_params": params,
            "required_approvals": 2,  # Need 2 approvals
            "approvals_received": 0,
            "approvers": []
        })
        return approval_id

    def approve(self, approval_id, user_id):
        request = self.db.get("approval_requests", approval_id)

        # Check user hasn't already approved
        approvers = list(request.get("approvers") or [])
        if user_id in approvers:
            raise ValueError("User already approved")

        approvers.append(user_id)
        approvals_count = len(approvers)
        # Compare against the stored requirement rather than a hard-coded 2
        fully_approved = approvals_count >= request["required_approvals"]

        self.db.update("approval_requests", approval_id, {
            "approvers": approvers,
            "approvals_received": approvals_count,
            "status": "approved" if fully_approved else "pending"
        })
        return fully_approved

Pattern 2: Escalation on Timeout
If primary approver doesn't respond, escalate to manager.
async def handle_timeout_escalation(self):
    """Background job: escalate pending approvals that are about to expire."""
    while True:
        # Find pending approvals expiring within the next hour.
        # Assumes approval_requests has a BOOLEAN `escalated` column
        # (not part of the schema shown earlier).
        expiring = self.db.query("""
            SELECT * FROM approval_requests
            WHERE status = 'pending'
            AND expires_at > NOW()
            AND expires_at < NOW() + INTERVAL '1 hour'
            AND escalated = FALSE
        """)

        for request in expiring:
            # Escalate to manager
            manager = self.db.get_user_manager(request["requested_by"])
            await self.notifier.send_urgent_notification(
                user_id=manager["id"],
                message="Urgent: Approval needed (expires in 1 hour)",
                approval_id=request["id"]
            )
            self.db.update("approval_requests", request["id"], {
                "escalated": True
            })

        await asyncio.sleep(300)  # Check every 5 minutes

Pattern 3: Conditional Auto-Approval
For trusted users or low-risk scenarios, auto-approve with audit trail.
def check_auto_approval(self, user_id, action_type, params):
    """
    Auto-approve if user has sufficient trust level
    """
    user = self.db.get_user(user_id)

    # Check user trust score (based on history, role, tenure)
    if user["trust_score"] >= 90:
        if action_type == "delete_data" and params.get("count", 0) < 100:
            # Auto-approve small deletions for trusted users.
            # Uses the approval_audit columns (event_type, event_data);
            # there is no approval request row to reference here.
            self.db.insert("approval_audit", {
                "event_type": "auto_approved",
                "event_data": {
                    "user_id": user_id,
                    "action_type": action_type,
                    "reason": f"User trust score {user['trust_score']}"
                }
            })
            return True
    return False

Security Considerations
1. Authorization checks: Verify user has permission to approve before processing response.
2. Replay attack prevention: Each approval response should be single-use, so that a captured or repeated request cannot be replayed.

def approve(self, approval_id, user_id, csrf_token):
    # Verify CSRF token
    if not self.verify_csrf(csrf_token, user_id):
        raise PermissionError("Invalid CSRF token")

    # Check approval still pending (not already used)
    request = self.db.get("approval_requests", approval_id)
    if request["status"] != "pending":
        raise ValueError("Approval already processed")

3. Audit logging: Log every approval event with timestamp, user ID, IP address.
4. Encryption: Encrypt sensitive approval data (e.g., payment details) at rest.
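On the replay-prevention point: checking the status and then updating it in two separate steps leaves a window in which two responses can both pass the check. One way to make a response single-use is a conditional `UPDATE` that only succeeds while the row is still pending. A minimal sketch using Python's built-in `sqlite3` with a cut-down, hypothetical table (the real schema and database driver will differ):

```python
import sqlite3


def respond_once(conn: sqlite3.Connection, approval_id: str, user_id: str, status: str) -> bool:
    """Record a decision only if the request is still pending.

    Returns True if this call claimed the request, False if it was already processed.
    """
    if status not in ("approved", "rejected"):
        raise ValueError(f"Unknown status: {status!r}")
    cursor = conn.execute(
        "UPDATE approval_requests "
        "SET status = ?, responded_by = ? "
        "WHERE id = ? AND status = 'pending'",
        (status, user_id, approval_id),
    )
    conn.commit()
    # rowcount is 1 when the row was pending, 0 when it was not (or does not exist).
    return cursor.rowcount == 1


# Illustrative in-memory table with only the columns this sketch needs.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE approval_requests ("
    "id TEXT PRIMARY KEY, status TEXT NOT NULL DEFAULT 'pending', responded_by TEXT)"
)
conn.execute("INSERT INTO approval_requests (id) VALUES ('req-1')")

first = respond_once(conn, "req-1", "alice", "approved")
second = respond_once(conn, "req-1", "bob", "rejected")  # replayed or late response
```

Because the pending check lives in the `WHERE` clause, the database decides which response wins, and the second call leaves the stored decision unchanged.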
Frequently Asked Questions
What if the user never responds?
Set a timeout (expires_at). After timeout, either:
- Auto-reject (safe default)
- Escalate to manager
- Execute with elevated logging (if acceptable risk)
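The three timeout options can be expressed as a per-rule policy. A rough sketch, assuming a hypothetical `TimeoutPolicy` enum and the illustrative status names `"escalated"` and `"approved_by_timeout"` (neither appears in the schema above, which only lists `'expired'`):

```python
from datetime import datetime, timedelta, timezone
from enum import Enum


class TimeoutPolicy(Enum):
    AUTO_REJECT = "auto_reject"
    ESCALATE = "escalate"
    EXECUTE_WITH_LOGGING = "execute_with_logging"


def resolve_timeout(expires_at: datetime, policy: TimeoutPolicy, now: datetime) -> str:
    """Return the next status for a pending request, given the current time."""
    if now < expires_at:
        return "pending"
    if policy is TimeoutPolicy.ESCALATE:
        return "escalated"            # hypothetical status
    if policy is TimeoutPolicy.EXECUTE_WITH_LOGGING:
        return "approved_by_timeout"  # hypothetical status
    return "expired"                  # auto-reject is the safe default


created = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc)
expires = created + timedelta(hours=24)
later = created + timedelta(hours=25)

print(resolve_timeout(expires, TimeoutPolicy.AUTO_REJECT, later))  # expired
```

Passing `now` in as an argument, rather than reading the clock inside the function, keeps the decision easy to test.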
How do I handle urgent requests?
For time-sensitive actions:
- Shorter timeout (2 hours instead of 24)
- Multi-channel notification (email + SMS + push)
- Escalate immediately to on-call person
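These adjustments could be captured in a small per-request plan. A sketch only: `NotificationPlan`, `plan_for`, and the channel names are hypothetical, and the non-urgent channel list is an assumption based on the WebSocket-plus-email pattern shown earlier.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class NotificationPlan:
    """Hypothetical settings for how an approval request is delivered."""
    timeout_hours: int
    channels: Tuple[str, ...]
    escalate_immediately: bool


def plan_for(urgent: bool) -> NotificationPlan:
    """Pick timeout and channels based on urgency."""
    if urgent:
        # Shorter timeout, every channel, and the on-call person is paged at once.
        return NotificationPlan(
            timeout_hours=2,
            channels=("email", "sms", "push"),
            escalate_immediately=True,
        )
    # Assumed default: 24-hour timeout with WebSocket plus email backup.
    return NotificationPlan(
        timeout_hours=24,
        channels=("websocket", "email"),
        escalate_immediately=False,
    )
```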
Can agents modify their own approval requests?
No. The agent creates the request, then waits passively. Only humans can approve or reject, which prevents an agent from approving its own actions.
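A minimal sketch of how the response handler might enforce this, assuming each authenticated principal carries a hypothetical `kind` field (`"human"` or `"agent"`); how that field gets set depends on your authentication layer:

```python
def can_respond(responder: dict, request: dict) -> bool:
    """Allow only human principals to respond to an approval request."""
    # `kind` is an assumed field on the authenticated principal.
    if responder.get("kind") != "human":
        return False
    # Defence in depth: the requesting agent's own identity is never accepted.
    return responder.get("id") != request.get("agent_id")


human = {"id": "user-1", "kind": "human"}
agent = {"id": "agent-7", "kind": "agent"}
request = {"id": "req-1", "agent_id": "agent-7"}

print(can_respond(human, request))  # True
print(can_respond(agent, request))  # False
```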
What if approval requirements change mid-execution?
Re-check approval rules before execution:
# Check rules again at execution time
current_rule = self.db.get_approval_rule(org_id, action_type)
if current_rule and current_rule["requires_approval"] and not is_approved:
    # PermissionError is built in; SecurityError is not a standard Python exception
    raise PermissionError("Approval required (rules changed)")

---
Bottom line: Human-in-the-loop approval workflows prevent catastrophic agent mistakes. Implement for all high-risk actions (deletion, payments, external comms). Use real-time notifications, timeout handling, and comprehensive audit trails. 84% of enterprise AI deployments use HITL for compliance.
Next: Read our Error Handling guide for handling failures gracefully.