| layout | default |
|---|---|
| title | Pydantic AI Tutorial - Chapter 7: Advanced Patterns |
| nav_order | 7 |
| has_children | false |
| parent | Pydantic AI Tutorial |
Welcome to Chapter 7: Advanced Patterns & Multi-Step Workflows. In this part of Pydantic AI Tutorial: Type-Safe AI Agent Development, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
Master complex agent workflows, dynamic reasoning patterns, and sophisticated interaction paradigms for advanced AI applications.
from typing import Dict, Any, Type, Optional
from pydantic_ai import Agent
from pydantic import BaseModel
class AgentFactory:
"""Factory for creating specialized agents dynamically."""
def __init__(self):
self.agent_templates: Dict[str, Dict[str, Any]] = {}
self.created_agents: Dict[str, Agent] = {}
def register_template(self, name: str, template: Dict[str, Any]):
"""Register an agent template."""
self.agent_templates[name] = template
def create_agent(self, template_name: str, customizations: Optional[Dict[str, Any]] = None) -> Agent:
"""Create agent from template with customizations."""
if template_name not in self.agent_templates:
raise ValueError(f"Template {template_name} not found")
template = self.agent_templates[template_name].copy()
customizations = customizations or {}
# Apply customizations
for key, value in customizations.items():
if key in template:
if isinstance(template[key], dict) and isinstance(value, dict):
template[key].update(value)
else:
template[key] = value
else:
template[key] = value
# Create agent
agent = Agent(**template)
# Store reference
agent_id = f"{template_name}_{id(agent)}"
self.created_agents[agent_id] = agent
return agent
def get_agent_stats(self) -> Dict[str, Any]:
"""Get statistics about created agents."""
template_usage = {}
for agent_id, agent in self.created_agents.items():
template_name = agent_id.rsplit('_', 1)[0]  # template names themselves contain underscores
template_usage[template_name] = template_usage.get(template_name, 0) + 1
return {
"total_agents": len(self.created_agents),
"template_usage": template_usage,
"active_agents": len([a for a in self.created_agents.values() if hasattr(a, '_active')])
}
# Create agent factory
factory = AgentFactory()
# Register agent templates
factory.register_template("code_assistant", {
"model": "openai:gpt-4",
"system_prompt": "You are an expert software developer. Provide clear, correct code solutions with explanations.",
"result_type": None
})
factory.register_template("data_analyst", {
"model": "openai:gpt-4",
"system_prompt": "You are a data analysis expert. Provide insights from data with clear explanations.",
"result_type": None
})
factory.register_template("creative_writer", {
"model": "anthropic:claude-3-sonnet-20240229",
"system_prompt": "You are a creative writer. Craft engaging, imaginative content.",
"result_type": None
})
factory.register_template("math_tutor", {
"model": "groq:mixtral-8x7b-32768",
"system_prompt": "You are a patient math tutor. Explain concepts clearly with examples.",
"result_type": None
})
# Create agents with customizations
code_agent = factory.create_agent("code_assistant", {
"system_prompt": "You are an expert Python developer specializing in web applications."
})
creative_agent = factory.create_agent("creative_writer", {
"model": "openai:gpt-4", # Override model
"system_prompt": "You are a creative writer specializing in science fiction."
})
print("🎯 Created Agents:")
print(f"Code Agent: {code_agent.model}")
print(f"Creative Agent: {creative_agent.model}")
# Test agents
code_result = code_agent.run_sync("Write a Python function to validate email addresses")
creative_result = creative_agent.run_sync("Write a short sci-fi story about AI consciousness")
print("
📝 Agent Results:")
print(f"Code: {len(code_result.data)} chars")
print(f"Creative: {len(creative_result.data)} chars")
print("
📊 Factory Stats:")
stats = factory.get_agent_stats()
for key, value in stats.items():
print(f"{key}: {value}")from typing import List, Dict, Any, Callable
from typing import List, Dict, Any, Callable
import re
class IntelligentRouter:
"""Router that dynamically selects agents based on query analysis."""
def __init__(self, agent_factory: AgentFactory):
self.agent_factory = agent_factory
self.routing_rules: List[Dict[str, Any]] = []
self.performance_history: Dict[str, int] = {}
def add_routing_rule(self, condition: Callable[[str], bool],
agent_template: str, priority: int = 1):
"""Add a routing rule."""
self.routing_rules.append({
"condition": condition,
"agent_template": agent_template,
"priority": priority
})
# Sort by priority (higher priority first)
self.routing_rules.sort(key=lambda x: x["priority"], reverse=True)
def analyze_query(self, query: str) -> str:
"""Analyze query to determine appropriate agent."""
# Check routing rules
for rule in self.routing_rules:
if rule["condition"](query):
agent_template = rule["agent_template"]
self.performance_history[agent_template] = self.performance_history.get(agent_template, 0) + 1
return agent_template
# Default fallback
return "general_assistant"
def get_agent_for_query(self, query: str) -> Agent:
"""Get appropriate agent for query."""
agent_template = self.analyze_query(query)
# Create agent with query-specific customizations
customizations = self._get_query_customizations(query)
agent = self.agent_factory.create_agent(agent_template, customizations)
return agent
def _get_query_customizations(self, query: str) -> Dict[str, Any]:
"""Get query-specific customizations."""
customizations = {}
# Language-specific customizations for code queries
if "python" in query.lower():
customizations["system_prompt"] = "You are a Python expert. Focus on Pythonic solutions."
elif "javascript" in query.lower():
customizations["system_prompt"] = "You are a JavaScript expert. Use modern ES6+ features."
# Complexity-based customizations
if len(query) > 200:
customizations["model"] = "openai:gpt-4" # Use more capable model
elif len(query) < 50:
customizations["model"] = "groq:mixtral-8x7b-32768" # Use faster model
return customizations
def get_routing_stats(self) -> Dict[str, Any]:
"""Get routing statistics."""
return {
"total_queries": sum(self.performance_history.values()),
"agent_usage": self.performance_history,
"routing_rules": len(self.routing_rules)
}
# Register a fallback template so the router's default "general_assistant" route can be created
factory.register_template("general_assistant", {
"model": "openai:gpt-4",
"system_prompt": "You are a helpful general-purpose assistant.",
"result_type": None
})
# Create intelligent router
router = IntelligentRouter(factory)
# Add routing rules
def is_code_query(query: str) -> bool:
code_keywords = ["function", "class", "code", "program", "algorithm", "script"]
return any(keyword in query.lower() for keyword in code_keywords)
def is_math_query(query: str) -> bool:
math_keywords = ["calculate", "equation", "solve", "math", "algebra", "geometry"]
return any(keyword in query.lower() for keyword in math_keywords)
def is_creative_query(query: str) -> bool:
creative_keywords = ["write", "story", "poem", "create", "design", "imagine"]
return any(keyword in query.lower() for keyword in creative_keywords)
def is_data_query(query: str) -> bool:
data_keywords = ["analyze", "data", "statistics", "chart", "graph", "trend"]
return any(keyword in query.lower() for keyword in data_keywords)
router.add_routing_rule(is_code_query, "code_assistant", priority=3)
router.add_routing_rule(is_math_query, "math_tutor", priority=3)
router.add_routing_rule(is_creative_query, "creative_writer", priority=2)
router.add_routing_rule(is_data_query, "data_analyst", priority=2)
# Test intelligent routing
test_queries = [
"Write a Python function to sort a list",
"Calculate the area of a circle with radius 5",
"Write a short story about time travel",
"Analyze this sales data: [100, 150, 200, 180, 220]",
"Hello, how are you today?"
]
print("🎯 Intelligent Routing Test:")
for query in test_queries:
agent = router.get_agent_for_query(query)
print(f"Query: {query[:40]}...")
print(f" Routed to: {agent.model}")
print(f" Agent type: {type(agent).__name__}")
print()
print("📊 Routing Statistics:")
stats = router.get_routing_stats()
for key, value in stats.items():
print(f"{key}: {value}")from typing import List, Dict, Any, Optional
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
import asyncio
from pydantic_ai import Agent
class ReasoningStep(BaseModel):
step_number: int
thought: str
action: str
observation: str
confidence: float = Field(ge=0.0, le=1.0)
class ChainOfThoughtResult(BaseModel):
problem: str
reasoning_chain: List[ReasoningStep]
final_answer: str
confidence: float
class ChainOfThoughtAgent(Agent):
"""Agent implementing chain-of-thought reasoning."""
def __init__(self, max_steps: int = 5, **kwargs):
super().__init__(**kwargs)
self.max_steps = max_steps
async def reason_step_by_step(self, problem: str) -> ChainOfThoughtResult:
"""Execute chain-of-thought reasoning."""
reasoning_chain = []
current_thought = f"I need to solve: {problem}"
for step in range(self.max_steps):
# Generate next reasoning step
step_prompt = self._create_reasoning_prompt(
problem, current_thought, reasoning_chain, step
)
# Get structured reasoning step
step_result = await self.run(step_prompt)
# Parse the response (simplified - in practice you'd use structured output)
try:
# Extract thought, action, observation from response
thought, action, observation, confidence = self._parse_step_response(step_result.data)
reasoning_step = ReasoningStep(
step_number=step + 1,
thought=thought,
action=action,
observation=observation,
confidence=confidence
)
reasoning_chain.append(reasoning_step)
# Check if we should stop
if confidence > 0.9 or "final answer" in observation.lower():
break
# Update current thought
current_thought = observation
except Exception as e:
print(f"Error in reasoning step {step + 1}: {e}")
break
# Generate final answer
final_answer = await self._generate_final_answer(problem, reasoning_chain)
return ChainOfThoughtResult(
problem=problem,
reasoning_chain=reasoning_chain,
final_answer=final_answer,
confidence=(sum(step.confidence for step in reasoning_chain) / len(reasoning_chain)) if reasoning_chain else 0.0
)
def _create_reasoning_prompt(self, problem: str, current_thought: str,
reasoning_chain: List[ReasoningStep], step: int) -> str:
"""Create prompt for next reasoning step."""
chain_summary = "\n".join([
f"Step {s.step_number}: {s.thought[:50]}... -> {s.action[:30]}... -> {s.observation[:50]}..."
for s in reasoning_chain[-2:] # Last 2 steps for context
])
prompt = f"""
You are solving this problem using step-by-step reasoning: {problem}
Current thought: {current_thought}
Previous reasoning steps:
{chain_summary}
For step {step + 1}, provide:
1. Your current thought process
2. What action you'll take
3. Your observation/result
4. Confidence level (0-1)
Format your response as:
Thought: [your thinking]
Action: [what you'll do]
Observation: [result of action]
Confidence: [0.0-1.0]
""".strip()
return prompt
def _parse_step_response(self, response: str) -> tuple[str, str, str, float]:
"""Parse step response into components."""
lines = response.strip().split('\n')
thought = ""
action = ""
observation = ""
confidence = 0.5
for line in lines:
line = line.strip()
if line.startswith("Thought:"):
thought = line[8:].strip()
elif line.startswith("Action:"):
action = line[7:].strip()
elif line.startswith("Observation:"):
observation = line[len("Observation:"):].strip()
elif line.startswith("Confidence:"):
try:
confidence = float(line[11:].strip())
confidence = max(0.0, min(1.0, confidence)) # Clamp to valid range
except ValueError:
pass
return thought, action, observation, confidence
async def _generate_final_answer(self, problem: str, reasoning_chain: List[ReasoningStep]) -> str:
"""Generate final answer from reasoning chain."""
chain_summary = "\n".join([
f"Step {step.step_number}: {step.observation}"
for step in reasoning_chain
])
final_prompt = f"""
Based on this reasoning chain, provide a final answer to: {problem}
Reasoning process:
{chain_summary}
Final Answer:
""".strip()
final_result = await self.run(final_prompt)
return final_result.data
# Create chain-of-thought agent
cot_agent = ChainOfThoughtAgent(
model='openai:gpt-4',
max_steps=4
)
# Test chain-of-thought reasoning
reasoning_problem = """
A bat and a ball cost $1.10 in total. The bat costs $1.00 more than the ball.
How much does the ball cost?
"""
async def test_chain_of_thought():
"""Test chain-of-thought reasoning."""
print("🧠 Testing Chain-of-Thought Reasoning:")
print(f"Problem: {reasoning_problem.strip()}")
result = await cot_agent.reason_step_by_step(reasoning_problem)
print(f"\nReasoning Steps: {len(result.reasoning_chain)}")
print(f"Final Confidence: {result.confidence:.2f}")
print(f"Final Answer: {result.final_answer}")
print("\nDetailed Reasoning Chain:")
for step in result.reasoning_chain:
print(f"Step {step.step_number}:")
print(f" Thought: {step.thought}")
print(f" Action: {step.action}")
print(f" Observation: {step.observation}")
print(f" Confidence: {step.confidence:.2f}")
print()
# Run chain-of-thought test
asyncio.run(test_chain_of_thought())
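The string parsing in `_parse_step_response` is fragile. A sketch of the structured-output alternative hinted at in the comments, assuming your pydantic_ai version supports typed results via `result_type` (newer releases use `output_type`); the dedicated `step_agent` and helper below are illustrative:

```python
from pydantic import BaseModel, Field
from pydantic_ai import Agent

class StepOutput(BaseModel):
    thought: str
    action: str
    observation: str
    confidence: float = Field(ge=0.0, le=1.0)

# A dedicated step agent that must return a validated StepOutput
step_agent = Agent(
    'openai:gpt-4',
    result_type=StepOutput,
    system_prompt="Produce one reasoning step for the given problem and context.",
)

async def next_step(prompt: str) -> StepOutput:
    result = await step_agent.run(prompt)
    return result.data  # already a StepOutput; no manual parsing or clamping needed
```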
from typing import List, Dict, Any, Optional
from enum import Enum
from pydantic_ai import Agent
import asyncio
class CollaborationMode(Enum):
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
HIERARCHICAL = "hierarchical"
class AgentOrchestrator:
"""Orchestrate multiple agents for complex tasks."""
def __init__(self):
self.agents: Dict[str, Dict[str, Any]] = {}
self.collaboration_history: List[Dict[str, Any]] = []
def register_agent(self, name: str, agent: Agent, capabilities: Optional[List[str]] = None):
"""Register an agent with capabilities."""
self.agents[name] = {
"agent": agent,
"capabilities": capabilities or [],
"performance_score": 1.0
}
def find_agents_for_task(self, task_requirements: List[str]) -> List[str]:
"""Find agents suitable for task requirements."""
suitable_agents = []
for name, agent_info in self.agents.items():
agent_caps = agent_info["capabilities"]
if any(req in agent_caps for req in task_requirements):
suitable_agents.append(name)
# Sort by performance score
suitable_agents.sort(
key=lambda x: self.agents[x]["performance_score"],
reverse=True
)
return suitable_agents[:3] # Return top 3
async def execute_collaborative_task(self, task: str, mode: CollaborationMode = CollaborationMode.SEQUENTIAL) -> Dict[str, Any]:
"""Execute task with multiple agents collaborating."""
collaboration_session = {
"task": task,
"mode": mode.value,
"start_time": asyncio.get_event_loop().time(),
"agent_contributions": [],
"final_result": None
}
self._current_session = collaboration_session  # so _record_contribution can attach results to this session
print(f"🤝 Starting {mode.value} collaboration for: {task[:50]}...")
if mode == CollaborationMode.SEQUENTIAL:
result = await self._execute_sequential(task)
elif mode == CollaborationMode.PARALLEL:
result = await self._execute_parallel(task)
elif mode == CollaborationMode.HIERARCHICAL:
result = await self._execute_hierarchical(task)
collaboration_session["final_result"] = result
collaboration_session["end_time"] = asyncio.get_event_loop().time()
collaboration_session["duration"] = collaboration_session["end_time"] - collaboration_session["start_time"]
self.collaboration_history.append(collaboration_session)
return collaboration_session
async def _execute_sequential(self, task: str) -> str:
"""Execute task sequentially through agents."""
# Break task into subtasks
subtasks = await self._decompose_task(task)
current_result = ""
for subtask in subtasks:
agent_name = self.find_agents_for_task([subtask["type"]])[0]
agent = self.agents[agent_name]["agent"]
# Enhance prompt with context
enhanced_prompt = f"""
Main task: {task}
Current progress: {current_result}
Subtask: {subtask["description"]}
Build upon previous work to complete this subtask.
""".strip()
result = await agent.run(enhanced_prompt)
current_result += f"\n\n{subtask['description']}:\n{result.data}"
# Record contribution
self._record_contribution(agent_name, subtask["description"], result.data)
return current_result
async def _execute_parallel(self, task: str) -> str:
"""Execute task components in parallel."""
subtasks = await self._decompose_task(task)
# Execute subtasks concurrently
parallel_tasks = []
for subtask in subtasks:
agent_name = self.find_agents_for_task([subtask["type"]])[0]
agent = self.agents[agent_name]["agent"]
task_coro = self._execute_subtask(agent, agent_name, subtask, task)
parallel_tasks.append(task_coro)
results = await asyncio.gather(*parallel_tasks)
# Combine results
combined_result = "\n\n".join(results)
# Final synthesis
synthesis_agent = self.agents["synthesizer"]["agent"]
final_result = await synthesis_agent.run(f"""
Synthesize these parallel contributions into a coherent final result:
Task: {task}
Contributions:
{combined_result}
Provide a unified, well-structured final answer.
""")
return final_result.data
async def _execute_hierarchical(self, task: str) -> str:
"""Execute task with hierarchical agent coordination."""
# Coordinator agent breaks down task
coordinator = self.agents["coordinator"]["agent"]
breakdown = await coordinator.run(f"""
Break down this complex task into coordinated subtasks: {task}
Identify:
1. Key phases or components
2. Which types of specialists are needed for each
3. Dependencies between subtasks
4. Coordination requirements
Provide a structured breakdown.
""")
# Parse breakdown and execute hierarchically
# (Simplified - in practice, you'd parse the breakdown and schedule subtasks by dependency)
return await self._execute_sequential(f"{task}\n\nCoordinator breakdown:\n{breakdown.data}")
async def _decompose_task(self, task: str) -> List[Dict[str, Any]]:
"""Decompose task into subtasks."""
decomposer = self.agents["decomposer"]["agent"]
decomposition = await decomposer.run(f"""
Decompose this task into 3-5 logical subtasks: {task}
For each subtask, specify:
- Description
- Type (research, analysis, writing, coding, etc.)
- Dependencies (if any)
Format as a clear list.
""")
# Parse decomposition (simplified)
subtasks = [
{"description": "Research the topic thoroughly", "type": "research"},
{"description": "Analyze findings and extract insights", "type": "analysis"},
{"description": "Create final deliverable", "type": "writing"}
]
return subtasks
async def _execute_subtask(self, agent: Agent, agent_name: str, subtask: Dict[str, Any], main_task: str) -> str:
"""Execute a single subtask."""
result = await agent.run(f"""
Main task: {main_task}
Your subtask: {subtask["description"]}
Type: {subtask["type"]}
Provide your contribution to this subtask.
""")
self._record_contribution(agent_name, subtask["description"], result.data)
return f"**{agent_name}** ({subtask['type']}): {result.data}"
def _record_contribution(self, agent_name: str, subtask: str, result: str):
"""Record agent contribution for performance tracking."""
contribution = {
"agent": agent_name,
"subtask": subtask,
"result_length": len(result),
"timestamp": asyncio.get_event_loop().time()
}
# Attach the contribution to the active collaboration session
self._current_session["agent_contributions"].append(contribution)
# Update performance score
if len(result) > 100: # Simple quality metric
self.agents[agent_name]["performance_score"] *= 1.01 # Small boost
else:
self.agents[agent_name]["performance_score"] *= 0.99 # Small penalty
# Create orchestrator and register agents
orchestrator = AgentOrchestrator()
# Register agents with capabilities
orchestrator.register_agent("researcher", Agent('openai:gpt-4'), ["research", "information_gathering"])
orchestrator.register_agent("analyst", Agent('anthropic:claude-3-sonnet-20240229'), ["analysis", "insights"])
orchestrator.register_agent("writer", Agent('google:gemini-1.5-pro'), ["writing", "content_creation"])
orchestrator.register_agent("decomposer", Agent('groq:mixtral-8x7b-32768'), ["task_decomposition"])
orchestrator.register_agent("synthesizer", Agent('openai:gpt-4'), ["synthesis", "coordination"])
async def test_agent_collaboration():
"""Test multi-agent collaboration modes."""
complex_task = """
Create a comprehensive business plan for a sustainable energy startup.
Include market analysis, technical approach, financial projections,
and go-to-market strategy.
"""
print("🎯 Testing Multi-Agent Collaboration:")
# Test sequential collaboration
print("\n📋 Sequential Collaboration:")
sequential_result = await orchestrator.execute_collaborative_task(
complex_task, CollaborationMode.SEQUENTIAL
)
print(f"Duration: {sequential_result['duration']:.2f}s")
print(f"Agents involved: {len(sequential_result['agent_contributions'])}")
# Test parallel collaboration
print("\n⚡ Parallel Collaboration:")
parallel_result = await orchestrator.execute_collaborative_task(
complex_task, CollaborationMode.PARALLEL
)
print(f"Duration: {parallel_result['duration']:.2f}s")
print(f"Agents involved: {len(parallel_result['agent_contributions'])}")
# Compare performance
speedup = sequential_result['duration'] / parallel_result['duration']
print(f"\n🚀 Parallel speedup: {speedup:.2f}x")
# Show final results
print(f"\n📄 Sequential result length: {len(sequential_result['final_result'])} chars")
print(f"📄 Parallel result length: {len(parallel_result['final_result'])} chars")
# Run collaboration tests
asyncio.run(test_agent_collaboration())
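`_execute_parallel` uses a bare `asyncio.gather(*parallel_tasks)`, so a single failed subtask aborts the whole collaboration. A sketch of a more forgiving variant, assuming you are willing to drop failed contributions rather than fail the run; the helper name is illustrative:

```python
import asyncio
from typing import List

async def gather_contributions(parallel_tasks: list) -> List[str]:
    """Run subtask coroutines concurrently, keeping successful results only."""
    results = await asyncio.gather(*parallel_tasks, return_exceptions=True)
    contributions = []
    for outcome in results:
        if isinstance(outcome, Exception):
            print(f"Subtask failed and was skipped: {outcome!r}")
        else:
            contributions.append(outcome)
    return contributions
```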
from typing import Dict, List, Any, Optional
from collections import defaultdict
from pydantic_ai import Agent
import asyncio
import json
class SelfImprovingAgent(Agent):
"""Agent that learns and improves from feedback."""
def __init__(self, learning_file: str = "agent_learning.json", **kwargs):
super().__init__(**kwargs)
self.learning_file = learning_file
self.feedback_history: List[Dict[str, Any]] = []
self.patterns_learned: Dict[str, Dict[str, Any]] = {}
self.performance_metrics: Dict[str, float] = defaultdict(float)
self._load_learning_data()
def _load_learning_data(self):
"""Load learned patterns and feedback."""
try:
with open(self.learning_file, 'r') as f:
data = json.load(f)
self.feedback_history = data.get("feedback_history", [])
self.patterns_learned = data.get("patterns_learned", {})
self.performance_metrics.update(data.get("performance_metrics", {}))
except FileNotFoundError:
# Initialize with empty learning data
pass
def _save_learning_data(self):
"""Save learned patterns and feedback."""
data = {
"feedback_history": self.feedback_history[-100:], # Keep last 100
"patterns_learned": self.patterns_learned,
"performance_metrics": dict(self.performance_metrics)
}
with open(self.learning_file, 'w') as f:
json.dump(data, f, indent=2)
async def run_with_learning(self, prompt: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Run with learning and improvement capabilities."""
# Analyze query and retrieve relevant patterns
relevant_patterns = self._find_relevant_patterns(prompt)
# Enhance prompt with learned patterns
enhanced_prompt = self._enhance_prompt_with_patterns(prompt, relevant_patterns)
# Execute with enhanced context
result = await self.run(enhanced_prompt)
# Store for potential feedback
execution_record = {
"prompt": prompt,
"enhanced_prompt": enhanced_prompt,
"result": result.data,
"patterns_used": list(relevant_patterns.keys()),
"timestamp": asyncio.get_event_loop().time(),
"feedback_pending": True
}
self.feedback_history.append(execution_record)
return {
"result": result.data,
"patterns_applied": len(relevant_patterns),
"execution_record": execution_record
}
def provide_feedback(self, execution_record: Dict[str, Any], rating: float, comments: str = ""):
"""Provide feedback on execution to enable learning."""
execution_record["feedback"] = {
"rating": rating, # 1.0 to 5.0
"comments": comments,
"timestamp": asyncio.get_event_loop().time()
}
execution_record["feedback_pending"] = False
# Learn from feedback
self._learn_from_feedback(execution_record)
# Update performance metrics
self.performance_metrics["total_feedback"] += 1
self.performance_metrics["average_rating"] = (
(self.performance_metrics["average_rating"] * (self.performance_metrics["total_feedback"] - 1)) + rating
) / self.performance_metrics["total_feedback"]
self._save_learning_data()
def _find_relevant_patterns(self, prompt: str) -> Dict[str, Dict[str, Any]]:
"""Find patterns relevant to the current prompt."""
relevant_patterns = {}
prompt_lower = prompt.lower()
for pattern_name, pattern_data in self.patterns_learned.items():
# Check if pattern keywords match prompt
keywords = pattern_data.get("keywords", [])
if any(keyword in prompt_lower for keyword in keywords):
relevant_patterns[pattern_name] = pattern_data
return relevant_patterns
def _enhance_prompt_with_patterns(self, prompt: str, patterns: Dict[str, Dict[str, Any]]) -> str:
"""Enhance prompt with learned patterns."""
if not patterns:
return prompt
enhancements = []
for pattern_name, pattern_data in patterns.items():
if pattern_data.get("successful", False):
enhancement = pattern_data.get("enhancement_template", "")
if enhancement:
enhancements.append(enhancement)
if enhancements:
enhanced_prompt = f"""
{prompt}
Based on successful past approaches:
{chr(10).join(f"- {enh}" for enh in enhancements[:3])}
Apply these insights to provide an excellent response.
""".strip()
return enhanced_prompt
return prompt
def _learn_from_feedback(self, execution_record: Dict[str, Any]):
"""Learn from feedback to improve future responses."""
feedback = execution_record.get("feedback", {})
rating = feedback.get("rating", 3.0)
if rating >= 4.0: # Successful execution
# Extract successful patterns
prompt = execution_record["prompt"]
result = execution_record["result"]
# Create pattern from successful execution
pattern_name = f"success_pattern_{len(self.patterns_learned)}"
self.patterns_learned[pattern_name] = {
"keywords": self._extract_keywords(prompt),
"enhancement_template": f"Use approach similar to: {result[:100]}...",
"successful": True,
"rating": rating,
"created_from": execution_record["timestamp"]
}
elif rating <= 2.0: # Unsuccessful execution
# Learn what to avoid
prompt = execution_record["prompt"]
avoid_pattern = f"avoid_pattern_{len(self.patterns_learned)}"
self.patterns_learned[avoid_pattern] = {
"keywords": self._extract_keywords(prompt),
"avoid_approach": f"Avoid approach that led to: {feedback.get('comments', 'poor results')}",
"successful": False,
"rating": rating
}
def _extract_keywords(self, text: str) -> List[str]:
"""Extract keywords from text for pattern matching."""
# Simple keyword extraction (could be enhanced with NLP)
words = text.lower().split()
keywords = []
# Remove common stop words
stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
meaningful_words = [word for word in words if word not in stop_words and len(word) > 3]
# Get most frequent meaningful words
from collections import Counter
word_counts = Counter(meaningful_words)
keywords = [word for word, count in word_counts.most_common(5)]
return keywords
def get_learning_stats(self) -> Dict[str, Any]:
"""Get learning and performance statistics."""
successful_patterns = len([p for p in self.patterns_learned.values() if p.get("successful", False)])
unsuccessful_patterns = len([p for p in self.patterns_learned.values() if not p.get("successful", False)])
return {
"total_patterns_learned": len(self.patterns_learned),
"successful_patterns": successful_patterns,
"unsuccessful_patterns": unsuccessful_patterns,
"total_feedback": self.performance_metrics["total_feedback"],
"average_rating": self.performance_metrics["average_rating"],
"learning_efficiency": successful_patterns / max(1, len(self.patterns_learned))
}
# Create self-improving agent
learning_agent = SelfImprovingAgent(
model='openai:gpt-4',
learning_file="agent_learning.json"
)
async def test_self_improvement():
"""Test self-improving agent capabilities."""
test_prompts = [
"Explain machine learning in simple terms",
"Write a Python function to calculate fibonacci numbers",
"Analyze the benefits of renewable energy",
"Create a recipe for chocolate chip cookies"
]
print("🧠 Testing Self-Improving Agent:")
for i, prompt in enumerate(test_prompts, 1):
print(f"\n🎯 Test {i}: {prompt[:40]}...")
# Execute with learning
result = await learning_agent.run_with_learning(prompt)
print(f"Patterns applied: {result['patterns_applied']}")
print(f"Response length: {len(result['result'])} chars")
# Simulate feedback (in practice, this would come from users)
rating = 4.0 if len(result['result']) > 100 else 3.0 # Simple quality metric
learning_agent.provide_feedback(result['execution_record'], rating, "Good response")
print(f"Feedback provided: {rating}/5.0")
# Show learning statistics
stats = learning_agent.get_learning_stats()
print("
📊 Learning Statistics:")
for key, value in stats.items():
if isinstance(value, float):
print(f"{key}: {value:.2f}")
else:
print(f"{key}: {value}")
print(f"\nLearned patterns: {stats['total_patterns_learned']}")
print(f"Learning efficiency: {stats['learning_efficiency']:.1%}")
# Run self-improvement test
asyncio.run(test_self_improvement())

This advanced patterns chapter demonstrates sophisticated agent architectures: dynamic agent composition, chain-of-thought reasoning, multi-agent collaboration, and self-improving systems that learn from feedback and experience. 🚀
Most teams struggle here because the hard part is not writing more code, but drawing clear boundaries between the factory, router, and orchestrator layers so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without a clear rollback or observability strategy (see the sketch after this list)
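As a concrete illustration of the third point, here is a minimal observability wrapper around an agent call. The logger setup, field names, and `run_with_telemetry` helper are assumptions for illustration, not part of pydantic_ai:

```python
import logging
import time
from pydantic_ai import Agent

logger = logging.getLogger("agent_runs")

async def run_with_telemetry(agent: Agent, prompt: str, run_id: str):
    """Time and log one agent run so failures are diagnosable and reversible."""
    started = time.monotonic()
    try:
        result = await agent.run(prompt)
        logger.info("run_ok id=%s duration=%.2fs chars=%d",
                    run_id, time.monotonic() - started, len(str(result.data)))
        return result
    except Exception:
        logger.exception("run_failed id=%s duration=%.2fs",
                         run_id, time.monotonic() - started)
        raise  # surface the failure; callers decide on retry/rollback
```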
After working through this chapter, you should be able to reason about Chapter 7: Advanced Patterns & Multi-Step Workflows as an operating subsystem inside Pydantic AI Tutorial: Type-Safe AI Agent Development, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around the agent factory, routing rules, and orchestration modes as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 7: Advanced Patterns & Multi-Step Workflows usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites (models, credentials, templates).
- Input normalization: shape incoming data so downstream components receive stable contracts.
- Core execution: run the main logic branch and propagate intermediate state through the selected agent.
- Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
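A compressed sketch of that control path with explicit stage boundaries, reusing the `IntelligentRouter` defined earlier in this chapter; the `Request`/`Response` models and the specific policy check are illustrative assumptions, not part of the library:

```python
from pydantic import BaseModel, ValidationError

class Request(BaseModel):      # input normalization contract
    query: str
    user_id: str

class Response(BaseModel):     # output composition contract
    answer: str
    route: str

async def handle(raw: dict, router: "IntelligentRouter") -> Response:
    # 1-2. Context bootstrap + input normalization: fail fast on malformed input
    try:
        request = Request.model_validate(raw)
    except ValidationError as exc:
        raise ValueError(f"rejected at normalization stage: {exc}") from exc

    # 3. Core execution: route and run
    agent = router.get_agent_for_query(request.query)
    result = await agent.run(request.query)

    # 4. Policy check: enforce a simple output limit (placeholder policy)
    if len(str(result.data)) > 20_000:
        raise RuntimeError("rejected at policy stage: output too large")

    # 5. Output composition: canonical payload for downstream consumers
    # 6. Telemetry would be emitted here (see the wrapper sketched earlier)
    return Response(answer=str(result.data), route=str(agent.model))
```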
Use the upstream pydantic-ai repository as a reference while reading this chapter:
- View the repository on GitHub for the authoritative `Agent` API, model identifiers, and provider configuration.
Suggested trace strategy:
- search the upstream code for `Agent.run` and `run_sync` to map concrete implementation paths
- compare docs claims against actual runtime/config code before reusing patterns in production