🚀 性能优化
性能优化是构建高效 LangGraphJS 应用的关键。本节将介绍各种优化策略和技术,帮助你构建快速、可扩展的应用。
📊 性能监控
基础监控
基础性能监控:
import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
metrics: Annotation<{
startTime?: number;
endTime?: number;
nodeExecutionTimes?: Record<string, number>;
totalExecutionTime?: number;
}>({
reducer: (x, y) => ({ ...x, ...y }),
default: () => ({}),
}),
});
// 性能监控装饰器
function withPerformanceMonitoring<T extends any[], R>(
nodeName: string,
nodeFunction: (...args: T) => Promise<R>
) {
return async (...args: T): Promise<R> => {
const startTime = performance.now();
try {
const result = await nodeFunction(...args);
const endTime = performance.now();
const executionTime = endTime - startTime;
console.log(
`[性能监控] 节点 "${nodeName}" 执行时间: ${executionTime.toFixed(2)}ms`
);
// 如果结果包含状态,添加性能指标
if (result && typeof result === 'object' && 'metrics' in result) {
(result as any).metrics = {
...(result as any).metrics,
nodeExecutionTimes: {
...((result as any).metrics?.nodeExecutionTimes || {}),
[nodeName]: executionTime,
},
};
}
return result;
} catch (error) {
const endTime = performance.now();
const executionTime = endTime - startTime;
console.error(
`[性能监控] 节点 "${nodeName}" 执行失败,耗时: ${executionTime.toFixed(
2
)}ms`,
error
);
throw error;
}
};
}
// 基础节点函数
async function processNode(state: typeof StateAnnotation.State) {
// 模拟一些处理时间
await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
return {
messages: [new AIMessage('处理完成')],
metrics: {
...state.metrics,
endTime: performance.now(),
},
};
}
async function analyzeNode(state: typeof StateAnnotation.State) {
// 模拟分析处理
await new Promise((resolve) => setTimeout(resolve, Math.random() * 150));
return {
messages: [new AIMessage('分析完成')],
metrics: {
...state.metrics,
},
};
}
async function finalizeNode(state: typeof StateAnnotation.State) {
// 模拟最终处理
await new Promise((resolve) => setTimeout(resolve, Math.random() * 50));
const endTime = performance.now();
const totalExecutionTime = state.metrics.startTime
? endTime - state.metrics.startTime
: 0;
return {
messages: [new AIMessage('处理完成')],
metrics: {
...state.metrics,
endTime,
totalExecutionTime,
},
};
}
// 包装节点函数
const monitoredProcessNode = withPerformanceMonitoring('process', processNode);
const monitoredAnalyzeNode = withPerformanceMonitoring('analyze', analyzeNode);
const monitoredFinalizeNode = withPerformanceMonitoring(
'finalize',
finalizeNode
);
// 创建图
const graph = new StateGraph(StateAnnotation)
.addNode('process', monitoredProcessNode)
.addNode('analyze', monitoredAnalyzeNode)
.addNode('finalize', monitoredFinalizeNode)
.addEdge('__start__', 'process')
.addEdge('process', 'analyze')
.addEdge('analyze', 'finalize')
.addEdge('finalize', '__end__');
const compiledGraph = graph.compile();
// 基础性能监控示例
export async function demonstrateBasicMonitoring() {
console.log('=== 基础性能监控演示 ===');
const startTime = performance.now();
const result = await compiledGraph.invoke({
messages: [new HumanMessage('开始处理')],
metrics: { startTime },
});
console.log('\n=== 性能报告 ===');
console.log(
'总执行时间:',
result.metrics.totalExecutionTime?.toFixed(2),
'ms'
);
console.log('节点执行时间:');
if (result.metrics.nodeExecutionTimes) {
Object.entries(result.metrics.nodeExecutionTimes).forEach(
([node, time]) => {
console.log(` ${node}: ${time.toFixed(2)}ms`);
}
);
}
return result;
}
// 性能监控类
export class PerformanceMonitor {
private metrics: Map<string, number[]> = new Map();
private currentExecution: Map<string, number> = new Map();
// 开始监控
startMonitoring(operationName: string) {
this.currentExecution.set(operationName, performance.now());
}
// 结束监控
endMonitoring(operationName: string) {
const startTime = this.currentExecution.get(operationName);
if (startTime) {
const endTime = performance.now();
const duration = endTime - startTime;
if (!this.metrics.has(operationName)) {
this.metrics.set(operationName, []);
}
this.metrics.get(operationName)!.push(duration);
this.currentExecution.delete(operationName);
return duration;
}
return 0;
}
// 获取统计信息
getStats(operationName: string) {
const times = this.metrics.get(operationName);
if (!times || times.length === 0) {
return null;
}
const sorted = [...times].sort((a, b) => a - b);
const sum = times.reduce((a, b) => a + b, 0);
return {
count: times.length,
min: Math.min(...times),
max: Math.max(...times),
avg: sum / times.length,
median: sorted[Math.floor(sorted.length / 2)],
p95: sorted[Math.floor(sorted.length * 0.95)],
p99: sorted[Math.floor(sorted.length * 0.99)],
};
}
// 获取所有统计信息
getAllStats() {
const stats: Record<string, any> = {};
for (const [operation] of this.metrics) {
stats[operation] = this.getStats(operation);
}
return stats;
}
// 重置监控数据
reset() {
this.metrics.clear();
this.currentExecution.clear();
}
// 生成报告
generateReport() {
const stats = this.getAllStats();
console.log('\n=== 性能监控报告 ===');
Object.entries(stats).forEach(([operation, stat]) => {
if (stat) {
console.log(`\n${operation}:`);
console.log(` 执行次数: ${stat.count}`);
console.log(` 平均时间: ${stat.avg.toFixed(2)}ms`);
console.log(` 最小时间: ${stat.min.toFixed(2)}ms`);
console.log(` 最大时间: ${stat.max.toFixed(2)}ms`);
console.log(` 中位数: ${stat.median.toFixed(2)}ms`);
console.log(` P95: ${stat.p95.toFixed(2)}ms`);
console.log(` P99: ${stat.p99.toFixed(2)}ms`);
}
});
}
}
// 全局性能监控器
export const globalMonitor = new PerformanceMonitor();
// 监控装饰器工厂
export function createMonitoredFunction<T extends any[], R>(
name: string,
fn: (...args: T) => Promise<R>,
monitor: PerformanceMonitor = globalMonitor
) {
return async (...args: T): Promise<R> => {
monitor.startMonitoring(name);
try {
const result = await fn(...args);
const duration = monitor.endMonitoring(name);
console.log(`[监控] ${name} 完成,耗时: ${duration.toFixed(2)}ms`);
return result;
} catch (error) {
monitor.endMonitoring(name);
console.error(`[监控] ${name} 失败:`, error);
throw error;
}
};
}
// 批量监控示例
export async function demonstrateBatchMonitoring() {
console.log('\n=== 批量监控演示 ===');
const monitor = new PerformanceMonitor();
// 创建监控的函数
const monitoredGraph = createMonitoredFunction(
'graph_execution',
async (input: any) => {
return compiledGraph.invoke({
...input,
metrics: { startTime: performance.now() },
});
},
monitor
);
// 执行多次以收集统计数据
for (let i = 0; i < 5; i++) {
await monitoredGraph({
messages: [new HumanMessage(`批量测试 ${i + 1}`)],
});
// 添加一些随机延迟
await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
}
// 生成报告
monitor.generateReport();
return monitor.getAllStats();
}
// 内存使用监控
export class MemoryMonitor {
private snapshots: Array<{
timestamp: number;
heapUsed: number;
heapTotal: number;
external: number;
}> = [];
// 拍摄内存快照
takeSnapshot() {
if (typeof process !== 'undefined' && process.memoryUsage) {
const memUsage = process.memoryUsage();
const snapshot = {
timestamp: Date.now(),
heapUsed: memUsage.heapUsed,
heapTotal: memUsage.heapTotal,
external: memUsage.external,
};
this.snapshots.push(snapshot);
return snapshot;
}
return null;
}
// 获取内存使用趋势
getMemoryTrend() {
if (this.snapshots.length < 2) {
return null;
}
const first = this.snapshots[0];
const last = this.snapshots[this.snapshots.length - 1];
return {
heapUsedChange: last.heapUsed - first.heapUsed,
heapTotalChange: last.heapTotal - first.heapTotal,
externalChange: last.external - first.external,
timeSpan: last.timestamp - first.timestamp,
};
}
// 生成内存报告
generateMemoryReport() {
if (this.snapshots.length === 0) {
console.log('没有内存快照数据');
return;
}
const latest = this.snapshots[this.snapshots.length - 1];
const trend = this.getMemoryTrend();
console.log('\n=== 内存使用报告 ===');
console.log(
`当前堆内存使用: ${(latest.heapUsed / 1024 / 1024).toFixed(2)} MB`
);
console.log(
`当前堆内存总量: ${(latest.heapTotal / 1024 / 1024).toFixed(2)} MB`
);
console.log(
`外部内存使用: ${(latest.external / 1024 / 1024).toFixed(2)} MB`
);
if (trend) {
console.log(`\n内存变化趋势 (${trend.timeSpan}ms 内):`);
console.log(
` 堆内存使用变化: ${(trend.heapUsedChange / 1024 / 1024).toFixed(
2
)} MB`
);
console.log(
` 堆内存总量变化: ${(trend.heapTotalChange / 1024 / 1024).toFixed(
2
)} MB`
);
console.log(
` 外部内存变化: ${(trend.externalChange / 1024 / 1024).toFixed(2)} MB`
);
}
}
// 重置快照
reset() {
this.snapshots = [];
}
}
// 综合监控示例
export async function demonstrateComprehensiveMonitoring() {
console.log('\n=== 综合监控演示 ===');
const perfMonitor = new PerformanceMonitor();
const memMonitor = new MemoryMonitor();
// 初始内存快照
memMonitor.takeSnapshot();
// 执行多次操作
for (let i = 0; i < 3; i++) {
perfMonitor.startMonitoring('comprehensive_test');
await compiledGraph.invoke({
messages: [new HumanMessage(`综合测试 ${i + 1}`)],
metrics: { startTime: performance.now() },
});
perfMonitor.endMonitoring('comprehensive_test');
memMonitor.takeSnapshot();
}
// 生成报告
perfMonitor.generateReport();
memMonitor.generateMemoryReport();
return {
performance: perfMonitor.getAllStats(),
memory: memMonitor.getMemoryTrend(),
};
}
⚡ 状态优化
状态最小化和优化策略
状态优化是提升 LangGraphJS 应用性能的关键。通过合理的状态设计和优化策略,可以显著减少内存使用和提高执行效率:
状态优化示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 未优化的状态定义 - 包含大量不必要的字段
const UnoptimizedStateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
// 大量未使用的字段
unusedField1: Annotation<string>({
reducer: (x, y) => y ?? x,
default: () => '',
}),
unusedField2: Annotation<number[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
largeObject: Annotation<Record<string, any>>({
reducer: (x, y) => ({ ...x, ...y }),
default: () => ({}),
}),
// 深度嵌套的对象
deeplyNested: Annotation<{
level1: {
level2: {
level3: {
data: string[];
metadata: Record<string, any>;
};
};
};
}>({
reducer: (x, y) => ({ ...x, ...y }),
default: () => ({
level1: {
level2: {
level3: {
data: [],
metadata: {},
},
},
},
}),
}),
});
// 优化后的状态定义 - 只保留必要字段
const OptimizedStateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
// 只保留实际使用的字段
currentStep: Annotation<string>({
reducer: (x, y) => y ?? x,
default: () => 'start',
}),
// 使用更高效的数据结构
processedCount: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
});
// 状态最小化示例
export function demonstrateStateMinimization() {
console.log('=== 状态最小化演示 ===');
// 计算状态大小的辅助函数
const calculateStateSize = (state: any) => {
return JSON.stringify(state).length;
};
// 创建未优化状态的示例
const unoptimizedState = {
messages: [new HumanMessage('测试消息')],
unusedField1: 'unused data',
unusedField2: [1, 2, 3, 4, 5],
largeObject: {
key1: 'value1',
key2: 'value2',
key3: Array(100).fill('data'),
},
deeplyNested: {
level1: {
level2: {
level3: {
data: Array(50).fill('nested data'),
metadata: {
created: new Date().toISOString(),
version: '1.0.0',
tags: ['tag1', 'tag2', 'tag3'],
},
},
},
},
},
};
// 创建优化状态的示例
const optimizedState = {
messages: [new HumanMessage('测试消息')],
currentStep: 'processing',
processedCount: 1,
};
const unoptimizedSize = calculateStateSize(unoptimizedState);
const optimizedSize = calculateStateSize(optimizedState);
console.log('未优化状态大小:', unoptimizedSize, 'bytes');
console.log('优化后状态大小:', optimizedSize, 'bytes');
console.log(
'减少比例:',
(((unoptimizedSize - optimizedSize) / unoptimizedSize) * 100).toFixed(2) +
'%'
);
return {
unoptimizedSize,
optimizedSize,
reduction: (unoptimizedSize - optimizedSize) / unoptimizedSize,
};
}
// 状态字段使用分析器
export class StateFieldAnalyzer {
private fieldUsage: Map<string, number> = new Map();
private fieldAccess: Map<string, number> = new Map();
// 记录字段使用
recordFieldUsage(fieldName: string) {
this.fieldUsage.set(fieldName, (this.fieldUsage.get(fieldName) || 0) + 1);
}
// 记录字段访问
recordFieldAccess(fieldName: string) {
this.fieldAccess.set(fieldName, (this.fieldAccess.get(fieldName) || 0) + 1);
}
// 分析未使用的字段
analyzeUnusedFields(stateSchema: Record<string, any>) {
const allFields = Object.keys(stateSchema);
const usedFields = Array.from(this.fieldUsage.keys());
const unusedFields = allFields.filter(
(field) => !usedFields.includes(field)
);
console.log('\n=== 字段使用分析 ===');
console.log('总字段数:', allFields.length);
console.log('已使用字段数:', usedFields.length);
console.log('未使用字段数:', unusedFields.length);
if (unusedFields.length > 0) {
console.log('未使用的字段:', unusedFields);
}
// 显示字段使用频率
console.log('\n字段使用频率:');
for (const [field, count] of this.fieldUsage.entries()) {
console.log(` ${field}: ${count} 次`);
}
return {
totalFields: allFields.length,
usedFields: usedFields.length,
unusedFields: unusedFields.length,
unusedFieldsList: unusedFields,
usage: Object.fromEntries(this.fieldUsage),
};
}
// 重置统计
reset() {
this.fieldUsage.clear();
this.fieldAccess.clear();
}
}
// 智能状态优化器
export class StateOptimizer {
// 移除未使用的字段
static removeUnusedFields<T extends Record<string, any>>(
state: T,
usedFields: string[]
): Partial<T> {
const optimized: Partial<T> = {};
for (const field of usedFields) {
if (field in state) {
optimized[field as keyof T] = state[field];
}
}
return optimized;
}
// 压缩深度嵌套对象
static flattenNestedObject(
obj: any,
prefix = '',
maxDepth = 3
): Record<string, any> {
const flattened: Record<string, any> = {};
function flatten(current: any, prop: string, depth: number) {
if (depth > maxDepth) {
flattened[prop] = current;
return;
}
if (current && typeof current === 'object' && !Array.isArray(current)) {
for (const key in current) {
const newKey = prop ? `${prop}.${key}` : key;
flatten(current[key], newKey, depth + 1);
}
} else {
flattened[prop] = current;
}
}
flatten(obj, prefix, 0);
return flattened;
}
// 延迟加载大对象
static createLazyLoader<T>(loader: () => T): {
get: () => T;
loaded: boolean;
} {
let cached: T | null = null;
let loaded = false;
return {
get: () => {
if (!loaded) {
cached = loader();
loaded = true;
}
return cached!;
},
loaded,
};
}
// 状态大小分析
static analyzeStateSize(state: any): {
totalSize: number;
fieldSizes: Record<string, number>;
largestFields: Array<{ field: string; size: number }>;
} {
const fieldSizes: Record<string, number> = {};
let totalSize = 0;
for (const [key, value] of Object.entries(state)) {
const size = JSON.stringify(value).length;
fieldSizes[key] = size;
totalSize += size;
}
const largestFields = Object.entries(fieldSizes)
.map(([field, size]) => ({ field, size }))
.sort((a, b) => b.size - a.size)
.slice(0, 5);
return {
totalSize,
fieldSizes,
largestFields,
};
}
}
// 优化示例节点
async function optimizedProcessingNode(
state: typeof OptimizedStateAnnotation.State
) {
// 只访问需要的字段
const messageCount = state.messages.length;
const currentStep = state.currentStep;
// 模拟处理
await new Promise((resolve) => setTimeout(resolve, 10));
return {
messages: [
new AIMessage(`处理了 ${messageCount} 条消息,当前步骤: ${currentStep}`),
],
currentStep: 'completed',
processedCount: state.processedCount + 1,
};
}
// 创建优化的图
const optimizedGraph = new StateGraph(OptimizedStateAnnotation)
.addNode('process', optimizedProcessingNode)
.addEdge('__start__', 'process')
.addEdge('process', '__end__');
const compiledOptimizedGraph = optimizedGraph.compile();
// 状态优化演示
export async function demonstrateStateOptimization() {
console.log('=== 状态优化演示 ===');
// 1. 状态最小化
const minimizationResult = demonstrateStateMinimization();
// 2. 字段使用分析
const analyzer = new StateFieldAnalyzer();
// 模拟字段使用记录
analyzer.recordFieldUsage('messages');
analyzer.recordFieldUsage('currentStep');
analyzer.recordFieldUsage('processedCount');
const analysisResult = analyzer.analyzeUnusedFields({
messages: [],
currentStep: '',
processedCount: 0,
unusedField1: '',
unusedField2: [],
largeObject: {},
});
// 3. 执行优化的图
console.log('\n=== 执行优化图 ===');
const startTime = performance.now();
const result = await compiledOptimizedGraph.invoke({
messages: [new HumanMessage('开始优化处理')],
});
const endTime = performance.now();
console.log('执行时间:', (endTime - startTime).toFixed(2), 'ms');
console.log('最终状态大小:', JSON.stringify(result).length, 'bytes');
// 4. 状态大小分析
const sizeAnalysis = StateOptimizer.analyzeStateSize(result);
console.log('\n=== 状态大小分析 ===');
console.log('总大小:', sizeAnalysis.totalSize, 'bytes');
console.log('最大字段:');
sizeAnalysis.largestFields.forEach(({ field, size }) => {
console.log(` ${field}: ${size} bytes`);
});
return {
minimization: minimizationResult,
analysis: analysisResult,
execution: {
time: endTime - startTime,
stateSize: JSON.stringify(result).length,
},
sizeAnalysis,
};
}
// 延迟加载示例
export function demonstrateLazyLoading() {
console.log('\n=== 延迟加载演示 ===');
// 创建大数据的延迟加载器
const lazyData = StateOptimizer.createLazyLoader(() => {
console.log('正在加载大数据...');
return Array(1000)
.fill(0)
.map((_, i) => ({
id: i,
data: `large data item ${i}`,
metadata: { created: new Date(), size: Math.random() * 1000 },
}));
});
console.log('延迟加载器已创建,数据未加载');
console.log('已加载:', lazyData.loaded);
// 第一次访问时才加载数据
const data = lazyData.get();
console.log('数据已加载,项目数量:', data.length);
console.log('已加载:', lazyData.loaded);
// 后续访问使用缓存
const cachedData = lazyData.get();
console.log('使用缓存数据,项目数量:', cachedData.length);
return { dataSize: data.length, loaded: lazyData.loaded };
}
🔄 节点和流程优化
节点性能优化
- 缓存策略:对重复计算结果进行缓存
- 批处理:将多个小操作合并为批量操作
- 异步优化:合理使用异步操作避免阻塞
- 资源复用:复用昂贵的资源如数据库连接
流式处理优化
🔧 内存和资源管理
内存优化策略
- 及时清理:主动释放不再需要的大对象
- 对象池:复用频繁创建的对象
- 弱引用:对于缓存数据使用弱引用
- 分页处理:对大数据集进行分页处理
网络和并发优化
- 连接池:复用网络连接减少开销
- 请求合并:将多个小请求合并为批量请求
- 并发控制:合理控制并发数量避免资源竞争
- 背压机制:在高负载时实现流量控制
📈 并发优化
并发控制策略
通过合理的并发控制,可以在保证系统稳定性的同时最大化吞吐量。
🎯 特定场景优化
大规模数据处理
- 流式处理:避免将大数据集完全加载到内存
- 分块处理:将大任务分解为小块并行处理
- 增量更新:只处理变化的数据部分
实时应用优化
- 预热机制:提前初始化常用资源
- 缓存预加载:预先加载可能需要的数据
- 连接保持:维持长连接减少握手开销
📋 性能优化清单
性能优化检查清单
状态优化
- 移除未使用的状态字段
- 使用适当的 reducer 策略
- 实现状态压缩
- 避免深度嵌套对象
节点优化
- 实现节点级缓存
- 使用批处理减少调用次数
- 优化异步操作
- 避免阻塞操作
内存管理
- 监控内存使用
- 及时清理大对象
- 使用对象池
- 优化垃圾回收
网络优化
- 使用连接池
- 实现请求缓存
- 压缩传输数据
- 优化超时设置
并发优化
- 控制并发数量
- 使用工作队列
- 实现背压机制
- 优化资源分配
🎯 性能目标
响应时间目标
const PERFORMANCE_TARGETS = {
// 响应时间目标(毫秒)
simple_query: 100,
complex_query: 500,
batch_processing: 2000,
// 吞吐量目标(请求/秒)
concurrent_requests: 100,
peak_load: 500,
// 资源使用目标
memory_usage: '< 512MB',
cpu_usage: '< 70%',
// 可用性目标
uptime: '99.9%',
error_rate: '< 0.1%',
};
📈 监控和告警
关键指标
🔧 调优建议
开发阶段
- 设计阶段:考虑性能需求,选择合适的架构模式
- 编码阶段:遵循性能最佳实践,避免常见陷阱
- 测试阶段:进行性能测试,建立基准
生产阶段
- 监控:持续监控关键性能指标
- 分析:定期分析性能瓶颈
- 优化:基于数据进行针对性优化
- 验证:验证优化效果
小结
性能优化是一个持续的过程,需要:
- 全面监控:建立完善的性能监控体系
- 数据驱动:基于实际数据进行优化决策
- 渐进优化:逐步优化,避免过度优化
- 平衡权衡:在性能、可维护性和开发效率间找到平衡
下一节我们将学习测试策略,了解如何确保应用的质量和可靠性。