Skip to content

feat: Add Cognee integration#2979

Merged
sjrl merged 18 commits into
deepset-ai:mainfrom
hande-k:feat/add-cognee
May 28, 2026
Merged

feat: Add Cognee integration#2979
sjrl merged 18 commits into
deepset-ai:mainfrom
hande-k:feat/add-cognee

Conversation

@hande-k

@hande-k hande-k commented Mar 18, 2026

Copy link
Copy Markdown
Contributor

Closes https://github.com/deepset-ai/haystack-private/issues/240

Summary

  • Adds Cognee integration with 4 components: CogneeWriter, CogneeCognifier, CogneeRetriever, and CogneeMemoryStore
  • CogneeWriter ingests Haystack Documents into Cognee memory via cognee.add() + optional cognee.cognify()
  • CogneeRetriever searches Cognee's memory and returns Haystack Documents
  • CogneeCognifier wraps cognee.cognify() as a standalone pipeline step
  • CogneeMemoryStore implements the MemoryStore protocol from haystack-experimental for use with Haystack's experimental Agent

Test plan

  • Unit tests pass via hatch run test:unit
  • Linting passes via hatch run fmt-check
  • Type checking passes via hatch run test:types
  • Demo scripts tested manually (demo_pipeline.py, demo_memory_agent.py)

@hande-k hande-k requested a review from a team as a code owner March 18, 2026 12:27
@hande-k hande-k requested review from davidsbatista and removed request for a team March 18, 2026 12:27
@github-actions github-actions Bot added topic:CI type:documentation Improvements or additions to documentation labels Mar 18, 2026
@hande-k hande-k changed the title add fix feat: Add Cognee integration Mar 18, 2026
@CLAassistant

CLAassistant commented Mar 18, 2026

Copy link
Copy Markdown

CLA assistant check
All committers have signed the CLA.

@davidsbatista davidsbatista left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hande-k thank you for this contribution!

I left some initial comments/suggestions for improvements.

Comment thread integrations/cognee/pyproject.toml Outdated
Comment thread integrations/cognee/CHANGELOG.md Outdated
Comment thread integrations/cognee/.gitignore Outdated
Comment thread integrations/cognee/README.md
@hande-k

hande-k commented Mar 21, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for the review @davidsbatista & @sjrl! I've addressed all the comments. A couple of notes:

  • py.typed: Since the components are now split across components/retrievers/, components/writers/,
    and memory_stores/, I added py.typed markers at the parent level for each (matching the pattern used
    by other multi-package integrations).
  • dataset_name on CogneeCognifier: Added str | list[str] | None support so it can target one or multiple
    datasets

Let me know if anything needs further adjustment!

Comment thread .github/workflows/cognee.yml Outdated
Comment thread .github/workflows/cognee.yml
Comment thread .github/workflows/cognee.yml
Comment thread integrations/cognee/pyproject.toml Outdated
Comment thread integrations/cognee/examples/demo_memory_agent.py
@sjrl

sjrl commented Apr 23, 2026

Copy link
Copy Markdown
Contributor

Hey @hande-k I wanted to provide you with an example script of how I imagined the cognee integration to work with Haystack. The main use case for us is to enable memories for our Agent component so I think the Retriever and Writer components need to be reworked to return and accept list of ChatMessages respectively.

Here is a demo script I tried out that assumes these changes to the components have already been made. Feel free to include this in the examples folder if you'd like.

#!/usr/bin/env python
"""
Demo: Memory-Augmented Agent with CogneeRetriever and CogneeWriter

Shows the core memory-agent loop where Cognee enriches every conversation
turn with memories from past sessions alongside the live chat history:

1. Pre-seeded memories: facts from previous sessions are loaded into Cognee
   before the conversation starts, simulating persistent long-term memory.
2. Before each turn: CogneeRetriever fetches relevant memories and the
   OutputAdapter prepends them to the live chat history + current user message.
3. The Agent processes the full context (memories + history + user message).
4. After each turn: CogneeWriter stores the agent's output messages so future
   sessions can recall what happened in this one.

Pipeline structure (per turn):

    query ──► CogneeRetriever ──► memories (list[ChatMessage])  ──┐
                                                                   ├──► OutputAdapter 
    history + user_message (pipeline inputs) ─────────────────────┘

                                                                      Agent ──► messages, last_message

                                                                               CogneeWriter ──► messages_written

NOTE: This demo assumes CogneeRetriever and CogneeWriter have been updated to
work with ChatMessages rather than Documents. See the review comments for the
required changes.

Prerequisites:
    pip install -e "integrations/cognee"

Set your LLM and vector DB keys (Cognee uses them internally):
    export OPENAI_API_KEY="sk-..."
"""

import logging
import os

# Must be set before importing cognee — its setup_logging() reads LOG_LEVEL at
# import time and installs structlog handlers that bypass standard logging config.
os.environ.setdefault("LOG_LEVEL", "WARNING")

import asyncio

import cognee

from haystack import Pipeline

logging.basicConfig(level=logging.WARNING)
from haystack.components.agents import Agent
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

from haystack_integrations.components.retrievers.cognee import CogneeRetriever
from haystack_integrations.components.writers.cognee import CogneeWriter
from haystack_integrations.memory_stores.cognee import CogneeMemoryStore

DATASET_NAME = "agent_memory_demo"

# Facts that would have been stored in a previous session.
SEEDED_MEMORIES = [
    "My name is Alice. I'm a senior data scientist at Acme Corp specialising in NLP and knowledge graphs.",
    "My current project is building an internal documentation search system powered by Haystack and Cognee.",
    "My team: Bob is the ML engineer and Carol handles infrastructure.",
    "I prefer concise answers with Python code examples over long prose explanations.",
]

SYSTEM_PROMPT = (
    "You are a helpful assistant with access to a persistent memory of past conversations. "
    "Any system messages at the start of the conversation contain relevant memories retrieved "
    "from previous interactions — use them to personalise your responses and maintain continuity "
    "across turns. "
    "Be concise. Prefer short answers and Python code examples over long prose unless the user "
    "asks for more detail."
)


def build_memory_agent_pipeline(store: CogneeMemoryStore) -> Pipeline:
    """
    Wire the memory-augmented agent pipeline.

    The OutputAdapter merges three inputs into one flat ChatMessage list for
    the Agent:
      - memories:      retrieved from Cognee by CogneeRetriever
      - history:       accumulated chat messages from previous turns
      - user_messages: the current user message

    unsafe=True is required so that the Jinja NativeEnvironment returns the
    concatenated list as a real Python object rather than its string representation.
    """
    pipeline = Pipeline()

    pipeline.add_component("retriever", CogneeRetriever(memory_store=store))
    pipeline.add_component(
        "injector",
        OutputAdapter(
            template="{{ memories + history + user_messages }}",
            output_type=list[ChatMessage],
            unsafe=True,
        ),
    )
    pipeline.add_component(
        "agent",
        Agent(
            chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
            system_prompt=SYSTEM_PROMPT,
        ),
    )
    pipeline.add_component("writer", CogneeWriter(dataset_name=store.dataset_name))

    pipeline.connect("retriever.messages", "injector.memories")
    pipeline.connect("injector.output", "agent.messages")
    pipeline.connect("agent.messages", "writer.messages")

    return pipeline


def run_turn(
    pipeline: Pipeline,
    user_text: str,
    history: list[ChatMessage],
) -> str:
    """
    Run one conversation turn, update history in-place, and return the agent reply.

    History is maintained outside the pipeline so it contains only the clean
    user/assistant exchange — not the memory context messages, which are
    injected fresh on every turn.
    """
    result = pipeline.run(
        {
            "retriever": {"query": user_text},
            "injector": {
                "history": history,
                "user_messages": [ChatMessage.from_user(user_text)],
            },
        }
    )
    last_message = result["agent"]["last_message"]
    reply = last_message.text or "(no reply)"

    # Append this turn to history so the next turn has full conversational context.
    history.append(ChatMessage.from_user(user_text))
    history.append(last_message)

    return reply


async def main():
    print("=== Cognee Memory Agent Pipeline Demo ===\n")

    print("Pruning previous data for a clean start...")
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    print("Done.\n")

    # -------------------------------------------------------------------------
    # Pre-seed long-term memories from a simulated previous session.
    # In a real application these would already be in Cognee from earlier runs.
    # -------------------------------------------------------------------------
    print("Seeding memories from a previous session...")
    store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name=DATASET_NAME)
    store.add_memories(messages=[ChatMessage.from_user(fact) for fact in SEEDED_MEMORIES])
    print(f"  {len(SEEDED_MEMORIES)} facts stored.\n")

    pipeline = build_memory_agent_pipeline(store)
    history: list[ChatMessage] = []

    # -------------------------------------------------------------------------
    # Multi-turn conversation. The Agent receives retrieved memories + live
    # history + the current message on every turn, so it can both recall past
    # facts and follow the thread of the current exchange.
    # -------------------------------------------------------------------------
    turns = [
        # Turn 1 — recall a specific memory (no history yet)
        "Hi! Can you remind me what project I'm currently working on?",
        # Turn 2 — follow-up that requires turn 1 to make sense
        "What's the tech stack we're using for it?",
        # Turn 3 — recall a different memory fact
        "Who else is on my team, and what are their roles?",
        # Turn 4 — requires both a recalled preference *and* conversation context
        "Based on what you know about me, give me a quick tip for structuring a new Haystack pipeline component.",
    ]

    for user_text in turns:
        print(f"User:  {user_text}")
        reply = run_turn(pipeline, user_text, history)
        print(f"Agent: {reply}\n")

    print("=== Done ===")


if __name__ == "__main__":
    asyncio.run(main())

@sjrl

sjrl commented May 4, 2026

Copy link
Copy Markdown
Contributor

Hey @hande-k soft ping.

Comment thread integrations/cognee/pyproject.toml
Comment thread integrations/cognee/tests/test_integration.py Outdated
Comment thread integrations/cognee/tests/__init__.py
@hande-k

hande-k commented May 15, 2026

Copy link
Copy Markdown
Contributor Author

Thanks @sjrl for the patience on this one! cognee 1.0 shipping mid-review meant rebuilding the integration on the new apis. It is simpler and the agent-memory shape from your demo is the default path now. Appreciate the thorough review! let me know what else needs to be done

@sjrl

sjrl commented May 20, 2026

Copy link
Copy Markdown
Contributor

Hey @hande-k just two last comments

Otherwise it looks ready to go!

@hande-k

hande-k commented May 26, 2026

Copy link
Copy Markdown
Contributor Author

Hey @hande-k just two last comments

Otherwise it looks ready to go!

hey @sjrl , the first one is done! For the SQL error, I couldn't reproduce on my end (cognee 1.0.9, Python 3.12, macOS). It could be due to a stale ~/.cognee/ from a pre-1.0 install, rm -rf ~/.cognee/ would clear that but happy to look at the error if if you could share the trace with the cognee version

@sjrl

sjrl commented May 28, 2026

Copy link
Copy Markdown
Contributor

@hande-k I was able to figure it out. I had langfuse configured via my .env file and it looks like trying to run the demo with the langfuse observability turned on then it seems to result in a SQL error when "langfuse<3.0.0" is installed. I removed the dep and the .env file and the demo worked!

@sjrl sjrl left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution! I'll let you know once we have this released.

@sjrl sjrl merged commit 018470d into deepset-ai:main May 28, 2026
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

integration:cognee topic:CI type:documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants