Skip to content

Latest commit

 

History

History
1500 lines (1212 loc) · 47.7 KB

File metadata and controls

1500 lines (1212 loc) · 47.7 KB

Agents: The Next Level of Software

A Seminar on Agent Oriented Programming


Table of Contents

  1. Introduction
  2. What are Agents?
  3. Agent Oriented Programming (AOP)
  4. Traditional vs. Agentic Programming
  5. Communication Technologies
  6. Development Frameworks
  7. Practical Examples
  8. Conclusion & Next Steps

Introduction

Welcome Software Engineers! 👋

  • Target Audience: Experienced developers familiar with:

    • Object-Oriented Programming (OOP)
    • REST APIs
    • Client-server architectures
  • Goal: Bridge from traditional programming to agent-oriented approaches

  • Focus: Practical implementation and real-world applications

Why Agents Matter Now?

  • AI Revolution: LLMs and AI capabilities are democratizing intelligent software
  • Complexity Management: Modern systems require autonomous, adaptive components
  • Scalability: Traditional architectures struggle with dynamic, intelligent behaviors
  • User Expectations: Users expect more intelligent, context-aware applications

What are Agents?

Definition: Software Agent

An agent is an autonomous software entity that:

  • Perceives its environment through sensors
  • Acts upon that environment through actuators
  • Pursues goals or objectives
  • Adapts behavior based on experience

Key Characteristics

1. Autonomy

  • Operates independently without constant human intervention
  • Makes decisions based on internal logic and external stimuli

2. Reactivity

  • Responds to changes in environment
  • Real-time adaptation to new conditions

3. Proactiveness

  • Takes initiative to achieve goals
  • Plans and executes strategies

4. Social Ability

  • Communicates with other agents
  • Collaborates to achieve collective objectives

Agent vs. Traditional Object

Aspect Traditional Object Software Agent
Behavior Passive (method calls) Active (autonomous actions)
State Data + methods Beliefs, desires, intentions
Interaction Direct method invocation Message passing, negotiation
Lifecycle Created/destroyed by code Persistent, goal-driven existence
Decision Making Deterministic logic AI-powered reasoning

Types of Agents

1. Reactive Agents

  • Simple stimulus-response behavior
  • No internal state or planning
  • Example: Thermostat, basic chatbot

2. Deliberative Agents

  • Internal world model
  • Planning and reasoning capabilities
  • Example: AI assistants, autonomous vehicles

3. Hybrid Agents

  • Combination of reactive and deliberative
  • Layered architecture
  • Example: Modern AI applications

Agent Oriented Programming (AOP)

What is AOP?

Agent Oriented Programming is a programming paradigm where:

  • Software systems are conceptualized as collections of interacting agents
  • Each agent encapsulates autonomous behavior and decision-making
  • System functionality emerges from agent interactions

Core Concepts

1. Agent Architecture

┌─────────────────┐
│   Environment   │
│                 │
│ ┌─────────────┐ │
│ │    Agent    │ │
│ │             │ │
│ │ ┌─────────┐ │ │
│ │ │ Sensors │ │ │
│ │ └─────────┘ │ │
│ │ ┌─────────┐ │ │
│ │ │Planning │ │ │
│ │ │ Engine  │ │ │
│ │ └─────────┘ │ │
│ │ ┌─────────┐ │ │
│ │ │Actuators│ │ │
│ │ └─────────┘ │ │
│ └─────────────┘ │
└─────────────────┘

2. Mental Models

  • Beliefs: Agent's knowledge about the world
  • Desires: Agent's goals and objectives
  • Intentions: Agent's committed plans of action

3. Agent Communication

  • Message passing between agents
  • Protocols for negotiation and coordination
  • Shared ontologies and vocabularies

AOP Principles

1. Encapsulation of Behavior

  • Each agent encapsulates specific capabilities
  • Clear boundaries and responsibilities
  • High cohesion, loose coupling

2. Distributed Problem Solving

  • Complex problems decomposed across multiple agents
  • Parallel processing and decision making
  • Fault tolerance through redundancy

3. Emergent Behavior

  • System behavior emerges from agent interactions
  • Collective intelligence greater than sum of parts
  • Adaptive and self-organizing systems

Traditional vs. Agentic Programming

Programming Paradigm Evolution

Procedural → Object-Oriented → Agent-Oriented
    ↓              ↓               ↓
 Functions      Objects         Agents
 Data Flow      Encapsulation   Autonomy
 Sequential     Inheritance     Intelligence

Detailed Comparison

1. System Architecture

Traditional (Client-Server)

Client ←→ Server ←→ Database
   ↓        ↓         ↓
Static   Central    Passive
Roles    Control    Storage

Agentic (Multi-Agent System)

Agent ←→ Agent ←→ Agent
  ↓       ↑↓       ↓
Dynamic  Distributed  Active
Roles    Control    Intelligence

2. Control Flow

Traditional Programming

  • Sequential execution
  • Deterministic flow
  • Centralized control
  • Synchronous communication

Agentic Programming

  • Concurrent execution
  • Non-deterministic behavior
  • Distributed control
  • Asynchronous communication

3. Problem Solving Approach

Aspect Traditional Agentic
Decomposition Functional modules Autonomous agents
Coordination Direct method calls Message negotiation
Adaptation Manual updates Autonomous learning
Scalability Vertical scaling Horizontal agent addition
Fault Handling Exception handling Agent redundancy

4. Code Example Comparison

Traditional OOP Approach:

class OrderProcessor:
    def __init__(self, inventory, payment, shipping):
        self.inventory = inventory
        self.payment = payment
        self.shipping = shipping
    
    def process_order(self, order):
        # Sequential, synchronous processing
        if self.inventory.check_availability(order.items):
            if self.payment.process_payment(order.payment_info):
                return self.shipping.schedule_delivery(order)
        return False

Agentic Approach:

class OrderAgent(Agent):
    async def handle_order(self, order):
        # Parallel, asynchronous agent coordination
        inventory_result = await self.communicate(
            InventoryAgent, CheckAvailabilityMessage(order.items)
        )
        
        if inventory_result.available:
            payment_task = self.communicate(
                PaymentAgent, ProcessPaymentMessage(order.payment_info)
            )
            shipping_task = self.communicate(
                ShippingAgent, ScheduleDeliveryMessage(order)
            )
            
            payment_result, shipping_result = await asyncio.gather(
                payment_task, shipping_task
            )
            
            return self.coordinate_results(payment_result, shipping_result)

5. Benefits of Agentic Approach

Flexibility

  • Agents can adapt behavior based on context
  • Easy to add/remove agents without system redesign

Scalability

  • Horizontal scaling by adding more agents
  • Load distribution across agent network

Fault Tolerance

  • Agent failure doesn't crash entire system
  • Self-healing through agent redundancy

Maintainability

  • Clear separation of concerns
  • Agents can be updated independently

Communication Technologies

Overview of Agent Communication

Agent systems require sophisticated communication mechanisms to enable:

  • Coordination: Agents working together toward common goals
  • Negotiation: Agents bargaining for resources or services
  • Information Sharing: Agents exchanging knowledge and data
  • Synchronization: Agents coordinating their actions in time

1. Model Context Protocol (MCP)

What is MCP?

Model Context Protocol is a standardized communication protocol designed for:

  • Connecting AI models with external data sources
  • Enabling context-aware AI applications
  • Standardizing tool and resource access for AI agents

Key Features

Standardized Interface

  • Uniform API for tool and resource access
  • Language-agnostic protocol
  • JSON-RPC based communication

Context Management

  • Maintains conversation context across interactions
  • Supports complex, multi-turn conversations
  • Context persistence and retrieval

Tool Integration

  • Standardized way to expose tools to AI models
  • Dynamic tool discovery and invocation
  • Type-safe tool interfaces

MCP Architecture

┌─────────────┐    MCP     ┌─────────────┐    MCP     ┌─────────────┐
│   AI Model  │ ←────────→ │ MCP Server  │ ←────────→ │  Resources  │
│   (Client)  │  Protocol  │             │  Protocol  │   (Tools)   │
└─────────────┘            └─────────────┘            └─────────────┘

MCP Message Types

# Tool Definition
{
    "method": "tools/list",
    "params": {},
    "result": {
        "tools": [
            {
                "name": "get_weather",
                "description": "Get current weather for a location",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "location": {"type": "string"}
                    }
                }
            }
        ]
    }
}

# Tool Invocation
{
    "method": "tools/call",
    "params": {
        "name": "get_weather",
        "arguments": {"location": "New York"}
    },
    "result": {
        "content": [
            {
                "type": "text",
                "text": "Current weather in New York: 72°F, sunny"
            }
        ]
    }
}

2. Agent-to-Agent (A2A) Communication

What is A2A?

Agent-to-Agent communication refers to direct communication protocols between autonomous agents, enabling:

  • Peer-to-peer agent interactions
  • Distributed decision making
  • Collaborative problem solving
  • Dynamic agent coordination

A2A Communication Patterns

1. Request-Response

# Agent A requests service from Agent B
request = ServiceRequest(
    sender="AgentA",
    receiver="AgentB", 
    service="process_data",
    parameters={"data": data_payload}
)

response = await agent_a.send_request(request)

2. Publish-Subscribe

# Agent publishes event
event = DataUpdateEvent(
    publisher="DataAgent",
    topic="market_data",
    payload={"price": 100.50, "volume": 1000}
)
agent.publish(event)

# Other agents subscribe to events
@agent.subscribe("market_data")
async def handle_market_update(event):
    # Process market data update
    pass

3. Contract Net Protocol

# Agent announces task
task_announcement = TaskAnnouncement(
    task_id="optimize_route",
    requirements={"skills": ["pathfinding"], "load_capacity": 100},
    deadline=datetime.now() + timedelta(hours=1)
)

# Agents bid for task
bid = TaskBid(
    bidder="RouteAgent",
    task_id="optimize_route",
    cost=50,
    estimated_completion=timedelta(minutes=30)
)

# Task assignment based on bids

4. Blackboard Pattern

# Shared knowledge space
blackboard = Blackboard()

# Agents contribute knowledge
agent_a.contribute(blackboard, "sensor_data", temperature_reading)
agent_b.contribute(blackboard, "analysis", trend_analysis)

# Agents consume knowledge
weather_data = agent_c.consume(blackboard, "sensor_data")

Communication Security

Authentication

  • Agent identity verification
  • Certificate-based authentication
  • Token-based access control

Encryption

  • End-to-end encryption for sensitive data
  • TLS for transport security
  • Message signing for integrity

Access Control

  • Role-based permissions
  • Capability-based security
  • Audit trails for communication

Protocol Selection Guidelines

Use Case Recommended Protocol Reasoning
AI-Tool Integration MCP Standardized, context-aware
Agent Coordination A2A Request-Response Direct, efficient
Event Broadcasting A2A Publish-Subscribe Scalable, decoupled
Task Allocation A2A Contract Net Fair, competitive
Knowledge Sharing A2A Blackboard Collaborative, persistent

Development Frameworks

Framework Landscape

The agent development ecosystem offers various frameworks, each with unique strengths and use cases.

1. Agent Development Kit (ADK)

Overview

ADK is a comprehensive framework specifically designed for building production-ready agent systems with emphasis on:

  • Enterprise-grade reliability
  • Scalable agent architectures
  • Built-in communication protocols
  • Integration with existing systems

Key Features

Agent Lifecycle Management

from adk import Agent, AgentManager

class CustomerServiceAgent(Agent):
    def __init__(self):
        super().__init__("customer_service")
        self.knowledge_base = None
        
    async def on_start(self):
        self.knowledge_base = await self.load_knowledge_base()
        
    async def on_message(self, message):
        response = await self.process_customer_query(message.content)
        return response
        
    async def on_stop(self):
        await self.save_conversation_history()

# Agent deployment
manager = AgentManager()
agent = CustomerServiceAgent()
await manager.deploy(agent)

Built-in Communication

# MCP integration
@agent.expose_tool
async def get_order_status(order_id: str) -> str:
    order = await self.database.get_order(order_id)
    return f"Order {order_id} status: {order.status}"

# A2A communication
async def coordinate_with_shipping(self, order_data):
    shipping_agent = await self.find_agent("shipping_agent")
    response = await self.communicate(
        shipping_agent, 
        ScheduleDeliveryMessage(order_data)
    )
    return response

Advantages

  • Production-ready out of the box
  • Comprehensive monitoring and logging
  • Built-in fault tolerance
  • Enterprise integration capabilities

Best For

  • Large-scale enterprise applications
  • Mission-critical systems
  • Complex multi-agent coordination

2. LangChain

Overview

LangChain is a popular framework for building applications with large language models, offering:

  • Modular components for LLM applications
  • Pre-built chains for common patterns
  • Extensive tool and data source integrations
  • Strong community and ecosystem

Key Components

Agent Creation

from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from langchain.tools import Tool

def search_tool(query: str) -> str:
    # Custom tool implementation
    return f"Search results for: {query}"

tools = [
    Tool(
        name="Search",
        func=search_tool,
        description="Useful for searching information"
    )
]

llm = OpenAI(temperature=0)
agent = initialize_agent(
    tools, 
    llm, 
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

result = agent.run("What is the weather like today?")

Chain Composition

from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate

# Multi-step processing chain
analysis_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate(
        input_variables=["text"],
        template="Analyze the sentiment of: {text}"
    ),
    output_key="sentiment"
)

summary_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate(
        input_variables=["text", "sentiment"],
        template="Summarize {text} with {sentiment} sentiment"
    ),
    output_key="summary"
)

overall_chain = SequentialChain(
    chains=[analysis_chain, summary_chain],
    input_variables=["text"],
    output_variables=["sentiment", "summary"]
)

Advantages

  • Rapid prototyping and development
  • Extensive pre-built components
  • Strong LLM integration
  • Active community support

Best For

  • LLM-powered applications
  • Rapid prototyping
  • Research and experimentation

3. LangGraph

Overview

LangGraph extends LangChain with graph-based agent architectures, enabling:

  • Complex multi-agent workflows
  • Stateful agent interactions
  • Conditional routing and branching
  • Visual workflow representation

Key Features

Graph-Based Agent Flow

from langgraph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    messages: list
    current_step: str
    context: dict

def research_node(state: AgentState):
    # Research agent logic
    research_results = perform_research(state["messages"][-1])
    return {
        "messages": state["messages"] + [research_results],
        "current_step": "analysis",
        "context": {**state["context"], "research": research_results}
    }

def analysis_node(state: AgentState):
    # Analysis agent logic
    analysis = analyze_data(state["context"]["research"])
    return {
        "messages": state["messages"] + [analysis],
        "current_step": "complete",
        "context": {**state["context"], "analysis": analysis}
    }

# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("analysis", analysis_node)

workflow.add_edge("research", "analysis")
workflow.add_conditional_edges(
    "analysis",
    lambda x: "complete" if x["current_step"] == "complete" else "research",
    {"complete": END, "research": "research"}
)

app = workflow.compile()

Conditional Logic

def route_decision(state: AgentState) -> str:
    confidence = state["context"].get("confidence", 0)
    if confidence > 0.8:
        return "finalize"
    elif confidence > 0.5:
        return "review"
    else:
        return "research_more"

workflow.add_conditional_edges(
    "analysis",
    route_decision,
    {
        "finalize": "finalize_node",
        "review": "review_node", 
        "research_more": "research_node"
    }
)

Advantages

  • Visual workflow representation
  • Complex multi-agent orchestration
  • Stateful agent interactions
  • Flexible routing logic

Best For

  • Complex multi-step workflows
  • Agent coordination scenarios
  • Research and analysis pipelines

4. Other Notable Frameworks

AutoGen

from autogen import ConversableAgent

assistant = ConversableAgent(
    name="assistant",
    system_message="You are a helpful AI assistant.",
    llm_config={"model": "gpt-4"}
)

user_proxy = ConversableAgent(
    name="user_proxy",
    human_input_mode="ALWAYS",
    code_execution_config={"use_docker": False}
)

# Multi-agent conversation
user_proxy.initiate_chat(
    assistant,
    message="Help me analyze this data and create a report."
)

CrewAI

from crewai import Agent, Task, Crew

researcher = Agent(
    role='Researcher',
    goal='Research and gather information',
    backstory='Expert researcher with analytical skills'
)

writer = Agent(
    role='Writer', 
    goal='Create compelling content',
    backstory='Skilled writer with creative abilities'
)

research_task = Task(
    description='Research the latest trends in AI',
    agent=researcher
)

writing_task = Task(
    description='Write an article based on research',
    agent=writer
)

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task]
)

result = crew.kickoff()

Framework Selection Guide

Framework Strengths Use Cases Learning Curve
ADK Production-ready, enterprise features Large-scale systems, mission-critical Moderate
LangChain Rapid development, extensive tools LLM apps, prototyping Low
LangGraph Complex workflows, visual design Multi-step processes, research Moderate
AutoGen Conversational agents, code execution Interactive AI, coding assistance Low
CrewAI Role-based agents, task management Content creation, analysis Low

Practical Examples

Project 1: Intelligent Customer Service System

System Overview

A multi-agent customer service system that handles inquiries, processes orders, and provides support through coordinated agent interactions.

Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Reception     │    │   Knowledge     │    │   Order         │
│   Agent         │←→  │   Agent         │←→  │   Agent         │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         ↑                       ↑                       ↑
         │              MCP Protocol             A2A Communication
         ↓                       ↓                       ↓
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Customer      │    │   Database      │    │   Shipping      │
│   Interface     │    │   Server        │    │   Agent         │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Implementation with ADK

1. Reception Agent (Entry Point)

from adk import Agent, MCPServer
from typing import Dict, Any

class ReceptionAgent(Agent):
    def __init__(self):
        super().__init__("reception_agent")
        self.knowledge_agent = None
        self.order_agent = None
        
    async def on_start(self):
        # Find other agents in the system
        self.knowledge_agent = await self.discover_agent("knowledge_agent")
        self.order_agent = await self.discover_agent("order_agent")
        
        # Setup MCP server for external interfaces
        self.mcp_server = MCPServer()
        self.setup_mcp_tools()
        
    def setup_mcp_tools(self):
        @self.mcp_server.tool("handle_customer_inquiry")
        async def handle_inquiry(
            inquiry: str, 
            customer_id: str = None
        ) -> Dict[str, Any]:
            """Handle customer inquiry and route to appropriate agent"""
            
            # Classify the inquiry
            classification = await self.classify_inquiry(inquiry)
            
            if classification == "knowledge_query":
                response = await self.communicate(
                    self.knowledge_agent,
                    KnowledgeQueryMessage(
                        query=inquiry,
                        customer_id=customer_id
                    )
                )
            elif classification == "order_related":
                response = await self.communicate(
                    self.order_agent,
                    OrderInquiryMessage(
                        inquiry=inquiry,
                        customer_id=customer_id
                    )
                )
            else:
                response = await self.handle_general_inquiry(inquiry)
                
            return {
                "response": response.content,
                "classification": classification,
                "timestamp": datetime.now().isoformat()
            }
    
    async def classify_inquiry(self, inquiry: str) -> str:
        # Use LLM or rule-based classification
        classification_prompt = f"""
        Classify this customer inquiry:
        "{inquiry}"
        
        Categories:
        - knowledge_query: General questions about products/services
        - order_related: Order status, shipping, returns
        - general: Greetings, complaints, other
        
        Return only the category name.
        """
        
        result = await self.llm_call(classification_prompt)
        return result.strip().lower()

2. Knowledge Agent (Information Provider)

class KnowledgeAgent(Agent):
    def __init__(self):
        super().__init__("knowledge_agent")
        self.knowledge_base = None
        self.vector_store = None
        
    async def on_start(self):
        # Load knowledge base and setup vector search
        self.knowledge_base = await self.load_knowledge_base()
        self.vector_store = await self.setup_vector_store()
        
    @message_handler(KnowledgeQueryMessage)
    async def handle_knowledge_query(self, message: KnowledgeQueryMessage):
        # Perform semantic search in knowledge base
        relevant_docs = await self.vector_store.similarity_search(
            message.query, 
            k=5
        )
        
        # Generate contextual response
        context = "\n".join([doc.content for doc in relevant_docs])
        response = await self.generate_response(message.query, context)
        
        # Log interaction for learning
        await self.log_interaction(message.customer_id, message.query, response)
        
        return KnowledgeResponseMessage(
            response=response,
            sources=[doc.metadata for doc in relevant_docs],
            confidence=self.calculate_confidence(relevant_docs)
        )
    
    async def generate_response(self, query: str, context: str) -> str:
        prompt = f"""
        Based on the following knowledge base context, answer the customer query.
        
        Context:
        {context}
        
        Customer Query: {query}
        
        Provide a helpful, accurate, and friendly response. If the information
        is not available in the context, say so politely and suggest alternatives.
        """
        
        response = await self.llm_call(prompt)
        return response

3. Order Agent (Order Management)

class OrderAgent(Agent):
    def __init__(self):
        super().__init__("order_agent")
        self.shipping_agent = None
        
    async def on_start(self):
        self.shipping_agent = await self.discover_agent("shipping_agent")
        
    @message_handler(OrderInquiryMessage)
    async def handle_order_inquiry(self, message: OrderInquiryMessage):
        # Extract order ID from inquiry
        order_id = await self.extract_order_id(message.inquiry)
        
        if order_id:
            # Get order details
            order_details = await self.get_order_details(order_id)
            
            if "shipping" in message.inquiry.lower():
                # Get shipping information from shipping agent
                shipping_info = await self.communicate(
                    self.shipping_agent,
                    ShippingInquiryMessage(order_id=order_id)
                )
                response = self.format_shipping_response(order_details, shipping_info)
            else:
                response = self.format_order_response(order_details)
        else:
            response = "I couldn't find an order number in your message. Could you please provide your order ID?"
            
        return OrderResponseMessage(response=response)
    
    async def extract_order_id(self, inquiry: str) -> str:
        # Use regex or LLM to extract order ID
        import re
        pattern = r'#?(\d{6,})'
        match = re.search(pattern, inquiry)
        return match.group(1) if match else None

MCP Integration Example

External System Integration via MCP

# External CRM system MCP client
class CRMClient:
    def __init__(self, mcp_endpoint: str):
        self.client = MCPClient(mcp_endpoint)
        
    async def get_customer_profile(self, customer_id: str):
        response = await self.client.call_tool(
            "get_customer_profile",
            {"customer_id": customer_id}
        )
        return response
    
    async def update_interaction_history(self, customer_id: str, interaction: dict):
        await self.client.call_tool(
            "log_interaction",
            {
                "customer_id": customer_id,
                "interaction": interaction
            }
        )

# Integration in Reception Agent
class EnhancedReceptionAgent(ReceptionAgent):
    def __init__(self):
        super().__init__()
        self.crm_client = CRMClient("http://crm-system:8080/mcp")
        
    async def handle_customer_inquiry(self, inquiry: str, customer_id: str):
        # Get customer context from CRM
        if customer_id:
            customer_profile = await self.crm_client.get_customer_profile(customer_id)
            context = f"Customer tier: {customer_profile.get('tier', 'standard')}"
        else:
            context = "New customer interaction"
            
        # Process inquiry with context
        response = await super().handle_customer_inquiry(inquiry, customer_id)
        
        # Log interaction back to CRM
        if customer_id:
            await self.crm_client.update_interaction_history(
                customer_id,
                {
                    "inquiry": inquiry,
                    "response": response["response"],
                    "timestamp": response["timestamp"],
                    "agent": self.name
                }
            )
            
        return response

Project 2: Smart Content Management System

System Overview

A content management system where agents collaborate to create, review, and publish content automatically based on user requirements and content strategies.

Agent Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Content       │    │   Research      │    │   Editor        │
│   Planner       │←→  │   Agent         │←→  │   Agent         │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         ↑                       ↑                       ↑
         │                       │                       │
         ↓                       ↓                       ↓
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   SEO           │    │   Fact Checker  │    │   Publisher     │
│   Agent         │←→  │   Agent         │←→  │   Agent         │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Implementation

Content Planner Agent

class ContentPlannerAgent(Agent):
    def __init__(self):
        super().__init__("content_planner")
        self.content_strategy = None
        
    async def on_start(self):
        self.content_strategy = await self.load_content_strategy()
        
    @message_handler(ContentRequestMessage)
    async def plan_content(self, message: ContentRequestMessage):
        # Analyze content requirements
        requirements = await self.analyze_requirements(message.brief)
        
        # Create content plan
        plan = ContentPlan(
            topic=requirements.topic,
            target_audience=requirements.audience,
            content_type=requirements.type,
            keywords=requirements.keywords,
            outline=await self.generate_outline(requirements),
            research_points=await self.identify_research_needs(requirements)
        )
        
        # Initiate content creation workflow
        await self.start_content_workflow(plan)
        
        return ContentPlanMessage(plan=plan)
    
    async def start_content_workflow(self, plan: ContentPlan):
        # Send research request
        research_agent = await self.discover_agent("research_agent")
        await self.communicate(
            research_agent,
            ResearchRequestMessage(
                topic=plan.topic,
                research_points=plan.research_points,
                content_id=plan.id
            )
        )

Research Agent with A2A Communication

class ResearchAgent(Agent):
    def __init__(self):
        super().__init__("research_agent")
        
    @message_handler(ResearchRequestMessage)
    async def conduct_research(self, message: ResearchRequestMessage):
        research_results = []
        
        # Parallel research across multiple sources
        tasks = [
            self.research_web_sources(message.topic),
            self.research_academic_sources(message.topic),
            self.research_industry_reports(message.topic)
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Compile research findings
        for result in results:
            if not isinstance(result, Exception):
                research_results.extend(result)
        
        # Send to fact checker for verification
        fact_checker = await self.discover_agent("fact_checker")
        verified_results = await self.communicate(
            fact_checker,
            FactCheckMessage(
                content_id=message.content_id,
                claims=self.extract_claims(research_results)
            )
        )
        
        # Send to editor for content creation
        editor_agent = await self.discover_agent("editor_agent")
        await self.communicate(
            editor_agent,
            ResearchDataMessage(
                content_id=message.content_id,
                research=research_results,
                verified_facts=verified_results.verified_claims
            )
        )
        
        return ResearchCompleteMessage(
            content_id=message.content_id,
            research_summary=self.summarize_research(research_results)
        )

Collaborative Editing with Real-time Communication

class EditorAgent(Agent):
    def __init__(self):
        super().__init__("editor_agent")
        self.active_documents = {}
        
    @message_handler(ResearchDataMessage)
    async def create_content(self, message: ResearchDataMessage):
        # Create initial draft
        draft = await self.generate_draft(
            message.research,
            message.verified_facts
        )
        
        # Store for collaborative editing
        self.active_documents[message.content_id] = draft
        
        # Request SEO optimization
        seo_agent = await self.discover_agent("seo_agent")
        seo_suggestions = await self.communicate(
            seo_agent,
            SEOAnalysisMessage(
                content_id=message.content_id,
                content=draft.content,
                target_keywords=draft.keywords
            )
        )
        
        # Apply SEO improvements
        optimized_draft = await self.apply_seo_suggestions(
            draft, 
            seo_suggestions.suggestions
        )
        
        # Send for final review
        await self.send_for_review(message.content_id, optimized_draft)
        
        return ContentDraftMessage(
            content_id=message.content_id,
            draft=optimized_draft
        )
    
    async def apply_seo_suggestions(self, draft, suggestions):
        # Apply SEO improvements while maintaining content quality
        for suggestion in suggestions:
            if suggestion.type == "keyword_density":
                draft.content = await self.optimize_keyword_density(
                    draft.content,
                    suggestion.keyword,
                    suggestion.target_density
                )
            elif suggestion.type == "meta_description":
                draft.meta_description = suggestion.content
            elif suggestion.type == "heading_structure":
                draft.content = await self.improve_heading_structure(
                    draft.content,
                    suggestion.structure
                )
        
        return draft

Project 3: Real-time Trading System

System Overview

A sophisticated trading system where multiple agents work together to analyze markets, make trading decisions, and manage risk in real-time.

Agent Network

    Market Data → ┌─────────────────┐ → Trading Signals
                  │   Analysis      │
                  │   Agent         │
                  └─────────────────┘
                           ↑
                           │ A2A Communication
                           ↓
┌─────────────────┐ ← → ┌─────────────────┐ ← → ┌─────────────────┐
│   Risk          │     │   Portfolio     │     │   Execution     │
│   Manager       │     │   Manager       │     │   Agent         │
└─────────────────┘     └─────────────────┘     └─────────────────┘

Real-time Market Analysis

class MarketAnalysisAgent(Agent):
    def __init__(self):
        super().__init__("market_analysis")
        self.data_streams = []
        
    async def on_start(self):
        # Setup real-time data streams
        await self.setup_market_data_streams()
        
        # Start continuous analysis
        asyncio.create_task(self.continuous_analysis())
        
    async def continuous_analysis(self):
        while True:
            try:
                # Get latest market data
                market_data = await self.get_latest_data()
                
                # Perform technical analysis
                signals = await self.analyze_market_data(market_data)
                
                if signals:
                    # Broadcast signals to portfolio manager
                    portfolio_manager = await self.discover_agent("portfolio_manager")
                    await self.communicate(
                        portfolio_manager,
                        TradingSignalMessage(
                            signals=signals,
                            confidence=self.calculate_confidence(signals),
                            timestamp=datetime.now()
                        )
                    )
                
                await asyncio.sleep(1)  # 1-second analysis interval
                
            except Exception as e:
                self.logger.error(f"Analysis error: {e}")
                await asyncio.sleep(5)  # Back off on error

Risk-Aware Portfolio Management

class PortfolioManagerAgent(Agent):
    def __init__(self):
        super().__init__("portfolio_manager")
        self.current_positions = {}
        self.risk_limits = {}
        
    @message_handler(TradingSignalMessage)
    async def evaluate_signal(self, message: TradingSignalMessage):
        for signal in message.signals:
            # Check with risk manager
            risk_manager = await self.discover_agent("risk_manager")
            risk_assessment = await self.communicate(
                risk_manager,
                RiskAssessmentMessage(
                    signal=signal,
                    current_portfolio=self.current_positions,
                    proposed_position=signal.position_size
                )
            )
            
            if risk_assessment.approved:
                # Send execution order
                execution_agent = await self.discover_agent("execution_agent")
                await self.communicate(
                    execution_agent,
                    ExecutionOrderMessage(
                        symbol=signal.symbol,
                        side=signal.side,
                        quantity=risk_assessment.adjusted_quantity,
                        order_type=signal.order_type,
                        risk_parameters=risk_assessment.risk_params
                    )
                )
                
                # Update portfolio tracking
                await self.update_portfolio_tracking(signal, risk_assessment)

Project Communication Patterns

MCP Integration Patterns

1. External Data Source Integration

# Weather data for trading algorithms
class WeatherDataMCPServer(MCPServer):
    @tool("get_weather_impact")
    async def get_weather_impact(self, commodity: str, region: str) -> dict:
        weather_data = await self.fetch_weather_data(region)
        impact_analysis = await self.analyze_commodity_impact(commodity, weather_data)
        
        return {
            "commodity": commodity,
            "region": region,
            "weather_conditions": weather_data,
            "price_impact_forecast": impact_analysis.price_impact,
            "confidence": impact_analysis.confidence
        }

2. Tool Ecosystem Integration

# Financial data tools via MCP
class FinancialDataTools:
    def __init__(self, mcp_client: MCPClient):
        self.client = mcp_client
        
    async def get_company_fundamentals(self, symbol: str):
        return await self.client.call_tool(
            "get_fundamentals",
            {"symbol": symbol}
        )
    
    async def get_economic_indicators(self, indicators: list):
        return await self.client.call_tool(
            "get_economic_data",
            {"indicators": indicators}
        )

A2A Communication Patterns

1. Event-Driven Architecture

class EventBus:
    def __init__(self):
        self.subscribers = defaultdict(list)
        
    def subscribe(self, event_type: str, agent: Agent):
        self.subscribers[event_type].append(agent)
        
    async def publish(self, event_type: str, event_data: dict):
        for agent in self.subscribers[event_type]:
            await agent.handle_event(event_type, event_data)

# Usage in agents
@agent.event_handler("market_volatility_spike")
async def handle_volatility_spike(self, event_data):
    # Adjust risk parameters
    await self.adjust_risk_limits(event_data["volatility_level"])

2. Consensus and Coordination

class ConsensusProtocol:
    async def reach_consensus(self, agents: List[Agent], decision_data: dict):
        votes = []
        
        # Collect votes from all agents
        for agent in agents:
            vote = await agent.vote_on_decision(decision_data)
            votes.append(vote)
        
        # Calculate consensus
        consensus = self.calculate_consensus(votes)
        
        # Notify agents of result
        for agent in agents:
            await agent.consensus_reached(consensus)
            
        return consensus

Conclusion & Next Steps

Key Takeaways

1. Paradigm Shift

  • From Objects to Agents: Moving beyond passive objects to autonomous, intelligent entities
  • From Sequential to Concurrent: Embracing parallel, asynchronous processing
  • From Deterministic to Adaptive: Building systems that learn and evolve

2. Communication is Critical

  • MCP: Standardizes AI-tool integration, enabling context-aware applications
  • A2A: Enables sophisticated agent coordination and collaboration
  • Protocol Selection: Choose the right communication pattern for your use case

3. Framework Ecosystem

  • ADK: Production-ready, enterprise-grade agent systems
  • LangChain: Rapid development with extensive LLM integration
  • LangGraph: Complex workflows with visual representation
  • Emerging Tools: Active ecosystem with specialized solutions

4. Real-World Applications

  • Customer Service: Intelligent, context-aware support systems
  • Content Management: Collaborative content creation and optimization
  • Financial Trading: Real-time, risk-aware decision making
  • Enterprise Automation: Intelligent process orchestration

Best Practices for Agent Development

1. Design Principles

  • Single Responsibility: Each agent should have a clear, focused purpose
  • Loose Coupling: Minimize dependencies between agents
  • High Cohesion: Group related functionality within agents
  • Idempotency: Ensure operations can be safely retried

2. Communication Guidelines

  • Async by Default: Use asynchronous communication patterns
  • Timeout Handling: Always set timeouts for agent communications
  • Error Recovery: Implement graceful degradation and retry logic
  • Message Versioning: Plan for message format evolution

3. Testing Strategies

  • Unit Testing: Test individual agent logic
  • Integration Testing: Test agent interactions
  • Simulation Testing: Test with synthetic data and scenarios
  • Chaos Engineering: Test resilience under failure conditions

4. Monitoring and Observability

  • Structured Logging: Use consistent, searchable log formats
  • Metrics Collection: Track performance and business metrics
  • Distributed Tracing: Understand request flows across agents
  • Alerting: Proactive notification of issues

Next Steps for Continued Learning

1. Hands-On Projects

  • Build the customer service system from our examples
  • Create a simple trading bot with risk management
  • Develop a content generation pipeline

2. Advanced Topics

  • Agent Learning: Implement reinforcement learning in agents
  • Formal Verification: Ensure agent behavior correctness
  • Blockchain Integration: Decentralized agent coordination
  • Edge Computing: Deploying agents on edge devices

3. Community and Resources

  • GitHub Repositories: Explore open-source agent projects
  • Research Papers: Stay current with academic research
  • Conferences: Attend agent and AI conferences
  • Online Communities: Join agent development forums

4. Technology Evolution

  • New Frameworks: Keep track of emerging frameworks
  • Standards: Follow MCP and other protocol developments
  • Hardware: Understand impact of new hardware (GPUs, quantum)
  • Regulations: Stay informed about AI and agent regulations

Questions for Further Exploration

  1. How might agents change software architecture patterns?
  2. What security challenges are unique to agent systems?
  3. How do we ensure agent decisions are explainable?
  4. What role will agents play in future software development?
  5. How do we balance agent autonomy with human control?

Resources and References

Documentation and Tutorials

Code Examples and Templates


Thank you for attending "Agents: The Next Level of Software"

Questions? Let's discuss the future of agent-oriented programming!