
agent-error-recovery

Handling failures in AI agent systems: retry strategies with backoff, fallback tools, graceful degradation, human-in-the-loop escalation, stuck-loop detection, and context recovery after crashes. Covers practical patterns for making agents robust against tool failures, API errors, and reasoning dead-ends.


Agent Error Recovery

Make agents robust against failures: retries, fallbacks, escalation, and stuck detection.


Retry with Exponential Backoff

Tool calls fail. APIs time out. Wrap tool execution with retries.

import time
import random


def retry_with_backoff(func, max_retries: int = 3, base_delay: float = 1.0):
    """Retry a function with exponential backoff and jitter."""
    last_error = None

    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)

    return f"Failed after {max_retries} attempts. Last error: {last_error}"


def execute_tool_with_retry(name: str, inputs: dict) -> str:
    """Execute a tool with automatic retries."""
    def _call():
        result = execute_tool(name, inputs)
        if result.startswith("Error:"):
            raise RuntimeError(result)
        return result

    return retry_with_backoff(_call, max_retries=3)
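To see the helper in action, here is a hypothetical flaky function that fails twice with a transient error before succeeding. `retry_with_backoff` is restated with a small `base_delay` so the demo finishes quickly; the logic is identical to the version above.

```python
import random
import time


def retry_with_backoff(func, max_retries: int = 3, base_delay: float = 0.01):
    """Same logic as above; a tiny base_delay keeps the demo fast."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.01))
    return f"Failed after {max_retries} attempts. Last error: {last_error}"


calls = {"count": 0}


def flaky_tool():
    # Fails twice with a transient error, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("upstream timeout")
    return "ok"


result = retry_with_backoff(flaky_tool)  # succeeds on the third attempt
```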

Selective Retry Based on Error Type

RETRYABLE_ERRORS = {
    "TimeoutError",
    "ConnectionError",
    "RateLimitError",
    "ServiceUnavailableError",
}

NON_RETRYABLE_ERRORS = {
    "PermissionError",
    "FileNotFoundError",
    "ValidationError",
    "AuthenticationError",
}


def smart_retry(func, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            error_type = type(e).__name__

            if error_type in NON_RETRYABLE_ERRORS:
                return f"Non-retryable error: {error_type}: {e}"

            if error_type in RETRYABLE_ERRORS and attempt < max_retries - 1:
                delay = (2 ** attempt) + random.uniform(0, 0.5)
                time.sleep(delay)
                continue

            return f"Error after {attempt + 1} attempts: {error_type}: {e}"

    return "Exhausted retries."
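The payoff of selective retry is that permission and validation errors fail fast while transient errors burn through the retry budget. A quick check of both paths, with `smart_retry` restated using a near-zero delay so it runs instantly:

```python
import time

NON_RETRYABLE_ERRORS = {"PermissionError", "FileNotFoundError",
                        "ValidationError", "AuthenticationError"}
RETRYABLE_ERRORS = {"TimeoutError", "ConnectionError",
                    "RateLimitError", "ServiceUnavailableError"}


def smart_retry(func, max_retries: int = 3) -> str:
    # Same logic as above; near-zero delay for a fast demo
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            error_type = type(e).__name__
            if error_type in NON_RETRYABLE_ERRORS:
                return f"Non-retryable error: {error_type}: {e}"
            if error_type in RETRYABLE_ERRORS and attempt < max_retries - 1:
                time.sleep(0.001 * (2 ** attempt))
                continue
            return f"Error after {attempt + 1} attempts: {error_type}: {e}"
    return "Exhausted retries."


attempts = {"denied": 0, "timeout": 0}


def denied():
    attempts["denied"] += 1
    raise PermissionError("no access")


def times_out():
    attempts["timeout"] += 1
    raise TimeoutError("slow upstream")


denied_result = smart_retry(denied)      # gives up after one attempt
timeout_result = smart_retry(times_out)  # retries until the budget is spent
```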

Fallback Tools

When the primary tool fails, fall back to an alternative.

class ToolWithFallback:
    """Tool that tries alternatives on failure."""

    def __init__(self, primary: dict, fallbacks: list[dict]):
        self.primary = primary
        self.fallbacks = fallbacks

    def execute(self, inputs: dict) -> str:
        # Try primary
        result = execute_tool_safe(self.primary["name"], inputs)
        if not result.startswith("Error:"):
            return result

        # Try fallbacks in order
        for fallback in self.fallbacks:
            # Adapt inputs if the fallback has different parameters
            adapted_inputs = self._adapt_inputs(inputs, fallback)
            result = execute_tool_safe(fallback["name"], adapted_inputs)
            if not result.startswith("Error:"):
                return f"[via {fallback['name']}] {result}"

        return "All tools failed for this operation."

    def _adapt_inputs(self, inputs: dict, fallback: dict) -> dict:
        """Map inputs from primary schema to fallback schema."""
        mapping = fallback.get("input_mapping", {})
        adapted = {}
        for key, value in inputs.items():
            mapped_key = mapping.get(key, key)
            adapted[mapped_key] = value
        return adapted


# Example: web search with fallback
search_tool = ToolWithFallback(
    primary={"name": "google_search"},
    fallbacks=[
        {"name": "bing_search", "input_mapping": {"query": "q"}},
        {"name": "duckduckgo_search", "input_mapping": {"query": "keywords"}},
    ],
)
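The key detail is the input mapping: each fallback declares how the primary tool's parameter names translate to its own. Isolated from the class, the mapping step looks like this (the search tools here are the same hypothetical examples as above):

```python
def adapt_inputs(inputs: dict, fallback: dict) -> dict:
    # Same mapping logic as ToolWithFallback._adapt_inputs:
    # rename keys listed in input_mapping, pass others through unchanged
    mapping = fallback.get("input_mapping", {})
    return {mapping.get(k, k): v for k, v in inputs.items()}


bing = {"name": "bing_search", "input_mapping": {"query": "q"}}
adapted = adapt_inputs({"query": "agent retries", "limit": 5}, bing)
# "query" becomes "q"; "limit" has no mapping and passes through
```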

Graceful Degradation

When capabilities fail, reduce scope rather than failing entirely.

def degrading_agent(task: str, tools: list[dict], system: str) -> str:
    """Agent that degrades gracefully when tools fail."""
    messages = [{"role": "user", "content": task}]
    failed_tools: set[str] = set()
    tool_failure_counts: dict[str, int] = {}

    for step in range(20):
        # Filter out repeatedly failing tools
        available_tools = [
            t for t in tools
            if t["name"] not in failed_tools
        ]

        if not available_tools:
            # All tools failed — ask model to answer from knowledge alone
            messages.append({
                "role": "user",
                "content": "All tools are unavailable. Please provide the best "
                           "answer you can from your training knowledge alone. "
                           "Clearly note that you could not verify with tools.",
            })
            response = client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                system=system,
                messages=messages,
            )
            return extract_text(response)

        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system=system,
            tools=available_tools,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return extract_text(response)

        messages.append({"role": "assistant", "content": response.content})

        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = execute_tool_safe(block.name, block.input)

                if result.startswith("Error:"):
                    tool_failure_counts[block.name] = tool_failure_counts.get(block.name, 0) + 1

                    if tool_failure_counts[block.name] >= 3:
                        failed_tools.add(block.name)
                        result += f"\n\nNote: Tool '{block.name}' has been disabled due to repeated failures."

                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })

        messages.append({"role": "user", "content": tool_results})

    return "Agent reached step limit."
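The disable-after-repeated-failures bookkeeping that `degrading_agent` does inline can be pulled into a small helper, sketched here on its own:

```python
def record_failure(counts: dict, disabled: set, name: str,
                   threshold: int = 3) -> None:
    # Same bookkeeping degrading_agent performs per failed tool call:
    # count failures, and disable the tool once it hits the threshold
    counts[name] = counts.get(name, 0) + 1
    if counts[name] >= threshold:
        disabled.add(name)


counts: dict[str, int] = {}
disabled: set[str] = set()

for _ in range(3):
    record_failure(counts, disabled, "web_search")
# After three failures, "web_search" is filtered out of available_tools
```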

Human-in-the-Loop Escalation

Agents should escalate to humans when they are stuck or when actions are high-risk.

class EscalationManager:
    """Manage human-in-the-loop escalation for agents."""

    def __init__(self, auto_approve: list[str] | None = None,
                 always_escalate: list[str] | None = None):
        self.auto_approve = set(auto_approve or [])
        self.always_escalate = set(always_escalate or [])
        self.pending_approvals: list[dict] = []

    def check_action(self, tool_name: str, inputs: dict) -> dict:
        """Check if an action needs human approval."""
        if tool_name in self.auto_approve:
            return {"approved": True}

        if tool_name in self.always_escalate:
            return self._request_approval(tool_name, inputs,
                                          reason="This action always requires approval.")

        # Check for high-risk patterns
        risk = self._assess_risk(tool_name, inputs)
        if risk == "high":
            return self._request_approval(tool_name, inputs,
                                          reason=f"High-risk action detected: {tool_name}.")

        return {"approved": True}

    def _assess_risk(self, tool_name: str, inputs: dict) -> str:
        """Simple risk assessment based on action type and inputs."""
        high_risk_tools = {"delete_file", "drop_table", "send_email",
                           "deploy", "process_payment"}
        if tool_name in high_risk_tools:
            return "high"

        # Check for destructive patterns in commands
        if tool_name == "run_command":
            cmd = inputs.get("command", "")
            dangerous = ["rm -rf", "drop ", "delete ", "format ", "shutdown"]
            if any(d in cmd.lower() for d in dangerous):
                return "high"

        return "low"

    def _request_approval(self, tool_name: str, inputs: dict,
                          reason: str) -> dict:
        """Request human approval (blocking)."""
        print(f"\n{'='*60}")
        print(f"AGENT REQUESTS APPROVAL")
        print(f"Tool: {tool_name}")
        print(f"Inputs: {inputs}")
        print(f"Reason: {reason}")
        print(f"{'='*60}")

        while True:
            answer = input("Approve? (yes/no/modify): ").strip().lower()
            if answer == "yes":
                return {"approved": True}
            elif answer == "no":
                return {"approved": False, "reason": "Human denied the action."}
            elif answer == "modify":
                import json
                new_input = input("Enter modified inputs (JSON): ")
                try:
                    return {"approved": True, "modified_inputs": json.loads(new_input)}
                except json.JSONDecodeError:
                    print("Invalid JSON, please try again.")


# Integration with agent loop
escalation = EscalationManager(
    auto_approve=["read_file", "search_web", "calculate"],
    always_escalate=["deploy", "send_email", "process_payment"],
)


def execute_with_escalation(name: str, inputs: dict) -> str:
    check = escalation.check_action(name, inputs)

    if not check.get("approved"):
        return f"Action denied: {check.get('reason', 'No reason given')}"

    final_inputs = check.get("modified_inputs", inputs)
    return execute_tool(name, final_inputs)
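The command heuristic inside `_assess_risk` is a simple substring check; extracted on its own it is easy to test and extend (the commands below are illustrative):

```python
def command_risk(cmd: str) -> str:
    # The same substring heuristic _assess_risk applies to run_command inputs;
    # trailing spaces on "drop " etc. avoid matching words like "dropdown"
    dangerous = ["rm -rf", "drop ", "delete ", "format ", "shutdown"]
    return "high" if any(d in cmd.lower() for d in dangerous) else "low"


risk_destructive = command_risk("rm -rf /tmp/build")
risk_benign = command_risk("ls -la")
```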

Stuck Detection

Detect when an agent is looping without making progress.

class StuckDetector:
    """Detect when an agent is stuck in a loop."""

    def __init__(self, max_repeated_tools: int = 3,
                 max_similar_errors: int = 3):
        self.tool_history: list[tuple[str, str]] = []  # (tool_name, input_hash)
        self.error_history: list[str] = []
        self.max_repeated = max_repeated_tools
        self.max_similar_errors = max_similar_errors

    def record_action(self, tool_name: str, inputs: dict, result: str):
        import hashlib
        import json

        input_hash = hashlib.md5(json.dumps(inputs, sort_keys=True).encode()).hexdigest()[:8]
        self.tool_history.append((tool_name, input_hash))

        if result.startswith("Error:"):
            self.error_history.append(result[:100])

    def is_stuck(self) -> tuple[bool, str]:
        """Check if the agent appears to be stuck."""
        # Check for repeated identical tool calls
        if len(self.tool_history) >= self.max_repeated:
            recent = self.tool_history[-self.max_repeated:]
            if len(set(recent)) == 1:
                return True, f"Repeated same tool call {self.max_repeated} times: {recent[0][0]}"

        # Check for repeated errors
        if len(self.error_history) >= self.max_similar_errors:
            recent_errors = self.error_history[-self.max_similar_errors:]
            if len(set(recent_errors)) == 1:
                return True, f"Same error {self.max_similar_errors} times: {recent_errors[0]}"

        # Check for oscillating between two actions
        if len(self.tool_history) >= 6:
            last_6 = self.tool_history[-6:]
            evens = set(last_6[::2])
            odds = set(last_6[1::2])
            if len(evens) == 1 and len(odds) == 1 and evens != odds:
                return True, "Oscillating between two tool calls."

        return False, ""
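The two structural checks reduce to a few lines on the raw `(tool_name, input_hash)` history; a standalone sketch of the same logic:

```python
def looks_stuck(history: list[tuple[str, str]]) -> tuple[bool, str]:
    # The same checks StuckDetector.is_stuck applies to its tool history
    if len(history) >= 3 and len(set(history[-3:])) == 1:
        return True, "repeated identical call"
    if len(history) >= 6:
        last_6 = history[-6:]
        # Even and odd positions each collapse to one distinct call: A-B-A-B-A-B
        evens, odds = set(last_6[::2]), set(last_6[1::2])
        if len(evens) == 1 and len(odds) == 1 and evens != odds:
            return True, "oscillating between two calls"
    return False, ""


a = ("search", "h1")
b = ("read_file", "h2")

repeated = looks_stuck([a, a, a])
oscillating = looks_stuck([a, b, a, b, a, b])
healthy = looks_stuck([a, b, ("calculate", "h3")])
```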


def agent_with_stuck_detection(task: str, tools: list[dict]) -> str:
    """Agent loop with stuck detection and recovery."""
    messages = [{"role": "user", "content": task}]
    detector = StuckDetector()

    for step in range(20):
        stuck, reason = detector.is_stuck()
        if stuck:
            messages.append({
                "role": "user",
                "content": f"You appear to be stuck: {reason}\n\n"
                           f"Please try a completely different approach. "
                           f"If you cannot make progress, explain what is blocking you.",
            })

        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return extract_text(response)

        messages.append({"role": "assistant", "content": response.content})

        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = execute_tool(block.name, block.input)
                detector.record_action(block.name, block.input, result)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })

        messages.append({"role": "user", "content": tool_results})

    return "Max steps reached."

Context Recovery After Crashes

Save agent state so you can resume after crashes.

import json
from pathlib import Path


class CheckpointManager:
    """Save and restore agent state for crash recovery."""

    def __init__(self, checkpoint_dir: str = ".agent_checkpoints"):
        self.dir = Path(checkpoint_dir)
        self.dir.mkdir(parents=True, exist_ok=True)

    def save(self, agent_id: str, state: dict):
        path = self.dir / f"{agent_id}.json"
        # Convert content blocks to serializable form
        serializable = self._make_serializable(state)
        path.write_text(json.dumps(serializable, indent=2))

    def load(self, agent_id: str) -> dict | None:
        path = self.dir / f"{agent_id}.json"
        if not path.exists():
            return None
        return json.loads(path.read_text())

    def _make_serializable(self, obj):
        """Convert API objects to dicts for JSON serialization."""
        if hasattr(obj, "model_dump"):
            return obj.model_dump()
        if isinstance(obj, list):
            return [self._make_serializable(i) for i in obj]
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        return obj
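The serialization step is what makes checkpointing work with SDK response objects, whose content blocks are pydantic models exposing `model_dump()`. A standalone sketch of the same recursion, using a hypothetical `FakeBlock` as a stand-in for a real content block:

```python
import json


def make_serializable(obj):
    # Same recursion as CheckpointManager._make_serializable above
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    if isinstance(obj, list):
        return [make_serializable(i) for i in obj]
    if isinstance(obj, dict):
        return {k: make_serializable(v) for k, v in obj.items()}
    return obj


class FakeBlock:
    """Hypothetical stand-in for an SDK content block."""
    def model_dump(self):
        return {"type": "text", "text": "hello"}


state = {
    "messages": [{"role": "assistant", "content": [FakeBlock()]}],
    "step": 2,
}
as_json = json.dumps(make_serializable(state))  # would raise TypeError without the conversion
```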


def resumable_agent(task: str, agent_id: str, tools: list[dict]) -> str:
    """Agent that can resume from checkpoints after crashes."""
    checkpoints = CheckpointManager()

    # Try to resume from checkpoint
    saved = checkpoints.load(agent_id)
    if saved:
        messages = saved["messages"]
        step = saved["step"]
        print(f"Resumed from checkpoint at step {step}")
    else:
        messages = [{"role": "user", "content": task}]
        step = 0

    for _ in range(20 - step):
        step += 1

        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            checkpoints.save(agent_id, {"messages": messages, "step": step, "status": "completed"})
            return extract_text(response)

        messages.append({"role": "assistant", "content": response.content})
        tool_results = execute_all_tools(response)
        messages.append({"role": "user", "content": tool_results})

        # Checkpoint after every step
        checkpoints.save(agent_id, {"messages": messages, "step": step, "status": "running"})

    return "Max steps reached."

Build error recovery incrementally: start with basic retries, add stuck detection when you observe looping in production, and add human escalation for high-stakes actions. Over-engineering error handling before you understand your failure modes wastes time.
