diff --git a/.github/labeler.yml b/.github/labeler.yml index 72fb9b2c71..b90ef9fd72 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -63,6 +63,11 @@ integration:chroma: - changed-files: - any-glob-to-any-file: "integrations/chroma/**/*" - any-glob-to-any-file: ".github/workflows/chroma.yml" + +integration:cognee: + - changed-files: + - any-glob-to-any-file: "integrations/cognee/**/*" + - any-glob-to-any-file: ".github/workflows/cognee.yml" integration:cohere: - changed-files: diff --git a/.github/workflows/cognee.yml b/.github/workflows/cognee.yml new file mode 100644 index 0000000000..d00696263c --- /dev/null +++ b/.github/workflows/cognee.yml @@ -0,0 +1,92 @@ +# This workflow comes from https://github.com/ofek/hatch-mypyc +# https://github.com/ofek/hatch-mypyc/blob/5a198c0ba8660494d02716cfc9d79ce4adfb1442/.github/workflows/test.yml +name: Test / cognee + +on: + schedule: + - cron: "0 0 * * *" + pull_request: + paths: + - "integrations/cognee/**" + - "!integrations/cognee/*.md" + - ".github/workflows/cognee.yml" + push: + branches: + - main + paths: + - "integrations/cognee/**" + - "!integrations/cognee/*.md" + - ".github/workflows/cognee.yml" + +defaults: + run: + working-directory: integrations/cognee + +concurrency: + group: cognee-${{ github.head_ref }} + cancel-in-progress: true + +env: + PYTHONUNBUFFERED: "1" + FORCE_COLOR: "1" + # cognee runs an LLM internally for remember/recall/improve; the integration suite needs a key. + LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }} + +jobs: + run: + name: Python ${{ matrix.python-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + python-version: ["3.10", "3.14"] + + steps: + - name: Support longpaths + if: matrix.os == 'windows-latest' + working-directory: . 
+ run: git config --system core.longpaths true + + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Hatch + run: pip install --upgrade hatch + - name: Lint + if: matrix.python-version == '3.10' && runner.os == 'Linux' + run: hatch run fmt-check && hatch run test:types + + - name: Run unit tests + run: hatch run test:unit-cov-retry + + - name: Run integration tests + run: hatch run test:integration-cov-append-retry + + - name: Run unit tests with lowest direct dependencies + run: | + hatch run uv pip compile pyproject.toml --resolution lowest-direct --output-file requirements_lowest_direct.txt + hatch -e test env run -- uv pip install -r requirements_lowest_direct.txt + hatch run test:unit + + - name: Nightly - run tests with Haystack main branch + if: github.event_name == 'schedule' + run: | + hatch env prune + hatch -e test env run -- uv pip install git+https://github.com/deepset-ai/haystack.git@main + hatch run test:unit-cov-retry + hatch run test:integration-cov-append-retry + + + notify-slack-on-failure: + needs: run + if: failure() && github.event_name == 'schedule' + runs-on: ubuntu-slim + steps: + - uses: deepset-ai/notify-slack-action@3cda73b77a148f16f703274198e7771340cf862b # v1 + with: + slack-webhook-url: ${{ secrets.SLACK_WEBHOOK_URL_NOTIFICATIONS }} diff --git a/integrations/cognee/LICENSE b/integrations/cognee/LICENSE new file mode 100644 index 0000000000..de4c7f39f1 --- /dev/null +++ b/integrations/cognee/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2023 deepset GmbH + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/integrations/cognee/README.md b/integrations/cognee/README.md new file mode 100644 index 0000000000..049f783ee4 --- /dev/null +++ b/integrations/cognee/README.md @@ -0,0 +1,14 @@ +# cognee-haystack + +[![PyPI - Version](https://img.shields.io/pypi/v/cognee-haystack.svg)](https://pypi.org/project/cognee-haystack) +[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/cognee-haystack.svg)](https://pypi.org/project/cognee-haystack) + +[Cognee](https://www.cognee.ai/) integration for [Haystack](https://haystack.deepset.ai/) — open-source memory for AI agents. 
+ +- [Changelog](https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/cognee/CHANGELOG.md) + +--- + +## Contributing + +Refer to the general [Contribution Guidelines](https://github.com/deepset-ai/haystack-core-integrations/blob/main/CONTRIBUTING.md). diff --git a/integrations/cognee/examples/README.md b/integrations/cognee/examples/README.md new file mode 100644 index 0000000000..718a4b2b2f --- /dev/null +++ b/integrations/cognee/examples/README.md @@ -0,0 +1,41 @@ +# Cognee-Haystack Examples + +## Prerequisites + +Install the integration from the repository root: + +```bash +pip install -e "integrations/cognee" +``` + +Set the keys used by cognee and by Haystack's `OpenAIChatGenerator`: + +```bash +export LLM_API_KEY="sk-your-openai-api-key" # cognee: remember/recall/improve +export EMBEDDING_API_KEY="sk-your-openai-api-key" # cognee: embeddings (often same key) +export OPENAI_API_KEY="sk-your-openai-api-key" # Haystack OpenAIChatGenerator (Agent) +``` + +In practice all three can point at the same OpenAI key. For other LLM providers and full +configuration options, see [Cognee Documentation](https://docs.cognee.ai/getting-started/installation#environment-configuration). + +cognee's session cache is on by default (`CACHING=true`, `CACHE_BACKEND=fs`); set +`CACHE_BACKEND=redis` plus `CACHE_HOST` / `CACHE_PORT` / `CACHE_USERNAME` / +`CACHE_PASSWORD` to point at a Redis instance instead. + +## Examples + +### Memory Agent Demo (`demo_memory_agent.py`) + +Wires `CogneeRetriever`, `CogneeMemoryStore`, and `CogneeWriter` around Haystack's +`Agent` to enrich every conversation turn with persistent memory, in four phases: + +1. `persistent_writer` seeds long-lived facts into the permanent graph. +2. `session_writer` seeds session-only context (`session_id=...`). +3. Agent loop: `CogneeRetriever` calls `cognee.recall(query, session_id=...)` which + auto-captures each turn as a QA entry in the session — no writer in the pipeline. +4. 
`chat_store.improve()` promotes the session into the permanent graph. + +```bash +python integrations/cognee/examples/demo_memory_agent.py +``` diff --git a/integrations/cognee/examples/demo_memory_agent.py b/integrations/cognee/examples/demo_memory_agent.py new file mode 100644 index 0000000000..e6074eb544 --- /dev/null +++ b/integrations/cognee/examples/demo_memory_agent.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python +""" +Minimal cognee-haystack demo — persistent vs session memory in four phases. + +Phase 1 — `persistent_writer` (no `session_id`) seeds long-lived facts into + cognee's permanent knowledge graph. +Phase 2 — `session_writer` (`session_id=...`) seeds session-only context into + cognee's session cache. The graph itself doesn't change. +Phase 3 — Agent loop: `CogneeRetriever` calls `cognee.recall(query, session_id=...)` + which auto-captures each turn as a QA entry in the session. **No + CogneeWriter in the pipeline** — cognee's recall is the session-write + path per the docs. +Phase 4 — `chat_store.improve()` promotes the session into the permanent + graph via `cognee.improve(dataset=..., session_ids=[...])`. + +Environment (loaded from repo-root .env): + LLM_API_KEY Required. cognee's LLM provider key. + EMBEDDING_API_KEY Optional; defaults to LLM_API_KEY when unset. + OPENAI_API_KEY Required. Used by Haystack's OpenAIChatGenerator. + +Run: + cd integrations/cognee + .venv/bin/hatch run test:python examples/demo_memory_agent.py +""" + +import asyncio +import logging +import os +from pathlib import Path + +from dotenv import load_dotenv + +os.environ.setdefault("LOG_LEVEL", "WARNING") + +# Load .env from the repo root before cognee imports read any config. 
+load_dotenv(Path(__file__).resolve().parents[3] / ".env", override=True) + +import cognee +from cognee.api.v1.visualize import visualize_multi_user_graph +from cognee.modules.users.methods import get_default_user +from haystack import Pipeline +from haystack.components.agents import Agent +from haystack.components.converters import OutputAdapter +from haystack.components.generators.chat import OpenAIChatGenerator +from haystack.dataclasses import ChatMessage + +from haystack_integrations.components.retrievers.cognee import CogneeRetriever +from haystack_integrations.components.writers.cognee import CogneeWriter +from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + +# cognee binds its graph engine to whichever event loop touches it first. +# Route the demo's direct cognee calls through the integration's background +# loop so reads and writes share state. +from haystack_integrations.memory_stores.cognee.memory_store import _run_sync + +logging.basicConfig(level=logging.WARNING) + +DATASET = "agent_memory_minimal" +SESSION = "alice_chat_42" + +ARTIFACTS = Path(__file__).resolve().parent / "graph_snapshots" +ARTIFACTS.mkdir(exist_ok=True) + +# Long-lived facts +PERSISTENT_MEMORIES = [ + "My name is Alice. I'm a senior data scientist at Acme Corp specialising in NLP and knowledge graphs.", + "My current project is building an internal documentation search system powered by Haystack and Cognee.", + "My team: Bob is the ML engineer and Carol handles infrastructure.", + "I prefer concise answers with Python code examples over long prose explanations.", +] + +# Session-only context +SESSION_MEMORIES = [ + "Bob is having trouble with the new documentation search system.", + "Carol helps Bob troubleshoot the issue.", +] + +SYSTEM_PROMPT = ( + "You are a helpful assistant with access to persistent memory of past conversations. " + "Any system messages at the start of the conversation contain relevant memories. 
" + "Be concise; prefer short answers and Python code examples." +) + + +async def _visualize_all_datasets(destination_file_path: str) -> None: + """Render a combined graph across every dataset the default user can read. + + Uses `cognee.visualize_multi_user_graph` with explicit `(user, dataset)` pairs + so each dataset's graph is read inside its own database context. Works whether + or not `ENABLE_BACKEND_ACCESS_CONTROL` is enabled. + """ + user = await get_default_user() + datasets = await cognee.datasets.list_datasets(user=user) + pairs = [(user, ds) for ds in datasets] + await visualize_multi_user_graph(pairs, destination_file_path=destination_file_path) + + +def build_pipeline(chat_store: CogneeMemoryStore) -> Pipeline: + """Retriever → injector → agent. No writer: cognee.recall auto-captures the session QA.""" + pipeline = Pipeline() + pipeline.add_component("retriever", CogneeRetriever(memory_store=chat_store)) + pipeline.add_component( + "injector", + OutputAdapter( + template="{{ memories + user_messages }}", + output_type=list[ChatMessage], + unsafe=True, + ), + ) + pipeline.add_component( + "agent", + Agent( + chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"), + system_prompt=SYSTEM_PROMPT, + ), + ) + pipeline.connect("retriever.messages", "injector.memories") + pipeline.connect("injector.output", "agent.messages") + return pipeline + + +async def main() -> None: + # Direct `cognee.forget(everything=True)` instead of `store.delete_all_memories()`: + # the protocol method is dataset-scoped and leaves the session cache alone, but at + # demo start we want a global wipe including any leftover session entries. + print("Forgetting previous data for a clean start...") + _run_sync(cognee.forget(everything=True)) + print("Done.\n") + + # One store, two writers — `session_id` on the writer picks the tier. 
+ # `self_improvement=False` keeps Phase 4's `improve()` as the only improve + # trigger; otherwise cognee runs improve per write and we'd see duplicated + # nodes after the explicit improve. + seed_store = CogneeMemoryStore(dataset_name=DATASET, self_improvement=False) + + # ─── Phase 1: persistent seed (writer has no session_id) ─────────────────── + print(f"Phase 1: persistent_writer -> permanent graph ({len(PERSISTENT_MEMORIES)} facts)...") + persistent_writer = CogneeWriter(memory_store=seed_store) + persistent_writer.run(messages=[ChatMessage.from_user(fact) for fact in PERSISTENT_MEMORIES]) + + snapshot_1 = ARTIFACTS / "1_after_persistent_seed.html" + _run_sync(_visualize_all_datasets(str(snapshot_1))) + print(f" Graph snapshot -> {snapshot_1}\n") + + # ─── Phase 2: session seed (writer has session_id set) ───────────────────── + print(f"Phase 2: session_writer -> session cache ({len(SESSION_MEMORIES)} facts)...") + session_writer = CogneeWriter(memory_store=seed_store, session_id=SESSION) + session_writer.run(messages=[ChatMessage.from_user(fact) for fact in SESSION_MEMORIES]) + + snapshot_2 = ARTIFACTS / "2_after_session_seed.html" + _run_sync(_visualize_all_datasets(str(snapshot_2))) + print(f" Graph snapshot -> {snapshot_2}") + print(" (Session writes don't touch the graph — should look like snapshot 1.)\n") + + # ─── Phase 3: agent loop (no writer; cognee.recall auto-captures the session QA) ── + print("Phase 3: agent loop (retriever -> injector -> agent)\n") + # Session-scoped store so the retriever's recall is session-aware. + chat_store = CogneeMemoryStore(dataset_name=DATASET, session_id=SESSION) + pipeline = build_pipeline(chat_store) + + turns = [ + "Hi! 
Can you remind me what project I'm currently working on?", + "What's the tech stack we're using for it?", + "Who else is on my team, and what are their roles?", + "Based on what you know about me, give me a quick tip for structuring a new Haystack pipeline component.", + ] + for user_text in turns: + print(f"User: {user_text}") + result = pipeline.run( + { + "retriever": {"query": user_text}, + "injector": {"user_messages": [ChatMessage.from_user(user_text)]}, + } + ) + reply = result["agent"]["last_message"].text or "(no reply)" + print(f"Agent: {reply}\n") + + snapshot_3 = ARTIFACTS / "3_after_chat.html" + _run_sync(_visualize_all_datasets(str(snapshot_3))) + print(f" Graph snapshot -> {snapshot_3}") + print(" (Still graph-unchanged: session writes from recall live in the cache.)\n") + + print("--- Session cache contents (cognee.session.get_session) ---") + entries = _run_sync(cognee.session.get_session(session_id=SESSION)) + print(f"{len(entries)} entries in session {SESSION!r}") + for i, e in enumerate(entries, 1): + print(f"\n[{i}] qa_id={e.qa_id} time={e.time}") + print(f" question: {e.question!r}") + print(f" answer : {e.answer!r}") + + # ─── Phase 4: improve session -> permanent graph ────────────────────────── + print(f"\nPhase 4: chat_store.improve() -> cognee.improve(dataset={DATASET!r}, session_ids=[{SESSION!r}])...") + chat_store.improve() + + snapshot_4 = ARTIFACTS / "4_after_improve.html" + _run_sync(_visualize_all_datasets(str(snapshot_4))) + print(f" Graph snapshot -> {snapshot_4}") + print(" (Graph now includes session-derived nodes.)") + + print("\nDone.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/integrations/cognee/pydoc/config_docusaurus.yml b/integrations/cognee/pydoc/config_docusaurus.yml new file mode 100644 index 0000000000..d0412a6523 --- /dev/null +++ b/integrations/cognee/pydoc/config_docusaurus.yml @@ -0,0 +1,15 @@ +loaders: + - modules: + - haystack_integrations.components.retrievers.cognee.memory_retriever + 
- haystack_integrations.components.writers.cognee.memory_writer + - haystack_integrations.memory_stores.cognee.memory_store + search_path: [../src] +processors: + - type: filter + documented_only: true + skip_empty_modules: true +renderer: + description: Cognee integration for Haystack + id: integrations-cognee + filename: cognee.md + title: Cognee diff --git a/integrations/cognee/pyproject.toml b/integrations/cognee/pyproject.toml new file mode 100644 index 0000000000..4d3b155b01 --- /dev/null +++ b/integrations/cognee/pyproject.toml @@ -0,0 +1,164 @@ +[build-system] +requires = ["hatchling", "hatch-vcs"] +build-backend = "hatchling.build" + +[project] +name = "cognee-haystack" +dynamic = ["version"] +description = "Haystack integration for Cognee — memory for AI agents" +readme = "README.md" +requires-python = ">=3.10" +license = "Apache-2.0" +keywords = [] +authors = [{ name = "deepset GmbH", email = "info@deepset.ai" }] +classifiers = [ + "License :: OSI Approved :: Apache Software License", + "Development Status :: 3 - Alpha", + "Programming Language :: Python", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dependencies = [ + "haystack-ai>=2.24.0", + "cognee>=1.0.9,<2.0", +] + +[project.urls] +Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/cognee#readme" +Issues = "https://github.com/deepset-ai/haystack-core-integrations/issues" +Source = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/cognee" + +[tool.hatch.build.targets.wheel] +packages = ["src/haystack_integrations"] + +[tool.hatch.version] +source = "vcs" +tag-pattern = 'integrations\/cognee-v(?P<version>.*)' +
+[tool.hatch.version.raw-options] +root = "../.." +git_describe_command = 'git describe --tags --match="integrations/cognee-v[0-9]*"' + +[tool.hatch.envs.default] +installer = "uv" +dependencies = ["haystack-pydoc-tools", "ruff"] + +[tool.hatch.envs.default.scripts] +docs = ["haystack-pydoc pydoc/config_docusaurus.yml"] +fmt = "ruff check --fix {args}; ruff format {args}" +fmt-check = "ruff check {args} && ruff format --check {args}" + +[tool.hatch.envs.test] +dependencies = [ + "pytest", + "pytest-asyncio", + "pytest-cov", + "pytest-rerunfailures", + "mypy", + "pip", +] + +[tool.hatch.envs.test.scripts] +unit = 'pytest -m "not integration" {args:tests}' +integration = 'pytest -m "integration" {args:tests}' +all = 'pytest {args:tests}' +unit-cov-retry = 'pytest --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x -m "not integration" {args:tests}' +integration-cov-append-retry = 'pytest --cov=haystack_integrations --cov-append --reruns 3 --reruns-delay 30 -x -m "integration" {args:tests}' + +types = "mypy -p haystack_integrations.components.retrievers.cognee -p haystack_integrations.components.writers.cognee -p haystack_integrations.memory_stores.cognee {args}" + +[tool.mypy] +install_types = true +non_interactive = true +check_untyped_defs = true +disallow_incomplete_defs = true + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +select = [ + "A", + "ARG", + "B", + "C", + "D102", # Missing docstring in public method + "D103", # Missing docstring in public function + "D205", # 1 blank line required between summary line and description + "D209", # Closing triple quotes go to new line + "D213", # summary lines must be positioned on the second physical line of the docstring + "D417", # Missing argument descriptions in the docstring + "D419", # Docstring is empty + "DTZ", + "E", + "EM", + "F", + "FBT", + "I", + "ICN", + "ISC", + "N", + "PLC", + "PLE", + "PLR", + "PLW", + "Q", + "RUF", + "S", + "T", + "TID", + "UP", + "W", + "YTT", +] +ignore = [ + # Allow 
non-abstract empty methods in abstract base classes + "B027", + # Allow boolean positional values in function calls, like `dict.get(... True)` + "FBT003", + # Ignore checks for possible passwords + "S105", + "S106", + "S107", + # Ignore complexity + "C901", + "PLR0911", + "PLR0912", + "PLR0913", + "PLR0915", + # Ignore unused params + "ARG002", + # Allow assertions + "S101", +] +exclude = ["examples"] + +[tool.ruff.lint.isort] +known-first-party = ["haystack_integrations"] + +[tool.ruff.lint.flake8-tidy-imports] +ban-relative-imports = "parents" + +[tool.ruff.lint.per-file-ignores] +# Tests can use magic values, assertions, and relative imports +"tests/**/*" = ["D", "PLR2004", "S101", "TID252"] +"examples/**/*" = ["D", "T201", "E402"] + +[tool.coverage.run] +source = ["haystack_integrations"] +branch = true +parallel = false + +[tool.coverage.report] +omit = ["*/tests/*", "*/__init__.py"] +show_missing = true +exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] + +[tool.pytest.ini_options] +minversion = "6.0" +markers = ["integration: integration tests"] diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py new file mode 100644 index 0000000000..90cf73038c --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from .memory_retriever import CogneeRetriever + +__all__ = [ + "CogneeRetriever", +] diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py new file mode 100644 index 0000000000..f5fd7e65f7 --- /dev/null +++ 
b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py @@ -0,0 +1,61 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +from haystack import component, default_from_dict, default_to_dict +from haystack.dataclasses import ChatMessage + +from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + + +@component +class CogneeRetriever: + """ + Retrieves memories from a `CogneeMemoryStore` as `ChatMessage` instances. + + Configuration (`search_type`, `top_k`, `dataset_name`, `session_id`) lives on + the store; this retriever is a thin pipeline adapter over `search_memories`. + """ + + def __init__(self, *, memory_store: CogneeMemoryStore, top_k: int | None = None): + """ + Initialize the retriever. + + :param memory_store: Backing `CogneeMemoryStore` to query. + :param top_k: Default max results; falls back to the store's `top_k` when `None`. + """ + if not isinstance(memory_store, CogneeMemoryStore): + msg = "memory_store must be an instance of CogneeMemoryStore" + raise ValueError(msg) + self._memory_store = memory_store + self._top_k = top_k + + @component.output_types(messages=list[ChatMessage]) + def run( + self, + query: str, + top_k: int | None = None, + user_id: str | None = None, + ) -> dict[str, list[ChatMessage]]: + """ + Search the attached store and return matching memories as ChatMessages. + + :param query: Natural-language query. + :param top_k: Per-call override; falls back to init `top_k`, then the store's default. + :param user_id: Cognee user UUID; scopes the search to that user. 
+ """ + effective_top_k = top_k if top_k is not None else self._top_k + messages = self._memory_store.search_memories(query=query, top_k=effective_top_k, user_id=user_id) + return {"messages": messages} + + def to_dict(self) -> dict[str, Any]: + """Serialize this component to a dictionary.""" + return default_to_dict(self, memory_store=self._memory_store.to_dict(), top_k=self._top_k) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "CogneeRetriever": + """Deserialize a component from a dictionary.""" + data["init_parameters"]["memory_store"] = CogneeMemoryStore.from_dict(data["init_parameters"]["memory_store"]) + return default_from_dict(cls, data) diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/py.typed b/integrations/cognee/src/haystack_integrations/components/retrievers/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py new file mode 100644 index 0000000000..7d88e60c29 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from .memory_writer import CogneeWriter + +__all__ = [ + "CogneeWriter", +] diff --git a/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py b/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py new file mode 100644 index 0000000000..926c92a82e --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +from haystack import component, default_from_dict, default_to_dict +from 
haystack.dataclasses import ChatMessage + +from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + + +@component +class CogneeWriter: + """ + Persists `ChatMessage`s into a `CogneeMemoryStore`. + + Use without `session_id` to write to the permanent graph; pass `session_id` to + target cognee's session cache for that writer's writes. The writer's + `session_id` overrides the store's own `session_id` per call, so one store can + back multiple writers writing to different tiers. + """ + + def __init__( + self, + *, + memory_store: CogneeMemoryStore, + session_id: str | None = None, + ): + """ + Initialize the writer. + + :param memory_store: Backing `CogneeMemoryStore` to write into. + :param session_id: Overrides the store's `session_id` for this writer's writes. + """ + if not isinstance(memory_store, CogneeMemoryStore): + msg = "memory_store must be an instance of CogneeMemoryStore" + raise ValueError(msg) + self._memory_store = memory_store + self._session_id = session_id + + @component.output_types(messages_written=list[ChatMessage]) + def run( + self, + messages: list[ChatMessage], + user_id: str | None = None, + ) -> dict[str, list[ChatMessage]]: + """ + Store `messages` in Cognee memory and pass them through unchanged. + + :param messages: Messages to persist. + :param user_id: Cognee user UUID; scopes the write to that user. 
+ """ + self._memory_store.add_memories(messages=messages, user_id=user_id, session_id=self._session_id) + return {"messages_written": messages} + + def to_dict(self) -> dict[str, Any]: + """Serialize this component to a dictionary.""" + return default_to_dict(self, memory_store=self._memory_store.to_dict(), session_id=self._session_id) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "CogneeWriter": + """Deserialize a component from a dictionary.""" + data["init_parameters"]["memory_store"] = CogneeMemoryStore.from_dict(data["init_parameters"]["memory_store"]) + return default_from_dict(cls, data) diff --git a/integrations/cognee/src/haystack_integrations/components/writers/py.typed b/integrations/cognee/src/haystack_integrations/components/writers/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py new file mode 100644 index 0000000000..eb9a6b81a8 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from .memory_store import CogneeMemoryStore + +__all__ = [ + "CogneeMemoryStore", +] diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py new file mode 100644 index 0000000000..6e8ab6de3c --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py @@ -0,0 +1,273 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import threading +from collections.abc import Coroutine +from typing import Any, Literal, TypeVar +from uuid import UUID + +from haystack import default_from_dict, 
default_to_dict, logging +from haystack.dataclasses import ChatMessage + +import cognee # type: ignore[import-untyped] +from cognee.modules.users.methods import get_user # type: ignore[import-untyped] + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + +CogneeSearchType = Literal[ + "SUMMARIES", + "CHUNKS", + "RAG_COMPLETION", + "TRIPLET_COMPLETION", + "GRAPH_COMPLETION", + "GRAPH_COMPLETION_DECOMPOSITION", + "GRAPH_SUMMARY_COMPLETION", + "CYPHER", + "NATURAL_LANGUAGE", + "GRAPH_COMPLETION_COT", + "GRAPH_COMPLETION_CONTEXT_EXTENSION", + "FEELING_LUCKY", + "TEMPORAL", + "CODING_RULES", + "CHUNKS_LEXICAL", +] + + +# Persistent background loop reused across calls when the caller is already +# inside an event loop. cognee creates asyncio.Lock objects bound to the loop +# they're first awaited on, so a fresh loop per call would later raise +# "lock bound to a different loop" — hence one shared loop. +_background_loop: asyncio.AbstractEventLoop | None = None +_loop_lock = threading.Lock() + + +def _run_sync(coro: Coroutine[Any, Any, T], *, timeout: float = 300) -> T: + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(coro) + + global _background_loop # noqa: PLW0603 + with _loop_lock: + if _background_loop is None or _background_loop.is_closed(): + _background_loop = asyncio.new_event_loop() + threading.Thread(target=_background_loop.run_forever, daemon=True).start() + return asyncio.run_coroutine_threadsafe(coro, _background_loop).result(timeout=timeout) + + +async def _resolve_user(user_id: str | None) -> Any: + if user_id is None: + return None + return await get_user(UUID(user_id)) + + +def _render(item: Any) -> str | None: + # cognee.recall returns a discriminated RecallResponse union — pick the right + # text field per source rather than probing attributes. 
+ source = getattr(item, "source", None) + if source == "graph": + return item.text + if source == "session": + return item.answer or item.question or None + if source == "graph_context": + return item.content + if source == "trace": + return item.memory_context or None + return None + + +class CogneeMemoryStore: + """ + Memory backend backed by Cognee, implementing the haystack-experimental `MemoryStore` protocol. + + Wraps cognee's V2 memory API: `add_memories` -> `cognee.remember`, + `search_memories` -> `cognee.recall`, `improve` -> `cognee.improve`, + `delete_all_memories` -> `cognee.forget`. + + `session_id` selects the tier — set it to use cognee's session cache (cheap, + no LLM extraction, session-aware recall); leave `None` for the permanent + graph. + + `self_improvement` is forwarded to `cognee.remember` and defaults to `True` + (same as cognee). On the permanent tier it awaits `improve` inline; on the + session tier it schedules `improve` as a fire-and-forget background task. + Set to `False` when you want `improve()` to be the only improve trigger + — otherwise an explicit `improve()` runs improve twice and produces + near-duplicate graph nodes. + + `timeout` (seconds) caps how long any single cognee call may run before + raising `concurrent.futures.TimeoutError`. The default of 300s covers + single-message agent-memory writes comfortably; bulk ingestion of long + documents may need a larger value. + """ + + def __init__( + self, + *, + search_type: CogneeSearchType = "GRAPH_COMPLETION", + top_k: int = 5, + dataset_name: str = "haystack_memory", + session_id: str | None = None, + self_improvement: bool = True, + timeout: float = 300, + ): + """ + Initialize the store. + + :param search_type: Cognee search strategy used by `search_memories`. + :param top_k: Default max results for `search_memories`. + :param dataset_name: Cognee dataset backing this store. + :param session_id: When set, use the session-cache tier; otherwise the permanent graph. 
+ :param self_improvement: Forwarded to `cognee.remember` (default `True`, matches cognee). + Set to `False` when `improve()` should be the only improve trigger. + :param timeout: Per-call timeout in seconds for any cognee operation. + Raise this for bulk ingestion workloads that legitimately need >300s. + """ + self.search_type = search_type + self.top_k = top_k + self.dataset_name = dataset_name + self.session_id = session_id + self.self_improvement = self_improvement + self.timeout = timeout + + def add_memories( + self, + *, + messages: list[ChatMessage], + user_id: str | None = None, + session_id: str | None = None, + ) -> None: + """ + Persist messages via `cognee.remember`. + + Permanent tier batches all texts into one call; session tier writes one + entry per message (matches cognee's session example). Empty messages + are skipped. + + :param messages: Messages to store. + :param user_id: Cognee user UUID; `None` uses cognee's default user. + :param session_id: Per-call override of the store's `session_id`. 
+ """ + texts = [m.text for m in messages if m.text] + if not texts: + return + + target_session = session_id if session_id is not None else self.session_id + + async def _store() -> None: + user = await _resolve_user(user_id) + if target_session is not None: + for text in texts: + await cognee.remember( + text, + dataset_name=self.dataset_name, + session_id=target_session, + user=user, + self_improvement=self.self_improvement, + ) + else: + await cognee.remember( + texts, + dataset_name=self.dataset_name, + user=user, + self_improvement=self.self_improvement, + ) + + _run_sync(_store(), timeout=self.timeout) + logger.info( + "Stored {n} memories in '{ds}' (session={s})", + n=len(texts), + ds=self.dataset_name, + s=target_session, + ) + + def search_memories( + self, + *, + query: str | None = None, + top_k: int | None = None, + user_id: str | None = None, + ) -> list[ChatMessage]: + """ + Search via `cognee.recall` and wrap each hit in a system `ChatMessage`. + + :param query: Natural-language query. Empty/`None` returns `[]`. + :param top_k: Per-call override of the store's default. + :param user_id: Cognee user UUID; `None` uses cognee's default user. + """ + if not query: + return [] + + async def _search() -> list[Any]: + user = await _resolve_user(user_id) + return await cognee.recall( + query, + query_type=cognee.SearchType[self.search_type], + datasets=[self.dataset_name], + top_k=top_k if top_k is not None else self.top_k, + session_id=self.session_id, + user=user, + ) + + results = _run_sync(_search(), timeout=self.timeout) or [] + memories = [ChatMessage.from_system(text) for item in results if (text := _render(item))] + logger.info("Found {n} memories for '{q}'", n=len(memories), q=query[:80]) + return memories + + def improve(self, *, session_id: str | None = None, user_id: str | None = None) -> None: + """ + Promote session-cache content into the permanent graph via `cognee.improve`. + + Without any session_id this is a plain graph-enrichment pass. 
+ + :param session_id: Session to promote; defaults to the store's `session_id`. + :param user_id: Cognee user UUID; `None` uses cognee's default user. + """ + target_session = session_id or self.session_id + + async def _improve() -> None: + user = await _resolve_user(user_id) + await cognee.improve( + dataset=self.dataset_name, + session_ids=[target_session] if target_session else None, + user=user, + ) + + _run_sync(_improve(), timeout=self.timeout) + logger.info("Improved '{ds}' (session={s})", ds=self.dataset_name, s=target_session) + + def delete_all_memories(self, *, user_id: str | None = None) -> None: + """ + Delete this dataset via `cognee.forget(dataset=...)`. + + Session cache survives (sessions aren't dataset-scoped) — use + `cognee.forget(everything=True)` for a full wipe. + """ + + async def _delete() -> None: + user = await _resolve_user(user_id) + await cognee.forget(dataset=self.dataset_name, user=user) + + _run_sync(_delete(), timeout=self.timeout) + logger.info("Deleted '{ds}'", ds=self.dataset_name) + + def to_dict(self) -> dict[str, Any]: + """Serialize this store for pipeline persistence.""" + return default_to_dict( + self, + search_type=self.search_type, + top_k=self.top_k, + dataset_name=self.dataset_name, + session_id=self.session_id, + self_improvement=self.self_improvement, + timeout=self.timeout, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "CogneeMemoryStore": + """Deserialize a store from a dict produced by `to_dict`.""" + return default_from_dict(cls, data) diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/py.typed b/integrations/cognee/src/haystack_integrations/memory_stores/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/tests/__init__.py b/integrations/cognee/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/tests/conftest.py b/integrations/cognee/tests/conftest.py new file mode 100644 index 
0000000000..ce1acf514d --- /dev/null +++ b/integrations/cognee/tests/conftest.py @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os + +# Set before cognee is imported so setup_logging() picks up WARNING level. +os.environ.setdefault("LOG_LEVEL", "WARNING") + +_NOISY_LOGGERS = ("aiosqlite", "sqlalchemy", "sqlalchemy.engine", "alembic") + + +def _silence_noisy_loggers() -> None: + for name in _NOISY_LOGGERS: + logging.getLogger(name).setLevel(logging.WARNING) + + +_silence_noisy_loggers() diff --git a/integrations/cognee/tests/test_integration.py b/integrations/cognee/tests/test_integration.py new file mode 100644 index 0000000000..6a2c0422b9 --- /dev/null +++ b/integrations/cognee/tests/test_integration.py @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import contextlib +import os +import uuid + +import cognee # type: ignore[import-untyped] +import pytest +from cognee.modules.data.exceptions import DatasetNotFoundError # type: ignore[import-untyped] +from haystack.dataclasses import ChatMessage + +from haystack_integrations.components.retrievers.cognee import CogneeRetriever +from haystack_integrations.components.writers.cognee import CogneeWriter +from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + +pytestmark = [ + pytest.mark.integration, + pytest.mark.skipif( + not os.environ.get("LLM_API_KEY"), + reason="Set LLM_API_KEY (cognee's LLM provider key) to run cognee integration tests.", + ), +] + + +@pytest.fixture +def dataset_name() -> str: + # Unique per-run so concurrent CI shards don't collide. + return f"haystack_it_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(autouse=True) +def _cleanup(dataset_name: str): + yield + # Best-effort — dataset may already be gone. 
+ with contextlib.suppress(Exception): + asyncio.run(cognee.forget(dataset=dataset_name)) + + +class TestCogneeIntegration: + def test_remember_then_recall(self, dataset_name: str): + store = CogneeMemoryStore(dataset_name=dataset_name, search_type="GRAPH_COMPLETION") + store.add_memories(messages=[ChatMessage.from_user("Marie Curie discovered radium in 1898.")]) + + results = store.search_memories(query="Who discovered radium?") + + assert results, "expected at least one memory from cognee" + assert any("curie" in m.text.lower() or "radium" in m.text.lower() for m in results) + + def test_writer_then_retriever_pipeline(self, dataset_name: str): + store = CogneeMemoryStore(dataset_name=dataset_name) + writer = CogneeWriter(memory_store=store) + retriever = CogneeRetriever(memory_store=store, top_k=3) + + writer.run(messages=[ChatMessage.from_user("Ada Lovelace wrote the first computer program.")]) + out = retriever.run(query="Who wrote the first computer program?") + + assert out["messages"], "retriever returned no messages" + assert any("lovelace" in m.text.lower() or "program" in m.text.lower() for m in out["messages"]) + + def test_search_after_forget_raises(self, dataset_name: str): + """Forget actually deletes the dataset (recall against it raises DatasetNotFoundError).""" + store = CogneeMemoryStore(dataset_name=dataset_name) + store.add_memories(messages=[ChatMessage.from_user("Marie Curie discovered radium in 1898.")]) + store.delete_all_memories() + + with pytest.raises(DatasetNotFoundError): + store.search_memories(query="radium") diff --git a/integrations/cognee/tests/test_memory_store.py b/integrations/cognee/tests/test_memory_store.py new file mode 100644 index 0000000000..1ac11756a8 --- /dev/null +++ b/integrations/cognee/tests/test_memory_store.py @@ -0,0 +1,270 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch 
+from uuid import uuid4 + +from haystack.dataclasses import ChatMessage + +from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + + +def _graph(text: str) -> SimpleNamespace: + return SimpleNamespace(source="graph", text=text) + + +def _session(answer: str, question: str = "") -> SimpleNamespace: + return SimpleNamespace(source="session", answer=answer, question=question) + + +class TestCogneeMemoryStore: + def test_init_defaults(self): + store = CogneeMemoryStore() + assert store.search_type == "GRAPH_COMPLETION" + assert store.top_k == 5 + assert store.dataset_name == "haystack_memory" + assert store.session_id is None + + def test_init_custom(self): + store = CogneeMemoryStore(search_type="CHUNKS", top_k=10, dataset_name="custom", session_id="s1") + assert store.search_type == "CHUNKS" + assert store.top_k == 10 + assert store.dataset_name == "custom" + assert store.session_id == "s1" + + def test_init_default_self_improvement(self): + """Mirrors cognee.remember's default of self_improvement=True.""" + assert CogneeMemoryStore().self_improvement is True + + def test_init_default_timeout(self): + assert CogneeMemoryStore().timeout == 300 + + def test_to_from_dict_roundtrip(self): + store = CogneeMemoryStore( + search_type="SUMMARIES", + top_k=3, + dataset_name="mem", + session_id="abc", + self_improvement=False, + timeout=600, + ) + data = store.to_dict() + assert data["type"] == "haystack_integrations.memory_stores.cognee.memory_store.CogneeMemoryStore" + assert data["init_parameters"] == { + "search_type": "SUMMARIES", + "top_k": 3, + "dataset_name": "mem", + "session_id": "abc", + "self_improvement": False, + "timeout": 600, + } + restored = CogneeMemoryStore.from_dict(data) + assert restored.search_type == "SUMMARIES" + assert restored.top_k == 3 + assert restored.dataset_name == "mem" + assert restored.session_id == "abc" + assert restored.self_improvement is False + assert restored.timeout == 600 + + 
@patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_batches_permanent_tier(self, mock_cognee): + mock_cognee.remember = AsyncMock() + store = CogneeMemoryStore(dataset_name="my_ds") + + store.add_memories( + messages=[ + ChatMessage.from_user("First fact"), + ChatMessage.from_assistant("Second fact"), + ] + ) + + mock_cognee.remember.assert_awaited_once_with( + ["First fact", "Second fact"], dataset_name="my_ds", user=None, self_improvement=True + ) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_session_tier_writes_one_per_message(self, mock_cognee): + mock_cognee.remember = AsyncMock() + store = CogneeMemoryStore(dataset_name="ds", session_id="sess1") + + store.add_memories(messages=[ChatMessage.from_user("a"), ChatMessage.from_user("b")]) + + assert mock_cognee.remember.await_count == 2 + first_call = mock_cognee.remember.await_args_list[0] + assert first_call.args == ("a",) + assert first_call.kwargs == { + "dataset_name": "ds", + "session_id": "sess1", + "user": None, + "self_improvement": True, + } + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_self_improvement_false_forwarded(self, mock_cognee): + """`self_improvement=False` flows through to cognee.remember (both tiers).""" + mock_cognee.remember = AsyncMock() + + perm = CogneeMemoryStore(dataset_name="ds", self_improvement=False) + perm.add_memories(messages=[ChatMessage.from_user("a")]) + assert mock_cognee.remember.await_args.kwargs["self_improvement"] is False + + mock_cognee.remember.reset_mock() + sess = CogneeMemoryStore(dataset_name="ds", session_id="s", self_improvement=False) + sess.add_memories(messages=[ChatMessage.from_user("a")]) + assert mock_cognee.remember.await_args.kwargs["self_improvement"] is False + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def 
test_add_memories_session_id_override_writes_to_override(self, mock_cognee): + """`session_id` kwarg routes a permanent-tier store's write into a session.""" + mock_cognee.remember = AsyncMock() + store = CogneeMemoryStore(dataset_name="ds") # no session_id on the store + + store.add_memories(messages=[ChatMessage.from_user("a")], session_id="call_sess") + + mock_cognee.remember.assert_awaited_once_with( + "a", dataset_name="ds", session_id="call_sess", user=None, self_improvement=True + ) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_session_id_override_beats_store_session(self, mock_cognee): + """Per-call `session_id` wins over `self.session_id`.""" + mock_cognee.remember = AsyncMock() + store = CogneeMemoryStore(dataset_name="ds", session_id="store_sess") + + store.add_memories(messages=[ChatMessage.from_user("a")], session_id="call_sess") + + assert mock_cognee.remember.await_args.kwargs["session_id"] == "call_sess" + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_skips_when_no_text(self, mock_cognee): + mock_cognee.remember = AsyncMock() + store = CogneeMemoryStore() + + store.add_memories(messages=[]) + store.add_memories(messages=[ChatMessage.from_user("")]) + + mock_cognee.remember.assert_not_called() + + @patch("haystack_integrations.memory_stores.cognee.memory_store._resolve_user", new_callable=AsyncMock) + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_resolves_user(self, mock_cognee, mock_resolve): + mock_cognee.remember = AsyncMock() + sentinel_user = object() + mock_resolve.return_value = sentinel_user + uid = str(uuid4()) + + store = CogneeMemoryStore(dataset_name="ds") + store.add_memories(messages=[ChatMessage.from_user("hi")], user_id=uid) + + mock_resolve.assert_awaited_once_with(uid) + mock_cognee.remember.assert_awaited_once_with( + ["hi"], dataset_name="ds", user=sentinel_user, 
self_improvement=True + ) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_search_memories_passes_recall_args(self, mock_cognee): + mock_cognee.SearchType = {"GRAPH_COMPLETION": "GRAPH_COMPLETION_ENUM"} + mock_cognee.recall = AsyncMock(return_value=[]) + + store = CogneeMemoryStore(dataset_name="ds", top_k=7, session_id="s") + store.search_memories(query="hello") + + mock_cognee.recall.assert_awaited_once_with( + "hello", + query_type="GRAPH_COMPLETION_ENUM", + datasets=["ds"], + top_k=7, + session_id="s", + user=None, + ) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_search_memories_wraps_results_per_source(self, mock_cognee): + mock_cognee.SearchType = {"GRAPH_COMPLETION": MagicMock()} + mock_cognee.recall = AsyncMock( + return_value=[ + _graph("graph hit"), + _session("session answer", "question text"), + SimpleNamespace(source="graph_context", content="ctx blob"), + SimpleNamespace(source="trace", memory_context="trace blob"), + ] + ) + + store = CogneeMemoryStore() + out = store.search_memories(query="q") + + assert [m.text for m in out] == [ + "graph hit", + "session answer", + "ctx blob", + "trace blob", + ] + assert all(isinstance(m, ChatMessage) for m in out) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_search_memories_top_k_override(self, mock_cognee): + mock_cognee.SearchType = {"GRAPH_COMPLETION": MagicMock()} + mock_cognee.recall = AsyncMock(return_value=[]) + + store = CogneeMemoryStore(top_k=5) + store.search_memories(query="q", top_k=2) + + assert mock_cognee.recall.await_args.kwargs["top_k"] == 2 + + def test_search_memories_empty_query(self): + store = CogneeMemoryStore() + assert store.search_memories(query=None) == [] + assert store.search_memories(query="") == [] + + @patch("haystack_integrations.memory_stores.cognee.memory_store._resolve_user", new_callable=AsyncMock) + 
@patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_search_memories_resolves_user(self, mock_cognee, mock_resolve): + mock_cognee.SearchType = {"GRAPH_COMPLETION": MagicMock()} + mock_cognee.recall = AsyncMock(return_value=[]) + sentinel_user = object() + mock_resolve.return_value = sentinel_user + uid = str(uuid4()) + + store = CogneeMemoryStore() + store.search_memories(query="q", user_id=uid) + + mock_resolve.assert_awaited_once_with(uid) + assert mock_cognee.recall.await_args.kwargs["user"] is sentinel_user + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_improve_uses_store_session(self, mock_cognee): + mock_cognee.improve = AsyncMock() + + store = CogneeMemoryStore(dataset_name="ds", session_id="s1") + store.improve() + + mock_cognee.improve.assert_awaited_once_with(dataset="ds", session_ids=["s1"], user=None) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_improve_overrides_session(self, mock_cognee): + mock_cognee.improve = AsyncMock() + + store = CogneeMemoryStore(dataset_name="ds") + store.improve(session_id="explicit") + + mock_cognee.improve.assert_awaited_once_with(dataset="ds", session_ids=["explicit"], user=None) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_improve_no_session_runs_graph_enrichment(self, mock_cognee): + mock_cognee.improve = AsyncMock() + + store = CogneeMemoryStore(dataset_name="ds") + store.improve() + + mock_cognee.improve.assert_awaited_once_with(dataset="ds", session_ids=None, user=None) + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_delete_all_memories_calls_forget_with_dataset(self, mock_cognee): + mock_cognee.forget = AsyncMock() + + store = CogneeMemoryStore(dataset_name="ds") + store.delete_all_memories() + + mock_cognee.forget.assert_awaited_once_with(dataset="ds", user=None) diff --git a/integrations/cognee/tests/test_retriever.py 
# ---------------------------------------------------------------------------
# File: integrations/cognee/tests/test_retriever.py  (new file, mode 100644)
# ---------------------------------------------------------------------------
# SPDX-FileCopyrightText: 2022-present deepset GmbH
#
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import MagicMock

import pytest
from haystack.dataclasses import ChatMessage

from haystack_integrations.components.retrievers.cognee import CogneeRetriever
from haystack_integrations.memory_stores.cognee import CogneeMemoryStore


class TestCogneeRetriever:
    """Unit tests for ``CogneeRetriever``: construction, serialization and ``run`` delegation."""

    @staticmethod
    def _stubbed_store(results):
        """Build a mock spec'd on ``CogneeMemoryStore`` whose ``search_memories`` returns ``results``."""
        fake_store = MagicMock(spec=CogneeMemoryStore)
        fake_store.search_memories.return_value = results
        return fake_store

    def test_init_requires_memory_store(self):
        """Passing something that is not a ``CogneeMemoryStore`` raises ``ValueError``."""
        expected_error = "memory_store must be an instance of CogneeMemoryStore"
        with pytest.raises(ValueError, match=expected_error):
            CogneeRetriever(memory_store="not a store")  # type: ignore[arg-type]

    def test_init_defaults(self):
        """The retriever keeps the very same store object and leaves ``_top_k`` unset."""
        backing_store = CogneeMemoryStore()
        component = CogneeRetriever(memory_store=backing_store)

        assert component._memory_store is backing_store
        assert component._top_k is None

    def test_init_with_top_k(self):
        """An explicit ``top_k`` is stored on the component."""
        component = CogneeRetriever(memory_store=CogneeMemoryStore(), top_k=3)
        assert component._top_k == 3

    def test_to_from_dict_roundtrip(self):
        """``to_dict`` output has the expected shape and ``from_dict`` rebuilds an equivalent retriever."""
        # NOTE(review): the store-level top_k=3 is configured here but never asserted after the round trip.
        component = CogneeRetriever(
            memory_store=CogneeMemoryStore(search_type="SUMMARIES", top_k=3, dataset_name="ds"),
            top_k=7,
        )

        dumped = component.to_dict()
        assert dumped["type"] == (
            "haystack_integrations.components.retrievers.cognee.memory_retriever.CogneeRetriever"
        )
        init_params = dumped["init_parameters"]
        assert init_params["top_k"] == 7
        store_dump = init_params["memory_store"]
        assert store_dump["type"] == "haystack_integrations.memory_stores.cognee.memory_store.CogneeMemoryStore"
        assert store_dump["init_parameters"]["dataset_name"] == "ds"

        rebuilt = CogneeRetriever.from_dict(dumped)
        assert rebuilt._top_k == 7
        rebuilt_store = rebuilt._memory_store
        assert isinstance(rebuilt_store, CogneeMemoryStore)
        assert rebuilt_store.search_type == "SUMMARIES"
        assert rebuilt_store.dataset_name == "ds"

    def test_run_delegates_to_store(self):
        """``run`` calls ``search_memories`` exactly once and returns the store's messages."""
        fake_store = self._stubbed_store(
            [
                ChatMessage.from_system("result one"),
                ChatMessage.from_system("result two"),
            ]
        )

        result = CogneeRetriever(memory_store=fake_store).run(query="What is Cognee?")

        fake_store.search_memories.assert_called_once_with(query="What is Cognee?", top_k=None, user_id=None)
        assert result["messages"] == fake_store.search_memories.return_value

    def test_run_top_k_override_takes_precedence(self):
        """A ``top_k`` given to ``run`` is forwarded instead of the one given at init."""
        fake_store = self._stubbed_store([])

        CogneeRetriever(memory_store=fake_store, top_k=10).run(query="q", top_k=2)

        fake_store.search_memories.assert_called_once_with(query="q", top_k=2, user_id=None)

    def test_run_falls_back_to_init_top_k(self):
        """Without a run-time ``top_k`` the init-time value is forwarded."""
        fake_store = self._stubbed_store([])

        CogneeRetriever(memory_store=fake_store, top_k=4).run(query="q")

        fake_store.search_memories.assert_called_once_with(query="q", top_k=4, user_id=None)

    def test_run_passes_user_id(self):
        """``user_id`` given to ``run`` is forwarded to the store unchanged."""
        fake_store = self._stubbed_store([])

        CogneeRetriever(memory_store=fake_store).run(query="q", user_id="user-abc")

        fake_store.search_memories.assert_called_once_with(query="q", top_k=None, user_id="user-abc")

    def test_run_empty_results(self):
        """An empty store result yields an empty ``messages`` list."""
        component = CogneeRetriever(memory_store=self._stubbed_store([]))
        result = component.run(query="nothing here")
        assert result["messages"] == []


# ---------------------------------------------------------------------------
# File: integrations/cognee/tests/test_writer.py  (new file, mode 100644)
# ---------------------------------------------------------------------------
# SPDX-FileCopyrightText: 2022-present deepset GmbH
#
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import MagicMock

import pytest
from haystack.dataclasses import ChatMessage

from haystack_integrations.components.writers.cognee import CogneeWriter
from haystack_integrations.memory_stores.cognee import CogneeMemoryStore


class TestCogneeWriter:
    """Unit tests for ``CogneeWriter``: construction, serialization and ``run`` delegation."""

    def test_init_requires_memory_store(self):
        """Passing something that is not a ``CogneeMemoryStore`` raises ``ValueError``."""
        expected_error = "memory_store must be an instance of CogneeMemoryStore"
        with pytest.raises(ValueError, match=expected_error):
            CogneeWriter(memory_store="not a store")  # type: ignore[arg-type]

    def test_init_holds_store(self):
        """The writer keeps the same store object; its own ``_session_id`` stays ``None``."""
        backing_store = CogneeMemoryStore(dataset_name="ds", session_id="s")
        component = CogneeWriter(memory_store=backing_store)

        assert component._memory_store is backing_store
        assert component._session_id is None

    def test_init_with_session_id(self):
        """An explicit ``session_id`` is stored on the writer."""
        component = CogneeWriter(memory_store=CogneeMemoryStore(dataset_name="ds"), session_id="override")
        assert component._session_id == "override"

    def test_to_from_dict_roundtrip(self):
        """``to_dict`` output has the expected shape and ``from_dict`` rebuilds an equivalent writer."""
        component = CogneeWriter(
            memory_store=CogneeMemoryStore(dataset_name="ds", session_id="s"),
            session_id="writer_sess",
        )

        dumped = component.to_dict()
        assert dumped["type"] == "haystack_integrations.components.writers.cognee.memory_writer.CogneeWriter"
        init_params = dumped["init_parameters"]
        assert init_params["session_id"] == "writer_sess"
        store_dump = init_params["memory_store"]
        assert store_dump["type"] == "haystack_integrations.memory_stores.cognee.memory_store.CogneeMemoryStore"
        assert store_dump["init_parameters"]["dataset_name"] == "ds"

        rebuilt = CogneeWriter.from_dict(dumped)
        rebuilt_store = rebuilt._memory_store
        assert isinstance(rebuilt_store, CogneeMemoryStore)
        assert rebuilt_store.dataset_name == "ds"
        assert rebuilt_store.session_id == "s"
        assert rebuilt._session_id == "writer_sess"

    def test_run_delegates_to_store_and_echoes_messages(self):
        """``run`` calls ``add_memories`` once and returns the input under ``messages_written``."""
        fake_store = MagicMock(spec=CogneeMemoryStore)
        payload = [ChatMessage.from_user("hi"), ChatMessage.from_assistant("hello")]

        result = CogneeWriter(memory_store=fake_store).run(messages=payload)

        fake_store.add_memories.assert_called_once_with(messages=payload, user_id=None, session_id=None)
        assert result == {"messages_written": payload}

    def test_run_passes_user_id(self):
        """``user_id`` given to ``run`` is forwarded to the store unchanged."""
        fake_store = MagicMock(spec=CogneeMemoryStore)
        payload = [ChatMessage.from_user("hi")]

        CogneeWriter(memory_store=fake_store).run(messages=payload, user_id="user-abc")

        fake_store.add_memories.assert_called_once_with(messages=payload, user_id="user-abc", session_id=None)

    def test_run_forwards_writer_session_id(self):
        """The writer's init-time ``session_id`` is passed on to ``add_memories``."""
        fake_store = MagicMock(spec=CogneeMemoryStore)
        payload = [ChatMessage.from_user("hi")]

        CogneeWriter(memory_store=fake_store, session_id="writer_sess").run(messages=payload)

        fake_store.add_memories.assert_called_once_with(messages=payload, user_id=None, session_id="writer_sess")