Skip to content

Latest commit

 

History

History
668 lines (516 loc) · 20.1 KB

File metadata and controls

668 lines (516 loc) · 20.1 KB

3.5 上下文管理的量化评估方法

3.5.1 引言:为什么需要量化评估

在上下文工程实践中,“这个优化是否有效”这样的问题常常被用定性的方式回答(“看起来好多了”)。但这种方式无法支撑工程决策:

  • 无法判断何时停止优化(边际收益递减点在哪里?)
  • 无法对比不同方案的优劣(方案A和B哪个更好?)
  • 无法持续追踪系统是否在退化(性能是否降低了?)
  • 无法向管理层证明ROI(优化是否值得投资?)

本节介绍一套系统的量化评估框架,包括相关性指标、检索质量指标、效率指标,以及如何在实践中组合使用这些指标。

3.5.2 相关性评分指标

from typing import List, Tuple, Set
import numpy as np

class RelevanceScorer:
    """相关性评分工具"""

    @staticmethod
    def mean_reciprocal_rank(
        retrieved_items: List[bool],  # True表示相关
        threshold_position: int = 10
    ) -> float:
        """
        平均倒数排名 (Mean Reciprocal Rank, MRR)

        衡量第一个相关项目出现的位置。
        越靠前,分数越高。

        公式: MRR = (1/position of first relevant item)

        示例:
        - [True, False, False] -> 1/1 = 1.0 (最优)
        - [False, True, False] -> 1/2 = 0.5
        - [False, False, False] -> 0.0 (无相关项)
        """

        for i, is_relevant in enumerate(retrieved_items[:threshold_position]):
            if is_relevant:
                return 1.0 / (i + 1)

        return 0.0

    @staticmethod
    def recall_at_k(
        retrieved_items: List[bool],
        relevant_item_count: int,
        k: int = 10
    ) -> float:
        """
        召回率 (Recall@k)

        在前k个结果中,找到了多少个相关项。

        公式: Recall@k = (前k个结果中的相关数) / 总相关数

        示例:
        - 总共10个相关项,前20个结果中找到8个 -> 8/10 = 0.8
        - 总共5个相关项,前10个结果中找到3个 -> 3/5 = 0.6
        """

        if relevant_item_count == 0:
            return 0.0

        relevant_in_top_k = sum(retrieved_items[:k])
        return relevant_in_top_k / relevant_item_count

    @staticmethod
    def precision_at_k(
        retrieved_items: List[bool],
        k: int = 10
    ) -> float:
        """
        精确度 (Precision@k)

        前k个结果中有多少比例是相关的。

        公式: Precision@k = (前k个中的相关数) / k

        示例:
        - 前10个结果中有8个相关 -> 8/10 = 0.8
        - 前5个结果中有1个相关 -> 1/5 = 0.2
        """

        if k == 0:
            return 0.0

        relevant_in_top_k = sum(retrieved_items[:k])
        return relevant_in_top_k / k

    @staticmethod
    def f1_score(
        retrieved_items: List[bool],
        relevant_item_count: int,
        k: int = 10
    ) -> float:
        """
        F1分数 (F1 Score)

        Precision和Recall的调和平均。
        在精确度和召回率之间找到平衡。

        公式: F1 = 2 * (Precision * Recall) / (Precision + Recall)
        """

        precision = RelevanceScorer.precision_at_k(retrieved_items, k)
        recall = RelevanceScorer.recall_at_k(retrieved_items, relevant_item_count, k)

        if precision + recall == 0:
            return 0.0

        return 2 * (precision * recall) / (precision + recall)

    @staticmethod
    def ndcg(
        relevance_scores: List[float],
        k: int = 10,
        ideal_dcg: float = None
    ) -> float:
        """
        归一化折损累计增益 (Normalized Discounted Cumulative Gain, NDCG)

        考虑相关性的"程度"(不只是相关/不相关)。
        排名靠前的相关项贡献更大。

        公式:
        DCG@k = sum(relevance[i] / log2(i+2)) for i in 0..k-1
        NDCG@k = DCG@k / IDCG@k

        其中IDCG是完美排序下的DCG。

        示例:
        relevance_scores = [0.9, 0.7, 0.0, 0.5, 0.8]
        DCG@5 = 0.9/1 + 0.7/log2(3) + 0/log2(4) + 0.5/log2(5) + 0.8/log2(6)
        """

        if not relevance_scores or k == 0:
            return 0.0

        # 计算实际的DCG
        dcg = 0.0
        for i, score in enumerate(relevance_scores[:k]):
            # 注意:log2(i+2)表示从位置1开始计数
            dcg += score / np.log2(i + 2)

        # 计算理想的DCG(如果没有给出,则用排序后的最优值)
        if ideal_dcg is None:
            sorted_scores = sorted(relevance_scores, reverse=True)
            ideal_dcg = 0.0
            for i, score in enumerate(sorted_scores[:k]):
                ideal_dcg += score / np.log2(i + 2)

        if ideal_dcg == 0:
            return 0.0

        return dcg / ideal_dcg


# 使用示例
if __name__ == "__main__":
    # 场景:搜索"Python教程",返回了10个结果
    # 其中标注的相关性:
    retrieved = [
        True,   # 位置1: 很相关
        False,  # 位置2: 不相关
        True,   # 位置3: 相关
        False,  # 位置4: 不相关
        False,  # 位置5: 不相关
        True,   # 位置6: 相关
        False,  # 位置7: 不相关
        False,  # 位置8: 不相关
        True,   # 位置9: 相关
        False,  # 位置10: 不相关
    ]

    total_relevant_count = 6  # 来自黄金标注:语料库中共有6个相关项

    scorer = RelevanceScorer()

    mrr = scorer.mean_reciprocal_rank(retrieved)
    recall = scorer.recall_at_k(retrieved, total_relevant_count, k=10)
    precision = scorer.precision_at_k(retrieved, k=10)
    f1 = scorer.f1_score(retrieved, total_relevant_count, k=10)

    print(f"MRR: {mrr:.2f}")        # 应该是1.0(第1个位置)
    print(f"Recall@10: {recall:.2f}")  # 应该是0.67(找到6个相关项中的4个)
    print(f"Precision@10: {precision:.2f}")  # 应该是0.4(4/10)
    print(f"F1: {f1:.2f}")

分级相关性指标

实际场景中,相关性常常不是二元的(相关/不相关),而是有多个等级:

class GradedRelevanceScorer:
    """处理分级相关性的评估工具"""

    # 常见的相关性等级
    RELEVANCE_GRADES = {
        0: "完全不相关",
        1: "部分相关",
        2: "相关",
        3: "高度相关",
    }

    @staticmethod
    def dcg_with_grades(
        relevance_grades: List[int],  # [3, 2, 0, 1, 3, ...]
        k: int = 10
    ) -> float:
        """
        使用分级相关性计算DCG。
        """

        dcg = 0.0
        for i, grade in enumerate(relevance_grades[:k]):
            # 标准NDCG使用 (2^grade - 1) 作为相关性权重
            weight = (2 ** grade) - 1
            dcg += weight / np.log2(i + 2)

        return dcg

    @staticmethod
    def ndcg_with_grades(
        relevance_grades: List[int],
        k: int = 10
    ) -> float:
        """
        使用分级相关性计算NDCG。
        """

        dcg = GradedRelevanceScorer.dcg_with_grades(relevance_grades, k)

        # 理想的DCG:把同一批候选的人工相关性等级按从高到低排序
        ideal_grades = sorted(relevance_grades, reverse=True)[:k]
        ideal_dcg = GradedRelevanceScorer.dcg_with_grades(ideal_grades, k)

        if ideal_dcg == 0:
            return 0.0

        return dcg / ideal_dcg

    @staticmethod
    def metric_at_each_level(
        relevance_grades: List[int],
        k: int = 10
    ) -> dict:
        """
        计算每个相关性等级的指标分布。
        """

        grade_counts = {grade: 0 for grade in range(4)}

        for grade in relevance_grades[:k]:
            grade_counts[grade] += 1

        return {
            'not_relevant': grade_counts[0],
            'partially_relevant': grade_counts[1],
            'relevant': grade_counts[2],
            'highly_relevant': grade_counts[3],
        }

3.5.3 检索质量度量

class RetrievalQualityEvaluator:
    """综合的检索质量评估"""

    def __init__(self):
        self.scorer = RelevanceScorer()

    def evaluate_retrieval(
        self,
        query: str,
        retrieved_results: List[dict],  # [{'id': '...', 'is_relevant': True/False, 'relevance_score': 0.8}]
        total_relevant_count: int,
        top_k_values: List[int] = None
    ) -> dict:
        """
        对单个查询的检索结果进行多维度评估。

        total_relevant_count 必须来自黄金标注或完整候选集,不能从 retrieved_results 推断。
        """

        if top_k_values is None:
            top_k_values = [1, 5, 10]

        # 提取相关性标签
        is_relevant = [r.get('is_relevant', False) for r in retrieved_results]
        relevance_scores = [r.get('relevance_score', 0.0) for r in retrieved_results]

        evaluation = {
            'query': query,
            'total_retrieved': len(retrieved_results),
            'total_relevant': total_relevant_count,
        }

        # 计算多个k值下的指标
        for k in top_k_values:
            k_metrics = {
                'mrr': self.scorer.mean_reciprocal_rank(is_relevant, k),
                'recall': self.scorer.recall_at_k(is_relevant, total_relevant_count, k),
                'precision': self.scorer.precision_at_k(is_relevant, k),
                'f1': self.scorer.f1_score(is_relevant, total_relevant_count, k),
            }

            evaluation[f'metrics@{k}'] = k_metrics

        return evaluation

    def evaluate_retrieval_batch(
        self,
        queries: List[str],
        retrieved_results_batch: List[List[dict]],
        total_relevant_counts: List[int],
        top_k_values: List[int] = None
    ) -> dict:
        """
        对多个查询的检索结果进行批量评估,并计算平均指标。
        """

        if top_k_values is None:
            top_k_values = [5, 10]

        individual_results = []
        aggregated_metrics = {f'@{k}': {} for k in top_k_values}

        for query, results, total_relevant_count in zip(queries, retrieved_results_batch, total_relevant_counts):
            indiv = self.evaluate_retrieval(query, results, total_relevant_count, top_k_values)
            individual_results.append(indiv)

            # 累积指标
            for k in top_k_values:
                k_str = f'@{k}'
                k_metrics = indiv[f'metrics@{k}']

                for metric_name, metric_value in k_metrics.items():
                    if metric_name not in aggregated_metrics[k_str]:
                        aggregated_metrics[k_str][metric_name] = []

                    aggregated_metrics[k_str][metric_name].append(metric_value)

        # 计算平均值
        for k_str in aggregated_metrics:
            for metric_name in aggregated_metrics[k_str]:
                values = aggregated_metrics[k_str][metric_name]
                aggregated_metrics[k_str][metric_name] = {
                    'mean': np.mean(values),
                    'std': np.std(values),
                    'min': np.min(values),
                    'max': np.max(values),
                }

        return {
            'individual_results': individual_results,
            'aggregated_metrics': aggregated_metrics,
            'total_queries': len(queries),
            'average_retrieval_count': np.mean([r['total_retrieved'] for r in individual_results]),
        }

3.5.4 上下文效率指标

class ContextEfficiencyAnalyzer:
    """分析上下文的使用效率"""

    @staticmethod
    def information_density(
        context: str,
        relevance_score: float
    ) -> float:
        """
        信息密度 = 相关信息密度 / 总信息量

        衡量上下文中"有用信息"的比例。
        """

        total_tokens = len(context) // 4  # 粗略估计
        useful_tokens = total_tokens * relevance_score

        return useful_tokens / total_tokens if total_tokens > 0 else 0

    @staticmethod
    def redundancy_ratio(
        context: str
    ) -> float:
        """
        冗余度 = 重复内容的比例

        通过n-gram重复频率估计冗余度。
        """

        # 简化:提取所有5-gram并统计重复
        words = context.split()

        if len(words) < 5:
            return 0.0

        ngrams = []
        for i in range(len(words) - 4):
            ngram = ' '.join(words[i:i+5])
            ngrams.append(ngram)

        unique_ngrams = len(set(ngrams))
        total_ngrams = len(ngrams)

        # 冗余度 = 1 - (unique / total)
        redundancy = 1 - (unique_ngrams / total_ngrams) if total_ngrams > 0 else 0

        return redundancy

    @staticmethod
    def context_coverage_ratio(
        context: str,
        query: str
    ) -> float:
        """
        覆盖率 = 查询中被上下文覆盖的关键词比例

        衡量上下文对查询的覆盖程度。
        """

        query_keywords = set(query.lower().split())
        context_lower = context.lower()

        covered = sum(1 for kw in query_keywords if kw in context_lower)
        total = len(query_keywords)

        return covered / total if total > 0 else 0

    @staticmethod
    def efficiency_score(
        context: str,
        query: str,
        relevance_score: float,
        max_tokens: int = 2000
    ) -> float:
        """
        综合效率分数 (0-100)

        综合考虑:
        - 信息密度(越高越好)
        - 冗余度(越低越好)
        - 覆盖率(越高越好)
        - Token利用率(在预算内)
        """

        density = ContextEfficiencyAnalyzer.information_density(context, relevance_score)
        redundancy = ContextEfficiencyAnalyzer.redundancy_ratio(context)
        coverage = ContextEfficiencyAnalyzer.context_coverage_ratio(context, query)

        # Token利用率
        context_tokens = len(context) // 4
        token_utilization = min(1.0, context_tokens / max_tokens)

        # 加权综合
        efficiency = (
            density * 0.35 +           # 信息密度权重35%
            (1 - redundancy) * 0.25 +  # 非冗余度权重25%
            coverage * 0.25 +           # 覆盖率权重25%
            token_utilization * 0.15   # Token利用率权重15%
        )

        # 规范化到0-100
        return efficiency * 100


# 使用示例
if __name__ == "__main__":
    evaluator = RetrievalQualityEvaluator()

    # 评估单个查询
    query = "Python教程"
    results = [
        {'id': '1', 'is_relevant': True, 'relevance_score': 0.95},
        {'id': '2', 'is_relevant': False, 'relevance_score': 0.2},
        {'id': '3', 'is_relevant': True, 'relevance_score': 0.85},
        {'id': '4', 'is_relevant': False, 'relevance_score': 0.15},
        {'id': '5', 'is_relevant': True, 'relevance_score': 0.8},
    ]

    total_relevant_count = 4  # 来自黄金标注:语料库中共有4个相关文档
    evaluation = evaluator.evaluate_retrieval(query, results, total_relevant_count)

    print("单个查询评估:")
    print(f"  总相关: {evaluation['total_relevant']}")
    print(f"  Recall@5: {evaluation['metrics@5']['recall']:.2f}")
    print(f"  Precision@5: {evaluation['metrics@5']['precision']:.2f}")

    # 评估效率
    context = """
    Python是一种高级编程语言,以其简洁易读的语法而闻名。
    Python支持多种编程范式,包括过程式、面向对象和函数式编程。
    Python广泛用于数据科学、机器学习和Web开发。
    Python有一个庞大的标准库和第三方库生态系统。
    """

    efficiency = ContextEfficiencyAnalyzer.efficiency_score(context, query, 0.9)
    print(f"\n上下文效率分数: {efficiency:.1f}/100")

3.5.5 A/B测试框架在上下文优化中的应用

from scipy import stats

class ABTestFramework:
    """
    A/B测试框架
    用于统计显著性验证
    """

    @staticmethod
    def paired_ttest(
        control_group: List[float],
        treatment_group: List[float],
        alpha: float = 0.05
    ) -> dict:
        """
        配对t检验

        同一批查询分别跑 control 和 treatment 时,应该比较逐查询差值。
        """

        t_stat, p_value = stats.ttest_rel(control_group, treatment_group)

        is_significant = p_value < alpha

        return {
            't_statistic': t_stat,
            'p_value': p_value,
            'is_significant': is_significant,
            'control_mean': np.mean(control_group),
            'treatment_mean': np.mean(treatment_group),
            'improvement': (
                (np.mean(treatment_group) - np.mean(control_group)) /
                np.mean(control_group) * 100
                if np.mean(control_group) != 0 else 0
            ),
        }

    @staticmethod
    def cohen_d(
        control_group: List[float],
        treatment_group: List[float]
    ) -> float:
        """
        计算效果量 (Effect Size)

        衡量两组差异的实际大小(不只是统计显著性)。
        """

        deltas = np.array(treatment_group) - np.array(control_group)
        mean_diff = np.mean(deltas)

        pooled_std = np.std(deltas)

        if pooled_std == 0:
            return 0

        return mean_diff / pooled_std

    @staticmethod
    def calculate_sample_size(
        effect_size: float = 0.5,  # Cohen's d
        alpha: float = 0.05,
        power: float = 0.8
    ) -> int:
        """
        计算所需样本量

        为了达到统计显著性,需要多少样本?
        """

        # 简化公式
        z_alpha = stats.norm.ppf(1 - alpha / 2)
        z_beta = stats.norm.ppf(power)

        sample_size = int(
            2 * ((z_alpha + z_beta) / effect_size) ** 2
        )

        return sample_size


class ContextOptimizationABTest:
    """上下文优化的A/B测试设计"""

    def __init__(self, control_strategy: str, treatment_strategy: str):
        self.control_strategy = control_strategy
        self.treatment_strategy = treatment_strategy
        self.control_results = []
        self.treatment_results = []

    def run_test(
        self,
        queries: List[str],
        ground_truth_answers: List[str],
        llm_generate_func
    ) -> dict:
        """
        执行A/B测试。

        control_strategy: 原始的上下文工程方案
        treatment_strategy: 新的上下文工程方案
        """

        print(f"Running A/B Test: {self.control_strategy} vs {self.treatment_strategy}")

        # 测试Control组
        for query, ground_truth in zip(queries, ground_truth_answers):
            context = self._prepare_context(query, strategy=self.control_strategy)
            answer = llm_generate_func(context, query)
            similarity = self._similarity(answer, ground_truth)
            self.control_results.append(similarity)

        # 测试Treatment组
        for query, ground_truth in zip(queries, ground_truth_answers):
            context = self._prepare_context(query, strategy=self.treatment_strategy)
            answer = llm_generate_func(context, query)
            similarity = self._similarity(answer, ground_truth)
            self.treatment_results.append(similarity)

        # 统计分析
        test_results = ABTestFramework.paired_ttest(
            self.control_results,
            self.treatment_results
        )

        effect_size = ABTestFramework.cohen_d(
            self.control_results,
            self.treatment_results
        )

        test_results['effect_size'] = effect_size

        return test_results

    def _prepare_context(self, query: str, strategy: str) -> str:
        """准备上下文(这里是简化实现)"""
        # 实际应该调用不同的上下文处理流程
        return f"Using {strategy}: {query}"

    def _similarity(self, text1: str, text2: str) -> float:
        """计算两个文本的相似度"""
        # 简化:基于词重叠。生产评估应换成经过校准的语义指标、
        # LLM-as-a-judge 量表或人工黄金标注,避免把复述当成正确性。
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())

        if not words1 or not words2:
            return 0.0

        overlap = len(words1 & words2)
        union = len(words1 | words2)

        return overlap / union

本章节提供了一套完整的量化评估框架,使团队能够科学地衡量上下文工程的效果,而不是依赖主观判断。建议在每个优化周期中定期进行这些评估,以确保持续改进。