Skip to main content
Technology & EngineeringAi Agent Orchestration564 lines

agent-guardrails

Safety and control systems for AI agents: input and output validation, action authorization, rate limiting, cost controls, content filtering, scope restriction, and audit logging. Covers practical implementations for keeping agents within bounds while maintaining their usefulness.

Quick Summary3 lines
Keep agents safe and controlled: validate inputs and outputs, authorize actions, limit costs, and maintain audit trails.
skilldb get ai-agent-orchestration-skills/agent-guardrailsFull skill: 564 lines
Paste into your CLAUDE.md or agent config

Agent Guardrails

Keep agents safe and controlled: validate inputs and outputs, authorize actions, limit costs, and maintain audit trails.


Input Validation

Validate what goes into the agent before it starts processing.

import re
from dataclasses import dataclass


@dataclass
class ValidationResult:
    valid: bool
    reason: str = ""
    sanitized: str = ""


class InputValidator:
    """Validate and sanitize agent inputs."""

    def __init__(self):
        self.max_input_length = 50000
        self.blocked_patterns = [
            r"ignore previous instructions",
            r"ignore all previous",
            r"disregard your instructions",
            r"you are now",
            r"pretend you are",
            r"jailbreak",
            r"do anything now",
        ]

    def validate(self, user_input: str) -> ValidationResult:
        # Length check
        if len(user_input) > self.max_input_length:
            return ValidationResult(
                valid=False,
                reason=f"Input exceeds maximum length of {self.max_input_length} characters.",
            )

        # Prompt injection detection (basic pattern matching)
        input_lower = user_input.lower()
        for pattern in self.blocked_patterns:
            if re.search(pattern, input_lower):
                return ValidationResult(
                    valid=False,
                    reason=f"Input contains a blocked pattern.",
                )

        # Sanitize: remove null bytes and control characters
        sanitized = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", user_input)

        return ValidationResult(valid=True, sanitized=sanitized)


validator = InputValidator()

def safe_agent_entry(task: str) -> str:
    result = validator.validate(task)
    if not result.valid:
        return f"Input rejected: {result.reason}"
    return run_agent(result.sanitized)

Output Validation

Check what the agent produces before returning it to the user.

class OutputValidator:
    """Validate agent outputs before they reach the user."""

    def __init__(self):
        self.pii_patterns = {
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
        }
        self.blocked_content = [
            r"(?i)api[_-]?key\s*[:=]\s*\S+",
            r"(?i)password\s*[:=]\s*\S+",
            r"(?i)secret\s*[:=]\s*\S+",
            r"(?i)token\s*[:=]\s*['\"]?\S+",
        ]

    def validate(self, output: str) -> ValidationResult:
        issues = []

        # Check for PII
        for pii_type, pattern in self.pii_patterns.items():
            if re.search(pattern, output):
                issues.append(f"Potential {pii_type} detected in output.")

        # Check for leaked secrets
        for pattern in self.blocked_content:
            if re.search(pattern, output):
                issues.append("Potential secret/credential in output.")

        if issues:
            return ValidationResult(
                valid=False,
                reason="; ".join(issues),
            )

        return ValidationResult(valid=True, sanitized=output)

    def redact(self, output: str) -> str:
        """Redact sensitive patterns from output."""
        redacted = output
        for pii_type, pattern in self.pii_patterns.items():
            redacted = re.sub(pattern, f"[REDACTED-{pii_type.upper()}]", redacted)
        for pattern in self.blocked_content:
            redacted = re.sub(pattern, "[REDACTED-SECRET]", redacted)
        return redacted


output_validator = OutputValidator()

def validate_and_return(agent_output: str) -> str:
    result = output_validator.validate(agent_output)
    if not result.valid:
        # Redact and return with warning
        redacted = output_validator.redact(agent_output)
        return f"[Warning: {result.reason}]\n\n{redacted}"
    return agent_output

Action Authorization

Control which tools the agent can use and with what parameters.

class ActionAuthorizer:
    """Authorize agent actions based on policies."""

    def __init__(self):
        self.policies: list[dict] = []

    def add_policy(self, tool_name: str, rule: str, check_fn):
        """Add an authorization policy for a tool."""
        self.policies.append({
            "tool": tool_name,
            "rule": rule,
            "check": check_fn,
        })

    def authorize(self, tool_name: str, inputs: dict) -> tuple[bool, str]:
        """Check all applicable policies. All must pass."""
        for policy in self.policies:
            if policy["tool"] == "*" or policy["tool"] == tool_name:
                allowed, reason = policy["check"](tool_name, inputs)
                if not allowed:
                    return False, f"Policy '{policy['rule']}' denied: {reason}"
        return True, "Authorized"


# Define policies
authorizer = ActionAuthorizer()

# Policy: restrict file access to project directory
authorizer.add_policy(
    "read_file",
    "file_scope",
    lambda tool, inputs: (
        (True, "") if inputs.get("path", "").startswith("/project/")
        else (False, f"Access denied: path must be under /project/")
    ),
)

authorizer.add_policy(
    "write_file",
    "file_scope",
    lambda tool, inputs: (
        (True, "") if inputs.get("path", "").startswith("/project/")
        else (False, f"Write denied: path must be under /project/")
    ),
)

# Policy: block dangerous commands
authorizer.add_policy(
    "run_command",
    "no_destructive_commands",
    lambda tool, inputs: (
        (False, "Destructive command blocked")
        if any(d in inputs.get("command", "") for d in ["rm -rf", "mkfs", "dd if=", "> /dev/"])
        else (True, "")
    ),
)

# Policy: block network access for certain tools
authorizer.add_policy(
    "run_command",
    "no_network_exfiltration",
    lambda tool, inputs: (
        (False, "Network access not allowed in commands")
        if any(n in inputs.get("command", "") for n in ["curl", "wget", "nc ", "ssh "])
        else (True, "")
    ),
)


def authorized_execute(name: str, inputs: dict) -> str:
    allowed, reason = authorizer.authorize(name, inputs)
    if not allowed:
        return f"Denied: {reason}"
    return execute_tool(name, inputs)

Rate Limiting

Prevent agents from making too many API calls or tool executions.

import time
from collections import deque


class RateLimiter:
    """Rate limit agent actions by tool and globally."""

    def __init__(self, global_rpm: int = 60, per_tool_rpm: dict[str, int] = None):
        self.global_rpm = global_rpm
        self.per_tool_rpm = per_tool_rpm or {}
        self.global_timestamps: deque = deque()
        self.tool_timestamps: dict[str, deque] = {}

    def check(self, tool_name: str) -> tuple[bool, float]:
        """Check if action is allowed. Returns (allowed, wait_seconds)."""
        now = time.time()

        # Clean old entries (older than 60 seconds)
        while self.global_timestamps and now - self.global_timestamps[0] > 60:
            self.global_timestamps.popleft()

        # Global rate check
        if len(self.global_timestamps) >= self.global_rpm:
            wait = 60 - (now - self.global_timestamps[0])
            return False, wait

        # Per-tool rate check
        if tool_name in self.per_tool_rpm:
            if tool_name not in self.tool_timestamps:
                self.tool_timestamps[tool_name] = deque()
            ts = self.tool_timestamps[tool_name]
            while ts and now - ts[0] > 60:
                ts.popleft()
            if len(ts) >= self.per_tool_rpm[tool_name]:
                wait = 60 - (now - ts[0])
                return False, wait

        return True, 0

    def record(self, tool_name: str):
        now = time.time()
        self.global_timestamps.append(now)
        if tool_name not in self.tool_timestamps:
            self.tool_timestamps[tool_name] = deque()
        self.tool_timestamps[tool_name].append(now)


rate_limiter = RateLimiter(
    global_rpm=100,
    per_tool_rpm={
        "web_search": 10,
        "send_email": 5,
        "run_command": 30,
    },
)


def rate_limited_execute(name: str, inputs: dict) -> str:
    allowed, wait = rate_limiter.check(name)
    if not allowed:
        return f"Rate limited. Try again in {wait:.1f} seconds."
    result = execute_tool(name, inputs)
    rate_limiter.record(name)
    return result

Cost Controls

Track and limit spending on API calls and tool usage.

class CostTracker:
    """Track and limit agent spending."""

    # Approximate costs per 1M tokens (update as pricing changes)
    MODEL_COSTS = {
        "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
        "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
        "claude-haiku-35-20241022": {"input": 0.80, "output": 4.0},
    }

    def __init__(self, budget_usd: float = 10.0):
        self.budget = budget_usd
        self.total_spent = 0.0
        self.calls: list[dict] = []

    def record_call(self, model: str, input_tokens: int, output_tokens: int):
        costs = self.MODEL_COSTS.get(model, {"input": 3.0, "output": 15.0})
        cost = (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000
        self.total_spent += cost
        self.calls.append({
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "timestamp": time.time(),
        })

    def check_budget(self) -> tuple[bool, float]:
        """Check if we are within budget. Returns (within_budget, remaining)."""
        remaining = self.budget - self.total_spent
        return remaining > 0, remaining

    def get_summary(self) -> str:
        return (
            f"Total spent: ${self.total_spent:.4f} / ${self.budget:.2f}\n"
            f"Remaining: ${self.budget - self.total_spent:.4f}\n"
            f"API calls: {len(self.calls)}\n"
            f"Total tokens: {sum(c['input_tokens'] + c['output_tokens'] for c in self.calls):,}"
        )


def cost_aware_agent(task: str, tools: list[dict], budget: float = 5.0) -> str:
    """Agent that stops when it exceeds its budget."""
    tracker = CostTracker(budget_usd=budget)
    messages = [{"role": "user", "content": task}]

    for _ in range(20):
        within_budget, remaining = tracker.check_budget()
        if not within_budget:
            return f"Budget exhausted.\n{tracker.get_summary()}"

        # Use cheaper model when budget is low
        model = "claude-sonnet-4-20250514"
        if remaining < budget * 0.2:
            model = "claude-haiku-35-20241022"

        response = client.messages.create(
            model=model,
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )

        tracker.record_call(
            model=model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
        )

        if response.stop_reason == "end_turn":
            result = extract_text(response)
            return f"{result}\n\n---\n{tracker.get_summary()}"

        messages.append({"role": "assistant", "content": response.content})
        tool_results = execute_all_tools(response)
        messages.append({"role": "user", "content": tool_results})

    return f"Max steps reached.\n{tracker.get_summary()}"

Scope Restriction

Limit what the agent can access and modify.

class ScopeRestrictor:
    """Restrict agent access to specific directories, URLs, etc."""

    def __init__(self):
        self.allowed_paths: list[str] = []
        self.blocked_paths: list[str] = []
        self.allowed_domains: list[str] = []
        self.blocked_domains: list[str] = []

    def allow_path(self, path: str):
        self.allowed_paths.append(path)

    def block_path(self, path: str):
        self.blocked_paths.append(path)

    def check_path(self, path: str) -> tuple[bool, str]:
        import os
        real_path = os.path.realpath(path)

        # Check blocked first
        for blocked in self.blocked_paths:
            if real_path.startswith(os.path.realpath(blocked)):
                return False, f"Path is in blocked area: {blocked}"

        # Check allowed
        if self.allowed_paths:
            for allowed in self.allowed_paths:
                if real_path.startswith(os.path.realpath(allowed)):
                    return True, "OK"
            return False, f"Path is not in any allowed directory."

        return True, "OK"

    def check_url(self, url: str) -> tuple[bool, str]:
        from urllib.parse import urlparse
        domain = urlparse(url).netloc

        for blocked in self.blocked_domains:
            if domain.endswith(blocked):
                return False, f"Domain {domain} is blocked."

        if self.allowed_domains:
            for allowed in self.allowed_domains:
                if domain.endswith(allowed):
                    return True, "OK"
            return False, f"Domain {domain} is not in the allow list."

        return True, "OK"


scope = ScopeRestrictor()
scope.allow_path("/project/workspace")
scope.block_path("/project/workspace/.env")
scope.block_path("/etc")
scope.block_path("/root")

Audit Logging

Log every agent action for debugging and compliance.

import json
import time
from pathlib import Path


class AuditLogger:
    """Log all agent actions for review and debugging."""

    def __init__(self, log_dir: str = ".agent_logs"):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.session_id = f"session_{int(time.time())}"
        self.log_file = self.log_dir / f"{self.session_id}.jsonl"
        self.entries: list[dict] = []

    def log(self, event_type: str, data: dict):
        entry = {
            "timestamp": time.time(),
            "session": self.session_id,
            "type": event_type,
            **data,
        }
        self.entries.append(entry)
        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def log_tool_call(self, tool_name: str, inputs: dict, result: str,
                      authorized: bool, duration_ms: float):
        self.log("tool_call", {
            "tool": tool_name,
            "inputs": {k: str(v)[:200] for k, v in inputs.items()},
            "result_preview": result[:200],
            "authorized": authorized,
            "duration_ms": duration_ms,
        })

    def log_api_call(self, model: str, input_tokens: int,
                     output_tokens: int, stop_reason: str):
        self.log("api_call", {
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "stop_reason": stop_reason,
        })

    def log_error(self, error_type: str, message: str):
        self.log("error", {"error_type": error_type, "message": message})

    def log_guardrail_trigger(self, guardrail: str, reason: str, action: str):
        self.log("guardrail", {
            "guardrail": guardrail,
            "reason": reason,
            "action": action,  # "blocked", "warned", "escalated"
        })

    def get_summary(self) -> str:
        tool_calls = [e for e in self.entries if e["type"] == "tool_call"]
        api_calls = [e for e in self.entries if e["type"] == "api_call"]
        errors = [e for e in self.entries if e["type"] == "error"]
        guardrails = [e for e in self.entries if e["type"] == "guardrail"]

        return (
            f"Session: {self.session_id}\n"
            f"Tool calls: {len(tool_calls)}\n"
            f"API calls: {len(api_calls)}\n"
            f"Errors: {len(errors)}\n"
            f"Guardrail triggers: {len(guardrails)}\n"
            f"Log file: {self.log_file}"
        )

Putting It All Together

class GuardedAgent:
    """Agent with all guardrails integrated."""

    def __init__(self, tools, system, budget=5.0):
        self.input_validator = InputValidator()
        self.output_validator = OutputValidator()
        self.authorizer = ActionAuthorizer()
        self.rate_limiter = RateLimiter(global_rpm=60)
        self.cost_tracker = CostTracker(budget_usd=budget)
        self.audit = AuditLogger()
        self.tools = tools
        self.system = system

    def run(self, task: str) -> str:
        # Validate input
        check = self.input_validator.validate(task)
        if not check.valid:
            self.audit.log_guardrail_trigger("input_validation", check.reason, "blocked")
            return f"Input rejected: {check.reason}"

        result = self._agent_loop(check.sanitized)

        # Validate output
        output_check = self.output_validator.validate(result)
        if not output_check.valid:
            self.audit.log_guardrail_trigger("output_validation", output_check.reason, "redacted")
            result = self.output_validator.redact(result)

        return result

Layer guardrails incrementally. Start with input validation and audit logging. Add cost controls early so runaway agents do not surprise you. Add authorization policies as you discover what actions need restriction. Avoid over-restricting at the start — agents that cannot do anything useful are worse than agents with reasonable defaults.

Install this skill directly: skilldb add ai-agent-orchestration-skills

Get CLI access →

Related Skills

agent-architecture

Core patterns for building AI agent systems: the observe-think-act loop, ReAct pattern implementation, tool-use cycles, memory systems (short-term and long-term), and planning strategies. Covers how to structure an agent's main loop, manage state between iterations, and wire together perception, reasoning, and action into a reliable autonomous system.

Ai Agent Orchestration368L

agent-error-recovery

Handling failures in AI agent systems: retry strategies with backoff, fallback tools, graceful degradation, human-in-the-loop escalation, stuck-loop detection, and context recovery after crashes. Covers practical patterns for making agents robust against tool failures, API errors, and reasoning dead-ends.

Ai Agent Orchestration470L

agent-evaluation

Testing and evaluating AI agents: trajectory evaluation, task completion metrics, tool-use accuracy measurement, regression testing, benchmark suites, and A/B testing agent configurations. Covers practical approaches to measuring whether agents are working correctly and improving over time.

Ai Agent Orchestration553L

agent-frameworks

Comparison of major AI agent frameworks: LangGraph, CrewAI, AutoGen, Semantic Kernel, and Claude Agent SDK. Covers when to use each framework, their trade-offs, core patterns, practical setup examples, and migration strategies between frameworks.

Ai Agent Orchestration433L

agent-memory

Memory systems for AI agents: conversation history management, summarization strategies, vector-based long-term memory, entity memory, episodic memory, and memory retrieval patterns. Covers practical implementations for giving agents persistent, searchable memory across sessions and within long-running tasks.

Ai Agent Orchestration443L

agent-planning

Planning strategies for AI agents: chain-of-thought prompting, tree-of-thought exploration, plan-and-execute patterns, iterative refinement, task decomposition, and goal tracking. Covers practical implementations that make agents more reliable at complex, multi-step tasks by thinking before acting.

Ai Agent Orchestration459L