跳到主要内容

🧩 子图(Subgraphs)

引言

在构建复杂的 LangGraph 应用时,我们经常需要将功能模块化,实现代码重用,或者支持团队协作开发。子图(Subgraphs)正是为了解决这些需求而设计的强大功能。

子图本质上是一个可以作为节点使用的图,这种设计体现了软件工程中经典的封装思想。就像前端开发中的组件化一样,子图让我们能够将复杂的逻辑封装成可重用的模块。

核心概念

什么是子图?

子图是一个完整的 LangGraph 图,但它可以作为另一个图中的节点来使用。这种嵌套结构为我们提供了强大的模块化能力。

子图的特点

  • 封装性:子图内部的复杂逻辑对外部透明
  • 可重用性:同一个子图可以在多个父图中使用
  • 独立性:子图可以独立开发和测试
  • 通信性:通过共享状态键与父图通信

子图的使用方式

LangGraph 提供了两种添加子图的方式,适用于不同的场景。

方式一:直接添加编译后的子图

当父图和子图共享状态键时,可以直接将编译后的子图作为节点添加:

直接添加子图示例:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// 定义共享状态结构
const SharedStateAnnotation = Annotation.Root({
message: Annotation<string>(),
count: Annotation<number>({
reducer: (current: number, update: number) => current + update,
default: () => 0,
}),
});

// 子图节点函数
const subgraphNode1 = (state: typeof SharedStateAnnotation.State) => {
console.log(`子图节点1处理: ${state.message}`);
return {
message: `${state.message} -> 子图处理`,
count: 1,
};
};

const subgraphNode2 = (state: typeof SharedStateAnnotation.State) => {
console.log(`子图节点2处理: ${state.message}`);
return {
message: `${state.message} -> 完成`,
count: 1,
};
};

// 创建子图
const subgraph = new StateGraph(SharedStateAnnotation)
.addNode('process', subgraphNode1)
.addNode('finalize', subgraphNode2)
.addEdge(START, 'process')
.addEdge('process', 'finalize')
.addEdge('finalize', END)
.compile();

// 父图节点函数
const parentNode1 = (state: typeof SharedStateAnnotation.State) => {
console.log(`父图节点1: ${state.message}`);
return {
message: `开始处理: ${state.message}`,
count: 1,
};
};

const parentNode2 = (state: typeof SharedStateAnnotation.State) => {
console.log(`父图节点2: ${state.message}, 总计数: ${state.count}`);
return {
message: `最终结果: ${state.message}`,
};
};

// 创建父图,直接添加编译后的子图
const parentGraph = new StateGraph(SharedStateAnnotation)
.addNode('start', parentNode1)
.addNode('subgraph', subgraph) // 直接添加编译后的子图
.addNode('end', parentNode2)
.addEdge(START, 'start')
.addEdge('start', 'subgraph')
.addEdge('subgraph', 'end')
.addEdge('end', END)
.compile();

// 使用示例
async function runBasicSubgraphExample() {
console.log('=== 基础子图示例 ===');

const result = await parentGraph.invoke({
message: '用户输入',
count: 0,
});

console.log('最终结果:', result);
console.log('总处理步骤:', result.count);
}

// 导出供其他模块使用
export { subgraph, parentGraph, runBasicSubgraphExample };

方式二:通过函数调用子图

当父图和子图有不同的状态结构时,需要通过函数来调用子图并进行状态转换:

状态转换子图示例:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// 父图状态结构
const ParentStateAnnotation = Annotation.Root({
userInput: Annotation<string>(),
processedData: Annotation<string>(),
metadata: Annotation<{
timestamp: string;
userId: string;
}>(),
});

// 子图状态结构(完全不同的结构)
const SubgraphStateAnnotation = Annotation.Root({
inputText: Annotation<string>(),
outputText: Annotation<string>(),
processingSteps: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),
});

// 子图节点函数
const analyzeText = (state: typeof SubgraphStateAnnotation.State) => {
console.log(`分析文本: ${state.inputText}`);
return {
processingSteps: ['文本分析完成'],
outputText: `分析结果: ${state.inputText.length} 个字符`,
};
};

const enhanceText = (state: typeof SubgraphStateAnnotation.State) => {
console.log(`增强文本: ${state.outputText}`);
return {
processingSteps: ['文本增强完成'],
outputText: `${state.outputText} | 增强处理`,
};
};

// 创建子图
const textProcessingSubgraph = new StateGraph(SubgraphStateAnnotation)
.addNode('analyze', analyzeText)
.addNode('enhance', enhanceText)
.addEdge(START, 'analyze')
.addEdge('analyze', 'enhance')
.addEdge('enhance', END)
.compile();

// 父图节点函数
const prepareInput = (state: typeof ParentStateAnnotation.State) => {
console.log(`准备输入: ${state.userInput}`);
return {
metadata: {
timestamp: new Date().toISOString(),
userId: 'user123',
},
};
};

// 调用子图的包装函数(进行状态转换)
const callTextProcessing = async (
state: typeof ParentStateAnnotation.State
) => {
console.log('调用文本处理子图...');

// 将父图状态转换为子图状态
const subgraphInput = {
inputText: state.userInput,
outputText: '',
processingSteps: [],
};

try {
// 调用子图
const subgraphResult = await textProcessingSubgraph.invoke(subgraphInput);

// 将子图结果转换回父图状态
return {
processedData: subgraphResult.outputText,
metadata: {
...state.metadata,
processingSteps: subgraphResult.processingSteps.join(' -> '),
},
};
} catch (error) {
console.error('子图处理失败:', error);
return {
processedData: `处理失败: ${error.message}`,
};
}
};

const finalizeResult = (state: typeof ParentStateAnnotation.State) => {
console.log(`最终处理: ${state.processedData}`);
return {
processedData: `最终结果: ${state.processedData} (${state.metadata.timestamp})`,
};
};

// 创建父图
const parentGraph = new StateGraph(ParentStateAnnotation)
.addNode('prepare', prepareInput)
.addNode('process', callTextProcessing)
.addNode('finalize', finalizeResult)
.addEdge(START, 'prepare')
.addEdge('prepare', 'process')
.addEdge('process', 'finalize')
.addEdge('finalize', END)
.compile();

// 使用示例
async function runTransformStateExample() {
console.log('=== 状态转换子图示例 ===');

const result = await parentGraph.invoke({
userInput: '这是一个需要处理的文本',
processedData: '',
metadata: {
timestamp: '',
userId: '',
},
});

console.log('最终结果:', result);
}

// 高级示例:批量处理多个输入
async function runBatchProcessingExample() {
console.log('=== 批量处理示例 ===');

const inputs = ['第一个文本', '第二个文本', '第三个文本'];

const results = await Promise.all(
inputs.map(async (input, index) => {
const result = await parentGraph.invoke({
userInput: input,
processedData: '',
metadata: {
timestamp: '',
userId: `user${index + 1}`,
},
});
return result;
})
);

console.log('批量处理结果:', results);
}

// 导出供其他模块使用
export {
textProcessingSubgraph,
parentGraph,
runTransformStateExample,
runBatchProcessingExample,
};

状态通信机制

子图与父图之间的通信是通过共享状态键实现的。理解这种通信机制对于正确使用子图至关重要。

共享状态示例

共享状态通信示例:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// 定义共享状态结构
const SharedStateAnnotation = Annotation.Root({
// 共享键 - 父图和子图都可以访问
sharedData: Annotation<string>(),
sharedCounter: Annotation<number>({
reducer: (current: number, update: number) => current + update,
default: () => 0,
}),

// 父图专用键
parentOnlyData: Annotation<string>(),

// 子图专用键(父图会忽略这些键)
subgraphOnlyData: Annotation<string>(),
processingLog: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),
});

// 子图节点函数
const subgraphStep1 = (state: typeof SharedStateAnnotation.State) => {
console.log(`子图步骤1 - 接收到共享数据: ${state.sharedData}`);

return {
sharedData: `${state.sharedData} -> 子图处理1`,
sharedCounter: 1,
subgraphOnlyData: '子图内部数据1',
processingLog: ['子图步骤1完成'],
};
};

const subgraphStep2 = (state: typeof SharedStateAnnotation.State) => {
console.log(`子图步骤2 - 当前数据: ${state.sharedData}`);
console.log(`子图内部数据: ${state.subgraphOnlyData}`);

return {
sharedData: `${state.sharedData} -> 子图处理2`,
sharedCounter: 1,
subgraphOnlyData: '子图内部数据2',
processingLog: ['子图步骤2完成'],
};
};

// 创建子图
const processingSubgraph = new StateGraph(SharedStateAnnotation)
.addNode('step1', subgraphStep1)
.addNode('step2', subgraphStep2)
.addEdge(START, 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', END)
.compile();

// 父图节点函数
const parentInitialize = (state: typeof SharedStateAnnotation.State) => {
console.log('父图初始化');

return {
sharedData: '初始数据',
sharedCounter: 1,
parentOnlyData: '父图专用数据',
};
};

const parentPreProcess = (state: typeof SharedStateAnnotation.State) => {
console.log(`父图预处理 - 当前数据: ${state.sharedData}`);

return {
sharedData: `${state.sharedData} -> 父图预处理`,
sharedCounter: 1,
parentOnlyData: `${state.parentOnlyData} -> 预处理`,
};
};

const parentFinalize = (state: typeof SharedStateAnnotation.State) => {
console.log(`父图最终处理 - 数据: ${state.sharedData}`);
console.log(`总计数器: ${state.sharedCounter}`);
console.log(`父图数据: ${state.parentOnlyData}`);

// 注意:子图的 subgraphOnlyData 和 processingLog 在这里是 undefined
// 因为父图的状态结构中虽然定义了这些键,但子图返回的这些键会被忽略
console.log(`子图日志: ${state.processingLog?.join(' | ') || '无'}`);

return {
sharedData: `最终结果: ${state.sharedData}`,
parentOnlyData: `最终: ${state.parentOnlyData}`,
};
};

// 创建父图,直接添加子图
const parentGraph = new StateGraph(SharedStateAnnotation)
.addNode('initialize', parentInitialize)
.addNode('preprocess', parentPreProcess)
.addNode('subgraph', processingSubgraph) // 直接添加编译后的子图
.addNode('finalize', parentFinalize)
.addEdge(START, 'initialize')
.addEdge('initialize', 'preprocess')
.addEdge('preprocess', 'subgraph')
.addEdge('subgraph', 'finalize')
.addEdge('finalize', END)
.compile();

// 演示状态通信的示例
async function runSharedStateExample() {
console.log('=== 共享状态通信示例 ===');

const result = await parentGraph.invoke({
sharedData: '',
sharedCounter: 0,
parentOnlyData: '',
subgraphOnlyData: '',
processingLog: [],
});

console.log('\n=== 最终状态 ===');
console.log('共享数据:', result.sharedData);
console.log('共享计数器:', result.sharedCounter);
console.log('父图数据:', result.parentOnlyData);
console.log('子图数据:', result.subgraphOnlyData); // 应该是 undefined
console.log('处理日志:', result.processingLog); // 应该是 undefined
}

// 演示状态键过滤的示例
async function runStateFilteringExample() {
console.log('\n=== 状态键过滤示例 ===');

// 直接调用子图,传入额外的键
const subgraphResult = await processingSubgraph.invoke({
sharedData: '直接调用子图',
sharedCounter: 0,
parentOnlyData: '这个键会被子图忽略',
subgraphOnlyData: '',
processingLog: [],
});

console.log('子图结果:', subgraphResult);
console.log('注意:不相关的键会被过滤掉');
}

// 演示多个子图共享状态
const anotherSubgraph = new StateGraph(SharedStateAnnotation)
.addNode('process', (state: typeof SharedStateAnnotation.State) => {
console.log(`另一个子图处理: ${state.sharedData}`);
return {
sharedData: `${state.sharedData} -> 另一个子图`,
sharedCounter: 2,
processingLog: ['另一个子图处理完成'],
};
})
.addEdge(START, 'process')
.addEdge('process', END)
.compile();

const multiSubgraphParent = new StateGraph(SharedStateAnnotation)
.addNode('init', (state: typeof SharedStateAnnotation.State) => ({
sharedData: '多子图示例',
sharedCounter: 1,
}))
.addNode('subgraph1', processingSubgraph)
.addNode('subgraph2', anotherSubgraph)
.addNode('final', (state: typeof SharedStateAnnotation.State) => ({
sharedData: `完成: ${state.sharedData}`,
}))
.addEdge(START, 'init')
.addEdge('init', 'subgraph1')
.addEdge('subgraph1', 'subgraph2')
.addEdge('subgraph2', 'final')
.addEdge('final', END)
.compile();

async function runMultiSubgraphExample() {
console.log('\n=== 多子图共享状态示例 ===');

const result = await multiSubgraphParent.invoke({
sharedData: '',
sharedCounter: 0,
parentOnlyData: '',
subgraphOnlyData: '',
processingLog: [],
});

console.log('多子图处理结果:', result);
}

// 导出供其他模块使用
export {
processingSubgraph,
parentGraph,
multiSubgraphParent,
runSharedStateExample,
runStateFilteringExample,
runMultiSubgraphExample,
};

实际应用场景

1. 多代理系统

子图在构建多代理系统中发挥重要作用,每个代理可以作为一个独立的子图:

多代理子图系统示例:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// 定义多代理系统的共享状态
const MultiAgentStateAnnotation = Annotation.Root({
task: Annotation<string>(),
currentAgent: Annotation<string>(),
results: Annotation<Record<string, any>>({
reducer: (current: Record<string, any>, update: Record<string, any>) => ({
...current,
...update,
}),
default: () => ({}),
}),
messages: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),
completed: Annotation<boolean>(),
});

// 研究代理子图
const ResearchAgentStateAnnotation = Annotation.Root({
query: Annotation<string>(),
findings: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),
confidence: Annotation<number>(),
});

const researchNode = (state: typeof ResearchAgentStateAnnotation.State) => {
console.log(`研究代理处理查询: ${state.query}`);

// 模拟研究过程
const findings = [
`关于 "${state.query}" 的发现1`,
`关于 "${state.query}" 的发现2`,
`关于 "${state.query}" 的发现3`,
];

return {
findings,
confidence: 0.85,
};
};

const researchAgent = new StateGraph(ResearchAgentStateAnnotation)
.addNode('research', researchNode)
.addEdge(START, 'research')
.addEdge('research', END)
.compile();

// 分析代理子图
const AnalysisAgentStateAnnotation = Annotation.Root({
data: Annotation<string[]>(),
analysis: Annotation<string>(),
recommendations: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),
});

const analysisNode = (state: typeof AnalysisAgentStateAnnotation.State) => {
console.log(`分析代理处理数据: ${state.data?.join(', ')}`);

const analysis = `基于 ${state.data?.length || 0} 项数据的综合分析`;
const recommendations = [
'建议1:优化数据收集流程',
'建议2:增加数据验证步骤',
'建议3:建立监控机制',
];

return {
analysis,
recommendations,
};
};

const analysisAgent = new StateGraph(AnalysisAgentStateAnnotation)
.addNode('analyze', analysisNode)
.addEdge(START, 'analyze')
.addEdge('analyze', END)
.compile();

// 报告代理子图
const ReportAgentStateAnnotation = Annotation.Root({
analysisResult: Annotation<string>(),
recommendations: Annotation<string[]>(),
report: Annotation<string>(),
format: Annotation<'markdown' | 'json' | 'text'>(),
});

const reportNode = (state: typeof ReportAgentStateAnnotation.State) => {
console.log(`报告代理生成报告,格式: ${state.format}`);

const report = `
# 分析报告

## 分析结果
${state.analysisResult}

## 建议
${state.recommendations?.map((rec, i) => `${i + 1}. ${rec}`).join('\n') || '无建议'}

## 总结
基于以上分析和建议,建议采取相应的改进措施。
`.trim();

return { report };
};

const reportAgent = new StateGraph(ReportAgentStateAnnotation)
.addNode('generate', reportNode)
.addEdge(START, 'generate')
.addEdge('generate', END)
.compile();

// 主控制器节点函数
const initializeTask = (state: typeof MultiAgentStateAnnotation.State) => {
console.log(`初始化任务: ${state.task}`);
return {
currentAgent: 'research',
messages: [`任务开始: ${state.task}`],
};
};

const callResearchAgent = async (
state: typeof MultiAgentStateAnnotation.State
) => {
console.log('调用研究代理...');

const researchResult = await researchAgent.invoke({
query: state.task,
findings: [],
confidence: 0,
});

return {
currentAgent: 'analysis',
results: {
research: researchResult,
},
messages: [`研究完成,发现 ${researchResult.findings.length} 项内容`],
};
};

const callAnalysisAgent = async (
state: typeof MultiAgentStateAnnotation.State
) => {
console.log('调用分析代理...');

const researchData = state.results.research?.findings || [];
const analysisResult = await analysisAgent.invoke({
data: researchData,
analysis: '',
recommendations: [],
});

return {
currentAgent: 'report',
results: {
...state.results,
analysis: analysisResult,
},
messages: [
`分析完成,生成 ${analysisResult.recommendations.length} 项建议`,
],
};
};

const callReportAgent = async (
state: typeof MultiAgentStateAnnotation.State
) => {
console.log('调用报告代理...');

const analysisData = state.results.analysis;
const reportResult = await reportAgent.invoke({
analysisResult: analysisData?.analysis || '',
recommendations: analysisData?.recommendations || [],
report: '',
format: 'markdown',
});

return {
currentAgent: 'completed',
results: {
...state.results,
report: reportResult,
},
messages: ['报告生成完成'],
completed: true,
};
};

// 路由函数
const routeToNextAgent = (state: typeof MultiAgentStateAnnotation.State) => {
console.log(`当前代理: ${state.currentAgent}`);

switch (state.currentAgent) {
case 'research':
return 'research_agent';
case 'analysis':
return 'analysis_agent';
case 'report':
return 'report_agent';
case 'completed':
return END;
default:
return END;
}
};

// 创建主控制图
const multiAgentSystem = new StateGraph(MultiAgentStateAnnotation)
.addNode('initialize', initializeTask)
.addNode('research_agent', callResearchAgent)
.addNode('analysis_agent', callAnalysisAgent)
.addNode('report_agent', callReportAgent)
.addEdge(START, 'initialize')
.addConditionalEdges('initialize', routeToNextAgent)
.addConditionalEdges('research_agent', routeToNextAgent)
.addConditionalEdges('analysis_agent', routeToNextAgent)
.addConditionalEdges('report_agent', routeToNextAgent)
.compile();

// 使用示例
async function runMultiAgentExample() {
console.log('=== 多代理子图系统示例 ===');

const result = await multiAgentSystem.invoke({
task: '分析用户行为数据趋势',
currentAgent: '',
results: {},
messages: [],
completed: false,
});

console.log('\n=== 执行结果 ===');
console.log('任务:', result.task);
console.log('完成状态:', result.completed);
console.log('执行日志:', result.messages);
console.log('最终报告:');
console.log(result.results.report?.report);
}

// 并行多代理示例
const parallelResearchAgent1 = new StateGraph(ResearchAgentStateAnnotation)
.addNode('research', (state: typeof ResearchAgentStateAnnotation.State) => {
console.log(`并行研究代理1处理: ${state.query}`);
return {
findings: [
`代理1发现: ${state.query}相关信息A`,
`代理1发现: ${state.query}相关信息B`,
],
confidence: 0.8,
};
})
.addEdge(START, 'research')
.addEdge('research', END)
.compile();

const parallelResearchAgent2 = new StateGraph(ResearchAgentStateAnnotation)
.addNode('research', (state: typeof ResearchAgentStateAnnotation.State) => {
console.log(`并行研究代理2处理: ${state.query}`);
return {
findings: [
`代理2发现: ${state.query}相关信息C`,
`代理2发现: ${state.query}相关信息D`,
],
confidence: 0.9,
};
})
.addEdge(START, 'research')
.addEdge('research', END)
.compile();

// 并行执行多个研究代理
const parallelResearchCoordinator = async (
state: typeof MultiAgentStateAnnotation.State
) => {
console.log('并行执行多个研究代理...');

const [result1, result2] = await Promise.all([
parallelResearchAgent1.invoke({
query: state.task,
findings: [],
confidence: 0,
}),
parallelResearchAgent2.invoke({
query: state.task,
findings: [],
confidence: 0,
}),
]);

// 合并结果
const combinedFindings = [...result1.findings, ...result2.findings];
const avgConfidence = (result1.confidence + result2.confidence) / 2;

return {
results: {
parallelResearch: {
findings: combinedFindings,
confidence: avgConfidence,
},
},
messages: [`并行研究完成,共发现 ${combinedFindings.length} 项内容`],
};
};

const parallelMultiAgentSystem = new StateGraph(MultiAgentStateAnnotation)
.addNode('parallel_research', parallelResearchCoordinator)
.addEdge(START, 'parallel_research')
.addEdge('parallel_research', END)
.compile();

async function runParallelMultiAgentExample() {
console.log('\n=== 并行多代理示例 ===');

const result = await parallelMultiAgentSystem.invoke({
task: '市场趋势分析',
currentAgent: '',
results: {},
messages: [],
completed: false,
});

console.log('并行研究结果:', result.results.parallelResearch);
console.log('执行日志:', result.messages);
}

// 导出供其他模块使用
export {
researchAgent,
analysisAgent,
reportAgent,
multiAgentSystem,
parallelMultiAgentSystem,
runMultiAgentExample,
runParallelMultiAgentExample,
};

2. 代码重用

将通用功能封装为子图,在多个项目中重用:

代码重用的优势
  • 一致性:确保相同功能在不同项目中的行为一致
  • 维护性:只需在一个地方修改和维护代码
  • 测试性:可以独立测试子图功能
  • 协作性:不同团队可以并行开发不同的子图

3. 团队协作开发

不同团队可以独立开发不同的子图,只要遵循约定的接口规范:

高级功能:Command 导航

在复杂的多子图系统中,有时需要从一个子图跳转到另一个子图。LangGraph 提供了 Command 对象来实现这种导航:

子图间导航示例:

import '../../utils/loadEnv';
import {
StateGraph,
Annotation,
START,
END,
Command,
} from '@langchain/langgraph';

// 定义共享状态
const NavigationStateAnnotation = Annotation.Root({
currentTask: Annotation<string>(),
taskHistory: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),
results: Annotation<Record<string, any>>({
reducer: (current: Record<string, any>, update: Record<string, any>) => ({
...current,
...update,
}),
default: () => ({}),
}),
nextAction: Annotation<string>(),
});

// 数据处理子图
const DataProcessingSubgraph = new StateGraph(NavigationStateAnnotation)
.addNode('process', (state: typeof NavigationStateAnnotation.State) => {
console.log(`数据处理子图处理: ${state.currentTask}`);

// 模拟数据处理
const processedData = `处理结果: ${state.currentTask} - 数据已清洗和转换`;

// 根据处理结果决定下一步
const needsAnalysis = state.currentTask.includes('分析');

if (needsAnalysis) {
// 使用 Command 跳转到分析子图
return new Command({
update: {
results: { dataProcessing: processedData },
taskHistory: [`数据处理完成: ${state.currentTask}`],
nextAction: 'analysis',
},
goto: 'analysis_subgraph',
graph: Command.PARENT, // 跳转到父图的节点
});
} else {
// 直接完成
return {
results: { dataProcessing: processedData },
taskHistory: [`数据处理完成: ${state.currentTask}`],
nextAction: 'completed',
};
}
})
.addEdge(START, 'process')
.addEdge('process', END)
.compile();

// 分析子图
const AnalysisSubgraph = new StateGraph(NavigationStateAnnotation)
.addNode('analyze', (state: typeof NavigationStateAnnotation.State) => {
console.log(`分析子图处理: ${state.currentTask}`);

const analysisResult = `分析结果: 基于 "${state.results.dataProcessing}" 的深度分析`;

// 检查是否需要生成报告
const needsReport = state.currentTask.includes('报告');

if (needsReport) {
return new Command({
update: {
results: {
...state.results,
analysis: analysisResult,
},
taskHistory: [`分析完成: ${state.currentTask}`],
nextAction: 'report',
},
goto: 'report_subgraph',
graph: Command.PARENT,
});
} else {
return {
results: {
...state.results,
analysis: analysisResult,
},
taskHistory: [`分析完成: ${state.currentTask}`],
nextAction: 'completed',
};
}
})
.addEdge(START, 'analyze')
.addEdge('analyze', END)
.compile();

// 报告生成子图
const ReportSubgraph = new StateGraph(NavigationStateAnnotation)
.addNode('generate', (state: typeof NavigationStateAnnotation.State) => {
console.log(`报告子图处理: ${state.currentTask}`);

const report = `
# 任务报告: ${state.currentTask}

## 数据处理结果
${state.results.dataProcessing || '无'}

## 分析结果
${state.results.analysis || '无'}

## 总结
任务已成功完成,所有步骤均按预期执行。
`.trim();

return {
results: {
...state.results,
report,
},
taskHistory: [`报告生成完成: ${state.currentTask}`],
nextAction: 'completed',
};
})
.addEdge(START, 'generate')
.addEdge('generate', END)
.compile();

// 主控制图
const mainController = new StateGraph(NavigationStateAnnotation)
.addNode('initialize', (state: typeof NavigationStateAnnotation.State) => {
console.log(`初始化任务: ${state.currentTask}`);
return {
taskHistory: [`任务开始: ${state.currentTask}`],
nextAction: 'data_processing',
};
})
.addNode('data_processing_subgraph', DataProcessingSubgraph)
.addNode('analysis_subgraph', AnalysisSubgraph)
.addNode('report_subgraph', ReportSubgraph)
.addNode('complete', (state: typeof NavigationStateAnnotation.State) => {
console.log('任务完成');
return {
taskHistory: [`任务完成: ${state.currentTask}`],
nextAction: 'done',
};
})
.addEdge(START, 'initialize')
.addEdge('initialize', 'data_processing_subgraph')
// 注意:子图通过 Command 对象动态路由,不需要预定义边
.addConditionalEdges(
'data_processing_subgraph',
(state: typeof NavigationStateAnnotation.State) => {
switch (state.nextAction) {
case 'analysis':
return 'analysis_subgraph';
case 'completed':
return 'complete';
default:
return END;
}
}
)
.addConditionalEdges(
'analysis_subgraph',
(state: typeof NavigationStateAnnotation.State) => {
switch (state.nextAction) {
case 'report':
return 'report_subgraph';
case 'completed':
return 'complete';
default:
return END;
}
}
)
.addEdge('report_subgraph', 'complete')
.addEdge('complete', END)
.compile();

// 使用示例
async function runCommandNavigationExample() {
console.log('=== Command 导航示例 ===');

// 测试不同的任务类型
const tasks = ['简单数据处理', '数据分析任务', '完整分析报告任务'];

for (const task of tasks) {
console.log(`\n--- 执行任务: ${task} ---`);

const result = await mainController.invoke({
currentTask: task,
taskHistory: [],
results: {},
nextAction: '',
});

console.log('任务历史:', result.taskHistory);
console.log('最终结果键:', Object.keys(result.results));

if (result.results.report) {
console.log('生成的报告:');
console.log(result.results.report);
}
}
}

// 高级示例:动态子图选择
const DynamicSubgraphSelector = new StateGraph(NavigationStateAnnotation)
.addNode('selector', (state: typeof NavigationStateAnnotation.State) => {
console.log(`动态选择器处理: ${state.currentTask}`);

// 根据任务内容动态选择下一个子图
if (state.currentTask.includes('紧急')) {
return new Command({
update: {
taskHistory: ['选择紧急处理流程'],
nextAction: 'urgent_processing',
},
goto: 'urgent_subgraph',
graph: Command.PARENT,
});
} else if (state.currentTask.includes('批量')) {
return new Command({
update: {
taskHistory: ['选择批量处理流程'],
nextAction: 'batch_processing',
},
goto: 'batch_subgraph',
graph: Command.PARENT,
});
} else {
return new Command({
update: {
taskHistory: ['选择标准处理流程'],
nextAction: 'standard_processing',
},
goto: 'data_processing_subgraph',
graph: Command.PARENT,
});
}
})
.addEdge(START, 'selector')
.addEdge('selector', END)
.compile();

// 紧急处理子图
const UrgentSubgraph = new StateGraph(NavigationStateAnnotation)
.addNode(
'urgent_process',
(state: typeof NavigationStateAnnotation.State) => {
console.log(`紧急处理: ${state.currentTask}`);
return {
results: { urgentProcessing: `紧急处理完成: ${state.currentTask}` },
taskHistory: [`紧急处理完成`],
nextAction: 'completed',
};
}
)
.addEdge(START, 'urgent_process')
.addEdge('urgent_process', END)
.compile();

// 批量处理子图
const BatchSubgraph = new StateGraph(NavigationStateAnnotation)
.addNode('batch_process', (state: typeof NavigationStateAnnotation.State) => {
console.log(`批量处理: ${state.currentTask}`);
return {
results: { batchProcessing: `批量处理完成: ${state.currentTask}` },
taskHistory: [`批量处理完成`],
nextAction: 'completed',
};
})
.addEdge(START, 'batch_process')
.addEdge('batch_process', END)
.compile();

// 动态路由主控制器
const dynamicController = new StateGraph(NavigationStateAnnotation)
.addNode('dynamic_selector', DynamicSubgraphSelector)
.addNode('data_processing_subgraph', DataProcessingSubgraph)
.addNode('urgent_subgraph', UrgentSubgraph)
.addNode('batch_subgraph', BatchSubgraph)
.addNode('final', (state: typeof NavigationStateAnnotation.State) => {
console.log('动态路由任务完成');
return { taskHistory: [`动态路由完成: ${state.currentTask}`] };
})
.addEdge(START, 'dynamic_selector')
.addConditionalEdges(
'dynamic_selector',
(state: typeof NavigationStateAnnotation.State) => {
switch (state.nextAction) {
case 'urgent_processing':
return 'urgent_subgraph';
case 'batch_processing':
return 'batch_subgraph';
case 'standard_processing':
return 'data_processing_subgraph';
default:
return 'final';
}
}
)
.addEdge('urgent_subgraph', 'final')
.addEdge('batch_subgraph', 'final')
.addEdge('data_processing_subgraph', 'final')
.addEdge('final', END)
.compile();

async function runDynamicNavigationExample() {
console.log('\n=== 动态导航示例 ===');

const dynamicTasks = ['紧急数据修复', '批量用户导入', '标准数据处理'];

for (const task of dynamicTasks) {
console.log(`\n--- 动态处理任务: ${task} ---`);

const result = await dynamicController.invoke({
currentTask: task,
taskHistory: [],
results: {},
nextAction: '',
});

console.log('处理历史:', result.taskHistory);
console.log('处理结果:', result.results);
}
}

// 导出供其他模块使用
export {
DataProcessingSubgraph,
AnalysisSubgraph,
ReportSubgraph,
mainController,
dynamicController,
runCommandNavigationExample,
runDynamicNavigationExample,
};

最佳实践

1. 状态设计原则

  • 最小化共享状态:只共享必要的状态键
  • 明确接口定义:清楚地定义子图的输入和输出
  • 类型安全:使用 TypeScript 确保状态类型的一致性

2. 性能考虑

性能注意事项
  • 避免在同一节点中多次调用子图:这可能导致检查点命名空间冲突
  • 合理使用检查点:如果不需要中断/恢复功能,可以禁用子图的检查点
  • 状态大小控制:避免在状态中传递过大的数据对象

3. 错误处理

// 在子图调用中添加错误处理
const callSubgraphSafely = async (state: typeof StateAnnotation.State) => {
try {
const result = await subgraph.invoke({
input: state.data
});
return { result: result.output };
} catch (error) {
console.error('子图执行失败:', error);
return { error: error.message };
}
};

4. 调试和监控

  • 使用 LangGraph Studio:可视化子图的执行流程
  • 添加日志记录:在关键节点添加日志输出
  • 状态快照:定期保存状态快照用于调试

与其他架构模式的关系

子图与我们之前学习的架构模式有着密切的关系:

  • 与 ReAct 架构:可以将 ReAct 模式封装为子图,在多个场景中重用
  • 与多代理系统:子图是构建多代理系统的基础组件
  • 与并行处理:可以并行执行多个子图来提高性能

小结与延伸

子图是 LangGraph 中实现模块化和代码重用的重要机制。通过合理使用子图,我们可以:

  1. 提高代码重用性:将通用功能封装为可重用的模块
  2. 支持团队协作:不同团队可以并行开发不同的子图
  3. 简化复杂系统:将大型系统分解为多个可管理的子图
  4. 增强可维护性:每个子图可以独立测试和维护

在下一节并行处理中,我们将学习如何利用 LangGraph 的并行能力来提高应用性能,其中也会涉及到子图的并行执行。

进一步学习
  • 尝试将现有的复杂图分解为多个子图
  • 实践不同的状态通信模式
  • 探索子图在实际项目中的应用场景
  • 学习如何在团队中协作开发子图