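🤝 Multi-Agent Systems
Introduction
A multi-agent system is a powerful architectural pattern in LangGraph that lets several agents work together on a complex task. For front-end developers, a useful analogy is a microservice architecture applied to AI: each agent owns one functional area, and the agents cooperate to reach the overall goal.
Multi-agent systems are a good fit for complex tasks that call for different specialist skills, for example one agent handling data analysis, another generating content, and a third checking quality.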
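Concepts
Core Components of a Multi-Agent System
A multi-agent system is made up of the following key parts:
- Specialist Agents: each agent focuses on one task domain
- Coordinator: assigns tasks and integrates results
- Shared State: the mechanism agents use to exchange information
- Communication Protocol: the rules that govern how agents interact
Collaboration Patterns
Code Examples
Basic Multi-Agent System
Basic multi-agent collaboration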
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';
import { HumanMessage } from '@langchain/core/messages';
import { z } from 'zod';
// Shared state for the multi-agent system
const MultiAgentState = Annotation.Root({
task: Annotation<string>(),
researchResult: Annotation<any>(),
contentResult: Annotation<any>(),
reviewResult: Annotation<any>(),
currentStep: Annotation<string>(),
finalOutput: Annotation<any>(),
});
// Specialist agents, each with its own structured-output schema
const agents = {
research: new ChatOpenAI({ temperature: 0.1 }).withStructuredOutput(
z.object({
sources: z.array(z.string()),
keyPoints: z.array(z.string()),
confidence: z.number(),
})
),
content: new ChatOpenAI({ temperature: 0.7 }).withStructuredOutput(
z.object({
title: z.string(),
content: z.string(),
wordCount: z.number(),
})
),
review: new ChatOpenAI({ temperature: 0 }).withStructuredOutput(
z.object({
score: z.number().min(0).max(10),
feedback: z.array(z.string()),
approved: z.boolean(),
})
),
};
// Research agent
const researchNode = async (state: typeof MultiAgentState.State) => {
const prompt = `Research task: ${state.task}. Provide sources, key points, and a confidence estimate.`;
const result = await agents.research.invoke([new HumanMessage(prompt)]);
return {
researchResult: result,
currentStep: 'content',
};
};
// Content generation agent
const contentNode = async (state: typeof MultiAgentState.State) => {
const prompt = `Write content based on these research findings: ${state.researchResult.keyPoints.join('\n')}`;
const result = await agents.content.invoke([new HumanMessage(prompt)]);
return {
contentResult: result,
currentStep: 'review',
};
};
// Review agent
const reviewNode = async (state: typeof MultiAgentState.State) => {
const prompt = `Review the quality of this content: ${state.contentResult.content}`;
const result = await agents.review.invoke([new HumanMessage(prompt)]);
return {
reviewResult: result,
currentStep: result.approved ? 'finalize' : 'content',
};
};
// Final integration
const finalizeNode = async (state: typeof MultiAgentState.State) => {
return {
finalOutput: {
task: state.task,
research: state.researchResult,
content: state.contentResult,
review: state.reviewResult,
completedAt: new Date().toISOString(),
},
currentStep: 'completed',
};
};
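// Routing function: each node records the next step in `currentStep`
const routeNext = (state: typeof MultiAgentState.State) => state.currentStep;
// Build the multi-agent collaboration graph
const multiAgentWorkflow = new StateGraph(MultiAgentState)
.addNode('research', researchNode)
.addNode('content', contentNode)
.addNode('review', reviewNode)
.addNode('finalize', finalizeNode)
.addEdge(START, 'research')
.addEdge('research', 'content')
.addEdge('content', 'review')
// Review either approves (finalize) or sends the draft back for another pass (content)
.addConditionalEdges('review', routeNext, {
finalize: 'finalize',
content: 'content',
})
// finalize sets currentStep to 'completed', which is not a node, so end the graph explicitly
.addEdge('finalize', END);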
const multiAgentSystem = multiAgentWorkflow.compile();
// Usage example
const result = await multiAgentSystem.invoke({
task: 'Analyze the outlook for artificial intelligence in education',
currentStep: 'research',
});
console.log('✅ Multi-agent collaboration finished');
console.log(`Research sources: ${result.finalOutput?.research.sources.length}`);
console.log(`Word count: ${result.finalOutput?.content.wordCount}`);
console.log(`Quality score: ${result.finalOutput?.review.score}/10`);
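In the example above, the review step can send work back to the content step whenever `approved` is false, so a reviewer that never approves would keep the loop going until LangGraph's recursion limit aborts the run. One way to bound the loop is to count drafts and force finalization after a fixed number. The sketch below is dependency-free and only illustrates the routing rule; `draftCount` and `maxDrafts` are hypothetical names that do not appear in the original state.

```typescript
// Hypothetical state slice: only the fields the router needs.
interface ReviewState {
  approved: boolean;
  draftCount: number; // assumed counter, incremented each time the content node runs
}

type NextStep = 'finalize' | 'content';

// Route back to the content agent only while the draft budget is not used up.
function routeAfterReview(state: ReviewState, maxDrafts = 3): NextStep {
  if (state.approved) return 'finalize';
  return state.draftCount < maxDrafts ? 'content' : 'finalize';
}

// Simulate a reviewer that never approves: the loop still terminates.
function simulate(maxDrafts: number): NextStep[] {
  const trace: NextStep[] = [];
  const state: ReviewState = { approved: false, draftCount: 0 };
  let next: NextStep = 'content';
  while (next === 'content') {
    state.draftCount += 1; // the content node produces one draft
    next = routeAfterReview(state, maxDrafts);
    trace.push(next);
  }
  return trace;
}

console.log(simulate(3).join(' -> ')); // content -> content -> finalize
```

In a real graph the counter would live in the shared state and the function would be passed to `addConditionalEdges` for the review node.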
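Supervised Multi-Agent System
In a supervised architecture, a supervisor agent coordinates the other specialist agents:
Supervised multi-agent system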
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';
import { HumanMessage } from '@langchain/core/messages';
import { z } from 'zod';
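// State for the supervised multi-agent system
const SupervisorState = Annotation.Root({
originalTask: Annotation<string>(),
assignedAgent: Annotation<string>(),
// Subtask chosen by the supervisor; declared here because supervisorNode writes it and every agent node reads it
currentSubtask: Annotation<string>(),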
agentResults: Annotation<Record<string, any>>({
reducer: (state, update) => ({ ...state, ...update }),
default: () => ({}),
}),
completedAgents: Annotation<string[]>({
reducer: (state, update) => [...state, ...update],
default: () => [],
}),
finalResult: Annotation<any>(),
});
// Specialist agent definitions
const agents = {
researcher: new ChatOpenAI({ temperature: 0.1 }).withStructuredOutput(
z.object({
findings: z.array(z.string()),
sources: z.array(z.string()),
confidence: z.number(),
})
),
analyst: new ChatOpenAI({ temperature: 0.2 }).withStructuredOutput(
z.object({
analysis: z.string(),
insights: z.array(z.string()),
trends: z.array(z.string()),
})
),
writer: new ChatOpenAI({ temperature: 0.6 }).withStructuredOutput(
z.object({
title: z.string(),
content: z.string(),
wordCount: z.number(),
})
),
reviewer: new ChatOpenAI({ temperature: 0 }).withStructuredOutput(
z.object({
score: z.number().min(0).max(10),
suggestions: z.array(z.string()),
approved: z.boolean(),
})
),
};
// Supervisor agent
const supervisorAgent = new ChatOpenAI({ temperature: 0.3 }).withStructuredOutput(
z.object({
nextAgent: z.enum(['researcher', 'analyst', 'writer', 'reviewer', 'complete']),
subtask: z.string(),
reasoning: z.string(),
})
);
// Supervisor node
const supervisorNode = async (state: typeof SupervisorState.State) => {
const prompt = `
Task: ${state.originalTask}
Completed: ${state.completedAgents.join(', ') || 'none'}
Decide which agent should run next (researcher/analyst/writer/reviewer/complete) and the specific subtask.
`;
const decision = await supervisorAgent.invoke([new HumanMessage(prompt)]);
return {
assignedAgent: decision.nextAgent,
currentSubtask: decision.subtask,
};
};
// Specialist agent nodes
const researcherNode = async (state: typeof SupervisorState.State) => {
const prompt = `Research task: ${state.originalTask} - ${state.currentSubtask}`;
const result = await agents.researcher.invoke([new HumanMessage(prompt)]);
return {
agentResults: { researcher: result },
completedAgents: ['researcher'],
};
};
const analystNode = async (state: typeof SupervisorState.State) => {
const prompt = `分析任务:${state.originalTask} - ${state.currentSubtask}`;
const result = await agents.analyst.invoke([new HumanMessage(prompt)]);
return {
agentResults: { analyst: result },
completedAgents: ['analyst'],
};
};
const writerNode = async (state: typeof SupervisorState.State) => {
const prompt = `写作任务:${state.originalTask} - ${state.currentSubtask}`;
const result = await agents.writer.invoke([new HumanMessage(prompt)]);
return {
agentResults: { writer: result },
completedAgents: ['writer'],
};
};
const reviewerNode = async (state: typeof SupervisorState.State) => {
const prompt = `审核任务:${state.originalTask} - ${state.currentSubtask}`;
const result = await agents.reviewer.invoke([new HumanMessage(prompt)]);
return {
agentResults: { reviewer: result },
completedAgents: ['reviewer'],
};
};
// Completion node
const completeNode = async (state: typeof SupervisorState.State) => {
return {
finalResult: {
originalTask: state.originalTask,
completedAgents: state.completedAgents,
agentResults: state.agentResults,
completedAt: new Date().toISOString(),
},
};
};
// Routing function: `assignedAgent` already matches a key in the path map below
const routeToAgent = (state: typeof SupervisorState.State) => state.assignedAgent;
// Build the supervised multi-agent graph
const supervisorWorkflow = new StateGraph(SupervisorState)
.addNode('supervisor', supervisorNode)
.addNode('researcher', researcherNode)
.addNode('analyst', analystNode)
.addNode('writer', writerNode)
.addNode('reviewer', reviewerNode)
.addNode('complete', completeNode)
.addEdge(START, 'supervisor')
.addConditionalEdges('supervisor', routeToAgent, {
researcher: 'researcher',
analyst: 'analyst',
writer: 'writer',
reviewer: 'reviewer',
complete: 'complete',
})
.addEdge('researcher', 'supervisor')
.addEdge('analyst', 'supervisor')
.addEdge('writer', 'supervisor')
.addEdge('reviewer', 'supervisor')
.addEdge('complete', END);
const supervisorSystem = supervisorWorkflow.compile();
// Usage example
const result = await supervisorSystem.invoke({
originalTask: 'Analyze the outlook for blockchain technology in the financial industry',
});
console.log(`✅ Done, agent runs: ${result.finalResult?.completedAgents.length}`);
Hierarchical Team Structure
Hierarchical agent teams
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';
import { HumanMessage } from '@langchain/core/messages';
import { z } from 'zod';
// State for the hierarchical teams
const HierarchicalState = Annotation.Root({
mainTask: Annotation<string>(),
currentTeam: Annotation<string>(),
teamResults: Annotation<Record<string, any>>({
reducer: (state, update) => ({ ...state, ...update }),
default: () => ({}),
}),
completedTeams: Annotation<string[]>({
reducer: (state, update) => [...state, ...update],
default: () => [],
}),
finalOutput: Annotation<any>(),
});
// 团队代理
const teams = {
research: new ChatOpenAI({ temperature: 0.1 }),
development: new ChatOpenAI({ temperature: 0.2 }),
testing: new ChatOpenAI({ temperature: 0 }),
deployment: new ChatOpenAI({ temperature: 0 }),
};
// 协调器
const coordinator = new ChatOpenAI({ temperature: 0.3 }).withStructuredOutput(
z.object({
nextTeam: z.enum(['research', 'development', 'testing', 'deployment', 'complete']),
reasoning: z.string(),
})
);
// Coordinator node
const coordinatorNode = async (state: typeof HierarchicalState.State) => {
const prompt = `
Task: ${state.mainTask}
Completed: ${state.completedTeams.join(', ') || 'none'}
Decide which team should run next (research/development/testing/deployment/complete).
`;
const decision = await coordinator.invoke([new HumanMessage(prompt)]);
return { currentTeam: decision.nextTeam };
};
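// Team nodes. The structured fields each node returns are hard-coded placeholders for illustration;
// the model's `result` is not used, and a real system would derive these fields from it.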
const researchTeamNode = async (state: typeof HierarchicalState.State) => {
const prompt = `研究任务:${state.mainTask}`;
const result = await teams.research.invoke([new HumanMessage(prompt)]);
return {
teamResults: {
research: { feasibility: '技术可行', completedAt: new Date().toISOString() },
},
completedTeams: ['research'],
};
};
const developmentTeamNode = async (state: typeof HierarchicalState.State) => {
const prompt = `开发任务:${state.mainTask}`;
const result = await teams.development.invoke([new HumanMessage(prompt)]);
return {
teamResults: {
development: { architecture: '微服务架构', completedAt: new Date().toISOString() },
},
completedTeams: ['development'],
};
};
const testingTeamNode = async (state: typeof HierarchicalState.State) => {
const prompt = `测试任务:${state.mainTask}`;
const result = await teams.testing.invoke([new HumanMessage(prompt)]);
return {
teamResults: {
testing: { status: '测试通过', bugCount: 0, completedAt: new Date().toISOString() },
},
completedTeams: ['testing'],
};
};
const deploymentTeamNode = async (state: typeof HierarchicalState.State) => {
const prompt = `部署任务:${state.mainTask}`;
const result = await teams.deployment.invoke([new HumanMessage(prompt)]);
return {
teamResults: {
deployment: { status: '部署成功', url: 'https://example.com', completedAt: new Date().toISOString() },
},
completedTeams: ['deployment'],
};
};
// Completion node
const completeNode = async (state: typeof HierarchicalState.State) => {
return {
finalOutput: {
mainTask: state.mainTask,
completedTeams: state.completedTeams,
teamResults: state.teamResults,
// Count distinct teams: the coordinator may dispatch the same team more than once
projectStatus: new Set(state.completedTeams).size === 4 ? 'success' : 'partial',
completedAt: new Date().toISOString(),
},
};
};
// Routing function
const routeToTeam = (state: typeof HierarchicalState.State) => state.currentTeam;
// Build the hierarchical team graph
const hierarchicalWorkflow = new StateGraph(HierarchicalState)
.addNode('coordinator', coordinatorNode)
.addNode('research', researchTeamNode)
.addNode('development', developmentTeamNode)
.addNode('testing', testingTeamNode)
.addNode('deployment', deploymentTeamNode)
.addNode('complete', completeNode)
.addEdge(START, 'coordinator')
.addConditionalEdges('coordinator', routeToTeam, {
research: 'research',
development: 'development',
testing: 'testing',
deployment: 'deployment',
complete: 'complete',
})
.addEdge('research', 'coordinator')
.addEdge('development', 'coordinator')
.addEdge('testing', 'coordinator')
.addEdge('deployment', 'coordinator')
.addEdge('complete', END);
const hierarchicalSystem = hierarchicalWorkflow.compile();
// Usage example
const result = await hierarchicalSystem.invoke({
mainTask: 'Build an online education platform',
});
console.log(`✅ Project ${result.finalOutput?.projectStatus === 'success' ? 'fully' : 'partially'} completed`);
console.log(`Team runs: ${result.finalOutput?.completedTeams.length}`);
Visual Overview
[Diagram: multi-agent collaboration flow]
[Diagram: agent state management]
Practical Guide
Step 1: Define the Specialist Agents
Start by creating a dedicated agent for each specialist area:
import { ChatOpenAI } from '@langchain/openai';
import { z } from 'zod';
// Data analysis agent
const dataAnalysisAgent = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0,
}).withStructuredOutput(
z.object({
insights: z.array(z.string()),
recommendations: z.array(z.string()),
confidence: z.number().min(0).max(1),
})
);
// Content generation agent
const contentGenerationAgent = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.7,
}).withStructuredOutput(
z.object({
title: z.string(),
content: z.string(),
keywords: z.array(z.string()),
})
);
// Quality assurance agent
const qualityAssuranceAgent = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0,
}).withStructuredOutput(
z.object({
score: z.number().min(0).max(10),
issues: z.array(z.string()),
suggestions: z.array(z.string()),
approved: z.boolean(),
})
);
Step 2: Design the Shared State
import { Annotation } from '@langchain/langgraph';
const MultiAgentState = Annotation.Root({
// Original task information
task: Annotation<string>(),
requirements: Annotation<string[]>(),
// Results produced by each agent
dataAnalysis: Annotation<any>(),
generatedContent: Annotation<any>(),
qualityReport: Annotation<any>(),
// Coordination information
currentAgent: Annotation<string>(),
completedAgents: Annotation<string[]>({
reducer: (state: string[], update: string[]) => [...state, ...update],
default: () => [],
}),
// Final result
finalResult: Annotation<any>(),
isComplete: Annotation<boolean>(),
});
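The state above mixes two kinds of keys: plain keys such as `currentAgent`, where the latest write replaces the previous value, and `completedAgents`, whose reducer appends each update to the existing list. The sketch below imitates that merge behavior in plain TypeScript so the difference is easy to see. `applyUpdate` is a hypothetical stand-in for what the graph runtime does with a node's return value, not a LangGraph API.

```typescript
// A reducer combines the current value of a key with a node's update.
type Reducer<T> = (current: T, update: T) => T;

interface PipelineState {
  currentAgent: string;
  completedAgents: string[];
}

// Same reducer as in the state definition: append the update to the list.
const appendList: Reducer<string[]> = (current, update) => [...current, ...update];

// Hypothetical helper that merges one node's partial update into the state.
function applyUpdate(state: PipelineState, update: Partial<PipelineState>): PipelineState {
  return {
    // No reducer: the latest value wins.
    currentAgent: update.currentAgent ?? state.currentAgent,
    // Reducer: the update is appended to what is already there.
    completedAgents: update.completedAgents
      ? appendList(state.completedAgents, update.completedAgents)
      : state.completedAgents,
  };
}

let state: PipelineState = { currentAgent: '', completedAgents: [] };
state = applyUpdate(state, { currentAgent: 'data_analysis' });
state = applyUpdate(state, { completedAgents: ['data_analysis'] });
state = applyUpdate(state, { currentAgent: 'content_generation' });
state = applyUpdate(state, { completedAgents: ['content_generation'] });
console.log(state);
```

After the four updates, `currentAgent` holds only the last value written, while `completedAgents` has accumulated both names in order.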
Step 3: Implement the Coordination Logic
// Coordinator node
const coordinatorNode = async (state: typeof MultiAgentState.State) => {
const { completedAgents } = state;
// Decide which agent should run next
if (!completedAgents.includes('data_analysis')) {
return {
currentAgent: 'data_analysis',
};
} else if (!completedAgents.includes('content_generation')) {
return {
currentAgent: 'content_generation',
};
} else if (!completedAgents.includes('quality_assurance')) {
return {
currentAgent: 'quality_assurance',
};
} else {
// Every agent has finished, so hand over to the finalize step
return {
currentAgent: 'finalize',
isComplete: true,
};
}
};
// Routing function
const routeToAgent = (state: typeof MultiAgentState.State) => {
return state.currentAgent;
};
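The coordinator's rule is "pick the first agent that has not reported back yet, otherwise finalize". The following dependency-free sketch traces that rule by hand, without LangGraph, to show the order in which nodes would be visited. `nextRoute` and `traceRun` are illustrative names, and the sketch assumes every agent node adds its own name to `completedAgents` when it finishes.

```typescript
type AgentName = 'data_analysis' | 'content_generation' | 'quality_assurance';
type Route = AgentName | 'finalize';

const pipeline: AgentName[] = ['data_analysis', 'content_generation', 'quality_assurance'];

// Same decision rule as the coordinator: first agent that has not run yet, else finalize.
function nextRoute(completedAgents: string[]): Route {
  const pending = pipeline.find((name) => !completedAgents.includes(name));
  return pending ?? 'finalize';
}

// Drive the coordinator -> agent -> coordinator loop by hand.
function traceRun(): Route[] {
  const completed: string[] = [];
  const visited: Route[] = [];
  let route = nextRoute(completed);
  while (route !== 'finalize') {
    visited.push(route);
    completed.push(route); // the agent node reports itself as done
    route = nextRoute(completed);
  }
  visited.push(route);
  return visited;
}

console.log(traceRun().join(' -> '));
// data_analysis -> content_generation -> quality_assurance -> finalize
```

If an agent node forgets to report itself in `completedAgents`, the coordinator would keep selecting it, which is why the names returned by the nodes have to match the names the coordinator checks.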
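Step 4: Build the Multi-Agent Graph
import { StateGraph, START, END } from '@langchain/langgraph';
// dataAnalysisNode, contentGenerationNode, qualityAssuranceNode and finalizeNode are not shown in this guide.
// Each agent node is assumed to call its agent from Step 1 and to return its own name in `completedAgents`
// (for example `completedAgents: ['data_analysis']`), because that is what the coordinator checks.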
const workflow = new StateGraph(MultiAgentState)
.addNode('coordinator', coordinatorNode)
.addNode('data_analysis', dataAnalysisNode)
.addNode('content_generation', contentGenerationNode)
.addNode('quality_assurance', qualityAssuranceNode)
.addNode('finalize', finalizeNode)
.addEdge(START, 'coordinator')
.addConditionalEdges('coordinator', routeToAgent, {
data_analysis: 'data_analysis',
content_generation: 'content_generation',
quality_assurance: 'quality_assurance',
finalize: 'finalize',
})
.addEdge('data_analysis', 'coordinator')
.addEdge('content_generation', 'coordinator')
.addEdge('quality_assurance', 'coordinator')
.addEdge('finalize', END);
const multiAgentSystem = workflow.compile();
Advanced Features
Dynamic Agent Selection
Dynamic agent selection
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';
import { HumanMessage } from '@langchain/core/messages';
import { z } from 'zod';
// 动态代理选择状态
const DynamicState = Annotation.Root({
task: Annotation<string>(),
taskType: Annotation<string>(),
selectedAgents: Annotation<string[]>(),
agentResults: Annotation<Record<string, any>>({
reducer: (state, update) => ({ ...state, ...update }),
default: () => ({}),
}),
finalResult: Annotation<any>(),
});
// 代理池
const agentPool = {
textAnalyst: new ChatOpenAI({ temperature: 0.1 }),
codeReviewer: new ChatOpenAI({ temperature: 0 }),
dataScientist: new ChatOpenAI({ temperature: 0.2 }),
contentWriter: new ChatOpenAI({ temperature: 0.7 }),
translator: new ChatOpenAI({ temperature: 0.3 }),
researcher: new ChatOpenAI({ temperature: 0.1 }),
};
// 任务分类器
const taskClassifier = new ChatOpenAI({ temperature: 0 }).withStructuredOutput(
z.object({
taskType: z.enum(['analysis', 'coding', 'research', 'writing', 'translation', 'mixed']),
requiredAgents: z.array(z.enum(['textAnalyst', 'codeReviewer', 'dataScientist', 'contentWriter', 'translator', 'researcher'])),
reasoning: z.string(),
})
);
// Classification node
const classifyNode = async (state: typeof DynamicState.State) => {
const prompt = `Classify this task and choose suitable agents: ${state.task}`;
const result = await taskClassifier.invoke([new HumanMessage(prompt)]);
return {
taskType: result.taskType,
selectedAgents: result.requiredAgents,
};
};
// Node that runs the selected agents one after another
const executeAgentsNode = async (state: typeof DynamicState.State) => {
const results: Record<string, any> = {};
for (const agentName of state.selectedAgents) {
const agent = agentPool[agentName as keyof typeof agentPool];
if (!agent) continue;
try {
const result = await agent.invoke([new HumanMessage(`Handle this task: ${state.task}`)]);
results[agentName] = { content: result.content, timestamp: new Date().toISOString() };
} catch (error) {
// `error` is typed `unknown` under strict TypeScript, so narrow it before reading `.message`
const message = error instanceof Error ? error.message : String(error);
results[agentName] = { error: message, timestamp: new Date().toISOString() };
}
}
return { agentResults: results };
};
// 整合结果节点
const integrateNode = async (state: typeof DynamicState.State) => {
return {
finalResult: {
task: state.task,
taskType: state.taskType,
selectedAgents: state.selectedAgents,
agentResults: state.agentResults,
completedAt: new Date().toISOString(),
},
};
};
// 构建动态选择图
const dynamicWorkflow = new StateGraph(DynamicState)
.addNode('classify', classifyNode)
.addNode('execute', executeAgentsNode)
.addNode('integrate', integrateNode)
.addEdge(START, 'classify')
.addEdge('classify', 'execute')
.addEdge('execute', 'integrate')
.addEdge('integrate', END);
const dynamicSystem = dynamicWorkflow.compile();
// Usage example
const result = await dynamicSystem.invoke({
task: 'Analyze user behavior data and provide insights',
});
console.log(`Task type: ${result.finalResult.taskType}`);
console.log(`Selected agents: ${result.finalResult.selectedAgents.join(', ')}`);
Direct Agent-to-Agent Communication
Agent-to-agent communication
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';
import { HumanMessage } from '@langchain/core/messages';
// 代理间通信状态
const CommunicationState = Annotation.Root({
task: Annotation<string>(),
messages: Annotation<
Array<{
from: string;
to: string;
content: string;
timestamp: string;
}>
>({
reducer: (state, update) => [...state, ...update],
default: () => [],
}),
agentStates: Annotation<Record<string, any>>({
reducer: (state, update) => ({ ...state, ...update }),
default: () => ({}),
}),
finalResult: Annotation<any>(),
});
// 定义通信代理
const agents = {
coordinator: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.2,
}),
researcher: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
}),
analyst: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
}),
writer: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.6,
}),
};
// 消息发送辅助函数
const sendMessage = (from: string, to: string, content: string) => ({
from,
to,
content,
timestamp: new Date().toISOString(),
});
// 协调器节点
const coordinatorNode = async (state: typeof CommunicationState.State) => {
console.log('🎯 协调器开始工作...');
const recentMessages = state.messages.slice(-5);
const prompt = `
作为项目协调器,分析当前状况并决定下一步行动:
任务:${state.task}
最近的消息:
${recentMessages.map((m) => `${m.from} -> ${m.to}: ${m.content}`).join('\n')}
代理状态:
${JSON.stringify(state.agentStates, null, 2)}
请决定下一步行动并发送相应的指令。
`;
try {
const response = await agents.coordinator.invoke([
new HumanMessage(prompt),
]);
// 根据当前状态决定发送消息给哪个代理
let targetAgent = 'researcher';
if (state.agentStates.researcher?.completed) {
targetAgent = 'analyst';
}
if (state.agentStates.analyst?.completed) {
targetAgent = 'writer';
}
const content =
typeof response.content === 'string'
? response.content
: JSON.stringify(response.content);
const newMessage = sendMessage('coordinator', targetAgent, content);
console.log(`📤 协调器 -> ${targetAgent}: ${content.substring(0, 100)}...`);
return {
messages: [newMessage],
};
} catch (error) {
return {
messages: [
sendMessage('coordinator', 'system', `协调失败: ${error.message}`),
],
};
}
};
// 研究员节点
const researcherNode = async (state: typeof CommunicationState.State) => {
console.log('🔍 研究员开始工作...');
const myMessages = state.messages.filter((m) => m.to === 'researcher');
const latestMessage = myMessages[myMessages.length - 1];
const prompt = `
作为研究员,处理以下任务:
原始任务:${state.task}
协调器指令:${latestMessage?.content || '开始研究工作'}
请进行深入研究并准备向分析师汇报结果。
`;
try {
const response = await agents.researcher.invoke([new HumanMessage(prompt)]);
const messages = [
sendMessage('researcher', 'analyst', `研究结果:${response.content}`),
sendMessage('researcher', 'coordinator', '研究工作已完成'),
];
console.log('📤 研究员完成工作,向分析师和协调器发送消息');
return {
messages,
agentStates: {
researcher: { completed: true, result: response.content },
},
};
} catch (error) {
return {
messages: [
sendMessage('researcher', 'coordinator', `研究失败: ${error.message}`),
],
agentStates: { researcher: { completed: false, error: error.message } },
};
}
};
// 分析师节点
const analystNode = async (state: typeof CommunicationState.State) => {
console.log('📊 分析师开始工作...');
const myMessages = state.messages.filter((m) => m.to === 'analyst');
const researchData = myMessages.find((m) => m.from === 'researcher');
if (!researchData) {
return {
messages: [sendMessage('analyst', 'coordinator', '等待研究员的数据')],
};
}
const prompt = `
作为数据分析师,分析以下研究结果:
原始任务:${state.task}
研究数据:${researchData.content}
请进行深入分析并准备向写作者提供分析结果。
`;
try {
const response = await agents.analyst.invoke([new HumanMessage(prompt)]);
const messages = [
sendMessage('analyst', 'writer', `分析结果:${response.content}`),
sendMessage('analyst', 'coordinator', '分析工作已完成'),
];
console.log('📤 分析师完成工作,向写作者和协调器发送消息');
return {
messages,
agentStates: { analyst: { completed: true, result: response.content } },
};
} catch (error) {
return {
messages: [
sendMessage('analyst', 'coordinator', `分析失败: ${error.message}`),
],
agentStates: { analyst: { completed: false, error: error.message } },
};
}
};
// 写作者节点
const writerNode = async (state: typeof CommunicationState.State) => {
console.log('✍️ 写作者开始工作...');
const myMessages = state.messages.filter((m) => m.to === 'writer');
const analysisData = myMessages.find((m) => m.from === 'analyst');
if (!analysisData) {
return {
messages: [sendMessage('writer', 'coordinator', '等待分析师的数据')],
};
}
const prompt = `
作为内容写作者,基于以下分析结果创作内容:
原始任务:${state.task}
分析数据:${analysisData.content}
请创作一篇完整的文章。
`;
try {
const response = await agents.writer.invoke([new HumanMessage(prompt)]);
const messages = [
sendMessage('writer', 'coordinator', `写作完成:${response.content}`),
];
console.log('📤 写作者完成工作,向协调器发送最终结果');
return {
messages,
agentStates: { writer: { completed: true, result: response.content } },
};
} catch (error) {
return {
messages: [
sendMessage('writer', 'coordinator', `写作失败: ${error.message}`),
],
agentStates: { writer: { completed: false, error: error.message } },
};
}
};
// 完成节点
const completeNode = async (state: typeof CommunicationState.State) => {
console.log('🎉 整合通信结果...');
const finalResult = {
task: state.task,
totalMessages: state.messages.length,
messageFlow: state.messages,
agentStates: state.agentStates,
summary: {
researchCompleted: state.agentStates.researcher?.completed || false,
analysisCompleted: state.agentStates.analyst?.completed || false,
writingCompleted: state.agentStates.writer?.completed || false,
finalContent: state.agentStates.writer?.result || null,
},
completedAt: new Date().toISOString(),
};
console.log('📊 通信统计:');
console.log(`- 总消息数: ${finalResult.totalMessages}`);
console.log(`- 研究完成: ${finalResult.summary.researchCompleted ? '✅' : '❌'}`);
console.log(`- 分析完成: ${finalResult.summary.analysisCompleted ? '✅' : '❌'}`);
console.log(`- 写作完成: ${finalResult.summary.writingCompleted ? '✅' : '❌'}`);
return {
finalResult,
};
};
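// Routing function
const routeNext = (state: typeof CommunicationState.State) => {
const { agentStates } = state;
// Stop at the first recorded failure; otherwise the failed agent would be re-run
// on every pass until the graph hits its recursion limit
if (Object.values(agentStates).some((s) => s?.error)) {
return 'complete';
}
if (!agentStates.researcher?.completed) {
return 'researcher';
} else if (!agentStates.analyst?.completed) {
return 'analyst';
} else if (!agentStates.writer?.completed) {
return 'writer';
} else {
return 'complete';
}
};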
// 构建通信图
const communicationWorkflow = new StateGraph(CommunicationState)
.addNode('coordinator', coordinatorNode)
.addNode('researcher', researcherNode)
.addNode('analyst', analystNode)
.addNode('writer', writerNode)
.addNode('complete', completeNode)
.addEdge(START, 'coordinator')
.addConditionalEdges('coordinator', routeNext, {
researcher: 'researcher',
analyst: 'analyst',
writer: 'writer',
complete: 'complete',
})
.addEdge('researcher', 'coordinator')
.addEdge('analyst', 'coordinator')
.addEdge('writer', 'coordinator')
.addEdge('complete', END);
const communicationSystem = communicationWorkflow.compile();
// 使用示例
const result = await communicationSystem.invoke({
task: '分析可持续发展在现代企业中的重要性'
});
if (result.finalResult) {
const summary = result.finalResult.summary;
console.log('\n📈 通信结果:');
console.log(`- 消息总数: ${result.finalResult.totalMessages}`);
console.log(`- 工作流程: ${summary.researchCompleted && summary.analysisCompleted && summary.writingCompleted ? '完整' : '部分'}`);
// 显示消息流
console.log('\n💬 消息流:');
result.finalResult.messageFlow.forEach((msg, i) => {
console.log(`${i + 1}. ${msg.from} -> ${msg.to}: ${msg.content.substring(0, 50)}...`);
});
}
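In the communication example, each agent reads its instructions by filtering the shared `messages` list on the `to` field. The analyst and writer nodes then use `find`, which returns the first matching message rather than the latest one. The sketch below shows two small helpers that make the lookup explicit. `inboxFor` and `latestFrom` are hypothetical names, and the sample messages are made up for illustration.

```typescript
interface AgentMessage {
  from: string;
  to: string;
  content: string;
  timestamp: string;
}

// All messages addressed to one agent, in arrival order.
function inboxFor(messages: AgentMessage[], agent: string): AgentMessage[] {
  return messages.filter((m) => m.to === agent);
}

// Most recent message from a given sender, or undefined if there is none.
function latestFrom(inbox: AgentMessage[], sender: string): AgentMessage | undefined {
  for (let i = inbox.length - 1; i >= 0; i--) {
    const message = inbox[i];
    if (message && message.from === sender) return message;
  }
  return undefined;
}

// Made-up message log for illustration.
const log: AgentMessage[] = [
  { from: 'researcher', to: 'analyst', content: 'draft findings', timestamp: '2024-01-01T00:00:00Z' },
  { from: 'researcher', to: 'coordinator', content: 'research done', timestamp: '2024-01-01T00:00:01Z' },
  { from: 'researcher', to: 'analyst', content: 'revised findings', timestamp: '2024-01-01T00:00:02Z' },
];

const analystInbox = inboxFor(log, 'analyst');
console.log(analystInbox.length); // 2
console.log(latestFrom(analystInbox, 'researcher')?.content); // "revised findings"
```

Reading the latest message matters once an agent can be asked to redo its work, because the earlier message would otherwise keep being picked up.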
Parallel Agent Execution
Parallel agent execution
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';
import { HumanMessage } from '@langchain/core/messages';
// 并行代理状态定义
const ParallelState = Annotation.Root({
task: Annotation<string>(),
// 并行执行的代理结果
agentResults: Annotation<Record<string, any>>({
reducer: (state: Record<string, any>, update: Record<string, any>) => ({
...state,
...update,
}),
default: () => ({}),
}),
// 执行状态跟踪
completedAgents: Annotation<string[]>({
reducer: (state: string[], update: string[]) => [...state, ...update],
default: () => [],
}),
finalResult: Annotation<any>(),
});
// 定义并行代理
const parallelAgents = {
researcher: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
}),
analyst: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.2,
}),
writer: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.6,
}),
reviewer: new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0,
}),
};
// 研究代理节点
const researcherNode = async (state: typeof ParallelState.State) => {
console.log('🔍 研究代理开始并行工作...');
const prompt = `
作为研究专家,请针对以下任务进行研究:
任务:${state.task}
请提供:
1. 相关背景信息
2. 关键数据和统计
3. 重要趋势分析
4. 参考资料来源
`;
try {
const result = await parallelAgents.researcher.invoke([
new HumanMessage(prompt),
]);
console.log('✅ 研究代理完成工作');
return {
agentResults: {
researcher: {
content:
typeof result.content === 'string'
? result.content
: JSON.stringify(result.content),
timestamp: new Date().toISOString(),
status: 'completed',
},
},
completedAgents: ['researcher'],
};
} catch (error) {
console.error('❌ 研究代理执行失败:', error.message);
return {
agentResults: {
researcher: {
error: error.message,
timestamp: new Date().toISOString(),
status: 'failed',
},
},
completedAgents: ['researcher'],
};
}
};
// 分析代理节点
const analystNode = async (state: typeof ParallelState.State) => {
console.log('📊 分析代理开始并行工作...');
const prompt = `
作为数据分析专家,请分析以下任务:
任务:${state.task}
请提供:
1. 数据模式识别
2. 关键指标分析
3. 趋势预测
4. 风险评估
`;
try {
const result = await parallelAgents.analyst.invoke([
new HumanMessage(prompt),
]);
console.log('✅ 分析代理完成工作');
return {
agentResults: {
analyst: {
content:
typeof result.content === 'string'
? result.content
: JSON.stringify(result.content),
timestamp: new Date().toISOString(),
status: 'completed',
},
},
completedAgents: ['analyst'],
};
} catch (error) {
console.error('❌ 分析代理执行失败:', error.message);
return {
agentResults: {
analyst: {
error: error.message,
timestamp: new Date().toISOString(),
status: 'failed',
},
},
completedAgents: ['analyst'],
};
}
};
// 写作代理节点
const writerNode = async (state: typeof ParallelState.State) => {
console.log('✍️ 写作代理开始并行工作...');
const prompt = `
作为内容创作专家,请为以下任务创作内容:
任务:${state.task}
请提供:
1. 吸引人的标题
2. 结构化的内容大纲
3. 关键要点总结
4. 目标受众分析
`;
try {
const result = await parallelAgents.writer.invoke([
new HumanMessage(prompt),
]);
console.log('✅ 写作代理完成工作');
return {
agentResults: {
writer: {
content:
typeof result.content === 'string'
? result.content
: JSON.stringify(result.content),
timestamp: new Date().toISOString(),
status: 'completed',
},
},
completedAgents: ['writer'],
};
} catch (error) {
console.error('❌ 写作代理执行失败:', error.message);
return {
agentResults: {
writer: {
error: error.message,
timestamp: new Date().toISOString(),
status: 'failed',
},
},
completedAgents: ['writer'],
};
}
};
// 审核代理节点
const reviewerNode = async (state: typeof ParallelState.State) => {
console.log('🔍 审核代理开始并行工作...');
const prompt = `
作为质量审核专家,请评估以下任务的要求:
任务:${state.task}
请提供:
1. 质量标准定义
2. 评估维度设计
3. 检查清单制定
4. 改进建议框架
`;
try {
const result = await parallelAgents.reviewer.invoke([
new HumanMessage(prompt),
]);
console.log('✅ 审核代理完成工作');
return {
agentResults: {
reviewer: {
content:
typeof result.content === 'string'
? result.content
: JSON.stringify(result.content),
timestamp: new Date().toISOString(),
status: 'completed',
},
},
completedAgents: ['reviewer'],
};
} catch (error) {
console.error('❌ 审核代理执行失败:', error.message);
return {
agentResults: {
reviewer: {
error: error.message,
timestamp: new Date().toISOString(),
status: 'failed',
},
},
completedAgents: ['reviewer'],
};
}
};
// 整合结果节点
const aggregateNode = async (state: typeof ParallelState.State) => {
console.log('🎯 整合并行代理结果...');
const { agentResults, completedAgents } = state;
// 检查所有代理是否都完成了
const expectedAgents = ['researcher', 'analyst', 'writer', 'reviewer'];
const allCompleted = expectedAgents.every((agent) =>
completedAgents.includes(agent)
);
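if (!allCompleted) {
// Returning an empty update does not pause the graph: the run continues to END without a finalResult
console.log('⚠️ Some agents did not report back; skipping aggregation');
return {};
}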
// 统计成功和失败的代理
const successfulAgents = Object.entries(agentResults)
.filter(([_, result]) => result.status === 'completed')
.map(([name, _]) => name);
const failedAgents = Object.entries(agentResults)
.filter(([_, result]) => result.status === 'failed')
.map(([name, _]) => name);
const finalResult = {
task: state.task,
executionSummary: {
totalAgents: expectedAgents.length,
successfulAgents: successfulAgents.length,
failedAgents: failedAgents.length,
successRate:
((successfulAgents.length / expectedAgents.length) * 100).toFixed(1) +
'%',
},
agentResults,
successfulAgents,
failedAgents,
completedAt: new Date().toISOString(),
};
console.log('✅ 并行代理执行完成');
console.log('📊 执行统计:');
console.log(`- 总代理数: ${finalResult.executionSummary.totalAgents}`);
console.log(`- 成功执行: ${finalResult.executionSummary.successfulAgents}`);
console.log(`- 执行失败: ${finalResult.executionSummary.failedAgents}`);
console.log(`- 成功率: ${finalResult.executionSummary.successRate}`);
return {
finalResult,
};
};
// 构建并行代理图
const parallelWorkflow = new StateGraph(ParallelState)
.addNode('researcher', researcherNode)
.addNode('analyst', analystNode)
.addNode('writer', writerNode)
.addNode('reviewer', reviewerNode)
.addNode('aggregate', aggregateNode)
// 从开始节点并行启动所有代理
.addEdge(START, 'researcher')
.addEdge(START, 'analyst')
.addEdge(START, 'writer')
.addEdge(START, 'reviewer')
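// Every agent feeds into the aggregate node; the four branches have equal length,
// so aggregate runs once, after all of them have finished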
.addEdge('researcher', 'aggregate')
.addEdge('analyst', 'aggregate')
.addEdge('writer', 'aggregate')
.addEdge('reviewer', 'aggregate')
.addEdge('aggregate', END);
// 编译并行代理系统
const parallelSystem = parallelWorkflow.compile();
// 使用示例
const startTime = Date.now();
const result = await parallelSystem.invoke({
task: '分析人工智能在医疗健康领域的应用前景'
});
const duration = Date.now() - startTime;
if (result.finalResult) {
const summary = result.finalResult.executionSummary;
console.log('\n📈 并行执行结果:');
console.log(`- 总耗时: ${duration}ms`);
console.log(`- 代理总数: ${summary.totalAgents}`);
console.log(`- 成功执行: ${summary.successfulAgents}`);
console.log(`- 执行失败: ${summary.failedAgents}`);
console.log(`- 成功率: ${summary.successRate}`);
}
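The four agent nodes in the parallel example share the same shape: build a prompt, call a model, and return either a `completed` or a `failed` record together with the agent's name. A small factory can remove that duplication. The sketch below is one possible way to do it under stated assumptions: `TextModel` is a hypothetical minimal interface standing in for a chat model so the code runs without an API key, and `makeAgentNode` is an illustrative name, not part of LangGraph.

```typescript
// Minimal model interface (assumption): a real chat model would be adapted to this shape.
interface TextModel {
  invoke(prompt: string): Promise<string>;
}

interface AgentOutcome {
  status: 'completed' | 'failed';
  content?: string;
  error?: string;
  timestamp: string;
}

interface AgentUpdate {
  agentResults: Record<string, AgentOutcome>;
  completedAgents: string[];
}

// Build a node for one agent; success and failure both produce the same update shape.
function makeAgentNode(name: string, model: TextModel, buildPrompt: (task: string) => string) {
  return async (state: { task: string }): Promise<AgentUpdate> => {
    const timestamp = new Date().toISOString();
    try {
      const content = await model.invoke(buildPrompt(state.task));
      return { agentResults: { [name]: { status: 'completed', content, timestamp } }, completedAgents: [name] };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return { agentResults: { [name]: { status: 'failed', error: message, timestamp } }, completedAgents: [name] };
    }
  };
}

// Fake models for demonstration only.
const echoModel: TextModel = { invoke: async (prompt) => `echo: ${prompt}` };
const brokenModel: TextModel = {
  invoke: async () => {
    throw new Error('model unavailable');
  },
};

const researcherNode = makeAgentNode('researcher', echoModel, (task) => `Research: ${task}`);
const reviewerNode = makeAgentNode('reviewer', brokenModel, (task) => `Review: ${task}`);
```

Because a failed agent still reports its name in `completedAgents`, the aggregate step can tell "finished with an error" apart from "never ran".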
Best Practices
1. Separate Agent Responsibilities
// ✅ Good: clearly separated responsibilities
const researchAgent = {
name: 'researcher',
role: 'Gathers information and checks facts',
capabilities: ['web_search', 'document_analysis', 'fact_checking'],
};
const writerAgent = {
name: 'writer',
role: 'Creates content and generates text',
capabilities: ['content_generation', 'style_adaptation', 'formatting'],
};
// ❌ Avoid: agents with overlapping responsibilities
const generalAgent = {
name: 'general',
role: 'Does everything',
capabilities: ['everything'], // too broad
};
2. State Management Strategy
// Give each agent its own namespaced key to avoid state collisions
const MultiAgentState = Annotation.Root({
// Global state
task: Annotation<string>(),
// Per-agent state
research: Annotation<{
sources: string[];
facts: string[];
confidence: number;
}>(),
writing: Annotation<{
drafts: string[];
currentDraft: string;
wordCount: number;
}>(),
review: Annotation<{
feedback: string[];
score: number;
approved: boolean;
}>(),
});
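3. Error Handling and Retries
// `agent` stands for any chat model or structured-output agent defined earlier, and
// `agentResult` / `error` are assumed to be declared as state keys.
const robustAgentNode = async (state: typeof MultiAgentState.State) => {
const maxAttempts = 3;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
const result = await agent.invoke([new HumanMessage(state.task)]);
return { agentResult: result };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.log(`Agent attempt ${attempt} of ${maxAttempts} failed: ${message}`);
if (attempt === maxAttempts) {
return {
agentResult: null,
error: `Agent failed: ${message}`,
};
}
// Wait before retrying, a little longer each time
await new Promise((resolve) => setTimeout(resolve, 1000 * attempt));
}
}
};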
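The retry logic above is tied to one node. If several nodes need it, the same idea can be pulled into a generic helper. The sketch below is a minimal version that assumes exponential backoff (the delay doubles after each failed attempt) and returns a tagged result instead of throwing. `withRetry`, `RetryOptions` and `makeFlaky` are hypothetical names introduced for illustration.

```typescript
interface RetryOptions {
  maxAttempts: number;
  baseDelayMs: number; // the delay doubles after every failed attempt
}

type RetryResult<T> =
  | { ok: true; value: T; attempts: number }
  | { ok: false; error: string; attempts: number };

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run `operation` until it succeeds or the attempt budget is used up.
async function withRetry<T>(operation: () => Promise<T>, options: RetryOptions): Promise<RetryResult<T>> {
  let lastError = 'unknown error';
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      return { ok: true, value: await operation(), attempts: attempt };
    } catch (error) {
      lastError = error instanceof Error ? error.message : String(error);
      if (attempt < options.maxAttempts) {
        await sleep(options.baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }
  return { ok: false, error: lastError, attempts: options.maxAttempts };
}

// Demo helper: an operation that fails a fixed number of times and then succeeds.
function makeFlaky(failures: number): () => Promise<string> {
  let calls = 0;
  return async () => {
    calls += 1;
    if (calls <= failures) throw new Error(`transient failure ${calls}`);
    return 'done';
  };
}
```

A node could then wrap its model call, for example `await withRetry(() => agent.invoke(messages), { maxAttempts: 3, baseDelayMs: 500 })`, and write either the value or the error message into state.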
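4. Performance Optimization
// Run independent agents concurrently. researchAgent, dataAgent and analysisAgent stand for
// invokable agents (not the plain descriptor objects from practice 1). Note that Promise.all
// rejects as soon as any one call fails.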
const parallelExecution = async (state: typeof MultiAgentState.State) => {
const independentTasks = [
researchAgent.invoke(state),
dataAgent.invoke(state),
analysisAgent.invoke(state),
];
const results = await Promise.all(independentTasks);
return {
researchResult: results[0],
dataResult: results[1],
analysisResult: results[2],
};
};
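`Promise.all` is all-or-nothing: one rejected call discards the results of the others. When partial results are still useful, `Promise.allSettled` is an alternative. The sketch below assumes each agent call can be represented as a function returning a promise of text; `runIndependent` and `AgentCall` are illustrative names.

```typescript
type AgentCall = () => Promise<string>;

interface SettledSummary {
  results: Record<string, string>;
  errors: Record<string, string>;
}

// Run every call concurrently and keep the successes even when some calls fail.
async function runIndependent(calls: Record<string, AgentCall>): Promise<SettledSummary> {
  const names = Object.keys(calls);
  const settled = await Promise.allSettled(names.map((name) => calls[name]()));
  const summary: SettledSummary = { results: {}, errors: {} };
  settled.forEach((outcome, index) => {
    const name = names[index];
    if (outcome.status === 'fulfilled') {
      summary.results[name] = outcome.value;
    } else {
      const reason: unknown = outcome.reason;
      summary.errors[name] = reason instanceof Error ? reason.message : String(reason);
    }
  });
  return summary;
}
```

The trade-off is that the caller has to check `errors` explicitly, whereas `Promise.all` surfaces the first failure as an exception.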
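Real-World Use Cases
1. Content Creation Pipeline
// Research → Write → Edit → Publish
const contentPipeline = {
researcher: 'Gather material and data',
writer: 'Write the first draft',
editor: 'Edit and polish',
publisher: 'Format and publish',
};
2. Software Development Team
// Requirements analysis → Architecture design → Implementation → Testing
const developmentTeam = {
analyst: 'Analyze requirements and specifications',
architect: 'Design the system architecture',
developer: 'Write the implementation',
tester: 'Test and assure quality',
};
3. Customer Service System
// Triage → Specialist handling → Quality check → Customer feedback
const customerService = {
classifier: 'Classify customer issues',
specialist: 'Handle issues that need specialist knowledge',
supervisor: 'Check and review quality',
feedback: 'Collect customer feedback',
};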
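Summary and Next Steps
Multi-agent systems are a powerful pattern for building complex AI applications. Their main advantages are:
- Specialization: each agent focuses on one domain, which tends to improve efficiency and quality
- Extensibility: new specialist agents are easy to add
- Fault tolerance: when nodes catch their own errors and record them in state, one agent's failure does not have to bring down the whole run
- Parallelism: independent tasks can run in parallel, which shortens total run time
The next section, Subgraphs, covers how to use subgraphs to build more complex multi-agent systems with modular, reusable agents.
Design Recommendations
- Define the responsibility boundary of each agent clearly
- Design a clear communication protocol between agents
- Implement robust error handling and retries
- Consider running agents in parallel where they are independent
Caveats
- Avoid circular dependencies between agents
- Keep the communication overhead between agents under control
- Structure the shared state carefully
- Pay attention to ordering dependencies between agents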