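⚡ Parallel Processing
Introduction
In modern AI applications, performance and efficiency are critical. LangGraphJS can run multiple tasks at the same time. When the tasks are independent, total latency approaches that of the slowest task instead of the sum of all of them, which improves both response time and throughput.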
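Parallel processing is especially useful for:
- Multi-agent collaboration: several AI agents work on different aspects of a task at the same time
- Batch data processing: analyzing and transforming large amounts of data in parallel
- Independent tasks: running several unrelated operations at once
- Map-Reduce: splitting a complex task into subtasks that can run in parallel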
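Core Concepts
Super-Step
In LangGraphJS, the super-step is the basic unit of parallel execution:
- Nodes scheduled in the same super-step run in parallel
- Super-steps run one after another, so nodes in different super-steps execute sequentially
- When a super-step finishes, the updates returned by all of its nodes are applied to the shared state through each key's reducer; a key without a reducer can accept only one update per super-step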
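To make the merge rule concrete, here is a deliberately simplified model of a single super-step in plain TypeScript. It is not LangGraphJS's scheduler. `runSuperStep`, `Reducers` and the node functions are hypothetical names. Every node reads the same snapshot and all nodes run concurrently; their updates are folded in only after all have finished. A key without a reducer rejects a second write, mirroring LangGraphJS's one-update-per-step rule for such keys.

```typescript
type State = Record<string, unknown>;
type Update = Record<string, unknown>;
type NodeFn = (state: State) => Promise<Update>;
type Reducers = Record<string, (current: unknown, update: unknown) => unknown>;

// Simplified model of one super-step (not the library's real implementation)
async function runSuperStep(state: State, nodes: NodeFn[], reducers: Reducers): Promise<State> {
  const snapshot = { ...state };
  // All nodes of this super-step run concurrently against the same snapshot
  const updates = await Promise.all(nodes.map((node) => node(snapshot)));

  const next: State = { ...state };
  const writtenWithoutReducer = new Set<string>();
  for (const update of updates) {
    for (const [key, value] of Object.entries(update)) {
      const reducer = reducers[key];
      if (reducer) {
        // Keys with a reducer combine every update from this step
        next[key] = reducer(next[key], value);
      } else {
        // Keys without a reducer accept at most one write per step
        if (writtenWithoutReducer.has(key)) {
          throw new Error(`Key "${key}" received more than one update in a single super-step`);
        }
        writtenWithoutReducer.add(key);
        next[key] = value;
      }
    }
  }
  return next;
}

// Usage: two parallel nodes writing to the same reducer-backed key
const reducers: Reducers = {
  results: (current, update) => ({
    ...(current as Record<string, string>),
    ...(update as Record<string, string>),
  }),
};
const toUpper: NodeFn = async (s) => ({ results: { upper: String(s.input).toUpperCase() } });
const measure: NodeFn = async (s) => ({ results: { length: String(String(s.input).length) } });

runSuperStep({ input: 'abc', results: {} }, [toUpper, measure], reducers).then((s) =>
  console.log(s.results), // { upper: 'ABC', length: '3' }
);
```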
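Send API
The Send API is LangGraphJS's core mechanism for dynamic parallelism:
- It creates parallel tasks at runtime, so the number of tasks can depend on the data
- Each Send object carries a target node name and the input state for that one task
- Send objects are usually returned from a routing function registered with addConditionalEdges; a regular node returns a state update instead
- It supports fan-out/fan-in patterns such as Map-Reduce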
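A Send is essentially a pair of (target node, private input). The plain-TypeScript model below is only an illustration: `SendLike`, `fanOut`, `nodes` and `runFanOut` are hypothetical names, not the library's classes. It shows two properties of the real API, namely that the task count is decided from the data at runtime and that each task receives its own payload rather than the whole graph state.

```typescript
// Payload each task receives: its own slice of the data
type Payload = { id: number; text: string };

// Hypothetical stand-in for a Send: the target node's name plus that task's input
class SendLike {
  node: string;
  args: Payload;
  constructor(node: string, args: Payload) {
    this.node = node;
    this.args = args;
  }
}

// Routing function: one task per item, so the fan-out width follows the data
function fanOut(items: string[]): SendLike[] {
  return items.map((text, id) => new SendLike('worker', { id, text }));
}

// Node registry: each node sees only the payload it was sent
const nodes: Record<string, (args: Payload) => Promise<{ results: string[] }>> = {
  worker: async (args) => ({ results: [`${args.id}:${args.text.toUpperCase()}`] }),
};

// Run every task concurrently, then merge the updates with a concat "reducer"
async function runFanOut(items: string[]): Promise<string[]> {
  const updates = await Promise.all(
    fanOut(items).map((send) => {
      const node = nodes[send.node];
      if (!node) throw new Error(`Unknown node: ${send.node}`);
      return node(send.args);
    }),
  );
  return updates.reduce<string[]>((acc, update) => acc.concat(update.results), []);
}

runFanOut(['a', 'b', 'c']).then((results) => console.log(results)); // [ '0:A', '1:B', '2:C' ]
```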
Basic Parallel Execution
Let's start with a simple parallel execution example:
Basic parallel processing
```typescript
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// State for the parallel example
const ParallelState = Annotation.Root({
  input: Annotation<string>(),
  // Each branch writes a different key; the reducer merges the partial objects
  results: Annotation<Record<string, string>>({
    reducer: (state, update) => ({ ...state, ...update }),
    default: () => ({}),
  }),
  finalOutput: Annotation<string>(),
});

// Parallel task A: convert to upper case
const taskA = async (state: typeof ParallelState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 500)); // simulated work
  return { results: { taskA: state.input.toUpperCase() } };
};

// Parallel task B: count characters
const taskB = async (state: typeof ParallelState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 300));
  return { results: { taskB: `${state.input.length} characters` } };
};

// Parallel task C: reverse the string
const taskC = async (state: typeof ParallelState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 400));
  return { results: { taskC: state.input.split('').reverse().join('') } };
};

// Aggregate the results once all three tasks have written theirs
const aggregateResults = (state: typeof ParallelState.State) => {
  const { results } = state;
  if (Object.keys(results).length === 3) {
    const finalOutput = [
      `📊 Parallel processing complete!`,
      `• Task A: ${results.taskA}`,
      `• Task B: ${results.taskB}`,
      `• Task C: ${results.taskC}`,
    ].join('\n');
    return { finalOutput };
  }
  return {};
};

// Build the graph: three edges from START put taskA, taskB and taskC in the same super-step
const app = new StateGraph(ParallelState)
  .addNode('taskA', taskA)
  .addNode('taskB', taskB)
  .addNode('taskC', taskC)
  .addNode('aggregate', aggregateResults)
  .addEdge(START, 'taskA')
  .addEdge(START, 'taskB')
  .addEdge(START, 'taskC')
  .addEdge('taskA', 'aggregate')
  .addEdge('taskB', 'aggregate')
  .addEdge('taskC', 'aggregate')
  .addEdge('aggregate', END)
  .compile();

// Usage (inside an async function): const { finalOutput } = await app.invoke({ input: 'hello' });
```
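In this example, three edges leave START, so taskA, taskB and taskC are scheduled in the same super-step and run in parallel. The aggregate node runs in the following super-step, after all three have returned.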
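Why this helps: independent branches take roughly as long as the slowest one instead of the sum of all of them. The plain-TypeScript timing sketch below uses the example's 500/300/400 ms delays scaled down by 5; `simulate`, `sequential` and `parallel` are hypothetical helpers, and `Promise.all` stands in for running the branches in one super-step.

```typescript
// Resolve with `value` after `ms` milliseconds, standing in for real I/O such as a model call
function simulate<T>(value: T, ms: number): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

// One task after another: total is about 100 + 60 + 80 = 240 ms
async function sequential(): Promise<number> {
  const start = Date.now();
  await simulate('A', 100);
  await simulate('B', 60);
  await simulate('C', 80);
  return Date.now() - start;
}

// All three at once: total is about max(100, 60, 80) = 100 ms
async function parallel(): Promise<number> {
  const start = Date.now();
  await Promise.all([simulate('A', 100), simulate('B', 60), simulate('C', 80)]);
  return Date.now() - start;
}

sequential().then((ms) => console.log(`sequential: ${ms} ms`));
parallel().then((ms) => console.log(`parallel: ${ms} ms`));
```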
Send API in Detail
The Send API is a more flexible way to run work in parallel and is especially suited to creating tasks dynamically:
Using the Send API
```typescript
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// State for the Send API example
const SendState = Annotation.Root({
  tasks: Annotation<string[]>(),
  // concat reducer: every parallel processTask call appends its result
  results: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),
  summary: Annotation<string>(),
});

// Task dispatcher: a routing function that creates one parallel task per item.
// It is registered with addConditionalEdges below, not with addNode.
const taskDispatcher = (state: typeof SendState.State) => {
  // One Send per task; each carries its own private input
  return state.tasks.map((task, index) =>
    new Send('processTask', { taskId: index, content: task })
  );
};

// Task processor: receives the Send payload rather than the full graph state
const processTask = async (state: { taskId: number; content: string }) => {
  await new Promise((resolve) => setTimeout(resolve, 200));
  const result = `Task ${state.taskId}: ${state.content.toUpperCase()}`;
  return { results: [result] };
};

// Result summarizer
const summarizeResults = (state: typeof SendState.State) => {
  if (state.results.length < state.tasks.length) return {};
  const summary = [
    '📊 Summary of results:',
    ...state.results.map((result) => `• ${result}`),
    `✅ Completed ${state.tasks.length} tasks`,
  ].join('\n');
  return { summary };
};

// Build the Send API graph
const sendApp = new StateGraph(SendState)
  .addNode('processTask', processTask)
  .addNode('summarize', summarizeResults)
  // Fan out from START: taskDispatcher returns the list of Send objects
  .addConditionalEdges(START, taskDispatcher)
  .addEdge('processTask', 'summarize')
  .addEdge('summarize', END)
  .compile();

// Usage (inside an async function): const { summary } = await sendApp.invoke({ tasks: ['a', 'b', 'c'] });
```
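Advantages of the Send API
- Dynamic task creation: the number of parallel tasks is decided at runtime from the data
- Per-task input: each parallel task receives its own payload instead of the full graph state
- Flexible data flow: results are merged back through reducers, which supports fan-out/fan-in and aggregation patterns
- Partial failure recovery: if one task in a super-step throws, the super-step fails, but with a checkpointer LangGraphJS keeps the writes of the tasks that succeeded, so resuming re-runs only the failed ones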
Map-Reduce Pattern
Map-Reduce is a classic parallel processing pattern and works well for batch data processing:
Map-Reduce implementation
```typescript
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// Map-Reduce state
const MapReduceState = Annotation.Root({
  data: Annotation<string[]>(),
  // concat reducer collects the output of every parallel mapProcessor call
  mapResults: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),
  finalResult: Annotation<string>(),
});

// Map phase: split the data into batches and create one parallel task per batch.
// This is a routing function, registered with addConditionalEdges below.
const mapDispatcher = (state: typeof MapReduceState.State) => {
  const batchSize = 2;
  const batches: string[][] = [];
  for (let i = 0; i < state.data.length; i += batchSize) {
    batches.push(state.data.slice(i, i + batchSize));
  }
  return batches.map((batch, index) =>
    new Send('mapProcessor', { batchId: index, items: batch })
  );
};

// Map processor: transform one batch
const mapProcessor = async (state: { batchId: number; items: string[] }) => {
  await new Promise((resolve) => setTimeout(resolve, 100));
  // Map operation: convert each string to upper case
  const processed = state.items.map((item) => item.toUpperCase());
  return {
    mapResults: processed.map((p) => `Batch ${state.batchId}: ${p}`),
  };
};

// Reduce phase: aggregate the results (runs after every batch has finished)
const reduceAggregator = (state: typeof MapReduceState.State) => {
  if (state.mapResults.length === 0) return {};
  const finalResult = [
    '🗺️ Map-Reduce results:',
    ...state.mapResults,
    `✅ Processed ${state.mapResults.length} items`,
  ].join('\n');
  return { finalResult };
};

// Build the Map-Reduce graph
const mapReduceApp = new StateGraph(MapReduceState)
  .addNode('mapProcessor', mapProcessor)
  .addNode('reduceAggregator', reduceAggregator)
  .addConditionalEdges(START, mapDispatcher) // fan out: one Send per batch
  .addEdge('mapProcessor', 'reduceAggregator')
  .addEdge('reduceAggregator', END)
  .compile();
```
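Map-Reduce execution flow: START → mapDispatcher (one Send per batch) → mapProcessor for every batch in parallel, in one super-step → reduceAggregator → END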
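Stripped of the graph, the same flow is: chunk, map every chunk in parallel, then reduce. The sketch below mirrors mapDispatcher, mapProcessor and reduceAggregator in plain TypeScript. `chunk` and `mapReduce` are hypothetical helpers, and `Promise.all` plays the role of the super-step that runs all batches at once.

```typescript
// Split items into batches of `size`; the last batch may be shorter
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1) throw new Error('size must be at least 1');
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Map: process every batch concurrently. Reduce: fold all mapped values into one result.
async function mapReduce<T, M, R>(
  items: T[],
  size: number,
  mapBatch: (batch: T[], batchId: number) => Promise<M[]>,
  reduce: (acc: R, value: M) => R,
  initial: R,
): Promise<R> {
  const perBatch = await Promise.all(chunk(items, size).map((batch, id) => mapBatch(batch, id)));
  // Flatten in batch order, then reduce
  const mapped = perBatch.reduce<M[]>((all, values) => all.concat(values), []);
  return mapped.reduce<R>(reduce, initial);
}

// Same transformation as mapProcessor, with a batch size of 2
mapReduce(
  ['apple', 'banana', 'cherry'],
  2,
  async (batch, batchId) => batch.map((item) => `Batch ${batchId}: ${item.toUpperCase()}`),
  (lines: string[], line: string) => lines.concat(line),
  [] as string[],
).then((lines) => console.log(lines));
// resolves to ['Batch 0: APPLE', 'Batch 0: BANANA', 'Batch 1: CHERRY']
```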
Practical Use Cases
Multi-Agent Collaboration
In complex AI applications, different agents can work on different aspects of a task in parallel:
Multi-agent parallel collaboration
```typescript
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// Multi-agent state
const MultiAgentState = Annotation.Root({
  query: Annotation<string>(),
  // Each agent writes its own key into results
  results: Annotation<Record<string, string>>({
    reducer: (state, update) => ({ ...state, ...update }),
    default: () => ({}),
  }),
  finalResponse: Annotation<string>(),
});

// Task dispatcher (routing function): choose agents from keywords in the query
const taskDispatcher = (state: typeof MultiAgentState.State) => {
  const query = state.query.toLowerCase();
  const agents = ['research', 'analysis', 'creative'];
  const matched = agents.filter((agent) => {
    if (agent === 'research') return query.includes('research') || query.includes('find');
    if (agent === 'analysis') return query.includes('analyze') || query.includes('compare');
    if (agent === 'creative') return query.includes('create') || query.includes('design');
    return false;
  });
  // Fall back to all agents when nothing matches; with zero Sends the graph would end without a response
  const selectedAgents = matched.length > 0 ? matched : agents;
  return selectedAgents.map((agentType) =>
    new Send('processAgent', { agentType, query: state.query })
  );
};

// Agent processor (simulated responses)
const processAgent = async (state: { agentType: string; query: string }) => {
  await new Promise((resolve) => setTimeout(resolve, 300));
  const responses = {
    research: `🔍 Research: background information on "${state.query}"`,
    analysis: `📊 Analysis: in-depth assessment of "${state.query}"`,
    creative: `💡 Ideas: new approaches to "${state.query}"`,
  };
  const result = responses[state.agentType as keyof typeof responses] ||
    `🤖 Result from the ${state.agentType} agent`;
  return { results: { [state.agentType]: result } };
};

// Result synthesizer
const synthesizeResults = (state: typeof MultiAgentState.State) => {
  if (Object.keys(state.results).length === 0) return {};
  const finalResponse = [
    '🤝 Multi-agent results',
    '',
    ...Object.entries(state.results).map(([agent, result]) =>
      `[${agent} agent] ${result}`
    ),
    '',
    "🎯 Overall recommendation: a combined answer based on the agents' specialized analyses.",
  ].join('\n');
  return { finalResponse };
};

// Build the multi-agent graph
const multiAgentApp = new StateGraph(MultiAgentState)
  .addNode('processAgent', processAgent)
  .addNode('synthesizeResults', synthesizeResults)
  .addConditionalEdges(START, taskDispatcher)
  .addEdge('processAgent', 'synthesizeResults')
  .addEdge('synthesizeResults', END)
  .compile();
```
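The if-chain that picks agents can also be written table-driven, so adding an agent becomes a one-line change and the routing logic can be tested without running the graph. This is a sketch assuming the same three agent names and English keywords; `AGENT_KEYWORDS` and `selectAgents` are hypothetical names.

```typescript
type AgentType = 'research' | 'analysis' | 'creative';

// Hypothetical keyword table: which query words select which agent
const AGENT_KEYWORDS: Record<AgentType, string[]> = {
  research: ['research', 'find'],
  analysis: ['analyze', 'compare'],
  creative: ['create', 'design'],
};

// Agents whose keywords appear in the query, or every agent when none match
function selectAgents(query: string): AgentType[] {
  const q = query.toLowerCase();
  const all = Object.keys(AGENT_KEYWORDS) as AgentType[];
  const matched = all.filter((agent) => AGENT_KEYWORDS[agent].some((word) => q.includes(word)));
  return matched.length > 0 ? matched : all;
}

console.log(selectAgents('Compare and design a caching layer')); // [ 'analysis', 'creative' ]
console.log(selectAgents('hello')); // no keyword matches, so all three agents
```

With such a helper, the dispatcher body would reduce to mapping `selectAgents(state.query)` to `new Send('processAgent', { agentType, query: state.query })`.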
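Batch Document Processing
When many documents need to be processed, running batches in parallel can cut the total processing time substantially:
Batch document processing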
```typescript
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// Batch processing state
const BatchState = Annotation.Root({
  documents: Annotation<string[]>(),
  // concat reducer: every parallel batch appends its processed documents
  processed: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),
  // No reducer: only one node (batchSummarizer) may write this key per super-step
  statistics: Annotation<{
    total: number;
    processed: number;
    startTime: string;
  }>(),
  summary: Annotation<string>(),
});

// Batch dispatcher (routing function): one parallel task per batch
const batchDispatcher = (state: typeof BatchState.State) => {
  const batchSize = 2;
  const batches: string[][] = [];
  for (let i = 0; i < state.documents.length; i += batchSize) {
    batches.push(state.documents.slice(i, i + batchSize));
  }
  return batches.map((batch, index) =>
    new Send('processBatch', { batchId: index, items: batch })
  );
};

// Batch processor: converts documents to upper case and only appends to `processed`.
// Writing `statistics` here would send several updates to a key without a reducer
// in one super-step, which LangGraphJS rejects.
const processBatch = async (state: { batchId: number; items: string[] }) => {
  await new Promise((resolve) => setTimeout(resolve, 200));
  const processed = state.items.map((doc, index) =>
    `📄 Document ${state.batchId}-${index}: ${doc.toUpperCase()}`
  );
  return { processed };
};

// Summarizer: computes the statistics once, after all batches are done
const batchSummarizer = (state: typeof BatchState.State) => {
  if (state.processed.length < state.documents.length) return {};
  const endTime = new Date().toISOString();
  const summary = [
    '📊 Batch processing summary',
    `• Total documents: ${state.documents.length}`,
    `• Processed: ${state.processed.length}`,
    `• Success rate: 100%`,
    `• Finished at: ${endTime}`,
    '',
    '📋 Results:',
    ...state.processed,
  ].join('\n');
  return {
    statistics: {
      total: state.documents.length,
      processed: state.processed.length,
      // startTime can be supplied in the input; otherwise fall back to endTime
      startTime: state.statistics?.startTime ?? endTime,
    },
    summary,
  };
};

// Build the batch processing graph
const batchApp = new StateGraph(BatchState)
  .addNode('processBatch', processBatch)
  .addNode('batchSummarizer', batchSummarizer)
  .addConditionalEdges(START, batchDispatcher)
  .addEdge('processBatch', 'batchSummarizer')
  .addEdge('batchSummarizer', END)
  .compile();
```
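Performance Optimization and Best Practices
1. Keep concurrency under control
Concurrency control
Parallel processing can improve performance, but too much concurrency can cause:
- API rate limiting
- High memory usage
- Contention for system resources
Size the degree of parallelism to your machine and to the rate limits of the APIs you call. With the Send API, the dispatcher sets the fan-out width: in the batch examples above, batchSize = 2 creates ceil(n / 2) parallel tasks for n items, so a larger batch size means fewer concurrent tasks.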
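Inside a single node (for instance one that calls a rate-limited API for many items), a small worker pool can cap how many calls are in flight. `mapWithConcurrency` below is a hypothetical helper written in plain TypeScript, not a LangGraphJS API. A fixed number of workers each claim the next unstarted item until none are left, and the results keep the input order.

```typescript
// Run `fn` over `items` with at most `limit` calls in flight; results keep input order
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (limit < 1) throw new Error('limit must be at least 1');
  const results = new Array<R>(items.length);
  let next = 0; // index of the next item to start

  // Each worker repeatedly claims the next unstarted item
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++; // safe: JavaScript runs this synchronously between awaits
      results[index] = await fn(items[index] as T, index);
    }
  }

  const workers = Array.from({ length: Math.min(limit, items.length) }, () => worker());
  await Promise.all(workers);
  return results;
}

// Usage: six simulated API calls, at most two at a time
let inFlight = 0;
let peak = 0;
mapWithConcurrency([1, 2, 3, 4, 5, 6], 2, async (n) => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10));
  inFlight--;
  return n * 10;
}).then((out) => console.log(out, `peak concurrency: ${peak}`));
```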
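2. Error handling strategy
```typescript
// Error handling in a parallel node (sketch).
// processTask is a placeholder for your own async work, and the keys returned here
// (result, status, error, fallback) must be declared in the graph's state schema;
// if several parallel branches write `status`, give that key a reducer.
const robustNode = async (state: any) => {
  try {
    const result = await processTask(state.task);
    return { result, status: 'success' };
  } catch (error) {
    // Returning instead of throwing keeps one failing branch from failing the whole super-step
    return {
      error: error instanceof Error ? error.message : String(error),
      status: 'failed',
      fallback: 'default result',
    };
  }
};
```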
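robustNode handles a failure of the node as a whole. When one node makes several independent sub-calls itself, settling them all keeps the partial results even if some fail. `settleAll` and `Outcome` are hypothetical names. The helper behaves like `Promise.allSettled`, written by hand so it also compiles with older `lib` settings.

```typescript
// Result of one sub-call: either a value or an error message
type Outcome<T> = { ok: true; value: T } | { ok: false; error: string };

// Start every call and wait for all of them to settle; failures become data instead of exceptions
async function settleAll<T>(calls: Array<() => Promise<T>>): Promise<Outcome<T>[]> {
  return Promise.all(
    calls.map((call) =>
      Promise.resolve()
        .then(call) // also turns a synchronous throw inside `call` into a rejection
        .then(
          (value): Outcome<T> => ({ ok: true, value }),
          (reason: unknown): Outcome<T> => ({
            ok: false,
            error: reason instanceof Error ? reason.message : String(reason),
          }),
        ),
    ),
  );
}

// Usage: three hypothetical sub-calls made by one node; the second one fails
settleAll<string>([
  async () => 'search ok',
  async () => {
    throw new Error('rate limited');
  },
  async () => 'summary ok',
]).then((outcomes) => {
  const succeeded = outcomes.filter((o) => o.ok).length;
  console.log(`${succeeded}/${outcomes.length} sub-calls succeeded`, outcomes);
});
```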
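3. Optimize state management
```typescript
import { Annotation } from '@langchain/langgraph';

// State management strategies for parallel updates
const StateAnnotation = Annotation.Root({
  // Array + concat reducer: updates from parallel branches are appended, so none are lost
  results: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),
  // Map for fast key lookups; the reducer builds a new Map instead of mutating the old one
  resultCache: Annotation<Map<string, string>>({
    reducer: (state, update) => {
      const newMap = new Map(state);
      update.forEach((value, key) => newMap.set(key, value));
      return newMap;
    },
    default: () => new Map(),
  }),
});
```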
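Reducers are plain functions, so they can be checked in isolation. The sketch below restates the two reducers from StateAnnotation as standalone functions (`concatReducer` and `mapMergeReducer` are hypothetical names) and folds two updates through the Map reducer. It shows that the previous state object is left untouched and that, on a key collision, whichever update is applied last wins.

```typescript
// The two reducers from StateAnnotation, written as standalone functions
const concatReducer = (state: string[], update: string[]): string[] => state.concat(update);

const mapMergeReducer = (
  state: Map<string, string>,
  update: Map<string, string>,
): Map<string, string> => {
  const merged = new Map(state); // copy, so the previous state is never mutated
  update.forEach((value, key) => merged.set(key, value)); // the later update wins on collisions
  return merged;
};

// Fold two updates into the default value, one after the other
const before = new Map<string, string>();
const after = [new Map([['a', '1']]), new Map([['a', '2'], ['b', '3']])].reduce(mapMergeReducer, before);

console.log(concatReducer(['x'], ['y', 'z'])); // [ 'x', 'y', 'z' ]
console.log(before.size, Array.from(after.entries())); // before is still empty; after maps a to '2' and b to '3'
```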
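4. Monitoring and debugging
```typescript
// Measure a node's execution time (sketch).
// processTask is a placeholder for your own async work; executionTime and timestamp
// must be declared in the state schema for the graph to keep them.
const monitoredNode = async (state: any) => {
  const startTime = Date.now();
  try {
    const result = await processTask(state.task);
    const duration = Date.now() - startTime;
    console.log(`Task finished in ${duration} ms`);
    return {
      result,
      executionTime: duration,
      timestamp: new Date().toISOString(),
    };
  } catch (error) {
    console.error('Task failed:', error);
    throw error; // rethrow so the graph run still reports the failure
  }
};
```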
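Instead of copying the timing code into every node, the measurement can be factored into a wrapper. `withTiming` is a hypothetical helper; it should be usable as `.addNode('taskA', withTiming('taskA', taskA))`, although depending on your LangGraphJS version the node's type may need adjusting, and `executionTime` must still be declared in the state schema.

```typescript
// Wrap an async node function so that it logs and returns its execution time
function withTiming<S, U extends object>(
  name: string,
  node: (state: S) => Promise<U>,
): (state: S) => Promise<U & { executionTime: number }> {
  return async (state: S) => {
    const start = Date.now();
    try {
      const update = await node(state);
      const executionTime = Date.now() - start;
      console.log(`${name} finished in ${executionTime} ms`);
      return { ...update, executionTime };
    } catch (error) {
      console.error(`${name} failed after ${Date.now() - start} ms:`, error);
      throw error; // rethrow so the failure is not hidden
    }
  };
}

// Usage: a node that takes about 50 ms
const slowNode = async (state: { input: string }) => {
  await new Promise((resolve) => setTimeout(resolve, 50));
  return { result: state.input.toUpperCase() };
};

const timedNode = withTiming('slowNode', slowNode);
timedNode({ input: 'hi' }).then((update) => console.log(update.result, update.executionTime));
```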
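Streaming Parallel Processing
LangGraphJS also supports streaming, so you can watch parallel tasks progress while the graph runs instead of waiting for the final state:
Streaming parallel processing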
```typescript
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// State for the streaming example
const StreamingState = Annotation.Root({
  input: Annotation<string>(),
  results: Annotation<Record<string, string>>({
    reducer: (state, update) => ({ ...state, ...update }),
    default: () => ({}),
  }),
  // Counter: each finished task adds 1
  completed: Annotation<number>({
    reducer: (state, update) => state + update,
    default: () => 0,
  }),
  finalOutput: Annotation<string>(),
});

// Fan-out point: three edges leave this node, so the next three tasks share a super-step.
// It returns no update (the original `tasks` key was not part of the state schema).
const distributeTask = (_state: typeof StreamingState.State) => {
  return {};
};

// Data processing task: logs its own progress steps
const processData = async (state: typeof StreamingState.State) => {
  const steps = ['initialize', 'process data', 'generate result'];
  for (let i = 0; i < steps.length; i++) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    console.log(`📡 Progress ${i + 1}/${steps.length}: ${steps[i]}`);
  }
  return {
    results: { process: `✅ Processed: ${state.input.toUpperCase()}` },
    completed: 1,
  };
};

// Content analysis task
const analyzeContent = async (state: typeof StreamingState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 200));
  return {
    results: { analyze: `📊 Analyzed: ${state.input.length} characters` },
    completed: 1,
  };
};

// Validation task
const validateResult = async (state: typeof StreamingState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 150));
  return {
    results: { validate: `✅ Validated at ${new Date().toISOString()}` },
    completed: 1,
  };
};

// Aggregator: produces the final output once all three tasks have reported
const aggregateResults = (state: typeof StreamingState.State) => {
  if (state.completed < 3) return {};
  const finalOutput = [
    '🌟 Streaming parallel processing complete',
    '',
    ...Object.values(state.results),
    '',
    `🎯 Completed ${state.completed} tasks`,
  ].join('\n');
  return { finalOutput };
};

// Build the streaming parallel graph
const streamingApp = new StateGraph(StreamingState)
  .addNode('distribute', distributeTask)
  .addNode('processData', processData)
  .addNode('analyzeContent', analyzeContent)
  .addNode('validateResult', validateResult)
  .addNode('aggregate', aggregateResults)
  .addEdge(START, 'distribute')
  .addEdge('distribute', 'processData')
  .addEdge('distribute', 'analyzeContent')
  .addEdge('distribute', 'validateResult')
  .addEdge('processData', 'aggregate')
  .addEdge('analyzeContent', 'aggregate')
  .addEdge('validateResult', 'aggregate')
  .addEdge('aggregate', END)
  .compile();

// Usage (inside an async function): print each node's update as it is streamed
// for await (const chunk of await streamingApp.stream({ input: 'hello' }, { streamMode: 'updates' })) {
//   console.log(chunk); // { [nodeName]: update }
// }
```
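In LangGraphJS itself, progress is observed with `streamingApp.stream(input, { streamMode: 'updates' })`, which emits each node's state update as a separate chunk. To show the idea of consuming parallel results incrementally, the plain-TypeScript sketch below yields results in the order the tasks finish. `streamAsCompleted`, `delay` and `demo` are hypothetical helpers, and the delays reuse the example's timings (3 × 100 ms, 200 ms and 150 ms).

```typescript
// Yield [name, value] pairs in the order the tasks finish, not the order they started.
// If any task rejects, the generator throws that error.
async function* streamAsCompleted<T>(tasks: Record<string, Promise<T>>): AsyncGenerator<[string, T]> {
  const pending = new Map<string, Promise<[string, T]>>();
  for (const [name, promise] of Object.entries(tasks)) {
    // Tag each promise with its name so we know which one settled
    pending.set(name, promise.then((value): [string, T] => [name, value]));
  }
  while (pending.size > 0) {
    const [name, value] = await Promise.race(Array.from(pending.values()));
    pending.delete(name);
    yield [name, value];
  }
}

function delay<T>(value: T, ms: number): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

// Same timings as processData, analyzeContent and validateResult
async function demo(): Promise<string[]> {
  const order: string[] = [];
  const stream = streamAsCompleted({
    processData: delay('processed', 300),
    analyzeContent: delay('analyzed', 200),
    validateResult: delay('validated', 150),
  });
  for await (const [name, value] of stream) {
    console.log(`${name}: ${value}`);
    order.push(name);
  }
  return order;
}

demo(); // logs validateResult, then analyzeContent, then processData
```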
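Summary
Parallel processing is one of LangGraphJS's core strengths. It lets you:
- Improve performance: independent work runs concurrently, so total time approaches that of the slowest branch
- Scale: handle large data sets and complex tasks by fanning out with Send
- Improve the user experience: stream intermediate results for real-time feedback
- Use resources better: keep the process busy while nodes wait on I/O such as model or API calls
In the next section, Subgraphs, we will build more complex graph structures to make applications more modular and maintainable.
Key Takeaways
- Understand super-steps and design the parallel flow around them
- Use the Send API to create parallel tasks dynamically
- Apply the Map-Reduce pattern to batch data
- Control concurrency and handle errors
- Use streaming to give users better feedback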