The ReAct loop is powerful for exploratory tasks where the next action depends on the previous observation. But it has structural limitations:
No parallelism: ReAct is inherently sequential — think, act, observe, repeat. If three independent sub-tasks exist, they run one after another.
No upfront validation: ReAct discovers problems during execution. A planning agent can detect invalid tool names or missing dependencies before running a single step.
Poor handling of long tasks: a task with 10+ steps produces a very long trajectory. The LLM must reason over an ever-growing context, which degrades quality and inflates cost.
No separation of concerns: the same LLM is doing both the high-level planning and the low-level execution, which forces a generalised model where specialisation would be better.
Plan-and-Execute separates the Planner (generates a full JSON task graph) from the Executor (runs steps in dependency order with parallel execution where possible).
Plan-and-Execute Architecture
The Planner receives a goal and outputs a JSON array of steps. Each step has:
id: unique step identifier (e.g. "step_1")
description: human-readable description
tool: the tool to call
args: tool arguments (can reference outputs of previous steps with {{step_id.output}})
depends_on: list of step IDs that must complete before this step runs
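For example, a plan that fetches two independent data sources and then merges them might look like this (the tool names here are illustrative, not part of any registry defined in this chapter):

```python
# Illustrative plan: step_1 and step_2 have no dependencies, so they can run
# in parallel; step_3 waits for both and consumes their outputs via templates.
example_plan = [
    {
        "id": "step_1",
        "description": "Fetch sales data",
        "tool": "fetch_sales",          # hypothetical tool
        "args": {"region": "EU"},
        "depends_on": [],
    },
    {
        "id": "step_2",
        "description": "Fetch inventory data",
        "tool": "fetch_inventory",      # hypothetical tool
        "args": {"warehouse": "main"},
        "depends_on": [],
    },
    {
        "id": "step_3",
        "description": "Merge the two datasets",
        "tool": "merge_data",           # hypothetical tool
        "args": {"sales": "{{step_1.output}}", "inventory": "{{step_2.output}}"},
        "depends_on": ["step_1", "step_2"],
    },
]
```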
The Executor performs a topological sort, then executes all steps whose dependencies are satisfied — potentially in parallel.
The Planner
```python
import json

from groq import Groq

client = Groq()

PLANNER_PROMPT = """You are a task planning agent. Given a goal, create an execution plan as a JSON array.

Each step must have:
- "id": unique string like "step_1", "step_2"
- "description": what this step does (one sentence)
- "tool": one of {available_tools}
- "args": dict of tool arguments
- "depends_on": list of step IDs that must complete first (empty list if no deps)

Use depends_on to express dependencies correctly.
Steps with no dependencies can run in parallel.

GOAL: {goal}

Return ONLY valid JSON — a list of step objects. No explanation."""


def create_plan(goal: str, available_tools: list[str]) -> list[dict]:
    """
    Ask the LLM to generate a JSON execution plan for a goal.
    Returns a list of step dicts.
    """
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{
            "role": "user",
            "content": PLANNER_PROMPT.format(
                goal=goal,
                available_tools=", ".join(available_tools),
            ),
        }],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    data = json.loads(response.choices[0].message.content)
    # Normalise: the model may wrap the list in an object key
    if isinstance(data, dict):
        data = next(iter(data.values()))
    return data
```
Plan Validation
Validate the plan before executing a single step. Catch structural errors early.
```python
class PlanValidationError(Exception):
    pass


def validate_plan(steps: list[dict], available_tools: set[str]) -> None:
    """
    Validate a plan for structural correctness.
    Raises PlanValidationError with a descriptive message if invalid.
    """
    step_ids = {step["id"] for step in steps}

    for step in steps:
        # Check required fields
        for field in ("id", "description", "tool", "args", "depends_on"):
            if field not in step:
                raise PlanValidationError(f"Step {step.get('id', '?')} missing field '{field}'")

        # Check tool exists
        if step["tool"] not in available_tools:
            raise PlanValidationError(
                f"Step {step['id']} references unknown tool '{step['tool']}'. "
                f"Available: {available_tools}"
            )

        # Check dependency IDs exist
        for dep_id in step["depends_on"]:
            if dep_id not in step_ids:
                raise PlanValidationError(
                    f"Step {step['id']} depends on '{dep_id}' which does not exist in the plan"
                )

    # Check for cycles with DFS
    _check_no_cycles(steps, step_ids)


def _check_no_cycles(steps: list[dict], step_ids: set[str]) -> None:
    """Detect cycles using DFS. Raises PlanValidationError if a cycle is found."""
    graph = {step["id"]: step["depends_on"] for step in steps}
    visited = set()
    in_stack = set()

    def dfs(node: str) -> None:
        visited.add(node)
        in_stack.add(node)
        for neighbour in graph.get(node, []):
            if neighbour not in visited:
                dfs(neighbour)
            elif neighbour in in_stack:
                raise PlanValidationError(
                    f"Cycle detected in plan: {neighbour} is an ancestor of itself"
                )
        in_stack.discard(node)

    for step_id in step_ids:
        if step_id not in visited:
            dfs(step_id)
```
Topological Sort
```python
def topological_sort(steps: list[dict]) -> list[list[dict]]:
    """
    Sort steps into execution waves.

    Each wave is a list of steps that can execute in parallel
    (all their dependencies are in earlier waves).
    Returns a list of waves (each wave is a list of step dicts).
    """
    remaining = {step["id"]: step for step in steps}
    completed: set[str] = set()
    waves = []

    while remaining:
        # Find all steps whose dependencies are fully satisfied
        ready = [
            step for step in remaining.values()
            if all(dep in completed for dep in step["depends_on"])
        ]
        if not ready:
            raise PlanValidationError(
                "No progress possible — potential cycle not caught by validator"
            )
        waves.append(ready)
        for step in ready:
            completed.add(step["id"])
            del remaining[step["id"]]

    return waves
```
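A quick sanity check of the wave grouping on a tiny diamond-free plan (this snippet inlines a condensed copy of the loop above so it runs standalone):

```python
# Condensed copy of the topological-sort loop above, for a runnable demo.
def waves_of(steps):
    remaining = {s["id"]: s for s in steps}
    completed, waves = set(), []
    while remaining:
        ready = [s for s in remaining.values()
                 if all(d in completed for d in s["depends_on"])]
        if not ready:
            raise ValueError("cycle detected")
        waves.append(ready)
        for s in ready:
            completed.add(s["id"])
            del remaining[s["id"]]
    return waves

plan = [
    {"id": "a", "depends_on": []},
    {"id": "b", "depends_on": []},
    {"id": "c", "depends_on": ["a", "b"]},
]
print([[s["id"] for s in wave] for wave in waves_of(plan)])
# → [['a', 'b'], ['c']]
```

Steps "a" and "b" land in wave 1 and run concurrently; "c" waits in wave 2.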
The Executor
```python
import asyncio
import time


class StepExecutionError(Exception):
    def __init__(self, step_id: str, original: Exception):
        self.step_id = step_id
        self.original = original
        super().__init__(f"Step {step_id} failed: {original}")


def resolve_args(args: dict, completed_results: dict) -> dict:
    """
    Replace template references like {{step_1.output}} with actual results.
    This allows later steps to consume outputs from earlier steps.
    """
    resolved = {}
    for key, value in args.items():
        if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
            ref = value[2:-2].strip()  # strip {{ and }}
            parts = ref.split(".")
            step_id = parts[0]
            field = parts[1] if len(parts) > 1 else "output"
            resolved[key] = completed_results.get(step_id, {}).get(field, value)
        else:
            resolved[key] = value
    return resolved


async def execute_step(step: dict, tool_registry: dict, completed_results: dict) -> dict:
    """Execute a single step asynchronously, resolving arg templates first."""
    tool_fn = tool_registry.get(step["tool"])
    if tool_fn is None:
        raise StepExecutionError(step["id"], ValueError(f"Tool '{step['tool']}' not found"))

    resolved_args = resolve_args(step["args"], completed_results)
    t0 = time.perf_counter()
    try:
        result = await asyncio.to_thread(tool_fn, **resolved_args)
        latency = (time.perf_counter() - t0) * 1000
        return {"output": result, "latency_ms": latency, "status": "success"}
    except Exception as e:
        raise StepExecutionError(step["id"], e)


async def execute_plan(
    steps: list[dict],
    tool_registry: dict,
    on_step_complete=None,
) -> dict[str, dict]:
    """
    Execute the plan in dependency order, running independent steps in parallel.
    Returns a dict mapping step_id -> {output, latency_ms, status}.
    """
    waves = topological_sort(steps)
    completed_results: dict[str, dict] = {}

    for wave_idx, wave in enumerate(waves):
        print(f"Wave {wave_idx + 1}: executing {[s['id'] for s in wave]} in parallel")

        # Execute all steps in this wave concurrently
        tasks = [execute_step(step, tool_registry, completed_results) for step in wave]
        wave_results = await asyncio.gather(*tasks, return_exceptions=True)

        for step, result in zip(wave, wave_results):
            if isinstance(result, StepExecutionError):
                completed_results[step["id"]] = {"status": "failed", "error": str(result)}
            else:
                completed_results[step["id"]] = result
            if on_step_complete:
                on_step_complete(step["id"], result)

    return completed_results
```
Replanning on Failure
When a step fails, pass the failure context back to the Planner and ask it to regenerate a recovery plan:
```python
REPLAN_PROMPT = """You are a task planning agent. A previous execution plan failed.

ORIGINAL GOAL: {goal}
COMPLETED STEPS: {completed}
FAILED STEP: {failed_step}
ERROR: {error}
AVAILABLE TOOLS: {available_tools}

Generate a recovery plan to complete the original goal, given what has already been done.
Only include the REMAINING steps needed. Use the same JSON format.
Step IDs must not conflict with already-completed step IDs."""


def replan(
    goal: str,
    completed_steps: list[str],
    failed_step: dict,
    error: str,
    available_tools: list[str],
) -> list[dict]:
    """Ask the LLM to generate a recovery plan after a step failure."""
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{
            "role": "user",
            "content": REPLAN_PROMPT.format(
                goal=goal,
                completed=", ".join(completed_steps),
                failed_step=json.dumps(failed_step),
                error=error,
                available_tools=", ".join(available_tools),
            ),
        }],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    data = json.loads(response.choices[0].message.content)
    return data if isinstance(data, list) else next(iter(data.values()))
```
LLM Compiler — Batch Tool Calls
The LLM Compiler pattern generates all tool calls in a single LLM pass rather than iterating. This is faster for deterministic tasks but cannot handle conditional branches.
```python
LLM_COMPILER_PROMPT = """Given the goal below, generate ALL tool calls needed to complete it in a single pass.

Output a JSON object: {{"calls": [{{"tool": str, "args": dict, "id": str}}]}}
Order calls so dependencies come first.

GOAL: {goal}
AVAILABLE TOOLS: {tools}"""


def llm_compiler_plan(goal: str, available_tools: list[str]) -> list[dict]:
    """Generate all tool calls in one shot — no iterative replanning."""
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{
            "role": "user",
            "content": LLM_COMPILER_PROMPT.format(goal=goal, tools=available_tools),
        }],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    data = json.loads(response.choices[0].message.content)
    return data.get("calls", [])
```
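The compiled call list still has to be executed. Since the compiler emits a pre-ordered flat list, a minimal sequential executor suffices — this is a sketch, reusing the `{{id.output}}` reference convention from the Plan-and-Execute executor; the tools in the usage example are hypothetical:

```python
def run_compiled_calls(calls: list[dict], tool_registry: dict) -> dict:
    """Run compiled calls in order, resolving {{id.output}} references
    against earlier results. No parallelism: the list is pre-ordered."""
    results = {}
    for call in calls:
        args = {}
        for k, v in call["args"].items():
            if isinstance(v, str) and v.startswith("{{") and v.endswith("}}"):
                ref_id = v[2:-2].strip().split(".")[0]
                args[k] = results.get(ref_id, {}).get("output", v)
            else:
                args[k] = v
        results[call["id"]] = {"output": tool_registry[call["tool"]](**args)}
    return results

# Hypothetical tools for illustration:
registry = {"double": lambda x: x * 2, "add_one": lambda x: x + 1}
calls = [
    {"id": "c1", "tool": "double", "args": {"x": 5}},
    {"id": "c2", "tool": "add_one", "args": {"x": "{{c1.output}}"}},
]
print(run_compiled_calls(calls, registry))
# → {'c1': {'output': 10}, 'c2': {'output': 11}}
```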
Complete Data Analysis Agent
```python
import pandas as pd

# --- Tool implementations ---

def load_data(filepath: str) -> dict:
    df = pd.read_csv(filepath)
    return {"rows": len(df), "columns": list(df.columns), "data": df.to_dict(orient="records")}


def compute_stats(data: dict) -> dict:
    df = pd.DataFrame(data["data"])
    numeric_cols = df.select_dtypes(include="number").columns.tolist()
    return {"stats": df[numeric_cols].describe().to_dict()}


def find_anomalies(stats: dict, data: dict) -> dict:
    df = pd.DataFrame(data["data"])
    anomalies = []
    for col, col_stats in stats["stats"].items():
        mean = col_stats["mean"]
        std = col_stats["std"]
        outliers = df[abs(df[col] - mean) > 3 * std][col].tolist()
        if outliers:
            anomalies.append({"column": col, "outlier_values": outliers[:5]})
    return {"anomalies": anomalies}


def generate_summary(stats: dict, anomalies: dict) -> dict:
    summary_prompt = (
        f"Summarise these dataset statistics and anomalies in 3 sentences:\n"
        f"Stats: {stats}\nAnomalies: {anomalies}"
    )
    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=[{"role": "user", "content": summary_prompt}],
        max_tokens=200,
    )
    return {"summary": response.choices[0].message.content.strip()}


TOOL_REGISTRY = {
    "load_data": load_data,
    "compute_stats": compute_stats,
    "find_anomalies": find_anomalies,
    "generate_summary": generate_summary,
}

# --- Run the planning agent ---

async def run_data_analysis_agent(filepath: str) -> dict:
    goal = (
        f"Analyse the CSV file at '{filepath}': load it, compute statistics, "
        f"find anomalies, and generate a summary."
    )
    available_tools = list(TOOL_REGISTRY.keys())

    # 1. Plan
    plan = create_plan(goal, available_tools)
    print(f"Generated plan with {len(plan)} steps")

    # 2. Validate
    validate_plan(plan, set(available_tools))

    # 3. Execute
    results = await execute_plan(plan, TOOL_REGISTRY)

    # 4. Return final output
    summary_result = next(
        (v for k, v in results.items() if "summary" in k.lower()),
        list(results.values())[-1],
    )
    return {"plan": plan, "results": results, "summary": summary_result.get("output", {})}

# asyncio.run(run_data_analysis_agent("sales_data.csv"))
```
Key Takeaways
ReAct is sequential; planning agents separate concerns and enable parallel execution of independent steps, reducing total wall-clock time significantly.
Plan validation — checking tool names, dependency IDs, and cycle detection before execution — catches structural errors at zero cost instead of at step 7 of 12.
Topological sort into execution waves enables asyncio.gather-based parallel execution: independent steps run simultaneously.
Template references ({{step_1.output}}) allow later steps to consume earlier outputs without hard-coded data passing.
Replanning passes the failure context back to the Planner LLM, enabling graceful recovery without restarting the entire task.
The LLM Compiler generates all tool calls in a single pass — lower latency for deterministic pipelines, but cannot handle conditional branching.
Keep Planner and Executor as separate concerns; the Planner can be a large, slow model; the Executor is pure Python logic.
Always validate plans before executing — never trust raw LLM output to be structurally valid without checking it.