⏰ 时间旅行
引言
时间旅行(Time Travel)是 LangGraphJS 提供的一项强大调试功能,允许开发者回到应用执行的任何历史状态,查看当时的数据,甚至从那个时间点重新开始执行。这就像是为你的 AI 应用提供了一个"时光机"。
对于前端开发者来说,这个概念类似于:
- Git 版本控制:可以查看和回滚到任何历史提交
- 浏览器历史记录:可以前进和后退到访问过的页面
- Redux DevTools:可以查看和重放状态变化
- Chrome DevTools:可以查看调用栈和变量状态
时间旅行功能在调试复杂的 AI 应用时特别有用,特别是当你需要理解状态变化的过程或测试不同的执行路径时。
核心概念
🔍 检查点系统
时间旅行功能基于 LangGraphJS 的检查点(Checkpoint)机制:
📸 状态快照
每个检查点都包含完整的状态快照:
- 消息历史:所有的对话消息
- 变量状态:所有状态变量的值
- 执行上下文:当前执行位置和配置
- 时间戳:检查点创建时间
🕐 时间轴概念
基础用法
📋 查看历史状态
基础时间旅行示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, MemorySaver } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
step: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
});
// 创建简单的节点
async function step1(state: typeof StateAnnotation.State) {
return {
messages: [new AIMessage('执行步骤1')],
step: state.step + 1,
};
}
async function step2(state: typeof StateAnnotation.State) {
return {
messages: [new AIMessage('执行步骤2')],
step: state.step + 1,
};
}
async function step3(state: typeof StateAnnotation.State) {
return {
messages: [new AIMessage('执行步骤3')],
step: state.step + 1,
};
}
// 创建图
const graph = new StateGraph(StateAnnotation)
.addNode('step1', step1)
.addNode('step2', step2)
.addNode('step3', step3)
.addEdge('__start__', 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', 'step3')
.addEdge('step3', '__end__');
// 使用内存保存器启用检查点
const memory = new MemorySaver();
const compiledGraph = graph.compile({ checkpointer: memory });
// 基础时间旅行示例
export async function demonstrateBasicTimeTravel() {
console.log('=== 基础时间旅行示例 ===');
const config = { configurable: { thread_id: 'basic-demo' } };
// 1. 执行图
console.log('\n1. 执行图...');
const result = await compiledGraph.invoke(
{ messages: [new HumanMessage('开始执行')] },
config
);
console.log('最终结果:', result);
// 2. 查看历史状态
console.log('\n2. 查看历史状态...');
const history = [];
for await (const checkpoint of compiledGraph.getStateHistory(config)) {
history.push({
id: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
values: checkpoint.values,
});
}
// 按时间顺序显示
history.reverse().forEach((checkpoint, index) => {
console.log(`\n检查点 ${index + 1}:`);
console.log(` ID: ${checkpoint.id?.slice(0, 8)}`);
console.log(` 步骤: ${checkpoint.step}`);
console.log(` 消息数量: ${checkpoint.values.messages?.length || 0}`);
console.log(` 当前步骤: ${checkpoint.values.step}`);
});
// 3. 获取特定检查点的状态
if (history.length > 1) {
console.log('\n3. 获取特定检查点的状态...');
const targetCheckpoint = history[1]; // 获取第二个检查点
const specificState = await compiledGraph.getState({
...config,
configurable: {
...config.configurable,
checkpoint_id: targetCheckpoint.id,
},
});
console.log('特定检查点状态:');
console.log(
' 消息:',
specificState.values.messages?.map((m) => m.content)
);
console.log(' 步骤:', specificState.values.step);
}
return history;
}
// 状态查看工具
export class StateViewer {
private graph: any;
private config: any;
constructor(graph: any, config: any) {
this.graph = graph;
this.config = config;
}
// 获取所有检查点
async getAllCheckpoints() {
const checkpoints = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
checkpoints.push({
id: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
values: checkpoint.values,
config: checkpoint.config,
metadata: checkpoint.metadata,
});
}
return checkpoints.reverse(); // 按时间顺序
}
// 显示时间线
async showTimeline() {
const checkpoints = await this.getAllCheckpoints();
console.log('\n=== 执行时间线 ===');
checkpoints.forEach((checkpoint, index) => {
const id = checkpoint.id?.slice(0, 8) || 'unknown';
const step = checkpoint.metadata?.step || index;
const messageCount = checkpoint.values.messages?.length || 0;
console.log(
`${index + 1}. [${id}] 步骤 ${step} - ${messageCount} 条消息`
);
// 显示最后一条消息
if (checkpoint.values.messages && checkpoint.values.messages.length > 0) {
const lastMessage =
checkpoint.values.messages[checkpoint.values.messages.length - 1];
console.log(` └─ "${lastMessage.content}"`);
}
});
}
// 比较两个检查点
async compareCheckpoints(checkpointId1: string, checkpointId2: string) {
const state1 = await this.graph.getState({
...this.config,
configurable: {
...this.config.configurable,
checkpoint_id: checkpointId1,
},
});
const state2 = await this.graph.getState({
...this.config,
configurable: {
...this.config.configurable,
checkpoint_id: checkpointId2,
},
});
console.log('\n=== 检查点比较 ===');
console.log(`检查点1 (${checkpointId1.slice(0, 8)}):`);
console.log(' 消息数量:', state1.values.messages?.length || 0);
console.log(' 步骤:', state1.values.step);
console.log(`检查点2 (${checkpointId2.slice(0, 8)}):`);
console.log(' 消息数量:', state2.values.messages?.length || 0);
console.log(' 步骤:', state2.values.step);
// 分析差异
const messageDiff =
(state2.values.messages?.length || 0) -
(state1.values.messages?.length || 0);
const stepDiff = (state2.values.step || 0) - (state1.values.step || 0);
console.log('\n差异:');
console.log(
' 消息变化:',
messageDiff > 0 ? `+${messageDiff}` : messageDiff
);
console.log(' 步骤变化:', stepDiff > 0 ? `+${stepDiff}` : stepDiff);
return { state1, state2, messageDiff, stepDiff };
}
// 查找特定内容的检查点
async findCheckpointsByContent(searchTerm: string) {
const checkpoints = await this.getAllCheckpoints();
const matches = checkpoints.filter((checkpoint) => {
const messages = checkpoint.values.messages || [];
return messages.some((message: BaseMessage) => {
const content =
typeof message.content === 'string'
? message.content
: JSON.stringify(message.content);
return content.toLowerCase().includes(searchTerm.toLowerCase());
});
});
console.log(`\n找到 ${matches.length} 个包含 "${searchTerm}" 的检查点:`);
matches.forEach((checkpoint, index) => {
console.log(
`${index + 1}. ${checkpoint.id?.slice(0, 8)} - 步骤 ${
checkpoint.metadata?.step
}`
);
});
return matches;
}
}
// 使用示例
export async function demonstrateStateViewer() {
console.log('\n=== 状态查看器演示 ===');
const config = { configurable: { thread_id: 'viewer-demo' } };
// 执行图
await compiledGraph.invoke(
{ messages: [new HumanMessage('开始状态查看器演示')] },
config
);
// 使用状态查看器
const viewer = new StateViewer(compiledGraph, config);
// 显示时间线
await viewer.showTimeline();
// 获取检查点并比较
const checkpoints = await viewer.getAllCheckpoints();
if (checkpoints.length >= 2) {
await viewer.compareCheckpoints(checkpoints[0].id, checkpoints[1].id);
}
// 搜索特定内容
await viewer.findCheckpointsByContent('步骤');
}
// 运行示例
if (require.main === module) {
demonstrateBasicTimeTravel()
.then(() => demonstrateStateViewer())
.catch(console.error);
}
🔄 状态回滚
状态回滚示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, MemorySaver } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
counter: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
operations: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
});
// 节点函数
async function incrementNode(state: typeof StateAnnotation.State) {
return {
messages: [new AIMessage(`计数器增加到 ${state.counter + 1}`)],
counter: state.counter + 1,
operations: ['increment'],
};
}
async function multiplyNode(state: typeof StateAnnotation.State) {
return {
messages: [new AIMessage(`计数器乘以2变为 ${state.counter * 2}`)],
counter: state.counter * 2,
operations: ['multiply'],
};
}
async function resetNode(state: typeof StateAnnotation.State) {
return {
messages: [new AIMessage('计数器重置为0')],
counter: 0,
operations: ['reset'],
};
}
// 创建图
const graph = new StateGraph(StateAnnotation)
.addNode('increment', incrementNode)
.addNode('multiply', multiplyNode)
.addNode('reset', resetNode)
.addEdge('__start__', 'increment')
.addEdge('increment', 'multiply')
.addEdge('multiply', 'reset')
.addEdge('reset', '__end__');
const memory = new MemorySaver();
const compiledGraph = graph.compile({ checkpointer: memory });
// 状态回滚演示
export async function demonstrateStateRollback() {
console.log('=== 状态回滚演示 ===');
const config = { configurable: { thread_id: 'rollback-demo' } };
// 1. 执行完整流程
console.log('\n1. 执行完整流程...');
await compiledGraph.invoke(
{ messages: [new HumanMessage('开始计数器操作')] },
config
);
// 2. 查看所有检查点
console.log('\n2. 查看所有检查点...');
const checkpoints = [];
for await (const checkpoint of compiledGraph.getStateHistory(config)) {
checkpoints.push({
id: checkpoint.config.configurable.checkpoint_id,
values: checkpoint.values,
metadata: checkpoint.metadata,
});
}
checkpoints.reverse().forEach((checkpoint, index) => {
console.log(`\n检查点 ${index + 1}:`);
console.log(` ID: ${checkpoint.id?.slice(0, 8)}`);
console.log(` 计数器: ${checkpoint.values.counter}`);
console.log(` 操作: ${checkpoint.values.operations?.join(', ') || '无'}`);
console.log(` 消息数: ${checkpoint.values.messages?.length || 0}`);
});
// 3. 回滚到特定检查点
if (checkpoints.length >= 3) {
console.log('\n3. 回滚到乘法操作后的状态...');
const targetCheckpoint = checkpoints[2]; // 乘法操作后的检查点
// 从特定检查点继续执行
const rollbackConfig = {
...config,
configurable: {
...config.configurable,
checkpoint_id: targetCheckpoint.id,
},
};
// 获取回滚后的状态
const rolledBackState = await compiledGraph.getState(rollbackConfig);
console.log('回滚后的状态:');
console.log(' 计数器:', rolledBackState.values.counter);
console.log(' 操作历史:', rolledBackState.values.operations);
// 从回滚点继续执行不同的操作
console.log('\n4. 从回滚点执行不同的操作...');
const newResult = await compiledGraph.invoke(
{
messages: [new HumanMessage('从回滚点继续')],
counter: rolledBackState.values.counter,
operations: ['rollback_continue'],
},
{ configurable: { thread_id: 'rollback-continue' } }
);
console.log('新分支的最终状态:');
console.log(' 计数器:', newResult.counter);
console.log(' 操作历史:', newResult.operations);
}
return checkpoints;
}
// 高级回滚工具
export class StateRollbackManager {
private graph: any;
private config: any;
constructor(graph: any, config: any) {
this.graph = graph;
this.config = config;
}
// 获取所有检查点
async getCheckpoints() {
const checkpoints = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
checkpoints.push({
id: checkpoint.config.configurable.checkpoint_id,
values: checkpoint.values,
metadata: checkpoint.metadata,
config: checkpoint.config,
});
}
return checkpoints.reverse();
}
// 回滚到指定检查点
async rollbackTo(checkpointId: string) {
const rollbackConfig = {
...this.config,
configurable: {
...this.config.configurable,
checkpoint_id: checkpointId,
},
};
const state = await this.graph.getState(rollbackConfig);
console.log(`已回滚到检查点: ${checkpointId.slice(0, 8)}`);
console.log('当前状态:', state.values);
return { state, config: rollbackConfig };
}
// 回滚到指定步数之前
async rollbackSteps(steps: number) {
const checkpoints = await this.getCheckpoints();
if (steps >= checkpoints.length) {
throw new Error(
`无法回滚 ${steps} 步,只有 ${checkpoints.length} 个检查点`
);
}
const targetCheckpoint = checkpoints[checkpoints.length - 1 - steps];
return this.rollbackTo(targetCheckpoint.id);
}
// 回滚到满足条件的检查点
async rollbackToCondition(condition: (state: any) => boolean) {
const checkpoints = await this.getCheckpoints();
// 从最新开始向前查找
for (let i = checkpoints.length - 1; i >= 0; i--) {
if (condition(checkpoints[i].values)) {
return this.rollbackTo(checkpoints[i].id);
}
}
throw new Error('未找到满足条件的检查点');
}
// 创建分支
async createBranch(checkpointId: string, branchName: string) {
const rollbackConfig = {
configurable: {
thread_id: `${this.config.configurable.thread_id}-${branchName}`,
checkpoint_id: checkpointId,
},
};
const state = await this.graph.getState(rollbackConfig);
console.log(
`创建分支 "${branchName}" 从检查点: ${checkpointId.slice(0, 8)}`
);
return {
branchConfig: {
configurable: { thread_id: rollbackConfig.configurable.thread_id },
},
initialState: state.values,
};
}
// 比较分支
async compareBranches(branchId1: string, branchId2: string) {
const state1 = await this.graph.getState({
configurable: { thread_id: branchId1 },
});
const state2 = await this.graph.getState({
configurable: { thread_id: branchId2 },
});
console.log('\n=== 分支比较 ===');
console.log(`分支1 (${branchId1}):`);
console.log(' 计数器:', state1.values.counter);
console.log(' 操作:', state1.values.operations);
console.log(`分支2 (${branchId2}):`);
console.log(' 计数器:', state2.values.counter);
console.log(' 操作:', state2.values.operations);
return { branch1: state1.values, branch2: state2.values };
}
}
// 智能回滚示例
export async function demonstrateSmartRollback() {
console.log('\n=== 智能回滚演示 ===');
const config = { configurable: { thread_id: 'smart-rollback' } };
// 执行一系列操作
await compiledGraph.invoke(
{ messages: [new HumanMessage('开始智能回滚演示')] },
config
);
const manager = new StateRollbackManager(compiledGraph, config);
// 1. 回滚到计数器大于1的状态
console.log('\n1. 回滚到计数器大于1的状态...');
try {
const { state } = await manager.rollbackToCondition(
(state) => state.counter > 1
);
console.log('找到状态:', state.values);
} catch (error) {
console.log('未找到满足条件的状态');
}
// 2. 回滚2步
console.log('\n2. 回滚2步...');
try {
const { state } = await manager.rollbackSteps(2);
console.log('回滚后状态:', state.values);
} catch (error) {
console.log('回滚失败:', error.message);
}
// 3. 创建分支
console.log('\n3. 创建分支...');
const checkpoints = await manager.getCheckpoints();
if (checkpoints.length > 1) {
const { branchConfig } = await manager.createBranch(
checkpoints[1].id,
'alternative-path'
);
// 在新分支中执行不同操作
await compiledGraph.invoke(
{
messages: [new HumanMessage('分支操作')],
operations: ['branch_operation'],
},
branchConfig
);
// 比较分支
await manager.compareBranches(
config.configurable.thread_id,
branchConfig.configurable.thread_id
);
}
}
// 运行示例
if (require.main === module) {
demonstrateStateRollback()
.then(() => demonstrateSmartRollback())
.catch(console.error);
}
高级功能
✏️ 状态修改和分支探索
时间旅行不仅可以查看历史状态,还可以修改历史状态后重新执行。通过状态回滚示例中的 StateRollbackManager 类,你可以:
- 修改历史状态:回滚到特定检查点并修改状态
- 创建执行分支:从同一历史点尝试不同路径
- 比较不同结果:对比不同分支的执行结果
实际应用场景
🐛 调试复杂问题
当 AI 应用出现意外行为时,时间旅行可以帮助你快速定位问题。通过查看状态变化历史和回滚到问题发生前的状态,你可以:
- 分析状态变化趋势
- 定位问题发生的具体时间点
- 测试修复方案的有效性
🧪 A/B 测试
使用分支功能测试不同的决策路径和参数配置,比较不同方案的效果。
🔧 错误恢复
当执行过程中出现错误时,可以回滚到安全状态:
错误恢复示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, MemorySaver } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 定义状态
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
error_count: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
last_safe_checkpoint: Annotation<string>({
reducer: (x, y) => y ?? x,
default: () => '',
}),
});
// 模拟可能出错的节点
async function riskyNode(state: typeof StateAnnotation.State) {
const { messages, error_count } = state;
// 模拟随机错误
if (Math.random() < 0.3) {
throw new Error(`处理失败 (错误次数: ${error_count + 1})`);
}
return {
messages: [new AIMessage('处理成功完成')],
last_safe_checkpoint: 'after_risky_processing',
};
}
// 错误处理节点
async function errorHandler(state: typeof StateAnnotation.State) {
const { error_count } = state;
return {
messages: [
new AIMessage(`检测到错误,准备恢复 (第${error_count + 1}次尝试)`),
],
error_count: error_count + 1,
};
}
// 恢复决策节点
function shouldRecover(state: typeof StateAnnotation.State) {
const { error_count } = state;
// 最多重试3次
if (error_count < 3) {
return 'recover';
}
return 'fail';
}
// 创建带错误恢复的图
function createErrorRecoveryGraph() {
const graph = new StateGraph(StateAnnotation)
.addNode('risky_processing', riskyNode)
.addNode('error_handler', errorHandler)
.addNode('recovery_decision', (state) => state)
.addEdge('__start__', 'risky_processing')
.addEdge('error_handler', 'recovery_decision')
.addConditionalEdges('recovery_decision', shouldRecover, {
recover: 'risky_processing',
fail: '__end__',
})
.addEdge('risky_processing', '__end__');
return graph;
}
// 时间旅行错误恢复示例
export async function demonstrateErrorRecovery() {
const memory = new MemorySaver();
const graph = createErrorRecoveryGraph().compile({
checkpointer: memory,
interruptBefore: ['error_handler'], // 在错误处理前中断
});
const config = { configurable: { thread_id: 'error-recovery-demo' } };
try {
console.log('=== 开始执行可能出错的流程 ===');
const result = await graph.invoke(
{
messages: [new HumanMessage('请处理这个任务')],
},
config
);
console.log(
'执行成功:',
result.messages[result.messages.length - 1].content
);
} catch (error) {
console.log('捕获到错误:', error.message);
// 使用时间旅行查看错误发生时的状态
const checkpoints = [];
for await (const checkpoint of graph.getStateHistory(config)) {
checkpoints.push({
checkpoint_id: checkpoint.config.configurable.checkpoint_id,
error_count: checkpoint.values.error_count,
messages_count: checkpoint.values.messages.length,
});
}
console.log('\n=== 检查点历史 ===');
checkpoints.forEach((cp, index) => {
console.log(`${index + 1}. ID: ${cp.checkpoint_id?.slice(0, 8)}...`);
console.log(` 错误次数: ${cp.error_count}`);
console.log(` 消息数量: ${cp.messages_count}`);
});
// 选择一个安全的检查点进行恢复
if (checkpoints.length > 1) {
const safeCheckpoint = checkpoints[1]; // 选择第二个检查点
console.log(`\n=== 恢复到安全检查点 ===`);
const recoveredState = await graph.getState({
...config,
configurable: {
...config.configurable,
checkpoint_id: safeCheckpoint.checkpoint_id,
},
});
console.log('恢复的状态:');
console.log('- 错误次数:', recoveredState.values.error_count);
console.log('- 消息数量:', recoveredState.values.messages.length);
// 从恢复点重新执行
try {
const retryResult = await graph.invoke(null, {
...config,
configurable: {
...config.configurable,
checkpoint_id: safeCheckpoint.checkpoint_id,
},
});
console.log(
'重试成功:',
retryResult.messages[retryResult.messages.length - 1].content
);
} catch (retryError) {
console.log('重试仍然失败:', retryError.message);
}
}
}
}
// 智能错误恢复策略
export class SmartErrorRecovery {
private memory: MemorySaver;
private maxRetries: number;
private backoffStrategy: 'linear' | 'exponential';
constructor(
options: {
maxRetries?: number;
backoffStrategy?: 'linear' | 'exponential';
} = {}
) {
this.memory = new MemorySaver();
this.maxRetries = options.maxRetries ?? 3;
this.backoffStrategy = options.backoffStrategy ?? 'exponential';
}
async executeWithRecovery<T>(
graph: any,
input: any,
config: any,
recoveryStrategy?: (error: Error, attempt: number) => Promise<any>
): Promise<T> {
let lastError: Error;
let attempt = 0;
while (attempt < this.maxRetries) {
try {
return await graph.invoke(input, config);
} catch (error) {
lastError = error as Error;
attempt++;
console.log(`执行失败 (第${attempt}次尝试):`, error.message);
if (attempt < this.maxRetries) {
// 应用退避策略
const delay = this.calculateBackoff(attempt);
console.log(`等待 ${delay}ms 后重试...`);
await new Promise((resolve) => setTimeout(resolve, delay));
// 如果提供了自定义恢复策略,执行它
if (recoveryStrategy) {
try {
input = await recoveryStrategy(error as Error, attempt);
} catch (recoveryError) {
console.log('恢复策略失败:', recoveryError.message);
}
}
// 尝试回滚到最近的安全检查点
await this.rollbackToSafeState(graph, config);
}
}
}
throw new Error(
`执行失败,已重试 ${this.maxRetries} 次。最后错误: ${lastError.message}`
);
}
private calculateBackoff(attempt: number): number {
const baseDelay = 1000; // 1秒
switch (this.backoffStrategy) {
case 'linear':
return baseDelay * attempt;
case 'exponential':
return baseDelay * Math.pow(2, attempt - 1);
default:
return baseDelay;
}
}
private async rollbackToSafeState(graph: any, config: any) {
try {
// 获取检查点历史
const checkpoints = [];
for await (const checkpoint of graph.getStateHistory(config)) {
checkpoints.push(checkpoint);
if (checkpoints.length >= 5) break; // 只检查最近5个检查点
}
// 寻找安全的检查点(没有错误的状态)
for (const checkpoint of checkpoints.slice(1)) {
// 跳过当前状态
if (checkpoint.values.error_count === 0) {
console.log(
'回滚到安全检查点:',
checkpoint.config.configurable.checkpoint_id?.slice(0, 8)
);
// 更新配置以使用安全检查点
config.configurable.checkpoint_id =
checkpoint.config.configurable.checkpoint_id;
break;
}
}
} catch (error) {
console.log('回滚失败:', error.message);
}
}
}
// 使用示例
export async function demonstrateSmartRecovery() {
const recovery = new SmartErrorRecovery({
maxRetries: 3,
backoffStrategy: 'exponential',
});
const graph = createErrorRecoveryGraph().compile({
checkpointer: recovery['memory'],
});
const config = { configurable: { thread_id: 'smart-recovery-demo' } };
try {
const result = await recovery.executeWithRecovery(
graph,
{ messages: [new HumanMessage('执行任务')] },
config,
// 自定义恢复策略
async (error, attempt) => {
console.log(`应用恢复策略 (第${attempt}次尝试)`);
return {
messages: [new HumanMessage(`重试任务 (第${attempt}次)`)],
};
}
);
console.log('最终成功:', result);
} catch (error) {
console.log('最终失败:', error.message);
}
}
// 运行示例
if (require.main === module) {
demonstrateErrorRecovery()
.then(() => demonstrateSmartRecovery())
.catch(console.error);
}
时间旅行工作流
🔄 完整的时间旅行流程
🛠️ 时间旅行工具类
时间旅行工具类:
import '../../utils/loadEnv';
import { StateGraph, Annotation, MemorySaver } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 时间旅行工具包
export class TimeTravelToolkit {
private memory: MemorySaver;
private graph: any;
private config: any;
constructor(graph: any, config: any) {
this.memory = new MemorySaver();
this.graph = graph;
this.config = config;
}
// 获取所有检查点
async getAllCheckpoints() {
const checkpoints = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
checkpoints.push({
id: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
values: checkpoint.values,
config: checkpoint.config,
metadata: checkpoint.metadata,
});
}
return checkpoints.reverse(); // 按时间顺序排列
}
// 获取特定检查点的详细信息
async getCheckpointDetails(checkpointId: string) {
const state = await this.graph.getState({
...this.config,
configurable: {
...this.config.configurable,
checkpoint_id: checkpointId,
},
});
return state;
}
// 比较两个检查点的差异
async compareCheckpoints(checkpointId1: string, checkpointId2: string) {
const state1 = await this.getCheckpointDetails(checkpointId1);
const state2 = await this.getCheckpointDetails(checkpointId2);
const differences = {
messages: this.compareMessages(
state1.values.messages,
state2.values.messages
),
otherFields: this.compareObjects(
{ ...state1.values, messages: undefined },
{ ...state2.values, messages: undefined }
),
};
return differences;
}
// 回滚到指定检查点
async rollbackTo(checkpointId: string) {
const newConfig = {
...this.config,
configurable: {
...this.config.configurable,
checkpoint_id: checkpointId,
},
};
return newConfig;
}
// 从指定检查点重新执行
async resumeFrom(checkpointId: string, newInput?: any) {
const config = await this.rollbackTo(checkpointId);
if (newInput) {
return await this.graph.invoke(newInput, config);
} else {
return await this.graph.invoke(null, config);
}
}
// 创建检查点分支
async createBranch(checkpointId: string, branchName: string) {
const branchConfig = {
...this.config,
configurable: {
...this.config.configurable,
checkpoint_id: checkpointId,
thread_id: `${this.config.configurable.thread_id}_${branchName}`,
},
};
return branchConfig;
}
// 可视化检查点历史
async visualizeHistory() {
const checkpoints = await this.getAllCheckpoints();
console.log('\n=== 检查点历史时间线 ===');
checkpoints.forEach((checkpoint, index) => {
const id = checkpoint.id?.slice(0, 8) || 'unknown';
const step = checkpoint.metadata?.step || index;
const messageCount = checkpoint.values.messages?.length || 0;
console.log(
`${index + 1}. [${id}] Step ${step} - ${messageCount} messages`
);
// 显示最后一条消息的摘要
if (checkpoint.values.messages && checkpoint.values.messages.length > 0) {
const lastMessage =
checkpoint.values.messages[checkpoint.values.messages.length - 1];
const content = lastMessage.content.slice(0, 50);
console.log(` └─ "${content}${content.length >= 50 ? '...' : ''}"`);
}
});
}
// 搜索包含特定内容的检查点
async searchCheckpoints(searchTerm: string) {
const checkpoints = await this.getAllCheckpoints();
return checkpoints.filter((checkpoint) => {
const messages = checkpoint.values.messages || [];
return messages.some((message: BaseMessage) => {
const content =
typeof message.content === 'string'
? message.content
: JSON.stringify(message.content);
return content.toLowerCase().includes(searchTerm.toLowerCase());
});
});
}
// 导出检查点数据
async exportCheckpoint(checkpointId: string) {
const details = await this.getCheckpointDetails(checkpointId);
return {
checkpoint_id: checkpointId,
exported_at: new Date().toISOString(),
state: details.values,
config: details.config,
metadata: details.metadata,
};
}
// 导入检查点数据
async importCheckpoint(exportedData: any) {
// 这里可以实现从导出数据恢复检查点的逻辑
console.log('导入检查点:', exportedData.checkpoint_id);
return exportedData;
}
// 私有方法:比较消息数组
private compareMessages(messages1: BaseMessage[], messages2: BaseMessage[]) {
const diff = {
added: [] as BaseMessage[],
removed: [] as BaseMessage[],
modified: [] as { old: BaseMessage; new: BaseMessage }[],
};
const maxLength = Math.max(messages1.length, messages2.length);
for (let i = 0; i < maxLength; i++) {
const msg1 = messages1[i];
const msg2 = messages2[i];
if (!msg1 && msg2) {
diff.added.push(msg2);
} else if (msg1 && !msg2) {
diff.removed.push(msg1);
} else if (msg1 && msg2 && msg1.content !== msg2.content) {
diff.modified.push({ old: msg1, new: msg2 });
}
}
return diff;
}
// 私有方法:比较对象
private compareObjects(obj1: any, obj2: any) {
const diff: any = {};
const allKeys = new Set([
...Object.keys(obj1 || {}),
...Object.keys(obj2 || {}),
]);
for (const key of allKeys) {
if (obj1[key] !== obj2[key]) {
diff[key] = {
old: obj1[key],
new: obj2[key],
};
}
}
return diff;
}
}
// 时间旅行调试器
export class TimeTravelDebugger extends TimeTravelToolkit {
private breakpoints: Set<string> = new Set();
private watchedVariables: Map<string, any> = new Map();
// 设置断点
setBreakpoint(checkpointId: string) {
this.breakpoints.add(checkpointId);
console.log(`断点已设置: ${checkpointId.slice(0, 8)}`);
}
// 移除断点
removeBreakpoint(checkpointId: string) {
this.breakpoints.delete(checkpointId);
console.log(`断点已移除: ${checkpointId.slice(0, 8)}`);
}
// 检查是否命中断点
async checkBreakpoint(checkpointId: string) {
if (this.breakpoints.has(checkpointId)) {
console.log(`\n🔴 断点命中: ${checkpointId.slice(0, 8)}`);
const details = await this.getCheckpointDetails(checkpointId);
console.log('当前状态:', details.values);
return true;
}
return false;
}
// 监视变量
watchVariable(name: string, value: any) {
const oldValue = this.watchedVariables.get(name);
this.watchedVariables.set(name, value);
if (oldValue !== undefined && oldValue !== value) {
console.log(`\n👁️ 变量变化: ${name}`);
console.log(` 旧值: ${JSON.stringify(oldValue)}`);
console.log(` 新值: ${JSON.stringify(value)}`);
}
}
// 单步执行
async stepThrough() {
const checkpoints = await this.getAllCheckpoints();
for (let i = 0; i < checkpoints.length; i++) {
const checkpoint = checkpoints[i];
console.log(`\n📍 步骤 ${i + 1}/${checkpoints.length}`);
console.log(`检查点: ${checkpoint.id?.slice(0, 8)}`);
// 检查断点
if (checkpoint.id && (await this.checkBreakpoint(checkpoint.id))) {
// 等待用户输入继续
await this.waitForUserInput();
}
// 显示状态变化
if (i > 0) {
const prevCheckpoint = checkpoints[i - 1];
if (checkpoint.id && prevCheckpoint.id) {
const diff = await this.compareCheckpoints(
prevCheckpoint.id,
checkpoint.id
);
if (diff.messages.added.length > 0) {
console.log(
'新增消息:',
diff.messages.added.map((m) => m.content)
);
}
}
}
}
}
// 等待用户输入
private async waitForUserInput(): Promise<void> {
return new Promise((resolve) => {
console.log('按 Enter 继续...');
process.stdin.once('data', () => {
resolve();
});
});
}
}
// 使用示例
export async function demonstrateTimeTravelToolkit() {
// 创建一个简单的图用于演示
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
counter: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
});
const graph = new StateGraph(StateAnnotation)
.addNode('step1', async (state) => ({
messages: [new AIMessage('执行步骤1')],
counter: state.counter + 1,
}))
.addNode('step2', async (state) => ({
messages: [new AIMessage('执行步骤2')],
counter: state.counter + 1,
}))
.addNode('step3', async (state) => ({
messages: [new AIMessage('执行步骤3')],
counter: state.counter + 1,
}))
.addEdge('__start__', 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', 'step3')
.addEdge('step3', '__end__');
const memory = new MemorySaver();
const compiledGraph = graph.compile({ checkpointer: memory });
const config = { configurable: { thread_id: 'toolkit-demo' } };
// 执行图
console.log('=== 执行图 ===');
await compiledGraph.invoke(
{ messages: [new HumanMessage('开始执行')] },
config
);
// 使用时间旅行工具包
const toolkit = new TimeTravelToolkit(compiledGraph, config);
console.log('\n=== 时间旅行工具包演示 ===');
// 可视化历史
await toolkit.visualizeHistory();
// 获取所有检查点
const checkpoints = await toolkit.getAllCheckpoints();
if (checkpoints.length >= 2) {
// 比较检查点
console.log('\n=== 检查点比较 ===');
const diff = await toolkit.compareCheckpoints(
checkpoints[0].id,
checkpoints[1].id
);
console.log('消息差异:', diff.messages);
// 搜索检查点
console.log('\n=== 搜索检查点 ===');
const searchResults = await toolkit.searchCheckpoints('步骤');
console.log(`找到 ${searchResults.length} 个匹配的检查点`);
// 导出检查点
console.log('\n=== 导出检查点 ===');
const exported = await toolkit.exportCheckpoint(checkpoints[0].id);
console.log('导出成功:', exported.checkpoint_id);
}
}
// 调试器使用示例
export async function demonstrateTimeTravelDebugger() {
// 使用与上面相同的图设置
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
counter: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
});
const graph = new StateGraph(StateAnnotation)
.addNode('step1', async (state) => ({
messages: [new AIMessage('调试步骤1')],
counter: state.counter + 1,
}))
.addNode('step2', async (state) => ({
messages: [new AIMessage('调试步骤2')],
counter: state.counter + 1,
}))
.addEdge('__start__', 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', '__end__');
const memory = new MemorySaver();
const compiledGraph = graph.compile({ checkpointer: memory });
const config = { configurable: { thread_id: 'debugger-demo' } };
// 执行图
await compiledGraph.invoke(
{ messages: [new HumanMessage('开始调试')] },
config
);
// 使用调试器
const timeDebugger = new TimeTravelDebugger(compiledGraph, config);
console.log('\n=== 时间旅行调试器演示 ===');
// 获取检查点并设置断点
const checkpoints = await timeDebugger.getAllCheckpoints();
if (checkpoints.length > 0) {
timeDebugger.setBreakpoint(checkpoints[0].id);
}
// 监视变量
timeDebugger.watchVariable('counter', 0);
timeDebugger.watchVariable('counter', 1);
timeDebugger.watchVariable('counter', 2);
console.log('调试器设置完成');
}
// 运行示例
if (require.main === module) {
demonstrateTimeTravelToolkit()
.then(() => demonstrateTimeTravelDebugger())
.catch(console.error);
}
最佳实践
✅ 使用建议
最佳实践
-
合理设置检查点频率
- 在关键决策点设置检查点
- 避免过于频繁的检查点影响性能
-
状态设计考虑
- 保持状态结构简洁
- 避免在状态中存储大量临时数据
-
调试策略
- 先查看状态变化趋势
- 再定位具体问题点
- 最后进行修复验证
⚠️ 注意事项
注意事项
-
性能影响
- 检查点存储会占用额外空间
- 频繁的状态快照可能影响执行速度
-
状态一致性
- 修改历史状态时要保持数据一致性
- 注意状态修改对后续执行的影响
-
并发安全
- 多线程环境下要注意检查点的同步
- 避免同时修改同一个检查点
🎯 调试技巧
调试技巧示例:
import '../../utils/loadEnv';
import { StateGraph, Annotation, MemorySaver } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
// 时间旅行调试技巧集合
export class TimeTravelDebuggingTips {
private graph: any;
private config: any;
constructor(graph: any, config: any) {
this.graph = graph;
this.config = config;
}
// 技巧1: 快速定位问题检查点
async findProblemCheckpoint(errorPattern: string) {
console.log(`🔍 搜索包含错误模式的检查点: "${errorPattern}"`);
const problemCheckpoints = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
const messages = checkpoint.values.messages || [];
const hasError = messages.some((msg: BaseMessage) => {
const content =
typeof msg.content === 'string'
? msg.content
: JSON.stringify(msg.content);
return content.toLowerCase().includes(errorPattern.toLowerCase());
});
if (hasError) {
problemCheckpoints.push({
id: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
values: checkpoint.values,
});
}
}
console.log(`找到 ${problemCheckpoints.length} 个可能有问题的检查点`);
return problemCheckpoints;
}
// 技巧2: 状态变化追踪
async trackStateChanges(fieldName: string) {
console.log(`📊 追踪字段变化: ${fieldName}`);
const changes = [];
let previousValue: any = undefined;
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
const currentValue = checkpoint.values[fieldName];
if (previousValue !== undefined && currentValue !== previousValue) {
changes.push({
checkpointId: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
from: previousValue,
to: currentValue,
change: this.describeChange(previousValue, currentValue),
});
}
previousValue = currentValue;
}
console.log(`发现 ${changes.length} 次变化:`);
changes.reverse().forEach((change, index) => {
console.log(`${index + 1}. 步骤 ${change.step}: ${change.change}`);
console.log(` 检查点: ${change.checkpointId?.slice(0, 8)}`);
});
return changes;
}
// 技巧3: 性能瓶颈分析
async analyzePerformanceBottlenecks() {
console.log('⚡ 分析性能瓶颈...');
const checkpoints = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
checkpoints.push(checkpoint);
}
checkpoints.reverse(); // 按时间顺序
const stepDurations = [];
for (let i = 1; i < checkpoints.length; i++) {
const prev = checkpoints[i - 1];
const curr = checkpoints[i];
// 这里可以添加实际的时间戳比较逻辑
const duration = Math.random() * 1000; // 模拟持续时间
stepDurations.push({
step: curr.metadata?.step || i,
duration,
checkpointId: curr.config.configurable.checkpoint_id,
});
}
// 找出最慢的步骤
const slowSteps = stepDurations
.sort((a, b) => b.duration - a.duration)
.slice(0, 3);
console.log('🐌 最慢的3个步骤:');
slowSteps.forEach((step, index) => {
console.log(
`${index + 1}. 步骤 ${step.step}: ${step.duration.toFixed(2)}ms`
);
console.log(` 检查点: ${step.checkpointId?.slice(0, 8)}`);
});
return slowSteps;
}
// 技巧4: 消息流分析
async analyzeMessageFlow() {
console.log('💬 分析消息流...');
const messageStats = {
totalMessages: 0,
messageTypes: new Map<string, number>(),
averageLength: 0,
longestMessage: '',
shortestMessage: '',
};
let totalLength = 0;
let minLength = Infinity;
let maxLength = 0;
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
const messages = checkpoint.values.messages || [];
messages.forEach((msg: BaseMessage) => {
messageStats.totalMessages++;
// 统计消息类型
const type = msg.constructor.name;
messageStats.messageTypes.set(
type,
(messageStats.messageTypes.get(type) || 0) + 1
);
// 统计消息长度
const content =
typeof msg.content === 'string'
? msg.content
: JSON.stringify(msg.content);
const length = content.length;
totalLength += length;
if (length > maxLength) {
maxLength = length;
messageStats.longestMessage = content.slice(0, 100) + '...';
}
if (length < minLength) {
minLength = length;
messageStats.shortestMessage = content;
}
});
}
messageStats.averageLength = totalLength / messageStats.totalMessages;
console.log('📈 消息统计:');
console.log(`总消息数: ${messageStats.totalMessages}`);
console.log(`平均长度: ${messageStats.averageLength.toFixed(2)} 字符`);
console.log(`最长消息: ${messageStats.longestMessage}`);
console.log(`最短消息: ${messageStats.shortestMessage}`);
console.log('\n消息类型分布:');
messageStats.messageTypes.forEach((count, type) => {
const percentage = ((count / messageStats.totalMessages) * 100).toFixed(
1
);
console.log(`${type}: ${count} (${percentage}%)`);
});
return messageStats;
}
// 技巧5: 状态大小监控
async monitorStateSize() {
console.log('📏 监控状态大小...');
const sizeHistory = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
const stateSize = JSON.stringify(checkpoint.values).length;
sizeHistory.push({
checkpointId: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
size: stateSize,
});
}
sizeHistory.reverse(); // 按时间顺序
// 找出状态大小异常增长的点
const growthPoints = [];
for (let i = 1; i < sizeHistory.length; i++) {
const prev = sizeHistory[i - 1];
const curr = sizeHistory[i];
const growth = curr.size - prev.size;
const growthRate = (growth / prev.size) * 100;
if (growthRate > 50) {
// 增长超过50%
growthPoints.push({
step: curr.step,
growth,
growthRate,
checkpointId: curr.checkpointId,
});
}
}
console.log('📊 状态大小历史:');
sizeHistory.forEach((item, index) => {
console.log(`步骤 ${item.step}: ${item.size} 字节`);
});
if (growthPoints.length > 0) {
console.log('\n⚠️ 状态大小异常增长点:');
growthPoints.forEach((point, index) => {
console.log(
`${index + 1}. 步骤 ${point.step}: +${
point.growth
} 字节 (+${point.growthRate.toFixed(1)}%)`
);
});
}
return { sizeHistory, growthPoints };
}
// 技巧6: 错误模式识别
async identifyErrorPatterns() {
console.log('🔍 识别错误模式...');
const errorPatterns = new Map<string, number>();
const errorCheckpoints = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
const messages = checkpoint.values.messages || [];
messages.forEach((msg: BaseMessage) => {
const content =
typeof msg.content === 'string'
? msg.content
: JSON.stringify(msg.content);
// 检查常见错误模式
const patterns = [
/error/i,
/failed/i,
/exception/i,
/timeout/i,
/invalid/i,
/not found/i,
/unauthorized/i,
];
patterns.forEach((pattern) => {
if (pattern.test(content)) {
const patternName = pattern.source;
errorPatterns.set(
patternName,
(errorPatterns.get(patternName) || 0) + 1
);
errorCheckpoints.push({
checkpointId: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
pattern: patternName,
content: content.slice(0, 100),
});
}
});
});
}
console.log('🚨 发现的错误模式:');
errorPatterns.forEach((count, pattern) => {
console.log(`${pattern}: ${count} 次`);
});
return { errorPatterns, errorCheckpoints };
}
// 技巧7: 状态一致性检查
async checkStateConsistency() {
console.log('✅ 检查状态一致性...');
const inconsistencies = [];
for await (const checkpoint of this.graph.getStateHistory(this.config)) {
const state = checkpoint.values;
// 检查消息数组是否为空但有其他数据
if (
(!state.messages || state.messages.length === 0) &&
Object.keys(state).length > 1
) {
inconsistencies.push({
type: 'empty_messages_with_data',
checkpointId: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
description: '消息数组为空但状态包含其他数据',
});
}
// 检查是否有未定义的字段
Object.entries(state).forEach(([key, value]) => {
if (value === undefined) {
inconsistencies.push({
type: 'undefined_field',
checkpointId: checkpoint.config.configurable.checkpoint_id,
step: checkpoint.metadata?.step,
field: key,
description: `字段 ${key} 的值为 undefined`,
});
}
});
}
if (inconsistencies.length === 0) {
console.log('✅ 未发现状态一致性问题');
} else {
console.log(`⚠️ 发现 ${inconsistencies.length} 个一致性问题:`);
inconsistencies.forEach((issue, index) => {
console.log(`${index + 1}. ${issue.description}`);
console.log(
` 步骤: ${issue.step}, 检查点: ${issue.checkpointId?.slice(0, 8)}`
);
});
}
return inconsistencies;
}
// 辅助方法:描述变化
private describeChange(from: any, to: any): string {
if (typeof from === 'number' && typeof to === 'number') {
const diff = to - from;
return `${from} → ${to} (${diff > 0 ? '+' : ''}${diff})`;
}
if (Array.isArray(from) && Array.isArray(to)) {
return `数组长度: ${from.length} → ${to.length}`;
}
return `${JSON.stringify(from)} → ${JSON.stringify(to)}`;
}
}
// 调试会话管理器
export class DebuggingSession {
private tips: TimeTravelDebuggingTips;
private findings: Map<string, any> = new Map();
constructor(graph: any, config: any) {
this.tips = new TimeTravelDebuggingTips(graph, config);
}
// 运行完整的调试分析
async runFullAnalysis() {
console.log('🔬 开始完整调试分析...\n');
try {
// 1. 性能分析
console.log('=== 性能分析 ===');
const performance = await this.tips.analyzePerformanceBottlenecks();
this.findings.set('performance', performance);
// 2. 消息流分析
console.log('\n=== 消息流分析 ===');
const messageFlow = await this.tips.analyzeMessageFlow();
this.findings.set('messageFlow', messageFlow);
// 3. 状态大小监控
console.log('\n=== 状态大小监控 ===');
const stateSize = await this.tips.monitorStateSize();
this.findings.set('stateSize', stateSize);
// 4. 错误模式识别
console.log('\n=== 错误模式识别 ===');
const errorPatterns = await this.tips.identifyErrorPatterns();
this.findings.set('errorPatterns', errorPatterns);
// 5. 状态一致性检查
console.log('\n=== 状态一致性检查 ===');
const consistency = await this.tips.checkStateConsistency();
this.findings.set('consistency', consistency);
// 生成总结报告
this.generateSummaryReport();
} catch (error) {
console.error('调试分析过程中出现错误:', error);
}
}
// 生成总结报告
private generateSummaryReport() {
console.log('\n📋 === 调试总结报告 ===');
const performance = this.findings.get('performance') || [];
const messageFlow = this.findings.get('messageFlow') || {};
const stateSize = this.findings.get('stateSize') || {};
const errorPatterns = this.findings.get('errorPatterns') || {};
const consistency = this.findings.get('consistency') || [];
console.log('\n🎯 关键发现:');
if (performance.length > 0) {
console.log(`• 发现 ${performance.length} 个性能瓶颈`);
}
if (messageFlow.totalMessages) {
console.log(`• 总共处理了 ${messageFlow.totalMessages} 条消息`);
}
if (stateSize.growthPoints && stateSize.growthPoints.length > 0) {
console.log(
`• 发现 ${stateSize.growthPoints.length} 个状态大小异常增长点`
);
}
if (errorPatterns.errorPatterns && errorPatterns.errorPatterns.size > 0) {
console.log(`• 识别出 ${errorPatterns.errorPatterns.size} 种错误模式`);
}
if (consistency.length > 0) {
console.log(`• 发现 ${consistency.length} 个状态一致性问题`);
}
console.log('\n💡 建议:');
if (performance.length > 0) {
console.log('• 优化性能瓶颈步骤');
}
if (stateSize.growthPoints && stateSize.growthPoints.length > 0) {
console.log('• 检查状态大小异常增长的原因');
}
if (consistency.length > 0) {
console.log('• 修复状态一致性问题');
}
}
// 获取调试发现
getFindings() {
return this.findings;
}
}
// 使用示例
export async function demonstrateDebuggingTips() {
// 创建一个包含各种问题的图用于演示
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
counter: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
data: Annotation<any>({
reducer: (x, y) => y ?? x,
default: () => ({}),
}),
});
const graph = new StateGraph(StateAnnotation)
.addNode('normal_step', async (state) => ({
messages: [new AIMessage('正常步骤')],
counter: state.counter + 1,
}))
.addNode('slow_step', async (state) => {
// 模拟慢步骤
await new Promise((resolve) => setTimeout(resolve, 100));
return {
messages: [new AIMessage('慢步骤完成')],
counter: state.counter + 1,
};
})
.addNode('error_step', async (state) => ({
messages: [new AIMessage('Error: 处理失败')],
counter: state.counter + 1,
}))
.addNode('data_growth_step', async (state) => ({
messages: [new AIMessage('数据增长步骤')],
counter: state.counter + 1,
data: {
...state.data,
[`item_${state.counter}`]: 'large data'.repeat(100),
},
}))
.addEdge('__start__', 'normal_step')
.addEdge('normal_step', 'slow_step')
.addEdge('slow_step', 'error_step')
.addEdge('error_step', 'data_growth_step')
.addEdge('data_growth_step', '__end__');
const memory = new MemorySaver();
const compiledGraph = graph.compile({ checkpointer: memory });
const config = { configurable: { thread_id: 'debugging-demo' } };
// 执行图
console.log('=== 执行包含问题的图 ===');
await compiledGraph.invoke(
{ messages: [new HumanMessage('开始调试演示')] },
config
);
// 使用调试技巧
const tips = new TimeTravelDebuggingTips(compiledGraph, config);
console.log('\n=== 调试技巧演示 ===');
// 演示各种调试技巧
await tips.findProblemCheckpoint('error');
await tips.trackStateChanges('counter');
await tips.analyzePerformanceBottlenecks();
await tips.analyzeMessageFlow();
await tips.monitorStateSize();
await tips.identifyErrorPatterns();
await tips.checkStateConsistency();
}
// 调试会话演示
export async function demonstrateDebuggingSession() {
// 使用相同的图设置
const StateAnnotation = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
counter: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
});
const graph = new StateGraph(StateAnnotation)
.addNode('step1', async (state) => ({
messages: [new AIMessage('步骤1')],
counter: state.counter + 1,
}))
.addNode('step2', async (state) => ({
messages: [new AIMessage('Failed: 步骤2失败')],
counter: state.counter + 1,
}))
.addEdge('__start__', 'step1')
.addEdge('step1', 'step2')
.addEdge('step2', '__end__');
const memory = new MemorySaver();
const compiledGraph = graph.compile({ checkpointer: memory });
const config = { configurable: { thread_id: 'session-demo' } };
// 执行图
await compiledGraph.invoke(
{ messages: [new HumanMessage('开始会话演示')] },
config
);
// 使用调试会话
const session = new DebuggingSession(compiledGraph, config);
await session.runFullAnalysis();
}
// 运行示例
if (require.main === module) {
demonstrateDebuggingTips()
.then(() => demonstrateDebuggingSession())
.catch(console.error);
}
与前端开发工具对比
| 功能 | LangGraphJS 时间旅行 | Redux DevTools | Chrome DevTools |
|---|---|---|---|
| 状态查看 | ✅ 完整状态快照 | ✅ Action 历史 | ✅ 变量检查 |
| 状态回滚 | ✅ 任意检查点 | ✅ 时间旅行 | ❌ 不支持 |
| 状态修改 | ✅ 修改后重执行 | ✅ 状态编辑 | ✅ 变量修改 |
| 分支探索 | ✅ 多路径测试 | ❌ 单一路径 | ❌ 单一路径 |
| 持久化 | ✅ 数据库存储 | ❌ 会话级别 | ❌ 会话级别 |
小结与延伸
时间旅行功能为 LangGraphJS 应用提供了强大的调试和测试能力。通过检查点机制,开发者可以:
- 深入理解应用的执行过程和状态变化
- 快速定位复杂问题的根本原因
- 灵活测试不同的执行路径和参数配置
- 安全恢复从错误状态中恢复正常执行
掌握时间旅行功能后,你将能够更加自信地开发和调试复杂的 AI 应用。在下一章节中,我们将学习架构模式,了解如何设计和组织大型的 LangGraphJS 应用。