Skip to content

Latest commit

 

History

History
552 lines (440 loc) · 19.8 KB

File metadata and controls

552 lines (440 loc) · 19.8 KB
layout default
title Chapter 4: Tool Integration
parent CrewAI Tutorial
nav_order 4

Chapter 4: Tool Integration

Welcome to Chapter 4: Tool Integration. In this part of CrewAI Tutorial: Building Collaborative AI Agent Teams, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.

Tools are the extensions that give AI agents superpowers. This chapter explores how to integrate external tools, APIs, and services into your CrewAI agents to expand their capabilities and enable real-world interactions.

Tool Integration Fundamentals

Tool Architecture

from crewai import Agent, Tool
from typing import Any, Dict, List, Optional

class CrewTool(Tool):
    def __init__(self,
                 name: str,
                 description: str,
                 parameters: Dict[str, Any],
                 execute_function: callable,
                 required_permissions: Optional[List[str]] = None):

        super().__init__(
            name=name,
            description=description,
            parameters=parameters
        )

        self.execute_function = execute_function
        self.required_permissions = required_permissions or []
        self.usage_count = 0
        self.success_rate = 1.0
        self.average_execution_time = 0

    def _execute(self, **kwargs) -> Any:
        """Execute the tool with given parameters"""
        start_time = time.time()

        try:
            result = self.execute_function(**kwargs)
            self._record_success(time.time() - start_time)
            return result
        except Exception as e:
            self._record_failure()
            raise e

    def _record_success(self, execution_time: float):
        """Record successful execution"""
        self.usage_count += 1
        self.success_rate = ((self.success_rate * (self.usage_count - 1)) + 1) / self.usage_count
        self.average_execution_time = ((self.average_execution_time * (self.usage_count - 1)) + execution_time) / self.usage_count

    def _record_failure(self):
        """Record failed execution"""
        self.usage_count += 1
        self.success_rate = (self.success_rate * (self.usage_count - 1)) / self.usage_count

Tool Registry

class ToolRegistry:
    def __init__(self):
        self.tools: Dict[str, CrewTool] = {}
        self.categories: Dict[str, List[str]] = {}

    def register_tool(self, tool: CrewTool, category: str = "general"):
        """Register a tool in the registry"""
        self.tools[tool.name] = tool

        if category not in self.categories:
            self.categories[category] = []
        self.categories[category].append(tool.name)

    def get_tool(self, name: str) -> Optional[CrewTool]:
        """Get tool by name"""
        return self.tools.get(name)

    def get_tools_by_category(self, category: str) -> List[CrewTool]:
        """Get all tools in a category"""
        tool_names = self.categories.get(category, [])
        return [self.tools[name] for name in tool_names if name in self.tools]

    def get_all_tools(self) -> List[CrewTool]:
        """Get all registered tools"""
        return list(self.tools.values())

    def get_tool_stats(self) -> Dict[str, Any]:
        """Get usage statistics for all tools"""
        return {
            name: {
                "usage_count": tool.usage_count,
                "success_rate": tool.success_rate,
                "average_time": tool.average_execution_time
            }
            for name, tool in self.tools.items()
        }

Built-in Tool Categories

Web and API Tools

# Web Search Tool
def create_web_search_tool(api_key: str) -> CrewTool:
    def search_web(query: str, num_results: int = 5) -> List[Dict[str, str]]:
        # Implementation using search API
        return search_api.search(query, num_results)

    return CrewTool(
        name="web_search",
        description="Search the web for information",
        parameters={
            "query": {"type": "string", "description": "Search query"},
            "num_results": {"type": "integer", "description": "Number of results", "default": 5}
        },
        execute_function=search_web
    )

# API Request Tool
def create_api_tool(base_url: str, headers: Dict[str, str]) -> CrewTool:
    def make_api_request(endpoint: str, method: str = "GET", data: Dict = None) -> Dict:
        url = f"{base_url}/{endpoint}"
        response = requests.request(method, url, headers=headers, json=data)
        return response.json()

    return CrewTool(
        name="api_request",
        description="Make HTTP requests to APIs",
        parameters={
            "endpoint": {"type": "string", "description": "API endpoint"},
            "method": {"type": "string", "description": "HTTP method", "default": "GET"},
            "data": {"type": "object", "description": "Request data", "required": False}
        },
        execute_function=make_api_request
    )

Data Processing Tools

# Data Analysis Tool
def create_data_analysis_tool() -> CrewTool:
    def analyze_data(data: List[Dict], operation: str) -> Dict:
        df = pd.DataFrame(data)

        if operation == "summary":
            return df.describe().to_dict()
        elif operation == "correlation":
            return df.corr().to_dict()
        elif operation == "trends":
            return {col: df[col].rolling(window=7).mean().tolist() for col in df.columns}

        return {"error": "Unknown operation"}

    return CrewTool(
        name="data_analysis",
        description="Analyze datasets with various operations",
        parameters={
            "data": {"type": "array", "description": "Data to analyze"},
            "operation": {"type": "string", "description": "Analysis operation"}
        },
        execute_function=analyze_data
    )

# File Operations Tool
def create_file_operations_tool(allowed_paths: List[str]) -> CrewTool:
    def file_operation(operation: str, path: str, content: str = None) -> Dict:
        if not any(path.startswith(allowed) for allowed in allowed_paths):
            raise ValueError("Access denied to this path")

        if operation == "read":
            with open(path, 'r') as f:
                return {"content": f.read()}
        elif operation == "write":
            with open(path, 'w') as f:
                f.write(content)
            return {"success": True}
        elif operation == "list":
            return {"files": os.listdir(path)}

        return {"error": "Unknown operation"}

    return CrewTool(
        name="file_operations",
        description="Perform file system operations",
        parameters={
            "operation": {"type": "string", "description": "File operation"},
            "path": {"type": "string", "description": "File path"},
            "content": {"type": "string", "description": "Content for write operations", "required": False}
        },
        execute_function=file_operation,
        required_permissions=["file_access"]
    )

Custom Tool Development

Tool Development Framework

class CustomToolBuilder:
    def __init__(self):
        self.templates = {
            "api_tool": self._create_api_tool_template,
            "data_tool": self._create_data_tool_template,
            "integration_tool": self._create_integration_tool_template
        }

    def create_custom_tool(self, tool_type: str, config: Dict[str, Any]) -> CrewTool:
        """Create a custom tool from configuration"""
        if tool_type not in self.templates:
            raise ValueError(f"Unknown tool type: {tool_type}")

        return self.templates[tool_type](config)

    def _create_api_tool_template(self, config: Dict[str, Any]) -> CrewTool:
        """Create API tool from configuration"""
        def api_call(endpoint: str, method: str = "GET", **kwargs):
            base_url = config["base_url"]
            headers = config.get("headers", {})

            url = f"{base_url}/{endpoint}"
            response = requests.request(method, url, headers=headers, **kwargs)
            return response.json()

        return CrewTool(
            name=config["name"],
            description=config["description"],
            parameters=config["parameters"],
            execute_function=api_call
        )

    def _create_data_tool_template(self, config: Dict[str, Any]) -> CrewTool:
        """Create data processing tool"""
        def process_data(data: Any, operation: str):
            # Custom data processing logic
            if operation == "transform":
                return self._transform_data(data, config.get("transform_rules", {}))
            elif operation == "validate":
                return self._validate_data(data, config.get("validation_rules", {}))
            elif operation == "aggregate":
                return self._aggregate_data(data, config.get("aggregation_rules", {}))

        return CrewTool(
            name=config["name"],
            description=config["description"],
            parameters=config["parameters"],
            execute_function=process_data
        )

Tool Testing Framework

class ToolTester:
    def __init__(self):
        self.test_cases = {}
        self.test_results = {}

    def add_test_case(self, tool_name: str, test_case: Dict[str, Any]):
        """Add test case for a tool"""
        if tool_name not in self.test_cases:
            self.test_cases[tool_name] = []
        self.test_cases[tool_name].append(test_case)

    def run_tests(self, tool: CrewTool) -> Dict[str, Any]:
        """Run all test cases for a tool"""
        test_results = []
        passed = 0
        failed = 0

        for test_case in self.test_cases.get(tool.name, []):
            try:
                result = tool._execute(**test_case["inputs"])
                expected = test_case["expected"]

                if self._compare_results(result, expected):
                    test_results.append({"status": "passed", "test": test_case["name"]})
                    passed += 1
                else:
                    test_results.append({
                        "status": "failed",
                        "test": test_case["name"],
                        "expected": expected,
                        "actual": result
                    })
                    failed += 1
            except Exception as e:
                test_results.append({
                    "status": "error",
                    "test": test_case["name"],
                    "error": str(e)
                })
                failed += 1

        return {
            "tool": tool.name,
            "passed": passed,
            "failed": failed,
            "total": passed + failed,
            "success_rate": passed / (passed + failed) if (passed + failed) > 0 else 0,
            "results": test_results
        }

    def _compare_results(self, actual: Any, expected: Any) -> bool:
        """Compare actual vs expected results"""
        if isinstance(expected, dict) and "type" in expected:
            if expected["type"] == "contains":
                return expected["value"] in str(actual)
            elif expected["type"] == "regex":
                import re
                return bool(re.search(expected["pattern"], str(actual)))

        return actual == expected

Tool Integration Patterns

Agent Tool Assignment

class ToolManager:
    def __init__(self, tool_registry: ToolRegistry):
        self.tool_registry = tool_registry
        self.agent_tools = {}

    def assign_tools_to_agent(self, agent: Agent, tool_names: List[str]):
        """Assign tools to an agent"""
        tools = []
        for tool_name in tool_names:
            tool = self.tool_registry.get_tool(tool_name)
            if tool:
                tools.append(tool)

        self.agent_tools[agent.id] = tools
        agent.tools = tools

    def recommend_tools_for_agent(self, agent: Agent, task_description: str) -> List[str]:
        """Recommend tools for agent based on task"""
        # Analyze task requirements
        task_keywords = self._extract_keywords(task_description)

        # Find relevant tools
        relevant_tools = []
        for tool in self.tool_registry.get_all_tools():
            if self._tool_matches_keywords(tool, task_keywords):
                relevant_tools.append(tool.name)

        return relevant_tools

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from task description"""
        # Simple keyword extraction (could be enhanced with NLP)
        keywords = []
        common_keywords = {
            "search": ["search", "find", "lookup", "query"],
            "api": ["api", "request", "call", "endpoint"],
            "data": ["analyze", "process", "transform", "calculate"],
            "file": ["read", "write", "save", "load"]
        }

        for category, words in common_keywords.items():
            if any(word in text.lower() for word in words):
                keywords.append(category)

        return keywords

    def _tool_matches_keywords(self, tool: CrewTool, keywords: List[str]) -> bool:
        """Check if tool matches task keywords"""
        tool_desc = tool.description.lower()
        return any(keyword in tool_desc for keyword in keywords)

Tool Chain Execution

class ToolChain:
    def __init__(self):
        self.tools = []
        self.execution_order = []

    def add_tool(self, tool: CrewTool, order: int = None):
        """Add tool to chain"""
        self.tools.append(tool)
        if order is not None:
            self.execution_order.append((order, tool.name))

    def execute_chain(self, initial_input: Any) -> Any:
        """Execute tools in chain"""
        current_input = initial_input
        results = []

        # Sort tools by execution order
        sorted_tools = self._sort_tools_by_order()

        for tool in sorted_tools:
            try:
                result = tool._execute(input=current_input)
                results.append({"tool": tool.name, "result": result})
                current_input = result  # Pass result to next tool
            except Exception as e:
                results.append({"tool": tool.name, "error": str(e)})
                break

        return {
            "final_result": current_input,
            "execution_chain": results
        }

    def _sort_tools_by_order(self) -> List[CrewTool]:
        """Sort tools by execution order"""
        if not self.execution_order:
            return self.tools

        ordered_tools = []
        tool_map = {tool.name: tool for tool in self.tools}

        sorted_order = sorted(self.execution_order, key=lambda x: x[0])
        for _, tool_name in sorted_order:
            if tool_name in tool_map:
                ordered_tools.append(tool_map[tool_name])

        return ordered_tools

Advanced Tool Features

Tool Performance Monitoring

class ToolPerformanceMonitor:
    def __init__(self, tool_registry: ToolRegistry):
        self.tool_registry = tool_registry
        self.performance_metrics = {}

    def monitor_tool_performance(self):
        """Monitor and analyze tool performance"""
        for tool in self.tool_registry.get_all_tools():
            metrics = self._calculate_metrics(tool)
            self.performance_metrics[tool.name] = metrics

            # Identify performance issues
            if metrics["success_rate"] < 0.8:
                self._flag_performance_issue(tool, "low_success_rate")
            if metrics["average_time"] > 30:  # 30 seconds
                self._flag_performance_issue(tool, "slow_execution")

    def _calculate_metrics(self, tool: CrewTool) -> Dict[str, float]:
        """Calculate performance metrics for a tool"""
        return {
            "usage_count": tool.usage_count,
            "success_rate": tool.success_rate,
            "average_execution_time": tool.average_execution_time,
            "error_rate": 1 - tool.success_rate
        }

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate comprehensive performance report"""
        report = {
            "overall_stats": self._calculate_overall_stats(),
            "tool_performance": self.performance_metrics,
            "recommendations": self._generate_recommendations()
        }

        return report

    def _generate_recommendations(self) -> List[str]:
        """Generate performance improvement recommendations"""
        recommendations = []

        for tool_name, metrics in self.performance_metrics.items():
            if metrics["success_rate"] < 0.8:
                recommendations.append(f"Improve error handling for {tool_name}")
            if metrics["average_execution_time"] > 30:
                recommendations.append(f"Optimize performance for {tool_name}")

        return recommendations

What We've Accomplished

Understood tool integration fundamentals and architecture ✅ Built custom tools with proper error handling and monitoring ✅ Created tool registry and management systems ✅ Implemented tool testing and validation frameworks ✅ Developed tool assignment and recommendation systems ✅ Established tool chains for complex workflows ✅ Set up performance monitoring and optimization

Next Steps

Ready to manage crew communication? In Chapter 5: Crew Communication, we'll explore how agents communicate, share information, and coordinate their activities.


Key Takeaway: Tools are the bridge between AI agents and the real world. Well-designed tools with proper error handling, monitoring, and integration enable agents to accomplish tasks that would otherwise be impossible.

What Problem Does This Solve?

Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for self, tool, name so behavior stays predictable as complexity grows.

In practical terms, this chapter helps you avoid three common failures:

  • coupling core logic too tightly to one implementation path
  • missing the handoff boundaries between setup, execution, and validation
  • shipping changes without clear rollback or observability strategy

After working through this chapter, you should be able to reason about Chapter 4: Tool Integration as an operating subsystem inside CrewAI Tutorial: Building Collaborative AI Agent Teams, with explicit contracts for inputs, state transitions, and outputs.

Use the implementation notes around tools, CrewTool, description as your checklist when adapting these patterns to your own repository.

How it Works Under the Hood

Under the hood, Chapter 4: Tool Integration usually follows a repeatable control path:

  1. Context bootstrap: initialize runtime config and prerequisites for self.
  2. Input normalization: shape incoming data so tool receives stable contracts.
  3. Core execution: run the main logic branch and propagate intermediate state through name.
  4. Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
  5. Output composition: return canonical result payloads for downstream consumers.
  6. Operational telemetry: emit logs/metrics needed for debugging and performance tuning.

When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.

Source Walkthrough

Use the following upstream sources to verify implementation details while reading this chapter:

  • View Repo Why it matters: authoritative reference on View Repo (github.com).

Suggested trace strategy:

  • search upstream code for self and tool to map concrete implementation paths
  • compare docs claims against actual runtime/config code before reusing patterns in production

Chapter Connections