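📊 Data Analysis

Data analysis is another important use case for LangGraph: it lets us build systems that process data and extract insights with the help of an LLM. By combining an LLM's ability to interpret data with conventional data processing techniques, we can create analysis tools that are both powerful and easy to use.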

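Introduction

In a data-driven world, data analysis sits at the core of decision making. LangGraph is well suited to building intelligent data analysis systems, because a graph that combines LLM calls with ordinary code can:

  • Automatically interpret the structure and meaning of the data
  • Propose a suitable analysis strategy
  • Run complex data processing pipelines
  • Deliver insight reports in natural language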

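Connection to front-end development

For front-end developers, the pieces of a data analysis workflow map onto familiar ideas:

  • State management: similar to the way data flows through and is transformed in Redux
  • Data visualization: similar to preparing data for Chart.js or D3.js
  • Componentized analysis: analysis steps are split into modules, much like React components
  • Reactive data: similar to Vue's reactivity system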

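Core concepts

A data analysis system typically involves the following key steps:

Data acquisition

Collect and consolidate data from different sources:

  • Structured data (CSV, JSON, databases)
  • Semi-structured data (XML, log files)
  • Unstructured data (text, images)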
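
As a rough illustration of the structured-data case, the sketch below turns CSV text into the array-of-objects shape that the analyzers in this chapter receive as `rawData`. The helper name `parseSimpleCsv` is hypothetical, and the code assumes a header row, comma separators, and no quoted fields; a real project would more likely rely on a dedicated CSV library.

```typescript
// Hypothetical helper (not part of LangGraph): parses simple CSV text into row objects.
// Assumptions: first line is the header, fields are comma-separated, no quoted fields.
type CsvRow = Record<string, string | number | null>;

function parseSimpleCsv(text: string): CsvRow[] {
  const lines = text
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
  if (lines.length < 2) return [];

  const headers = lines[0].split(',').map((h) => h.trim());

  return lines.slice(1).map((line) => {
    const cells = line.split(',').map((c) => c.trim());
    const row: CsvRow = {};
    headers.forEach((header, i) => {
      const cell = cells[i] ?? '';
      if (cell === '') {
        row[header] = null; // empty or absent cells are treated as missing
      } else if (!Number.isNaN(Number(cell))) {
        row[header] = Number(cell); // numeric-looking cells become numbers
      } else {
        row[header] = cell;
      }
    });
    return row;
  });
}

const sampleCsv = `product,price,quantity
iPhone,999,150
AirPods,179,
`;
const parsedRows = parseSimpleCsv(sampleCsv);
console.log(parsedRows);
```

Representing missing cells as `null` keeps them distinguishable from a real `0`, which matters once statistics are computed on the column.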

Data cleaning

Ensure data quality and consistency:

  • Handle missing values
  • Remove duplicate records
  • Convert data types
  • Detect outliers
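
The first three cleaning steps can be sketched in a single pass over the rows. This is a minimal example under stated assumptions rather than a complete cleaning pipeline: `cleanRows` is a hypothetical helper, duplicates are detected by comparing serialized rows (so rows are assumed to share the same key order), and outlier detection is left out.

```typescript
type RawRow = Record<string, unknown>;

// Hypothetical helper: normalizes missing values, converts numeric strings,
// and drops empty or duplicate rows.
function cleanRows(input: RawRow[]): RawRow[] {
  const seen = new Set<string>();
  const cleaned: RawRow[] = [];

  for (const row of input) {
    const normalized: RawRow = {};
    for (const [key, value] of Object.entries(row)) {
      if (value === undefined || value === null || value === '') {
        normalized[key] = null; // one representation for missing values
      } else if (
        typeof value === 'string' &&
        value.trim() !== '' &&
        !Number.isNaN(Number(value))
      ) {
        normalized[key] = Number(value); // numeric strings become numbers
      } else {
        normalized[key] = value;
      }
    }

    // Skip rows that contain no usable value at all.
    if (Object.values(normalized).every((v) => v === null)) continue;

    // Drop exact duplicates (assumes rows share the same key order).
    const fingerprint = JSON.stringify(normalized);
    if (seen.has(fingerprint)) continue;
    seen.add(fingerprint);
    cleaned.push(normalized);
  }
  return cleaned;
}

const cleanedSample = cleanRows([
  { product: 'Mouse', price: '59' },
  { product: 'Mouse', price: 59 }, // duplicate once the price is converted
  { product: '', price: null }, // entirely empty
  { product: 'Keyboard', price: '' },
]);
console.log(cleanedSample);
```

Converting types before deduplicating is a deliberate choice here: `'59'` and `59` would otherwise count as different rows.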

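Analysis execution

Run the analysis algorithms that fit the data:

  • Descriptive statistics
  • Correlation analysis
  • Trend analysis
  • Cluster analysis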
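
Of the techniques listed, trend analysis is perhaps the easiest to sketch without a library. The example below fits an ordinary least-squares line to a series, using each value's position as the x coordinate. `fitLinearTrend` and `TrendFit` are hypothetical names introduced for illustration, and equal spacing between observations is an assumption.

```typescript
interface TrendFit {
  slope: number; // change per step
  intercept: number; // fitted value at index 0
  rSquared: number; // share of variance explained by the line (0 to 1)
}

// Hypothetical helper: least-squares fit of values against their index.
function fitLinearTrend(values: number[]): TrendFit {
  const n = values.length;
  if (n < 2) return { slope: 0, intercept: values[0] ?? 0, rSquared: 0 };

  const meanX = (n - 1) / 2;
  const meanY = values.reduce((a, b) => a + b, 0) / n;

  let sxy = 0;
  let sxx = 0;
  let syy = 0;
  values.forEach((y, x) => {
    sxy += (x - meanX) * (y - meanY);
    sxx += (x - meanX) * (x - meanX);
    syy += (y - meanY) * (y - meanY);
  });

  const slope = sxy / sxx; // sxx > 0 because n >= 2
  const intercept = meanY - slope * meanX;
  // A constant series has no variance to explain, so report 0.
  const rSquared = syy === 0 ? 0 : (sxy * sxy) / (sxx * syy);
  return { slope, intercept, rSquared };
}

const steadyGrowth = fitLinearTrend([10, 12, 14, 16]);
console.log(steadyGrowth);
```

The sign of `slope` gives the direction of the trend, and `rSquared` can serve as a rough confidence value when deciding whether the trend is worth reporting.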

Basic data analyzer

Let's start with a simple data analyzer:

Basic data analyzer:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';

// 定义状态结构
const DataAnalyzerState = Annotation.Root({
// 原始数据
rawData: Annotation<any[]>(),
// 分析需求
analysisRequest: Annotation<string>(),
// 数据结构信息
dataStructure: Annotation<{
columns: string[];
types: Record<string, string>;
sampleSize: number;
hasNumerical: boolean;
hasCategorical: boolean;
hasTime: boolean;
}>(),
// 分析结果
analysisResults: Annotation<{
summary: string;
statistics: Record<string, any>;
insights: string[];
recommendations: string[];
}>(),
// 分析报告
report: Annotation<string>(),
});

// 初始化 LLM
const llm = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
});

/**
* Data understanding node
* Inspects the structure and characteristics of the data
*/
async function understandData(state: typeof DataAnalyzerState.State) {
const { rawData } = state;

if (!rawData || rawData.length === 0) {
return {
dataStructure: {
columns: [],
types: {},
sampleSize: 0,
hasNumerical: false,
hasCategorical: false,
hasTime: false,
},
};
}

// 分析数据结构
const firstRow = rawData[0];
const columns = Object.keys(firstRow);
const types: Record<string, string> = {};
let hasNumerical = false;
let hasCategorical = false;
let hasTime = false;

// 分析每列的数据类型
for (const column of columns) {
const values = rawData.map((row) => row[column]).filter((v) => v != null);

if (values.length === 0) {
types[column] = 'empty';
continue;
}

const firstValue = values[0];

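// Check whether the value is numeric. Strings must be non-empty, because
// Number('') evaluates to 0 and would otherwise pass the test.
if (
typeof firstValue === 'number' ||
(typeof firstValue === 'string' &&
firstValue.trim() !== '' &&
!isNaN(Number(firstValue)))
) {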
types[column] = 'numerical';
hasNumerical = true;
}
// 检查是否为日期
else if (isValidDate(firstValue)) {
types[column] = 'datetime';
hasTime = true;
}
// 其他情况视为分类数据
else {
types[column] = 'categorical';
hasCategorical = true;
}
}

return {
dataStructure: {
columns,
types,
sampleSize: rawData.length,
hasNumerical,
hasCategorical,
hasTime,
},
};
}

/**
* Data analysis node
* Runs the actual data analysis
*/
async function analyzeData(state: typeof DataAnalyzerState.State) {
const { rawData, dataStructure, analysisRequest } = state;

// 基础统计分析
const statistics: Record<string, any> = {};

for (const column of dataStructure.columns) {
const values = rawData.map((row) => row[column]).filter((v) => v != null);

if (dataStructure.types[column] === 'numerical') {
const numbers = values.map(Number);
statistics[column] = {
count: numbers.length,
mean: numbers.reduce((a, b) => a + b, 0) / numbers.length,
min: Math.min(...numbers),
max: Math.max(...numbers),
median: calculateMedian(numbers),
std: calculateStandardDeviation(numbers),
};
} else if (dataStructure.types[column] === 'categorical') {
const frequency: Record<string, number> = {};
values.forEach((value) => {
frequency[value] = (frequency[value] || 0) + 1;
});
statistics[column] = {
count: values.length,
unique: Object.keys(frequency).length,
frequency,
mode: Object.keys(frequency).reduce((a, b) =>
frequency[a] > frequency[b] ? a : b
),
};
}
}

// 使用 LLM 生成深度洞察
const prompt = `
基于以下数据分析结果,生成深度洞察和建议:

分析需求:${analysisRequest}

数据概况:
- 样本数量:${dataStructure.sampleSize}
- 列数:${dataStructure.columns.length}
- 数据类型:${Object.entries(dataStructure.types)
.map(([col, type]) => `${col}(${type})`)
.join(', ')}

统计结果:
${JSON.stringify(statistics, null, 2)}

请提供:
1. 数据概要总结
2. 关键洞察(3-5个)
3. 行动建议(3-5个)

返回JSON格式:
{
"summary": "数据概要总结",
"insights": ["洞察1", "洞察2", "洞察3"],
"recommendations": ["建议1", "建议2", "建议3"]
}
`;

const response = await llm.invoke([
{
role: 'system',
content:
'你是一个专业的数据分析师,擅长从数据中发现有价值的洞察和提供实用的建议。',
},
{ role: 'user', content: prompt },
]);

try {
const analysis = JSON.parse(response.content as string);
return {
analysisResults: {
summary: analysis.summary,
statistics,
insights: analysis.insights || [],
recommendations: analysis.recommendations || [],
},
};
} catch (error) {
// If the reply cannot be parsed as JSON, return the statistics only.
// Hard-coded insights would claim things the data may not support.
return {
analysisResults: {
summary: '数据分析完成,包含基础统计信息。',
statistics,
insights: [],
recommendations: [],
},
};
}
}

/**
* Report generation node
* Produces the final analysis report
*/
async function generateReport(state: typeof DataAnalyzerState.State) {
const { analysisResults, dataStructure, analysisRequest } = state;

const prompt = `
基于以下分析结果,生成一份专业的数据分析报告:

分析需求:${analysisRequest}

数据概况:
- 样本数量:${dataStructure.sampleSize}
- 字段数量:${dataStructure.columns.length}
- 数据类型分布:数值型(${Object.values(dataStructure.types).filter((t) => t === 'numerical').length}),分类型(${Object.values(dataStructure.types).filter((t) => t === 'categorical').length}),时间型(${Object.values(dataStructure.types).filter((t) => t === 'datetime').length})

分析结果:
${analysisResults.summary}

关键洞察:
${analysisResults.insights.map((insight, i) => `${i + 1}. ${insight}`).join('\n')}

建议:
${analysisResults.recommendations.map((rec, i) => `${i + 1}. ${rec}`).join('\n')}

请生成一份结构化的分析报告,包括:
1. 执行摘要
2. 数据概况
3. 分析发现
4. 关键洞察
5. 行动建议
6. 结论

报告应该专业、清晰、易于理解。
`;

const response = await llm.invoke([
{
role: 'system',
content:
'你是一个专业的数据分析报告撰写专家,擅长将复杂的分析结果转化为清晰易懂的商业报告。',
},
{ role: 'user', content: prompt },
]);

return {
report: response.content as string,
};
}

/**
* Builds the basic data analyzer graph
*/
function createBasicDataAnalyzer() {
const workflow = new StateGraph(DataAnalyzerState)
.addNode('understandData', understandData)
.addNode('analyzeData', analyzeData)
.addNode('generateReport', generateReport)
.addEdge(START, 'understandData')
.addEdge('understandData', 'analyzeData')
.addEdge('analyzeData', 'generateReport')
.addEdge('generateReport', END);

return workflow.compile();
}

// 辅助函数
function isValidDate(value: any): boolean {
if (typeof value === 'string') {
const date = new Date(value);
return !isNaN(date.getTime());
}
return value instanceof Date && !isNaN(value.getTime());
}

function calculateMedian(numbers: number[]): number {
const sorted = [...numbers].sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 === 0
? (sorted[mid - 1] + sorted[mid]) / 2
: sorted[mid];
}

function calculateStandardDeviation(numbers: number[]): number {
const mean = numbers.reduce((a, b) => a + b, 0) / numbers.length;
const variance =
numbers.reduce((sum, num) => sum + Math.pow(num - mean, 2), 0) /
numbers.length;
return Math.sqrt(variance);
}

// 使用示例
async function runBasicDataAnalyzer() {
const app = createBasicDataAnalyzer();

console.log('🚀 启动基础数据分析器...\n');

// 示例数据:销售数据
const salesData = [
{
product: 'iPhone',
category: 'Electronics',
price: 999,
quantity: 150,
date: '2024-01-15',
},
{
product: 'MacBook',
category: 'Electronics',
price: 1299,
quantity: 80,
date: '2024-01-16',
},
{
product: 'AirPods',
category: 'Electronics',
price: 179,
quantity: 300,
date: '2024-01-17',
},
{
product: 'iPad',
category: 'Electronics',
price: 599,
quantity: 120,
date: '2024-01-18',
},
{
product: 'Watch',
category: 'Electronics',
price: 399,
quantity: 200,
date: '2024-01-19',
},
{
product: 'Keyboard',
category: 'Accessories',
price: 99,
quantity: 250,
date: '2024-01-20',
},
{
product: 'Mouse',
category: 'Accessories',
price: 59,
quantity: 400,
date: '2024-01-21',
},
{
product: 'Monitor',
category: 'Electronics',
price: 299,
quantity: 90,
date: '2024-01-22',
},
];

const result = await app.invoke({
rawData: salesData,
analysisRequest:
'分析产品销售数据,了解销售趋势和产品表现,为库存管理和营销策略提供建议',
});

console.log('📊 数据分析结果:');
console.log('数据结构:', result.dataStructure);
console.log('\n📈 统计摘要:', result.analysisResults.summary);

console.log('\n🔍 关键洞察:');
result.analysisResults.insights.forEach((insight, index) => {
console.log(`${index + 1}. ${insight}`);
});

console.log('\n💡 行动建议:');
result.analysisResults.recommendations.forEach((rec, index) => {
console.log(`${index + 1}. ${rec}`);
});

console.log('\n📋 完整报告:');
console.log(result.report);
}

// 批量分析示例
async function runBatchAnalysis() {
const app = createBasicDataAnalyzer();

console.log('🚀 启动批量数据分析...\n');

const datasets = [
{
name: '用户行为数据',
data: [
{
userId: 1,
action: 'login',
duration: 120,
device: 'mobile',
timestamp: '2024-01-01T10:00:00Z',
},
{
userId: 2,
action: 'purchase',
duration: 300,
device: 'desktop',
timestamp: '2024-01-01T11:00:00Z',
},
{
userId: 3,
action: 'browse',
duration: 180,
device: 'mobile',
timestamp: '2024-01-01T12:00:00Z',
},
{
userId: 4,
action: 'login',
duration: 90,
device: 'tablet',
timestamp: '2024-01-01T13:00:00Z',
},
{
userId: 5,
action: 'purchase',
duration: 450,
device: 'desktop',
timestamp: '2024-01-01T14:00:00Z',
},
],
request: '分析用户行为模式,识别高价值用户特征',
},
{
name: '财务数据',
data: [
{
month: 'Jan',
revenue: 50000,
expenses: 30000,
profit: 20000,
customers: 1200,
},
{
month: 'Feb',
revenue: 55000,
expenses: 32000,
profit: 23000,
customers: 1350,
},
{
month: 'Mar',
revenue: 48000,
expenses: 29000,
profit: 19000,
customers: 1100,
},
{
month: 'Apr',
revenue: 62000,
expenses: 35000,
profit: 27000,
customers: 1500,
},
{
month: 'May',
revenue: 58000,
expenses: 33000,
profit: 25000,
customers: 1400,
},
],
request: '分析财务表现趋势,评估业务健康状况',
},
];

for (const dataset of datasets) {
console.log(`\n📊 分析数据集: ${dataset.name}`);
console.log('='.repeat(40));

const result = await app.invoke({
rawData: dataset.data,
analysisRequest: dataset.request,
});

console.log(
`✅ 分析完成 - 发现 ${result.analysisResults.insights.length} 个关键洞察`
);
console.log('主要发现:', result.analysisResults.insights[0]);
}
}

// 导出主要函数和类型
export {
DataAnalyzerState,
createBasicDataAnalyzer,
runBasicDataAnalyzer,
runBatchAnalysis,
understandData,
analyzeData,
generateReport,
isValidDate,
calculateMedian,
calculateStandardDeviation,
};

// 如果直接运行此文件,执行示例
if (require.main === module) {
runBasicDataAnalyzer()
.then(() => {
console.log('\n' + '='.repeat(60) + '\n');
return runBatchAnalysis();
})
.catch(console.error);
}

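This basic analyzer demonstrates the core flow of a data analysis workflow:

  1. Receive the data and the analysis request
  2. Understand the data structure
  3. Run the appropriate analysis
  4. Generate the analysis report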
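
Step 3 depends on the model returning bare JSON: `analyzeData` calls `JSON.parse` on the reply and falls back to a generic result when parsing throws. A model may wrap its JSON in a Markdown code fence or add a sentence around it, so a more tolerant parser can reduce how often the fallback is taken. The sketch below is one possible approach; `parseJsonReply` is a hypothetical helper, not a LangChain or LangGraph API.

```typescript
// The fence marker is built at runtime so this listing contains no literal fence.
const FENCE = '`'.repeat(3);
const fencePattern = new RegExp(FENCE + '(?:json)?\\s*([\\s\\S]*?)' + FENCE, 'i');

// Hypothetical helper: extracts a JSON value from a model reply that may be
// wrapped in a code fence or surrounded by extra prose. Returns null on failure.
function parseJsonReply<T>(reply: string): T | null {
  const fenced = reply.match(fencePattern);
  const candidate = (fenced ? fenced[1] : reply).trim();

  try {
    return JSON.parse(candidate) as T;
  } catch {
    // Fall back to the span from the first opening bracket to the last closing one.
    const start = candidate.search(/[{\[]/);
    const end = Math.max(candidate.lastIndexOf('}'), candidate.lastIndexOf(']'));
    if (start === -1 || end <= start) return null;
    try {
      return JSON.parse(candidate.slice(start, end + 1)) as T;
    } catch {
      return null;
    }
  }
}

const fencedReply = FENCE + 'json\n{"summary": "ok", "insights": []}\n' + FENCE;
const parsedReply = parseJsonReply<{ summary: string; insights: string[] }>(fencedReply);
console.log(parsedReply);
```

Returning `null` instead of throwing lets the calling node decide explicitly what to do when no JSON can be recovered.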

Visualization data analyzer

To present the analysis results more clearly, we can add data visualization:

Visualization data analyzer:

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';

// 图表配置接口
interface ChartConfig {
type: 'bar' | 'line' | 'pie' | 'scatter' | 'histogram';
title: string;
xAxis?: string;
yAxis?: string;
data: any[];
options?: Record<string, any>;
}

// 定义状态结构
const VisualizationAnalyzerState = Annotation.Root({
// 原始数据
rawData: Annotation<any[]>(),
// 分析需求
analysisRequest: Annotation<string>(),
// 数据结构信息
dataStructure: Annotation<{
columns: string[];
types: Record<string, string>;
sampleSize: number;
hasNumerical: boolean;
hasCategorical: boolean;
hasTime: boolean;
}>(),
// 分析结果
analysisResults: Annotation<{
summary: string;
statistics: Record<string, any>;
insights: string[];
correlations?: Array<{ x: string; y: string; correlation: number }>;
}>(),
// 可视化配置
visualizations: Annotation<ChartConfig[]>({
reducer: (x, y) => y,
default: () => [],
}),
// 最终报告
report: Annotation<{
summary: string;
charts: ChartConfig[];
insights: string[];
recommendations: string[];
}>(),
});

// 初始化 LLM
const llm = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
});

/**
* Data analysis node
* Analyzes the data and computes statistics
*/
async function analyzeData(state: typeof VisualizationAnalyzerState.State) {
const { rawData, analysisRequest } = state;

if (!rawData || rawData.length === 0) {
return {
dataStructure: {
columns: [],
types: {},
sampleSize: 0,
hasNumerical: false,
hasCategorical: false,
hasTime: false,
},
analysisResults: {
summary: '没有可分析的数据',
statistics: {},
insights: [],
},
};
}

// 分析数据结构
const firstRow = rawData[0];
const columns = Object.keys(firstRow);
const types: Record<string, string> = {};
let hasNumerical = false;
let hasCategorical = false;
let hasTime = false;

// 分析每列的数据类型
for (const column of columns) {
const values = rawData.map((row) => row[column]).filter((v) => v != null);

if (values.length === 0) {
types[column] = 'empty';
continue;
}

const firstValue = values[0];

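// Strings must be non-empty: Number('') evaluates to 0 and would pass the test
if (
typeof firstValue === 'number' ||
(typeof firstValue === 'string' &&
firstValue.trim() !== '' &&
!isNaN(Number(firstValue)))
) {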
types[column] = 'numerical';
hasNumerical = true;
} else if (isValidDate(firstValue)) {
types[column] = 'datetime';
hasTime = true;
} else {
types[column] = 'categorical';
hasCategorical = true;
}
}

const dataStructure = {
columns,
types,
sampleSize: rawData.length,
hasNumerical,
hasCategorical,
hasTime,
};

// 基础统计分析
const statistics: Record<string, any> = {};
const correlations: Array<{ x: string; y: string; correlation: number }> = [];

// 数值列统计
const numericalColumns = columns.filter((col) => types[col] === 'numerical');

for (const column of numericalColumns) {
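// Drop missing values before converting; Number(null) would otherwise count as 0
const values = rawData
.map((row) => row[column])
.filter((v) => v != null && v !== '')
.map(Number)
.filter((v) => !isNaN(v));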
statistics[column] = {
count: values.length,
mean: values.reduce((a, b) => a + b, 0) / values.length,
min: Math.min(...values),
max: Math.max(...values),
median: calculateMedian(values),
std: calculateStandardDeviation(values),
};
}

// 计算数值列之间的相关性
for (let i = 0; i < numericalColumns.length; i++) {
for (let j = i + 1; j < numericalColumns.length; j++) {
const col1 = numericalColumns[i];
const col2 = numericalColumns[j];
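// Use only rows where both columns have a value, so the pairs stay aligned
const pairedRows = rawData.filter(
(row) => row[col1] != null && row[col2] != null
);
const values1 = pairedRows.map((row) => Number(row[col1]));
const values2 = pairedRows.map((row) => Number(row[col2]));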
const correlation = calculateCorrelation(values1, values2);
correlations.push({ x: col1, y: col2, correlation });
}
}

// 分类列统计
const categoricalColumns = columns.filter(
(col) => types[col] === 'categorical'
);

for (const column of categoricalColumns) {
const values = rawData.map((row) => row[column]).filter((v) => v != null);
const frequency: Record<string, number> = {};
values.forEach((value) => {
frequency[value] = (frequency[value] || 0) + 1;
});
statistics[column] = {
count: values.length,
unique: Object.keys(frequency).length,
frequency,
mode: Object.keys(frequency).reduce((a, b) =>
frequency[a] > frequency[b] ? a : b
),
};
}

// 使用 LLM 生成洞察
const prompt = `
基于以下数据分析结果,生成关键洞察:

分析需求:${analysisRequest}

数据概况:
- 样本数量:${dataStructure.sampleSize}
- 数值列:${numericalColumns.join(', ')}
- 分类列:${categoricalColumns.join(', ')}

统计结果:
${JSON.stringify(statistics, null, 2)}

相关性分析:
${correlations.map((c) => `${c.x} vs ${c.y}: ${c.correlation.toFixed(3)}`).join('\n')}

请提供:
1. 数据概要总结
2. 关键洞察(3-5个)

返回JSON格式:
{
"summary": "数据概要总结",
"insights": ["洞察1", "洞察2", "洞察3"]
}
`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个专业的数据分析师,擅长从统计数据中发现有价值的洞察。',
},
{ role: 'user', content: prompt },
]);

try {
const analysis = JSON.parse(response.content as string);
return {
dataStructure,
analysisResults: {
summary: analysis.summary,
statistics,
insights: analysis.insights || [],
correlations,
},
};
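} catch (error) {
// If the reply cannot be parsed as JSON, keep the computed statistics and
// correlations but do not invent insights the data may not support.
return {
dataStructure,
analysisResults: {
summary: '数据分析完成,包含基础统计信息。',
statistics,
insights: [],
correlations,
},
};
}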
}

/**
* Visualization generation node
* Builds chart configurations suited to the characteristics of the data
*/
async function generateVisualizations(
state: typeof VisualizationAnalyzerState.State
) {
const { rawData, dataStructure, analysisResults } = state;
const visualizations: ChartConfig[] = [];

// 为数值列生成直方图
const numericalColumns = dataStructure.columns.filter(
(col) => dataStructure.types[col] === 'numerical'
);

for (const column of numericalColumns.slice(0, 3)) {
// 限制图表数量
const values = rawData.map((row) => Number(row[column]));
const histogram = createHistogram(values, column);
visualizations.push({
type: 'histogram',
title: `${column} 分布图`,
data: histogram,
options: {
bins: 10,
color: '#3498db',
},
});
}

// 为分类列生成饼图
const categoricalColumns = dataStructure.columns.filter(
(col) => dataStructure.types[col] === 'categorical'
);

for (const column of categoricalColumns.slice(0, 2)) {
const stats = analysisResults.statistics[column];
if (stats && stats.frequency) {
const pieData = Object.entries(stats.frequency).map(([label, value]) => ({
label,
value: value as number,
}));
visualizations.push({
type: 'pie',
title: `${column} 分布`,
data: pieData,
options: {
colors: ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6'],
},
});
}
}

// 生成相关性散点图
if (analysisResults.correlations && analysisResults.correlations.length > 0) {
const strongCorrelations = analysisResults.correlations.filter(
(c) => Math.abs(c.correlation) > 0.5
);

for (const corr of strongCorrelations.slice(0, 2)) {
const scatterData = rawData.map((row) => ({
x: Number(row[corr.x]),
y: Number(row[corr.y]),
}));

visualizations.push({
type: 'scatter',
title: `${corr.x} vs ${corr.y} (相关性: ${corr.correlation.toFixed(3)})`,
xAxis: corr.x,
yAxis: corr.y,
data: scatterData,
options: {
color: corr.correlation > 0 ? '#2ecc71' : '#e74c3c',
},
});
}
}

// 如果有时间列,生成时间序列图
const timeColumns = dataStructure.columns.filter(
(col) => dataStructure.types[col] === 'datetime'
);

if (timeColumns.length > 0 && numericalColumns.length > 0) {
const timeCol = timeColumns[0];
const valueCol = numericalColumns[0];

const timeSeriesData = rawData
.map((row) => ({
x: new Date(row[timeCol]).getTime(),
y: Number(row[valueCol]),
date: row[timeCol],
}))
.sort((a, b) => a.x - b.x);

visualizations.push({
type: 'line',
title: `${valueCol} 时间趋势`,
xAxis: timeCol,
yAxis: valueCol,
data: timeSeriesData,
options: {
color: '#3498db',
smooth: true,
},
});
}

return {
visualizations,
};
}

/**
* Report generation node
* Combines the analysis results and visualizations into the final report
*/
async function generateReport(state: typeof VisualizationAnalyzerState.State) {
const { analysisResults, visualizations, analysisRequest } = state;

const prompt = `
基于以下分析结果和可视化图表,生成一份综合的数据分析报告:

分析需求:${analysisRequest}

分析摘要:${analysisResults.summary}

关键洞察:
${analysisResults.insights.map((insight, i) => `${i + 1}. ${insight}`).join('\n')}

生成的图表:
${visualizations.map((chart, i) => `${i + 1}. ${chart.title} (${chart.type})`).join('\n')}

请提供:
1. 执行摘要
2. 基于图表的深度分析
3. 行动建议(3-5个)

返回JSON格式:
{
"summary": "执行摘要",
"insights": ["基于图表的洞察1", "洞察2", "洞察3"],
"recommendations": ["建议1", "建议2", "建议3"]
}
`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个数据可视化专家,擅长解读图表并提供商业洞察。',
},
{ role: 'user', content: prompt },
]);

try {
const reportData = JSON.parse(response.content as string);
return {
report: {
summary: reportData.summary,
charts: visualizations,
insights: reportData.insights || [],
recommendations: reportData.recommendations || [],
},
};
} catch (error) {
return {
report: {
summary: '数据可视化分析完成,生成了多个图表展示数据特征。',
charts: visualizations,
insights: analysisResults.insights,
recommendations: [
'基于图表进一步分析数据趋势',
'关注异常值和离群点',
'考虑更深入的统计建模',
],
},
};
}
}

/**
* Builds the visualization data analyzer graph
*/
function createVisualizationAnalyzer() {
const workflow = new StateGraph(VisualizationAnalyzerState)
.addNode('analyzeData', analyzeData)
.addNode('generateVisualizations', generateVisualizations)
.addNode('generateReport', generateReport)
.addEdge(START, 'analyzeData')
.addEdge('analyzeData', 'generateVisualizations')
.addEdge('generateVisualizations', 'generateReport')
.addEdge('generateReport', END);

return workflow.compile();
}

// 辅助函数
function isValidDate(value: any): boolean {
if (typeof value === 'string') {
const date = new Date(value);
return !isNaN(date.getTime());
}
return value instanceof Date && !isNaN(value.getTime());
}

function calculateMedian(numbers: number[]): number {
const sorted = [...numbers].sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 === 0
? (sorted[mid - 1] + sorted[mid]) / 2
: sorted[mid];
}

function calculateStandardDeviation(numbers: number[]): number {
const mean = numbers.reduce((a, b) => a + b, 0) / numbers.length;
const variance =
numbers.reduce((sum, num) => sum + Math.pow(num - mean, 2), 0) /
numbers.length;
return Math.sqrt(variance);
}

function calculateCorrelation(x: number[], y: number[]): number {
const n = Math.min(x.length, y.length);
const meanX = x.slice(0, n).reduce((a, b) => a + b, 0) / n;
const meanY = y.slice(0, n).reduce((a, b) => a + b, 0) / n;

let numerator = 0;
let sumXSquared = 0;
let sumYSquared = 0;

for (let i = 0; i < n; i++) {
const deltaX = x[i] - meanX;
const deltaY = y[i] - meanY;
numerator += deltaX * deltaY;
sumXSquared += deltaX * deltaX;
sumYSquared += deltaY * deltaY;
}

const denominator = Math.sqrt(sumXSquared * sumYSquared);
return denominator === 0 ? 0 : numerator / denominator;
}

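function createHistogram(values: number[], columnName: string) {
// Drop NaN and Infinity so they cannot corrupt the bin boundaries
const finiteValues = values.filter((v) => Number.isFinite(v));
if (finiteValues.length === 0) {
return [];
}

const min = Math.min(...finiteValues);
const max = Math.max(...finiteValues);
const binCount = 10;
// If every value is identical, use a bin width of 1 to avoid dividing by zero
const binSize = max > min ? (max - min) / binCount : 1;

const bins: number[] = Array(binCount).fill(0);
const binLabels: string[] = [];

for (let i = 0; i < binCount; i++) {
const binStart = min + i * binSize;
const binEnd = min + (i + 1) * binSize;
binLabels.push(`${binStart.toFixed(1)}-${binEnd.toFixed(1)}`);
}

finiteValues.forEach((value) => {
// Clamp the index so that the maximum value falls into the last bin
const binIndex = Math.min(
Math.floor((value - min) / binSize),
binCount - 1
);
bins[binIndex]++;
});

return bins.map((count, index) => ({
range: binLabels[index],
count,
value: count,
}));
}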

// 使用示例
async function runVisualizationAnalyzer() {
const app = createVisualizationAnalyzer();

console.log('🚀 启动可视化数据分析器...\n');

// 示例数据:销售业绩数据
const salesData = [
{
salesperson: 'Alice',
region: 'North',
sales: 120000,
deals: 45,
experience: 3,
},
{
salesperson: 'Bob',
region: 'South',
sales: 95000,
deals: 38,
experience: 2,
},
{
salesperson: 'Charlie',
region: 'East',
sales: 150000,
deals: 52,
experience: 5,
},
{
salesperson: 'Diana',
region: 'West',
sales: 110000,
deals: 41,
experience: 4,
},
{
salesperson: 'Eve',
region: 'North',
sales: 135000,
deals: 48,
experience: 6,
},
{
salesperson: 'Frank',
region: 'South',
sales: 88000,
deals: 35,
experience: 1,
},
{
salesperson: 'Grace',
region: 'East',
sales: 142000,
deals: 50,
experience: 4,
},
{
salesperson: 'Henry',
region: 'West',
sales: 98000,
deals: 39,
experience: 2,
},
{
salesperson: 'Ivy',
region: 'North',
sales: 125000,
deals: 46,
experience: 3,
},
{
salesperson: 'Jack',
region: 'South',
sales: 105000,
deals: 42,
experience: 3,
},
];

const result = await app.invoke({
rawData: salesData,
analysisRequest: '分析销售团队的业绩表现,识别高绩效因素和改进机会',
});

console.log('📊 可视化分析结果:');
console.log('数据结构:', result.dataStructure);
console.log('\n📈 分析摘要:', result.analysisResults.summary);

console.log('\n🔍 关键洞察:');
result.analysisResults.insights.forEach((insight, index) => {
console.log(`${index + 1}. ${insight}`);
});

console.log('\n📊 生成的图表:');
result.report.charts.forEach((chart, index) => {
console.log(`${index + 1}. ${chart.title} (${chart.type})`);
console.log(` 数据点数量: ${chart.data.length}`);
});

console.log('\n💡 行动建议:');
result.report.recommendations.forEach((rec, index) => {
console.log(`${index + 1}. ${rec}`);
});

console.log('\n📋 完整报告摘要:');
console.log(result.report.summary);
}

// 流式执行示例
async function runVisualizationAnalyzerWithStreaming() {
const app = createVisualizationAnalyzer();

console.log('🚀 启动流式可视化分析器...\n');

const stream = await app.stream(
{
rawData: [
{ month: 'Jan', revenue: 50000, customers: 1200, satisfaction: 4.2 },
{ month: 'Feb', revenue: 55000, customers: 1350, satisfaction: 4.3 },
{ month: 'Mar', revenue: 48000, customers: 1100, satisfaction: 4.1 },
{ month: 'Apr', revenue: 62000, customers: 1500, satisfaction: 4.4 },
{ month: 'May', revenue: 58000, customers: 1400, satisfaction: 4.2 },
{ month: 'Jun', revenue: 65000, customers: 1600, satisfaction: 4.5 },
],
analysisRequest: '分析月度业务指标趋势,评估客户满意度与收入的关系',
},
{ streamMode: 'updates' }
);

for await (const chunk of stream) {
const [nodeName, nodeOutput] = Object.entries(chunk)[0];
console.log(`📍 执行节点: ${nodeName}`);

if (nodeName === 'analyzeData') {
const output = nodeOutput as any;
console.log(
` 数据分析完成 - 发现 ${output.analysisResults?.insights?.length || 0} 个洞察`
);
} else if (nodeName === 'generateVisualizations') {
const output = nodeOutput as any;
console.log(
` 可视化生成完成 - 创建了 ${output.visualizations?.length || 0} 个图表`
);
} else if (nodeName === 'generateReport') {
console.log(' 📊 最终报告生成完成');
}
console.log('');
}
}

// 导出主要函数和类型
export {
VisualizationAnalyzerState,
ChartConfig,
createVisualizationAnalyzer,
runVisualizationAnalyzer,
runVisualizationAnalyzerWithStreaming,
analyzeData,
generateVisualizations,
generateReport,
calculateCorrelation,
createHistogram,
isValidDate,
calculateMedian,
calculateStandardDeviation,
};

// 如果直接运行此文件,执行示例
if (require.main === module) {
runVisualizationAnalyzer()
.then(() => {
console.log('\n' + '='.repeat(60) + '\n');
return runVisualizationAnalyzerWithStreaming();
})
.catch(console.error);
}

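This enhanced version uses an Analyzer-Visualizer pattern:

  • Analyzer node: runs the data analysis
  • Visualizer node: generates the chart configurations
  • Report node: combines the analysis results and the visualizations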
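
The visualizer node only produces chart configurations; drawing them is left to whichever charting library the front end uses. For a quick look at the data in a terminal, label/value pairs such as the pie chart data can be printed as text bars. The sketch below is illustrative only: `renderTextBars` and `LabeledValue` are hypothetical names, and the counts are the number of salespeople per region in the sample data above.

```typescript
interface LabeledValue {
  label: string;
  value: number;
}

// Hypothetical helper: renders label/value pairs as horizontal text bars,
// scaled so that the largest value spans `width` characters.
function renderTextBars(data: LabeledValue[], width = 20): string[] {
  const max = Math.max(0, ...data.map((d) => d.value));
  const labelWidth = Math.max(0, ...data.map((d) => d.label.length));

  return data.map((d) => {
    // Negative values get an empty bar instead of breaking String.repeat.
    const barLength =
      max === 0 ? 0 : Math.max(0, Math.round((d.value / max) * width));
    return `${d.label.padEnd(labelWidth)} | ${'#'.repeat(barLength)} ${d.value}`;
  });
}

const regionBars = renderTextBars(
  [
    { label: 'North', value: 3 },
    { label: 'South', value: 3 },
    { label: 'East', value: 2 },
    { label: 'West', value: 2 },
  ],
  12
);
regionBars.forEach((line) => console.log(line));
```

Because the function works on plain label/value pairs, it can be fed the `data` array of a `pie` chart configuration directly.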

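Analysis flow diagram

[Diagram not reproduced. As wired in createVisualizationAnalyzer, the flow is START → analyzeData → generateVisualizations → generateReport → END.]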

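Intelligent insight extractor

For more complex analyses, we need a way to discover patterns and insights in the data automatically:

Intelligent insight extractor: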

import '../../utils/loadEnv';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { ChatOpenAI } from '@langchain/openai';

// 模式类型定义
interface Pattern {
type: 'trend' | 'anomaly' | 'correlation' | 'cluster' | 'seasonal';
description: string;
confidence: number;
data: any;
significance: 'high' | 'medium' | 'low';
}

// 洞察类型定义
interface Insight {
id: string;
title: string;
description: string;
type: 'opportunity' | 'risk' | 'trend' | 'recommendation';
confidence: number;
impact: 'high' | 'medium' | 'low';
evidence: string[];
actionable: boolean;
}

// 定义状态结构
const InsightExtractorState = Annotation.Root({
// 原始数据
rawData: Annotation<any[]>(),
// 分析目标
analysisGoal: Annotation<string>(),
// 业务上下文
businessContext: Annotation<string>(),
// 清洗后的数据
cleanedData: Annotation<any[]>(),
// 检测到的模式
patterns: Annotation<Pattern[]>({
reducer: (x, y) => y,
default: () => [],
}),
// 生成的洞察
insights: Annotation<Insight[]>({
reducer: (x, y) => y,
default: () => [],
}),
// 验证结果
validationResults: Annotation<{
validInsights: Insight[];
rejectedInsights: Insight[];
confidenceScore: number;
}>(),
// 最终报告
finalReport: Annotation<{
executiveSummary: string;
keyInsights: Insight[];
actionPlan: string[];
nextSteps: string[];
}>(),
});

// 初始化 LLM
const llm = new ChatOpenAI({
model: process.env.OPENAI_MODEL_NAME,
temperature: 0.1,
});

/**
* Data preprocessing node
* Cleans and prepares the data for pattern detection
*/
async function preprocessData(state: typeof InsightExtractorState.State) {
const { rawData } = state;

if (!rawData || rawData.length === 0) {
return {
cleanedData: [],
};
}

// 数据清洗步骤
let cleanedData = [...rawData];

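// 1. Drop rows in which every field is null or an empty string
cleanedData = cleanedData.filter((row) => {
return Object.values(row).some((value) => value != null && value !== '');
});

// Return early if nothing is left; cleanedData[0] would be undefined below
if (cleanedData.length === 0) {
return { cleanedData };
}

// 2. Convert numeric-looking fields to the number type
const firstRow = cleanedData[0];
const columns = Object.keys(firstRow);
// Treat null, undefined and '' as missing rather than letting '' coerce to 0
const isMissing = (value: any) => value == null || value === '';

for (const column of columns) {
const values = cleanedData.map((row) => row[column]);
const isNumeric = values.every(
(value) => isMissing(value) || !isNaN(Number(value))
);

if (isNumeric) {
cleanedData = cleanedData.map((row) => ({
...row,
[column]: isMissing(row[column]) ? null : Number(row[column]),
}));
}
}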

// 3. 处理异常值(使用IQR方法)
for (const column of columns) {
const values = cleanedData
.map((row) => row[column])
.filter((v) => typeof v === 'number' && !isNaN(v));

if (values.length > 0) {
const sorted = values.sort((a, b) => a - b);
const q1 = sorted[Math.floor(sorted.length * 0.25)];
const q3 = sorted[Math.floor(sorted.length * 0.75)];
const iqr = q3 - q1;
const lowerBound = q1 - 1.5 * iqr;
const upperBound = q3 + 1.5 * iqr;

// 标记异常值而不是删除
cleanedData = cleanedData.map((row) => {
if (typeof row[column] === 'number') {
const isOutlier =
row[column] < lowerBound || row[column] > upperBound;
return {
...row,
[`${column}_outlier`]: isOutlier,
};
}
return row;
});
}
}

console.log(
`数据预处理完成: ${rawData.length} -> ${cleanedData.length} 条记录`
);

return {
cleanedData,
};
}

/**
* Pattern detection node
* Detects various kinds of patterns in the data
*/
async function detectPatterns(state: typeof InsightExtractorState.State) {
const { cleanedData, analysisGoal } = state;
const patterns: Pattern[] = [];

if (!cleanedData || cleanedData.length === 0) {
return { patterns };
}

const firstRow = cleanedData[0];
const columns = Object.keys(firstRow).filter(
(col) => !col.endsWith('_outlier')
);

// 1. 趋势检测
const numericalColumns = columns.filter((col) => {
return cleanedData.every(
(row) => row[col] == null || typeof row[col] === 'number'
);
});

for (const column of numericalColumns) {
const values = cleanedData
.map((row, index) => ({ index, value: row[column] }))
.filter((item) => item.value != null);

if (values.length > 3) {
const trend = detectTrend(values);
if (trend.confidence > 0.6) {
patterns.push({
type: 'trend',
description: `${column} 显示${trend.direction}趋势`,
confidence: trend.confidence,
data: { column, trend: trend.direction, slope: trend.slope },
significance: trend.confidence > 0.8 ? 'high' : 'medium',
});
}
}
}

// 2. 相关性检测
for (let i = 0; i < numericalColumns.length; i++) {
for (let j = i + 1; j < numericalColumns.length; j++) {
const col1 = numericalColumns[i];
const col2 = numericalColumns[j];

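// Keep only rows where both columns have a value. Filtering each column
// separately would pair up values that come from different rows.
const pairedRows = cleanedData.filter(
(row) => row[col1] != null && row[col2] != null
);
const values1 = pairedRows.map((row) => row[col1]);
const values2 = pairedRows.map((row) => row[col2]);

if (values1.length > 3 && values2.length > 3) {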
const correlation = calculateCorrelation(values1, values2);

if (Math.abs(correlation) > 0.5) {
patterns.push({
type: 'correlation',
description: `${col1} 与 ${col2} 存在${correlation > 0 ? '正' : '负'}相关关系`,
confidence: Math.abs(correlation),
data: { col1, col2, correlation },
significance: Math.abs(correlation) > 0.7 ? 'high' : 'medium',
});
}
}
}
}

// 3. 异常值检测
const outlierColumns = columns.filter((col) =>
cleanedData.some((row) => row[`${col}_outlier`])
);

for (const column of outlierColumns) {
const outlierCount = cleanedData.filter(
(row) => row[`${column}_outlier`]
).length;
const outlierRatio = outlierCount / cleanedData.length;

if (outlierRatio > 0.05) {
// 超过5%的异常值
patterns.push({
type: 'anomaly',
description: `${column} 存在显著异常值 (${(outlierRatio * 100).toFixed(1)}%)`,
confidence: Math.min(outlierRatio * 2, 1),
data: { column, outlierCount, outlierRatio },
significance: outlierRatio > 0.1 ? 'high' : 'medium',
});
}
}

// 4. 聚类模式检测(简化版本)
const categoricalColumns = columns.filter((col) => {
const uniqueValues = new Set(cleanedData.map((row) => row[col]));
return (
uniqueValues.size < cleanedData.length * 0.5 && uniqueValues.size > 1
);
});

for (const column of categoricalColumns) {
const distribution = getDistribution(cleanedData, column);
const entropy = calculateEntropy(Object.values(distribution));

if (entropy < 1.5) {
// 低熵表示数据集中
patterns.push({
type: 'cluster',
description: `${column} 显示明显的聚集模式`,
confidence: 1 - entropy / 2,
data: { column, distribution, entropy },
significance: entropy < 1 ? 'high' : 'medium',
});
}
}

console.log(`模式检测完成: 发现 ${patterns.length} 个模式`);

return { patterns };
}

/**
* Insight generation node
* Produces business insights from the detected patterns
*/
async function generateInsights(state: typeof InsightExtractorState.State) {
const { patterns, analysisGoal, businessContext } = state;

if (!patterns || patterns.length === 0) {
return {
insights: [],
};
}

const prompt = `
基于以下数据模式,生成有价值的业务洞察:

分析目标:${analysisGoal}
业务背景:${businessContext}

检测到的模式:
${patterns
.map(
(pattern, i) => `
${i + 1}. ${pattern.type}: ${pattern.description}
- 置信度: ${(pattern.confidence * 100).toFixed(1)}%
- 重要性: ${pattern.significance}
- 数据: ${JSON.stringify(pattern.data)}
`
)
.join('\n')}

请为每个重要模式生成具体的业务洞察,包括:
1. 洞察标题
2. 详细描述
3. 洞察类型 (opportunity/risk/trend/recommendation)
4. 业务影响 (high/medium/low)
5. 支持证据
6. 是否可执行

返回JSON格式的洞察数组:
[
{
"title": "洞察标题",
"description": "详细描述",
"type": "opportunity|risk|trend|recommendation",
"confidence": 0.85,
"impact": "high|medium|low",
"evidence": ["证据1", "证据2"],
"actionable": true
}
]
`;

const response = await llm.invoke([
{
role: 'system',
content:
'你是一个资深的商业分析师,擅长从数据模式中提取有价值的商业洞察。',
},
{ role: 'user', content: prompt },
]);

try {
const insightData = JSON.parse(response.content as string);
const insights: Insight[] = insightData.map(
(insight: any, index: number) => ({
id: `insight_${index + 1}`,
title: insight.title,
description: insight.description,
type: insight.type,
confidence: insight.confidence ?? 0.7,
impact: insight.impact,
evidence: insight.evidence || [],
actionable: insight.actionable || false,
})
);

console.log(`洞察生成完成: 生成 ${insights.length} 个洞察`);

return { insights };
} catch (error) {
// If parsing fails, fall back to basic insights built from the high-significance patterns
const basicInsights: Insight[] = patterns
.filter((p) => p.significance === 'high')
.map((pattern, index) => ({
id: `insight_${index + 1}`,
title: `${pattern.type} 模式发现`,
description: pattern.description,
type: 'trend' as const,
confidence: pattern.confidence,
impact: 'medium' as const,
evidence: [pattern.description],
actionable: true,
}));

return { insights: basicInsights };
}
}

/**
* Insight validation node
* Checks the reliability and relevance of each insight
*/
async function validateInsights(state: typeof InsightExtractorState.State) {
const { insights, patterns, cleanedData } = state;

if (!insights || insights.length === 0) {
return {
validationResults: {
validInsights: [],
rejectedInsights: [],
confidenceScore: 0,
},
};
}

const validInsights: Insight[] = [];
const rejectedInsights: Insight[] = [];

for (const insight of insights) {
let validationScore = insight.confidence;

// Criterion 1: confidence threshold
if (insight.confidence < 0.5) {
validationScore -= 0.2;
}

// Criterion 2: amount of supporting evidence
if (insight.evidence.length < 2) {
validationScore -= 0.1;
}

// Criterion 3: consistency with the detected patterns
const supportingPatterns = patterns.filter((pattern) =>
insight.evidence.some((evidence) =>
evidence.includes(pattern.description)
)
);

if (supportingPatterns.length === 0) {
validationScore -= 0.2;
}

// Criterion 4: enough data
if (cleanedData.length < 10) {
validationScore -= 0.1;
}

// Criterion 5: actionability
if (insight.actionable && insight.impact === 'high') {
validationScore += 0.1;
}

if (validationScore >= 0.6) {
validInsights.push({
...insight,
// Clamp to 1: the actionability bonus can push the score above 1.
confidence: Math.min(1, validationScore),
});
} else {
rejectedInsights.push(insight);
}
}

const confidenceScore =
validInsights.length > 0
? validInsights.reduce((sum, insight) => sum + insight.confidence, 0) /
validInsights.length
: 0;

console.log(
`洞察验证完成: ${validInsights.length} 个有效, ${rejectedInsights.length} 个被拒绝`
);

return {
validationResults: {
validInsights,
rejectedInsights,
confidenceScore,
},
};
}

/**
* Report generation node
* Produces the final insight report
*/
async function generateFinalReport(state: typeof InsightExtractorState.State) {
const { validationResults, analysisGoal, businessContext } = state;
const { validInsights } = validationResults;

if (!validInsights || validInsights.length === 0) {
return {
finalReport: {
executiveSummary: '未发现有效的业务洞察',
keyInsights: [],
actionPlan: [],
nextSteps: ['收集更多数据', '重新定义分析目标'],
},
};
}

const prompt = `
基于以下验证过的洞察,生成一份执行摘要和行动计划:

分析目标:${analysisGoal}
业务背景:${businessContext}

有效洞察:
${validInsights
.map(
(insight, i) => `
${i + 1}. ${insight.title} (${insight.type}, 影响: ${insight.impact})
描述: ${insight.description}
置信度: ${(insight.confidence * 100).toFixed(1)}%
证据: ${insight.evidence.join(', ')}
可执行: ${insight.actionable ? '是' : '否'}
`
)
.join('\n')}

请生成:
1. 执行摘要(200字以内)
2. 具体的行动计划(3-5项)
3. 下一步建议(3-5项)

返回JSON格式:
{
"executiveSummary": "执行摘要",
"actionPlan": ["行动1", "行动2", "行动3"],
"nextSteps": ["步骤1", "步骤2", "步骤3"]
}
`;

const response = await llm.invoke([
{
role: 'system',
content: '你是一个高级商业顾问,擅长将数据洞察转化为可执行的商业策略。',
},
{ role: 'user', content: prompt },
]);

try {
const reportData = JSON.parse(response.content as string);
return {
finalReport: {
executiveSummary: reportData.executiveSummary,
keyInsights: validInsights,
actionPlan: reportData.actionPlan || [],
nextSteps: reportData.nextSteps || [],
},
};
} catch (error) {
return {
finalReport: {
executiveSummary: `基于数据分析,发现了 ${validInsights.length} 个关键洞察,需要进一步行动。`,
keyInsights: validInsights,
actionPlan: [
'基于高影响洞察制定具体行动',
'分配责任人和时间表',
'建立监控和评估机制',
],
nextSteps: [
'深入分析高置信度洞察',
'制定详细实施计划',
'定期跟踪和评估效果',
],
},
};
}
}

/**
* Build the insight extractor graph
*/
function createInsightExtractor() {
const workflow = new StateGraph(InsightExtractorState)
.addNode('preprocessData', preprocessData)
.addNode('detectPatterns', detectPatterns)
.addNode('generateInsights', generateInsights)
.addNode('validateInsights', validateInsights)
.addNode('generateFinalReport', generateFinalReport)
.addEdge(START, 'preprocessData')
.addEdge('preprocessData', 'detectPatterns')
.addEdge('detectPatterns', 'generateInsights')
.addEdge('generateInsights', 'validateInsights')
.addEdge('validateInsights', 'generateFinalReport')
.addEdge('generateFinalReport', END);

return workflow.compile();
}

// Helper functions
function detectTrend(values: Array<{ index: number; value: number }>) {
const n = values.length;
const sumX = values.reduce((sum, item) => sum + item.index, 0);
const sumY = values.reduce((sum, item) => sum + item.value, 0);
const sumXY = values.reduce((sum, item) => sum + item.index * item.value, 0);
const sumXX = values.reduce((sum, item) => sum + item.index * item.index, 0);

// With fewer than two distinct x positions no line can be fitted.
const denominator = n * sumXX - sumX * sumX;
if (n < 2 || denominator === 0) {
return {
slope: 0,
intercept: n > 0 ? sumY / n : 0,
confidence: 0,
direction: '平稳',
};
}

const slope = (n * sumXY - sumX * sumY) / denominator;
const intercept = (sumY - slope * sumX) / n;

// Use R² as the confidence of the trend
const meanY = sumY / n;
const totalSumSquares = values.reduce(
(sum, item) => sum + Math.pow(item.value - meanY, 2),
0
);
const residualSumSquares = values.reduce((sum, item) => {
const predicted = slope * item.index + intercept;
return sum + Math.pow(item.value - predicted, 2);
}, 0);

// A constant series has zero total variance; report zero confidence instead of NaN.
const rSquared =
totalSumSquares === 0 ? 0 : 1 - residualSumSquares / totalSumSquares;
const confidence = Math.max(0, Math.min(1, rSquared));

return {
slope,
intercept,
confidence,
direction: slope > 0 ? '上升' : slope < 0 ? '下降' : '平稳',
};
}

function calculateCorrelation(x: number[], y: number[]): number {
const n = Math.min(x.length, y.length);
const meanX = x.slice(0, n).reduce((a, b) => a + b, 0) / n;
const meanY = y.slice(0, n).reduce((a, b) => a + b, 0) / n;

let numerator = 0;
let sumXSquared = 0;
let sumYSquared = 0;

for (let i = 0; i < n; i++) {
const deltaX = x[i] - meanX;
const deltaY = y[i] - meanY;
numerator += deltaX * deltaY;
sumXSquared += deltaX * deltaX;
sumYSquared += deltaY * deltaY;
}

const denominator = Math.sqrt(sumXSquared * sumYSquared);
return denominator === 0 ? 0 : numerator / denominator;
}

function getDistribution(data: any[], column: string): Record<string, number> {
const distribution: Record<string, number> = {};

data.forEach((row) => {
const value = String(row[column]);
distribution[value] = (distribution[value] || 0) + 1;
});

return distribution;
}

function calculateEntropy(values: number[]): number {
const total = values.reduce((sum, val) => sum + val, 0);
if (total === 0) return 0;

const probabilities = values.map((val) => val / total);
return -probabilities.reduce((entropy, prob) => {
return prob > 0 ? entropy + prob * Math.log2(prob) : entropy;
}, 0);
}

// 使用示例
async function runInsightExtractor() {
const app = createInsightExtractor();

console.log('🚀 启动智能洞察提取器...\n');

// 示例数据:电商销售数据
const ecommerceData = [
{
month: 'Jan',
sales: 120000,
orders: 450,
customers: 380,
avgOrderValue: 267,
returnRate: 0.05,
},
{
month: 'Feb',
sales: 135000,
orders: 520,
customers: 420,
avgOrderValue: 260,
returnRate: 0.04,
},
{
month: 'Mar',
sales: 98000,
orders: 380,
customers: 320,
avgOrderValue: 258,
returnRate: 0.08,
},
{
month: 'Apr',
sales: 165000,
orders: 610,
customers: 510,
avgOrderValue: 270,
returnRate: 0.03,
},
{
month: 'May',
sales: 142000,
orders: 530,
customers: 450,
avgOrderValue: 268,
returnRate: 0.06,
},
{
month: 'Jun',
sales: 178000,
orders: 680,
customers: 580,
avgOrderValue: 262,
returnRate: 0.04,
},
{
month: 'Jul',
sales: 195000,
orders: 750,
customers: 630,
avgOrderValue: 260,
returnRate: 0.05,
},
{
month: 'Aug',
sales: 210000,
orders: 820,
customers: 690,
avgOrderValue: 256,
returnRate: 0.07,
},
{
month: 'Sep',
sales: 185000,
orders: 720,
customers: 600,
avgOrderValue: 257,
returnRate: 0.06,
},
{
month: 'Oct',
sales: 225000,
orders: 880,
customers: 750,
avgOrderValue: 256,
returnRate: 0.04,
},
{
month: 'Nov',
sales: 280000,
orders: 1100,
customers: 920,
avgOrderValue: 255,
returnRate: 0.03,
},
{
month: 'Dec',
sales: 320000,
orders: 1250,
customers: 1050,
avgOrderValue: 256,
returnRate: 0.02,
},
];

const result = await app.invoke({
rawData: ecommerceData,
analysisGoal: '分析电商业务表现,识别增长机会和风险点',
businessContext:
'这是一家中型电商公司的年度销售数据,主要销售消费电子产品,目标是实现可持续增长并提升客户满意度。',
});

console.log('📊 洞察提取结果:');
console.log(`数据清洗: ${result.cleanedData.length} 条记录`);
console.log(`模式检测: ${result.patterns.length} 个模式`);
console.log(`洞察生成: ${result.insights.length} 个初始洞察`);
console.log(
`洞察验证: ${result.validationResults.validInsights.length} 个有效洞察`
);

console.log('\n🔍 关键洞察:');
result.finalReport.keyInsights.forEach((insight, index) => {
console.log(`${index + 1}. ${insight.title} (${insight.type})`);
console.log(
` 影响: ${insight.impact}, 置信度: ${(insight.confidence * 100).toFixed(1)}%`
);
console.log(` 描述: ${insight.description}`);
console.log(` 可执行: ${insight.actionable ? '是' : '否'}`);
console.log('');
});

console.log('📋 执行摘要:');
console.log(result.finalReport.executiveSummary);

console.log('\n💡 行动计划:');
result.finalReport.actionPlan.forEach((action, index) => {
console.log(`${index + 1}. ${action}`);
});

console.log('\n🎯 下一步建议:');
result.finalReport.nextSteps.forEach((step, index) => {
console.log(`${index + 1}. ${step}`);
});
}

// 流式执行示例
async function runInsightExtractorWithStreaming() {
const app = createInsightExtractor();

console.log('🚀 启动流式洞察提取器...\n');

const stream = await app.stream(
{
rawData: [
{
department: 'Sales',
employees: 25,
satisfaction: 7.2,
turnover: 0.15,
productivity: 85,
},
{
department: 'Marketing',
employees: 18,
satisfaction: 8.1,
turnover: 0.08,
productivity: 92,
},
{
department: 'Engineering',
employees: 45,
satisfaction: 7.8,
turnover: 0.12,
productivity: 88,
},
{
department: 'Support',
employees: 22,
satisfaction: 6.9,
turnover: 0.22,
productivity: 78,
},
{
department: 'HR',
employees: 8,
satisfaction: 7.5,
turnover: 0.1,
productivity: 82,
},
{
department: 'Finance',
employees: 12,
satisfaction: 7.3,
turnover: 0.18,
productivity: 80,
},
],
analysisGoal: '分析员工满意度和生产力,识别人力资源管理的改进机会',
businessContext:
'这是一家快速成长的科技公司,正在经历组织扩张,需要优化人力资源管理策略。',
},
{ streamMode: 'updates' }
);

for await (const chunk of stream) {
const [nodeName, nodeOutput] = Object.entries(chunk)[0];
console.log(`📍 执行节点: ${nodeName}`);

if (nodeName === 'preprocessData') {
const output = nodeOutput as any;
console.log(
` 数据预处理完成 - ${output.cleanedData?.length || 0} 条记录`
);
} else if (nodeName === 'detectPatterns') {
const output = nodeOutput as any;
console.log(
` 模式检测完成 - 发现 ${output.patterns?.length || 0} 个模式`
);
} else if (nodeName === 'generateInsights') {
const output = nodeOutput as any;
console.log(
` 洞察生成完成 - 生成 ${output.insights?.length || 0} 个洞察`
);
} else if (nodeName === 'validateInsights') {
const output = nodeOutput as any;
const valid = output.validationResults?.validInsights?.length || 0;
const rejected = output.validationResults?.rejectedInsights?.length || 0;
console.log(` 洞察验证完成 - ${valid} 个有效, ${rejected} 个被拒绝`);
} else if (nodeName === 'generateFinalReport') {
console.log(' 📊 最终报告生成完成');
}
console.log('');
}
}

// 导出主要函数和类型
export {
InsightExtractorState,
Pattern,
Insight,
createInsightExtractor,
runInsightExtractor,
runInsightExtractorWithStreaming,
preprocessData,
detectPatterns,
generateInsights,
validateInsights,
generateFinalReport,
detectTrend,
calculateCorrelation,
getDistribution,
calculateEntropy,
};

// 如果直接运行此文件,执行示例
if (require.main === module) {
runInsightExtractor()
.then(() => {
console.log('\n' + '='.repeat(60) + '\n');
return runInsightExtractorWithStreaming();
})
.catch(console.error);
}

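This system uses the Multi-Stage Analysis pattern:

  • Preprocessor: cleans and prepares the data
  • Pattern detector: finds patterns in the data
  • Insight generator: turns the patterns into business insights
  • Validator: checks how reliable each insight is
  • Report generator: turns the validated insights into an executive summary and action plan

Insight extraction flow: START → preprocessData → detectPatterns → generateInsights → validateInsights → generateFinalReport → END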
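The generateInsights and generateFinalReport nodes call JSON.parse directly on the model reply and fall back to defaults when parsing fails. Models often wrap JSON in a Markdown code fence or add a sentence around it, so the fallback may trigger more often than intended. The following is a minimal sketch of a more forgiving parser; `parseJsonFromLlm` and `FENCE` are hypothetical names that do not appear in the original listing.

```typescript
// Three backticks, built at runtime so this listing contains no literal fence.
const FENCE = '`'.repeat(3);

// Hypothetical helper (not part of the original listing): extract JSON from an
// LLM reply that may be fenced as a Markdown code block or surrounded by prose.
function parseJsonFromLlm<T>(text: string): T | null {
  // Prefer the contents of a fenced block when one is present.
  const fencePattern = new RegExp(
    FENCE + '(?:json)?\\s*([\\s\\S]*?)' + FENCE,
    'i'
  );
  const fenced = text.match(fencePattern);
  const candidate = (fenced ? fenced[1] : text).trim();

  try {
    return JSON.parse(candidate) as T;
  } catch {
    // Fall back to the outermost {...} or [...] span in the text.
    const start = candidate.search(/[\[{]/);
    const end = Math.max(
      candidate.lastIndexOf(']'),
      candidate.lastIndexOf('}')
    );
    if (start === -1 || end <= start) return null;
    try {
      return JSON.parse(candidate.slice(start, end + 1)) as T;
    } catch {
      return null;
    }
  }
}

const reply = 'Here you go:\n' + FENCE + 'json\n[{"title": "Q4 growth"}]\n' + FENCE;
console.log(parseJsonFromLlm<Array<{ title: string }>>(reply));
```

A node could call this helper in place of JSON.parse and treat a null result the same way the existing catch branch does.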

Data analysis toolkit

A data analysis system needs to integrate a range of specialized tools:

Data analysis tools:

import '../../utils/loadEnv';
import { tool } from '@langchain/core/tools';
import { z } from 'zod';

/**
* Statistical analysis tool
* Computes basic descriptive statistics
*/
export const statisticalAnalysisTool = tool(
async ({ data, column }: { data: any[]; column: string }) => {
if (!data || data.length === 0) {
return {
error: '数据为空',
statistics: null,
};
}

const values = data
.map((row) => row[column])
.filter((value) => value != null && !isNaN(Number(value)))
.map(Number);

if (values.length === 0) {
return {
error: `${column} 不包含有效的数值数据`,
statistics: null,
};
}

// 基础统计
const count = values.length;
const sum = values.reduce((a, b) => a + b, 0);
const mean = sum / count;
const sortedValues = [...values].sort((a, b) => a - b);
const min = sortedValues[0];
const max = sortedValues[sortedValues.length - 1];

// 中位数
const median =
count % 2 === 0
? (sortedValues[count / 2 - 1] + sortedValues[count / 2]) / 2
: sortedValues[Math.floor(count / 2)];

// 四分位数
const q1Index = Math.floor(count * 0.25);
const q3Index = Math.floor(count * 0.75);
const q1 = sortedValues[q1Index];
const q3 = sortedValues[q3Index];
const iqr = q3 - q1;

// 方差和标准差
const variance =
values.reduce((sum, value) => sum + Math.pow(value - mean, 2), 0) / count;
const standardDeviation = Math.sqrt(variance);

// 偏度和峰度
const skewness = calculateSkewness(values, mean, standardDeviation);
const kurtosis = calculateKurtosis(values, mean, standardDeviation);

return {
column,
statistics: {
count,
sum,
mean: Number(mean.toFixed(4)),
median: Number(median.toFixed(4)),
min,
max,
range: max - min,
q1: Number(q1.toFixed(4)),
q3: Number(q3.toFixed(4)),
iqr: Number(iqr.toFixed(4)),
variance: Number(variance.toFixed(4)),
standardDeviation: Number(standardDeviation.toFixed(4)),
skewness: Number(skewness.toFixed(4)),
kurtosis: Number(kurtosis.toFixed(4)),
},
interpretation: {
distribution: getDistributionType(skewness, kurtosis),
outlierBounds: {
lower: q1 - 1.5 * iqr,
upper: q3 + 1.5 * iqr,
},
variability: getVariabilityLevel(standardDeviation / mean),
},
};
},
{
name: 'statistical_analysis',
description: '对数据列进行统计分析',
schema: z.object({
data: z.array(z.record(z.any())).describe('数据数组'),
column: z.string().describe('要分析的列名'),
}),
}
);

/**
* Correlation analysis tool
* Computes the correlation between two variables
*/
export const correlationAnalysisTool = tool(
async ({
data,
column1,
column2,
}: {
data: any[];
column1: string;
column2: string;
}) => {
if (!data || data.length === 0) {
return {
error: '数据为空',
correlation: null,
};
}

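const pairs = data
// Skip rows with missing values first: Number(null) is 0, not NaN.
.filter((row) => row[column1] != null && row[column2] != null)
.map((row) => ({
x: Number(row[column1]),
y: Number(row[column2]),
}))
.filter((pair) => !isNaN(pair.x) && !isNaN(pair.y));

// At least three points are needed: with n = 2 the correlation is always ±1
// and the t statistic below has zero degrees of freedom.
if (pairs.length < 3) {
return {
error: '有效数据点不足',
correlation: null,
};
}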

const n = pairs.length;
const sumX = pairs.reduce((sum, pair) => sum + pair.x, 0);
const sumY = pairs.reduce((sum, pair) => sum + pair.y, 0);
const sumXY = pairs.reduce((sum, pair) => sum + pair.x * pair.y, 0);
const sumX2 = pairs.reduce((sum, pair) => sum + pair.x * pair.x, 0);
const sumY2 = pairs.reduce((sum, pair) => sum + pair.y * pair.y, 0);

const numerator = n * sumXY - sumX * sumY;
const denominator = Math.sqrt(
(n * sumX2 - sumX * sumX) * (n * sumY2 - sumY * sumY)
);

if (denominator === 0) {
return {
error: '无法计算相关性(分母为零)',
correlation: null,
};
}

const correlation = numerator / denominator;

// 计算决定系数
const rSquared = correlation * correlation;

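// Significance (simplified): only the t statistic is computed, not a p-value.
// Math.max guards against 1 - r² going slightly negative through rounding.
const tStatistic =
correlation * Math.sqrt((n - 2) / Math.max(0, 1 - rSquared));
const degreesOfFreedom = n - 2;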

return {
column1,
column2,
correlation: {
coefficient: Number(correlation.toFixed(4)),
rSquared: Number(rSquared.toFixed(4)),
strength: getCorrelationStrength(Math.abs(correlation)),
direction:
correlation > 0 ? 'positive' : correlation < 0 ? 'negative' : 'none',
tStatistic: Number(tStatistic.toFixed(4)),
degreesOfFreedom,
sampleSize: n,
},
interpretation: {
strength: getCorrelationStrength(Math.abs(correlation)),
explanation: getCorrelationExplanation(correlation),
varianceExplained: `${(rSquared * 100).toFixed(1)}%`,
},
};
},
{
name: 'correlation_analysis',
description: '分析两个变量之间的相关性',
schema: z.object({
data: z.array(z.record(z.any())).describe('数据数组'),
column1: z.string().describe('第一个变量列名'),
column2: z.string().describe('第二个变量列名'),
}),
}
);

/**
* Trend analysis tool
* Analyzes the trend of time series data
*/
export const trendAnalysisTool = tool(
async ({
data,
timeColumn,
valueColumn,
}: {
data: any[];
timeColumn: string;
valueColumn: string;
}) => {
if (!data || data.length === 0) {
return {
error: '数据为空',
trend: null,
};
}

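// Prepare the series: parse, drop invalid points, sort by time, then
// re-index so that x = 0..n-1 follows chronological order.
const timeSeriesData = data
.map((row) => ({
time: new Date(row[timeColumn]).getTime(),
value: Number(row[valueColumn]),
}))
.filter((point) => !isNaN(point.time) && !isNaN(point.value))
.sort((a, b) => a.time - b.time)
.map((point, index) => ({ ...point, index }));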

if (timeSeriesData.length < 3) {
return {
error: '时间序列数据点不足',
trend: null,
};
}

// 线性回归分析
const n = timeSeriesData.length;
const sumX = timeSeriesData.reduce((sum, point) => sum + point.index, 0);
const sumY = timeSeriesData.reduce((sum, point) => sum + point.value, 0);
const sumXY = timeSeriesData.reduce(
(sum, point) => sum + point.index * point.value,
0
);
const sumX2 = timeSeriesData.reduce(
(sum, point) => sum + point.index * point.index,
0
);

const slope = (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
const intercept = (sumY - slope * sumX) / n;

// 计算R²
const meanY = sumY / n;
const totalSumSquares = timeSeriesData.reduce(
(sum, point) => sum + Math.pow(point.value - meanY, 2),
0
);
const residualSumSquares = timeSeriesData.reduce((sum, point) => {
const predicted = slope * point.index + intercept;
return sum + Math.pow(point.value - predicted, 2);
}, 0);

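// A constant series has no variance to explain; report R² = 0 instead of NaN.
const rSquared =
totalSumSquares === 0 ? 0 : 1 - residualSumSquares / totalSumSquares;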

// 计算变化率
const firstValue = timeSeriesData[0].value;
const lastValue = timeSeriesData[timeSeriesData.length - 1].value;
const totalChange = lastValue - firstValue;
const percentageChange = (totalChange / firstValue) * 100;

// 检测季节性(简化版本)
const seasonality = detectSeasonality(timeSeriesData);

return {
timeColumn,
valueColumn,
trend: {
slope: Number(slope.toFixed(6)),
intercept: Number(intercept.toFixed(4)),
rSquared: Number(rSquared.toFixed(4)),
direction:
slope > 0 ? 'increasing' : slope < 0 ? 'decreasing' : 'stable',
strength: getTrendStrength(Math.abs(slope), rSquared),
totalChange: Number(totalChange.toFixed(4)),
percentageChange: Number(percentageChange.toFixed(2)),
dataPoints: n,
timeSpan: {
start: new Date(timeSeriesData[0].time).toISOString(),
end: new Date(
timeSeriesData[timeSeriesData.length - 1].time
).toISOString(),
},
},
seasonality,
interpretation: {
trendDescription: getTrendDescription(slope, rSquared),
significance:
rSquared > 0.7 ? 'high' : rSquared > 0.4 ? 'medium' : 'low',
// Pass the last observed index (n - 1) so the first forecast lands on index n.
forecast: generateSimpleForecast(slope, intercept, n - 1),
},
};
},
{
name: 'trend_analysis',
description: '分析时间序列数据的趋势',
schema: z.object({
data: z.array(z.record(z.any())).describe('数据数组'),
timeColumn: z.string().describe('时间列名'),
valueColumn: z.string().describe('数值列名'),
}),
}
);

/**
* Outlier detection tool
* Detects outliers in the data
*/
export const outlierDetectionTool = tool(
async ({
data,
column,
method = 'iqr',
}: {
data: any[];
column: string;
method?: string;
}) => {
if (!data || data.length === 0) {
return {
error: '数据为空',
outliers: null,
};
}

const values = data
.map((row, index) => ({ value: Number(row[column]), index, row }))
.filter((item) => !isNaN(item.value));

if (values.length === 0) {
return {
error: `${column} 不包含有效的数值数据`,
outliers: null,
};
}

let outliers: any[] = [];
let bounds: { lower: number; upper: number };

if (method === 'iqr') {
// IQR方法
const sortedValues = values
.map((item) => item.value)
.sort((a, b) => a - b);
const q1 = sortedValues[Math.floor(sortedValues.length * 0.25)];
const q3 = sortedValues[Math.floor(sortedValues.length * 0.75)];
const iqr = q3 - q1;

bounds = {
lower: q1 - 1.5 * iqr,
upper: q3 + 1.5 * iqr,
};

outliers = values.filter(
(item) => item.value < bounds.lower || item.value > bounds.upper
);
} else if (method === 'zscore') {
// Z-score方法
const mean =
values.reduce((sum, item) => sum + item.value, 0) / values.length;
const std = Math.sqrt(
values.reduce((sum, item) => sum + Math.pow(item.value - mean, 2), 0) /
values.length
);

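bounds = {
lower: mean - 3 * std,
upper: mean + 3 * std,
};

outliers = values.filter((item) => {
const zscore = Math.abs((item.value - mean) / std);
return zscore > 3;
});
} else {
// Unknown method: fail explicitly so that `bounds` is always assigned below.
return {
error: `不支持的检测方法: ${method}`,
outliers: null,
};
}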

// 计算异常值统计
const outlierCount = outliers.length;
const outlierPercentage = (outlierCount / values.length) * 100;

return {
column,
method,
outliers: {
count: outlierCount,
percentage: Number(outlierPercentage.toFixed(2)),
bounds,
values: outliers.map((item) => ({
index: item.index,
value: item.value,
severity: calculateOutlierSeverity(item.value, bounds),
})),
summary: {
mild: outliers.filter(
(item) => calculateOutlierSeverity(item.value, bounds) === 'mild'
).length,
moderate: outliers.filter(
(item) =>
calculateOutlierSeverity(item.value, bounds) === 'moderate'
).length,
extreme: outliers.filter(
(item) => calculateOutlierSeverity(item.value, bounds) === 'extreme'
).length,
},
},
interpretation: {
severity:
outlierPercentage > 10
? 'high'
: outlierPercentage > 5
? 'medium'
: 'low',
recommendation: getOutlierRecommendation(outlierPercentage, method),
},
};
},
{
name: 'outlier_detection',
description: '检测数据中的异常值',
schema: z.object({
data: z.array(z.record(z.any())).describe('数据数组'),
column: z.string().describe('要检测异常值的列名'),
method: z.enum(['iqr', 'zscore']).optional().describe('检测方法'),
}),
}
);

// Helper functions
function calculateSkewness(
values: number[],
mean: number,
std: number
): number {
// A constant column has zero spread; skewness is undefined, so report 0 instead of NaN.
if (std === 0) return 0;
const n = values.length;
const skewness =
values.reduce((sum, value) => {
return sum + Math.pow((value - mean) / std, 3);
}, 0) / n;
return skewness;
}

function calculateKurtosis(
values: number[],
mean: number,
std: number
): number {
// Same guard as above: avoid dividing by a zero standard deviation.
if (std === 0) return 0;
const n = values.length;
const kurtosis =
values.reduce((sum, value) => {
return sum + Math.pow((value - mean) / std, 4);
}, 0) /
n -
3; // Subtract 3 to get excess kurtosis
return kurtosis;
}

function getDistributionType(skewness: number, kurtosis: number): string {
if (Math.abs(skewness) < 0.5 && Math.abs(kurtosis) < 0.5) {
return 'normal';
} else if (skewness > 0.5) {
return 'right-skewed';
} else if (skewness < -0.5) {
return 'left-skewed';
} else if (kurtosis > 0.5) {
return 'heavy-tailed';
} else if (kurtosis < -0.5) {
return 'light-tailed';
}
return 'unknown';
}

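function getVariabilityLevel(cv: number): string {
// cv is the coefficient of variation (std / mean); it is undefined when the mean is 0.
if (!Number.isFinite(cv)) return 'unknown';
// Use the magnitude so that a negative mean does not read as low variability.
const abs = Math.abs(cv);
if (abs < 0.1) return 'low';
if (abs < 0.3) return 'medium';
return 'high';
}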

function getCorrelationStrength(correlation: number): string {
const abs = Math.abs(correlation);
if (abs < 0.3) return 'weak';
if (abs < 0.7) return 'moderate';
return 'strong';
}

function getCorrelationExplanation(correlation: number): string {
const abs = Math.abs(correlation);
const direction = correlation > 0 ? '正' : '负';
const strength = getCorrelationStrength(abs);
return `存在${direction}相关关系,强度为${strength}`;
}

function detectSeasonality(
data: Array<{ time: number; value: number; index: number }>
) {
// 简化的季节性检测
if (data.length < 12) {
return { detected: false, period: null, strength: 0 };
}

// 检测12个月的周期性
const monthlyAvg: number[] = new Array(12).fill(0);
const monthlyCount: number[] = new Array(12).fill(0);

data.forEach((point) => {
const month = new Date(point.time).getMonth();
monthlyAvg[month] += point.value;
monthlyCount[month]++;
});

for (let i = 0; i < 12; i++) {
if (monthlyCount[i] > 0) {
monthlyAvg[i] /= monthlyCount[i];
}
}

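// Only months that actually have observations contribute to the seasonal spread.
const observed = monthlyAvg.filter((_, i) => monthlyCount[i] > 0);
const overallMean =
observed.reduce((sum, val) => sum + val, 0) / observed.length;
const seasonalVariance =
observed.reduce((sum, val) => sum + Math.pow(val - overallMean, 2), 0) /
observed.length;
const strength =
overallMean === 0 ? 0 : seasonalVariance / (overallMean * overallMean);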

return {
detected: strength > 0.1,
period: 12,
strength: Number(strength.toFixed(4)),
monthlyPattern: monthlyAvg.map((val) => Number(val.toFixed(2))),
};
}

function getTrendStrength(slope: number, rSquared: number): string {
if (rSquared < 0.3) return 'weak';
if (rSquared < 0.7) return 'moderate';
return 'strong';
}

function getTrendDescription(slope: number, rSquared: number): string {
const direction = slope > 0 ? '上升' : slope < 0 ? '下降' : '平稳';
const strength = getTrendStrength(Math.abs(slope), rSquared);
return `数据显示${direction}趋势,趋势强度为${strength}`;
}

function generateSimpleForecast(
slope: number,
intercept: number,
currentIndex: number
) {
const nextPeriods = 3;
const forecasts = [];

for (let i = 1; i <= nextPeriods; i++) {
const forecastValue = slope * (currentIndex + i) + intercept;
forecasts.push({
period: i,
value: Number(forecastValue.toFixed(2)),
});
}

return forecasts;
}

function calculateOutlierSeverity(
value: number,
bounds: { lower: number; upper: number }
): string {
const { lower, upper } = bounds;
const range = upper - lower;

if (value >= lower && value <= upper) {
return 'normal';
}

const distance = value < lower ? lower - value : value - upper;
const severity = distance / range;

if (severity < 0.5) return 'mild';
if (severity < 1.5) return 'moderate';
return 'extreme';
}

function getOutlierRecommendation(percentage: number, method: string): string {
if (percentage < 5) {
return '异常值比例正常,可以保留数据进行分析';
} else if (percentage < 10) {
return '异常值比例偏高,建议进一步调查异常值的原因';
} else {
return '异常值比例过高,建议检查数据质量或考虑使用其他检测方法';
}
}

/**
* Group analysis tool
* Groups the data by a categorical variable and analyzes each group
*/
export const groupAnalysisTool = tool(
async ({
data,
groupColumn,
valueColumn,
}: {
data: any[];
groupColumn: string;
valueColumn: string;
}) => {
if (!data || data.length === 0) {
return {
error: '数据为空',
groups: null,
};
}

// 按组分组数据
const groups: Record<string, any[]> = {};
data.forEach((row) => {
const groupValue = String(row[groupColumn]);
if (!groups[groupValue]) {
groups[groupValue] = [];
}
groups[groupValue].push(row);
});

// 计算每组的统计信息
const groupStats: Record<string, any> = {};
const allValues: number[] = [];

Object.entries(groups).forEach(([groupName, groupData]) => {
const values = groupData
.map((row) => Number(row[valueColumn]))
.filter((val) => !isNaN(val));

if (values.length > 0) {
const count = values.length;
const sum = values.reduce((a, b) => a + b, 0);
const mean = sum / count;
const sortedValues = [...values].sort((a, b) => a - b);
const median =
count % 2 === 0
? (sortedValues[count / 2 - 1] + sortedValues[count / 2]) / 2
: sortedValues[Math.floor(count / 2)];
const variance =
values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / count;
const std = Math.sqrt(variance);

groupStats[groupName] = {
count,
mean: Number(mean.toFixed(4)),
median: Number(median.toFixed(4)),
std: Number(std.toFixed(4)),
min: Math.min(...values),
max: Math.max(...values),
sum: Number(sum.toFixed(2)),
};

allValues.push(...values);
}
});

// Between-group comparison
const groupNames = Object.keys(groupStats);
if (groupNames.length === 0) {
// No group had a numeric value; the reduce() calls below would throw on an empty array.
return {
error: `${valueColumn} 不包含有效的数值数据`,
groups: null,
};
}
const overallMean = allValues.reduce((a, b) => a + b, 0) / allValues.length;

// Simplified one-way ANOVA: mean squares between and within groups.
// Each mean square needs positive degrees of freedom; otherwise it is reported as 0.
const dfBetween = groupNames.length - 1;
const dfWithin = allValues.length - groupNames.length;

const betweenGroupVariance =
dfBetween > 0
? groupNames.reduce((sum, name) => {
const groupMean = groupStats[name].mean;
const groupSize = groupStats[name].count;
return sum + groupSize * Math.pow(groupMean - overallMean, 2);
}, 0) / dfBetween
: 0;

const withinGroupVariance =
dfWithin > 0
? groupNames.reduce((sum, name) => {
const groupData = groups[name];
const groupMean = groupStats[name].mean;
const groupVariance = groupData.reduce((varSum, row) => {
const value = Number(row[valueColumn]);
return !isNaN(value)
? varSum + Math.pow(value - groupMean, 2)
: varSum;
}, 0);
return sum + groupVariance;
}, 0) / dfWithin
: 0;

const fStatistic =
withinGroupVariance > 0 ? betweenGroupVariance / withinGroupVariance : 0;

return {
groupColumn,
valueColumn,
groups: groupStats,
summary: {
totalGroups: groupNames.length,
totalObservations: allValues.length,
overallMean: Number(overallMean.toFixed(4)),
betweenGroupVariance: Number(betweenGroupVariance.toFixed(4)),
withinGroupVariance: Number(withinGroupVariance.toFixed(4)),
fStatistic: Number(fStatistic.toFixed(4)),
},
interpretation: {
mostFrequentGroup: groupNames.reduce((a, b) =>
groupStats[a].count > groupStats[b].count ? a : b
),
highestMeanGroup: groupNames.reduce((a, b) =>
groupStats[a].mean > groupStats[b].mean ? a : b
),
lowestMeanGroup: groupNames.reduce((a, b) =>
groupStats[a].mean < groupStats[b].mean ? a : b
),
variabilityLevel:
fStatistic > 4 ? 'high' : fStatistic > 2 ? 'medium' : 'low',
},
};
},
{
name: 'group_analysis',
description: '按分类变量对数据进行分组分析',
schema: z.object({
data: z.array(z.record(z.any())).describe('数据数组'),
groupColumn: z.string().describe('分组列名'),
valueColumn: z.string().describe('数值列名'),
}),
}
);

// 导出所有工具
export const dataAnalysisTools = [
statisticalAnalysisTool,
correlationAnalysisTool,
trendAnalysisTool,
outlierDetectionTool,
groupAnalysisTool,
];
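The statistics and outlier tools pick quartiles by index, `sortedValues[Math.floor(count * 0.25)]`, which is simple but coarse for small samples. As an alternative, here is a sketch of a quantile with linear interpolation between neighbouring order statistics, the convention NumPy uses by default. The `quantile` helper is hypothetical and not part of the original tools.

```typescript
// Hypothetical helper: quantile with linear interpolation between the two
// nearest order statistics. Expects the input to be sorted ascending.
function quantile(sortedValues: number[], q: number): number {
  if (sortedValues.length === 0) throw new Error('quantile of empty array');
  if (q < 0 || q > 1) throw new Error('q must be in [0, 1]');
  const position = (sortedValues.length - 1) * q;
  const lower = Math.floor(position);
  const upper = Math.ceil(position);
  const weight = position - lower;
  return (
    sortedValues[lower] + weight * (sortedValues[upper] - sortedValues[lower])
  );
}

const sorted = [1, 2, 3, 4];
const q1 = quantile(sorted, 0.25); // 1.75
const q3 = quantile(sorted, 0.75); // 3.25
// The index-based rule in the listing picks sorted[1] = 2 and sorted[3] = 4 here.
console.log({ q1, q3, iqr: q3 - q1 });
```

Which rule to use is a design choice; the point is that the IQR bounds, and therefore the set of flagged outliers, can shift noticeably on short columns.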
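Tool selection tips

Choosing suitable data analysis tools matters a great deal for system performance:

  • Statistical analysis: libraries such as simple-statistics and ml-js
  • Data visualization: integrate Chart.js, D3.js, or Plotly.js
  • Data processing: Lodash or Ramda for data manipulation
  • Machine learning: integrate TensorFlow.js or ML5.js

Practical guidance

1. Data quality checks

Define criteria for assessing data quality: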

const dataQualityChecks = {
  // Share of records in which every field is non-null; 0 for empty input.
  completeness: (data: any[]) => {
    if (data.length === 0) return 0;
    const completeRecords = data.filter((record) =>
      Object.values(record).every((value) => value != null)
    ).length;
    return completeRecords / data.length;
  },

  // True when every record stores the field with the same JavaScript type.
  consistency: (data: any[], field: string) => {
    const types = new Set(data.map((record) => typeof record[field]));
    return types.size <= 1;
  },

  // True when every record passes every per-field validation rule.
  accuracy: (
    data: any[],
    validationRules: Record<string, (value: any) => boolean>
  ) => {
    return data.every((record) =>
      Object.entries(validationRules).every(([field, validator]) =>
        validator(record[field])
      )
    );
  },
};
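A record-level completeness score does not show which column is responsible for the gaps. The sketch below adds a per-field view; `fieldCompleteness` and the sample rows are hypothetical and only illustrate the idea.

```typescript
type Row = Record<string, unknown>;

// Hypothetical helper: for each field seen in any row, the share of rows
// where that field is present and non-null.
function fieldCompleteness(data: Row[]): Record<string, number> {
  const presentCounts: Record<string, number> = {};
  for (const row of data) {
    for (const key of Object.keys(row)) {
      const present = row[key] != null ? 1 : 0;
      presentCounts[key] = (presentCounts[key] ?? 0) + present;
    }
  }
  const result: Record<string, number> = {};
  for (const key of Object.keys(presentCounts)) {
    result[key] = presentCounts[key] / data.length;
  }
  return result;
}

// Made-up sample: one null value and one missing field.
const sample: Row[] = [
  { month: 'Jan', sales: 120000, returnRate: 0.05 },
  { month: 'Feb', sales: null, returnRate: 0.04 },
  { month: 'Mar', sales: 98000 },
  { month: 'Apr', sales: 165000, returnRate: 0.03 },
];

const completenessByField = fieldCompleteness(sample);
console.log(completenessByField);
```

Here `month` is fully populated, while `sales` and `returnRate` are each present in three of the four rows.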

2. Choosing an analysis strategy

Pick analysis methods that match the characteristics of the data:

const analysisStrategies = {
  numerical: ['descriptive_stats', 'correlation', 'regression'],
  categorical: ['frequency_analysis', 'chi_square', 'association_rules'],
  temporal: ['trend_analysis', 'seasonality', 'forecasting'],
  textual: ['sentiment_analysis', 'topic_modeling', 'keyword_extraction'],
};
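To apply such a table, each column first has to be assigned one of the four kinds. The following sketch shows one possible heuristic; `inferColumnKind`, `planAnalysis`, the 0.5 distinct-value ratio, and the sample rows are assumptions made for illustration, not part of the original.

```typescript
type ColumnKind = 'numerical' | 'categorical' | 'temporal' | 'textual';

const strategiesByKind: Record<ColumnKind, string[]> = {
  numerical: ['descriptive_stats', 'correlation', 'regression'],
  categorical: ['frequency_analysis', 'chi_square', 'association_rules'],
  temporal: ['trend_analysis', 'seasonality', 'forecasting'],
  textual: ['sentiment_analysis', 'topic_modeling', 'keyword_extraction'],
};

// Hypothetical heuristic: classify a column from its values.
function inferColumnKind(values: unknown[]): ColumnKind {
  const present = values.filter((v) => v != null);
  if (present.length > 0 && present.every((v) => typeof v === 'number')) {
    return 'numerical';
  }
  const strings = present.map((v) => String(v));
  // ISO-like dates (YYYY-MM-DD...) are treated as temporal.
  if (
    strings.length > 0 &&
    strings.every((s) => /^\d{4}-\d{2}-\d{2}/.test(s))
  ) {
    return 'temporal';
  }
  // Few distinct values relative to the row count suggests categories.
  const distinct = new Set(strings).size;
  return distinct <= strings.length * 0.5 ? 'categorical' : 'textual';
}

// Map every column of the first row to the strategies for its inferred kind.
function planAnalysis(
  rows: Array<Record<string, unknown>>
): Record<string, string[]> {
  const plan: Record<string, string[]> = {};
  if (rows.length === 0) return plan;
  for (const column of Object.keys(rows[0])) {
    const kind = inferColumnKind(rows.map((row) => row[column]));
    plan[column] = strategiesByKind[kind];
  }
  return plan;
}

const plan = planAnalysis([
  { date: '2024-01-01', region: 'north', sales: 120000, note: 'strong launch week' },
  { date: '2024-02-01', region: 'south', sales: 135000, note: 'promo ended early' },
  { date: '2024-03-01', region: 'north', sales: 98000, note: 'supply delays' },
  { date: '2024-04-01', region: 'south', sales: 165000, note: 'new channel added' },
]);
console.log(plan);
```

With these rows, `date` is planned as temporal, `region` as categorical, `sales` as numerical, and `note` as textual. A real system would likely need sturdier type detection than this.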

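3. Result validation

Set up checks that validate the analysis results:

const resultValidation = {
  // Conventional 5% significance level
  statisticalSignificance: (pValue: number) => pValue < 0.05,
  effectSize: (effect: number, threshold: number) =>
    Math.abs(effect) > threshold,
  // Absolute width threshold: 0.1 only makes sense for values on a comparable scale
  confidenceInterval: (ci: [number, number]) => ci[1] - ci[0] < 0.1,
};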
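These checks take a p-value or an interval as input but do not show where the numbers come from. As an illustration, the sketch below computes an approximate 95% confidence interval for a mean and applies the same width rule. `meanConfidenceInterval` is a hypothetical helper, and it uses the normal critical value 1.96, so for small samples the interval is somewhat narrower than a t-based one.

```typescript
// Hypothetical helper: approximate 95% confidence interval for a mean,
// using the normal critical value 1.96 and the sample variance (n - 1).
function meanConfidenceInterval(
  values: number[],
  z = 1.96
): [number, number] {
  const n = values.length;
  if (n < 2) throw new Error('need at least two values');
  const mean = values.reduce((a, b) => a + b, 0) / n;
  const sampleVariance =
    values.reduce((sum, v) => sum + (v - mean) * (v - mean), 0) / (n - 1);
  const halfWidth = z * Math.sqrt(sampleVariance / n);
  return [mean - halfWidth, mean + halfWidth];
}

// Same width rule as resultValidation.confidenceInterval in the text.
const isNarrowEnough = (ci: [number, number]) => ci[1] - ci[0] < 0.1;

// Monthly return rates from the e-commerce example data.
const returnRates = [
  0.05, 0.04, 0.08, 0.03, 0.06, 0.04, 0.05, 0.07, 0.06, 0.04, 0.03, 0.02,
];
const ci = meanConfidenceInterval(returnRates);
console.log(ci, isNarrowEnough(ci));
```

For these return rates the interval is roughly 0.02 wide around a mean of 0.0475, so it passes the 0.1 width check. The same rule applied to the `sales` column would always fail, which is why the threshold should be chosen per metric.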

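Advanced features

Automated analysis workflow

Build an analysis workflow that adapts to the data:

const adaptiveAnalysis = {
  selectMethods: (dataCharacteristics: {
    hasNumerical: boolean;
    hasTime: boolean;
    hasCategorical: boolean;
  }) => {
    const methods: string[] = [];

    if (dataCharacteristics.hasNumerical) {
      methods.push('correlation_analysis');
    }

    if (dataCharacteristics.hasTime) {
      methods.push('time_series_analysis');
    }

    // `hasCategorical` matches the field name used in the dataStructure state
    if (dataCharacteristics.hasCategorical) {
      methods.push('categorical_analysis');
    }

    return methods;
  },
};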

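Interactive analysis

Support user interaction and iterative analysis:

const interactiveAnalysis = {
  handleUserFeedback: (feedback: string, currentResults: any) => {
    // Adjust the analysis strategy based on keywords in the user's feedback
    // ('更详细' means "more detailed", '简化' means "simplify")
    if (feedback.includes('更详细')) {
      return { ...currentResults, detailLevel: 'high' };
    }

    if (feedback.includes('简化')) {
      return { ...currentResults, detailLevel: 'low' };
    }

    return currentResults;
  },
};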

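Performance optimization

Chunked data processing

Use a chunking strategy when processing large datasets:

const chunkProcessor = {
  processInChunks: async <T, R>(
    data: T[],
    chunkSize: number,
    processor: (chunk: T[]) => R | Promise<R>
  ): Promise<R[]> => {
    // A non-positive chunk size would never advance the loop below.
    if (chunkSize <= 0) throw new Error('chunkSize must be positive');
    const results: R[] = [];

    for (let i = 0; i < data.length; i += chunkSize) {
      const chunk = data.slice(i, i + chunkSize);
      const chunkResult = await processor(chunk);
      results.push(chunkResult);
    }

    return results;
  },
};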
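Per-chunk results usually have to be merged back into one answer. The sketch below assumes a single numeric column and keeps only count, sum, min, and max per chunk, which is enough to recover the overall mean without holding every value in memory. `summarizeChunk`, `mergeStats`, and the sample numbers are hypothetical.

```typescript
interface PartialStats {
  count: number;
  sum: number;
  min: number;
  max: number;
}

// Same chunking loop as in the text, repeated so the sketch is self-contained.
async function processInChunks<T, R>(
  data: T[],
  chunkSize: number,
  processor: (chunk: T[]) => R | Promise<R>
): Promise<R[]> {
  if (chunkSize <= 0) throw new Error('chunkSize must be positive');
  const results: R[] = [];
  for (let i = 0; i < data.length; i += chunkSize) {
    results.push(await processor(data.slice(i, i + chunkSize)));
  }
  return results;
}

// Hypothetical per-chunk reducer: aggregates that can be combined later.
function summarizeChunk(chunk: number[]): PartialStats {
  return {
    count: chunk.length,
    sum: chunk.reduce((a, b) => a + b, 0),
    min: Math.min(...chunk),
    max: Math.max(...chunk),
  };
}

// Hypothetical merge step: combine partial aggregates and derive the mean.
function mergeStats(parts: PartialStats[]): PartialStats & { mean: number } {
  const merged = parts.reduce(
    (acc, part) => ({
      count: acc.count + part.count,
      sum: acc.sum + part.sum,
      min: Math.min(acc.min, part.min),
      max: Math.max(acc.max, part.max),
    }),
    { count: 0, sum: 0, min: Infinity, max: -Infinity }
  );
  return {
    ...merged,
    mean: merged.count === 0 ? NaN : merged.sum / merged.count,
  };
}

const orders = [450, 520, 380, 610, 530, 680, 750];

processInChunks(orders, 3, summarizeChunk)
  .then(mergeStats)
  .then((stats) => console.log(stats));
```

Statistics such as the median cannot be merged this way, so they need either the full data or an approximate streaming algorithm.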

Caching

Cache analysis results to improve performance:

const analysisCache = new Map<string, any>();

const getCachedAnalysis = (dataHash: string, analysisType: string) => {
  const key = `${dataHash}_${analysisType}`;
  return analysisCache.get(key);
};

const setCachedAnalysis = (
  dataHash: string,
  analysisType: string,
  result: any
) => {
  const key = `${dataHash}_${analysisType}`;
  analysisCache.set(key, result);
};
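The cache functions expect a `dataHash` but the text does not say how to derive one. One possible approach, sketched below under stated assumptions, is to serialize the data with sorted object keys and hash the string. `stableStringify`, `hashData`, and `analyzeWithCache` are hypothetical helpers, and the 32-bit FNV-1a style hash is not cryptographic, so collisions are possible on large workloads.

```typescript
// Hypothetical helper: JSON with object keys sorted, so key order does not
// change the resulting string.
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) {
    return '[' + value.map((item) => stableStringify(item)).join(',') + ']';
  }
  if (value !== null && typeof value === 'object') {
    const obj = value as Record<string, unknown>;
    const keys = Object.keys(obj).sort();
    return (
      '{' +
      keys
        .map((k) => JSON.stringify(k) + ':' + stableStringify(obj[k]))
        .join(',') +
      '}'
    );
  }
  const text = JSON.stringify(value);
  return text === undefined ? 'null' : text;
}

// 32-bit FNV-1a style hash over UTF-16 code units, as 8 hex digits.
function hashData(data: unknown): string {
  const text = stableStringify(data);
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return ('00000000' + hash.toString(16)).slice(-8);
}

const cache = new Map<string, unknown>();

// Compute once per (data, analysisType) pair; later calls reuse the result.
function analyzeWithCache<T>(
  data: unknown,
  analysisType: string,
  compute: () => T
): T {
  const key = `${hashData(data)}_${analysisType}`;
  if (cache.has(key)) return cache.get(key) as T;
  const result = compute();
  cache.set(key, result);
  return result;
}

let computeCalls = 0;
const rows = [
  { sales: 120000, month: 'Jan' },
  { sales: 135000, month: 'Feb' },
];
const total = () => {
  computeCalls++;
  return rows.reduce((sum, row) => sum + row.sales, 0);
};

analyzeWithCache(rows, 'sum', total);
analyzeWithCache(
  [
    { month: 'Jan', sales: 120000 },
    { month: 'Feb', sales: 135000 },
  ],
  'sum',
  total
);
console.log(computeCalls); // 1: the second call hits the cache despite the different key order
```

An unbounded Map grows with every distinct dataset, so a long-running service would also want an eviction policy.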

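Summary and further reading

Data analysis is a strong use case for LangGraph: with sound architecture and well-chosen tool integrations, you can build data analysis systems that are both efficient and intelligent.

Key takeaways

  • Use the Analyzer-Visualizer pattern to improve how results are presented
  • Use the Multi-Stage Analysis pattern to handle complex analyses
  • Integrate suitable data processing and visualization tools
  • Put thorough data quality checks in place

Having worked through the code generation and data analysis use cases, we have covered the core patterns and best practices for applying LangGraph in practice.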