跳到主要内容

📚 RAG 系统

引言

RAG(Retrieval-Augmented Generation,检索增强生成)是当前最热门的 AI 应用模式之一。它结合了信息检索和文本生成的能力,让 LLM 能够基于外部知识库回答问题,解决了传统 LLM 知识更新滞后和幻觉问题。

作为前端开发者,你可能需要构建各种基于知识库的问答系统,如:

  • 企业知识库问答:基于内部文档的智能客服
  • 产品文档助手:帮助用户快速找到相关信息
  • 学习助手:基于教材和资料的学习辅导
  • 代码助手:基于代码库的编程帮助

核心概念

RAG 工作流程

RAG 系统的核心工作流程可以分为两个阶段:

关键组件

  1. 向量数据库:存储文档的向量表示
  2. 检索器:根据查询找到相关文档
  3. 生成器:基于检索结果生成答案
  4. 评估器:验证答案质量

基础 RAG 系统

让我们从最简单的 RAG 系统开始:

基础 RAG 系统
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI, OpenAIEmbeddings } from '@langchain/openai';
import { Document } from '@langchain/core/documents';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';

// 简化的向量存储
class SimpleVectorStore {
private documents: Document[] = [];
private embeddings: number[][] = [];

constructor(private embeddingModel: OpenAIEmbeddings) {}

static async fromDocuments(docs: Document[], embeddings: OpenAIEmbeddings) {
const store = new SimpleVectorStore(embeddings);
await store.addDocuments(docs);
return store;
}

async addDocuments(docs: Document[]) {
for (const doc of docs) {
const embedding = await this.embeddingModel.embedQuery(doc.pageContent);
this.documents.push(doc);
this.embeddings.push(embedding);
}
}

async similaritySearch(query: string, k: number = 4): Promise<Document[]> {
const queryEmbedding = await this.embeddingModel.embedQuery(query);
const similarities = this.embeddings.map((embedding, index) => ({
index,
similarity: this.cosineSimilarity(queryEmbedding, embedding),
}));

similarities.sort((a, b) => b.similarity - a.similarity);
return similarities.slice(0, k).map(({ index }) => this.documents[index]);
}

private cosineSimilarity(a: number[], b: number[]): number {
const dotProduct = a.reduce((sum, val, i) => sum + val * b[i], 0);
const magnitudeA = Math.sqrt(a.reduce((sum, val) => sum + val * val, 0));
const magnitudeB = Math.sqrt(b.reduce((sum, val) => sum + val * val, 0));
return dotProduct / (magnitudeA * magnitudeB);
}
}

// RAG 状态
const StateAnnotation = Annotation.Root({
question: Annotation<string>(),
documents: Annotation<Document[]>({
reducer: (state, update) => update,
default: () => [],
}),
context: Annotation<string>(),
answer: Annotation<string>(),
});

// 初始化组件
const embeddings = new OpenAIEmbeddings({
model: process.env.OPENAI_MODEL_NAME,
});
const model = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
});
let vectorStore: SimpleVectorStore;

// 初始化知识库
async function initializeKnowledgeBase() {
const documents = [
new Document({
pageContent:
'React 是一个用于构建用户界面的 JavaScript 库,核心概念包括组件、状态管理和虚拟 DOM。',
metadata: { source: 'react-basics', topic: 'React基础' },
}),
new Document({
pageContent:
'Vue.js 是一个渐进式 JavaScript 框架,具有响应式数据绑定、组件系统和指令系统等特性。',
metadata: { source: 'vue-basics', topic: 'Vue基础' },
}),
new Document({
pageContent:
'LangGraph 是一个用于构建智能代理应用的框架,将 LLM 应用的控制流程建模为图结构。',
metadata: { source: 'langgraph-intro', topic: 'LangGraph介绍' },
}),
];

vectorStore = await SimpleVectorStore.fromDocuments(documents, embeddings);
console.log('知识库初始化完成,包含', documents.length, '个文档');
}

// 检索节点
async function retrieveDocuments(state: typeof StateAnnotation.State) {
const relevantDocs = await vectorStore.similaritySearch(state.question, 3);
return { documents: relevantDocs };
}

// 生成上下文节点
function generateContext(state: typeof StateAnnotation.State) {
const context = state.documents
.map(
(doc, index) =>
`文档 ${index + 1} (${doc.metadata.topic}):\n${doc.pageContent}`
)
.join('\n\n---\n\n');

return { context };
}

// 生成答案节点
async function generateAnswer(state: typeof StateAnnotation.State) {
const systemPrompt = `基于以下上下文回答问题:
${state.context}

规则:
1. 只能基于提供的上下文信息回答
2. 如果上下文中没有相关信息,请明确说明
3. 回答要准确、简洁`;

const messages = [
new SystemMessage(systemPrompt),
new HumanMessage(state.question),
];

const response = await model.invoke(messages);
return { answer: response.content as string };
}

// 构建 RAG 图
const workflow = new StateGraph(StateAnnotation)
.addNode('retrieve', retrieveDocuments)
.addNode('generate_context', generateContext)
.addNode('generate_answer', generateAnswer)
.addEdge(START, 'retrieve')
.addEdge('retrieve', 'generate_context')
.addEdge('generate_context', 'generate_answer')
.addEdge('generate_answer', END);

const app = workflow.compile();

// 查询函数
async function queryRAG(question: string) {
const result = await app.invoke({ question });

return {
question: result.question,
answer: result.answer,
sources: result.documents.map((doc) => doc.metadata.topic),
};
}

// 使用示例
async function runBasicRAGExample() {
await initializeKnowledgeBase();

const questions = ['React 的核心概念有哪些?', 'LangGraph 是什么?'];

for (const question of questions) {
const result = await queryRAG(question);
console.log(`问题: ${result.question}`);
console.log(`答案: ${result.answer}`);
console.log(`来源: ${result.sources.join(', ')}`);
}
}

export {
app as basicRAG,
queryRAG,
initializeKnowledgeBase,
runBasicRAGExample,
};

这个基础版本展示了 RAG 系统的核心流程:

  1. 文档向量化:将文档转换为向量表示
  2. 相似性检索:根据查询找到最相关的文档
  3. 上下文生成:将检索结果作为上下文传递给 LLM
  4. 答案生成:基于上下文生成准确答案

高级 RAG 系统

为了提高答案质量,我们可以添加文档质量评估和多轮检索:

高级 RAG 系统
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';

// 简化的文档接口
interface SimpleDocument {
pageContent: string;
metadata: Record<string, any>;
}

// 简化的向量存储
class SimpleVectorStore {
private documents: SimpleDocument[] = [];

async addDocuments(docs: SimpleDocument[]) {
this.documents.push(...docs);
}

async similaritySearch(
query: string,
k: number = 5
): Promise<SimpleDocument[]> {
// 简化的相似性搜索 - 基于关键词匹配
const queryLower = query.toLowerCase();
const scored = this.documents.map((doc) => ({
doc,
score: this.calculateSimpleScore(
doc.pageContent.toLowerCase(),
queryLower
),
}));

return scored
.sort((a, b) => b.score - a.score)
.slice(0, k)
.map((item) => item.doc);
}

private calculateSimpleScore(content: string, query: string): number {
const queryWords = query.split(' ');
let score = 0;

for (const word of queryWords) {
if (content.includes(word)) {
score += 1;
}
}

return score / queryWords.length;
}
}

// 定义状态结构
const AdvancedRAGState = Annotation.Root({
query: Annotation<string>(),
rewrittenQuery: Annotation<string>(),
documents: Annotation<SimpleDocument[]>(),
relevanceScores: Annotation<number[]>(),
needsReretrieval: Annotation<boolean>(),
answer: Annotation<string>(),
answerQuality: Annotation<number>(),
retrievalRound: Annotation<number>(),
});

// 初始化组件
const llm = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
});

// 创建向量存储(示例数据)
const createVectorStore = async () => {
const documents: SimpleDocument[] = [
{
pageContent:
'LangGraph 是一个用于构建有状态的多参与者应用程序的库,基于 LangChain 构建。它扩展了 LangChain Expression Language,具有在多个计算步骤中协调多个链(或参与者)的能力。',
metadata: { source: 'langgraph-intro', type: 'documentation' },
},
{
pageContent:
'StateGraph 是 LangGraph 的核心类,用于定义图的结构。它允许你添加节点、边和条件边来创建复杂的工作流程。',
metadata: { source: 'langgraph-stategraph', type: 'documentation' },
},
{
pageContent:
'RAG(检索增强生成)是一种结合信息检索和文本生成的技术,可以让 LLM 基于外部知识库生成更准确的答案。',
metadata: { source: 'rag-concept', type: 'concept' },
},
{
pageContent:
'在 LangGraph 中,你可以使用 Annotation 来定义状态的结构,包括字段类型、默认值和 reducer 函数。',
metadata: { source: 'langgraph-annotation', type: 'documentation' },
},
{
pageContent:
'向量数据库是 RAG 系统的核心组件,它存储文档的向量表示,支持高效的相似性搜索。',
metadata: { source: 'vector-database', type: 'concept' },
},
];

const vectorStore = new SimpleVectorStore();
await vectorStore.addDocuments(documents);
return vectorStore;
};

/**
* 查询理解和重写节点
*/
async function queryUnderstanding(state: typeof AdvancedRAGState.State) {
const { query, retrievalRound = 1 } = state;

// 如果是第一轮检索,直接使用原查询
if (retrievalRound === 1) {
return {
rewrittenQuery: query,
};
}

// 后续轮次,重写查询以提高检索效果
const rewritePrompt = `
原始查询: ${query}

这是第 ${retrievalRound} 轮检索。之前的检索结果不够相关。
请重写查询以提高检索效果,使其更具体、更清晰:

重写后的查询:`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个查询优化专家,擅长重写查询以提高信息检索的效果。',
},
{ role: 'user', content: rewritePrompt },
]);

return {
rewrittenQuery: response.content as string,
};
}

/**
* 文档检索节点
*/
async function retrieveDocuments(state: typeof AdvancedRAGState.State) {
const { rewrittenQuery } = state;
const vectorStore = await createVectorStore();

// 检索相关文档
const documents = await vectorStore.similaritySearch(rewrittenQuery, 5);

console.log(`检索到 ${documents.length} 个文档`);

return {
documents,
};
}

/**
* 文档相关性评估节点
*/
async function evaluateRelevance(state: typeof AdvancedRAGState.State) {
const { rewrittenQuery, documents = [] } = state;

if (documents.length === 0) {
return {
relevanceScores: [],
needsReretrieval: true,
};
}

const relevanceScores: number[] = [];

for (const doc of documents) {
const evaluationPrompt = `
查询: ${rewrittenQuery}
文档内容: ${doc.pageContent}

请评估这个文档与查询的相关性,给出 0-1 之间的分数:
- 1.0: 完全相关,直接回答了查询
- 0.7-0.9: 高度相关,包含重要信息
- 0.4-0.6: 中等相关,包含一些有用信息
- 0.1-0.3: 低相关性,信息有限
- 0.0: 完全不相关

只返回数字分数,不要其他内容。`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个文档相关性评估专家,能够准确评估文档与查询的相关性。',
},
{ role: 'user', content: evaluationPrompt },
]);

const score = parseFloat(response.content as string) || 0;
relevanceScores.push(score);
}

// 计算平均相关性分数
const avgScore =
relevanceScores.reduce((sum, score) => sum + score, 0) /
relevanceScores.length;

// 如果平均分数低于阈值,需要重新检索
const needsReretrieval = avgScore < 0.6;

console.log(
`文档相关性评分: ${relevanceScores.map((s) => s.toFixed(2)).join(', ')}`
);
console.log(
`平均相关性: ${avgScore.toFixed(2)}, 需要重新检索: ${needsReretrieval}`
);

return {
relevanceScores,
needsReretrieval,
};
}

/**
* 重新检索决策节点
*/
function shouldRetrieve(state: typeof AdvancedRAGState.State) {
const { needsReretrieval, retrievalRound = 1 } = state;

// 最多重试 3 轮
if (needsReretrieval && retrievalRound < 3) {
return 'incrementRound';
}

return 'generateAnswer';
}

/**
* 答案生成节点
*/
async function generateAnswer(state: typeof AdvancedRAGState.State) {
const { query, documents = [], relevanceScores = [] } = state;

if (documents.length === 0) {
return {
answer: '抱歉,我没有找到相关的信息来回答您的问题。',
answerQuality: 0,
};
}

// 根据相关性分数过滤和排序文档
const documentWithScores = documents.map((doc, index) => ({
document: doc,
score: relevanceScores[index] || 0,
}));

const relevantDocs = documentWithScores
.filter((item) => item.score > 0.3)
.sort((a, b) => b.score - a.score)
.slice(0, 3)
.map((item) => item.document);

if (relevantDocs.length === 0) {
return {
answer: '抱歉,检索到的文档与您的问题相关性较低,无法提供准确答案。',
answerQuality: 0.2,
};
}

// 构建上下文
const context = relevantDocs
.map((doc, index) => `文档 ${index + 1}:\n${doc.pageContent}`)
.join('\n\n');

const answerPrompt = `
基于以下上下文信息回答用户问题。请确保答案准确、完整,并且完全基于提供的上下文。

上下文:
${context}

用户问题: ${query}

请提供一个准确、有帮助的答案。如果上下文中没有足够信息回答问题,请明确说明。`;

const response = await llm.invoke([
{
role: 'system',
content:
'你是一个有帮助的助手,能够基于提供的上下文准确回答问题。只使用上下文中的信息,不要添加额外的知识。',
},
{ role: 'user', content: answerPrompt },
]);

return {
answer: response.content as string,
};
}

/**
* 构建高级 RAG 系统
*/
function createAdvancedRAG() {
const workflow = new StateGraph(AdvancedRAGState)
.addNode('queryUnderstanding', queryUnderstanding)
.addNode('retrieveDocuments', retrieveDocuments)
.addNode('evaluateRelevance', evaluateRelevance)
.addNode('generateAnswer', generateAnswer)
.addEdge(START, 'queryUnderstanding')
.addEdge('queryUnderstanding', 'retrieveDocuments')
.addEdge('retrieveDocuments', 'evaluateRelevance')
.addConditionalEdges('evaluateRelevance', shouldRetrieve)
.addEdge('generateAnswer', END);

return workflow.compile();
}

// 使用示例
async function runAdvancedRAG() {
const app = createAdvancedRAG();

console.log('🚀 启动高级 RAG 系统...\n');

const queries = [
'LangGraph 是什么?',
'RAG 系统的工作原理是什么?',
'如何在 Python 中使用机器学习?', // 这个查询应该触发重新检索
];

for (const query of queries) {
console.log(`\n${'='.repeat(50)}`);
console.log(`查询: ${query}`);
console.log('='.repeat(50));

const result = await app.invoke({
query,
retrievalRound: 1,
needsReretrieval: false,
});

console.log(`\n📝 最终答案:`);
console.log(result.answer);
console.log(`\n🔄 检索轮次: ${result.retrievalRound || 1}`);
}
}

export {
AdvancedRAGState,
SimpleDocument,
SimpleVectorStore,
createAdvancedRAG,
runAdvancedRAG,
};

高级版本的改进:

  • 文档质量评估:检查检索到的文档是否与问题相关
  • 自适应检索:根据质量评估决定是否需要重新检索
  • 答案验证:确保生成的答案基于提供的上下文

多轮对话 RAG

在实际应用中,用户往往需要进行多轮对话:

多轮对话 RAG
/**
* ============================================================================
* 对话式RAG系统 - Conversational RAG System
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 这是一个支持多轮对话的智能RAG系统,能够理解对话上下文并进行上下文化的
* 文档检索和答案生成。系统会将用户的当前问题与历史对话结合,生成更准确的
* 查询语句,从而提供更连贯和精准的回答。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 查询上下文化:结合对话历史重新表述用户问题
* 2️⃣ 多轮对话:支持连续对话,保持上下文一致性
* 3️⃣ 智能检索:基于上下文化查询进行文档检索
* 4️⃣ 对话管理:自动管理对话历史和会话状态
* 5️⃣ 会话隔离:支持多个独立的对话会话
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用 LLM 进行查询上下文化,理解用户真实意图
* • 维护对话历史,取最近3轮对话作为上下文
* • 构建四阶段流程:上下文化→检索→生成→更新历史
* • 提供会话管理器支持多用户并发对话
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ esno conversational-rag.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 需要配置 OPENAI_API_KEY 环境变量
* • 查询上下文化会增加额外的 LLM 调用
* • 对话历史越长,token 消耗越多
* • 会话数据仅存储在内存中
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';
import { AIMessage, HumanMessage, BaseMessage } from '@langchain/core/messages';

// 简化的文档接口
interface SimpleDocument {
pageContent: string;
metadata: Record<string, any>;
}

// 对话历史接口
interface ConversationTurn {
role: 'user' | 'assistant';
content: string;
timestamp: Date;
}

// 简化的向量存储
class SimpleVectorStore {
private documents: SimpleDocument[] = [];

async addDocuments(docs: SimpleDocument[]) {
this.documents.push(...docs);
}

async similaritySearch(
query: string,
k: number = 5
): Promise<SimpleDocument[]> {
const queryLower = query.toLowerCase();
const scored = this.documents.map((doc) => ({
doc,
score: this.calculateSimpleScore(
doc.pageContent.toLowerCase(),
queryLower
),
}));

return scored
.sort((a, b) => b.score - a.score)
.slice(0, k)
.map((item) => item.doc);
}

private calculateSimpleScore(content: string, query: string): number {
const queryWords = query.split(' ');
let score = 0;

for (const word of queryWords) {
if (content.includes(word)) {
score += 1;
}
}

return score / queryWords.length;
}
}

// 定义状态结构
const ConversationalRAGState = Annotation.Root({
// 当前用户查询
currentQuery: Annotation<string>(),
// 对话历史
conversationHistory: Annotation<ConversationTurn[]>({
reducer: (x, y) => [...(x || []), ...(y || [])],
default: () => [],
}),
// 上下文化的查询
contextualizedQuery: Annotation<string>(),
// 检索到的文档
documents: Annotation<SimpleDocument[]>(),
// 生成的答案
answer: Annotation<string>(),
// 会话ID
sessionId: Annotation<string>(),
});

// 初始化组件
const llm = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
});

// 创建向量存储(示例数据)
const createVectorStore = async () => {
const documents: SimpleDocument[] = [
{
pageContent:
'LangGraph 是一个用于构建有状态的多参与者应用程序的库,基于 LangChain 构建。它扩展了 LangChain Expression Language,具有在多个计算步骤中协调多个链(或参与者)的能力。',
metadata: { source: 'langgraph-intro', type: 'documentation' },
},
{
pageContent:
'StateGraph 是 LangGraph 的核心类,用于定义图的结构。它允许你添加节点、边和条件边来创建复杂的工作流程。',
metadata: { source: 'langgraph-stategraph', type: 'documentation' },
},
{
pageContent:
'RAG(检索增强生成)是一种结合信息检索和文本生成的技术,可以让 LLM 基于外部知识库生成更准确的答案。',
metadata: { source: 'rag-concept', type: 'concept' },
},
{
pageContent:
'在 LangGraph 中,你可以使用 Annotation 来定义状态的结构,包括字段类型、默认值和 reducer 函数。',
metadata: { source: 'langgraph-annotation', type: 'documentation' },
},
{
pageContent:
'向量数据库是 RAG 系统的核心组件,它存储文档的向量表示,支持高效的相似性搜索。',
metadata: { source: 'vector-database', type: 'concept' },
},
{
pageContent:
'LangGraph 支持流式处理,允许你实时获取节点的执行结果,提供更好的用户体验。',
metadata: { source: 'langgraph-streaming', type: 'feature' },
},
{
pageContent:
'条件边允许你根据状态动态决定下一个要执行的节点,实现复杂的分支逻辑。',
metadata: { source: 'langgraph-conditional', type: 'feature' },
},
];

const vectorStore = new SimpleVectorStore();
await vectorStore.addDocuments(documents);
return vectorStore;
};

/**
* 查询上下文化节点
* 将当前查询与对话历史结合,生成上下文化的查询
*/
async function contextualizeQuery(state: typeof ConversationalRAGState.State) {
const { currentQuery, conversationHistory = [] } = state;

// 如果没有对话历史,直接使用当前查询
if (conversationHistory.length === 0) {
return {
contextualizedQuery: currentQuery,
};
}

// 构建对话历史上下文
const recentHistory = conversationHistory
.slice(-6) // 只取最近3轮对话
.map((turn) => `${turn.role}: ${turn.content}`)
.join('\n');

const contextualizationPrompt = `
对话历史:
${recentHistory}

当前用户问题: ${currentQuery}

请基于对话历史,重新表述用户的当前问题,使其包含必要的上下文信息。
如果当前问题已经很清晰且不需要上下文,请直接返回原问题。

上下文化的问题:`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个对话理解专家,擅长根据对话历史理解用户的真实意图。',
},
{ role: 'user', content: contextualizationPrompt },
]);

const contextualizedQuery = response.content as string;

console.log(`原始查询: ${currentQuery}`);
console.log(`上下文化查询: ${contextualizedQuery}`);

return {
contextualizedQuery,
};
}

/**
* 文档检索节点
* 根据上下文化的查询检索相关文档
*/
async function retrieveDocuments(state: typeof ConversationalRAGState.State) {
const { contextualizedQuery } = state;
const vectorStore = await createVectorStore();

// 检索相关文档
const documents = await vectorStore.similaritySearch(contextualizedQuery, 5);

console.log(`检索到 ${documents.length} 个文档`);

return {
documents,
};
}

/**
* 答案生成节点
* 基于检索到的文档和对话历史生成答案
*/
async function generateAnswer(state: typeof ConversationalRAGState.State) {
const {
currentQuery,
contextualizedQuery,
documents = [],
conversationHistory = [],
} = state;

if (documents.length === 0) {
return {
answer: '抱歉,我没有找到相关的信息来回答您的问题。',
};
}

// 构建文档上下文
const documentContext = documents
.map((doc, index) => `文档 ${index + 1}:\n${doc.pageContent}`)
.join('\n\n');

// 构建对话历史上下文
const conversationContext =
conversationHistory.length > 0
? conversationHistory
.slice(-4) // 最近2轮对话
.map((turn) => `${turn.role}: ${turn.content}`)
.join('\n')
: '';

const answerPrompt = `
${conversationContext ? `对话历史:\n${conversationContext}\n\n` : ''}文档上下文:
${documentContext}

用户当前问题: ${currentQuery}
${
contextualizedQuery !== currentQuery
? `上下文化问题: ${contextualizedQuery}`
: ''
}

请基于提供的文档上下文和对话历史,回答用户的当前问题。
要求:
1. 答案应该准确且基于提供的文档
2. 考虑对话的连续性和上下文
3. 如果问题涉及之前的对话内容,请适当引用
4. 保持回答的自然和友好

回答:`;

const response = await llm.invoke([
{
role: 'system',
content:
'你是一个有帮助的助手,能够基于文档和对话历史提供准确、连贯的回答。',
},
{ role: 'user', content: answerPrompt },
]);

const answer = response.content as string;

return {
answer,
};
}

/**
* 更新对话历史节点
* 将当前的问答对添加到对话历史中
*/
function updateConversationHistory(state: typeof ConversationalRAGState.State) {
const { currentQuery, answer } = state;

const newTurns: ConversationTurn[] = [
{
role: 'user',
content: currentQuery,
timestamp: new Date(),
},
{
role: 'assistant',
content: answer,
timestamp: new Date(),
},
];

return {
conversationHistory: newTurns,
};
}

/**
* 构建多轮对话 RAG 系统
*/
function createConversationalRAG() {
const workflow = new StateGraph(ConversationalRAGState)
.addNode('contextualizeQuery', contextualizeQuery)
.addNode('retrieveDocuments', retrieveDocuments)
.addNode('generateAnswer', generateAnswer)
.addNode('updateHistory', updateConversationHistory)
.addEdge(START, 'contextualizeQuery')
.addEdge('contextualizeQuery', 'retrieveDocuments')
.addEdge('retrieveDocuments', 'generateAnswer')
.addEdge('generateAnswer', 'updateHistory')
.addEdge('updateHistory', END);

return workflow.compile();
}

// 会话管理器 - 兼容测试接口
class ConversationManager {
private messages: BaseMessage[] = [];
private maxHistory: number;

constructor(maxHistory: number = 10) {
this.maxHistory = maxHistory;
}

addMessage(message: BaseMessage): void {
this.messages.push(message);
if (this.messages.length > this.maxHistory) {
this.messages = this.messages.slice(-this.maxHistory);
}
}

getHistory(): BaseMessage[] {
return [...this.messages];
}

getRecentMessages(count: number): BaseMessage[] {
return this.messages.slice(-count);
}

clear(): void {
this.messages = [];
}
}

// 上下文化查询函数 - 兼容测试接口
async function contextualizeQuestion(
question: string,
chatHistory: BaseMessage[]
): Promise<string> {
if (!chatHistory || chatHistory.length === 0) {
return question;
}

// 构建对话历史上下文
const recentHistory = chatHistory
.slice(-6) // 只取最近3轮对话
.map(
(msg) =>
`${msg.constructor.name === 'HumanMessage' ? 'user' : 'assistant'}: ${
msg.content
}`
)
.join('\n');

const contextualizationPrompt = `
对话历史:
${recentHistory}

当前用户问题: ${question}

请基于对话历史,重新表述用户的当前问题,使其包含必要的上下文信息。
如果当前问题已经很清晰且不需要上下文,请直接返回原问题。

上下文化的问题:`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个对话理解专家,擅长根据对话历史理解用户的真实意图。',
},
{ role: 'user', content: contextualizationPrompt },
]);

return response.content as string;
}

// 对话式 RAG 状态接口 - 兼容测试
const ConversationalRAGTestState = Annotation.Root({
input: Annotation<string>(),
chatHistory: Annotation<BaseMessage[]>({
reducer: (x, y) => [...(x || []), ...(y || [])],
default: () => [],
}),
contextualizedQuestion: Annotation<string>(),
documents: Annotation<SimpleDocument[]>(),
answer: Annotation<string>(),
});

// 测试兼容的节点函数
async function testContextualizeQuery(
state: typeof ConversationalRAGTestState.State
) {
const { input, chatHistory = [] } = state;
const contextualizedQuestion = await contextualizeQuestion(
input,
chatHistory
);

return {
contextualizedQuestion,
};
}

async function testRetrieveDocuments(
state: typeof ConversationalRAGTestState.State
) {
const { contextualizedQuestion } = state;
const vectorStore = await createVectorStore();
const documents = await vectorStore.similaritySearch(
contextualizedQuestion,
5
);

return {
documents,
};
}

async function testGenerateAnswer(
state: typeof ConversationalRAGTestState.State
) {
const {
input,
contextualizedQuestion,
documents = [],
chatHistory = [],
} = state;

if (documents.length === 0) {
return {
answer: '抱歉,我没有找到相关的信息来回答您的问题。',
};
}

// 构建文档上下文
const documentContext = documents
.map((doc, index) => `文档 ${index + 1}:\n${doc.pageContent}`)
.join('\n\n');

// 构建对话历史上下文
const conversationContext =
chatHistory.length > 0
? chatHistory
.slice(-4) // 最近2轮对话
.map(
(msg) =>
`${
msg.constructor.name === 'HumanMessage' ? 'user' : 'assistant'
}: ${msg.content}`
)
.join('\n')
: '';

const answerPrompt = `
${conversationContext ? `对话历史:\n${conversationContext}\n\n` : ''}文档上下文:
${documentContext}

用户当前问题: ${input}
${
contextualizedQuestion !== input
? `上下文化问题: ${contextualizedQuestion}`
: ''
}

请基于提供的文档上下文和对话历史,回答用户的当前问题。
要求:
1. 答案应该准确且基于提供的文档
2. 考虑对话的连续性和上下文
3. 如果问题涉及之前的对话内容,请适当引用
4. 保持回答的自然和友好

回答:`;

const response = await llm.invoke([
{
role: 'system',
content:
'你是一个有帮助的助手,能够基于文档和对话历史提供准确、连贯的回答。',
},
{ role: 'user', content: answerPrompt },
]);

return {
answer: response.content as string,
};
}

// 更新对话历史节点 - 测试兼容版本
function testUpdateChatHistory(state: typeof ConversationalRAGTestState.State) {
const { input, answer, chatHistory = [] } = state;

const newMessages: BaseMessage[] = [
new HumanMessage(input),
new AIMessage(answer),
];

return {
chatHistory: newMessages,
};
}

// 创建测试兼容的对话式 RAG 图
function createTestConversationalRAG() {
const workflow = new StateGraph(ConversationalRAGTestState)
.addNode('contextualizeQuery', testContextualizeQuery)
.addNode('retrieveDocuments', testRetrieveDocuments)
.addNode('generateAnswer', testGenerateAnswer)
.addNode('updateChatHistory', testUpdateChatHistory)
.addEdge(START, 'contextualizeQuery')
.addEdge('contextualizeQuery', 'retrieveDocuments')
.addEdge('retrieveDocuments', 'generateAnswer')
.addEdge('generateAnswer', 'updateChatHistory')
.addEdge('updateChatHistory', END);

return workflow.compile();
}

// 对话式 RAG 图实例 - 兼容测试
const conversationalRAG = createTestConversationalRAG();

// 查询函数 - 兼容测试接口
async function queryConversationalRAG(
question: string,
chatHistory: BaseMessage[]
): Promise<{
question: string;
answer: string;
sources: SimpleDocument[];
chatHistory: BaseMessage[];
contextualizedQuestion: string;
}> {
const result = await conversationalRAG.invoke({
input: question,
chatHistory,
});

// 更新对话历史
const updatedHistory = [
...chatHistory,
new HumanMessage(question),
new AIMessage(result.answer),
];

return {
question,
answer: result.answer,
sources: result.documents || [],
chatHistory: updatedHistory,
contextualizedQuestion: result.contextualizedQuestion,
};
}

// 原有的会话管理器保持不变
class SessionConversationManager {
private sessions: Map<string, ConversationTurn[]> = new Map();
private app = createConversationalRAG();

async askQuestion(sessionId: string, question: string): Promise<string> {
// 获取会话历史
const conversationHistory = this.sessions.get(sessionId) || [];

// 执行 RAG 流程
const result = await this.app.invoke({
currentQuery: question,
conversationHistory,
sessionId,
});

// 更新会话历史
this.sessions.set(sessionId, result.conversationHistory);

return result.answer;
}

getConversationHistory(sessionId: string): ConversationTurn[] {
return this.sessions.get(sessionId) || [];
}

clearConversation(sessionId: string): void {
this.sessions.delete(sessionId);
}

getAllSessions(): string[] {
return Array.from(this.sessions.keys());
}
}

// 使用示例
async function runConversationalRAG() {
const conversationManager = new SessionConversationManager();
const sessionId = 'user-123';

console.log('🚀 启动多轮对话 RAG 系统...\n');

// 模拟多轮对话
const questions = [
'LangGraph 是什么?',
'它有什么主要特性?',
'如何使用 StateGraph?',
'刚才提到的条件边是什么意思?',
'能给我一个具体的例子吗?',
];

for (let i = 0; i < questions.length; i++) {
const question = questions[i];

console.log(`\n${'='.repeat(50)}`);
console.log(`${i + 1} 轮对话`);
console.log(`用户: ${question}`);
console.log('='.repeat(50));

const answer = await conversationManager.askQuestion(sessionId, question);

console.log(`助手: ${answer}`);

// 显示当前对话历史长度
const history = conversationManager.getConversationHistory(sessionId);
console.log(`\n📊 对话历史: ${history.length} 条记录`);
}

// 显示完整对话历史
console.log('\n📜 完整对话历史:');
const fullHistory = conversationManager.getConversationHistory(sessionId);
fullHistory.forEach((turn, index) => {
console.log(
`${index + 1}. ${turn.role}: ${turn.content.substring(0, 100)}...`
);
});
}

// 流式对话示例
async function runStreamingConversationalRAG() {
const app = createConversationalRAG();

console.log('\n🚀 启动流式多轮对话 RAG 系统...\n');

let conversationHistory: ConversationTurn[] = [];

const questions = ['RAG 系统是什么?', '它是如何工作的?'];

for (const question of questions) {
console.log(`\n用户: ${question}`);
console.log('助手: ');

const stream = await app.stream({
currentQuery: question,
conversationHistory,
sessionId: 'stream-session',
});

let finalResult: any = {};

for await (const chunk of stream) {
const [nodeName, nodeOutput] = Object.entries(chunk)[0];

if (nodeName === 'contextualizeQuery') {
console.log(`[查询理解] ${nodeOutput.contextualizedQuery}`);
} else if (nodeName === 'retrieveDocuments') {
console.log(
`[文档检索] 找到 ${nodeOutput.documents?.length || 0} 个相关文档`
);
} else if (nodeName === 'generateAnswer') {
console.log(`[答案生成] ${nodeOutput.answer}`);
finalResult = nodeOutput;
} else if (nodeName === 'updateHistory') {
conversationHistory = nodeOutput.conversationHistory;
}
}
}
}

// 导出主要函数和类型
export {
ConversationalRAGState,
ConversationTurn,
SimpleDocument,
ConversationManager,
SessionConversationManager,
createConversationalRAG,
runConversationalRAG,
runStreamingConversationalRAG,
contextualizeQuery,
retrieveDocuments,
generateAnswer,
updateConversationHistory,
contextualizeQuestion,
conversationalRAG,
queryConversationalRAG,
};

// 如果直接运行此文件,执行示例
if (require.main === module) {
runConversationalRAG().catch(console.error);
}

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
🚀 启动多轮对话 RAG 系统...


==================================================
第 1 轮对话
用户: LangGraph 是什么?
==================================================
检索到 5 个文档
助手: LangGraph 是一个基于 LangChain 构建的库,专门用于创建有状态的多参与者应用程序。它扩展了 LangChain 表达式语言(LCEL),提供了在多个计算步骤中协调多个链(或称为“参与者”)的能力,非常适合构建复杂的、需要记忆和协作的 AI 工作流。

你可以使用它的核心类 **StateGraph** 来定义图的结构,通过添加节点、边和条件边来组织工作流程。同时,LangGraph 支持用 **Annotation** 来清晰地定义应用的状态结构,包括字段类型、默认值等。此外,它还支持**流式处理**,可以实时输出节点执行结果,提升用户体验。

简单来说,LangGraph 让你能像设计流程图一样构建智能代理系统,特别适合如对话系统、RAG 流程、多智能体协作等场景。

📊 对话历史: 2 条记录

==================================================
第 2 轮对话
用户: 它有什么主要特性?
==================================================
原始查询: 它有什么主要特性?
上下文化查询: LangGraph 有哪些主要特性?
检索到 5 个文档
助手: LangGraph 的主要特性可以总结为以下几点,基于它作为构建有状态、多参与者应用的强大工具定位:

1. **基于 StateGraph 的流程编排**:LangGraph 的核心是 `StateGraph` 类,它让你能够以可视化图的方式定义复杂的工作流。你可以添加节点(代表各个参与者或处理步骤)、边(表示执行顺序)以及条件边(实现分支逻辑),从而灵活控制整个应用的执行路径。

2. **状态管理与 Annotation 支持**:通过使用 `Annotation`,你可以清晰地定义应用的状态结构,包括每个字段的类型、默认值,甚至指定 reducer 函数来处理状态更新。这使得多步交互中的状态管理更加类型安全和可维护。

3. **多参与者协调能力**:LangGraph 扩展了 LangChain 表达式语言(LCEL),支持在多个计算步骤中协调多个“参与者”(如不同的 AI 代理或链),非常适合构建需要协作与记忆的系统,比如多智能体对话、RAG 流程等。

4. **流式输出支持**:它支持实时流式处理,能够在节点执行过程中逐步返回结果,提升用户体验,特别适用于需要低延迟反馈的交互式应用,如聊天机器人。

总的来说,LangGraph 让你能像设计流程图一样构建复杂的 AI 应用,结合了灵活性、可扩展性和良好的开发体验,是构建高级代理系统和工作流的理想选择。

📊 对话历史: 4 条记录

==================================================
第 3 轮对话
用户: 如何使用 StateGraph?
==================================================
原始查询: 如何使用 StateGraph?
上下文化查询: 如何使用 LangGraph 中的 StateGraph 来定义和编排有状态的多参与者应用流程?
检索到 5 个文档
助手: 要使用 LangGraph 中的 `StateGraph` 来定义和编排有状态的多参与者应用流程,你可以按照以下步骤进行操作。这正是 LangGraph 作为构建复杂 AI 工作流核心工具的强大之处。

### 1. 定义应用状态(使用 Annotation)

首先,你需要清晰地定义整个图的状态结构。LangGraph 提供了 `Annotation` 来帮助你做到这一点。通过它,你可以指定状态中包含哪些字段、它们的类型、默认值,甚至如何合并更新(例如用 reducer 函数)。

```python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator

# 定义状态结构
class AgentState(TypedDict):
messages: Annotated[list, operator.add] # 消息列表会自动累加
current_step: str

# 或者使用更现代的 Annotation 语法(推荐)
from langgraph.graph import Annotation

state_schema = Annotation({
"messages": Annotated[list, operator.add],
"current_step": str
})
```

这里我们使用了 `Annotated` 和 `operator.add`,表示每次更新 `messages` 字段时都会将新内容追加到历史记录中,实现自然的消息累积——这对于对话系统等有状态场景非常关键。

---

### 2. 创建 StateGraph 实例

接下来,创建一个 `StateGraph` 实例,并传入你的状态定义:

```python
graph_builder = StateGraph(state_schema)
```

这个图将成为你工作流的“画布”。

---

### 3. 添加节点(Nodes)

每个节点代表一个参与者或处理步骤,比如一个 LLM 调用、工具执行或决策逻辑。

```python
def step_one(state):
print("执行第一步")
return {"current_step": "step_one_done"}

def step_two(state):
print("执行第二步")
return {"current_step": "step_two_done"}

# 将节点添加到图中
graph_builder.add_node("start", step_one)
graph_builder.add_node("process", step_two)
```

---

### 4. 添加边(Edges)来控制流程

你可以添加普通边来指定执行顺序,也可以添加条件边来实现分支逻辑。

```python
# 设置入口点
graph_builder.add_edge(START, "start")

# 普通边:从 start 到 process
graph_builder.add_edge("start", "process")

# 结束边
graph_builder.add_edge("process", END)
```

如果你需要根据状态决定下一步,可以使用**条件边**:

```python
def decide_next(state):
if "done" in state["current_step"]:
return "process"
else:
return END

graph_builder.add_conditional_edges("start", decide_next)
```

---

### 5. 编译并运行图

最后,编译图以生成可调用的应用实例:

```python
app = graph_builder.compile()

# 运行图
result = app.invoke({
"messages": [],
"current_step": "initial"
})

print(result)
```

你还可以使用 `stream()` 方法获取流式输出,实现实时反馈:

```python
for s in app.stream({
"messages": [],
"current_step": "initial"
}):
print(s)
```

---

### 总结

正如之前提到的,`StateGraph` 是 LangGraph 的核心类,让你像设计流程图一样组织多个参与者之间的协作。结合 **状态管理(Annotation)**、**节点编排** 和 **条件流转**,你可以构建出如对话系统、RAG 流程或多智能体协作等复杂的有状态 AI 应用。

如果你想进一步扩展,比如集成 LangChain 的链(Chain)或代理(Agent),LangGraph 也完全兼容这些组件,真正实现了灵活而强大的工作流编排。

📊 对话历史: 6 条记录

==================================================
第 4 轮对话
用户: 刚才提到的条件边是什么意思?
==================================================
原始查询: 刚才提到的条件边是什么意思?
上下文化查询: 在之前介绍 LangGraph 的使用时,提到了可以通过“条件边”来实现分支逻辑。那么,这里的“条件边”具体是什么意思?它是如何根据状态决定流程走向的?
检索到 5 个文档
助手: 很好的问题!我们之前在介绍如何使用 `StateGraph` 时提到了“条件边”(Conditional Edges),它正是 LangGraph 实现**动态流程控制**的关键机制。

简单来说,**条件边的作用是:根据当前应用的状态(state)来决定下一步该走向哪个节点**——就像程序中的 `if-else` 判断一样,让工作流可以根据运行时的数据智能地选择路径。

### 举个例子来说明

假设你正在构建一个客服对话机器人,当用户提问后:

- 如果问题是“订单查询”,就进入“订单处理”节点;
- 如果是“技术支持”,则跳转到“技术支援”团队;
- 否则,转给通用客服。

这个判断逻辑就可以通过**条件边**来实现。

在代码中,你会定义一个“条件函数”(conditional function),它的输入是当前状态,输出是下一个目标节点的名称:

```python
def route_based_on_question(state):
last_message = state["messages"][-1].content.lower()
if "order" in last_message:
return "handle_order"
elif "support" in last_message:
return "tech_support"
else:
return "general_customer_service"
```

然后把这个函数注册为某个节点之后的条件边:

```python
graph_builder.add_conditional_edges(
"classify_question", # 从哪个节点出发?
route_based_on_question, # 根据什么逻辑判断?
{
"handle_order": "handle_order",
"tech_support": "tech_support",
"general_customer_service": "general_customer_service"
}
)
```

这样,图就会根据实际内容自动选择后续路径,而不再是固定的线性流程。

---

### 和普通边的区别

- **普通边(Regular Edge)**:是静态的、固定的跳转。比如“执行完 A 就一定去 B”。
- **条件边(Conditional Edge)**:是动态的,会运行一段逻辑来决定“接下来去哪儿”。

这使得 LangGraph 非常适合用于构建需要决策能力的 AI 应用,比如:
- 多轮对话中的意图识别与路由
- RAG 流程中判断是否需要检索
- 自动化代理根据反馈重试或终止任务

正如文档 2 所说:`StateGraph` 允许你添加节点、边和**条件边**来创建复杂的工作流程 —— 正是这些条件边赋予了工作流“智能流转”的能力。

如果你希望流程更灵活、能根据上下文做判断,那条件边就是不可或缺的工具 😊

📊 对话历史: 8 条记录

==================================================
第 5 轮对话
用户: 能给我一个具体的例子吗?
==================================================
原始查询: 能给我一个具体的例子吗?
上下文化查询: 能给我一个使用 LangGraph 中条件边的具体例子吗?
检索到 5 个文档
助手: 当然可以!我们来一起看一个**使用 LangGraph 中条件边的具体例子**,结合你之前问到的 `StateGraph` 和条件边的概念,让你更直观地理解它是如何工作的。

---

### 🎯 场景:智能客服机器人

假设我们要构建一个简单的 AI 客服系统,它会根据用户的问题内容自动路由到不同的处理模块:

- 如果用户提到“订单”,就交给 **订单处理节点**
- 如果提到“退款”,就进入 **退款流程**
- 其他情况则转给 **通用客服**

这就非常适合用 **条件边** 来实现动态跳转!

---

### ✅ 第一步:定义状态结构

我们使用 `Annotation` 来定义应用的状态,特别是让 `messages` 自动累积:

```python
from langgraph.graph import StateGraph, Annotation, START, END
from typing import Annotated
import operator

# 定义状态
state_schema = Annotation({
"messages": Annotated[list, operator.add], # 消息会自动追加
"next_step": str # 记录下一步该做什么(可选)
})
```

---

### ✅ 第二步:创建图和节点

```python
def classify_question(state):
"""判断用户问题类型"""
last_message = state["messages"][-1].content.lower()
if "order" in last_message:
return {"next_step": "handle_order"}
elif "refund" in last_message or "money back" in last_message:
return {"next_step": "process_refund"}
else:
return {"next_step": "general_support"}

def handle_order(state):
return {"messages": [{"role": "assistant", "content": "正在查询您的订单..."}]}

def process_refund(state):
return {"messages": [{"role": "assistant", "content": "正在为您处理退款申请..."}]}

def general_support(state):
return {"messages": [{"role": "assistant", "content": "您好,请告诉我您需要什么帮助?"}]}

# 创建图
builder = StateGraph(state_schema)

# 添加节点
builder.add_node("classify_question", classify_question)
builder.add_node("handle_order", handle_order)
builder.add_node("process_refund", process_refund)
builder.add_node("general_support", general_support)
```

---

### ✅ 第三步:添加边,包括条件边!

这是关键部分 👇

```python
# 设置起始点
builder.add_edge(START, "classify_question")

# 使用条件边:根据 classify_question 的返回值决定去哪
def route_next_step(state):
return state["next_step"] # 返回的是 'handle_order'、'process_refund' 或 'general_support'

builder.add_conditional_edges(
"classify_question", # 从哪个节点出发?
route_next_step # 根据什么逻辑判断?
# 不需要显式写出映射也可以(默认会按名字匹配)
)

# 所有处理完成后都结束
builder.add_edge("handle_order", END)
builder.add_edge("process_refund", END)
builder.add_edge("general_support", END)
```

---

### ✅ 第四步:编译并运行

```python
app = builder.compile()

# 模拟用户提问
result = app.invoke({
"messages": [{"role": "user", "content": "我的订单在哪里?"}]
})

print(result["messages"][-1]["content"])
# 输出:正在查询您的订单...
```

换一个问题试试:

```python
result = app.invoke({
"messages": [{"role": "user", "content": "我想申请退款"}]
})

print(result["messages"][-1]["content"])
# 输出:正在为您处理退款申请...
```

---

### 🔍 总结一下

这个例子展示了:
- 如何通过 `Annotation` 定义有状态的数据结构(文档 3)
- 使用 `StateGraph` 构建工作流(文档 2)
- **条件边如何根据运行时状态动态选择路径** —— 这正是你关心的核心功能!

就像我们在前面聊到的,条件边就像是工作流里的“大脑决策器”,让 AI 应用不再是死板的线性流程,而是能像人一样根据不同情境做出反应。

如果你还想把这个流程做成实时流式输出(比如逐字打印回复),LangGraph 也支持 `.stream()` 方法哦(参考文档 4 😊)

需要我加上流式输出的版本吗?

📊 对话历史: 10 条记录

📜 完整对话历史:
1. user: LangGraph 是什么?...
2. assistant: LangGraph 是一个基于 LangChain 构建的库,专门用于创建有状态的多参与者应用程序。它扩展了 LangChain 表达式语言(LCEL),提供了在多个计算步骤中协调多个链(或称为“参与...
3. user: 它有什么主要特性?...
4. assistant: LangGraph 的主要特性可以总结为以下几点,基于它作为构建有状态、多参与者应用的强大工具定位:

1. **基于 StateGraph 的流程编排**:LangGraph 的核心是 `State...
5. user: 如何使用 StateGraph?...
6. assistant: 要使用 LangGraph 中的 `StateGraph` 来定义和编排有状态的多参与者应用流程,你可以按照以下步骤进行操作。这正是 LangGraph 作为构建复杂 AI 工作流核心工具的强大之处。...
7. user: 刚才提到的条件边是什么意思?...
8. assistant: 很好的问题!我们之前在介绍如何使用 `StateGraph` 时提到了“条件边”(Conditional Edges),它正是 LangGraph 实现**动态流程控制**的关键机制。

简单来说,*...
9. user: 能给我一个具体的例子吗?...
10. assistant: 当然可以!我们来一起看一个**使用 LangGraph 中条件边的具体例子**,结合你之前问到的 `StateGraph` 和条件边的概念,让你更直观地理解它是如何工作的。

---

### 🎯 ...
*/

文档管理系统

为了支持动态文档管理,我们需要一个完整的文档处理系统:

文档管理系统
/**
* ============================================================================
* 文档管理器 - Document Manager
* ============================================================================
*
* 📖 概述
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 这是一个功能完善的文档管理系统,提供文档的增删改查、搜索和统计功能。
* 采用简化的向量存储实现,支持文档版本管理、元数据管理和基于关键词的
* 相似度搜索,为 RAG 系统提供文档管理基础设施。
*
* 🎯 核心功能
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* 1️⃣ 文档CRUD:添加、更新、删除、查询文档
* 2️⃣ 相似度搜索:基于文本匹配的文档检索
* 3️⃣ 元数据管理:支持标题、来源、类型、标签等元数据
* 4️⃣ 版本控制:自动跟踪文档版本和更新时间
* 5️⃣ 统计分析:按类型、标签统计文档分布
*
* 💡 实现思路
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 使用 Map 数据结构实现内存文档存储
* • 通过关键词匹配算法实现简化的相似度搜索
* • 自动生成文档ID和管理版本号
* • 提供 DocumentManager 类封装所有文档操作
*
* 🚀 使用方式
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* // 运行示例
* $ esno document-manager.ts
*
* ⚠️ 注意事项
* ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
* • 文档仅存储在内存中,应用重启后数据丢失
* • 相似度搜索基于简单的关键词匹配,不是真正的向量搜索
* • 生产环境建议使用真实的向量数据库(如 Pinecone、Weaviate)
* • 适合演示和学习,不适合大规模生产应用
*
* @author 程哥
* @version 1.0.0
* @updated 2025-11
*/

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';

// 文档接口
interface Document {
id: string;
content: string;
metadata: {
title?: string;
source?: string;
type?: string;
tags?: string[];
createdAt: Date;
updatedAt: Date;
version: number;
};
}

// 简化的向量存储
class DocumentVectorStore {
private documents: Map<string, Document> = new Map();
private nextId = 1;

// 添加文档
async addDocument(
content: string,
metadata: Partial<Document['metadata']> = {}
): Promise<Document> {
const id = `doc_${this.nextId++}`;
const now = new Date();

const document: Document = {
id,
content,
metadata: {
title: metadata.title || `文档 ${id}`,
source: metadata.source || 'unknown',
type: metadata.type || 'text',
tags: metadata.tags || [],
createdAt: now,
updatedAt: now,
version: 1,
...metadata,
},
};

this.documents.set(id, document);
return document;
}

// 更新文档
async updateDocument(
id: string,
content?: string,
metadata?: Partial<Document['metadata']>
): Promise<Document | null> {
const existingDoc = this.documents.get(id);
if (!existingDoc) {
return null;
}

const updatedDoc: Document = {
...existingDoc,
content: content || existingDoc.content,
metadata: {
...existingDoc.metadata,
...metadata,
updatedAt: new Date(),
version: existingDoc.metadata.version + 1,
},
};

this.documents.set(id, updatedDoc);
return updatedDoc;
}

// 删除文档
async deleteDocument(id: string): Promise<boolean> {
return this.documents.delete(id);
}

// 获取文档
async getDocument(id: string): Promise<Document | null> {
return this.documents.get(id) || null;
}

// 列出所有文档
async listDocuments(): Promise<Document[]> {
return Array.from(this.documents.values());
}

// 搜索文档
async searchDocuments(
query: string,
limit: number = 10
): Promise<Document[]> {
const queryLower = query.toLowerCase();

const scored = Array.from(this.documents.values()).map((doc) => ({
document: doc,
score: this.calculateTextScore(doc.content.toLowerCase(), queryLower),
}));

return scored
.filter((item) => item.score > 0)
.sort((a, b) => b.score - a.score)
.slice(0, limit)
.map((item) => item.document);
}

// 计算文本匹配分数
private calculateTextScore(content: string, query: string): number {
const queryWords = query.split(/\s+/);
let score = 0;

for (const word of queryWords) {
if (content.includes(word)) {
score += 1;
}
}

return queryWords.length > 0 ? score / queryWords.length : 0;
}
}

// 全局文档存储实例
const documentStore = new DocumentVectorStore();

// 文档管理器类
class DocumentManager {
async addDocument(
content: string,
metadata?: Partial<Document['metadata']>
): Promise<Document> {
const newDoc = await documentStore.addDocument(content, metadata);
console.log(`✅ 成功添加文档: ${newDoc.id}`);
return newDoc;
}

async updateDocument(
id: string,
content?: string,
metadata?: Partial<Document['metadata']>
): Promise<Document> {
const updatedDoc = await documentStore.updateDocument(
id,
content,
metadata
);

if (!updatedDoc) {
throw new Error(`文档 ${id} 不存在`);
}

console.log(`✅ 成功更新文档: ${updatedDoc.id}`);
return updatedDoc;
}

async deleteDocument(id: string): Promise<void> {
const deleted = await documentStore.deleteDocument(id);

if (!deleted) {
throw new Error(`文档 ${id} 不存在`);
}

console.log(`✅ 成功删除文档: ${id}`);
}

async searchDocuments(query: string): Promise<Document[]> {
const results = await documentStore.searchDocuments(query, 10);
console.log(`🔍 搜索 "${query}" 找到 ${results.length} 个结果`);
return results;
}

async listDocuments(): Promise<Document[]> {
const documents = await documentStore.listDocuments();
console.log(`📋 列出所有文档,共 ${documents.length}`);
return documents;
}

async getDocument(id: string): Promise<Document | null> {
return await documentStore.getDocument(id);
}

async getStats() {
const documents = await this.listDocuments();

const documentsByType: Record<string, number> = {};
const documentsByTag: Record<string, number> = {};

for (const doc of documents) {
// 按类型统计
const type = doc.metadata.type || 'unknown';
documentsByType[type] = (documentsByType[type] || 0) + 1;

// 按标签统计
for (const tag of doc.metadata.tags || []) {
documentsByTag[tag] = (documentsByTag[tag] || 0) + 1;
}
}

return {
totalDocuments: documents.length,
documentsByType,
documentsByTag,
};
}
}

// 使用示例
async function runDocumentManager() {
const manager = new DocumentManager();

console.log('🚀 启动文档管理系统...\n');

try {
// 添加一些示例文档
console.log('📝 添加示例文档...');

const doc1 = await manager.addDocument(
'LangGraph 是一个用于构建有状态的多参与者应用程序的库,基于 LangChain 构建。它扩展了 LangChain Expression Language,具有在多个计算步骤中协调多个链(或参与者)的能力。',
{
title: 'LangGraph 简介',
type: 'documentation',
tags: ['langgraph', 'introduction'],
source: 'official-docs',
}
);

const doc2 = await manager.addDocument(
'RAG(检索增强生成)是一种结合信息检索和文本生成的技术,可以让 LLM 基于外部知识库生成更准确的答案。',
{
title: 'RAG 概念',
type: 'concept',
tags: ['rag', 'retrieval', 'generation'],
source: 'tutorial',
}
);

const doc3 = await manager.addDocument(
'向量数据库是 RAG 系统的核心组件,它存储文档的向量表示,支持高效的相似性搜索。',
{
title: '向量数据库',
type: 'concept',
tags: ['vector', 'database', 'search'],
source: 'tutorial',
}
);

console.log(`\n📊 文档添加完成,共添加 3 个文档\n`);

// 列出所有文档
console.log('📋 列出所有文档:');
const allDocs = await manager.listDocuments();
allDocs.forEach((doc) => {
console.log(`- ${doc.id}: ${doc.metadata.title} (${doc.metadata.type})`);
});

// 搜索文档
console.log('\n🔍 搜索测试:');
const searchQueries = ['LangGraph', 'RAG', '向量', '不存在的内容'];

for (const query of searchQueries) {
const results = await manager.searchDocuments(query);
console.log(`查询 "${query}": 找到 ${results.length} 个结果`);
results.forEach((doc) => {
console.log(` - ${doc.metadata.title}`);
});
}

// 更新文档
console.log('\n✏️ 更新文档测试:');
const updatedDoc = await manager.updateDocument(doc1.id, undefined, {
tags: ['langgraph', 'introduction', 'updated'],
});
console.log(`更新后的标签: ${updatedDoc.metadata.tags?.join(', ')}`);

// 获取统计信息
console.log('\n📈 统计信息:');
const stats = await manager.getStats();
console.log(`总文档数: ${stats.totalDocuments}`);
console.log('按类型分布:', stats.documentsByType);
console.log('按标签分布:', stats.documentsByTag);

// 删除文档
console.log('\n🗑️ 删除文档测试:');
await manager.deleteDocument(doc3.id);

const finalDocs = await manager.listDocuments();
console.log(`删除后剩余文档数: ${finalDocs.length}`);
} catch (error) {
console.error('❌ 操作失败:', error);
}
}

// 导出主要函数和类型
export { Document, DocumentVectorStore, DocumentManager, runDocumentManager };

// 如果直接运行此文件,执行示例
if (require.main === module) {
runDocumentManager().catch(console.error);
}

/*
执行结果:
Line:8 🌭 path.resolve(__dirname, '.env') /Users/loock/myFile/langgraphjs-tutorial/websites/examples/utils/.env
🚀 启动文档管理系统...

📝 添加示例文档...
✅ 成功添加文档: doc_1
✅ 成功添加文档: doc_2
✅ 成功添加文档: doc_3

📊 文档添加完成,共添加 3 个文档

📋 列出所有文档:
📋 列出所有文档,共 3 个
- doc_1: LangGraph 简介 (documentation)
- doc_2: RAG 概念 (concept)
- doc_3: 向量数据库 (concept)

🔍 搜索测试:
🔍 搜索 "LangGraph" 找到 1 个结果
查询 "LangGraph": 找到 1 个结果
- LangGraph 简介
🔍 搜索 "RAG" 找到 2 个结果
查询 "RAG": 找到 2 个结果
- RAG 概念
- 向量数据库
🔍 搜索 "向量" 找到 1 个结果
查询 "向量": 找到 1 个结果
- 向量数据库
🔍 搜索 "不存在的内容" 找到 0 个结果
查询 "不存在的内容": 找到 0 个结果

✏️ 更新文档测试:
✅ 成功更新文档: doc_1
更新后的标签: langgraph, introduction, updated

📈 统计信息:
📋 列出所有文档,共 3 个
总文档数: 3
按类型分布: { documentation: 1, concept: 2 }
按标签分布: {
langgraph: 1,
introduction: 1,
updated: 1,
rag: 1,
retrieval: 1,
generation: 1,
vector: 1,
database: 1,
search: 1
}

🗑️ 删除文档测试:
✅ 成功删除文档: doc_3
📋 列出所有文档,共 2 个
删除后剩余文档数: 2
*/

React 集成示例

在前端应用中集成 RAG 系统:

RAG 问答组件
import React, { useState, useRef, useEffect } from 'react';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';

// 简化的文档接口
interface SimpleDocument {
pageContent: string;
metadata: Record<string, any>;
}

// 消息接口
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
timestamp: Date;
sources?: SimpleDocument[];
}

// RAG 状态定义
const RAGState = Annotation.Root({
query: Annotation<string>(),
documents: Annotation<SimpleDocument[]>(),
answer: Annotation<string>(),
sources: Annotation<SimpleDocument[]>(),
});

// 简化的向量存储
class SimpleVectorStore {
private documents: SimpleDocument[] = [];

constructor() {
// 初始化一些示例文档
this.documents = [
{
pageContent:
'LangGraph 是一个用于构建有状态的多参与者应用程序的库,基于 LangChain 构建。它扩展了 LangChain Expression Language,具有在多个计算步骤中协调多个链(或参与者)的能力。',
metadata: { source: 'langgraph-intro', type: 'documentation' },
},
{
pageContent:
'StateGraph 是 LangGraph 的核心类,用于定义图的结构。它允许你添加节点、边和条件边来创建复杂的工作流程。',
metadata: { source: 'langgraph-stategraph', type: 'documentation' },
},
{
pageContent:
'RAG(检索增强生成)是一种结合信息检索和文本生成的技术,可以让 LLM 基于外部知识库生成更准确的答案。',
metadata: { source: 'rag-concept', type: 'concept' },
},
{
pageContent:
'在 LangGraph 中,你可以使用 Annotation 来定义状态的结构,包括字段类型、默认值和 reducer 函数。',
metadata: { source: 'langgraph-annotation', type: 'documentation' },
},
{
pageContent:
'向量数据库是 RAG 系统的核心组件,它存储文档的向量表示,支持高效的相似性搜索。',
metadata: { source: 'vector-database', type: 'concept' },
},
];
}

async similaritySearch(
query: string,
k: number = 3
): Promise<SimpleDocument[]> {
const queryLower = query.toLowerCase();
const scored = this.documents.map((doc) => ({
doc,
score: this.calculateSimpleScore(
doc.pageContent.toLowerCase(),
queryLower
),
}));

return scored
.sort((a, b) => b.score - a.score)
.slice(0, k)
.map((item) => item.doc);
}

private calculateSimpleScore(content: string, query: string): number {
const queryWords = query.split(' ');
let score = 0;

for (const word of queryWords) {
if (content.includes(word)) {
score += 1;
}
}

return score / queryWords.length;
}
}

// 初始化组件
const llm = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
});

const vectorStore = new SimpleVectorStore();

/**
* 文档检索节点
*/
async function retrieveDocuments(state: typeof RAGState.State) {
const { query } = state;
const documents = await vectorStore.similaritySearch(query, 3);

return {
documents,
sources: documents,
};
}

/**
* 答案生成节点
*/
async function generateAnswer(state: typeof RAGState.State) {
const { query, documents = [] } = state;

if (documents.length === 0) {
return {
answer: '抱歉,我没有找到相关的信息来回答您的问题。',
};
}

const context = documents
.map((doc, index) => `文档 ${index + 1}:\n${doc.pageContent}`)
.join('\n\n');

const prompt = `
基于以下上下文信息回答用户问题:

上下文:
${context}

用户问题: ${query}

请提供一个准确、有帮助的答案。如果上下文中没有足够信息回答问题,请明确说明。

回答:`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个有帮助的助手,能够基于提供的上下文准确回答问题。',
},
{ role: 'user', content: prompt },
]);

return {
answer: response.content as string,
};
}

/**
* 创建 RAG 系统
*/
function createRAGSystem() {
const workflow = new StateGraph(RAGState)
.addNode('retrieveDocuments', retrieveDocuments)
.addNode('generateAnswer', generateAnswer)
.addEdge(START, 'retrieveDocuments')
.addEdge('retrieveDocuments', 'generateAnswer')
.addEdge('generateAnswer', END);

return workflow.compile();
}

// RAG 组件
const RAGComponent: React.FC = () => {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isLoading, setIsLoading] = useState(false);
const [ragApp] = useState(() => createRAGSystem());
const messagesEndRef = useRef<HTMLDivElement>(null);

const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};

useEffect(() => {
scrollToBottom();
}, [messages]);

const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim() || isLoading) return;

const userMessage: Message = {
id: Date.now().toString(),
role: 'user',
content: input.trim(),
timestamp: new Date(),
};

setMessages((prev) => [...prev, userMessage]);
setInput('');
setIsLoading(true);

try {
const result = await ragApp.invoke({
query: userMessage.content,
});

const assistantMessage: Message = {
id: (Date.now() + 1).toString(),
role: 'assistant',
content: result.answer,
timestamp: new Date(),
sources: result.sources,
};

setMessages((prev) => [...prev, assistantMessage]);
} catch (error) {
console.error('RAG 查询失败:', error);
const errorMessage: Message = {
id: (Date.now() + 1).toString(),
role: 'assistant',
content: '抱歉,处理您的问题时出现了错误。请稍后再试。',
timestamp: new Date(),
};
setMessages((prev) => [...prev, errorMessage]);
} finally {
setIsLoading(false);
}
};

const clearMessages = () => {
setMessages([]);
};

return (
<div
style={{
maxWidth: '800px',
margin: '0 auto',
padding: '20px',
fontFamily:
'-apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif',
}}
>
<div
style={{
textAlign: 'center',
marginBottom: '30px',
padding: '20px',
background: 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)',
color: 'white',
borderRadius: '12px',
}}
>
<h2 style={{ margin: '0 0 10px 0', fontSize: '24px' }}>
🔍 RAG 问答系统
</h2>
<p style={{ margin: '0 0 15px 0', opacity: 0.9 }}>
基于知识库的智能问答,支持上下文检索和答案生成
</p>
<button
onClick={clearMessages}
style={{
background: 'rgba(255, 255, 255, 0.2)',
color: 'white',
border: '1px solid rgba(255, 255, 255, 0.3)',
padding: '8px 16px',
borderRadius: '6px',
cursor: 'pointer',
}}
>
清空对话
</button>
</div>

<div
style={{
height: '500px',
overflowY: 'auto',
border: '1px solid #e1e5e9',
borderRadius: '8px',
padding: '20px',
marginBottom: '20px',
background: '#fafbfc',
}}
>
{messages.length === 0 && (
<div
style={{ textAlign: 'center', color: '#666', padding: '40px 20px' }}
>
<h3 style={{ color: '#333', marginBottom: '10px' }}>
👋 欢迎使用 RAG 问答系统
</h3>
<p>您可以询问关于 LangGraph、RAG 系统等相关问题</p>
<div
style={{
marginTop: '30px',
textAlign: 'left',
maxWidth: '400px',
marginLeft: 'auto',
marginRight: 'auto',
}}
>
<h4 style={{ color: '#333', marginBottom: '10px' }}>
示例问题:
</h4>
<ul style={{ listStyle: 'none', padding: 0 }}>
<li
style={{
padding: '8px 12px',
margin: '5px 0',
background: '#f0f2f5',
borderRadius: '6px',
}}
>
LangGraph 是什么?
</li>
<li
style={{
padding: '8px 12px',
margin: '5px 0',
background: '#f0f2f5',
borderRadius: '6px',
}}
>
RAG 系统如何工作?
</li>
<li
style={{
padding: '8px 12px',
margin: '5px 0',
background: '#f0f2f5',
borderRadius: '6px',
}}
>
什么是向量数据库?
</li>
<li
style={{
padding: '8px 12px',
margin: '5px 0',
background: '#f0f2f5',
borderRadius: '6px',
}}
>
如何使用 StateGraph?
</li>
</ul>
</div>
</div>
)}

{messages.map((message) => (
<div
key={message.id}
style={{
marginBottom: '20px',
padding: '15px',
borderRadius: '12px',
maxWidth: '85%',
background: message.role === 'user' ? '#007bff' : 'white',
color: message.role === 'user' ? 'white' : 'black',
marginLeft: message.role === 'user' ? 'auto' : '0',
marginRight: message.role === 'assistant' ? 'auto' : '0',
border:
message.role === 'assistant' ? '1px solid #e1e5e9' : 'none',
}}
>
<div
style={{
display: 'flex',
justifyContent: 'space-between',
alignItems: 'center',
marginBottom: '8px',
fontSize: '12px',
opacity: 0.8,
}}
>
<span>{message.role === 'user' ? '👤 用户' : '🤖 助手'}</span>
<span>{message.timestamp.toLocaleTimeString()}</span>
</div>
<div style={{ lineHeight: 1.5, whiteSpace: 'pre-wrap' }}>
{message.content}
</div>
{message.sources && message.sources.length > 0 && (
<div
style={{
marginTop: '15px',
paddingTop: '15px',
borderTop: '1px solid #e1e5e9',
}}
>
<h4
style={{
margin: '0 0 10px 0',
fontSize: '14px',
color: '#666',
}}
>
📚 参考来源:
</h4>
{message.sources.map((source, index) => (
<div
key={index}
style={{
marginBottom: '10px',
padding: '10px',
background: '#f8f9fa',
borderRadius: '6px',
}}
>
<div
style={{
display: 'flex',
justifyContent: 'space-between',
marginBottom: '5px',
fontSize: '12px',
}}
>
<span
style={{
background: '#e9ecef',
padding: '2px 6px',
borderRadius: '3px',
}}
>
{source.metadata.type || 'document'}
</span>
<span style={{ color: '#666' }}>
{source.metadata.source || `来源 ${index + 1}`}
</span>
</div>
<div style={{ fontSize: '13px', color: '#555' }}>
{source.pageContent.substring(0, 150)}...
</div>
</div>
))}
</div>
)}
</div>
))}

{isLoading && (
<div
style={{
marginBottom: '20px',
padding: '15px',
borderRadius: '12px',
maxWidth: '85%',
background: 'white',
border: '1px solid #e1e5e9',
opacity: 0.7,
}}
>
<div
style={{
display: 'flex',
justifyContent: 'space-between',
alignItems: 'center',
marginBottom: '8px',
fontSize: '12px',
opacity: 0.8,
}}
>
<span>🤖 助手</span>
<span>正在思考...</span>
</div>
<div>
<div style={{ display: 'flex', gap: '4px' }}>
<span
style={{
width: '8px',
height: '8px',
background: '#007bff',
borderRadius: '50%',
animation: 'pulse 1.5s ease-in-out infinite',
}}
>

</span>
<span
style={{
width: '8px',
height: '8px',
background: '#007bff',
borderRadius: '50%',
animation: 'pulse 1.5s ease-in-out 0.1s infinite',
}}
>

</span>
<span
style={{
width: '8px',
height: '8px',
background: '#007bff',
borderRadius: '50%',
animation: 'pulse 1.5s ease-in-out 0.2s infinite',
}}
>

</span>
</div>
</div>
</div>
)}

<div ref={messagesEndRef} />
</div>

<form onSubmit={handleSubmit}>
<div style={{ display: 'flex', gap: '10px' }}>
<input
type='text'
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder='请输入您的问题...'
disabled={isLoading}
style={{
flex: 1,
padding: '12px',
border: '1px solid #ddd',
borderRadius: '6px',
fontSize: '14px',
}}
/>
<button
type='submit'
disabled={!input.trim() || isLoading}
style={{
padding: '12px 20px',
background: '#007bff',
color: 'white',
border: 'none',
borderRadius: '6px',
cursor: 'pointer',
fontSize: '14px',
}}
>
{isLoading ? '⏳' : '📤'}
</button>
</div>
</form>
</div>
);
};

export default RAGComponent;

架构模式

简单 RAG 架构

高级 RAG 架构

最佳实践

1. 文档预处理

文档分块策略

合理的文档分块对检索质量至关重要:

// ✅ 好的分块策略
const chunkText = (
text: string,
chunkSize: number = 1000,
overlap: number = 200
) => {
const chunks = [];
let start = 0;

while (start < text.length) {
const end = Math.min(start + chunkSize, text.length);
const chunk = text.slice(start, end);

// 确保在句子边界分割
const lastSentence = chunk.lastIndexOf('。');
const actualEnd =
lastSentence > start + chunkSize * 0.8 ? lastSentence + 1 : end;

chunks.push({
content: text.slice(start, actualEnd),
metadata: { start, end: actualEnd },
});

start = actualEnd - overlap;
}

return chunks;
};

// ❌ 避免的做法
const badChunking = (text: string) => {
// 简单按字符数分割,可能在句子中间断开
return text.match(/.{1,1000}/g) || [];
};

2. 检索优化

// 混合检索策略
const hybridRetrieval = async (query: string, topK: number = 5) => {
// 1. 向量检索
const vectorResults = await vectorStore.similaritySearch(query, topK);

// 2. 关键词检索
const keywordResults = await keywordSearch(query, topK);

// 3. 结果融合
const combinedResults = fuseResults(vectorResults, keywordResults);

return combinedResults.slice(0, topK);
};

3. 上下文管理

上下文长度控制

合理控制上下文长度,避免超出模型限制:

const buildContext = (documents: Document[], maxTokens: number = 4000) => {
let context = '';
let tokenCount = 0;

for (const doc of documents) {
const docTokens = estimateTokens(doc.content);

if (tokenCount + docTokens > maxTokens) {
// 截断文档内容
const remainingTokens = maxTokens - tokenCount;
const truncatedContent = truncateToTokens(doc.content, remainingTokens);
context += truncatedContent;
break;
}

context += doc.content + '\n\n';
tokenCount += docTokens;
}

return context;
};

4. 答案质量评估

// 答案质量评估
const evaluateAnswer = async (
question: string,
answer: string,
context: string
) => {
const evaluationPrompt = `
请评估以下答案的质量:

问题:${question}
上下文:${context}
答案:${answer}

评估标准:
1. 准确性:答案是否基于提供的上下文
2. 完整性:答案是否充分回答了问题
3. 相关性:答案是否与问题相关

请给出评分(1-5分)和简要说明。
`;

const evaluation = await model.invoke([
{ role: 'user', content: evaluationPrompt },
]);

return parseEvaluation(evaluation.content);
};

性能优化

1. 缓存策略

// 查询结果缓存
const queryCache = new Map<string, any>();

const cachedRetrieval = async (query: string) => {
const cacheKey = hashQuery(query);

if (queryCache.has(cacheKey)) {
return queryCache.get(cacheKey);
}

const results = await performRetrieval(query);
queryCache.set(cacheKey, results);

return results;
};

2. 批量处理

// 批量向量化
const batchEmbedding = async (texts: string[], batchSize: number = 10) => {
const embeddings = [];

for (let i = 0; i < texts.length; i += batchSize) {
const batch = texts.slice(i, i + batchSize);
const batchEmbeddings = await embeddings.embedDocuments(batch);
embeddings.push(...batchEmbeddings);
}

return embeddings;
};

3. 异步处理

// 异步文档索引
const indexDocumentsAsync = async (documents: Document[]) => {
const indexPromises = documents.map(async (doc) => {
const embedding = await embeddings.embedQuery(doc.content);
return vectorStore.addDocument(doc, embedding);
});

await Promise.all(indexPromises);
};

常见问题解决

1. 检索质量问题

问题:检索到的文档与问题不相关

解决方案

  • 改进查询预处理
  • 使用混合检索策略
  • 调整相似性阈值
  • 优化文档分块

2. 答案幻觉问题

问题:LLM 生成了不基于上下文的答案

解决方案

  • 强化提示词约束
  • 添加答案验证步骤
  • 使用更严格的生成参数
  • 实施后处理检查

3. 性能问题

问题:检索和生成速度慢

解决方案

  • 实施缓存机制
  • 优化向量数据库
  • 使用批量处理
  • 考虑模型量化

小结

RAG 系统是构建智能问答应用的核心技术,通过本节的学习,你应该掌握了:

  1. 基础架构:如何构建简单的 RAG 系统
  2. 高级功能:文档质量评估和自适应检索
  3. 多轮对话:如何处理上下文相关的连续问题
  4. 文档管理:动态添加和更新知识库
  5. 前端集成:如何在 React 应用中使用 RAG 系统
  6. 性能优化:提高系统响应速度和准确性

接下来,我们将学习代码生成,了解如何构建智能编程助手。