跳到主要内容

🚀 性能优化

性能优化是构建高效 LangGraphJS 应用的关键。本节将介绍各种优化策略和技术,帮助你构建快速、可扩展的应用。

📊 性能监控

基础监控

基础性能监控:

import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';

// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
metrics: Annotation<{
startTime?: number;
endTime?: number;
nodeExecutionTimes?: Record<string, number>;
totalExecutionTime?: number;
}>({
reducer: (x, y) => ({ ...x, ...y }),
default: () => ({}),
}),
});

// 性能监控装饰器
function withPerformanceMonitoring<T extends any[], R>(
nodeName: string,
nodeFunction: (...args: T) => Promise<R>
) {
return async (...args: T): Promise<R> => {
const startTime = performance.now();

try {
const result = await nodeFunction(...args);
const endTime = performance.now();
const executionTime = endTime - startTime;

console.log(
`[性能监控] 节点 "${nodeName}" 执行时间: ${executionTime.toFixed(2)}ms`
);

// 如果结果包含状态,添加性能指标
if (result && typeof result === 'object' && 'metrics' in result) {
(result as any).metrics = {
...(result as any).metrics,
nodeExecutionTimes: {
...((result as any).metrics?.nodeExecutionTimes || {}),
[nodeName]: executionTime,
},
};
}

return result;
} catch (error) {
const endTime = performance.now();
const executionTime = endTime - startTime;
console.error(
`[性能监控] 节点 "${nodeName}" 执行失败,耗时: ${executionTime.toFixed(
2
)}ms`,
error
);
throw error;
}
};
}

// 基础节点函数
async function processNode(state: typeof StateAnnotation.State) {
// 模拟一些处理时间
await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));

return {
messages: [new AIMessage('处理完成')],
metrics: {
...state.metrics,
endTime: performance.now(),
},
};
}

async function analyzeNode(state: typeof StateAnnotation.State) {
// 模拟分析处理
await new Promise((resolve) => setTimeout(resolve, Math.random() * 150));

return {
messages: [new AIMessage('分析完成')],
metrics: {
...state.metrics,
},
};
}

async function finalizeNode(state: typeof StateAnnotation.State) {
// 模拟最终处理
await new Promise((resolve) => setTimeout(resolve, Math.random() * 50));

const endTime = performance.now();
const totalExecutionTime = state.metrics.startTime
? endTime - state.metrics.startTime
: 0;

return {
messages: [new AIMessage('处理完成')],
metrics: {
...state.metrics,
endTime,
totalExecutionTime,
},
};
}

// 包装节点函数
const monitoredProcessNode = withPerformanceMonitoring('process', processNode);
const monitoredAnalyzeNode = withPerformanceMonitoring('analyze', analyzeNode);
const monitoredFinalizeNode = withPerformanceMonitoring(
'finalize',
finalizeNode
);

// 创建图
const graph = new StateGraph(StateAnnotation)
.addNode('process', monitoredProcessNode)
.addNode('analyze', monitoredAnalyzeNode)
.addNode('finalize', monitoredFinalizeNode)
.addEdge('__start__', 'process')
.addEdge('process', 'analyze')
.addEdge('analyze', 'finalize')
.addEdge('finalize', '__end__');

const compiledGraph = graph.compile();

// 基础性能监控示例
export async function demonstrateBasicMonitoring() {
console.log('=== 基础性能监控演示 ===');

const startTime = performance.now();

const result = await compiledGraph.invoke({
messages: [new HumanMessage('开始处理')],
metrics: { startTime },
});

console.log('\n=== 性能报告 ===');
console.log(
'总执行时间:',
result.metrics.totalExecutionTime?.toFixed(2),
'ms'
);
console.log('节点执行时间:');

if (result.metrics.nodeExecutionTimes) {
Object.entries(result.metrics.nodeExecutionTimes).forEach(
([node, time]) => {
console.log(` ${node}: ${time.toFixed(2)}ms`);
}
);
}

return result;
}

// 性能监控类
export class PerformanceMonitor {
private metrics: Map<string, number[]> = new Map();
private currentExecution: Map<string, number> = new Map();

// 开始监控
startMonitoring(operationName: string) {
this.currentExecution.set(operationName, performance.now());
}

// 结束监控
endMonitoring(operationName: string) {
const startTime = this.currentExecution.get(operationName);
if (startTime) {
const endTime = performance.now();
const duration = endTime - startTime;

if (!this.metrics.has(operationName)) {
this.metrics.set(operationName, []);
}

this.metrics.get(operationName)!.push(duration);
this.currentExecution.delete(operationName);

return duration;
}

return 0;
}

// 获取统计信息
getStats(operationName: string) {
const times = this.metrics.get(operationName);
if (!times || times.length === 0) {
return null;
}

const sorted = [...times].sort((a, b) => a - b);
const sum = times.reduce((a, b) => a + b, 0);

return {
count: times.length,
min: Math.min(...times),
max: Math.max(...times),
avg: sum / times.length,
median: sorted[Math.floor(sorted.length / 2)],
p95: sorted[Math.floor(sorted.length * 0.95)],
p99: sorted[Math.floor(sorted.length * 0.99)],
};
}

// 获取所有统计信息
getAllStats() {
const stats: Record<string, any> = {};

for (const [operation] of this.metrics) {
stats[operation] = this.getStats(operation);
}

return stats;
}

// 重置监控数据
reset() {
this.metrics.clear();
this.currentExecution.clear();
}

// 生成报告
generateReport() {
const stats = this.getAllStats();

console.log('\n=== 性能监控报告 ===');

Object.entries(stats).forEach(([operation, stat]) => {
if (stat) {
console.log(`\n${operation}:`);
console.log(` 执行次数: ${stat.count}`);
console.log(` 平均时间: ${stat.avg.toFixed(2)}ms`);
console.log(` 最小时间: ${stat.min.toFixed(2)}ms`);
console.log(` 最大时间: ${stat.max.toFixed(2)}ms`);
console.log(` 中位数: ${stat.median.toFixed(2)}ms`);
console.log(` P95: ${stat.p95.toFixed(2)}ms`);
console.log(` P99: ${stat.p99.toFixed(2)}ms`);
}
});
}
}

// 全局性能监控器
export const globalMonitor = new PerformanceMonitor();

// 监控装饰器工厂
export function createMonitoredFunction<T extends any[], R>(
name: string,
fn: (...args: T) => Promise<R>,
monitor: PerformanceMonitor = globalMonitor
) {
return async (...args: T): Promise<R> => {
monitor.startMonitoring(name);

try {
const result = await fn(...args);
const duration = monitor.endMonitoring(name);
console.log(`[监控] ${name} 完成,耗时: ${duration.toFixed(2)}ms`);
return result;
} catch (error) {
monitor.endMonitoring(name);
console.error(`[监控] ${name} 失败:`, error);
throw error;
}
};
}

// 批量监控示例
export async function demonstrateBatchMonitoring() {
console.log('\n=== 批量监控演示 ===');

const monitor = new PerformanceMonitor();

// 创建监控的函数
const monitoredGraph = createMonitoredFunction(
'graph_execution',
async (input: any) => {
return compiledGraph.invoke({
...input,
metrics: { startTime: performance.now() },
});
},
monitor
);

// 执行多次以收集统计数据
for (let i = 0; i < 5; i++) {
await monitoredGraph({
messages: [new HumanMessage(`批量测试 ${i + 1}`)],
});

// 添加一些随机延迟
await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
}

// 生成报告
monitor.generateReport();

return monitor.getAllStats();
}

// 内存使用监控
export class MemoryMonitor {
private snapshots: Array<{
timestamp: number;
heapUsed: number;
heapTotal: number;
external: number;
}> = [];

// 拍摄内存快照
takeSnapshot() {
if (typeof process !== 'undefined' && process.memoryUsage) {
const memUsage = process.memoryUsage();
const snapshot = {
timestamp: Date.now(),
heapUsed: memUsage.heapUsed,
heapTotal: memUsage.heapTotal,
external: memUsage.external,
};

this.snapshots.push(snapshot);
return snapshot;
}

return null;
}

// 获取内存使用趋势
getMemoryTrend() {
if (this.snapshots.length < 2) {
return null;
}

const first = this.snapshots[0];
const last = this.snapshots[this.snapshots.length - 1];

return {
heapUsedChange: last.heapUsed - first.heapUsed,
heapTotalChange: last.heapTotal - first.heapTotal,
externalChange: last.external - first.external,
timeSpan: last.timestamp - first.timestamp,
};
}

// 生成内存报告
generateMemoryReport() {
if (this.snapshots.length === 0) {
console.log('没有内存快照数据');
return;
}

const latest = this.snapshots[this.snapshots.length - 1];
const trend = this.getMemoryTrend();

console.log('\n=== 内存使用报告 ===');
console.log(
`当前堆内存使用: ${(latest.heapUsed / 1024 / 1024).toFixed(2)} MB`
);
console.log(
`当前堆内存总量: ${(latest.heapTotal / 1024 / 1024).toFixed(2)} MB`
);
console.log(
`外部内存使用: ${(latest.external / 1024 / 1024).toFixed(2)} MB`
);

if (trend) {
console.log(`\n内存变化趋势 (${trend.timeSpan}ms 内):`);
console.log(
` 堆内存使用变化: ${(trend.heapUsedChange / 1024 / 1024).toFixed(
2
)} MB`
);
console.log(
` 堆内存总量变化: ${(trend.heapTotalChange / 1024 / 1024).toFixed(
2
)} MB`
);
console.log(
` 外部内存变化: ${(trend.externalChange / 1024 / 1024).toFixed(2)} MB`
);
}
}

// 重置快照
reset() {
this.snapshots = [];
}
}

// 综合监控示例
export async function demonstrateComprehensiveMonitoring() {
console.log('\n=== 综合监控演示 ===');

const perfMonitor = new PerformanceMonitor();
const memMonitor = new MemoryMonitor();

// 初始内存快照
memMonitor.takeSnapshot();

// 执行多次操作
for (let i = 0; i < 3; i++) {
perfMonitor.startMonitoring('comprehensive_test');

await compiledGraph.invoke({
messages: [new HumanMessage(`综合测试 ${i + 1}`)],
metrics: { startTime: performance.now() },
});

perfMonitor.endMonitoring('comprehensive_test');
memMonitor.takeSnapshot();
}

// 生成报告
perfMonitor.generateReport();
memMonitor.generateMemoryReport();

return {
performance: perfMonitor.getAllStats(),
memory: memMonitor.getMemoryTrend(),
};
}

⚡ 状态优化

状态最小化和优化策略

状态优化是提升 LangGraphJS 应用性能的关键。通过合理的状态设计和优化策略,可以显著减少内存使用和提高执行效率:

状态优化示例:

import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';

// 未优化的状态定义 - 包含大量不必要的字段
const UnoptimizedStateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
// 大量未使用的字段
unusedField1: Annotation<string>({
reducer: (x, y) => y ?? x,
default: () => '',
}),
unusedField2: Annotation<number[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
largeObject: Annotation<Record<string, any>>({
reducer: (x, y) => ({ ...x, ...y }),
default: () => ({}),
}),
// 深度嵌套的对象
deeplyNested: Annotation<{
level1: {
level2: {
level3: {
data: string[];
metadata: Record<string, any>;
};
};
};
}>({
reducer: (x, y) => ({ ...x, ...y }),
default: () => ({
level1: {
level2: {
level3: {
data: [],
metadata: {},
},
},
},
}),
}),
});

// 优化后的状态定义 - 只保留必要字段
const OptimizedStateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
// 只保留实际使用的字段
currentStep: Annotation<string>({
reducer: (x, y) => y ?? x,
default: () => 'start',
}),
// 使用更高效的数据结构
processedCount: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
});

// 状态最小化示例
export function demonstrateStateMinimization() {
console.log('=== 状态最小化演示 ===');

// 计算状态大小的辅助函数
const calculateStateSize = (state: any) => {
return JSON.stringify(state).length;
};

// 创建未优化状态的示例
const unoptimizedState = {
messages: [new HumanMessage('测试消息')],
unusedField1: 'unused data',
unusedField2: [1, 2, 3, 4, 5],
largeObject: {
key1: 'value1',
key2: 'value2',
key3: Array(100).fill('data'),
},
deeplyNested: {
level1: {
level2: {
level3: {
data: Array(50).fill('nested data'),
metadata: {
created: new Date().toISOString(),
version: '1.0.0',
tags: ['tag1', 'tag2', 'tag3'],
},
},
},
},
},
};

// 创建优化状态的示例
const optimizedState = {
messages: [new HumanMessage('测试消息')],
currentStep: 'processing',
processedCount: 1,
};

const unoptimizedSize = calculateStateSize(unoptimizedState);
const optimizedSize = calculateStateSize(optimizedState);

console.log('未优化状态大小:', unoptimizedSize, 'bytes');
console.log('优化后状态大小:', optimizedSize, 'bytes');
console.log(
'减少比例:',
(((unoptimizedSize - optimizedSize) / unoptimizedSize) * 100).toFixed(2) +
'%'
);

return {
unoptimizedSize,
optimizedSize,
reduction: (unoptimizedSize - optimizedSize) / unoptimizedSize,
};
}

// 状态字段使用分析器
export class StateFieldAnalyzer {
private fieldUsage: Map<string, number> = new Map();
private fieldAccess: Map<string, number> = new Map();

// 记录字段使用
recordFieldUsage(fieldName: string) {
this.fieldUsage.set(fieldName, (this.fieldUsage.get(fieldName) || 0) + 1);
}

// 记录字段访问
recordFieldAccess(fieldName: string) {
this.fieldAccess.set(fieldName, (this.fieldAccess.get(fieldName) || 0) + 1);
}

// 分析未使用的字段
analyzeUnusedFields(stateSchema: Record<string, any>) {
const allFields = Object.keys(stateSchema);
const usedFields = Array.from(this.fieldUsage.keys());
const unusedFields = allFields.filter(
(field) => !usedFields.includes(field)
);

console.log('\n=== 字段使用分析 ===');
console.log('总字段数:', allFields.length);
console.log('已使用字段数:', usedFields.length);
console.log('未使用字段数:', unusedFields.length);

if (unusedFields.length > 0) {
console.log('未使用的字段:', unusedFields);
}

// 显示字段使用频率
console.log('\n字段使用频率:');
for (const [field, count] of this.fieldUsage.entries()) {
console.log(` ${field}: ${count}`);
}

return {
totalFields: allFields.length,
usedFields: usedFields.length,
unusedFields: unusedFields.length,
unusedFieldsList: unusedFields,
usage: Object.fromEntries(this.fieldUsage),
};
}

// 重置统计
reset() {
this.fieldUsage.clear();
this.fieldAccess.clear();
}
}

// 智能状态优化器
export class StateOptimizer {
// 移除未使用的字段
static removeUnusedFields<T extends Record<string, any>>(
state: T,
usedFields: string[]
): Partial<T> {
const optimized: Partial<T> = {};

for (const field of usedFields) {
if (field in state) {
optimized[field as keyof T] = state[field];
}
}

return optimized;
}

// 压缩深度嵌套对象
static flattenNestedObject(
obj: any,
prefix = '',
maxDepth = 3
): Record<string, any> {
const flattened: Record<string, any> = {};

function flatten(current: any, prop: string, depth: number) {
if (depth > maxDepth) {
flattened[prop] = current;
return;
}

if (current && typeof current === 'object' && !Array.isArray(current)) {
for (const key in current) {
const newKey = prop ? `${prop}.${key}` : key;
flatten(current[key], newKey, depth + 1);
}
} else {
flattened[prop] = current;
}
}

flatten(obj, prefix, 0);
return flattened;
}

// 延迟加载大对象
static createLazyLoader<T>(loader: () => T): {
get: () => T;
loaded: boolean;
} {
let cached: T | null = null;
let loaded = false;

return {
get: () => {
if (!loaded) {
cached = loader();
loaded = true;
}
return cached!;
},
loaded,
};
}

// 状态大小分析
static analyzeStateSize(state: any): {
totalSize: number;
fieldSizes: Record<string, number>;
largestFields: Array<{ field: string; size: number }>;
} {
const fieldSizes: Record<string, number> = {};
let totalSize = 0;

for (const [key, value] of Object.entries(state)) {
const size = JSON.stringify(value).length;
fieldSizes[key] = size;
totalSize += size;
}

const largestFields = Object.entries(fieldSizes)
.map(([field, size]) => ({ field, size }))
.sort((a, b) => b.size - a.size)
.slice(0, 5);

return {
totalSize,
fieldSizes,
largestFields,
};
}
}

// 优化示例节点
async function optimizedProcessingNode(
state: typeof OptimizedStateAnnotation.State
) {
// 只访问需要的字段
const messageCount = state.messages.length;
const currentStep = state.currentStep;

// 模拟处理
await new Promise((resolve) => setTimeout(resolve, 10));

return {
messages: [
new AIMessage(`处理了 ${messageCount} 条消息,当前步骤: ${currentStep}`),
],
currentStep: 'completed',
processedCount: state.processedCount + 1,
};
}

// 创建优化的图
const optimizedGraph = new StateGraph(OptimizedStateAnnotation)
.addNode('process', optimizedProcessingNode)
.addEdge('__start__', 'process')
.addEdge('process', '__end__');

const compiledOptimizedGraph = optimizedGraph.compile();

// 状态优化演示
export async function demonstrateStateOptimization() {
console.log('=== 状态优化演示 ===');

// 1. 状态最小化
const minimizationResult = demonstrateStateMinimization();

// 2. 字段使用分析
const analyzer = new StateFieldAnalyzer();

// 模拟字段使用记录
analyzer.recordFieldUsage('messages');
analyzer.recordFieldUsage('currentStep');
analyzer.recordFieldUsage('processedCount');

const analysisResult = analyzer.analyzeUnusedFields({
messages: [],
currentStep: '',
processedCount: 0,
unusedField1: '',
unusedField2: [],
largeObject: {},
});

// 3. 执行优化的图
console.log('\n=== 执行优化图 ===');
const startTime = performance.now();

const result = await compiledOptimizedGraph.invoke({
messages: [new HumanMessage('开始优化处理')],
});

const endTime = performance.now();

console.log('执行时间:', (endTime - startTime).toFixed(2), 'ms');
console.log('最终状态大小:', JSON.stringify(result).length, 'bytes');

// 4. 状态大小分析
const sizeAnalysis = StateOptimizer.analyzeStateSize(result);

console.log('\n=== 状态大小分析 ===');
console.log('总大小:', sizeAnalysis.totalSize, 'bytes');
console.log('最大字段:');
sizeAnalysis.largestFields.forEach(({ field, size }) => {
console.log(` ${field}: ${size} bytes`);
});

return {
minimization: minimizationResult,
analysis: analysisResult,
execution: {
time: endTime - startTime,
stateSize: JSON.stringify(result).length,
},
sizeAnalysis,
};
}

// 延迟加载示例
export function demonstrateLazyLoading() {
console.log('\n=== 延迟加载演示 ===');

// 创建大数据的延迟加载器
const lazyData = StateOptimizer.createLazyLoader(() => {
console.log('正在加载大数据...');
return Array(1000)
.fill(0)
.map((_, i) => ({
id: i,
data: `large data item ${i}`,
metadata: { created: new Date(), size: Math.random() * 1000 },
}));
});

console.log('延迟加载器已创建,数据未加载');
console.log('已加载:', lazyData.loaded);

// 第一次访问时才加载数据
const data = lazyData.get();
console.log('数据已加载,项目数量:', data.length);
console.log('已加载:', lazyData.loaded);

// 后续访问使用缓存
const cachedData = lazyData.get();
console.log('使用缓存数据,项目数量:', cachedData.length);

return { dataSize: data.length, loaded: lazyData.loaded };
}

🔄 节点和流程优化

节点性能优化

  • 缓存策略:对重复计算结果进行缓存
  • 批处理:将多个小操作合并为批量操作
  • 异步优化:合理使用异步操作避免阻塞
  • 资源复用:复用昂贵的资源如数据库连接

流式处理优化

🔧 内存和资源管理

内存优化策略

  • 及时清理:主动释放不再需要的大对象
  • 对象池:复用频繁创建的对象
  • 弱引用:对于缓存数据使用弱引用
  • 分页处理:对大数据集进行分页处理

网络和并发优化

  • 连接池:复用网络连接减少开销
  • 请求合并:将多个小请求合并为批量请求
  • 并发控制:合理控制并发数量避免资源竞争
  • 背压机制:在高负载时实现流量控制

📈 并发优化

并发控制策略

通过合理的并发控制,可以在保证系统稳定性的同时最大化吞吐量。

🎯 特定场景优化

大规模数据处理

  • 流式处理:避免将大数据集完全加载到内存
  • 分块处理:将大任务分解为小块并行处理
  • 增量更新:只处理变化的数据部分

实时应用优化

  • 预热机制:提前初始化常用资源
  • 缓存预加载:预先加载可能需要的数据
  • 连接保持:维持长连接减少握手开销

📋 性能优化清单

性能优化检查清单

状态优化

  • 移除未使用的状态字段
  • 使用适当的 reducer 策略
  • 实现状态压缩
  • 避免深度嵌套对象

节点优化

  • 实现节点级缓存
  • 使用批处理减少调用次数
  • 优化异步操作
  • 避免阻塞操作

内存管理

  • 监控内存使用
  • 及时清理大对象
  • 使用对象池
  • 优化垃圾回收

网络优化

  • 使用连接池
  • 实现请求缓存
  • 压缩传输数据
  • 优化超时设置

并发优化

  • 控制并发数量
  • 使用工作队列
  • 实现背压机制
  • 优化资源分配

🎯 性能目标

响应时间目标

const PERFORMANCE_TARGETS = {
// 响应时间目标(毫秒)
simple_query: 100,
complex_query: 500,
batch_processing: 2000,

// 吞吐量目标(请求/秒)
concurrent_requests: 100,
peak_load: 500,

// 资源使用目标
memory_usage: '< 512MB',
cpu_usage: '< 70%',

// 可用性目标
uptime: '99.9%',
error_rate: '< 0.1%',
};

📈 监控和告警

关键指标

🔧 调优建议

开发阶段

  1. 设计阶段:考虑性能需求,选择合适的架构模式
  2. 编码阶段:遵循性能最佳实践,避免常见陷阱
  3. 测试阶段:进行性能测试,建立基准

生产阶段

  1. 监控:持续监控关键性能指标
  2. 分析:定期分析性能瓶颈
  3. 优化:基于数据进行针对性优化
  4. 验证:验证优化效果

小结

性能优化是一个持续的过程,需要:

  • 全面监控:建立完善的性能监控体系
  • 数据驱动:基于实际数据进行优化决策
  • 渐进优化:逐步优化,避免过度优化
  • 平衡权衡:在性能、可维护性和开发效率间找到平衡

下一节我们将学习测试策略,了解如何确保应用的质量和可靠性。