跳到主要内容

🧪 测试策略

完善的测试策略是确保 LangGraphJS 应用质量和可靠性的关键。本节将介绍各种测试方法和最佳实践。

🎯 测试金字塔

测试层级说明

  • 单元测试:测试单个节点和函数
  • 集成测试:测试节点间的交互
  • 端到端测试:测试完整的用户场景

🔧 单元测试

节点测试

节点单元测试
import { describe, it, expect } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 测试状态
const NodeTestState = Annotation.Root({
input: Annotation<string>(),
output: Annotation<string>(),
processed: Annotation<boolean>(),
});

// 测试节点
const processTextNode = async (state: typeof NodeTestState.State) => {
if (!state.input) throw new Error('Input required');
return {
output: state.input.toUpperCase(),
processed: true,
};
};

const validateInputNode = async (state: typeof NodeTestState.State) => {
if (!state.input?.trim()) throw new Error('Invalid input');
if (state.input.length > 100) throw new Error('Input too long');
return { processed: true };
};

describe('节点单元测试', () => {
it('应该处理文本转换', async () => {
const result = await processTextNode({
input: 'hello world',
output: '',
processed: false,
});

expect(result.output).toBe('HELLO WORLD');
expect(result.processed).toBe(true);
});

it('应该验证输入', async () => {
await expect(
validateInputNode({
input: '',
output: '',
processed: false,
})
).rejects.toThrow('Invalid input');
});

it('应该在图中正确执行', async () => {
const graph = new StateGraph(NodeTestState)
.addNode('validate', validateInputNode)
.addNode('process', processTextNode)
.addEdge('__start__', 'validate')
.addEdge('validate', 'process')
.addEdge('process', '__end__');

const app = graph.compile();
const result = await app.invoke({
input: 'test input',
output: '',
processed: false,
});

expect(result.output).toBe('TEST INPUT');
expect(result.processed).toBe(true);
});
});

状态测试

状态单元测试
import { describe, it, expect } from 'vitest';
import { Annotation } from '@langchain/langgraph';

// 基础状态
const BasicState = Annotation.Root({
message: Annotation<string>(),
count: Annotation<number>(),
items: Annotation<string[]>({
reducer: (x, y) => [...(x || []), ...(y || [])],
}),
});

// 复杂状态
const ComplexState = Annotation.Root({
user: Annotation<{ id: string; name: string }>(),
messages: Annotation<Array<{ id: string; content: string }>>({
reducer: (x, y) => [...(x || []), ...(y || [])],
}),
});

describe('状态单元测试', () => {
it('应该正确初始化状态', () => {
const state: typeof BasicState.State = {
message: 'Hello',
count: 5,
items: ['item1', 'item2'],
};

expect(state.message).toBe('Hello');
expect(state.count).toBe(5);
expect(state.items).toEqual(['item1', 'item2']);
});

it('应该正确处理状态更新', () => {
const itemsReducer = BasicState.spec.items.reducer;
const newItems = itemsReducer(['item1'], ['item2', 'item3']);

expect(newItems).toEqual(['item1', 'item2', 'item3']);
});

it('应该正确合并复杂对象', () => {
const userReducer = ComplexState.spec.user.reducer;
const updatedUser = userReducer(
{ id: 'user1', name: 'John' },
{ name: 'Jane' }
);

expect(updatedUser).toEqual({ id: 'user1', name: 'Jane' });
});

it('应该保持状态不变性', () => {
const original = ['item1'];
const itemsReducer = BasicState.spec.items.reducer;
const result = itemsReducer(original, ['item2']);

expect(original).toEqual(['item1']);
expect(result).not.toBe(original);
});
});

Reducer 测试

Reducer测试
import { describe, it, expect } from 'vitest';

// 基础 reducer 函数
const stringReplaceReducer = (x: string | undefined, y: string) => y;
const numberAddReducer = (x: number | undefined, y: number) => (x || 0) + y;
const arrayMergeReducer = (x: string[] | undefined, y: string[]) => [
...(x || []),
...y,
];
const objectMergeReducer = (
x: Record<string, any> | undefined,
y: Record<string, any>
) => ({ ...x, ...y });

// 复杂 reducer:消息历史处理
const messageHistoryReducer = (
x: Array<{ role: string; content: string; timestamp: number }> | undefined,
y: Array<{ role: string; content: string; timestamp: number }>
) => {
const existing = x || [];
const combined = [...existing, ...y];
const unique = combined.filter(
(msg, index, arr) =>
arr.findIndex((m) => m.timestamp === msg.timestamp) === index
);
return unique.sort((a, b) => a.timestamp - b.timestamp);
};

describe('Reducer 测试', () => {
it('应该正确替换和累加', () => {
expect(stringReplaceReducer('old', 'new')).toBe('new');
expect(numberAddReducer(5, 3)).toBe(8);
expect(numberAddReducer(undefined, 5)).toBe(5);
});

it('应该正确合并数组和对象', () => {
expect(arrayMergeReducer(['a', 'b'], ['c', 'd'])).toEqual([
'a',
'b',
'c',
'd',
]);
expect(objectMergeReducer({ a: 1 }, { b: 2 })).toEqual({ a: 1, b: 2 });
});

it('应该处理消息历史', () => {
const existing = [{ role: 'user', content: 'Hello', timestamp: 1000 }];
const newMessages = [{ role: 'assistant', content: 'Hi', timestamp: 2000 }];

const result = messageHistoryReducer(existing, newMessages);
expect(result).toHaveLength(2);
expect(result[0].timestamp).toBe(1000);
});

it('应该保持不变性', () => {
const original = ['a', 'b'];
const result = arrayMergeReducer(original, ['c']);

expect(original).toEqual(['a', 'b']);
expect(result).not.toBe(original);
});
});

🔗 集成测试

图执行测试

图集成测试
import { describe, it, expect, beforeEach } from 'vitest';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';

// 测试状态定义
const IntegrationTestState = Annotation.Root({
input: Annotation<string>({
reducer: (x, y) => y,
}),
output: Annotation<string>({
reducer: (x, y) => y,
}),
step: Annotation<string>({
reducer: (x, y) => y,
}),
count: Annotation<number>({
reducer: (x, y) => (x || 0) + (y || 0),
}),
processed: Annotation<boolean>({
reducer: (x, y) => y,
}),
errors: Annotation<string[]>({
reducer: (x, y) => [...(x || []), ...(y || [])],
}),
});

// 节点函数
const inputNode = async (state: typeof IntegrationTestState.State) => {
return {
step: 'input_processed',
count: 1,
};
};

const processNode = async (state: typeof IntegrationTestState.State) => {
const { input } = state;

if (!input) {
return {
errors: ['Input is required'],
step: 'error',
};
}

return {
output: input.toUpperCase(),
step: 'processed',
count: 1,
processed: true,
};
};

const validateNode = async (state: typeof IntegrationTestState.State) => {
const { output } = state;

if (!output || output.length === 0) {
return {
errors: ['Output validation failed'],
step: 'validation_error',
};
}

return {
step: 'validated',
count: 1,
};
};

// 条件函数
const shouldProcess = (state: typeof IntegrationTestState.State) => {
return state.input && state.input.length > 0 ? 'process' : 'error';
};

const shouldValidate = (state: typeof IntegrationTestState.State) => {
return state.processed ? 'validate' : 'error';
};

describe('图集成测试', () => {
describe('基础图执行测试', () => {
it('应该正确执行简单的线性图', async () => {
const graph = new StateGraph(IntegrationTestState);

graph
.addNode('input', inputNode)
.addNode('process', processNode)
.addEdge(START, 'input')
.addEdge('input', 'process')
.addEdge('process', END);

const app = graph.compile();

const result = await app.invoke({
input: 'hello world',
output: '',
step: '',
count: 0,
processed: false,
errors: [],
});

expect(result.output).toBe('HELLO WORLD');
expect(result.step).toBe('processed');
expect(result.count).toBe(2); // input + process
expect(result.processed).toBe(true);
});

it('应该正确处理条件边', async () => {
const graph = new StateGraph(IntegrationTestState);

graph
.addNode('input', inputNode)
.addNode('process', processNode)
.addEdge(START, 'input')
.addConditionalEdges('input', shouldProcess, {
process: 'process',
error: 'error',
})
.addEdge('process', END)
.addEdge('error', END);

const app = graph.compile();

// 测试正常流程
const result1 = await app.invoke({
input: 'test',
output: '',
step: '',
count: 0,
processed: false,
errors: [],
});

expect(result1.output).toBe('TEST');
expect(result1.processed).toBe(true);

// 测试错误流程
const result2 = await app.invoke({
input: '',
output: '',
step: '',
count: 0,
processed: false,
errors: [],
});

expect(result2.step).toBe('error');
expect(result2.processed).toBe(false);
});

it('应该正确处理复杂的图结构', async () => {
const graph = new StateGraph(IntegrationTestState);

graph
.addNode('input', inputNode)
.addNode('process', processNode)
.addNode('validate', validateNode)
.addEdge(START, 'input')
.addConditionalEdges('input', shouldProcess, {
process: 'process',
error: 'error',
})
.addConditionalEdges('process', shouldValidate, {
validate: 'validate',
error: 'error',
})
.addEdge('validate', END)
.addEdge('error', END);

const app = graph.compile();

const result = await app.invoke({
input: 'integration test',
output: '',
step: '',
count: 0,
processed: false,
errors: [],
});

expect(result.output).toBe('INTEGRATION TEST');
expect(result.step).toBe('validated');
expect(result.count).toBe(3); // input + process + validate
expect(result.processed).toBe(true);
});
});

describe('并行执行测试', () => {
it('应该支持并行节点执行', async () => {
const parallelNode1 = async (
state: typeof IntegrationTestState.State
) => {
await new Promise((resolve) => setTimeout(resolve, 50));
return {
step: 'parallel1_done',
count: 1,
};
};

const parallelNode2 = async (
state: typeof IntegrationTestState.State
) => {
await new Promise((resolve) => setTimeout(resolve, 30));
return {
step: 'parallel2_done',
count: 1,
};
};

const mergeNode = async (state: typeof IntegrationTestState.State) => {
return {
step: 'merged',
count: 1,
processed: true,
};
};

const graph = new StateGraph(IntegrationTestState);

graph
.addNode('input', inputNode)
.addNode('parallel1', parallelNode1)
.addNode('parallel2', parallelNode2)
.addNode('merge', mergeNode)
.addEdge(START, 'input')
.addEdge('input', 'parallel1')
.addEdge('input', 'parallel2')
.addEdge('parallel1', 'merge')
.addEdge('parallel2', 'merge')
.addEdge('merge', END);

const app = graph.compile();

const startTime = performance.now();
const result = await app.invoke({
input: 'parallel test',
output: '',
step: '',
count: 0,
processed: false,
errors: [],
});
const endTime = performance.now();

expect(result.step).toBe('merged');
expect(result.count).toBe(4); // input + parallel1 + parallel2 + merge
expect(result.processed).toBe(true);

// 并行执行应该比串行执行快
expect(endTime - startTime).toBeLessThan(100);
});
});

describe('状态管理测试', () => {
it('应该正确累积状态变化', async () => {
const incrementNode = async (
state: typeof IntegrationTestState.State
) => {
return {
count: 5,
step: `increment_${state.count || 0}`,
};
};

const graph = new StateGraph(IntegrationTestState);

graph
.addNode('increment1', incrementNode)
.addNode('increment2', incrementNode)
.addNode('increment3', incrementNode)
.addEdge(START, 'increment1')
.addEdge('increment1', 'increment2')
.addEdge('increment2', 'increment3')
.addEdge('increment3', END);

const app = graph.compile();

const result = await app.invoke({
input: '',
output: '',
step: '',
count: 0,
processed: false,
errors: [],
});

expect(result.count).toBe(15); // 5 + 5 + 5
expect(result.step).toBe('increment_10'); // 最后一次的状态
});
});
});

工具集成测试

工具集成测试
import { describe, it, expect, beforeEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 工具集成测试状态定义
const ToolIntegrationState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
toolResults: Annotation<any[]>({
reducer: (x, y) => [...(x || []), ...y],
}),
});

// 模拟工具
class MockSearchTool {
async search(query: string): Promise<any> {
await new Promise((resolve) => setTimeout(resolve, 100));
return {
query,
results: [
{ title: 'Result 1', url: 'https://example.com/1' },
{ title: 'Result 2', url: 'https://example.com/2' },
],
timestamp: Date.now(),
};
}
}

class MockCalculatorTool {
calculate(expression: string): number {
try {
return eval(expression);
} catch (error) {
throw new Error(`Invalid expression: ${expression}`);
}
}
}

// 工具节点创建函数
const createToolNode = (tool: any, toolName: string) => {
return async (state: typeof ToolIntegrationState.State) => {
const { data } = state;

try {
let result;

switch (toolName) {
case 'search':
result = await tool.search(data.query);
break;
case 'calculator':
result = tool.calculate(data.expression);
break;
default:
throw new Error(`Unknown tool: ${toolName}`);
}

return {
step: `${toolName}_completed`,
data: { ...data, [`${toolName}Result`]: result },
toolResults: [{ tool: toolName, result, timestamp: Date.now() }],
};
} catch (error) {
return {
step: `${toolName}_error`,
data: { ...data, error: error.message },
toolResults: [
{ tool: toolName, error: error.message, timestamp: Date.now() },
],
};
}
};
};

describe('工具集成测试', () => {
let searchTool: MockSearchTool;
let calculatorTool: MockCalculatorTool;

beforeEach(() => {
searchTool = new MockSearchTool();
calculatorTool = new MockCalculatorTool();
});

describe('单个工具集成', () => {
it('应该正确集成搜索工具', async () => {
const graph = new StateGraph(ToolIntegrationState);

graph
.addNode('search', createToolNode(searchTool, 'search'))
.addEdge('__start__', 'search')
.addEdge('search', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { query: 'LangGraphJS tutorial' },
toolResults: [],
});

expect(result.step).toBe('search_completed');
expect(result.data.searchResult).toBeDefined();
expect(result.data.searchResult.query).toBe('LangGraphJS tutorial');
expect(result.data.searchResult.results).toHaveLength(2);
expect(result.toolResults).toHaveLength(1);
});

it('应该正确集成计算器工具', async () => {
const graph = new StateGraph(ToolIntegrationState);

graph
.addNode('calculator', createToolNode(calculatorTool, 'calculator'))
.addEdge('__start__', 'calculator')
.addEdge('calculator', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { expression: '2 + 3 * 4' },
toolResults: [],
});

expect(result.step).toBe('calculator_completed');
expect(result.data.calculatorResult).toBe(14);
expect(result.toolResults).toHaveLength(1);
});
});

describe('工具错误处理', () => {
it('应该处理工具执行错误', async () => {
const graph = new StateGraph(ToolIntegrationState);

graph
.addNode('calculator', createToolNode(calculatorTool, 'calculator'))
.addEdge('__start__', 'calculator')
.addEdge('calculator', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { expression: 'invalid expression' },
toolResults: [],
});

expect(result.step).toBe('calculator_error');
expect(result.data.error).toContain('Invalid expression');
expect(result.toolResults).toHaveLength(1);
expect(result.toolResults[0].error).toBeDefined();
});
});
});

🌐 端到端测试

完整流程测试

端到端测试
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// E2E 测试状态定义
const E2ETestState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
userInput: Annotation<string>({
reducer: (x, y) => y,
}),
context: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
messages: Annotation<any[]>({
reducer: (x, y) => [...(x || []), ...y],
}),
result: Annotation<any>({
reducer: (x, y) => y,
}),
});

// 模拟 LLM 服务
class MockLLMService {
private responses: Map<string, string> = new Map();

setResponse(input: string, response: string): void {
this.responses.set(input.toLowerCase(), response);
}

async generate(prompt: string): Promise<string> {
await new Promise((resolve) => setTimeout(resolve, 100));

const key = prompt.toLowerCase();
for (const [inputKey, response] of this.responses.entries()) {
if (key.includes(inputKey)) {
return response;
}
}

return `Mock response for: ${prompt}`;
}
}

// 模拟数据库服务
class MockDatabaseService {
private data: Map<string, any> = new Map();

async save(key: string, value: any): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, 50));
this.data.set(key, value);
}

async load(key: string): Promise<any> {
await new Promise((resolve) => setTimeout(resolve, 50));
return this.data.get(key);
}

async clear(): Promise<void> {
this.data.clear();
}
}

// 创建完整的应用图
const createE2EApp = (
llmService: MockLLMService,
dbService: MockDatabaseService
) => {
// 输入处理节点
const inputProcessorNode = async (state: typeof E2ETestState.State) => {
const { userInput } = state;

return {
step: 'input_processed',
userInput,
context: {
processedAt: Date.now(),
inputLength: userInput.length,
},
messages: [{ type: 'user', content: userInput }],
};
};

// LLM 处理节点
const llmProcessorNode = async (state: typeof E2ETestState.State) => {
const { userInput, context } = state;

const prompt = `User input: ${userInput}\nContext: ${JSON.stringify(
context
)}`;
const response = await llmService.generate(prompt);

return {
step: 'llm_processed',
context: { ...context, llmResponse: response },
messages: [{ type: 'assistant', content: response }],
};
};

// 数据持久化节点
const persistenceNode = async (state: typeof E2ETestState.State) => {
const { userInput, context, messages } = state;

const sessionId = `session_${Date.now()}`;
await dbService.save(sessionId, {
userInput,
context,
messages,
timestamp: Date.now(),
});

return {
step: 'data_persisted',
context: { ...context, sessionId },
};
};

// 结果生成节点
const resultGeneratorNode = async (state: typeof E2ETestState.State) => {
const { context, messages } = state;

const result = {
success: true,
sessionId: context.sessionId,
messageCount: messages.length,
processingTime: Date.now() - context.processedAt,
response: context.llmResponse,
};

return {
step: 'completed',
result,
};
};

// 构建图
const graph = new StateGraph(E2ETestState);

graph
.addNode('input_processor', inputProcessorNode)
.addNode('llm_processor', llmProcessorNode)
.addNode('persistence', persistenceNode)
.addNode('result_generator', resultGeneratorNode)
.addEdge('__start__', 'input_processor')
.addEdge('input_processor', 'llm_processor')
.addEdge('llm_processor', 'persistence')
.addEdge('persistence', 'result_generator')
.addEdge('result_generator', '__end__');

return graph.compile();
};

describe('端到端测试', () => {
let llmService: MockLLMService;
let dbService: MockDatabaseService;
let app: any;

beforeEach(() => {
llmService = new MockLLMService();
dbService = new MockDatabaseService();
app = createE2EApp(llmService, dbService);
});

afterEach(async () => {
await dbService.clear();
});

describe('完整流程测试', () => {
it('应该完成完整的用户交互流程', async () => {
// 设置 LLM 响应
llmService.setResponse('hello', 'Hello! How can I help you today?');

const result = await app.invoke({
step: 'initial',
userInput: 'Hello, I need help',
context: {},
messages: [],
result: null,
});

expect(result.step).toBe('completed');
expect(result.result.success).toBe(true);
expect(result.result.response).toContain('Hello!');
expect(result.result.messageCount).toBe(2); // user + assistant
expect(result.result.sessionId).toBeDefined();
expect(result.result.processingTime).toBeGreaterThan(0);
});

it('应该正确保存会话数据', async () => {
llmService.setResponse('test', 'This is a test response.');

const result = await app.invoke({
step: 'initial',
userInput: 'This is a test message',
context: {},
messages: [],
result: null,
});

// 验证数据已保存
const sessionId = result.result.sessionId;
const savedData = await dbService.load(sessionId);

expect(savedData).toBeDefined();
expect(savedData.userInput).toBe('This is a test message');
expect(savedData.messages).toHaveLength(2);
expect(savedData.context.llmResponse).toContain('test response');
});
});

describe('性能测试', () => {
it('应该在合理时间内完成处理', async () => {
llmService.setResponse('performance', 'Performance test completed.');

const startTime = performance.now();
const result = await app.invoke({
step: 'initial',
userInput: 'Performance test',
context: {},
messages: [],
result: null,
});
const endTime = performance.now();

expect(result.step).toBe('completed');
expect(endTime - startTime).toBeLessThan(1000); // 1秒内完成
expect(result.result.processingTime).toBeLessThan(500); // 内部处理时间
});
});
});

用户场景测试

用户场景测试
/**
* ============================================================================
* 用户场景测试 - User Scenario Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何编写基于真实用户场景的测试(也称为验收测试或BDD测试),模拟完整的用户交互流程,验证应用在实际使用场景下的行为、性能和用户体验。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 用户故事测试:基于用户故事编写测试
* 2️⃣ 端到端场景测试:模拟完整的用户交互流程
* 3️⃣ 多步骤流程测试:测试复杂的多步骤操作
* 4️⃣ 用户体验测试:验证响应时间和交互体验
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用Given-When-Then结构描述场景
* • 模拟真实的用户输入和操作
* • 验证每个步骤的状态和输出
* • 测试场景的各种变体和边界情况
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/user-scenario-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 场景测试应该基于真实用户需求
* • 测试应该覆盖主要用户路径
* • 验证用户体验和性能指标
* • 场景测试可以作为活文档使用
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 用户场景测试状态定义
const UserScenarioState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
userProfile: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
sessionData: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
interactions: Annotation<any[]>({
reducer: (x, y) => [...(x || []), ...y],
}),
results: Annotation<any[]>({
reducer: (x, y) => [...(x || []), ...y],
}),
});

// 模拟用户类型
interface UserProfile {
id: string;
type: 'new_user' | 'returning_user' | 'power_user';
preferences: any;
history: any[];
}

// 创建用户场景应用
const createUserScenarioApp = () => {
// 用户认证节点
const authenticateUserNode = async (
state: typeof UserScenarioState.State
) => {
const { userProfile } = state;

// 模拟认证过程
await new Promise((resolve) => setTimeout(resolve, 100));

return {
step: 'user_authenticated',
sessionData: {
authenticated: true,
sessionId: `session_${Date.now()}`,
loginTime: new Date().toISOString(),
},
interactions: [
{ type: 'login', timestamp: Date.now(), userId: userProfile.id },
],
};
};

// 个性化推荐节点
const personalizeContentNode = async (
state: typeof UserScenarioState.State
) => {
const { userProfile, sessionData } = state;

let recommendations = [];

switch (userProfile.type) {
case 'new_user':
recommendations = ['tutorial', 'getting_started', 'basic_features'];
break;
case 'returning_user':
recommendations = [
'recent_updates',
'saved_items',
'continue_where_left',
];
break;
case 'power_user':
recommendations = ['advanced_features', 'api_docs', 'beta_features'];
break;
}

return {
step: 'content_personalized',
sessionData: {
...sessionData,
recommendations,
personalizedAt: Date.now(),
},
interactions: [
{ type: 'personalization', recommendations, timestamp: Date.now() },
],
};
};

// 用户交互处理节点
const handleUserInteractionNode = async (
state: typeof UserScenarioState.State
) => {
const { userProfile, sessionData, interactions } = state;

// 模拟用户交互处理
const lastInteraction = interactions[interactions.length - 1];
let response = '';

if (lastInteraction?.type === 'query') {
response = `处理查询: ${lastInteraction.content}`;
} else if (lastInteraction?.type === 'action') {
response = `执行操作: ${lastInteraction.action}`;
} else {
response = '欢迎使用我们的服务!';
}

return {
step: 'interaction_handled',
results: [
{
type: 'response',
content: response,
timestamp: Date.now(),
userId: userProfile.id,
},
],
};
};

// 构建图
const graph = new StateGraph(UserScenarioState);

graph
.addNode('authenticate', authenticateUserNode)
.addNode('personalize', personalizeContentNode)
.addNode('handle_interaction', handleUserInteractionNode)
.addEdge('__start__', 'authenticate')
.addEdge('authenticate', 'personalize')
.addEdge('personalize', 'handle_interaction')
.addEdge('handle_interaction', '__end__');

return graph.compile();
};

describe('用户场景测试', () => {
let app: any;

beforeEach(() => {
app = createUserScenarioApp();
});

describe('新用户场景', () => {
it('应该为新用户提供入门指导', async () => {
const newUser: UserProfile = {
id: 'user_001',
type: 'new_user',
preferences: {},
history: [],
};

const result = await app.invoke({
step: 'initial',
userProfile: newUser,
sessionData: {},
interactions: [],
results: [],
});

expect(result.step).toBe('interaction_handled');
expect(result.sessionData.authenticated).toBe(true);
expect(result.sessionData.recommendations).toContain('tutorial');
expect(result.sessionData.recommendations).toContain('getting_started');
expect(result.results).toHaveLength(1);
expect(result.results[0].content).toContain('欢迎');
});

it('应该记录新用户的首次交互', async () => {
const newUser: UserProfile = {
id: 'user_002',
type: 'new_user',
preferences: {},
history: [],
};

const result = await app.invoke({
step: 'initial',
userProfile: newUser,
sessionData: {},
interactions: [
{ type: 'query', content: '如何开始使用?', timestamp: Date.now() },
],
results: [],
});

expect(result.interactions).toHaveLength(3); // login + personalization + query
expect(result.interactions.some((i) => i.type === 'login')).toBe(true);
expect(
result.interactions.some((i) => i.type === 'personalization')
).toBe(true);
expect(result.results[0].content).toContain('如何开始使用?');
});
});

describe('回访用户场景', () => {
it('应该为回访用户显示个性化内容', async () => {
const returningUser: UserProfile = {
id: 'user_003',
type: 'returning_user',
preferences: { theme: 'dark', language: 'zh-CN' },
history: [
{ action: 'viewed_tutorial', timestamp: Date.now() - 86400000 },
{
action: 'saved_item',
itemId: 'item_123',
timestamp: Date.now() - 3600000,
},
],
};

const result = await app.invoke({
step: 'initial',
userProfile: returningUser,
sessionData: {},
interactions: [],
results: [],
});

expect(result.sessionData.recommendations).toContain('recent_updates');
expect(result.sessionData.recommendations).toContain('saved_items');
expect(result.sessionData.recommendations).toContain(
'continue_where_left'
);
});

it('应该处理回访用户的特定操作', async () => {
const returningUser: UserProfile = {
id: 'user_004',
type: 'returning_user',
preferences: {},
history: [],
};

const result = await app.invoke({
step: 'initial',
userProfile: returningUser,
sessionData: {},
interactions: [
{ type: 'action', action: 'view_saved_items', timestamp: Date.now() },
],
results: [],
});

expect(result.results[0].content).toContain('view_saved_items');
expect(result.results[0].userId).toBe('user_004');
});
});

describe('高级用户场景', () => {
it('应该为高级用户提供高级功能', async () => {
const powerUser: UserProfile = {
id: 'user_005',
type: 'power_user',
preferences: {
advancedMode: true,
betaFeatures: true,
apiAccess: true,
},
history: [
{
action: 'used_api',
endpoint: '/advanced',
timestamp: Date.now() - 86400000,
},
{
action: 'enabled_beta',
feature: 'new_ui',
timestamp: Date.now() - 3600000,
},
],
};

const result = await app.invoke({
step: 'initial',
userProfile: powerUser,
sessionData: {},
interactions: [],
results: [],
});

expect(result.sessionData.recommendations).toContain('advanced_features');
expect(result.sessionData.recommendations).toContain('api_docs');
expect(result.sessionData.recommendations).toContain('beta_features');
});

it('应该处理高级用户的复杂查询', async () => {
const powerUser: UserProfile = {
id: 'user_006',
type: 'power_user',
preferences: {},
history: [],
};

const complexQuery = 'API 限流策略和错误处理最佳实践';

const result = await app.invoke({
step: 'initial',
userProfile: powerUser,
sessionData: {},
interactions: [
{ type: 'query', content: complexQuery, timestamp: Date.now() },
],
results: [],
});

expect(result.results[0].content).toContain(complexQuery);
expect(result.results[0].type).toBe('response');
});
});

describe('多步骤用户旅程', () => {
it('应该支持完整的用户旅程', async () => {
const user: UserProfile = {
id: 'user_007',
type: 'new_user',
preferences: {},
history: [],
};

// 第一步:初始登录
let result = await app.invoke({
step: 'initial',
userProfile: user,
sessionData: {},
interactions: [],
results: [],
});

expect(result.sessionData.authenticated).toBe(true);
expect(result.sessionData.recommendations).toContain('tutorial');

// 第二步:用户查询
result = await app.invoke({
step: 'initial',
userProfile: user,
sessionData: result.sessionData,
interactions: [
...result.interactions,
{ type: 'query', content: '教程在哪里?', timestamp: Date.now() },
],
results: result.results,
});

expect(result.results).toHaveLength(2);
expect(result.results[1].content).toContain('教程在哪里?');

// 第三步:用户执行操作
result = await app.invoke({
step: 'initial',
userProfile: user,
sessionData: result.sessionData,
interactions: [
...result.interactions,
{ type: 'action', action: 'start_tutorial', timestamp: Date.now() },
],
results: result.results,
});

expect(result.results).toHaveLength(3);
expect(result.results[2].content).toContain('start_tutorial');
});
});

describe('错误场景处理', () => {
it('应该处理无效用户类型', async () => {
const invalidUser: any = {
id: 'user_008',
type: 'invalid_type',
preferences: {},
history: [],
};

const result = await app.invoke({
step: 'initial',
userProfile: invalidUser,
sessionData: {},
interactions: [],
results: [],
});

// 应该有默认行为
expect(result.sessionData.authenticated).toBe(true);
expect(result.sessionData.recommendations).toEqual([]);
});

it('应该处理缺失的用户数据', async () => {
const incompleteUser: any = {
id: 'user_009',
// 缺少 type 字段
preferences: {},
history: [],
};

const result = await app.invoke({
step: 'initial',
userProfile: incompleteUser,
sessionData: {},
interactions: [],
results: [],
});

expect(result.sessionData.authenticated).toBe(true);
expect(result.results).toHaveLength(1);
});
});
});

// 导出测试工具
export { UserScenarioState, UserProfile, createUserScenarioApp };
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /home/user/langgraphjs-tutorial/websites/examples/utils/.env
Error: Vitest cannot be imported in a CommonJS module
(这些是测试文件,需要使用 pnpm test 而不是 esno 运行)
*/

🎭 模拟和存根

LLM 模拟

LLM模拟测试
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 模拟 LLM 响应的接口
interface MockLLMResponse {
content: string;
tool_calls?: Array<{
name: string;
args: Record<string, any>;
}>;
}

// 创建 LLM 模拟器
class MockLLM {
private responses: MockLLMResponse[] = [];
private currentIndex = 0;

// 设置预定义的响应
setResponses(responses: MockLLMResponse[]) {
this.responses = responses;
this.currentIndex = 0;
}

// 模拟 LLM 调用
async invoke(input: string): Promise<MockLLMResponse> {
if (this.currentIndex >= this.responses.length) {
throw new Error('No more mock responses available');
}

const response = this.responses[this.currentIndex];
this.currentIndex++;

// 模拟异步延迟
await new Promise((resolve) => setTimeout(resolve, 10));

return response;
}

// 重置模拟器
reset() {
this.currentIndex = 0;
this.responses = [];
}
}

// 状态定义
const GraphState = Annotation.Root({
messages: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
llm_response: Annotation<string>({
reducer: (x, y) => y,
}),
});

describe('LLM 模拟测试', () => {
let mockLLM: MockLLM;
let graph: StateGraph<typeof GraphState.State>;

beforeEach(() => {
mockLLM = new MockLLM();

// 创建测试图
graph = new StateGraph(GraphState)
.addNode('llm_node', async (state) => {
const lastMessage = state.messages[state.messages.length - 1];
const response = await mockLLM.invoke(lastMessage);

return {
llm_response: response.content,
};
})
.addEdge('__start__', 'llm_node')
.addEdge('llm_node', '__end__');
});

it('应该正确模拟简单的 LLM 响应', async () => {
// 设置模拟响应
mockLLM.setResponses([{ content: '你好!我是AI助手,很高兴为您服务。' }]);

const app = graph.compile();
const result = await app.invoke({
messages: ['你好'],
});

expect(result.llm_response).toBe('你好!我是AI助手,很高兴为您服务。');
});

it('应该正确模拟带工具调用的 LLM 响应', async () => {
// 设置带工具调用的模拟响应
mockLLM.setResponses([
{
content: '我需要查询天气信息',
tool_calls: [
{
name: 'get_weather',
args: { city: '北京' },
},
],
},
]);

const app = graph.compile();
const result = await app.invoke({
messages: ['北京今天天气怎么样?'],
});

expect(result.llm_response).toBe('我需要查询天气信息');
});

it('应该正确处理多轮对话模拟', async () => {
// 设置多轮对话的模拟响应
mockLLM.setResponses([
{ content: '第一轮响应' },
{ content: '第二轮响应' },
{ content: '第三轮响应' },
]);

const app = graph.compile();

// 第一轮
let result = await app.invoke({
messages: ['第一个问题'],
});
expect(result.llm_response).toBe('第一轮响应');

// 第二轮
result = await app.invoke({
messages: result.messages.concat(['第二个问题']),
});
expect(result.llm_response).toBe('第二轮响应');

// 第三轮
result = await app.invoke({
messages: result.messages.concat(['第三个问题']),
});
expect(result.llm_response).toBe('第三轮响应');
});

it('应该正确处理 LLM 错误情况', async () => {
// 不设置任何响应,模拟错误情况
const app = graph.compile();

await expect(
app.invoke({
messages: ['测试消息'],
})
).rejects.toThrow('No more mock responses available');
});

it('应该支持条件性响应模拟', async () => {
// 创建条件性模拟器
const conditionalMockLLM = {
async invoke(input: string) {
if (input.includes('天气')) {
return { content: '今天天气晴朗' };
} else if (input.includes('时间')) {
return { content: '现在是下午3点' };
} else {
return { content: '我不理解您的问题' };
}
},
};

// 创建使用条件性模拟的图
const conditionalGraph = new StateGraph(GraphState)
.addNode('llm_node', async (state) => {
const lastMessage = state.messages[state.messages.length - 1];
const response = await conditionalMockLLM.invoke(lastMessage);

return {
llm_response: response.content,
};
})
.addEdge('__start__', 'llm_node')
.addEdge('llm_node', '__end__');

const app = conditionalGraph.compile();

// 测试天气查询
let result = await app.invoke({
messages: ['今天天气怎么样?'],
});
expect(result.llm_response).toBe('今天天气晴朗');

// 测试时间查询
result = await app.invoke({
messages: ['现在几点了?'],
});
expect(result.llm_response).toBe('现在是下午3点');

// 测试未知查询
result = await app.invoke({
messages: ['你好'],
});
expect(result.llm_response).toBe('我不理解您的问题');
});
});

外部服务模拟

外部服务模拟
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 外部服务模拟示例

// 模拟 HTTP 服务
class MockHTTPService {
private responses: Map<string, any> = new Map();
private delays: Map<string, number> = new Map();

setResponse(url: string, response: any, delay: number = 0) {
this.responses.set(url, response);
this.delays.set(url, delay);
}

async get(url: string): Promise<any> {
const delay = this.delays.get(url) || 0;
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}

const response = this.responses.get(url);
if (!response) {
throw new Error(`No mock response for ${url}`);
}

return response;
}

async post(url: string, data: any): Promise<any> {
const delay = this.delays.get(url) || 0;
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}

const response = this.responses.get(url);
if (!response) {
throw new Error(`No mock response for ${url}`);
}

return { ...response, receivedData: data };
}
}

// 模拟数据库服务
class MockDatabaseService {
private data: Map<string, any> = new Map();
private shouldFail: boolean = false;

setShouldFail(shouldFail: boolean) {
this.shouldFail = shouldFail;
}

async save(key: string, value: any): Promise<void> {
if (this.shouldFail) {
throw new Error('Database connection failed');
}

await new Promise((resolve) => setTimeout(resolve, 50));
this.data.set(key, { ...value, timestamp: Date.now() });
}

async load(key: string): Promise<any> {
if (this.shouldFail) {
throw new Error('Database connection failed');
}

await new Promise((resolve) => setTimeout(resolve, 30));
return this.data.get(key);
}

clear(): void {
this.data.clear();
this.shouldFail = false;
}
}

// 状态定义
const ServiceMockState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
apiData: Annotation<any>({
reducer: (x, y) => y,
}),
dbData: Annotation<any>({
reducer: (x, y) => y,
}),
errors: Annotation<string[]>({
reducer: (x, y) => [...(x || []), ...(y || [])],
}),
});

describe('外部服务模拟测试', () => {
let httpService: MockHTTPService;
let dbService: MockDatabaseService;
let graph: StateGraph<typeof ServiceMockState.State>;

beforeEach(() => {
httpService = new MockHTTPService();
dbService = new MockDatabaseService();

// API 调用节点
const apiCallNode = async (state: typeof ServiceMockState.State) => {
try {
const data = await httpService.get('/api/data');
return {
step: 'api_completed',
apiData: data,
};
} catch (error) {
return {
step: 'api_error',
errors: [error.message],
};
}
};

// 数据库操作节点
const dbOperationNode = async (state: typeof ServiceMockState.State) => {
try {
const key = `data_${Date.now()}`;
await dbService.save(key, state.apiData);
const savedData = await dbService.load(key);

return {
step: 'db_completed',
dbData: savedData,
};
} catch (error) {
return {
step: 'db_error',
errors: [error.message],
};
}
};

graph = new StateGraph(ServiceMockState)
.addNode('api_call', apiCallNode)
.addNode('db_operation', dbOperationNode)
.addEdge('__start__', 'api_call')
.addEdge('api_call', 'db_operation')
.addEdge('db_operation', '__end__');
});

afterEach(() => {
dbService.clear();
});

it('应该正确模拟成功的 API 调用和数据库操作', async () => {
// 设置模拟响应
httpService.setResponse('/api/data', { message: 'Hello API' });

const app = graph.compile();
const result = await app.invoke({
step: 'initial',
apiData: null,
dbData: null,
errors: [],
});

expect(result.step).toBe('db_completed');
expect(result.apiData).toEqual({ message: 'Hello API' });
expect(result.dbData).toBeDefined();
expect(result.dbData.message).toBe('Hello API');
expect(result.errors).toHaveLength(0);
});

it('应该正确处理 API 调用失败', async () => {
// 不设置 API 响应,模拟失败
const app = graph.compile();
const result = await app.invoke({
step: 'initial',
apiData: null,
dbData: null,
errors: [],
});

expect(result.step).toBe('api_error');
expect(result.errors).toHaveLength(1);
expect(result.errors[0]).toContain('No mock response');
});

it('应该正确处理数据库操作失败', async () => {
// 设置 API 成功响应
httpService.setResponse('/api/data', { message: 'Hello API' });
// 设置数据库失败
dbService.setShouldFail(true);

const app = graph.compile();
const result = await app.invoke({
step: 'initial',
apiData: null,
dbData: null,
errors: [],
});

expect(result.step).toBe('db_error');
expect(result.apiData).toEqual({ message: 'Hello API' });
expect(result.errors).toHaveLength(1);
expect(result.errors[0]).toBe('Database connection failed');
});

it('应该正确处理服务延迟', async () => {
// 设置延迟响应
httpService.setResponse('/api/data', { message: 'Delayed response' }, 100);

const app = graph.compile();

const startTime = performance.now();
const result = await app.invoke({
step: 'initial',
apiData: null,
dbData: null,
errors: [],
});
const endTime = performance.now();

expect(result.step).toBe('db_completed');
expect(endTime - startTime).toBeGreaterThan(100); // 至少 100ms 延迟
});
});

⏱️ 异步测试

Promise 测试

异步操作测试
/**
* ============================================================================
* 异步操作测试 - Asynchronous Operations Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何全面测试 LangGraph 中的异步操作,包括异步延迟、异步数据获取、
* 异步错误处理、并发操作、异步操作链、超时处理、重试逻辑等。通过 AsyncOperationSimulator
* 类模拟各种异步场景,帮助开发者编写可靠的异步测试用例。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 异步操作模拟器:提供延迟、数据获取、失败操作等异步模拟工具
* 2️⃣ 异步错误处理测试:验证异步操作中的错误捕获和处理机制
* 3️⃣ 并发操作测试:测试 Promise.all 和多个异步操作的并发执行
* 4️⃣ 超时和重试测试:验证异步操作的超时控制和重试逻辑
* 5️⃣ Promise.allSettled 测试:处理部分成功、部分失败的异步操作场景
* 6️⃣ 执行时间测试:验证异步操作的性能和执行时间
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用 setTimeout 和 Promise 模拟真实的异步操作
* • 通过参数控制操作是否失败,模拟不可靠的异步服务
* • 使用 Promise.race 实现超时控制机制
* • 采用循环和 try-catch 实现重试逻辑
* • 利用 Promise.allSettled 处理混合成功/失败的场景
* • 使用 performance.now() 或 Date.now() 测量异步操作时间
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/async-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 异步测试需要正确使用 async/await,避免测试提前结束
* • 超时测试的时间阈值需要考虑 CI 环境的性能差异
* • 重试逻辑测试需要准确控制尝试次数,避免无限重试
* • 并发测试要注意资源竞争和共享状态问题
* • 使用 Promise.allSettled 而非 Promise.all 可以更好地处理部分失败
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 异步操作模拟器
class AsyncOperationSimulator {
// 模拟延迟操作
static async delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

// 模拟异步数据获取
static async fetchData(id: string): Promise<{ id: string; data: string }> {
await this.delay(100);
return { id, data: `数据_${id}` };
}

// 模拟可能失败的异步操作
static async unreliableOperation(
shouldFail: boolean = false
): Promise<string> {
await this.delay(50);
if (shouldFail) {
throw new Error('操作失败');
}
return '操作成功';
}

// 模拟并发操作
static async concurrentOperations(count: number): Promise<string[]> {
const promises = Array.from({ length: count }, (_, i) =>
this.fetchData(`item_${i}`)
);
const results = await Promise.all(promises);
return results.map((r) => r.data);
}
}

// 状态定义
const AsyncGraphState = Annotation.Root({
input: Annotation<string>({
reducer: (x, y) => y,
}),
results: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
error: Annotation<string | null>({
reducer: (x, y) => y,
}),
completed: Annotation<boolean>({
reducer: (x, y) => y,
}),
});

describe('异步操作测试', () => {
let graph: StateGraph<typeof AsyncGraphState.State>;

beforeEach(() => {
graph = new StateGraph(AsyncGraphState);
});

it('应该正确处理简单异步操作', async () => {
const testGraph = graph
.addNode('async_node', async (state) => {
const result = await AsyncOperationSimulator.fetchData(state.input);
return {
results: [result.data],
completed: true,
};
})
.addEdge('__start__', 'async_node')
.addEdge('async_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'test123' });

expect(result.results).toEqual(['数据_test123']);
expect(result.completed).toBe(true);
});

it('应该正确处理异步错误', async () => {
const testGraph = graph
.addNode('error_node', async (state) => {
try {
await AsyncOperationSimulator.unreliableOperation(true);
return { completed: true };
} catch (error) {
return {
error: error instanceof Error ? error.message : '未知错误',
completed: false,
};
}
})
.addEdge('__start__', 'error_node')
.addEdge('error_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'test' });

expect(result.error).toBe('操作失败');
expect(result.completed).toBe(false);
});

it('应该正确处理并发异步操作', async () => {
const testGraph = graph
.addNode('concurrent_node', async (state) => {
const results = await AsyncOperationSimulator.concurrentOperations(3);
return {
results,
completed: true,
};
})
.addEdge('__start__', 'concurrent_node')
.addEdge('concurrent_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'concurrent_test' });

expect(result.results).toHaveLength(3);
expect(result.results).toEqual([
'数据_item_0',
'数据_item_1',
'数据_item_2',
]);
expect(result.completed).toBe(true);
});

it('应该正确处理异步操作链', async () => {
const testGraph = graph
.addNode('step1', async (state) => {
const data = await AsyncOperationSimulator.fetchData('step1');
return { results: [data.data] };
})
.addNode('step2', async (state) => {
const data = await AsyncOperationSimulator.fetchData('step2');
return { results: [data.data] };
})
.addNode('step3', async (state) => {
const data = await AsyncOperationSimulator.fetchData('step3');
return {
results: [data.data],
completed: true,
};
})
.addEdge('__start__', 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', 'step3')
.addEdge('step3', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'chain_test' });

expect(result.results).toEqual(['数据_step1', '数据_step2', '数据_step3']);
expect(result.completed).toBe(true);
});

it('应该正确处理异步操作超时', async () => {
const timeoutPromise = <T>(
promise: Promise<T>,
timeout: number
): Promise<T> => {
return Promise.race([
promise,
new Promise<T>((_, reject) =>
setTimeout(() => reject(new Error('操作超时')), timeout)
),
]);
};

const testGraph = graph
.addNode('timeout_node', async (state) => {
try {
// 模拟长时间运行的操作
const longOperation = AsyncOperationSimulator.delay(2000);
await timeoutPromise(longOperation, 100);
return { completed: true };
} catch (error) {
return {
error: error instanceof Error ? error.message : '未知错误',
completed: false,
};
}
})
.addEdge('__start__', 'timeout_node')
.addEdge('timeout_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'timeout_test' });

expect(result.error).toBe('操作超时');
expect(result.completed).toBe(false);
});

it('应该正确处理异步重试逻辑', async () => {
let attemptCount = 0;

const testGraph = graph
.addNode('retry_node', async (state) => {
const maxRetries = 3;
let lastError: string | null = null;

for (let i = 0; i < maxRetries; i++) {
try {
attemptCount++;
// 前两次尝试失败,第三次成功
const shouldFail = attemptCount < 3;
const result = await AsyncOperationSimulator.unreliableOperation(
shouldFail
);
return {
results: [result],
completed: true,
};
} catch (error) {
lastError = error instanceof Error ? error.message : '未知错误';
if (i < maxRetries - 1) {
await AsyncOperationSimulator.delay(10); // 重试延迟
}
}
}

return {
error: `重试失败: ${lastError}`,
completed: false,
};
})
.addEdge('__start__', 'retry_node')
.addEdge('retry_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'retry_test' });

expect(result.results).toEqual(['操作成功']);
expect(result.completed).toBe(true);
expect(attemptCount).toBe(3);
});

it('应该正确处理Promise.allSettled', async () => {
const testGraph = graph
.addNode('all_settled_node', async (state) => {
const operations = [
AsyncOperationSimulator.fetchData('success1'),
AsyncOperationSimulator.unreliableOperation(true), // 这个会失败
AsyncOperationSimulator.fetchData('success2'),
];

const results = await Promise.allSettled(operations);
const successResults: string[] = [];
const errors: string[] = [];

results.forEach((result, index) => {
if (result.status === 'fulfilled') {
if (typeof result.value === 'string') {
successResults.push(result.value);
} else if (
result.value &&
typeof result.value === 'object' &&
'data' in result.value
) {
successResults.push(result.value.data as string);
}
} else {
errors.push(`操作${index}失败: ${result.reason.message}`);
}
});

return {
results: successResults,
error: errors.length > 0 ? errors.join('; ') : null,
completed: true,
};
})
.addEdge('__start__', 'all_settled_node')
.addEdge('all_settled_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'all_settled_test' });

expect(result.results).toEqual(['数据_success1', '数据_success2']);
expect(result.error).toContain('操作1失败');
expect(result.completed).toBe(true);
});

it('应该正确测试异步操作的执行时间', async () => {
const testGraph = graph
.addNode('timed_node', async (state) => {
const startTime = Date.now();
await AsyncOperationSimulator.delay(100);
const endTime = Date.now();
const duration = endTime - startTime;

return {
results: [`执行时间: ${duration}ms`],
completed: true,
};
})
.addEdge('__start__', 'timed_node')
.addEdge('timed_node', '__end__');

const app = testGraph.compile();
const startTime = Date.now();
const result = await app.invoke({ input: 'timing_test' });
const totalTime = Date.now() - startTime;

expect(result.completed).toBe(true);
expect(totalTime).toBeGreaterThanOrEqual(100);
expect(totalTime).toBeLessThan(200); // 允许一些执行开销
});
});

// 导出异步测试工具
export { AsyncOperationSimulator };

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/async-tests.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/async-tests.test.ts (8 tests) 1129ms
✓ 异步操作测试 > 应该正确处理异步操作链 318ms

Test Files 1 passed (1)
Tests 8 passed (8)
Start at 02:39:25
Duration 5.15s (transform 157ms, setup 393ms, collect 671ms, tests 1.13s, environment 1.06s, prepare 870ms)
*/

流式测试

流式处理测试
/**
* ============================================================================
* 流式处理测试 - Streaming Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何测试LangGraph的流式处理功能,包括流式执行、事件流处理、流式错误处理、流式数据完整性验证以及流式性能测试。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 流式执行测试:测试graph.stream()的流式输出
* 2️⃣ 事件流测试:验证流式事件的顺序和内容
* 3️⃣ 流式错误处理测试:测试流中的错误处理
* 4️⃣ 流式完整性测试:验证所有事件都被正确接收
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用for await...of遍历异步迭代器
* • 收集所有流式事件进行验证
* • 测试流的暂停和恢复
* • 验证流式数据的顺序和完整性
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/streaming-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 流式测试需要处理异步迭代器
* • 验证所有事件都按预期顺序到达
* • 测试流的错误处理和中断
* • 注意流式测试的超时设置
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 流式数据模拟器
class StreamingSimulator {
// 模拟流式数据生成
static async *generateStream(
count: number,
delay: number = 50
): AsyncGenerator<string> {
for (let i = 0; i < count; i++) {
await new Promise((resolve) => setTimeout(resolve, delay));
yield `数据块_${i}`;
}
}

// 模拟流式处理
static async *processStream(
input: AsyncIterable<string>
): AsyncGenerator<string> {
for await (const item of input) {
yield `处理后_${item}`;
}
}

// 模拟流式聚合
static async collectStream(stream: AsyncIterable<string>): Promise<string[]> {
const results: string[] = [];
for await (const item of stream) {
results.push(item);
}
return results;
}

// 模拟带错误的流
static async *errorStream(errorAt: number): AsyncGenerator<string> {
for (let i = 0; i < 5; i++) {
if (i === errorAt) {
throw new Error(`流错误在位置 ${i}`);
}
yield `数据_${i}`;
}
}
}

// 状态定义
const StreamingState = Annotation.Root({
input: Annotation<string>({
reducer: (x, y) => y,
}),
stream_data: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
processed_count: Annotation<number>({
reducer: (x, y) => y,
}),
error: Annotation<string | null>({
reducer: (x, y) => y,
}),
completed: Annotation<boolean>({
reducer: (x, y) => y,
}),
});

describe('流式处理测试', () => {
let graph: StateGraph<typeof StreamingState.State>;

beforeEach(() => {
graph = new StateGraph(StreamingState);
});

it('应该正确处理基本流式数据', async () => {
const testGraph = graph
.addNode('streaming_node', async (state) => {
const stream = StreamingSimulator.generateStream(3, 10);
const results = await StreamingSimulator.collectStream(stream);

return {
stream_data: results,
processed_count: results.length,
completed: true,
};
})
.addEdge('__start__', 'streaming_node')
.addEdge('streaming_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'basic_stream' });

expect(result.stream_data).toEqual(['数据块_0', '数据块_1', '数据块_2']);
expect(result.processed_count).toBe(3);
expect(result.completed).toBe(true);
});

it('应该正确处理流式数据转换', async () => {
const testGraph = graph
.addNode('transform_node', async (state) => {
const sourceStream = StreamingSimulator.generateStream(3, 10);
const processedStream = StreamingSimulator.processStream(sourceStream);
const results = await StreamingSimulator.collectStream(processedStream);

return {
stream_data: results,
processed_count: results.length,
completed: true,
};
})
.addEdge('__start__', 'transform_node')
.addEdge('transform_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'transform_stream' });

expect(result.stream_data).toEqual([
'处理后_数据块_0',
'处理后_数据块_1',
'处理后_数据块_2',
]);
expect(result.processed_count).toBe(3);
expect(result.completed).toBe(true);
});

it('应该正确处理流式错误', async () => {
const testGraph = graph
.addNode('error_stream_node', async (state) => {
try {
const errorStream = StreamingSimulator.errorStream(2);
const results = await StreamingSimulator.collectStream(errorStream);

return {
stream_data: results,
completed: true,
};
} catch (error) {
return {
error: error instanceof Error ? error.message : '流处理错误',
completed: false,
};
}
})
.addEdge('__start__', 'error_stream_node')
.addEdge('error_stream_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'error_stream' });

expect(result.error).toBe('流错误在位置 2');
expect(result.completed).toBe(false);
});

it('应该正确处理流式数据的分批处理', async () => {
const batchProcessor = async function* (
stream: AsyncIterable<string>,
batchSize: number
): AsyncGenerator<string[]> {
let batch: string[] = [];

for await (const item of stream) {
batch.push(item);
if (batch.length >= batchSize) {
yield [...batch];
batch = [];
}
}

if (batch.length > 0) {
yield batch;
}
};

const testGraph = graph
.addNode('batch_node', async (state) => {
const sourceStream = StreamingSimulator.generateStream(7, 10);
const batchStream = batchProcessor(sourceStream, 3);

const batches: string[][] = [];
for await (const batch of batchStream) {
batches.push(batch);
}

const flatResults = batches.flat();

return {
stream_data: flatResults,
processed_count: batches.length,
completed: true,
};
})
.addEdge('__start__', 'batch_node')
.addEdge('batch_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'batch_stream' });

expect(result.stream_data).toHaveLength(7);
expect(result.processed_count).toBe(3); // 3个批次:[3, 3, 1]
expect(result.completed).toBe(true);
});

it('应该正确处理流式数据的过滤', async () => {
const filterStream = async function* (
stream: AsyncIterable<string>,
predicate: (item: string) => boolean
): AsyncGenerator<string> {
for await (const item of stream) {
if (predicate(item)) {
yield item;
}
}
};

const testGraph = graph
.addNode('filter_node', async (state) => {
const sourceStream = StreamingSimulator.generateStream(5, 10);
// 只保留偶数索引的数据
const filteredStream = filterStream(sourceStream, (item) => {
const index = parseInt(item.split('_')[1]);
return index % 2 === 0;
});

const results = await StreamingSimulator.collectStream(filteredStream);

return {
stream_data: results,
processed_count: results.length,
completed: true,
};
})
.addEdge('__start__', 'filter_node')
.addEdge('filter_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'filter_stream' });

expect(result.stream_data).toEqual(['数据块_0', '数据块_2', '数据块_4']);
expect(result.processed_count).toBe(3);
expect(result.completed).toBe(true);
});

it('应该正确处理并发流式处理', async () => {
const testGraph = graph
.addNode('concurrent_streams_node', async (state) => {
// 创建多个并发流
const streams = [
StreamingSimulator.generateStream(2, 20),
StreamingSimulator.generateStream(2, 30),
StreamingSimulator.generateStream(2, 10),
];

// 并发收集所有流的数据
const results = await Promise.all(
streams.map((stream) => StreamingSimulator.collectStream(stream))
);

const flatResults = results.flat();

return {
stream_data: flatResults,
processed_count: results.length,
completed: true,
};
})
.addEdge('__start__', 'concurrent_streams_node')
.addEdge('concurrent_streams_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'concurrent_streams' });

expect(result.stream_data).toHaveLength(6); // 3个流,每个2个元素
expect(result.processed_count).toBe(3); // 3个流
expect(result.completed).toBe(true);
});

it('应该正确测试流式处理的性能', async () => {
const testGraph = graph
.addNode('performance_node', async (state) => {
const startTime = Date.now();

// 创建大量数据的流
const largeStream = StreamingSimulator.generateStream(100, 1);
const results = await StreamingSimulator.collectStream(largeStream);

const endTime = Date.now();
const duration = endTime - startTime;

return {
stream_data: [`处理了${results.length}个项目,耗时${duration}ms`],
processed_count: results.length,
completed: true,
};
})
.addEdge('__start__', 'performance_node')
.addEdge('performance_node', '__end__');

const app = testGraph.compile();
const startTime = Date.now();
const result = await app.invoke({ input: 'performance_test' });
const totalTime = Date.now() - startTime;

expect(result.processed_count).toBe(100);
expect(result.completed).toBe(true);
expect(totalTime).toBeGreaterThan(100); // 至少100ms(100个项目 * 1ms延迟)
});

it('应该正确处理流式数据的背压控制', async () => {
// 模拟背压控制的流处理器
const backpressureProcessor = async function* (
stream: AsyncIterable<string>,
maxBuffer: number
): AsyncGenerator<string> {
const buffer: string[] = [];

for await (const item of stream) {
buffer.push(item);

// 当缓冲区满时,处理并清空
if (buffer.length >= maxBuffer) {
for (const bufferedItem of buffer) {
yield `缓冲处理_${bufferedItem}`;
}
buffer.length = 0;
}
}

// 处理剩余的缓冲数据
for (const bufferedItem of buffer) {
yield `最终处理_${bufferedItem}`;
}
};

const testGraph = graph
.addNode('backpressure_node', async (state) => {
const sourceStream = StreamingSimulator.generateStream(5, 10);
const processedStream = backpressureProcessor(sourceStream, 2);
const results = await StreamingSimulator.collectStream(processedStream);

return {
stream_data: results,
processed_count: results.length,
completed: true,
};
})
.addEdge('__start__', 'backpressure_node')
.addEdge('backpressure_node', '__end__');

const app = testGraph.compile();
const result = await app.invoke({ input: 'backpressure_test' });

expect(result.stream_data).toHaveLength(5);
expect(result.processed_count).toBe(5);
expect(result.completed).toBe(true);

// 验证背压处理的结果格式
const hasBufferedItems = result.stream_data.some(
(item) => item.includes('缓冲处理_') || item.includes('最终处理_')
);
expect(hasBufferedItems).toBe(true);
});
});

// 导出流式测试工具
export { StreamingSimulator };

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/streaming-tests.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/streaming-tests.test.ts (8 tests) 876ms

Test Files 1 passed (1)
Tests 8 passed (8)
Start at 02:39:28
Duration 2.66s (transform 54ms, setup 148ms, collect 353ms, tests 876ms, environment 763ms, prepare 123ms)
*/

🔄 状态管理测试

状态变化测试

状态变化测试
/**
* ============================================================================
* 状态转换测试 - State Transition Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何测试LangGraph应用中的状态转换逻辑,验证状态在节点执行过程中的正确变化,包括状态初始化、增量更新、累积合并以及最终状态的完整性验证。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 状态初始化测试:验证状态的初始值和默认值
* 2️⃣ 状态更新测试:测试单个节点的状态更新
* 3️⃣ 状态累积测试:测试跨多个节点的状态累积
* 4️⃣ 状态转换链测试:验证完整的状态转换流程
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 在每个节点执行后验证状态变化
* • 使用状态快照对比验证增量更新
* • 测试reducer的累积行为
* • 验证状态不变性和数据隔离
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/state-transition-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 状态转换测试应该覆盖所有状态字段
* • 测试状态的中间状态和最终状态
* • 验证reducer的正确行为
* • 注意状态的不可变性
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 状态转换测试的状态定义
const TransitionState = Annotation.Root({
current_step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
history: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
error: Annotation<string | null>({
reducer: (x, y) => y,
}),
completed: Annotation<boolean>({
reducer: (x, y) => y,
}),
});

// 状态转换模拟器
class StateTransitionSimulator {
// 模拟状态验证
static validateState(state: any, expectedStep: string): boolean {
return state.current_step === expectedStep;
}

// 模拟状态转换条件
static shouldTransition(state: any, condition: string): boolean {
switch (condition) {
case 'has_data':
return state.data && Object.keys(state.data).length > 0;
case 'no_error':
return !state.error;
case 'step_complete':
return state.current_step !== 'initial';
default:
return true;
}
}

// 模拟复杂状态转换逻辑
static getNextStep(currentStep: string, data: any): string {
switch (currentStep) {
case 'initial':
return data.type === 'urgent'
? 'priority_processing'
: 'normal_processing';
case 'normal_processing':
return data.needsReview ? 'review' : 'finalize';
case 'priority_processing':
return 'finalize';
case 'review':
return data.approved ? 'finalize' : 'revision';
case 'revision':
return 'review';
case 'finalize':
return 'completed';
default:
return 'error';
}
}
}

describe('状态转换测试', () => {
let graph: StateGraph<typeof TransitionState.State>;

beforeEach(() => {
graph = new StateGraph(TransitionState);
});

it('应该正确处理简单状态转换', async () => {
const testGraph = graph
.addNode('step1', async (state) => {
return {
current_step: 'step1',
history: ['进入step1'],
data: { value: 1 },
};
})
.addNode('step2', async (state) => {
return {
current_step: 'step2',
history: ['进入step2'],
data: { value: state.data.value + 1 },
};
})
.addNode('step3', async (state) => {
return {
current_step: 'step3',
history: ['进入step3'],
completed: true,
};
})
.addEdge('__start__', 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', 'step3')
.addEdge('step3', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
current_step: 'initial',
data: {},
history: [],
});

expect(result.current_step).toBe('step3');
expect(result.history).toEqual(['进入step1', '进入step2', '进入step3']);
expect(result.data.value).toBe(2);
expect(result.completed).toBe(true);
});

it('应该正确处理条件状态转换', async () => {
const testGraph = graph
.addNode('initial', async (state) => {
return {
current_step: 'initial',
history: ['初始化'],
data: { type: state.data.type || 'normal' },
};
})
.addNode('normal_processing', async (state) => {
return {
current_step: 'normal_processing',
history: ['普通处理'],
data: {
processed: true,
needsReview: state.data.needsReview || false,
},
};
})
.addNode('priority_processing', async (state) => {
return {
current_step: 'priority_processing',
history: ['优先处理'],
data: { processed: true, priority: true },
};
})
.addNode('review', async (state) => {
return {
current_step: 'review',
history: ['审核中'],
data: { reviewed: true, approved: state.data.approved || false },
};
})
.addNode('finalize', async (state) => {
return {
current_step: 'finalize',
history: ['完成'],
completed: true,
};
})
.addEdge('__start__', 'initial')
.addConditionalEdges(
'initial',
(state) => (state.data.type === 'urgent' ? 'priority' : 'normal'),
{
priority: 'priority_processing',
normal: 'normal_processing',
}
)
.addConditionalEdges(
'normal_processing',
(state) => (state.data.needsReview ? 'review' : 'finalize'),
{
review: 'review',
finalize: 'finalize',
}
)
.addEdge('priority_processing', 'finalize')
.addConditionalEdges(
'review',
(state) => (state.data.approved ? 'finalize' : 'normal_processing'),
{
finalize: 'finalize',
normal_processing: 'normal_processing',
}
)
.addEdge('finalize', '__end__');

// 测试普通流程
const normalResult = await testGraph.compile().invoke({
current_step: 'start',
data: { type: 'normal' },
history: [],
});

expect(normalResult.current_step).toBe('finalize');
expect(normalResult.history).toEqual(['初始化', '普通处理', '完成']);

// 测试优先流程
const priorityResult = await testGraph.compile().invoke({
current_step: 'start',
data: { type: 'urgent' },
history: [],
});

expect(priorityResult.current_step).toBe('finalize');
expect(priorityResult.history).toEqual(['初始化', '优先处理', '完成']);

// 测试需要审核的流程
const reviewResult = await testGraph.compile().invoke({
current_step: 'start',
data: { type: 'normal', needsReview: true, approved: true },
history: [],
});

expect(reviewResult.current_step).toBe('finalize');
expect(reviewResult.history).toEqual([
'初始化',
'普通处理',
'审核中',
'完成',
]);
});

it('应该正确处理状态回滚', async () => {
let retryCount = 0;

const testGraph = graph
.addNode('process', async (state) => {
retryCount++;
const shouldFail = retryCount < 3; // 前两次失败,第三次成功

if (shouldFail) {
return {
current_step: 'process',
history: [`处理失败 (尝试 ${retryCount})`],
error: '处理失败,需要重试',
};
}

return {
current_step: 'process',
history: [`处理成功 (尝试 ${retryCount})`],
error: null,
data: { success: true },
};
})
.addNode('retry', async (state) => {
return {
current_step: 'retry',
history: ['准备重试'],
error: null,
};
})
.addNode('success', async (state) => {
return {
current_step: 'success',
history: ['成功完成'],
completed: true,
};
})
.addNode('failure', async (state) => {
return {
current_step: 'failure',
history: ['最终失败'],
completed: true,
};
})
.addEdge('__start__', 'process')
.addConditionalEdges(
'process',
(state) => {
if (!state.error && state.data?.success) return 'success';
if (retryCount >= 5) return 'failure'; // 最多重试5次
return 'retry';
},
{
success: 'success',
retry: 'retry',
failure: 'failure',
}
)
.addEdge('retry', 'process')
.addEdge('success', '__end__')
.addEdge('failure', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
current_step: 'initial',
data: {},
history: [],
});

expect(result.current_step).toBe('success');
expect(result.completed).toBe(true);
expect(retryCount).toBe(3);
expect(result.history).toContain('处理成功 (尝试 3)');
});

it('应该正确处理并行状态转换', async () => {
const testGraph = graph
.addNode('start', async (state) => {
return {
current_step: 'start',
history: ['开始并行处理'],
data: { tasks: ['task1', 'task2', 'task3'] },
};
})
.addNode('parallel_processor', async (state) => {
const tasks = state.data.tasks || [];
const results = await Promise.all(
tasks.map(async (task: string, index: number) => {
// 模拟异步处理
await new Promise((resolve) =>
setTimeout(resolve, 10 * (index + 1))
);
return `${task}_完成`;
})
);

return {
current_step: 'parallel_processor',
history: ['并行处理完成'],
data: { results },
};
})
.addNode('aggregate', async (state) => {
const results = state.data.results || [];
return {
current_step: 'aggregate',
history: ['聚合结果'],
data: {
final_result: results.join(', '),
count: results.length,
},
completed: true,
};
})
.addEdge('__start__', 'start')
.addEdge('start', 'parallel_processor')
.addEdge('parallel_processor', 'aggregate')
.addEdge('aggregate', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
current_step: 'initial',
data: {},
history: [],
});

expect(result.current_step).toBe('aggregate');
expect(result.data.count).toBe(3);
expect(result.data.final_result).toBe('task1_完成, task2_完成, task3_完成');
expect(result.completed).toBe(true);
});

it('应该正确处理状态持久化和恢复', async () => {
// 模拟状态持久化
let persistedState: any = null;

const testGraph = graph
.addNode('checkpoint1', async (state) => {
const newState = {
current_step: 'checkpoint1',
history: ['到达检查点1'],
data: { checkpoint: 1, value: 100 },
};

// 模拟持久化
persistedState = { ...state, ...newState };

return newState;
})
.addNode('checkpoint2', async (state) => {
return {
current_step: 'checkpoint2',
history: ['到达检查点2'],
data: { checkpoint: 2, value: state.data.value + 50 },
};
})
.addNode('restore', async (state) => {
// 模拟从持久化状态恢复
if (persistedState) {
return {
current_step: 'restore',
history: ['从检查点恢复', ...persistedState.history],
data: { ...persistedState.data, restored: true },
};
}

return {
current_step: 'restore',
history: ['无法恢复'],
error: '没有可恢复的状态',
};
})
.addNode('final', async (state) => {
return {
current_step: 'final',
history: ['最终完成'],
completed: true,
};
})
.addEdge('__start__', 'checkpoint1')
.addEdge('checkpoint1', 'checkpoint2')
.addEdge('checkpoint2', 'restore')
.addEdge('restore', 'final')
.addEdge('final', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
current_step: 'initial',
data: {},
history: [],
});

expect(result.current_step).toBe('final');
expect(result.data.restored).toBe(true);
expect(result.data.checkpoint).toBe(1); // 从检查点1恢复
expect(result.completed).toBe(true);
expect(persistedState).not.toBeNull();
});

it('应该正确验证状态转换的不变性', async () => {
const testGraph = graph
.addNode('validate_input', async (state) => {
// 验证输入状态
if (!state.data || typeof state.data.value !== 'number') {
return {
current_step: 'validate_input',
error: '输入验证失败',
history: ['输入验证失败'],
};
}

return {
current_step: 'validate_input',
history: ['输入验证通过'],
data: { ...state.data, validated: true },
};
})
.addNode('process_data', async (state) => {
// 确保状态不变性
const originalValue = state.data.value;

return {
current_step: 'process_data',
history: ['数据处理完成'],
data: {
...state.data,
processed_value: originalValue * 2,
original_value: originalValue, // 保持原始值不变
},
};
})
.addNode('verify_invariants', async (state) => {
// 验证不变性
const isValid =
state.data.validated === true &&
state.data.original_value === state.data.value &&
state.data.processed_value === state.data.value * 2;

return {
current_step: 'verify_invariants',
history: ['不变性验证完成'],
data: { ...state.data, invariants_valid: isValid },
completed: true,
};
})
.addEdge('__start__', 'validate_input')
.addEdge('validate_input', 'process_data')
.addEdge('process_data', 'verify_invariants')
.addEdge('verify_invariants', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
current_step: 'initial',
data: { value: 10 },
history: [],
});

expect(result.current_step).toBe('verify_invariants');
expect(result.data.validated).toBe(true);
expect(result.data.original_value).toBe(10);
expect(result.data.processed_value).toBe(20);
expect(result.data.invariants_valid).toBe(true);
expect(result.completed).toBe(true);
});
});

// 导出状态转换测试工具
export { StateTransitionSimulator };

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/state-transition-tests.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/state-transition-tests.test.ts (6 tests) 230ms

Test Files 1 passed (1)
Tests 6 passed (6)
Start at 02:39:25
Duration 4.26s (transform 171ms, setup 434ms, collect 639ms, tests 230ms, environment 823ms, prepare 542ms)
*/

持久化测试

持久化测试
/**
* ============================================================================
* 持久化测试 - Persistence Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何测试 LangGraph 应用的持久化功能,包括状态保存和加载、检查点
* 管理、多会话隔离、持久化错误处理、状态版本控制、大数据持久化以及并发持久化
* 操作。通过 MockPersistenceStore 模拟持久化存储,验证数据的正确性和完整性。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ MockPersistenceStore 类:模拟持久化存储的 save/load/delete/list 操作
* 2️⃣ 状态保存和加载测试:验证状态的正确保存和恢复
* 3️⃣ 检查点和恢复测试:测试从不同检查点恢复状态
* 4️⃣ 多会话持久化测试:验证多个会话的数据隔离
* 5️⃣ 持久化错误处理测试:测试存储失败、加载失败等错误场景
* 6️⃣ 状态版本控制测试:测试不同版本状态的保存和迁移
* 7️⃣ 大数据持久化测试:测试大量数据的保存和加载性能
* 8️⃣ 并发持久化测试:测试多个并发操作的数据一致性
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用 Map 实现简单的内存持久化存储
* • 通过 JSON.stringify/parse 实现深拷贝,避免引用共享
* • 为每个会话使用唯一的 key,确保数据隔离
* • 通过检查点 ID 管理多个状态快照
* • 模拟存储错误以测试错误处理逻辑
* • 使用版本号标记不同版本的状态结构
* • 使用 beforeEach/afterEach 清理测试数据
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/persistence-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 持久化测试应该覆盖保存、加载、删除等所有 CRUD 操作
* • 使用深拷贝避免测试中的对象引用问题
* • 多会话测试需要确保 session ID 的唯一性
* • 检查点恢复应该准确还原到指定的状态
* • 版本迁移测试应验证数据兼容性
* • 大数据测试要注意内存占用和性能
* • 并发测试要验证数据一致性,避免竞态条件
* • 使用 afterEach 清理测试数据,确保测试独立性
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { StateGraph, Annotation, MemorySaver } from '@langchain/langgraph';

// 持久化测试的状态定义
const PersistenceState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
counter: Annotation<number>({
reducer: (x, y) => y,
}),
history: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
});

// 模拟持久化存储
class MockPersistenceStore {
private store: Map<string, any> = new Map();

async save(key: string, data: any): Promise<void> {
this.store.set(key, JSON.parse(JSON.stringify(data)));
}

async load(key: string): Promise<any | null> {
const data = this.store.get(key);
return data ? JSON.parse(JSON.stringify(data)) : null;
}

async delete(key: string): Promise<void> {
this.store.delete(key);
}

async list(): Promise<string[]> {
return Array.from(this.store.keys());
}

clear(): void {
this.store.clear();
}

size(): number {
return this.store.size;
}
}

describe('持久化测试', () => {
let graph: StateGraph<typeof PersistenceState.State>;
let mockStore: MockPersistenceStore;
let memorySaver: MemorySaver;

beforeEach(() => {
graph = new StateGraph(PersistenceState);
mockStore = new MockPersistenceStore();
memorySaver = new MemorySaver();
});

afterEach(() => {
mockStore.clear();
});

it('应该正确保存和加载状态', async () => {
const testGraph = graph
.addNode('save_step', async (state) => {
const newData = { timestamp: Date.now(), value: 42 };
await mockStore.save('test_state', { ...state, data: newData });

return {
step: 'save_step',
data: newData,
history: ['状态已保存'],
};
})
.addNode('load_step', async (state) => {
const savedState = await mockStore.load('test_state');

return {
step: 'load_step',
data: savedState ? savedState.data : {},
history: ['状态已加载'],
};
})
.addEdge('__start__', 'save_step')
.addEdge('save_step', 'load_step')
.addEdge('load_step', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
step: 'initial',
data: {},
counter: 0,
history: [],
});

expect(result.step).toBe('load_step');
expect(result.data.value).toBe(42);
expect(result.history).toEqual(['状态已保存', '状态已加载']);

// 验证数据确实被保存了
const savedData = await mockStore.load('test_state');
expect(savedData.data.value).toBe(42);
});

it('应该正确处理检查点和恢复', async () => {
const testGraph = graph
.addNode('checkpoint1', async (state) => {
const checkpointData = {
checkpoint_id: 'cp1',
counter: state.counter + 1,
timestamp: Date.now(),
};

await mockStore.save('checkpoint_1', checkpointData);

return {
step: 'checkpoint1',
data: checkpointData,
counter: checkpointData.counter,
history: ['检查点1已创建'],
};
})
.addNode('checkpoint2', async (state) => {
const checkpointData = {
checkpoint_id: 'cp2',
counter: state.counter + 1,
timestamp: Date.now(),
};

await mockStore.save('checkpoint_2', checkpointData);

return {
step: 'checkpoint2',
data: checkpointData,
counter: checkpointData.counter,
history: ['检查点2已创建'],
};
})
.addNode('restore', async (state) => {
// 模拟从检查点1恢复
const checkpoint = await mockStore.load('checkpoint_1');

return {
step: 'restore',
data: checkpoint || {},
counter: checkpoint ? checkpoint.counter : 0,
history: ['从检查点1恢复'],
};
})
.addEdge('__start__', 'checkpoint1')
.addEdge('checkpoint1', 'checkpoint2')
.addEdge('checkpoint2', 'restore')
.addEdge('restore', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
step: 'initial',
data: {},
counter: 0,
history: [],
});

expect(result.step).toBe('restore');
expect(result.counter).toBe(1); // 从检查点1恢复,counter应该是1
expect(result.data.checkpoint_id).toBe('cp1');
expect(result.history).toEqual([
'检查点1已创建',
'检查点2已创建',
'从检查点1恢复',
]);
});

it('应该正确处理多个会话的持久化', async () => {
const createSessionGraph = (sessionId: string) => {
const sessionGraph = new StateGraph(PersistenceState);
return sessionGraph
.addNode('session_step', async (state) => {
const sessionData = {
session_id: sessionId,
counter: state.counter + 1,
timestamp: Date.now(),
};

await mockStore.save(`session_${sessionId}`, sessionData);

return {
step: 'session_step',
data: sessionData,
counter: sessionData.counter,
history: [`会话${sessionId}已处理`],
};
})
.addEdge('__start__', 'session_step')
.addEdge('session_step', '__end__');
};

// 创建多个会话
const session1Graph = createSessionGraph('user1');
const session2Graph = createSessionGraph('user2');

const app1 = session1Graph.compile();
const app2 = session2Graph.compile();

// 并行执行多个会话
const [result1, result2] = await Promise.all([
app1.invoke({ step: 'initial', data: {}, counter: 0, history: [] }),
app2.invoke({ step: 'initial', data: {}, counter: 5, history: [] }),
]);

expect(result1.data.session_id).toBe('user1');
expect(result1.counter).toBe(1);
expect(result2.data.session_id).toBe('user2');
expect(result2.counter).toBe(6);

// 验证会话数据独立保存
const session1Data = await mockStore.load('session_user1');
const session2Data = await mockStore.load('session_user2');

expect(session1Data.session_id).toBe('user1');
expect(session2Data.session_id).toBe('user2');
expect(session1Data.counter).toBe(1);
expect(session2Data.counter).toBe(6);
});

it('应该正确处理持久化错误', async () => {
// 模拟存储错误
const errorStore = {
save: async () => {
throw new Error('存储失败');
},
load: async () => {
throw new Error('加载失败');
},
};

const testGraph = graph
.addNode('error_save', async (state) => {
try {
await errorStore.save('test', state);
return {
step: 'error_save',
history: ['保存成功'],
};
} catch (error) {
return {
step: 'error_save',
data: {
error: error instanceof Error ? error.message : '未知错误',
},
history: ['保存失败'],
};
}
})
.addNode('error_load', async (state) => {
try {
await errorStore.load('test');
return {
step: 'error_load',
history: ['加载成功'],
};
} catch (error) {
return {
step: 'error_load',
data: {
...state.data,
load_error: error instanceof Error ? error.message : '未知错误',
},
history: ['加载失败'],
};
}
})
.addEdge('__start__', 'error_save')
.addEdge('error_save', 'error_load')
.addEdge('error_load', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
step: 'initial',
data: {},
counter: 0,
history: [],
});

expect(result.step).toBe('error_load');
expect(result.data.error).toBe('存储失败');
expect(result.data.load_error).toBe('加载失败');
expect(result.history).toEqual(['保存失败', '加载失败']);
});

it('应该正确处理状态版本控制', async () => {
const testGraph = graph
.addNode('version1', async (state) => {
const versionedData = {
version: 1,
data: { value: 'v1_data' },
timestamp: Date.now(),
};

await mockStore.save('versioned_state_v1', versionedData);

return {
step: 'version1',
data: versionedData,
history: ['版本1已保存'],
};
})
.addNode('version2', async (state) => {
const versionedData = {
version: 2,
data: { value: 'v2_data', newField: 'added_in_v2' },
timestamp: Date.now(),
};

await mockStore.save('versioned_state_v2', versionedData);

return {
step: 'version2',
data: versionedData,
history: ['版本2已保存'],
};
})
.addNode('migrate', async (state) => {
const v1Data = await mockStore.load('versioned_state_v1');
const v2Data = await mockStore.load('versioned_state_v2');

// 模拟数据迁移
const migratedData = {
version: 2,
data: {
...v1Data.data,
newField: 'migrated_from_v1',
},
migrated: true,
};

await mockStore.save('migrated_state', migratedData);

return {
step: 'migrate',
data: migratedData,
history: ['数据迁移完成'],
};
})
.addEdge('__start__', 'version1')
.addEdge('version1', 'version2')
.addEdge('version2', 'migrate')
.addEdge('migrate', '__end__');

const app = testGraph.compile();
const result = await app.invoke({
step: 'initial',
data: {},
counter: 0,
history: [],
});

expect(result.step).toBe('migrate');
expect(result.data.version).toBe(2);
expect(result.data.data.value).toBe('v1_data');
expect(result.data.data.newField).toBe('migrated_from_v1');
expect(result.data.migrated).toBe(true);
});

it('应该正确处理大量数据的持久化', async () => {
const testGraph = graph
.addNode('generate_large_data', async (state) => {
// 生成大量测试数据
const largeData = Array.from({ length: 1000 }, (_, i) => ({
id: i,
value: `item_${i}`,
timestamp: Date.now() + i,
}));

await mockStore.save('large_dataset', largeData);

return {
step: 'generate_large_data',
data: { count: largeData.length },
history: ['大数据集已生成'],
};
})
.addNode('process_large_data', async (state) => {
const largeData = await mockStore.load('large_dataset');

// 模拟数据处理
const processedCount = largeData ? largeData.length : 0;
const summary = {
total: processedCount,
processed: processedCount,
timestamp: Date.now(),
};

await mockStore.save('processing_summary', summary);

return {
step: 'process_large_data',
data: summary,
history: ['大数据集处理完成'],
};
})
.addEdge('__start__', 'generate_large_data')
.addEdge('generate_large_data', 'process_large_data')
.addEdge('process_large_data', '__end__');

const app = testGraph.compile();
const startTime = Date.now();
const result = await app.invoke({
step: 'initial',
data: {},
counter: 0,
history: [],
});
const endTime = Date.now();

expect(result.step).toBe('process_large_data');
expect(result.data.total).toBe(1000);
expect(result.data.processed).toBe(1000);

// 验证性能 - 大数据处理应该在合理时间内完成
expect(endTime - startTime).toBeLessThan(5000); // 5秒内完成

// 验证数据确实被保存
const savedData = await mockStore.load('large_dataset');
expect(savedData).toHaveLength(1000);
});

it('应该正确处理并发持久化操作', async () => {
const createConcurrentGraph = (id: number) => {
const concurrentGraph = new StateGraph(PersistenceState);
return concurrentGraph
.addNode('concurrent_save', async (state) => {
// 模拟并发保存操作
await new Promise((resolve) =>
setTimeout(resolve, Math.random() * 100)
);

const data = {
id,
timestamp: Date.now(),
counter: state.counter + id,
};

await mockStore.save(`concurrent_${id}`, data);

return {
step: 'concurrent_save',
data,
counter: data.counter,
history: [`并发操作${id}完成`],
};
})
.addEdge('__start__', 'concurrent_save')
.addEdge('concurrent_save', '__end__');
};

// 创建多个并发图
const concurrentGraphs = Array.from({ length: 5 }, (_, i) =>
createConcurrentGraph(i + 1)
);

// 并发执行
const results = await Promise.all(
concurrentGraphs.map((graph, index) =>
graph.compile().invoke({
step: 'initial',
data: {},
counter: 0,
history: [],
})
)
);

// 验证所有操作都成功完成
results.forEach((result, index) => {
expect(result.step).toBe('concurrent_save');
expect(result.data.id).toBe(index + 1);
expect(result.counter).toBe(index + 1);
});

// 验证所有数据都被正确保存
const savedKeys = await mockStore.list();
expect(savedKeys).toHaveLength(5);

for (let i = 1; i <= 5; i++) {
const savedData = await mockStore.load(`concurrent_${i}`);
expect(savedData.id).toBe(i);
}
});
});

// 导出持久化测试工具
export { MockPersistenceStore };

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/persistence-tests.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/persistence-tests.test.ts (7 tests) 276ms

Test Files 1 passed (1)
Tests 7 passed (7)
Start at 02:39:26
Duration 3.57s (transform 87ms, setup 484ms, collect 528ms, tests 276ms, environment 697ms, prepare 270ms)
*/

🚨 错误处理测试

错误场景测试

错误处理测试
/**
* ============================================================================
* 错误处理测试 - Error Handling Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何全面测试 LangGraph 应用的错误处理能力,包括基本错误捕获、
* 错误恢复机制(重试)、错误传播、异步错误处理、并发错误处理以及错误分类
* 和差异化处理策略。通过多种错误场景确保应用的健壮性和可靠性。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 基本错误捕获:测试网络错误、超时错误、验证错误、权限错误等
* 2️⃣ 错误恢复机制:实现重试逻辑,测试自动恢复和最大重试限制
* 3️⃣ 错误传播与累积:测试错误信息的正确传递和多个错误的累积
* 4️⃣ 异步错误处理:测试 Promise 拒绝和并发操作中的错误处理
* 5️⃣ 错误分类处理:根据错误类型采取不同的处理策略(可恢复、致命、警告)
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用工厂函数创建可配置的错误节点,支持不同类型的错误
* • 通过状态中的 retryCount 字段跟踪重试次数
* • 使用 try-catch 捕获错误并转换为状态更新
* • 利用 errors 数组累积多个错误信息
* • 使用 Promise.allSettled 处理并发错误
* • 通过 switch 语句实现基于错误类型的差异化处理
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/error-handling-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 错误测试应该覆盖所有可能的错误路径,不仅测试成功情况
* • 重试逻辑需要设置合理的最大重试次数,避免无限循环
* • 使用 vi.fn() 模拟可以精确控制失败次数和成功时机
* • 错误消息应该包含足够的上下文信息,便于调试
* • 区分可恢复错误和致命错误,采取不同的处理策略
* • 异步错误需要正确处理 Promise 拒绝,避免未捕获的错误
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, vi } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 错误处理测试状态定义
const ErrorTestState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
errors: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
}),
retryCount: Annotation<number>({
reducer: (x, y) => y,
}),
});

// 模拟错误节点
const createErrorNode = (errorType: string, shouldFail: boolean = true) => {
return async (state: typeof ErrorTestState.State) => {
if (shouldFail) {
switch (errorType) {
case 'network':
throw new Error('网络连接失败');
case 'timeout':
throw new Error('请求超时');
case 'validation':
throw new Error('数据验证失败');
case 'permission':
throw new Error('权限不足');
default:
throw new Error('未知错误');
}
}

return {
step: 'success',
data: { processed: true },
};
};
};

// 错误恢复节点
const createRecoveryNode = () => {
return async (state: typeof ErrorTestState.State) => {
const retryCount = state.retryCount || 0;

if (retryCount < 3) {
return {
step: 'retry',
retryCount: retryCount + 1,
errors: [`重试第 ${retryCount + 1}`],
};
}

return {
step: 'failed',
errors: ['达到最大重试次数'],
};
};
};

describe('错误处理测试', () => {
describe('基本错误捕获', () => {
it('应该捕获网络错误', async () => {
const graph = new StateGraph(ErrorTestState);

graph
.addNode('network_call', createErrorNode('network'))
.addEdge('__start__', 'network_call')
.addEdge('network_call', '__end__');

const app = graph.compile();

await expect(
app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
})
).rejects.toThrow('网络连接失败');
});

it('应该捕获超时错误', async () => {
const graph = new StateGraph(ErrorTestState);

graph
.addNode('timeout_call', createErrorNode('timeout'))
.addEdge('__start__', 'timeout_call')
.addEdge('timeout_call', '__end__');

const app = graph.compile();

await expect(
app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
})
).rejects.toThrow('请求超时');
});

it('应该捕获验证错误', async () => {
const graph = new StateGraph(ErrorTestState);

graph
.addNode('validation', createErrorNode('validation'))
.addEdge('__start__', 'validation')
.addEdge('validation', '__end__');

const app = graph.compile();

await expect(
app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
})
).rejects.toThrow('数据验证失败');
});
});

describe('错误恢复机制', () => {
it('应该实现重试机制', async () => {
const mockNode = vi
.fn()
.mockRejectedValueOnce(new Error('第一次失败'))
.mockRejectedValueOnce(new Error('第二次失败'))
.mockResolvedValueOnce({
step: 'success',
data: { processed: true },
});

const retryNode = async (state: typeof ErrorTestState.State) => {
try {
return await mockNode();
} catch (error) {
const retryCount = state.retryCount || 0;
if (retryCount < 2) {
return {
step: 'retry',
retryCount: retryCount + 1,
errors: [`重试第 ${retryCount + 1} 次: ${error.message}`],
};
}
throw error;
}
};

const graph = new StateGraph(ErrorTestState);

graph
.addNode('retry_node', retryNode)
.addEdge('__start__', 'retry_node')
.addEdge('retry_node', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
});

expect(result.step).toBe('success');
expect(mockNode).toHaveBeenCalledTimes(3);
});

it('应该在达到最大重试次数后失败', async () => {
const mockNode = vi.fn().mockRejectedValue(new Error('持续失败'));

const retryNode = async (state: typeof ErrorTestState.State) => {
try {
return await mockNode();
} catch (error) {
const retryCount = state.retryCount || 0;
if (retryCount < 2) {
return {
step: 'retry',
retryCount: retryCount + 1,
errors: [`重试第 ${retryCount + 1} 次: ${error.message}`],
};
}
throw new Error(`达到最大重试次数: ${error.message}`);
}
};

const graph = new StateGraph(ErrorTestState);

graph
.addNode('retry_node', retryNode)
.addEdge('__start__', 'retry_node')
.addEdge('retry_node', '__end__');

const app = graph.compile();

await expect(
app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
})
).rejects.toThrow('达到最大重试次数');

expect(mockNode).toHaveBeenCalledTimes(3);
});
});

describe('错误传播', () => {
it('应该正确传播错误信息', async () => {
const errorCollectorNode = async (state: typeof ErrorTestState.State) => {
try {
throw new Error('测试错误');
} catch (error) {
return {
step: 'error_collected',
errors: [error.message],
data: { errorHandled: true },
};
}
};

const graph = new StateGraph(ErrorTestState);

graph
.addNode('error_collector', errorCollectorNode)
.addEdge('__start__', 'error_collector')
.addEdge('error_collector', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
});

expect(result.step).toBe('error_collected');
expect(result.errors).toContain('测试错误');
expect(result.data.errorHandled).toBe(true);
});

it('应该累积多个错误', async () => {
const multiErrorNode = async (state: typeof ErrorTestState.State) => {
const errors = [...state.errors];

// 模拟多个验证错误
if (!state.data?.name) {
errors.push('缺少名称字段');
}
if (!state.data?.email) {
errors.push('缺少邮箱字段');
}
if (state.data?.age && state.data.age < 0) {
errors.push('年龄不能为负数');
}

return {
step: 'validation_complete',
errors,
data: { validated: errors.length === 0 },
};
};

const graph = new StateGraph(ErrorTestState);

graph
.addNode('multi_error', multiErrorNode)
.addEdge('__start__', 'multi_error')
.addEdge('multi_error', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { age: -5 },
errors: [],
retryCount: 0,
});

expect(result.errors).toHaveLength(3);
expect(result.errors).toContain('缺少名称字段');
expect(result.errors).toContain('缺少邮箱字段');
expect(result.errors).toContain('年龄不能为负数');
expect(result.data.validated).toBe(false);
});
});

describe('异步错误处理', () => {
it('应该处理 Promise 拒绝', async () => {
const asyncErrorNode = async (state: typeof ErrorTestState.State) => {
const promise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('异步操作失败')), 10);
});

try {
await promise;
return { step: 'success' };
} catch (error) {
return {
step: 'async_error',
errors: [error.message],
};
}
};

const graph = new StateGraph(ErrorTestState);

graph
.addNode('async_error', asyncErrorNode)
.addEdge('__start__', 'async_error')
.addEdge('async_error', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
});

expect(result.step).toBe('async_error');
expect(result.errors).toContain('异步操作失败');
});

it('应该处理并发错误', async () => {
const concurrentErrorNode = async (
state: typeof ErrorTestState.State
) => {
const promises = [
Promise.reject(new Error('任务1失败')),
Promise.reject(new Error('任务2失败')),
Promise.resolve('任务3成功'),
];

const results = await Promise.allSettled(promises);
const errors = results
.filter((result) => result.status === 'rejected')
.map((result) => (result as PromiseRejectedResult).reason.message);

return {
step: 'concurrent_complete',
errors,
data: {
successCount: results.filter((r) => r.status === 'fulfilled')
.length,
failureCount: errors.length,
},
};
};

const graph = new StateGraph(ErrorTestState);

graph
.addNode('concurrent_error', concurrentErrorNode)
.addEdge('__start__', 'concurrent_error')
.addEdge('concurrent_error', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
errors: [],
retryCount: 0,
});

expect(result.step).toBe('concurrent_complete');
expect(result.errors).toHaveLength(2);
expect(result.data.successCount).toBe(1);
expect(result.data.failureCount).toBe(2);
});
});

describe('错误分类和处理', () => {
it('应该根据错误类型采取不同处理策略', async () => {
const errorClassificationNode = async (
state: typeof ErrorTestState.State
) => {
const errorType = state.data?.errorType;

switch (errorType) {
case 'recoverable':
return {
step: 'retry_scheduled',
errors: ['可恢复错误,已安排重试'],
};
case 'fatal':
return {
step: 'failed',
errors: ['致命错误,无法恢复'],
};
case 'warning':
return {
step: 'continue_with_warning',
errors: ['警告:操作可能不完整'],
data: { hasWarning: true },
};
default:
return {
step: 'unknown_error',
errors: ['未知错误类型'],
};
}
};

const graph = new StateGraph(ErrorTestState);

graph
.addNode('error_classification', errorClassificationNode)
.addEdge('__start__', 'error_classification')
.addEdge('error_classification', '__end__');

const app = graph.compile();

// 测试可恢复错误
const recoverableResult = await app.invoke({
step: 'initial',
data: { errorType: 'recoverable' },
errors: [],
retryCount: 0,
});

expect(recoverableResult.step).toBe('retry_scheduled');
expect(recoverableResult.errors).toContain('可恢复错误,已安排重试');

// 测试致命错误
const fatalResult = await app.invoke({
step: 'initial',
data: { errorType: 'fatal' },
errors: [],
retryCount: 0,
});

expect(fatalResult.step).toBe('failed');
expect(fatalResult.errors).toContain('致命错误,无法恢复');
});
});
});

// 导出测试工具
export { ErrorTestState, createErrorNode, createRecoveryNode };
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /home/user/langgraphjs-tutorial/websites/examples/utils/.env
Error: Vitest cannot be imported in a CommonJS module
(这些是测试文件,需要使用 pnpm test 而不是 esno 运行)
*/

边界条件测试

边界条件测试
/**
* ============================================================================
* 边界条件测试 - Boundary Condition Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何全面测试 LangGraph 应用的边界条件,包括数值边界(最小值、最大值、
* 超出范围)、字符串边界(空字符串、最大长度、超长字符串)、数组边界(空数组、
* 单元素、大数组)以及特殊值(null、undefined、NaN、Infinity)的测试。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 数值边界测试:测试最小值(0)、最大值(100)、负数、超过最大值等边界情况
* 2️⃣ 字符串边界测试:测试空字符串、最大长度(255)、超长字符串等情况
* 3️⃣ 数组边界测试:测试空数组、单元素数组、大数组等边界情况
* 4️⃣ 特殊值测试:测试 null、undefined、NaN、Infinity、-Infinity 等特殊值
* 5️⃣ 类型转换边界测试:测试类型转换的边界情况和失败场景
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用通用的边界测试节点工厂函数,根据数据类型执行不同的边界检查
* • 通过 typeof 和 Array.isArray 区分不同的数据类型
* • 对每种类型定义明确的边界值(如数值的 0-100,字符串的 255)
* • 使用 Number.isNaN、Number.isFinite 等 API 检测特殊数值
* • 测试类型转换的有效性和边界情况
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/boundary-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 边界值应根据实际业务需求定义,本例中的值(100、255、1000)仅为示例
* • NaN 的比较需要使用 Number.isNaN(),直接用 === 比较会失败
* • null 和 undefined 的行为不同,需要分别测试
* • 类型转换可能产生意外结果,需要充分测试
* • 边界测试应覆盖"恰好在边界"、"刚好超出边界"两种情况
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 边界测试状态定义
const BoundaryTestState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
results: Annotation<any[]>({
reducer: (x, y) => x.concat(y),
}),
});

// 边界值测试节点
const createBoundaryTestNode = () => {
return async (state: typeof BoundaryTestState.State) => {
const value = state.data?.value;
const results = [];

// 测试数值边界
if (typeof value === 'number') {
results.push({
type: 'number_boundary',
value,
isValid: value >= 0 && value <= 100,
isMinBoundary: value === 0,
isMaxBoundary: value === 100,
isOutOfRange: value < 0 || value > 100,
});
}

// 测试字符串边界
if (typeof value === 'string') {
results.push({
type: 'string_boundary',
value,
length: value.length,
isEmpty: value.length === 0,
isMaxLength: value.length === 255,
isOverLimit: value.length > 255,
});
}

// 测试数组边界
if (Array.isArray(value)) {
results.push({
type: 'array_boundary',
value,
length: value.length,
isEmpty: value.length === 0,
isSingleItem: value.length === 1,
isMaxSize: value.length === 1000,
isOverLimit: value.length > 1000,
});
}

return {
step: 'boundary_tested',
results,
};
};
};

describe('边界条件测试', () => {
describe('数值边界测试', () => {
it('应该正确处理最小边界值', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { value: 0 },
results: [],
});

expect(result.results[0].isMinBoundary).toBe(true);
expect(result.results[0].isValid).toBe(true);
expect(result.results[0].isOutOfRange).toBe(false);
});

it('应该正确处理最大边界值', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { value: 100 },
results: [],
});

expect(result.results[0].isMaxBoundary).toBe(true);
expect(result.results[0].isValid).toBe(true);
expect(result.results[0].isOutOfRange).toBe(false);
});

it('应该正确处理超出范围的值', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

// 测试负数
const negativeResult = await app.invoke({
step: 'initial',
data: { value: -1 },
results: [],
});

expect(negativeResult.results[0].isOutOfRange).toBe(true);
expect(negativeResult.results[0].isValid).toBe(false);

// 测试超过最大值
const overMaxResult = await app.invoke({
step: 'initial',
data: { value: 101 },
results: [],
});

expect(overMaxResult.results[0].isOutOfRange).toBe(true);
expect(overMaxResult.results[0].isValid).toBe(false);
});
});

describe('字符串边界测试', () => {
it('应该正确处理空字符串', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { value: '' },
results: [],
});

expect(result.results[0].isEmpty).toBe(true);
expect(result.results[0].length).toBe(0);
});

it('应该正确处理最大长度字符串', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const maxLengthString = 'a'.repeat(255);
const result = await app.invoke({
step: 'initial',
data: { value: maxLengthString },
results: [],
});

expect(result.results[0].isMaxLength).toBe(true);
expect(result.results[0].length).toBe(255);
});

it('应该正确处理超长字符串', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const overLengthString = 'a'.repeat(256);
const result = await app.invoke({
step: 'initial',
data: { value: overLengthString },
results: [],
});

expect(result.results[0].isOverLimit).toBe(true);
expect(result.results[0].length).toBe(256);
});
});

describe('数组边界测试', () => {
it('应该正确处理空数组', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { value: [] },
results: [],
});

expect(result.results[0].isEmpty).toBe(true);
expect(result.results[0].length).toBe(0);
});

it('应该正确处理单元素数组', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: { value: [1] },
results: [],
});

expect(result.results[0].isSingleItem).toBe(true);
expect(result.results[0].length).toBe(1);
});

it('应该正确处理大数组', async () => {
const graph = new StateGraph(BoundaryTestState);

graph
.addNode('boundary_test', createBoundaryTestNode())
.addEdge('__start__', 'boundary_test')
.addEdge('boundary_test', '__end__');

const app = graph.compile();

const largeArray = new Array(1001).fill(0);
const result = await app.invoke({
step: 'initial',
data: { value: largeArray },
results: [],
});

expect(result.results[0].isOverLimit).toBe(true);
expect(result.results[0].length).toBe(1001);
});
});

describe('特殊值测试', () => {
it('应该正确处理 null 和 undefined', async () => {
const nullTestNode = async (state: typeof BoundaryTestState.State) => {
const value = state.data?.value;

return {
step: 'null_tested',
results: [
{
type: 'null_test',
value,
isNull: value === null,
isUndefined: value === undefined,
isFalsy: !value,
},
],
};
};

const graph = new StateGraph(BoundaryTestState);

graph
.addNode('null_test', nullTestNode)
.addEdge('__start__', 'null_test')
.addEdge('null_test', '__end__');

const app = graph.compile();

// 测试 null
const nullResult = await app.invoke({
step: 'initial',
data: { value: null },
results: [],
});

expect(nullResult.results[0].isNull).toBe(true);
expect(nullResult.results[0].isFalsy).toBe(true);

// 测试 undefined
const undefinedResult = await app.invoke({
step: 'initial',
data: { value: undefined },
results: [],
});

expect(undefinedResult.results[0].isUndefined).toBe(true);
expect(undefinedResult.results[0].isFalsy).toBe(true);
});

it('应该正确处理 NaN 和 Infinity', async () => {
const specialNumberTestNode = async (
state: typeof BoundaryTestState.State
) => {
const value = state.data?.value;

return {
step: 'special_number_tested',
results: [
{
type: 'special_number_test',
value,
isNaN: Number.isNaN(value),
isInfinite: !Number.isFinite(value) && !Number.isNaN(value),
isPositiveInfinity: value === Infinity,
isNegativeInfinity: value === -Infinity,
},
],
};
};

const graph = new StateGraph(BoundaryTestState);

graph
.addNode('special_number_test', specialNumberTestNode)
.addEdge('__start__', 'special_number_test')
.addEdge('special_number_test', '__end__');

const app = graph.compile();

// 测试 NaN
const nanResult = await app.invoke({
step: 'initial',
data: { value: NaN },
results: [],
});

expect(nanResult.results[0].isNaN).toBe(true);

// 测试 Infinity
const infinityResult = await app.invoke({
step: 'initial',
data: { value: Infinity },
results: [],
});

expect(infinityResult.results[0].isPositiveInfinity).toBe(true);
expect(infinityResult.results[0].isInfinite).toBe(true);

// 测试 -Infinity
const negInfinityResult = await app.invoke({
step: 'initial',
data: { value: -Infinity },
results: [],
});

expect(negInfinityResult.results[0].isNegativeInfinity).toBe(true);
expect(negInfinityResult.results[0].isInfinite).toBe(true);
});
});

describe('类型边界测试', () => {
it('应该正确处理类型转换边界', async () => {
const typeConversionTestNode = async (
state: typeof BoundaryTestState.State
) => {
const value = state.data?.value;

return {
step: 'type_conversion_tested',
results: [
{
type: 'type_conversion_test',
originalValue: value,
originalType: typeof value,
stringValue: String(value),
numberValue: Number(value),
booleanValue: Boolean(value),
isValidNumber: !Number.isNaN(Number(value)),
},
],
};
};

const graph = new StateGraph(BoundaryTestState);

graph
.addNode('type_conversion_test', typeConversionTestNode)
.addEdge('__start__', 'type_conversion_test')
.addEdge('type_conversion_test', '__end__');

const app = graph.compile();

// 测试字符串转数字
const stringToNumberResult = await app.invoke({
step: 'initial',
data: { value: '123' },
results: [],
});

expect(stringToNumberResult.results[0].isValidNumber).toBe(true);
expect(stringToNumberResult.results[0].numberValue).toBe(123);

// 测试无效字符串转数字
const invalidStringResult = await app.invoke({
step: 'initial',
data: { value: 'abc' },
results: [],
});

expect(invalidStringResult.results[0].isValidNumber).toBe(false);
expect(Number.isNaN(invalidStringResult.results[0].numberValue)).toBe(
true
);
});
});
});

// 导出测试工具
export { BoundaryTestState, createBoundaryTestNode };

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/boundary-tests.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/boundary-tests.test.ts (12 tests) 160ms

Test Files 1 passed (1)
Tests 12 passed (12)
Start at 02:39:25
Duration 4.12s (transform 96ms, setup 479ms, collect 503ms, tests 160ms, environment 764ms, prepare 691ms)
*/

📊 性能测试

基准测试

性能基准测试
/**
* ============================================================================
* 性能基准测试 - Performance Benchmark Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何对 LangGraph 应用进行性能基准测试,包括不同工作负载(轻量级、
* 中等、重量级)下的性能测试、并发性能测试、内存使用监控、性能回归测试以及
* 资源限制测试。通过 PerformanceMonitor 类收集执行时间和内存使用数据。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ PerformanceMonitor 类:监控执行时间和内存使用的工具
* 2️⃣ 工作负载测试:测试轻量级、中等、重量级三种工作负载的性能
* 3️⃣ 并发性能测试:测试多个并发请求的执行效率
* 4️⃣ 内存使用测试:监控堆内存和外部内存的使用情况
* 5️⃣ 性能回归测试:通过多次运行检测性能变化和稳定性
* 6️⃣ 资源限制测试:测试在时间或资源受限环境下的表现
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用 performance.now() 获取高精度时间戳
* • 通过 process.memoryUsage() 监控内存使用情况
* • 设计不同强度的工作负载(数据量、计算复杂度)
* • 计算吞吐量(items/second)评估处理能力
* • 使用 Promise.all 测试并发性能
* • 多次运行计算平均值、方差,检测性能稳定性
* • 在资源限制下定期检查执行时间并提前终止
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/performance-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 性能测试结果受运行环境影响,应在稳定环境下运行
* • 重量级测试需要较长的超时时间(如 10000ms)
* • 内存监控依赖 Node.js 环境,浏览器环境不可用
* • 并发测试应考虑系统并发能力限制
* • 性能阈值应基于实际业务需求和硬件条件设定
* • 性能回归测试通过变异系数(variance)评估稳定性
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 性能测试状态定义
const PerformanceTestState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
metrics: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
});

// 性能监控工具
class PerformanceMonitor {
private startTime: number = 0;
private endTime: number = 0;
private memoryStart: any = null;

start() {
this.startTime = performance.now();
if (typeof process !== 'undefined' && process.memoryUsage) {
this.memoryStart = process.memoryUsage();
}
}

end() {
this.endTime = performance.now();
const duration = this.endTime - this.startTime;

let memoryUsage = null;
if (
this.memoryStart &&
typeof process !== 'undefined' &&
process.memoryUsage
) {
const memoryEnd = process.memoryUsage();
memoryUsage = {
heapUsed: memoryEnd.heapUsed - this.memoryStart.heapUsed,
heapTotal: memoryEnd.heapTotal - this.memoryStart.heapTotal,
external: memoryEnd.external - this.memoryStart.external,
};
}

return {
duration,
memoryUsage,
startTime: this.startTime,
endTime: this.endTime,
};
}
}

// 创建性能测试节点
const createPerformanceTestNode = (workload: 'light' | 'medium' | 'heavy') => {
return async (state: typeof PerformanceTestState.State) => {
const monitor = new PerformanceMonitor();
monitor.start();

// 模拟不同强度的工作负载
let result;
switch (workload) {
case 'light':
result = await lightWorkload();
break;
case 'medium':
result = await mediumWorkload();
break;
case 'heavy':
result = await heavyWorkload();
break;
}

const metrics = monitor.end();

return {
step: `${workload}_workload_completed`,
data: result,
metrics: {
...metrics,
workload,
throughput: result.itemsProcessed / (metrics.duration / 1000),
},
};
};
};

// 轻量级工作负载
const lightWorkload = async () => {
const items = Array.from({ length: 100 }, (_, i) => i);
const processed = items.map((x) => x * 2);

return {
itemsProcessed: processed.length,
result: processed.slice(0, 5),
};
};

// 中等工作负载
const mediumWorkload = async () => {
const items = Array.from({ length: 1000 }, (_, i) => i);
const processed = items.map((x) => Math.sqrt(x * x + 1));

// 模拟异步操作
await new Promise((resolve) => setTimeout(resolve, 10));

return {
itemsProcessed: processed.length,
result: processed.slice(0, 5),
};
};

// 重量级工作负载
const heavyWorkload = async () => {
const items = Array.from({ length: 10000 }, (_, i) => i);
const processed = [];

for (let i = 0; i < items.length; i++) {
// 模拟复杂计算
let sum = 0;
for (let j = 0; j < 100; j++) {
sum += Math.sin(items[i] + j);
}
processed.push(sum);

// 每1000次迭代暂停一下
if (i % 1000 === 0) {
await new Promise((resolve) => setTimeout(resolve, 1));
}
}

return {
itemsProcessed: processed.length,
result: processed.slice(0, 5),
};
};

describe('性能基准测试', () => {
describe('单节点性能测试', () => {
it('轻量级工作负载应在合理时间内完成', async () => {
const graph = new StateGraph(PerformanceTestState);

graph
.addNode('light_work', createPerformanceTestNode('light'))
.addEdge('__start__', 'light_work')
.addEdge('light_work', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
metrics: {},
});

expect(result.metrics.duration).toBeLessThan(100); // 100ms
expect(result.data.itemsProcessed).toBe(100);
expect(result.metrics.throughput).toBeGreaterThan(1000); // items/second
});

it('中等工作负载性能测试', async () => {
const graph = new StateGraph(PerformanceTestState);

graph
.addNode('medium_work', createPerformanceTestNode('medium'))
.addEdge('__start__', 'medium_work')
.addEdge('medium_work', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
metrics: {},
});

expect(result.metrics.duration).toBeLessThan(1000); // 1s
expect(result.data.itemsProcessed).toBe(1000);
expect(result.metrics.throughput).toBeGreaterThan(100); // items/second
});

it('重量级工作负载性能测试', async () => {
const graph = new StateGraph(PerformanceTestState);

graph
.addNode('heavy_work', createPerformanceTestNode('heavy'))
.addEdge('__start__', 'heavy_work')
.addEdge('heavy_work', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
metrics: {},
});

expect(result.metrics.duration).toBeLessThan(5000); // 5s
expect(result.data.itemsProcessed).toBe(10000);
expect(result.metrics.throughput).toBeGreaterThan(1000); // items/second
}, 10000); // 10s timeout
});

describe('并发性能测试', () => {
it('应该能够处理并发请求', async () => {
const graph = new StateGraph(PerformanceTestState);

graph
.addNode('concurrent_work', createPerformanceTestNode('medium'))
.addEdge('__start__', 'concurrent_work')
.addEdge('concurrent_work', '__end__');

const app = graph.compile();

const concurrentRequests = 5;
const startTime = performance.now();

const promises = Array.from({ length: concurrentRequests }, () =>
app.invoke({
step: 'initial',
data: {},
metrics: {},
})
);

const results = await Promise.all(promises);
const totalTime = performance.now() - startTime;

// 验证所有请求都成功完成
results.forEach((result) => {
expect(result.data.itemsProcessed).toBe(1000);
});

// 并发执行应该比串行执行快
const averageSequentialTime =
results.reduce((sum, r) => sum + r.metrics.duration, 0) /
results.length;
expect(totalTime).toBeLessThan(
averageSequentialTime * concurrentRequests * 0.8
);
});
});

describe('内存使用测试', () => {
it('应该监控内存使用情况', async () => {
const graph = new StateGraph(PerformanceTestState);

graph
.addNode('memory_test', createPerformanceTestNode('heavy'))
.addEdge('__start__', 'memory_test')
.addEdge('memory_test', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
metrics: {},
});

// 检查是否收集了内存使用数据
if (result.metrics.memoryUsage) {
expect(typeof result.metrics.memoryUsage.heapUsed).toBe('number');
expect(typeof result.metrics.memoryUsage.heapTotal).toBe('number');
}
});
});

describe('性能回归测试', () => {
it('性能不应该显著下降', async () => {
const graph = new StateGraph(PerformanceTestState);

graph
.addNode('regression_test', createPerformanceTestNode('medium'))
.addEdge('__start__', 'regression_test')
.addEdge('regression_test', '__end__');

const app = graph.compile();

// 运行多次测试获取基准
const runs = 5;
const results = [];

for (let i = 0; i < runs; i++) {
const result = await app.invoke({
step: 'initial',
data: {},
metrics: {},
});
results.push(result.metrics.duration);
}

const averageTime = results.reduce((sum, time) => sum + time, 0) / runs;
const maxTime = Math.max(...results);
const minTime = Math.min(...results);

// 性能变化不应该超过50%
const variance = (maxTime - minTime) / averageTime;
expect(variance).toBeLessThan(0.5);

// 平均时间应该在合理范围内
expect(averageTime).toBeLessThan(1000); // 1s
});
});

describe('资源限制测试', () => {
it('应该在资源限制下正常工作', async () => {
const resourceLimitedNode = async (
state: typeof PerformanceTestState.State
) => {
const monitor = new PerformanceMonitor();
monitor.start();

// 模拟资源受限环境
const maxIterations = 1000;
const results = [];

for (let i = 0; i < maxIterations; i++) {
// 每100次迭代检查时间限制
if (i % 100 === 0) {
const currentMetrics = monitor.end();
if (currentMetrics.duration > 500) {
// 500ms 限制
break;
}
monitor.start(); // 重新开始计时
}

results.push(Math.random() * i);
}

const finalMetrics = monitor.end();

return {
step: 'resource_limited_completed',
data: { itemsProcessed: results.length },
metrics: finalMetrics,
};
};

const graph = new StateGraph(PerformanceTestState);

graph
.addNode('resource_limited', resourceLimitedNode)
.addEdge('__start__', 'resource_limited')
.addEdge('resource_limited', '__end__');

const app = graph.compile();

const result = await app.invoke({
step: 'initial',
data: {},
metrics: {},
});

// 应该在时间限制内完成
expect(result.metrics.duration).toBeLessThan(600); // 稍微超过限制是可以接受的
expect(result.data.itemsProcessed).toBeGreaterThan(0);
});
});
});

// 导出性能测试工具
export {
PerformanceTestState,
PerformanceMonitor,
createPerformanceTestNode,
lightWorkload,
mediumWorkload,
heavyWorkload,
};

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/performance-tests.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/performance-tests.test.ts (7 tests) 353ms

Test Files 1 passed (1)
Tests 7 passed (7)
Start at 02:40:15
Duration 3.61s (transform 52ms, setup 422ms, collect 419ms, tests 353ms, environment 865ms, prepare 343ms)
*/

负载测试

负载测试
/**
* ============================================================================
* 负载测试 - Load Testing
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何对 LangGraph 应用进行全面的负载测试和性能测试,包括低负载、
* 中等负载、高负载、压力测试、峰值测试和耐久性测试。通过 LoadTester 类
* 模拟不同数量的并发用户和不同的测试持续时间,收集并分析性能指标。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ LoadTester 类:并发负载测试工具,支持多用户持续请求
* 2️⃣ 性能指标收集:吞吐量、平均响应时间、P50/P95/P99 百分位数等
* 3️⃣ 基本负载测试:测试低、中、高三种负载下的应用表现
* 4️⃣ 压力测试:测试极高负载下的系统稳定性
* 5️⃣ 峰值测试:测试突发大量请求时的应对能力
* 6️⃣ 耐久性测试:测试长时间运行下的性能稳定性
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用多个 worker 并发执行请求,模拟多用户场景
* • 在指定时间内持续发送请求,收集所有请求的结果
* • 计算成功率、吞吐量、响应时间等关键性能指标
* • 对响应时间排序后计算百分位数(P50/P95/P99)
* • 使用 Promise.allSettled 处理突发流量测试
* • 计算变异系数(标准差/平均值)评估性能稳定性
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/load-tests.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 负载测试会占用较多系统资源,建议在专门的测试环境运行
* • 测试超时时间需要设置得足够长(如 10000ms、15000ms)
* • 性能阈值应该基于实际需求和环境设定,避免过于严格或宽松
* • 不同环境(开发机、CI、生产)的性能差异较大,需要分别设置基准
* • 峰值测试和压力测试允许更低的成功率
* • 耐久性测试通过变异系数评估性能是否随时间降低
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 负载测试状态定义
const LoadTestState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
metrics: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
});

// 负载测试工具
class LoadTester {
private results: any[] = [];
private startTime: number = 0;
private endTime: number = 0;

async runConcurrentLoad(
app: any,
concurrency: number,
duration: number,
payload: any
) {
this.startTime = performance.now();
const endTime = this.startTime + duration;
const promises: Promise<any>[] = [];

// 启动并发请求
for (let i = 0; i < concurrency; i++) {
promises.push(this.runContinuousRequests(app, endTime, payload, i));
}

await Promise.all(promises);
this.endTime = performance.now();

return this.getResults();
}

private async runContinuousRequests(
app: any,
endTime: number,
payload: any,
workerId: number
) {
const workerResults: any[] = [];

while (performance.now() < endTime) {
const requestStart = performance.now();

try {
const result = await app.invoke(payload);
const requestEnd = performance.now();

workerResults.push({
workerId,
duration: requestEnd - requestStart,
success: true,
timestamp: requestStart,
result,
});
} catch (error) {
const requestEnd = performance.now();

workerResults.push({
workerId,
duration: requestEnd - requestStart,
success: false,
timestamp: requestStart,
error: error.message,
});
}

// 短暂休息避免过度占用资源
await new Promise((resolve) => setTimeout(resolve, 1));
}

this.results.push(...workerResults);
}

private getResults() {
const totalDuration = this.endTime - this.startTime;
const successfulRequests = this.results.filter((r) => r.success);
const failedRequests = this.results.filter((r) => !r.success);

const durations = successfulRequests.map((r) => r.duration);
const avgDuration =
durations.reduce((sum, d) => sum + d, 0) / durations.length;
const minDuration = Math.min(...durations);
const maxDuration = Math.max(...durations);

// 计算百分位数
const sortedDurations = durations.sort((a, b) => a - b);
const p50 = sortedDurations[Math.floor(sortedDurations.length * 0.5)];
const p95 = sortedDurations[Math.floor(sortedDurations.length * 0.95)];
const p99 = sortedDurations[Math.floor(sortedDurations.length * 0.99)];

return {
totalRequests: this.results.length,
successfulRequests: successfulRequests.length,
failedRequests: failedRequests.length,
successRate: (successfulRequests.length / this.results.length) * 100,
totalDuration,
throughput: this.results.length / (totalDuration / 1000),
avgDuration,
minDuration,
maxDuration,
p50Duration: p50,
p95Duration: p95,
p99Duration: p99,
results: this.results,
};
}
}

// 创建负载测试节点
const createLoadTestNode = (processingTime: number = 10) => {
return async (state: typeof LoadTestState.State) => {
// 模拟处理时间
await new Promise((resolve) => setTimeout(resolve, processingTime));

return {
step: 'load_processed',
data: {
processed: true,
processingTime,
timestamp: Date.now(),
},
};
};
};

describe('负载测试', () => {
describe('基本负载测试', () => {
it('应该处理低负载', async () => {
const graph = new StateGraph(LoadTestState);

graph
.addNode('load_node', createLoadTestNode(5))
.addEdge('__start__', 'load_node')
.addEdge('load_node', '__end__');

const app = graph.compile();
const loadTester = new LoadTester();

const results = await loadTester.runConcurrentLoad(
app,
2, // 2个并发用户
1000, // 1秒测试时间
{
step: 'initial',
data: {},
metrics: {},
}
);

expect(results.successRate).toBeGreaterThan(95);
expect(results.avgDuration).toBeLessThan(100);
expect(results.throughput).toBeGreaterThan(10);
});

it('应该处理中等负载', async () => {
const graph = new StateGraph(LoadTestState);

graph
.addNode('load_node', createLoadTestNode(10))
.addEdge('__start__', 'load_node')
.addEdge('load_node', '__end__');

const app = graph.compile();
const loadTester = new LoadTester();

const results = await loadTester.runConcurrentLoad(
app,
5, // 5个并发用户
2000, // 2秒测试时间
{
step: 'initial',
data: {},
metrics: {},
}
);

expect(results.successRate).toBeGreaterThan(90);
expect(results.avgDuration).toBeLessThan(200);
expect(results.throughput).toBeGreaterThan(20);
});

it('应该处理高负载', async () => {
const graph = new StateGraph(LoadTestState);

graph
.addNode('load_node', createLoadTestNode(20))
.addEdge('__start__', 'load_node')
.addEdge('load_node', '__end__');

const app = graph.compile();
const loadTester = new LoadTester();

const results = await loadTester.runConcurrentLoad(
app,
10, // 10个并发用户
3000, // 3秒测试时间
{
step: 'initial',
data: {},
metrics: {},
}
);

expect(results.successRate).toBeGreaterThan(80);
expect(results.p95Duration).toBeLessThan(500);
expect(results.throughput).toBeGreaterThan(15);
}, 10000);
});

describe('压力测试', () => {
it('应该在极高负载下保持稳定', async () => {
const graph = new StateGraph(LoadTestState);

graph
.addNode('stress_node', createLoadTestNode(50))
.addEdge('__start__', 'stress_node')
.addEdge('stress_node', '__end__');

const app = graph.compile();
const loadTester = new LoadTester();

const results = await loadTester.runConcurrentLoad(
app,
20, // 20个并发用户
5000, // 5秒测试时间
{
step: 'initial',
data: {},
metrics: {},
}
);

// 在压力测试下,允许更低的成功率
expect(results.successRate).toBeGreaterThan(70);
expect(results.p99Duration).toBeLessThan(1000);
expect(results.totalRequests).toBeGreaterThan(100);
}, 15000);
});

describe('峰值测试', () => {
it('应该处理突发流量', async () => {
const graph = new StateGraph(LoadTestState);

graph
.addNode('spike_node', createLoadTestNode(15))
.addEdge('__start__', 'spike_node')
.addEdge('spike_node', '__end__');

const app = graph.compile();

// 模拟突发流量:快速发送大量请求
const spikeRequests = 50;
const promises = Array.from({ length: spikeRequests }, () =>
app.invoke({
step: 'initial',
data: {},
metrics: {},
})
);

const startTime = performance.now();
const results = await Promise.allSettled(promises);
const endTime = performance.now();

const successful = results.filter((r) => r.status === 'fulfilled').length;
const failed = results.filter((r) => r.status === 'rejected').length;
const successRate = (successful / spikeRequests) * 100;
const duration = endTime - startTime;

expect(successRate).toBeGreaterThan(80);
expect(duration).toBeLessThan(5000); // 5秒内完成
expect(successful).toBeGreaterThan(40);
});
});

describe('耐久性测试', () => {
it('应该在长时间运行中保持性能', async () => {
const graph = new StateGraph(LoadTestState);

graph
.addNode('endurance_node', createLoadTestNode(25))
.addEdge('__start__', 'endurance_node')
.addEdge('endurance_node', '__end__');

const app = graph.compile();
const loadTester = new LoadTester();

const results = await loadTester.runConcurrentLoad(
app,
3, // 3个并发用户
10000, // 10秒测试时间
{
step: 'initial',
data: {},
metrics: {},
}
);

expect(results.successRate).toBeGreaterThan(95);
expect(results.avgDuration).toBeLessThan(100);

// 检查性能是否稳定(标准差不应该太大)
const durations = results.results
.filter((r) => r.success)
.map((r) => r.duration);

const mean = durations.reduce((sum, d) => sum + d, 0) / durations.length;
const variance =
durations.reduce((sum, d) => sum + Math.pow(d - mean, 2), 0) /
durations.length;
const stdDev = Math.sqrt(variance);

expect(stdDev / mean).toBeLessThan(0.5); // 变异系数小于50%
}, 15000);
});
});

// 导出负载测试工具
export { LoadTestState, LoadTester, createLoadTestNode };

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/load-tests.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/load-tests.test.ts (6 tests) 21110ms
✓ 负载测试 > 基本负载测试 > 应该处理低负载 1011ms
✓ 负载测试 > 基本负载测试 > 应该处理中等负载 2006ms
✓ 负载测试 > 基本负载测试 > 应该处理高负载 3022ms
✓ 负载测试 > 压力测试 > 应该在极高负载下保持稳定 5018ms
✓ 负载测试 > 耐久性测试 > 应该在长时间运行中保持性能 10015ms

Test Files 1 passed (1)
Tests 6 passed (6)
Start at 02:39:25
Duration 24.80s (transform 138ms, setup 456ms, collect 650ms, tests 21.11s, environment 865ms, prepare 324ms)
*/

🔍 测试工具和框架

推荐工具栈

// 测试框架
import { describe, it, expect, beforeEach, afterEach } from 'vitest';

// 模拟工具
import { vi } from 'vitest';

// 测试工具
import { render, screen, fireEvent } from '@testing-library/react';
import { waitFor } from '@testing-library/dom';

// 自定义测试工具
import { createTestGraph, mockLLM, createTestState } from './test-utils';

测试配置

测试配置
/**
* ============================================================================
* 测试配置 - Test Configuration
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件提供统一的测试环境配置、测试工具设置、全局测试fixtures、测试辅助函数以及测试常量,为所有测试提供一致的配置和工具支持。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 测试配置管理:统一的测试超时、重试等配置
* 2️⃣ 测试环境设置:测试环境变量和全局配置
* 3️⃣ 测试Fixtures:可复用的测试数据和对象
* 4️⃣ 测试工具函数:通用的测试辅助函数
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用接口定义测试配置结构
* • 提供默认配置和自定义配置合并
* • 导出全局测试常量和枚举
* • 提供测试环境初始化函数
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/test-config.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 配置应该易于覆盖和定制
* • 区分开发、CI、生产环境的配置
* • 提供合理的默认值
* • 配置应该类型安全
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
// 测试配置管理
export interface TestConfig {
timeout: number;
retries: number;
parallel: boolean;
coverage: boolean;
environment: 'development' | 'testing' | 'production';
mockServices: boolean;
logLevel: 'debug' | 'info' | 'warn' | 'error';
}

// 默认测试配置
export const defaultTestConfig: TestConfig = {
timeout: 5000,
retries: 2,
parallel: true,
coverage: true,
environment: 'testing',
mockServices: true,
logLevel: 'info',
};

// 环境特定配置
export const testConfigs = {
unit: {
...defaultTestConfig,
timeout: 1000,
parallel: true,
mockServices: true,
},
integration: {
...defaultTestConfig,
timeout: 10000,
parallel: false,
mockServices: false,
},
e2e: {
...defaultTestConfig,
timeout: 30000,
retries: 3,
parallel: false,
mockServices: false,
},
};

// 测试环境设置
export class TestEnvironment {
private config: TestConfig;

constructor(config: Partial<TestConfig> = {}) {
this.config = { ...defaultTestConfig, ...config };
}

getConfig(): TestConfig {
return this.config;
}

updateConfig(updates: Partial<TestConfig>): void {
this.config = { ...this.config, ...updates };
}

isProduction(): boolean {
return this.config.environment === 'production';
}

shouldMockServices(): boolean {
return this.config.mockServices;
}

getTimeout(): number {
return this.config.timeout;
}
}

// 测试数据管理
export class TestDataManager {
private testData: Map<string, any> = new Map();

setTestData(key: string, data: any): void {
this.testData.set(key, data);
}

getTestData(key: string): any {
return this.testData.get(key);
}

clearTestData(): void {
this.testData.clear();
}

// 生成测试用的状态数据
generateTestState(overrides: any = {}): any {
return {
step: 'test_step',
data: { test: true },
messages: [],
...overrides,
};
}

// 生成随机测试数据
generateRandomData(type: 'string' | 'number' | 'array' | 'object'): any {
switch (type) {
case 'string':
return Math.random().toString(36).substring(7);
case 'number':
return Math.floor(Math.random() * 1000);
case 'array':
return Array.from({ length: 5 }, () => Math.random());
case 'object':
return {
id: Math.random().toString(36).substring(7),
value: Math.random(),
timestamp: Date.now(),
};
default:
return null;
}
}
}

// 测试工具类
export class TestUtils {
static async waitFor(
condition: () => boolean | Promise<boolean>,
timeout: number = 5000,
interval: number = 100
): Promise<void> {
const startTime = Date.now();

while (Date.now() - startTime < timeout) {
const result = await condition();
if (result) {
return;
}
await new Promise((resolve) => setTimeout(resolve, interval));
}

throw new Error(`Condition not met within ${timeout}ms`);
}

static createMockFunction<T extends (...args: any[]) => any>(
implementation?: T
): T {
const mockFn = (implementation || (() => {})) as T;
return mockFn;
}

static async measureExecutionTime<T>(
fn: () => Promise<T>
): Promise<{ result: T; duration: number }> {
const startTime = performance.now();
const result = await fn();
const endTime = performance.now();

return {
result,
duration: endTime - startTime,
};
}

static generateTestId(): string {
return `test_${Date.now()}_${Math.random().toString(36).substring(7)}`;
}
}

// 测试断言辅助函数
export class TestAssertions {
static assertStateShape(state: any, expectedShape: any): void {
for (const key in expectedShape) {
if (!(key in state)) {
throw new Error(`Expected state to have property: ${key}`);
}

const expectedType = typeof expectedShape[key];
const actualType = typeof state[key];

if (expectedType !== actualType) {
throw new Error(
`Expected ${key} to be ${expectedType}, got ${actualType}`
);
}
}
}

static assertGraphStructure(graph: any, expectedNodes: string[]): void {
const actualNodes = Object.keys(graph.nodes || {});

for (const node of expectedNodes) {
if (!actualNodes.includes(node)) {
throw new Error(`Expected graph to have node: ${node}`);
}
}
}

static assertPerformance(
duration: number,
maxDuration: number,
operation: string
): void {
if (duration > maxDuration) {
throw new Error(
`${operation} took ${duration}ms, expected less than ${maxDuration}ms`
);
}
}
}

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
*/

📋 测试最佳实践

测试组织

测试组织结构
/**
* ============================================================================
* 测试组织 - Test Organization
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何组织和结构化测试代码,包括测试文件组织策略、测试套件划分、测试命名规范、测试代码复用以及测试文档化,提升测试的可维护性和可读性。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 测试文件组织:按功能、模块或层次组织测试文件
* 2️⃣ 测试套件划分:使用describe嵌套组织相关测试
* 3️⃣ 命名规范:统一的测试用例命名约定
* 4️⃣ 代码复用:提取公共测试逻辑和辅助函数
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用describe组织相关测试用例
* • 采用Given-When-Then或AAA模式
* • 提取通用setup和teardown逻辑
* • 使用test.each进行参数化测试
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/test-organization.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 测试文件应该与源文件对应
* • 测试套件应该有清晰的层次结构
* • 测试用例名称应该描述测试意图
* • 避免测试之间的依赖关系
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 测试组织结构示例

// 基础测试状态
const BaseTestState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
});

// 测试套件组织类
class TestSuiteOrganizer {
private testGroups: Map<string, any[]> = new Map();
private sharedSetup: any = {};
private sharedTeardown: any = {};

addTestGroup(name: string, tests: any[]): void {
this.testGroups.set(name, tests);
}

setSharedSetup(setup: any): void {
this.sharedSetup = setup;
}

setSharedTeardown(teardown: any): void {
this.sharedTeardown = teardown;
}

getTestGroups(): Map<string, any[]> {
return this.testGroups;
}
}

// 测试数据工厂
class TestDataFactory {
static createUser(overrides: any = {}) {
return {
id: `user_${Date.now()}`,
name: 'Test User',
email: 'test@example.com',
role: 'user',
...overrides,
};
}

static createGraph(overrides: any = {}) {
const graph = new StateGraph(BaseTestState);

const defaultNode = async (state: typeof BaseTestState.State) => {
return { step: 'completed', data: { processed: true } };
};

graph
.addNode('process', overrides.nodeFunction || defaultNode)
.addEdge('__start__', 'process')
.addEdge('process', '__end__');

return graph.compile();
}

static createTestState(overrides: any = {}) {
return {
step: 'initial',
data: {},
...overrides,
};
}
}

// 测试助手类
class TestHelpers {
static async waitFor(
condition: () => boolean,
timeout: number = 5000
): Promise<void> {
const start = Date.now();
while (!condition() && Date.now() - start < timeout) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
if (!condition()) {
throw new Error('Condition not met within timeout');
}
}

static createMockFunction(returnValue?: any) {
let callCount = 0;
const calls: any[] = [];

const mockFn = (...args: any[]) => {
callCount++;
calls.push(args);
return returnValue;
};

mockFn.callCount = () => callCount;
mockFn.calls = () => calls;
mockFn.reset = () => {
callCount = 0;
calls.length = 0;
};

return mockFn;
}

static deepClone(obj: any): any {
return JSON.parse(JSON.stringify(obj));
}
}

// 测试断言扩展
class CustomAssertions {
static expectGraphToComplete(
result: any,
expectedStep: string = 'completed'
) {
expect(result).toBeDefined();
expect(result.step).toBe(expectedStep);
}

static expectStateToContain(state: any, expectedKeys: string[]) {
expectedKeys.forEach((key) => {
expect(state).toHaveProperty(key);
});
}

static expectTimingToBe(
actualTime: number,
expectedTime: number,
tolerance: number = 100
) {
expect(Math.abs(actualTime - expectedTime)).toBeLessThanOrEqual(tolerance);
}
}

// 示例:组织良好的测试套件
describe('测试组织示例', () => {
let organizer: TestSuiteOrganizer;
let testData: any;

beforeEach(() => {
organizer = new TestSuiteOrganizer();
testData = {
users: [],
graphs: [],
states: [],
};
});

afterEach(() => {
// 清理测试数据
testData = null;
});

describe('单元测试组', () => {
describe('数据工厂测试', () => {
it('应该创建默认用户', () => {
const user = TestDataFactory.createUser();

expect(user).toHaveProperty('id');
expect(user).toHaveProperty('name', 'Test User');
expect(user).toHaveProperty('email', 'test@example.com');
expect(user).toHaveProperty('role', 'user');
});

it('应该创建自定义用户', () => {
const customUser = TestDataFactory.createUser({
name: 'Custom User',
role: 'admin',
});

expect(customUser.name).toBe('Custom User');
expect(customUser.role).toBe('admin');
expect(customUser.email).toBe('test@example.com'); // 默认值保持
});

it('应该创建基础图', async () => {
const graph = TestDataFactory.createGraph();
const result = await graph.invoke(TestDataFactory.createTestState());

CustomAssertions.expectGraphToComplete(result);
expect(result.data.processed).toBe(true);
});
});

describe('测试助手测试', () => {
it('应该等待条件满足', async () => {
let condition = false;
setTimeout(() => {
condition = true;
}, 200);

await TestHelpers.waitFor(() => condition);
expect(condition).toBe(true);
});

it('应该在超时时抛出错误', async () => {
const condition = () => false;

await expect(TestHelpers.waitFor(condition, 100)).rejects.toThrow(
'Condition not met within timeout'
);
});

it('应该创建模拟函数', () => {
const mockFn = TestHelpers.createMockFunction('test result');

const result1 = mockFn('arg1', 'arg2');
const result2 = mockFn('arg3');

expect(result1).toBe('test result');
expect(result2).toBe('test result');
expect(mockFn.callCount()).toBe(2);
expect(mockFn.calls()).toEqual([['arg1', 'arg2'], ['arg3']]);
});

it('应该深度克隆对象', () => {
const original = { a: 1, b: { c: 2 } };
const cloned = TestHelpers.deepClone(original);

cloned.b.c = 3;

expect(original.b.c).toBe(2);
expect(cloned.b.c).toBe(3);
});
});

describe('自定义断言测试', () => {
it('应该验证图完成状态', async () => {
const graph = TestDataFactory.createGraph();
const result = await graph.invoke(TestDataFactory.createTestState());

CustomAssertions.expectGraphToComplete(result);
});

it('应该验证状态包含指定键', () => {
const state = { step: 'test', data: { key1: 'value1' }, extra: 'info' };

CustomAssertions.expectStateToContain(state, ['step', 'data', 'extra']);
});

it('应该验证时间在容差范围内', () => {
const actualTime = 1000;
const expectedTime = 1050;

CustomAssertions.expectTimingToBe(actualTime, expectedTime, 100);
});
});
});

describe('集成测试组', () => {
describe('完整流程测试', () => {
it('应该执行完整的用户创建流程', async () => {
// 准备测试数据
const userData = TestDataFactory.createUser({
name: 'Integration User',
});

// 创建处理图
const processUserNode = async (state: typeof BaseTestState.State) => {
const { data } = state;
return {
step: 'user_processed',
data: {
...data,
user: userData,
processedAt: Date.now(),
},
};
};

const graph = TestDataFactory.createGraph({
nodeFunction: processUserNode,
});

// 执行测试
const result = await graph.invoke(
TestDataFactory.createTestState({
data: { operation: 'create_user' },
})
);

// 验证结果
expect(result.step).toBe('user_processed');
expect(result.data.user.name).toBe('Integration User');
expect(result.data.processedAt).toBeDefined();
});

it('应该处理错误情况', async () => {
const errorNode = async (state: typeof BaseTestState.State) => {
throw new Error('Processing failed');
};

const graph = TestDataFactory.createGraph({ nodeFunction: errorNode });

await expect(
graph.invoke(TestDataFactory.createTestState())
).rejects.toThrow('Processing failed');
});
});
});

describe('性能测试组', () => {
it('应该在合理时间内完成处理', async () => {
const graph = TestDataFactory.createGraph();

const startTime = performance.now();
await graph.invoke(TestDataFactory.createTestState());
const endTime = performance.now();

const processingTime = endTime - startTime;
expect(processingTime).toBeLessThan(100); // 100ms内完成
});

it('应该支持批量处理', async () => {
const graph = TestDataFactory.createGraph();
const batchSize = 10;

const promises = Array.from({ length: batchSize }, () =>
graph.invoke(TestDataFactory.createTestState())
);

const startTime = performance.now();
const results = await Promise.all(promises);
const endTime = performance.now();

expect(results).toHaveLength(batchSize);
results.forEach((result) => {
CustomAssertions.expectGraphToComplete(result);
});

const totalTime = endTime - startTime;
expect(totalTime).toBeLessThan(1000); // 1秒内完成批量处理
});
});
});

// 导出测试工具
export {
TestSuiteOrganizer,
TestDataFactory,
TestHelpers,
CustomAssertions,
BaseTestState,
};

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/test-organization.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/test-organization.test.ts (14 tests) 348ms

Test Files 1 passed (1)
Tests 14 passed (14)
Start at 02:39:30
Duration 2.10s (transform 74ms, setup 266ms, collect 295ms, tests 348ms, environment 418ms, prepare 427ms)
*/

测试数据管理

测试数据管理
/**
* ============================================================================
* 测试数据管理 - Test Data Management
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何管理和组织测试数据,包括测试数据生成器、测试fixture管理、测试数据库设置、测试数据清理以及测试数据隔离策略,确保测试的独立性和可维护性。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 测试数据生成器:自动生成各种类型的测试数据
* 2️⃣ Fixture管理:管理可复用的测试数据和对象
* 3️⃣ 数据清理:在测试前后清理测试数据
* 4️⃣ 数据隔离:确保测试之间的数据独立性
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用工厂函数生成测试数据
* • 提供Builder模式构建复杂测试对象
* • 使用beforeEach/afterEach管理数据生命周期
* • 为不同测试场景提供专用数据集
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/test-data-management.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 测试数据应该具有代表性和多样性
* • 避免测试之间共享可变数据
* • 使用有意义的测试数据名称
* • 定期清理测试数据,避免数据积累
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 测试数据管理示例

// 测试数据状态定义
const TestDataState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
testData: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
results: Annotation<any[]>({
reducer: (x, y) => [...(x || []), ...y],
}),
});

// 测试数据管理器
class TestDataManager {
private static instance: TestDataManager;
private dataStore: Map<string, any> = new Map();
private fixtures: Map<string, any> = new Map();

static getInstance(): TestDataManager {
if (!TestDataManager.instance) {
TestDataManager.instance = new TestDataManager();
}
return TestDataManager.instance;
}

// 设置测试夹具
setFixture(name: string, data: any): void {
this.fixtures.set(name, data);
}

// 获取测试夹具
getFixture(name: string): any {
return this.fixtures.get(name);
}

// 存储测试数据
store(key: string, data: any): void {
this.dataStore.set(key, data);
}

// 获取测试数据
retrieve(key: string): any {
return this.dataStore.get(key);
}

// 清理所有数据
clear(): void {
this.dataStore.clear();
this.fixtures.clear();
}

// 生成测试数据
generateTestData(type: string, count: number = 1): any[] {
const data = [];
for (let i = 0; i < count; i++) {
switch (type) {
case 'user':
data.push({
id: `user_${i + 1}`,
name: `Test User ${i + 1}`,
email: `user${i + 1}@test.com`,
createdAt: new Date().toISOString(),
});
break;
case 'message':
data.push({
id: `msg_${i + 1}`,
content: `Test message ${i + 1}`,
timestamp: Date.now() + i * 1000,
type: 'user',
});
break;
case 'graph_state':
data.push({
step: `step_${i + 1}`,
testData: { iteration: i + 1 },
results: [],
});
break;
default:
data.push({ id: i + 1, type, value: `test_${i + 1}` });
}
}
return count === 1 ? data[0] : data;
}
}

// 测试数据构建器
class TestDataBuilder {
private data: any = {};

static create(): TestDataBuilder {
return new TestDataBuilder();
}

withUser(userData: any = {}): TestDataBuilder {
this.data.user = {
id: 'test_user',
name: 'Test User',
email: 'test@example.com',
...userData,
};
return this;
}

withMessages(count: number = 3): TestDataBuilder {
this.data.messages = Array.from({ length: count }, (_, i) => ({
id: `msg_${i + 1}`,
content: `Message ${i + 1}`,
timestamp: Date.now() + i * 1000,
type: i % 2 === 0 ? 'user' : 'assistant',
}));
return this;
}

withGraphState(stateData: any = {}): TestDataBuilder {
this.data.graphState = {
step: 'initial',
testData: {},
results: [],
...stateData,
};
return this;
}

withCustomData(key: string, value: any): TestDataBuilder {
this.data[key] = value;
return this;
}

build(): any {
return { ...this.data };
}
}

// 测试数据验证器
class TestDataValidator {
static validateUser(user: any): boolean {
return (
user &&
typeof user.id === 'string' &&
typeof user.name === 'string' &&
typeof user.email === 'string' &&
user.email.includes('@')
);
}

static validateMessage(message: any): boolean {
return (
message &&
typeof message.id === 'string' &&
typeof message.content === 'string' &&
typeof message.timestamp === 'number' &&
['user', 'assistant'].includes(message.type)
);
}

static validateGraphState(state: any): boolean {
return (
state &&
typeof state.step === 'string' &&
typeof state.testData === 'object' &&
Array.isArray(state.results)
);
}
}

// 测试数据清理器
class TestDataCleaner {
private cleanupTasks: (() => void)[] = [];

addCleanupTask(task: () => void): void {
this.cleanupTasks.push(task);
}

cleanup(): void {
this.cleanupTasks.forEach((task) => {
try {
task();
} catch (error) {
console.warn('Cleanup task failed:', error);
}
});
this.cleanupTasks = [];
}
}

describe('测试数据管理', () => {
let dataManager: TestDataManager;
let dataCleaner: TestDataCleaner;

beforeEach(() => {
dataManager = TestDataManager.getInstance();
dataCleaner = new TestDataCleaner();
});

afterEach(() => {
dataManager.clear();
dataCleaner.cleanup();
});

describe('数据管理器测试', () => {
it('应该存储和检索测试数据', () => {
const testData = { id: 1, name: 'Test' };

dataManager.store('test_key', testData);
const retrieved = dataManager.retrieve('test_key');

expect(retrieved).toEqual(testData);
});

it('应该管理测试夹具', () => {
const fixture = { users: [{ id: 1, name: 'User 1' }] };

dataManager.setFixture('user_fixture', fixture);
const retrieved = dataManager.getFixture('user_fixture');

expect(retrieved).toEqual(fixture);
});

it('应该生成测试数据', () => {
const users = dataManager.generateTestData('user', 3);

expect(users).toHaveLength(3);
users.forEach((user, index) => {
expect(user.id).toBe(`user_${index + 1}`);
expect(user.name).toBe(`Test User ${index + 1}`);
expect(TestDataValidator.validateUser(user)).toBe(true);
});
});

it('应该生成单个测试数据', () => {
const user = dataManager.generateTestData('user', 1);

expect(user).toHaveProperty('id', 'user_1');
expect(user).toHaveProperty('name', 'Test User 1');
expect(TestDataValidator.validateUser(user)).toBe(true);
});
});

describe('数据构建器测试', () => {
it('应该构建完整的测试数据', () => {
const testData = TestDataBuilder.create()
.withUser({ name: 'Custom User' })
.withMessages(2)
.withGraphState({ step: 'processing' })
.withCustomData('extra', 'value')
.build();

expect(testData.user.name).toBe('Custom User');
expect(testData.messages).toHaveLength(2);
expect(testData.graphState.step).toBe('processing');
expect(testData.extra).toBe('value');
});

it('应该使用默认值构建数据', () => {
const testData = TestDataBuilder.create()
.withUser()
.withMessages()
.build();

expect(testData.user.name).toBe('Test User');
expect(testData.messages).toHaveLength(3);
expect(TestDataValidator.validateUser(testData.user)).toBe(true);
testData.messages.forEach((msg) => {
expect(TestDataValidator.validateMessage(msg)).toBe(true);
});
});
});

describe('数据验证器测试', () => {
it('应该验证有效的用户数据', () => {
const validUser = {
id: 'user_1',
name: 'Test User',
email: 'test@example.com',
};

expect(TestDataValidator.validateUser(validUser)).toBe(true);
});

it('应该拒绝无效的用户数据', () => {
const invalidUsers = [
null,
{},
{ id: 'user_1' }, // 缺少必需字段
{ id: 'user_1', name: 'Test', email: 'invalid-email' }, // 无效邮箱
];

invalidUsers.forEach((user) => {
expect(TestDataValidator.validateUser(user)).toBe(false);
});
});

it('应该验证有效的消息数据', () => {
const validMessage = {
id: 'msg_1',
content: 'Test message',
timestamp: Date.now(),
type: 'user',
};

expect(TestDataValidator.validateMessage(validMessage)).toBe(true);
});

it('应该验证有效的图状态数据', () => {
const validState = {
step: 'processing',
testData: { key: 'value' },
results: [{ result: 'test' }],
};

expect(TestDataValidator.validateGraphState(validState)).toBe(true);
});
});

describe('数据清理器测试', () => {
it('应该执行清理任务', () => {
let cleaned = false;

dataCleaner.addCleanupTask(() => {
cleaned = true;
});

dataCleaner.cleanup();

expect(cleaned).toBe(true);
});

it('应该处理清理任务中的错误', () => {
let task1Executed = false;
let task2Executed = false;

dataCleaner.addCleanupTask(() => {
task1Executed = true;
throw new Error('Cleanup error');
});

dataCleaner.addCleanupTask(() => {
task2Executed = true;
});

// 不应该抛出错误
expect(() => dataCleaner.cleanup()).not.toThrow();

expect(task1Executed).toBe(true);
expect(task2Executed).toBe(true);
});
});

describe('集成测试', () => {
it('应该支持完整的数据管理流程', async () => {
// 1. 构建测试数据
const testData = TestDataBuilder.create()
.withUser({ name: 'Integration User' })
.withMessages(2)
.withGraphState({ step: 'initial' })
.build();

// 2. 存储测试数据
dataManager.store('integration_test', testData);

// 3. 创建图进行测试
const graph = new StateGraph(TestDataState);

const processNode = async (state: typeof TestDataState.State) => {
const storedData = dataManager.retrieve('integration_test');
return {
step: 'processed',
testData: storedData,
results: [{ processed: true, timestamp: Date.now() }],
};
};

graph
.addNode('process', processNode)
.addEdge('__start__', 'process')
.addEdge('process', '__end__');

const app = graph.compile();

// 4. 执行测试
const result = await app.invoke({
step: 'initial',
testData: {},
results: [],
});

// 5. 验证结果
expect(result.step).toBe('processed');
expect(result.testData.user.name).toBe('Integration User');
expect(result.testData.messages).toHaveLength(2);
expect(result.results).toHaveLength(1);

// 6. 添加清理任务
dataCleaner.addCleanupTask(() => {
dataManager.clear();
});
});
});
});

// 导出测试工具
export {
TestDataManager,
TestDataBuilder,
TestDataValidator,
TestDataCleaner,
TestDataState,
};
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /home/user/langgraphjs-tutorial/websites/examples/utils/.env
Error: Vitest cannot be imported in a CommonJS module
(这些是测试文件,需要使用 pnpm test 而不是 esno 运行)
*/

🔄 持续集成测试

CI/CD 配置

GitHub Actions 配置
name: Test

on:
push:
branches: [main, develop]
pull_request:
branches: [main]

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '18'
cache: 'npm'

- name: Install dependencies
run: npm ci

- name: Run unit tests
run: npm run test:unit

- name: Run integration tests
run: npm run test:integration

- name: Run E2E tests
run: npm run test:e2e

- name: Generate coverage report
run: npm run test:coverage

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3

测试报告

测试报告生成
/**
* ============================================================================
* 测试报告 - Test Reporting
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件展示如何生成和定制测试报告,包括测试结果汇总、代码覆盖率报告、性能基准报告、自定义报告格式以及CI/CD集成报告,帮助团队了解测试质量和趋势。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 测试结果报告:汇总测试通过、失败、跳过等信息
* 2️⃣ 覆盖率报告:生成代码覆盖率统计和可视化
* 3️⃣ 性能报告:记录和对比测试执行时间
* 4️⃣ 自定义报告:创建符合团队需求的报告格式
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用Vitest内置reporter
* • 配置覆盖率工具(如c8或Istanbul)
* • 实现自定义reporter收集额外指标
* • 导出JSON/HTML/XML格式的报告
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/test-reporting.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 覆盖率报告应该关注有意义的覆盖
* • 性能报告应该设置基准和阈值
* • 报告应该易于理解和分享
* • CI环境应该生成机器可读的报告格式
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 测试报告生成示例

// 测试报告状态定义
const TestReportState = Annotation.Root({
testName: Annotation<string>({
reducer: (x, y) => y,
}),
results: Annotation<any[]>({
reducer: (x, y) => [...(x || []), ...y],
}),
metrics: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
summary: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
});

// 测试结果收集器
class TestResultCollector {
private results: any[] = [];
private startTime: number = 0;
private endTime: number = 0;

start(): void {
this.startTime = performance.now();
this.results = [];
}

end(): void {
this.endTime = performance.now();
}

addResult(result: any): void {
this.results.push({
...result,
timestamp: Date.now(),
});
}

getResults(): any[] {
return this.results;
}

getDuration(): number {
return this.endTime - this.startTime;
}

getMetrics(): any {
const total = this.results.length;
const passed = this.results.filter((r) => r.status === 'passed').length;
const failed = this.results.filter((r) => r.status === 'failed').length;
const skipped = this.results.filter((r) => r.status === 'skipped').length;

return {
total,
passed,
failed,
skipped,
passRate: total > 0 ? (passed / total) * 100 : 0,
duration: this.getDuration(),
};
}
}

// 测试报告生成器
class TestReportGenerator {
static generateHtmlReport(collector: TestResultCollector): string {
const metrics = collector.getMetrics();
const results = collector.getResults();

return `
<!DOCTYPE html>
<html>
<head>
<title>测试报告</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background: #f5f5f5; padding: 20px; border-radius: 5px; }
.metrics { display: flex; gap: 20px; margin: 20px 0; }
.metric { background: #e3f2fd; padding: 15px; border-radius: 5px; text-align: center; }
.passed { background: #c8e6c9; }
.failed { background: #ffcdd2; }
.results { margin-top: 20px; }
.result { padding: 10px; margin: 5px 0; border-left: 4px solid #ccc; }
.result.passed { border-color: #4caf50; }
.result.failed { border-color: #f44336; }
</style>
</head>
<body>
<div class="header">
<h1>LangGraphJS 测试报告</h1>
<p>生成时间: ${new Date().toLocaleString()}</p>
</div>

<div class="metrics">
<div class="metric">
<h3>总计</h3>
<p>${metrics.total}</p>
</div>
<div class="metric passed">
<h3>通过</h3>
<p>${metrics.passed}</p>
</div>
<div class="metric failed">
<h3>失败</h3>
<p>${metrics.failed}</p>
</div>
<div class="metric">
<h3>通过率</h3>
<p>${metrics.passRate.toFixed(1)}%</p>
</div>
<div class="metric">
<h3>耗时</h3>
<p>${metrics.duration.toFixed(2)}ms</p>
</div>
</div>

<div class="results">
<h2>测试结果详情</h2>
${results
.map(
(result) => `
<div class="result ${result.status}">
<h4>${result.name}</h4>
<p>状态: ${result.status}</p>
<p>耗时: ${result.duration || 0}ms</p>
${result.error ? `<p>错误: ${result.error}</p>` : ''}
</div>
`
)
.join('')}
</div>
</body>
</html>`;
}

static generateJsonReport(collector: TestResultCollector): string {
return JSON.stringify(
{
timestamp: new Date().toISOString(),
metrics: collector.getMetrics(),
results: collector.getResults(),
},
null,
2
);
}

static generateMarkdownReport(collector: TestResultCollector): string {
const metrics = collector.getMetrics();
const results = collector.getResults();

return `# LangGraphJS 测试报告

## 测试概览

| 指标 | 值 |
|------|-----|
| 总测试数 | ${metrics.total} |
| 通过数 | ${metrics.passed} |
| 失败数 | ${metrics.failed} |
| 跳过数 | ${metrics.skipped} |
| 通过率 | ${metrics.passRate.toFixed(1)}% |
| 总耗时 | ${metrics.duration.toFixed(2)}ms |

## 测试结果

${results
.map(
(result) => `
### ${result.name}

- **状态**: ${result.status}
- **耗时**: ${result.duration || 0}ms
${result.error ? `- **错误**: ${result.error}` : ''}
`
)
.join('')}

---
*报告生成时间: ${new Date().toLocaleString()}*
`;
}
}

// 性能监控器
class PerformanceMonitor {
private measurements: Map<string, number[]> = new Map();

measure(name: string, fn: () => Promise<any>): Promise<any> {
const start = performance.now();
return fn().finally(() => {
const duration = performance.now() - start;
if (!this.measurements.has(name)) {
this.measurements.set(name, []);
}
this.measurements.get(name)!.push(duration);
});
}

getStats(name: string): any {
const measurements = this.measurements.get(name) || [];
if (measurements.length === 0) {
return null;
}

const sorted = [...measurements].sort((a, b) => a - b);
const sum = measurements.reduce((a, b) => a + b, 0);

return {
count: measurements.length,
min: Math.min(...measurements),
max: Math.max(...measurements),
avg: sum / measurements.length,
median: sorted[Math.floor(sorted.length / 2)],
p95: sorted[Math.floor(sorted.length * 0.95)],
};
}

getAllStats(): any {
const stats = {};
for (const [name] of this.measurements) {
stats[name] = this.getStats(name);
}
return stats;
}
}

describe('测试报告生成', () => {
let collector: TestResultCollector;
let monitor: PerformanceMonitor;

beforeEach(() => {
collector = new TestResultCollector();
monitor = new PerformanceMonitor();
});

describe('结果收集器测试', () => {
it('应该收集测试结果', () => {
collector.start();

collector.addResult({
name: 'Test 1',
status: 'passed',
duration: 100,
});

collector.addResult({
name: 'Test 2',
status: 'failed',
duration: 200,
error: 'Test failed',
});

collector.end();

const results = collector.getResults();
expect(results).toHaveLength(2);
expect(results[0].name).toBe('Test 1');
expect(results[1].status).toBe('failed');
});

it('应该计算测试指标', () => {
collector.start();

collector.addResult({ name: 'Test 1', status: 'passed' });
collector.addResult({ name: 'Test 2', status: 'passed' });
collector.addResult({ name: 'Test 3', status: 'failed' });
collector.addResult({ name: 'Test 4', status: 'skipped' });

collector.end();

const metrics = collector.getMetrics();
expect(metrics.total).toBe(4);
expect(metrics.passed).toBe(2);
expect(metrics.failed).toBe(1);
expect(metrics.skipped).toBe(1);
expect(metrics.passRate).toBe(50);
});
});

describe('报告生成器测试', () => {
beforeEach(() => {
collector.start();
collector.addResult({ name: 'Test 1', status: 'passed', duration: 100 });
collector.addResult({
name: 'Test 2',
status: 'failed',
duration: 200,
error: 'Error message',
});
collector.end();
});

it('应该生成 HTML 报告', () => {
const htmlReport = TestReportGenerator.generateHtmlReport(collector);

expect(htmlReport).toContain('<!DOCTYPE html>');
expect(htmlReport).toContain('LangGraphJS 测试报告');
expect(htmlReport).toContain('Test 1');
expect(htmlReport).toContain('Test 2');
expect(htmlReport).toContain('Error message');
});

it('应该生成 JSON 报告', () => {
const jsonReport = TestReportGenerator.generateJsonReport(collector);
const parsed = JSON.parse(jsonReport);

expect(parsed).toHaveProperty('timestamp');
expect(parsed).toHaveProperty('metrics');
expect(parsed).toHaveProperty('results');
expect(parsed.metrics.total).toBe(2);
expect(parsed.results).toHaveLength(2);
});

it('应该生成 Markdown 报告', () => {
const markdownReport =
TestReportGenerator.generateMarkdownReport(collector);

expect(markdownReport).toContain('# LangGraphJS 测试报告');
expect(markdownReport).toContain('| 总测试数 | 2 |');
expect(markdownReport).toContain('### Test 1');
expect(markdownReport).toContain('### Test 2');
});
});

describe('性能监控器测试', () => {
it('应该监控函数执行时间', async () => {
const testFn = async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
return 'result';
};

const result = await monitor.measure('test_function', testFn);

expect(result).toBe('result');

const stats = monitor.getStats('test_function');
expect(stats).toBeDefined();
expect(stats.count).toBe(1);
expect(stats.min).toBeGreaterThan(90);
expect(stats.max).toBeLessThan(150);
});

it('应该收集多次测量的统计信息', async () => {
const testFn = async () => {
await new Promise((resolve) =>
setTimeout(resolve, Math.random() * 50 + 50)
);
};

// 执行多次测量
for (let i = 0; i < 5; i++) {
await monitor.measure('multi_test', testFn);
}

const stats = monitor.getStats('multi_test');
expect(stats.count).toBe(5);
expect(stats.avg).toBeGreaterThan(0);
expect(stats.min).toBeLessThanOrEqual(stats.max);
expect(stats.median).toBeDefined();
expect(stats.p95).toBeDefined();
});

it('应该获取所有统计信息', async () => {
await monitor.measure('test1', async () => {});
await monitor.measure('test2', async () => {});

const allStats = monitor.getAllStats();
expect(allStats).toHaveProperty('test1');
expect(allStats).toHaveProperty('test2');
});
});

describe('集成测试', () => {
it('应该生成完整的测试报告', async () => {
// 模拟测试执行
collector.start();

// 创建测试图
const graph = new StateGraph(TestReportState);

const testNode = async (state: typeof TestReportState.State) => {
return {
testName: 'Integration Test',
results: [{ name: 'Node Test', status: 'passed', duration: 50 }],
metrics: { nodeExecutions: 1 },
};
};

graph
.addNode('test', testNode)
.addEdge('__start__', 'test')
.addEdge('test', '__end__');

const app = graph.compile();

// 执行测试并监控性能
const result = await monitor.measure('graph_execution', async () => {
return await app.invoke({
testName: '',
results: [],
metrics: {},
summary: {},
});
});

// 收集结果
collector.addResult({
name: result.testName,
status: 'passed',
duration: monitor.getStats('graph_execution')?.avg || 0,
});

collector.end();

// 生成报告
const htmlReport = TestReportGenerator.generateHtmlReport(collector);
const jsonReport = TestReportGenerator.generateJsonReport(collector);
const markdownReport =
TestReportGenerator.generateMarkdownReport(collector);

// 验证报告内容
expect(htmlReport).toContain('Integration Test');
expect(jsonReport).toContain('Integration Test');
expect(markdownReport).toContain('Integration Test');

const metrics = collector.getMetrics();
expect(metrics.total).toBe(1);
expect(metrics.passed).toBe(1);
expect(metrics.passRate).toBe(100);
});
});
});

// 导出测试工具
export {
TestResultCollector,
TestReportGenerator,
PerformanceMonitor,
TestReportState,
};

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/test-reporting.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/test-reporting.test.ts (9 tests) 533ms
✓ 测试报告生成 > 性能监控器测试 > 应该收集多次测量的统计信息 391ms

Test Files 1 passed (1)
Tests 9 passed (9)
Start at 02:40:15
Duration 4.14s (transform 101ms, setup 309ms, collect 644ms, tests 533ms, environment 1.24s, prepare 700ms)
*/

📈 测试覆盖率

覆盖率目标

const COVERAGE_TARGETS = {
statements: 90,
branches: 85,
functions: 90,
lines: 90,
};

覆盖率分析

🎯 测试策略矩阵

测试类型范围速度成本信心度
单元测试
集成测试
E2E 测试

🔧 测试工具集

核心工具

测试工具集
/**
* ============================================================================
* 测试工具函数 - Test Utilities
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件提供一系列实用的测试辅助工具函数,包括状态构建器、断言助手、异步等待工具、Mock生成器以及测试数据工厂,简化测试代码编写,提高测试效率。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 状态构建器:简化测试状态的构建
* 2️⃣ 断言助手:提供领域特定的断言函数
* 3️⃣ 异步工具:处理异步操作的辅助函数
* 4️⃣ Mock生成器:快速生成各种Mock对象
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 提供Builder模式的状态构建器
* • 封装常用的断言组合
* • 实现waitFor、waitUntil等异步工具
* • 提供可配置的Mock工厂函数
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行测试
* $ npx vitest 最佳实践/测试策略/test-utilities.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 工具函数应该简单、可组合
* • 提供良好的TypeScript类型支持
* • 文档化所有导出的工具函数
* • 保持工具函数的纯粹性
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { describe, it, expect, beforeEach } from 'vitest';
import { StateGraph, Annotation } from '@langchain/langgraph';

// 测试工具集合

// 基础测试状态
const UtilityTestState = Annotation.Root({
step: Annotation<string>({
reducer: (x, y) => y,
}),
data: Annotation<any>({
reducer: (x, y) => ({ ...x, ...y }),
}),
results: Annotation<any[]>({
reducer: (x, y) => [...(x || []), ...y],
}),
});

// 测试工具类
class TestUtils {
// 等待指定时间
static async sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

// 重试执行函数
static async retry<T>(
fn: () => Promise<T>,
maxAttempts: number = 3,
delay: number = 1000
): Promise<T> {
let lastError: Error;

for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
if (attempt < maxAttempts) {
await this.sleep(delay);
}
}
}

throw lastError!;
}

// 超时执行函数
static async timeout<T>(
promise: Promise<T>,
timeoutMs: number,
timeoutMessage: string = 'Operation timed out'
): Promise<T> {
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs);
});

return Promise.race([promise, timeoutPromise]);
}

// 生成随机字符串
static randomString(length: number = 8): string {
const chars =
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
let result = '';
for (let i = 0; i < length; i++) {
result += chars.charAt(Math.floor(Math.random() * chars.length));
}
return result;
}

// 生成随机数字
static randomNumber(min: number = 0, max: number = 100): number {
return Math.floor(Math.random() * (max - min + 1)) + min;
}

// 深度比较对象
static deepEqual(obj1: any, obj2: any): boolean {
if (obj1 === obj2) return true;

if (obj1 == null || obj2 == null) return false;

if (typeof obj1 !== typeof obj2) return false;

if (typeof obj1 !== 'object') return obj1 === obj2;

const keys1 = Object.keys(obj1);
const keys2 = Object.keys(obj2);

if (keys1.length !== keys2.length) return false;

for (const key of keys1) {
if (!keys2.includes(key)) return false;
if (!this.deepEqual(obj1[key], obj2[key])) return false;
}

return true;
}

// 创建模拟函数
static createMock<T extends (...args: any[]) => any>(
implementation?: T
): MockFunction<T> {
return new MockFunction(implementation);
}
}

// 模拟函数类
class MockFunction<T extends (...args: any[]) => any> {
private calls: Parameters<T>[] = [];
private results: ReturnType<T>[] = [];
private implementation?: T;

constructor(implementation?: T) {
this.implementation = implementation;
}

// 执行模拟函数
execute(...args: Parameters<T>): ReturnType<T> {
this.calls.push(args);

if (this.implementation) {
const result = this.implementation(...args);
this.results.push(result);
return result;
}

return undefined as ReturnType<T>;
}

// 获取调用次数
getCallCount(): number {
return this.calls.length;
}

// 获取所有调用参数
getCalls(): Parameters<T>[] {
return [...this.calls];
}

// 获取指定调用的参数
getCall(index: number): Parameters<T> | undefined {
return this.calls[index];
}

// 获取所有返回值
getResults(): ReturnType<T>[] {
return [...this.results];
}

// 重置模拟函数
reset(): void {
this.calls = [];
this.results = [];
}

// 设置实现
setImplementation(implementation: T): void {
this.implementation = implementation;
}
}

// 测试断言工具
class AssertionUtils {
// 断言数组包含元素
static assertArrayContains<T>(array: T[], element: T): void {
if (!array.includes(element)) {
throw new Error(`Array does not contain element: ${element}`);
}
}

// 断言数组长度
static assertArrayLength<T>(array: T[], expectedLength: number): void {
if (array.length !== expectedLength) {
throw new Error(
`Expected array length ${expectedLength}, got ${array.length}`
);
}
}

// 断言对象属性
static assertObjectHasProperty(obj: any, property: string): void {
if (!(property in obj)) {
throw new Error(`Object does not have property: ${property}`);
}
}

// 断言函数抛出错误
static async assertThrows(
fn: () => Promise<any> | any,
expectedError?: string | RegExp
): Promise<void> {
try {
await fn();
throw new Error('Expected function to throw, but it did not');
} catch (error) {
if (expectedError) {
const message = (error as Error).message;
if (typeof expectedError === 'string') {
if (!message.includes(expectedError)) {
throw new Error(
`Expected error message to contain "${expectedError}", got "${message}"`
);
}
} else if (expectedError instanceof RegExp) {
if (!expectedError.test(message)) {
throw new Error(
`Expected error message to match ${expectedError}, got "${message}"`
);
}
}
}
}
}

// 断言时间范围
static assertTimeInRange(
actualTime: number,
expectedTime: number,
tolerance: number = 100
): void {
const diff = Math.abs(actualTime - expectedTime);
if (diff > tolerance) {
throw new Error(
`Time ${actualTime} is not within ${tolerance}ms of ${expectedTime}`
);
}
}
}

// 测试数据生成器
class TestDataGenerator {
// 生成用户数据
static generateUser(overrides: any = {}): any {
return {
id: TestUtils.randomString(8),
name: `User ${TestUtils.randomString(4)}`,
email: `${TestUtils.randomString(6)}@test.com`,
age: TestUtils.randomNumber(18, 80),
createdAt: new Date().toISOString(),
...overrides,
};
}

// 生成消息数据
static generateMessage(overrides: any = {}): any {
return {
id: TestUtils.randomString(10),
content: `Test message ${TestUtils.randomString(6)}`,
timestamp: Date.now(),
type: Math.random() > 0.5 ? 'user' : 'assistant',
...overrides,
};
}

// 生成图状态数据
static generateGraphState(overrides: any = {}): any {
return {
step: `step_${TestUtils.randomString(4)}`,
data: {
id: TestUtils.randomString(8),
value: TestUtils.randomNumber(1, 1000),
},
results: [],
...overrides,
};
}

// 生成批量数据
static generateBatch<T>(
generator: (overrides?: any) => T,
count: number,
overrides: any = {}
): T[] {
return Array.from({ length: count }, () => generator(overrides));
}
}

// 测试环境管理器
class TestEnvironment {
private static instance: TestEnvironment;
private variables: Map<string, any> = new Map();
private cleanupTasks: (() => void)[] = [];

static getInstance(): TestEnvironment {
if (!TestEnvironment.instance) {
TestEnvironment.instance = new TestEnvironment();
}
return TestEnvironment.instance;
}

// 设置环境变量
setVariable(key: string, value: any): void {
this.variables.set(key, value);
}

// 获取环境变量
getVariable(key: string): any {
return this.variables.get(key);
}

// 添加清理任务
addCleanupTask(task: () => void): void {
this.cleanupTasks.push(task);
}

// 执行清理
cleanup(): void {
this.cleanupTasks.forEach((task) => {
try {
task();
} catch (error) {
console.warn('Cleanup task failed:', error);
}
});
this.cleanupTasks = [];
this.variables.clear();
}
}

describe('测试工具集', () => {
let testEnv: TestEnvironment;

beforeEach(() => {
testEnv = TestEnvironment.getInstance();
});

describe('TestUtils 测试', () => {
it('应该等待指定时间', async () => {
const start = performance.now();
await TestUtils.sleep(100);
const end = performance.now();

expect(end - start).toBeGreaterThanOrEqual(90);
expect(end - start).toBeLessThan(150);
});

it('应该重试失败的函数', async () => {
let attempts = 0;
const failingFunction = async () => {
attempts++;
if (attempts < 3) {
throw new Error('Function failed');
}
return 'success';
};

const result = await TestUtils.retry(failingFunction, 3, 10);

expect(result).toBe('success');
expect(attempts).toBe(3);
});

it('应该在超时时抛出错误', async () => {
const slowFunction = async () => {
await TestUtils.sleep(200);
return 'result';
};

await expect(
TestUtils.timeout(slowFunction(), 100, 'Custom timeout')
).rejects.toThrow('Custom timeout');
});

it('应该生成随机字符串', () => {
const str1 = TestUtils.randomString(10);
const str2 = TestUtils.randomString(10);

expect(str1).toHaveLength(10);
expect(str2).toHaveLength(10);
expect(str1).not.toBe(str2);
});

it('应该深度比较对象', () => {
const obj1 = { a: 1, b: { c: 2 } };
const obj2 = { a: 1, b: { c: 2 } };
const obj3 = { a: 1, b: { c: 3 } };

expect(TestUtils.deepEqual(obj1, obj2)).toBe(true);
expect(TestUtils.deepEqual(obj1, obj3)).toBe(false);
});
});

describe('MockFunction 测试', () => {
it('应该记录函数调用', () => {
const mock = TestUtils.createMock((a: number, b: string) => `${a}-${b}`);

mock.execute(1, 'test');
mock.execute(2, 'hello');

expect(mock.getCallCount()).toBe(2);
expect(mock.getCall(0)).toEqual([1, 'test']);
expect(mock.getCall(1)).toEqual([2, 'hello']);
});

it('应该执行自定义实现', () => {
const mock = TestUtils.createMock((x: number) => x * 2);

const result1 = mock.execute(5);
const result2 = mock.execute(10);

expect(result1).toBe(10);
expect(result2).toBe(20);
expect(mock.getResults()).toEqual([10, 20]);
});
});

describe('AssertionUtils 测试', () => {
it('应该断言数组包含元素', () => {
const array = [1, 2, 3, 4, 5];

expect(() => AssertionUtils.assertArrayContains(array, 3)).not.toThrow();
expect(() => AssertionUtils.assertArrayContains(array, 6)).toThrow();
});

it('应该断言函数抛出错误', async () => {
const throwingFunction = () => {
throw new Error('Test error message');
};

await expect(
AssertionUtils.assertThrows(throwingFunction, 'Test error')
).resolves.not.toThrow();

await expect(
AssertionUtils.assertThrows(() => 'no error', 'Expected error')
).rejects.toThrow('Expected function to throw');
});
});

describe('TestDataGenerator 测试', () => {
it('应该生成用户数据', () => {
const user = TestDataGenerator.generateUser({ name: 'Custom User' });

expect(user).toHaveProperty('id');
expect(user).toHaveProperty('email');
expect(user.name).toBe('Custom User');
expect(user.email).toContain('@test.com');
});

it('应该生成批量数据', () => {
const users = TestDataGenerator.generateBatch(
TestDataGenerator.generateUser,
5,
{ age: 25 }
);

expect(users).toHaveLength(5);
users.forEach((user) => {
expect(user.age).toBe(25);
expect(user).toHaveProperty('id');
});
});
});

describe('TestEnvironment 测试', () => {
it('应该管理环境变量', () => {
testEnv.setVariable('test_key', 'test_value');

expect(testEnv.getVariable('test_key')).toBe('test_value');
expect(testEnv.getVariable('nonexistent')).toBeUndefined();
});

it('应该执行清理任务', () => {
let cleaned = false;

testEnv.addCleanupTask(() => {
cleaned = true;
});

testEnv.cleanup();

expect(cleaned).toBe(true);
});
});
});

// 导出测试工具
export {
TestUtils,
MockFunction,
AssertionUtils,
TestDataGenerator,
TestEnvironment,
UtilityTestState,
};

/*
执行结果:
RUN v3.2.4 /Users/loock/myFile/langgraphjs-tutorial/websites

stdout | examples/最佳实践/测试策略/test-utilities.test.ts
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env

✓ examples/最佳实践/测试策略/test-utilities.test.ts (13 tests) 229ms

Test Files 1 passed (1)
Tests 13 passed (13)
Start at 02:39:30
Duration 1.98s (transform 79ms, setup 242ms, collect 334ms, tests 229ms, environment 469ms, prepare 368ms)
*/

自定义匹配器

自定义测试匹配器
/**
* ============================================================================
* 自定义测试匹配器 - Custom Test Matchers
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 本文件提供一套专为 LangGraph 测试定制的 Vitest 自定义匹配器,包括状态验证、
* 图结构验证、性能测试、错误处理、流式处理、持久化等多个领域的匹配器。通过
* 扩展 expect 接口,提供更语义化、更易读的测试断言,提升测试代码质量。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 状态验证匹配器:toBeValidState、toHaveExecutedNodes、toHaveStateProperty
* 2️⃣ 性能测试匹配器:toBeWithinTimeRange,验证执行时间是否在合理范围内
* 3️⃣ 错误处理匹配器:toHaveErrorType,验证错误类型
* 4️⃣ 图结构匹配器:toBeValidGraph、toHaveNodeCount、toHaveEdgeCount
* 5️⃣ 流式处理匹配器:toBeStreamingResponse,验证异步迭代器或流
* 6️⃣ 持久化匹配器:toHaveValidCheckpoint,验证检查点结构
* 7️⃣ 复合匹配器:toBeSuccessfulGraphExecution,综合验证图执行结果
* 8️⃣ 异步匹配器:toResolveWithin,验证 Promise 在指定时间内完成
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用 expect.extend() 扩展 Vitest 的匹配器功能
* • 通过 TypeScript 模块声明扩展 Vitest 接口,提供类型支持
* • 每个匹配器返回 { pass, message } 对象,支持正向和反向断言
* • 使用 Symbol.asyncIterator 检测异步迭代器
* • 提供匹配器工厂函数,支持创建自定义匹配器
* • 使用类型守卫函数增强类型安全性
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 在测试文件中导入
* import './custom-matchers';
*
* // 使用自定义匹配器
* expect(state).toBeValidState();
* expect(state).toHaveStateProperty('messages');
* expect(executionTime).toBeWithinTimeRange(100, 200);
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 自定义匹配器需要在测试文件顶部导入才能生效
* • TypeScript 需要正确配置才能识别扩展的接口
* • 匹配器的错误消息应该清晰地说明期望和实际值
* • 复杂的匹配器应该拆分为多个简单匹配器组合使用
* • 异步匹配器需要在 async 测试函数中使用 await
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { expect } from 'vitest';

// 自定义测试匹配器

// 扩展 expect 接口
interface CustomMatchers<R = unknown> {
toBeValidState(): R;
toHaveExecutedNodes(nodes: string[]): R;
toHaveStateProperty(property: string, value?: any): R;
toBeWithinTimeRange(min: number, max: number): R;
toHaveErrorType(errorType: string): R;
toBeValidGraph(): R;
toHaveNodeCount(count: number): R;
toHaveEdgeCount(count: number): R;
toBeStreamingResponse(): R;
toHaveValidCheckpoint(): R;
}

declare module 'vitest' {
interface Assertion<T = any> extends CustomMatchers<T> {}
interface AsymmetricMatchersContaining extends CustomMatchers {}
}

// 状态验证匹配器
expect.extend({
toBeValidState(received: any) {
const { isNot } = this;

const pass =
received && typeof received === 'object' && !Array.isArray(received);

return {
pass,
message: () =>
`${received} ${
isNot ? 'should not' : 'should'
} be a valid state object`,
};
},

toHaveExecutedNodes(received: any, expectedNodes: string[]) {
const { isNot } = this;

if (!received || !received.executedNodes) {
return {
pass: false,
message: () => 'State does not have executedNodes property',
};
}

const executedNodes = received.executedNodes;
const pass = expectedNodes.every((node) => executedNodes.includes(node));

return {
pass,
message: () =>
`Expected state ${
isNot ? 'not ' : ''
}to have executed nodes: ${expectedNodes.join(', ')}\n` +
`Received: ${executedNodes.join(', ')}`,
};
},

toHaveStateProperty(received: any, property: string, expectedValue?: any) {
const { isNot } = this;

if (!received || typeof received !== 'object') {
return {
pass: false,
message: () => 'Received value is not a valid state object',
};
}

const hasProperty = property in received;

if (expectedValue !== undefined) {
const pass = hasProperty && received[property] === expectedValue;
return {
pass,
message: () =>
`Expected state ${
isNot ? 'not ' : ''
}to have property "${property}" with value ${expectedValue}\n` +
`Received: ${received[property]}`,
};
}

return {
pass: hasProperty,
message: () =>
`Expected state ${isNot ? 'not ' : ''}to have property "${property}"`,
};
},
});

// 性能测试匹配器
expect.extend({
toBeWithinTimeRange(received: number, min: number, max: number) {
const { isNot } = this;

const pass = received >= min && received <= max;

return {
pass,
message: () =>
`Expected execution time ${
isNot ? 'not ' : ''
}to be within ${min}ms - ${max}ms\n` + `Received: ${received}ms`,
};
},
});

// 错误处理匹配器
expect.extend({
toHaveErrorType(received: any, expectedErrorType: string) {
const { isNot } = this;

if (!received || !(received instanceof Error)) {
return {
pass: false,
message: () => 'Received value is not an Error instance',
};
}

const pass = received.constructor.name === expectedErrorType;

return {
pass,
message: () =>
`Expected error ${
isNot ? 'not ' : ''
}to be of type "${expectedErrorType}"\n` +
`Received: ${received.constructor.name}`,
};
},
});

// 图结构验证匹配器
expect.extend({
toBeValidGraph(received: any) {
const { isNot } = this;

if (!received || typeof received !== 'object') {
return {
pass: false,
message: () => 'Received value is not a valid graph object',
};
}

// 检查图的基本结构
const hasNodes = 'nodes' in received || '_nodes' in received;
const hasEdges = 'edges' in received || '_edges' in received;

const pass = hasNodes && hasEdges;

return {
pass,
message: () =>
`Expected ${
isNot ? 'not ' : ''
}to be a valid graph with nodes and edges`,
};
},

toHaveNodeCount(received: any, expectedCount: number) {
const { isNot } = this;

if (!received || typeof received !== 'object') {
return {
pass: false,
message: () => 'Received value is not a valid graph object',
};
}

// 尝试获取节点数量
const nodes = received.nodes || received._nodes || {};
const nodeCount = Object.keys(nodes).length;

const pass = nodeCount === expectedCount;

return {
pass,
message: () =>
`Expected graph ${
isNot ? 'not ' : ''
}to have ${expectedCount} nodes\n` + `Received: ${nodeCount} nodes`,
};
},

toHaveEdgeCount(received: any, expectedCount: number) {
const { isNot } = this;

if (!received || typeof received !== 'object') {
return {
pass: false,
message: () => 'Received value is not a valid graph object',
};
}

// 尝试获取边数量
const edges = received.edges || received._edges || [];
const edgeCount = Array.isArray(edges)
? edges.length
: Object.keys(edges).length;

const pass = edgeCount === expectedCount;

return {
pass,
message: () =>
`Expected graph ${
isNot ? 'not ' : ''
}to have ${expectedCount} edges\n` + `Received: ${edgeCount} edges`,
};
},
});

// 流式处理匹配器
expect.extend({
toBeStreamingResponse(received: any) {
const { isNot } = this;

// 检查是否是异步迭代器
const isAsyncIterable =
received && typeof received[Symbol.asyncIterator] === 'function';

// 检查是否是 ReadableStream
const isReadableStream = received instanceof ReadableStream;

// 检查是否有 stream 方法
const hasStreamMethod = received && typeof received.stream === 'function';

const pass = isAsyncIterable || isReadableStream || hasStreamMethod;

return {
pass,
message: () =>
`Expected ${
isNot ? 'not ' : ''
}to be a streaming response (AsyncIterable, ReadableStream, or have stream method)`,
};
},
});

// 持久化匹配器
expect.extend({
toHaveValidCheckpoint(received: any) {
const { isNot } = this;

if (!received || typeof received !== 'object') {
return {
pass: false,
message: () => 'Received value is not a valid checkpoint object',
};
}

// 检查检查点的基本结构
const hasId = 'id' in received || 'checkpoint_id' in received;
const hasState = 'state' in received;
const hasTimestamp = 'timestamp' in received || 'created_at' in received;

const pass = hasId && hasState && hasTimestamp;

return {
pass,
message: () =>
`Expected ${
isNot ? 'not ' : ''
}to be a valid checkpoint with id, state, and timestamp`,
};
},
});

// 辅助函数:创建自定义匹配器
export const createCustomMatcher = (
name: string,
matcherFn: (
received: any,
...args: any[]
) => { pass: boolean; message: () => string }
) => {
expect.extend({
[name]: matcherFn,
});
};

// 复合匹配器:检查图执行结果
export const toBeSuccessfulGraphExecution = (received: any) => {
const { isNot } = expect.getState();

if (!received || typeof received !== 'object') {
return {
pass: false,
message: () => 'Received value is not a valid execution result',
};
}

// 检查执行是否成功
const hasResult = 'result' in received || 'output' in received;
const noErrors = !received.errors || received.errors.length === 0;
const completed =
received.status === 'completed' || received.finished === true;

const pass = hasResult && noErrors && completed;

return {
pass,
message: () =>
`Expected ${isNot ? 'not ' : ''}to be a successful graph execution\n` +
`Has result: ${hasResult}\n` +
`No errors: ${noErrors}\n` +
`Completed: ${completed}`,
};
};

// 注册复合匹配器
expect.extend({
toBeSuccessfulGraphExecution,
});

// 类型安全的匹配器工厂
export const createTypedMatcher = <T>(
validator: (value: any) => value is T,
name: string
) => {
return (received: any) => {
const { isNot } = expect.getState();
const pass = validator(received);

return {
pass,
message: () =>
`Expected value ${isNot ? 'not ' : ''}to match type ${name}`,
};
};
};

// 示例:状态类型验证器
export const isValidLangGraphState = (
value: any
): value is Record<string, any> => {
return (
value &&
typeof value === 'object' &&
!Array.isArray(value) &&
Object.keys(value).length > 0
);
};

// 注册类型化匹配器
expect.extend({
toBeValidLangGraphState: createTypedMatcher(
isValidLangGraphState,
'LangGraphState'
),
});

// 异步匹配器:检查异步操作
export const toResolveWithin = async (
received: Promise<any>,
timeoutMs: number
) => {
const { isNot } = expect.getState();

try {
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), timeoutMs)
);

await Promise.race([received, timeoutPromise]);

return {
pass: true,
message: () =>
`Expected promise ${
isNot ? 'not ' : ''
}to resolve within ${timeoutMs}ms`,
};
} catch (error) {
const isTimeout = (error as Error).message === 'Timeout';

return {
pass: false,
message: () =>
isTimeout
? `Promise did not resolve within ${timeoutMs}ms`
: `Promise rejected with: ${(error as Error).message}`,
};
}
};

// 注册异步匹配器
expect.extend({
toResolveWithin,
});

// 使
/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /home/user/langgraphjs-tutorial/websites/examples/utils/.env
Error: Vitest cannot be imported in a CommonJS module
(这些是测试文件,需要使用 pnpm test 而不是 esno 运行)
*/

📋 测试检查清单

测试检查清单

单元测试

  • 所有节点函数都有单元测试
  • 状态 reducer 逻辑测试完整
  • 边界条件和错误情况已覆盖
  • 测试数据独立且可重复

集成测试

  • 节点间交互测试
  • 图执行流程测试
  • 外部依赖集成测试
  • 配置变更影响测试

端到端测试

  • 关键用户场景覆盖
  • 性能基准测试
  • 错误恢复测试
  • 跨浏览器兼容性测试

测试质量

  • 测试代码可读性良好
  • 测试运行稳定可靠
  • 测试覆盖率达标
  • 测试执行时间合理

🚀 测试自动化

自动化流程

📊 测试指标

关键指标

  • 测试覆盖率:代码覆盖百分比
  • 测试通过率:测试成功率
  • 测试执行时间:测试套件运行时间
  • 缺陷发现率:测试发现的问题数量

小结

有效的测试策略应该:

  • 分层测试:合理分配不同层级的测试
  • 自动化优先:尽可能自动化测试流程
  • 快速反馈:提供及时的测试结果
  • 持续改进:根据反馈不断优化测试策略

下一节我们将学习常见用例,了解 LangGraphJS 在实际项目中的应用。