How to Implement Human-in-the-Loop Approval Workflows for AI Agents

Production patterns for human approval workflows in AI agents: when to require approval, database schema, real-time notifications, and security implementation.

Max Beech· Founder
TL;DR

  • Human-in-the-loop (HITL) approval prevents AI agents from executing high-risk actions autonomously.
  • Require approval for: Financial transactions, data deletion, external communications, system configuration changes.
  • Implementation: Agent pauses → Creates approval request → Notifies user → Waits for response → Executes or cancels.
  • Database schema: Store approval requests, rules, audit trail with timestamps and justifications.
  • Real-time notifications: Use WebSockets, SSE, or email to alert users immediately.
  • Timeout handling: Auto-reject after N hours, or escalate to fallback approver.
  • Production pattern: 84% of enterprise AI deployments require HITL for compliance.

# Human-in-the-Loop Approval Workflows for AI Agents

Without approval workflow:

Agent: "I'll delete these 15,000 old customer records to free up database space."
[Executes deletion]
User: "Wait, we needed those for the audit! 😱"

With approval workflow:

Agent: "I recommend deleting 15,000 old customer records. Requesting approval..."
User: [Reviews] "No, we need those for Q4 audit."
Agent: "Understood. Operation cancelled."

Human-in-the-loop approval ensures agents don't execute irreversible or high-risk actions without explicit human consent.

When to Require Approval

High-Risk Actions

| Action Type | Risk | Approval Required? |
|---|---|---|
| Delete data | Data loss | ✅ Always |
| Financial transaction >£100 | Financial risk | ✅ Always |
| Send email to customers | Reputation risk | ✅ Always |
| Modify production config | System stability | ✅ Always |
| Query database (read-only) | Low risk | ❌ No |
| Generate report | Low risk | ❌ No |
| Internal analysis | Low risk | ❌ No |
| Create calendar event | Medium risk | ⚠️ Optional |

Principle: Require approval for actions that are:

  1. Irreversible (can't undo)
  2. High-cost (financial or reputational)
  3. External-facing (affects customers, partners)
  4. Compliance-sensitive (regulated data, audit trails)

Risk-Based Tiering

class RiskLevel:
    LOW = "low"           # Auto-approve
    MEDIUM = "medium"     # Require approval
    HIGH = "high"         # Require approval + manager sign-off
    CRITICAL = "critical" # Require approval + 2-person rule

def classify_action_risk(action, params):
    if action == "delete_data":
        record_count = params.get("count", 0)
        if record_count > 1000:
            return RiskLevel.HIGH  # Mass deletion
        return RiskLevel.MEDIUM
    
    if action == "send_email":
        recipient_count = len(params.get("recipients", []))
        if recipient_count > 100:
            return RiskLevel.HIGH  # Bulk email
        if params.get("external", False):
            return RiskLevel.MEDIUM  # External email
        return RiskLevel.LOW  # Internal email
    
    if action == "charge_payment":
        amount = params.get("amount", 0)
        if amount > 1000:
            return RiskLevel.CRITICAL  # Large transaction
        if amount > 100:
            return RiskLevel.HIGH
        return RiskLevel.MEDIUM
    
    return RiskLevel.LOW
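
The comments on `RiskLevel` imply a different approval policy per tier. One way to make that explicit is a lookup table, sketched below. `ApprovalPolicy`, `POLICIES`, and `policy_for` are hypothetical names, not part of the original code, and the field values are one reading of those comments rather than a fixed rule.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ApprovalPolicy:
    """Hypothetical record describing what a risk tier demands."""
    needs_approval: bool
    approvals_required: int  # number of distinct human approvers
    needs_manager: bool      # whether a manager must sign off


# One interpretation of the RiskLevel comments; tune per organisation.
POLICIES = {
    "low": ApprovalPolicy(False, 0, False),
    "medium": ApprovalPolicy(True, 1, False),
    "high": ApprovalPolicy(True, 1, True),
    "critical": ApprovalPolicy(True, 2, False),
}


def policy_for(risk_level: str) -> ApprovalPolicy:
    """Look up the policy; unknown levels fall back to the strictest tier."""
    return POLICIES.get(risk_level, POLICIES["critical"])
```

Falling back to the strictest tier means a typo or a newly added risk level fails safe instead of silently auto-approving.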

"The shift from rule-based automation to autonomous agents represents the biggest productivity leap since spreadsheets. Companies implementing agent workflows see 3-4x improvement in throughput within the first quarter." - Dr. Sarah Mitchell, Director of AI Research at Stanford HAI

Architecture Pattern

Basic Flow

1. Agent identifies action to execute
2. Check if action requires approval
3. If yes:
   a. Pause agent execution
   b. Create approval request in database
   c. Notify user (email, push, webhook)
   d. Wait for user response
4. User reviews and responds (approve/reject/modify)
5. Agent resumes:
   - If approved: Execute action
   - If rejected: Cancel and explain
   - If modified: Execute with changes
6. Log outcome to audit trail
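
The flow above is effectively a small state machine. A minimal sketch of the allowed transitions follows, using the status values from this article's schema. The `TRANSITIONS` table and `transition` helper are illustrative assumptions, and treating "executed" as a status is also an assumption, since the schema records execution through `executed_at`.

```python
# Allowed status transitions for an approval request (a sketch).
TRANSITIONS = {
    "pending": {"approved", "rejected", "expired"},
    "approved": {"executed"},  # assumption: modelled as a state for clarity
    "rejected": set(),
    "expired": set(),
    "executed": set(),
}


def transition(current: str, new: str) -> str:
    """Return the new status, or raise if the move is not allowed."""
    if new not in TRANSITIONS.get(current, set()):
        raise ValueError(f"Illegal transition: {current} -> {new}")
    return new
```

Routing every status change through one function like this makes it harder for a rejected or expired request to be revived by accident.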

Database Schema

-- Approval requests
CREATE TABLE approval_requests (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    action_type TEXT NOT NULL,  -- 'delete_data', 'send_email', etc.
    action_params JSONB NOT NULL,
    risk_level TEXT NOT NULL,   -- 'low', 'medium', 'high', 'critical'
    status TEXT NOT NULL DEFAULT 'pending',  -- 'pending', 'approved', 'rejected', 'expired'
    justification TEXT,         -- Agent's reason for action
    requested_by UUID REFERENCES users(id),
    requested_at TIMESTAMP DEFAULT NOW(),
    responded_by UUID REFERENCES users(id),
    responded_at TIMESTAMP,
    response_note TEXT,         -- User's reason for approval/rejection
    expires_at TIMESTAMP,       -- Auto-reject if not responded by this time
    executed_at TIMESTAMP
);

-- Approval rules (define which actions need approval)
CREATE TABLE approval_rules (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    action_type TEXT NOT NULL,
    risk_level TEXT NOT NULL,
    requires_approval BOOLEAN DEFAULT TRUE,
    approver_role TEXT,         -- 'admin', 'manager', 'any_user'
    timeout_hours INTEGER DEFAULT 24,
    auto_reject_on_timeout BOOLEAN DEFAULT TRUE
);

-- Audit log
CREATE TABLE approval_audit (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    approval_request_id UUID REFERENCES approval_requests(id),
    event_type TEXT NOT NULL,  -- 'created', 'approved', 'rejected', 'expired', 'executed'
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

Implementation: Python Example

1. Request Approval

import asyncio
from datetime import datetime, timedelta

class ApprovalWorkflow:
    def __init__(self, db, notification_service):
        self.db = db
        self.notifier = notification_service
    
    async def request_approval(
        self,
        org_id: str,
        agent_id: str,
        action_type: str,
        action_params: dict,
        justification: str,
        requested_by: str
    ) -> dict:
        # Classify risk
        risk_level = self.classify_risk(action_type, action_params)
        
        # Check if approval required
        rule = self.db.get_approval_rule(org_id, action_type)
        if not rule or not rule["requires_approval"]:
            return {"requires_approval": False}
        
        # Create approval request
        expires_at = datetime.now() + timedelta(hours=rule["timeout_hours"])
        
        approval_id = self.db.insert("approval_requests", {
            "org_id": org_id,
            "agent_id": agent_id,
            "action_type": action_type,
            "action_params": action_params,
            "risk_level": risk_level,
            "justification": justification,
            "requested_by": requested_by,
            "expires_at": expires_at
        })
        
        # Notify user
        await self.notifier.send_approval_request(
            approval_id=approval_id,
            org_id=org_id,
            action_type=action_type,
            justification=justification
        )
        
        # Audit log
        self.db.insert("approval_audit", {
            "approval_request_id": approval_id,
            "event_type": "created",
            "event_data": {"action_type": action_type}
        })
        
        return {
            "requires_approval": True,
            "approval_id": approval_id,
            "expires_at": expires_at
        }
    
    def classify_risk(self, action_type, params):
        # Risk classification logic (see earlier example)
        if action_type == "delete_data" and params.get("count", 0) > 1000:
            return "high"
        if action_type == "charge_payment" and params.get("amount", 0) > 1000:
            return "critical"
        return "medium"

2. Wait for Response

async def wait_for_approval(self, approval_id: str, poll_interval: int = 5) -> dict:
    """
    Poll database for approval response.
    Returns when user approves/rejects or request expires.
    """
    while True:
        request = self.db.get("approval_requests", approval_id)
        
        # Check if responded
        if request["status"] in ["approved", "rejected"]:
            return {
                "status": request["status"],
                "response_note": request["response_note"],
                "responded_by": request["responded_by"]
            }
        
        # Check if expired
        if datetime.now() > request["expires_at"]:
            self.db.update("approval_requests", approval_id, {
                "status": "expired"
            })
            
            self.db.insert("approval_audit", {
                "approval_request_id": approval_id,
                "event_type": "expired"
            })
            
            return {"status": "expired"}
        
        # Wait and poll again
        await asyncio.sleep(poll_interval)
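
Polling is simple, but it adds up to `poll_interval` seconds of latency and repeats database reads. Because the response handler already notifies the agent, an alternative is to wait on an event. The sketch below works within a single process only and assumes a hypothetical `ApprovalWaiter` class that is not part of the original code. Across processes you would need a message queue or a similar mechanism.

```python
import asyncio


class ApprovalWaiter:
    """Hypothetical in-process registry: one Future per pending approval."""

    def __init__(self):
        self._futures = {}

    async def wait(self, approval_id: str, timeout_seconds: float) -> str:
        """Block until resolve() is called or the timeout elapses."""
        future = asyncio.get_running_loop().create_future()
        self._futures[approval_id] = future
        try:
            return await asyncio.wait_for(future, timeout=timeout_seconds)
        except asyncio.TimeoutError:
            return "expired"
        finally:
            self._futures.pop(approval_id, None)

    def resolve(self, approval_id: str, status: str) -> bool:
        """Called by the response handler; False if nobody is waiting."""
        future = self._futures.get(approval_id)
        if future is None or future.done():
            return False
        future.set_result(status)
        return True


async def demo() -> str:
    waiter = ApprovalWaiter()
    task = asyncio.create_task(waiter.wait("req-1", timeout_seconds=1.0))
    await asyncio.sleep(0)  # let the waiter register its future
    waiter.resolve("req-1", "approved")
    return await task
```

The database should remain the source of truth. A waiter like this only shortens the delay between the human's decision and the agent resuming.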

3. User Response Handler

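from typing import Optional

def respond_to_approval(
    self,
    approval_id: str,
    user_id: str,
    decision: str,  # 'approve' or 'reject'
    note: Optional[str] = None
) -> dict:
    """
    Record a user's approval or rejection of a pending request.
    """
    # Refuse unknown decisions rather than treating them as a rejection
    if decision not in ("approve", "reject"):
        raise ValueError(f"Unknown decision: {decision!r}")

    # Get request
    request = self.db.get("approval_requests", approval_id)
    
    if request["status"] != "pending":
        raise ValueError(f"Request already {request['status']}")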
    
    # Check user has permission
    rule = self.db.get_approval_rule(
        request["org_id"], 
        request["action_type"]
    )
    
    if not self.user_can_approve(user_id, rule["approver_role"]):
        raise PermissionError("User not authorized to approve this action")
    
    # Map the decision to the status value stored in the database
    status = "approved" if decision == "approve" else "rejected"

    # Update request
    self.db.update("approval_requests", approval_id, {
        "status": status,
        "responded_by": user_id,
        "responded_at": datetime.now(),
        "response_note": note
    })
    
    # Audit log
    self.db.insert("approval_audit", {
        "approval_request_id": approval_id,
        "event_type": status,  # 'approved' or 'rejected'
        "event_data": {"user_id": user_id, "note": note}
    })
    
    # Notify agent (via webhook, message queue, etc.)
    self.notifier.notify_agent(approval_id, decision)
    
    return {"status": status}

4. Execute with Approval

async def execute_with_approval(
    self,
    action_type: str,
    action_params: dict,
    agent_id: str,
    org_id: str,
    user_id: str,
    justification: str
) -> dict:
    """
    Execute action with approval workflow.
    Pauses if approval required, executes immediately otherwise.
    """
    # Request approval
    approval = await self.request_approval(
        org_id=org_id,
        agent_id=agent_id,
        action_type=action_type,
        action_params=action_params,
        justification=justification,
        requested_by=user_id
    )
    
    # If no approval required, execute immediately
    if not approval["requires_approval"]:
        return await self.execute_action(action_type, action_params)
    
    # Wait for approval
    response = await self.wait_for_approval(approval["approval_id"])
    
    if response["status"] == "approved":
        # Execute action
        result = await self.execute_action(action_type, action_params)
        
        # Mark as executed
        self.db.update("approval_requests", approval["approval_id"], {
            "executed_at": datetime.now()
        })
        
        self.db.insert("approval_audit", {
            "approval_request_id": approval["approval_id"],
            "event_type": "executed",
            "event_data": result
        })
        
        return {"status": "executed", "result": result}
    
    elif response["status"] == "rejected":
        return {
            "status": "rejected",
            "reason": response["response_note"]
        }
    
    else:  # expired
        return {"status": "expired", "message": "Approval request timed out"}

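async def execute_action(self, action_type: str, params: dict) -> dict:
    """Execute the actual action (delete data, send email, etc.)"""
    # delete_records and send_email are application-specific helpers,
    # assumed to be defined elsewhere.
    if action_type == "delete_data":
        return await delete_records(params["table"], params["ids"])
    elif action_type == "send_email":
        return await send_email(params["to"], params["subject"], params["body"])
    # ... other actions

    # Fail loudly instead of returning None for an unknown action
    raise ValueError(f"Unsupported action type: {action_type}")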

Real-Time Notifications

WebSocket/SSE Pattern

# Server-side: Notify user via WebSocket
async def send_approval_request(self, approval_id, org_id, action_type, justification):
    # Load the stored request to get its expiry time
    request = self.db.get("approval_requests", approval_id)
    expires_at = request["expires_at"]

    # Get users who can approve
    approvers = self.db.get_approvers(org_id)
    
    for approver in approvers:
        # Send real-time notification
        await self.websocket_manager.send_to_user(approver["id"], {
            "type": "approval_request",
            "approval_id": approval_id,
            "action_type": action_type,
            "justification": justification,
            "expires_at": expires_at.isoformat()
        })
    
        # Also send email backup to the same approver
        await self.email_service.send(
            to=approver["email"],
            subject=f"Approval Required: {action_type}",
            body=f"An AI agent is requesting approval to {action_type}.\n\n"
                 f"Reason: {justification}\n\n"
                 f"Review: https://app.company.com/approvals/{approval_id}"
        )
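
If you use Server-Sent Events instead of WebSockets, the same payload has to be serialised into the SSE wire format: an `event:` line, a `data:` line, and a terminating blank line. A minimal sketch follows. `format_sse` is a hypothetical helper, and how the string is written to the HTTP response depends on your web framework.

```python
import json


def format_sse(event: str, data: dict) -> str:
    """Serialise one Server-Sent Events message.

    json.dumps emits a single line here (newlines inside strings are
    escaped), so the payload fits in one "data:" field.
    """
    payload = json.dumps(data, separators=(",", ":"))
    return f"event: {event}\ndata: {payload}\n\n"


message = format_sse("approval_request", {"approval_id": "abc"})
```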

Client-Side (React)

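// React component for approval requests
import { useEffect, useState } from 'react';
// Assumption: a configured Supabase client is exported from this hypothetical module
import { supabase } from './supabaseClient';

// ApprovalCard is assumed to be defined elsewhere
function ApprovalInbox({ orgId }) {
  const [requests, setRequests] = useState([]);
  
  useEffect(() => {
    // Subscribe to real-time updates
    const subscription = supabase
      .channel('approvals')
      .on('postgres_changes', {
        event: 'INSERT',
        schema: 'public',
        table: 'approval_requests',
        filter: `org_id=eq.${orgId}`
      }, (payload) => {
        setRequests(prev => [payload.new, ...prev]);
        
        // Show a browser notification only if the user has granted permission
        if ('Notification' in window && Notification.permission === 'granted') {
          new Notification("Approval Required", {
            body: `Agent requests permission to ${payload.new.action_type}`
          });
        }
      })
      .subscribe();
    
    return () => subscription.unsubscribe();
  }, [orgId]);
  
  // POST the decision, then remove the request from the inbox on success
  const respond = async (approvalId, decision, note) => {
    const res = await fetch(`/api/approvals/${approvalId}/${decision}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ note })
    });
    if (res.ok) {
      setRequests(prev => prev.filter(req => req.id !== approvalId));
    }
  };
  
  const handleApprove = (approvalId) => respond(approvalId, 'approve', "Approved");
  // Assumes a /reject endpoint that mirrors /approve
  const handleReject = (approvalId) => respond(approvalId, 'reject', "Rejected");
  
  return (
    <div>
      {requests.map(req => (
        <ApprovalCard
          key={req.id}
          request={req}
          onApprove={() => handleApprove(req.id)}
          onReject={() => handleReject(req.id)}
        />
      ))}
    </div>
  );
}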

Production Patterns

Pattern 1: Two-Person Rule (Critical Actions)

For critical actions (large payments, production deployments), require two approvers.

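class TwoPersonApproval:
    # Assumes approval_requests has extra columns not in the schema above:
    # required_approvals, approvals_received, and approvers (a JSON list).
    def __init__(self, db):
        self.db = db

    async def request_approval(self, action_type, action_params):
        approval_id = self.db.insert("approval_requests", {
            "action_type": action_type,
            "action_params": action_params,
            "required_approvals": 2,  # Need 2 approvals
            "approvals_received": 0,
            "approvers": []
        })
        
        return approval_id
    
    def approve(self, approval_id, user_id):
        request = self.db.get("approval_requests", approval_id)

        if request["status"] != "pending":
            raise ValueError(f"Request already {request['status']}")
        
        # Check user hasn't already approved
        approvers = list(request.get("approvers") or [])
        if user_id in approvers:
            raise ValueError("User already approved")
        
        approvers.append(user_id)
        approvals_count = len(approvers)
        # Compare against the stored requirement, not a hard-coded 2
        fully_approved = approvals_count >= request["required_approvals"]
        
        self.db.update("approval_requests", approval_id, {
            "approvers": approvers,
            "approvals_received": approvals_count,
            "status": "approved" if fully_approved else "pending"
        })
        
        return fully_approved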

Pattern 2: Escalation on Timeout

If the primary approver doesn't respond, escalate to a manager.

async def handle_timeout_escalation(self):
    """Background job that escalates pending approvals close to expiry"""
    # Assumes approval_requests has an extra column not in the schema above:
    # escalated BOOLEAN DEFAULT FALSE
    while True:
        # Find approvals expiring within 1 hour that have not already expired
        expiring = self.db.query("""
            SELECT * FROM approval_requests
            WHERE status = 'pending'
            AND expires_at > NOW()
            AND expires_at < NOW() + INTERVAL '1 hour'
            AND escalated = FALSE
        """)
        
        for request in expiring:
            # Escalate to the requester's manager
            manager = self.db.get_user_manager(request["requested_by"])
            
            await self.notifier.send_urgent_notification(
                user_id=manager["id"],
                message="Urgent: Approval needed (expires within 1 hour)",
                approval_id=request["id"]
            )
            
            self.db.update("approval_requests", request["id"], {
                "escalated": True
            })
        
        await asyncio.sleep(300)  # Check every 5 minutes

Pattern 3: Conditional Auto-Approval

For trusted users or low-risk scenarios, auto-approve with audit trail.

def check_auto_approval(self, user_id, action_type, params):
    """
    Auto-approve if user has sufficient trust level
    """
    user = self.db.get_user(user_id)
    
    # Check user trust score (based on history, role, tenure)
    if user["trust_score"] >= 90:
        if action_type == "delete_data" and params.get("count", 0) < 100:
            # Auto-approve small deletions for trusted users
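            # Use the approval_audit columns from the schema (event_type, event_data)
            self.db.insert("approval_audit", {
                "event_type": "auto_approved",
                "event_data": {
                    "user_id": user_id,
                    "action_type": action_type,
                    "reason": f"User trust score {user['trust_score']}"
                }
            })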
            return True
    
    return False

Security Considerations

1. Authorization checks: Verify user has permission to approve before processing response.

2. Replay attack prevention: Each approval response should be single-use, so that it cannot be replayed.

def approve(self, approval_id, user_id, csrf_token):
    # Verify CSRF token
    if not self.verify_csrf(csrf_token, user_id):
        raise PermissionError("Invalid CSRF token")
    
    # Check approval still pending (not already used).
    # A separate read followed by a write can race with a concurrent
    # response, so make the status check part of the UPDATE itself in production.
    request = self.db.get("approval_requests", approval_id)
    if request["status"] != "pending":
        raise ValueError("Approval already processed")
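
To make a response truly single-use, the pending check and the status change can be folded into one conditional `UPDATE`. The sketch below uses an in-memory SQLite table so it runs anywhere; the table is cut down to the columns it needs, and `try_respond` is a hypothetical helper. The same `WHERE status = 'pending'` idea carries over to PostgreSQL.

```python
import sqlite3


def try_respond(conn: sqlite3.Connection, approval_id: str,
                user_id: str, status: str) -> bool:
    """Record a decision only if the request is still pending.

    The WHERE clause makes check-and-update one atomic statement,
    so a replayed or concurrent response changes zero rows.
    """
    if status not in ("approved", "rejected"):
        raise ValueError(f"Unknown status: {status}")
    cursor = conn.execute(
        "UPDATE approval_requests "
        "SET status = ?, responded_by = ? "
        "WHERE id = ? AND status = 'pending'",
        (status, user_id, approval_id),
    )
    conn.commit()
    return cursor.rowcount == 1


# Demo table, reduced to the columns used here
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE approval_requests ("
    "id TEXT PRIMARY KEY, "
    "status TEXT NOT NULL DEFAULT 'pending', "
    "responded_by TEXT)"
)
conn.execute("INSERT INTO approval_requests (id) VALUES ('req-1')")

first = try_respond(conn, "req-1", "alice", "approved")    # takes effect
replay = try_respond(conn, "req-1", "mallory", "rejected")  # changes nothing
```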

3. Audit logging: Log every approval event with timestamp, user ID, IP address.

4. Encryption: Encrypt sensitive approval data (e.g., payment details) at rest.
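
For the audit logging point, it can help to build every audit row through one function so that no event is written without a timestamp, user ID, and IP address. A minimal sketch, assuming a hypothetical `build_audit_event` helper and storing the user and IP inside `event_data`, as the `approval_audit` schema in this article allows:

```python
from datetime import datetime, timezone
from typing import Optional

# Event types listed in the approval_audit schema comment
EVENT_TYPES = {"created", "approved", "rejected", "expired", "executed"}


def build_audit_event(approval_request_id: str, event_type: str,
                      user_id: str, ip_address: str,
                      now: Optional[datetime] = None) -> dict:
    """Build one audit row carrying timestamp, user ID and IP address."""
    if event_type not in EVENT_TYPES:
        raise ValueError(f"Unknown event type: {event_type}")
    timestamp = now or datetime.now(timezone.utc)
    return {
        "approval_request_id": approval_request_id,
        "event_type": event_type,
        "event_data": {"user_id": user_id, "ip_address": ip_address},
        "created_at": timestamp.isoformat(),
    }


event = build_audit_event(
    "req-1", "approved", "alice", "203.0.113.7",
    now=datetime(2025, 1, 1, tzinfo=timezone.utc),
)
```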

Frequently Asked Questions

What if the user never responds?

Set a timeout (expires_at). After timeout, either:

  • Auto-reject (safe default)
  • Escalate to manager
  • Execute with elevated logging (if acceptable risk)
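
The first two options can be combined into one small decision function. The sketch below is one possible policy, not the only one: it escalates once at expiry and rejects after that. `resolve_timeout` and its parameters are hypothetical names. The third option, executing anyway, is deliberately left out because it only suits actions you have explicitly judged to be of acceptable risk.

```python
from datetime import datetime, timedelta, timezone


def resolve_timeout(expires_at: datetime, now: datetime,
                    escalated: bool, can_escalate: bool) -> str:
    """Decide what to do with a pending request: wait, escalate or reject."""
    if now < expires_at:
        return "wait"
    if can_escalate and not escalated:
        return "escalate"  # give a fallback approver one chance
    return "reject"        # safe default


now = datetime(2025, 1, 1, 12, tzinfo=timezone.utc)
overdue = now - timedelta(minutes=1)

still_open = resolve_timeout(now + timedelta(hours=1), now, False, True)
first_miss = resolve_timeout(overdue, now, escalated=False, can_escalate=True)
second_miss = resolve_timeout(overdue, now, escalated=True, can_escalate=True)
```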

How do I handle urgent requests?

For time-sensitive actions:

  • Shorter timeout (2 hours instead of 24)
  • Multi-channel notification (email + SMS + push)
  • Escalate immediately to on-call person

Can agents modify their own approval requests?

No. The agent creates the request and then waits passively. Only humans can approve or reject, which prevents an agent from approving its own actions.

What if approval requirements change mid-execution?

Re-check approval rules before execution:

# Check rules again at execution time
# (is_approved reflects the stored status of this request)
current_rule = self.db.get_approval_rule(org_id, action_type)
if current_rule["requires_approval"] and not is_approved:
    raise PermissionError("Approval required (rules changed)")

---

Bottom line: Human-in-the-loop approval workflows prevent catastrophic agent mistakes. Implement them for all high-risk actions (deletion, payments, external communications), with real-time notifications, timeout handling, and comprehensive audit trails. 84% of enterprise AI deployments use HITL for compliance.

Next: Read our Error Handling guide for handling failures gracefully.
