
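⚡ Parallel Processing

Introduction

Performance and efficiency are critical in modern AI applications. LangGraphJS can run multiple nodes concurrently, which shortens end-to-end latency and raises throughput when the work consists of independent steps such as model or API calls.

Parallel processing is especially useful in the following scenarios:

  • Multi-agent collaboration: several AI agents work on different aspects of a task at the same time
  • Batch data processing: large volumes of data are analyzed and processed in parallel
  • Independent task execution: several unrelated operations run at the same time
  • Map-Reduce pattern: a complex task is split into subtasks that can run in parallel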

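Core Concepts

Super-Step

In LangGraphJS, the super-step is the basic unit of parallel execution:

  • Nodes in the same super-step run in parallel
  • Nodes in different super-steps run sequentially
  • After each super-step completes, the updates returned by its nodes are merged into the shared state through each channel's reducer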
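
The merge at the end of a super-step can be pictured with a few lines of plain TypeScript. This is only a sketch: `Reducer` and `applySuperStep` are hypothetical helpers written for illustration, not LangGraphJS APIs, and the real runtime does more than this.

```typescript
// Hypothetical stand-in for a state channel with a reducer; not a LangGraphJS API.
type Reducer<T> = (current: T, update: T) => T;

// Same shape as the `results` reducer used in the examples on this page:
// shallow-merge keyed records.
const mergeRecords: Reducer<Record<string, string>> = (current, update) => ({
  ...current,
  ...update,
});

// Fold every update produced during one super-step into the channel value.
function applySuperStep<T>(current: T, updates: T[], reducer: Reducer<T>): T {
  return updates.reduce((acc, update) => reducer(acc, update), current);
}

const input = 'hello';
// One update per node that ran in the super-step.
const updatesFromParallelNodes: Record<string, string>[] = [
  { taskA: input.toUpperCase() },
  { taskB: `${input.length} characters` },
  { taskC: input.split('').reverse().join('') },
];

const merged = applySuperStep<Record<string, string>>(
  {},
  updatesFromParallelNodes,
  mergeRecords,
);
console.log(merged);
// { taskA: 'HELLO', taskB: '5 characters', taskC: 'olleh' }
```

Because each node writes a different key, the merged result does not depend on the order in which the updates are applied.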

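Send API

The Send API is the core mechanism for dynamic parallelism in LangGraphJS:

  • It creates parallel tasks dynamically at runtime
  • Each Send object carries a target node and the state to pass to that node
  • It supports complex parallel patterns such as Map-Reduce

Basic Parallel Execution

Let's start with a simple example of parallel execution: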

Basic parallel processing
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// State shared by the parallel tasks
const ParallelState = Annotation.Root({
  input: Annotation<string>(),
  // Each task writes its own key; the reducer shallow-merges concurrent updates
  results: Annotation<Record<string, string>>({
    reducer: (state, update) => ({ ...state, ...update }),
    default: () => ({}),
  }),
  finalOutput: Annotation<string>(),
});

// Parallel task A: convert to uppercase
const taskA = async (state: typeof ParallelState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 500));
  return { results: { taskA: state.input.toUpperCase() } };
};

// Parallel task B: compute the length
const taskB = async (state: typeof ParallelState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 300));
  return { results: { taskB: `${state.input.length} characters` } };
};

// Parallel task C: reverse the string
const taskC = async (state: typeof ParallelState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 400));
  return { results: { taskC: state.input.split('').reverse().join('') } };
};

// Aggregate the results once all three tasks have reported
const aggregateResults = (state: typeof ParallelState.State) => {
  const { results } = state;
  if (Object.keys(results).length === 3) {
    const finalOutput = [
      '📊 Parallel processing complete!',
      `• Task A: ${results.taskA}`,
      `• Task B: ${results.taskB}`,
      `• Task C: ${results.taskC}`,
    ].join('\n');
    return { finalOutput };
  }
  return {};
};

// Build the graph: three edges from START fan out, three edges into aggregate fan in
const app = new StateGraph(ParallelState)
  .addNode('taskA', taskA)
  .addNode('taskB', taskB)
  .addNode('taskC', taskC)
  .addNode('aggregate', aggregateResults)
  .addEdge(START, 'taskA')
  .addEdge(START, 'taskB')
  .addEdge(START, 'taskC')
  .addEdge('taskA', 'aggregate')
  .addEdge('taskB', 'aggregate')
  .addEdge('taskC', 'aggregate')
  .addEdge('aggregate', END)
  .compile();

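This example shows how several nodes run in parallel within the same super-step: taskA, taskB, and taskC all start from START, and aggregate runs in the following super-step after all three have finished.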
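
To make the benefit concrete, here is a rough back-of-the-envelope calculation using the simulated delays from the example. It is a sketch that ignores scheduling overhead and the cost of the aggregate step, so treat the numbers as an idealized estimate.

```typescript
// Simulated delays copied from taskA, taskB and taskC in the example (milliseconds).
const delaysMs: Record<string, number> = { taskA: 500, taskB: 300, taskC: 400 };

const durations = Object.values(delaysMs);

// Run one after another: the waits add up.
const sequentialMs = durations.reduce((sum, d) => sum + d, 0);

// Run in the same super-step: the slowest node sets the pace.
const parallelMs = Math.max(...durations);

console.log({ sequentialMs, parallelMs });
// { sequentialMs: 1200, parallelMs: 500 }
```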

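The Send API in Detail

The Send API offers a more flexible form of parallelism and is particularly suited to cases where the set of tasks is only known at runtime: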

Using the Send API
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// State for the Send API example
const SendState = Annotation.Root({
  tasks: Annotation<string[]>(),
  results: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),
  summary: Annotation<string>(),
});

// Task dispatcher: a routing function (not a node) that creates
// one Send object per task at runtime
const taskDispatcher = (state: typeof SendState.State) => {
  return state.tasks.map(
    (task, index) => new Send('processTask', { taskId: index, content: task })
  );
};

// Task processor: receives the payload of its Send, not the full graph state
const processTask = async (state: { taskId: number; content: string }) => {
  await new Promise((resolve) => setTimeout(resolve, 200));
  const result = `Task ${state.taskId}: ${state.content.toUpperCase()}`;
  return { results: [result] };
};

// Result summarizer
const summarizeResults = (state: typeof SendState.State) => {
  if (state.results.length < state.tasks.length) return {};

  const summary = [
    '📊 Summary of results:',
    ...state.results,
    `✅ Completed ${state.tasks.length} tasks`,
  ].join('\n');

  return { summary };
};

// Build the graph. Send objects are returned from a conditional edge's
// routing function, so the dispatcher is wired in with addConditionalEdges
// instead of being registered as a node.
const sendApp = new StateGraph(SendState)
  .addNode('processTask', processTask)
  .addNode('summarize', summarizeResults)
  .addConditionalEdges(START, taskDispatcher, ['processTask'])
  .addEdge('processTask', 'summarize')
  .addEdge('summarize', END)
  .compile();

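Advantages of the Send API

  1. Dynamic task creation: the number of parallel tasks is decided from runtime data
  2. Independent state: each parallel task receives its own input state
  3. Flexible data flow: supports complex patterns for passing and aggregating data
  4. Error isolation: each task is a separate node invocation, so a task that catches its own errors can report a failure without affecting the others. An uncaught exception in any task still fails the whole super-step.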

The Map-Reduce Pattern

Map-Reduce is a classic parallel processing pattern and is particularly suited to batch data processing:

Map-Reduce implementation
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// State for the Map-Reduce example
const MapReduceState = Annotation.Root({
  data: Annotation<string[]>(),
  mapResults: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),
  finalResult: Annotation<string>(),
});

// Map phase: routing function that splits the data into batches
const mapDispatcher = (state: typeof MapReduceState.State) => {
  const batchSize = 2;
  const batches: string[][] = [];

  for (let i = 0; i < state.data.length; i += batchSize) {
    batches.push(state.data.slice(i, i + batchSize));
  }

  // Create one parallel task per batch
  return batches.map(
    (batch, index) => new Send('mapProcessor', { batchId: index, items: batch })
  );
};

// Map processor: transforms the items of one batch
const mapProcessor = async (state: { batchId: number; items: string[] }) => {
  await new Promise((resolve) => setTimeout(resolve, 100));

  // Map operation: convert each string to uppercase
  const processed = state.items.map((item) => item.toUpperCase());

  return {
    mapResults: processed.map((p) => `Batch ${state.batchId}: ${p}`),
  };
};

// Reduce phase: aggregate the results
const reduceAggregator = (state: typeof MapReduceState.State) => {
  if (state.mapResults.length === 0) return {};

  const finalResult = [
    '🗺️ Map-Reduce results:',
    ...state.mapResults,
    `✅ Processed ${state.mapResults.length} items`,
  ].join('\n');

  return { finalResult };
};

// Build the Map-Reduce graph. The dispatcher returns Send objects, so it is
// attached as a conditional edge from START, not as a node.
const mapReduceApp = new StateGraph(MapReduceState)
  .addNode('mapProcessor', mapProcessor)
  .addNode('reduceAggregator', reduceAggregator)
  .addConditionalEdges(START, mapDispatcher, ['mapProcessor'])
  .addEdge('mapProcessor', 'reduceAggregator')
  .addEdge('reduceAggregator', END)
  .compile();

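Map-Reduce execution flow: START → mapDispatcher (splits the data into batches) → mapProcessor (one invocation per batch, in parallel) → reduceAggregator → END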
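
The split, map, and reduce stages can also be traced without the library. The sketch below reproduces the batching logic of mapDispatcher and the concat reducer in plain TypeScript; `chunk` and `mapBatch` are hypothetical helper names introduced only for this illustration, and the batches run sequentially here, not in parallel.

```typescript
// Split a list into consecutive batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Map phase: what one mapProcessor invocation does for one batch.
function mapBatch(batchId: number, items: string[]): string[] {
  return items.map((item) => `batch ${batchId}: ${item.toUpperCase()}`);
}

const data = ['a', 'b', 'c', 'd', 'e'];
const batches = chunk(data, 2);

// Reduce phase: same effect as the concat reducer on mapResults.
const mapResults = batches
  .map((batch, batchId) => mapBatch(batchId, batch))
  .reduce<string[]>((all, part) => all.concat(part), []);

console.log(batches.length); // 3
console.log(mapResults);
// [ 'batch 0: A', 'batch 0: B', 'batch 1: C', 'batch 1: D', 'batch 2: E' ]
```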

Practical Use Cases

Multi-Agent Collaboration

In complex AI applications, different agents can work on different aspects of a task in parallel:

Parallel multi-agent collaboration
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// State for multi-agent collaboration
const MultiAgentState = Annotation.Root({
  query: Annotation<string>(),
  results: Annotation<Record<string, string>>({
    reducer: (state, update) => ({ ...state, ...update }),
    default: () => ({}),
  }),
  finalResponse: Annotation<string>(),
});

// Task dispatcher: routing function that picks agents based on the query
const taskDispatcher = (state: typeof MultiAgentState.State) => {
  const query = state.query.toLowerCase();
  const keywords: Record<string, string[]> = {
    research: ['research', 'find'],
    analysis: ['analyze', 'compare'],
    creative: ['create', 'design'],
  };

  // Select the agents whose keywords appear in the query
  const matched = Object.entries(keywords)
    .filter(([, words]) => words.some((word) => query.includes(word)))
    .map(([agent]) => agent);

  // Fall back to every agent when no keyword matches
  const selectedAgents = matched.length > 0 ? matched : Object.keys(keywords);

  return selectedAgents.map(
    (agentType) => new Send('processAgent', { agentType, query: state.query })
  );
};

// Agent processor
const processAgent = async (state: { agentType: string; query: string }) => {
  await new Promise((resolve) => setTimeout(resolve, 300));

  const responses: Record<string, string> = {
    research: `🔍 Research result: background information on "${state.query}"`,
    analysis: `📊 Analysis result: an in-depth assessment of "${state.query}"`,
    creative: `💡 Creative suggestion: a new approach to "${state.query}"`,
  };

  const result =
    responses[state.agentType] ?? `🤖 Result from the ${state.agentType} agent`;

  return { results: { [state.agentType]: result } };
};

// Result synthesizer
const synthesizeResults = (state: typeof MultiAgentState.State) => {
  if (Object.keys(state.results).length === 0) return {};

  const finalResponse = [
    '🤝 Multi-agent collaboration results',
    '',
    ...Object.entries(state.results).map(
      ([agent, result]) => `[${agent} agent] ${result}`
    ),
    '',
    '🎯 Overall recommendation: a combined answer based on the analysis of each agent.',
  ].join('\n');

  return { finalResponse };
};

// Build the multi-agent graph. The dispatcher returns Send objects, so it is
// attached as a conditional edge from START, not as a node.
const multiAgentApp = new StateGraph(MultiAgentState)
  .addNode('processAgent', processAgent)
  .addNode('synthesizeResults', synthesizeResults)
  .addConditionalEdges(START, taskDispatcher, ['processAgent'])
  .addEdge('processAgent', 'synthesizeResults')
  .addEdge('synthesizeResults', END)
  .compile();

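Batch Document Processing

When a large number of documents must be processed, handling them in parallel batches can reduce total processing time considerably: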

Batch document processing
import { StateGraph, Annotation, START, END, Send } from '@langchain/langgraph';

// State for batch processing
const BatchState = Annotation.Root({
  documents: Annotation<string[]>(),
  processed: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),
  statistics: Annotation<{
    total: number;
    processed: number;
    startTime: string;
  }>(),
  // Written by batchSummarizer, so it must be declared in the state
  summary: Annotation<string>(),
});

// Batch dispatcher: routing function that creates one Send per batch
const batchDispatcher = (state: typeof BatchState.State) => {
  const batchSize = 2;
  const batches: string[][] = [];

  for (let i = 0; i < state.documents.length; i += batchSize) {
    batches.push(state.documents.slice(i, i + batchSize));
  }

  return batches.map(
    (batch, index) => new Send('processBatch', { batchId: index, items: batch })
  );
};

// Batch processor
const processBatch = async (state: { batchId: number; items: string[] }) => {
  await new Promise((resolve) => setTimeout(resolve, 200));

  // Process the documents: convert to uppercase and label each one
  const processed = state.items.map(
    (doc, index) => `📄 Document ${state.batchId}-${index}: ${doc.toUpperCase()}`
  );

  // Write only to `processed`. `statistics` has no reducer, so concurrent
  // writes from several batches in one super-step would conflict.
  return { processed };
};

// Summarizer
const batchSummarizer = (state: typeof BatchState.State) => {
  if (state.processed.length < state.documents.length) return {};

  const endTime = new Date().toISOString();
  // This point is reached only when every document has been processed
  const summary = [
    '📊 Batch processing summary',
    `• Total documents: ${state.documents.length}`,
    `• Processed: ${state.processed.length}`,
    '• Success rate: 100%',
    `• Finished at: ${endTime}`,
    '',
    '📋 Results:',
    ...state.processed,
  ].join('\n');

  return {
    statistics: {
      total: state.documents.length,
      processed: state.processed.length,
      startTime: state.statistics?.startTime || endTime,
    },
    summary,
  };
};

// Build the batch processing graph. The dispatcher returns Send objects, so
// it is attached as a conditional edge from START, not as a node.
const batchApp = new StateGraph(BatchState)
  .addNode('processBatch', processBatch)
  .addNode('batchSummarizer', batchSummarizer)
  .addConditionalEdges(START, batchDispatcher, ['processBatch'])
  .addEdge('processBatch', 'batchSummarizer')
  .addEdge('batchSummarizer', END)
  .compile();

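Performance Optimization and Best Practices

1. Limit concurrency

Concurrency control

Parallel processing improves performance, but too much concurrency can cause:

  • API rate limiting
  • High memory usage
  • Contention for system resources

Set the concurrency level according to the available system resources and the rate limits of the APIs you call.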
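
One simple way to cap parallelism in the batch examples on this page is to derive the batch size from a limit, since each batch becomes one parallel task. The sketch below assumes that one-task-per-batch setup; `batchSizeFor` is a hypothetical helper, and the limit of 4 is an arbitrary illustrative value.

```typescript
// Choose a batch size so that at most `maxParallel` batches are created.
function batchSizeFor(totalItems: number, maxParallel: number): number {
  if (!Number.isInteger(maxParallel) || maxParallel <= 0) {
    throw new RangeError('maxParallel must be a positive integer');
  }
  if (totalItems <= 0) return 1;
  return Math.ceil(totalItems / maxParallel);
}

const totalDocuments = 10;
const maxParallel = 4; // assumed limit, e.g. derived from an API rate limit

const batchSize = batchSizeFor(totalDocuments, maxParallel);
const batchCount = Math.ceil(totalDocuments / batchSize);

console.log({ batchSize, batchCount });
// { batchSize: 3, batchCount: 4 }
```

Replacing the hard-coded `batchSize = 2` in a dispatcher with a value computed this way keeps the number of simultaneous tasks at or below the chosen limit, at the cost of larger batches.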

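2. Error handling strategy

// Error handling in a parallel node.
// `processTask` stands for whatever async work the node performs.
const robustNode = async (state: any) => {
  try {
    const result = await processTask(state.task);
    return { result, status: 'success' };
  } catch (error) {
    // `error` is `unknown` under strict TypeScript settings, so narrow it first
    return {
      error: error instanceof Error ? error.message : String(error),
      status: 'failed',
      fallback: 'default result',
    };
  }
};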
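
Once nodes report failures as data, as robustNode does with its status field, the aggregation step can separate successes from failures instead of assuming everything succeeded. The following is a minimal sketch; the `TaskOutcome` shape and `summarizeOutcomes` are hypothetical names chosen for this illustration.

```typescript
// Hypothetical record shape for a node that catches its own errors.
type TaskOutcome =
  | { id: string; status: 'success'; value: string }
  | { id: string; status: 'failed'; error: string };

type Success = Extract<TaskOutcome, { status: 'success' }>;
type Failure = Extract<TaskOutcome, { status: 'failed' }>;

// Partition the outcomes and compute the fraction that succeeded.
function summarizeOutcomes(outcomes: TaskOutcome[]) {
  const succeeded = outcomes.filter((o): o is Success => o.status === 'success');
  const failed = outcomes.filter((o): o is Failure => o.status === 'failed');
  const successRate =
    outcomes.length === 0 ? 0 : succeeded.length / outcomes.length;
  return { succeeded, failed, successRate };
}

const report = summarizeOutcomes([
  { id: 'doc-0', status: 'success', value: 'A' },
  { id: 'doc-1', status: 'failed', error: 'timeout' },
  { id: 'doc-2', status: 'success', value: 'C' },
]);

console.log(`${report.succeeded.length} of 3 succeeded`); // 2 of 3 succeeded
console.log(report.failed.map((f) => `${f.id}: ${f.error}`)); // [ 'doc-1: timeout' ]
```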

3. State management

import { Annotation } from '@langchain/langgraph';

// State definition suited to parallel writes
const StateAnnotation = Annotation.Root({
  // Concatenate arrays so that updates from parallel nodes are not lost
  results: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),

  // Use a Map for fast lookups by key
  resultCache: Annotation<Map<string, string>>({
    reducer: (state, update) => {
      const newMap = new Map(state);
      update.forEach((value, key) => newMap.set(key, value));
      return newMap;
    },
    default: () => new Map(),
  }),
});
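
The resultCache reducer copies the Map before writing to it. The standalone sketch below extracts that logic into a hypothetical `mergeCache` function to show why: the previous value stays untouched, and for a repeated key the update wins.

```typescript
// Same merge logic as the resultCache reducer, extracted so it can run standalone.
function mergeCache(
  state: Map<string, string>,
  update: Map<string, string>,
): Map<string, string> {
  const next = new Map(state); // copy, so the previous snapshot is not mutated
  update.forEach((value, key) => next.set(key, value));
  return next;
}

const before = new Map<string, string>([
  ['a', '1'],
  ['b', '2'],
]);
const after = mergeCache(
  before,
  new Map<string, string>([
    ['b', 'updated'],
    ['c', '3'],
  ]),
);

console.log(before.get('b')); // 2
console.log(after.get('b')); // updated
console.log(after.size); // 3
```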

4. Monitoring and debugging

// Measuring execution time in a node.
// `processTask` stands for whatever async work the node performs.
const monitoredNode = async (state: any) => {
  const startTime = Date.now();

  try {
    const result = await processTask(state.task);
    const duration = Date.now() - startTime;

    console.log(`Task finished in ${duration}ms`);

    return {
      result,
      executionTime: duration,
      timestamp: new Date().toISOString(),
    };
  } catch (error) {
    console.error('Task failed:', error);
    throw error;
  }
};
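
When several nodes need the same measurement, the timing logic can be factored into a wrapper instead of being repeated in each node. This is one possible sketch, not a LangGraphJS feature: `withTiming` and `AsyncNode` are hypothetical names, and the logger and clock are parameters so the wrapper can be exercised in isolation.

```typescript
type AsyncNode<S, U> = (state: S) => Promise<U>;

// Wrap a node so that every call logs its duration, whether it succeeds or throws.
function withTiming<S, U>(
  name: string,
  node: AsyncNode<S, U>,
  log: (message: string) => void = console.log,
  now: () => number = Date.now,
): AsyncNode<S, U> {
  return async (state: S) => {
    const start = now();
    try {
      return await node(state);
    } finally {
      log(`${name} finished in ${now() - start}ms`);
    }
  };
}

// Usage: wrap a plain async function; the wrapped version has the same signature.
const timedUpper = withTiming('upper', async (s: { input: string }) => ({
  output: s.input.toUpperCase(),
}));

timedUpper({ input: 'hello' }).then((update) => console.log(update.output));
```

The wrapped function can be passed to addNode in place of the original, since it accepts the same state and returns the same update.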

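Streaming with Parallel Execution

LangGraphJS also supports streaming, which lets you observe the progress of parallel tasks while they run. In the example below, processData simulates progress reporting with log output; to receive the state updates returned by each node, iterate over streamingApp.stream(input, { streamMode: 'updates' }) instead of calling invoke.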

Streaming parallel processing
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// State for the streaming example
const StreamingState = Annotation.Root({
  input: Annotation<string>(),
  results: Annotation<Record<string, string>>({
    reducer: (state, update) => ({ ...state, ...update }),
    default: () => ({}),
  }),
  // Counter of finished tasks; parallel increments are summed by the reducer
  completed: Annotation<number>({
    reducer: (state, update) => state + update,
    default: () => 0,
  }),
  finalOutput: Annotation<string>(),
});

// Entry node that fans out to the three tasks. It returns no update: the
// channel defaults already provide empty results and a zero counter, and
// only keys declared in the state should be returned.
const distributeTask = (state: typeof StreamingState.State) => {
  return {};
};

// Data processing task (simulates streaming progress)
const processData = async (state: typeof StreamingState.State) => {
  const steps = ['Initialize', 'Process data', 'Generate result'];

  for (let i = 0; i < steps.length; i++) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    console.log(`📡 Progress ${i + 1}/${steps.length}: ${steps[i]}`);
  }

  return {
    results: { process: `✅ Processing done: ${state.input.toUpperCase()}` },
    completed: 1,
  };
};

// Content analysis task
const analyzeContent = async (state: typeof StreamingState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 200));

  return {
    results: { analyze: `📊 Analysis done: ${state.input.length} characters` },
    completed: 1,
  };
};

// Quality validation task
const validateResult = async (state: typeof StreamingState.State) => {
  await new Promise((resolve) => setTimeout(resolve, 150));

  return {
    results: { validate: `✅ Validation passed: ${new Date().toISOString()}` },
    completed: 1,
  };
};

// Result aggregator
const aggregateResults = (state: typeof StreamingState.State) => {
  if (state.completed < 3) return {};

  const finalOutput = [
    '🌟 Streaming parallel processing complete',
    '',
    ...Object.values(state.results),
    '',
    `🎯 Completed ${state.completed} tasks in total`,
  ].join('\n');

  return { finalOutput };
};

// Build the graph: distribute fans out to three tasks, which fan in to aggregate
const streamingApp = new StateGraph(StreamingState)
  .addNode('distribute', distributeTask)
  .addNode('processData', processData)
  .addNode('analyzeContent', analyzeContent)
  .addNode('validateResult', validateResult)
  .addNode('aggregate', aggregateResults)
  .addEdge(START, 'distribute')
  .addEdge('distribute', 'processData')
  .addEdge('distribute', 'analyzeContent')
  .addEdge('distribute', 'validateResult')
  .addEdge('processData', 'aggregate')
  .addEdge('analyzeContent', 'aggregate')
  .addEdge('validateResult', 'aggregate')
  .addEdge('aggregate', END)
  .compile();

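Summary

Parallel processing is one of the core strengths of LangGraphJS. It lets you:

  1. Improve performance: parallel execution reduces total execution time
  2. Scale more easily: handle large data sets and complex tasks
  3. Improve the user experience: streaming provides real-time feedback
  4. Use resources efficiently: make fuller use of the available system resources

In the next section, Subgraphs, we will learn how to build more complex graph structures that further improve the modularity and maintainability of an application.

Key takeaways
  • Understand the super-step concept and design parallel flows around it
  • Use the Send API to create parallel tasks dynamically
  • Apply the Map-Reduce pattern to batch data
  • Pay attention to concurrency limits and error handling
  • Use streaming to give users timely feedback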