| layout | default |
|---|---|
| title | Chapter 7: Advanced Agent Patterns |
| parent | AgentGPT Tutorial |
| nav_order | 7 |
Welcome to Chapter 7: Advanced Agent Patterns. In this part of AgentGPT Tutorial: Building Autonomous AI Agents, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
This chapter explores advanced patterns for building sophisticated multi-agent systems and complex agent interactions.
// Orchestrate multiple agents working together
class AgentOrchestrator {
private agents: Map<string, Agent> = new Map()
private workflows: Map<string, Workflow> = new Map()
registerAgent(agent: Agent) {
this.agents.set(agent.id, agent)
}
async executeWorkflow(workflowId: string, input: any): Promise<WorkflowResult> {
const workflow = this.workflows.get(workflowId)
if (!workflow) {
throw new Error(`Workflow ${workflowId} not found`)
}
const context = {
input,
results: new Map(),
agents: this.agents
}
try {
await this.executeSteps(workflow.steps, context)
return {
success: true,
results: Object.fromEntries(context.results),
executionTime: Date.now() - context.startTime
}
} catch (error) {
return {
success: false,
error: error.message,
partialResults: Object.fromEntries(context.results)
}
}
}
private async executeSteps(steps: WorkflowStep[], context: WorkflowContext) {
for (const step of steps) {
const agent = context.agents.get(step.agentId)
if (!agent) {
throw new Error(`Agent ${step.agentId} not found`)
}
const stepInput = this.prepareStepInput(step, context)
const result = await agent.execute(stepInput)
context.results.set(step.id, result)
// Check conditions for next steps
if (!this.evaluateConditions(step.conditions, result)) {
break
}
}
}
private prepareStepInput(step: WorkflowStep, context: WorkflowContext): any {
if (typeof step.input === 'function') {
return step.input(context.results)
}
return step.input
}
private evaluateConditions(conditions: Condition[], result: any): boolean {
return conditions.every(condition => {
const value = this.getNestedValue(result, condition.field)
return this.evaluateCondition(value, condition.operator, condition.value)
})
}
}// Define communication protocols between agents
interface Message {
id: string
from: string
to: string
type: MessageType
payload: any
timestamp: Date
correlationId?: string
}
enum MessageType {
REQUEST = 'request',
RESPONSE = 'response',
NOTIFICATION = 'notification',
COMMAND = 'command',
EVENT = 'event'
}
class AgentCommunicator {
private messageQueue: Message[] = []
private subscriptions: Map<string, MessageHandler[]> = new Map()
async send(message: Omit<Message, 'id' | 'timestamp'>): Promise<void> {
const fullMessage: Message = {
...message,
id: this.generateId(),
timestamp: new Date()
}
// Handle local delivery
if (this.isLocalRecipient(message.to)) {
await this.deliverLocally(fullMessage)
} else {
// Handle remote delivery
await this.deliverRemotely(fullMessage)
}
}
subscribe(eventType: string, handler: MessageHandler): void {
if (!this.subscriptions.has(eventType)) {
this.subscriptions.set(eventType, [])
}
this.subscriptions.get(eventType)!.push(handler)
}
private async deliverLocally(message: Message): Promise<void> {
const handlers = this.subscriptions.get(message.type) || []
await Promise.all(handlers.map(handler => handler(message)))
}
private isLocalRecipient(recipient: string): boolean {
// Check if recipient is a local agent
return this.localAgents.has(recipient)
}
}// Implement saga pattern for complex transactions
class SagaOrchestrator {
private sagas: Map<string, Saga> = new Map()
async executeSaga(sagaId: string, initialData: any): Promise<SagaResult> {
const saga = this.sagas.get(sagaId)
if (!saga) {
throw new Error(`Saga ${sagaId} not found`)
}
const context = {
sagaId,
data: initialData,
completedSteps: [],
failedSteps: [],
compensations: []
}
try {
for (const step of saga.steps) {
const result = await this.executeStep(step, context)
context.completedSteps.push(step.id)
if (!result.success) {
await this.compensate(context)
return {
success: false,
completedSteps: context.completedSteps,
failedStep: step.id,
error: result.error
}
}
}
return {
success: true,
completedSteps: context.completedSteps,
result: context.data
}
} catch (error) {
await this.compensate(context)
throw error
}
}
private async compensate(context: SagaContext): Promise<void> {
// Execute compensation actions in reverse order
for (const stepId of context.completedSteps.reverse()) {
const compensation = context.compensations.find(c => c.stepId === stepId)
if (compensation) {
await compensation.action(context.data)
}
}
}
}// Implement event-driven agent interactions
class EventDrivenAgent {
private eventBus: EventBus
private handlers: Map<string, EventHandler> = new Map()
constructor(eventBus: EventBus) {
this.eventBus = eventBus
this.setupEventHandlers()
}
private setupEventHandlers() {
this.eventBus.subscribe('task.created', this.handleTaskCreated.bind(this))
this.eventBus.subscribe('task.completed', this.handleTaskCompleted.bind(this))
this.eventBus.subscribe('agent.available', this.handleAgentAvailable.bind(this))
}
async handleTaskCreated(event: TaskCreatedEvent) {
if (this.canHandleTask(event.task)) {
await this.claimTask(event.task.id)
await this.processTask(event.task)
}
}
async handleTaskCompleted(event: TaskCompletedEvent) {
// Update internal state
this.updateTaskStatus(event.taskId, 'completed')
// Trigger dependent tasks
await this.triggerDependentTasks(event.taskId)
}
async handleAgentAvailable(event: AgentAvailableEvent) {
// Check for pending tasks this agent can handle
const pendingTasks = await this.getPendingTasks(event.agent.capabilities)
for (const task of pendingTasks) {
await this.assignTask(task.id, event.agent.id)
}
}
private async triggerDependentTasks(completedTaskId: string) {
const dependentTasks = await this.getDependentTasks(completedTaskId)
for (const task of dependentTasks) {
const dependenciesMet = await this.checkDependencies(task.id)
if (dependenciesMet) {
await this.eventBus.publish({
type: 'task.ready',
taskId: task.id,
timestamp: new Date()
})
}
}
}
}// Define agent roles and capabilities
enum AgentRole {
COORDINATOR = 'coordinator',
EXECUTOR = 'executor',
ANALYST = 'analyst',
SPECIALIST = 'specialist',
SUPERVISOR = 'supervisor'
}
interface AgentRoleDefinition {
role: AgentRole
capabilities: string[]
responsibilities: string[]
authority: number
communication: CommunicationPattern[]
}
class RoleBasedAgentSystem {
private roleDefinitions: Map<AgentRole, AgentRoleDefinition> = new Map()
private agents: Map<string, AgentWithRole> = new Map()
defineRole(role: AgentRole, definition: AgentRoleDefinition) {
this.roleDefinitions.set(role, definition)
}
assignRole(agentId: string, role: AgentRole) {
const agent = this.agents.get(agentId)
const roleDef = this.roleDefinitions.get(role)
if (agent && roleDef) {
agent.role = role
agent.capabilities = roleDef.capabilities
agent.authority = roleDef.authority
}
}
async delegateTask(task: Task, fromRole: AgentRole, toRole: AgentRole): Promise<TaskResult> {
const fromRoleDef = this.roleDefinitions.get(fromRole)
const toRoleDef = this.roleDefinitions.get(toRole)
// Check if delegation is authorized
if (fromRoleDef!.authority < toRoleDef!.authority) {
throw new Error('Insufficient authority for delegation')
}
// Find suitable agent for target role
const targetAgent = await this.findAgentByRole(toRole, task.requirements)
return await targetAgent.execute(task)
}
private async findAgentByRole(role: AgentRole, requirements: string[]): Promise<AgentWithRole> {
const candidates = Array.from(this.agents.values())
.filter(agent => agent.role === role)
// Find best match based on capabilities and availability
return candidates.find(agent =>
requirements.every(req => agent.capabilities.includes(req))
) || candidates[0]
}
}// Implement hierarchical agent structures
class HierarchicalAgentManager {
private hierarchy: Map<string, AgentHierarchyNode> = new Map()
createHierarchy(rootAgentId: string) {
this.hierarchy.set(rootAgentId, {
agentId: rootAgentId,
parent: null,
children: [],
level: 0
})
}
addSubordinate(managerId: string, subordinateId: string) {
const manager = this.hierarchy.get(managerId)
const subordinate = this.hierarchy.get(subordinateId)
if (manager && subordinate) {
subordinate.parent = managerId
subordinate.level = manager.level + 1
manager.children.push(subordinateId)
}
}
async delegateDown(agentId: string, task: Task): Promise<TaskResult> {
const agent = this.hierarchy.get(agentId)
if (agent && agent.children.length > 0) {
// Find best subordinate for task
const subordinateId = await this.selectSubordinate(agent.children, task)
const subordinate = this.agents.get(subordinateId)
return await subordinate.execute(task)
}
// Execute task directly if no subordinates
return await this.agents.get(agentId)!.execute(task)
}
async escalateUp(agentId: string, issue: Issue): Promise<Resolution> {
const agent = this.hierarchy.get(agentId)
if (agent && agent.parent) {
const parent = this.agents.get(agent.parent)
return await parent.handleEscalation(issue)
}
// Handle at top level
return await this.handleTopLevelIssue(issue)
}
private async selectSubordinate(subordinates: string[], task: Task): Promise<string> {
// Select subordinate based on task requirements and agent capabilities
const subordinateScores = await Promise.all(
subordinates.map(async subId => {
const subordinate = this.agents.get(subId)
const score = await this.calculateSuitabilityScore(subordinate, task)
return { id: subId, score }
})
)
return subordinateScores.sort((a, b) => b.score - a.score)[0].id
}
}// Dynamically compose agents from components
class AgentAssembler {
private components: Map<string, AgentComponent> = new Map()
registerComponent(component: AgentComponent) {
this.components.set(component.id, component)
}
async assembleAgent(specification: AgentSpec): Promise<ComposedAgent> {
const components = []
for (const componentId of specification.componentIds) {
const component = this.components.get(componentId)
if (component) {
components.push(await this.instantiateComponent(component))
}
}
return new ComposedAgent(components, specification.config)
}
private async instantiateComponent(component: AgentComponent): Promise<AgentComponentInstance> {
// Initialize component with dependencies
const dependencies = await this.resolveDependencies(component.dependencies)
return component.factory(dependencies)
}
private async resolveDependencies(dependencyIds: string[]): Promise<Map<string, any>> {
const dependencies = new Map()
for (const depId of dependencyIds) {
const dependency = this.components.get(depId)
if (dependency) {
dependencies.set(depId, await this.instantiateComponent(dependency))
}
}
return dependencies
}
}// Implement self-organizing agent systems
class SelfOrganizingSystem {
private agents: AdaptiveAgent[] = []
private performanceHistory: Map<string, number[]> = new Map()
async optimizeOrganization(): Promise<void> {
// Analyze current performance
const performance = await this.analyzePerformance()
// Identify bottlenecks and opportunities
const issues = this.identifyIssues(performance)
// Generate reorganization plan
const plan = await this.generateReorganizationPlan(issues)
// Execute reorganization
await this.executeReorganization(plan)
// Validate improvements
await this.validateImprovements()
}
private async analyzePerformance(): Promise<SystemPerformance> {
const agentPerformances = await Promise.all(
this.agents.map(async agent => ({
agentId: agent.id,
performance: await agent.getPerformanceMetrics(),
workload: await agent.getWorkload(),
capabilities: agent.capabilities
}))
)
return {
overall: this.calculateOverallPerformance(agentPerformances),
byAgent: agentPerformances,
bottlenecks: this.identifyBottlenecks(agentPerformances),
opportunities: this.identifyOpportunities(agentPerformances)
}
}
private identifyIssues(performance: SystemPerformance): SystemIssue[] {
const issues = []
// Check for overloaded agents
for (const agentPerf of performance.byAgent) {
if (agentPerf.workload > 0.8) {
issues.push({
type: 'overload',
agentId: agentPerf.agentId,
severity: 'high'
})
}
}
// Check for underutilized agents
for (const agentPerf of performance.byAgent) {
if (agentPerf.workload < 0.3) {
issues.push({
type: 'underutilization',
agentId: agentPerf.agentId,
severity: 'medium'
})
}
}
return issues
}
private async generateReorganizationPlan(issues: SystemIssue[]): Promise<ReorganizationPlan> {
const plan = {
reassignments: [],
newAgents: [],
agentUpdates: []
}
for (const issue of issues) {
switch (issue.type) {
case 'overload':
plan.reassignments.push(await this.generateReassignment(issue.agentId))
break
case 'underutilization':
plan.reassignments.push(await this.generateAdditionalAssignment(issue.agentId))
break
}
}
return plan
}
}// Implement swarm intelligence patterns
class SwarmIntelligence {
private agents: SwarmAgent[] = []
async solveProblem(problem: Problem): Promise<Solution> {
// Initialize swarm
await this.initializeSwarm(problem)
// Execute swarm algorithm
for (let iteration = 0; iteration < this.maxIterations; iteration++) {
// Update agent positions
await this.updateAgentPositions()
// Share information
await this.shareInformation()
// Evaluate solutions
await this.evaluateSolutions()
// Check convergence
if (await this.hasConverged()) {
break
}
}
return await this.getBestSolution()
}
private async updateAgentPositions(): Promise<void> {
await Promise.all(
this.agents.map(async agent => {
const bestNeighbor = await this.findBestNeighbor(agent)
const newPosition = this.calculateNewPosition(agent, bestNeighbor)
agent.updatePosition(newPosition)
})
)
}
private async shareInformation(): Promise<void> {
const bestSolutions = this.agents
.map(agent => agent.getBestSolution())
.sort((a, b) => b.fitness - a.fitness)
// Share top solutions with all agents
for (const agent of this.agents) {
agent.receiveInformation(bestSolutions.slice(0, 3))
}
}
}// Implement evolutionary algorithms for agent improvement
class EvolutionaryAgentSystem {
private population: Agent[] = []
private generation = 0
async evolve(generations: number = 100): Promise<EvolutionResult> {
// Initialize population
await this.initializePopulation()
for (let gen = 0; gen < generations; gen++) {
// Evaluate fitness
const fitnessScores = await this.evaluatePopulation()
// Select parents
const parents = this.selectParents(fitnessScores)
// Create offspring
const offspring = await this.createOffspring(parents)
// Mutate
const mutatedOffspring = await this.mutatePopulation(offspring)
// Replace population
this.population = [...parents, ...mutatedOffspring]
this.generation++
// Check for convergence
if (this.hasConverged(fitnessScores)) {
break
}
}
return {
bestAgent: this.getBestAgent(),
generation: this.generation,
fitnessHistory: this.fitnessHistory
}
}
private async evaluatePopulation(): Promise<number[]> {
const fitnessPromises = this.population.map(async agent => {
const performance = await this.evaluateAgent(agent)
return this.calculateFitness(performance)
})
return Promise.all(fitnessPromises)
}
private selectParents(fitnessScores: number[]): Agent[] {
// Tournament selection
const parents = []
for (let i = 0; i < this.populationSize / 2; i++) {
const tournament = this.selectTournament(fitnessScores)
parents.push(this.population[tournament.winner])
}
return parents
}
private async createOffspring(parents: Agent[]): Promise<Agent[]> {
const offspring = []
for (let i = 0; i < parents.length; i += 2) {
const parent1 = parents[i]
const parent2 = parents[i + 1]
const child = await this.crossover(parent1, parent2)
offspring.push(child)
}
return offspring
}
}- ✅ Built multi-agent collaboration systems
- ✅ Implemented complex workflow patterns
- ✅ Created role-based agent organizations
- ✅ Developed dynamic agent composition
- ✅ Explored swarm intelligence patterns
- ✅ Built evolutionary agent systems
Key Takeaways:
- Multi-agent systems enable complex problem solving
- Different patterns suit different collaboration needs
- Hierarchical structures provide clear authority chains
- Self-organizing systems adapt to changing conditions
- Evolutionary algorithms enable continuous improvement
- Swarm intelligence handles distributed problem solving
Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for agent, Promise, agents so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about Chapter 7: Advanced Agent Patterns as an operating subsystem inside AgentGPT Tutorial: Building Autonomous AI Agents, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around context, task, agentId as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 7: Advanced Agent Patterns usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for
agent. - Input normalization: shape incoming data so
Promisereceives stable contracts. - Core execution: run the main logic branch and propagate intermediate state through
agents. - Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- View Repo
Why it matters: authoritative reference on
View Repo(github.com).
Suggested trace strategy:
- search upstream code for
agentandPromiseto map concrete implementation paths - compare docs claims against actual runtime/config code before reusing patterns in production