🔗 边(Edges)
引言
在 LangGraphJS 中,边(Edges) 定义了节点之间的连接关系和执行顺序。如果你熟悉前端开发,可以将边理解为:
- React Router 的路由配置:定义组件之间的导航关系
- Vue Router 的导航守卫:控制路由跳转的条件和逻辑
- 状态机的转换规则:定义状态之间的转换条件
- 工作流的流程控制:决定任务的执行顺序和分支
边是 LangGraph 应用的"交通指挥员",它们决定了数据和控制流如何在节点之间流动。
与前端开发的类比
- 普通边 ≈ React 组件的直接调用关系
- 条件边 ≈ 条件渲染 (
condition ? <ComponentA /> : <ComponentB />) - 入口点 ≈ 应用的根组件或路由入口
- 条件入口点 ≈ 动态路由或权限控制的入口
概念解释
边的核心特征
🔗 连接性质
- 边连接两个节点,定义执行顺序
- 可以是一对一或一对多的关系
- 支持条件性和无条件连接
⚡ 执行控制
- 决定图的执行路径
- 控制数据在节点间的流动
- 支持动态路由决策
边的类型
📍 普通边(Regular Edges)
graph.addEdge('nodeA', 'nodeB');
- 无条件连接两个节点
- 执行完 nodeA 后直接执行 nodeB
- 最简单的连接方式
🔀 条件边(Conditional Edges)
graph.addConditionalEdges('nodeA', routingFunction, {
'option1': 'nodeB',
'option2': 'nodeC',
'default': END
});
- 根据条件函数的返回值决定下一个节点
- 支持多分支路由
- 实现复杂的业务逻辑
🚪 入口点(Entry Points)
graph.addEdge(START, 'firstNode');
- 定义图的开始执行点
- 通常连接 START 到第一个节点
🎯 条件入口点(Conditional Entry Points)
graph.addConditionalEdges(START, entryFunction, {
'path1': 'nodeA',
'path2': 'nodeB',
'default': END
});
- 根据初始条件决定从哪个节点开始
- 支持动态入口选择
可视化说明
下面的图表展示了不同类型边的工作原理:
图表说明:
- 实线箭头:普通边,无条件执行
- 虚线箭头:条件边,根据条件决定路径
- 菱形节点:决策点,包含路由逻辑
- 圆形节点:特殊节点(START/END)
实践指导
1. 基础边的使用
基础边示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';
// 定义基础边示例状态
const BasicEdgeState = Annotation.Root({
step: Annotation<number>(),
message: Annotation<string>(),
data: Annotation<any[]>(),
processed: Annotation<boolean>(),
timestamp: Annotation<Date>(),
});
// 节点函数
const initNode = (
state: typeof BasicEdgeState.State,
config?: RunnableConfig
) => {
console.log('🚀 初始化节点执行');
return {
step: 1,
message: '初始化完成',
data: ['item1', 'item2', 'item3'],
timestamp: new Date(),
};
};
const processNode = (
state: typeof BasicEdgeState.State,
config?: RunnableConfig
) => {
console.log('⚙️ 处理节点执行');
const processedData = state.data.map((item) => `processed_${item}`);
return {
step: state.step + 1,
message: '数据处理完成',
data: processedData,
processed: true,
timestamp: new Date(),
};
};
const validateNode = (
state: typeof BasicEdgeState.State,
config?: RunnableConfig
) => {
console.log('✅ 验证节点执行');
const isValid = state.processed && state.data.length > 0;
return {
step: state.step + 1,
message: isValid ? '验证通过' : '验证失败',
timestamp: new Date(),
};
};
const finalizeNode = (
state: typeof BasicEdgeState.State,
config?: RunnableConfig
) => {
console.log('🏁 完成节点执行');
return {
step: state.step + 1,
message: '流程完成',
timestamp: new Date(),
};
};
// 1. 线性边连接图
const linearGraph = new StateGraph(BasicEdgeState)
.addNode('init', initNode)
.addNode('process', processNode)
.addNode('validate', validateNode)
.addNode('finalize', finalizeNode)
// 线性连接:START -> init -> process -> validate -> finalize -> END
.addEdge(START, 'init')
.addEdge('init', 'process')
.addEdge('process', 'validate')
.addEdge('validate', 'finalize')
.addEdge('finalize', END)
.compile();
// 2. 分支汇聚图
const branchMergeGraph = new StateGraph(BasicEdgeState)
.addNode('start', initNode)
.addNode('branch_a', (state) => ({
...state,
message: '分支A处理',
step: state.step + 1,
}))
.addNode('branch_b', (state) => ({
...state,
message: '分支B处理',
step: state.step + 1,
}))
.addNode('merge', (state) => ({
...state,
message: '分支合并完成',
step: state.step + 1,
}))
// 分支结构:start 分别连接到 branch_a 和 branch_b,然后汇聚到 merge
.addEdge(START, 'start')
.addEdge('start', 'branch_a')
.addEdge('start', 'branch_b')
.addEdge('branch_a', 'merge')
.addEdge('branch_b', 'merge')
.addEdge('merge', END)
.compile();
// 3. 多层级图
const hierarchicalGraph = new StateGraph(BasicEdgeState)
.addNode('level1', (state) => ({
step: 1,
message: '第一层处理',
timestamp: new Date(),
}))
.addNode('level2_a', (state) => ({
...state,
step: 2,
message: '第二层A处理',
}))
.addNode('level2_b', (state) => ({
...state,
step: 2,
message: '第二层B处理',
}))
.addNode('level3', (state) => ({
...state,
step: 3,
message: '第三层处理',
}))
// 层级结构
.addEdge(START, 'level1')
.addEdge('level1', 'level2_a')
.addEdge('level1', 'level2_b')
.addEdge('level2_a', 'level3')
.addEdge('level2_b', 'level3')
.addEdge('level3', END)
.compile();
// 运行示例
async function runBasicEdgesExample() {
console.log('=== 基础边示例 ===\n');
// 示例 1: 线性流程
console.log('1. 线性流程执行:');
console.log('流程: START -> init -> process -> validate -> finalize -> END');
const linearResult = await linearGraph.invoke({});
console.log('线性流程结果:', {
finalStep: linearResult.step,
message: linearResult.message,
dataCount: linearResult.data?.length,
processed: linearResult.processed,
});
console.log();
// 示例 2: 分支汇聚
console.log('2. 分支汇聚执行:');
console.log('流程: START -> start -> [branch_a, branch_b] -> merge -> END');
const branchResult = await branchMergeGraph.invoke({});
console.log('分支汇聚结果:', {
finalStep: branchResult.step,
message: branchResult.message,
});
console.log();
// 示例 3: 多层级结构
console.log('3. 多层级结构执行:');
console.log('流程: START -> level1 -> [level2_a, level2_b] -> level3 -> END');
const hierarchicalResult = await hierarchicalGraph.invoke({});
console.log('多层级结果:', {
finalStep: hierarchicalResult.step,
message: hierarchicalResult.message,
});
console.log();
// 示例 4: 边的执行顺序演示
console.log('4. 边的执行顺序演示:');
await demonstrateExecutionOrder();
}
// 演示边的执行顺序
async function demonstrateExecutionOrder() {
const executionOrder: string[] = [];
const orderDemoGraph = new StateGraph(BasicEdgeState)
.addNode('A', (state) => {
executionOrder.push('节点A执行');
return { step: 1, message: 'A完成' };
})
.addNode('B', (state) => {
executionOrder.push('节点B执行');
return { ...state, step: 2, message: 'B完成' };
})
.addNode('C', (state) => {
executionOrder.push('节点C执行');
return { ...state, step: 3, message: 'C完成' };
})
.addNode('D', (state) => {
executionOrder.push('节点D执行');
return { ...state, step: 4, message: 'D完成' };
})
// 复杂的边连接
.addEdge(START, 'A')
.addEdge('A', 'B')
.addEdge('A', 'C') // A 同时连接到 B 和 C
.addEdge('B', 'D')
.addEdge('C', 'D') // B 和 C 都连接到 D
.addEdge('D', END)
.compile();
await orderDemoGraph.invoke({});
console.log('执行顺序:', executionOrder);
console.log('说明: 节点A执行后,B和C可能并行执行,然后D等待B和C都完成后执行');
}
// 边的类型演示
function demonstrateEdgeTypes() {
console.log('\n=== 边的类型演示 ===');
const graph = new StateGraph(BasicEdgeState);
// 1. 普通边
console.log('1. 普通边 (Regular Edge):');
console.log(' graph.addEdge("nodeA", "nodeB")');
console.log(' - 无条件连接');
console.log(' - nodeA 执行完直接执行 nodeB');
// 2. 起始边
console.log('\n2. 起始边 (Start Edge):');
console.log(' graph.addEdge(START, "firstNode")');
console.log(' - 定义图的入口点');
console.log(' - 图开始执行时的第一个节点');
// 3. 结束边
console.log('\n3. 结束边 (End Edge):');
console.log(' graph.addEdge("lastNode", END)');
console.log(' - 定义图的出口点');
console.log(' - 节点执行完后结束图的执行');
// 4. 多重边
console.log('\n4. 多重边 (Multiple Edges):');
console.log(' graph.addEdge("nodeA", "nodeB")');
console.log(' .addEdge("nodeA", "nodeC")');
console.log(' - 一个节点可以连接到多个节点');
console.log(' - 支持分支和并行执行');
}
2. 条件边的实现
条件边示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';
// 定义条件边示例状态
const ConditionalEdgeState = Annotation.Root({
userType: Annotation<string>(),
score: Annotation<number>(),
attempts: Annotation<number>(),
success: Annotation<boolean>(),
error: Annotation<string>(),
route: Annotation<string>(),
message: Annotation<string>(),
data: Annotation<any>(),
});
// 节点函数
const authNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('🔐 认证节点执行');
// 模拟认证逻辑
const score = Math.floor(Math.random() * 100);
let userType = 'guest';
if (score >= 90) userType = 'admin';
else if (score >= 70) userType = 'premium';
else if (score >= 50) userType = 'standard';
return {
userType,
score,
message: `用户认证完成,类型: ${userType},分数: ${score}`,
};
};
const adminNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('👑 管理员节点执行');
return {
...state,
route: 'admin_panel',
message: '进入管理员面板',
data: { permissions: ['read', 'write', 'delete', 'admin'] },
};
};
const premiumNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('💎 高级用户节点执行');
return {
...state,
route: 'premium_features',
message: '进入高级功能区',
data: { permissions: ['read', 'write', 'premium'] },
};
};
const standardNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('👤 标准用户节点执行');
return {
...state,
route: 'standard_features',
message: '进入标准功能区',
data: { permissions: ['read', 'write'] },
};
};
const guestNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('👥 访客节点执行');
return {
...state,
route: 'guest_area',
message: '进入访客区域',
data: { permissions: ['read'] },
};
};
const processNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('⚙️ 处理节点执行');
// 模拟处理逻辑,可能成功或失败
const success = Math.random() > 0.3; // 70% 成功率
const attempts = (state.attempts || 0) + 1;
return {
...state,
success,
attempts,
error: success ? '' : '处理失败',
message: success ? '处理成功' : `处理失败,尝试次数: ${attempts}`,
};
};
const retryNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('🔄 重试节点执行');
return {
...state,
message: `准备重试,当前尝试次数: ${state.attempts}`,
};
};
const errorNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('❌ 错误处理节点执行');
return {
...state,
route: 'error_page',
message: `处理失败,已达到最大重试次数: ${state.attempts}`,
};
};
const successNode = (
state: typeof ConditionalEdgeState.State,
config?: RunnableConfig
) => {
console.log('✅ 成功节点执行');
return {
...state,
route: 'success_page',
message: '操作成功完成',
};
};
// 路由函数
const userTypeRouter = (state: typeof ConditionalEdgeState.State): string => {
console.log(`🔀 用户类型路由: ${state.userType}`);
switch (state.userType) {
case 'admin':
return 'admin';
case 'premium':
return 'premium';
case 'standard':
return 'standard';
default:
return 'guest';
}
};
const retryRouter = (state: typeof ConditionalEdgeState.State): string => {
console.log(`🔀 重试路由: 成功=${state.success}, 尝试次数=${state.attempts}`);
if (state.success) {
return 'success';
} else if (state.attempts < 3) {
return 'retry';
} else {
return 'error';
}
};
const complexRouter = (state: typeof ConditionalEdgeState.State): string => {
console.log(
`🔀 复杂路由: 类型=${state.userType}, 分数=${state.score}, 成功=${state.success}`
);
// 复杂的多条件路由逻辑
if (state.success && state.userType === 'admin' && state.score >= 95) {
return 'super_admin';
} else if (state.success && state.score >= 80) {
return 'high_score';
} else if (state.success) {
return 'normal_success';
} else if (state.attempts < 2) {
return 'quick_retry';
} else {
return 'final_attempt';
}
};
// 1. 用户类型分支图
const userTypeGraph = new StateGraph(ConditionalEdgeState)
.addNode('auth', authNode)
.addNode('admin', adminNode)
.addNode('premium', premiumNode)
.addNode('standard', standardNode)
.addNode('guest', guestNode)
.addEdge(START, 'auth')
.addConditionalEdges('auth', userTypeRouter, {
admin: 'admin',
premium: 'premium',
standard: 'standard',
guest: 'guest',
})
.addEdge('admin', END)
.addEdge('premium', END)
.addEdge('standard', END)
.addEdge('guest', END)
.compile();
// 2. 重试逻辑图
const retryGraph = new StateGraph(ConditionalEdgeState)
.addNode('process', processNode)
.addNode('retry', retryNode)
.addNode('success', successNode)
.addNode('error', errorNode)
.addEdge(START, 'process')
.addConditionalEdges('process', retryRouter, {
success: 'success',
retry: 'retry',
error: 'error',
})
.addEdge('retry', 'process') // 重试回到处理节点
.addEdge('success', END)
.addEdge('error', END)
.compile();
// 3. 复杂条件图
const complexGraph = new StateGraph(ConditionalEdgeState)
.addNode('auth', authNode)
.addNode('process', processNode)
.addNode('super_admin', (state) => ({
...state,
route: 'super_admin_panel',
message: '超级管理员权限',
}))
.addNode('high_score', (state) => ({
...state,
route: 'high_score_rewards',
message: '高分奖励区域',
}))
.addNode('normal_success', successNode)
.addNode('quick_retry', (state) => ({
...state,
message: '快速重试',
}))
.addNode('final_attempt', (state) => ({
...state,
message: '最后尝试',
}))
.addEdge(START, 'auth')
.addEdge('auth', 'process')
.addConditionalEdges('process', complexRouter, {
super_admin: 'super_admin',
high_score: 'high_score',
normal_success: 'normal_success',
quick_retry: 'quick_retry',
final_attempt: 'final_attempt',
})
.addEdge('super_admin', END)
.addEdge('high_score', END)
.addEdge('normal_success', END)
.addEdge('quick_retry', 'process')
.addEdge('final_attempt', 'process')
.compile();
// 运行示例
async function runConditionalEdgesExample() {
console.log('=== 条件边示例 ===\n');
// 示例 1: 用户类型分支
console.log('1. 用户类型分支测试:');
for (let i = 0; i < 3; i++) {
console.log(`\n--- 第 ${i + 1} 次测试 ---`);
const result = await userTypeGraph.invoke({});
console.log('结果:', {
userType: result.userType,
score: result.score,
route: result.route,
message: result.message,
});
}
console.log();
// 示例 2: 重试逻辑
console.log('2. 重试逻辑测试:');
const retryResult = await retryGraph.invoke({});
console.log('重试结果:', {
success: retryResult.success,
attempts: retryResult.attempts,
route: retryResult.route,
message: retryResult.message,
});
console.log();
// 示例 3: 复杂条件路由
console.log('3. 复杂条件路由测试:');
const complexResult = await complexGraph.invoke({});
console.log('复杂路由结果:', {
userType: complexResult.userType,
score: complexResult.score,
success: complexResult.success,
attempts: complexResult.attempts,
route: complexResult.route,
message: complexResult.message,
});
console.log();
// 示例 4: 路由函数类型演示
demonstrateRouterTypes();
}
// 演示不同类型的路由函数
function demonstrateRouterTypes() {
console.log('4. 路由函数类型演示:');
// 简单路由函数
console.log('\n简单路由函数:');
const simpleRouter = (state: any) => {
return state.value > 50 ? 'high' : 'low';
};
console.log(' 用途: 简单的二分支决策');
console.log(' 示例: 根据数值大小选择路径');
// 多分支路由函数
console.log('\n多分支路由函数:');
const multiBranchRouter = (state: any) => {
if (state.score >= 90) return 'excellent';
if (state.score >= 70) return 'good';
if (state.score >= 50) return 'average';
return 'poor';
};
console.log(' 用途: 多级分类决策');
console.log(' 示例: 根据分数等级选择不同处理路径');
// 复合条件路由函数
console.log('\n复合条件路由函数:');
const complexConditionRouter = (state: any) => {
const { userType, isActive, hasPermission } = state;
if (userType === 'admin' && isActive && hasPermission) {
return 'admin_access';
} else if (isActive && hasPermission) {
return 'user_access';
} else if (isActive) {
return 'limited_access';
} else {
return 'no_access';
}
};
console.log(' 用途: 多条件组合决策');
console.log(' 示例: 权限控制和访问管理');
// 带状态更新的路由函数
console.log('\n带状态更新的路由函数:');
console.log(' 注意: 路由函数应该是纯函数,不应修改状态');
console.log(' 状态更新应该在节点函数中进行');
}
// 路由函数最佳实践演示
function demonstrateRouterBestPractices() {
console.log('\n=== 路由函数最佳实践 ===');
// ✅ 好的路由函数
const goodRouter = (state: typeof ConditionalEdgeState.State): string => {
// 1. 类型安全
const userType: string = state.userType || 'unknown';
// 2. 输入验证
if (!userType) {
console.warn('用户类型未定义,使用默认路径');
return 'default';
}
// 3. 清晰的逻辑
const routeMap: Record<string, string> = {
admin: 'admin_panel',
user: 'user_dashboard',
guest: 'guest_area',
};
// 4. 默认处理
return routeMap[userType] || 'unknown_user';
};
// ❌ 避免的路由函数
const badRouter = (state: any) => {
// 避免: 修改状态
// state.routeDecisionTime = new Date();
// 避免: 复杂的异步操作
// await someAsyncOperation();
// 避免: 副作用
// console.log('Making routing decision...');
// 避免: 不明确的返回值
return state.userType === 'admin' ? 'admin' : 'other';
};
console.log('✅ 好的路由函数特点:');
console.log(' - 类型安全');
console.log(' - 输入验证');
console.log(' - 清晰的逻辑');
console.log(' - 默认处理');
console.log(' - 纯函数(无副作用)');
console.log('\n❌ 避免的路由函数特点:');
console.log(' - 修改输入状态');
console.log(' - 执行异步操作');
console.log(' - 产生副作用');
console.log(' - 不明确的返回值');
console.log(' - 缺少错误处理');
}
3. 入口点的配置
入口点示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';
// 定义入口点示例状态
const EntryPointState = Annotation.Root({
userRole: Annotation<string>(),
priority: Annotation<number>(),
mode: Annotation<string>(),
route: Annotation<string>(),
message: Annotation<string>(),
timestamp: Annotation<Date>(),
});
// 节点函数
const adminEntryNode = (
state: typeof EntryPointState.State,
config?: RunnableConfig
) => {
console.log('👑 管理员入口节点执行');
return {
...state,
route: 'admin_dashboard',
message: '管理员直接进入后台',
timestamp: new Date(),
};
};
const userEntryNode = (
state: typeof EntryPointState.State,
config?: RunnableConfig
) => {
console.log('👤 用户入口节点执行');
return {
...state,
route: 'user_dashboard',
message: '用户进入个人面板',
timestamp: new Date(),
};
};
const guestEntryNode = (
state: typeof EntryPointState.State,
config?: RunnableConfig
) => {
console.log('👥 访客入口节点执行');
return {
...state,
route: 'guest_welcome',
message: '访客进入欢迎页面',
timestamp: new Date(),
};
};
const emergencyEntryNode = (
state: typeof EntryPointState.State,
config?: RunnableConfig
) => {
console.log('🚨 紧急入口节点执行');
return {
...state,
route: 'emergency_mode',
message: '紧急模式激活',
timestamp: new Date(),
};
};
const maintenanceEntryNode = (
state: typeof EntryPointState.State,
config?: RunnableConfig
) => {
console.log('🔧 维护入口节点执行');
return {
...state,
route: 'maintenance_mode',
message: '系统维护模式',
timestamp: new Date(),
};
};
const processNode = (
state: typeof EntryPointState.State,
config?: RunnableConfig
) => {
console.log('⚙️ 通用处理节点执行');
return {
...state,
message: `${state.message} -> 处理完成`,
timestamp: new Date(),
};
};
// 条件入口点路由函数
const entryPointRouter = (state: typeof EntryPointState.State): string => {
console.log(
`🎯 入口点路由决策: 角色=${state.userRole}, 优先级=${state.priority}, 模式=${state.mode}`
);
// 紧急模式优先级最高
if (state.mode === 'emergency') {
return 'emergency';
}
// 维护模式
if (state.mode === 'maintenance') {
return 'maintenance';
}
// 高优先级用户
if (state.priority >= 90) {
return 'admin';
}
// 根据用户角色决定
switch (state.userRole) {
case 'admin':
return 'admin';
case 'user':
return 'user';
default:
return 'guest';
}
};
// 优先级路由函数
const priorityRouter = (state: typeof EntryPointState.State): string => {
const priority = state.priority || 0;
if (priority >= 90) return 'high_priority';
if (priority >= 50) return 'medium_priority';
return 'low_priority';
};
// 1. 基础入口点图(固定入口)
const basicEntryGraph = new StateGraph(EntryPointState)
.addNode('main_entry', (state) => ({
...state,
route: 'main_flow',
message: '从主入口进入',
}))
.addNode('process', processNode)
// 固定入口点:所有请求都从 main_entry 开始
.addEdge(START, 'main_entry')
.addEdge('main_entry', 'process')
.addEdge('process', END)
.compile();
// 2. 条件入口点图
const conditionalEntryGraph = new StateGraph(EntryPointState)
.addNode('admin_entry', adminEntryNode)
.addNode('user_entry', userEntryNode)
.addNode('guest_entry', guestEntryNode)
.addNode('emergency_entry', emergencyEntryNode)
.addNode('maintenance_entry', maintenanceEntryNode)
.addNode('process', processNode)
// 条件入口点:根据状态决定从哪个节点开始
.addConditionalEdges(START, entryPointRouter, {
admin: 'admin_entry',
user: 'user_entry',
guest: 'guest_entry',
emergency: 'emergency_entry',
maintenance: 'maintenance_entry',
})
.addEdge('admin_entry', 'process')
.addEdge('user_entry', 'process')
.addEdge('guest_entry', 'process')
.addEdge('emergency_entry', 'process')
.addEdge('maintenance_entry', 'process')
.addEdge('process', END)
.compile();
// 3. 多级入口点图
const multiLevelEntryGraph = new StateGraph(EntryPointState)
.addNode('high_priority', (state) => ({
...state,
route: 'vip_lane',
message: '高优先级通道',
}))
.addNode('medium_priority', (state) => ({
...state,
route: 'standard_lane',
message: '标准优先级通道',
}))
.addNode('low_priority', (state) => ({
...state,
route: 'queue_lane',
message: '排队等待通道',
}))
.addNode('role_check', (state) => ({
...state,
message: `${state.message} -> 角色验证`,
}))
.addNode('final_process', processNode)
// 第一级:优先级入口
.addConditionalEdges(START, priorityRouter, {
high_priority: 'high_priority',
medium_priority: 'medium_priority',
low_priority: 'low_priority',
})
// 第二级:角色检查
.addEdge('high_priority', 'role_check')
.addEdge('medium_priority', 'role_check')
.addEdge('low_priority', 'role_check')
// 第三级:最终处理
.addEdge('role_check', 'final_process')
.addEdge('final_process', END)
.compile();
// 运行示例
async function runEntryPointsExample() {
console.log('=== 入口点示例 ===\n');
// 示例 1: 基础入口点
console.log('1. 基础入口点测试:');
const basicResult = await basicEntryGraph.invoke({
userRole: 'user',
priority: 50,
});
console.log('基础入口结果:', {
route: basicResult.route,
message: basicResult.message,
});
console.log();
// 示例 2: 条件入口点测试
console.log('2. 条件入口点测试:');
const testCases = [
{
userRole: 'admin',
priority: 95,
mode: 'normal',
description: '高级管理员',
},
{ userRole: 'user', priority: 60, mode: 'normal', description: '普通用户' },
{ userRole: 'guest', priority: 10, mode: 'normal', description: '访客' },
{
userRole: 'user',
priority: 50,
mode: 'emergency',
description: '紧急模式',
},
{
userRole: 'admin',
priority: 80,
mode: 'maintenance',
description: '维护模式',
},
];
for (const testCase of testCases) {
console.log(`\n--- ${testCase.description} ---`);
const result = await conditionalEntryGraph.invoke(testCase);
console.log('结果:', {
route: result.route,
message: result.message,
});
}
console.log();
// 示例 3: 多级入口点测试
console.log('3. 多级入口点测试:');
const multiLevelCases = [
{ userRole: 'admin', priority: 95, description: '超高优先级' },
{ userRole: 'user', priority: 65, description: '中等优先级' },
{ userRole: 'guest', priority: 25, description: '低优先级' },
];
for (const testCase of multiLevelCases) {
console.log(`\n--- ${testCase.description} ---`);
const result = await multiLevelEntryGraph.invoke(testCase);
console.log('结果:', {
route: result.route,
message: result.message,
});
}
console.log();
// 示例 4: 入口点设计模式演示
demonstrateEntryPointPatterns();
}
// 演示入口点设计模式
function demonstrateEntryPointPatterns() {
console.log('4. 入口点设计模式:');
console.log('\n📍 固定入口点模式:');
console.log(' 特点: 所有请求都从同一个节点开始');
console.log(' 适用: 简单的线性流程');
console.log(' 示例: graph.addEdge(START, "main_entry")');
console.log('\n🎯 条件入口点模式:');
console.log(' 特点: 根据初始状态选择不同的入口节点');
console.log(' 适用: 多种用户类型或场景');
console.log(' 示例: graph.addConditionalEdges(START, router, routes)');
console.log('\n🏗️ 分层入口点模式:');
console.log(' 特点: 多级入口决策,逐步细化');
console.log(' 适用: 复杂的权限和优先级控制');
console.log(' 示例: 优先级 -> 角色 -> 功能');
console.log('\n⚡ 动态入口点模式:');
console.log(' 特点: 运行时动态决定入口点');
console.log(' 适用: 负载均衡、故障转移');
console.log(' 示例: 根据系统负载选择处理节点');
}
// 入口点最佳实践
function demonstrateEntryPointBestPractices() {
console.log('\n=== 入口点最佳实践 ===');
console.log('\n✅ 好的入口点设计:');
console.log(' 1. 清晰的路由逻辑');
console.log(' 2. 完整的条件覆盖');
console.log(' 3. 默认入口处理');
console.log(' 4. 错误情况考虑');
console.log(' 5. 性能优化');
console.log('\n🔧 入口点路由函数示例:');
// 好的入口点路由函数
const goodEntryRouter = (state: typeof EntryPointState.State): string => {
// 1. 输入验证
if (!state.userRole) {
console.warn('用户角色未定义,使用默认入口');
return 'default_entry';
}
// 2. 优先级处理
if (state.mode === 'emergency') {
return 'emergency_entry';
}
// 3. 清晰的映射
const entryMap: Record<string, string> = {
admin: 'admin_entry',
moderator: 'moderator_entry',
user: 'user_entry',
guest: 'guest_entry',
};
// 4. 默认处理
return entryMap[state.userRole] || 'guest_entry';
};
console.log('\n❌ 避免的入口点设计:');
console.log(' 1. 复杂的嵌套条件');
console.log(' 2. 缺少默认处理');
console.log(' 3. 硬编码的路由逻辑');
console.log(' 4. 忽略边界情况');
console.log(' 5. 性能问题');
console.log('\n🎯 入口点选择策略:');
console.log(' - 简单应用: 使用固定入口点');
console.log(' - 多用户类型: 使用条件入口点');
console.log(' - 复杂权限: 使用分层入口点');
console.log(' - 高并发: 使用动态入口点');
}
边的设计模式
1. 线性流程模式
// 简单的线性执行流程
graph
.addEdge(START, 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', 'step3')
.addEdge('step3', END);
适用场景:
- 数据处理管道
- 顺序验证流程
- 简单的工作流
2. 分支决策模式
// 根据条件分支执行
const routingFunction = (state) => {
if (state.userType === 'admin') return 'admin_flow';
if (state.userType === 'user') return 'user_flow';
return 'guest_flow';
};
graph.addConditionalEdges('auth', routingFunction, {
'admin_flow': 'admin_dashboard',
'user_flow': 'user_dashboard',
'guest_flow': 'guest_welcome'
});
适用场景:
- 用户权限控制
- 业务逻辑分支
- 错误处理路径
3. 循环重试模式
// 支持重试的循环结构
const retryLogic = (state) => {
if (state.success) return 'complete';
if (state.attempts < 3) return 'retry';
return 'failed';
};
graph.addConditionalEdges('process', retryLogic, {
'complete': 'success_handler',
'retry': 'process', // 回到自身重试
'failed': 'error_handler'
});
适用场景:
- API 调用重试
- 数据验证循环
- 错误恢复机制
4. 并行汇聚模式
// 多个分支最终汇聚到同一节点
graph
.addEdge('parallel_start', 'task_a')
.addEdge('parallel_start', 'task_b')
.addEdge('parallel_start', 'task_c')
.addEdge('task_a', 'merge_results')
.addEdge('task_b', 'merge_results')
.addEdge('task_c', 'merge_results');
适用场景:
- 并行数据处理
- 多源数据聚合
- 分布式任务协调
路由函数最佳实践
1. 类型安全的路由函数
type RouteOptions = 'success' | 'retry' | 'failed';
const typedRouter = (state: StateType): RouteOptions => {
if (state.error) {
return state.attempts < 3 ? 'retry' : 'failed';
}
return 'success';
};
// 使用类型安全的路由映射
const routeMap: Record<RouteOptions, string> = {
'success': 'complete',
'retry': 'process_again',
'failed': 'error_handler'
};
graph.addConditionalEdges('processor', typedRouter, routeMap);
2. 复杂条件的路由函数
const complexRouter = (state: StateType) => {
// 多重条件判断
const { userType, score, permissions, isActive } = state;
// 优先级判断
if (!isActive) return 'inactive_user';
if (userType === 'admin') return 'admin_panel';
if (score > 90 && permissions.includes('premium')) return 'premium_features';
if (score > 70) return 'standard_features';
if (score > 50) return 'basic_features';
return 'limited_access';
};
3. 带日志的路由函数
const loggedRouter = (state: StateType) => {
const route = determineRoute(state);
console.log(`路由决策: ${JSON.stringify({
currentState: state.currentStep,
decision: route,
timestamp: new Date().toISOString()
})}`);
return route;
};
错误处理和调试
1. 路由函数错误处理
const safeRouter = (state: StateType) => {
try {
// 路由逻辑
return calculateRoute(state);
} catch (error) {
console.error('路由函数执行失败:', error);
return 'error_fallback'; // 默认错误路径
}
};
2. 边的验证
const validateEdges = (graph: StateGraph) => {
// 检查是否有孤立节点
// 检查是否有无效的路由目标
// 验证条件边的完整性
};
3. 调试技巧
// 添加调试信息到路由函数
const debugRouter = (state: StateType) => {
const route = normalRouter(state);
if (process.env.NODE_ENV === 'development') {
console.log(`🔀 路由决策:`, {
from: state.currentNode,
to: route,
state: state,
timestamp: Date.now()
});
}
return route;
};
性能优化
1. 路由函数优化
// ❌ 避免复杂计算
const slowRouter = (state: StateType) => {
const expensiveResult = performHeavyCalculation(state);
return expensiveResult > threshold ? 'path_a' : 'path_b';
};
// ✅ 缓存计算结果
const optimizedRouter = (state: StateType) => {
if (!state.cachedResult) {
state.cachedResult = performHeavyCalculation(state);
}
return state.cachedResult > threshold ? 'path_a' : 'path_b';
};
2. 条件边的优化
// ✅ 使用 Map 替代大量 if-else
const routeMap = new Map([
['admin', 'admin_panel'],
['user', 'user_dashboard'],
['guest', 'guest_welcome']
]);
const mapBasedRouter = (state: StateType) => {
return routeMap.get(state.userType) || 'default_path';
};
常见问题解答
Q: 如何处理路由函数返回无效路径?
A: LangGraph 会抛出错误。建议在路由函数中添加默认路径:
const safeRouter = (state: StateType) => {
const route = calculateRoute(state);
// 验证路径是否有效
const validRoutes = ['path_a', 'path_b', 'default'];
return validRoutes.includes(route) ? route : 'default';
};
Q: 条件边可以返回多个目标节点吗?
A: 单个条件边只能返回一个目标。如需并行执行,可以使用中间节点:
// 创建分发节点
const distributorNode = (state: StateType) => {
// 触发并行处理的逻辑
return { needsParallelProcessing: true };
};
graph
.addConditionalEdges('decision', router, { 'parallel': 'distributor' })
.addEdge('distributor', 'task_a')
.addEdge('distributor', 'task_b');
Q: 如何实现动态边?
A: 使用条件边和动态路由函数:
const dynamicRouter = (state: StateType) => {
// 根据运行时状态动态决定路径
const availableNodes = state.availableProcessors;
const selectedNode = selectOptimalNode(availableNodes);
return selectedNode;
};
小结与延伸
在本节中,我们深入了解了 LangGraphJS 的边机制:
🔑 核心要点:
- 边定义节点间的连接关系和执行顺序
- 支持普通边、条件边、入口点等多种类型
- 路由函数是条件边的核心,决定执行路径
- 通过类型安全确保路由的可靠性
📈 最佳实践:
- 保持路由函数的简洁和高效
- 添加适当的错误处理和默认路径
- 使用类型安全的路由映射
- 考虑性能优化和调试需求
🔗 与下一节的关联: 掌握了边的机制后,我们将学习**Reducers(状态合并器)**的详细原理。Reducers 定义了如何将节点的状态更新应用到全局状态中,是状态管理的核心机制。理解 Reducers 将帮助你更好地控制状态的变化和合并逻辑。
💡 进阶学习建议:
- 尝试设计复杂的条件路由逻辑
- 实验不同的边连接模式
- 探索边的性能优化技巧
- 学习边的调试和可视化方法