Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Required
COGNEE_URL=https://cognee.example.com
COGNEE_API_KEY=
COGNEE_DATASET=
WATCH_ROOT=/data

# Optional (defaults shown)
DEBOUNCE_MS=1000
RECONCILE_INTERVAL_SEC=3600
LOG_LEVEL=info
IGNORE_GLOBS=.*,*.tmp,*.swp,*.swx,*~,*.part
REQUEST_TIMEOUT_SEC=30
MAX_RETRIES=6
RETRY_MAX_BACKOFF_SEC=60
# HEALTHZ_PORT=8080
# METRICS_PORT=9100
17 changes: 17 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
FROM python:3.12-slim

ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1

WORKDIR /app

COPY pyproject.toml README.md ./
COPY src ./src

RUN pip install --no-cache-dir .

ENV WATCH_ROOT=/data
VOLUME ["/data"]

ENTRYPOINT ["cognee-watcher"]
49 changes: 46 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,51 @@ A small, stateless daemon that mirrors a local directory into a remote [Cognee](

Designed as the connective tissue between any process with a local filesystem and a centrally-hosted Cognee server, with strict per-tenant isolation via Cognee datasets.

See [DESIGN.md](DESIGN.md) for the architecture, API surface, and rationale.
See [DESIGN.md](DESIGN.md) for the architecture and rationale.

## Status
## Quick start

Design phase. No implementation yet.
```bash
pip install -e .
cp .env.example .env # fill in COGNEE_URL, COGNEE_API_KEY, COGNEE_DATASET, WATCH_ROOT
cognee-watcher
```

Or via Docker:

```bash
docker build -t cognee-watcher .
docker run --rm \
-e COGNEE_URL=https://cognee.example.com \
-e COGNEE_API_KEY=$KEY \
-e COGNEE_DATASET=my-dataset \
-v /path/to/knowledge:/data \
cognee-watcher
```

## Configuration

All configuration is via environment variables — see [.env.example](.env.example) for the full list with defaults.

| Variable | Required | Purpose |
|---|---|---|
| `COGNEE_URL` | yes | Base URL of the Cognee server |
| `COGNEE_API_KEY` | yes | API key for the watcher's user |
| `COGNEE_DATASET` | yes | Dataset name to scope all writes |
| `WATCH_ROOT` | yes | Local directory to watch (recursive) |
| `DEBOUNCE_MS` | no | Coalesce window for bursty edits (default `1000`) |
| `RECONCILE_INTERVAL_SEC` | no | Periodic drift-repair sweep (default `3600`, `0` to disable) |
| `LOG_LEVEL` | no | `debug`/`info`/`warn`/`error` (default `info`) |
| `IGNORE_GLOBS` | no | Comma-separated globs to skip (default `.*,*.tmp,*.swp,*.swx,*~,*.part`) |

## Development

```bash
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
pytest
```

## License

MIT.
59 changes: 59 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "cognee-watcher"
version = "0.1.0"
description = "Stateless daemon that mirrors a local directory into a remote Cognee instance for RAG indexing."
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Majus" }]
keywords = ["cognee", "rag", "knowledge-graph", "filesystem", "watcher", "inotify"]
classifiers = [
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: System :: Filesystems",
"Topic :: Utilities",
]
dependencies = [
"httpx>=0.27",
"pydantic>=2.6",
"pydantic-settings>=2.2",
"python-json-logger>=2.0",
"tenacity>=8.2",
"watchfiles>=0.21",
]

[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"respx>=0.21",
"ruff>=0.5",
]

[project.scripts]
cognee-watcher = "cognee_watcher.main:run"

[project.urls]
Homepage = "https://github.com/majus/cognee-watcher"
Issues = "https://github.com/majus/cognee-watcher/issues"

[tool.hatch.build.targets.wheel]
packages = ["src/cognee_watcher"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

[tool.ruff]
line-length = 100
target-version = "py311"

[tool.ruff.lint]
select = ["E", "F", "I", "B", "UP", "SIM", "RUF"]
1 change: 1 addition & 0 deletions src/cognee_watcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Package version string; matches the ``version`` field declared in pyproject.toml.
__version__ = "0.1.0"
4 changes: 4 additions & 0 deletions src/cognee_watcher/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .main import run

# Allows ``python -m cognee_watcher`` to start the daemon through the same
# ``run`` callable that the ``cognee-watcher`` console script points at.
if __name__ == "__main__":
    run()
212 changes: 212 additions & 0 deletions src/cognee_watcher/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Async REST client for the Cognee server.

Wraps the small subset of endpoints the watcher needs:
- list documents in a dataset (filtered by name)
- multipart add
- patch update
- delete
- cognify
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

import httpx
from tenacity import (
AsyncRetrying,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)

from .log import get_logger

logger = get_logger(__name__)


class CogneeError(RuntimeError):
    """Raised when a Cognee request fails with an HTTP error status.

    The client raises this in place of ``httpx.HTTPStatusError`` — typically
    for 4xx responses, which are not retried.
    """


@dataclass(slots=True)
class CogneeDocument:
data_id: str
name: str
raw: dict


def _should_retry(exc: BaseException) -> bool:
    """Decide whether a failed request attempt is worth repeating.

    HTTP status errors are retried only for 429 and 5xx responses; any other
    exception is retried only when it is an httpx transport or timeout error.
    """
    if not isinstance(exc, httpx.HTTPStatusError):
        return isinstance(exc, (httpx.TransportError, httpx.TimeoutException))
    status = exc.response.status_code
    return status == 429 or status >= 500


class CogneeClient:
    """Async client for the subset of the Cognee REST API the watcher uses.

    All requests share one ``httpx.AsyncClient`` and are retried with
    exponential backoff whenever ``_should_retry`` accepts the failure.
    The public methods convert ``httpx.HTTPStatusError`` into ``CogneeError``
    (some treat 404 as "not there" instead); other httpx exceptions propagate
    unchanged once the retries are exhausted.

    Use it as an async context manager, or call ``aclose()`` when done.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        *,
        timeout: float = 30.0,
        max_retries: int = 6,
        max_backoff: float = 60.0,
    ) -> None:
        """Create the client and its underlying ``httpx.AsyncClient``.

        Args:
            base_url: Base URL of the Cognee server; trailing slashes are stripped.
            api_key: Sent with every request in the ``X-Api-Key`` header.
            timeout: Timeout handed to ``httpx.AsyncClient``.
            max_retries: Number of retries allowed after the first attempt.
            max_backoff: Upper bound for the exponential backoff wait.
        """
        self._base = base_url.rstrip("/")
        self._headers = {"X-Api-Key": api_key, "Accept": "application/json"}
        self._timeout = timeout
        self._max_retries = max_retries
        self._max_backoff = max_backoff
        self._client = httpx.AsyncClient(
            base_url=self._base,
            headers=self._headers,
            timeout=timeout,
        )

    async def aclose(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()

    async def __aenter__(self) -> CogneeClient:
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()

    # --- internal -------------------------------------------------------

    @staticmethod
    def _quote_segment(value: str) -> str:
        """Percent-encode ``value`` so it is safe as ONE URL path segment.

        ``safe=""`` also encodes ``/``, so a dataset name or data id can never
        add path segments, start a query string or a fragment.
        """
        from urllib.parse import quote

        return quote(str(value), safe="")

    def _retrying(self) -> AsyncRetrying:
        """Build a fresh retry controller for a single logical request."""
        return AsyncRetrying(
            # One initial attempt plus ``max_retries`` retries.
            stop=stop_after_attempt(self._max_retries + 1),
            wait=wait_exponential(multiplier=1, min=1, max=self._max_backoff),
            retry=retry_if_exception(_should_retry),
            # Surface the last underlying exception instead of tenacity's RetryError.
            reraise=True,
        )

    async def _request(
        self,
        method: str,
        url: str,
        *,
        json: dict | None = None,
        files: dict | None = None,
        data: dict | None = None,
        params: dict | None = None,
    ) -> httpx.Response:
        """Send one request, retrying according to ``_retrying()``.

        Returns:
            The response of the first attempt that did not raise.

        Raises:
            httpx.HTTPStatusError: The final attempt got an error status.
            httpx.TransportError: The final attempt failed at transport level.
        """
        async for attempt in self._retrying():
            with attempt:
                response = await self._client.request(
                    method,
                    url,
                    json=json,
                    files=files,
                    data=data,
                    params=params,
                )
                response.raise_for_status()
                return response
        raise AssertionError("unreachable")  # pragma: no cover

    # --- public API ----------------------------------------------------

    async def find_by_name(self, dataset: str, name: str) -> CogneeDocument | None:
        """Return the document with the given upload name in the dataset, or None.

        The ``name`` query parameter asks the server to filter; the result is
        filtered again client-side in case the server ignores that parameter.
        A 404 response is treated as "no such document".

        Raises:
            CogneeError: The server answered with any other error status.
        """
        params = {"name": name}
        path = f"/api/v1/datasets/{self._quote_segment(dataset)}/data"
        try:
            response = await self._request("GET", path, params=params)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return None
            raise CogneeError(f"GET /data failed: {e.response.status_code}") from e

        items = _coerce_list(response.json())
        match = next((d for d in items if d.get("name") == name), None)
        if match is None:
            return None
        return CogneeDocument(data_id=str(match["id"]), name=match["name"], raw=match)

    async def add(self, dataset: str, path: Path, upload_name: str) -> CogneeDocument:
        """Multipart-upload a new document. Returns the created doc.

        Raises:
            CogneeError: The server answered with an error status.
        """
        with path.open("rb") as fh:
            files = {"data": (upload_name, fh, "application/octet-stream")}
            data = {"datasetName": dataset}
            try:
                response = await self._request(
                    "POST", "/api/v1/add", files=files, data=data
                )
            except httpx.HTTPStatusError as e:
                raise CogneeError(
                    f"POST /add failed: {e.response.status_code} {e.response.text[:200]}"
                ) from e
        body = response.json()
        return CogneeDocument(
            data_id=str(body["id"]),
            name=body.get("name", upload_name),
            raw=body,
        )

    async def update(
        self, dataset: str, data_id: str, path: Path, upload_name: str
    ) -> CogneeDocument:
        """Supersede an existing document. Returns the doc with the new ``data_id``.

        Raises:
            CogneeError: The server answered with an error status.
        """
        with path.open("rb") as fh:
            files = {"data": (upload_name, fh, "application/octet-stream")}
            data = {"datasetId": dataset, "dataId": data_id}
            try:
                response = await self._request(
                    "PATCH", "/api/v1/update", files=files, data=data
                )
            except httpx.HTTPStatusError as e:
                raise CogneeError(
                    f"PATCH /update failed: {e.response.status_code} {e.response.text[:200]}"
                ) from e
        body = response.json()
        return CogneeDocument(
            data_id=str(body["id"]),
            name=body.get("name", upload_name),
            raw=body,
        )

    async def delete(self, dataset: str, data_id: str) -> None:
        """Delete a document; a 404 response counts as already deleted.

        Raises:
            CogneeError: The server answered with any other error status.
        """
        path = (
            f"/api/v1/datasets/{self._quote_segment(dataset)}"
            f"/data/{self._quote_segment(data_id)}"
        )
        try:
            await self._request("DELETE", path)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return
            raise CogneeError(
                f"DELETE failed: {e.response.status_code} {e.response.text[:200]}"
            ) from e

    async def cognify(self, dataset: str) -> None:
        """Ask the server to run cognify for the dataset.

        Raises:
            CogneeError: The server answered with an error status.
        """
        try:
            await self._request("POST", "/api/v1/cognify", json={"datasets": [dataset]})
        except httpx.HTTPStatusError as e:
            raise CogneeError(
                f"POST /cognify failed: {e.response.status_code} {e.response.text[:200]}"
            ) from e

    async def list_dataset(self, dataset: str) -> list[CogneeDocument]:
        """Return all documents in the dataset (used by the reconcile sweep).

        A 404 response yields an empty list; records without an ``id`` are skipped.

        Raises:
            CogneeError: The server answered with any other error status.
        """
        path = f"/api/v1/datasets/{self._quote_segment(dataset)}/data"
        try:
            response = await self._request("GET", path)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return []
            raise CogneeError(
                f"GET /data list failed: {e.response.status_code}"
            ) from e
        items = _coerce_list(response.json())
        return [
            CogneeDocument(data_id=str(d["id"]), name=d["name"], raw=d)
            for d in items
            if "id" in d
        ]


def _coerce_list(payload) -> list[dict]:
    """Extract the list of records from a decoded JSON response body.

    Accepts either a bare JSON array, or an object that wraps the array under
    one of the keys ``data``, ``items`` or ``results`` (checked in that order).
    Entries that are not dicts are dropped, so callers can rely on dict
    operations for every element. Any other payload shape yields ``[]``.
    """
    if isinstance(payload, dict):
        # Unwrap the first envelope key that actually holds a list.
        wrapped = (payload.get(key) for key in ("data", "items", "results"))
        payload = next((value for value in wrapped if isinstance(value, list)), None)
    if not isinstance(payload, list):
        return []
    return [entry for entry in payload if isinstance(entry, dict)]
Loading