⚙️ 节点(Nodes)
引言
在 LangGraphJS 中,节点(Nodes) 是图的执行单元,负责处理具体的业务逻辑。如果你熟悉前端开发,可以将节点理解为:
- React 组件的 render 函数:接收 props(状态),返回新的 JSX(状态更新)
- Redux 的 Action Creator:接收当前状态,返回 action(状态变更)
- Vue 的计算属性:基于响应式数据进行计算和转换
- 函数式编程的纯函数:输入确定,输出确定,无副作用
节点是 LangGraph 应用的"工作者",它们读取状态、执行逻辑、并产生新的状态更新。
与前端开发的类比
- 节点函数 ≈ React 组件函数 + 事件处理器
- 状态参数 ≈ React Props + Context
- 配置参数 ≈ React Context + 环境变量
- 返回值 ≈ setState() 的参数 + dispatch(action)
概念解释
节点的核心特征
🔧 函数本质
- 节点本质上是 JavaScript/TypeScript 函数
- 第一个参数是当前状态
- 第二个参数是可选的配置信息
- 返回状态更新对象或 Promise
📥 输入规范
type NodeFunction = (
state: StateType, // 当前状态
config?: RunnableConfig // 可选配置
) => StateUpdate | Promise<StateUpdate>
📤 输出规范
- 返回部分状态更新对象
- 可以返回 Promise(异步节点)
- 可以返回 Command 对象(控制流)
- 空对象
{}表示不更新状态
节点类型
🏁 特殊节点
- START:图的入口点,系统自动提供
- END:图的终止点,系统自动提供
⚡ 普通节点
- 同步节点:立即返回结果
- 异步节点:返回 Promise
- 条件节点:根据状态决定执行逻辑
可视化说明
下面的图表展示了节点在 LangGraph 中的工作流程:
图表说明:
- 输入状态:节点接收当前的完整状态
- 配置参数:可选的运行时配置
- 业务逻辑:节点内部的处理逻辑
- 状态更新:节点返回的状态变更
实践指导
1. 基础节点函数
基础节点示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';
// 定义状态
const NodeExampleState = Annotation.Root({
counter: Annotation<number>(),
message: Annotation<string>(),
userId: Annotation<string>(),
data: Annotation<any[]>(),
lastUpdated: Annotation<Date>(),
status: Annotation<string>(),
isValid: Annotation<boolean>(),
});
// 1. 基础同步节点
const incrementNode = (
state: typeof NodeExampleState.State,
config?: RunnableConfig
) => {
console.log(`当前计数: ${state.counter}`);
return {
counter: state.counter + 1,
lastUpdated: new Date(),
};
};
// 2. 数据处理节点
const processDataNode = (
state: typeof NodeExampleState.State,
config?: RunnableConfig
) => {
const newData = state.data.map((item) => ({
...item,
processed: true,
timestamp: new Date(),
}));
return {
data: newData,
message: `处理了 ${newData.length} 条数据`,
};
};
// 3. 验证节点
const validateNode = (
state: typeof NodeExampleState.State,
config?: RunnableConfig
) => {
const isValid = state.userId && state.userId.length > 0;
return {
message: isValid ? '验证通过' : '验证失败:用户ID不能为空',
isValid,
};
};
// 4. 配置参数节点
const configurableNode = (
state: typeof NodeExampleState.State,
config?: RunnableConfig
) => {
// 从配置中获取参数
const multiplier = config?.configurable?.multiplier || 1;
const prefix = config?.configurable?.prefix || '';
return {
counter: state.counter * multiplier,
message: `${prefix}计数器值: ${state.counter * multiplier}`,
};
};
// 5. 条件逻辑节点
const conditionalNode = (
state: typeof NodeExampleState.State,
config?: RunnableConfig
) => {
if (state.counter > 10) {
return {
message: '计数器已达到上限',
status: 'max_reached',
};
} else if (state.counter > 5) {
return {
message: '计数器接近上限',
status: 'warning',
};
} else {
return {
message: '计数器正常',
status: 'normal',
};
}
};
// 构建基础节点图
const basicNodesGraph = new StateGraph(NodeExampleState)
.addNode('increment', incrementNode)
.addNode('validate', validateNode)
.addNode('process', processDataNode)
.addNode('conditional', conditionalNode)
.addEdge(START, 'validate')
.addEdge('validate', 'increment')
.addEdge('increment', 'process')
.addEdge('process', 'conditional')
.addEdge('conditional', END)
.compile();
// 运行示例
async function runBasicNodesExample() {
console.log('=== 基础节点示例 ===\n');
// 示例 1: 基础执行
console.log('1. 基础节点执行:');
const result1 = await basicNodesGraph.invoke({
userId: 'user123',
data: [
{ id: 1, name: '项目A' },
{ id: 2, name: '项目B' },
],
});
console.log('执行结果:', {
counter: result1.counter,
message: result1.message,
dataCount: result1.data.length,
status: result1.status,
});
console.log();
// 示例 2: 配置参数使用
console.log('2. 使用配置参数:');
const result2 = await basicNodesGraph.invoke(
{
userId: 'user456',
counter: 3,
data: [{ id: 1, name: '测试' }],
},
{
configurable: {
multiplier: 5,
prefix: '[配置] ',
},
}
);
console.log('配置执行结果:', {
counter: result2.counter,
message: result2.message,
});
console.log();
// 示例 3: 多次执行
console.log('3. 多次执行演示:');
let currentState = {
userId: 'user789',
counter: 0,
data: [],
message: '',
status: 'init',
};
for (let i = 0; i < 3; i++) {
currentState = await basicNodesGraph.invoke(currentState);
console.log(`第 ${i + 1} 次执行:`, {
counter: currentState.counter,
message: currentState.message,
status: currentState.status,
});
}
}
// 导出
export {
NodeExampleState,
incrementNode,
processDataNode,
validateNode,
configurableNode,
conditionalNode,
basicNodesGraph,
runBasicNodesExample,
};
2. 异步节点处理
异步节点示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';
// 定义异步操作状态
const AsyncState = Annotation.Root({
userId: Annotation<string>(),
userData: Annotation<any>(),
apiResult: Annotation<any>(),
processedData: Annotation<any[]>(),
status: Annotation<string>(),
error: Annotation<string>(),
timestamp: Annotation<Date>(),
});
// 模拟异步 API 调用
const mockApiCall = (userId: string, delay: number = 1000): Promise<any> => {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (Math.random() > 0.1) {
// 90% 成功率
resolve({
id: userId,
name: `用户_${userId}`,
email: `${userId}@example.com`,
createdAt: new Date(),
});
} else {
reject(new Error('API 调用失败'));
}
}, delay);
});
};
// 1. 基础异步节点
const fetchUserDataNode = async (
state: typeof AsyncState.State,
config?: RunnableConfig
) => {
console.log(`获取用户数据: ${state.userId}`);
try {
const userData = await mockApiCall(state.userId, 500);
return {
userData,
status: 'user_data_loaded',
timestamp: new Date(),
};
} catch (error) {
return {
error: error.message,
status: 'user_data_failed',
timestamp: new Date(),
};
}
};
// 2. 并行异步操作节点
const parallelProcessNode = async (
state: typeof AsyncState.State,
config?: RunnableConfig
) => {
console.log('执行并行处理...');
try {
// 并行执行多个异步操作
const [apiResult1, apiResult2, apiResult3] = await Promise.all([
mockApiCall('api1', 300),
mockApiCall('api2', 400),
mockApiCall('api3', 200),
]);
return {
apiResult: {
api1: apiResult1,
api2: apiResult2,
api3: apiResult3,
},
status: 'parallel_completed',
timestamp: new Date(),
};
} catch (error) {
return {
error: error.message,
status: 'parallel_failed',
timestamp: new Date(),
};
}
};
// 3. 串行异步操作节点
const sequentialProcessNode = async (
state: typeof AsyncState.State,
config?: RunnableConfig
) => {
console.log('执行串行处理...');
try {
// 串行执行异步操作
const step1 = await mockApiCall('step1', 200);
console.log('步骤1完成');
const step2 = await mockApiCall('step2', 300);
console.log('步骤2完成');
const step3 = await mockApiCall('step3', 100);
console.log('步骤3完成');
return {
processedData: [step1, step2, step3],
status: 'sequential_completed',
timestamp: new Date(),
};
} catch (error) {
return {
error: error.message,
status: 'sequential_failed',
timestamp: new Date(),
};
}
};
// 4. 带重试的异步节点
const retryableNode = async (
state: typeof AsyncState.State,
config?: RunnableConfig
) => {
const maxRetries = 3;
let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
console.log(`尝试第 ${attempt} 次...`);
const result = await mockApiCall(state.userId, 200);
return {
apiResult: result,
status: `success_on_attempt_${attempt}`,
timestamp: new Date(),
};
} catch (error) {
lastError = error;
console.log(`第 ${attempt} 次尝试失败:`, error.message);
if (attempt < maxRetries) {
// 等待后重试
await new Promise((resolve) => setTimeout(resolve, 100 * attempt));
}
}
}
return {
error: `重试 ${maxRetries} 次后仍然失败: ${lastError?.message}`,
status: 'retry_exhausted',
timestamp: new Date(),
};
};
// 5. 超时控制节点
const timeoutNode = async (
state: typeof AsyncState.State,
config?: RunnableConfig
) => {
const timeout = config?.configurable?.timeout || 2000;
try {
const result = await Promise.race([
mockApiCall(state.userId, 1500),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('操作超时')), timeout)
),
]);
return {
apiResult: result,
status: 'completed_within_timeout',
timestamp: new Date(),
};
} catch (error) {
return {
error: error.message,
status: 'timeout_or_failed',
timestamp: new Date(),
};
}
};
// 构建异步节点图
const asyncNodesGraph = new StateGraph(AsyncState)
.addNode('fetchUser', fetchUserDataNode)
.addNode('parallel', parallelProcessNode)
.addNode('sequential', sequentialProcessNode)
.addNode('retry', retryableNode)
.addEdge(START, 'fetchUser')
.addEdge('fetchUser', 'parallel')
.addEdge('parallel', 'sequential')
.addEdge('sequential', 'retry')
.addEdge('retry', END)
.compile();
// 单独的超时测试图
const timeoutGraph = new StateGraph(AsyncState)
.addNode('timeout', timeoutNode)
.addEdge(START, 'timeout')
.addEdge('timeout', END)
.compile();
// 运行示例
async function runAsyncNodesExample() {
console.log('=== 异步节点示例 ===\n');
// 示例 1: 基础异步操作
console.log('1. 基础异步操作:');
const startTime = Date.now();
const result1 = await asyncNodesGraph.invoke({
userId: 'async_user_001',
});
const duration = Date.now() - startTime;
console.log('异步操作结果:', {
status: result1.status,
hasUserData: !!result1.userData,
hasApiResult: !!result1.apiResult,
processedCount: result1.processedData?.length || 0,
duration: `${duration}ms`,
error: result1.error,
});
console.log();
// 示例 2: 超时控制
console.log('2. 超时控制测试:');
// 短超时测试
const timeoutResult = await timeoutGraph.invoke(
{ userId: 'timeout_user' },
{ configurable: { timeout: 1000 } }
);
console.log('超时测试结果:', {
status: timeoutResult.status,
error: timeoutResult.error,
});
console.log();
// 示例 3: 错误处理演示
console.log('3. 错误处理演示:');
// 使用无效用户ID触发错误
const errorResult = await asyncNodesGraph.invoke({
userId: '', // 空用户ID可能导致错误
});
console.log('错误处理结果:', {
status: errorResult.status,
hasError: !!errorResult.error,
error: errorResult.error,
});
}
// 导出
export {
AsyncState,
fetchUserDataNode,
parallelProcessNode,
sequentialProcessNode,
retryableNode,
timeoutNode,
asyncNodesGraph,
timeoutGraph,
runAsyncNodesExample,
};
3. 条件节点逻辑
条件节点示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';
// 定义条件节点状态
const ConditionalState = Annotation.Root({
userType: Annotation<string>(),
score: Annotation<number>(),
level: Annotation<string>(),
permissions: Annotation<string[]>(),
route: Annotation<string>(),
message: Annotation<string>(),
isValid: Annotation<boolean>(),
attempts: Annotation<number>(),
});
// 1. 用户类型分类节点
const classifyUserNode = (
state: typeof ConditionalState.State,
config?: RunnableConfig
) => {
const score = state.score || 0;
let userType: string;
let level: string;
let permissions: string[] = [];
if (score >= 90) {
userType = 'premium';
level = 'expert';
permissions = ['read', 'write', 'admin', 'delete'];
} else if (score >= 70) {
userType = 'standard';
level = 'advanced';
permissions = ['read', 'write', 'moderate'];
} else if (score >= 50) {
userType = 'basic';
level = 'intermediate';
permissions = ['read', 'write'];
} else {
userType = 'guest';
level = 'beginner';
permissions = ['read'];
}
return {
userType,
level,
permissions,
message: `用户分类为 ${userType},等级 ${level}`,
};
};
// 2. 路由决策节点
const routeDecisionNode = (
state: typeof ConditionalState.State,
config?: RunnableConfig
) => {
const userType = state.userType;
let route: string;
let message: string;
switch (userType) {
case 'premium':
route = 'premium_dashboard';
message = '跳转到高级用户面板';
break;
case 'standard':
route = 'standard_dashboard';
message = '跳转到标准用户面板';
break;
case 'basic':
route = 'basic_dashboard';
message = '跳转到基础用户面板';
break;
default:
route = 'guest_welcome';
message = '跳转到访客欢迎页';
}
return {
route,
message,
};
};
// 3. 验证节点
const validationNode = (
state: typeof ConditionalState.State,
config?: RunnableConfig
) => {
const score = state.score || 0;
const attempts = state.attempts || 0;
let isValid = false;
let message = '';
if (attempts >= 3) {
message = '尝试次数过多,账户已锁定';
isValid = false;
} else if (score < 0 || score > 100) {
message = '分数必须在0-100之间';
isValid = false;
} else if (!state.userType) {
message = '用户类型不能为空';
isValid = false;
} else {
message = '验证通过';
isValid = true;
}
return {
isValid,
message,
attempts: attempts + 1,
};
};
// 4. 权限检查节点
const permissionCheckNode = (
state: typeof ConditionalState.State,
config?: RunnableConfig
) => {
const requiredPermission = config?.configurable?.requiredPermission || 'read';
const userPermissions = state.permissions || [];
const hasPermission = userPermissions.includes(requiredPermission);
return {
isValid: hasPermission,
message: hasPermission
? `拥有 ${requiredPermission} 权限`
: `缺少 ${requiredPermission} 权限`,
};
};
// 5. 多条件判断节点
const complexConditionNode = (
state: typeof ConditionalState.State,
config?: RunnableConfig
) => {
const score = state.score || 0;
const userType = state.userType;
const level = state.level;
let route = '';
let message = '';
// 复杂条件判断
if (userType === 'premium' && score >= 95) {
route = 'vip_exclusive';
message = 'VIP专属通道';
} else if (userType === 'premium' && level === 'expert') {
route = 'expert_zone';
message = '专家区域';
} else if (
(userType === 'standard' || userType === 'premium') &&
score >= 80
) {
route = 'advanced_features';
message = '高级功能区';
} else if (score >= 60 && level !== 'beginner') {
route = 'regular_access';
message = '常规访问';
} else {
route = 'limited_access';
message = '受限访问';
}
return {
route,
message,
};
};
// 构建条件节点图
const conditionalGraph = new StateGraph(ConditionalState)
.addNode('classify', classifyUserNode)
.addNode('validate', validationNode)
.addNode('route', routeDecisionNode)
.addNode('permission', permissionCheckNode)
.addNode('complex', complexConditionNode)
.addEdge(START, 'classify')
.addEdge('classify', 'validate')
.addEdge('validate', 'route')
.addEdge('route', 'permission')
.addEdge('permission', 'complex')
.addEdge('complex', END)
.compile();
// 运行示例
async function runConditionalNodesExample() {
console.log('=== 条件节点示例 ===\n');
// 测试用例
const testCases = [
{ score: 95, description: '高分用户' },
{ score: 75, description: '中等分数用户' },
{ score: 45, description: '低分用户' },
{ score: 25, description: '新手用户' },
{ score: -10, description: '无效分数' },
];
for (const testCase of testCases) {
console.log(`测试: ${testCase.description} (分数: ${testCase.score})`);
const result = await conditionalGraph.invoke({
score: testCase.score,
});
console.log('结果:', {
userType: result.userType,
level: result.level,
permissions: result.permissions,
route: result.route,
isValid: result.isValid,
message: result.message,
});
console.log();
}
// 权限检查测试
console.log('权限检查测试:');
const permissionTests = ['read', 'write', 'admin', 'delete'];
for (const permission of permissionTests) {
const result = await conditionalGraph.invoke(
{ score: 85 }, // 标准用户
{ configurable: { requiredPermission: permission } }
);
console.log(`检查 ${permission} 权限:`, {
hasPermission: result.isValid,
message: result.message,
});
}
}
// 导出
export {
ConditionalState,
classifyUserNode,
routeDecisionNode,
validationNode,
permissionCheckNode,
complexConditionNode,
conditionalGraph,
runConditionalNodesExample,
};
节点设计最佳实践
1. 函数设计原则
// ✅ 好的节点设计
const goodNode = (state: StateType, config?: RunnableConfig) => {
// 1. 输入验证
if (!state.userId) {
throw new Error('用户ID不能为空');
}
// 2. 业务逻辑
const result = processData(state.data);
// 3. 明确的返回值
return {
processedData: result,
lastUpdated: new Date(),
};
};
// ❌ 避免的节点设计
const badNode = (state: any) => {
// 避免:修改输入状态
state.data.push('new item');
// 避免:不明确的返回值
return state;
// 避免:无错误处理
const result = riskyOperation();
return { result };
};
2. 错误处理模式
const safeNode = (state: StateType, config?: RunnableConfig) => {
try {
// 业务逻辑
const result = processData(state.data);
return {
result,
success: true,
error: null,
};
} catch (error) {
console.error('节点执行失败:', error);
return {
success: false,
error: error.message,
lastError: new Date(),
};
}
};
3. 配置参数使用
const configurableNode = (state: StateType, config?: RunnableConfig) => {
// 从配置中获取参数
const apiKey = config?.configurable?.apiKey;
const timeout = config?.configurable?.timeout || 5000;
const debug = config?.configurable?.debug || false;
if (debug) {
console.log('节点执行开始:', state);
}
// 使用配置参数
const result = callAPI(state.query, { apiKey, timeout });
return {
apiResult: result,
timestamp: new Date(),
};
};
4. 异步操作处理
const asyncNode = async (state: StateType, config?: RunnableConfig) => {
// 并行异步操作
const [userData, settingsData] = await Promise.all([
fetchUserData(state.userId),
fetchUserSettings(state.userId),
]);
// 串行异步操作
const profile = await buildUserProfile(userData);
const preferences = await applySettings(profile, settingsData);
return {
userProfile: profile,
userPreferences: preferences,
loadedAt: new Date(),
};
};
常见节点模式
1. 数据转换节点
const transformNode = (state: StateType) => {
const transformedData = state.rawData.map(item => ({
id: item.id,
name: item.name.toUpperCase(),
createdAt: new Date(item.timestamp),
}));
return {
transformedData,
transformCount: transformedData.length,
};
};
2. 验证节点
const validationNode = (state: StateType) => {
const errors: string[] = [];
if (!state.email?.includes('@')) {
errors.push('邮箱格式不正确');
}
if (!state.password || state.password.length < 8) {
errors.push('密码长度至少8位');
}
return {
isValid: errors.length === 0,
validationErrors: errors,
};
};
3. 聚合节点
const aggregationNode = (state: StateType) => {
const stats = {
totalItems: state.items?.length || 0,
completedItems: state.items?.filter(item => item.completed).length || 0,
averageScore: state.scores?.reduce((a, b) => a + b, 0) / (state.scores?.length || 1),
};
return {
statistics: stats,
lastAggregated: new Date(),
};
};
4. 路由节点
const routingNode = (state: StateType) => {
// 根据状态决定下一步
if (state.userType === 'admin') {
return { nextStep: 'admin_dashboard' };
} else if (state.userType === 'user') {
return { nextStep: 'user_dashboard' };
} else {
return { nextStep: 'login' };
}
};
性能优化技巧
1. 避免重复计算
const optimizedNode = (state: StateType) => {
// 检查是否需要重新计算
if (state.lastCalculated &&
Date.now() - state.lastCalculated.getTime() < 60000) {
return {}; // 1分钟内不重复计算
}
const expensiveResult = performExpensiveCalculation(state.data);
return {
calculatedResult: expensiveResult,
lastCalculated: new Date(),
};
};
2. 批量处理
const batchProcessingNode = (state: StateType) => {
const batchSize = 100;
const items = state.pendingItems || [];
if (items.length < batchSize) {
return {}; // 等待更多项目
}
const batch = items.slice(0, batchSize);
const processed = processBatch(batch);
return {
processedItems: [...(state.processedItems || []), ...processed],
pendingItems: items.slice(batchSize),
};
};
常见问题解答
Q: 节点函数可以修改输入状态吗?
A: 不建议直接修改输入状态。应该返回新的状态更新对象:
// ❌ 不推荐
const badNode = (state: StateType) => {
state.counter++; // 直接修改
return state;
};
// ✅ 推荐
const goodNode = (state: StateType) => {
return {
counter: state.counter + 1, // 返回新值
};
};
Q: 如何处理节点中的异常?
A: 使用 try-catch 包装业务逻辑:
const safeNode = (state: StateType) => {
try {
const result = riskyOperation(state.data);
return { result, success: true };
} catch (error) {
return {
error: error.message,
success: false
};
}
};
Q: 节点可以调用其他节点吗?
A: 节点应该是独立的,不直接调用其他节点。通过图的边来连接节点:
// ❌ 不推荐
const nodeA = (state: StateType) => {
const result = nodeB(state); // 直接调用
return result;
};
// ✅ 推荐:通过图连接
graph
.addNode('nodeA', nodeA)
.addNode('nodeB', nodeB)
.addEdge('nodeA', 'nodeB');
小结与延伸
在本节中,我们深入了解了 LangGraphJS 的节点机制:
🔑 核心要点:
- 节点是执行具体业务逻辑的函数
- 节点接收状态和配置,返回状态更新
- 支持同步和异步操作
- 通过类型安全确保代码可靠性
📈 最佳实践:
- 保持节点函数的纯净性
- 添加适当的错误处理
- 合理使用配置参数
- 考虑性能优化
🔗 与下一节的关联: 掌握了节点的实现后,我们将学习**边(Edges)**的详细机制。边定义了节点之间的连接关系和执行顺序,是构建复杂图结构的关键。理解边的工作原理将帮助你设计更灵活和强大的执行流程。
💡 进阶学习建议:
- 尝试设计不同类型的节点函数
- 实验异步操作和错误处理
- 探索节点的性能优化技巧
- 学习节点的测试和调试方法