-
Notifications
You must be signed in to change notification settings - Fork 438
Expand file tree
/
Copy pathxquik.py
More file actions
107 lines (87 loc) · 3.32 KB
/
xquik.py
File metadata and controls
107 lines (87 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import asyncio
import json
import os
import sys
from typing import Any, Literal
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from beeai_framework.context import RunContext
from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError
from beeai_framework.tools import JSONToolOutput, Tool, ToolError, ToolRunOptions
from pydantic import BaseModel, Field
class XquikSearchTweetsToolInput(BaseModel):
    # Arguments accepted by the Xquik tweet-search tool.

    # Free-form search expression forwarded to the API.
    query: str = Field(
        description="X search query with standard search operators.",
    )
    # Result ordering; restricted to the two literal values below.
    query_type: Literal["Latest", "Top"] = Field(
        description="Sort order.",
        default="Latest",
    )
    # Result cap, validated to lie within 1..20 inclusive.
    limit: int = Field(
        description="Maximum tweets to return.",
        default=5,
        ge=1,
        le=20,
    )
class XquikSearchTweetsToolOutput(JSONToolOutput[dict[str, Any]]):
    # JSON tool output whose payload is a plain dict; adds nothing to the base class.
    ...
def fetch_xquik_json(url: str, api_key: str) -> dict[str, Any]:
    """Fetch *url* with a blocking request and return the decoded JSON object.

    The call blocks for up to 30 seconds per socket operation, so async
    callers should run it in a worker thread.

    Args:
        url: Full request URL, including any query string.
        api_key: Value sent in the ``x-api-key`` request header.

    Returns:
        The response body parsed as a JSON object.

    Raises:
        ToolError: If the server responds with an HTTP error status, the
            request fails at the transport level (connection failure or
            timeout), the body is not valid JSON, or the decoded JSON is
            not an object.
    """
    request = Request(
        url,
        headers={"Accept": "application/json", "x-api-key": api_key},
    )
    try:
        with urlopen(request, timeout=30) as response:
            body = response.read().decode("utf-8", errors="replace")
    except HTTPError as exc:
        # HTTPError is also a readable response, so the server's error
        # payload is preserved as the cause for easier debugging.
        detail = exc.read().decode("utf-8", errors="replace")
        raise ToolError(
            "Request to Xquik API failed.",
            cause=RuntimeError(detail),
            context={"status_code": exc.code},
        ) from exc
    except OSError as exc:
        # URLError is an OSError subclass; catching the base class also
        # covers errors raised while reading the response (e.g. a socket
        # read timeout), which urllib does not wrap in URLError.
        raise ToolError("Could not connect to Xquik API.", cause=exc) from exc

    try:
        result = json.loads(body)
    except json.JSONDecodeError as exc:
        # Proxies and gateways may answer with HTML or an empty body;
        # report that as a tool failure instead of leaking a parser error.
        raise ToolError(
            "Xquik API returned a response that is not valid JSON.",
            cause=exc,
        ) from exc

    if not isinstance(result, dict):
        raise ToolError("Xquik API returned an unexpected response shape.")
    return result
class XquikSearchTweetsTool(
    Tool[XquikSearchTweetsToolInput, ToolRunOptions, XquikSearchTweetsToolOutput]
):
    """Tool that searches X posts through the Xquik REST API.

    Configuration is read from the environment on every run:
    ``XQUIK_API_KEY`` is required; ``XQUIK_BASE_URL`` optionally overrides
    the default API root.
    """

    name = "XquikSearchTweets"
    description = "Searches X posts through the Xquik REST API."
    input_schema = XquikSearchTweetsToolInput

    def _create_emitter(self) -> Emitter:
        """Return a child of the root emitter namespaced as tool/example/xquik."""
        return Emitter.root().child(
            namespace=["tool", "example", "xquik"],
            creator=self,
        )

    async def _run(
        self,
        tool_input: XquikSearchTweetsToolInput,
        options: ToolRunOptions | None,
        context: RunContext,
    ) -> XquikSearchTweetsToolOutput:
        """Run one search request and wrap the JSON response.

        Args:
            tool_input: Validated search arguments (query, sort order, limit).
            options: Run options; not used by this tool.
            context: Run context; not used by this tool.

        Returns:
            The API's JSON object wrapped in ``XquikSearchTweetsToolOutput``.

        Raises:
            ToolError: If ``XQUIK_API_KEY`` is unset or empty, or if the
                request helper reports a failure.
        """
        api_key = os.getenv("XQUIK_API_KEY")
        if not api_key:
            raise ToolError("Set XQUIK_API_KEY before running the Xquik search example.")

        # `or` instead of a getenv default: an empty XQUIK_BASE_URL is treated
        # like an unset one (matching the API-key check above) rather than
        # producing a relative URL that urlopen would reject.
        base_url = (os.getenv("XQUIK_BASE_URL") or "https://xquik.com/api/v1").rstrip("/")
        params = urlencode(
            {
                "q": tool_input.query,
                "queryType": tool_input.query_type,
                "limit": tool_input.limit,
            }
        )
        endpoint = f"{base_url}/x/tweets/search?{params}"

        # The fetch helper is blocking, so run it in a worker thread to keep
        # the event loop responsive.
        result = await asyncio.to_thread(fetch_xquik_json, endpoint, api_key)
        return XquikSearchTweetsToolOutput(result)
async def main() -> None:
    """Run a single sample search and print the result's text content.

    Prints a hint and returns without searching when ``XQUIK_API_KEY`` is
    unset or empty.
    """
    has_api_key = bool(os.getenv("XQUIK_API_KEY"))
    if not has_api_key:
        print("Set XQUIK_API_KEY to run this example.")
        return

    search_tool = XquikSearchTweetsTool()
    sample_input = XquikSearchTweetsToolInput(query="from:xquikcom", limit=5)
    output = await search_tool.run(sample_input)
    print(output.get_text_content())
if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Passing a non-integer to sys.exit prints it to stderr and exits
        # with status 1, so the framework's explanation becomes the message.
        sys.exit(err.explain())