LangGraph4j 完全指南
用图驱动的方式构建 AI Agent
当单一的 LLM 调用已经不够用——你需要多步推理、条件分支、循环重试、多 Agent 协作时,LangGraph4j 就是你的答案。它让你用「画流程图」的方式来编排 AI 应用,每个节点是一个动作,每条边是一个决策。
← 第一篇:LangChain4j 完全指南一、开篇概览
LangGraph4j 是一个 Java 库,用于构建有状态的、多 Agent 的 AI 应用。它的核心思想是:用有向图(Graph)来编排 AI 工作流——节点(Node)执行动作,边(Edge)控制流转,状态(State)在节点间共享和传递。
为什么需要「图」?
在第一篇教程中,我们学了 LangChain4j 的 AI Services——它非常适合"问一个问题,得到一个回答"的场景。但现实中的 AI 应用往往更复杂:
简单场景(LangChain4j 就够了)
- 单轮问答
- 文本翻译 / 摘要
- 简单的 Tool 调用
- 线性的 RAG 流程
复杂场景(需要 LangGraph4j)
- AI 需要"想一想、做一做、再想想"(循环)
- 多个 Agent 互相协作 / 交接
- 需要人类审批才能继续的工作流
- 出错时自动重试或走备选路径
- 长时间运行的任务(需要持久化状态)
学完本教程,你将掌握
✅ 本节小结
- LangGraph4j 用图来编排复杂的 AI 工作流
- 支持循环、条件分支、多 Agent——这些是线性调用做不到的
- 与 LangChain4j / Spring AI 无缝集成
二、环境搭建
Maven 3.8+ 或 Gradle 7+
建议先完成第一篇教程的 LangChain4j 环境搭建
Maven 依赖
<properties>
<langgraph4j.version>1.8.5</langgraph4j.version>
</properties>
<!-- 使用 BOM 管理版本 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-bom</artifactId>
<version>${langgraph4j.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- 核心库 -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-core</artifactId>
</dependency>
<!-- LangChain4j 集成(可选,用于连接 LLM) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-langchain4j</artifactId>
</dependency>
<!-- Agent Executor(可选,内置 ReACT Agent) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-agent-executor</artifactId>
</dependency>
</dependencies>
Gradle 依赖
dependencies {
implementation platform('org.bsc.langgraph4j:langgraph4j-bom:1.8.5')
implementation 'org.bsc.langgraph4j:langgraph4j-core'
implementation 'org.bsc.langgraph4j:langgraph4j-langchain4j'
}
✅ 本节小结
- 核心依赖是
langgraph4j-core - 连接 LLM 需要
langgraph4j-langchain4j或langgraph4j-spring-ai - Maven groupId 是
org.bsc.langgraph4j
三、核心概念
LangGraph4j 的世界观很简单:一切皆图。让我们用一个生活化的类比来理解。
AgentState — 工单
AgentState 是在整个图中流转的共享状态。本质上它是一个 Map<String, Object>,每个节点都可以读取和更新它。状态的结构通过Schema(一组 Channel)来定义。
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.state.Channel;
import org.bsc.langgraph4j.state.Channels;
class MyState extends AgentState {
// 定义状态的结构(Schema)
public static final Map<String, Channel<?>> SCHEMA = Map.of(
// "messages" 字段:追加模式,新消息会加入列表
"messages", Channels.appender(ArrayList::new),
// "currentStep" 字段:覆盖模式,新值直接替换旧值
"currentStep", Channels.base(() -> "init")
);
public MyState(Map<String, Object> initData) {
super(initData);
}
// 类型安全的访问方法
public List<String> messages() {
return this.<List<String>>value("messages").orElse(List.of());
}
public String currentStep() {
return this.<String>value("currentStep").orElse("init");
}
}
Node — 工位上的工人
节点是图中执行具体操作的地方。每个节点接收当前状态作为输入,返回一个 Map<String, Object> 作为状态更新。
import org.bsc.langgraph4j.action.NodeAction;
// 方式 1:实现 NodeAction 接口
class GreeterNode implements NodeAction<MyState> {
@Override
public Map<String, Object> apply(MyState state) {
return Map.of("messages", "你好!欢迎来到 LangGraph4j");
}
}
// 方式 2:用 Lambda 表达式(更简洁)
NodeAction<MyState> greeter = state ->
Map.of("messages", "你好!欢迎来到 LangGraph4j");
Edge — 传送带与质检员
LangGraph4j 有三种边:
| 边类型 | 类比 | 作用 |
|---|---|---|
普通边 addEdge(A, B) | 固定传送带 | A 完成后无条件去 B |
条件边 addConditionalEdges(...) | 质检员分拣 | 根据状态动态决定下一个节点 |
条件入口 addConditionalEntryPoint(...) | 分类入口 | 根据初始状态决定从哪个节点开始 |
编译与执行
定义好图之后,需要编译(compile)成一个不可变的 CompiledGraph 才能执行。编译过程会校验图结构(比如检查孤立节点)。
✅ 本节小结
- AgentState:图中流转的共享状态(Map 封装)
- Node:执行操作并返回状态更新的函数
- Edge:控制节点间的流转方向
- compile():将图定义编译为可执行的 CompiledGraph
四、第一个图:Hello Graph
场景:构建一个最简单的两节点图——greeter 打招呼,responder 回应,感受图的基本运作。
import org.bsc.langgraph4j.StateGraph;
import org.bsc.langgraph4j.state.*;
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import java.util.*;
// 1. 定义状态
class SimpleState extends AgentState {
static final Map<String, Channel<?>> SCHEMA = Map.of(
"messages", Channels.appender(ArrayList::new)
);
public SimpleState(Map<String, Object> initData) { super(initData); }
public List<String> messages() {
return this.<List<String>>value("messages").orElse(List.of());
}
}
// 2. 定义并编译图
var graph = new StateGraph<>(SimpleState.SCHEMA, SimpleState::new)
.addNode("greeter", node_async(state -> {
System.out.println("Greeter 收到: " + state.messages());
return Map.of("messages", "你好,朋友!");
}))
.addNode("responder", node_async(state -> {
System.out.println("Responder 收到: " + state.messages());
return Map.of("messages", "很高兴见到你!");
}))
.addEdge(START, "greeter") // 起点 → greeter
.addEdge("greeter", "responder") // greeter → responder
.addEdge("responder", END) // responder → 终点
.compile();
// 3. 执行图
for (var step : graph.stream(Map.of("messages", "开始对话"))) {
System.out.println("步骤: " + step.node());
System.out.println("状态: " + step.state().messages());
System.out.println("---");
}
输出效果:
Greeter 收到: [开始对话]
步骤: greeter
状态: [开始对话, 你好,朋友!]
---
Responder 收到: [开始对话, 你好,朋友!]
步骤: responder
状态: [开始对话, 你好,朋友!, 很高兴见到你!]
---
messages 是追加的(因为用了 Channels.appender())。每个节点返回的消息被自动追加到列表中,而不是覆盖。这就是 Channel/Reducer 的作用。
✅ 本节小结
- 用
new StateGraph<>(schema, constructor)创建图 - 用
addNode()添加节点,addEdge()连接节点 START和END是内置的起止标记stream()返回每一步的执行结果
五、条件边与路由
场景:AI 根据用户输入判断意图——是闲聊还是查询订单?不同意图走不同的处理路径。这就是条件边的用武之地。
// 定义状态
class RouterState extends AgentState {
static final Map<String, Channel<?>> SCHEMA = Map.of(
"input", Channels.base(() -> ""),
"intent", Channels.base(() -> ""),
"output", Channels.base(() -> "")
);
public RouterState(Map<String, Object> d) { super(d); }
public String input() { return this.<String>value("input").orElse(""); }
public String intent() { return this.<String>value("intent").orElse(""); }
}
var graph = new StateGraph<>(RouterState.SCHEMA, RouterState::new)
// 意图识别节点
.addNode("classifier", node_async(state -> {
String input = state.input().toLowerCase();
String intent = input.contains("订单") ? "order" : "chat";
return Map.of("intent", intent);
}))
// 闲聊处理
.addNode("chatHandler", node_async(state ->
Map.of("output", "这是一个闲聊回复:" + state.input())
))
// 订单处理
.addNode("orderHandler", node_async(state ->
Map.of("output", "正在查询您的订单信息...")
))
// 边:起点 → 分类器
.addEdge(START, "classifier")
// 条件边:根据 intent 路由
.addConditionalEdges("classifier",
state -> state.intent(), // 路由函数:返回下一个节点名
Map.of(
"chat", "chatHandler", // intent=="chat" → chatHandler
"order", "orderHandler" // intent=="order" → orderHandler
)
)
// 两个处理器都通向终点
.addEdge("chatHandler", END)
.addEdge("orderHandler", END)
.compile();
// 测试
for (var step : graph.stream(Map.of("input", "帮我查一下订单状态"))) {
System.out.println(step.node() + " → " + step.state());
}
addConditionalEdges 的第三个参数是一个 Map,key 是路由函数可能返回的值,value 是对应的节点名。如果路由函数返回了 Map 里没有的值,会抛出异常。别忘了终止边:条件边只定义了"从 A 到哪",你仍然需要为后续节点添加到
END 的边,否则图不完整。
✅ 本节小结
addConditionalEdges(source, routingFn, routeMap)实现动态路由- 路由函数接收当前状态,返回一个字符串标识下一个节点
- 这是构建"会思考"的 AI 应用的关键能力
六、Channel 与状态管理
场景:不同的状态字段需要不同的更新策略——有的要追加(如消息列表),有的要覆盖(如当前步骤),有的需要自定义合并逻辑。
LangGraph4j 通过 Channel 来定义每个状态字段的更新策略(Reducer):
| Channel 类型 | 行为 | 适用场景 |
|---|---|---|
Channels.base(() -> default) | 新值覆盖旧值 | 当前步骤、最新结果等 |
Channels.appender(ArrayList::new) | 新值追加到列表 | 消息历史、日志等 |
// 混合使用不同的 Channel 策略
class WorkflowState extends AgentState {
static final Map<String, Channel<?>> SCHEMA = Map.of(
// 覆盖模式:只保留最新值
"status", Channels.base(() -> "pending"),
"result", Channels.base(() -> ""),
// 追加模式:所有值都保留
"logs", Channels.appender(ArrayList::new),
"errors", Channels.appender(ArrayList::new)
);
public WorkflowState(Map<String, Object> d) { super(d); }
public String status() {
return this.<String>value("status").orElse("pending");
}
public List<String> logs() {
return this.<List<String>>value("logs").orElse(List.of());
}
}
// 节点更新状态
var processNode = node_async((WorkflowState state) -> Map.of(
"status", "processing", // 覆盖 → "processing"
"logs", "开始处理任务", // 追加到 logs 列表
"logs", "读取输入数据完成" // 再追加一条
));
✅ 本节小结
Channels.base():覆盖模式,保留最新值Channels.appender():追加模式,累积所有值- Channel 机制是并行执行和正确状态合并的基础
七、Checkpoint 持久化
场景:你的 AI Agent 正在处理一个长时间任务,突然服务重启了——之前的所有进度都丢了。Checkpoint 就像游戏存档,让你随时可以「读档继续」。
import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.CompileConfig;
import org.bsc.langgraph4j.RunnableConfig;
// 1. 创建 CheckpointSaver(内存版,生产环境可用数据库实现)
var saver = new MemorySaver();
// 2. 编译时注入 Saver
var compiledGraph = graph.compile(
CompileConfig.builder()
.checkpointSaver(saver)
.build()
);
// 3. 使用 threadId 隔离不同的执行上下文
var config = RunnableConfig.builder()
.threadId("session-001")
.build();
// 4. 执行图(每一步自动保存检查点)
for (var step : compiledGraph.stream(initialState, config)) {
System.out.println("节点: " + step.node());
}
// 5. 下次可以从最后的检查点恢复执行
// (使用相同的 threadId)
查看和回溯历史状态(Time Travel)
Checkpoint 不仅能恢复,还能「时间旅行」——查看图在每一步的完整状态:
// 使用 streamSnapshots 获取每一步的快照
for (var snapshot : compiledGraph.streamSnapshots(initialState, config)) {
System.out.println("节点: " + snapshot.node());
System.out.println("状态: " + snapshot.state());
System.out.println("配置: " + snapshot.config());
System.out.println("---");
}
✅ 本节小结
MemorySaver提供内存级的检查点存储threadId隔离不同的执行上下文- 支持查看历史状态快照(Time Travel)
八、ReACT Agent 实战
场景:构建一个能"思考→行动→观察→再思考"的 AI Agent。这就是著名的 ReACT(Reasoning + Acting)模式——LLM 决定调用哪个工具,观察结果,再决定下一步。
LLM 思考 → 需要工具?→ 是 → 执行工具 → 把结果反馈给 LLM → LLM 再思考 → 还需要工具?→ 否 → 输出最终回答
这正是 LangGraph4j 擅长的——循环图,线性流程做不到。
LangGraph4j 内置了 AgentExecutor,直接帮你搭建好了 ReACT 图:
import org.bsc.langgraph4j.agentexecutor.AgentExecutor;
import dev.langchain4j.model.openai.OpenAiChatModel;
import dev.langchain4j.agent.tool.Tool;
import dev.langchain4j.agent.tool.P;
// 1. 定义工具
public class MyTools {
@Tool("查询指定城市的天气")
String getWeather(@P("城市名") String city) {
return city + ": 晴天,26°C";
}
@Tool("计算数学表达式")
double calculate(@P("数学表达式") String expression) {
// 简化实现
return switch (expression) {
case "26 * 9 / 5 + 32" -> 78.8;
default -> 0;
};
}
}
// 2. 创建 LLM
var chatModel = OpenAiChatModel.builder()
.apiKey(System.getenv("OPENAI_API_KEY"))
.modelName("gpt-4o-mini")
.temperature(0.0)
.build();
// 3. 构建 AgentExecutor(内置 ReACT 图)
var agentGraph = AgentExecutor.builder()
.chatModel(chatModel)
.toolsFromObject(new MyTools())
.build()
.compile();
// 4. 运行
for (var step : agentGraph.stream(
Map.of("messages", "北京天气怎么样?把温度转成华氏度"))) {
System.out.println(step);
}
// AI 会:1) 调用 getWeather("北京") 2) 调用 calculate("26*9/5+32") 3) 组织最终回答
手动构建 ReACT 图(深入理解原理)
如果你想完全掌控 ReACT 的行为,可以手动构建循环图:
var graph = new StateGraph<>(AgentState.SCHEMA, AgentState::new)
// Agent 节点:调用 LLM 决定下一步
.addNode("agent", node_async(state -> {
// 调用 LLM,LLM 可能返回文本或工具调用
var response = chatModel.chat(buildMessages(state));
return Map.of("messages", response.aiMessage());
}))
// 工具执行节点
.addNode("tools", node_async(state -> {
// 执行 LLM 要求的工具调用
var toolResults = executeTools(state);
return Map.of("messages", toolResults);
}))
// 起点 → Agent
.addEdge(START, "agent")
// 条件边:Agent 之后,检查是否需要调用工具
.addConditionalEdges("agent",
state -> {
var lastMessage = getLastMessage(state);
return lastMessage.hasToolCalls() ? "continue" : "end";
},
Map.of(
"continue", "tools", // 需要工具 → 去执行
"end", END // 不需要 → 结束
)
)
// 工具执行完 → 回到 Agent(形成循环!)
.addEdge("tools", "agent")
.compile();
注意最后一行 .addEdge("tools", "agent")——这形成了一个循环,是 ReACT 模式的核心。Agent 不断"思考→行动→观察",直到认为可以给出最终回答。
✅ 本节小结
AgentExecutor是内置的 ReACT Agent 实现- ReACT 模式的核心是 Agent→Tool 的循环图
- 条件边决定"继续调用工具"还是"结束并输出"
九、Human-in-the-Loop
场景:AI 准备执行一个危险操作(如删除数据、发送邮件),你希望先暂停,让人类审批后再继续。
LangGraph4j 通过 interrupt 机制实现人工介入。当图执行到某个节点时可以暂停,等待外部输入后继续。
// 创建需要人工审批的图
var graph = new StateGraph<>(MyState.SCHEMA, MyState::new)
.addNode("prepare", node_async(state -> {
return Map.of(
"action", "delete_user_data",
"status", "pending_approval"
);
}))
.addNode("execute", node_async(state -> {
// 只有审批通过才会到达这里
return Map.of("status", "completed", "logs", "操作已执行");
}))
.addEdge(START, "prepare")
// 条件边:检查审批状态
.addConditionalEdges("prepare",
state -> state.<String>value("approval").orElse("waiting"),
Map.of(
"approved", "execute",
"rejected", END,
"waiting", END // 暂停等待
)
)
.addEdge("execute", END)
.compile(CompileConfig.builder()
.checkpointSaver(new MemorySaver())
.build());
// 第一次执行:到 prepare 后暂停
var config = RunnableConfig.builder().threadId("task-001").build();
graph.stream(Map.of("input", "删除用户数据"), config);
// 人类审批后,用更新状态恢复执行
graph.stream(Map.of("approval", "approved"), config);
信息补充:AI 需要用户提供额外信息才能继续。
质量把关:AI 生成内容后,由人工审核再发布。
✅ 本节小结
- 通过条件边 + Checkpoint 实现图的暂停与恢复
- 使用相同 threadId 可以从上次暂停的地方继续
- Human-in-the-Loop 是生产级 AI 应用的安全保障
十、子图与并行执行
子图(Subgraph)
场景:你的图太复杂了,想把一部分逻辑封装成独立的"子图",就像函数调用一样复用。
// 1. 定义子图(独立的小图)
var subGraph = new StateGraph<>(MyState.SCHEMA, MyState::new)
.addNode("step1", node_async(state ->
Map.of("logs", "子图 Step1 完成")))
.addNode("step2", node_async(state ->
Map.of("logs", "子图 Step2 完成")))
.addEdge(START, "step1")
.addEdge("step1", "step2")
.addEdge("step2", END);
// 2. 在父图中把子图作为节点使用
var parentGraph = new StateGraph<>(MyState.SCHEMA, MyState::new)
.addNode("init", node_async(state ->
Map.of("logs", "父图初始化")))
.addSubgraph("processing", subGraph) // 子图作为节点
.addNode("finish", node_async(state ->
Map.of("logs", "父图完成")))
.addEdge(START, "init")
.addEdge("init", "processing")
.addEdge("processing", "finish")
.addEdge("finish", END)
.compile();
并行执行
场景:你需要同时查询天气和新闻,没必要串行等待。LangGraph4j 支持将多个节点配置为并行执行。
// 从 "start" 出发,同时进入 "weather" 和 "news"
var graph = new StateGraph<>(MyState.SCHEMA, MyState::new)
.addNode("weather", node_async(state -> {
Thread.sleep(1000); // 模拟网络请求
return Map.of("logs", "天气:晴天 25°C");
}))
.addNode("news", node_async(state -> {
Thread.sleep(1000); // 模拟网络请求
return Map.of("logs", "新闻:Java 24 发布");
}))
.addNode("summarize", node_async(state ->
Map.of("logs", "汇总完成: " + state.logs())))
.addEdge(START, "weather") // START 同时连接两个节点
.addEdge(START, "news") // = 并行分支
.addEdge("weather", "summarize")
.addEdge("news", "summarize") // 两个分支汇合
.addEdge("summarize", END)
.compile();
// weather 和 news 会并行执行,都完成后再进入 summarize
✅ 本节小结
- 子图通过
addSubgraph()嵌入父图,实现逻辑复用 - 同一个节点连接多个后续节点即可实现并行执行
- 并行分支在汇合节点自动等待所有分支完成
十一、流式与异步处理
场景:AI 的回复很长,你想一边生成一边展示,而不是干等。LangGraph4j 原生支持异步和流式。
异步节点
import java.util.concurrent.CompletableFuture;
import org.bsc.langgraph4j.action.AsyncNodeAction;
// 所有节点默认包装为异步(node_async)
// 也可以直接返回 CompletableFuture
AsyncNodeAction<MyState> asyncNode = state ->
CompletableFuture.supplyAsync(() -> {
// 在单独线程执行耗时操作
var result = callExternalAPI(state);
return Map.of("result", result);
});
流式执行
// stream() 逐步返回每个节点执行后的状态
for (var nodeOutput : compiledGraph.stream(initialState)) {
System.out.println("节点 " + nodeOutput.node() + " 完成");
System.out.println("当前状态: " + nodeOutput.state());
}
// invoke() 等待整个图执行完,返回最终状态
var finalState = compiledGraph.invoke(initialState).get();
System.out.println("最终结果: " + finalState);
| 方法 | 行为 | 适用场景 |
|---|---|---|
stream() | 逐步返回每个节点的输出 | 需要实时展示进度的 UI |
invoke() | 返回最终状态的 CompletableFuture | 后台任务,只关心最终结果 |
streamSnapshots() | 返回包含完整配置信息的快照 | 调试和日志 |
✅ 本节小结
- 节点天然支持异步(CompletableFuture)
stream()逐步输出,invoke()一次性输出- 流式执行对构建实时 UI 至关重要
十二、Studio 可视化调试
场景:你的图越来越复杂,光看代码很难理解执行流程。LangGraph4j 内置了一个 Web UI(Studio),让你可视化地查看、运行和调试图。
import org.bsc.langgraph4j.studio.LangGraphStudioServer;
import org.bsc.langgraph4j.studio.LangGraphStudioServer4Jetty;
// 1. 添加 Studio 依赖
// <artifactId>langgraph4j-studio-jetty</artifactId>
// 2. 创建 Studio 实例
var saver = new MemorySaver();
var instance = LangGraphStudioServer.Instance.builder()
.title("我的 AI Agent")
.addInputStringArg("input") // 定义输入字段
.graph(stateGraph) // 传入 StateGraph
.compileConfig(CompileConfig.builder()
.checkpointSaver(saver)
.build())
.build();
// 3. 启动 Studio 服务器
LangGraphStudioServer4Jetty.builder()
.port(8080)
.instance("default", instance)
.build()
.start()
.join();
// 4. 打开浏览器访问 http://localhost:8080
实时执行:输入数据后在 UI 上实时观察图的执行过程。
状态检查:查看每一步的完整状态快照。
调试利器:比看日志效率高 10 倍。
生成 Mermaid / PlantUML 图
如果你只需要静态的图结构,可以导出为 Mermaid 或 PlantUML 格式:
// 编译后获取图的 Mermaid 描述
var compiled = stateGraph.compile();
String mermaid = compiled.getGraph().toMermaid();
System.out.println(mermaid);
// 输出类似:
// graph TD
// __start__ --> classifier
// classifier -->|chat| chatHandler
// classifier -->|order| orderHandler
// chatHandler --> __end__
// orderHandler --> __end__
✅ 本节小结
- Studio 提供 Web UI 可视化调试你的图
- 支持 Jetty 和 Spring Boot 两种集成方式
- 可导出 Mermaid / PlantUML 格式的图描述
十三、综合实战:智能研究助手
让我们构建一个「智能研究助手」——用户提出一个研究问题,Agent 会自动搜索信息、评估质量、不够好就继续搜索(循环),最后生成研究报告。这个项目串联了前面所有知识点。
(评估不通过时,从 researcher 循环重试)
定义状态
class ResearchState extends AgentState {
static final Map<String, Channel<?>> SCHEMA = Map.of(
"question", Channels.base(() -> ""),
"plan", Channels.base(() -> ""),
"findings", Channels.appender(ArrayList::new),
"quality", Channels.base(() -> "low"),
"retryCount", Channels.base(() -> 0),
"report", Channels.base(() -> "")
);
public ResearchState(Map<String, Object> d) { super(d); }
public String question() { return this.<String>value("question").orElse(""); }
public String plan() { return this.<String>value("plan").orElse(""); }
public List<String> findings() {
return this.<List<String>>value("findings").orElse(List.of());
}
public String quality() { return this.<String>value("quality").orElse("low"); }
public int retryCount() { return this.<Integer>value("retryCount").orElse(0); }
}
定义节点
// 规划节点:LLM 制定研究计划
var planner = node_async((ResearchState state) -> {
String plan = chatModel.chat(
"为以下研究问题制定搜索计划:" + state.question());
return Map.of("plan", plan);
});
// 研究节点:执行搜索(模拟)
var researcher = node_async((ResearchState state) -> {
String finding = chatModel.chat(
"根据计划搜索信息:" + state.plan()
+ "\n已有发现:" + state.findings());
return Map.of(
"findings", finding,
"retryCount", state.retryCount() + 1
);
});
// 评估节点:判断信息是否充分
var evaluator = node_async((ResearchState state) -> {
String quality = chatModel.chat(
"评估以下研究发现的质量(回答 high 或 low):\n"
+ String.join("\n", state.findings()));
return Map.of("quality", quality.trim().toLowerCase());
});
// 写作节点:生成研究报告
var writer = node_async((ResearchState state) -> {
String report = chatModel.chat(
"基于以下发现撰写研究报告:\n"
+ String.join("\n", state.findings()));
return Map.of("report", report);
});
组装图
var researchGraph = new StateGraph<>(ResearchState.SCHEMA, ResearchState::new)
.addNode("planner", planner)
.addNode("researcher", researcher)
.addNode("evaluator", evaluator)
.addNode("writer", writer)
// 流程编排
.addEdge(START, "planner")
.addEdge("planner", "researcher")
.addEdge("researcher", "evaluator")
// 关键:条件边实现循环
.addConditionalEdges("evaluator",
state -> {
if (state.quality().contains("high") || state.retryCount() >= 3) {
return "write"; // 质量够好或已重试 3 次 → 写报告
}
return "retry"; // 质量不够 → 继续研究
},
Map.of(
"write", "writer",
"retry", "researcher" // 循环回 researcher!
)
)
.addEdge("writer", END)
.compile(CompileConfig.builder()
.checkpointSaver(new MemorySaver())
.build());
运行
var config = RunnableConfig.builder().threadId("research-001").build();
for (var step : researchGraph.stream(
Map.of("question", "Java 虚拟线程在高并发场景下的性能表现如何?"),
config)) {
System.out.println("【" + step.node() + "】");
var state = (ResearchState) step.state();
if (!state.findings().isEmpty()) {
System.out.println(" 发现数量: " + state.findings().size());
}
if (step.node().equals("writer")) {
System.out.println(" 报告: " + state.<String>value("report").orElse(""));
}
}
✅ 本节小结
- 综合运用了状态管理、条件路由、循环图、持久化
- 循环 + 条件边实现了"质量不够就重试"的 AI 自主决策
- retryCount 防止无限循环,这是生产必备的安全措施
十四、速查表 / Cheat Sheet
图定义
| API | 用途 | 示例 |
|---|---|---|
new StateGraph<>(schema, ctor) | 创建状态图 | new StateGraph<>(MyState.SCHEMA, MyState::new) |
.addNode(name, action) | 添加节点 | .addNode("agent", node_async(...)) |
.addEdge(from, to) | 添加普通边 | .addEdge("a", "b") |
.addConditionalEdges(...) | 添加条件边 | .addConditionalEdges("a", fn, routeMap) |
.addSubgraph(name, graph) | 嵌入子图 | .addSubgraph("sub", childGraph) |
.compile() | 编译为可执行图 | .compile(CompileConfig.builder()...) |
START / END | 内置起止标记 | addEdge(START, "first") |
状态与 Channel
| API | 用途 | 示例 |
|---|---|---|
Channels.base(() -> default) | 覆盖模式 Channel | Channels.base(() -> "") |
Channels.appender(factory) | 追加模式 Channel | Channels.appender(ArrayList::new) |
state.value("key") | 读取状态值 | state.<String>value("name").orElse("") |
执行与调试
| API | 用途 | 示例 |
|---|---|---|
graph.stream(state) | 流式执行 | for (var s : graph.stream(init)) {...} |
graph.invoke(state) | 执行并返回最终状态 | graph.invoke(init).get() |
graph.streamSnapshots(...) | 流式快照(带配置信息) | 用于调试和 Time Travel |
MemorySaver | 内存检查点存储 | new MemorySaver() |
RunnableConfig | 运行配置(threadId 等) | RunnableConfig.builder().threadId("t1").build() |
集成
| 模块 | 用途 |
|---|---|
langgraph4j-langchain4j | 与 LangChain4j 集成 |
langgraph4j-spring-ai | 与 Spring AI 集成 |
langgraph4j-agent-executor | 内置 ReACT Agent |
langgraph4j-studio-jetty | Studio Web UI(Jetty) |
langgraph4j-studio-springboot | Studio Web UI(Spring Boot) |
十五、进阶路线图
高级用法探索
Multi-Agent Supervisor:一个 "主管" Agent 协调多个专业 Agent 工作
Agent Handoff:Agent 之间无缝交接任务和上下文
Adaptive RAG:根据查询复杂度动态调整检索策略
Deep Agents (Agent 2.0):深度嵌套的 Agent 架构
AG-UI 协议:标准化的 Agent 与前端交互协议
Postgres Checkpoint:生产级的持久化检查点存储
Graph Builder:可视化拖拽设计图,自动生成 Java 代码
推荐资源
| 资源 | 链接 | 说明 |
|---|---|---|
| 官方文档 | langgraph4j.github.io | 最权威的参考 |
| GitHub 仓库 | github.com/langgraph4j | 源码与示例 |
| 示例仓库 | langgraph4j-examples | 完整可运行的示例 |
| Graph Builder | 可视化图设计器 | 拖拽设计图并生成代码 |
| O'Reilly 书籍 | Applied AI for Enterprise Java | 深度学习 LangGraph4j 章节 |
🎓 系列教程第二篇完成!
你已经掌握了用图来编排复杂 AI 应用的能力。
结合第一篇 LangChain4j 的知识,你可以构建真正的生产级 AI Agent 了!