
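Two advanced Graph features, run end to end in one pass, with node-level timing logs as verification.
The previous post built the Supervisor + RAG subgraph and left the CodeReview subgraph as a placeholder. This post fills it in: parallel execution lets codeCheck and styleCheck run at the same time, and HITL makes high-risk code wait for a human decision.
Supervisor global routing + CodeReview subgraph in detail: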
START
├─ codeCheckNode (calls Feign → codeReviewAnalyzeResult JSON)
└─ styleCheckNode (local rules → styleCheckResult JSON)
↓ (parallel fan-in)
→ mergeCheckNode (merges STYLE issues; deduction rule: -2 per issue, minimum 0)
→ riskAssessNode (writes riskLevel / needHumanReview)
→ [conditional edge] → humanReviewNode or reportGenNode
→ humanReviewNode (optional) HITL
→ reportGenNode
END
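Two highlights stand out:
- codeCheckNode and styleCheckNode run in parallel.
- The conditional edge after riskAssessNode decides whether to enter humanReviewNode (HITL).
In Spring AI Graph 1.0.0.3+, parallel branches use the List overloads of addEdge: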
// Parallel fan-out: START → codeCheck ∥ styleCheck
stateGraph.addEdge(
        START,
        List.of(CodeReviewSubGraphNames.codeCheckNode,
                CodeReviewSubGraphNames.styleCheckNode));

// Parallel fan-in: mergeCheck runs only after both branches complete
stateGraph.addEdge(
        List.of(CodeReviewSubGraphNames.codeCheckNode,
                CodeReviewSubGraphNames.styleCheckNode),
        CodeReviewSubGraphNames.mergeCheckNode);
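That is all: two calls. No dedicated API such as addParallelNode is needed. addEdge(START, List.of(...)) creates the parallel branches automatically, and addEdge(List.of(...), target) waits for all source nodes to finish before fanning in.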
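For readers who want a mental model of fan-out and fan-in, the same shape can be sketched with plain CompletableFuture. This is a conceptual sketch only, not how the framework implements addEdge: the class FanOutFanInSketch, the two stand-in branch methods, and the hard-coded JSON strings are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Conceptual sketch: fan out to two tasks, fan in once both have finished. */
class FanOutFanInSketch {

    // Hypothetical stand-ins for the two branch nodes; each returns a partial state update.
    static Map<String, Object> codeCheck(String code) {
        return Map.of("codeReviewAnalyzeResult", "{\"score\":70}");
    }

    static Map<String, Object> styleCheck(String code) {
        return Map.of("styleCheckResult", "{\"compliant\":true}");
    }

    /** Runs both branches concurrently and merges their updates after both complete. */
    static Map<String, Object> runParallel(String code) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Map<String, Object>> codeBranch =
                    CompletableFuture.supplyAsync(() -> codeCheck(code), pool);
            CompletableFuture<Map<String, Object>> styleBranch =
                    CompletableFuture.supplyAsync(() -> styleCheck(code), pool);
            // thenCombine fires only after BOTH futures complete: this is the fan-in.
            return codeBranch.thenCombine(styleBranch, (left, right) -> {
                Map<String, Object> merged = new HashMap<>(left);
                merged.putAll(right);
                return merged;
            }).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runParallel("class A {}").keySet());
    }
}
```

The two branches write different keys, so the merge step is a plain putAll; branches that write the same key would need an explicit merge policy.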
codeCheckNode: calls the remote Feign service (heavy):
@Component
public class CodeCheckNode implements NodeAction {

    private final CodeReviewClient codeReviewClient;

    // Constructor injection, required because the field is final
    public CodeCheckNode(CodeReviewClient codeReviewClient) {
        this.codeReviewClient = codeReviewClient;
    }

    @Override
    public Map<String, Object> apply(OverAllState state) {
        CodeReviewGraphTiming timing = CodeReviewGraphTiming.begin();
        String input = CodeReviewInputSupport.resolveReviewCode(state);
        log.info("[CodeReviewGraph] node=codeCheckNode start atMs={} codeLength={}",
                timing.startEpochMs(), input.length());

        try {
            CodeReviewAnalyzeRequest request =
                    CodeReviewAnalyzeRequest.fromReview(input,
                            CodeReviewInputSupport.resolveReviewInstruction(state));
            CodeReviewAnalyzeResult result = codeReviewClient.analyze(request);

            log.info("[CodeReviewGraph] node=codeCheckNode done atMs={} elapsedMs={} score={} issueCount={}",
                    timing.nowEpochMs(), timing.elapsedMs(),
                    result.score(), result.issues().size());

            // Store the result as a JSON string so the state stays serializable
            return Map.of(CodeReviewSubGraphNames.codeReviewAnalyzeResult,
                    CodeReviewAnalyzeResults.toJson(result));
        } catch (Exception ex) {
            return Map.of(DreamSaaSOverAllState.errorMessage,
                    "Failed to call code-review-agent: " + ex.getMessage());
        }
    }
}
styleCheckNode: local rule check (light):
@Component
public class StyleCheckNode implements NodeAction {

    @Override
    public Map<String, Object> apply(OverAllState state) {
        CodeReviewGraphTiming timing = CodeReviewGraphTiming.begin();
        String input = CodeReviewInputSupport.resolveReviewCode(state);
        log.info("[CodeReviewGraph] node=styleCheckNode start atMs={} inputLength={}",
                timing.startEpochMs(), input.length());

        List<StyleCheckViolation> violations = CodeStyleRules.scan(input);
        boolean compliant = violations.isEmpty();
        StyleCheckResultPayload payload =
                new StyleCheckResultPayload(compliant, violations,
                        compliant ? "Format check passed"
                                  : "Format check found " + violations.size() + " issue(s)");

        log.info("[CodeReviewGraph] node=styleCheckNode done atMs={} elapsedMs={} compliant={} violationCount={}",
                timing.nowEpochMs(), timing.elapsedMs(), compliant, violations.size());
        return Map.of(CodeReviewSubGraphNames.styleCheckResult,
                StyleCheckResults.toJson(payload));
    }
}
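Both nodes implement NodeAction. At registration they are uniformly wrapped with AsyncNodeAction.node_async(), and the framework runs them in parallel on a thread pool.
After deploying to the server and running one round, the thread names and timestamps in the log make it plain: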
23:18:39.570 [parallel-node-action-thread-6] INFO CodeCheckNode - start atMs=1780499919570
23:18:39.573 [parallel-node-action-thread-5] INFO StyleCheckNode - start atMs=1780499919573
23:18:39.573 [parallel-node-action-thread-5] INFO StyleCheckNode - done elapsedMs=0
23:19:00.026 [parallel-node-action-thread-6] INFO CodeCheckNode - done elapsedMs=20455
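Key evidence: the two nodes log from different threads (parallel-node-action-thread-6 and parallel-node-action-thread-5) and start 3 ms apart; styleCheckNode finishes immediately (elapsedMs=0) while codeCheckNode is still waiting on the remote call (elapsedMs=20455).
Because styleCheckNode currently runs local rules only, the parallel gain is small. If styleCheck also called an LLM (for example for code style suggestions, estimated at 5-10 s), parallel would cost max(22 s, 10 s) ≈ 22 s versus 22 s + 10 s = 32 s in series, a saving of about 31%. The architecture is in place; it only takes adding the node.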
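The estimate above is simple arithmetic: serial time is the sum of the branches, parallel time is the slower branch. A minimal sketch, where ParallelSavings and savedFraction are hypothetical names and the 22 s / 10 s inputs are the article's estimates rather than measurements:

```java
/** Worked arithmetic for the parallel-vs-serial estimate. */
class ParallelSavings {

    /** Fraction of wall-clock time saved by running two branches in parallel instead of in series. */
    static double savedFraction(double branchASeconds, double branchBSeconds) {
        double serial = branchASeconds + branchBSeconds;            // 22 + 10 = 32
        double parallel = Math.max(branchASeconds, branchBSeconds); // max(22, 10) = 22
        return (serial - parallel) / serial;                        // 10 / 32
    }

    public static void main(String[] args) {
        // 22 s remote code check plus a hypothetical 10 s LLM-backed style check
        System.out.println(savedFraction(22, 10));
    }
}
```

The saving is always the shorter branch divided by the serial total, which is why a near-instant styleCheckNode yields almost no gain today.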
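After the parallel fan-in, the chain mergeCheckNode → riskAssessNode → [conditional edge] looks unremarkable. But if you try to attach a conditional edge directly to a parallel branch, for example codeCheckNode → addConditionalEdges(...), you get an error. Parallel branches must first fan in to an ordinary node, and the conditional edge goes out from that node.
// Wrong: conditional edge directly after a parallel node
stateGraph.addConditionalEdges(CodeReviewSubGraphNames.codeCheckNode, ...);

// Right: fan in first → mergeCheck → riskAssess → conditional edge
stateGraph.addEdge(
        List.of(codeCheckNode, styleCheckNode),
        mergeCheckNode);
stateGraph.addConditionalEdges(riskAssessNode, routeNode, Map.of(...));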
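The essence of HITL (Human-in-the-Loop): the graph pauses before a given node, waits for a human decision, and then continues from the breakpoint.
CompileConfig offers two interrupt points:
- interruptBefore(node): pauses before the node executes, so the state does not contain that node's output. Suited to cases where a human decides whether the node should run at all.
- interruptAfter(node): pauses after the node executes, so the state already contains that node's output. Suited to cases where a human inspects the output and then decides where to go next.
This project uses interruptBefore(humanReviewNode): the risk assessment has already produced its conclusion, so the human steps in before the review node and can answer APPROVED or REJECTED directly, without running humanReviewNode first and then looking at the result.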
CompileConfig compileConfig =
        GraphCheckpointCompileSupport.withSharedCheckpointSaver(
                CompileConfig.builder()
                        .interruptBefore(CodeReviewSubGraphNames.humanReviewNode),
                graphSaverConfig)
        .build();

CompiledGraph compiled = stateGraph.compile(compileConfig);
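A single interruptBefore(humanReviewNode) call is enough for the graph to stop just before humanReviewNode. Stopped does not mean lost: the checkpoint mechanism persists the state to Redis and restores it on resume.
Graph state supports only String / Number / Boolean / Map / List. A nested object such as CodeReviewAnalyzeResult must be stored as a JSON string and deserialized again on read.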
// Write: object → JSON string
return Map.of(CodeReviewSubGraphNames.codeReviewAnalyzeResult,
        CodeReviewAnalyzeResults.toJson(result));

// Read: JSON string → object
CodeReviewAnalyzeResult analyze =
        CodeReviewAnalyzeResults.parse(
                state.value(CodeReviewSubGraphNames.codeReviewAnalyzeResult)
                        .orElse(null));
If you put the object in directly, invoke still runs, but checkpoint serialization blows up: the state cannot be written to Redis, and resume has nothing to deserialize.
Step 1: startHitl, obtain the threadId
public Map<String, Object> startHitl(String instruction, String code, boolean forceHumanReview) {
    String threadId = UUID.randomUUID().toString();
    RunnableConfig runConfig = GraphInvokeSupport.threadConfig(threadId);

    Optional<OverAllState> finished =
            codeReviewSubGraph.invoke(
                    GraphInvokeSupport.codeReviewInputs(instruction, code, forceHumanReview),
                    runConfig);
    // invoke stops before humanReviewNode; finished holds the partial state
    Map<String, Object> result = invokeAndPack(finished, threadId);
    enrichInterruptFlags(result, runConfig);
    return result;
}
When forceHumanReview=true, the hitlTestForceHumanReview flag is set and RiskAssessNode forces needHumanReview=true regardless of the actual risk level, which makes testing easier.
Step 2: resumeHitl, write the human decision and resume execution
public Map<String, Object> resumeHitl(String threadId, String decision, String reason) {
    RunnableConfig runConfig = GraphInvokeSupport.threadConfig(threadId);

    // 1. updateState: write the human decision into the checkpoint
    Map<String, Object> patch = new HashMap<>();
    patch.put(CodeReviewSubGraphNames.humanDecision, decision);
    patch.put(CodeReviewSubGraphNames.humanReviewReason, reason);
    runConfig = codeReviewSubGraph.updateState(runConfig, patch, null);

    // 2. getState: read the snapshot and get the next node ID
    StateSnapshot snapshot = codeReviewSubGraph.getState(runConfig);
    String nextNode = snapshot.next();

    // 3. withNodeResumed: mark the interrupted node as resumable
    runConfig.withNodeResumed(nextNode);

    // 4. invoke: continue from the checkpoint state
    OverAllState state = codeReviewSubGraph
            .invoke(snapshot.state(), runConfig)
            .orElseThrow();
    return invokeAndPack(Optional.of(state), threadId);
}
Four clearly separated steps: updateState → getState → withNodeResumed → invoke. None of them can be skipped.
RiskAssessNode decides whether human intervention is needed:
static boolean needHumanReview(int score, SeverityCounts counts) {
    if (counts.critical > 0) return true; // at least one CRITICAL issue
    if (score < 60) return true;          // score below 60
    return counts.high >= 3;              // three or more HIGH issues
}
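The boundaries of this rule are easy to get wrong, so here is a self-contained version with a few probe values. The rule itself is copied from the method above; the wrapper class RiskGate and the record shape of SeverityCounts are assumptions, since the source only shows the critical and high counters being read.

```java
/** Self-contained sketch of the human-review gate with boundary probes. */
class RiskGate {

    /** Hypothetical shape of the severity counters (only the two fields the rule reads). */
    record SeverityCounts(int critical, int high) {}

    static boolean needHumanReview(int score, SeverityCounts counts) {
        if (counts.critical() > 0) return true; // at least one CRITICAL issue
        if (score < 60) return true;            // score below 60
        return counts.high() >= 3;              // three or more HIGH issues
    }

    public static void main(String[] args) {
        System.out.println(needHumanReview(70, new SeverityCounts(1, 0))); // true: one CRITICAL issue
        System.out.println(needHumanReview(59, new SeverityCounts(0, 0))); // true: score below 60
        System.out.println(needHumanReview(60, new SeverityCounts(0, 2))); // false: both at the boundary
        System.out.println(needHumanReview(90, new SeverityCounts(0, 3))); // true: three HIGH issues
    }
}
```

The first probe matches the test run in the logs, where a score of 70 still triggers review because the verdict is CRITICAL.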
The risk level takes the higher of two dimensions, issue severity and score.
Test with a snippet that contains a SQL injection:
public class AuthController {
@PostMapping("/login")
public String login(String username, String password) {
String sql = "SELECT * FROM users WHERE username=" + username
+ " AND password=" + password;
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
return rs.next() ? "token-" + username : null;
}
}
HITL Start log:
23:21:04.241 [parallel-node-action-thread-5] INFO CodeCheckNode - start codeLength=417
23:21:04.242 [parallel-node-action-thread-6] INFO StyleCheckNode - start inputLength=417
23:21:04.242 [parallel-node-action-thread-6] INFO StyleCheckNode - done elapsedMs=0
23:21:25.823 [parallel-node-action-thread-5] INFO CodeCheckNode - done elapsedMs=21581 score=70
23:21:25.832 [parallel-node-action-thread-5] INFO MergeCheckNode - done elapsedMs=0
23:21:25.838 [parallel-node-action-thread-5] INFO RiskAssessNode - done elapsedMs=0 riskLevel=CRITICAL needHumanReview=true
23:21:25.846 [http-nio-8098-exec-2] INFO Service - graph interrupted nextNode=humanReviewNode awaiting resume
Parallel execution → risk CRITICAL → HITL interrupt, waiting for a human.
HITL Resume (APPROVED) log:
23:21:40.872 [http-nio-8098-exec-7] INFO Controller - api hitl/resume threadId=9d3c8010 decision=APPROVED
23:21:40.907 [http-nio-8098-exec-7] INFO HumanReviewNode - approved elapsedMs=0 note=Human review approved: security risk confirmed acceptable by the reviewer, fix already planned
23:21:40.916 [http-nio-8098-exec-7] INFO ReportGenNode - done elapsedMs=0 mode=success score=70
23:21:40.933 [http-nio-8098-exec-7] INFO Service - resumeHitl done elapsedMs=61
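Resuming from the breakpoint took 61 ms. The REJECTED path works the same way and took only 30 ms.
HITL Resume (REJECTED) log:
23:22:23.940 [http-nio-8098-exec-8] INFO HumanReviewNode - rejected elapsedMs=0 note=Human review rejected: SQL injection vulnerability is unacceptable, must be fixed and resubmitted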
23:22:23.946 [http-nio-8098-exec-8] INFO ReportGenNode - done elapsedMs=0 mode=failure reportLength=40
23:22:23.954 [http-nio-8098-exec-8] INFO Service - resumeHitl done elapsedMs=30
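On REJECTED, HumanReviewNode writes errorMessage, and ReportGenNode takes the failure branch and generates a short report.
HITL can resume from a breakpoint because of the checkpoint mechanism: each time the graph executes a node, the Saver persists the state; on resume after an interrupt, execution restores from the last saved snapshot.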
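To make the mechanism concrete, here is a toy in-memory model of interrupt-before plus checkpoint plus resume. It is a sketch under assumptions, not the spring-ai-alibaba implementation: ToyCheckpointGraph, Step, Checkpoint, and the demo node bodies are all hypothetical, a HashMap stands in for Redis, and the toy always interrupts instead of following a conditional edge.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of interrupt-before + checkpoint + resume. Not the framework's API. */
class ToyCheckpointGraph {

    record Step(String name, Consumer<Map<String, Object>> action) {}

    record Checkpoint(int nextIndex, Map<String, Object> state) {}

    private final List<Step> steps;
    private final String interruptBefore;
    private final Map<String, Checkpoint> saver = new HashMap<>(); // stands in for Redis

    ToyCheckpointGraph(List<Step> steps, String interruptBefore) {
        this.steps = List.copyOf(steps);
        this.interruptBefore = interruptBefore;
    }

    /** Runs from the first step and stops before the interrupt node, saving a checkpoint. */
    Map<String, Object> start(String threadId, Map<String, Object> inputs) {
        return run(threadId, 0, new HashMap<>(inputs), true);
    }

    /** Patches the saved state with the human decision, then continues from the saved position. */
    Map<String, Object> resume(String threadId, Map<String, Object> patch) {
        Checkpoint checkpoint = saver.remove(threadId);
        if (checkpoint == null) {
            throw new IllegalStateException("no checkpoint for thread " + threadId);
        }
        Map<String, Object> state = new HashMap<>(checkpoint.state());
        state.putAll(patch);
        // The interrupted node is allowed to run this time.
        return run(threadId, checkpoint.nextIndex(), state, false);
    }

    boolean isInterrupted(String threadId) {
        return saver.containsKey(threadId);
    }

    private Map<String, Object> run(String threadId, int from,
                                    Map<String, Object> state, boolean honorInterrupt) {
        for (int i = from; i < steps.size(); i++) {
            Step step = steps.get(i);
            if (honorInterrupt && step.name().equals(interruptBefore)) {
                // Save a copy so later mutations cannot change the checkpoint.
                saver.put(threadId, new Checkpoint(i, new HashMap<>(state)));
                return state;
            }
            step.action().accept(state);
        }
        return state;
    }

    static ToyCheckpointGraph demo() {
        return new ToyCheckpointGraph(List.of(
                new Step("riskAssessNode", s -> s.put("needHumanReview", true)),
                new Step("humanReviewNode", s -> s.put("reviewed", true)),
                new Step("reportGenNode",
                        s -> s.put("report", "decision=" + s.get("humanDecision")))),
                "humanReviewNode");
    }

    public static void main(String[] args) {
        ToyCheckpointGraph graph = demo();
        Map<String, Object> partial = graph.start("thread-1", Map.of("reviewCode", "..."));
        System.out.println(partial.containsKey("report")); // false: stopped before the report
        Map<String, Object> done = graph.resume("thread-1", Map.of("humanDecision", "APPROVED"));
        System.out.println(done.get("report"));            // decision=APPROVED
    }
}
```

The toy keys checkpoints by thread ID, which is the same role threadId plays in startHitl and resumeHitl.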
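spring-ai-alibaba-graph-core ships CheckpointSaver implementations out of the box.
Introductory tutorials recommend MemorySaver (zero dependencies, get it running first); for production, use RedisSaver (this project's choice).
One configuration, Redis first, with automatic fallback: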
@Configuration
@EnableConfigurationProperties(GraphCheckpointProperties.class)
@ConditionalOnClass(BaseCheckpointSaver.class)
public class GraphCheckpointConfiguration {

    @Bean(name = "graphCheckpointSaver")
    @ConditionalOnMissingBean(name = "graphCheckpointSaver")
    public BaseCheckpointSaver graphCheckpointSaver(
            GraphCheckpointProperties properties,
            @Qualifier("graphRedissonClient") ObjectProvider<RedissonClient> redissonProvider) {
        if (properties.isRedisEnabled()) {
            RedissonClient client = redissonProvider.getIfAvailable();
            if (client != null) {
                return RedisSaver.builder().redisson(client).build();
            }
            // Redis enabled but Redisson missing → fallback
            log.warn("[GraphCheckpoint] redis-enabled=true but RedissonClient missing; falling back to MemorySaver");
        }
        return MemorySaver.builder().build();
    }
}
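The logic is simple:
- dream.ai.graph.checkpoint.redis-enabled=true (the default) → create a dedicated Redisson client → RedisSaver
- redis-enabled=false, or Redisson unavailable → fall back to MemorySaver
- The saver is wrapped in a SaverConfig and injected via CompileConfig.builder().saverConfig(...)
There are only two configuration properties: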
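dream:
  ai:
    graph:
      checkpoint:
        redis-enabled: true                # false → MemorySaver
        redis-address-prefix: "redis://"   # use rediss:// for TLS connections
The Redis connection reuses spring.data.redis.* (host / port / password / database), so no extra data source configuration is needed.
If a project has several subgraphs that all need HITL (for example the CodeReview subgraph plus a future Chat subgraph), each subgraph does not need its own Saver. GraphCheckpointCompileSupport attaches the shared SaverConfig to CompileConfig: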
public final class GraphCheckpointCompileSupport {

    public static CompileConfig.Builder withSharedCheckpointSaver(
            CompileConfig.Builder builder, ObjectProvider<SaverConfig> graphSaverConfig) {
        return builder.saverConfig(resolveSaverConfig(graphSaverConfig));
    }

    public static SaverConfig resolveSaverConfig(ObjectProvider<SaverConfig> graphSaverConfig) {
        return graphSaverConfig.getIfAvailable(
                () -> SaverConfig.builder()
                        .register(MemorySaver.builder().build())
                        .build());
    }
}
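Wiring it in when assembling a subgraph takes one expression:
CompileConfig compileConfig =
        GraphCheckpointCompileSupport.withSharedCheckpointSaver(
                CompileConfig.builder()
                        .interruptBefore(CodeReviewSubGraphNames.humanReviewNode),
                graphSaverConfig) // ObjectProvider<SaverConfig>
        .build();
When the dream-ai-graph module is not present, this falls back to MemorySaver automatically, with no error and no blocking.
Multiple requests reuse the same CompiledGraph instance, so state that is not cleared will leak data between requests. GraphInvokeStateDefaults registers default values before every invoke: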
public class GraphInvokeStateDefaults {

    public static Map<String, Object> codeReviewInputs(String instruction, String code) {
        return codeReviewInputs(instruction, code, false);
    }

    public static Map<String, Object> codeReviewInputs(String instruction, String code,
                                                       boolean forceHumanReview) {
        Map<String, Object> inputs = new HashMap<>();
        // Instruction and code are joined with a newline
        inputs.put(DreamSaaSOverAllState.userInput, instruction + "\n" + code);
        inputs.put(CodeReviewSubGraphNames.reviewInstruction,
                StringUtils.hasText(instruction) ? instruction : "");
        inputs.put(CodeReviewSubGraphNames.reviewCode,
                StringUtils.hasText(code) ? code : "");
        // Reset every key the graph may write, so nothing carries over
        inputs.put(CodeReviewSubGraphNames.codeReviewAnalyzeResult, "");
        inputs.put(CodeReviewSubGraphNames.styleCheckResult, "");
        inputs.put(CodeReviewSubGraphNames.riskLevel, "");
        inputs.put(CodeReviewSubGraphNames.needHumanReview, false);
        inputs.put(CodeReviewSubGraphNames.hitlTestForceHumanReview, forceHumanReview);
        inputs.put(DreamSaaSOverAllState.errorMessage, "");
        inputs.put(DreamSaaSOverAllState.finalAnswer, "");
        return inputs;
    }
}
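Every request explicitly initializes all state keys, so leftover values from the previous request cannot contaminate the current one. This pitfall only needs to bite once to be remembered: during parallel execution two branches write to the same state key, and if the initial values are not clean, the merge stage picks up dirty data.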
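The failure mode is easy to reproduce in miniature. The sketch below models a long-lived state map that successive requests write into; it only illustrates the contamination described above and makes no claim about how the framework stores state internally. StateDefaultsSketch, invoke, withDefaults, and the plain string keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of state contamination between requests and the defaults that prevent it. */
class StateDefaultsSketch {

    /** Merges per-request inputs into a long-lived state map (stand-in for reused graph state). */
    static Map<String, Object> invoke(Map<String, Object> sharedState, Map<String, Object> inputs) {
        sharedState.putAll(inputs);
        return sharedState;
    }

    /** Hypothetical default registry: every key the graph may write gets a neutral value. */
    static Map<String, Object> withDefaults(String code) {
        Map<String, Object> inputs = new HashMap<>();
        inputs.put("reviewCode", code);
        inputs.put("riskLevel", "");
        inputs.put("needHumanReview", false);
        inputs.put("errorMessage", "");
        return inputs;
    }

    public static void main(String[] args) {
        Map<String, Object> shared = new HashMap<>();

        // Request 1 ends with a CRITICAL verdict left in the state.
        invoke(shared, withDefaults("String sql = ..."));
        shared.put("riskLevel", "CRITICAL");

        // Request 2 passes only its own input: the old verdict is still there.
        invoke(shared, Map.of("reviewCode", "class Clean {}"));
        System.out.println(shared.get("riskLevel")); // CRITICAL (stale)

        // Request 2 with explicit defaults: the stale verdict is overwritten.
        invoke(shared, withDefaults("class Clean {}"));
        System.out.println(shared.get("riskLevel").toString().isEmpty()); // true
    }
}
```

Resetting every writable key on each request costs a few map entries and removes any dependence on what the previous request left behind.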
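Node-level timings for 5 test rounds were extracted from the [CodeReviewGraph] entries in the server log.
Key conclusion: the parallel gain is currently small because styleCheckNode does not call an LLM, but the architecture is in place. Once styleCheck also uses an LLM for code style suggestions (estimated at 5-10 s), running in parallel should save roughly 30% of the elapsed time.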
src/main/java/com/zhu/dream/ai/graph/subgraph/codereview/
├── CodeReviewSubGraphConfiguration.java # subgraph assembly (parallel + conditional edge + interruptBefore)
├── CodeCheckNode.java # calls code-review-agent via Feign
├── StyleCheckNode.java # local format rules
├── CodeReviewMergeNode.java # merges STYLE issues
├── RiskAssessNode.java # risk assessment + needHumanReview
├── CodeReviewRouteNode.java # conditional edge: needHumanReview → humanReview / reportGen
├── HumanReviewNode.java # HITL human review
├── ReportGenNode.java # generates the Markdown report
├── CodeReviewSubGraphNames.java # constants
├── CodeReviewSubGraphTestService.java # invoke / startHitl / resumeHitl
├── CodeReviewGraphTiming.java # node timing (atMs / elapsedMs)
src/main/java/com/zhu/dream/ai/graph/support/
├── CodeReviewInputSupport.java # input splitting (markdown fence extraction)
├── GraphInvokeStateDefaults.java # registry of default values for state keys
├── GraphInvokeSupport.java # invoke helpers (guards against state contamination)
Versions: spring-ai-bom 1.1.6 + spring-ai-alibaba 1.1.2.2
Try it online: dream-saas.com