在上下文工程实践中,“这个优化是否有效”这类问题往往只得到定性的回答(例如“看起来好多了”)。但定性判断无法支撑工程决策:
- 无法判断何时停止优化(边际收益递减点在哪里?)
- 无法对比不同方案的优劣(方案A和B哪个更好?)
- 无法持续追踪系统是否在退化(性能是否降低了?)
- 无法向管理层证明ROI(优化是否值得投资?)
本节介绍一套系统的量化评估框架,包括相关性指标、检索质量指标、效率指标,以及如何在实践中组合使用这些指标。
from typing import List, Optional, Set, Tuple

import numpy as np
class RelevanceScorer:
    """Relevance metrics for a single ranked result list.

    All methods are static and expect the ranking ordered from the top
    result downwards. A non-positive cutoff always yields 0.0 instead of
    silently slicing from the end of the list.
    """

    @staticmethod
    def mean_reciprocal_rank(
        retrieved_items: List[bool],  # True marks a relevant result
        threshold_position: int = 10
    ) -> float:
        """Return the reciprocal rank of the first relevant result.

        This is the per-query reciprocal rank; averaging it over many
        queries gives the Mean Reciprocal Rank (MRR).

        Args:
            retrieved_items: Relevance flags in ranked order.
            threshold_position: Only the first ``threshold_position``
                results are inspected.

        Returns:
            ``1 / rank`` (1-based rank) of the first relevant result, or 0.0
            when none of the inspected results is relevant.

        Examples:
            - [True, False, False] -> 1/1 = 1.0 (best case)
            - [False, True, False] -> 1/2 = 0.5
            - [False, False, False] -> 0.0 (no relevant result)
        """
        if threshold_position <= 0:
            return 0.0
        inspected = retrieved_items[:threshold_position]
        for rank, is_relevant in enumerate(inspected, start=1):
            if is_relevant:
                return 1.0 / rank
        return 0.0

    @staticmethod
    def recall_at_k(
        retrieved_items: List[bool],
        relevant_item_count: int,
        k: int = 10
    ) -> float:
        """Return Recall@k: relevant results in the top k / all relevant items.

        Args:
            retrieved_items: Relevance flags in ranked order.
            relevant_item_count: Total number of relevant items; used as the
                denominator, so it must be supplied by the caller.
            k: Cutoff rank.

        Returns:
            The recall, or 0.0 when ``relevant_item_count`` or ``k`` is not
            positive.

        Examples:
            - 10 relevant items, 8 found in the top 20 -> 8/10 = 0.8
            - 5 relevant items, 3 found in the top 10 -> 3/5 = 0.6
        """
        if relevant_item_count <= 0 or k <= 0:
            return 0.0
        return sum(retrieved_items[:k]) / relevant_item_count

    @staticmethod
    def precision_at_k(
        retrieved_items: List[bool],
        k: int = 10
    ) -> float:
        """Return Precision@k: relevant results in the top k / k.

        The denominator is always ``k``, even when fewer than ``k`` results
        were retrieved.

        Args:
            retrieved_items: Relevance flags in ranked order.
            k: Cutoff rank.

        Returns:
            The precision, or 0.0 when ``k`` is not positive.

        Examples:
            - 8 relevant results in the top 10 -> 8/10 = 0.8
            - 1 relevant result in the top 5 -> 1/5 = 0.2
        """
        if k <= 0:
            return 0.0
        return sum(retrieved_items[:k]) / k

    @staticmethod
    def f1_score(
        retrieved_items: List[bool],
        relevant_item_count: int,
        k: int = 10
    ) -> float:
        """Return the harmonic mean of Precision@k and Recall@k.

        Formula: F1 = 2 * (Precision * Recall) / (Precision + Recall)

        Returns:
            The F1 score, or 0.0 when precision and recall are both zero.
        """
        precision = RelevanceScorer.precision_at_k(retrieved_items, k)
        recall = RelevanceScorer.recall_at_k(retrieved_items, relevant_item_count, k)
        if precision + recall == 0:
            return 0.0
        return 2 * (precision * recall) / (precision + recall)

    @staticmethod
    def _dcg(scores: List[float], k: int) -> float:
        """Return DCG@k = sum(scores[i] / log2(i + 2)) over the first k scores."""
        # log2(i + 2) makes the 0-based index i behave as a 1-based rank,
        # so the top result is divided by log2(2) = 1.
        return sum(score / np.log2(index + 2) for index, score in enumerate(scores[:k]))

    @staticmethod
    def ndcg(
        relevance_scores: List[float],
        k: int = 10,
        ideal_dcg: Optional[float] = None
    ) -> float:
        """Return the Normalized Discounted Cumulative Gain at rank k.

        Unlike the binary metrics, NDCG takes the degree of relevance into
        account and gives higher-ranked results more weight.

        Formula:
            DCG@k = sum(relevance[i] / log2(i + 2)) for i in 0..k-1
            NDCG@k = DCG@k / IDCG@k

        Args:
            relevance_scores: Graded relevance of each result in ranked order.
            k: Cutoff rank.
            ideal_dcg: DCG of the perfect ranking. When omitted it is
                computed by sorting ``relevance_scores`` in descending order,
                i.e. the ideal is limited to the items that were passed in.

        Returns:
            ``DCG / IDCG``, or 0.0 for an empty list, a non-positive ``k`` or
            a zero ideal DCG.

        Example:
            relevance_scores = [0.9, 0.7, 0.0, 0.5, 0.8]
            DCG@5 = 0.9/1 + 0.7/log2(3) + 0/log2(4) + 0.5/log2(5) + 0.8/log2(6)
        """
        if not relevance_scores or k <= 0:
            return 0.0
        dcg = RelevanceScorer._dcg(relevance_scores, k)
        if ideal_dcg is None:
            ideal_dcg = RelevanceScorer._dcg(sorted(relevance_scores, reverse=True), k)
        if ideal_dcg == 0:
            return 0.0
        return float(dcg / ideal_dcg)
# Usage example
if __name__ == "__main__":
    # Scenario: a search for "Python tutorial" returned 10 results.
    # Gold labels: positions 1, 3, 6 and 9 are relevant, the other six are not.
    relevant_positions = {1, 3, 6, 9}
    retrieved = [position in relevant_positions for position in range(1, 11)]
    total_relevant_count = 6  # from gold annotations: the corpus holds 6 relevant items

    mrr = RelevanceScorer.mean_reciprocal_rank(retrieved)
    recall = RelevanceScorer.recall_at_k(retrieved, total_relevant_count, k=10)
    precision = RelevanceScorer.precision_at_k(retrieved, k=10)
    f1 = RelevanceScorer.f1_score(retrieved, total_relevant_count, k=10)

    print(f"MRR: {mrr:.2f}")  # expected 1.00 (first relevant hit at position 1)
    print(f"Recall@10: {recall:.2f}")  # expected 0.67 (4 of the 6 relevant items found)
    print(f"Precision@10: {precision:.2f}")  # expected 0.40 (4/10)
    print(f"F1: {f1:.2f}")

# In real-world scenarios relevance is often not binary (relevant / not
# relevant) but comes in several grades:
class GradedRelevanceScorer:
    """Ranking metrics for multi-level (graded) relevance judgments."""

    # Common relevance grades (the label strings are runtime data)
    RELEVANCE_GRADES = {
        0: "完全不相关",
        1: "部分相关",
        2: "相关",
        3: "高度相关",
    }

    @staticmethod
    def dcg_with_grades(
        relevance_grades: List[int],  # e.g. [3, 2, 0, 1, 3, ...]
        k: int = 10
    ) -> float:
        """Return DCG@k using the exponential gain ``2**grade - 1``.

        Args:
            relevance_grades: Grade of each result in ranked order.
            k: Cutoff rank; a non-positive value yields 0.0.
        """
        if k <= 0:
            return 0.0
        dcg = 0.0
        for index, grade in enumerate(relevance_grades[:k]):
            gain = (2 ** grade) - 1
            # log2(index + 2): the top result (index 0) is divided by 1.
            dcg += gain / np.log2(index + 2)
        return dcg

    @staticmethod
    def ndcg_with_grades(
        relevance_grades: List[int],
        k: int = 10
    ) -> float:
        """Return NDCG@k for graded relevance.

        The ideal ranking is built by sorting the same grades in descending
        order, so the ideal only covers the candidates that were passed in.

        Returns:
            ``DCG / IDCG``, or 0.0 when the ideal DCG is zero.
        """
        dcg = GradedRelevanceScorer.dcg_with_grades(relevance_grades, k)
        ideal_grades = sorted(relevance_grades, reverse=True)
        ideal_dcg = GradedRelevanceScorer.dcg_with_grades(ideal_grades, k)
        if ideal_dcg == 0:
            return 0.0
        return dcg / ideal_dcg

    @staticmethod
    def metric_at_each_level(
        relevance_grades: List[int],
        k: int = 10
    ) -> dict:
        """Count how many of the top-k results fall into each grade.

        Returns:
            A dict with the keys ``not_relevant``, ``partially_relevant``,
            ``relevant`` and ``highly_relevant``.

        Raises:
            KeyError: If a grade in the top k is not one of 0, 1, 2, 3.
        """
        grade_counts = {grade: 0 for grade in GradedRelevanceScorer.RELEVANCE_GRADES}
        top_k = relevance_grades[:k] if k > 0 else []
        for grade in top_k:
            grade_counts[grade] += 1
        return {
            'not_relevant': grade_counts[0],
            'partially_relevant': grade_counts[1],
            'relevant': grade_counts[2],
            'highly_relevant': grade_counts[3],
        }


class RetrievalQualityEvaluator:
    """Multi-metric retrieval evaluation built on ``RelevanceScorer``."""

    def __init__(self):
        self.scorer = RelevanceScorer()

    def evaluate_retrieval(
        self,
        query: str,
        retrieved_results: List[dict],  # [{'id': '...', 'is_relevant': True/False, 'relevance_score': 0.8}]
        total_relevant_count: int,
        top_k_values: Optional[List[int]] = None
    ) -> dict:
        """Evaluate the ranked results of a single query at several cutoffs.

        Args:
            query: The query text; copied into the result unchanged.
            retrieved_results: Ranked result dicts. Only ``is_relevant`` is
                read; a missing key counts as not relevant.
            total_relevant_count: Number of relevant items for the query. It
                must come from gold annotations or the full candidate set and
                is never inferred from ``retrieved_results``.
            top_k_values: Cutoff ranks; defaults to ``[1, 5, 10]``.

        Returns:
            A dict with ``query``, ``total_retrieved``, ``total_relevant`` and
            one ``metrics@{k}`` entry per cutoff holding ``mrr``, ``recall``,
            ``precision`` and ``f1``.
        """
        if top_k_values is None:
            top_k_values = [1, 5, 10]

        is_relevant = [result.get('is_relevant', False) for result in retrieved_results]

        evaluation = {
            'query': query,
            'total_retrieved': len(retrieved_results),
            'total_relevant': total_relevant_count,
        }
        for k in top_k_values:
            evaluation[f'metrics@{k}'] = {
                'mrr': self.scorer.mean_reciprocal_rank(is_relevant, k),
                'recall': self.scorer.recall_at_k(is_relevant, total_relevant_count, k),
                'precision': self.scorer.precision_at_k(is_relevant, k),
                'f1': self.scorer.f1_score(is_relevant, total_relevant_count, k),
            }
        return evaluation

    def evaluate_retrieval_batch(
        self,
        queries: List[str],
        retrieved_results_batch: List[List[dict]],
        total_relevant_counts: List[int],
        top_k_values: Optional[List[int]] = None
    ) -> dict:
        """Evaluate many queries and aggregate each metric across them.

        Args:
            queries: Query texts.
            retrieved_results_batch: Ranked results, one list per query.
            total_relevant_counts: Gold relevant-item count per query.
            top_k_values: Cutoff ranks; defaults to ``[5, 10]``.

        Returns:
            A dict with ``individual_results`` (one ``evaluate_retrieval``
            result per query), ``aggregated_metrics`` (per cutoff ``@{k}`` and
            metric: ``mean``, ``std``, ``min``, ``max``), ``total_queries``
            and ``average_retrieval_count`` (0.0 for an empty batch).

        Raises:
            ValueError: If the three input lists differ in length.
        """
        if top_k_values is None:
            top_k_values = [5, 10]

        # zip() would silently drop the unmatched tail and skew the averages.
        if not (len(queries) == len(retrieved_results_batch) == len(total_relevant_counts)):
            raise ValueError(
                "queries, retrieved_results_batch and total_relevant_counts "
                "must have the same length"
            )

        individual_results = [
            self.evaluate_retrieval(query, results, total_relevant_count, top_k_values)
            for query, results, total_relevant_count
            in zip(queries, retrieved_results_batch, total_relevant_counts)
        ]

        aggregated_metrics = {}
        for k in top_k_values:
            values_by_metric = {}
            for indiv in individual_results:
                for metric_name, metric_value in indiv[f'metrics@{k}'].items():
                    values_by_metric.setdefault(metric_name, []).append(metric_value)
            aggregated_metrics[f'@{k}'] = {
                metric_name: {
                    'mean': np.mean(values),
                    'std': np.std(values),
                    'min': np.min(values),
                    'max': np.max(values),
                }
                for metric_name, values in values_by_metric.items()
            }

        # np.mean([]) is NaN and emits a RuntimeWarning; report 0.0 instead.
        if individual_results:
            average_retrieval_count = np.mean(
                [indiv['total_retrieved'] for indiv in individual_results]
            )
        else:
            average_retrieval_count = 0.0

        return {
            'individual_results': individual_results,
            'aggregated_metrics': aggregated_metrics,
            'total_queries': len(queries),
            'average_retrieval_count': average_retrieval_count,
        }


class ContextEfficiencyAnalyzer:
    """Heuristic measures of how efficiently a context uses its tokens.

    Token counts are a rough estimate of ``len(context) // 4``.
    """

    @staticmethod
    def information_density(
        context: str,
        relevance_score: float
    ) -> float:
        """Return the share of estimated tokens that count as useful.

        Useful tokens are estimated as ``total_tokens * relevance_score``, so
        for any context of at least 4 characters the result is
        ``relevance_score`` up to float rounding.

        Returns:
            The density, or 0.0 when the context is shorter than 4 characters.
        """
        total_tokens = len(context) // 4  # rough estimate
        if total_tokens <= 0:
            return 0.0
        useful_tokens = total_tokens * relevance_score
        return useful_tokens / total_tokens

    @staticmethod
    def redundancy_ratio(
        context: str,
        ngram_size: int = 5
    ) -> float:
        """Estimate redundancy as the share of repeated word n-grams.

        Words are produced by whitespace splitting, so text written without
        spaces between words yields very few words and usually scores 0.0.

        Args:
            context: The context text.
            ngram_size: Number of consecutive words per n-gram.

        Returns:
            ``1 - unique_ngrams / total_ngrams``, or 0.0 when the context has
            fewer than ``ngram_size`` words.

        Raises:
            ValueError: If ``ngram_size`` is smaller than 1.
        """
        if ngram_size < 1:
            raise ValueError("ngram_size must be at least 1")
        words = context.split()
        total_ngrams = len(words) - ngram_size + 1
        if total_ngrams <= 0:
            return 0.0
        unique_ngrams = {
            tuple(words[start:start + ngram_size]) for start in range(total_ngrams)
        }
        return 1 - (len(unique_ngrams) / total_ngrams)

    @staticmethod
    def context_coverage_ratio(
        context: str,
        query: str
    ) -> float:
        """Return the share of distinct query keywords found in the context.

        Keywords are the lower-cased, whitespace-separated query terms; a
        keyword counts as covered when it occurs as a substring of the
        lower-cased context.

        Returns:
            ``covered / total``, or 0.0 when the query has no keywords.
        """
        query_keywords = set(query.lower().split())
        if not query_keywords:
            return 0.0
        context_lower = context.lower()
        covered = sum(1 for keyword in query_keywords if keyword in context_lower)
        return covered / len(query_keywords)

    @staticmethod
    def efficiency_score(
        context: str,
        query: str,
        relevance_score: float,
        max_tokens: int = 2000
    ) -> float:
        """Return a combined efficiency score scaled by 100.

        Weighted sum of:
            - information density (35%, higher is better)
            - non-redundancy, ``1 - redundancy`` (25%)
            - query coverage (25%, higher is better)
            - token utilization (15%): ``context_tokens / max_tokens`` capped
              at 1.0, so it saturates once the estimate reaches the budget

        Raises:
            ValueError: If ``max_tokens`` is not positive.
        """
        if max_tokens <= 0:
            raise ValueError("max_tokens must be positive")

        density = ContextEfficiencyAnalyzer.information_density(context, relevance_score)
        redundancy = ContextEfficiencyAnalyzer.redundancy_ratio(context)
        coverage = ContextEfficiencyAnalyzer.context_coverage_ratio(context, query)

        context_tokens = len(context) // 4
        token_utilization = min(1.0, context_tokens / max_tokens)

        efficiency = (
            density * 0.35 +
            (1 - redundancy) * 0.25 +
            coverage * 0.25 +
            token_utilization * 0.15
        )
        return efficiency * 100
# Usage example
if __name__ == "__main__":
    evaluator = RetrievalQualityEvaluator()

    # Evaluate a single query
    query = "Python教程"
    labelled_results = [
        (True, 0.95),
        (False, 0.2),
        (True, 0.85),
        (False, 0.15),
        (True, 0.8),
    ]
    results = [
        {'id': str(position), 'is_relevant': flag, 'relevance_score': score}
        for position, (flag, score) in enumerate(labelled_results, start=1)
    ]
    total_relevant_count = 4  # from gold annotations: the corpus holds 4 relevant documents

    evaluation = evaluator.evaluate_retrieval(query, results, total_relevant_count)
    print("单个查询评估:")
    print(f" 总相关: {evaluation['total_relevant']}")
    print(f" Recall@5: {evaluation['metrics@5']['recall']:.2f}")
    print(f" Precision@5: {evaluation['metrics@5']['precision']:.2f}")

    # Evaluate context efficiency.
    # The literal's lines stay flush-left so its content (and therefore the
    # length-based token estimate) is exactly what the original text defines.
    context = """
Python是一种高级编程语言,以其简洁易读的语法而闻名。
Python支持多种编程范式,包括过程式、面向对象和函数式编程。
Python广泛用于数据科学、机器学习和Web开发。
Python有一个庞大的标准库和第三方库生态系统。
"""
    efficiency = ContextEfficiencyAnalyzer.efficiency_score(context, query, 0.9)
    print(f"\n上下文效率分数: {efficiency:.1f}/100")


from scipy import stats
class ABTestFramework:
    """Statistical helpers for paired A/B comparisons of per-query scores."""

    @staticmethod
    def paired_ttest(
        control_group: List[float],
        treatment_group: List[float],
        alpha: float = 0.05
    ) -> dict:
        """Run a paired t-test on per-query scores.

        Use this when the same queries were run under both control and
        treatment, so the test compares the per-query differences.

        Args:
            control_group: Scores under the control strategy.
            treatment_group: Scores under the treatment strategy, in the same
                query order as ``control_group``.
            alpha: Significance level.

        Returns:
            A dict with ``t_statistic``, ``p_value``, ``is_significant``
            (``p_value < alpha``), ``control_mean``, ``treatment_mean`` and
            ``improvement`` (relative change of the treatment mean over the
            control mean in percent, 0 when the control mean is 0).
            The samples are passed to ``scipy.stats.ttest_rel`` as
            (control, treatment), so a negative ``t_statistic`` means the
            treatment scored higher.
        """
        t_stat, p_value = stats.ttest_rel(control_group, treatment_group)
        control_mean = np.mean(control_group)
        treatment_mean = np.mean(treatment_group)

        if control_mean != 0:
            improvement = (treatment_mean - control_mean) / control_mean * 100
        else:
            improvement = 0

        return {
            't_statistic': t_stat,
            'p_value': p_value,
            'is_significant': p_value < alpha,
            'control_mean': control_mean,
            'treatment_mean': treatment_mean,
            'improvement': improvement,
        }

    @staticmethod
    def cohen_d(
        control_group: List[float],
        treatment_group: List[float]
    ) -> float:
        """Return the paired effect size: mean difference / std of differences.

        Differences are ``treatment - control`` per query. The sample
        standard deviation (``ddof=1``) is used, the same estimator the
        paired t-test relies on, so the value is not inflated for small
        samples.

        Returns:
            The effect size, or 0.0 when there are fewer than two pairs or
            all differences are identical.
        """
        deltas = np.asarray(treatment_group, dtype=float) - np.asarray(control_group, dtype=float)
        # A sample standard deviation needs at least two observations.
        if deltas.size < 2:
            return 0.0
        std_of_deltas = np.std(deltas, ddof=1)
        if std_of_deltas == 0:
            return 0.0
        return float(np.mean(deltas) / std_of_deltas)

    @staticmethod
    def calculate_sample_size(
        effect_size: float = 0.5,  # Cohen's d
        alpha: float = 0.05,
        power: float = 0.8
    ) -> int:
        """Return the sample size needed to detect ``effect_size``.

        Uses the normal-approximation formula
        ``n = 2 * ((z_{1 - alpha/2} + z_{power}) / effect_size) ** 2`` and
        rounds up, because truncating would leave the test underpowered.

        NOTE(review): the factor 2 matches the usual per-group formula for
        two independent samples; confirm it is the intended formula for
        paired designs.

        Raises:
            ValueError: If ``effect_size`` is 0, or ``alpha`` or ``power``
                is not strictly between 0 and 1.
        """
        if effect_size == 0:
            raise ValueError("effect_size must be non-zero")
        if not 0 < alpha < 1:
            raise ValueError("alpha must be between 0 and 1")
        if not 0 < power < 1:
            raise ValueError("power must be between 0 and 1")

        z_alpha = stats.norm.ppf(1 - alpha / 2)
        z_beta = stats.norm.ppf(power)
        required = 2 * ((z_alpha + z_beta) / effect_size) ** 2
        return int(np.ceil(required))
class ContextOptimizationABTest:
    """A/B test harness comparing two context-engineering strategies."""

    def __init__(self, control_strategy: str, treatment_strategy: str):
        self.control_strategy = control_strategy
        self.treatment_strategy = treatment_strategy
        # Per-query scores of the most recent run_test() call.
        self.control_results = []
        self.treatment_results = []

    def run_test(
        self,
        queries: List[str],
        ground_truth_answers: List[str],
        llm_generate_func
    ) -> dict:
        """Run both strategies on the same queries and compare their scores.

        All control queries are generated first, then all treatment queries.
        The stored per-query scores are replaced on every call, so repeated
        calls never mix earlier runs into the statistics.

        Args:
            queries: Query texts.
            ground_truth_answers: Reference answer for each query.
            llm_generate_func: Callable invoked as
                ``llm_generate_func(context, query)`` that returns the
                answer text.

        Returns:
            The ``ABTestFramework.paired_ttest`` result dict with an extra
            ``effect_size`` entry from ``ABTestFramework.cohen_d``.

        Raises:
            ValueError: If ``queries`` and ``ground_truth_answers`` differ
                in length.
        """
        # zip() would silently drop the unmatched tail.
        if len(queries) != len(ground_truth_answers):
            raise ValueError("queries and ground_truth_answers must have the same length")

        print(f"Running A/B Test: {self.control_strategy} vs {self.treatment_strategy}")

        self.control_results = self._score_strategy(
            self.control_strategy, queries, ground_truth_answers, llm_generate_func
        )
        self.treatment_results = self._score_strategy(
            self.treatment_strategy, queries, ground_truth_answers, llm_generate_func
        )

        test_results = ABTestFramework.paired_ttest(
            self.control_results,
            self.treatment_results
        )
        test_results['effect_size'] = ABTestFramework.cohen_d(
            self.control_results,
            self.treatment_results
        )
        return test_results

    def _score_strategy(
        self,
        strategy: str,
        queries: List[str],
        ground_truth_answers: List[str],
        llm_generate_func
    ) -> List[float]:
        """Return one similarity score per query for the given strategy."""
        scores = []
        for query, ground_truth in zip(queries, ground_truth_answers):
            context = self._prepare_context(query, strategy=strategy)
            answer = llm_generate_func(context, query)
            scores.append(self._similarity(answer, ground_truth))
        return scores

    def _prepare_context(self, query: str, strategy: str) -> str:
        """Build the context for a query (simplified placeholder)."""
        # A real implementation would dispatch to a different
        # context-processing pipeline per strategy.
        return f"Using {strategy}: {query}"

    def _similarity(self, text1: str, text2: str) -> float:
        """Return the Jaccard similarity of the two texts' word sets.

        Simplified word-overlap measure. Production evaluation should use a
        calibrated semantic metric, an LLM-as-a-judge rubric or human gold
        labels, so that paraphrase is not mistaken for correctness.

        Returns:
            ``|words1 & words2| / |words1 | words2|`` on lower-cased,
            whitespace-separated words, or 0.0 when either text has no words.
        """
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)


# This chapter provides a complete quantitative evaluation framework that lets
# a team measure the effect of context engineering scientifically instead of
# relying on subjective judgment. Run these evaluations regularly in every
# optimization cycle to ensure continuous improvement.