From a76731ce242372a20b94b63d5650982008b33b9c Mon Sep 17 00:00:00 2001 From: Dyn Date: Sat, 14 Mar 2026 13:36:00 +0000 Subject: [PATCH 1/2] Add WritBase integration: persistent task backend for CrewAI agents Co-Authored-By: Claude Opus 4.6 (1M context) --- integrations/writbase/.env.example | 3 + integrations/writbase/.gitignore | 4 + integrations/writbase/README.md | 111 ++++++++++++++++ integrations/writbase/main.py | 177 +++++++++++++++++++++++++ integrations/writbase/requirements.txt | 3 + 5 files changed, 298 insertions(+) create mode 100644 integrations/writbase/.env.example create mode 100644 integrations/writbase/.gitignore create mode 100644 integrations/writbase/README.md create mode 100644 integrations/writbase/main.py create mode 100644 integrations/writbase/requirements.txt diff --git a/integrations/writbase/.env.example b/integrations/writbase/.env.example new file mode 100644 index 00000000..16f26edb --- /dev/null +++ b/integrations/writbase/.env.example @@ -0,0 +1,3 @@ +WRITBASE_URL=https://your-project.supabase.co +WRITBASE_AGENT_KEY=wb_keyid_secret +OPENAI_API_KEY=sk-... diff --git a/integrations/writbase/.gitignore b/integrations/writbase/.gitignore new file mode 100644 index 00000000..fc36f7d8 --- /dev/null +++ b/integrations/writbase/.gitignore @@ -0,0 +1,4 @@ +.env +__pycache__/ +*.pyc +.venv/ diff --git a/integrations/writbase/README.md b/integrations/writbase/README.md new file mode 100644 index 00000000..422759c9 --- /dev/null +++ b/integrations/writbase/README.md @@ -0,0 +1,111 @@ +# WritBase Task Backend for CrewAI + +## Introduction + +Use [WritBase](https://github.com/Writbase/writbase) as a persistent, shared task backend for CrewAI agents. Tasks survive crew restarts, are auditable, and can be shared across multiple crews. + +WritBase is an MCP-native task management system for AI agent fleets. It provides: + +- **Persistent tasks** — tasks stored in Postgres, not lost when a crew stops +- **Multi-agent coordination** — multiple crews can share a task queue +- **Full provenance** — every status change and note is logged with actor attribution +- **Agent permissions** — fine-grained control over what each agent can read, create, or update + +## Prerequisites + +- A Supabase project with WritBase deployed (`npx writbase init`) +- An agent key created via the WritBase dashboard or CLI (`npx writbase key create`) +- Python 3.10+ +- An OpenAI API key (or any LLM provider supported by CrewAI) + +## Setup + +1. Clone this example: + +```bash +git clone https://github.com/crewAIInc/crewAI-examples.git +cd crewAI-examples/integrations/writbase +``` + +2. Create a virtual environment and install dependencies: + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +3. Copy the environment template and fill in your values: + +```bash +cp .env.example .env +``` + +Edit `.env` with your WritBase Supabase URL, agent key, and OpenAI API key. + +4. Run the example: + +```bash +python main.py +``` + +## How It Works + +The crew connects to WritBase via its MCP HTTP endpoint using JSON-RPC 2.0 calls: + +1. **Fetch tasks** — calls `get_tasks` to retrieve tasks assigned to this agent with status `todo` +2. **Process tasks** — a CrewAI crew with a researcher and writer agent processes each task +3. **Update results** — calls `update_task` to set the task status to `done` and attach the result as notes + +## Architecture + +``` ++-------------------+ +---------------------------+ +------------+ +| | HTTP | | | | +| CrewAI Crew +-------->+ WritBase MCP Endpoint +-------->+ Postgres | +| (this script) | JSON- | (Supabase Edge Function) | | (tasks, | +| | RPC | | | logs) | ++-------------------+ +---------------------------+ +------------+ + | ^ + | other crews / agents | + | +------------------------+ | + +-------------------->+ WritBase Dashboard +------------------+ + +------------------------+ +``` + +## MCP Endpoint Reference + +All calls go to `{WRITBASE_URL}/functions/v1/mcp-server` with header `X-Agent-Key: {WRITBASE_AGENT_KEY}`. + +### Fetch assigned tasks + +```json +{ + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "get_tasks", + "arguments": {"status": "todo", "assigned_to_me": true} + }, + "id": 1 +} +``` + +### Update a task + +```json +{ + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "update_task", + "arguments": {"task_id": "...", "status": "done", "notes": "Result here"} + }, + "id": 2 +} +``` + +## Learn More + +- [WritBase GitHub](https://github.com/Writbase/writbase) +- [CrewAI Documentation](https://docs.crewai.com) diff --git a/integrations/writbase/main.py b/integrations/writbase/main.py new file mode 100644 index 00000000..1fa7e539 --- /dev/null +++ b/integrations/writbase/main.py @@ -0,0 +1,177 @@ +""" +WritBase Task Backend for CrewAI + +Fetches tasks assigned to this agent from WritBase, processes them with a +CrewAI crew (researcher + writer), and writes results back. + +Usage: + cp .env.example .env # fill in your values + pip install -r requirements.txt + python main.py +""" + +import json +import sys + +import httpx +from crewai import Agent, Crew, Process, Task +from dotenv import load_dotenv +import os + +load_dotenv() + +WRITBASE_URL = os.environ["WRITBASE_URL"] +WRITBASE_AGENT_KEY = os.environ["WRITBASE_AGENT_KEY"] +MCP_ENDPOINT = f"{WRITBASE_URL}/functions/v1/mcp-server" + + +# --------------------------------------------------------------------------- +# WritBase MCP helpers +# --------------------------------------------------------------------------- + +def mcp_call(method: str, params: dict, request_id: int = 1) -> dict: + """Send a JSON-RPC 2.0 request to the WritBase MCP endpoint.""" + payload = { + "jsonrpc": "2.0", + "method": method, + "params": params, + "id": request_id, + } + headers = { + "Content-Type": "application/json", + "X-Agent-Key": WRITBASE_AGENT_KEY, + } + response = httpx.post(MCP_ENDPOINT, json=payload, headers=headers, timeout=30) + response.raise_for_status() + result = response.json() + if "error" in result: + raise RuntimeError(f"MCP error: {result['error']}") + return result.get("result", {}) + + +def fetch_todo_tasks() -> list[dict]: + """Fetch tasks assigned to this agent with status 'todo'.""" + result = mcp_call( + method="tools/call", + params={ + "name": "get_tasks", + "arguments": {"status": "todo", "assigned_to_me": True}, + }, + ) + # MCP tool results come as content blocks; parse the text block + content = result.get("content", []) + for block in content: + if block.get("type") == "text": + return json.loads(block["text"]) + return [] + + +def update_task(task_id: str, status: str, notes: str) -> dict: + """Update a task's status and notes in WritBase.""" + return mcp_call( + method="tools/call", + params={ + "name": "update_task", + "arguments": { + "task_id": task_id, + "status": status, + "notes": notes, + }, + }, + ) + + +# --------------------------------------------------------------------------- +# CrewAI crew definition +# --------------------------------------------------------------------------- + +researcher = Agent( + role="Researcher", + goal="Analyze the task description and gather key information", + backstory=( + "You are a meticulous researcher who breaks down task descriptions " + "into actionable insights. You identify what needs to be done and " + "gather the relevant context." + ), + verbose=True, +) + +writer = Agent( + role="Writer", + goal="Produce a clear, concise result summary for the task", + backstory=( + "You are a skilled writer who takes research findings and produces " + "well-structured summaries. You focus on clarity and actionability." + ), + verbose=True, +) + + +def build_crew(task_description: str) -> Crew: + """Build a CrewAI crew to process a single WritBase task.""" + research_task = Task( + description=( + f"Analyze the following task and identify the key requirements, " + f"constraints, and any relevant context:\n\n{task_description}" + ), + expected_output="A structured analysis of the task requirements.", + agent=researcher, + ) + + writing_task = Task( + description=( + "Based on the research analysis, produce a clear and actionable " + "result summary. Include specific recommendations or deliverables." + ), + expected_output="A concise result summary with actionable items.", + agent=writer, + ) + + return Crew( + agents=[researcher, writer], + tasks=[research_task, writing_task], + process=Process.sequential, + verbose=True, + ) + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- + +def main(): + print("Fetching tasks from WritBase...") + tasks = fetch_todo_tasks() + + if not tasks: + print("No tasks assigned. Nothing to do.") + return + + print(f"Found {len(tasks)} task(s) to process.\n") + + for wb_task in tasks: + task_id = wb_task["id"] + title = wb_task.get("title", "(untitled)") + description = wb_task.get("description", title) + + print(f"--- Processing task: {title} (id: {task_id}) ---") + + # Mark as in_progress + update_task(task_id, status="in_progress", notes="CrewAI processing started") + + try: + crew = build_crew(description) + result = crew.kickoff() + + # Write result back to WritBase + update_task(task_id, status="done", notes=str(result)) + print(f"Task {task_id} completed.\n") + + except Exception as exc: + error_msg = f"CrewAI processing failed: {exc}" + update_task(task_id, status="failed", notes=error_msg) + print(f"Task {task_id} failed: {exc}\n", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/integrations/writbase/requirements.txt b/integrations/writbase/requirements.txt new file mode 100644 index 00000000..0e1f92a1 --- /dev/null +++ b/integrations/writbase/requirements.txt @@ -0,0 +1,3 @@ +crewai>=0.80.0 +httpx>=0.27.0 +python-dotenv>=1.0.0 From 187c6e458627cf817febc86bbbb359c0284b5c72 Mon Sep 17 00:00:00 2001 From: Dyn Date: Sat, 14 Mar 2026 14:44:46 +0000 Subject: [PATCH 2/2] Fix WritBase API compatibility: add version param, fix response parsing - update_task requires version for conflict detection - get_tasks response wraps tasks in {"tasks": [...]} - WritBase tasks use description, not title Co-Authored-By: Claude Opus 4.6 (1M context) --- integrations/writbase/main.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/integrations/writbase/main.py b/integrations/writbase/main.py index 1fa7e539..625874ee 100644 --- a/integrations/writbase/main.py +++ b/integrations/writbase/main.py @@ -62,11 +62,12 @@ def fetch_todo_tasks() -> list[dict]: content = result.get("content", []) for block in content: if block.get("type") == "text": - return json.loads(block["text"]) + data = json.loads(block["text"]) + return data.get("tasks", data) if isinstance(data, dict) else data return [] -def update_task(task_id: str, status: str, notes: str) -> dict: +def update_task(task_id: str, version: int, status: str, notes: str) -> dict: """Update a task's status and notes in WritBase.""" return mcp_call( method="tools/call", @@ -74,6 +75,7 @@ def update_task(task_id: str, status: str, notes: str) -> dict: "name": "update_task", "arguments": { "task_id": task_id, + "version": version, "status": status, "notes": notes, }, @@ -151,25 +153,26 @@ def main(): for wb_task in tasks: task_id = wb_task["id"] - title = wb_task.get("title", "(untitled)") - description = wb_task.get("description", title) + version = wb_task["version"] + description = wb_task.get("description", "(no description)") - print(f"--- Processing task: {title} (id: {task_id}) ---") + print(f"--- Processing task: {description[:60]} (id: {task_id}) ---") - # Mark as in_progress - update_task(task_id, status="in_progress", notes="CrewAI processing started") + # Mark as in_progress (version increments after each update) + update_task(task_id, version=version, status="in_progress", notes="CrewAI processing started") + version += 1 try: crew = build_crew(description) result = crew.kickoff() # Write result back to WritBase - update_task(task_id, status="done", notes=str(result)) + update_task(task_id, version=version, status="done", notes=str(result)) print(f"Task {task_id} completed.\n") except Exception as exc: error_msg = f"CrewAI processing failed: {exc}" - update_task(task_id, status="failed", notes=error_msg) + update_task(task_id, version=version, status="failed", notes=error_msg) print(f"Task {task_id} failed: {exc}\n", file=sys.stderr)