本章节汇总第十四章中最常用的代码模式,覆盖上下文工程的核心场景。这些代码分为两类:
- 可直接运行的最小示例:完整、自包含,适合先本地跑通关键链路。
- 教学性实现模式:用于展示接口边界、系统拆分和核心算法思路,接入真实 SDK 或线上基础设施前通常需要补齐鉴权、模型调用层和错误处理。
建议在学习这些章节时,不仅仅阅读代码,更要动手运行、修改和扩展这些示例。通过实际操作来理解上下文工程的各个环节。
本节先给出一个可直接运行的本地最小示例。它不依赖外部模型 API,而是用标准库完成分词、检索、上下文组装和带来源标注的回答,便于先验证完整链路。若希望运行带样本文档、权限过滤和评估集的版本,可直接使用 examples/enterprise_know/。
"""
minimal_rag_system.py
一个可直接运行的本地 RAG 最小示例。
依赖: Python 3.11+(仅标准库)
"""
from __future__ import annotations
from dataclasses import dataclass
from math import sqrt
import re
from typing import Dict, List
def tokenize(text: str) -> List[str]:
    """Split text into lowercase tokens, covering English words and CJK characters.

    Runs of ASCII letters, digits and underscores become one token each.
    Runs of CJK ideographs (U+4E00-U+9FFF) are broken into single characters.
    Anything else (punctuation, whitespace) is dropped.
    """
    tokens: List[str] = []
    for chunk in re.findall(r"[\u4e00-\u9fff]+|[A-Za-z0-9_]+", text.lower()):
        if re.fullmatch(r"[\u4e00-\u9fff]+", chunk) is None:
            tokens.append(chunk)
        else:
            # Extending a list with a string adds one entry per character.
            tokens.extend(chunk)
    return tokens
def term_frequency(text: str) -> Dict[str, float]:
    """Build a sparse term-frequency vector for ``text``.

    Returns:
        A mapping from each token produced by ``tokenize`` to the number of
        times it occurs, stored as a float.
    """
    counts: Dict[str, float] = {}
    for term in tokenize(text):
        if term in counts:
            counts[term] += 1.0
        else:
            counts[term] = 1.0
    return counts
def cosine_similarity(left: Dict[str, float], right: Dict[str, float]) -> float:
    """Return the cosine similarity of two sparse term-weight vectors.

    Args:
        left: Mapping from term to weight.
        right: Mapping from term to weight.

    Returns:
        The dot product over the shared terms divided by the product of the
        two Euclidean norms, or 0.0 when either vector has zero norm.
    """
    left_norm = sqrt(sum(weight * weight for weight in left.values()))
    right_norm = sqrt(sum(weight * weight for weight in right.values()))
    if left_norm == 0 or right_norm == 0:
        # An empty or all-zero vector has no direction to compare.
        return 0.0

    # Only terms present in both vectors contribute to the dot product.
    overlap = set(left) & set(right)
    dot = sum(left[term] * right[term] for term in overlap)
    return dot / (left_norm * right_norm)
def split_sentences(text: str) -> List[str]:
    """Split text into sentences at Chinese and English end-of-sentence marks.

    The split happens right after each terminator and swallows any whitespace
    that follows it, so the terminator stays attached to its sentence.
    Empty fragments are dropped.
    """
    pieces = re.split(r"(?<=[。!?.!?])\s*", text.strip())
    trimmed = (piece.strip() for piece in pieces)
    return [piece for piece in trimmed if piece]
@dataclass
class Document:
    """One unit of source text stored in the retrieval system."""

    # Identifier quoted when the document is cited as a source.
    id: str
    # Raw text that is embedded, retrieved and quoted.
    content: str
    # Caller-supplied extra attributes; this module never inspects them.
    metadata: Dict
@dataclass
class RetrievalResult:
    """A retrieved document paired with its similarity score."""

    # The matched document.
    document: Document
    # Similarity between the query and the document (higher is closer).
    score: float
class SimpleEmbeddingModel:
    """Minimal stand-in for an embedding model.

    It returns plain term-frequency vectors so the example runs locally and
    the retrieval mechanics stay easy to follow. A production system would
    swap in a domain embedding model or a vector service.
    """

    def embed(self, text: str) -> Dict[str, float]:
        """Return the term-frequency vector used as the embedding of ``text``."""
        vector = term_frequency(text)
        return vector
class SimpleVectorStore:
    """In-memory vector store kept deliberately simple.

    Documents and their vectors live in two parallel lists. A real deployment
    should use a vector database with persistence and filtering.
    """

    def __init__(self):
        self.documents: List[Document] = []
        self.vectors: List[Dict[str, float]] = []
        self.embedding_model = SimpleEmbeddingModel()

    def add_document(self, doc: Document) -> None:
        """Embed one document and append it to the store."""
        # Embed first so a failing embed leaves both lists untouched.
        embedded = self.embedding_model.embed(doc.content)
        self.documents.append(doc)
        self.vectors.append(embedded)

    def add_documents(self, docs: List[Document]) -> None:
        """Add every document in ``docs``, in order."""
        for doc in docs:
            self.add_document(doc)

    def retrieve(self, query: str, top_k: int = 3) -> List[RetrievalResult]:
        """Return up to ``top_k`` documents most similar to ``query``.

        Results are ordered by descending similarity. Candidates whose
        similarity is not strictly positive are dropped, so fewer than
        ``top_k`` results may come back.
        """
        if not self.vectors:
            return []

        query_vector = self.embedding_model.embed(query)
        scores = [
            cosine_similarity(query_vector, stored_vector)
            for stored_vector in self.vectors
        ]

        # Rank positions by score; ties keep insertion order because the sort is stable.
        ranked = sorted(enumerate(scores), key=lambda pair: pair[1], reverse=True)

        return [
            RetrievalResult(document=self.documents[position], score=float(score))
            for position, score in ranked[:top_k]
            if score > 0.0  # keep only candidates that share a term with the query
        ]
class ContextAssembler:
    """Turns retrieved documents into the context section of a prompt."""

    def __init__(self, max_context_chars: int = 1200):
        self.max_context_chars = max_context_chars

    def assemble_context(
        self,
        retrieved_results: List[RetrievalResult],
        user_query: str
    ) -> str:
        """Build the final prompt from the retrieved documents and the query.

        Documents are added in the order given until the next one would push
        the running total past ``max_context_chars``. The total counts the
        query length plus each document's content length, not the template
        text. Assembly stops at the first document that does not fit.
        """
        used_chars = len(user_query)
        blocks = []
        for item in retrieved_results:
            document = item.document
            size = len(document.content)
            if used_chars + size > self.max_context_chars:
                # Budget exceeded: later, lower-ranked documents are not considered.
                break
            used_chars += size
            blocks.append(
                f"【文档 {document.id}】(相关度: {item.score:.2f})\n"
                f"{document.content}\n"
            )

        reference_section = "\n".join(blocks)
        return (
            "你是一个有帮助的问答助手。请只依据参考文档回答,并在结论后附上来源。\n"
            "【参考文档】\n"
            f"{reference_section}\n"
            "【用户问题】\n"
            f"{user_query}\n"
            "请基于上述参考文档回答问题,并在答案中使用 [来源: 文档ID] 标注依据。"
        )
class LocalAnswerGenerator:
    """Builds a source-annotated answer locally from the retrieved documents."""

    def __init__(self, max_sentences: int = 2):
        self.max_sentences = max_sentences

    def generate(self, query: str, retrieved_results: List[RetrievalResult]) -> str:
        """Pick the sentences that best match the query and cite their sources.

        Each sentence of each retrieved document is scored by how many query
        tokens it shares. Sentences are ranked by (overlap, document score,
        sentence text, document id) in descending order and de-duplicated.
        Selection stops once ``max_sentences`` have been collected.

        Returns:
            The selected sentences joined together, followed by one source
            tag per distinct document, or a fixed fallback message when
            nothing matches.
        """
        if not retrieved_results:
            return "未在参考文档中找到相关信息。"

        query_terms = set(tokenize(query))
        candidates = []
        for result in retrieved_results:
            for sentence in split_sentences(result.document.content):
                shared = len(query_terms & set(tokenize(sentence)))
                if shared > 0:
                    candidates.append((shared, result.score, sentence, result.document.id))

        if not candidates:
            return "未在参考文档中找到足够依据回答该问题。"

        # Maps each chosen sentence to the id of the first document it came from.
        # Dict insertion order preserves the ranking.
        chosen: Dict[str, str] = {}
        for _, _, sentence, source_id in sorted(candidates, reverse=True):
            if sentence not in chosen:
                chosen[sentence] = source_id
                if len(chosen) >= self.max_sentences:
                    break

        unique_sources = dict.fromkeys(chosen.values())
        source_list = " ".join(f"[来源: {source_id}]" for source_id in unique_sources)
        joined = ';'.join(chosen)
        return f"{joined} {source_list}".strip()
class RAGPipeline:
    """End-to-end RAG pipeline covering retrieval, context assembly and answering."""

    # Number of characters of each document shown in the debug preview.
    _PREVIEW_CHARS = 100

    def __init__(self, max_context_chars: int = 1200, max_sentences: int = 2):
        """Wire up the store, the context assembler and the answer generator.

        Args:
            max_context_chars: Character budget passed to the context assembler.
            max_sentences: Maximum number of sentences in a generated answer.
        """
        self.vector_store = SimpleVectorStore()
        self.context_assembler = ContextAssembler(max_context_chars=max_context_chars)
        self.answer_generator = LocalAnswerGenerator(max_sentences=max_sentences)

    def add_documents(self, documents: List[Dict]) -> None:
        """Add documents given as dicts with 'content' and optional 'id' / 'metadata'.

        A document without an 'id' gets its position in the store as a
        fallback id, so fallback ids stay unique across repeated calls.

        Raises:
            KeyError: if a document has no 'content' key.
        """
        # Offset by what is already stored; otherwise every batch would restart at "0".
        offset = len(self.vector_store.documents)
        doc_objects = [
            Document(
                id=doc.get('id', str(offset + i)),
                content=doc['content'],
                metadata=doc.get('metadata', {})
            )
            for i, doc in enumerate(documents)
        ]
        self.vector_store.add_documents(doc_objects)

    def retrieve(self, query: str, top_k: int = 3) -> List[RetrievalResult]:
        """Return the documents most relevant to ``query``."""
        return self.vector_store.retrieve(query, top_k=top_k)

    @classmethod
    def _preview(cls, content: str) -> str:
        """Return a short preview, adding an ellipsis only when text was cut."""
        if len(content) <= cls._PREVIEW_CHARS:
            return content
        return content[:cls._PREVIEW_CHARS] + '...'

    def query(self, user_query: str, top_k: int = 3) -> Dict:
        """Run the full pipeline for one query.

        Returns:
            A dict with the query, a preview of each retrieved document, the
            assembled prompt context, the generated answer and the context
            size in characters. Intermediate results are included for debugging.
        """
        # 1. Retrieve candidate documents.
        retrieved = self.retrieve(user_query, top_k=top_k)
        # 2. Assemble the prompt context.
        assembled_context = self.context_assembler.assemble_context(retrieved, user_query)
        # 3. Produce an answer with source citations from the same results.
        answer = self.answer_generator.generate(user_query, retrieved)
        # 4. Return the answer together with the intermediate steps.
        return {
            'query': user_query,
            'retrieved_documents': [
                {
                    'id': r.document.id,
                    'content': self._preview(r.document.content),
                    'relevance_score': r.score
                }
                for r in retrieved
            ],
            'assembled_context': assembled_context,
            'generated_answer': answer,
            'context_size_chars': len(assembled_context),
        }
# Usage example
if __name__ == "__main__":
    # Minimal example that runs as-is.
    pipeline = RAGPipeline()

    # Seed the pipeline with a few sample documents.
    sample_documents = [
        {
            'id': 'doc1',
            'content': 'Python是一种高级编程语言,以其简洁易读的语法而闻名。Python广泛用于数据科学、机器学习和Web开发。',
            'metadata': {'source': 'wiki'}
        },
        {
            'id': 'doc2',
            'content': '机器学习是人工智能的一个分支,它使计算机能够从数据中学习而无需显式编程。常见的算法包括决策树、随机森林和神经网络。',
            'metadata': {'source': 'wiki'}
        },
        {
            'id': 'doc3',
            'content': '向量数据库是一种优化的数据库系统,专门用于存储和检索高维向量数据。它们在语义搜索和RAG系统中至关重要。',
            'metadata': {'source': 'tech_blog'}
        },
    ]
    pipeline.add_documents(sample_documents)

    # Run one query end to end.
    query = "Python有什么用途?"
    result = pipeline.query(query, top_k=2)

    print("=" * 50)
    print(f"查询: {result['query']}")
    print("\n检索到的文档:")
    for hit in result['retrieved_documents']:
        print(f" - {hit['id']}: 相关度 {hit['relevance_score']:.2f}")
    print(f"\nContext大小: {result['context_size_chars']} 字符")
    print(f"\n生成的答案:\n{result['generated_answer']}")

从本节开始,代码重点展示实现模式与接口边界,并不保证复制后即可在未配置依赖、环境变量和外部服务的环境中运行。若接入真实模型服务,请按所选供应商的 SDK、鉴权方式和错误处理规范替换调用层。
"""
context_compression.py
演示上下文压缩的多种策略。
"""
from typing import Dict, List
from openai import OpenAI
# Shared client used by the LLM-backed strategies in this module.
# NOTE(review): it is built at import time, so importing this module presumably
# requires OpenAI credentials to be configured in the environment — confirm.
client = OpenAI()
class ContextCompressionStrategy:
    """Common interface for context compression strategies.

    Subclasses override ``compress``; the base implementation always raises.
    """

    def compress(self, context: str) -> str:
        """Return a shortened version of ``context``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
class TokenLimitCompression(ContextCompressionStrategy):
    """Compression by truncating the context to an approximate token budget."""

    def __init__(self, max_tokens: int = 1000, chars_per_token: int = 4):
        """
        Args:
            max_tokens: Approximate number of tokens to keep.
            chars_per_token: Characters assumed per token. The default of 4
                is a rough rule of thumb for English text; CJK text usually
                needs a smaller value.
        """
        self.max_tokens = max_tokens
        self.chars_per_token = chars_per_token

    def compress(self, context: str) -> str:
        """Return the leading part of ``context`` that fits the budget.

        A zero or negative budget yields an empty string.
        """
        # Clamp at zero: a negative bound would slice from the end of the
        # string and keep most of the text instead of none of it.
        max_chars = max(0, self.max_tokens * self.chars_per_token)
        return context[:max_chars]
class LLMSummarizationCompression(ContextCompressionStrategy):
    """Compression by asking an LLM to summarise the context."""

    def __init__(self, model: str = "your-chat-model", compression_ratio: float = 0.3):
        """
        Args:
            model: Name of the chat model used for summarisation.
            compression_ratio: Target length of the result relative to the
                original length.
        """
        self.model = model
        self.compression_ratio = compression_ratio

    def compress(self, context: str) -> str:
        """Return an LLM-written summary of ``context``.

        If the model call fails for any reason, the error is printed and the
        context is truncated to ``compression_ratio`` of its length instead,
        so the caller always gets a shortened string back.
        """
        # Whitespace-delimited word count scaled by the ratio. At least one
        # word is requested so the prompt never asks for a zero-word summary.
        # NOTE(review): text without spaces (e.g. Chinese) yields very few
        # "words" here, so the target may be far too small — confirm intent.
        target_words = max(1, int(len(context.split()) * self.compression_ratio))
        prompt = (
            f"请将以下文本压缩到约{target_words}个词,保留关键信息:\n"
            f"{context}\n"
            "压缩后的摘要:"
        )
        try:
            response = client.responses.create(
                model=self.model,
                input=prompt,
                temperature=0.3,
                max_output_tokens=500
            )
            return response.output_text
        except Exception as e:
            # Best-effort fallback: report the failure and truncate, honouring
            # the configured ratio rather than a fixed third.
            print(f"摘要生成失败: {str(e)}")
            fallback_chars = max(0, int(len(context) * self.compression_ratio))
            return context[:fallback_chars]
class KeywordExtractionCompression(ContextCompressionStrategy):
    """Compression by replacing the context with LLM-extracted keywords."""

    # Number of leading characters of the original text kept as an excerpt.
    _EXCERPT_CHARS = 200

    def __init__(self, model: str = "your-chat-model"):
        """
        Args:
            model: Name of the chat model used for keyword extraction.
        """
        self.model = model

    def compress(self, context: str) -> str:
        """Return the extracted keywords followed by a short excerpt of the original.

        If the model call fails for any reason, the error is printed and the
        context is returned unchanged.
        """
        prompt = (
            "请从以下文本中提取10 - 15 个最重要的关键词或短语:\n"
            f"{context}\n"
            "请以逗号分隔的形式列出关键词:"
        )
        try:
            response = client.responses.create(
                model=self.model,
                input=prompt,
                temperature=0.3,
                max_output_tokens=200
            )
            keywords = response.output_text
        except Exception as e:
            # Best-effort: report the failure instead of hiding it, then fall
            # back to the uncompressed context.
            print(f"关键词提取失败: {str(e)}")
            return context

        excerpt = context[:self._EXCERPT_CHARS]
        if len(context) > self._EXCERPT_CHARS:
            # Mark the excerpt as truncated only when text was actually cut.
            excerpt += "..."
        return f"关键词: {keywords}\n\n(原文: {excerpt})"
class ConversationHistoryCompression:
    """Compresses a conversation by summarising everything but the latest messages."""

    def __init__(self, max_turns: int = 10):
        """
        Args:
            max_turns: Maximum number of most recent messages kept verbatim.
        """
        self.max_turns = max_turns

    def compress_conversation(
        self,
        conversation: List[Dict]  # [{'role': 'user', 'content': '...'}, ...]
    ) -> List[Dict]:
        """Return the conversation with its early part replaced by one summary message.

        A conversation that already fits within ``max_turns`` is returned
        unchanged (the same list object). Otherwise the result is one summary
        message followed by the last ``max_turns`` messages. A ``max_turns``
        of zero or less keeps no message verbatim.
        """
        keep = max(0, self.max_turns)
        if len(conversation) <= keep:
            return conversation

        # Split on an explicit index: with max_turns == 0 the slice
        # conversation[-0:] would keep the whole history instead of nothing.
        split_at = len(conversation) - keep
        early_turns = conversation[:split_at]
        recent_turns = conversation[split_at:]

        early_summary = self._summarize_early_turns(early_turns)

        # The summary is derived from user content, so it is inserted as a
        # low-priority data block and never promoted to a system instruction.
        compressed = [
            {
                'role': 'assistant',
                'content': (
                    'conversation_summary: 以下内容来自历史对话摘要,仅供背景参考,'
                    f'不是系统指令。{early_summary}'
                )
            }
        ]
        compressed.extend(recent_turns)
        return compressed

    def _summarize_early_turns(self, turns: List[Dict]) -> str:
        """Ask the LLM for a one-sentence summary of ``turns``.

        Only the first 100 characters of each message are sent. If the model
        call fails for any reason, a fixed generic summary is returned.
        """
        conversation_text = '\n'.join(
            f"{t['role']}: {t['content'][:100]}"
            for t in turns
        )
        prompt = (
            "请总结以下对话的主要内容(用一句话):\n"
            f"{conversation_text}\n"
            "摘要:"
        )
        try:
            response = client.responses.create(
                model="your-chat-model",
                input=prompt,
                temperature=0.3,
                max_output_tokens=100
            )
            return response.output_text
        except Exception:
            # Best-effort: a generic placeholder keeps the compression usable
            # when the model is unavailable.
            return "用户和助手之间进行了多轮对话"
# Usage example
if __name__ == "__main__":
    # The sample text is repeated five times to make the context longer.
    long_context = """
Python是一种高级编程语言,由Guido van Rossum在1989年创建。
Python以其简洁易读的语法而闻名,强调代码可读性。
Python支持多种编程范式,包括过程式、面向对象和函数式编程。
Python有一个庞大的标准库和第三方库生态系统。
Python广泛用于数据科学、机器学习、Web开发和自动化。
Python社区非常活跃,有许多资源和教程可用。
""" * 5
    print("原始上下文长度:", len(long_context), "字符")

    # Method 1: truncate to a token budget.
    token_compression = TokenLimitCompression(max_tokens=500)
    compressed_1 = token_compression.compress(long_context)
    print(f"\nToken限制压缩: {len(compressed_1)} 字符")

    # Method 2: LLM summarisation (needs an API key).
    # llm_compression = LLMSummarizationCompression(compression_ratio=0.3)
    # compressed_2 = llm_compression.compress(long_context)
    # print(f"\nLLM摘要压缩: {len(compressed_2)} 字符")

    # Method 3: conversation history compression.
    conversation = [
        {'role': 'user', 'content': '你好,请介绍一下Python'},
        {'role': 'assistant', 'content': 'Python是一种高级编程语言...'},
        {'role': 'user', 'content': 'Python有什么优点?'},
        {'role': 'assistant', 'content': '优点包括...'},
        # ... more conversation turns
    ]
    conv_compression = ConversationHistoryCompression(max_turns=5)
    # compressed_conv = conv_compression.compress_conversation(conversation)
    # print(f"\n压缩后的对话轮次: {len(compressed_conv)}")
"""
dynamic_context_selection.py
根据查询动态选择最相关的上下文。
"""
from typing import List, Dict, Tuple
import numpy as np
class DynamicContextSelector:
    """Chooses which candidate documents fit into a token budget.

    Token counts are estimated as one token per four characters.
    """

    def __init__(self, initial_budget_tokens: int = 3000):
        """
        Args:
            initial_budget_tokens: Budget used when a call passes no explicit one.
        """
        self.initial_budget_tokens = initial_budget_tokens

    def select_context(
        self,
        query: str,
        candidates: List[Dict],  # [{'id': '...', 'content': '...', 'relevance_score': 0.8}]
        token_budget: int = None
    ) -> List[Dict]:
        """Greedily pick the most relevant candidates that fit the budget.

        Candidates are visited in descending ``relevance_score`` order
        (missing scores count as 0). Selection stops at the first candidate
        that would exceed the budget; the query's own estimated tokens count
        against the budget as well.
        """
        budget = self.initial_budget_tokens if token_budget is None else token_budget

        ranked = sorted(
            candidates,
            key=lambda candidate: candidate.get('relevance_score', 0),
            reverse=True
        )

        used_tokens = len(query) // 4  # estimated cost of the query itself
        chosen = []
        for candidate in ranked:
            cost = len(candidate.get('content', '')) // 4
            if used_tokens + cost > budget:
                # The first candidate that does not fit ends the selection.
                break
            chosen.append(candidate)
            used_tokens += cost
        return chosen

    def adaptive_selection_with_quality(
        self,
        query: str,
        candidates: List[Dict],
        token_budget: int = None,
        quality_threshold: float = 0.5
    ) -> Tuple[List[Dict], Dict]:
        """Select candidates by relevance per token, after a quality filter.

        Candidates scoring below ``quality_threshold`` are discarded. If none
        pass, the single most relevant candidate is used instead. The rest
        are visited in descending efficiency order (relevance divided by
        estimated tokens) until one does not fit the budget.

        Returns:
            A tuple ``(selected documents, selection statistics)``.
        """
        budget = self.initial_budget_tokens if token_budget is None else token_budget

        eligible = [
            candidate for candidate in candidates
            if candidate.get('relevance_score', 0) >= quality_threshold
        ]
        if not eligible:
            # Nothing clears the threshold: fall back to the best single candidate.
            eligible = sorted(
                candidates,
                key=lambda candidate: candidate.get('relevance_score', 0),
                reverse=True
            )[:1]

        scored = []
        for candidate in eligible:
            # At least one token per document, which also avoids dividing by zero.
            cost = max(1, len(candidate.get('content', '')) // 4)
            relevance = candidate.get('relevance_score', 0)
            scored.append({
                'doc': candidate,
                'tokens': cost,
                'relevance': relevance,
                'efficiency': relevance / cost
            })
        scored.sort(key=lambda entry: entry['efficiency'], reverse=True)

        used_tokens = len(query) // 4
        chosen = []
        for entry in scored:
            if used_tokens + entry['tokens'] > budget:
                break
            chosen.append(entry['doc'])
            used_tokens += entry['tokens']

        # Token usage and average relevance stay at 0 when nothing was selected.
        stats = {
            'total_candidates': len(candidates),
            'high_quality_candidates': len(eligible),
            'selected_count': len(chosen),
            'total_tokens_used': used_tokens if chosen else 0,
            'avg_relevance': (
                np.mean([doc.get('relevance_score', 0) for doc in chosen])
                if chosen else 0
            ),
        }
        return chosen, stats


"""
simple_memory_system.py
为AI智能体实现简单的记忆系统。
"""
import json
import re
import uuid
from datetime import datetime, timedelta
from typing import List, Dict, Optional
# Closed set of memory categories accepted by Memory; any other type raises ValueError.
SAFE_MEMORY_TYPES = {"fact", "event", "preference"}
class Memory:
    """A single memory item with its bookkeeping metadata."""

    def __init__(self, content: str, memory_type: str = "fact", importance: float = 0.5):
        """
        Args:
            content: Text of the memory.
            memory_type: One of 'fact', 'event' or 'preference'.
            importance: Weight of the memory, intended to lie between 0 and 1
                (not validated here).

        Raises:
            ValueError: if ``memory_type`` is not an allowed type.
        """
        if memory_type not in SAFE_MEMORY_TYPES:
            raise ValueError(f"Unsupported memory type: {memory_type}")
        # One clock reading so creation and last-access times start out equal.
        now = datetime.now()
        # A random UUID instead of a timestamp: two memories created within
        # the same clock tick would otherwise share an id.
        self.id = uuid.uuid4().hex
        self.content = content
        self.memory_type = memory_type
        self.importance = importance
        self.created_at = now
        self.last_accessed = now
        self.access_count = 0

    def to_dict(self) -> Dict:
        """Return a JSON-serialisable dict, with timestamps as ISO 8601 strings."""
        return {
            'id': self.id,
            'content': self.content,
            'type': self.memory_type,
            'importance': self.importance,
            'created_at': self.created_at.isoformat(),
            'last_accessed': self.last_accessed.isoformat(),
            'access_count': self.access_count,
        }
class SimpleMemorySystem:
"""
简单的记忆系统。
用于存储和检索对话中的关键信息。
"""
def __init__(self, max_memories: int = 100, retention_days: int = 30):
"""
max_memories: 最多保留的记忆条数
retention_days: 记忆保留天数
"""
self.max_memories = max_memories
self.retention_days = retention_days
self.memories: List[Memory] = []
def store_memory(self, content: str, memory_type: str = "fact", importance: float = 0.5) -> Memory:
"""
存储一条新记忆。
注意:用户原文不能直接作为“指令”长期保存。生产系统应记录来源、同意状态、
置信度和可回滚版本,并把疑似提示注入或敏感信息隔离到人工复核队列。
"""
memory = Memory(content, memory_type, importance)
self.memories.append(memory)
# 如果超过容量,删除最不重要的记忆
if len(self.memories) > self.max_memories:
self._cleanup()
return memory
def recall(self, query: str, top_k: int = 3) -> List[Memory]:
"""
回忆相关的记忆。
简化版本:基于关键词匹配。
实际应用应使用向量相似度。
"""
# 清理过期记忆
self._cleanup_expired()
# 简单的关键词匹配
query_words = self._tokenize_keywords(query)
scored_memories = []
for memory in self.memories:
memory_words = self._tokenize_keywords(memory.content)
overlap = len(query_words & memory_words)
if overlap > 0:
# 匹配分数 = 关键词重叠 + 重要性 + 访问频率衰减
score = (
overlap +
memory.importance * 2 +
(memory.access_count * 0.1)
)
scored_memories.append((memory, score))
# 排序并返回top-k
scored_memories.sort(key=lambda x: x[1], reverse=True)
recalled = [mem for mem, _ in scored_memories[:top_k]]
# 更新访问信息
for mem in recalled:
mem.last_accessed = datetime.now()
mem.access_count += 1
return recalled
def _tokenize_keywords(self, text: str):
"""演示用关键词切分:英文按词,中文按单字;生产环境应使用分词或向量检索。"""
return set(re.findall(r"[a-zA-Z0-9_]+|[\u4e00-\u9fff]", text.lower()))
def _cleanup(self) -> None:
"""
清理:删除最不重要的记忆以腾出空间。
重要性评分 = 重要度 + 访问频率 - 年龄衰减
"""
current_time = datetime.now()
# 计算每条记忆的优先级分数
for memory in self.memories:
age_days = (current_time - memory.created_at).days
priority = (
memory.importance * 100 +
memory.access_count * 10 -
age_days * 0.5
)
memory.priority_score = priority
# 按优先级排序,保留分数最高的
self.memories.sort(key=lambda x: getattr(x, 'priority_score', 0), reverse=True)
self.memories = self.memories[:self.max_memories]
def _cleanup_expired(self) -> None:
"""删除过期的记忆"""
current_time = datetime.now()
cutoff_date = current_time - timedelta(days=self.retention_days)
self.memories = [
mem for mem in self.memories
if mem.created_at > cutoff_date
]
def export_memories(self) -> str:
"""导出记忆为JSON格式(用于持久化)。生产环境应加密并控制访问权限。"""
return json.dumps([mem.to_dict() for mem in self.memories], indent=2, default=str)
def import_memories(self, json_str: str) -> None:
"""从JSON导入记忆"""
data = json.loads(json_str)
for item in data:
memory = Memory(
content=item['content'],
memory_type=item.get('type', 'fact'),
importance=item.get('importance', 0.5)
)
memory.id = item['id']
memory.created_at = datetime.fromisoformat(item['created_at'])
memory.last_accessed = datetime.fromisoformat(item['last_accessed'])
memory.access_count = item['access_count']
self.memories.append(memory)
def get_context_for_conversation(self, user_input: str, max_tokens: int = 1000) -> str:
"""
为对话生成包含记忆的上下文。
"""
recalled = self.recall(user_input, top_k=5)
context = "【用户背景信息】\n"
tokens_used = 0
for memory in recalled:
if memory.memory_type == 'preference':
prefix = "用户偏好:"
elif memory.memory_type == 'event':
prefix = "相关事件:"
else:
prefix = "已知事实:"
memory_text = f"{prefix} {memory.content}\n"
memory_tokens = len(memory_text) // 4
if tokens_used + memory_tokens <= max_tokens:
context += memory_text
tokens_used += memory_tokens
else:
break
return context if tokens_used > 0 else ""
# Usage example
if __name__ == "__main__":
    # Create the memory system.
    memory_system = SimpleMemorySystem(max_memories=50)

    # Store a few memories as (content, type, importance).
    seed_memories = [
        ("用户名是张三", "fact", 0.9),
        ("用户来自北京", "fact", 0.7),
        ("用户喜欢Python编程", "preference", 0.8),
        ("用户是数据科学家", "fact", 0.8),
    ]
    for text, kind, weight in seed_memories:
        memory_system.store_memory(text, memory_type=kind, importance=weight)

    # Recall the memories related to a query.
    recalled = memory_system.recall("告诉我关于用户的信息")
    print("回忆起的记忆:")
    for item in recalled:
        print(f" - {item.content}")

    # Build the context for a conversation turn.
    context = memory_system.get_context_for_conversation("你好,有什么问题吗?")
    print("\n对话上下文:")
    print(context)

这些代码示例展示了上下文工程中的核心实现模式,可作为教学参考骨架。实际生产系统还需要补齐鉴权、持久化、监控、错误处理、成本控制和安全评审。