AI Engineering Curriculum
Phase 3: Multi-Agent Systems

Module 3.2

LangGraph Multi-Agent

What Is LangGraph Multi-Agent?

In Module 2.6, you learned LangGraph as a framework for building a single agent as a graph — one model, one set of tools, one state object. Multi-agent LangGraph takes that same idea and scales it up: now the nodes in your graph aren't just "call the model" and "run a tool" — they're entire agents, each with their own context, tools, and model.

The core idea: instead of one Claude doing everything, you build a graph where the nodes are specialized agents. A research node that searches the web. A coding node that writes and runs code. A reviewer node that checks quality. They all share a state object, and the graph controls who runs when.

This is the same LangGraph you already know — the difference is just what you put inside the nodes.


Real-World Use Cases

Elastic's threat detection system. When a security alert fires, a routing node decides which specialist agents to dispatch. A web reputation agent, a log analysis agent, and a threat database agent all run simultaneously. Their findings flow into a synthesis agent that scores the analysis quality (0 to 1). If the score is below 0.8, the graph loops back and retries. Once quality passes the threshold, the result is finalized. The entire loop — parallel execution, quality scoring, conditional retry — is a LangGraph multi-agent graph.

LinkedIn's SQL Bot. Natural language question → find the right tables → write the query → execute → if error, diagnose and fix → retry. The "diagnose and fix" step is a loop back to an earlier node. You can't do that in a straight chain. LangGraph is why this works.

Exa's web research system. A Planner agent generates a list of parallel research tasks. Each task runs as an independent agent simultaneously using the Send API. An Observer agent tracks all citations. The key engineering insight their team shared: "individual tasks only receive the final cleaned outputs from other tasks, not intermediate reasoning states" — that conscious decision to pass less information between agents cut their token costs dramatically and improved output quality.


Key Terms for This Module

Subgraph — a complete compiled LangGraph graph used as a single node inside a larger (parent) graph. Lets you build modular, reusable agent components that teams can develop independently.

Command — a special return type from a node that does two things at once: updates the graph state AND specifies which node runs next. Released December 2024. The core tool for agent handoffs in LangGraph.

Supervisor — a special orchestrator node that routes incoming tasks to the right worker agent and synthesizes their results. The langgraph-supervisor library (released February 2025) builds this pattern declaratively.

Swarm — a decentralized pattern where agents hand off directly to each other without a central supervisor. The langgraph-swarm library implements this in LangGraph.

Superstep — LangGraph's execution unit. All nodes that receive inputs in one round run concurrently within a superstep. After all finish, reducers merge their updates, state is checkpointed, and the next superstep begins.

Send API — the mechanism for dynamic fan-out. Instead of wiring static edges, Send lets you spawn one parallel branch per item in a list at runtime — you don't know the count when you build the graph.
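To make the superstep's reducer-merge step concrete, here is a simplified pure-Python sketch of the semantics (an illustration only, not LangGraph's actual implementation; merge_superstep is a hypothetical helper):

```python
from operator import add

def merge_superstep(state: dict, updates: list[dict], reducers: dict) -> dict:
    """Merge the partial updates returned by all nodes that ran in one superstep."""
    merged = dict(state)
    for update in updates:  # every node that ran in this superstep
        for key, value in update.items():
            reduce_fn = reducers.get(key)
            if reduce_fn and key in merged:
                merged[key] = reduce_fn(merged[key], value)  # reducer: combine
            else:
                merged[key] = value  # no reducer: last write wins
    return merged

state = {"reports": []}
updates = [{"reports": ["web findings"]}, {"reports": ["academic findings"]}]
new_state = merge_superstep(state, updates, {"reports": add})
# both parallel updates are accumulated into new_state["reports"]
```

The key behavior to notice: a key with a reducer accumulates concurrent updates deterministically, while a key without one is simply overwritten by whichever update is applied last.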


Subgraphs — Modular Agent Components

The simplest way to think about subgraphs: a subgraph is an agent you've packaged as a reusable component. You build it, compile it, and plug it into a parent graph as a single node. The parent graph doesn't need to know what's inside it.

Why this matters: In a real company, different teams build different agents. The research team builds the research agent. The coding team builds the code agent. Each compiles their own subgraph. A top-level graph wires them together. Nobody needs to touch anyone else's code.

There are two ways to connect a subgraph to a parent:

Pattern A — shared state keys (the common case):

When the parent and subgraph share at least one state key, you can add the compiled subgraph directly as a node. Updates to shared keys automatically propagate between them.

Python
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Parent state
class ParentState(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str

# Subgraph state — shares 'topic' with parent
class ResearchState(TypedDict):
    topic: str     # shared — receives from parent
    findings: str  # private to subgraph

def search_web(state: ResearchState):
    return {"findings": f"Web results for: {state['topic']}"}

def summarize(state: ResearchState):
    return {"topic": f"Summary: {state['findings']}"}  # writes back to shared key

# Build the subgraph
sub = StateGraph(ResearchState)
sub.add_node("search", search_web)
sub.add_node("summarize", summarize)
sub.add_edge(START, "search")
sub.add_edge("search", "summarize")
sub.add_edge("summarize", END)
research_subgraph = sub.compile()

# Embed in parent — the compiled subgraph IS the node
parent = StateGraph(ParentState)
parent.add_node("research", research_subgraph)  # subgraph as node
parent.add_edge(START, "research")
parent.add_edge("research", END)
graph = parent.compile()

Pattern B — different schemas (explicit transformation):

When parent and subgraph don't share state keys, you manually transform state going in and coming out:

Python
def call_research_agent(state: ParentState):
    # Transform parent state → subgraph input
    result = research_subgraph.invoke({"topic": state["topic"]})
    # Transform subgraph output → parent state update
    return {"messages": [{"role": "assistant", "content": result["topic"]}]}

parent.add_node("research", call_research_agent)

Use Pattern A when schemas are compatible — less boilerplate. Use Pattern B when you need full control over what goes in and what comes out.


The Command Type — How Agents Hand Off

Before Command (released December 2024), handoffs between agents required conditional edges wired at graph-build time. You had to declare every possible routing path upfront. Command changed this: now a node can decide at runtime both where to go next AND what state update to apply — in a single return value.

Python
from typing import Literal
from langgraph.graph import MessagesState
from langgraph.types import Command

def research_agent(state: MessagesState) -> Command[Literal["writer", "fact_checker"]]:
    response = model.invoke(state["messages"])
    # Agent decides where to go based on what it found
    if "needs verification" in response.content:
        next_agent = "fact_checker"
    else:
        next_agent = "writer"
    return Command(
        update={"messages": [response]},  # state update
        goto=next_agent,                  # routing decision
    )

The type hint Command[Literal["writer", "fact_checker"]] is important — it tells LangGraph which nodes this agent can route to, which preserves graph visualization and catches routing bugs early.

Crossing subgraph boundaries: When an agent inside a subgraph needs to hand off to a node in the parent graph, use Command.PARENT:

Python
def node_inside_subgraph(state) -> Command:
    return Command(
        goto="parent_agent",
        update={"messages": [result]},
        graph=Command.PARENT,  # navigate up to parent graph
    )

Command vs conditional edges: Use Command when you need to both update state AND route in the same step — the canonical handoff case. Use conditional_edges for pure routing without state changes.
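For contrast, a pure-routing function of the kind passed to add_conditional_edges might look like this (a sketch; route_after_research and the node names are illustrative):

```python
# A routing function for add_conditional_edges: it only returns the name
# of the next node — it cannot update state, unlike Command.
def route_after_research(state: dict) -> str:
    last = state["messages"][-1]["content"]
    if "needs verification" in last:
        return "fact_checker"
    return "writer"

# Wired at graph-build time, e.g.:
# builder.add_conditional_edges("research_agent", route_after_research,
#                               ["writer", "fact_checker"])
```

The routing function is declared once when the graph is built, which is exactly why Command was needed: it moves both the routing decision and the state update into the node itself.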


The Supervisor Pattern

The supervisor pattern is the most common multi-agent architecture in production. One agent — the supervisor — sits in the center. It receives every message, decides which specialist to invoke, and synthesizes the results.

User message → [Supervisor]
                    ↓ routes to
         [Research]  [Coder]  [Writer]
                    ↓ results return to
               [Supervisor]
                    ↓
               Final answer

Using langgraph-supervisor (the recommended approach):

The langgraph-supervisor library (released February 2025) builds this entire pattern declaratively — no manual Command wiring, no conditional edges to define.

Bash
pip install langgraph-supervisor
Python
from langchain_anthropic import ChatAnthropic
# Note: create_react_agent moved from langgraph.prebuilt → langchain.agents in LangGraph v1
from langchain.agents import create_react_agent  # LangChain >= 1.0
from langgraph_supervisor import create_supervisor

model = ChatAnthropic(model="claude-opus-4-6")

# Define worker agents — each is a full ReAct agent
research_agent = create_react_agent(
    model=model,
    tools=[search_web, fetch_url],
    name="researcher",
    prompt="You are a research specialist. Find accurate, up-to-date information."
)

writer_agent = create_react_agent(
    model=model,
    tools=[],
    name="writer",
    prompt="You are a writing specialist. Produce clear, well-structured content."
)

fact_checker = create_react_agent(
    model=ChatAnthropic(model="claude-haiku-4-5-20251001"),  # cheaper model for this task
    tools=[search_web],
    name="fact_checker",
    prompt="You verify claims. Cross-check key facts and flag any inaccuracies."
)

# Create the supervisor — it orchestrates all workers
workflow = create_supervisor(
    agents=[research_agent, writer_agent, fact_checker],
    model=model,
    prompt=(
        "You coordinate a content team. "
        "Use researcher for gathering information. "
        "Use writer to produce the final content. "
        "Use fact_checker to verify important claims before finalizing."
    )
)

app = workflow.compile()
result = app.invoke({
    "messages": [{"role": "user", "content": "Write a report on LangGraph's production use cases"}]
})

Multi-level hierarchies: Supervisors can supervise other supervisors. A research team and a writing team, each with their own supervisor, can themselves be supervised by a top-level coordinator:

Python
research_team = create_supervisor(
    [web_researcher, academic_researcher],
    model=model,
    supervisor_name="research_supervisor"
).compile(name="research_team")

writing_team = create_supervisor(
    [writer, editor],
    model=model,
    supervisor_name="writing_supervisor"
).compile(name="writing_team")

# Top-level supervisor coordinates both teams
top_level = create_supervisor(
    [research_team, writing_team],
    model=model,
    prompt="Coordinate the research and writing teams to produce excellent content."
).compile()

Production insight from LangChain benchmarks: Setting output_mode="last_message" instead of the default "full_history" cut token costs significantly — the supervisor doesn't need every message from every worker conversation, just the final result.
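Assuming the supervisor setup from the earlier example, the switch is a single keyword argument (a sketch; verify the parameter name against the langgraph-supervisor version you use):

```python
workflow = create_supervisor(
    agents=[research_agent, writer_agent, fact_checker],
    model=model,
    output_mode="last_message",  # forward only each worker's final message
)
```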


Parallel Agent Execution

Running agents in parallel is one of the biggest performance wins in multi-agent systems. Two approaches:

Static parallelism — multiple edges from one node:

If a node has multiple outgoing edges, all destination nodes run simultaneously in the same superstep:

Python
builder = StateGraph(State)
builder.add_node("start", kickoff_node)
builder.add_node("web_search", web_agent)
builder.add_node("academic_search", academic_agent)
builder.add_node("synthesize", synthesis_agent)

builder.add_edge(START, "start")
builder.add_edge("start", "web_search")       # both run
builder.add_edge("start", "academic_search")  # in parallel
builder.add_edge("web_search", "synthesize")
builder.add_edge("academic_search", "synthesize")
builder.add_edge("synthesize", END)
# synthesize waits until BOTH finish — LangGraph handles this automatically

Real benchmark: 4 seconds sequential → 2 seconds parallel on actual API calls.

Dynamic parallelism — the Send API:

When you don't know how many parallel branches you need until runtime, Send creates them dynamically:

Python
from operator import add
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langgraph.types import Send

class ResearchState(TypedDict):
    topics: list[str]
    reports: Annotated[list[str], add]  # add reducer accumulates results

def kickoff(state: ResearchState):
    return {}  # no-op — exists so the conditional edge has a source node

def spawn_researchers(state: ResearchState):
    # One parallel branch per topic — count determined at runtime
    return [Send("research_topic", {"topic": t}) for t in state["topics"]]

def research_topic(state: dict) -> dict:
    topic = state["topic"]
    # Each runs independently and simultaneously
    return {"reports": [f"Findings on {topic}: ..."]}

builder = StateGraph(ResearchState)
builder.add_node("kickoff", kickoff)
builder.add_node("research_topic", research_topic)
builder.add_edge(START, "kickoff")
builder.add_conditional_edges("kickoff", spawn_researchers)

If state["topics"] has 5 items, 5 research_topic nodes run simultaneously. The add reducer accumulates all their results into reports. This is how Exa processes hundreds of research queries simultaneously.

Controlling concurrency: parallel execution burns API quota fast:

Python
# max_concurrency is a runtime config value, not a compile() argument
graph = builder.compile()
graph.invoke(inputs, {"max_concurrency": 5})  # throttle parallel calls

The Swarm Pattern

The swarm is the opposite of the supervisor. Instead of a central coordinator routing everything, agents hand off directly to each other — peer to peer. No boss. The system tracks which agent was last active and resumes from it on the next turn.

Bash
pip install langgraph-swarm
Python
from langchain_anthropic import ChatAnthropic
from langgraph.checkpoint.memory import InMemorySaver
from langgraph_swarm import create_handoff_tool, create_swarm
from langchain.agents import create_react_agent  # LangGraph v1+

model = ChatAnthropic(model="claude-opus-4-6")

# Each agent gets handoff tools pointing to the other agents
researcher = create_react_agent(
    model=model,
    tools=[
        search_web,
        create_handoff_tool(
            agent_name="writer",
            description="Transfer to writer when research is complete"
        ),
    ],
    name="researcher",
    prompt="You research topics thoroughly. When done, hand off to the writer."
)

writer = create_react_agent(
    model=model,
    tools=[
        create_handoff_tool(
            agent_name="researcher",
            description="Transfer back to researcher if you need more information"
        ),
    ],
    name="writer",
    prompt="You write clear reports. Request more research if needed."
)

workflow = create_swarm([researcher, writer], default_active_agent="researcher")
app = workflow.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": "report-1"}}
result = app.invoke(
    {"messages": [{"role": "user", "content": "Research and write a report on RAG systems"}]},
    config
)

Supervisor vs Swarm — when to use which:

The supervisor is better when work can be parallelized and you want structured, auditable control. The swarm is better when the task is inherently conversational and agents need to go back and forth naturally over multiple turns.

                Supervisor                                 Swarm
Who routes?     Central supervisor                         Each agent decides
Parallelism     Yes                                        No (one active at a time)
Token cost      Higher                                     Lower
Auditability    Easy                                       Harder
Best for        Parallel workloads, structured pipelines   Multi-turn conversations, dynamic delegation

Checkpointing in Multi-Agent Systems

Every production multi-agent graph needs checkpointing. Without it, a crash mid-run loses all work. With it, the graph resumes from the last completed node.

Python
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://user:pass@localhost:5432/agents"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # creates required tables — run once
    app = workflow.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "project-42"}}

    # First run — starts fresh
    app.invoke({"messages": [...]}, config)

    # Second run — resumes from last checkpoint
    app.invoke({"messages": [...]}, config)

For development: InMemorySaver() — fast, no setup, resets on restart. For production: PostgresSaver or RedisSaver — persistent, survives restarts.

The thread_id is how LangGraph identifies a conversation. Same thread_id = same conversation, resumed from where it stopped. Different thread_id = new conversation.


Human-in-the-Loop

This is the feature that makes LangGraph essential for any system that takes real-world actions. You can pause the graph before any sensitive step, show the pending action to a human, and only proceed if approved.

Python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, interrupt

def take_action(state: State):
    # interrupt() pauses execution here and waits for human input
    approval = interrupt({
        "message": "About to delete 500 records. Approve?",
        "pending_action": state["planned_action"]
    })
    if approval:
        return execute_action(state)
    else:
        return {"result": "Action cancelled by user"}

# A checkpointer is required: the paused state must be persisted.
# (The static alternative, interrupt_before=["take_action"], pauses
# before the node runs instead of inside it.)
app = workflow.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": "run-1"}}

# Graph runs until take_action calls interrupt(), then pauses
app.invoke({"messages": [...]}, config)

# Inspect what's about to happen
state = app.get_state(config)
print(state.values)  # see the pending action

# Resume with the human's decision; it becomes interrupt()'s return value
app.invoke(Command(resume=True), config)   # approved
# app.invoke(Command(resume=False), config)  # denied

The graph saves state at the interrupt point. The human can take minutes or hours to review. When they respond, the graph picks up exactly where it paused — no context lost.


Where Things Go Wrong

Subgraph state doesn't propagate. If a subgraph writes to a key that's also in the parent, but the parent state doesn't define a reducer for that key, updates from the subgraph silently get dropped. Always check that shared keys have appropriate reducers.
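A state definition along these lines avoids the silent drop (a sketch with illustrative key names; the add reducer concatenates list updates instead of overwriting them):

```python
from operator import add
from typing import Annotated, TypedDict

class ParentState(TypedDict):
    findings: Annotated[list[str], add]  # reducer: subgraph updates accumulate
    topic: str                           # no reducer: last write wins
```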

Command adds routing but doesn't replace static edges. If you have both a Command(goto="node_b") and a static add_edge("node_a", "node_c"), both execute. Command doesn't cancel existing edges. In a multi-agent graph this can trigger unintended double-execution.

Parallel agents exhausting rate limits. Send spawning 20 parallel agents all calling Claude simultaneously will hit rate limits immediately. Always set max_concurrency and test with realistic API quotas before running parallel workloads.

Supervisor context bloat. The supervisor sees every worker message by default. In a 5-worker system with 10 turns each, the supervisor's context becomes enormous. Use output_mode="last_message" and consider compacting intermediate results before they hit the supervisor.

Forgetting thread_id for checkpointing. If you compile with a checkpointer but don't pass a thread_id in config, LangGraph can't identify which conversation to checkpoint. Always pass {"configurable": {"thread_id": "..."}} when using persistence.

