跳到主要内容

🎮 Command 对象

引言

Command 对象是 LangGraph 中一个强大的功能,它允许节点在返回状态更新的同时控制图的执行流程。通过 Command 对象,你可以实现动态路由、条件跳转、多代理切换等高级功能。

核心概念

Command 对象的作用

Command 对象结合了两个重要功能:

  • 状态更新:修改图的当前状态
  • 控制流:决定下一步的执行路径

Command 类型

基础用法

简单的 Command 使用

基础 Command 示例
import {
StateGraph,
Annotation,
START,
END,
Command,
} from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';

// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (state: BaseMessage[], update: BaseMessage[]) =>
state.concat(update),
default: () => [],
}),
step: Annotation<number>({
reducer: (state: number, update: number) => update,
default: () => 0,
}),
status: Annotation<string>({
reducer: (state: string, update: string) => update,
default: () => 'init',
}),
});

// 使用 Command 的节点
async function commandNode(state: typeof StateAnnotation.State) {
console.log(`当前步骤: ${state.step}, 状态: ${state.status}`);

// 使用 Command 同时更新状态和控制流程
return new Command({
update: {
step: state.step + 1,
status: 'processing',
messages: [new AIMessage({ content: `执行步骤 ${state.step + 1}` })],
},
goto: 'next_node',
});
}

// 下一个节点
async function nextNode(state: typeof StateAnnotation.State) {
console.log(`到达下一个节点,步骤: ${state.step}`);

return new Command({
update: {
status: 'completed',
messages: [new AIMessage({ content: '处理完成' })],
},
goto: END,
});
}

// 构建图
const workflow = new StateGraph(StateAnnotation)
.addNode('command_node', commandNode)
.addNode('next_node', nextNode)
.addEdge(START, 'command_node');

const app = workflow.compile();

// 使用示例
async function demonstrateBasicCommand() {
console.log('=== 基础 Command 示例 ===\n');

try {
const result = await app.invoke({
messages: [new HumanMessage({ content: '开始处理' })],
});

console.log('执行结果:');
console.log('最终步骤:', result.step);
console.log('最终状态:', result.status);
console.log('消息历史:');
result.messages.forEach((msg: BaseMessage, index: number) => {
console.log(` ${index + 1}. ${msg.content}`);
});
} catch (error) {
console.error('执行失败:', error.message);
}
}

export { app, demonstrateBasicCommand };

动态路由

动态路由示例
/**
* ============================================================================
* 动态路由 - Dynamic Routing with Command Objects
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本示例展示如何使用 Command 对象实现智能动态路由系统。根据输入内容的语义分析,
* 自动将请求路由到最合适的处理节点,支持多级路由和条件跳转。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 语义路由:基于输入内容的关键词分析智能路由
* 2️⃣ 多级路由:支持路由后再次路由到其他节点
* 3️⃣ 认证集成:需要认证的路由自动跳转到认证检查
* 4️⃣ 优先级处理:紧急请求自动升级到高优先级队列
* 5️⃣ 路径追踪:记录完整的路由历史便于分析
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 路由决策器:分析输入内容,匹配关键词模式
* • 专门处理器:针对不同类型请求(优先级、支持、订单等)的处理节点
* • 认证中间件:拦截需要认证的请求,验证后继续
* • 升级机制:自动将需要特殊处理的请求升级
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 实用功能/Command对象/dynamic-routing.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 路由规则应该清晰且优先级明确
* • 建议使用更智能的 NLP 模型进行语义分析
* • 认证机制需要与实际的身份系统集成
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { StateGraph, Annotation, Command, END } from '@langchain/langgraph';
import { MemorySaver } from '@langchain/langgraph';

// 定义状态结构
const StateAnnotation = Annotation.Root({
input: Annotation<string>,
currentStep: Annotation<string>,
result: Annotation<string>,
routingHistory: Annotation<string[]>,
metadata: Annotation<Record<string, any>>,
});

// 路由决策节点
function routingDecision(state: typeof StateAnnotation.State) {
console.log('🎯 执行路由决策');

const input = state.input?.toLowerCase() || '';

// 基于输入内容决定路由
let nextNode: string;
let metadata: Record<string, any> = {};

if (input.includes('urgent') || input.includes('emergency')) {
nextNode = 'priority_handler';
metadata = { priority: 'high', escalate: true };
} else if (input.includes('question') || input.includes('help')) {
nextNode = 'support_handler';
metadata = { category: 'support', autoReply: true };
} else if (input.includes('order') || input.includes('purchase')) {
nextNode = 'order_handler';
metadata = { category: 'sales', requiresAuth: true };
} else if (input.includes('feedback') || input.includes('review')) {
nextNode = 'feedback_handler';
metadata = { category: 'feedback', sentiment: 'neutral' };
} else {
nextNode = 'general_handler';
metadata = { category: 'general', defaultFlow: true };
}

console.log(`📍 路由到: ${nextNode}`);

// 使用 Command 对象进行动态路由
return new Command({
goto: nextNode,
update: {
currentStep: 'routing_decision',
routingHistory: [
...(state.routingHistory || []),
`routed_to_${nextNode}`,
],
metadata: { ...state.metadata, ...metadata },
},
});
}

// 优先级处理节点
function priorityHandler(state: typeof StateAnnotation.State) {
console.log('🚨 执行优先级处理');

const result = `优先级处理: ${state.input} - 已升级到高优先级队列`;

// 根据处理结果决定下一步
const shouldEscalate = state.metadata?.escalate;

if (shouldEscalate) {
return new Command({
goto: 'escalation_handler',
update: {
currentStep: 'priority_handler',
result,
routingHistory: [...(state.routingHistory || []), 'priority_processed'],
},
});
}

return new Command({
goto: 'completion',
update: {
currentStep: 'priority_handler',
result,
routingHistory: [...(state.routingHistory || []), 'priority_completed'],
},
});
}

// 支持处理节点
function supportHandler(state: typeof StateAnnotation.State) {
console.log('🎧 执行支持处理');

const autoReply = state.metadata?.autoReply;
let result: string;
let nextStep: string;

if (autoReply) {
result = `自动回复: 感谢您的问题 "${state.input}"。我们的支持团队会在24小时内回复您。`;
nextStep = 'completion';
} else {
result = `支持处理: ${state.input} - 已转发给人工客服`;
nextStep = 'human_review';
}

return new Command({
goto: nextStep,
update: {
currentStep: 'support_handler',
result,
routingHistory: [...(state.routingHistory || []), 'support_processed'],
},
});
}

// 订单处理节点
function orderHandler(state: typeof StateAnnotation.State) {
console.log('🛒 执行订单处理');

const requiresAuth = state.metadata?.requiresAuth;
const isAuthenticated = state.metadata?.authenticated;

// 如果需要认证但尚未认证,跳转到认证检查
if (requiresAuth && !isAuthenticated) {
return new Command({
goto: 'auth_check',
update: {
currentStep: 'order_handler',
routingHistory: [
...(state.routingHistory || []),
'order_auth_required',
],
metadata: {
...state.metadata,
authRequired: true,
originalHandler: 'order_handler',
},
},
});
}

const result = `订单处理: ${state.input} - 订单信息已记录`;

return new Command({
goto: 'completion',
update: {
currentStep: 'order_handler',
result,
routingHistory: [...(state.routingHistory || []), 'order_processed'],
},
});
}

// 反馈处理节点
function feedbackHandler(state: typeof StateAnnotation.State) {
console.log('💬 执行反馈处理');

const sentiment = state.metadata?.sentiment || 'neutral';
let result: string;
let nextStep: string;

if (sentiment === 'negative') {
result = `反馈处理: ${state.input} - 负面反馈,需要特别关注`;
nextStep = 'escalation_handler';
} else {
result = `反馈处理: ${state.input} - 反馈已记录,感谢您的意见`;
nextStep = 'completion';
}

return new Command({
goto: nextStep,
update: {
currentStep: 'feedback_handler',
result,
routingHistory: [...(state.routingHistory || []), 'feedback_processed'],
},
});
}

// 通用处理节点
function generalHandler(state: typeof StateAnnotation.State) {
console.log('📝 执行通用处理');

const result = `通用处理: ${state.input} - 已按标准流程处理`;

return new Command({
goto: 'completion',
update: {
currentStep: 'general_handler',
result,
routingHistory: [...(state.routingHistory || []), 'general_processed'],
},
});
}

// 认证检查节点
function authCheck(state: typeof StateAnnotation.State) {
console.log('🔐 执行认证检查');

// 模拟认证检查
const isAuthenticated = Math.random() > 0.3; // 70% 通过率
const originalHandler = state.metadata?.originalHandler;

if (isAuthenticated) {
return new Command({
goto: originalHandler || 'completion',
update: {
currentStep: 'auth_check',
routingHistory: [...(state.routingHistory || []), 'auth_passed'],
metadata: {
...state.metadata,
authenticated: true,
},
},
});
} else {
return new Command({
goto: 'auth_failure',
update: {
currentStep: 'auth_check',
routingHistory: [...(state.routingHistory || []), 'auth_failed'],
metadata: {
...state.metadata,
authenticated: false,
},
},
});
}
}

// 认证失败处理节点
function authFailure(state: typeof StateAnnotation.State) {
console.log('❌ 认证失败处理');

const result = `认证失败: 请先登录后再进行 "${state.input}" 操作`;

return new Command({
goto: 'completion',
update: {
currentStep: 'auth_failure',
result,
routingHistory: [...(state.routingHistory || []), 'auth_failure_handled'],
},
});
}

// 升级处理节点
function escalationHandler(state: typeof StateAnnotation.State) {
console.log('⬆️ 执行升级处理');

const result = `升级处理: ${state.input} - 已升级到高级处理团队`;

return new Command({
goto: 'completion',
update: {
currentStep: 'escalation_handler',
result,
routingHistory: [...(state.routingHistory || []), 'escalated'],
},
});
}

// 人工审核节点
function humanReview(state: typeof StateAnnotation.State) {
console.log('👤 人工审核');

const result = `人工审核: ${state.input} - 已转交人工处理`;

return new Command({
goto: 'completion',
update: {
currentStep: 'human_review',
result,
routingHistory: [...(state.routingHistory || []), 'human_reviewed'],
},
});
}

// 完成节点
function completion(state: typeof StateAnnotation.State) {
console.log('✅ 处理完成');

return {
currentStep: 'completion',
routingHistory: [...(state.routingHistory || []), 'completed'],
};
}

// 构建动态路由图
const dynamicRoutingGraph = new StateGraph(StateAnnotation)
.addNode('routing_decision', routingDecision, { ends: ['priority_handler', 'support_handler', 'order_handler', 'feedback_handler', 'general_handler'] })
.addNode('priority_handler', priorityHandler, { ends: ['escalation_handler', 'completion'] })
.addNode('support_handler', supportHandler, { ends: ['completion', 'human_review'] })
.addNode('order_handler', orderHandler, { ends: ['auth_check', 'completion'] })
.addNode('feedback_handler', feedbackHandler, { ends: ['escalation_handler', 'completion'] })
.addNode('general_handler', generalHandler, { ends: ['completion'] })
.addNode('auth_check', authCheck, { ends: ['order_handler', 'completion', 'auth_failure'] })
.addNode('auth_failure', authFailure, { ends: ['completion'] })
.addNode('escalation_handler', escalationHandler, { ends: ['completion'] })
.addNode('human_review', humanReview, { ends: ['completion'] })
.addNode('completion', completion)
.setEntryPoint('routing_decision')
.addEdge('completion', END);

// 内存存储器
const memorySaver = new MemorySaver();

// 编译图
const app = dynamicRoutingGraph.compile({
checkpointer: memorySaver,
});

// 动态路由测试函数
async function testDynamicRouting() {
console.log('🧪 动态路由测试');
console.log('='.repeat(50));

const testCases = [
'urgent help needed with my account',
'I have a question about your service',
'I want to place an order for 5 items',
'Great service, very satisfied!',
'Hello, how are you today?',
'emergency: system is down',
'negative feedback about delivery',
];

for (let i = 0; i < testCases.length; i++) {
const input = testCases[i];
const threadId = `test-${i + 1}`;

console.log(`\n📝 测试案例 ${i + 1}: "${input}"`);
console.log('-'.repeat(30));

try {
const result = await app.invoke(
{
input,
routingHistory: [],
metadata: {},
},
{ configurable: { thread_id: threadId } }
);

console.log(`✅ 最终结果: ${result.result}`);
console.log(`📍 当前步骤: ${result.currentStep}`);
console.log(`🛤️ 路由历史: ${result.routingHistory?.join(' → ')}`);

if (result.metadata && Object.keys(result.metadata).length > 0) {
console.log(`📊 元数据: ${JSON.stringify(result.metadata, null, 2)}`);
}
} catch (error) {
console.error(`❌ 测试失败:`, error);
}
}
}

// 交互式路由测试
async function interactiveRoutingTest(input: string) {
console.log(`\n🎯 交互式路由测试: "${input}"`);

const threadId = `interactive-${Date.now()}`;

try {
const result = await app.invoke(
{
input,
routingHistory: [],
metadata: {},
},
{ configurable: { thread_id: threadId } }
);

console.log('\n📋 处理结果:');
console.log(` 结果: ${result.result}`);
console.log(` 步骤: ${result.currentStep}`);
console.log(` 路由: ${result.routingHistory?.join(' → ')}`);

if (result.metadata) {
console.log(` 元数据: ${JSON.stringify(result.metadata, null, 2)}`);
}

return result;
} catch (error) {
console.error('❌ 处理失败:', error);
return null;
}
}

// 路由统计分析
async function analyzeRoutingPatterns() {
console.log('\n📊 路由模式分析');
console.log('='.repeat(50));

const patterns = [
{ pattern: 'urgent', expectedRoute: 'priority_handler', count: 0 },
{ pattern: 'question', expectedRoute: 'support_handler', count: 0 },
{ pattern: 'order', expectedRoute: 'order_handler', count: 0 },
{ pattern: 'feedback', expectedRoute: 'feedback_handler', count: 0 },
{ pattern: 'general', expectedRoute: 'general_handler', count: 0 },
];

// 生成测试数据
const testInputs = [
'urgent issue with payment',
'question about shipping',
'order status inquiry',
'positive feedback',
'hello there',
'emergency support needed',
'help with account',
'place new order',
'great experience',
'random message',
];

for (let i = 0; i < testInputs.length; i++) {
const input = testInputs[i];
const threadId = `analysis-${i}`;

try {
const result = await app.invoke(
{
input,
routingHistory: [],
metadata: {},
},
{ configurable: { thread_id: threadId } }
);

// 分析路由模式
const routeHistory = result.routingHistory || [];
const firstRoute = routeHistory[0]?.replace('routed_to_', '');

patterns.forEach((pattern) => {
if (input.toLowerCase().includes(pattern.pattern)) {
pattern.count++;
if (firstRoute === pattern.expectedRoute) {
console.log(`✅ 正确路由: ${input} -> ${firstRoute}`);
} else {
console.log(
`❌ 路由偏差: ${input} -> ${firstRoute} (期望: ${pattern.expectedRoute})`
);
}
}
});
} catch (error) {
console.error(`❌ 分析失败:`, error);
}
}

console.log('\n📈 路由统计结果:');
patterns.forEach((pattern) => {
console.log(` ${pattern.pattern}: ${pattern.count} 次匹配`);
});
}

// 运行演示
if (require.main === module) {
testDynamicRouting()
.then(() => analyzeRoutingPatterns())
.catch(console.error);
}

export {
dynamicRoutingGraph,
app,
testDynamicRouting,
interactiveRoutingTest,
analyzeRoutingPatterns,
};

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
🧪 动态路由测试
==================================================

📝 测试案例 1: "urgent help needed with my account"
------------------------------
🎯 执行路由决策
📍 路由到: priority_handler
🚨 执行优先级处理
⬆️ 执行升级处理
✅ 处理完成
✅ 最终结果: 升级处理: urgent help needed with my account - 已升级到高级处理团队
📍 当前步骤: completion
🛤️ 路由历史: routed_to_priority_handler → priority_processed → escalated → completed
📊 元数据: {
"priority": "high",
"escalate": true
}

📝 测试案例 2: "I have a question about your service"
------------------------------
🎯 执行路由决策
📍 路由到: support_handler
🎧 执行支持处理
✅ 处理完成
✅ 最终结果: 自动回复: 感谢您的问题 "I have a question about your service"。我们的支持团队会在24小时内回复您。
📍 当前步骤: completion
🛤️ 路由历史: routed_to_support_handler → support_processed → completed
📊 元数据: {
"category": "support",
"autoReply": true
}

📝 测试案例 3: "I want to place an order for 5 items"
------------------------------
🎯 执行路由决策
📍 路由到: order_handler
🛒 执行订单处理
🔐 执行认证检查
❌ 认证失败处理
✅ 处理完成
✅ 最终结果: 认证失败: 请先登录后再进行 "I want to place an order for 5 items" 操作
📍 当前步骤: completion
🛤️ 路由历史: routed_to_order_handler → order_auth_required → auth_failed → auth_failure_handled → completed
📊 元数据: {
"category": "sales",
"requiresAuth": true,
"authRequired": true,
"originalHandler": "order_handler",
"authenticated": false
}

📝 测试案例 4: "Great service, very satisfied!"
------------------------------
🎯 执行路由决策
📍 路由到: general_handler
📝 执行通用处理
✅ 处理完成
✅ 最终结果: 通用处理: Great service, very satisfied! - 已按标准流程处理
📍 当前步骤: completion
🛤️ 路由历史: routed_to_general_handler → general_processed → completed
📊 元数据: {
"category": "general",
"defaultFlow": true
}

📝 测试案例 5: "Hello, how are you today?"
------------------------------
🎯 执行路由决策
📍 路由到: general_handler
📝 执行通用处理
✅ 处理完成
✅ 最终结果: 通用处理: Hello, how are you today? - 已按标准流程处理
📍 当前步骤: completion
🛤️ 路由历史: routed_to_general_handler → general_processed → completed
📊 元数据: {
"category": "general",
"defaultFlow": true
}

📝 测试案例 6: "emergency: system is down"
------------------------------
🎯 执行路由决策
📍 路由到: priority_handler
🚨 执行优先级处理
⬆️ 执行升级处理
✅ 处理完成
✅ 最终结果: 升级处理: emergency: system is down - 已升级到高级处理团队
📍 当前步骤: completion
🛤️ 路由历史: routed_to_priority_handler → priority_processed → escalated → completed
📊 元数据: {
"priority": "high",
"escalate": true
}

📝 测试案例 7: "negative feedback about delivery"
------------------------------
🎯 执行路由决策
📍 路由到: feedback_handler
💬 执行反馈处理
✅ 处理完成
✅ 最终结果: 反馈处理: negative feedback about delivery - 反馈已记录,感谢您的意见
📍 当前步骤: completion
🛤️ 路由历史: routed_to_feedback_handler → feedback_processed → completed
📊 元数据: {
"category": "feedback",
"sentiment": "neutral"
}

📊 路由模式分析
==================================================
🎯 执行路由决策
📍 路由到: priority_handler
🚨 执行优先级处理
⬆️ 执行升级处理
✅ 处理完成
✅ 正确路由: urgent issue with payment -> priority_handler
🎯 执行路由决策
📍 路由到: support_handler
🎧 执行支持处理
✅ 处理完成
✅ 正确路由: question about shipping -> support_handler
🎯 执行路由决策
📍 路由到: order_handler
🛒 执行订单处理
🔐 执行认证检查
🛒 执行订单处理
✅ 处理完成
✅ 正确路由: order status inquiry -> order_handler
🎯 执行路由决策
📍 路由到: feedback_handler
💬 执行反馈处理
✅ 处理完成
✅ 正确路由: positive feedback -> feedback_handler
🎯 执行路由决策
📍 路由到: general_handler
📝 执行通用处理
✅ 处理完成
🎯 执行路由决策
📍 路由到: priority_handler
🚨 执行优先级处理
⬆️ 执行升级处理
✅ 处理完成
🎯 执行路由决策
📍 路由到: support_handler
🎧 执行支持处理
✅ 处理完成
🎯 执行路由决策
📍 路由到: order_handler
🛒 执行订单处理
🔐 执行认证检查
🛒 执行订单处理
✅ 处理完成
✅ 正确路由: place new order -> order_handler
🎯 执行路由决策
📍 路由到: general_handler
📝 执行通用处理
✅ 处理完成
🎯 执行路由决策
📍 路由到: general_handler
📝 执行通用处理
✅ 处理完成

📈 路由统计结果:
urgent: 1 次匹配
question: 1 次匹配
order: 2 次匹配
feedback: 1 次匹配
general: 0 次匹配
*/

高级功能

多代理切换

多代理切换
/**
* ============================================================================
* 多代理切换 - Multi-Agent Switching with Command Objects
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本示例展示如何使用 Command 对象实现多代理系统的智能切换。根据任务类型自动路由到
* 专门代理(技术、销售、设计等),支持质量检查和人工审核,提供灵活的代理协作机制。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 任务分析:自动识别任务类型(技术、销售、设计、通用)
* 2️⃣ 代理路由:根据任务类型路由到对应的专门代理
* 3️⃣ 质量检查:自动评估代理处理结果的质量
* 4️⃣ 人工审核:质量不达标时触发人工审核流程
* 5️⃣ 专家代理:复杂问题升级到专家代理处理
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 任务分析器:基于关键词识别任务类型
* • 专门代理:针对不同领域提供专业处理能力
* • 质量检查器:评估处理结果质量并决定是否需要复审
* • 升级机制:自动将复杂任务升级到更高级的代理
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 实用功能/Command对象/multi-agent-switching.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 任务分类逻辑应该准确且覆盖全面
* • 质量评分标准需要根据实际业务调整
* • 建议记录代理切换历史用于性能分析
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { StateGraph, Annotation, END, Command } from '@langchain/langgraph';
import { MemorySaver } from '@langchain/langgraph';

// 定义状态结构
const StateAnnotation = Annotation.Root({
input: Annotation<string>,
taskType: Annotation<string>,
currentAgent: Annotation<string>,
result: Annotation<string>,
agentHistory: Annotation<string[]>,
metadata: Annotation<Record<string, any>>,
});

// 任务分析节点
function analyzeTask(state: typeof StateAnnotation.State) {
console.log('🔍 分析任务类型:', state.input);

const input = state.input.toLowerCase();
let taskType = 'general';

if (
input.includes('技术') ||
input.includes('代码') ||
input.includes('编程')
) {
taskType = 'technical';
} else if (
input.includes('销售') ||
input.includes('市场') ||
input.includes('客户')
) {
taskType = 'sales';
} else if (
input.includes('设计') ||
input.includes('UI') ||
input.includes('界面')
) {
taskType = 'design';
}

console.log(`📋 任务类型识别为: ${taskType}`);

return new Command({
update: {
taskType,
agentHistory: [...(state.agentHistory || []), 'task_analyzer'],
},
goto: 'route_to_agent',
});
}

// 代理路由节点
function routeToAgent(state: typeof StateAnnotation.State) {
console.log('🚦 路由到专门代理');

const { taskType } = state;
let targetAgent = 'general_agent';

switch (taskType) {
case 'technical':
targetAgent = 'technical_agent';
break;
case 'sales':
targetAgent = 'sales_agent';
break;
case 'design':
targetAgent = 'design_agent';
break;
default:
targetAgent = 'general_agent';
}

console.log(`🎯 路由到代理: ${targetAgent}`);

return new Command({
update: {
currentAgent: targetAgent,
agentHistory: [...(state.agentHistory || []), 'router'],
},
goto: targetAgent,
});
}

// 技术代理
function technicalAgent(state: typeof StateAnnotation.State) {
console.log('💻 技术代理处理中...');

const technicalResponse = `技术分析: ${state.input}

建议的技术方案:
1. 使用现代框架进行开发
2. 考虑性能优化策略
3. 实施代码质量检查
4. 建立自动化测试流程`;

console.log('✅ 技术代理处理完成');

return new Command({
update: {
result: technicalResponse,
agentHistory: [...(state.agentHistory || []), 'technical_agent'],
metadata: {
...state.metadata,
processingTime: Date.now(),
agentType: 'technical',
},
},
goto: 'quality_check',
});
}

// 销售代理
function salesAgent(state: typeof StateAnnotation.State) {
console.log('💼 销售代理处理中...');

const salesResponse = `销售策略分析: ${state.input}

推荐的销售方案:
1. 目标客户群体分析
2. 竞争对手调研
3. 价格策略制定
4. 销售渠道优化`;

console.log('✅ 销售代理处理完成');

return new Command({
update: {
result: salesResponse,
agentHistory: [...(state.agentHistory || []), 'sales_agent'],
metadata: {
...state.metadata,
processingTime: Date.now(),
agentType: 'sales',
},
},
goto: 'quality_check',
});
}

// 设计代理
function designAgent(state: typeof StateAnnotation.State) {
console.log('🎨 设计代理处理中...');

const designResponse = `设计方案建议: ${state.input}

设计要点:
1. 用户体验优化
2. 视觉层次设计
3. 交互流程规划
4. 响应式设计考虑`;

console.log('✅ 设计代理处理完成');

return new Command({
update: {
result: designResponse,
agentHistory: [...(state.agentHistory || []), 'design_agent'],
metadata: {
...state.metadata,
processingTime: Date.now(),
agentType: 'design',
},
},
goto: 'quality_check',
});
}

// 通用代理
function generalAgent(state: typeof StateAnnotation.State) {
console.log('🤖 通用代理处理中...');

const generalResponse = `通用处理结果: ${state.input}

基础建议:
1. 明确需求和目标
2. 制定详细计划
3. 分步骤执行
4. 定期评估进展`;

console.log('✅ 通用代理处理完成');

return new Command({
update: {
result: generalResponse,
agentHistory: [...(state.agentHistory || []), 'general_agent'],
metadata: {
...state.metadata,
processingTime: Date.now(),
agentType: 'general',
},
},
goto: 'quality_check',
});
}

// 质量检查节点
function qualityCheck(state: typeof StateAnnotation.State) {
console.log('🔍 质量检查中...');

const { result, currentAgent } = state;
const qualityScore = Math.floor(Math.random() * 30) + 70; // 70-100分

console.log(`📊 质量评分: ${qualityScore}/100`);

if (qualityScore < 80) {
console.log('⚠️ 质量不达标,需要重新处理');

// 切换到更专业的代理或请求人工审核
return new Command({
update: {
metadata: {
...state.metadata,
qualityScore,
needsReview: true,
},
agentHistory: [...(state.agentHistory || []), 'quality_checker'],
},
goto: 'human_review',
});
}

console.log('✅ 质量检查通过');

return new Command({
update: {
metadata: {
...state.metadata,
qualityScore,
approved: true,
},
agentHistory: [...(state.agentHistory || []), 'quality_checker'],
},
goto: 'finalize',
});
}

// 人工审核节点
function humanReview(state: typeof StateAnnotation.State) {
console.log('👤 请求人工审核...');

// 模拟人工审核决策
const humanApproval = Math.random() > 0.3; // 70% 通过率

if (humanApproval) {
console.log('✅ 人工审核通过');

return new Command({
update: {
metadata: {
...state.metadata,
humanApproved: true,
},
agentHistory: [...(state.agentHistory || []), 'human_reviewer'],
},
goto: 'finalize',
});
} else {
console.log('❌ 人工审核不通过,切换到专家代理');

return new Command({
update: {
currentAgent: 'expert_agent',
agentHistory: [...(state.agentHistory || []), 'human_reviewer'],
},
goto: 'expert_agent',
});
}
}

// 专家代理
function expertAgent(state: typeof StateAnnotation.State) {
console.log('🎓 专家代理处理中...');

const expertResponse = `专家级分析: ${state.input}

深度分析结果:
1. 综合多维度考虑
2. 行业最佳实践应用
3. 风险评估和缓解
4. 长期战略规划
5. 创新解决方案建议

专家建议: 基于当前需求和行业趋势,建议采用渐进式实施策略,确保稳定性的同时保持创新性。`;

console.log('✅ 专家代理处理完成');

return new Command({
update: {
result: expertResponse,
agentHistory: [...(state.agentHistory || []), 'expert_agent'],
metadata: {
...state.metadata,
processingTime: Date.now(),
agentType: 'expert',
expertLevel: true,
},
},
goto: 'finalize',
});
}

// 最终处理节点
function finalize(state: typeof StateAnnotation.State) {
console.log('🏁 最终处理');

const { result, agentHistory, metadata } = state;

console.log('\n📋 处理摘要:');
console.log(`🎯 最终结果: ${result}`);
console.log(`🔄 代理路径: ${agentHistory?.join(' → ')}`);
console.log(`📊 质量评分: ${metadata?.qualityScore || 'N/A'}`);
console.log(`⏱️ 处理时间: ${metadata?.processingTime || 'N/A'}`);

return {
result,
agentHistory,
metadata: {
...metadata,
completed: true,
finalizedAt: new Date().toISOString(),
},
};
}

// 构建多代理切换图
const multiAgentGraph = new StateGraph(StateAnnotation)
.addNode('analyze_task', analyzeTask, { ends: ['route_to_agent'] })
.addNode('route_to_agent', routeToAgent, { ends: ['technical_agent', 'sales_agent', 'design_agent', 'general_agent'] })
.addNode('technical_agent', technicalAgent, { ends: ['quality_check'] })
.addNode('sales_agent', salesAgent, { ends: ['quality_check'] })
.addNode('design_agent', designAgent, { ends: ['quality_check'] })
.addNode('general_agent', generalAgent, { ends: ['quality_check'] })
.addNode('quality_check', qualityCheck, { ends: ['human_review', 'finalize'] })
.addNode('human_review', humanReview, { ends: ['expert_agent', 'finalize'] })
.addNode('expert_agent', expertAgent, { ends: ['finalize'] })
.addNode('finalize', finalize)
.setEntryPoint('analyze_task')
.addEdge('finalize', END);

// 内存存储器
const memorySaver = new MemorySaver();

// 编译图
const app = multiAgentGraph.compile({
checkpointer: memorySaver,
});

// 多代理切换演示
async function demonstrateMultiAgentSwitching() {
console.log('🎭 多代理切换演示');
console.log('='.repeat(60));

const testCases = [
'我需要开发一个新的Web应用,使用React和TypeScript',
'如何提高我们产品的销售业绩?',
'设计一个用户友好的移动应用界面',
'帮我分析一下市场趋势',
];

for (let i = 0; i < testCases.length; i++) {
const input = testCases[i];
console.log(`\n🧪 测试案例 ${i + 1}: "${input}"`);
console.log('-'.repeat(50));

try {
const result = await app.invoke(
{
input,
taskType: '',
currentAgent: '',
result: '',
agentHistory: [],
metadata: {},
},
{
configurable: {
thread_id: `test-case-${i + 1}`,
},
}
);

console.log('\n✅ 处理完成');
console.log(`📄 最终结果: ${result.result?.substring(0, 100)}...`);
console.log(`🔄 代理切换路径: ${result.agentHistory?.join(' → ')}`);
} catch (error) {
console.error('❌ 处理失败:', error);
}

console.log('\n' + '='.repeat(60));
}
}

// 代理性能分析
class AgentPerformanceAnalyzer {
private performanceData: Map<
string,
{
callCount: number;
totalTime: number;
successRate: number;
qualityScores: number[];
}
> = new Map();

recordAgentPerformance(
agentName: string,
processingTime: number,
qualityScore: number,
success: boolean
) {
if (!this.performanceData.has(agentName)) {
this.performanceData.set(agentName, {
callCount: 0,
totalTime: 0,
successRate: 0,
qualityScores: [],
});
}

const data = this.performanceData.get(agentName)!;
data.callCount++;
data.totalTime += processingTime;
data.qualityScores.push(qualityScore);
data.successRate =
(data.successRate * (data.callCount - 1) + (success ? 1 : 0)) /
data.callCount;
}

generateReport(): string {
let report = '\n📊 代理性能报告\n';
report += '='.repeat(50) + '\n';

for (const [agentName, data] of this.performanceData) {
const avgTime = data.totalTime / data.callCount;
const avgQuality =
data.qualityScores.reduce((sum, score) => sum + score, 0) /
data.qualityScores.length;

report += `\n🤖 ${agentName}:\n`;
report += ` 📞 调用次数: ${data.callCount}\n`;
report += ` ⏱️ 平均处理时间: ${avgTime.toFixed(2)}ms\n`;
report += ` ✅ 成功率: ${(data.successRate * 100).toFixed(1)}%\n`;
report += ` 📊 平均质量分: ${avgQuality.toFixed(1)}\n`;
}

return report;
}
}

// 运行演示
if (require.main === module) {
demonstrateMultiAgentSwitching().catch(console.error);
}

export {
multiAgentGraph,
app,
demonstrateMultiAgentSwitching,
AgentPerformanceAnalyzer,
};

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
🎭 多代理切换演示
============================================================

🧪 测试案例 1: "我需要开发一个新的Web应用,使用React和TypeScript"
--------------------------------------------------
🔍 分析任务类型: 我需要开发一个新的Web应用,使用React和TypeScript
📋 任务类型识别为: general
🚦 路由到专门代理
🎯 路由到代理: general_agent
🤖 通用代理处理中...
✅ 通用代理处理完成
🔍 质量检查中...
📊 质量评分: 88/100
✅ 质量检查通过
🏁 最终处理

📋 处理摘要:
🎯 最终结果: 通用处理结果: 我需要开发一个新的Web应用,使用React和TypeScript

基础建议:
1. 明确需求和目标
2. 制定详细计划
3. 分步骤执行
4. 定期评估进展
🔄 代理路径: task_analyzer → router → general_agent → quality_checker
📊 质量评分: 88
⏱️ 处理时间: 1763296096853

✅ 处理完成
📄 最终结果: 通用处理结果: 我需要开发一个新的Web应用,使用React和TypeScript

基础建议:
1. 明确需求和目标
2. 制定详细计划
3. 分步骤执行
4. 定期评估进展...
🔄 代理切换路径: task_analyzer → router → general_agent → quality_checker

============================================================

🧪 测试案例 2: "如何提高我们产品的销售业绩?"
--------------------------------------------------
🔍 分析任务类型: 如何提高我们产品的销售业绩?
📋 任务类型识别为: sales
🚦 路由到专门代理
🎯 路由到代理: sales_agent
💼 销售代理处理中...
✅ 销售代理处理完成
🔍 质量检查中...
📊 质量评分: 95/100
✅ 质量检查通过
🏁 最终处理

📋 处理摘要:
🎯 最终结果: 销售策略分析: 如何提高我们产品的销售业绩?

推荐的销售方案:
1. 目标客户群体分析
2. 竞争对手调研
3. 价格策略制定
4. 销售渠道优化
🔄 代理路径: task_analyzer → router → sales_agent → quality_checker
📊 质量评分: 95
⏱️ 处理时间: 1763296096865

✅ 处理完成
📄 最终结果: 销售策略分析: 如何提高我们产品的销售业绩?

推荐的销售方案:
1. 目标客户群体分析
2. 竞争对手调研
3. 价格策略制定
4. 销售渠道优化...
🔄 代理切换路径: task_analyzer → router → sales_agent → quality_checker

============================================================

🧪 测试案例 3: "设计一个用户友好的移动应用界面"
--------------------------------------------------
🔍 分析任务类型: 设计一个用户友好的移动应用界面
📋 任务类型识别为: design
🚦 路由到专门代理
🎯 路由到代理: design_agent
🎨 设计代理处理中...
✅ 设计代理处理完成
🔍 质量检查中...
📊 质量评分: 83/100
✅ 质量检查通过
🏁 最终处理

📋 处理摘要:
🎯 最终结果: 设计方案建议: 设计一个用户友好的移动应用界面

设计要点:
1. 用户体验优化
2. 视觉层次设计
3. 交互流程规划
4. 响应式设计考虑
🔄 代理路径: task_analyzer → router → design_agent → quality_checker
📊 质量评分: 83
⏱️ 处理时间: 1763296096870

✅ 处理完成
📄 最终结果: 设计方案建议: 设计一个用户友好的移动应用界面

设计要点:
1. 用户体验优化
2. 视觉层次设计
3. 交互流程规划
4. 响应式设计考虑...
🔄 代理切换路径: task_analyzer → router → design_agent → quality_checker

============================================================

🧪 测试案例 4: "帮我分析一下市场趋势"
--------------------------------------------------
🔍 分析任务类型: 帮我分析一下市场趋势
📋 任务类型识别为: sales
🚦 路由到专门代理
🎯 路由到代理: sales_agent
💼 销售代理处理中...
✅ 销售代理处理完成
🔍 质量检查中...
📊 质量评分: 81/100
✅ 质量检查通过
🏁 最终处理

📋 处理摘要:
🎯 最终结果: 销售策略分析: 帮我分析一下市场趋势

推荐的销售方案:
1. 目标客户群体分析
2. 竞争对手调研
3. 价格策略制定
4. 销售渠道优化
🔄 代理路径: task_analyzer → router → sales_agent → quality_checker
📊 质量评分: 81
⏱️ 处理时间: 1763296096874

✅ 处理完成
📄 最终结果: 销售策略分析: 帮我分析一下市场趋势

推荐的销售方案:
1. 目标客户群体分析
2. 竞争对手调研
3. 价格策略制定
4. 销售渠道优化...
🔄 代理切换路径: task_analyzer → router → sales_agent → quality_checker

============================================================
*/

条件执行

条件执行控制
/**
* ============================================================================
* 条件执行 - Conditional Execution with Command Objects
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本示例展示如何使用 Command 对象实现复杂的条件执行逻辑。通过评估多维度条件(权限、
* 紧急度、复杂度等),智能路由到不同的处理流程,实现灵活的工作流控制。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 条件评估:综合多个维度评估执行条件(权限、紧急度、复杂度、审批需求)
* 2️⃣ 智能路由:根据条件评估结果动态路由到对应处理节点
* 3️⃣ 权限控制:支持基于角色的访问控制和权限检查
* 4️⃣ 审批流程:对敏感操作自动触发审批工作流
* 5️⃣ 执行追踪:记录完整的执行路径和条件评估结果
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 条件评估器:分析输入和用户信息,生成多维度条件结果
* • 路由决策节点:基于条件组合决定最优执行路径
* • 专门处理器:针对不同场景(紧急、复杂、标准)的处理逻辑
* • 执行路径记录:维护状态历史,便于调试和审计
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 实用功能/Command对象/conditional-execution.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 条件逻辑应该清晰且可维护
* • 建议记录详细的执行路径用于审计
* • 权限检查应该在实际应用中与身份认证系统集成
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { StateGraph, Annotation, END, Command } from '@langchain/langgraph';
import { MemorySaver } from '@langchain/langgraph';

// 定义状态结构
const StateAnnotation = Annotation.Root({
input: Annotation<string>,
userRole: Annotation<string>,
priority: Annotation<string>,
conditions: Annotation<Record<string, any>>,
result: Annotation<string>,
executionPath: Annotation<string[]>,
metadata: Annotation<Record<string, any>>,
});

// 条件评估节点
function evaluateConditions(state: typeof StateAnnotation.State) {
console.log('🔍 评估执行条件');

const { input, userRole } = state;
const conditions = {
hasPermission: userRole === 'admin' || userRole === 'manager',
isUrgent:
input.toLowerCase().includes('紧急') ||
input.toLowerCase().includes('urgent'),
isComplexTask: input.length > 100,
requiresApproval:
input.toLowerCase().includes('删除') ||
input.toLowerCase().includes('delete'),
};

console.log('📋 条件评估结果:', conditions);

return new Command({
update: {
conditions,
executionPath: [...(state.executionPath || []), 'condition_evaluator'],
},
goto: 'route_by_conditions',
});
}

// 条件路由节点
function routeByConditions(state: typeof StateAnnotation.State) {
console.log('🚦 根据条件进行路由');

const { conditions, userRole } = state;

// 复杂的条件判断逻辑
if (conditions.requiresApproval && !conditions.hasPermission) {
console.log('❌ 需要审批但权限不足');
return new Command({
update: {
executionPath: [...(state.executionPath || []), 'permission_denied'],
},
goto: 'permission_denied',
});
}

if (conditions.isUrgent && conditions.hasPermission) {
console.log('🚨 紧急任务,直接执行');
return new Command({
update: {
priority: 'high',
executionPath: [...(state.executionPath || []), 'urgent_handler'],
},
goto: 'urgent_execution',
});
}

if (conditions.isComplexTask) {
console.log('🧩 复杂任务,需要专门处理');
return new Command({
update: {
priority: 'normal',
executionPath: [...(state.executionPath || []), 'complex_handler'],
},
goto: 'complex_execution',
});
}

if (conditions.requiresApproval) {
console.log('✋ 需要审批流程');
return new Command({
update: {
executionPath: [...(state.executionPath || []), 'approval_required'],
},
goto: 'approval_workflow',
});
}

console.log('📝 标准处理流程');
return new Command({
update: {
priority: 'normal',
executionPath: [...(state.executionPath || []), 'standard_handler'],
},
goto: 'standard_execution',
});
}

// 权限拒绝处理
function permissionDenied(state: typeof StateAnnotation.State) {
console.log('🚫 权限不足,拒绝执行');

const result = `权限不足: 用户角色 "${state.userRole}" 无法执行此操作。请联系管理员获取必要权限。`;

return new Command({
update: {
result,
executionPath: [
...(state.executionPath || []),
'permission_denied_handler',
],
metadata: {
...state.metadata,
status: 'denied',
reason: 'insufficient_permissions',
},
},
goto: 'finalize',
});
}

// 紧急执行处理
function urgentExecution(state: typeof StateAnnotation.State) {
console.log('🚨 紧急任务执行中...');

// 模拟紧急处理逻辑
const result = `紧急处理完成: ${state.input}

执行详情:
- 优先级: 最高
- 处理时间: 立即
- 跳过常规审批流程
- 结果: 任务已紧急处理完成`;

console.log('✅ 紧急任务处理完成');

return new Command({
update: {
result,
executionPath: [...(state.executionPath || []), 'urgent_executor'],
metadata: {
...state.metadata,
status: 'completed',
processingType: 'urgent',
completedAt: new Date().toISOString(),
},
},
goto: 'finalize',
});
}

// 复杂任务执行
function complexExecution(state: typeof StateAnnotation.State) {
console.log('🧩 复杂任务执行中...');

// 模拟复杂任务处理
const steps = [
'分析任务复杂度',
'分解子任务',
'分配资源',
'执行各子任务',
'整合结果',
];

const result = `复杂任务处理完成: ${state.input}

处理步骤:
${steps.map((step, index) => `${index + 1}. ${step}`).join('\n')}

结果: 复杂任务已成功分解并完成处理`;

console.log('✅ 复杂任务处理完成');

return new Command({
update: {
result,
executionPath: [...(state.executionPath || []), 'complex_executor'],
metadata: {
...state.metadata,
status: 'completed',
processingType: 'complex',
steps,
completedAt: new Date().toISOString(),
},
},
goto: 'finalize',
});
}

// 审批工作流
function approvalWorkflow(state: typeof StateAnnotation.State) {
console.log('✋ 进入审批工作流...');

// 模拟审批过程
const approvalSteps = [
'提交审批申请',
'等待管理员审核',
'风险评估',
'最终批准',
];

// 模拟审批结果(80% 通过率)
const approved = Math.random() > 0.2;

if (approved) {
console.log('✅ 审批通过,继续执行');

return new Command({
update: {
executionPath: [...(state.executionPath || []), 'approval_granted'],
metadata: {
...state.metadata,
approvalStatus: 'approved',
approvalSteps,
},
},
goto: 'approved_execution',
});
} else {
console.log('❌ 审批被拒绝');

const result = `审批被拒绝: ${state.input}

审批流程:
${approvalSteps.map((step, index) => `${index + 1}. ${step}`).join('\n')}

拒绝原因: 操作风险过高,不符合安全策略`;

return new Command({
update: {
result,
executionPath: [...(state.executionPath || []), 'approval_denied'],
metadata: {
...state.metadata,
status: 'denied',
approvalStatus: 'rejected',
reason: 'high_risk_operation',
},
},
goto: 'finalize',
});
}
}

// 审批通过后执行
function approvedExecution(state: typeof StateAnnotation.State) {
console.log('✅ 审批通过,执行任务...');

const result = `审批通过后执行完成: ${state.input}

执行详情:
- 已通过审批流程
- 风险评估: 可接受
- 执行权限: 已授权
- 结果: 任务在审批监督下成功完成`;

console.log('✅ 审批任务执行完成');

return new Command({
update: {
result,
executionPath: [...(state.executionPath || []), 'approved_executor'],
metadata: {
...state.metadata,
status: 'completed',
processingType: 'approved',
completedAt: new Date().toISOString(),
},
},
goto: 'finalize',
});
}

// 标准执行
function standardExecution(state: typeof StateAnnotation.State) {
console.log('📝 标准任务执行中...');

const result = `标准处理完成: ${state.input}

执行详情:
- 处理类型: 标准流程
- 权限检查: 通过
- 执行时间: 正常
- 结果: 任务按标准流程成功完成`;

console.log('✅ 标准任务处理完成');

return new Command({
update: {
result,
executionPath: [...(state.executionPath || []), 'standard_executor'],
metadata: {
...state.metadata,
status: 'completed',
processingType: 'standard',
completedAt: new Date().toISOString(),
},
},
goto: 'finalize',
});
}

// 最终处理节点
function finalize(state: typeof StateAnnotation.State) {
console.log('🏁 条件执行完成');

const { result, executionPath, metadata, conditions } = state;

console.log('\n📋 执行摘要:');
console.log(`🎯 最终结果: ${result}`);
console.log(`🔄 执行路径: ${executionPath?.join(' → ')}`);
console.log(`📊 条件评估: ${JSON.stringify(conditions, null, 2)}`);
console.log(`📈 执行状态: ${metadata?.status || 'unknown'}`);

return {
result,
executionPath,
conditions,
metadata: {
...metadata,
finalizedAt: new Date().toISOString(),
completed: true,
},
};
}

// 构建条件执行图
const conditionalExecutionGraph = new StateGraph(StateAnnotation)
.addNode('evaluate_conditions', evaluateConditions, { ends: ['route_by_conditions'] })
.addNode('route_by_conditions', routeByConditions, { ends: ['permission_denied', 'urgent_execution', 'complex_execution', 'approval_workflow', 'standard_execution'] })
.addNode('permission_denied', permissionDenied, { ends: ['finalize'] })
.addNode('urgent_execution', urgentExecution, { ends: ['finalize'] })
.addNode('complex_execution', complexExecution, { ends: ['finalize'] })
.addNode('approval_workflow', approvalWorkflow, { ends: ['approved_execution', 'finalize'] })
.addNode('approved_execution', approvedExecution, { ends: ['finalize'] })
.addNode('standard_execution', standardExecution, { ends: ['finalize'] })
.addNode('finalize', finalize)
.setEntryPoint('evaluate_conditions')
.addEdge('finalize', END);

// 内存存储器
const memorySaver = new MemorySaver();

// 编译图
const app = conditionalExecutionGraph.compile({
checkpointer: memorySaver,
});

// 条件执行演示
async function demonstrateConditionalExecution() {
console.log('🎯 条件执行演示');
console.log('='.repeat(60));

const testCases = [
{
input: '紧急修复生产环境bug',
userRole: 'admin',
description: '紧急任务 + 管理员权限',
},
{
input: '删除用户数据库中的所有记录',
userRole: 'user',
description: '危险操作 + 普通用户',
},
{
input:
'这是一个非常复杂的任务,需要分析大量数据,处理多个步骤,涉及多个系统的集成,需要仔细规划和执行',
userRole: 'manager',
description: '复杂任务 + 管理员权限',
},
{
input: '删除临时文件',
userRole: 'admin',
description: '需要审批 + 管理员权限',
},
{
input: '查看用户列表',
userRole: 'user',
description: '标准操作 + 普通用户',
},
];

for (let i = 0; i < testCases.length; i++) {
const testCase = testCases[i];
console.log(`\n🧪 测试案例 ${i + 1}: ${testCase.description}`);
console.log(`📝 输入: "${testCase.input}"`);
console.log(`👤 用户角色: ${testCase.userRole}`);
console.log('-'.repeat(50));

try {
const result = await app.invoke(
{
input: testCase.input,
userRole: testCase.userRole,
priority: '',
conditions: {},
result: '',
executionPath: [],
metadata: {},
},
{
configurable: {
thread_id: `test-case-${i + 1}`,
},
}
);

console.log('\n✅ 执行完成');
console.log(`📊 执行状态: ${result.metadata?.status}`);
console.log(`🔄 执行路径: ${result.executionPath?.join(' → ')}`);

if (result.result) {
console.log(`📄 结果摘要: ${result.result.split('\n')[0]}`);
}
} catch (error) {
console.error('❌ 执行失败:', error);
}

console.log('\n' + '='.repeat(60));
}
}

// 条件执行分析器
class ConditionalExecutionAnalyzer {
private executionStats: Map<
string,
{
count: number;
successRate: number;
avgExecutionTime: number;
conditions: any[];
}
> = new Map();

recordExecution(
path: string[],
conditions: any,
success: boolean,
executionTime: number
) {
const pathKey = path.join('→');

if (!this.executionStats.has(pathKey)) {
this.executionStats.set(pathKey, {
count: 0,
successRate: 0,
avgExecutionTime: 0,
conditions: [],
});
}

const stats = this.executionStats.get(pathKey)!;
stats.count++;
stats.conditions.push(conditions);
stats.avgExecutionTime =
(stats.avgExecutionTime * (stats.count - 1) + executionTime) /
stats.count;
stats.successRate =
(stats.successRate * (stats.count - 1) + (success ? 1 : 0)) / stats.count;
}

generateAnalysisReport(): string {
let report = '\n📊 条件执行分析报告\n';
report += '='.repeat(50) + '\n';

for (const [path, stats] of this.executionStats) {
report += `\n🔄 执行路径: ${path}\n`;
report += ` 📞 执行次数: ${stats.count}\n`;
report += ` ✅ 成功率: ${(stats.successRate * 100).toFixed(1)}%\n`;
report += ` ⏱️ 平均执行时间: ${stats.avgExecutionTime.toFixed(2)}ms\n`;

// 分析条件模式
const conditionPatterns = this.analyzeConditionPatterns(stats.conditions);
report += ` 📋 条件模式: ${JSON.stringify(conditionPatterns)}\n`;
}

return report;
}

private analyzeConditionPatterns(conditions: any[]): any {
const patterns = {
hasPermission: 0,
isUrgent: 0,
isComplexTask: 0,
requiresApproval: 0,
};

conditions.forEach((condition) => {
if (condition.hasPermission) patterns.hasPermission++;
if (condition.isUrgent) patterns.isUrgent++;
if (condition.isComplexTask) patterns.isComplexTask++;
if (condition.requiresApproval) patterns.requiresApproval++;
});

return patterns;
}
}

// 运行演示
if (require.main === module) {
demonstrateConditionalExecution().catch(console.error);
}

export {
conditionalExecutionGraph,
app,
demonstrateConditionalExecution,
ConditionalExecutionAnalyzer,
};

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
🎯 条件执行演示
============================================================

🧪 测试案例 1: 紧急任务 + 管理员权限
📝 输入: "紧急修复生产环境bug"
👤 用户角色: admin
--------------------------------------------------
🔍 评估执行条件
📋 条件评估结果: {
hasPermission: true,
isUrgent: true,
isComplexTask: false,
requiresApproval: false
}
🚦 根据条件进行路由
🚨 紧急任务,直接执行
🚨 紧急任务执行中...
✅ 紧急任务处理完成
🏁 条件执行完成

📋 执行摘要:
🎯 最终结果: 紧急处理完成: 紧急修复生产环境bug

执行详情:
- 优先级: 最高
- 处理时间: 立即
- 跳过常规审批流程
- 结果: 任务已紧急处理完成
🔄 执行路径: condition_evaluator → urgent_handler → urgent_executor
📊 条件评估: {
"hasPermission": true,
"isUrgent": true,
"isComplexTask": false,
"requiresApproval": false
}
📈 执行状态: completed

✅ 执行完成
📊 执行状态: completed
🔄 执行路径: condition_evaluator → urgent_handler → urgent_executor
📄 结果摘要: 紧急处理完成: 紧急修复生产环境bug

============================================================

🧪 测试案例 2: 危险操作 + 普通用户
📝 输入: "删除用户数据库中的所有记录"
👤 用户角色: user
--------------------------------------------------
🔍 评估执行条件
📋 条件评估结果: {
hasPermission: false,
isUrgent: false,
isComplexTask: false,
requiresApproval: true
}
🚦 根据条件进行路由
❌ 需要审批但权限不足
🚫 权限不足,拒绝执行
🏁 条件执行完成

📋 执行摘要:
🎯 最终结果: 权限不足: 用户角色 "user" 无法执行此操作。请联系管理员获取必要权限。
🔄 执行路径: condition_evaluator → permission_denied → permission_denied_handler
📊 条件评估: {
"hasPermission": false,
"isUrgent": false,
"isComplexTask": false,
"requiresApproval": true
}
📈 执行状态: denied

✅ 执行完成
📊 执行状态: denied
🔄 执行路径: condition_evaluator → permission_denied → permission_denied_handler
📄 结果摘要: 权限不足: 用户角色 "user" 无法执行此操作。请联系管理员获取必要权限。

============================================================

🧪 测试案例 3: 复杂任务 + 管理员权限
📝 输入: "这是一个非常复杂的任务,需要分析大量数据,处理多个步骤,涉及多个系统的集成,需要仔细规划和执行"
👤 用户角色: manager
--------------------------------------------------
🔍 评估执行条件
📋 条件评估结果: {
hasPermission: true,
isUrgent: false,
isComplexTask: false,
requiresApproval: false
}
🚦 根据条件进行路由
📝 标准处理流程
📝 标准任务执行中...
✅ 标准任务处理完成
🏁 条件执行完成

📋 执行摘要:
🎯 最终结果: 标准处理完成: 这是一个非常复杂的任务,需要分析大量数据,处理多个步骤,涉及多个系统的集成,需要仔细规划和执行

执行详情:
- 处理类型: 标准流程
- 权限检查: 通过
- 执行时间: 正常
- 结果: 任务按标准流程成功完成
🔄 执行路径: condition_evaluator → standard_handler → standard_executor
📊 条件评估: {
"hasPermission": true,
"isUrgent": false,
"isComplexTask": false,
"requiresApproval": false
}
📈 执行状态: completed

✅ 执行完成
📊 执行状态: completed
🔄 执行路径: condition_evaluator → standard_handler → standard_executor
📄 结果摘要: 标准处理完成: 这是一个非常复杂的任务,需要分析大量数据,处理多个步骤,涉及多个系统的集成,需要仔细规划和执行

============================================================

🧪 测试案例 4: 需要审批 + 管理员权限
📝 输入: "删除临时文件"
👤 用户角色: admin
--------------------------------------------------
🔍 评估执行条件
📋 条件评估结果: {
hasPermission: true,
isUrgent: false,
isComplexTask: false,
requiresApproval: true
}
🚦 根据条件进行路由
✋ 需要审批流程
✋ 进入审批工作流...
✅ 审批通过,继续执行
✅ 审批通过,执行任务...
✅ 审批任务执行完成
🏁 条件执行完成

📋 执行摘要:
🎯 最终结果: 审批通过后执行完成: 删除临时文件

执行详情:
- 已通过审批流程
- 风险评估: 可接受
- 执行权限: 已授权
- 结果: 任务在审批监督下成功完成
🔄 执行路径: condition_evaluator → approval_required → approval_granted → approved_executor
📊 条件评估: {
"hasPermission": true,
"isUrgent": false,
"isComplexTask": false,
"requiresApproval": true
}
📈 执行状态: completed

✅ 执行完成
📊 执行状态: completed
🔄 执行路径: condition_evaluator → approval_required → approval_granted → approved_executor
📄 结果摘要: 审批通过后执行完成: 删除临时文件

============================================================

🧪 测试案例 5: 标准操作 + 普通用户
📝 输入: "查看用户列表"
👤 用户角色: user
--------------------------------------------------
🔍 评估执行条件
📋 条件评估结果: {
hasPermission: false,
isUrgent: false,
isComplexTask: false,
requiresApproval: false
}
🚦 根据条件进行路由
📝 标准处理流程
📝 标准任务执行中...
✅ 标准任务处理完成
🏁 条件执行完成

📋 执行摘要:
🎯 最终结果: 标准处理完成: 查看用户列表

执行详情:
- 处理类型: 标准流程
- 权限检查: 通过
- 执行时间: 正常
- 结果: 任务按标准流程成功完成
🔄 执行路径: condition_evaluator → standard_handler → standard_executor
📊 条件评估: {
"hasPermission": false,
"isUrgent": false,
"isComplexTask": false,
"requiresApproval": false
}
📈 执行状态: completed

✅ 执行完成
📊 执行状态: completed
🔄 执行路径: condition_evaluator → standard_handler → standard_executor
📄 结果摘要: 标准处理完成: 查看用户列表

============================================================
*/

实际应用场景

智能工作流管理

Command 对象在复杂工作流中的应用:

状态机实现

状态机实现
/**
* ============================================================================
* 状态机 - State Machine with Command Objects
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本示例展示如何使用 Command 对象实现标准状态机模式。支持多种状态(空闲、处理、
* 等待、完成、错误)之间的转换,提供状态历史追踪和错误重试机制。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 状态转换:在 idle、processing、waiting、completed、error 状态间转换
* 2️⃣ 状态历史:记录完整的状态转换历史便于调试
* 3️⃣ 条件转换:基于条件决定下一个状态
* 4️⃣ 错误处理:错误状态支持自动重试机制
* 5️⃣ 状态分析:提供状态转换统计和执行时间分析
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 状态定义:使用 TypeScript 类型系统定义所有可能的状态
* • 状态节点:每个状态对应一个节点函数
* • 转换逻辑:使用 Command 对象的 goto 实现状态转换
* • StateMachineAnalyzer:分析状态转换模式和性能
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 实用功能/Command对象/state-machine.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 确保所有状态转换都是合法的
* • 避免出现死循环状态
* • 建议设置最大状态转换次数限制
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { StateGraph, Annotation, END, Command } from '@langchain/langgraph';
import { MemorySaver } from '@langchain/langgraph';

// 定义状态机状态
type StateMachineState =
| 'idle'
| 'processing'
| 'waiting'
| 'completed'
| 'error';

// 定义状态结构
const StateAnnotation = Annotation.Root({
input: Annotation<string>,
currentState: Annotation<StateMachineState>,
data: Annotation<any>,
result: Annotation<string>,
stateHistory: Annotation<StateMachineState[]>,
metadata: Annotation<Record<string, any>>,
idleWaitCount: Annotation<number>({
reducer: (_, update) => update,
default: () => 0,
}),
});

// 状态机初始化节点
function initializeStateMachine(state: typeof StateAnnotation.State) {
console.log('🚀 初始化状态机');

return new Command({
update: {
currentState: 'idle' as StateMachineState,
stateHistory: ['idle'],
metadata: {
...state.metadata,
startTime: new Date().toISOString(),
initialized: true,
},
},
goto: 'idle_state',
});
}

// 空闲状态
function idleState(state: typeof StateAnnotation.State) {
console.log('😴 进入空闲状态');

const { input } = state;
const waitCount = state.idleWaitCount ?? 0;

if (input && input.trim()) {
console.log('📥 接收到输入,开始处理');

return new Command({
update: {
currentState: 'processing' as StateMachineState,
stateHistory: [...(state.stateHistory || []), 'processing'],
data: { input: input.trim() },
idleWaitCount: 0,
},
goto: 'processing_state',
});
}

// 如果等待次数超过限制,结束执行
const MAX_IDLE_WAIT = 3;
if (waitCount >= MAX_IDLE_WAIT) {
console.log(`⏰ 等待超时(${MAX_IDLE_WAIT}次),结束执行`);

return new Command({
update: {
currentState: 'completed' as StateMachineState,
stateHistory: [...(state.stateHistory || []), 'completed'],
result: '等待输入超时,已自动结束',
metadata: {
...state.metadata,
timeoutReason: 'idle_wait_exceeded',
completedAt: new Date().toISOString(),
},
},
goto: END,
});
}

console.log(`⏳ 等待输入... (${waitCount + 1}/${MAX_IDLE_WAIT})`);

return new Command({
update: {
idleWaitCount: waitCount + 1,
metadata: {
...state.metadata,
lastCheck: new Date().toISOString(),
},
},
goto: 'idle_state',
});
}

// 处理状态
function processingState(state: typeof StateAnnotation.State) {
console.log('⚙️ 进入处理状态');

const { data } = state;

// 模拟处理逻辑
const processingSteps = ['验证输入', '解析数据', '执行业务逻辑', '生成结果'];

console.log('📋 处理步骤:', processingSteps);

// 模拟处理时间和可能的错误
const processingTime = Math.random() * 2000 + 500; // 0.5-2.5秒
const hasError = Math.random() < 0.2; // 20% 错误率

if (hasError) {
console.log('❌ 处理过程中发生错误');

return new Command({
update: {
currentState: 'error' as StateMachineState,
stateHistory: [...(state.stateHistory || []), 'error'],
metadata: {
...state.metadata,
error: '处理失败:模拟错误',
errorTime: new Date().toISOString(),
},
},
goto: 'error_state',
});
}

// 检查是否需要等待外部输入
const needsWaiting =
data?.input?.toLowerCase().includes('等待') ||
data?.input?.toLowerCase().includes('wait');

if (needsWaiting) {
console.log('⏸️ 需要等待外部确认');

return new Command({
update: {
currentState: 'waiting' as StateMachineState,
stateHistory: [...(state.stateHistory || []), 'waiting'],
metadata: {
...state.metadata,
waitingReason: '等待外部确认',
waitingStartTime: new Date().toISOString(),
},
},
goto: 'waiting_state',
});
}

console.log('✅ 处理完成');

const result = `处理完成: ${data?.input}

处理详情:
- 处理时间: ${processingTime.toFixed(0)}ms
- 处理步骤: ${processingSteps.join(' → ')}
- 状态转换: ${state.stateHistory?.join(' → ')} → completed`;

return new Command({
update: {
currentState: 'completed' as StateMachineState,
stateHistory: [...(state.stateHistory || []), 'completed'],
result,
metadata: {
...state.metadata,
processingTime,
completedAt: new Date().toISOString(),
},
},
goto: 'completed_state',
});
}

// 等待状态
function waitingState(state: typeof StateAnnotation.State) {
console.log('⏳ 进入等待状态');

// 模拟等待时间
const waitTime = Math.random() * 3000 + 1000; // 1-4秒

console.log(`⏰ 等待 ${waitTime.toFixed(0)}ms...`);

// 模拟外部确认
setTimeout(() => {
const confirmed = Math.random() > 0.3; // 70% 确认率

if (confirmed) {
console.log('✅ 外部确认成功,继续处理');
} else {
console.log('❌ 外部确认失败,返回错误状态');
}
}, waitTime);

// 简化处理:直接模拟确认结果
const confirmed = Math.random() > 0.3;

if (confirmed) {
console.log('✅ 等待完成,继续处理');

const result = `等待完成后处理: ${state.data?.input}

等待详情:
- 等待时间: ${waitTime.toFixed(0)}ms
- 等待原因: ${state.metadata?.waitingReason}
- 确认状态: 成功
- 状态转换: ${state.stateHistory?.join(' → ')} → completed`;

return new Command({
update: {
currentState: 'completed' as StateMachineState,
stateHistory: [...(state.stateHistory || []), 'completed'],
result,
metadata: {
...state.metadata,
waitTime,
confirmedAt: new Date().toISOString(),
},
},
goto: 'completed_state',
});
} else {
console.log('❌ 等待超时或确认失败');

return new Command({
update: {
currentState: 'error' as StateMachineState,
stateHistory: [...(state.stateHistory || []), 'error'],
metadata: {
...state.metadata,
error: '等待超时或确认失败',
errorTime: new Date().toISOString(),
},
},
goto: 'error_state',
});
}
}

// 完成状态
function completedState(state: typeof StateAnnotation.State) {
console.log('🎉 进入完成状态');

const { result, stateHistory, metadata } = state;

console.log('\n📋 状态机执行摘要:');
console.log(`🎯 最终结果: ${result}`);
console.log(`🔄 状态转换: ${stateHistory?.join(' → ')}`);
console.log(`⏱️ 总执行时间: ${metadata?.processingTime || 'N/A'}ms`);
console.log(`📅 完成时间: ${metadata?.completedAt || 'N/A'}`);

return {
currentState: 'completed' as StateMachineState,
result,
stateHistory,
metadata: {
...metadata,
finalizedAt: new Date().toISOString(),
success: true,
},
};
}

// 错误状态
function errorState(state: typeof StateAnnotation.State) {
console.log('💥 进入错误状态');

const { stateHistory, metadata } = state;

const errorResult = `状态机执行失败

错误详情:
- 错误信息: ${metadata?.error}
- 发生时间: ${metadata?.errorTime}
- 状态转换: ${stateHistory?.join(' → ')}
- 失败状态: ${state.currentState}`;

console.log('\n❌ 状态机错误摘要:');
console.log(`💥 错误信息: ${metadata?.error}`);
console.log(`🔄 状态转换: ${stateHistory?.join(' → ')}`);
console.log(`📅 错误时间: ${metadata?.errorTime}`);

// 检查是否可以重试
const canRetry = !metadata?.retryCount || metadata.retryCount < 3;

if (canRetry) {
console.log('🔄 尝试重新开始...');

return new Command({
update: {
currentState: 'idle' as StateMachineState,
stateHistory: [...(stateHistory || []), 'idle'],
metadata: {
...metadata,
retryCount: (metadata?.retryCount || 0) + 1,
lastRetryTime: new Date().toISOString(),
},
},
goto: 'idle_state',
});
}

console.log('❌ 重试次数已达上限,终止执行');

return {
currentState: 'error' as StateMachineState,
result: errorResult,
stateHistory,
metadata: {
...metadata,
finalizedAt: new Date().toISOString(),
success: false,
maxRetriesReached: true,
},
};
}

// 构建状态机图
const stateMachineGraph = new StateGraph(StateAnnotation)
.addNode('initialize', initializeStateMachine, { ends: ['idle_state'] })
.addNode('idle_state', idleState, { ends: ['processing_state', 'idle_state'] })
.addNode('processing_state', processingState, { ends: ['error_state', 'waiting_state', 'completed_state'] })
.addNode('waiting_state', waitingState, { ends: ['completed_state', 'error_state'] })
.addNode('completed_state', completedState, { ends: [END] })
.addNode('error_state', errorState, { ends: ['idle_state', END] })
.setEntryPoint('initialize')
.addEdge('completed_state', END)
.addEdge('error_state', END);

// 内存存储器
const memorySaver = new MemorySaver();

// 编译图
const app = stateMachineGraph.compile({
checkpointer: memorySaver,
});

// 状态机演示
async function demonstrateStateMachine() {
console.log('🎰 状态机演示');
console.log('='.repeat(60));

const testCases = [
{
input: '处理用户数据',
description: '正常处理流程',
},
{
input: '等待用户确认后处理订单',
description: '需要等待的处理流程',
},
{
input: '复杂的数据分析任务',
description: '可能出错的处理流程',
},
{
input: '',
description: '空输入测试',
},
];

for (let i = 0; i < testCases.length; i++) {
const testCase = testCases[i];
console.log(`\n🧪 测试案例 ${i + 1}: ${testCase.description}`);
console.log(`📝 输入: "${testCase.input}"`);
console.log('-'.repeat(50));

try {
const result = await app.invoke(
{
input: testCase.input,
currentState: 'idle' as StateMachineState,
data: {},
result: '',
stateHistory: [],
metadata: {},
},
{ configurable: { thread_id: `test-case-${i + 1}` } }
);

console.log('\n✅ 状态机执行完成');
console.log(`📊 最终状态: ${result.currentState}`);
console.log(`🔄 状态历史: ${result.stateHistory?.join(' → ')}`);
console.log(`📈 执行成功: ${result.metadata?.success ? '是' : '否'}`);

if (result.result) {
console.log(`📄 结果摘要: ${result.result.split('\n')[0]}`);
}
} catch (error) {
console.error('❌ 状态机执行失败:', error);
}

console.log('\n' + '='.repeat(60));
}
}

// 状态机分析器
class StateMachineAnalyzer {
private stateTransitions: Map<string, number> = new Map();
private stateExecutionTimes: Map<StateMachineState, number[]> = new Map();
private errorPatterns: Map<string, number> = new Map();

recordStateTransition(from: StateMachineState, to: StateMachineState) {
const transition = `${from}${to}`;
this.stateTransitions.set(
transition,
(this.stateTransitions.get(transition) || 0) + 1
);
}

recordStateExecutionTime(state: StateMachineState, executionTime: number) {
if (!this.stateExecutionTimes.has(state)) {
this.stateExecutionTimes.set(state, []);
}
this.stateExecutionTimes.get(state)!.push(executionTime);
}

recordError(errorType: string) {
this.errorPatterns.set(
errorType,
(this.errorPatterns.get(errorType) || 0) + 1
);
}

generateAnalysisReport(): string {
let report = '\n📊 状态机分析报告\n';
report += '='.repeat(50) + '\n';

// 状态转换统计
report += '\n🔄 状态转换统计:\n';
for (const [transition, count] of this.stateTransitions) {
report += ` ${transition}: ${count} 次\n`;
}

// 状态执行时间统计
report += '\n⏱️ 状态执行时间统计:\n';
for (const [state, times] of this.stateExecutionTimes) {
const avgTime = times.reduce((sum, time) => sum + time, 0) / times.length;
const minTime = Math.min(...times);
const maxTime = Math.max(...times);

report += ` ${state}:\n`;
report += ` 平均时间: ${avgTime.toFixed(2)}ms\n`;
report += ` 最短时间: ${minTime.toFixed(2)}ms\n`;
report += ` 最长时间: ${maxTime.toFixed(2)}ms\n`;
report += ` 执行次数: ${times.length}\n`;
}

// 错误模式统计
report += '\n❌ 错误模式统计:\n';
for (const [errorType, count] of this.errorPatterns) {
report += ` ${errorType}: ${count} 次\n`;
}

return report;
}
}

// 运行演示
if (require.main === module) {
demonstrateStateMachine().catch(console.error);
}

export {
stateMachineGraph,
app,
demonstrateStateMachine,
StateMachineAnalyzer,
};

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
🎰 状态机演示
============================================================

🧪 测试案例 1: 正常处理流程
📝 输入: "处理用户数据"
--------------------------------------------------
🚀 初始化状态机
😴 进入空闲状态
📥 接收到输入,开始处理
⚙️ 进入处理状态
📋 处理步骤: [ '验证输入', '解析数据', '执行业务逻辑', '生成结果' ]
✅ 处理完成
🎉 进入完成状态

📋 状态机执行摘要:
🎯 最终结果: 处理完成: 处理用户数据

处理详情:
- 处理时间: 1719ms
- 处理步骤: 验证输入 → 解析数据 → 执行业务逻辑 → 生成结果
- 状态转换: idle → processing → completed
🔄 状态转换: idle → processing → completed
⏱️ 总执行时间: 1718.628938543109ms
📅 完成时间: 2025-11-16T16:23:28.885Z

✅ 状态机执行完成
📊 最终状态: completed
🔄 状态历史: idle → processing → completed
📈 执行成功: 是
📄 结果摘要: 处理完成: 处理用户数据

============================================================

🧪 测试案例 2: 需要等待的处理流程
📝 输入: "等待用户确认后处理订单"
--------------------------------------------------
🚀 初始化状态机
😴 进入空闲状态
📥 接收到输入,开始处理
⚙️ 进入处理状态
📋 处理步骤: [ '验证输入', '解析数据', '执行业务逻辑', '生成结果' ]
⏸️ 需要等待外部确认
⏳ 进入等待状态
⏰ 等待 2770ms...
✅ 等待完成,继续处理
🎉 进入完成状态

📋 状态机执行摘要:
🎯 最终结果: 等待完成后处理: 等待用户确认后处理订单

等待详情:
- 等待时间: 2770ms
- 等待原因: 等待外部确认
- 确认状态: 成功
- 状态转换: idle → processing → waiting → completed
🔄 状态转换: idle → processing → waiting → completed
⏱️ 总执行时间: N/Ams
📅 完成时间: N/A

✅ 状态机执行完成
📊 最终状态: completed
🔄 状态历史: idle → processing → waiting → completed
📈 执行成功: 是
📄 结果摘要: 等待完成后处理: 等待用户确认后处理订单

============================================================

🧪 测试案例 3: 可能出错的处理流程
📝 输入: "复杂的数据分析任务"
--------------------------------------------------
🚀 初始化状态机
😴 进入空闲状态
📥 接收到输入,开始处理
⚙️ 进入处理状态
📋 处理步骤: [ '验证输入', '解析数据', '执行业务逻辑', '生成结果' ]
✅ 处理完成
🎉 进入完成状态

📋 状态机执行摘要:
🎯 最终结果: 处理完成: 复杂的数据分析任务

处理详情:
- 处理时间: 1090ms
- 处理步骤: 验证输入 → 解析数据 → 执行业务逻辑 → 生成结果
- 状态转换: idle → processing → completed
🔄 状态转换: idle → processing → completed
⏱️ 总执行时间: 1090.139915754114ms
📅 完成时间: 2025-11-16T16:23:28.905Z

✅ 状态机执行完成
📊 最终状态: completed
🔄 状态历史: idle → processing → completed
📈 执行成功: 是
📄 结果摘要: 处理完成: 复杂的数据分析任务

============================================================

🧪 测试案例 4: 空输入测试
📝 输入: ""
--------------------------------------------------
🚀 初始化状态机
😴 进入空闲状态
⏳ 等待输入... (1/3)
😴 进入空闲状态
⏳ 等待输入... (2/3)
😴 进入空闲状态
⏳ 等待输入... (3/3)
😴 进入空闲状态
⏰ 等待超时(3次),结束执行

✅ 状态机执行完成
📊 最终状态: completed
🔄 状态历史: idle → completed
📈 执行成功: 否
📄 结果摘要: 等待输入超时,已自动结束

============================================================
✅ 外部确认成功,继续处理
*/

错误恢复

错误恢复机制
/**
* ============================================================================
* 错误恢复 - Error Recovery with Command Objects
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本示例展示如何使用 Command 对象实现智能错误恢复机制。支持自动重试、指数退避、
* 错误分类、恢复策略选择等功能,提高系统的容错能力和稳定性。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 自动重试:失败操作自动重试,支持最大重试次数限制
* 2️⃣ 指数退避:重试间隔时间呈指数增长,避免资源浪费
* 3️⃣ 错误分类:智能识别错误类型(网络、超时、验证、资源等)
* 4️⃣ 恢复策略:针对不同错误类型应用对应的恢复策略
* 5️⃣ 错误统计:记录错误发生次数和恢复成功率
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 错误处理器:捕获错误并决定是否重试
* • 重试延迟:使用指数退避算法计算延迟时间
* • AdvancedErrorRecovery:提供错误分类和恢复策略管理
* • 统计分析:追踪错误模式和恢复效果
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 实用功能/Command对象/error-recovery.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 设置合理的最大重试次数避免无限循环
* • 指数退避可以减轻系统压力
* • 建议记录错误日志用于问题排查
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { StateGraph, Annotation, Command, END } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';

// 定义状态,包含错误恢复相关字段
const ErrorRecoveryStateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
currentStep: Annotation<string>({
reducer: (x, y) => y ?? x,
default: () => 'start',
}),
errorCount: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
lastError: Annotation<string | null>({
reducer: (x, y) => y ?? x,
default: () => null,
}),
retryAttempts: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
maxRetries: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 3,
}),
});

// 模拟可能失败的业务逻辑
function riskyBusinessLogic(input: string): string {
// 模拟随机失败
if (Math.random() < 0.6) {
throw new Error(`业务逻辑执行失败: ${input}`);
}
return `成功处理: ${input}`;
}

// 主处理节点 - 可能失败的操作
async function mainProcessingNode(
state: typeof ErrorRecoveryStateAnnotation.State
) {
const lastMessage = state.messages[state.messages.length - 1];

try {
console.log(`尝试处理: ${lastMessage.content}`);

// 执行可能失败的业务逻辑
const result = riskyBusinessLogic(lastMessage.content as string);

return new Command({
update: {
messages: [new AIMessage(result)],
currentStep: 'success',
errorCount: 0, // 重置错误计数
retryAttempts: 0, // 重置重试次数
lastError: null,
},
goto: 'success_handler',
});
} catch (error) {
console.log(`处理失败: ${error.message}`);

return new Command({
update: {
errorCount: state.errorCount + 1,
lastError: error.message,
currentStep: 'error',
},
goto: 'error_handler',
});
}
}

// 错误处理节点 - 决定是否重试
async function errorHandlerNode(
state: typeof ErrorRecoveryStateAnnotation.State
) {
const canRetry = state.retryAttempts < state.maxRetries;
const shouldRetry = canRetry && state.errorCount < 5; // 额外的安全检查

if (shouldRetry) {
console.log(`准备重试 (${state.retryAttempts + 1}/${state.maxRetries})`);

return new Command({
update: {
retryAttempts: state.retryAttempts + 1,
currentStep: 'retrying',
messages: [new AIMessage(`重试第 ${state.retryAttempts + 1} 次...`)],
},
goto: 'retry_delay',
});
} else {
console.log('达到最大重试次数,转入失败处理');

return new Command({
update: {
currentStep: 'failed',
messages: [
new AIMessage(
`处理失败,已达到最大重试次数。最后错误: ${state.lastError}`
),
],
},
goto: 'failure_handler',
});
}
}

// 重试延迟节点 - 添加延迟后重试
async function retryDelayNode(
state: typeof ErrorRecoveryStateAnnotation.State
) {
// 指数退避延迟
const delayMs = Math.min(1000 * Math.pow(2, state.retryAttempts - 1), 10000);
console.log(`等待 ${delayMs}ms 后重试...`);

// 模拟延迟
await new Promise((resolve) => setTimeout(resolve, delayMs));

return new Command({
update: {
currentStep: 'retrying',
},
goto: 'main_processing',
});
}

// 成功处理节点
async function successHandlerNode(
state: typeof ErrorRecoveryStateAnnotation.State
) {
console.log('处理成功完成');

return new Command({
update: {
currentStep: 'completed',
messages: [new AIMessage('任务成功完成!')],
},
goto: END,
});
}

// 失败处理节点
async function failureHandlerNode(
state: typeof ErrorRecoveryStateAnnotation.State
) {
console.log('处理最终失败');

return new Command({
update: {
currentStep: 'terminated',
messages: [new AIMessage('任务处理失败,请检查输入或联系管理员。')],
},
goto: END,
});
}

// 创建错误恢复图
const errorRecoveryGraph = new StateGraph(ErrorRecoveryStateAnnotation)
.addNode('main_processing', mainProcessingNode, { ends: ['success_handler', 'error_handler'] })
.addNode('error_handler', errorHandlerNode, { ends: ['retry_delay', 'failure_handler'] })
.addNode('retry_delay', retryDelayNode, { ends: ['main_processing'] })
.addNode('success_handler', successHandlerNode, { ends: [END] })
.addNode('failure_handler', failureHandlerNode, { ends: [END] })
.addEdge('__start__', 'main_processing')
.addEdge('retry_delay', 'main_processing');

const compiledErrorRecoveryGraph = errorRecoveryGraph.compile();

// 高级错误恢复策略
export class AdvancedErrorRecovery {
private errorPatterns: Map<string, string> = new Map();
private recoveryStrategies: Map<string, (state: any) => Command> = new Map();

constructor() {
this.initializeErrorPatterns();
this.initializeRecoveryStrategies();
}

private initializeErrorPatterns() {
// 定义错误模式和对应的恢复策略
this.errorPatterns.set('NETWORK_ERROR', 'network_recovery');
this.errorPatterns.set('TIMEOUT_ERROR', 'timeout_recovery');
this.errorPatterns.set('VALIDATION_ERROR', 'validation_recovery');
this.errorPatterns.set('RESOURCE_ERROR', 'resource_recovery');
}

private initializeRecoveryStrategies() {
// 网络错误恢复
this.recoveryStrategies.set('network_recovery', (state) => {
return new Command({
update: {
retryAttempts: state.retryAttempts + 1,
currentStep: 'network_recovery',
},
goto: state.retryAttempts < 5 ? 'retry_with_backoff' : 'escalate_error',
});
});

// 超时错误恢复
this.recoveryStrategies.set('timeout_recovery', (state) => {
return new Command({
update: {
retryAttempts: state.retryAttempts + 1,
currentStep: 'timeout_recovery',
timeout: Math.min(state.timeout * 2, 30000), // 增加超时时间
},
goto: 'retry_with_longer_timeout',
});
});

// 验证错误恢复
this.recoveryStrategies.set('validation_recovery', (state) => {
return new Command({
update: {
currentStep: 'validation_recovery',
messages: [
...state.messages,
new AIMessage('输入验证失败,请提供有效输入'),
],
},
goto: 'request_user_input',
});
});

// 资源错误恢复
this.recoveryStrategies.set('resource_recovery', (state) => {
return new Command({
update: {
currentStep: 'resource_recovery',
retryAttempts: state.retryAttempts + 1,
},
goto: 'wait_for_resources',
});
});
}

// 智能错误分类
classifyError(error: Error): string {
const message = error.message.toLowerCase();

if (message.includes('network') || message.includes('connection')) {
return 'NETWORK_ERROR';
} else if (message.includes('timeout')) {
return 'TIMEOUT_ERROR';
} else if (message.includes('validation') || message.includes('invalid')) {
return 'VALIDATION_ERROR';
} else if (message.includes('resource') || message.includes('limit')) {
return 'RESOURCE_ERROR';
} else {
return 'UNKNOWN_ERROR';
}
}

// 获取恢复策略
getRecoveryStrategy(errorType: string): (state: any) => Command {
const strategyKey = this.errorPatterns.get(errorType);
if (strategyKey && this.recoveryStrategies.has(strategyKey)) {
return this.recoveryStrategies.get(strategyKey)!;
}

// 默认恢复策略
return (state) =>
new Command({
update: {
currentStep: 'unknown_error',
lastError: `未知错误类型: ${errorType}`,
},
goto: 'failure_handler',
});
}
}

// 智能错误恢复节点
const errorRecovery = new AdvancedErrorRecovery();

async function intelligentErrorHandlerNode(
state: typeof ErrorRecoveryStateAnnotation.State
) {
if (!state.lastError) {
return new Command({
goto: 'failure_handler',
});
}

// 分类错误
const error = new Error(state.lastError);
const errorType = errorRecovery.classifyError(error);

console.log(`错误分类: ${errorType}`);

// 获取并执行恢复策略
const recoveryStrategy = errorRecovery.getRecoveryStrategy(errorType);
return recoveryStrategy(state);
}

// 演示错误恢复功能
export async function demonstrateErrorRecovery() {
console.log('=== 错误恢复演示 ===');

const initialState = {
messages: [new HumanMessage('处理这个可能失败的任务')],
};

try {
const result = await compiledErrorRecoveryGraph.invoke(initialState);

console.log('\n=== 最终结果 ===');
console.log('当前步骤:', result.currentStep);
console.log('错误次数:', result.errorCount);
console.log('重试次数:', result.retryAttempts);
console.log(
'最后消息:',
result.messages[result.messages.length - 1].content
);

return result;
} catch (error) {
console.error('图执行失败:', error);
throw error;
}
}

// 错误恢复统计
export class ErrorRecoveryStats {
private stats = {
totalErrors: 0,
recoveredErrors: 0,
failedRecoveries: 0,
errorTypes: new Map<string, number>(),
recoveryTimes: [] as number[],
};

recordError(errorType: string) {
this.stats.totalErrors++;
this.stats.errorTypes.set(
errorType,
(this.stats.errorTypes.get(errorType) || 0) + 1
);
}

recordRecovery(success: boolean, recoveryTime: number) {
if (success) {
this.stats.recoveredErrors++;
this.stats.recoveryTimes.push(recoveryTime);
} else {
this.stats.failedRecoveries++;
}
}

getStats() {
const avgRecoveryTime =
this.stats.recoveryTimes.length > 0
? this.stats.recoveryTimes.reduce((a, b) => a + b, 0) /
this.stats.recoveryTimes.length
: 0;

return {
...this.stats,
recoveryRate:
this.stats.totalErrors > 0
? this.stats.recoveredErrors / this.stats.totalErrors
: 0,
averageRecoveryTime: avgRecoveryTime,
};
}

reset() {
this.stats = {
totalErrors: 0,
recoveredErrors: 0,
failedRecoveries: 0,
errorTypes: new Map(),
recoveryTimes: [],
};
}
}

// 运行示例
if (require.main === module) {
demonstrateErrorRecovery().catch(console.error);
}

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
=== 错误恢复演示 ===
尝试处理: 处理这个可能失败的任务
处理成功完成

=== 最终结果 ===
当前步骤: completed
错误次数: 0
重试次数: 0
最后消息: 任务成功完成!
*/

Command 参数详解

update 参数

用于更新图的状态:

// 基础状态更新
return new Command({
update: {
step: state.step + 1,
status: 'processing',
},
});

// 复杂状态更新
return new Command({
update: {
messages: [...state.messages, newMessage],
userProfile: {
...state.userProfile,
lastActivity: new Date(),
},
},
});

goto 参数

控制执行流程跳转:

// 跳转到指定节点
return new Command({
goto: 'next_node',
});

// 条件跳转
return new Command({
goto: condition ? 'success_node' : 'failure_node',
});

// 跳转到结束
return new Command({
goto: END,
});

graph 参数

切换到不同的子图:

// 切换到专门的子图
return new Command({
graph: 'specialist_workflow',
update: { context: 'switched_to_specialist' },
});

resume 参数

恢复被中断的执行:

// 恢复执行
return new Command({
resume: userApproval,
});

最佳实践

1. 清晰的控制逻辑

// 好的实践:清晰的条件判断
function routingNode(state: typeof StateAnnotation.State) {
const { taskType, priority, userRole } = state;

// 明确的路由逻辑
if (taskType === 'urgent' && priority === 'high') {
return new Command({
update: { assignedTo: 'emergency_team' },
goto: 'emergency_handler',
});
}

if (userRole === 'admin') {
return new Command({
update: { permissions: 'full' },
goto: 'admin_workflow',
});
}

// 默认路径
return new Command({
update: { assignedTo: 'general_team' },
goto: 'standard_handler',
});
}

2. 状态一致性

// 确保状态更新的一致性
function updateWithValidation(state: any, updates: any) {
// 验证更新的有效性
const validatedUpdates = validateStateUpdates(updates);

return new Command({
update: {
...validatedUpdates,
lastUpdated: new Date().toISOString(),
version: state.version + 1,
},
goto: 'next_step',
});
}

3. 错误处理

// 在 Command 中处理错误
function safeCommandNode(state: typeof StateAnnotation.State) {
try {
const result = processBusinessLogic(state);

return new Command({
update: { result, status: 'success' },
goto: 'success_handler',
});
} catch (error) {
return new Command({
update: {
error: error.message,
status: 'failed',
retryCount: (state.retryCount || 0) + 1,
},
goto: state.retryCount < 3 ? 'retry_handler' : 'failure_handler',
});
}
}

4. 可测试性

// 将路由逻辑分离以便测试
function determineNextNode(state: any): string {
if (state.userType === 'premium') {
return 'premium_service';
} else if (state.userType === 'basic') {
return 'basic_service';
} else {
return 'registration_required';
}
}

function routingNodeWithTestableLogic(state: typeof StateAnnotation.State) {
const nextNode = determineNextNode(state);

return new Command({
update: { routedTo: nextNode },
goto: nextNode,
});
}

// 测试路由逻辑
describe('determineNextNode', () => {
it('should route premium users correctly', () => {
const state = { userType: 'premium' };
expect(determineNextNode(state)).toBe('premium_service');
});
});

与其他功能的集成

与人机交互结合

import { interrupt, Command } from '@langchain/langgraph';

function humanApprovalNode(state: typeof StateAnnotation.State) {
const approval = interrupt({
question: '是否批准此操作?',
data: state.pendingOperation,
});

return new Command({
update: {
approved: approval,
approvedAt: new Date().toISOString(),
},
goto: approval ? 'execute_operation' : 'cancel_operation',
});
}

与流式处理结合

function streamingCommandNode(state: typeof StateAnnotation.State) {
// 发送中间结果
const intermediateResult = processPartialData(state.data);

return new Command({
update: {
intermediateResult,
progress: calculateProgress(state),
},
goto: isComplete(state) ? 'finalize' : 'continue_processing',
});
}

调试和监控

Command 执行追踪

class CommandTracker {
private commandHistory: Array<{
timestamp: Date;
nodeId: string;
command: any;
state: any;
}> = [];

trackCommand(nodeId: string, command: any, state: any) {
this.commandHistory.push({
timestamp: new Date(),
nodeId,
command,
state: this.sanitizeState(state),
});
}

getExecutionPath(): string[] {
return this.commandHistory.map(
(entry) => `${entry.nodeId} -> ${entry.command.goto || 'unknown'}`
);
}

private sanitizeState(state: any) {
// 移除敏感信息用于日志记录
const { password, apiKey, ...safeState } = state;
return safeState;
}
}

性能监控

function monitoredCommandNode(state: typeof StateAnnotation.State) {
const startTime = Date.now();

try {
const result = expensiveOperation(state);

const executionTime = Date.now() - startTime;

return new Command({
update: {
result,
performance: {
executionTime,
timestamp: new Date().toISOString(),
},
},
goto: 'next_node',
});
} catch (error) {
const executionTime = Date.now() - startTime;

return new Command({
update: {
error: error.message,
performance: {
executionTime,
failed: true,
},
},
goto: 'error_handler',
});
}
}

常见问题和解决方案

问题 1:Command 对象状态更新不生效

原因:状态更新格式不正确或与 reducer 不兼容

解决方案

// 确保状态更新格式正确
return new Command({
update: {
// 使用正确的状态键
messages: [...state.messages, newMessage], // 而不是直接赋值
step: state.step + 1,
},
goto: 'next_node',
});

问题 2:goto 跳转到不存在的节点

原因:节点名称拼写错误或节点未定义

解决方案

// 使用常量定义节点名称
const NODES = {
PROCESS: 'process_node',
VALIDATE: 'validate_node',
COMPLETE: 'complete_node',
} as const;

return new Command({
goto: NODES.PROCESS, // 使用常量避免拼写错误
});

问题 3:Command 执行顺序混乱

原因:异步操作导致的竞态条件

解决方案

// 确保异步操作的正确处理
async function asyncCommandNode(state: typeof StateAnnotation.State) {
const result = await asyncOperation(state.data);

return new Command({
update: {
result,
processedAt: new Date().toISOString(),
},
goto: 'next_node',
});
}

小结与延伸

Command 对象是 LangGraph 中实现复杂控制流的核心工具。通过合理使用 Command 对象,你可以构建出灵活、可维护的智能应用。掌握 Command 对象的使用技巧,将大大提升你的 LangGraph 应用开发能力。

至此,我们已经完成了第五章实用功能的学习。接下来的章节将深入探讨架构模式和实际项目应用。

Command 对象提示
  • 合理组合状态更新和控制流
  • 保持路由逻辑的清晰和可测试性
  • 注意异步操作的正确处理
  • 使用常量定义节点名称避免错误