Tags: AI Agents, Data Engineering, Python, Airflow, LLM, Orchestration

Agentic Data Workflows — Using AI Agents to Automate Pipeline Orchestration and Quality Monitoring

A practical guide to agentic data workflows in production: designing the agent loop for pipeline orchestration with the ReAct pattern; building Python agents with the Anthropic SDK and tool use for Airflow DAG monitoring and log analysis; integrating agents with the Apache Airflow REST API for backfill triggering and DAG health checks; embedding agents in Prefect flow on_failure hooks for AI-driven failure response; self-healing quality gates with Great Expectations and LLM triage; multi-agent coordination with orchestrator and specialist models; idempotent tool call patterns with Redis; structured agent run logging for audit trails and cost tracking; blast radius limits and table-level write permission guardrails; and escalation SLA enforcement to prevent silent agent runaway.

2026-05-20

Why Traditional Pipelines Aren't Enough

Data pipelines built with static DAGs are excellent at executing known sequences of transformations. They struggle with everything else: unexpected schema drift at 3 AM, a vendor API that starts returning partial data, a downstream table that's 6 hours stale for reasons buried three hops upstream. On-call engineers spend more time reading logs and chasing root causes than writing code.

Agentic data workflows flip this model. Instead of encoding every decision branch into a DAG, you give an LLM-powered agent access to a set of tools — query engines, pipeline APIs, alerting hooks, documentation — and let it reason about what to do in response to a given observation. The agent can read pipeline logs, identify the root cause, trigger a backfill, verify the fix, and page a human only if the automated recovery fails. The human stays in the loop for novel situations; routine remediation becomes fully automated.

This article covers the practical implementation path: agent loop design, tool definitions, Airflow and Prefect integration, self-healing quality gates, multi-agent coordination, and the production guardrails that keep agents from making things worse.

Note

Agentic workflows are not a replacement for well-designed pipelines. They are a reliability layer on top. Before adding agents, ensure your pipelines have idempotent writes, explicit schema contracts, and structured logs. Agents that can't read clean signals will hallucinate root causes.

Agent Loop Architecture for Data Pipelines

Data pipeline agents follow the ReAct (Reasoning + Acting) pattern: the agent alternates between reasoning about the current state and taking actions via tools until it reaches a terminal state (resolution, escalation, or explicit stop). Four components define the architecture:

Observation

The triggering event that starts the agent loop: a failed DAG run alert from Airflow, a freshness SLO breach from your observability stack, a data quality check failure from dbt or Great Expectations. The observation should include structured metadata — pipeline name, run ID, error message, affected tables — so the agent can immediately query for more context without guessing.
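
As a minimal sketch of what such a structured observation could look like, assume a hypothetical `PipelineObservation` dataclass. The class and its field names are illustrative and do not come from Airflow, dbt, or any SDK.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class PipelineObservation:
    """Structured trigger event handed to the agent (hypothetical shape)."""
    source: str                      # e.g. "airflow", "dbt", "great_expectations"
    pipeline: str                    # DAG or flow name
    run_id: str
    error_message: str
    affected_tables: tuple[str, ...] = field(default_factory=tuple)

    def to_prompt(self) -> str:
        """Render the observation as the first user message for the agent."""
        tables = ", ".join(self.affected_tables) or "unknown"
        return (
            f"Source: {self.source}\n"
            f"Pipeline: {self.pipeline}\n"
            f"Run ID: {self.run_id}\n"
            f"Error: {self.error_message}\n"
            f"Affected tables: {tables}"
        )


obs = PipelineObservation(
    source="airflow",
    pipeline="nightly_orders",
    run_id="scheduled__2026-05-20T00:00:00+00:00",
    error_message="KeyError: 'order_total'",
    affected_tables=("analytics.orders",),
)
print(obs.to_prompt())
```

Keeping the observation as a typed object rather than free text means every alert source has to supply the same fields before the agent starts.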

Tools

The callable functions the agent uses to interact with your data infrastructure: get_dag_run_logs, query_table_stats, trigger_backfill, run_data_quality_check, send_slack_alert, update_incident_ticket. Each tool has a JSON Schema describing its parameters and return shape. Tools are where agent reasoning meets real infrastructure — design them carefully.

Memory

What the agent knows beyond the current conversation turn. Short-term memory is the accumulated tool call results in the current run. Long-term memory stores past incident resolutions, known failure patterns, and remediation playbooks — retrieved via semantic search to inform the current investigation. A vector store like pgvector or Qdrant works well here.
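
The retrieval step can be sketched without any external service. The example below is a toy stand-in, not a pgvector or Qdrant client: it assumes a hypothetical `IncidentMemory` class and uses bag-of-words cosine similarity where a real system would use an embedding model and a vector index.

```python
import math
from collections import Counter


def _vectorize(text: str) -> Counter:
    """Toy bag-of-words vector; a real system would use an embedding model."""
    return Counter(text.lower().split())


def _cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class IncidentMemory:
    """In-memory stand-in for a vector store of past incident resolutions."""

    def __init__(self) -> None:
        self._records: list[tuple[Counter, str, str]] = []

    def add(self, symptom: str, resolution: str) -> None:
        self._records.append((_vectorize(symptom), symptom, resolution))

    def recall(self, query: str, k: int = 2) -> list[dict]:
        """Return up to k past incidents with non-zero similarity, best match first."""
        q = _vectorize(query)
        scored = sorted(
            ((_cosine(q, vec), symptom, resolution) for vec, symptom, resolution in self._records),
            key=lambda item: item[0],
            reverse=True,
        )
        return [
            {"score": round(score, 3), "symptom": symptom, "resolution": resolution}
            for score, symptom, resolution in scored[:k]
            if score > 0
        ]


memory = IncidentMemory()
memory.add("schema drift in vendor feed column renamed",
           "Update the staging model column mapping, then backfill 1 day")
memory.add("warehouse query timeout during compaction",
           "Wait for compaction to finish, then retry the DAG run")
matches = memory.recall("vendor feed failed after column renamed", k=1)
```

The recalled resolutions would typically be prepended to the agent's first message as context. They are suggestions for the investigation, not instructions to execute.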

Guardrails

Hard constraints on what the agent is allowed to do. Guardrails distinguish agentic workflows that are safe to run autonomously from those that require human approval. Common guardrails: maximum number of tool calls per run (to prevent loops), a list of tables that are read-only for the agent, mandatory human confirmation before any DELETE or schema ALTER, and a maximum blast radius for backfill operations.
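
One of these guardrails, mandatory confirmation before destructive SQL, can be sketched as a simple pre-execution check. This assumes a hypothetical `requires_human_approval` helper. The statement list (which adds DROP and TRUNCATE to the DELETE and ALTER mentioned above) and the naive semicolon split are assumptions; a production version would more likely rely on a real SQL parser.

```python
import re

# Statement types that must never run without a human approving them first.
_DESTRUCTIVE = re.compile(r"^\s*(DELETE|DROP|TRUNCATE|ALTER)\b", re.IGNORECASE)


def requires_human_approval(sql: str) -> bool:
    """Return True if any statement in the script deletes data or alters schema.

    Splitting on ';' is deliberately naive: it does not handle semicolons
    inside string literals or statements preceded by SQL comments.
    """
    statements = [s for s in sql.split(";") if s.strip()]
    return any(_DESTRUCTIVE.match(s) for s in statements)


print(requires_human_approval("SELECT count(*) FROM analytics.orders"))
print(requires_human_approval("select 1; alter table t add column x int"))
```

The check belongs inside the tool implementation, so it runs no matter what the prompt says.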

Building a Pipeline Monitor Agent with the Anthropic SDK

The Anthropic tool use API is the foundation for building data pipeline agents. Install the SDK and define the tools your agent needs:

pip install "anthropic>=0.30" httpx tenacity

# pipeline_agent.py
import json
import anthropic
from dataclasses import dataclass
from typing import Any

client = anthropic.Anthropic()

# --- Tool definitions (JSON Schema format) ---
TOOLS = [
    {
        "name": "get_dag_run_logs",
        "description": (
            "Fetch the last N log lines for a specific Airflow DAG run. "
            "Returns structured log entries with timestamps and task IDs."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "dag_id": {"type": "string", "description": "Airflow DAG identifier"},
                "run_id": {"type": "string", "description": "The DAG run ID (e.g. scheduled__2026-05-20T00:00:00+00:00)"},
                "tail_lines": {"type": "integer", "default": 200, "description": "Number of log lines to return"},
            },
            "required": ["dag_id", "run_id"],
        },
    },
    {
        "name": "get_table_freshness",
        "description": "Check when a data warehouse table was last updated and how many rows it contains.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table_fqn": {"type": "string", "description": "Fully-qualified table name, e.g. analytics.orders"},
                "freshness_threshold_hours": {"type": "number", "default": 24.0},
            },
            "required": ["table_fqn"],
        },
    },
    {
        "name": "trigger_dag_backfill",
        "description": (
            "Trigger an Airflow DAG backfill for a date range. "
            "Only use this after confirming the root cause is resolved."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "dag_id": {"type": "string"},
                "start_date": {"type": "string", "description": "ISO 8601 date, e.g. 2026-05-18"},
                "end_date": {"type": "string", "description": "ISO 8601 date, inclusive"},
            },
            "required": ["dag_id", "start_date", "end_date"],
        },
    },
    {
        "name": "run_dbt_test",
        "description": "Run a specific dbt test or model and return pass/fail results with row counts.",
        "input_schema": {
            "type": "object",
            "properties": {
                "selector": {"type": "string", "description": "dbt node selector, e.g. 'orders' or 'test:not_null_orders_id'"},
                "target": {"type": "string", "default": "prod"},
            },
            "required": ["selector"],
        },
    },
    {
        "name": "send_incident_update",
        "description": "Post an update to the active incident channel with investigation findings.",
        "input_schema": {
            "type": "object",
            "properties": {
                "incident_id": {"type": "string"},
                "message": {"type": "string", "description": "Markdown-formatted investigation update"},
                "severity": {"type": "string", "enum": ["info", "warning", "critical"], "default": "info"},
            },
            "required": ["incident_id", "message"],
        },
    },
]


# --- Tool implementations (simplified) ---
def execute_tool(name: str, inputs: dict[str, Any]) -> str:
    """Dispatch tool calls to real implementations."""
    if name == "get_dag_run_logs":
        return airflow_get_logs(inputs["dag_id"], inputs["run_id"], inputs.get("tail_lines", 200))
    elif name == "get_table_freshness":
        return warehouse_check_freshness(inputs["table_fqn"], inputs.get("freshness_threshold_hours", 24.0))
    elif name == "trigger_dag_backfill":
        return airflow_trigger_backfill(inputs["dag_id"], inputs["start_date"], inputs["end_date"])
    elif name == "run_dbt_test":
        return dbt_run_selector(inputs["selector"], inputs.get("target", "prod"))
    elif name == "send_incident_update":
        return slack_post_update(inputs["incident_id"], inputs["message"], inputs.get("severity", "info"))
    raise ValueError(f"Unknown tool: {name}")


# --- Agent loop ---
MAX_TOOL_CALLS = 15  # hard guardrail

def run_pipeline_agent(
    incident_id: str,
    dag_id: str,
    run_id: str,
    error_summary: str,
) -> str:
    """
    Run the investigation agent for a failed pipeline run.
    Returns a resolution summary.
    """
    messages = [
        {
            "role": "user",
            "content": (
                f"A data pipeline has failed and requires investigation.\n\n"
                f"Incident ID: {incident_id}\n"
                f"DAG: {dag_id}\n"
                f"Run ID: {run_id}\n"
                f"Error summary: {error_summary}\n\n"
                f"Please investigate the root cause, attempt automated remediation if safe to do so, "
                f"and post an incident update with your findings. "
                f"Do NOT trigger a backfill until you have confirmed the underlying issue is resolved."
            ),
        }
    ]

    tool_call_count = 0

    while tool_call_count < MAX_TOOL_CALLS:
        response = client.messages.create(
            model="claude-opus-4-7",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )

        # Append assistant response to history
        messages.append({"role": "assistant", "content": response.content})

        # Terminal: model stopped without requesting more tools
        if response.stop_reason == "end_turn":
            final_text = next(
                (b.text for b in response.content if hasattr(b, "text")),
                "Investigation complete — no further actions required.",
            )
            return final_text

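        # Process tool calls
        if response.stop_reason == "tool_use":
            tool_results = []

            for block in response.content:
                if block.type != "tool_use":
                    continue

                tool_call_count += 1
                try:
                    result = execute_tool(block.name, block.input)
                    is_error = False
                except Exception as exc:
                    # Report the failure to the model instead of crashing the run
                    result, is_error = f"Tool error: {exc}", True

                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                    "is_error": is_error,
                })

            messages.append({"role": "user", "content": tool_results})
            continue

        # Any other stop reason (e.g. max_tokens) never increments tool_call_count,
        # so looping again could spin forever. Escalate instead.
        return f"Agent stopped unexpectedly ({response.stop_reason}); escalating to on-call engineer."

    return "Agent reached maximum tool call limit; escalating to on-call engineer."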

Note

Always append the assistant's full response (including tool_use blocks) to the message history before sending tool results back. Skipping this step breaks the conversation thread and causes the model to lose context about which tool calls it made. The pattern above — append assistant content, collect results, append tool results — is the correct loop structure for the Anthropic SDK.

Integrating Agents with Apache Airflow

The Airflow REST API is the integration surface for agentic tools. You can trigger DAG runs, fetch task logs, check run status, and manage variables — everything an agent needs for pipeline orchestration.

# airflow_tools.py — implementations for the agent's Airflow tools
import httpx
import os
from typing import Any

AIRFLOW_BASE_URL = os.environ["AIRFLOW_BASE_URL"]  # e.g. https://airflow.internal/api/v1
AIRFLOW_AUTH = (os.environ["AIRFLOW_USER"], os.environ["AIRFLOW_PASSWORD"])


def airflow_get_logs(dag_id: str, run_id: str, tail_lines: int = 200) -> str:
    """Fetch task logs for all tasks in a DAG run."""
    # First get the list of task instances for this run
    resp = httpx.get(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns/{run_id}/taskInstances",
        auth=AIRFLOW_AUTH,
        timeout=30,
    )
    resp.raise_for_status()
    tasks = resp.json()["task_instances"]

    log_entries = []
    for task in tasks:
        if task["state"] not in ("failed", "upstream_failed"):
            continue

        task_id = task["task_id"]
        try_number = task.get("try_number", 1)

        log_resp = httpx.get(
            f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs/{try_number}",
            auth=AIRFLOW_AUTH,
            timeout=30,
        )
        if log_resp.status_code == 200:
            lines = log_resp.text.splitlines()
            log_entries.append({
                "task_id": task_id,
                "state": task["state"],
                "log_tail": "\n".join(lines[-tail_lines:]),
            })

    if not log_entries:
        return "No failed tasks found in this DAG run."

    import json
    return json.dumps(log_entries, indent=2)


def airflow_trigger_backfill(dag_id: str, start_date: str, end_date: str) -> str:
    """Trigger a backfill by creating individual DAG runs for each execution date."""
    from datetime import date, timedelta

    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)

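    triggered, skipped, failed = [], [], []
    current = start
    while current <= end:
        logical_date = f"{current.isoformat()}T00:00:00+00:00"

        resp = httpx.post(
            f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns",
            auth=AIRFLOW_AUTH,
            json={"logical_date": logical_date, "conf": {"triggered_by": "pipeline_agent"}},
            timeout=30,
        )
        if resp.status_code == 200:
            triggered.append(current.isoformat())
        elif resp.status_code == 409:  # a run for this logical date already exists
            skipped.append(current.isoformat())
        else:
            # Surface failures to the agent instead of silently dropping them
            failed.append(f"{current.isoformat()} (HTTP {resp.status_code})")
        current += timedelta(days=1)

    return (
        f"Backfill for {dag_id}: triggered={triggered}, "
        f"already_existing={skipped}, failed={failed}"
    )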


def airflow_get_dag_health(dag_id: str) -> dict[str, Any]:
    """Return recent run history and success rate for a DAG."""
    resp = httpx.get(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns",
        auth=AIRFLOW_AUTH,
        params={"limit": 20, "order_by": "-execution_date"},
        timeout=30,
    )
    resp.raise_for_status()
    runs = resp.json()["dag_runs"]

    from datetime import datetime

    states = [r["state"] for r in runs]
    success_rate = states.count("success") / len(states) if states else 0

    # Duration of the most recent run, or None if there are no runs yet
    # or the latest run has not both started and finished.
    last_duration = None
    if runs and runs[0].get("start_date") and runs[0].get("end_date"):
        started = datetime.fromisoformat(runs[0]["start_date"].replace("Z", "+00:00"))
        ended = datetime.fromisoformat(runs[0]["end_date"].replace("Z", "+00:00"))
        last_duration = (ended - started).total_seconds()

    return {
        "dag_id": dag_id,
        "recent_run_count": len(runs),
        "success_rate": round(success_rate, 3),
        "last_run_state": states[0] if states else "unknown",
        "last_run_duration_seconds": last_duration,
    }

Prefect + AI — Embedding Agents in Flow Hooks

Prefect's flow hooks let you attach callbacks to flow state transitions. This makes it natural to trigger an agent when a flow enters a failed state, without any custom polling loop.

# prefect_agent_hook.py
import asyncio
from prefect import flow, task
from prefect.states import State
import anthropic

ANTHROPIC_CLIENT = anthropic.Anthropic()

TRIAGE_TOOLS = [
    {
        "name": "read_flow_logs",
        "description": "Read the last N log lines from a Prefect flow run.",
        "input_schema": {
            "type": "object",
            "properties": {
                "flow_run_id": {"type": "string"},
                "limit": {"type": "integer", "default": 100},
            },
            "required": ["flow_run_id"],
        },
    },
    {
        "name": "check_upstream_dependencies",
        "description": "Check whether upstream data sources are available and fresh.",
        "input_schema": {
            "type": "object",
            "properties": {
                "flow_name": {"type": "string"},
            },
            "required": ["flow_name"],
        },
    },
    {
        "name": "create_incident",
        "description": "Create a PagerDuty/Linear incident for the failure if it cannot be auto-resolved.",
        "input_schema": {
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "description": {"type": "string"},
                "priority": {"type": "string", "enum": ["P1", "P2", "P3"]},
            },
            "required": ["title", "description"],
        },
    },
]


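def on_failure_hook(flow, flow_run, state: State):
    """Prefect on_failure hook: triggers an AI triage agent."""
    # The triage loop uses the synchronous Anthropic client, so call it directly.
    # asyncio.run() would raise if the hook were invoked from a running event loop.
    _run_triage(flow, flow_run, state)


def _run_triage(flow, flow_run, state: State):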
    error_msg = str(state.message or "Unknown error")
    print(f"[agent] Triaging failure for flow run {flow_run.id}: {error_msg}")

    messages = [
        {
            "role": "user",
            "content": (
                f"Prefect flow '{flow.name}' (run ID: {flow_run.id}) failed.\n"
                f"Error: {error_msg}\n\n"
                f"Investigate the failure. If it is caused by a transient issue (e.g. network timeout, "
                f"upstream API unavailability), document the finding and recommend a retry schedule. "
                f"If the cause is a data contract violation or code bug, create an incident."
            ),
        }
    ]

    for _ in range(10):
        response = ANTHROPIC_CLIENT.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            tools=TRIAGE_TOOLS,
            messages=messages,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            break

        if response.stop_reason == "tool_use":
            results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = dispatch_triage_tool(block.name, block.input)
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })
            messages.append({"role": "user", "content": results})


# Attach the hook to a flow
@flow(on_failure=[on_failure_hook])
def nightly_revenue_pipeline():
    extract_raw_events()
    transform_to_orders()
    load_to_warehouse()

Self-Healing Quality Gates

One of the most impactful agentic patterns in data engineering is the self-healing quality gate: a check that not only detects a data quality violation but also attempts to diagnose and repair it automatically. Below is an example built on top of Great Expectations checkpoints; the dispatch_repair_tool dispatcher that maps tool names to implementations is not shown.

# quality_gate.py — self-healing quality gate with LLM triage
import great_expectations as gx
import anthropic
import json

client = anthropic.Anthropic()

REPAIR_TOOLS = [
    {
        "name": "get_source_row_count",
        "description": "Count rows in the source table for a given partition date.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table": {"type": "string"},
                "partition_date": {"type": "string", "description": "YYYY-MM-DD"},
            },
            "required": ["table", "partition_date"],
        },
    },
    {
        "name": "check_etl_job_status",
        "description": "Return the last-run status and log of the ETL job responsible for a table.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table": {"type": "string"},
            },
            "required": ["table"],
        },
    },
    {
        "name": "rerun_etl_partition",
        "description": "Re-run the ETL job for a specific partition. Only call after diagnosing the cause.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table": {"type": "string"},
                "partition_date": {"type": "string"},
                "dry_run": {"type": "boolean", "default": True},
            },
            "required": ["table", "partition_date"],
        },
    },
]


def run_quality_gate_with_healing(checkpoint_name: str, batch_date: str) -> dict:
    """
    Run a GX checkpoint. If it fails, invoke the repair agent.
    Returns a dict with gate_passed, violations, and agent_actions.
    """
    context = gx.get_context()
    result = context.checkpoints.get(checkpoint_name).run(
        batch_parameters={"partition_date": batch_date}
    )

    if result.success:
        return {"gate_passed": True, "violations": [], "agent_actions": []}

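    # Extract violation summary.
    # NOTE: the dict-style access below follows the pre-1.0 GX result layout, while
    # checkpoints.get(...).run(batch_parameters=...) is the GX 1.x API. Verify the
    # result field names against the GX version you have installed.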
    violations = []
    for run_result in result.run_results.values():
        for res in run_result["validation_result"]["results"]:
            if not res["success"]:
                violations.append({
                    "expectation": res["expectation_config"]["expectation_type"],
                    "column": res["expectation_config"]["kwargs"].get("column"),
                    "observed": res["result"].get("observed_value"),
                    "expected": res["expectation_config"]["kwargs"].get("value_set")
                                or res["expectation_config"]["kwargs"].get("max_value"),
                })

    # Invoke repair agent
    agent_actions = _repair_agent(checkpoint_name, batch_date, violations)

    return {
        "gate_passed": False,
        "violations": violations,
        "agent_actions": agent_actions,
    }


def _repair_agent(checkpoint: str, batch_date: str, violations: list) -> list:
    prompt = (
        f"Data quality checkpoint '{checkpoint}' failed for batch_date={batch_date}.\n"
        f"Violations:\n{json.dumps(violations, indent=2)}\n\n"
        f"Investigate the root cause and attempt automated repair if safe. "
        f"Start with source row counts and ETL job status before rerunning anything."
    )

    messages = [{"role": "user", "content": prompt}]
    actions = []

    for _ in range(8):
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            tools=REPAIR_TOOLS,
            messages=messages,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            break

        if response.stop_reason == "tool_use":
            results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = dispatch_repair_tool(block.name, block.input)
                    actions.append({"tool": block.name, "input": block.input, "result": result})
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })
            messages.append({"role": "user", "content": results})

    return actions

Note

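Set dry_run=True by default on all write-path tools (reruns, backfills, schema repairs), and apply that default in the tool implementation as well as in the schema: the schema's default is only a hint to the model, which may omit the field. The agent must explicitly set dry_run=False after reasoning that the action is safe. This prevents the model from accidentally triggering expensive operations during what it intended as a read-only investigation.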
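
A minimal sketch of enforcing that default in code rather than relying on the schema. The function bodies are hypothetical placeholders (no real ETL system is called), and `dispatch` stands in for whatever dispatcher routes tool calls in your agent.

```python
import json


def rerun_etl_partition(table: str, partition_date: str, dry_run: bool = True) -> str:
    """Hypothetical write-path tool: only executes when dry_run is explicitly False."""
    plan = {"table": table, "partition_date": partition_date}
    if dry_run is not False:
        # Anything other than an explicit False (missing, None, "false") stays a dry run.
        return json.dumps({"executed": False, "would_run": plan})
    # Placeholder for the real ETL trigger.
    return json.dumps({"executed": True, "ran": plan})


def dispatch(inputs: dict) -> str:
    """Apply the default here; the model may omit dry_run from its tool input."""
    return rerun_etl_partition(
        table=inputs["table"],
        partition_date=inputs["partition_date"],
        dry_run=inputs.get("dry_run", True),
    )


print(dispatch({"table": "analytics.orders", "partition_date": "2026-05-19"}))
```

Returning the planned action in the dry-run result gives the model something concrete to reason about before it decides to repeat the call with dry_run=False.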

Multi-Agent Coordination — Orchestrator and Specialists

For complex incidents that span multiple systems — a data warehouse slowdown that triggers freshness violations that trigger downstream ML model staleness — a single agent with a large tool list becomes unwieldy. The orchestrator-specialist pattern works better: a lightweight orchestrator agent decides which specialist to invoke, collects their findings, and synthesises a resolution.

# multi_agent.py — orchestrator + specialist pattern
import anthropic
import json

client = anthropic.Anthropic()


def warehouse_specialist(context: str) -> str:
    """Diagnose warehouse-related issues (query latency, partition health, compaction)."""
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",  # fast model for specialists
        max_tokens=1024,
        system=(
            "You are a warehouse performance specialist. "
            "Analyse the provided context and return a concise JSON with: "
            "root_cause (string), confidence (0-1), recommended_action (string)."
        ),
        messages=[{"role": "user", "content": context}],
    )
    return response.content[0].text


def pipeline_specialist(context: str) -> str:
    """Diagnose pipeline orchestration issues (DAG failures, task retries, dependencies)."""
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=1024,
        system=(
            "You are a pipeline orchestration specialist. "
            "Analyse the provided context and return a JSON with: "
            "root_cause, confidence, recommended_action."
        ),
        messages=[{"role": "user", "content": context}],
    )
    return response.content[0].text


ORCHESTRATOR_TOOLS = [
    {
        "name": "invoke_warehouse_specialist",
        "description": "Ask the warehouse specialist to diagnose a performance or data issue.",
        "input_schema": {
            "type": "object",
            "properties": {
                "context": {"type": "string", "description": "All relevant metrics and logs for warehouse analysis"},
            },
            "required": ["context"],
        },
    },
    {
        "name": "invoke_pipeline_specialist",
        "description": "Ask the pipeline specialist to diagnose an orchestration or DAG failure.",
        "input_schema": {
            "type": "object",
            "properties": {
                "context": {"type": "string", "description": "DAG logs, task states, and error messages"},
            },
            "required": ["context"],
        },
    },
    {
        "name": "create_remediation_plan",
        "description": "Finalise a remediation plan based on specialist findings.",
        "input_schema": {
            "type": "object",
            "properties": {
                "findings": {"type": "array", "items": {"type": "object"}},
                "recommended_steps": {"type": "array", "items": {"type": "string"}},
                "requires_human": {"type": "boolean"},
            },
            "required": ["findings", "recommended_steps", "requires_human"],
        },
    },
]


def orchestrate_incident(incident_summary: str) -> dict:
    messages = [{"role": "user", "content": incident_summary}]

    for _ in range(12):
        response = client.messages.create(
            model="claude-opus-4-7",  # capable model for orchestration
            max_tokens=2048,
            tools=ORCHESTRATOR_TOOLS,
            messages=messages,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            return {"status": "resolved", "summary": response.content[0].text}

        if response.stop_reason == "tool_use":
            results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue

                if block.name == "invoke_warehouse_specialist":
                    result = warehouse_specialist(block.input["context"])
                elif block.name == "invoke_pipeline_specialist":
                    result = pipeline_specialist(block.input["context"])
                elif block.name == "create_remediation_plan":
                    return {"status": "plan_ready", "plan": block.input}
                else:
                    result = json.dumps({"error": f"Unknown tool: {block.name}"})

                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result if isinstance(result, str) else json.dumps(result),
                })
            messages.append({"role": "user", "content": results})

    return {"status": "max_iterations_reached", "requires_human": True}
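
The specialists above are asked to reply with JSON, but a model reply can include surrounding prose or malformed output. The following is a sketch of a tolerant parser, assuming a hypothetical `parse_specialist_finding` helper that is not part of the original listing. It lets orchestration code apply confidence thresholds without trusting the raw text.

```python
import json
import re

_DEFAULTS = {"root_cause": "unknown", "confidence": 0.0,
             "recommended_action": "escalate to a human"}


def parse_specialist_finding(raw: str) -> dict:
    """Extract a JSON object from a specialist reply; fall back to zero confidence."""
    unparseable = dict(_DEFAULTS, root_cause="unparseable specialist reply")
    # Greedy match from the first '{' to the last '}' in the reply.
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if not match:
        return unparseable
    try:
        finding = json.loads(match.group(0))
    except json.JSONDecodeError:
        return unparseable
    # Clamp confidence into [0, 1] so downstream thresholds behave predictably.
    try:
        confidence = float(finding.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    finding["confidence"] = min(1.0, max(0.0, confidence))
    return {**_DEFAULTS, **finding}


reply = 'Analysis: {"root_cause": "compaction backlog", "confidence": 1.4} done'
finding = parse_specialist_finding(reply)
```

A reply that cannot be parsed comes back with confidence 0.0, so a rule such as "require human review below 0.7" treats it as an escalation rather than a silent success.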

Production Patterns — Idempotency, Observability, Guardrails

Running agents autonomously in production requires the same discipline as running any stateful distributed system. Three areas need explicit engineering attention beyond the happy-path agent loop.

Idempotent Tool Calls

Agents can be retried — either by a watchdog that detects a stalled run, or because the orchestrating flow failed after the agent had already taken some actions. Every write-path tool must be idempotent: calling it twice with the same parameters must produce the same result as calling it once.

# idempotent_tools.py
import hashlib
import redis
import json
from datetime import timedelta

r = redis.Redis.from_url("redis://localhost:6379")


def idempotent_tool_call(
    incident_id: str,
    tool_name: str,
    tool_inputs: dict,
    fn,
    ttl: timedelta = timedelta(hours=24),
) -> str:
    """
    Execute a tool call exactly once per incident_id + tool signature.
    Subsequent calls with the same key return the cached result.
    """
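    key_data = json.dumps({"tool": tool_name, "inputs": tool_inputs}, sort_keys=True)
    key_hash = hashlib.sha256(f"{incident_id}:{key_data}".encode()).hexdigest()
    cache_key = f"agent:tool:{key_hash}"
    lock_key = f"{cache_key}:lock"
    ttl_seconds = int(ttl.total_seconds())

    cached = r.get(cache_key)
    if cached is not None:  # an empty-string result is still a valid cached result
        return cached.decode()

    # SET NX claims the call atomically, so two concurrent retries cannot both run fn.
    if not r.set(lock_key, "1", nx=True, ex=ttl_seconds):
        return "Tool call already in progress for this incident; result not yet available."

    try:
        result = fn(**tool_inputs)
    except Exception:
        r.delete(lock_key)  # release the claim so a later retry can run
        raise
    r.setex(cache_key, ttl_seconds, result)
    return result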

Structured Agent Run Logging

Log every agent run as a structured document for audit trails and debugging. Store tool call sequences, model responses, and outcome metadata in a queryable store.

# agent_logger.py
import json
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import Any


@dataclass
class AgentRunRecord:
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    incident_id: str = ""
    dag_id: str = ""
    model: str = ""
    tool_calls: list[dict[str, Any]] = field(default_factory=list)
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    outcome: str = "in_progress"   # resolved | escalated | failed | max_iterations
    resolution_summary: str = ""
    finished_at: str = ""

    def record_tool_call(self, name: str, inputs: dict, result: str) -> None:
        self.tool_calls.append({
            "seq": len(self.tool_calls) + 1,
            "tool": name,
            "inputs": inputs,
            "result_preview": result[:300],
            "ts": datetime.now(timezone.utc).isoformat(),
        })

    def finish(self, outcome: str, summary: str) -> None:
        self.outcome = outcome
        self.resolution_summary = summary
        self.finished_at = datetime.now(timezone.utc).isoformat()

    def to_json(self) -> str:
        return json.dumps(asdict(self), indent=2)

Note

Ship agent run records to your data warehouse alongside pipeline metrics. Querying agent run history lets you answer: which incidents does the agent resolve vs escalate? Which tools are called most frequently? What is the average cost per resolved incident? Without this data, you cannot tune the system or justify its operational cost.
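
As a sketch of how those questions could be answered from exported run records, the function below aggregates outcomes, tool usage, and average cost per resolved incident. The record shape mirrors the AgentRunRecord fields above. The per-token prices are hypothetical placeholders, not real provider rates.

```python
from collections import defaultdict

# Hypothetical USD prices per million tokens; replace with your provider's current rates.
PRICE_PER_MTOK = {"input": 3.00, "output": 15.00}


def run_cost_usd(record: dict) -> float:
    """Estimate the model cost of one agent run from its token totals."""
    return (
        record["total_input_tokens"] * PRICE_PER_MTOK["input"]
        + record["total_output_tokens"] * PRICE_PER_MTOK["output"]
    ) / 1_000_000


def summarize(records: list[dict]) -> dict:
    """Aggregate outcome counts, tool usage, and average cost per resolved incident."""
    outcomes: dict[str, int] = defaultdict(int)
    tool_usage: dict[str, int] = defaultdict(int)
    resolved_costs = []
    for rec in records:
        outcomes[rec["outcome"]] += 1
        for call in rec["tool_calls"]:
            tool_usage[call["tool"]] += 1
        if rec["outcome"] == "resolved":
            resolved_costs.append(run_cost_usd(rec))
    avg = sum(resolved_costs) / len(resolved_costs) if resolved_costs else 0.0
    return {
        "outcomes": dict(outcomes),
        "tool_usage": dict(tool_usage),
        "avg_cost_per_resolved_usd": round(avg, 4),
    }


records = [
    {"outcome": "resolved", "total_input_tokens": 100_000, "total_output_tokens": 10_000,
     "tool_calls": [{"tool": "get_dag_run_logs"}, {"tool": "trigger_dag_backfill"}]},
    {"outcome": "escalated", "total_input_tokens": 50_000, "total_output_tokens": 5_000,
     "tool_calls": [{"tool": "get_dag_run_logs"}]},
]
summary = summarize(records)
```

In a warehouse the same aggregation would normally be a SQL query over the shipped records; the Python version only illustrates which fields the analysis depends on.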

Guardrails and Human-in-the-Loop Checkpoints

Autonomous agents should never have unrestricted write access to production systems. Define explicit checkpoints where the agent must pause and request human approval before proceeding.

Blast Radius Limits

Set hard limits on the scope of automated actions: maximum days in a backfill range (e.g. 7 days), maximum number of tables touched in a single run (e.g. 5), maximum rows deleted in a repair operation (e.g. 0 — never auto-delete). These limits prevent a runaway agent from triggering a weeks-long backfill cascade.
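
A minimal sketch of enforcing these limits inside the tool layer, using the example values from the paragraph above. The function and exception names are hypothetical.

```python
from datetime import date

MAX_BACKFILL_DAYS = 7    # example limit from the text above
MAX_TABLES_PER_RUN = 5   # example limit from the text above


class BlastRadiusExceeded(Exception):
    """Raised when a requested action is larger than the agent may perform alone."""


def check_backfill_range(start_date: str, end_date: str) -> int:
    """Validate an inclusive ISO date range and return its length in days."""
    start, end = date.fromisoformat(start_date), date.fromisoformat(end_date)
    if end < start:
        raise ValueError("end_date is before start_date")
    days = (end - start).days + 1
    if days > MAX_BACKFILL_DAYS:
        raise BlastRadiusExceeded(f"{days}-day backfill exceeds limit of {MAX_BACKFILL_DAYS}")
    return days


def check_tables_touched(already_touched: set[str], table: str) -> None:
    """Reject a write that would push the run over the table limit."""
    if len(already_touched | {table}) > MAX_TABLES_PER_RUN:
        raise BlastRadiusExceeded(f"run would touch more than {MAX_TABLES_PER_RUN} tables")


print(check_backfill_range("2026-05-14", "2026-05-20"))
```

Calling these checks at the top of a backfill or repair tool turns an oversized request into an error message the agent can read and escalate, rather than an action it can take.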

Table-Level Write Permissions

Maintain an allowlist of tables the agent can write to or trigger re-loads for. Fact tables in the gold layer should require human approval. Staging and intermediate tables can be agent-managed. Encode this in the tool implementations, not as a prompt instruction — LLMs can be convinced to ignore prompt constraints, but code cannot.
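
A sketch of what encoding the allowlist in the tool implementation might look like. The schema prefixes and function names are assumptions for illustration, and the reload itself is a placeholder.

```python
AGENT_WRITABLE_PREFIXES = ("staging.", "intermediate.")  # hypothetical schema names


def authorize_write(table_fqn: str, human_approved: bool = False) -> bool:
    """Allow agent writes to allowlisted schemas; everything else needs approval."""
    if table_fqn.lower().startswith(AGENT_WRITABLE_PREFIXES):
        return True
    return human_approved


def guarded_reload(table_fqn: str, human_approved: bool = False) -> str:
    """Hypothetical tool wrapper: the permission check runs before any side effect."""
    if not authorize_write(table_fqn, human_approved):
        return f"DENIED: {table_fqn} is not agent-writable; request human approval."
    # Placeholder for the real reload trigger.
    return f"OK: reload queued for {table_fqn}"


print(guarded_reload("staging.orders_raw"))
print(guarded_reload("gold.fct_revenue"))
```

The `human_approved` flag should be set by the approval workflow, never taken from the model's tool input.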

Confirmation Tokens

For high-risk operations, require the agent to echo a confirmation token (provided in the system prompt) as a tool argument before the tool executes. This does not prove the model has processed the entire context, but it makes an accidental tool call much less likely to go through, and it creates an audit trail showing the agent made an explicit decision to commit an irreversible action.
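
One way this could be wired up, as a sketch: generate a fresh token per agent run, place it in the system prompt, and have the tool refuse to run without it. The `confirmation_token` input field and both function names are hypothetical.

```python
import hmac
import secrets


def new_confirmation_token() -> str:
    """Generate a per-run token to embed in the system prompt."""
    return secrets.token_hex(8)


def execute_high_risk(tool_inputs: dict, expected_token: str, audit_log: list) -> str:
    """Run a high-risk action only if the model echoed this run's token."""
    supplied = str(tool_inputs.get("confirmation_token", ""))
    # Compare as bytes so non-ASCII model output cannot raise a TypeError.
    confirmed = hmac.compare_digest(supplied.encode(), expected_token.encode())
    audit_log.append({"action": tool_inputs.get("action"), "confirmed": confirmed})
    if not confirmed:
        return "REJECTED: missing or incorrect confirmation token."
    # Placeholder for the real irreversible action.
    return f"EXECUTED: {tool_inputs.get('action')}"


audit_log: list = []
token = new_confirmation_token()
print(execute_high_risk({"action": "drop_partition"}, token, audit_log))
```

Every attempt is logged, confirmed or not, so rejected calls show up in the audit trail alongside executed ones.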

Escalation SLA

If the agent has not resolved an incident within a configured time window (e.g. 30 minutes), automatically escalate to on-call even if the agent is still iterating. Data incidents compound over time: a dashboard that was fresh at 9 AM and has since gone stale can be a P1 by noon. Never let an agent silently iterate past an SLA deadline.
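
A sketch of a deadline-aware wrapper around the agent loop. It assumes each agent iteration is exposed as a hypothetical `step()` callable that returns True once the incident is resolved, and that `escalate` pages the on-call engineer.

```python
import time
from typing import Callable


def run_with_sla(
    step: Callable[[], bool],
    escalate: Callable[[str], None],
    sla_seconds: float,
    max_steps: int = 15,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call step() until it reports resolution, the SLA expires, or steps run out."""
    deadline = clock() + sla_seconds
    for _ in range(max_steps):
        # The deadline is checked between steps; a step in progress is not interrupted.
        if clock() >= deadline:
            escalate("SLA deadline reached while agent was still iterating")
            return "escalated_sla"
        if step():
            return "resolved"
    escalate("Agent exhausted its step budget without resolving the incident")
    return "escalated_max_steps"


pages: list[str] = []
outcome = run_with_sla(step=lambda: True, escalate=pages.append, sla_seconds=1800)
```

Because the check only runs between iterations, individual tool calls and model requests still need their own timeouts to keep a single slow step from overrunning the window.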
