跳到主要内容

🗃️ 状态(State)管理

引言

在 LangGraphJS 中,状态(State) 是图的核心数据结构,它代表了应用在任何时刻的完整快照。如果你熟悉前端开发,可以将状态理解为:

  • React 的 State + Context 的组合
  • Redux Store 的全局状态管理
  • Vuex/Pinia 的状态存储

状态在 LangGraph 中扮演着"数据总线"的角色,所有节点都可以读取和修改状态,实现节点间的数据共享和通信。

与前端开发的类比
  • 状态定义 ≈ TypeScript 接口定义 + Redux Store Schema
  • Annotation ≈ React PropTypes + TypeScript 类型定义
  • Reducer ≈ Redux Reducer 函数
  • 状态更新 ≈ setState() 或 dispatch(action)

概念解释

状态的核心特性

📊 共享数据结构

  • 所有节点都可以访问完整的状态
  • 节点可以读取任何状态字段
  • 节点可以更新部分或全部状态字段

🔄 快照机制

  • 状态代表应用在特定时刻的完整快照
  • 每次状态更新都会创建新的快照
  • 支持状态的持久化和恢复

🛠️ 类型安全

  • 完全支持 TypeScript 类型推导
  • 编译时类型检查
  • 运行时类型验证

Annotation 对象

Annotation 是 LangGraphJS 中定义状态结构的核心工具,类似于 TypeScript 的接口定义,但功能更强大。

基本语法:

const StateAnnotation = Annotation.Root({
fieldName: Annotation<FieldType>(),

// 带 Reducer 和 默认值
fieldWithReducer: Annotation<FieldType>({
reducer: (current, update) => mergedValue,
default: () => defaultValue
})
});

Reducers(状态合并器)

Reducer 定义了如何将节点的更新应用到现有状态上,这与 Redux 的 Reducer 概念非常相似。

默认行为: 覆盖现有值 自定义 Reducer: 实现累加、合并、过滤等逻辑

代码示例

让我们通过完整的示例来理解状态管理:

基础状态管理示例:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';

// 基础状态定义示例
const BasicStateAnnotation = Annotation.Root({
// 基础数据类型
userId: Annotation<string>(),
userName: Annotation<string>(),
counter: Annotation<number>(),
isActive: Annotation<boolean>(),

// 复杂数据类型
userProfile: Annotation<{
email: string;
age: number;
preferences: string[];
}>(),

// 数组类型
tags: Annotation<string[]>(),

// 时间戳
createdAt: Annotation<Date>(),
updatedAt: Annotation<Date>(),
});

// 节点函数示例
const initializeNode = (
state: typeof BasicStateAnnotation.State,
config?: RunnableConfig
) => {
console.log('初始化用户状态...');

return {
userId: 'user_001',
userName: '张三',
counter: 0,
isActive: true,
userProfile: {
email: 'zhangsan@example.com',
age: 25,
preferences: ['技术', '阅读'],
},
tags: ['新用户'],
createdAt: new Date(),
updatedAt: new Date(),
};
};

const updateCounterNode = (
state: typeof BasicStateAnnotation.State,
config?: RunnableConfig
) => {
console.log(`更新计数器: ${state.counter} -> ${state.counter + 1}`);

return {
counter: state.counter + 1,
updatedAt: new Date(),
};
};

const addTagNode = (
state: typeof BasicStateAnnotation.State,
config?: RunnableConfig
) => {
const newTag = `活跃用户_${state.counter}`;
console.log(`添加标签: ${newTag}`);

return {
tags: [...state.tags, newTag],
updatedAt: new Date(),
};
};

const updateProfileNode = (
state: typeof BasicStateAnnotation.State,
config?: RunnableConfig
) => {
console.log('更新用户偏好...');

return {
userProfile: {
...state.userProfile,
preferences: [...state.userProfile.preferences, '编程'],
},
updatedAt: new Date(),
};
};

// 条件路由函数
const shouldContinue = (state: typeof BasicStateAnnotation.State) => {
return state.counter < 3 ? 'update_counter' : 'finalize';
};

const finalizeNode = (
state: typeof BasicStateAnnotation.State,
config?: RunnableConfig
) => {
console.log('完成状态更新');

return {
isActive: false,
updatedAt: new Date(),
};
};

// 构建图
const basicStateGraph = new StateGraph(BasicStateAnnotation)
.addNode('initialize', initializeNode)
.addNode('update_counter', updateCounterNode)
.addNode('add_tag', addTagNode)
.addNode('update_profile', updateProfileNode)
.addNode('finalize', finalizeNode)

// 定义执行流程
.addEdge(START, 'initialize')
.addEdge('initialize', 'update_counter')
.addEdge('update_counter', 'add_tag')
.addEdge('add_tag', 'update_profile')
.addConditionalEdges('update_profile', shouldContinue, {
update_counter: 'update_counter',
finalize: 'finalize',
})
.addEdge('finalize', END)
.compile();

// 带默认值的状态示例
const StateWithDefaults = Annotation.Root({
// 带默认值的基础类型
counter: Annotation<number>(),

// 带默认值的数组
items: Annotation<string[]>(),

// 带默认值的对象
config: Annotation<{
theme: string;
language: string;
notifications: boolean;
}>(),

// 动态默认值
sessionId: Annotation<string>(),

timestamp: Annotation<Date>(),
});

const defaultsExampleNode = (state: typeof StateWithDefaults.State) => {
// 处理可能的undefined值,提供默认值
const counter = state.counter || 0;
const items = state.items || [];
const config = state.config || {
theme: 'light',
language: 'zh-CN',
notifications: true,
};
const sessionId = state.sessionId || `session_${Date.now()}`;
const timestamp = state.timestamp || new Date();

console.log('当前状态:', {
counter,
items,
config,
sessionId,
timestamp,
});

return {
counter: counter + 1,
items: [...items, `item_${counter + 1}`],
config,
sessionId,
timestamp: new Date(),
};
};

const defaultsGraph = new StateGraph(StateWithDefaults)
.addNode('example', defaultsExampleNode)
.addEdge(START, 'example')
.addEdge('example', END)
.compile();

// 运行示例
async function runBasicStateExamples() {
console.log('=== 基础状态管理示例 ===\n');

// 示例 1: 基础状态操作
console.log('1. 基础状态操作:');
const result1 = await basicStateGraph.invoke({});
console.log('最终状态:', {
userId: result1.userId,
userName: result1.userName,
counter: result1.counter,
isActive: result1.isActive,
tags: result1.tags,
userProfile: result1.userProfile,
});
console.log();

// 示例 2: 带默认值的状态
console.log('2. 带默认值的状态:');
const result2 = await defaultsGraph.invoke({});
console.log('默认值状态结果:', result2);
console.log();

// 示例 3: 部分状态更新
console.log('3. 部分状态更新:');
const result3 = await basicStateGraph.invoke({
userId: 'existing_user',
userName: '李四',
counter: 5,
});
console.log('部分更新结果:', {
userId: result3.userId,
userName: result3.userName,
counter: result3.counter,
tags: result3.tags,
});
console.log();

// 示例 4: 流式状态更新
console.log('4. 流式状态更新:');
const stream = await basicStateGraph.stream({
userId: 'stream_user',
userName: '王五',
});

for await (const chunk of stream) {
const nodeNames = Object.keys(chunk);
for (const nodeName of nodeNames) {
console.log(`节点 ${nodeName} 更新:`, {
counter: chunk[nodeName].counter,
tags: chunk[nodeName].tags,
isActive: chunk[nodeName].isActive,
});
}
}
}

// 状态验证示例
const validateState = (state: typeof BasicStateAnnotation.State) => {
const errors: string[] = [];

if (!state.userId) {
errors.push('用户ID不能为空');
}

if (!state.userName) {
errors.push('用户名不能为空');
}

if (state.counter < 0) {
errors.push('计数器不能为负数');
}

if (!state.userProfile?.email) {
errors.push('用户邮箱不能为空');
}

if (errors.length > 0) {
throw new Error(`状态验证失败: ${errors.join(', ')}`);
}

return true;
};

const safeUpdateNode = (state: typeof BasicStateAnnotation.State) => {
try {
validateState(state);

return {
counter: state.counter + 1,
isActive: true,
updatedAt: new Date(),
};
} catch (error) {
console.error('状态更新失败:', error.message);

return {
isActive: false,
updatedAt: new Date(),
};
}
};

// 导出
export {
BasicStateAnnotation,
StateWithDefaults,
basicStateGraph,
defaultsGraph,
runBasicStateExamples,
validateState,
safeUpdateNode,
};

// 如果直接运行此文件
if (require.main === module) {
runBasicStateExamples().catch(console.error);
}

可视化说明

下面的图表展示了状态在 LangGraph 中的作用和流动:

图表说明:

  • 中心状态:所有节点都围绕状态进行操作
  • 读取操作:节点可以访问完整状态
  • 更新操作:通过 Reducer 处理状态更新
  • 数据流:状态更新后立即可被其他节点访问

实践指导

1. 基础状态定义

import { Annotation } from '@langchain/langgraph';

// ✅ 简单状态定义
const SimpleState = Annotation.Root({
// 基础类型
userId: Annotation<string>(),
count: Annotation<number>(),
isActive: Annotation<boolean>(),

// 复杂类型
userInfo: Annotation<{
name: string;
email: string;
preferences: string[];
}>(),

// 数组类型
tags: Annotation<string[]>(),

// 可选类型
metadata: Annotation<Record<string, any>>()
});

2. 带默认值的状态

const StateWithDefaults = Annotation.Root({


// 对象默认值
config: Annotation<{
theme: string;
language: string;
}>({
default: () => ({
theme: 'light',
language: 'zh-CN'
})
}),

// 动态默认值
timestamp: Annotation<Date>({
default: () => new Date()
})
});

3. 使用 Reducer 的状态

const StateWithReducers = Annotation.Root({
// 累加 Reducer
totalScore: Annotation<number>({
reducer: (current: number, update: number) => current + update,
default: () => 0
}),

// 数组合并 Reducer
logs: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => []
}),

// 对象合并 Reducer
settings: Annotation<Record<string, any>>({
reducer: (current: Record<string, any>, update: Record<string, any>) => ({
...current,
...update
}),
default: () => ({})
}),

// 条件更新 Reducer
maxValue: Annotation<number>({
reducer: (current: number, update: number) => Math.max(current, update),
default: () => 0
})
});

4. 消息状态管理

消息状态管理示例:

import '../../utils/loadEnv';
import {
StateGraph,
Annotation,
START,
END,
MessagesAnnotation,
} from '@langchain/langgraph';
import {
BaseMessage,
HumanMessage,
AIMessage,
SystemMessage,
} from '@langchain/core/messages';
import { messagesStateReducer } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';

// 基础消息状态示例
const BasicMessagesState = Annotation.Root({
// 使用内置的消息状态
messages: Annotation<BaseMessage[]>({
reducer: messagesStateReducer,
default: () => [],
}),
});

// 扩展的消息状态示例
const ExtendedMessagesState = Annotation.Root({
// 继承 MessagesAnnotation 的所有字段
...MessagesAnnotation.spec,

// 添加额外的状态字段
userId: Annotation<string>(),
sessionId: Annotation<string>(),

// 对话上下文
context: Annotation<Record<string, any>>({
reducer: (current: Record<string, any>, update: Record<string, any>) => ({
...current,
...update,
}),
default: () => ({}),
}),

// 消息统计
messageCount: Annotation<number>({
reducer: (current: number, update: number) => current + update,
default: () => 0,
}),

// 最后活动时间
lastActivity: Annotation<Date>(),
});

// 聊天机器人节点
const chatbotNode = (
state: typeof BasicMessagesState.State,
config?: RunnableConfig
) => {
const lastMessage = state.messages[state.messages.length - 1];
const messageContent =
typeof lastMessage.content === 'string' ? lastMessage.content : '';

console.log(`收到消息: ${messageContent}`);

// 简单的回复逻辑
let responseContent = '';
if (messageContent.includes('你好')) {
responseContent = '你好!我是AI助手,有什么可以帮助你的吗?';
} else if (messageContent.includes('天气')) {
responseContent = '今天天气不错,适合外出活动。';
} else if (messageContent.includes('再见')) {
responseContent = '再见!祝你有美好的一天!';
} else {
responseContent = `我理解你说的是:"${messageContent}"。请问还有什么其他问题吗?`;
}

const aiMessage = new AIMessage({
content: responseContent,
});

return {
messages: [aiMessage],
};
};

// 扩展聊天机器人节点
const extendedChatbotNode = (
state: typeof ExtendedMessagesState.State,
config?: RunnableConfig
) => {
const lastMessage = state.messages[state.messages.length - 1];
const messageCount = state.messageCount || 0;
const messageContent =
typeof lastMessage.content === 'string' ? lastMessage.content : '';

console.log(
`用户 ${state.userId} 发送消息 #${messageCount + 1}: ${messageContent}`
);

// 更智能的回复逻辑
let responseContent = '';
const context = state.context || {};

if (messageContent.includes('你好')) {
responseContent = `你好 ${context.userName || '朋友'}!这是我们的第 ${
messageCount + 1
} 次对话。`;
} else if (messageContent.includes('我的名字是')) {
const name = messageContent.replace('我的名字是', '').trim();
responseContent = `很高兴认识你,${name}!我会记住你的名字。`;

return {
messages: [new AIMessage({ content: responseContent })],
context: { ...context, userName: name },
messageCount: 1,
lastActivity: new Date(),
};
} else if (context.userName) {
responseContent = `${context.userName},关于"${messageContent}",我来为你解答...`;
} else {
responseContent = `关于"${messageContent}",我来为你解答...`;
}

return {
messages: [new AIMessage({ content: responseContent })],
messageCount: 1,
lastActivity: new Date(),
};
};

// 消息过滤节点
const messageFilterNode = (
state: typeof ExtendedMessagesState.State,
config?: RunnableConfig
) => {
const messages = state.messages || [];

// 过滤掉系统消息,只保留用户和AI消息
const filteredMessages = messages.filter(
(msg) => msg._getType() === 'human' || msg._getType() === 'ai'
);

console.log(`过滤消息: ${messages.length} -> ${filteredMessages.length}`);

return {
messages: filteredMessages,
};
};

// 消息摘要节点
const messageSummaryNode = (
state: typeof ExtendedMessagesState.State,
config?: RunnableConfig
) => {
const messages = state.messages || [];
const messageCount = state.messageCount || 0;

if (messages.length > 10) {
// 当消息过多时,创建摘要
const summary = `对话摘要:共 ${messageCount} 条消息,最近讨论了 ${messages
.slice(-3)
.map((m) => m.content)
.join('、')}`;

const summaryMessage = new SystemMessage({
content: summary,
});

// 保留最近的几条消息和摘要
const recentMessages = messages.slice(-5);

return {
messages: [summaryMessage, ...recentMessages],
};
}

return {}; // 不需要摘要时不更新
};

// 构建基础消息图
const basicMessagesGraph = new StateGraph(BasicMessagesState)
.addNode('chatbot', chatbotNode)
.addEdge(START, 'chatbot')
.addEdge('chatbot', END)
.compile();

// 构建扩展消息图
const extendedMessagesGraph = new StateGraph(ExtendedMessagesState)
.addNode('chatbot', extendedChatbotNode)
.addNode('filter', messageFilterNode)
.addNode('summary', messageSummaryNode)
.addEdge(START, 'filter')
.addEdge('filter', 'chatbot')
.addEdge('chatbot', 'summary')
.addEdge('summary', END)
.compile();

// 多轮对话状态管理
const MultiTurnChatState = Annotation.Root({
...MessagesAnnotation.spec,

// 对话轮次
turnCount: Annotation<number>({
reducer: (current: number, update: number) => current + update,
default: () => 0,
}),

// 对话主题
topics: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),

// 用户意图历史
intentHistory: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),

// 对话状态
conversationState: Annotation<'active' | 'waiting' | 'ended'>(),
});

const multiTurnChatNode = (
state: typeof MultiTurnChatState.State,
config?: RunnableConfig
) => {
const lastMessage = state.messages[state.messages.length - 1];
const turnCount = (state.turnCount || 0) + 1;
const messageContent =
typeof lastMessage.content === 'string' ? lastMessage.content : '';

// 简单的意图识别
let intent = 'general';
let topic = 'general';

if (messageContent.includes('天气')) {
intent = 'weather_query';
topic = '天气';
} else if (messageContent.includes('时间')) {
intent = 'time_query';
topic = '时间';
} else if (messageContent.includes('再见')) {
intent = 'goodbye';
topic = '告别';
}

let responseContent = '';
let conversationState: 'active' | 'waiting' | 'ended' = 'active';

switch (intent) {
case 'weather_query':
responseContent = '今天天气晴朗,温度适宜,适合外出活动。';
break;
case 'time_query':
responseContent = `现在是 ${new Date().toLocaleString()}`;
break;
case 'goodbye':
responseContent = '再见!感谢你的对话,期待下次见面!';
conversationState = 'ended';
break;
default:
responseContent = `这是第 ${turnCount} 轮对话。关于"${messageContent}",我来为你解答...`;
}

return {
messages: [new AIMessage({ content: responseContent })],
turnCount: 1,
topics: [topic],
intentHistory: [intent],
conversationState,
};
};

const multiTurnGraph = new StateGraph(MultiTurnChatState)
.addNode('chat', multiTurnChatNode)
.addEdge(START, 'chat')
.addEdge('chat', END)
.compile();

// 运行示例
async function runMessagesStateExamples() {
console.log('=== 消息状态管理示例 ===\n');

// 示例 1: 基础消息对话
console.log('1. 基础消息对话:');
const result1 = await basicMessagesGraph.invoke({
messages: [new HumanMessage({ content: '你好,AI助手!' })],
});
console.log('对话结果:');
result1.messages.forEach((msg, index) => {
console.log(` ${index + 1}. ${msg._getType()}: ${msg.content}`);
});
console.log();

// 示例 2: 扩展消息对话
console.log('2. 扩展消息对话:');
const result2 = await extendedMessagesGraph.invoke({
messages: [new HumanMessage({ content: '我的名字是张三' })],
userId: 'user_001',
sessionId: 'session_001',
});
console.log('扩展对话结果:', {
messageCount: result2.messageCount,
context: result2.context,
lastActivity: result2.lastActivity,
messages: result2.messages.map((m) => `${m._getType()}: ${m.content}`),
});
console.log();

// 示例 3: 多轮对话
console.log('3. 多轮对话:');

// 第一轮
let chatState = await multiTurnGraph.invoke({
messages: [new HumanMessage({ content: '今天天气怎么样?' })],
});

console.log('第一轮对话:', {
turnCount: chatState.turnCount,
topics: chatState.topics,
intentHistory: chatState.intentHistory,
conversationState: chatState.conversationState,
});

// 第二轮
chatState = await multiTurnGraph.invoke({
...chatState,
messages: [
...chatState.messages,
new HumanMessage({ content: '现在几点了?' }),
],
});

console.log('第二轮对话:', {
turnCount: chatState.turnCount,
topics: chatState.topics,
intentHistory: chatState.intentHistory,
conversationState: chatState.conversationState,
});

// 第三轮
chatState = await multiTurnGraph.invoke({
...chatState,
messages: [
...chatState.messages,
new HumanMessage({ content: '谢谢,再见!' }),
],
});

console.log('第三轮对话:', {
turnCount: chatState.turnCount,
topics: chatState.topics,
intentHistory: chatState.intentHistory,
conversationState: chatState.conversationState,
});

console.log('\n完整对话历史:');
chatState.messages.forEach((msg, index) => {
console.log(` ${index + 1}. ${msg._getType()}: ${msg.content}`);
});
}

// 消息状态工具函数
const getLastUserMessage = (messages: BaseMessage[]): HumanMessage | null => {
for (let i = messages.length - 1; i >= 0; i--) {
if (messages[i]._getType() === 'human') {
return messages[i] as HumanMessage;
}
}
return null;
};

const getLastAIMessage = (messages: BaseMessage[]): AIMessage | null => {
for (let i = messages.length - 1; i >= 0; i--) {
if (messages[i]._getType() === 'ai') {
return messages[i] as AIMessage;
}
}
return null;
};

const getMessagesByType = (
messages: BaseMessage[],
type: string
): BaseMessage[] => {
return messages.filter((msg) => msg._getType() === type);
};

const formatConversationHistory = (messages: BaseMessage[]): string => {
return messages
.map((msg, index) => `${index + 1}. ${msg._getType()}: ${msg.content}`)
.join('\n');
};

// 导出
export {
BasicMessagesState,
ExtendedMessagesState,
MultiTurnChatState,
basicMessagesGraph,
extendedMessagesGraph,
multiTurnGraph,
runMessagesStateExamples,
getLastUserMessage,
getLastAIMessage,
getMessagesByType,
formatConversationHistory,
};

// 如果直接运行此文件
if (require.main === module) {
runMessagesStateExamples().catch(console.error);
}

5. 高级状态模式

高级状态模式示例:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { RunnableConfig } from '@langchain/core/runnables';

// 高级状态模式:工作流状态
const WorkflowState = Annotation.Root({
// 当前步骤
currentStep: Annotation<string>(),

// 步骤历史
stepHistory: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),

// 步骤数据
stepData: Annotation<Record<string, any>>({
reducer: (current: Record<string, any>, update: Record<string, any>) => ({
...current,
...update,
}),
default: () => ({}),
}),

// 完成状态
isComplete: Annotation<boolean>(),

// 错误信息
errors: Annotation<string[]>({
reducer: (current: string[], update: string[]) => [...current, ...update],
default: () => [],
}),

// 进度百分比
progress: Annotation<number>({
reducer: (current: number, update: number) => Math.max(current, update),
default: () => 0,
}),
});

// 数据处理状态模式
const DataProcessingState = Annotation.Root({
// 输入数据
inputData: Annotation<any[]>(),

// 处理结果
results: Annotation<any[]>({
reducer: (current: any[], update: any[]) => [...current, ...update],
default: () => [],
}),

// 处理统计
stats: Annotation<{
processed: number;
failed: number;
total: number;
startTime: Date;
endTime?: Date;
}>({
reducer: (current, update) => ({
processed: current.processed + (update.processed || 0),
failed: current.failed + (update.failed || 0),
total: Math.max(current.total, update.total || current.total),
startTime: current.startTime || update.startTime || new Date(),
endTime: update.endTime || current.endTime,
}),
default: () => ({
processed: 0,
failed: 0,
total: 0,
startTime: new Date(),
}),
}),

// 当前批次
currentBatch: Annotation<number>(),

// 批次大小
batchSize: Annotation<number>(),
});

// 缓存状态模式
const CacheState = Annotation.Root({
// 缓存数据
cache: Annotation<Record<string, any>>({
reducer: (current: Record<string, any>, update: Record<string, any>) => ({
...current,
...update,
}),
default: () => ({}),
}),

// 缓存命中统计
cacheStats: Annotation<{
hits: number;
misses: number;
totalRequests: number;
}>({
reducer: (current, update) => ({
hits: current.hits + (update.hits || 0),
misses: current.misses + (update.misses || 0),
totalRequests: current.totalRequests + (update.totalRequests || 0),
}),
default: () => ({ hits: 0, misses: 0, totalRequests: 0 }),
}),

// 缓存过期时间
cacheExpiry: Annotation<Record<string, Date>>({
reducer: (current: Record<string, Date>, update: Record<string, Date>) => ({
...current,
...update,
}),
default: () => ({}),
}),
});

// 工作流节点实现
const initWorkflowNode = (
state: typeof WorkflowState.State,
config?: RunnableConfig
) => {
console.log('初始化工作流...');

return {
currentStep: 'validation',
stepHistory: ['init'],
stepData: {
startTime: new Date(),
workflowId: `workflow_${Date.now()}`,
},
isComplete: false,
progress: 10,
};
};

const validationNode = (
state: typeof WorkflowState.State,
config?: RunnableConfig
) => {
console.log('执行数据验证...');

// 模拟验证逻辑
const isValid = Math.random() > 0.2; // 80% 成功率

if (!isValid) {
return {
currentStep: 'error',
stepHistory: ['validation'],
errors: ['数据验证失败'],
progress: 20,
};
}

return {
currentStep: 'processing',
stepHistory: ['validation'],
stepData: {
validationTime: new Date(),
isValid: true,
},
progress: 30,
};
};

const processingNode = (
state: typeof WorkflowState.State,
config?: RunnableConfig
) => {
console.log('执行数据处理...');

// 模拟处理逻辑
const processingTime = Math.random() * 1000 + 500; // 500-1500ms

return new Promise((resolve) => {
setTimeout(() => {
resolve({
currentStep: 'completion',
stepHistory: ['processing'],
stepData: {
processingTime,
processedAt: new Date(),
},
progress: 80,
});
}, processingTime);
});
};

const completionNode = (
state: typeof WorkflowState.State,
config?: RunnableConfig
) => {
console.log('完成工作流...');

return {
currentStep: 'done',
stepHistory: ['completion'],
isComplete: true,
stepData: {
completedAt: new Date(),
totalSteps: state.stepHistory.length + 1,
},
progress: 100,
};
};

// 路由函数
const workflowRouter = (state: typeof WorkflowState.State) => {
switch (state.currentStep) {
case 'validation':
return 'validation';
case 'processing':
return 'processing';
case 'completion':
return 'completion';
case 'error':
return 'error_handler';
case 'done':
return END;
default:
return 'validation';
}
};

const errorHandlerNode = (
state: typeof WorkflowState.State,
config?: RunnableConfig
) => {
console.log('处理错误:', state.errors);

return {
currentStep: 'done',
stepHistory: ['error_handled'],
isComplete: false,
progress: 0,
};
};

// 构建工作流图
const workflowGraph = new StateGraph(WorkflowState)
.addNode('init', initWorkflowNode)
.addNode('validation', validationNode)
.addNode('processing', processingNode)
.addNode('completion', completionNode)
.addNode('error_handler', errorHandlerNode)
.addEdge(START, 'init')
.addConditionalEdges('init', workflowRouter)
.addConditionalEdges('validation', workflowRouter)
.addConditionalEdges('processing', workflowRouter)
.addConditionalEdges('completion', workflowRouter)
.addEdge('error_handler', END)
.compile();

// 数据处理节点实现
const initDataProcessingNode = (
state: typeof DataProcessingState.State,
config?: RunnableConfig
) => {
const inputData = state.inputData || [];
const batchSize = state.batchSize || 10;

console.log(`初始化数据处理,总数据量: ${inputData.length}`);

return {
currentBatch: 0,
batchSize,
stats: {
processed: 0,
failed: 0,
total: inputData.length,
startTime: new Date(),
},
};
};

const processBatchNode = (
state: typeof DataProcessingState.State,
config?: RunnableConfig
) => {
const inputData = state.inputData || [];
const currentBatch = state.currentBatch || 0;
const batchSize = state.batchSize || 10;

const startIndex = currentBatch * batchSize;
const endIndex = Math.min(startIndex + batchSize, inputData.length);
const batch = inputData.slice(startIndex, endIndex);

console.log(
`处理批次 ${currentBatch + 1},数据范围: ${startIndex}-${endIndex}`
);

// 模拟批次处理
const processedResults = [];
let processed = 0;
let failed = 0;

for (const item of batch) {
try {
// 模拟处理逻辑,90% 成功率
if (Math.random() > 0.1) {
processedResults.push({
original: item,
processed: `processed_${item}`,
timestamp: new Date(),
});
processed++;
} else {
throw new Error(`处理失败: ${item}`);
}
} catch (error) {
failed++;
console.error(error.message);
}
}

return {
currentBatch: currentBatch + 1,
results: processedResults,
stats: {
processed,
failed,
total: state.stats?.total || inputData.length,
startTime: state.stats?.startTime || new Date(),
},
};
};

const dataProcessingRouter = (state: typeof DataProcessingState.State) => {
const inputData = state.inputData || [];
const currentBatch = state.currentBatch || 0;
const batchSize = state.batchSize || 10;

const totalBatches = Math.ceil(inputData.length / batchSize);

if (currentBatch < totalBatches) {
return 'process_batch';
} else {
return 'finalize';
}
};

const finalizeDataProcessingNode = (
state: typeof DataProcessingState.State,
config?: RunnableConfig
) => {
console.log('完成数据处理');

return {
stats: {
processed: state.stats?.processed || 0,
failed: state.stats?.failed || 0,
total: state.stats?.total || 0,
startTime: state.stats?.startTime || new Date(),
endTime: new Date(),
},
};
};

// 构建数据处理图
const dataProcessingGraph = new StateGraph(DataProcessingState)
.addNode('init', initDataProcessingNode)
.addNode('process_batch', processBatchNode)
.addNode('finalize', finalizeDataProcessingNode)
.addEdge(START, 'init')
.addConditionalEdges('init', dataProcessingRouter)
.addConditionalEdges('process_batch', dataProcessingRouter)
.addEdge('finalize', END)
.compile();

// 缓存节点实现
const cacheGetNode = (
state: typeof CacheState.State,
config?: RunnableConfig
) => {
const key = config?.configurable?.cacheKey as string;
const cache = state.cache || {};
const cacheExpiry = state.cacheExpiry || {};

console.log(`查找缓存: ${key}`);

// 检查缓存是否存在且未过期
const now = new Date();
const expiry = cacheExpiry[key];
const isExpired = expiry && now > expiry;

if (cache[key] && !isExpired) {
console.log('缓存命中');
return {
cacheStats: {
hits: 1,
misses: 0,
totalRequests: 1,
},
};
} else {
console.log('缓存未命中');
return {
cacheStats: {
hits: 0,
misses: 1,
totalRequests: 1,
},
};
}
};

const cacheSetNode = (
state: typeof CacheState.State,
config?: RunnableConfig
) => {
const key = config?.configurable?.cacheKey as string;
const value = config?.configurable?.cacheValue;
const ttl = (config?.configurable?.cacheTTL as number) || 3600000; // 默认1小时

console.log(`设置缓存: ${key}`);

const expiry = new Date(Date.now() + ttl);

return {
cache: {
[key]: value,
},
cacheExpiry: {
[key]: expiry,
},
};
};

// 构建缓存图
const cacheGraph = new StateGraph(CacheState)
.addNode('get', cacheGetNode)
.addNode('set', cacheSetNode)
.addEdge(START, 'get')
.addEdge('get', 'set')
.addEdge('set', END)
.compile();

// 运行示例
async function runAdvancedStateExamples() {
console.log('=== 高级状态模式示例 ===\n');

// 示例 1: 工作流状态管理
console.log('1. 工作流状态管理:');
const workflowResult = await workflowGraph.invoke({});
console.log('工作流结果:', {
currentStep: workflowResult.currentStep,
isComplete: workflowResult.isComplete,
progress: workflowResult.progress,
stepHistory: workflowResult.stepHistory,
stepData: workflowResult.stepData,
errors: workflowResult.errors,
});
console.log();

// 示例 2: 数据处理状态管理
console.log('2. 数据处理状态管理:');
const testData = Array.from({ length: 25 }, (_, i) => `item_${i + 1}`);
const dataProcessingResult = await dataProcessingGraph.invoke({
inputData: testData,
batchSize: 5,
});

console.log('数据处理结果:', {
totalResults: dataProcessingResult.results.length,
stats: dataProcessingResult.stats,
sampleResults: dataProcessingResult.results.slice(0, 3),
});
console.log();

// 示例 3: 缓存状态管理
console.log('3. 缓存状态管理:');

// 第一次访问(缓存未命中)
const cacheResult1 = await cacheGraph.invoke(
{},
{
configurable: {
cacheKey: 'user_data_123',
cacheValue: { name: '张三', age: 25 },
cacheTTL: 60000, // 1分钟
},
}
);

console.log('第一次缓存访问:', {
cacheStats: cacheResult1.cacheStats,
cacheHitRate:
cacheResult1.cacheStats.hits / cacheResult1.cacheStats.totalRequests,
});

// 第二次访问(缓存命中)
const cacheResult2 = await cacheGraph.invoke(cacheResult1, {
configurable: {
cacheKey: 'user_data_123',
cacheValue: { name: '张三', age: 25 },
},
});

console.log('第二次缓存访问:', {
cacheStats: cacheResult2.cacheStats,
cacheHitRate:
cacheResult2.cacheStats.hits / cacheResult2.cacheStats.totalRequests,
});
}

// 状态模式工具函数
const calculateWorkflowProgress = (
stepHistory: string[],
totalSteps: number = 5
) => {
return Math.round((stepHistory.length / totalSteps) * 100);
};

const getProcessingRate = (stats: {
processed: number;
failed: number;
startTime: Date;
endTime?: Date;
}) => {
const duration = stats.endTime
? stats.endTime.getTime() - stats.startTime.getTime()
: Date.now() - stats.startTime.getTime();

const totalProcessed = stats.processed + stats.failed;
return totalProcessed / (duration / 1000); // 每秒处理数量
};

const getCacheHitRate = (cacheStats: {
hits: number;
misses: number;
totalRequests: number;
}) => {
return cacheStats.totalRequests > 0
? (cacheStats.hits / cacheStats.totalRequests) * 100
: 0;
};

// 状态验证函数
const validateWorkflowState = (state: typeof WorkflowState.State) => {
const errors: string[] = [];

if (!state.currentStep) {
errors.push('当前步骤不能为空');
}

if (state.progress < 0 || state.progress > 100) {
errors.push('进度必须在0-100之间');
}

if (state.isComplete && state.progress !== 100) {
errors.push('完成状态与进度不匹配');
}

return errors;
};

const validateDataProcessingState = (
state: typeof DataProcessingState.State
) => {
const errors: string[] = [];

if (!state.inputData || state.inputData.length === 0) {
errors.push('输入数据不能为空');
}

if (state.batchSize && state.batchSize <= 0) {
errors.push('批次大小必须大于0');
}

if (state.stats && state.stats.processed < 0) {
errors.push('处理数量不能为负数');
}

return errors;
};

// 导出
export {
WorkflowState,
DataProcessingState,
CacheState,
workflowGraph,
dataProcessingGraph,
cacheGraph,
runAdvancedStateExamples,
calculateWorkflowProgress,
getProcessingRate,
getCacheHitRate,
validateWorkflowState,
validateDataProcessingState,
};

// 如果直接运行此文件
if (require.main === module) {
runAdvancedStateExamples().catch(console.error);
}

状态设计最佳实践

1. 状态结构设计原则

// ✅ 好的状态设计
const GoodStateDesign = Annotation.Root({
// 1. 扁平化结构,避免深度嵌套
userId: Annotation<string>(),
userName: Annotation<string>(),
userEmail: Annotation<string>(),

// 2. 明确的数据类型
createdAt: Annotation<Date>(),
isVerified: Annotation<boolean>(),

// 3. 合理的默认值
preferences: Annotation<string[]>({
default: () => []
}),

// 4. 适当的 Reducer
activityLog: Annotation<string[]>({
reducer: (current, update) => [...current, ...update],
default: () => []
})
});

// ❌ 避免的状态设计
const BadStateDesign = Annotation.Root({
// 避免深度嵌套
user: Annotation<{
profile: {
personal: {
details: {
name: string;
age: number;
}
}
}
}>(),

// 避免存储函数
callback: Annotation<Function>(),

// 避免存储不可序列化的对象
domElement: Annotation<HTMLElement>()
});

2. 状态更新模式

// 节点中的状态更新
const updateNode = (state: typeof StateAnnotation.State) => {
// ✅ 部分更新
return {
counter: state.counter + 1,
lastUpdated: new Date()
};

// ✅ 条件更新
const updates: any = {};
if (state.counter > 10) {
updates.status = 'high';
}
return updates;

// ✅ 复杂更新
return {
items: [...state.items, 'new item'],
metadata: {
...state.metadata,
lastAction: 'add_item'
}
};
};

3. 状态验证和错误处理

const validateState = (state: typeof StateAnnotation.State) => {
// 状态验证逻辑
if (!state.userId) {
throw new Error('用户ID不能为空');
}

if (state.counter < 0) {
throw new Error('计数器不能为负数');
}

return state;
};

const safeUpdateNode = (state: typeof StateAnnotation.State) => {
try {
validateState(state);

return {
counter: state.counter + 1,
isValid: true
};
} catch (error) {
return {
error: error.message,
isValid: false
};
}
};

常见状态模式

1. 工作流状态模式

const WorkflowState = Annotation.Root({
// 当前步骤
currentStep: Annotation<string>(),

// 步骤历史
stepHistory: Annotation<string[]>({
reducer: (current, update) => [...current, ...update],
default: () => []
}),

// 步骤数据
stepData: Annotation<Record<string, any>>({
reducer: (current, update) => ({ ...current, ...update }),
default: () => ({})
}),

// 完成状态
isComplete: Annotation<boolean>({
default: () => false
}),

// 错误信息
errors: Annotation<string[]>({
reducer: (current, update) => [...current, ...update],
default: () => []
})
});

2. 聊天应用状态模式

import { MessagesAnnotation } from '@langchain/langgraph';

const ChatAppState = Annotation.Root({
// 继承消息状态
...MessagesAnnotation.spec,

// 用户信息
userId: Annotation<string>(),
sessionId: Annotation<string>(),

// 对话上下文
context: Annotation<Record<string, any>>({
reducer: (current, update) => ({ ...current, ...update }),
default: () => ({})
}),

// 意图识别
currentIntent: Annotation<string>(),
confidence: Annotation<number>(),

// 工具调用状态
pendingToolCalls: Annotation<string[]>({
default: () => []
})
});

3. 数据处理状态模式

const DataProcessingState = Annotation.Root({
// 输入数据
inputData: Annotation<any>(),

// 处理结果
results: Annotation<any[]>({
reducer: (current, update) => [...current, ...update],
default: () => []
}),

// 处理统计
stats: Annotation<{
processed: number;
failed: number;
total: number;
}>({
reducer: (current, update) => ({
processed: current.processed + (update.processed || 0),
failed: current.failed + (update.failed || 0),
total: Math.max(current.total, update.total || current.total)
}),
default: () => ({ processed: 0, failed: 0, total: 0 })
}),

// 进度跟踪
progress: Annotation<number>({
reducer: (current, update) => Math.max(current, update),
default: () => 0
})
});

常见问题解答

Q: 什么时候应该使用 Reducer?

A: 在以下情况下使用 Reducer:

  • 累加操作:计数器、分数累计
  • 数组合并:日志收集、结果聚合
  • 对象合并:配置更新、元数据收集
  • 条件更新:最大值、最小值更新

Q: 如何处理大型状态对象?

A: 处理大型状态的策略:

// 1. 分解状态结构
const LargeAppState = Annotation.Root({
// 用户相关状态
user: Annotation<UserState>(),

// 应用相关状态
app: Annotation<AppState>(),

// 数据相关状态
data: Annotation<DataState>()
});

// 2. 使用私有状态
const PrivateState = Annotation.Root({
// 内部处理状态,不暴露给外部
_internalCache: Annotation<any>(),

// 公开状态
publicData: Annotation<any>()
});

Q: 状态更新的性能考虑?

A: 性能优化建议:

  • 避免不必要的状态更新
  • 使用浅拷贝而非深拷贝
  • 合理设计 Reducer 逻辑
  • 考虑状态的序列化成本

小结与延伸

在本节中,我们深入了解了 LangGraphJS 的状态管理机制:

🔑 核心要点:

  • 状态是图中所有节点的共享数据结构
  • Annotation 对象提供了强大的状态定义能力
  • Reducer 函数控制状态更新的逻辑
  • 类型安全的状态定义确保代码的可靠性
  • 灵活的状态模式适应不同的应用场景

📈 最佳实践:

  • 保持状态结构简洁明了
  • 合理使用 Reducer 处理状态合并
  • 添加适当的状态验证逻辑
  • 考虑状态的序列化和性能影响

🔗 与下一节的关联: 掌握了状态管理后,我们将学习**节点(Nodes)**的详细实现。节点是状态的操作者,它们读取状态、执行业务逻辑、并更新状态。理解节点的工作原理将帮助你构建更复杂和强大的 LangGraph 应用。

💡 进阶学习建议:

  • 尝试设计适合你业务场景的状态结构
  • 实验不同的 Reducer 模式
  • 探索状态持久化和恢复机制
  • 学习状态的性能优化技巧