feat: Add Cognee integration#2979
Conversation
davidsbatista
left a comment
There was a problem hiding this comment.
@hande-k thank you for this contribution!
I left some initial comments/suggestions for improvements.
|
Thanks for the review @davidsbatista & @sjrl! I've addressed all the comments. A couple of notes:
Let me know if anything needs further adjustment! |
|
Hey @hande-k I wanted to provide you with an example script of how I imagined the cognee integration to work with Haystack. The main use case for us is to enable memories for our Agent component so I think the Retriever and Writer components need to be reworked to return and accept list of ChatMessages respectively. Here is a demo script I tried out that assumes these changes to the components have already been made. Feel free to include this in the examples folder if you'd like. #!/usr/bin/env python
"""
Demo: Memory-Augmented Agent with CogneeRetriever and CogneeWriter
Shows the core memory-agent loop where Cognee enriches every conversation
turn with memories from past sessions alongside the live chat history:
1. Pre-seeded memories: facts from previous sessions are loaded into Cognee
before the conversation starts, simulating persistent long-term memory.
2. Before each turn: CogneeRetriever fetches relevant memories and the
OutputAdapter prepends them to the live chat history + current user message.
3. The Agent processes the full context (memories + history + user message).
4. After each turn: CogneeWriter stores the agent's output messages so future
sessions can recall what happened in this one.
Pipeline structure (per turn):
query ──► CogneeRetriever ──► memories (list[ChatMessage]) ──┐
├──► OutputAdapter
history + user_message (pipeline inputs) ─────────────────────┘
│
Agent ──► messages, last_message
│
CogneeWriter ──► messages_written
NOTE: This demo assumes CogneeRetriever and CogneeWriter have been updated to
work with ChatMessages rather than Documents. See the review comments for the
required changes.
Prerequisites:
pip install -e "integrations/cognee"
Set your LLM and vector DB keys (Cognee uses them internally):
export OPENAI_API_KEY="sk-..."
"""
import logging
import os
# Must be set before importing cognee — its setup_logging() reads LOG_LEVEL at
# import time and installs structlog handlers that bypass standard logging config.
os.environ.setdefault("LOG_LEVEL", "WARNING")
import asyncio
import cognee
from haystack import Pipeline
logging.basicConfig(level=logging.WARNING)
from haystack.components.agents import Agent
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.retrievers.cognee import CogneeRetriever
from haystack_integrations.components.writers.cognee import CogneeWriter
from haystack_integrations.memory_stores.cognee import CogneeMemoryStore
DATASET_NAME = "agent_memory_demo"
# Facts that would have been stored in a previous session.
SEEDED_MEMORIES = [
"My name is Alice. I'm a senior data scientist at Acme Corp specialising in NLP and knowledge graphs.",
"My current project is building an internal documentation search system powered by Haystack and Cognee.",
"My team: Bob is the ML engineer and Carol handles infrastructure.",
"I prefer concise answers with Python code examples over long prose explanations.",
]
SYSTEM_PROMPT = (
"You are a helpful assistant with access to a persistent memory of past conversations. "
"Any system messages at the start of the conversation contain relevant memories retrieved "
"from previous interactions — use them to personalise your responses and maintain continuity "
"across turns. "
"Be concise. Prefer short answers and Python code examples over long prose unless the user "
"asks for more detail."
)
def build_memory_agent_pipeline(store: CogneeMemoryStore) -> Pipeline:
"""
Wire the memory-augmented agent pipeline.
The OutputAdapter merges three inputs into one flat ChatMessage list for
the Agent:
- memories: retrieved from Cognee by CogneeRetriever
- history: accumulated chat messages from previous turns
- user_messages: the current user message
unsafe=True is required so that the Jinja NativeEnvironment returns the
concatenated list as a real Python object rather than its string representation.
"""
pipeline = Pipeline()
pipeline.add_component("retriever", CogneeRetriever(memory_store=store))
pipeline.add_component(
"injector",
OutputAdapter(
template="{{ memories + history + user_messages }}",
output_type=list[ChatMessage],
unsafe=True,
),
)
pipeline.add_component(
"agent",
Agent(
chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
system_prompt=SYSTEM_PROMPT,
),
)
pipeline.add_component("writer", CogneeWriter(dataset_name=store.dataset_name))
pipeline.connect("retriever.messages", "injector.memories")
pipeline.connect("injector.output", "agent.messages")
pipeline.connect("agent.messages", "writer.messages")
return pipeline
def run_turn(
pipeline: Pipeline,
user_text: str,
history: list[ChatMessage],
) -> str:
"""
Run one conversation turn, update history in-place, and return the agent reply.
History is maintained outside the pipeline so it contains only the clean
user/assistant exchange — not the memory context messages, which are
injected fresh on every turn.
"""
result = pipeline.run(
{
"retriever": {"query": user_text},
"injector": {
"history": history,
"user_messages": [ChatMessage.from_user(user_text)],
},
}
)
last_message = result["agent"]["last_message"]
reply = last_message.text or "(no reply)"
# Append this turn to history so the next turn has full conversational context.
history.append(ChatMessage.from_user(user_text))
history.append(last_message)
return reply
async def main():
print("=== Cognee Memory Agent Pipeline Demo ===\n")
print("Pruning previous data for a clean start...")
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
print("Done.\n")
# -------------------------------------------------------------------------
# Pre-seed long-term memories from a simulated previous session.
# In a real application these would already be in Cognee from earlier runs.
# -------------------------------------------------------------------------
print("Seeding memories from a previous session...")
store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name=DATASET_NAME)
store.add_memories(messages=[ChatMessage.from_user(fact) for fact in SEEDED_MEMORIES])
print(f" {len(SEEDED_MEMORIES)} facts stored.\n")
pipeline = build_memory_agent_pipeline(store)
history: list[ChatMessage] = []
# -------------------------------------------------------------------------
# Multi-turn conversation. The Agent receives retrieved memories + live
# history + the current message on every turn, so it can both recall past
# facts and follow the thread of the current exchange.
# -------------------------------------------------------------------------
turns = [
# Turn 1 — recall a specific memory (no history yet)
"Hi! Can you remind me what project I'm currently working on?",
# Turn 2 — follow-up that requires turn 1 to make sense
"What's the tech stack we're using for it?",
# Turn 3 — recall a different memory fact
"Who else is on my team, and what are their roles?",
# Turn 4 — requires both a recalled preference *and* conversation context
"Based on what you know about me, give me a quick tip for structuring a new Haystack pipeline component.",
]
for user_text in turns:
print(f"User: {user_text}")
reply = run_turn(pipeline, user_text, history)
print(f"Agent: {reply}\n")
print("=== Done ===")
if __name__ == "__main__":
asyncio.run(main()) |
|
Hey @hande-k soft ping. |
|
Thanks @sjrl for the patience on this one! cognee 1.0 shipping mid-review meant rebuilding the integration on the new apis. It is simpler and the agent-memory shape from your demo is the default path now. Appreciate the thorough review! let me know what else needs to be done |
|
Hey @hande-k just two last comments
Otherwise it looks ready to go! |
hey @sjrl , the first one is done! For the SQL error, I couldn't reproduce on my end (cognee 1.0.9, Python 3.12, macOS). It could be due to a stale ~/.cognee/ from a pre-1.0 install, |
|
@hande-k I was able to figure it out. I had langfuse configured via my .env file and it looks like trying to run the demo with the langfuse observability turned on then it seems to result in a SQL error when "langfuse<3.0.0" is installed. I removed the dep and the .env file and the demo worked! |
sjrl
left a comment
There was a problem hiding this comment.
Thanks for the contribution! I'll let you know once we have this released.
Closes https://github.com/deepset-ai/haystack-private/issues/240
Summary
CogneeWriter,CogneeCognifier,CogneeRetriever, andCogneeMemoryStoreCogneeWriteringests Haystack Documents into Cognee memory viacognee.add()+ optionalcognee.cognify()CogneeRetrieversearches Cognee's memory and returns Haystack DocumentsCogneeCognifierwrapscognee.cognify()as a standalone pipeline stepCogneeMemoryStoreimplements theMemoryStoreprotocol from haystack-experimental for use with Haystack's experimental AgentTest plan
hatch run test:unithatch run fmt-checkhatch run test:typesdemo_pipeline.py,demo_memory_agent.py)