🔧 节点设计
节点是 LangGraphJS 中执行具体业务逻辑的核心组件。良好的节点设计能够提高代码的可维护性、可测试性和可扩展性。
📋 设计原则
1. 单一职责原则
每个节点应该只负责一个明确的功能,避免在单个节点中处理多个不相关的任务。
单一职责示例
import { Annotation } from '@langchain/langgraph';
import { BaseMessage, AIMessage } from '@langchain/core/messages';
// 状态定义
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
userQuery: Annotation<string>(),
searchResults: Annotation<string[]>({
reducer: (x, y) => y,
default: () => [],
}),
response: Annotation<string>(),
});
// ❌ 错误:职责过多
async function badMultiPurposeNode(state: typeof StateAnnotation.State) {
const keywords = state.userQuery.split(' ').filter((w) => w.length > 2);
const searchResults = await mockSearch(keywords);
const analysis = analyzeResults(searchResults);
const response = generateResponse(analysis);
return {
searchResults,
analysis,
response,
messages: [new AIMessage(response)],
};
}
// ✅ 正确:单一职责节点
// 查询解析节点
async function parseQueryNode(state: typeof StateAnnotation.State) {
if (!state.userQuery?.trim()) {
throw new Error('Query cannot be empty');
}
const keywords = state.userQuery
.toLowerCase()
.split(/\s+/)
.filter((word) => word.length > 2 && !STOP_WORDS.includes(word));
return { userQuery: state.userQuery.trim() };
}
// 搜索执行节点
async function searchNode(state: typeof StateAnnotation.State) {
try {
const results = await performSearch(state.userQuery);
return { searchResults: results };
} catch (error) {
console.error('Search failed:', error);
return { searchResults: [] };
}
}
// 辅助函数
const STOP_WORDS = ['the', 'is', 'at', 'which', 'on', 'and', 'or', 'but'];
async function mockSearch(keywords: string[]): Promise<string[]> {
return new Promise((resolve) => {
setTimeout(() => {
resolve([
`Result 1 for ${keywords.join(', ')}`,
`Result 2 for ${keywords.join(', ')}`,
]);
}, 100);
});
}
async function performSearch(query: string): Promise<string[]> {
return mockSearch(query.split(' '));
}
2. 纯函数设计
节点函数应该尽可能设计为纯函数,相同的输入产生相同的输出,避免副作用。
纯函数设计
import { Annotation } from '@langchain/langgraph';
import { BaseMessage, AIMessage } from '@langchain/core/messages';
// 状态定义
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
input: Annotation<string>(),
processedData: Annotation<any>(),
result: Annotation<string>(),
timestamp: Annotation<number>(),
});
// ❌ 错误示例:非纯函数节点
let globalCounter = 0;
const cache = new Map();
async function impureNode(state: typeof StateAnnotation.State) {
// 问题1:依赖全局变量
globalCounter++;
// 问题2:修改外部状态
cache.set('lastInput', state.input);
// 问题3:依赖当前时间(非确定性)
const timestamp = Date.now();
// 问题4:有副作用(日志、网络请求等)
console.log('Processing:', state.input);
// 问题5:依赖随机数
const randomId = Math.random().toString(36);
return {
result: `Processed ${state.input} at ${timestamp} with ID ${randomId}`,
processedData: { counter: globalCounter },
};
}
// ✅ 正确示例:纯函数节点
/**
* 纯函数数据处理节点
* 相同输入总是产生相同输出,无副作用
*/
async function pureDataProcessorNode(state: typeof StateAnnotation.State) {
const { input } = state;
// 纯函数处理:只依赖输入参数
const processedData = processInput(input);
return {
processedData,
};
}
/**
* 纯函数文本转换节点
*/
async function pureTextTransformNode(state: typeof StateAnnotation.State) {
const { input } = state;
if (!input) {
return { result: '' };
}
// 纯函数转换
const result = transformText(input);
return {
result,
};
}
// 纯函数辅助工具
/**
* 纯函数:处理输入数据
*/
function processInput(input: string): any {
if (!input) return null;
return {
originalLength: input.length,
wordCount: input.split(/\s+/).filter((word) => word.length > 0).length,
hasNumbers: /\d/.test(input),
hasSpecialChars: /[!@#$%^&*(),.?":{}|<>]/.test(input),
processed: true,
};
}
/**
* 纯函数:文本转换
*/
function transformText(input: string): string {
return input
.toLowerCase()
.trim()
.replace(/\s+/g, ' ')
.replace(/[^\w\s]/g, '');
}
3. 错误处理
每个节点都应该有完善的错误处理机制,确保异常情况下的系统稳定性。
错误处理
import { Annotation } from '@langchain/langgraph';
import { BaseMessage, AIMessage } from '@langchain/core/messages';
// 状态定义
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
input: Annotation<string>(),
result: Annotation<string>(),
error: Annotation<string>(),
retryCount: Annotation<number>({
reducer: (x, y) => y,
default: () => 0,
}),
isSuccess: Annotation<boolean>({
reducer: (x, y) => y,
default: () => false,
}),
});
// 自定义错误类型
export class ValidationError extends Error {
constructor(message: string, public field: string) {
super(message);
this.name = 'ValidationError';
}
}
export class NetworkError extends Error {
constructor(message: string, public statusCode?: number) {
super(message);
this.name = 'NetworkError';
}
}
// ❌ 错误示例:缺乏错误处理的节点
async function badNodeWithoutErrorHandling(
state: typeof StateAnnotation.State
) {
// 没有输入验证
const data = JSON.parse(state.input); // 可能抛出异常
// 没有错误处理的网络请求
const response = await fetch('https://api.example.com/data');
const result = await response.json(); // 可能失败
// 没有验证结果
return {
result: result.data.value, // 可能访问不存在的属性
};
}
// ✅ 正确示例:完善的错误处理
/**
* 输入验证节点
*/
async function inputValidationNode(state: typeof StateAnnotation.State) {
try {
const { input } = state;
// 基础验证
if (!input) {
throw new ValidationError('Input is required', 'input');
}
if (typeof input !== 'string') {
throw new ValidationError('Input must be a string', 'input');
}
if (input.trim().length === 0) {
throw new ValidationError('Input cannot be empty', 'input');
}
return {
input: input.trim(),
isSuccess: true,
};
} catch (error) {
return handleNodeError(error, 'input_validation');
}
}
/**
* 网络请求节点(带重试机制)
*/
async function networkRequestNode(state: typeof StateAnnotation.State) {
const maxRetries = 3;
const retryDelay = 1000; // 1秒
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
const { input } = state;
// 模拟网络请求
const response = await makeNetworkRequest(input);
if (!response.ok) {
throw new NetworkError(
`Request failed with status ${response.status}`,
response.status
);
}
const data = await response.json();
// 验证响应数据
if (!data || typeof data !== 'object') {
throw new ProcessingError('Invalid response format');
}
return {
result: JSON.stringify(data),
isSuccess: true,
retryCount: attempt,
};
} catch (error) {
console.warn(`Network request attempt ${attempt + 1} failed:`, error);
// 最后一次尝试失败
if (attempt === maxRetries) {
return handleNodeError(error, 'network_request', {
retryCount: attempt,
});
}
// 等待后重试
await delay(retryDelay * Math.pow(2, attempt)); // 指数退避
}
}
}
// 错误处理工具函数
/**
* 统一的错误处理函数
*/
function handleNodeError(
error: unknown,
context: string,
additionalState: Record<string, any> = {}
): Record<string, any> {
let errorMessage: string;
let errorType: string;
if (error instanceof ValidationError) {
errorMessage = `Validation error in ${context}: ${error.message} (field: ${error.field})`;
errorType = 'validation';
} else if (error instanceof NetworkError) {
errorMessage = `Network error in ${context}: ${error.message}`;
if (error.statusCode) {
errorMessage += ` (status: ${error.statusCode})`;
}
errorType = 'network';
} else if (error instanceof Error) {
errorMessage = `Error in ${context}: ${error.message}`;
errorType = 'unknown';
} else {
errorMessage = `Unknown error in ${context}: ${String(error)}`;
errorType = 'unknown';
}
// 记录错误日志
console.error(`[${errorType.toUpperCase()}] ${errorMessage}`, {
context,
error,
stack: error instanceof Error ? error.stack : undefined,
});
return {
error: errorMessage,
isSuccess: false,
...additionalState,
};
}
async function makeNetworkRequest(input: string): Promise<Response> {
// 模拟网络请求
return new Promise((resolve, reject) => {
setTimeout(() => {
if (Math.random() < 0.3) {
reject(new NetworkError('Network timeout'));
} else {
resolve({
ok: true,
status: 200,
json: async () => ({ data: `Processed: ${input}` }),
} as Response);
}
}, 100);
});
}
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
🏗️ 节点架构模式
1. 输入验证模式
在节点开始处理之前,先验证输入数据的有效性。
2. 管道模式
将复杂的处理逻辑分解为多个简单的步骤,形成处理管道。
3. 策略模式
根据不同的条件选择不同的处理策略。
策略模式
import { Annotation } from '@langchain/langgraph';
import { BaseMessage, AIMessage } from '@langchain/core/messages';
// 状态定义
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
input: Annotation<string>(),
processingType: Annotation<string>(),
result: Annotation<string>(),
metadata: Annotation<any>(),
});
// 策略接口
interface ProcessingStrategy {
name: string;
canHandle(input: string): boolean;
process(input: string): Promise<string>;
getMetadata(): any;
}
// 具体策略实现
/**
* 文本处理策略
*/
class TextProcessingStrategy implements ProcessingStrategy {
name = 'text';
canHandle(input: string): boolean {
return typeof input === 'string' && input.length > 0;
}
async process(input: string): Promise<string> {
return input
.toLowerCase()
.trim()
.replace(/\s+/g, ' ')
.replace(/[^\w\s]/g, '');
}
getMetadata(): any {
return { type: 'text', features: ['normalize', 'clean'] };
}
}
/**
* 数字处理策略
*/
class NumberProcessingStrategy implements ProcessingStrategy {
name = 'number';
canHandle(input: string): boolean {
return /^\d+$/.test(input);
}
async process(input: string): Promise<string> {
const num = parseInt(input);
return `Number: ${num}, Square: ${num * num}, Double: ${num * 2}`;
}
getMetadata(): any {
return { type: 'number', features: ['arithmetic'] };
}
}
/**
* 策略选择器节点
*/
async function strategySelectorNode(state: typeof StateAnnotation.State) {
const { input } = state;
const strategies: ProcessingStrategy[] = [
new TextProcessingStrategy(),
new NumberProcessingStrategy(),
];
for (const strategy of strategies) {
if (strategy.canHandle(input)) {
const result = await strategy.process(input);
return {
result,
processingType: strategy.name,
metadata: strategy.getMetadata(),
};
}
}
return {
result: `No suitable strategy found for: ${input}`,
processingType: 'none',
metadata: { error: true },
};
}
🔄 异步节点设计
1. 异步操作处理
正确处理异步操作,包括 API 调用、数据库操作等。
异步节点
/**
* ============================================================================
* 异步节点设计 - Async Node Design
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何设计和实现异步节点,处理异步操作、Promise、async/await以及异步错误处理。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/async-nodes.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
/**
* 异步节点设计示例
*
* 演示如何正确设计和处理异步操作的节点
*/
import { Annotation } from '@langchain/langgraph';
import { BaseMessage, AIMessage } from '@langchain/core/messages';
// 状态定义
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
input: Annotation<string>(),
results: Annotation<any[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
status: Annotation<string>(),
progress: Annotation<number>({
reducer: (x, y) => y,
default: () => 0,
}),
});
// 异步操作模拟
async function simulateApiCall(
data: string,
delay: number = 1000
): Promise<string> {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (Math.random() < 0.1) {
reject(new Error(`API call failed for: ${data}`));
} else {
resolve(`API result for: ${data}`);
}
}, delay);
});
}
async function simulateDbQuery(query: string): Promise<any[]> {
return new Promise((resolve) => {
setTimeout(() => {
resolve([
{ id: 1, data: `Result 1 for ${query}` },
{ id: 2, data: `Result 2 for ${query}` },
]);
}, 500);
});
}
// ❌ 错误示例:阻塞式异步处理
async function badBlockingAsyncNode(state: typeof StateAnnotation.State) {
const { input } = state;
// 串行处理,效率低下
const result1 = await simulateApiCall(input, 2000);
const result2 = await simulateApiCall(result1, 2000);
const result3 = await simulateApiCall(result2, 2000);
return {
results: [result1, result2, result3],
status: 'completed',
};
}
// ✅ 正确示例:高效的异步节点设计
/**
* 并行异步处理节点
*/
async function parallelAsyncNode(state: typeof StateAnnotation.State) {
const { input } = state;
try {
// 并行执行多个异步操作
const [apiResult, dbResult, processedData] = await Promise.all([
simulateApiCall(input),
simulateDbQuery(input),
processDataAsync(input),
]);
return {
results: [
{ type: 'api', data: apiResult },
{ type: 'db', data: dbResult },
{ type: 'processed', data: processedData },
],
status: 'parallel_completed',
progress: 100,
};
} catch (error) {
return {
status: 'error',
results: [{ type: 'error', data: error.message }],
};
}
}
/**
* 带超时控制的异步节点
*/
async function timeoutAsyncNode(state: typeof StateAnnotation.State) {
const { input } = state;
const timeout = 5000; // 5秒超时
try {
const result = await Promise.race([
simulateApiCall(input, 3000),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Operation timeout')), timeout)
),
]);
return {
results: [{ type: 'success', data: result }],
status: 'completed',
};
} catch (error) {
if (error.message === 'Operation timeout') {
return {
results: [{ type: 'timeout', data: 'Operation timed out' }],
status: 'timeout',
};
}
return {
results: [{ type: 'error', data: error.message }],
status: 'error',
};
}
}
/**
* 流式异步处理节点
*/
async function streamingAsyncNode(state: typeof StateAnnotation.State) {
const { input } = state;
const results: any[] = [];
// 模拟流式处理
const items = input.split(' ');
for (let i = 0; i < items.length; i++) {
try {
const result = await simulateApiCall(items[i], 200);
results.push({ index: i, item: items[i], result });
// 更新进度
const progress = Math.round(((i + 1) / items.length) * 100);
// 在实际应用中,这里可以发送进度更新事件
console.log(`Progress: ${progress}%`);
} catch (error) {
results.push({ index: i, item: items[i], error: error.message });
}
}
return {
results,
status: 'streaming_completed',
progress: 100,
};
}
/**
* 批处理异步节点
*/
async function batchAsyncNode(state: typeof StateAnnotation.State) {
const { input } = state;
const batchSize = 3;
const items = input.split(' ');
const allResults: any[] = [];
// 分批处理
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
try {
// 并行处理当前批次
const batchResults = await Promise.all(
batch.map(async (item, index) => {
try {
const result = await simulateApiCall(item, 300);
return { item, result, success: true };
} catch (error) {
return { item, error: error.message, success: false };
}
})
);
allResults.push(...batchResults);
// 更新进度
const progress = Math.round(
(Math.min(i + batchSize, items.length) / items.length) * 100
);
console.log(`Batch progress: ${progress}%`);
} catch (error) {
// 批次失败处理
allResults.push(
...batch.map((item) => ({
item,
error: 'Batch processing failed',
success: false,
}))
);
}
}
return {
results: allResults,
status: 'batch_completed',
progress: 100,
};
}
/**
* 重试机制异步节点
*/
async function retryAsyncNode(state: typeof StateAnnotation.State) {
const { input } = state;
const maxRetries = 3;
const retryDelay = 1000;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
const result = await simulateApiCall(input);
return {
results: [
{
data: result,
attempt: attempt + 1,
success: true,
},
],
status: 'retry_success',
};
} catch (error) {
if (attempt === maxRetries) {
// 最后一次尝试失败
return {
results: [
{
error: error.message,
attempts: attempt + 1,
success: false,
},
],
status: 'retry_failed',
};
}
// 等待后重试
await new Promise((resolve) =>
setTimeout(resolve, retryDelay * Math.pow(2, attempt))
);
}
}
return {
results: [{ error: 'Unexpected retry failure' }],
status: 'error',
};
}
/**
* 条件异步节点
*/
async function conditionalAsyncNode(state: typeof StateAnnotation.State) {
const { input, status } = state;
// 根据状态决定异步操作类型
if (status === 'fast_mode') {
// 快速模式:并行处理
const results = await Promise.all([
simulateApiCall(input, 100),
simulateApiCall(input + '_fast', 100),
]);
return {
results: results.map((r) => ({ type: 'fast', data: r })),
status: 'fast_completed',
};
} else if (status === 'thorough_mode') {
// 详细模式:串行处理
const results = [];
for (const suffix of ['_detailed', '_analyzed', '_verified']) {
const result = await simulateApiCall(input + suffix, 500);
results.push({ type: 'thorough', data: result });
}
return {
results,
status: 'thorough_completed',
};
} else {
// 默认模式
const result = await simulateApiCall(input);
return {
results: [{ type: 'default', data: result }],
status: 'default_completed',
};
}
}
/**
* 资源池管理异步节点
*/
class ResourcePool {
private pool: Promise<any>[] = [];
private maxConcurrency: number;
constructor(maxConcurrency: number = 5) {
this.maxConcurrency = maxConcurrency;
}
async execute<T>(task: () => Promise<T>): Promise<T> {
// 等待有可用资源
while (this.pool.length >= this.maxConcurrency) {
await Promise.race(this.pool);
this.pool = this.pool.filter((p) => p !== Promise.race(this.pool));
}
const taskPromise = task().finally(() => {
const index = this.pool.indexOf(taskPromise);
if (index > -1) {
this.pool.splice(index, 1);
}
});
this.pool.push(taskPromise);
return taskPromise;
}
}
async function resourcePoolAsyncNode(state: typeof StateAnnotation.State) {
const { input } = state;
const pool = new ResourcePool(3); // 最大3个并发
const items = input.split(' ');
const results = await Promise.all(
items.map((item) =>
pool
.execute(() => simulateApiCall(item, 1000))
.then((result) => ({ item, result, success: true }))
.catch((error) => ({ item, error: error.message, success: false }))
)
);
return {
results,
status: 'pool_completed',
};
}
// 辅助函数
async function processDataAsync(data: string): Promise<string> {
return new Promise((resolve) => {
setTimeout(() => {
resolve(`Processed: ${data.toUpperCase()}`);
}, 300);
});
}
// 使用示例
export function createAsyncNodeGraph() {
const { StateGraph } = require('@langchain/langgraph');
const graph = new StateGraph(StateAnnotation)
.addNode('parallel_async', parallelAsyncNode)
.addNode('timeout_async', timeoutAsyncNode)
.addNode('streaming_async', streamingAsyncNode)
.addNode('batch_async', batchAsyncNode)
.addNode('retry_async', retryAsyncNode)
.addNode('conditional_async', conditionalAsyncNode)
.addNode('resource_pool_async', resourcePoolAsyncNode)
.addConditionalEdges(
'parallel_async',
(state) => {
if (state.status === 'error') {
return 'retry_async';
}
return 'timeout_async';
},
{
retry_async: 'retry_async',
timeout_async: 'timeout_async',
}
)
.addEdge('timeout_async', 'streaming_async')
.addEdge('streaming_async', 'batch_async')
.addEdge('batch_async', 'conditional_async')
.addEdge('conditional_async', 'resource_pool_async')
.addEdge('retry_async', 'resource_pool_async')
.addEdge('resource_pool_async', '__end__')
.setEntryPoint('parallel_async');
return graph.compile();
}
// 异步节点设计最佳实践
export const ASYNC_NODE_BEST_PRACTICES = {
parallelization: '尽可能并行执行独立的异步操作',
timeout: '为异步操作设置合理的超时时间',
error_handling: '妥善处理异步操作中的错误',
progress_tracking: '提供异步操作的进度反馈',
resource_management: '控制并发数量,避免资源耗尽',
retry_strategy: '实现智能的重试机制',
cancellation: '支持异步操作的取消',
streaming: '对于大量数据,考虑流式处理',
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
*/
2. 并发控制
当需要处理多个异步操作时,合理控制并发数量。
并发控制
/**
* ============================================================================
* 并发控制 - Concurrency Control
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何在节点中实现并发控制,限制并发请求数量,防止资源耗尽。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/concurrency-control.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { Annotation, StateGraph } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 状态定义
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (current, update) => [...current, ...update],
default: () => [],
}),
concurrentTasks: Annotation<
Array<{
id: string;
status: 'pending' | 'running' | 'completed' | 'failed';
result?: any;
error?: string;
}>
>({
reducer: (current, update) => [...current, ...update],
default: () => [],
}),
maxConcurrency: Annotation<number>({
reducer: (_, update) => update,
default: () => 3,
}),
activeTaskCount: Annotation<number>({
reducer: (_, update) => update,
default: () => 0,
}),
});
// 并发控制器
class ConcurrencyController {
private maxConcurrency: number;
private activeTasks: Set<string> = new Set();
private taskQueue: Array<() => Promise<any>> = [];
constructor(maxConcurrency: number = 3) {
this.maxConcurrency = maxConcurrency;
}
// 添加任务到队列
async addTask<T>(taskId: string, task: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
const wrappedTask = async () => {
try {
this.activeTasks.add(taskId);
const result = await task();
resolve(result);
} catch (error) {
reject(error);
} finally {
this.activeTasks.delete(taskId);
this.processQueue();
}
};
if (this.activeTasks.size < this.maxConcurrency) {
wrappedTask();
} else {
this.taskQueue.push(wrappedTask);
}
});
}
// 处理队列中的任务
private processQueue() {
while (
this.taskQueue.length > 0 &&
this.activeTasks.size < this.maxConcurrency
) {
const task = this.taskQueue.shift();
if (task) {
task();
}
}
}
// 获取当前活跃任务数
getActiveTaskCount(): number {
return this.activeTasks.size;
}
// 获取队列中等待的任务数
getQueuedTaskCount(): number {
return this.taskQueue.length;
}
}
// 全局并发控制器实例
const globalConcurrencyController = new ConcurrencyController(3);
// 带并发控制的节点装饰器
function withConcurrencyControl<T extends Record<string, any>>(
nodeFunction: (state: T) => Promise<Partial<T>>,
taskId?: string
) {
return async (state: T): Promise<Partial<T>> => {
const id = taskId || `task-${Date.now()}-${Math.random()}`;
return globalConcurrencyController.addTask(id, () => nodeFunction(state));
};
}
// 模拟异步任务
async function simulateAsyncTask(
duration: number,
taskName: string
): Promise<string> {
console.log(`开始执行任务: ${taskName}`);
await new Promise((resolve) => setTimeout(resolve, duration));
console.log(`完成任务: ${taskName}`);
return `${taskName} 完成`;
}
// 并发处理节点
const concurrentProcessingNode = withConcurrencyControl(
async (state: typeof StateAnnotation.State) => {
const taskResults = await Promise.allSettled([
simulateAsyncTask(1000, '数据处理'),
simulateAsyncTask(1500, '文件上传'),
simulateAsyncTask(800, '缓存更新'),
]);
const completedTasks = taskResults.map((result, index) => ({
id: `task-${index}`,
status:
result.status === 'fulfilled'
? ('completed' as const)
: ('failed' as const),
result: result.status === 'fulfilled' ? result.value : undefined,
error: result.status === 'rejected' ? result.reason?.message : undefined,
}));
return {
concurrentTasks: completedTasks,
messages: [new AIMessage('并发任务处理完成')],
};
},
'concurrent-processing'
);
// 批量处理节点
const batchProcessingNode = withConcurrencyControl(
async (state: typeof StateAnnotation.State) => {
const batchSize = 5;
const items = Array.from({ length: 20 }, (_, i) => `item-${i}`);
const batches = [];
// 分批处理
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
const batchResults = await Promise.allSettled(
batches.map(async (batch, batchIndex) => {
await simulateAsyncTask(500, `批次-${batchIndex}`);
return batch.map((item) => `处理完成: ${item}`);
})
);
const allResults = batchResults
.filter((result) => result.status === 'fulfilled')
.flatMap((result) => (result as PromiseFulfilledResult<string[]>).value);
return {
messages: [
new AIMessage(`批量处理完成,处理了 ${allResults.length} 个项目`),
],
};
},
'batch-processing'
);
// 限流节点
class RateLimiter {
private tokens: number;
private maxTokens: number;
private refillRate: number;
private lastRefill: number;
constructor(maxTokens: number, refillRate: number) {
this.maxTokens = maxTokens;
this.tokens = maxTokens;
this.refillRate = refillRate;
this.lastRefill = Date.now();
}
async acquire(): Promise<void> {
this.refill();
if (this.tokens > 0) {
this.tokens--;
return;
}
// 等待令牌补充
const waitTime = (1 / this.refillRate) * 1000;
await new Promise((resolve) => setTimeout(resolve, waitTime));
return this.acquire();
}
private refill() {
const now = Date.now();
const timePassed = (now - this.lastRefill) / 1000;
const tokensToAdd = Math.floor(timePassed * this.refillRate);
if (tokensToAdd > 0) {
this.tokens = Math.min(this.maxTokens, this.tokens + tokensToAdd);
this.lastRefill = now;
}
}
}
const rateLimiter = new RateLimiter(10, 2); // 每秒2个令牌,最多10个
function withRateLimit<T extends Record<string, any>>(
nodeFunction: (state: T) => Promise<Partial<T>>
) {
return async (state: T): Promise<Partial<T>> => {
await rateLimiter.acquire();
return nodeFunction(state);
};
}
// 带限流的API调用节点
const rateLimitedApiNode = withRateLimit(
async (state: typeof StateAnnotation.State) => {
// 模拟API调用
await simulateAsyncTask(200, 'API调用');
return {
messages: [new AIMessage('API调用完成(已限流)')],
};
}
);
// 超时控制装饰器
function withTimeout<T extends Record<string, any>>(
nodeFunction: (state: T) => Promise<Partial<T>>,
timeoutMs: number
) {
return async (state: T): Promise<Partial<T>> => {
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(
() => reject(new Error(`节点执行超时 (${timeoutMs}ms)`)),
timeoutMs
);
});
try {
return await Promise.race([nodeFunction(state), timeoutPromise]);
} catch (error) {
console.error('节点执行超时:', error);
return {
messages: [
new AIMessage(
`节点执行超时: ${
error instanceof Error ? error.message : '未知错误'
}`
),
],
} as Partial<T>;
}
};
}
// 带超时的慢节点
const slowNodeWithTimeout = withTimeout(
async (state: typeof StateAnnotation.State) => {
// 模拟慢操作
await simulateAsyncTask(5000, '慢操作');
return {
messages: [new AIMessage('慢操作完成')],
};
},
3000 // 3秒超时
);
// 重试机制装饰器
function withRetry<T extends Record<string, any>>(
nodeFunction: (state: T) => Promise<Partial<T>>,
maxRetries: number = 3,
retryDelay: number = 1000
) {
return async (state: T): Promise<Partial<T>> => {
let lastError: Error | null = null;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await nodeFunction(state);
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
if (attempt < maxRetries) {
console.log(`节点执行失败,第 ${attempt + 1} 次重试...`);
await new Promise((resolve) =>
setTimeout(resolve, retryDelay * Math.pow(2, attempt))
);
}
}
}
throw lastError;
};
}
// 带重试的不稳定节点
const unstableNodeWithRetry = withRetry(
async (state: typeof StateAnnotation.State) => {
// 模拟不稳定的操作(30%成功率)
if (Math.random() < 0.3) {
return {
messages: [new AIMessage('不稳定操作成功')],
};
} else {
throw new Error('操作失败');
}
},
3,
500
);
// 组合多个装饰器的节点
const robustNode = withConcurrencyControl(
withTimeout(
withRetry(
async (state: typeof StateAnnotation.State) => {
await simulateAsyncTask(1000, '复杂操作');
return {
messages: [new AIMessage('复杂操作完成(带并发控制、超时和重试)')],
};
},
2,
1000
),
5000
),
'robust-operation'
);
// 创建图
function createConcurrencyControlGraph() {
const graph = new StateGraph(StateAnnotation)
.addNode('concurrent', concurrentProcessingNode)
.addNode('batch', batchProcessingNode)
.addNode('rateLimited', rateLimitedApiNode)
.addNode('timeout', slowNodeWithTimeout)
.addNode('retry', unstableNodeWithRetry)
.addNode('robust', robustNode)
.addEdge('__start__', 'concurrent')
.addEdge('concurrent', 'batch')
.addEdge('batch', 'rateLimited')
.addEdge('rateLimited', 'timeout')
.addEdge('timeout', 'retry')
.addEdge('retry', 'robust')
.addEdge('robust', '__end__');
return graph.compile();
}
// 使用示例
async function runConcurrencyExample() {
const graph = createConcurrencyControlGraph();
try {
const result = await graph.invoke({
messages: [new HumanMessage('开始并发控制测试')],
});
console.log('最终结果:', result);
} catch (error) {
console.error('执行失败:', error);
}
}
// 如果直接运行此文件
if (require.main === module) {
runConcurrencyExample().catch(console.error);
}
export {
StateAnnotation,
ConcurrencyController,
withConcurrencyControl,
withRateLimit,
withTimeout,
withRetry,
RateLimiter,
createConcurrencyControlGraph,
runConcurrencyExample,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
开始执行任务: 数据处理
开始执行任务: 文件上传
开始执行任务: 缓存更新
完成任务: 缓存更新
完成任务: 数据处理
完成任务: 文件上传
开始执行任务: 批次-0
开始执行任务: 批次-1
开始执行任务: 批次-2
开始执行任务: 批次-3
完成任务: 批次-0
完成任务: 批次-1
完成任务: 批次-2
完成任务: 批次-3
开始执行任务: API调用
完成任务: API调用
开始执行任务: 慢操作
节点执行失败,第 1 次重试...
节点执行失败,第 2 次重试...
节点执行失败,第 3 次重试...
完成任务: 慢操作
开始执行任务: 复杂操作
完成任务: 复杂操作
最终结果: {
messages: [
HumanMessage {
"content": "开始并发控制测试",
"additional_kwargs": {},
"response_metadata": {}
},
AIMessage {
"content": "并发任务处理完成",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "批量处理完成,处理了 20 个项目",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "API调用完成(已限流)",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "节点执行超时: 节点执行超时 (3000ms)",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "不稳定操作成功",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "复杂操作完成(带并发控制、超时和重试)",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
}
],
concurrentTasks: [
{
id: 'task-0',
status: 'completed',
result: '数据处理 完成',
error: undefined
},
{
id: 'task-1',
status: 'completed',
result: '文件上传 完成',
error: undefined
},
{
id: 'task-2',
status: 'completed',
result: '缓存更新 完成',
error: undefined
}
],
maxConcurrency: 3,
activeTaskCount: 0
}
*/
🧪 可测试性设计
1. 依赖注入
通过依赖注入提高节点的可测试性。
依赖注入
/**
* ============================================================================
* 依赖注入节点 - Dependency Injection
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何在节点中使用依赖注入模式,提高可测试性和模块解耦。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/dependency-injection.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { Annotation, StateGraph } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
result: Annotation<string>({
reducer: (x, y) => y,
default: () => '',
}),
});
// 1. 定义服务接口
interface DatabaseService {
save(data: any): Promise<string>;
query(id: string): Promise<any>;
}
interface ApiService {
call(endpoint: string, data: any): Promise<any>;
}
interface LoggerService {
log(level: string, message: string): void;
}
// 2. 具体实现
class PostgresService implements DatabaseService {
async save(data: any): Promise<string> {
// 模拟数据库保存
console.log('Saving to PostgreSQL:', data);
return `pg_${Date.now()}`;
}
async query(id: string): Promise<any> {
console.log('Querying PostgreSQL:', id);
return { id, data: 'sample data' };
}
}
class HttpApiService implements ApiService {
async call(endpoint: string, data: any): Promise<any> {
console.log(`Calling API ${endpoint}:`, data);
return { status: 'success', result: 'api response' };
}
}
class ConsoleLogger implements LoggerService {
log(level: string, message: string): void {
console.log(`[${level.toUpperCase()}] ${message}`);
}
}
// 3. 依赖注入容器
class ServiceContainer {
private services = new Map<string, any>();
register<T>(name: string, service: T): void {
this.services.set(name, service);
}
get<T>(name: string): T {
const service = this.services.get(name);
if (!service) {
throw new Error(`Service ${name} not found`);
}
return service;
}
}
// 4. 创建节点工厂
function createNodeWithDependencies(container: ServiceContainer) {
return {
// 数据处理节点
processData: async (state: typeof StateAnnotation.State) => {
const db = container.get<DatabaseService>('database');
const logger = container.get<LoggerService>('logger');
logger.log('info', 'Processing data...');
const lastMessage = state.messages[state.messages.length - 1];
const data = { content: lastMessage.content, timestamp: Date.now() };
try {
const id = await db.save(data);
logger.log('info', `Data saved with ID: ${id}`);
return {
messages: [new AIMessage(`Data processed and saved with ID: ${id}`)],
result: id,
};
} catch (error) {
logger.log('error', `Failed to save data: ${error}`);
return {
messages: [new AIMessage('Failed to process data')],
result: 'error',
};
}
},
// API调用节点
callExternalApi: async (state: typeof StateAnnotation.State) => {
const api = container.get<ApiService>('api');
const logger = container.get<LoggerService>('logger');
logger.log('info', 'Calling external API...');
try {
const response = await api.call('/process', {
messages: state.messages.length,
result: state.result,
});
return {
messages: [
new AIMessage(`API response: ${JSON.stringify(response)}`),
],
};
} catch (error) {
logger.log('error', `API call failed: ${error}`);
return {
messages: [new AIMessage('API call failed')],
};
}
},
};
}
// 5. 测试用的模拟服务
class MockDatabaseService implements DatabaseService {
private data = new Map<string, any>();
async save(data: any): Promise<string> {
const id = `mock_${Date.now()}`;
this.data.set(id, data);
return id;
}
async query(id: string): Promise<any> {
return this.data.get(id) || null;
}
}
class MockApiService implements ApiService {
async call(endpoint: string, data: any): Promise<any> {
return {
endpoint,
data,
mock: true,
timestamp: Date.now(),
};
}
}
class MockLogger implements LoggerService {
public logs: Array<{ level: string; message: string }> = [];
log(level: string, message: string): void {
this.logs.push({ level, message });
}
}
// 6. 使用示例
async function createProductionGraph() {
// 生产环境配置
const container = new ServiceContainer();
container.register('database', new PostgresService());
container.register('api', new HttpApiService());
container.register('logger', new ConsoleLogger());
const nodes = createNodeWithDependencies(container);
const graph = new StateGraph(StateAnnotation)
.addNode('processData', nodes.processData)
.addNode('callApi', nodes.callExternalApi)
.addEdge('__start__', 'processData')
.addEdge('processData', 'callApi')
.addEdge('callApi', '__end__');
return graph.compile();
}
async function createTestGraph() {
// 测试环境配置
const container = new ServiceContainer();
container.register('database', new MockDatabaseService());
container.register('api', new MockApiService());
container.register('logger', new MockLogger());
const nodes = createNodeWithDependencies(container);
const graph = new StateGraph(StateAnnotation)
.addNode('processData', nodes.processData)
.addNode('callApi', nodes.callExternalApi)
.addEdge('__start__', 'processData')
.addEdge('processData', 'callApi')
.addEdge('callApi', '__end__');
return { graph: graph.compile(), container };
}
// 7. 运行示例
async function runExample() {
console.log('=== 生产环境示例 ===');
const prodGraph = await createProductionGraph();
const prodResult = await prodGraph.invoke({
messages: [new HumanMessage('Process this data')],
});
console.log('Production result:', prodResult);
console.log('\n=== 测试环境示例 ===');
const { graph: testGraph, container } = await createTestGraph();
const testResult = await testGraph.invoke({
messages: [new HumanMessage('Test data processing')],
});
console.log('Test result:', testResult);
// 检查测试日志
const logger = container.get<MockLogger>('logger');
console.log('Test logs:', logger.logs);
}
// 运行示例
if (require.main === module) {
runExample().catch(console.error);
}
export {
ServiceContainer,
createNodeWithDependencies,
MockDatabaseService,
MockApiService,
MockLogger,
createProductionGraph,
createTestGraph,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
=== 生产环境示例 ===
[INFO] Processing data...
Saving to PostgreSQL: { content: 'Process this data', timestamp: 1763296506263 }
[INFO] Data saved with ID: pg_1763296506264
[INFO] Calling external API...
Calling API /process: { messages: 2, result: 'pg_1763296506264' }
Production result: {
messages: [
HumanMessage {
"content": "Process this data",
"additional_kwargs": {},
"response_metadata": {}
},
AIMessage {
"content": "Data processed and saved with ID: pg_1763296506264",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "API response: {\"status\":\"success\",\"result\":\"api response\"}",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
}
],
result: 'pg_1763296506264'
}
=== 测试环境示例 ===
Test result: {
messages: [
HumanMessage {
"content": "Test data processing",
"additional_kwargs": {},
"response_metadata": {}
},
AIMessage {
"content": "Data processed and saved with ID: mock_1763296506272",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "API response: {\"endpoint\":\"/process\",\"data\":{\"messages\":2,\"result\":\"mock_1763296506272\"},\"mock\":true,\"timestamp\":1763296506273}",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
}
],
result: 'mock_1763296506272'
}
Test logs: [
{ level: 'info', message: 'Processing data...' },
{ level: 'info', message: 'Data saved with ID: mock_1763296506272' },
{ level: 'info', message: 'Calling external API...' }
]
*/
2. 模拟和存根
为外部依赖创建模拟对象,便于单元测试。
测试模拟
/**
* ============================================================================
* 节点测试Mock - Testing Mocks
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何为节点创建测试Mock,隔离外部依赖,编写可靠的单元测试。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/testing-mocks.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { Annotation, StateGraph } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
userData: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
default: () => ({}),
}),
apiResponse: Annotation<any>({
reducer: (x, y) => y,
default: () => null,
}),
});
// 1. 外部服务接口
interface UserService {
getUser(id: string): Promise<{ id: string; name: string; email: string }>;
updateUser(id: string, data: any): Promise<boolean>;
}
interface EmailService {
sendEmail(to: string, subject: string, body: string): Promise<boolean>;
}
interface PaymentService {
processPayment(
amount: number,
cardToken: string
): Promise<{ success: boolean; transactionId?: string; error?: string }>;
}
// 2. 真实服务实现
class RealUserService implements UserService {
async getUser(id: string) {
// 模拟数据库查询
console.log(`Fetching user ${id} from database...`);
await new Promise((resolve) => setTimeout(resolve, 100));
return { id, name: 'John Doe', email: 'john@example.com' };
}
async updateUser(id: string, data: any) {
console.log(`Updating user ${id}:`, data);
await new Promise((resolve) => setTimeout(resolve, 50));
return true;
}
}
class RealEmailService implements EmailService {
async sendEmail(to: string, subject: string, body: string) {
console.log(`Sending email to ${to}: ${subject}`);
await new Promise((resolve) => setTimeout(resolve, 200));
return true;
}
}
class RealPaymentService implements PaymentService {
async processPayment(amount: number, cardToken: string) {
console.log(`Processing payment: $${amount} with token ${cardToken}`);
await new Promise((resolve) => setTimeout(resolve, 300));
return { success: true, transactionId: `txn_${Date.now()}` };
}
}
// 3. Mock 服务实现
class MockUserService implements UserService {
private users = new Map([
['1', { id: '1', name: 'Test User', email: 'test@example.com' }],
['2', { id: '2', name: 'Mock User', email: 'mock@example.com' }],
]);
public calls: Array<{ method: string; args: any[] }> = [];
async getUser(id: string) {
this.calls.push({ method: 'getUser', args: [id] });
const user = this.users.get(id);
if (!user) {
throw new Error(`User ${id} not found`);
}
return user;
}
async updateUser(id: string, data: any) {
this.calls.push({ method: 'updateUser', args: [id, data] });
const user = this.users.get(id);
if (user) {
Object.assign(user, data);
return true;
}
return false;
}
// 测试辅助方法
setUser(id: string, userData: any) {
this.users.set(id, userData);
}
getCallCount(method: string): number {
return this.calls.filter((call) => call.method === method).length;
}
getLastCall(method: string) {
const calls = this.calls.filter((call) => call.method === method);
return calls[calls.length - 1];
}
reset() {
this.calls = [];
}
}
class MockEmailService implements EmailService {
public sentEmails: Array<{ to: string; subject: string; body: string }> = [];
public shouldFail = false;
async sendEmail(to: string, subject: string, body: string) {
if (this.shouldFail) {
throw new Error('Email service unavailable');
}
this.sentEmails.push({ to, subject, body });
return true;
}
// 测试辅助方法
getEmailCount(): number {
return this.sentEmails.length;
}
getEmailsTo(email: string) {
return this.sentEmails.filter((e) => e.to === email);
}
reset() {
this.sentEmails = [];
this.shouldFail = false;
}
}
class MockPaymentService implements PaymentService {
public payments: Array<{ amount: number; cardToken: string; result: any }> =
[];
public shouldFail = false;
public failureReason = 'Payment failed';
async processPayment(amount: number, cardToken: string) {
const result = this.shouldFail
? { success: false, error: this.failureReason }
: { success: true, transactionId: `mock_txn_${Date.now()}` };
this.payments.push({ amount, cardToken, result });
return result;
}
// 测试辅助方法
setFailure(shouldFail: boolean, reason?: string) {
this.shouldFail = shouldFail;
if (reason) this.failureReason = reason;
}
getTotalAmount(): number {
return this.payments.reduce((sum, p) => sum + p.amount, 0);
}
reset() {
this.payments = [];
this.shouldFail = false;
}
}
// 4. 服务容器
class ServiceContainer {
private services = new Map<string, any>();
register<T>(name: string, service: T): void {
this.services.set(name, service);
}
get<T>(name: string): T {
return this.services.get(name);
}
}
// 5. 节点实现
function createNodes(container: ServiceContainer) {
return {
fetchUser: async (state: typeof StateAnnotation.State) => {
const userService = container.get<UserService>('userService');
const lastMessage = state.messages[state.messages.length - 1];
// 从消息中提取用户ID
const content =
typeof lastMessage.content === 'string' ? lastMessage.content : '';
const userId = content.match(/user (\w+)/)?.[1] || '1';
try {
const userData = await userService.getUser(userId);
return {
messages: [new AIMessage(`Found user: ${userData.name}`)],
userData,
};
} catch (error) {
return {
messages: [new AIMessage(`User not found: ${error}`)],
};
}
},
sendNotification: async (state: typeof StateAnnotation.State) => {
const emailService = container.get<EmailService>('emailService');
if (!state.userData?.email) {
return {
messages: [new AIMessage('No user email available')],
};
}
try {
await emailService.sendEmail(
state.userData.email,
'Notification',
'You have a new message'
);
return {
messages: [new AIMessage('Notification sent successfully')],
};
} catch (error) {
return {
messages: [new AIMessage(`Failed to send notification: ${error}`)],
};
}
},
processPayment: async (state: typeof StateAnnotation.State) => {
const paymentService = container.get<PaymentService>('paymentService');
// 从第一条消息(用户输入)中提取金额
const firstMessage = state.messages[0];
const content =
typeof firstMessage.content === 'string' ? firstMessage.content : '';
const amountMatch = content.match(/\$(\d+(?:\.\d+)?)/);
const amount = amountMatch ? parseFloat(amountMatch[1]) : 0;
if (amount <= 0) {
return {
messages: [new AIMessage('Invalid payment amount')],
};
}
try {
const result = await paymentService.processPayment(
amount,
'mock_token'
);
return {
messages: [
new AIMessage(
result.success
? `Payment processed: ${result.transactionId}`
: `Payment failed: ${result.error}`
),
],
apiResponse: result,
};
} catch (error) {
return {
messages: [new AIMessage(`Payment error: ${error}`)],
};
}
},
};
}
// 6. 图构建函数
function createGraph(container: ServiceContainer) {
const nodes = createNodes(container);
return new StateGraph(StateAnnotation)
.addNode('fetchUser', nodes.fetchUser)
.addNode('sendNotification', nodes.sendNotification)
.addNode('processPayment', nodes.processPayment)
.addEdge('__start__', 'fetchUser')
.addEdge('fetchUser', 'sendNotification')
.addEdge('sendNotification', 'processPayment')
.addEdge('processPayment', '__end__')
.compile();
}
// 7. 测试示例
async function testWithMocks() {
console.log('=== 使用 Mock 服务测试 ===');
// 创建 mock 服务
const mockUserService = new MockUserService();
const mockEmailService = new MockEmailService();
const mockPaymentService = new MockPaymentService();
// 设置测试数据
mockUserService.setUser('123', {
id: '123',
name: 'Test Customer',
email: 'customer@test.com',
});
// 注册服务
const container = new ServiceContainer();
container.register('userService', mockUserService);
container.register('emailService', mockEmailService);
container.register('paymentService', mockPaymentService);
const graph = createGraph(container);
// 执行测试
const result = await graph.invoke({
messages: [new HumanMessage('Process payment for user 123 amount $99.99')],
});
console.log('Result:', result);
// 验证 mock 调用
console.log('\n=== Mock 调用验证 ===');
console.log('User service calls:', mockUserService.calls);
console.log('Emails sent:', mockEmailService.sentEmails);
console.log('Payments processed:', mockPaymentService.payments);
// 断言示例
console.log('\n=== 断言检查 ===');
console.log('User fetched:', mockUserService.getCallCount('getUser') === 1);
console.log('Email sent:', mockEmailService.getEmailCount() === 1);
console.log('Payment processed:', mockPaymentService.payments.length === 1);
console.log(
'Payment amount correct:',
mockPaymentService.getTotalAmount() === 99.99
);
}
async function testWithFailures() {
console.log('\n=== 测试失败场景 ===');
const mockUserService = new MockUserService();
const mockEmailService = new MockEmailService();
const mockPaymentService = new MockPaymentService();
// 设置失败场景
mockEmailService.shouldFail = true;
mockPaymentService.setFailure(true, 'Insufficient funds');
const container = new ServiceContainer();
container.register('userService', mockUserService);
container.register('emailService', mockEmailService);
container.register('paymentService', mockPaymentService);
const graph = createGraph(container);
const result = await graph.invoke({
messages: [new HumanMessage('Process payment for user 1 amount $50.00')],
});
console.log('Failure test result:', result);
}
// 8. 生产环境示例
async function runWithRealServices() {
console.log('\n=== 生产环境示例 ===');
const container = new ServiceContainer();
container.register('userService', new RealUserService());
container.register('emailService', new RealEmailService());
container.register('paymentService', new RealPaymentService());
const graph = createGraph(container);
const result = await graph.invoke({
messages: [new HumanMessage('Process payment for user 1 amount $25.00')],
});
console.log('Production result:', result);
}
// 运行示例
async function runExample() {
await testWithMocks();
await testWithFailures();
await runWithRealServices();
}
if (require.main === module) {
runExample().catch(console.error);
}
export {
MockUserService,
MockEmailService,
MockPaymentService,
ServiceContainer,
createNodes,
createGraph,
testWithMocks,
testWithFailures,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
=== 使用 Mock 服务测试 ===
Result: {
messages: [
HumanMessage {
"content": "Process payment for user 123 amount $99.99",
"additional_kwargs": {},
"response_metadata": {}
},
AIMessage {
"content": "Found user: Test Customer",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "Notification sent successfully",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "Payment processed: mock_txn_1763376288628",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
}
],
userData: { id: '123', name: 'Test Customer', email: 'customer@test.com' },
apiResponse: { success: true, transactionId: 'mock_txn_1763376288628' }
}
=== Mock 调用验证 ===
User service calls: [ { method: 'getUser', args: [ '123' ] } ]
Emails sent: [
{
to: 'customer@test.com',
subject: 'Notification',
body: 'You have a new message'
}
]
Payments processed: [
{
amount: 99.99,
cardToken: 'mock_token',
result: { success: true, transactionId: 'mock_txn_1763376288628' }
}
]
=== 断言检查 ===
User fetched: true
Email sent: true
Payment processed: true
Payment amount correct: true
=== 测试失败场景 ===
Failure test result: {
messages: [
HumanMessage {
"content": "Process payment for user 1 amount $50.00",
"additional_kwargs": {},
"response_metadata": {}
},
AIMessage {
"content": "Found user: Test User",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "Failed to send notification: Error: Email service unavailable",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "Payment failed: Insufficient funds",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
}
],
userData: { id: '1', name: 'Test User', email: 'test@example.com' },
apiResponse: { success: false, error: 'Insufficient funds' }
}
=== 生产环境示例 ===
Fetching user 1 from database...
Sending email to john@example.com: Notification
Processing payment: $25 with token mock_token
Production result: {
messages: [
HumanMessage {
"content": "Process payment for user 1 amount $25.00",
"additional_kwargs": {},
"response_metadata": {}
},
AIMessage {
"content": "Found user: John Doe",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "Notification sent successfully",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
},
AIMessage {
"content": "Payment processed: txn_1763376289254",
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": []
}
],
userData: { id: '1', name: 'John Doe', email: 'john@example.com' },
apiResponse: { success: true, transactionId: 'txn_1763376289254' }
}
*/
📊 性能优化
1. 缓存策略
对计算结果进行缓存,避免重复计算。
缓存优化
/**
* ============================================================================
* 节点缓存 - Node Caching
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何在节点中实现缓存机制,避免重复计算,提高性能,包括内存缓存和持久化缓存。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/caching.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { Annotation, StateGraph } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
query: Annotation<string>({
reducer: (x, y) => y,
default: () => '',
}),
result: Annotation<any>({
reducer: (x, y) => y,
default: () => null,
}),
cacheKey: Annotation<string>({
reducer: (x, y) => y,
default: () => '',
}),
});
// 1. 简单内存缓存
class MemoryCache {
private cache = new Map<
string,
{ data: any; timestamp: number; ttl: number }
>();
set(key: string, data: any, ttlMs: number = 300000): void {
// 默认5分钟
this.cache.set(key, {
data,
timestamp: Date.now(),
ttl: ttlMs,
});
}
get(key: string): any | null {
const entry = this.cache.get(key);
if (!entry) return null;
const now = Date.now();
if (now - entry.timestamp > entry.ttl) {
this.cache.delete(key);
return null;
}
return entry.data;
}
has(key: string): boolean {
return this.get(key) !== null;
}
clear(): void {
this.cache.clear();
}
size(): number {
// 清理过期条目
const now = Date.now();
for (const [key, entry] of this.cache.entries()) {
if (now - entry.timestamp > entry.ttl) {
this.cache.delete(key);
}
}
return this.cache.size;
}
}
// 2. LRU 缓存实现
class LRUCache {
private cache = new Map<string, { data: any; timestamp: number }>();
private maxSize: number;
constructor(maxSize: number = 100) {
this.maxSize = maxSize;
}
set(key: string, data: any): void {
// 如果已存在,先删除
if (this.cache.has(key)) {
this.cache.delete(key);
}
// 如果达到最大容量,删除最旧的条目
if (this.cache.size >= this.maxSize) {
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
}
this.cache.set(key, { data, timestamp: Date.now() });
}
get(key: string): any | null {
const entry = this.cache.get(key);
if (!entry) return null;
// 重新插入以更新顺序
this.cache.delete(key);
this.cache.set(key, entry);
return entry.data;
}
has(key: string): boolean {
return this.cache.has(key);
}
clear(): void {
this.cache.clear();
}
size(): number {
return this.cache.size;
}
}
// 3. 缓存装饰器
function withCache(
cache: MemoryCache,
keyGenerator: (state: any) => string,
ttl?: number
) {
return function (nodeFunction: Function) {
return async function (state: any) {
const cacheKey = keyGenerator(state);
// 尝试从缓存获取
const cached = cache.get(cacheKey);
if (cached) {
console.log(`Cache hit for key: ${cacheKey}`);
return {
...cached,
messages: [...state.messages, new AIMessage('(从缓存返回)')],
};
}
// 缓存未命中,执行原函数
console.log(`Cache miss for key: ${cacheKey}`);
const result = await nodeFunction(state);
// 存储到缓存
cache.set(cacheKey, result, ttl);
return result;
};
};
}
// 4. 全局缓存实例
const globalCache = new MemoryCache();
const lruCache = new LRUCache(50);
// 5. 缓存节点示例
const cachedNodes = {
// 数据库查询节点(使用TTL缓存)
databaseQuery: withCache(
globalCache,
(state) => `db_query_${state.query}`,
60000 // 1分钟TTL
)(async (state: typeof StateAnnotation.State) => {
// 模拟数据库查询
console.log('执行数据库查询...');
await new Promise((resolve) => setTimeout(resolve, 1000));
const mockData = {
id: Math.floor(Math.random() * 1000),
name: `User_${state.query}`,
email: `${state.query}@example.com`,
};
return {
result: mockData,
messages: [new AIMessage(`查询到用户: ${mockData.name}`)],
};
}),
// API调用节点(使用LRU缓存)
apiCall: async (state: typeof StateAnnotation.State) => {
const cacheKey = `api_${state.query}`;
// 检查LRU缓存
const cached = lruCache.get(cacheKey);
if (cached) {
console.log(`LRU Cache hit for: ${cacheKey}`);
return {
...cached,
messages: [...state.messages, new AIMessage('(从LRU缓存返回)')],
};
}
// 模拟API调用
console.log('执行API调用...');
await new Promise((resolve) => setTimeout(resolve, 800));
const result = {
result: { status: 'success', data: `API response for ${state.query}` },
messages: [new AIMessage(`API调用成功: ${state.query}`)],
};
// 存储到LRU缓存
lruCache.set(cacheKey, result);
return result;
},
// 计算密集型节点(条件缓存)
heavyComputation: async (state: typeof StateAnnotation.State) => {
const input = state.query;
const cacheKey = `compute_${input}`;
// 只对复杂查询使用缓存
const shouldCache = input.length > 10 || /complex|heavy|slow/.test(input);
if (shouldCache) {
const cached = globalCache.get(cacheKey);
if (cached) {
console.log(`Conditional cache hit: ${cacheKey}`);
return {
...cached,
messages: [...state.messages, new AIMessage('(从条件缓存返回)')],
};
}
}
// 模拟重计算
console.log('执行重计算...');
const startTime = Date.now();
await new Promise((resolve) => setTimeout(resolve, 1500));
const endTime = Date.now();
const result = {
result: {
computation: `Result for ${input}`,
executionTime: endTime - startTime,
},
messages: [new AIMessage(`计算完成,耗时: ${endTime - startTime}ms`)],
};
// 条件存储到缓存
if (shouldCache) {
globalCache.set(cacheKey, result, 600000); // 10分钟TTL
}
return result;
},
};
// 6. 缓存管理节点
const cacheManagement = {
// 缓存统计
getCacheStats: async (state: typeof StateAnnotation.State) => {
const stats = {
memoryCache: {
size: globalCache.size(),
type: 'TTL Cache',
},
lruCache: {
size: lruCache.size(),
type: 'LRU Cache',
},
};
return {
result: stats,
messages: [new AIMessage(`缓存统计: ${JSON.stringify(stats, null, 2)}`)],
};
},
// 清理缓存
clearCache: async (state: typeof StateAnnotation.State) => {
const beforeSize = globalCache.size() + lruCache.size();
globalCache.clear();
lruCache.clear();
return {
messages: [new AIMessage(`已清理缓存,清理前大小: ${beforeSize}`)],
};
},
// 预热缓存
warmupCache: async (state: typeof StateAnnotation.State) => {
const commonQueries = ['user1', 'user2', 'admin', 'guest'];
for (const query of commonQueries) {
const mockState = { ...state, query };
await cachedNodes.databaseQuery(mockState);
}
return {
messages: [
new AIMessage(`缓存预热完成,预热了 ${commonQueries.length} 个查询`),
],
};
},
};
// 7. 构建图
function createCachedGraph() {
return new StateGraph(StateAnnotation)
.addNode('dbQuery', cachedNodes.databaseQuery)
.addNode('apiCall', cachedNodes.apiCall)
.addNode('heavyCompute', cachedNodes.heavyComputation)
.addNode('cacheStats', cacheManagement.getCacheStats)
.addEdge('__start__', 'dbQuery')
.addEdge('dbQuery', 'apiCall')
.addEdge('apiCall', 'heavyCompute')
.addEdge('heavyCompute', 'cacheStats')
.addEdge('cacheStats', '__end__')
.compile();
}
// 8. 测试示例
async function testCaching() {
console.log('=== 缓存测试 ===');
const graph = createCachedGraph();
// 第一次执行
console.log('\n--- 第一次执行 ---');
const result1 = await graph.invoke({
messages: [new HumanMessage('测试缓存功能')],
query: 'test_user',
});
console.log('第一次结果:', result1.result);
// 第二次执行(应该命中缓存)
console.log('\n--- 第二次执行 ---');
const result2 = await graph.invoke({
messages: [new HumanMessage('再次测试缓存')],
query: 'test_user',
});
console.log('第二次结果:', result2.result);
// 测试不同查询
console.log('\n--- 不同查询 ---');
const result3 = await graph.invoke({
messages: [new HumanMessage('测试新查询')],
query: 'another_user',
});
console.log('新查询结果:', result3.result);
}
async function testCacheManagement() {
console.log('\n=== 缓存管理测试 ===');
// 预热缓存
await cacheManagement.warmupCache({
messages: [],
query: '',
result: null,
cacheKey: '',
});
// 查看缓存统计
const stats = await cacheManagement.getCacheStats({
messages: [],
query: '',
result: null,
cacheKey: '',
});
console.log('缓存统计:', stats.result);
// 清理缓存
await cacheManagement.clearCache({
messages: [],
query: '',
result: null,
cacheKey: '',
});
}
// 运行示例
async function runExample() {
await testCaching();
await testCacheManagement();
}
if (require.main === module) {
runExample().catch(console.error);
}
export {
MemoryCache,
LRUCache,
withCache,
cachedNodes,
cacheManagement,
createCachedGraph,
testCaching,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
=== 缓存测试 ===
--- 第一次执行 ---
Cache miss for key: db_query_test_user
执行数据库查询...
执行API调用...
执行重计算...
第一次结果: {
memoryCache: { size: 1, type: 'TTL Cache' },
lruCache: { size: 1, type: 'LRU Cache' }
}
--- 第二次执行 ---
Cache hit for key: db_query_test_user
LRU Cache hit for: api_test_user
执行重计算...
第二次结果: {
memoryCache: { size: 1, type: 'TTL Cache' },
lruCache: { size: 1, type: 'LRU Cache' }
}
--- 不同查询 ---
Cache miss for key: db_query_another_user
执行数据库查询...
执行API调用...
执行重计算...
新查询结果: {
memoryCache: { size: 3, type: 'TTL Cache' },
lruCache: { size: 2, type: 'LRU Cache' }
}
=== 缓存管理测试 ===
Cache miss for key: db_query_user1
执行数据库查询...
Cache miss for key: db_query_user2
执行数据库查询...
Cache miss for key: db_query_admin
执行数据库查询...
Cache miss for key: db_query_guest
执行数据库查询...
缓存统计: {
memoryCache: { size: 7, type: 'TTL Cache' },
lruCache: { size: 2, type: 'LRU Cache' }
}
*/
2. 批处理
将多个相似的操作合并为批处理,提高效率。
批处理
/**
* ============================================================================
* 批处理节点 - Batch Processing Node
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何设计批处理节点,高效处理大量数据,包括分批处理、并行处理和进度追踪。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/batch-processing.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { Annotation, StateGraph } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
items: Annotation<any[]>({
reducer: (x, y) => [...x, ...y],
default: () => [],
}),
batchResults: Annotation<any[]>({
reducer: (x, y) => [...x, ...y],
default: () => [],
}),
processedCount: Annotation<number>({
reducer: (x, y) => x + y,
default: () => 0,
}),
batchSize: Annotation<number>({
reducer: (x, y) => y,
default: () => 10,
}),
});
// 1. 批处理工具函数
class BatchProcessor {
static chunk<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
static async processWithConcurrency<T, R>(
items: T[],
processor: (item: T) => Promise<R>,
concurrency: number = 3
): Promise<R[]> {
const results: R[] = [];
const chunks = this.chunk(items, concurrency);
for (const chunk of chunks) {
const chunkResults = await Promise.all(
chunk.map((item) => processor(item))
);
results.push(...chunkResults);
}
return results;
}
static async processWithRetry<T, R>(
items: T[],
processor: (item: T) => Promise<R>,
maxRetries: number = 3
): Promise<Array<{ success: boolean; result?: R; error?: string; item: T }>> {
const results: Array<{
success: boolean;
result?: R;
error?: string;
item: T;
}> = [];
for (const item of items) {
let lastError: Error | null = null;
let success = false;
let result: R | undefined;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
result = await processor(item);
success = true;
break;
} catch (error) {
lastError = error as Error;
if (attempt < maxRetries) {
// 指数退避
await new Promise((resolve) =>
setTimeout(resolve, Math.pow(2, attempt) * 1000)
);
}
}
}
results.push({
success,
result,
error: lastError?.message,
item,
});
}
return results;
}
}
// 2. 批处理节点实现
const batchNodes = {
// 数据准备节点
prepareData: async (state: typeof StateAnnotation.State) => {
// 模拟生成大量数据
const items = Array.from({ length: 50 }, (_, i) => ({
id: i + 1,
name: `Item_${i + 1}`,
value: Math.random() * 100,
category: ['A', 'B', 'C'][i % 3],
}));
return {
items,
messages: [new AIMessage(`准备了 ${items.length} 个数据项`)],
};
},
// 简单批处理节点
simpleBatchProcess: async (state: typeof StateAnnotation.State) => {
const { items, batchSize } = state;
const batches = BatchProcessor.chunk(items, batchSize);
const results: any[] = [];
console.log(`开始批处理,共 ${batches.length} 个批次`);
for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
console.log(`处理第 ${i + 1} 批,包含 ${batch.length} 个项目`);
// 模拟批处理操作
const batchResult = await Promise.all(
batch.map(async (item) => {
await new Promise((resolve) => setTimeout(resolve, 100));
return {
...item,
processed: true,
processedAt: new Date().toISOString(),
result: item.value * 2,
};
})
);
results.push(...batchResult);
}
return {
batchResults: results,
processedCount: results.length,
messages: [
new AIMessage(`简单批处理完成,处理了 ${results.length} 个项目`),
],
};
},
// 并发批处理节点
concurrentBatchProcess: async (state: typeof StateAnnotation.State) => {
const { items } = state;
console.log('开始并发批处理...');
const results = await BatchProcessor.processWithConcurrency(
items,
async (item) => {
// 模拟异步处理
await new Promise((resolve) => setTimeout(resolve, 200));
return {
...item,
processed: true,
processedAt: new Date().toISOString(),
concurrentResult: item.value * 3,
};
},
5 // 并发度为5
);
return {
batchResults: results,
processedCount: results.length,
messages: [
new AIMessage(`并发批处理完成,处理了 ${results.length} 个项目`),
],
};
},
// 带重试的批处理节点
retryBatchProcess: async (state: typeof StateAnnotation.State) => {
const { items } = state;
console.log('开始带重试的批处理...');
const results = await BatchProcessor.processWithRetry(
items.slice(0, 10), // 只处理前10个项目作为示例
async (item) => {
// 模拟可能失败的操作
if (Math.random() < 0.3) {
// 30% 失败率
throw new Error(`处理项目 ${item.id} 失败`);
}
await new Promise((resolve) => setTimeout(resolve, 150));
return {
...item,
processed: true,
processedAt: new Date().toISOString(),
retryResult: item.value * 1.5,
};
},
2 // 最多重试2次
);
const successCount = results.filter((r) => r.success).length;
const failureCount = results.filter((r) => !r.success).length;
return {
batchResults: results,
processedCount: successCount,
messages: [
new AIMessage(
`重试批处理完成,成功: ${successCount},失败: ${failureCount}`
),
],
};
},
// 流式批处理节点
streamBatchProcess: async (state: typeof StateAnnotation.State) => {
const { items, batchSize } = state;
const batches = BatchProcessor.chunk(items, batchSize);
let processedCount = 0;
const results: any[] = [];
console.log('开始流式批处理...');
// 使用异步生成器模拟流式处理
async function* processBatches() {
for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
const batchResult = await Promise.all(
batch.map(async (item) => {
await new Promise((resolve) => setTimeout(resolve, 50));
return {
...item,
processed: true,
batchIndex: i,
streamResult: item.value * 1.2,
};
})
);
processedCount += batchResult.length;
results.push(...batchResult);
yield {
batchIndex: i,
batchSize: batchResult.length,
totalProcessed: processedCount,
progress: (processedCount / items.length) * 100,
};
}
}
// 处理所有批次
for await (const progress of processBatches()) {
console.log(
`批次 ${
progress.batchIndex + 1
} 完成,进度: ${progress.progress.toFixed(1)}%`
);
}
return {
batchResults: results,
processedCount,
messages: [
new AIMessage(`流式批处理完成,处理了 ${processedCount} 个项目`),
],
};
},
// 分组批处理节点
groupedBatchProcess: async (state: typeof StateAnnotation.State) => {
const { items } = state;
console.log('开始分组批处理...');
// 按类别分组
const groups = items.reduce((acc, item) => {
if (!acc[item.category]) {
acc[item.category] = [];
}
acc[item.category].push(item);
return acc;
}, {} as Record<string, any[]>);
const results: any[] = [];
const groupResults: Record<string, any> = {};
// 并行处理每个分组
await Promise.all(
Object.entries(groups).map(async ([category, groupItems]) => {
console.log(
`处理分组 ${category},包含 ${(groupItems as any[]).length} 个项目`
);
const groupResult = await Promise.all(
(groupItems as any[]).map(async (item) => {
await new Promise((resolve) => setTimeout(resolve, 100));
return {
...item,
processed: true,
groupProcessed: true,
groupResult:
item.value *
(category === 'A' ? 2 : category === 'B' ? 1.5 : 1.2),
};
})
);
results.push(...groupResult);
groupResults[category] = {
count: groupResult.length,
avgValue:
groupResult.reduce((sum, item) => sum + item.groupResult, 0) /
groupResult.length,
};
})
);
return {
batchResults: results,
processedCount: results.length,
messages: [
new AIMessage(`分组批处理完成,处理了 ${results.length} 个项目`),
new AIMessage(`分组统计: ${JSON.stringify(groupResults, null, 2)}`),
],
};
},
};
// 3. 批处理监控节点
const monitoringNodes = {
// 性能监控
performanceMonitor: async (state: typeof StateAnnotation.State) => {
const { batchResults, processedCount } = state;
if (batchResults.length === 0) {
return {
messages: [new AIMessage('没有批处理结果可监控')],
};
}
// 计算性能指标
const processingTimes = batchResults
.filter((item) => item.processedAt)
.map((item) => new Date(item.processedAt).getTime());
const minTime = Math.min(...processingTimes);
const maxTime = Math.max(...processingTimes);
const totalDuration = maxTime - minTime;
const throughput = processedCount / (totalDuration / 1000); // 每秒处理数
const metrics = {
totalProcessed: processedCount,
totalDuration: `${totalDuration}ms`,
throughput: `${throughput.toFixed(2)} items/sec`,
avgProcessingTime: `${totalDuration / processedCount}ms`,
};
return {
messages: [
new AIMessage(`性能监控结果: ${JSON.stringify(metrics, null, 2)}`),
],
};
},
// 质量检查
qualityCheck: async (state: typeof StateAnnotation.State) => {
const { batchResults } = state;
if (batchResults.length === 0) {
return {
messages: [new AIMessage('没有批处理结果可检查')],
};
}
// 检查处理质量
const processedItems = batchResults.filter((item) => item.processed);
const failedItems = batchResults.filter(
(item) => !item.success && item.success !== undefined
);
const qualityMetrics = {
totalItems: batchResults.length,
processedItems: processedItems.length,
failedItems: failedItems.length,
successRate: `${(
(processedItems.length / batchResults.length) *
100
).toFixed(2)}%`,
dataIntegrity: processedItems.every((item) => item.id && item.name)
? '通过'
: '失败',
};
return {
messages: [
new AIMessage(
`质量检查结果: ${JSON.stringify(qualityMetrics, null, 2)}`
),
],
};
},
};
// 4. 构建批处理图
function createBatchProcessingGraph() {
return new StateGraph(StateAnnotation)
.addNode('prepareData', batchNodes.prepareData)
.addNode('simpleBatch', batchNodes.simpleBatchProcess)
.addNode('concurrentBatch', batchNodes.concurrentBatchProcess)
.addNode('retryBatch', batchNodes.retryBatchProcess)
.addNode('streamBatch', batchNodes.streamBatchProcess)
.addNode('groupedBatch', batchNodes.groupedBatchProcess)
.addNode('performanceMonitor', monitoringNodes.performanceMonitor)
.addNode('qualityCheck', monitoringNodes.qualityCheck)
.addEdge('__start__', 'prepareData')
.addEdge('prepareData', 'simpleBatch')
.addEdge('simpleBatch', 'performanceMonitor')
.addEdge('performanceMonitor', 'qualityCheck')
.addEdge('qualityCheck', '__end__')
.compile();
}
// 5. 测试不同批处理策略
async function testBatchStrategies() {
console.log('=== 批处理策略测试 ===');
// 测试简单批处理
console.log('\n--- 简单批处理 ---');
const simpleGraph = new StateGraph(StateAnnotation)
.addNode('prepare', batchNodes.prepareData)
.addNode('process', batchNodes.simpleBatchProcess)
.addEdge('__start__', 'prepare')
.addEdge('prepare', 'process')
.addEdge('process', '__end__')
.compile();
const simpleResult = await simpleGraph.invoke({
messages: [new HumanMessage('测试简单批处理')],
batchSize: 8,
});
console.log('简单批处理结果:', simpleResult.processedCount);
// 测试并发批处理
console.log('\n--- 并发批处理 ---');
const concurrentGraph = new StateGraph(StateAnnotation)
.addNode('prepare', batchNodes.prepareData)
.addNode('process', batchNodes.concurrentBatchProcess)
.addEdge('__start__', 'prepare')
.addEdge('prepare', 'process')
.addEdge('process', '__end__')
.compile();
const concurrentResult = await concurrentGraph.invoke({
messages: [new HumanMessage('测试并发批处理')],
});
console.log('并发批处理结果:', concurrentResult.processedCount);
}
// 运行示例
async function runExample() {
await testBatchStrategies();
}
if (require.main === module) {
runExample().catch(console.error);
}
export {
BatchProcessor,
batchNodes,
monitoringNodes,
createBatchProcessingGraph,
testBatchStrategies,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
=== 批处理策略测试 ===
--- 简单批处理 ---
开始批处理,共 7 个批次
处理第 1 批,包含 8 个项目
处理第 2 批,包含 8 个项目
处理第 3 批,包含 8 个项目
处理第 4 批,包含 8 个项目
处理第 5 批,包含 8 个项目
处理第 6 批,包含 8 个项目
处理第 7 批,包含 2 个项目
简单批处理结果: 50
--- 并发批处理 ---
开始并发批处理...
并发批处理结果: 50
*/
🔍 监控和日志
1. 结构化日志
使用结构化日志记录节点的执行情况。
结构化日志
/**
* ============================================================================
* 结构化日志 - Structured Logging
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何在节点中实现结构化日志,记录详细的执行信息,便于调试和监控。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/structured-logging.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { Annotation, StateGraph } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
userId: Annotation<string>({
reducer: (x, y) => y,
default: () => '',
}),
sessionId: Annotation<string>({
reducer: (x, y) => y,
default: () => '',
}),
requestId: Annotation<string>({
reducer: (x, y) => y,
default: () => '',
}),
result: Annotation<any>({
reducer: (x, y) => y,
default: () => null,
}),
});
// 1. 日志级别枚举
enum LogLevel {
DEBUG = 'debug',
INFO = 'info',
WARN = 'warn',
ERROR = 'error',
}
// 2. 结构化日志接口
interface LogEntry {
timestamp: string;
level: LogLevel;
message: string;
context: {
nodeId?: string;
userId?: string;
sessionId?: string;
requestId?: string;
executionTime?: number;
[key: string]: any;
};
metadata?: {
version?: string;
environment?: string;
service?: string;
[key: string]: any;
};
error?: {
name: string;
message: string;
stack?: string;
code?: string;
};
}
// 3. 结构化日志器类
class StructuredLogger {
private serviceName: string;
private version: string;
private environment: string;
constructor(
serviceName: string = 'langgraph-app',
version: string = '1.0.0',
environment: string = 'development'
) {
this.serviceName = serviceName;
this.version = version;
this.environment = environment;
}
private createLogEntry(
level: LogLevel,
message: string,
context: any = {},
error?: Error
): LogEntry {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
message,
context: {
...context,
},
metadata: {
service: this.serviceName,
version: this.version,
environment: this.environment,
},
};
if (error) {
entry.error = {
name: error.name,
message: error.message,
stack: error.stack,
code: (error as any).code,
};
}
return entry;
}
private output(entry: LogEntry): void {
// 在生产环境中,这里应该发送到日志聚合服务
console.log(JSON.stringify(entry, null, 2));
}
debug(message: string, context?: any): void {
this.output(this.createLogEntry(LogLevel.DEBUG, message, context));
}
info(message: string, context?: any): void {
this.output(this.createLogEntry(LogLevel.INFO, message, context));
}
warn(message: string, context?: any): void {
this.output(this.createLogEntry(LogLevel.WARN, message, context));
}
error(message: string, context?: any, error?: Error): void {
this.output(this.createLogEntry(LogLevel.ERROR, message, context, error));
}
// 节点执行日志
nodeStart(nodeId: string, context: any = {}): void {
this.info(`Node execution started: ${nodeId}`, {
nodeId,
event: 'node_start',
...context,
});
}
nodeEnd(nodeId: string, executionTime: number, context: any = {}): void {
this.info(`Node execution completed: ${nodeId}`, {
nodeId,
event: 'node_end',
executionTime,
...context,
});
}
nodeError(nodeId: string, error: Error, context: any = {}): void {
this.error(
`Node execution failed: ${nodeId}`,
{
nodeId,
event: 'node_error',
...context,
},
error
);
}
// 业务事件日志
businessEvent(eventName: string, data: any, context: any = {}): void {
this.info(`Business event: ${eventName}`, {
event: 'business_event',
eventName,
data,
...context,
});
}
// 性能日志
performance(operation: string, duration: number, context: any = {}): void {
this.info(`Performance metric: ${operation}`, {
event: 'performance',
operation,
duration,
...context,
});
}
// 安全事件日志
security(eventType: string, details: any, context: any = {}): void {
this.warn(`Security event: ${eventType}`, {
event: 'security',
eventType,
details,
...context,
});
}
}
// 4. 全局日志器实例
const logger = new StructuredLogger();
// 5. 日志装饰器
function withLogging(nodeId: string) {
return function (nodeFunction: Function) {
return async function (state: any) {
const startTime = Date.now();
const context = {
nodeId,
userId: state.userId,
sessionId: state.sessionId,
requestId: state.requestId,
};
logger.nodeStart(nodeId, context);
try {
const result = await nodeFunction(state);
const executionTime = Date.now() - startTime;
logger.nodeEnd(nodeId, executionTime, {
...context,
resultSize: JSON.stringify(result).length,
});
return result;
} catch (error) {
logger.nodeError(nodeId, error as Error, context);
throw error;
}
};
};
}
// 6. 带日志的节点示例
const loggedNodes = {
// 用户认证节点
authenticate: withLogging('authenticate')(
async (state: typeof StateAnnotation.State) => {
const { userId } = state;
logger.businessEvent('user_authentication_attempt', { userId });
// 模拟认证逻辑
await new Promise((resolve) => setTimeout(resolve, 100));
if (!userId || userId === 'invalid') {
logger.security('authentication_failed', {
userId,
reason: 'invalid_credentials',
});
throw new Error('认证失败');
}
logger.businessEvent('user_authentication_success', { userId });
return {
messages: [new AIMessage(`用户 ${userId} 认证成功`)],
};
}
),
// 数据处理节点
processData: withLogging('processData')(
async (state: typeof StateAnnotation.State) => {
const { userId, sessionId } = state;
logger.debug('开始数据处理', {
inputSize: state.messages.length,
processingMode: 'standard',
});
// 模拟数据处理
const startTime = Date.now();
await new Promise((resolve) => setTimeout(resolve, 200));
const processingTime = Date.now() - startTime;
logger.performance('data_processing', processingTime, {
userId,
sessionId,
recordsProcessed: 100,
});
const result = {
processedData: `处理完成 - ${new Date().toISOString()}`,
recordCount: 100,
};
logger.businessEvent('data_processing_completed', {
userId,
sessionId,
recordCount: result.recordCount,
});
return {
result,
messages: [new AIMessage('数据处理完成')],
};
}
),
// API调用节点
callExternalAPI: withLogging('callExternalAPI')(
async (state: typeof StateAnnotation.State) => {
const { requestId } = state;
logger.info('调用外部API', {
apiEndpoint: 'https://api.example.com/data',
method: 'GET',
});
try {
// 模拟API调用
const apiStartTime = Date.now();
await new Promise((resolve) => setTimeout(resolve, 300));
const apiDuration = Date.now() - apiStartTime;
logger.performance('external_api_call', apiDuration, {
requestId,
endpoint: 'https://api.example.com/data',
statusCode: 200,
});
const apiResult = {
data: 'API响应数据',
timestamp: new Date().toISOString(),
};
logger.businessEvent('external_api_success', {
requestId,
responseSize: JSON.stringify(apiResult).length,
});
return {
result: apiResult,
messages: [new AIMessage('API调用成功')],
};
} catch (error) {
logger.error('API调用失败', { requestId }, error as Error);
throw error;
}
}
),
// 错误处理节点
handleError: withLogging('handleError')(
async (state: typeof StateAnnotation.State) => {
const { userId, sessionId } = state;
logger.warn('进入错误处理流程', {
userId,
sessionId,
errorContext: 'user_request_processing',
});
// 模拟错误恢复
await new Promise((resolve) => setTimeout(resolve, 50));
logger.businessEvent('error_recovery_attempted', {
userId,
sessionId,
recoveryStrategy: 'fallback_response',
});
return {
result: { recovered: true, fallbackData: '默认响应' },
messages: [new AIMessage('已使用备用方案处理')],
};
}
),
};
// 7. 日志聚合和分析
class LogAnalyzer {
private logs: LogEntry[] = [];
addLog(entry: LogEntry): void {
this.logs.push(entry);
}
getErrorRate(timeWindow: number = 3600000): number {
// 默认1小时
const now = Date.now();
const windowStart = now - timeWindow;
const recentLogs = this.logs.filter(
(log) => new Date(log.timestamp).getTime() > windowStart
);
const errorLogs = recentLogs.filter((log) => log.level === LogLevel.ERROR);
return recentLogs.length > 0 ? errorLogs.length / recentLogs.length : 0;
}
getAverageExecutionTime(nodeId?: string): number {
const perfLogs = this.logs.filter(
(log) =>
log.context.event === 'node_end' &&
(!nodeId || log.context.nodeId === nodeId)
);
if (perfLogs.length === 0) return 0;
const totalTime = perfLogs.reduce(
(sum, log) => sum + (log.context.executionTime || 0),
0
);
return totalTime / perfLogs.length;
}
getTopErrors(limit: number = 5): Array<{ error: string; count: number }> {
const errorCounts = new Map<string, number>();
this.logs
.filter((log) => log.level === LogLevel.ERROR)
.forEach((log) => {
const errorKey = log.error?.name || 'Unknown Error';
errorCounts.set(errorKey, (errorCounts.get(errorKey) || 0) + 1);
});
return Array.from(errorCounts.entries())
.map(([error, count]) => ({ error, count }))
.sort((a, b) => b.count - a.count)
.slice(0, limit);
}
generateReport(): any {
return {
totalLogs: this.logs.length,
errorRate: this.getErrorRate(),
averageExecutionTime: this.getAverageExecutionTime(),
topErrors: this.getTopErrors(),
logLevelDistribution: this.getLogLevelDistribution(),
};
}
private getLogLevelDistribution(): Record<string, number> {
const distribution: Record<string, number> = {};
this.logs.forEach((log) => {
distribution[log.level] = (distribution[log.level] || 0) + 1;
});
return distribution;
}
}
// 8. 构建带日志的图
function createLoggedGraph() {
return new StateGraph(StateAnnotation)
.addNode('authenticate', loggedNodes.authenticate)
.addNode('processData', loggedNodes.processData)
.addNode('callAPI', loggedNodes.callExternalAPI)
.addEdge('__start__', 'authenticate')
.addEdge('authenticate', 'processData')
.addEdge('processData', 'callAPI')
.addEdge('callAPI', '__end__')
.compile();
}
// 9. 测试示例
async function testStructuredLogging() {
console.log('=== 结构化日志测试 ===');
const graph = createLoggedGraph();
const analyzer = new LogAnalyzer();
// 模拟多个请求
for (let i = 0; i < 3; i++) {
try {
const result = await graph.invoke({
messages: [new HumanMessage(`测试请求 ${i + 1}`)],
userId: `user_${i + 1}`,
sessionId: `session_${Date.now()}_${i}`,
requestId: `req_${Date.now()}_${i}`,
});
logger.businessEvent('request_completed', {
requestId: `req_${Date.now()}_${i}`,
success: true,
});
} catch (error) {
logger.error(
'请求处理失败',
{
requestId: `req_${Date.now()}_${i}`,
},
error as Error
);
}
}
// 生成分析报告
console.log('\n=== 日志分析报告 ===');
console.log(JSON.stringify(analyzer.generateReport(), null, 2));
}
// 10. 日志配置管理
class LogConfig {
private static instance: LogConfig;
private config: {
level: LogLevel;
enableConsole: boolean;
enableFile: boolean;
enableRemote: boolean;
remoteEndpoint?: string;
bufferSize: number;
flushInterval: number;
};
private constructor() {
this.config = {
level: LogLevel.INFO,
enableConsole: true,
enableFile: false,
enableRemote: false,
bufferSize: 100,
flushInterval: 5000,
};
}
static getInstance(): LogConfig {
if (!LogConfig.instance) {
LogConfig.instance = new LogConfig();
}
return LogConfig.instance;
}
updateConfig(newConfig: Partial<LogConfig['config']>): void {
this.config = { ...this.config, ...newConfig };
}
getConfig() {
return { ...this.config };
}
}
// 运行示例
async function runExample() {
await testStructuredLogging();
}
if (require.main === module) {
runExample().catch(console.error);
}
export {
StructuredLogger,
LogLevel,
LogEntry,
LogAnalyzer,
LogConfig,
withLogging,
loggedNodes,
createLoggedGraph,
testStructuredLogging,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
=== 结构化日志测试 ===
{
"timestamp": "2025-11-16T17:41:59.642Z",
"level": "info",
"message": "Node execution started: authenticate",
"context": {
"nodeId": "authenticate",
"event": "node_start",
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"requestId": "req_1763314919632_0"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.642Z",
"level": "info",
"message": "Business event: user_authentication_attempt",
"context": {
"event": "business_event",
"eventName": "user_authentication_attempt",
"data": {
"userId": "user_1"
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.742Z",
"level": "info",
"message": "Business event: user_authentication_success",
"context": {
"event": "business_event",
"eventName": "user_authentication_success",
"data": {
"userId": "user_1"
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.743Z",
"level": "info",
"message": "Node execution completed: authenticate",
"context": {
"nodeId": "authenticate",
"event": "node_end",
"executionTime": 101,
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"requestId": "req_1763314919632_0",
"resultSize": 215
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.745Z",
"level": "info",
"message": "Node execution started: processData",
"context": {
"nodeId": "processData",
"event": "node_start",
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"requestId": "req_1763314919632_0"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.745Z",
"level": "debug",
"message": "开始数据处理",
"context": {
"inputSize": 2,
"processingMode": "standard"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.946Z",
"level": "info",
"message": "Performance metric: data_processing",
"context": {
"event": "performance",
"operation": "data_processing",
"duration": 201,
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"recordsProcessed": 100
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.946Z",
"level": "info",
"message": "Business event: data_processing_completed",
"context": {
"event": "business_event",
"eventName": "data_processing_completed",
"data": {
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"recordCount": 100
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.946Z",
"level": "info",
"message": "Node execution completed: processData",
"context": {
"nodeId": "processData",
"event": "node_end",
"executionTime": 201,
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"requestId": "req_1763314919632_0",
"resultSize": 286
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.948Z",
"level": "info",
"message": "Node execution started: callExternalAPI",
"context": {
"nodeId": "callExternalAPI",
"event": "node_start",
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"requestId": "req_1763314919632_0"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:41:59.948Z",
"level": "info",
"message": "调用外部API",
"context": {
"apiEndpoint": "https://api.example.com/data",
"method": "GET"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.248Z",
"level": "info",
"message": "Performance metric: external_api_call",
"context": {
"event": "performance",
"operation": "external_api_call",
"duration": 300,
"requestId": "req_1763314919632_0",
"endpoint": "https://api.example.com/data",
"statusCode": 200
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.248Z",
"level": "info",
"message": "Business event: external_api_success",
"context": {
"event": "business_event",
"eventName": "external_api_success",
"data": {
"requestId": "req_1763314919632_0",
"responseSize": 57
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.248Z",
"level": "info",
"message": "Node execution completed: callExternalAPI",
"context": {
"nodeId": "callExternalAPI",
"event": "node_end",
"executionTime": 300,
"userId": "user_1",
"sessionId": "session_1763314919632_0",
"requestId": "req_1763314919632_0",
"resultSize": 275
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.254Z",
"level": "info",
"message": "Business event: request_completed",
"context": {
"event": "business_event",
"eventName": "request_completed",
"data": {
"requestId": "req_1763314920254_0",
"success": true
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.258Z",
"level": "info",
"message": "Node execution started: authenticate",
"context": {
"nodeId": "authenticate",
"event": "node_start",
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"requestId": "req_1763314920254_1"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.258Z",
"level": "info",
"message": "Business event: user_authentication_attempt",
"context": {
"event": "business_event",
"eventName": "user_authentication_attempt",
"data": {
"userId": "user_2"
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.359Z",
"level": "info",
"message": "Business event: user_authentication_success",
"context": {
"event": "business_event",
"eventName": "user_authentication_success",
"data": {
"userId": "user_2"
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.359Z",
"level": "info",
"message": "Node execution completed: authenticate",
"context": {
"nodeId": "authenticate",
"event": "node_end",
"executionTime": 101,
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"requestId": "req_1763314920254_1",
"resultSize": 215
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.360Z",
"level": "info",
"message": "Node execution started: processData",
"context": {
"nodeId": "processData",
"event": "node_start",
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"requestId": "req_1763314920254_1"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.360Z",
"level": "debug",
"message": "开始数据处理",
"context": {
"inputSize": 2,
"processingMode": "standard"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.562Z",
"level": "info",
"message": "Performance metric: data_processing",
"context": {
"event": "performance",
"operation": "data_processing",
"duration": 202,
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"recordsProcessed": 100
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.562Z",
"level": "info",
"message": "Business event: data_processing_completed",
"context": {
"event": "business_event",
"eventName": "data_processing_completed",
"data": {
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"recordCount": 100
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.562Z",
"level": "info",
"message": "Node execution completed: processData",
"context": {
"nodeId": "processData",
"event": "node_end",
"executionTime": 202,
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"requestId": "req_1763314920254_1",
"resultSize": 286
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.565Z",
"level": "info",
"message": "Node execution started: callExternalAPI",
"context": {
"nodeId": "callExternalAPI",
"event": "node_start",
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"requestId": "req_1763314920254_1"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.565Z",
"level": "info",
"message": "调用外部API",
"context": {
"apiEndpoint": "https://api.example.com/data",
"method": "GET"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.866Z",
"level": "info",
"message": "Performance metric: external_api_call",
"context": {
"event": "performance",
"operation": "external_api_call",
"duration": 301,
"requestId": "req_1763314920254_1",
"endpoint": "https://api.example.com/data",
"statusCode": 200
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.866Z",
"level": "info",
"message": "Business event: external_api_success",
"context": {
"event": "business_event",
"eventName": "external_api_success",
"data": {
"requestId": "req_1763314920254_1",
"responseSize": 57
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.866Z",
"level": "info",
"message": "Node execution completed: callExternalAPI",
"context": {
"nodeId": "callExternalAPI",
"event": "node_end",
"executionTime": 301,
"userId": "user_2",
"sessionId": "session_1763314920254_1",
"requestId": "req_1763314920254_1",
"resultSize": 275
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.867Z",
"level": "info",
"message": "Business event: request_completed",
"context": {
"event": "business_event",
"eventName": "request_completed",
"data": {
"requestId": "req_1763314920867_1",
"success": true
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.869Z",
"level": "info",
"message": "Node execution started: authenticate",
"context": {
"nodeId": "authenticate",
"event": "node_start",
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"requestId": "req_1763314920867_2"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.869Z",
"level": "info",
"message": "Business event: user_authentication_attempt",
"context": {
"event": "business_event",
"eventName": "user_authentication_attempt",
"data": {
"userId": "user_3"
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.970Z",
"level": "info",
"message": "Business event: user_authentication_success",
"context": {
"event": "business_event",
"eventName": "user_authentication_success",
"data": {
"userId": "user_3"
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.970Z",
"level": "info",
"message": "Node execution completed: authenticate",
"context": {
"nodeId": "authenticate",
"event": "node_end",
"executionTime": 101,
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"requestId": "req_1763314920867_2",
"resultSize": 215
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.971Z",
"level": "info",
"message": "Node execution started: processData",
"context": {
"nodeId": "processData",
"event": "node_start",
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"requestId": "req_1763314920867_2"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:00.971Z",
"level": "debug",
"message": "开始数据处理",
"context": {
"inputSize": 2,
"processingMode": "standard"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.172Z",
"level": "info",
"message": "Performance metric: data_processing",
"context": {
"event": "performance",
"operation": "data_processing",
"duration": 201,
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"recordsProcessed": 100
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.172Z",
"level": "info",
"message": "Business event: data_processing_completed",
"context": {
"event": "business_event",
"eventName": "data_processing_completed",
"data": {
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"recordCount": 100
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.172Z",
"level": "info",
"message": "Node execution completed: processData",
"context": {
"nodeId": "processData",
"event": "node_end",
"executionTime": 201,
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"requestId": "req_1763314920867_2",
"resultSize": 286
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.174Z",
"level": "info",
"message": "Node execution started: callExternalAPI",
"context": {
"nodeId": "callExternalAPI",
"event": "node_start",
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"requestId": "req_1763314920867_2"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.174Z",
"level": "info",
"message": "调用外部API",
"context": {
"apiEndpoint": "https://api.example.com/data",
"method": "GET"
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.474Z",
"level": "info",
"message": "Performance metric: external_api_call",
"context": {
"event": "performance",
"operation": "external_api_call",
"duration": 300,
"requestId": "req_1763314920867_2",
"endpoint": "https://api.example.com/data",
"statusCode": 200
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.474Z",
"level": "info",
"message": "Business event: external_api_success",
"context": {
"event": "business_event",
"eventName": "external_api_success",
"data": {
"requestId": "req_1763314920867_2",
"responseSize": 57
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.474Z",
"level": "info",
"message": "Node execution completed: callExternalAPI",
"context": {
"nodeId": "callExternalAPI",
"event": "node_end",
"executionTime": 300,
"userId": "user_3",
"sessionId": "session_1763314920867_2",
"requestId": "req_1763314920867_2",
"resultSize": 275
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
{
"timestamp": "2025-11-16T17:42:01.475Z",
"level": "info",
"message": "Business event: request_completed",
"context": {
"event": "business_event",
"eventName": "request_completed",
"data": {
"requestId": "req_1763314921475_2",
"success": true
}
},
"metadata": {
"service": "langgraph-app",
"version": "1.0.0",
"environment": "development"
}
}
=== 日志分析报告 ===
{
"totalLogs": 0,
"errorRate": 0,
"averageExecutionTime": 0,
"topErrors": [],
"logLevelDistribution": {}
}
*/
2. 性能监控
监控节点的执行时间和资源使用情况。
性能监控
/**
* ============================================================================
* 性能监控节点 - Performance Monitoring
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何在节点中集成性能监控,记录执行时间、资源使用等性能指标。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/performance-monitoring.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
// 性能监控状态定义
const MonitoringState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
metrics: Annotation<{
executionTime?: number;
memoryUsage?: number;
cpuUsage?: number;
nodeMetrics?: Record<string, any>;
}>({
reducer: (x, y) => ({ ...x, ...y }),
}),
history: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
});
// 性能监控工具类
class PerformanceMonitor {
private startTime: number = 0;
private metrics: Record<string, any> = {};
start(label: string): void {
this.startTime = performance.now();
this.metrics[label] = { startTime: this.startTime };
}
end(label: string): number {
const endTime = performance.now();
const duration = endTime - this.startTime;
if (this.metrics[label]) {
this.metrics[label].endTime = endTime;
this.metrics[label].duration = duration;
}
return duration;
}
getMetrics(): Record<string, any> {
return { ...this.metrics };
}
reset(): void {
this.metrics = {};
this.startTime = 0;
}
// 获取内存使用情况(Node.js 环境)
getMemoryUsage(): any {
if (typeof process !== 'undefined' && process.memoryUsage) {
return process.memoryUsage();
}
return null;
}
}
// 性能监控装饰器
function withPerformanceMonitoring<T extends any[], R>(
fn: (...args: T) => Promise<R>,
label: string
) {
return async (...args: T): Promise<R & { __performance?: any }> => {
const monitor = new PerformanceMonitor();
const memoryBefore = monitor.getMemoryUsage();
monitor.start(label);
try {
const result = await fn(...args);
const duration = monitor.end(label);
const memoryAfter = monitor.getMemoryUsage();
const performanceData = {
label,
duration,
memoryBefore,
memoryAfter,
timestamp: new Date().toISOString(),
};
// 如果结果是对象,添加性能数据
if (typeof result === 'object' && result !== null) {
return {
...result,
__performance: performanceData,
};
}
return result;
} catch (error) {
monitor.end(label);
throw error;
}
};
}
// 创建带性能监控的图
const createMonitoredGraph = () => {
const graph = new StateGraph(MonitoringState);
// 数据处理节点(带性能监控)
const dataProcessingNode = withPerformanceMonitoring(
async (state: typeof MonitoringState.State) => {
// 模拟数据处理
const data = Array.from({ length: 1000 }, (_, i) => ({
id: i,
value: Math.random() * 100,
}));
// 模拟计算密集型操作
const processedData = data
.filter((item) => item.value > 50)
.map((item) => ({ ...item, processed: true }))
.sort((a, b) => b.value - a.value);
return {
step: 'data_processing',
data: { processedCount: processedData.length },
history: ['数据处理完成'],
};
},
'data_processing'
);
// API 调用节点(带性能监控)
const apiCallNode = withPerformanceMonitoring(
async (state: typeof MonitoringState.State) => {
// 模拟 API 调用
await new Promise((resolve) => setTimeout(resolve, 100));
const apiResponse = {
status: 'success',
data: { result: 'API 调用成功' },
timestamp: Date.now(),
};
return {
step: 'api_call',
data: apiResponse,
history: ['API 调用完成'],
};
},
'api_call'
);
// 聚合节点(收集性能指标)
const metricsAggregationNode = async (
state: typeof MonitoringState.State
) => {
const nodeMetrics: Record<string, any> = {};
// 从历史记录中提取性能数据
if (state.data && typeof state.data === 'object') {
Object.keys(state.data).forEach((key) => {
const item = state.data[key];
if (item && typeof item === 'object' && item.__performance) {
nodeMetrics[key] = item.__performance;
}
});
}
return {
step: 'metrics_aggregation',
metrics: {
nodeMetrics,
totalNodes: Object.keys(nodeMetrics).length,
aggregatedAt: new Date().toISOString(),
},
history: ['性能指标聚合完成'],
};
};
return graph
.addNode('data_processing', dataProcessingNode)
.addNode('api_call', apiCallNode)
.addNode('metrics_aggregation', metricsAggregationNode)
.addEdge('__start__', 'data_processing')
.addEdge('data_processing', 'api_call')
.addEdge('api_call', 'metrics_aggregation')
.addEdge('metrics_aggregation', '__end__');
};
// 性能阈值监控
class PerformanceThresholdMonitor {
private thresholds: Record<string, number>;
private alerts: Array<{
node: string;
metric: string;
value: number;
threshold: number;
timestamp: string;
}> = [];
constructor(thresholds: Record<string, number> = {}) {
this.thresholds = {
executionTime: 1000, // 1秒
memoryUsage: 100 * 1024 * 1024, // 100MB
...thresholds,
};
}
checkThresholds(metrics: Record<string, any>): void {
Object.entries(metrics).forEach(([node, nodeMetrics]) => {
if (nodeMetrics && typeof nodeMetrics === 'object') {
// 检查执行时间
if (
nodeMetrics.duration &&
nodeMetrics.duration > this.thresholds.executionTime
) {
this.alerts.push({
node,
metric: 'executionTime',
value: nodeMetrics.duration,
threshold: this.thresholds.executionTime,
timestamp: new Date().toISOString(),
});
}
// 检查内存使用
if (nodeMetrics.memoryAfter && nodeMetrics.memoryBefore) {
const memoryDiff =
nodeMetrics.memoryAfter.heapUsed -
nodeMetrics.memoryBefore.heapUsed;
if (memoryDiff > this.thresholds.memoryUsage) {
this.alerts.push({
node,
metric: 'memoryUsage',
value: memoryDiff,
threshold: this.thresholds.memoryUsage,
timestamp: new Date().toISOString(),
});
}
}
}
});
}
getAlerts(): Array<any> {
return [...this.alerts];
}
clearAlerts(): void {
this.alerts = [];
}
}
// 使用示例
const demonstratePerformanceMonitoring = async () => {
const graph = createMonitoredGraph();
const app = graph.compile();
console.log('开始性能监控演示...');
const result = await app.invoke({
step: 'initial',
data: {},
metrics: {},
history: [],
});
console.log('执行结果:', result);
// 检查性能阈值
const thresholdMonitor = new PerformanceThresholdMonitor({
executionTime: 50, // 50ms 阈值(较低,用于演示)
memoryUsage: 1024, // 1KB 阈值(较低,用于演示)
});
if (result.metrics?.nodeMetrics) {
thresholdMonitor.checkThresholds(result.metrics.nodeMetrics);
const alerts = thresholdMonitor.getAlerts();
if (alerts.length > 0) {
console.log('性能告警:', alerts);
} else {
console.log('所有节点性能正常');
}
}
};
// 导出
export {
MonitoringState,
PerformanceMonitor,
withPerformanceMonitoring,
PerformanceThresholdMonitor,
createMonitoredGraph,
demonstratePerformanceMonitoring,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
*/
🔧 配置管理
1. 配置驱动
通过配置文件控制节点的行为,提高灵活性。
配置驱动
/**
* ============================================================================
* 配置驱动节点 - Config-Driven Node
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何设计配置驱动的节点,通过配置参数控制节点行为,提高复用性和灵活性。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/config-driven.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
// 配置驱动状态定义
const ConfigDrivenState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
config: Annotation<Record<string, any>>({
reducer: (x, y) => ({ ...x, ...y }),
}),
history: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
});
// 配置接口定义
interface NodeConfig {
enabled: boolean;
timeout?: number;
retries?: number;
parameters?: Record<string, any>;
}
interface ProcessingConfig {
batchSize: number;
maxConcurrency: number;
enableCaching: boolean;
cacheTimeout: number;
}
interface ValidationConfig {
strict: boolean;
requiredFields: string[];
customRules?: Array<(data: any) => boolean>;
}
// 配置管理器
class ConfigManager {
private config: Record<string, any>;
private defaultConfig: Record<string, any>;
constructor(config: Record<string, any> = {}) {
this.defaultConfig = {
processing: {
batchSize: 10,
maxConcurrency: 3,
enableCaching: true,
cacheTimeout: 300000, // 5分钟
},
validation: {
strict: false,
requiredFields: ['id'],
},
nodes: {
dataProcessor: {
enabled: true,
timeout: 5000,
retries: 3,
},
validator: {
enabled: true,
timeout: 1000,
retries: 1,
},
transformer: {
enabled: true,
timeout: 3000,
retries: 2,
},
},
};
this.config = this.mergeConfig(this.defaultConfig, config);
}
private mergeConfig(defaultConfig: any, userConfig: any): any {
const result = { ...defaultConfig };
for (const key in userConfig) {
if (
typeof userConfig[key] === 'object' &&
!Array.isArray(userConfig[key])
) {
result[key] = this.mergeConfig(
defaultConfig[key] || {},
userConfig[key]
);
} else {
result[key] = userConfig[key];
}
}
return result;
}
get<T = any>(path: string): T {
const keys = path.split('.');
let current = this.config;
for (const key of keys) {
if (current && typeof current === 'object' && key in current) {
current = current[key];
} else {
return undefined as T;
}
}
return current as T;
}
set(path: string, value: any): void {
const keys = path.split('.');
let current = this.config;
for (let i = 0; i < keys.length - 1; i++) {
const key = keys[i];
if (!(key in current) || typeof current[key] !== 'object') {
current[key] = {};
}
current = current[key];
}
current[keys[keys.length - 1]] = value;
}
getNodeConfig(nodeName: string): NodeConfig {
return (
this.get<NodeConfig>(`nodes.${nodeName}`) || {
enabled: true,
timeout: 5000,
retries: 1,
}
);
}
getProcessingConfig(): ProcessingConfig {
return this.get<ProcessingConfig>('processing');
}
getValidationConfig(): ValidationConfig {
return this.get<ValidationConfig>('validation');
}
getAllConfig(): Record<string, any> {
return { ...this.config };
}
}
// 配置驱动的节点工厂
class ConfigDrivenNodeFactory {
private configManager: ConfigManager;
constructor(configManager: ConfigManager) {
this.configManager = configManager;
}
createDataProcessorNode() {
const nodeConfig = this.configManager.getNodeConfig('dataProcessor');
const processingConfig = this.configManager.getProcessingConfig();
return async (state: typeof ConfigDrivenState.State) => {
if (!nodeConfig.enabled) {
return {
step: 'data_processor_skipped',
history: ['数据处理节点已禁用'],
};
}
try {
// 使用配置参数处理数据
const batchSize = processingConfig.batchSize;
const data = state.data?.items || [];
const processedBatches = [];
for (let i = 0; i < data.length; i += batchSize) {
const batch = data.slice(i, i + batchSize);
processedBatches.push({
batchIndex: Math.floor(i / batchSize),
items: batch.map((item: any) => ({
...item,
processed: true,
timestamp: Date.now(),
})),
});
}
return {
step: 'data_processor',
data: {
processedBatches,
totalBatches: processedBatches.length,
batchSize,
},
history: [`数据处理完成,共处理 ${processedBatches.length} 个批次`],
};
} catch (error) {
return {
step: 'data_processor_error',
data: { error: error instanceof Error ? error.message : '处理失败' },
history: ['数据处理失败'],
};
}
};
}
createValidatorNode() {
const nodeConfig = this.configManager.getNodeConfig('validator');
const validationConfig = this.configManager.getValidationConfig();
return async (state: typeof ConfigDrivenState.State) => {
if (!nodeConfig.enabled) {
return {
step: 'validator_skipped',
history: ['验证节点已禁用'],
};
}
const data = state.data;
const errors: string[] = [];
// 检查必需字段
for (const field of validationConfig.requiredFields) {
if (!data || !(field in data)) {
errors.push(`缺少必需字段: ${field}`);
}
}
// 应用自定义验证规则
if (validationConfig.customRules) {
for (const rule of validationConfig.customRules) {
if (!rule(data)) {
errors.push('自定义验证规则失败');
}
}
}
const isValid = errors.length === 0;
if (!isValid && validationConfig.strict) {
return {
step: 'validator_failed',
data: { errors, isValid: false },
history: [`验证失败: ${errors.join(', ')}`],
};
}
return {
step: 'validator',
data: {
...data,
validation: { isValid, errors },
},
history: [
isValid ? '验证通过' : `验证通过(有警告: ${errors.join(', ')})`,
],
};
};
}
createTransformerNode() {
const nodeConfig = this.configManager.getNodeConfig('transformer');
return async (state: typeof ConfigDrivenState.State) => {
if (!nodeConfig.enabled) {
return {
step: 'transformer_skipped',
history: ['转换节点已禁用'],
};
}
const transformRules = nodeConfig.parameters?.transformRules || {};
const data = state.data;
const transformedData = { ...data };
// 应用转换规则
Object.entries(transformRules).forEach(([field, rule]) => {
if (data && field in data) {
switch (rule) {
case 'uppercase':
transformedData[field] = String(data[field]).toUpperCase();
break;
case 'lowercase':
transformedData[field] = String(data[field]).toLowerCase();
break;
case 'number':
transformedData[field] = Number(data[field]);
break;
default:
// 保持原值
break;
}
}
});
return {
step: 'transformer',
data: transformedData,
history: ['数据转换完成'],
};
};
}
}
// 创建配置驱动的图
const createConfigDrivenGraph = (userConfig: Record<string, any> = {}) => {
const configManager = new ConfigManager(userConfig);
const nodeFactory = new ConfigDrivenNodeFactory(configManager);
const graph = new StateGraph(ConfigDrivenState);
// 添加配置驱动的节点
graph
.addNode('data_processor', nodeFactory.createDataProcessorNode())
.addNode('validator', nodeFactory.createValidatorNode())
.addNode('transformer', nodeFactory.createTransformerNode());
// 根据配置决定边的连接
const processingConfig = configManager.getProcessingConfig();
graph.addEdge('__start__', 'data_processor');
if (configManager.getNodeConfig('validator').enabled) {
graph.addEdge('data_processor', 'validator');
if (configManager.getNodeConfig('transformer').enabled) {
graph.addEdge('validator', 'transformer');
graph.addEdge('transformer', '__end__');
} else {
graph.addEdge('validator', '__end__');
}
} else if (configManager.getNodeConfig('transformer').enabled) {
graph.addEdge('data_processor', 'transformer');
graph.addEdge('transformer', '__end__');
} else {
graph.addEdge('data_processor', '__end__');
}
return { graph, configManager };
};
// 配置预设
const ConfigPresets = {
// 开发环境配置
development: {
processing: {
batchSize: 5,
maxConcurrency: 2,
enableCaching: false,
},
validation: {
strict: false,
requiredFields: ['id'],
},
nodes: {
dataProcessor: { enabled: true, timeout: 10000 },
validator: { enabled: true, timeout: 2000 },
transformer: { enabled: true, timeout: 5000 },
},
},
// 生产环境配置
production: {
processing: {
batchSize: 50,
maxConcurrency: 10,
enableCaching: true,
cacheTimeout: 600000, // 10分钟
},
validation: {
strict: true,
requiredFields: ['id', 'type', 'timestamp'],
},
nodes: {
dataProcessor: { enabled: true, timeout: 5000, retries: 3 },
validator: { enabled: true, timeout: 1000, retries: 2 },
transformer: { enabled: true, timeout: 3000, retries: 2 },
},
},
// 测试环境配置
testing: {
processing: {
batchSize: 3,
maxConcurrency: 1,
enableCaching: false,
},
validation: {
strict: true,
requiredFields: ['id'],
},
nodes: {
dataProcessor: { enabled: true, timeout: 1000 },
validator: { enabled: true, timeout: 500 },
transformer: { enabled: false }, // 测试时禁用转换
},
},
};
// 使用示例
const demonstrateConfigDriven = async () => {
console.log('=== 配置驱动节点演示 ===');
// 使用开发环境配置
const { graph: devGraph, configManager: devConfig } = createConfigDrivenGraph(
ConfigPresets.development
);
const devApp = devGraph.compile();
const devResult = await devApp.invoke({
step: 'initial',
data: {
id: 'test-001',
items: [
{ id: 1, name: 'item1' },
{ id: 2, name: 'item2' },
{ id: 3, name: 'item3' },
],
},
config: devConfig.getAllConfig(),
history: [],
});
console.log('开发环境结果:', devResult);
// 使用生产环境配置
const { graph: prodGraph, configManager: prodConfig } =
createConfigDrivenGraph(ConfigPresets.production);
const prodApp = prodGraph.compile();
const prodResult = await prodApp.invoke({
step: 'initial',
data: {
id: 'prod-001',
type: 'batch',
timestamp: Date.now(),
items: Array.from({ length: 20 }, (_, i) => ({
id: i,
name: `item${i}`,
})),
},
config: prodConfig.getAllConfig(),
history: [],
});
console.log('生产环境结果:', prodResult);
};
// 导出
export {
ConfigDrivenState,
ConfigManager,
ConfigDrivenNodeFactory,
createConfigDrivenGraph,
ConfigPresets,
demonstrateConfigDriven,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
*/
2. 环境适配
根据不同的环境(开发、测试、生产)调整节点行为。
环境适配
/**
* ============================================================================
* 环境适配节点 - Environment Adaptation
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何设计能适配不同环境(开发、测试、生产)的节点,通过环境变量和配置调整行为。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/environment-adaptation.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
// 环境适配状态定义
const EnvironmentState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
environment: Annotation<string>({
reducer: (x, y) => y,
}),
config: Annotation<Record<string, any>>({
reducer: (x, y) => ({ ...x, ...y }),
}),
history: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
});
// 环境类型枚举
enum Environment {
DEVELOPMENT = 'development',
TESTING = 'testing',
STAGING = 'staging',
PRODUCTION = 'production',
}
// 环境配置接口
interface EnvironmentConfig {
apiEndpoint: string;
timeout: number;
retries: number;
logLevel: 'debug' | 'info' | 'warn' | 'error';
enableMetrics: boolean;
enableCaching: boolean;
batchSize: number;
features: {
enableAdvancedProcessing: boolean;
enableExperimentalFeatures: boolean;
enableDetailedLogging: boolean;
};
}
// 环境配置管理器
class EnvironmentConfigManager {
private static configs: Record<Environment, EnvironmentConfig> = {
[Environment.DEVELOPMENT]: {
apiEndpoint: 'http://localhost:3000/api',
timeout: 10000,
retries: 1,
logLevel: 'debug',
enableMetrics: true,
enableCaching: false,
batchSize: 5,
features: {
enableAdvancedProcessing: true,
enableExperimentalFeatures: true,
enableDetailedLogging: true,
},
},
[Environment.TESTING]: {
apiEndpoint: 'http://test-api.example.com/api',
timeout: 5000,
retries: 2,
logLevel: 'info',
enableMetrics: false,
enableCaching: false,
batchSize: 3,
features: {
enableAdvancedProcessing: true,
enableExperimentalFeatures: false,
enableDetailedLogging: true,
},
},
[Environment.STAGING]: {
apiEndpoint: 'https://staging-api.example.com/api',
timeout: 8000,
retries: 3,
logLevel: 'warn',
enableMetrics: true,
enableCaching: true,
batchSize: 20,
features: {
enableAdvancedProcessing: true,
enableExperimentalFeatures: false,
enableDetailedLogging: false,
},
},
[Environment.PRODUCTION]: {
apiEndpoint: 'https://api.example.com/api',
timeout: 5000,
retries: 3,
logLevel: 'error',
enableMetrics: true,
enableCaching: true,
batchSize: 50,
features: {
enableAdvancedProcessing: true,
enableExperimentalFeatures: false,
enableDetailedLogging: false,
},
},
};
static getConfig(env: Environment): EnvironmentConfig {
return this.configs[env];
}
static getCurrentEnvironment(): Environment {
const nodeEnv = process.env.NODE_ENV;
switch (nodeEnv) {
case 'development':
return Environment.DEVELOPMENT;
case 'test':
return Environment.TESTING;
case 'staging':
return Environment.STAGING;
case 'production':
return Environment.PRODUCTION;
default:
return Environment.DEVELOPMENT;
}
}
static setCustomConfig(
env: Environment,
config: Partial<EnvironmentConfig>
): void {
this.configs[env] = { ...this.configs[env], ...config };
}
}
// 环境适配的日志器
class EnvironmentLogger {
private logLevel: string;
private environment: Environment;
constructor(environment: Environment) {
this.environment = environment;
const config = EnvironmentConfigManager.getConfig(environment);
this.logLevel = config.logLevel;
}
private shouldLog(level: string): boolean {
const levels = ['debug', 'info', 'warn', 'error'];
const currentLevelIndex = levels.indexOf(this.logLevel);
const messageLevelIndex = levels.indexOf(level);
return messageLevelIndex >= currentLevelIndex;
}
debug(message: string, data?: any): void {
if (this.shouldLog('debug')) {
console.log(`[${this.environment}] DEBUG:`, message, data || '');
}
}
info(message: string, data?: any): void {
if (this.shouldLog('info')) {
console.log(`[${this.environment}] INFO:`, message, data || '');
}
}
warn(message: string, data?: any): void {
if (this.shouldLog('warn')) {
console.warn(`[${this.environment}] WARN:`, message, data || '');
}
}
error(message: string, data?: any): void {
if (this.shouldLog('error')) {
console.error(`[${this.environment}] ERROR:`, message, data || '');
}
}
}
// 环境适配的 API 客户端
class EnvironmentApiClient {
private config: EnvironmentConfig;
private logger: EnvironmentLogger;
constructor(environment: Environment) {
this.config = EnvironmentConfigManager.getConfig(environment);
this.logger = new EnvironmentLogger(environment);
}
async makeRequest(endpoint: string, data?: any): Promise<any> {
const url = `${this.config.apiEndpoint}${endpoint}`;
let attempt = 0;
while (attempt < this.config.retries) {
try {
this.logger.debug(`Making API request to ${url}`, {
attempt: attempt + 1,
data,
});
// 模拟 API 调用
const response = await this.simulateApiCall(url, data);
this.logger.info(`API request successful`, { url, response });
return response;
} catch (error) {
attempt++;
this.logger.warn(`API request failed, attempt ${attempt}`, {
url,
error,
});
if (attempt >= this.config.retries) {
this.logger.error(
`API request failed after ${this.config.retries} attempts`,
{ url, error }
);
throw error;
}
// 等待重试
await new Promise((resolve) => setTimeout(resolve, 1000 * attempt));
}
}
}
private async simulateApiCall(url: string, data?: any): Promise<any> {
// 模拟网络延迟
await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
// 模拟偶尔的失败(开发环境更容易失败)
const failureRate = this.config.apiEndpoint.includes('localhost')
? 0.3
: 0.1;
if (Math.random() < failureRate) {
throw new Error('模拟 API 失败');
}
return {
success: true,
data: { processed: true, timestamp: Date.now() },
url,
requestData: data,
};
}
}
// 环境适配的节点工厂
class EnvironmentAdaptiveNodeFactory {
private environment: Environment;
private config: EnvironmentConfig;
private logger: EnvironmentLogger;
private apiClient: EnvironmentApiClient;
constructor(environment: Environment) {
this.environment = environment;
this.config = EnvironmentConfigManager.getConfig(environment);
this.logger = new EnvironmentLogger(environment);
this.apiClient = new EnvironmentApiClient(environment);
}
createDataProcessingNode() {
return async (state: typeof EnvironmentState.State) => {
this.logger.info('开始数据处理', { environment: this.environment });
const data = state.data?.items || [];
const batchSize = this.config.batchSize;
// 根据环境决定处理策略
let processedData;
if (this.config.features.enableAdvancedProcessing) {
// 高级处理模式
processedData = await this.advancedProcessing(data, batchSize);
} else {
// 简单处理模式
processedData = await this.simpleProcessing(data, batchSize);
}
return {
step: 'data_processing',
data: processedData,
environment: this.environment,
history: [`数据处理完成 (${this.environment} 环境)`],
};
};
}
createApiCallNode() {
return async (state: typeof EnvironmentState.State) => {
this.logger.info('开始 API 调用', { environment: this.environment });
try {
const response = await this.apiClient.makeRequest(
'/process',
state.data
);
return {
step: 'api_call',
data: { apiResponse: response },
environment: this.environment,
history: [`API 调用成功 (${this.environment} 环境)`],
};
} catch (error) {
this.logger.error('API 调用失败', error);
return {
step: 'api_call_failed',
data: { error: error instanceof Error ? error.message : '未知错误' },
environment: this.environment,
history: [`API 调用失败 (${this.environment} 环境)`],
};
}
};
}
createMetricsNode() {
return async (state: typeof EnvironmentState.State) => {
if (!this.config.enableMetrics) {
this.logger.debug('指标收集已禁用');
return {
step: 'metrics_skipped',
environment: this.environment,
history: ['指标收集已跳过'],
};
}
this.logger.info('收集性能指标');
const metrics = {
environment: this.environment,
timestamp: Date.now(),
processedItems: state.data?.processedItems?.length || 0,
executionTime: Date.now() - (state.data?.startTime || Date.now()),
memoryUsage: this.getMemoryUsage(),
};
return {
step: 'metrics_collection',
data: { metrics },
environment: this.environment,
history: ['性能指标收集完成'],
};
};
}
private async advancedProcessing(
data: any[],
batchSize: number
): Promise<any> {
this.logger.debug('使用高级处理模式', {
batchSize,
itemCount: data.length,
});
const batches = [];
for (let i = 0; i < data.length; i += batchSize) {
const batch = data.slice(i, i + batchSize);
// 高级处理:包含更多的数据转换和验证
const processedBatch = batch.map((item) => ({
...item,
processed: true,
processingMode: 'advanced',
timestamp: Date.now(),
hash: this.generateHash(item),
validation: this.validateItem(item),
}));
batches.push(processedBatch);
}
return {
processedItems: batches.flat(),
processingMode: 'advanced',
batchCount: batches.length,
startTime: Date.now(),
};
}
private async simpleProcessing(data: any[], batchSize: number): Promise<any> {
this.logger.debug('使用简单处理模式', {
batchSize,
itemCount: data.length,
});
const processedItems = data.map((item) => ({
...item,
processed: true,
processingMode: 'simple',
timestamp: Date.now(),
}));
return {
processedItems,
processingMode: 'simple',
batchCount: 1,
startTime: Date.now(),
};
}
private generateHash(item: any): string {
return `hash_${JSON.stringify(item).length}_${Date.now()}`;
}
private validateItem(item: any): { isValid: boolean; errors: string[] } {
const errors: string[] = [];
if (!item.id) errors.push('缺少 ID');
if (!item.name) errors.push('缺少名称');
return {
isValid: errors.length === 0,
errors,
};
}
private getMemoryUsage(): any {
if (typeof process !== 'undefined' && process.memoryUsage) {
return process.memoryUsage();
}
return { heapUsed: 0, heapTotal: 0 };
}
}
// 创建环境适配的图
const createEnvironmentAdaptiveGraph = (environment?: Environment) => {
const env = environment || EnvironmentConfigManager.getCurrentEnvironment();
const nodeFactory = new EnvironmentAdaptiveNodeFactory(env);
const graph = new StateGraph(EnvironmentState);
return graph
.addNode('data_processing', nodeFactory.createDataProcessingNode())
.addNode('api_call', nodeFactory.createApiCallNode())
.addNode('metrics_collection', nodeFactory.createMetricsNode())
.addEdge('__start__', 'data_processing')
.addEdge('data_processing', 'api_call')
.addEdge('api_call', 'metrics_collection')
.addEdge('metrics_collection', '__end__');
};
// 使用示例
const demonstrateEnvironmentAdaptation = async () => {
console.log('=== 环境适配演示 ===');
// 测试不同环境
const environments = [
Environment.DEVELOPMENT,
Environment.TESTING,
Environment.PRODUCTION,
];
for (const env of environments) {
console.log(`\n--- ${env.toUpperCase()} 环境 ---`);
const graph = createEnvironmentAdaptiveGraph(env);
const app = graph.compile();
const result = await app.invoke({
step: 'initial',
data: {
items: [
{ id: 1, name: 'item1' },
{ id: 2, name: 'item2' },
{ id: 3, name: 'item3' },
],
},
environment: env,
config: EnvironmentConfigManager.getConfig(env),
history: [],
});
console.log(`${env} 环境执行结果:`, {
step: result.step,
environment: result.environment,
dataKeys: Object.keys(result.data || {}),
historyCount: result.history?.length || 0,
});
}
};
// 如果直接运行此文件
if (require.main === module) {
demonstrateEnvironmentAdaptation().catch(console.error);
}
// 导出
export {
EnvironmentState,
Environment,
EnvironmentConfigManager,
EnvironmentLogger,
EnvironmentApiClient,
EnvironmentAdaptiveNodeFactory,
createEnvironmentAdaptiveGraph,
demonstrateEnvironmentAdaptation,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
=== 环境适配演示 ===
--- DEVELOPMENT 环境 ---
[development] INFO: 开始数据处理 { environment: 'development' }
[development] DEBUG: 使用高级处理模式 { batchSize: 5, itemCount: 3 }
[development] INFO: 开始 API 调用 { environment: 'development' }
[development] DEBUG: Making API request to http://localhost:3000/api/process {
attempt: 1,
data: {
items: [ [Object], [Object], [Object] ],
processedItems: [ [Object], [Object], [Object] ],
processingMode: 'advanced',
batchCount: 1,
startTime: 1763376288620
}
}
[development] INFO: API request successful {
url: 'http://localhost:3000/api/process',
response: {
success: true,
data: { processed: true, timestamp: 1763376288709 },
url: 'http://localhost:3000/api/process',
requestData: {
items: [Array],
processedItems: [Array],
processingMode: 'advanced',
batchCount: 1,
startTime: 1763376288620
}
}
}
[development] INFO: 收集性能指标
development 环境执行结果: {
step: 'metrics_collection',
environment: 'development',
dataKeys: [
'items',
'processedItems',
'processingMode',
'batchCount',
'startTime',
'apiResponse',
'metrics'
],
historyCount: 3
}
--- TESTING 环境 ---
[testing] INFO: 开始数据处理 { environment: 'testing' }
[testing] INFO: 开始 API 调用 { environment: 'testing' }
[testing] INFO: API request successful {
url: 'http://test-api.example.com/api/process',
response: {
success: true,
data: { processed: true, timestamp: 1763376288786 },
url: 'http://test-api.example.com/api/process',
requestData: {
items: [Array],
processedItems: [Array],
processingMode: 'advanced',
batchCount: 1,
startTime: 1763376288725
}
}
}
testing 环境执行结果: {
step: 'metrics_skipped',
environment: 'testing',
dataKeys: [
'items',
'processedItems',
'processingMode',
'batchCount',
'startTime',
'apiResponse'
],
historyCount: 3
}
--- PRODUCTION 环境 ---
production 环境执行结果: {
step: 'metrics_collection',
environment: 'production',
dataKeys: [
'items',
'processedItems',
'processingMode',
'batchCount',
'startTime',
'apiResponse',
'metrics'
],
historyCount: 3
}
*/
🔄 状态管理
1. 状态更新模式
合理设计状态更新逻辑,确保状态的一致性。
2. 状态隔离
避免节点之间的状态污染,确保每个节点只处理自己相关的状态。
状态隔离
/**
* ============================================================================
* 状态隔离 - State Isolation
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何在节点中实现状态隔离,避免节点间的状态耦合和副作用。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 节点实现:展示节点函数的标准实现模式
* 2️⃣ 状态处理:正确读取和更新状态
* 3️⃣ 错误处理:优雅处理节点执行中的错误
* 4️⃣ 最佳实践:展示节点设计的最佳实践和模式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点函数接收状态作为参数,返回状态更新
* • 使用async/await处理异步操作
* • 通过try-catch捕获和处理错误
* • 返回部分状态更新,由reducer合并
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ npx esno 最佳实践/节点设计/state-isolation.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 节点应该是纯函数或只有可控的副作用
* • 避免在节点中直接修改状态对象
* • 合理处理异步操作和错误
* • 保持节点职责单一,易于测试和维护
* • 使用TypeScript类型确保类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/
import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
// 状态隔离演示的基础状态
const BaseState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
globalData: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
history: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
});
// 用户会话状态(隔离的)
const UserSessionState = Annotation.Root({
...BaseState.spec,
userId: Annotation<string>({
reducer: (x, y) => y,
}),
sessionData: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
preferences: Annotation<Record<string, any>>({
reducer: (x, y) => ({ ...x, ...y }),
}),
});
// 处理任务状态(隔离的)
const TaskProcessingState = Annotation.Root({
...BaseState.spec,
taskId: Annotation<string>({
reducer: (x, y) => y,
}),
taskData: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
processingContext: Annotation<Record<string, any>>({
reducer: (x, y) => ({ ...x, ...y }),
}),
});
// 状态隔离管理器
class StateIsolationManager {
private userSessions: Map<string, any> = new Map();
private taskContexts: Map<string, any> = new Map();
private globalState: any = {};
// 用户会话状态管理
getUserSession(userId: string): any {
if (!this.userSessions.has(userId)) {
this.userSessions.set(userId, {
userId,
sessionData: {},
preferences: {},
createdAt: Date.now(),
lastAccessed: Date.now(),
});
}
const session = this.userSessions.get(userId);
session.lastAccessed = Date.now();
return { ...session };
}
updateUserSession(userId: string, updates: any): void {
const session = this.getUserSession(userId);
this.userSessions.set(userId, {
...session,
...updates,
lastAccessed: Date.now(),
});
}
clearUserSession(userId: string): void {
this.userSessions.delete(userId);
}
// 任务上下文管理
getTaskContext(taskId: string): any {
if (!this.taskContexts.has(taskId)) {
this.taskContexts.set(taskId, {
taskId,
taskData: {},
processingContext: {},
createdAt: Date.now(),
status: 'initialized',
});
}
return { ...this.taskContexts.get(taskId) };
}
updateTaskContext(taskId: string, updates: any): void {
const context = this.getTaskContext(taskId);
this.taskContexts.set(taskId, {
...context,
...updates,
updatedAt: Date.now(),
});
}
clearTaskContext(taskId: string): void {
this.taskContexts.delete(taskId);
}
// 全局状态管理
getGlobalState(): any {
return { ...this.globalState };
}
updateGlobalState(updates: any): void {
this.globalState = { ...this.globalState, ...updates };
}
// 清理过期状态
cleanupExpiredStates(maxAge: number = 3600000): void {
// 默认1小时
const now = Date.now();
// 清理过期的用户会话
for (const [userId, session] of this.userSessions.entries()) {
if (now - session.lastAccessed > maxAge) {
this.userSessions.delete(userId);
}
}
// 清理过期的任务上下文
for (const [taskId, context] of this.taskContexts.entries()) {
if (now - context.createdAt > maxAge) {
this.taskContexts.delete(taskId);
}
}
}
// 获取状态统计
getStateStats(): any {
return {
userSessions: this.userSessions.size,
taskContexts: this.taskContexts.size,
globalStateKeys: Object.keys(this.globalState).length,
};
}
}
// 创建全局状态管理器实例
const stateManager = new StateIsolationManager();
// 用户会话节点工厂
class UserSessionNodeFactory {
static createUserDataNode() {
return async (state: typeof UserSessionState.State) => {
const userId = state.userId;
const session = stateManager.getUserSession(userId);
// 处理用户特定的数据,不影响其他用户
const userData = {
profile: session.sessionData.profile || {},
activity: [
...(session.sessionData.activity || []),
{
action: 'data_processing',
timestamp: Date.now(),
},
],
};
// 更新用户会话状态
stateManager.updateUserSession(userId, {
sessionData: { ...session.sessionData, ...userData },
});
return {
step: 'user_data_processed',
sessionData: userData,
history: [`用户 ${userId} 数据处理完成`],
};
};
}
static createUserPreferencesNode() {
return async (state: typeof UserSessionState.State) => {
const userId = state.userId;
const session = stateManager.getUserSession(userId);
// 处理用户偏好设置
const preferences = {
...session.preferences,
theme: state.preferences?.theme || 'light',
language: state.preferences?.language || 'zh-CN',
notifications: state.preferences?.notifications !== false,
lastUpdated: Date.now(),
};
// 更新用户偏好,不影响其他用户
stateManager.updateUserSession(userId, { preferences });
return {
step: 'user_preferences_updated',
preferences,
history: [`用户 ${userId} 偏好设置已更新`],
};
};
}
}
// 任务处理节点工厂
class TaskProcessingNodeFactory {
static createTaskInitNode() {
return async (state: typeof TaskProcessingState.State) => {
const taskId = state.taskId;
// 初始化任务上下文,与其他任务隔离
const taskData = {
input: state.taskData?.input || {},
metadata: {
createdAt: Date.now(),
priority: state.taskData?.priority || 'normal',
category: state.taskData?.category || 'general',
},
};
stateManager.updateTaskContext(taskId, {
taskData,
status: 'processing',
});
return {
step: 'task_initialized',
taskData,
processingContext: { initialized: true },
history: [`任务 ${taskId} 初始化完成`],
};
};
}
static createTaskProcessNode() {
return async (state: typeof TaskProcessingState.State) => {
const taskId = state.taskId;
const context = stateManager.getTaskContext(taskId);
// 在隔离的上下文中处理任务
const processingResult = {
processedData: {
...context.taskData.input,
processed: true,
processingTime: Date.now() - context.createdAt,
},
metrics: {
itemsProcessed: Object.keys(context.taskData.input || {}).length,
processingDuration: Date.now() - context.createdAt,
},
};
// 更新任务上下文
stateManager.updateTaskContext(taskId, {
taskData: { ...context.taskData, result: processingResult },
processingContext: {
...context.processingContext,
completed: true,
completedAt: Date.now(),
},
status: 'completed',
});
return {
step: 'task_processed',
taskData: processingResult,
processingContext: { completed: true },
history: [`任务 ${taskId} 处理完成`],
};
};
}
}
// 全局状态节点工厂
class GlobalStateNodeFactory {
static createGlobalStatsNode() {
return async (state: typeof BaseState.State) => {
const globalState = stateManager.getGlobalState();
const stats = stateManager.getStateStats();
// 更新全局统计信息
const updatedStats = {
...globalState.stats,
totalRequests: (globalState.stats?.totalRequests || 0) + 1,
lastRequestTime: Date.now(),
activeStates: stats,
};
stateManager.updateGlobalState({
stats: updatedStats,
});
return {
step: 'global_stats_updated',
globalData: { stats: updatedStats },
history: ['全局统计信息已更新'],
};
};
}
static createCleanupNode() {
return async (state: typeof BaseState.State) => {
// 清理过期状态
const beforeStats = stateManager.getStateStats();
stateManager.cleanupExpiredStates(300000); // 5分钟过期
const afterStats = stateManager.getStateStats();
const cleanupResult = {
before: beforeStats,
after: afterStats,
cleaned: {
userSessions: beforeStats.userSessions - afterStats.userSessions,
taskContexts: beforeStats.taskContexts - afterStats.taskContexts,
},
cleanupTime: Date.now(),
};
return {
step: 'cleanup_completed',
globalData: { cleanupResult },
history: [
`清理完成,清理了 ${cleanupResult.cleaned.userSessions} 个用户会话和 ${cleanupResult.cleaned.taskContexts} 个任务上下文`,
],
};
};
}
}
// 创建用户会话图
const createUserSessionGraph = () => {
const graph = new StateGraph(UserSessionState);
return graph
.addNode('user_data', UserSessionNodeFactory.createUserDataNode())
.addNode(
'user_preferences',
UserSessionNodeFactory.createUserPreferencesNode()
)
.addNode('global_stats', GlobalStateNodeFactory.createGlobalStatsNode())
.addEdge('__start__', 'user_data')
.addEdge('user_data', 'user_preferences')
.addEdge('user_preferences', 'global_stats')
.addEdge('global_stats', '__end__');
};
// 创建任务处理图
const createTaskProcessingGraph = () => {
const graph = new StateGraph(TaskProcessingState);
return graph
.addNode('task_init', TaskProcessingNodeFactory.createTaskInitNode())
.addNode('task_process', TaskProcessingNodeFactory.createTaskProcessNode())
.addNode('global_stats', GlobalStateNodeFactory.createGlobalStatsNode())
.addEdge('__start__', 'task_init')
.addEdge('task_init', 'task_process')
.addEdge('task_process', 'global_stats')
.addEdge('global_stats', '__end__');
};
// 创建清理图
const createCleanupGraph = () => {
const graph = new StateGraph(BaseState);
return graph
.addNode('cleanup', GlobalStateNodeFactory.createCleanupNode())
.addEdge('__start__', 'cleanup')
.addEdge('cleanup', '__end__');
};
// 状态隔离演示
const demonstrateStateIsolation = async () => {
console.log('=== 状态隔离演示 ===');
// 创建图实例
const userGraph = createUserSessionGraph();
const taskGraph = createTaskProcessingGraph();
const cleanupGraph = createCleanupGraph();
const userApp = userGraph.compile();
const taskApp = taskGraph.compile();
const cleanupApp = cleanupGraph.compile();
console.log('\n1. 处理多个用户会话(状态隔离)');
// 并行处理多个用户会话
const userResults = await Promise.all([
userApp.invoke({
step: 'initial',
userId: 'user1',
sessionData: { profile: { name: 'Alice' } },
preferences: { theme: 'dark', language: 'en' },
globalData: {},
history: [],
}),
userApp.invoke({
step: 'initial',
userId: 'user2',
sessionData: { profile: { name: 'Bob' } },
preferences: { theme: 'light', language: 'zh-CN' },
globalData: {},
history: [],
}),
userApp.invoke({
step: 'initial',
userId: 'user3',
sessionData: { profile: { name: 'Charlie' } },
preferences: { theme: 'auto', notifications: false },
globalData: {},
history: [],
}),
]);
userResults.forEach((result, index) => {
console.log(`用户${index + 1}处理结果:`, {
userId: result.userId,
step: result.step,
preferencesTheme: result.preferences?.theme,
historyCount: result.history?.length,
});
});
console.log('\n2. 处理多个任务(状态隔离)');
// 并行处理多个任务
const taskResults = await Promise.all([
taskApp.invoke({
step: 'initial',
taskId: 'task1',
taskData: {
input: { type: 'data_analysis', items: [1, 2, 3] },
priority: 'high',
category: 'analytics',
},
processingContext: {},
globalData: {},
history: [],
}),
taskApp.invoke({
step: 'initial',
taskId: 'task2',
taskData: {
input: { type: 'image_processing', files: ['a.jpg', 'b.png'] },
priority: 'normal',
category: 'media',
},
processingContext: {},
globalData: {},
history: [],
}),
]);
taskResults.forEach((result, index) => {
console.log(`任务${index + 1}处理结果:`, {
taskId: result.taskId,
step: result.step,
processingCompleted: result.processingContext?.completed,
historyCount: result.history?.length,
});
});
console.log('\n3. 查看状态统计');
console.log('当前状态统计:', stateManager.getStateStats());
console.log('\n4. 执行清理操作');
const cleanupResult = await cleanupApp.invoke({
step: 'initial',
globalData: {},
history: [],
});
console.log('清理结果:', cleanupResult.globalData?.cleanupResult);
console.log('清理后状态统计:', stateManager.getStateStats());
};
// 导出
export {
BaseState,
UserSessionState,
TaskProcessingState,
StateIsolationManager,
UserSessionNodeFactory,
TaskProcessingNodeFactory,
GlobalStateNodeFactory,
createUserSessionGraph,
createTaskProcessingGraph,
createCleanupGraph,
demonstrateStateIsolation,
stateManager,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
*/
📝 最佳实践总结
核心原则
- 单一职责:每个节点只做一件事,做好一件事
- 纯函数:尽可能避免副作用,提高可预测性
- 错误处理:完善的异常处理机制
- 可测试性:便于编写和执行单元测试
- 性能优化:合理使用缓存和批处理
- 监控日志:完善的观测能力
常见陷阱
- 过度复杂:避免在单个节点中处理过多逻辑
- 状态污染:避免节点间的状态相互影响
- 同步阻塞:合理处理异步操作,避免阻塞
- 资源泄漏:及时清理资源,避免内存泄漏
下一步
学习测试策略,了解如何为 LangGraphJS 应用编写有效的测试。