|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +"""Chunk overlap 配置与文本上下文处理工具。""" |
| 3 | + |
| 4 | +from __future__ import annotations |
| 5 | + |
| 6 | +from dataclasses import dataclass |
| 7 | +from typing import TYPE_CHECKING, Any |
| 8 | + |
| 9 | +if TYPE_CHECKING: |
| 10 | + from src.core.llm.tokenizer import Tokenizer |
| 11 | +else: |
| 12 | + Tokenizer = Any |
| 13 | + |
| 14 | + |
| 15 | +@dataclass(slots=True) |
| 16 | +class ChunkOverlapConfig: |
| 17 | + """描述 chunk overlap 的独立配置。""" |
| 18 | + |
| 19 | + enabled: bool = True |
| 20 | + tokens: int = 64 |
| 21 | + |
| 22 | + def __post_init__(self) -> None: |
| 23 | + if self.tokens < 0 or self.tokens > 64: |
| 24 | + raise ValueError("overlap tokens must be between 0 and 64.") |
| 25 | + |
| 26 | + |
class ChunkOverlapper:
    """Centralize token slicing and context stitching for chunk overlap.

    The tokenizer must provide ``count_tokens(text)`` and
    ``truncate_text(text, token_limit)``. Only the first item of the pair
    returned by ``truncate_text`` is used here.
    """

    def __init__(
        self,
        tokenizer: Tokenizer,
        config: ChunkOverlapConfig | None = None,
    ) -> None:
        """Store the tokenizer and the overlap settings.

        Args:
            tokenizer: Object used to count and truncate tokens.
            config: Overlap settings; a default ``ChunkOverlapConfig`` is
                created when this is omitted.
        """
        self.tokenizer = tokenizer
        self.config = config or ChunkOverlapConfig()

    @property
    def effective_tokens(self) -> int:
        """Return the overlap size in effect: 0 when overlap is disabled."""
        if not self.config.enabled:
            return 0
        return self.config.tokens

    def count_tokens(self, text: str) -> int:
        """Count the tokens of ``text`` after stripping surrounding whitespace.

        Falsy input yields 0 without calling the tokenizer.
        """
        return self.tokenizer.count_tokens(text.strip()) if text else 0

    def take_first_tokens(self, text: str, token_limit: int) -> str:
        """Return the leading part of ``text`` truncated to ``token_limit``.

        Truncation is delegated to ``tokenizer.truncate_text``; the result is
        stripped. Empty input or a non-positive limit yields ``""``.
        """
        if not text or token_limit <= 0:
            return ""
        truncated, _ = self.tokenizer.truncate_text(text, token_limit)
        return truncated.strip()

    def take_last_tokens(self, text: str, token_limit: int) -> str:
        """Return the longest found suffix of ``text`` within ``token_limit``.

        Args:
            text: Source text; surrounding whitespace is ignored.
            token_limit: Maximum number of tokens the result may contain.

        Returns:
            The stripped text itself when it already fits, otherwise a
            left-stripped suffix whose token count is at most
            ``token_limit``. Returns ``""`` for blank input, a non-positive
            limit, or when not even the final character fits the limit.
        """
        cleaned = text.strip()
        if not cleaned or token_limit <= 0:
            return ""
        if self.count_tokens(cleaned) <= token_limit:
            return cleaned

        # Binary search over character offsets for the smallest start index
        # whose suffix fits. This assumes the token count does not grow as
        # the suffix gets shorter.
        low = 0
        high = len(cleaned) - 1
        # Start from the empty suffix: if no probed suffix fits (e.g. the
        # last character alone needs more tokens than allowed), nothing is
        # returned instead of an over-budget fragment.
        best_start = len(cleaned)

        while low <= high:
            mid = (low + high) // 2
            candidate = cleaned[mid:].lstrip()
            if self.count_tokens(candidate) <= token_limit:
                best_start = mid
                high = mid - 1
            else:
                low = mid + 1

        return cleaned[best_start:].lstrip()

    def build_next_chunk(
        self,
        previous_chunk: str,
        next_atom: str,
        *,
        max_chunk_tokens: int,
    ) -> str:
        """Prefix ``next_atom`` with the tail of ``previous_chunk``.

        The tail is limited to the smaller of the effective overlap size and
        the token room left after ``next_atom`` within ``max_chunk_tokens``.

        Returns:
            ``next_atom`` unchanged when overlap is disabled, when there is
            no room left, or when no tail could be extracted; otherwise the
            tail and ``next_atom`` joined by a blank line and stripped.
        """
        overlap_budget = self.effective_tokens
        if overlap_budget <= 0:
            return next_atom

        next_tokens = self.count_tokens(next_atom)
        available_for_overlap = max(0, max_chunk_tokens - next_tokens)
        if available_for_overlap <= 0:
            return next_atom

        overlap_tail = self.take_last_tokens(
            previous_chunk,
            min(overlap_budget, available_for_overlap),
        )
        if not overlap_tail:
            return next_atom

        return f"{overlap_tail}\n\n{next_atom}".strip()

    def build_neighbor_context(
        self,
        *,
        previous_content: str | None,
        current_content: str,
        next_content: str | None,
    ) -> tuple[str, int, int]:
        """Surround a chunk with context taken from its neighbours.

        Up to ``effective_tokens`` tokens are taken from the end of
        ``previous_content`` and from the start of ``next_content``.

        Returns:
            A tuple ``(text, previous_tokens, next_tokens)``: the parts
            joined by blank lines and stripped, followed by the token counts
            of the context added before and after. When overlap is disabled
            the result is ``(current_content, 0, 0)``.
        """
        overlap_budget = self.effective_tokens
        if overlap_budget <= 0:
            return current_content, 0, 0

        contextual_parts: list[str] = []
        previous_tokens = 0
        next_tokens = 0

        if previous_content:
            previous_context = self.take_last_tokens(previous_content, overlap_budget)
            if previous_context:
                previous_tokens = self.count_tokens(previous_context)
                contextual_parts.append(previous_context)

        contextual_parts.append(current_content)

        if next_content:
            next_context = self.take_first_tokens(next_content, overlap_budget)
            if next_context:
                next_tokens = self.count_tokens(next_context)
                contextual_parts.append(next_context)

        return "\n\n".join(contextual_parts).strip(), previous_tokens, next_tokens
0 commit comments