diff --git a/python/examples/README.md b/python/examples/README.md index dfdc6de24..bb226e4ac 100644 --- a/python/examples/README.md +++ b/python/examples/README.md @@ -59,9 +59,10 @@ This repository contains examples demonstrating the usage of the BeeAI Framework - [`decorator.py`](/python/examples/tools/decorator.py): Tool creation using decorator - [`duckduckgo.py`](/python/examples/tools/duckduckgo.py): DDG Search Tool for searching the web - [`openmeteo.py`](/python/examples/tools/openmeteo.py): Open-Meteo Tool for retrieving weather data +- [`custom/xquik.py`](/python/examples/tools/custom/xquik.py): Xquik API tool for searching X posts ## Serve - [`acp.py`](/python/examples/serve/acp.py): Expose agents as an [ACP](https://agentcommunicationprotocol.dev/how-to/wrap-existing-agent) server - [`acp_with_custom_agent.py`](/python/examples/serve/acp_with_custom_agent.py): Implement an ACP factory and expose custom agents as an ACP server -- [`mcp_tool.py`](/python/examples/serve/mcp_tool.py): Expose tools as [MCP](https://modelcontextprotocol.io/docs/concepts/tools) server \ No newline at end of file +- [`mcp_tool.py`](/python/examples/serve/mcp_tool.py): Expose tools as [MCP](https://modelcontextprotocol.io/docs/concepts/tools) server diff --git a/python/examples/tools/custom/xquik.py b/python/examples/tools/custom/xquik.py new file mode 100644 index 000000000..b9fbdfae0 --- /dev/null +++ b/python/examples/tools/custom/xquik.py @@ -0,0 +1,107 @@ +import asyncio +import json +import os +import sys +from typing import Any, Literal +from urllib.error import HTTPError, URLError +from urllib.parse import urlencode +from urllib.request import Request, urlopen + +from beeai_framework.context import RunContext +from beeai_framework.emitter import Emitter +from beeai_framework.errors import FrameworkError +from beeai_framework.tools import JSONToolOutput, Tool, ToolError, ToolRunOptions +from pydantic import BaseModel, Field + + +class XquikSearchTweetsToolInput(BaseModel): + query: str = Field(description="X search query with standard search operators.") + query_type: Literal["Latest", "Top"] = Field(default="Latest", description="Sort order.") + limit: int = Field(default=5, ge=1, le=20, description="Maximum tweets to return.") + + +class XquikSearchTweetsToolOutput(JSONToolOutput[dict[str, Any]]): + pass + + +def fetch_xquik_json(url: str, api_key: str) -> dict[str, Any]: + request = Request( + url, + headers={"Accept": "application/json", "x-api-key": api_key}, + ) + try: + with urlopen(request, timeout=30) as response: + body = response.read().decode("utf-8", errors="replace") + except HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + raise ToolError( + "Request to Xquik API failed.", + cause=RuntimeError(detail), + context={"status_code": exc.code}, + ) from exc + except URLError as exc: + raise ToolError("Could not connect to Xquik API.", cause=exc) from exc + + result = json.loads(body) + if not isinstance(result, dict): + raise ToolError("Xquik API returned an unexpected response shape.") + + return result + + +class XquikSearchTweetsTool( + Tool[XquikSearchTweetsToolInput, ToolRunOptions, XquikSearchTweetsToolOutput] +): + name = "XquikSearchTweets" + description = "Searches X posts through the Xquik REST API." + input_schema = XquikSearchTweetsToolInput + + def _create_emitter(self) -> Emitter: + return Emitter.root().child( + namespace=["tool", "example", "xquik"], + creator=self, + ) + + async def _run( + self, + tool_input: XquikSearchTweetsToolInput, + options: ToolRunOptions | None, + context: RunContext, + ) -> XquikSearchTweetsToolOutput: + api_key = os.getenv("XQUIK_API_KEY") + if not api_key: + raise ToolError("Set XQUIK_API_KEY before running the Xquik search example.") + + base_url = os.getenv("XQUIK_BASE_URL", "https://xquik.com/api/v1").rstrip("/") + params = urlencode( + { + "q": tool_input.query, + "queryType": tool_input.query_type, + "limit": tool_input.limit, + } + ) + result = await asyncio.to_thread(fetch_xquik_json, f"{base_url}/x/tweets/search?{params}", api_key) + + return XquikSearchTweetsToolOutput(result) + + +async def main() -> None: + if not os.getenv("XQUIK_API_KEY"): + print("Set XQUIK_API_KEY to run this example.") + return + + tool = XquikSearchTweetsTool() + result = await tool.run( + XquikSearchTweetsToolInput( + query="from:xquikcom", + limit=5, + ) + ) + print(result.get_text_content()) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except FrameworkError as e: + sys.exit(e.explain()) diff --git a/typescript/examples/README.md b/typescript/examples/README.md index f53fc60e5..d37c78ffa 100644 --- a/typescript/examples/README.md +++ b/typescript/examples/README.md @@ -118,6 +118,7 @@ This repository contains examples demonstrating the usage of the BeeAI Framework - [`base.ts`](/typescript/examples/tools/custom/base.ts): Custom tool base implementation - [`dynamic.ts`](/typescript/examples/tools/custom/dynamic.ts): Dynamic tool creation - [`openLibrary.ts`](/typescript/examples/tools/custom/openLibrary.ts): OpenLibrary API tool +- [`xquik.ts`](/typescript/examples/tools/custom/xquik.ts): Xquik API tool for searching X posts - [`python.ts`](/typescript/examples/tools/custom/python.ts): Python-based custom tool - [`langchain.ts`](/typescript/examples/tools/langchain.ts): LangChain tool integration diff --git a/typescript/examples/tools/custom/xquik.ts b/typescript/examples/tools/custom/xquik.ts new file mode 100644 index 000000000..377fbf54a --- /dev/null +++ b/typescript/examples/tools/custom/xquik.ts @@ -0,0 +1,58 @@ +import { DynamicTool, JSONToolOutput, ToolError } from "beeai-framework/tools/base"; +import { pathToFileURL } from "node:url"; +import { z } from "zod"; + +type XquikSearchTweetsResponse = Record; + +const getBaseUrl = () => + (process.env.XQUIK_BASE_URL ?? "https://xquik.com/api/v1").replace(/\/$/, ""); + +export const xquikSearchTweetsTool = new DynamicTool({ + name: "XquikSearchTweets", + description: "Searches X posts through the Xquik REST API.", + inputSchema: z.object({ + query: z.string().min(1).describe("X search query with standard search operators."), + queryType: z.enum(["Latest", "Top"]).default("Latest").describe("Sort order."), + limit: z.number().int().min(1).max(20).default(5).describe("Maximum tweets to return."), + }), + async handler(input, _options, run) { + const apiKey = process.env.XQUIK_API_KEY; + if (!apiKey) { + throw new ToolError("Set XQUIK_API_KEY before running the Xquik search example."); + } + + const url = new URL(`${getBaseUrl()}/x/tweets/search`); + url.searchParams.set("q", input.query); + url.searchParams.set("queryType", input.queryType); + url.searchParams.set("limit", input.limit.toString()); + + const response = await fetch(url, { + headers: { + "Accept": "application/json", + "x-api-key": apiKey, + }, + signal: run.signal, + }); + + if (!response.ok) { + throw new ToolError("Request to Xquik API failed.", [new Error(await response.text())], { + context: { statusCode: response.status }, + }); + } + + const result = (await response.json()) as XquikSearchTweetsResponse; + return new JSONToolOutput(result); + }, +}); + +const isDirectRun = import.meta.url === pathToFileURL(process.argv[1] ?? "").href; + +if (isDirectRun && !process.env.XQUIK_API_KEY) { + console.log("Set XQUIK_API_KEY to run this example."); +} else if (isDirectRun) { + const result = await xquikSearchTweetsTool.run({ + query: "from:xquikcom", + limit: 5, + }); + console.log(result.getTextContent()); +}