Module 3.2
LangGraph Multi-Agent
What Is LangGraph Multi-Agent?
In Module 2.6, you learned LangGraph as a framework for building a single agent as a graph — one model, one set of tools, one state object. Multi-agent LangGraph takes that same idea and scales it up: now the nodes in your graph aren't just "call the model" and "run a tool" — they're entire agents, each with their own context, tools, and model.
The core idea: instead of one Claude doing everything, you build a graph where the nodes are specialized agents. A research node that searches the web. A coding node that writes and runs code. A reviewer node that checks quality. They all share a state object, and the graph controls who runs when.
This is the same LangGraph you already know — the difference is just what you put inside the nodes.
Real-World Use Cases
Elastic's threat detection system. When a security alert fires, a routing node decides which specialist agents to dispatch. A web reputation agent, a log analysis agent, and a threat database agent all run simultaneously. Their findings flow into a synthesis agent that scores the analysis quality (0 to 1). If the score is below 0.8, the graph loops back and retries. Once quality passes the threshold, the result is finalized. The entire loop — parallel execution, quality scoring, conditional retry — is a LangGraph multi-agent graph.
LinkedIn's SQL Bot. Natural language question → find the right tables → write the query → execute → if error, diagnose and fix → retry. The "diagnose and fix" step is a loop back to an earlier node. You can't do that in a straight chain. LangGraph is why this works.
Exa's web research system. A Planner agent generates a list of parallel research tasks. Each task runs as an independent agent simultaneously using the Send API. An Observer agent tracks all citations. The key engineering insight their team shared: "individual tasks only receive the final cleaned outputs from other tasks, not intermediate reasoning states" — that conscious decision to pass less information between agents cut their token costs dramatically and improved output quality.
Key Terms for This Module
Subgraph — a complete compiled LangGraph graph used as a single node inside a larger (parent) graph. Lets you build modular, reusable agent components that teams can develop independently.
Command — a special return type from a node that does two things at once: updates the graph state AND specifies which node runs next. Released December 2024. The core tool for agent handoffs in LangGraph.
Supervisor — a special orchestrator node that routes incoming tasks to the right worker agent and synthesizes their results. The langgraph-supervisor library (released February 2025) builds this pattern declaratively.
Swarm — a decentralized pattern where agents hand off directly to each other without a central supervisor. The langgraph-swarm library implements this in LangGraph.
Superstep — LangGraph's execution unit. All nodes that receive inputs in one round run concurrently within a superstep. After all finish, reducers merge their updates, state is checkpointed, and the next superstep begins.
Send API — the mechanism for dynamic fan-out. Instead of wiring static edges, Send lets you spawn one parallel branch per item in a list at runtime — you don't know the count when you build the graph.
Subgraphs — Modular Agent Components
The simplest way to think about subgraphs: a subgraph is an agent you've packaged as a reusable component. You build it, compile it, and plug it into a parent graph as a single node. The parent graph doesn't need to know what's inside it.
Why this matters: In a real company, different teams build different agents. The research team builds the research agent. The coding team builds the code agent. Each compiles their own subgraph. A top-level graph wires them together. Nobody needs to touch anyone else's code.
There are two ways to connect a subgraph to a parent:
Pattern A — shared state keys (the common case):
When the parent and subgraph share at least one state key, you can add the compiled subgraph directly as a node. Updates to shared keys automatically propagate between them.
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# Parent state
class ParentState(TypedDict):
messages: Annotated[list, add_messages]
topic: str
# Subgraph state — shares 'topic' with parent
class ResearchState(TypedDict):
topic: str # shared — receives from parent
findings: str # private to subgraph
def search_web(state: ResearchState):
return {"findings": f"Web results for: {state['topic']}"}
def summarize(state: ResearchState):
return {"topic": f"Summary: {state['findings']}"} # writes back to shared key
# Build the subgraph
sub = StateGraph(ResearchState)
sub.add_node("search", search_web)
sub.add_node("summarize", summarize)
sub.add_edge(START, "search")
sub.add_edge("search", "summarize")
sub.add_edge("summarize", END)
research_subgraph = sub.compile()
# Embed in parent — the compiled subgraph IS the node
parent = StateGraph(ParentState)
parent.add_node("research", research_subgraph) # subgraph as node
parent.add_edge(START, "research")
parent.add_edge("research", END)
graph = parent.compile()Pattern B — different schemas (explicit transformation):
When parent and subgraph don't share state keys, you manually transform state going in and coming out:
def call_research_agent(state: ParentState):
# Transform parent state → subgraph input
result = research_subgraph.invoke({"topic": state["topic"]})
# Transform subgraph output → parent state update
return {"messages": [{"role": "assistant", "content": result["topic"]}]}
parent.add_node("research", call_research_agent)Use Pattern A when schemas are compatible — less boilerplate. Use Pattern B when you need full control over what goes in and what comes out.
The Command Type — How Agents Hand Off
Before Command (released December 2024), handoffs between agents required conditional edges wired at graph-build time. You had to declare every possible routing path upfront. Command changed this: now a node can decide at runtime both where to go next AND what state update to apply — in a single return value.
from langgraph.types import Command
from typing import Literal
def research_agent(state: MessagesState) -> Command[Literal["writer", "fact_checker"]]:
response = model.invoke(state["messages"])
# Agent decides where to go based on what it found
if "needs verification" in response.content:
next_agent = "fact_checker"
else:
next_agent = "writer"
return Command(
update={"messages": [response]}, # state update
goto=next_agent # routing decision
)The type hint Command[Literal["writer", "fact_checker"]] is important — it tells LangGraph which nodes this agent can route to, which preserves graph visualization and catches routing bugs early.
Crossing subgraph boundaries: When an agent inside a subgraph needs to hand off to a node in the parent graph, use Command.PARENT:
def node_inside_subgraph(state) -> Command:
return Command(
goto="parent_agent",
update={"messages": [result]},
graph=Command.PARENT # navigate up to parent graph
)Command vs conditional edges: Use Command when you need to both update state AND route in the same step — the canonical handoff case. Use conditional_edges for pure routing without state changes.
The Supervisor Pattern
The supervisor pattern is the most common multi-agent architecture in production. One agent — the supervisor — sits in the center. It receives every message, decides which specialist to invoke, and synthesizes the results.
User message → [Supervisor]
↓ routes to
[Research] [Coder] [Writer]
↓ results return to
[Supervisor]
↓
Final answer
Using langgraph-supervisor (the recommended approach):
The langgraph-supervisor library (released February 2025) builds this entire pattern declaratively — no manual Command wiring, no conditional edges to define.
pip install langgraph-supervisorfrom langchain_anthropic import ChatAnthropic
# Note: create_react_agent moved from langgraph.prebuilt → langchain.agents in LangGraph v1
from langchain.agents import create_react_agent # LangChain >= 1.0
from langgraph_supervisor import create_supervisor
model = ChatAnthropic(model="claude-opus-4-6")
# Define worker agents — each is a full ReAct agent
research_agent = create_react_agent(
model=model,
tools=[search_web, fetch_url],
name="researcher",
prompt="You are a research specialist. Find accurate, up-to-date information."
)
writer_agent = create_react_agent(
model=model,
tools=[],
name="writer",
prompt="You are a writing specialist. Produce clear, well-structured content."
)
fact_checker = create_react_agent(
model=ChatAnthropic(model="claude-haiku-4-5-20251001"), # cheaper model for this task
tools=[search_web],
name="fact_checker",
prompt="You verify claims. Cross-check key facts and flag any inaccuracies."
)
# Create the supervisor — it orchestrates all workers
workflow = create_supervisor(
agents=[research_agent, writer_agent, fact_checker],
model=model,
prompt=(
"You coordinate a content team. "
"Use researcher for gathering information. "
"Use writer to produce the final content. "
"Use fact_checker to verify important claims before finalizing."
)
)
app = workflow.compile()
result = app.invoke({
"messages": [{"role": "user", "content": "Write a report on LangGraph's production use cases"}]
})Multi-level hierarchies: Supervisors can supervise other supervisors. A research team and a writing team, each with their own supervisor, can themselves be supervised by a top-level coordinator:
research_team = create_supervisor(
[web_researcher, academic_researcher],
model=model,
supervisor_name="research_supervisor"
).compile(name="research_team")
writing_team = create_supervisor(
[writer, editor],
model=model,
supervisor_name="writing_supervisor"
).compile(name="writing_team")
# Top-level supervisor coordinates both teams
top_level = create_supervisor(
[research_team, writing_team],
model=model,
prompt="Coordinate the research and writing teams to produce excellent content."
).compile()Production insight from LangChain benchmarks: Setting output_mode="last_message" instead of the default "full_history" cut token costs significantly — the supervisor doesn't need every message from every worker conversation, just the final result.
Parallel Agent Execution
Running agents in parallel is one of the biggest performance wins in multi-agent systems. Two approaches:
Static parallelism — multiple edges from one node:
If a node has multiple outgoing edges, all destination nodes run simultaneously in the same superstep:
builder = StateGraph(State)
builder.add_node("start", kickoff_node)
builder.add_node("web_search", web_agent)
builder.add_node("academic_search", academic_agent)
builder.add_node("synthesize", synthesis_agent)
builder.add_edge(START, "start")
builder.add_edge("start", "web_search") # both run
builder.add_edge("start", "academic_search") # in parallel
builder.add_edge("web_search", "synthesize")
builder.add_edge("academic_search", "synthesize")
# synthesize waits until BOTH finish — LangGraph handles this automaticallyReal benchmark: 4 seconds sequential → 2 seconds parallel on actual API calls.
Dynamic parallelism — the Send API:
When you don't know how many parallel branches you need until runtime, Send creates them dynamically:
from langgraph.types import Send
from typing import Annotated
from operator import add
class ResearchState(TypedDict):
topics: list[str]
reports: Annotated[list[str], add] # add reducer accumulates results
def spawn_researchers(state: ResearchState):
# One parallel branch per topic — count determined at runtime
return [Send("research_topic", {"topic": t}) for t in state["topics"]]
def research_topic(state: dict) -> dict:
topic = state["topic"]
# Each runs independently and simultaneously
return {"reports": [f"Findings on {topic}: ..."]}
builder = StateGraph(ResearchState)
builder.add_node("research_topic", research_topic)
builder.add_conditional_edges("kickoff", spawn_researchers)If state["topics"] has 5 items, 5 research_topic nodes run simultaneously. The add reducer accumulates all their results into reports. This is how Exa processes hundreds of research queries simultaneously.
Controlling concurrency: parallel execution burns API quota fast:
graph = builder.compile(max_concurrency=5) # throttle parallel callsThe Swarm Pattern
The swarm is the opposite of the supervisor. Instead of a central coordinator routing everything, agents hand off directly to each other — peer to peer. No boss. The system tracks which agent was last active and resumes from it on the next turn.
pip install langgraph-swarmfrom langgraph.checkpoint.memory import InMemorySaver
from langgraph_swarm import create_handoff_tool, create_swarm
from langchain.agents import create_react_agent # LangGraph v1+
model = ChatAnthropic(model="claude-opus-4-6")
# Each agent gets handoff tools pointing to the other agents
researcher = create_react_agent(
model=model,
tools=[
search_web,
create_handoff_tool(
agent_name="writer",
description="Transfer to writer when research is complete"
),
],
name="researcher",
prompt="You research topics thoroughly. When done, hand off to the writer."
)
writer = create_react_agent(
model=model,
tools=[
create_handoff_tool(
agent_name="researcher",
description="Transfer back to researcher if you need more information"
),
],
name="writer",
prompt="You write clear reports. Request more research if needed."
)
workflow = create_swarm([researcher, writer], default_active_agent="researcher")
app = workflow.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "report-1"}}
result = app.invoke(
{"messages": [{"role": "user", "content": "Research and write a report on RAG systems"}]},
config
)Supervisor vs Swarm — when to use which:
The supervisor is better when work can be parallelized and you want structured, auditable control. The swarm is better when the task is inherently conversational and agents need to go back and forth naturally over multiple turns.
| Supervisor | Swarm | |
|---|---|---|
| Who routes? | Central supervisor | Each agent decides |
| Parallelism | Yes | No (one active at a time) |
| Token cost | Higher | Lower |
| Auditability | Easy | Harder |
| Best for | Parallel workloads, structured pipelines | Multi-turn conversations, dynamic delegation |
Checkpointing in Multi-Agent Systems
Every production multi-agent graph needs checkpointing. Without it, a crash mid-run loses all work. With it, the graph resumes from the last completed node.
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:pass@localhost:5432/agents"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup() # creates required tables — run once
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "project-42"}}
# First run — starts fresh
app.invoke({"messages": [...]}, config)
# Second run — resumes from last checkpoint
app.invoke({"messages": [...]}, config)For development: InMemorySaver() — fast, no setup, resets on restart.
For production: PostgresSaver or RedisSaver — persistent, survives restarts.
The thread_id is how LangGraph identifies a conversation. Same thread_id = same conversation, resumed from where it stopped. Different thread_id = new conversation.
Human-in-the-Loop
This is the feature that makes LangGraph essential for any system that takes real-world actions. You can pause the graph before any sensitive step, show the pending action to a human, and only proceed if approved.
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
def take_action(state: State):
# This pauses execution and waits for human input
approval = interrupt({
"message": "About to delete 500 records. Approve?",
"pending_action": state["planned_action"]
})
if approval:
return execute_action(state)
else:
return {"result": "Action cancelled by user"}
# Compile with interrupt_before to pause BEFORE a specific node
app = workflow.compile(
checkpointer=InMemorySaver(),
interrupt_before=["take_action"]
)
config = {"configurable": {"thread_id": "run-1"}}
# Graph runs until it reaches take_action, then pauses
app.invoke({"messages": [...]}, config)
# Inspect what's about to happen
state = app.get_state(config)
print(state.values) # see the pending action
# Resume with human's decision
from langgraph.types import Command
app.invoke(Command(resume=True), config) # approved
# app.invoke(Command(resume=False), config) # deniedThe graph saves state at the interrupt point. The human can take minutes or hours to review. When they respond, the graph picks up exactly where it paused — no context lost.
Where Things Go Wrong
Subgraph state doesn't propagate. If a subgraph writes to a key that's also in the parent, but the parent state doesn't define a reducer for that key, updates from the subgraph silently get dropped. Always check that shared keys have appropriate reducers.
Command adds routing but doesn't replace static edges. If you have both a Command(goto="node_b") and a static add_edge("node_a", "node_c"), both execute. Command doesn't cancel existing edges. In a multi-agent graph this can trigger unintended double-execution.
Parallel agents exhausting rate limits. Send spawning 20 parallel agents all calling Claude simultaneously will hit rate limits immediately. Always set max_concurrency and test with realistic API quotas before running parallel workloads.
Supervisor context bloat. The supervisor sees every worker message by default. In a 5-worker system with 10 turns each, the supervisor's context becomes enormous. Use output_mode="last_message" and consider compacting intermediate results before they hit the supervisor.
Forgetting thread_id for checkpointing. If you compile with a checkpointer but don't pass a thread_id in config, LangGraph can't identify which conversation to checkpoint. Always pass {"configurable": {"thread_id": "..."}} when using persistence.
Sources
- LangGraph Graph API Docs
- langgraph-supervisor GitHub
- langgraph-swarm GitHub
- Command — LangChain Blog
- Benchmarking Multi-Agent Architectures — LangChain
- Elastic Multi-Agent with LangGraph
- Top 5 LangGraph Agents in Production 2024