GadaaLabs
AI Automation — Production Agents & Agentic Systems
Lesson 9

Workflow Automation — Event-Driven & DAG-Based Pipelines

24 min

When Workflows Beat Agents

Agents are the right tool when the task requires dynamic reasoning, tool selection, and adaptive decision-making. But not every automation problem needs an agent. Workflows — deterministic pipelines with predefined steps — are better when:

Determinism is required: an invoice processing pipeline must always run the same steps in the same order. A non-deterministic agent that "decides" to skip validation is a liability.

Audit trails matter: a compliance workflow needs a complete, reproducible record of every step. A DAG-based workflow provides this naturally; an agent's reasoning trace does not.

Cost is constrained: a simple extract-transform-load pipeline with one LLM call per document does not need an agent loop with 5 LLM calls to "decide" what to do.

Reliability is paramount: workflows are easier to test, retry, and monitor because the state machine is explicit.

The correct architecture uses both: workflows handle the deterministic orchestration; agents handle the steps that require language understanding.
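
As a minimal sketch of that split, the pipeline below runs fixed steps in a fixed order, with the one step that needs language understanding stubbed out (`extract_vendor_llm` stands in for a real model call; all names here are illustrative):

```python
def validate(invoice: dict) -> dict:
    """Deterministic step: reject invoices missing required fields."""
    if "total" not in invoice or "raw_text" not in invoice:
        raise ValueError("invoice missing required fields")
    return invoice


def extract_vendor_llm(invoice: dict) -> dict:
    """The one language-understanding step; a real pipeline would call an LLM here."""
    invoice["vendor"] = invoice["raw_text"].split()[0]  # stub for a model call
    return invoice


def persist(invoice: dict) -> dict:
    """Deterministic step: write the structured record."""
    invoice["status"] = "stored"
    return invoice


def run_pipeline(invoice: dict) -> dict:
    # Fixed order, no agent loop deciding what to run next.
    for step in (validate, extract_vendor_llm, persist):
        invoice = step(invoice)
    return invoice


print(run_pipeline({"total": 99.0, "raw_text": "Acme Corp invoice #42"}))
```

Swapping the stub for a real completion call changes nothing about the orchestration: the workflow still controls what runs and in what order.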

Event-Driven Webhooks

Webhook endpoints receive HTTP POST requests when external events occur (GitHub push, Stripe payment, Slack message). The pattern:

  1. Receive POST request
  2. Validate the signature (HMAC-SHA256)
  3. Acknowledge immediately (return 200)
  4. Enqueue the actual processing for async execution

python
import hashlib
import hmac
import json
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from redis import Redis

app = FastAPI()
redis = Redis(host="localhost", port=6379, decode_responses=True)

WEBHOOK_SECRET = "your_webhook_secret"  # shared secret from the sender; load from env/secret manager in production


def verify_webhook_signature(payload: bytes, signature_header: str, secret: str) -> bool:
    """Verify HMAC-SHA256 webhook signature. Reject the request if invalid."""
    expected = "sha256=" + hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature_header)


@app.post("/webhooks/document-updated")
async def document_webhook(request: Request, background_tasks: BackgroundTasks):
    """
    Receive document update events.
    Acknowledge immediately, process in background.
    """
    payload = await request.body()
    signature = request.headers.get("X-Signature", "")

    if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = json.loads(payload)
    # Enqueue for async processing — never block the webhook response
    background_tasks.add_task(process_document_event, event)

    return {"status": "accepted"}


def process_document_event(event: dict) -> None:
    """Background processing of a document update event. Defined sync so FastAPI runs it in a threadpool; the blocking Redis calls never stall the event loop."""
    doc_id = event.get("doc_id")
    action = event.get("action")  # "created" | "updated" | "deleted"

    if action == "deleted":
        redis.rpush("delete_queue", json.dumps({"doc_id": doc_id}))
    elif action in ("created", "updated"):
        redis.rpush("index_queue", json.dumps({"doc_id": doc_id, "url": event.get("url")}))

Message Queue with Redis and Celery

For workloads that need reliable delivery, retry, and distributed processing, use a message queue:

python
# celery_app.py
from celery import Celery
from kombu import Queue

celery_app = Celery(
    "ai_workflows",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

celery_app.conf.update(
    task_queues=(
        Queue("high_priority", routing_key="high"),
        Queue("default", routing_key="default"),
        Queue("low_priority", routing_key="low"),
    ),
    task_default_queue="default",
    task_acks_late=True,        # ack only after successful completion, not on receipt
    task_reject_on_worker_lost=True,
)


@celery_app.task(
    bind=True,
    max_retries=3,
    queue="default",
    name="process_document",
)
def process_document_task(self, doc_id: str, doc_url: str) -> dict:
    """Process a single document: download, chunk, embed, index."""
    try:
        content = download_document(doc_url)
        chunks = chunk_document(content)
        embeddings = embed_chunks(chunks)
        upsert_to_index(doc_id, chunks, embeddings)
        return {"doc_id": doc_id, "chunks_indexed": len(chunks), "status": "success"}
    except DownloadError as e:
        # Retry with exponential backoff for transient errors
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
    except PermanentError as e:
        # Do not retry — log and fail permanently
        return {"doc_id": doc_id, "status": "failed", "error": str(e)}

Retry with Exponential Backoff

python
from tenacity import (
    retry,
    wait_exponential,
    stop_after_attempt,
    retry_if_exception_type,
    before_sleep_log,
)
import logging

logger = logging.getLogger(__name__)


class TransientError(Exception):
    """Retryable error: network timeout, rate limit, temporary service unavailability."""


class PermanentError(Exception):
    """Non-retryable error: invalid input, authentication failure, resource not found."""


@retry(
    retry=retry_if_exception_type(TransientError),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def call_llm_with_retry(prompt: str) -> str:
    """Call an LLM API with automatic retry on transient failures."""
    from groq import Groq, RateLimitError, APIConnectionError

    client = Groq()
    try:
        response = client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        return response.choices[0].message.content
    except RateLimitError as e:
        raise TransientError(f"Rate limited: {e}") from e
    except APIConnectionError as e:
        raise TransientError(f"Connection error: {e}") from e

Idempotency

Every task must be idempotent: if the same task arrives twice (due to network retry, duplicate event), the second execution must produce the same result without double-processing.

python
import redis
import json
import uuid

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
IDEMPOTENCY_TTL = 86400  # 24 hours


def idempotent_task(idempotency_key: str, task_fn, *args, **kwargs):
    """
    Execute a task idempotently.
    If idempotency_key has been seen before, return the cached result.
    """
    cache_key = f"idempotency:{idempotency_key}"
    cached = redis_client.get(cache_key)

    if cached:
        print(f"Idempotency hit for key: {idempotency_key}")
        return json.loads(cached)

    result = task_fn(*args, **kwargs)
    redis_client.setex(cache_key, IDEMPOTENCY_TTL, json.dumps(result))
    return result
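
The key itself should be derived deterministically from the event, so a redelivered event maps to the same cache entry. One common approach, sketched here, hashes the canonicalised JSON payload:

```python
import hashlib
import json


def event_idempotency_key(event: dict) -> str:
    """Stable key: identical events hash to the same value regardless of key order."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()


a = event_idempotency_key({"doc_id": "doc-1", "action": "updated"})
b = event_idempotency_key({"action": "updated", "doc_id": "doc-1"})  # same event, different key order
print(a == b)  # True
```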

DAG-Based Workflow with State Machine

A state machine makes workflow transitions explicit and prevents invalid state jumps:

python
from enum import Enum
from dataclasses import dataclass, field
import datetime


class WorkflowState(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    WAITING_REVIEW = "waiting_review"
    COMPLETE = "complete"
    FAILED = "failed"


# Valid state transitions
VALID_TRANSITIONS: dict[WorkflowState, set[WorkflowState]] = {
    WorkflowState.PENDING: {WorkflowState.PROCESSING},
    WorkflowState.PROCESSING: {WorkflowState.WAITING_REVIEW, WorkflowState.FAILED},
    WorkflowState.WAITING_REVIEW: {WorkflowState.COMPLETE, WorkflowState.FAILED},
    WorkflowState.COMPLETE: set(),
    WorkflowState.FAILED: {WorkflowState.PENDING},  # allow retry from FAILED
}


@dataclass
class WorkflowRun:
    run_id: str
    task_type: str
    input_data: dict
    state: WorkflowState = WorkflowState.PENDING
    step_results: dict = field(default_factory=dict)
    created_at: str = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc).isoformat())
    updated_at: str = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc).isoformat())

    def transition(self, new_state: WorkflowState) -> None:
        """Validate and apply a state transition."""
        allowed = VALID_TRANSITIONS.get(self.state, set())
        if new_state not in allowed:
            raise ValueError(
                f"Invalid transition: {self.state}{new_state}. "
                f"Allowed from {self.state}: {allowed}"
            )
        self.state = new_state
        self.updated_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
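
The transition table can be exercised in isolation; a condensed sketch that mirrors VALID_TRANSITIONS with plain strings in place of the enum:

```python
VALID = {
    "pending": {"processing"},
    "processing": {"waiting_review", "failed"},
    "waiting_review": {"complete", "failed"},
    "complete": set(),
    "failed": {"pending"},
}


def transition(state: str, new_state: str) -> str:
    if new_state not in VALID.get(state, set()):
        raise ValueError(f"Invalid transition: {state} -> {new_state}")
    return new_state


state = transition("pending", "processing")
state = transition(state, "waiting_review")
print(state)  # waiting_review

try:
    transition("pending", "complete")  # skipping review is impossible by construction
except ValueError as e:
    print(e)  # Invalid transition: pending -> complete
```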

The Saga Pattern — Rollback on Failure

When a multi-step workflow partially succeeds and then fails, the saga pattern runs compensating actions to undo the completed steps:

python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SagaStep:
    name: str
    execute: Callable       # the forward action
    compensate: Callable    # the rollback action (undo)


class Saga:
    """
    Execute a sequence of steps with automatic rollback on failure.
    Each step must define a compensating action.
    """

    def __init__(self, steps: list[SagaStep]):
        self.steps = steps

    def run(self, context: dict) -> dict:
        """Execute all steps. On failure, roll back all completed steps in reverse order."""
        completed: list[SagaStep] = []

        for step in self.steps:
            try:
                print(f"Executing: {step.name}")
                result = step.execute(context)
                context[step.name] = result
                completed.append(step)
            except Exception as e:
                print(f"Step '{step.name}' failed: {e}. Rolling back {len(completed)} completed steps.")
                # Compensate in reverse order
                for completed_step in reversed(completed):
                    try:
                        completed_step.compensate(context)
                        print(f"Compensated: {completed_step.name}")
                    except Exception as comp_err:
                        print(f"Compensation failed for '{completed_step.name}': {comp_err}")
                raise RuntimeError(f"Workflow failed at step '{step.name}': {e}") from e

        return context
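
A condensed, self-contained version of the same pattern makes the rollback order visible (the step names and the injected failure are illustrative):

```python
def run_saga(steps, context):
    """steps: list of (name, execute, compensate); compensations run in reverse on failure."""
    completed = []
    for name, execute, compensate in steps:
        try:
            context[name] = execute(context)
            completed.append((name, compensate))
        except Exception as e:
            for done_name, comp in reversed(completed):
                comp(context)
                context.setdefault("compensated", []).append(done_name)
            raise RuntimeError(f"failed at {name}") from e
    return context


steps = [
    ("reserve_stock", lambda ctx: "reserved", lambda ctx: None),
    ("charge_card", lambda ctx: "charged", lambda ctx: None),
    ("ship_order", lambda ctx: 1 / 0, lambda ctx: None),  # injected failure
]

ctx = {}
try:
    run_saga(steps, ctx)
except RuntimeError as e:
    print(e)                   # failed at ship_order
    print(ctx["compensated"])  # ['charge_card', 'reserve_stock'] -- reverse order
```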

Complete Report Generation Workflow

python
import datetime
import smtplib
from email.mime.text import MIMEText

from groq import Groq

client = Groq()


def load_data_step(context: dict) -> dict:
    """Step 1: Load data from the database."""
    # In production: run a SQL query
    return {"rows": 1500, "date_range": "2025-01-01 to 2025-03-31", "data": []}


def analyse_with_llm_step(context: dict) -> dict:
    """Step 2: Use LLM to generate qualitative analysis of the data."""
    data_summary = context.get("load_data_step", {})
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{
            "role": "user",
            "content": f"Analyse this data summary and provide 3 key insights: {data_summary}",
        }],
        max_tokens=300,
    )
    return {"insights": response.choices[0].message.content}


def generate_charts_step(context: dict) -> dict:
    """Step 3: Generate chart descriptions (placeholder for real chart generation)."""
    return {"charts": ["revenue_trend.png", "cost_breakdown.png"]}


def compose_html_report_step(context: dict) -> dict:
    """Step 4: Compose the full HTML report."""
    analysis = context.get("analyse_with_llm_step", {})
    charts = context.get("generate_charts_step", {})
    html = f"""
    <html><body>
    <h1>Quarterly Report — {datetime.date.today()}</h1>
    <h2>Key Insights</h2>
    <p>{analysis.get('insights', 'No insights generated')}</p>
    <h2>Charts</h2>
    <p>{', '.join(charts.get('charts', []))}</p>
    </body></html>
    """
    return {"html_report": html, "report_path": "/tmp/quarterly_report.html"}


def send_email_step(context: dict) -> dict:
    """Step 5: Email the report."""
    report = context.get("compose_html_report_step", {})
    # In production: use SES, SendGrid, etc.
    # msg = MIMEText(report["html_report"], "html")
    # ... smtp send logic ...
    return {"sent_to": "team@company.com", "status": "sent"}


def run_report_workflow() -> dict:
    """Execute the full report generation workflow with saga rollback."""
    steps = [
        SagaStep(
            name="load_data_step",
            execute=load_data_step,
            compensate=lambda ctx: print("Nothing to compensate for data load"),
        ),
        SagaStep(
            name="analyse_with_llm_step",
            execute=analyse_with_llm_step,
            compensate=lambda ctx: print("Nothing to compensate for analysis"),
        ),
        SagaStep(
            name="generate_charts_step",
            execute=generate_charts_step,
            compensate=lambda ctx: print("Deleting generated chart files"),
        ),
        SagaStep(
            name="compose_html_report_step",
            execute=compose_html_report_step,
            compensate=lambda ctx: print(f"Deleting report: {ctx.get('compose_html_report_step', {}).get('report_path')}"),
        ),
        SagaStep(
            name="send_email_step",
            execute=send_email_step,
            compensate=lambda ctx: print("Cannot unsend email — logging compensation failure"),
        ),
    ]

    saga = Saga(steps)
    return saga.run(context={})

Key Takeaways

  • Choose workflows over agents when the task is deterministic, requires an audit trail, or has strict cost and reliability constraints.
  • Always validate webhook signatures before processing — unauthenticated webhooks are a common attack vector.
  • Acknowledge webhook requests immediately and process asynchronously — never make the sender wait for your processing.
  • Use tenacity with wait_exponential for transient errors (rate limits, network timeouts); never retry PermanentErrors (invalid input, auth failures).
  • Idempotency keys prevent double-processing when events are delivered more than once — store results keyed by idempotency_key with TTL.
  • State machines make valid workflow transitions explicit; a transition that bypasses WAITING_REVIEW should be impossible by construction.
  • The saga pattern is the correct rollback mechanism for multi-step workflows — each step must define a compensating action that runs if a later step fails.
  • The cron → load → analyse → compose → send pattern is the template for most automated reporting workflows; parametrise the steps for different report types.
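
The scheduling half of that pattern is usually just a crontab entry; a sketch (the schedule, script path, and log file are illustrative):

```shell
# Run the report workflow at 07:00 on the first day of every month.
0 7 1 * * /usr/bin/python3 /opt/workflows/run_report_workflow.py >> /var/log/report_workflow.log 2>&1
```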