feat: add PostgreSQL/pgvector backend #86
Note: Reviews paused
It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
Refactors entity update flow into a template method on BaseEntityBackend with per-backend CRUD hooks; implements those hooks in Filesystem and Milvus; adds a new PostgresEntityBackend (pgvector, per-namespace tables, embeddings); adds serialize/deserialize helpers, Postgres config, client wiring, deps, and tests.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Base as BaseEntityBackend
participant Backend as ConcreteBackend
participant Storage as StorageLayer
Client->>Base: update_entities(namespace_id, entities)
Base->>Base: validate namespace & inputs
Base->>Base: build RecordedEntity prototypes
rect rgba(100,150,200,0.5)
Note over Base,Backend: Optional conflict resolution step
Base->>Backend: fetch existing entities
Backend->>Storage: query existing data
Storage-->>Backend: existing entities
Backend-->>Base: return existing entities
Base->>Base: resolve conflicts
end
rect rgba(150,100,200,0.5)
Note over Base,Backend: Apply mutations via hooks
Base->>Backend: _add_entity(...)
Backend->>Storage: INSERT / persist embedding
Storage-->>Backend: new id
Backend-->>Base: id
Base->>Backend: _update_entity(...)
Backend->>Storage: UPDATE row / embedding
Storage-->>Backend: ok
Base->>Backend: _delete_entity(...)
Backend->>Storage: DELETE row
Storage-->>Backend: ok
end
Base->>Backend: _post_update(namespace_id)
Backend->>Storage: commit / persist final state
Storage-->>Backend: ok
Base-->>Client: return list[EntityUpdate]
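The flow in the diagram can be sketched as a template method. This is a minimal sketch, not the PR's code: the hook names `_add_entity`, `_update_entity`, `_delete_entity`, and `_post_update` come from the walkthrough, while the `Op` record, the hook signatures, and the in-memory `DictBackend` are hypothetical stand-ins. Namespace validation and conflict resolution are left out.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class Op:
    """Hypothetical mutation record; the real code works on Entity/EntityUpdate."""
    event: str                      # "ADD", "UPDATE", or "DELETE"
    content: str = ""
    entity_id: Optional[str] = None


class BaseBackend(ABC):
    """The base class owns the update flow; subclasses own the storage calls."""

    def update_entities(self, namespace_id: str, ops: list) -> list:
        for op in ops:
            if op.event == "ADD":
                op.entity_id = self._add_entity(namespace_id, op.content)
            elif op.event == "UPDATE":
                self._update_entity(namespace_id, op.entity_id, op.content)
            elif op.event == "DELETE":
                self._delete_entity(namespace_id, op.entity_id)
            else:
                raise ValueError(f"unknown event: {op.event}")
        self._post_update(namespace_id)  # e.g. commit or write to disk
        return ops

    @abstractmethod
    def _add_entity(self, namespace_id: str, content: str) -> str: ...

    @abstractmethod
    def _update_entity(self, namespace_id: str, entity_id: str, content: str) -> None: ...

    @abstractmethod
    def _delete_entity(self, namespace_id: str, entity_id: str) -> None: ...

    def _post_update(self, namespace_id: str) -> None:
        """Optional hook; backends without a final persist step keep the no-op."""


class DictBackend(BaseBackend):
    """In-memory backend used only to exercise the template."""

    def __init__(self):
        self.rows = {}       # namespace_id -> {entity_id: content}
        self.next_id = 1
        self.flushed = 0     # counts _post_update calls

    def _add_entity(self, namespace_id, content):
        entity_id = str(self.next_id)
        self.next_id += 1
        self.rows.setdefault(namespace_id, {})[entity_id] = content
        return entity_id

    def _update_entity(self, namespace_id, entity_id, content):
        self.rows[namespace_id][entity_id] = content

    def _delete_entity(self, namespace_id, entity_id):
        del self.rows[namespace_id][entity_id]

    def _post_update(self, namespace_id):
        self.flushed += 1
```

A new backend then only has to supply the four hooks; the ordering of mutations and the final persist step stay in one place.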
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 6
🧹 Nitpick comments (2)
kaizen/backend/base.py (1)
16-20: Make non-string serialization deterministic.
At Line 20, plain `json.dumps(content)` can produce order-dependent strings for dict-like payloads. Stable dumps improve consistency for conflict-resolution/search inputs.
Proposed refactor
 def serialize_content(content) -> str:
     """Serialize content to a string for storage."""
     if isinstance(content, str):
         return content
-    return json.dumps(content)
+    return json.dumps(content, sort_keys=True, ensure_ascii=False)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/base.py` around lines 16 - 20, The serialize_content function returns json.dumps(content) for non-strings which can be non-deterministic for dict-like inputs; update serialize_content to produce stable, deterministic JSON by calling json.dumps with deterministic options (e.g., sort_keys=True and a compact separators/encoding policy such as separators=(",", ":"), ensure_ascii=False) so the serialized string is consistent across runs for the same data structure.
kaizen/backend/milvus.py (1)
288-289: Avoid redundant namespace checks in `_delete_entity` hook.
`BaseEntityBackend.update_entities` already validates the namespace once before mutations. Calling `delete_entity_by_id` here re-runs `_validate_namespace` per entity delete and adds avoidable overhead.
♻️ Proposed refactor
 def _delete_entity(self, namespace_id: str, entity_id: str) -> None:
-    self.delete_entity_by_id(namespace_id=namespace_id, entity_id=entity_id)
+    try:
+        entity_id_int = int(entity_id)
+    except ValueError as exc:
+        raise KaizenException(f"Invalid entity ID: {entity_id}. Entity IDs must be numeric.") from exc
+    self.milvus.delete(collection_name=namespace_id, ids=[entity_id_int])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/milvus.py` around lines 288 - 289, The _delete_entity method redundantly calls delete_entity_by_id which re-runs _validate_namespace already performed by BaseEntityBackend.update_entities; remove that extra validation by replacing the delete_entity_by_id call with a direct lower-level delete operation (or introduce/use a private helper that performs deletion without namespace validation), e.g., call the storage-level delete helper used by delete_entity_by_id or add a new private method like _delete_entity_no_validation and invoke it from _delete_entity so namespace validation is not repeated; keep references to _delete_entity, delete_entity_by_id, BaseEntityBackend.update_entities, and _validate_namespace when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@kaizen/backend/filesystem.py`:
- Around line 177-180: The code sets self._active_data inside update_entities
(using self._lock and self._load_namespace_data) but never clears it on error,
allowing stale state to leak; wrap the body that sets self._active_data and
calls super().update_entities in a try/finally that always clears/reset
self._active_data (and releases any per-call state) and call _post_update only
on success, and mirror the same lifecycle guard for the early-return path (the
lock-free short-circuit at lines ~251-253) so that any temporary _active_data is
cleared in all code paths.
In `@kaizen/backend/postgres.py`:
- Around line 56-67: The constructor currently resolves config into the local
variable settings but then mistakenly uses the global postgres_db_settings for
other setup steps; update all uses to reference the resolved settings variable
instead of postgres_db_settings—specifically replace postgres_db_settings when
constructing self.embedding_model (SentenceTransformer(...)) and any
reporting/logging of DB details (the code around the embedding_model assignment
and the report block referenced at lines ~98-100) so they consistently read
host/port/dbname/embedding_model from settings; ensure no remaining references
to the global postgres_db_settings remain in the initialization path.
- Around line 123-124: The Postgres backend is instantiating SQLiteManager with
defaults, ignoring the configured SQLite path; update all places where
SQLiteManager() is called in this file (e.g., the create_namespace call and the
other occurrences around the indicated ranges) to pass the configured URI (use
SQLiteManager(self.sqlite_uri)) so namespace metadata uses the same configurable
SQLite DB path as kaizen/backend/milvus.py; ensure the constructor call matches
SQLiteManager's signature and update all four occurrences (including the
create_namespace call and the other instances at the noted ranges).
- Around line 179-180: The current return of the sentinel string "0" when
cur.fetchone() yields no row hides insertion failures; replace that sentinel
with a clear error path: detect when row is None and raise a descriptive
exception (e.g., RuntimeError or custom InsertError) that includes context
(table/name/parameters or the executed SQL) so callers don't receive an invalid
ID. Update the logic around cur.fetchone() (the row variable and its handling)
to raise the error instead of returning "0" and ensure callers handle or
propagate that exception.
In `@kaizen/frontend/client/kaizen_client.py`:
- Around line 35-43: The branch in KaizenClient that currently checks
self.config.backend == "pgvector" must be changed to the actual backend name
"postgres" and the settings import fixed: replace import of PostgresDBSettings
from kaizen.config.pgvector with kaizen.config.postgres so the type check and
instantiation work; update the branch condition and the import for
PostgresDBSettings while keeping the existing TypeError check and instantiation
of PostgresEntityBackend(self.config.settings).
---
Nitpick comments:
In `@kaizen/backend/base.py`:
- Around line 16-20: The serialize_content function returns json.dumps(content)
for non-strings which can be non-deterministic for dict-like inputs; update
serialize_content to produce stable, deterministic JSON by calling json.dumps
with deterministic options (e.g., sort_keys=True and a compact
separators/encoding policy such as separators=(",", ":"), ensure_ascii=False) so
the serialized string is consistent across runs for the same data structure.
In `@kaizen/backend/milvus.py`:
- Around line 288-289: The _delete_entity method redundantly calls
delete_entity_by_id which re-runs _validate_namespace already performed by
BaseEntityBackend.update_entities; remove that extra validation by replacing the
delete_entity_by_id call with a direct lower-level delete operation (or
introduce/use a private helper that performs deletion without namespace
validation), e.g., call the storage-level delete helper used by
delete_entity_by_id or add a new private method like
_delete_entity_no_validation and invoke it from _delete_entity so namespace
validation is not repeated; keep references to _delete_entity,
delete_entity_by_id, BaseEntityBackend.update_entities, and _validate_namespace
when making the change.
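For the `serialize_content` nitpick, the effect of the stable dump options can be checked directly. A minimal sketch: the function body follows the proposed refactor, with the `separators` argument from the prompt added as an optional extra; whether the project wants compact separators is an assumption.

```python
import json


def serialize_content(content) -> str:
    """Serialize content to a string for storage; strings pass through unchanged."""
    if isinstance(content, str):
        return content
    # sort_keys removes dependence on dict insertion order;
    # fixed separators pin the whitespace; ensure_ascii=False keeps non-ASCII readable.
    return json.dumps(content, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


first = serialize_content({"b": 1, "a": [1, 2]})
second = serialize_content({"a": [1, 2], "b": 1})
print(first == second)  # the two insertion orders serialize identically
```

Identical payloads now map to identical strings, which matters when the serialized form feeds conflict resolution or search.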
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5615151a-fa84-4b58-beee-8487790b625d
⛔ Files ignored due to path filters (1)
- uv.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
- kaizen/backend/base.py
- kaizen/backend/filesystem.py
- kaizen/backend/milvus.py
- kaizen/backend/postgres.py
- kaizen/config/kaizen.py
- kaizen/config/postgres.py
- kaizen/frontend/client/kaizen_client.py
- pyproject.toml
- tests/unit/test_milvus_backend.py
- tests/unit/test_postgres_backend.py
        settings = config if isinstance(config, type(postgres_db_settings)) else postgres_db_settings
        self.conn = psycopg.connect(
            host=settings.host,
            port=settings.port,
            user=settings.user,
            password=settings.password,
            dbname=settings.dbname,
            autocommit=True,
        )
        register_vector(self.conn)
        self._ensure_pgvector_extension()
        self.embedding_model = SentenceTransformer(postgres_db_settings.embedding_model)
Resolved config is not consistently used.
Lines 67 and 100 read from global postgres_db_settings instead of the resolved settings, so a passed config can be silently ignored for embedding model and reported details.
🐛 Proposed fix
 class PostgresEntityBackend(BaseEntityBackend):
@@
     def __init__(self, config: BaseSettings | None = None):
         super().__init__(config)
         settings = config if isinstance(config, type(postgres_db_settings)) else postgres_db_settings
+        self.settings = settings
         self.conn = psycopg.connect(
-            host=settings.host,
-            port=settings.port,
-            user=settings.user,
-            password=settings.password,
-            dbname=settings.dbname,
+            host=self.settings.host,
+            port=self.settings.port,
+            user=self.settings.user,
+            password=self.settings.password,
+            dbname=self.settings.dbname,
             autocommit=True,
         )
@@
-        self.embedding_model = SentenceTransformer(postgres_db_settings.embedding_model)
+        self.embedding_model = SentenceTransformer(self.settings.embedding_model)
@@
     def details(self) -> dict:
         """Return details about the backend."""
-        return {"backend": "postgres", "host": postgres_db_settings.host, "port": postgres_db_settings.port}
+        return {"backend": "postgres", "host": self.settings.host, "port": self.settings.port}
Also applies to: 98-100
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@kaizen/backend/postgres.py` around lines 56 - 67, The constructor currently
resolves config into the local variable settings but then mistakenly uses the
global postgres_db_settings for other setup steps; update all uses to reference
the resolved settings variable instead of postgres_db_settings—specifically
replace postgres_db_settings when constructing self.embedding_model
(SentenceTransformer(...)) and any reporting/logging of DB details (the code
around the embedding_model assignment and the report block referenced at lines
~98-100) so they consistently read host/port/dbname/embedding_model from
settings; ensure no remaining references to the global postgres_db_settings
remain in the initialization path.
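The "resolve once, read from the instance" pattern behind this comment can be shown without a database. A minimal sketch under stated assumptions: `Settings`, `GLOBAL_SETTINGS`, and `Backend` are hypothetical stand-ins for `PostgresDBSettings`, `postgres_db_settings`, and `PostgresEntityBackend`, and no connection or embedding model is created.

```python
from dataclasses import dataclass


@dataclass
class Settings:
    """Hypothetical stand-in for PostgresDBSettings."""
    host: str = "localhost"
    port: int = 5432
    embedding_model: str = "default-model"


GLOBAL_SETTINGS = Settings()  # plays the role of the module-level postgres_db_settings


class Backend:
    def __init__(self, config=None):
        # Resolve the config once and keep it on the instance, so every later
        # read goes through self.settings rather than the module-level global.
        self.settings = config if isinstance(config, Settings) else GLOBAL_SETTINGS
        self.embedding_model_name = self.settings.embedding_model

    def details(self) -> dict:
        return {"backend": "postgres", "host": self.settings.host, "port": self.settings.port}


custom = Backend(Settings(host="db.internal", port=6543, embedding_model="custom-model"))
fallback = Backend()
```

With the global read removed, a passed config controls both the reported details and the embedding model name.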
        for k, v in filters.items():
            where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
            params.append(v)
        where_clause = sql.SQL(" AND ").join(where_parts) if where_parts else sql.SQL("TRUE")
Filter key handling can fail on non-column keys and misses metadata filtering semantics.
Every filter key is treated as a direct column identifier. Keys like metadata.foo (or any unknown key) will generate invalid SQL instead of filtering metadata safely.
🐛 Proposed fix
-        for k, v in filters.items():
-            where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
-            params.append(v)
+        schema_fields = {"id", "type", "content", "created_at"}
+        for k, v in filters.items():
+            if k in schema_fields:
+                where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
+                params.append(v)
+            else:
+                meta_key = k.split(".", 1)[1] if k.startswith("metadata.") else k
+                where_parts.append(sql.SQL("metadata ->> %s = %s"))
+                params.extend([meta_key, str(v)])
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-        for k, v in filters.items():
-            where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
-            params.append(v)
-        where_clause = sql.SQL(" AND ").join(where_parts) if where_parts else sql.SQL("TRUE")
+        schema_fields = {"id", "type", "content", "created_at"}
+        for k, v in filters.items():
+            if k in schema_fields:
+                where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
+                params.append(v)
+            else:
+                meta_key = k.split(".", 1)[1] if k.startswith("metadata.") else k
+                where_parts.append(sql.SQL("metadata ->> %s = %s"))
+                params.extend([meta_key, str(v)])
+        where_clause = sql.SQL(" AND ").join(where_parts) if where_parts else sql.SQL("TRUE")
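The split between schema columns and metadata keys can be exercised without a database connection. This is a dependency-free sketch of the same idea, not the suggested patch itself: it builds plain strings instead of psycopg `sql` objects, and `build_where` is a hypothetical helper name. Column names are inlined only because they come from a fixed allow-list.

```python
SCHEMA_FIELDS = {"id", "type", "content", "created_at"}


def build_where(filters: dict) -> tuple:
    """Return (where_clause, params) for a dict of equality filters."""
    parts, params = [], []
    for key, value in filters.items():
        if key in SCHEMA_FIELDS:
            # Safe to inline: key is one of a fixed set of known column names.
            parts.append(f'"{key}" = %s')
            params.append(value)
        else:
            # "metadata.foo" and bare "foo" are both treated as JSONB keys.
            meta_key = key.split(".", 1)[1] if key.startswith("metadata.") else key
            # The JSON key and the value both travel as bound parameters;
            # ->> yields text, so the value is compared as a string.
            parts.append("metadata ->> %s = %s")
            params.extend([meta_key, str(value)])
    clause = " AND ".join(parts) if parts else "TRUE"
    return clause, params


clause, params = build_where({"type": "fact", "metadata.user_id": 7})
```

One consequence of comparing through `->>` is that numeric metadata values match on their string form, so `7` and `"7"` are indistinguishable in a filter.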
Actionable comments posted: 1
♻️ Duplicate comments (5)
kaizen/backend/postgres.py (4)
123-124: ⚠️ Potential issue | 🟠 Major
Pass the configured SQLite URI to `SQLiteManager` for namespace metadata calls.
These calls currently use `SQLiteManager()` defaults, which can bypass configured namespace storage.
Proposed fix
 import datetime
 import json
 import logging
+import os
 import uuid
@@
     def __init__(self, config: BaseSettings | None = None):
         super().__init__(config)
         settings = config if isinstance(config, type(postgres_db_settings)) else postgres_db_settings
+        self.sqlite_uri = os.getenv("KAIZEN_SQLITE_PATH") or getattr(settings, "sqlite_uri", None)
@@
-        with SQLiteManager() as db_manager:
+        with SQLiteManager(self.sqlite_uri) as db_manager:
             return db_manager.create_namespace(namespace_id)
@@
-        with SQLiteManager() as db_manager:
+        with SQLiteManager(self.sqlite_uri) as db_manager:
             namespace = db_manager.get_namespace(namespace_id)
@@
-        with SQLiteManager() as db_manager:
+        with SQLiteManager(self.sqlite_uri) as db_manager:
             namespaces = []
@@
-        with SQLiteManager() as db_manager:
+        with SQLiteManager(self.sqlite_uri) as db_manager:
             db_manager.delete_namespace(namespace_id)
Also applies to: 130-131, 142-144, 162-163
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 123 - 124, The namespace metadata calls instantiate SQLiteManager with no args which ignores the configured SQLite URI; update all usages (e.g., the call that returns db_manager.create_namespace(namespace_id) and similar blocks at the other occurrences) to construct SQLiteManager with the configured URI/connection string (pass the same URI variable or config getter used elsewhere in this module) so namespace operations use the configured namespace storage backend; ensure the constructor argument matches the SQLiteManager signature and propagate the same change to the other occurrences at the indicated sites.
56-67: ⚠️ Potential issue | 🟠 Major
Use the resolved config consistently (`settings`), not the global singleton.
Line 67 and Line 100 still read from `postgres_db_settings`, so a passed config can be partially ignored.
Proposed fix
     def __init__(self, config: BaseSettings | None = None):
         super().__init__(config)
         settings = config if isinstance(config, type(postgres_db_settings)) else postgres_db_settings
+        self.settings = settings
         self.conn = psycopg.connect(
-            host=settings.host,
-            port=settings.port,
-            user=settings.user,
-            password=settings.password,
-            dbname=settings.dbname,
+            host=self.settings.host,
+            port=self.settings.port,
+            user=self.settings.user,
+            password=self.settings.password,
+            dbname=self.settings.dbname,
             autocommit=True,
         )
@@
-        self.embedding_model = SentenceTransformer(postgres_db_settings.embedding_model)
+        self.embedding_model = SentenceTransformer(self.settings.embedding_model)
@@
     def details(self) -> dict:
         """Return details about the backend."""
-        return {"backend": "postgres", "host": postgres_db_settings.host, "port": postgres_db_settings.port}
+        return {"backend": "postgres", "host": self.settings.host, "port": self.settings.port}
Also applies to: 98-100
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 56 - 67, The constructor uses the local resolved variable "settings" but later still reads the global singleton "postgres_db_settings", causing partial ignores of a passed config; update all usages to consistently use "settings" instead of "postgres_db_settings" (notably the embedding model initialization in the constructor where SentenceTransformer(postgres_db_settings.embedding_model) should use SentenceTransformer(settings.embedding_model), and any other references around the _ensure_pgvector_extension or later code block that currently read postgres_db_settings at lines referenced—search for postgres_db_settings in this class and replace with settings).
213-216: ⚠️ Potential issue | 🟠 Major
Filter key handling should separate schema columns from metadata keys.
Line 214 treats every key as a column identifier; keys like `metadata.task_id` or unknown keys can generate invalid SQL instead of filtering metadata.
Proposed fix
         where_parts = []
         params: list = []
-        for k, v in filters.items():
-            where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
-            params.append(v)
+        schema_fields = {"id", "type", "content", "created_at"}
+        for k, v in filters.items():
+            if k in schema_fields:
+                where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
+                params.append(v)
+            else:
+                meta_key = k.split(".", 1)[1] if k.startswith("metadata.") else k
+                where_parts.append(sql.SQL("metadata ->> %s = %s"))
+                params.extend([meta_key, str(v)])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 213 - 216, The loop building where_parts in the filters handling treats every key as a table column (using sql.Identifier) which breaks for metadata keys like "metadata.task_id"; update the logic in the filters.items() loop (the where_parts construction) to detect metadata keys (e.g. k.startswith("metadata.")), split out the JSON key (after the dot) and append a JSONB extraction filter (using metadata ->> 'key' = %s) to where_parts while still adding the value to params; for non-metadata keys continue using sql.Identifier(k); also add a fallback/validation path for unknown keys to avoid producing invalid SQL.
179-180: ⚠️ Potential issue | 🟡 Minor
Don’t return `"0"` when INSERT returns no row.
Line 180 masks insertion anomalies and propagates an invalid entity ID.
Proposed fix
         row = cur.fetchone()
-        return str(row[0]) if row else "0"
+        if not row:
+            raise KaizenException("Insert failed: PostgreSQL did not return a new entity ID.")
+        return str(row[0])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 179 - 180, The code currently returns the string "0" when row = cur.fetchone() is None, which masks insertion failures; instead, change the branch handling the INSERT/RETURNING result (the block with row = cur.fetchone() and return str(row[0]) if row else "0") to raise a clear exception (e.g., RuntimeError or ValueError) including contextual info (operation name or SQL and parameters or cursor.statusmessage) so callers get a visible failure; if the API design prefers a nullable return, return None rather than "0" and update callers accordingly.
kaizen/backend/filesystem.py (1)
165-170: ⚠️ Potential issue | 🔴 Critical
Clear `_active_data` in a `finally` and scope it to the active namespace.
At Line 179, `_active_data` is set but not guaranteed to be reset (e.g., empty `entities` path or exceptions before `_post_update`). Then Line 251 can serve stale data and even for the wrong namespace.
Proposed fix
-from threading import Lock
+from threading import RLock
@@
-        self._lock = Lock()
+        self._lock = RLock()
         # Holds the loaded namespace data during update_entities so hooks can access it.
         self._active_data: FilesystemNamespace | None = None
+        self._active_namespace_id: str | None = None
@@
     def _post_update(self, namespace_id: str) -> None:
         assert self._active_data is not None
         self._active_data.num_entities = len(self._active_data.entities)
         self._save_namespace_data(namespace_id, self._active_data)
-        self._active_data = None
@@
     def update_entities(
         self,
         namespace_id: str,
         entities: list[Entity],
         enable_conflict_resolution: bool = True,
     ) -> list[EntityUpdate]:
         """Override to wrap the base template in a lock with loaded data."""
         with self._lock:
             self._active_data = self._load_namespace_data(namespace_id)
-            return super().update_entities(namespace_id, entities, enable_conflict_resolution)
+            self._active_namespace_id = namespace_id
+            try:
+                return super().update_entities(namespace_id, entities, enable_conflict_resolution)
+            finally:
+                self._active_data = None
+                self._active_namespace_id = None
@@
     def search_entities(
         self,
         namespace_id: str,
         query: str | None = None,
         filters: dict | None = None,
         limit: int = 10,
     ) -> list[RecordedEntity]:
         """Search for entities in a namespace."""
-        # If called during update_entities (inside the lock), use the active data
-        if self._active_data is not None:
-            return self._search_entities_internal(self._active_data, query, filters, limit)
         with self._lock:
+            if self._active_data is not None and self._active_namespace_id == namespace_id:
+                return self._search_entities_internal(self._active_data, query, filters, limit)
             data = self._load_namespace_data(namespace_id)
             return self._search_entities_internal(data, query, filters, limit)
Also applies to: 177-180, 251-253
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/filesystem.py` around lines 165 - 170, The _post_update method currently sets self._active_data = None only at the end and can leave stale or wrong-namespace data if an exception occurs earlier; change callers that set self._active_data (and _post_update itself) to ensure clearing happens in a finally block and scope active data to the current namespace: when you set self._active_data before processing a namespace, wrap the processing and the call to _save_namespace_data (via _post_update) in try/finally and always set self._active_data = None in the finally, and ensure _post_update only operates on self._active_data when its namespace matches the provided namespace_id (using the namespace identifier available where you set _active_data and in _post_update) so stale data cannot be written for the wrong namespace.
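The lifecycle guard described in this comment can be reproduced in miniature. A minimal sketch, assuming a dict in place of the on-disk namespace files: `Store`, `disk`, `update`, and `search` are hypothetical names, and a `None` item stands in for any entity that makes the update raise.

```python
from threading import RLock


class Store:
    """Hypothetical stand-in for the filesystem backend."""

    def __init__(self):
        self._lock = RLock()               # re-entrant: search() may run inside update()
        self._active_data = None
        self._active_namespace_id = None
        self.disk = {"ns": ["a"]}          # pretend persisted state

    def _load(self, namespace_id):
        return list(self.disk.get(namespace_id, []))

    def update(self, namespace_id, items):
        with self._lock:
            self._active_data = self._load(namespace_id)
            self._active_namespace_id = namespace_id
            try:
                for item in items:
                    if item is None:
                        raise ValueError("bad entity")
                    self._active_data.append(item)
                self.disk[namespace_id] = self._active_data   # persist only on success
                return len(self._active_data)
            finally:
                # Runs on success, early return, and exception alike.
                self._active_data = None
                self._active_namespace_id = None

    def search(self, namespace_id):
        with self._lock:
            # Use in-flight data only when it belongs to the requested namespace.
            if self._active_data is not None and self._active_namespace_id == namespace_id:
                return list(self._active_data)
            return self._load(namespace_id)
```

A failed update leaves no in-flight state behind, so a later search reads the persisted data rather than a half-applied copy.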
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/test_postgres_backend.py`:
- Around line 212-220: The test defines sample_rows as plain dicts but later
asserts using attribute access (e.g., result[0].id), causing failures; update
the fixture so each entry in sample_rows is an object with attributes (for
example wrap each dict in types.SimpleNamespace or construct a small
dataclass/namedtuple) so the code that checks result[0].id, result[0].type, etc.
works, or alternatively change those assertions to use dict access
(result[0]["id"])—prefer converting sample_rows entries to attribute-accessible
objects (SimpleNamespace) so existing assertions remain unchanged.
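The fixture change suggested here is small. A minimal sketch, assuming row fields named `id`, `type`, and `content`; the actual columns in `tests/unit/test_postgres_backend.py` are not shown in this review, so the values below are placeholders.

```python
from types import SimpleNamespace

# Plain dicts support row["id"] but not row.id.
raw_rows = [
    {"id": 1, "type": "fact", "content": "alpha"},
    {"id": 2, "type": "fact", "content": "beta"},
]

# Wrapping each dict gives attribute access without defining a class.
sample_rows = [SimpleNamespace(**row) for row in raw_rows]
```

Assertions written as `result[0].id` then work against `sample_rows` unchanged.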
---
Duplicate comments:
In `@kaizen/backend/filesystem.py`:
- Around line 165-170: The _post_update method currently sets self._active_data
= None only at the end and can leave stale or wrong-namespace data if an
exception occurs earlier; change callers that set self._active_data (and
_post_update itself) to ensure clearing happens in a finally block and scope
active data to the current namespace: when you set self._active_data before
processing a namespace, wrap the processing and the call to _save_namespace_data
(via _post_update) in try/finally and always set self._active_data = None in the
finally, and ensure _post_update only operates on self._active_data when its
namespace matches the provided namespace_id (using the namespace identifier
available where you set _active_data and in _post_update) so stale data cannot
be written for the wrong namespace.
In `@kaizen/backend/postgres.py`:
- Around line 123-124: The namespace metadata calls instantiate SQLiteManager
with no args which ignores the configured SQLite URI; update all usages (e.g.,
the call that returns db_manager.create_namespace(namespace_id) and similar
blocks at the other occurrences) to construct SQLiteManager with the configured
URI/connection string (pass the same URI variable or config getter used
elsewhere in this module) so namespace operations use the configured namespace
storage backend; ensure the constructor argument matches the SQLiteManager
signature and propagate the same change to the other occurrences at the
indicated sites.
- Around line 56-67: The constructor uses the local resolved variable "settings"
but later still reads the global singleton "postgres_db_settings", causing
partial ignores of a passed config; update all usages to consistently use
"settings" instead of "postgres_db_settings" (notably the embedding model
initialization in the constructor where
SentenceTransformer(postgres_db_settings.embedding_model) should use
SentenceTransformer(settings.embedding_model), and any other references around
the _ensure_pgvector_extension or later code block that currently read
postgres_db_settings at lines referenced—search for postgres_db_settings in this
class and replace with settings).
- Around line 213-216: The loop building where_parts in the filters handling
treats every key as a table column (using sql.Identifier) which breaks for
metadata keys like "metadata.task_id"; update the logic in the filters.items()
loop (the where_parts construction) to detect metadata keys (e.g.
k.startswith("metadata.")), split out the JSON key (after the dot) and append a
JSONB extraction filter (using metadata ->> 'key' = %s) to where_parts while
still adding the value to params; for non-metadata keys continue using
sql.Identifier(k); also add a fallback/validation path for unknown keys to avoid
producing invalid SQL.
- Around line 179-180: The code currently returns the string "0" when row =
cur.fetchone() is None, which masks insertion failures; instead, change the
branch handling the INSERT/RETURNING result (the block with row = cur.fetchone()
and return str(row[0]) if row else "0") to raise a clear exception (e.g.,
RuntimeError or ValueError) including contextual info (operation name or SQL and
parameters or cursor.statusmessage) so callers get a visible failure; if the API
design prefers a nullable return, return None rather than "0" and update callers
accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 84d0011a-2e6f-4951-bcb6-33047402e711
⛔ Files ignored due to path filters (1)
- uv.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
- kaizen/backend/base.py
- kaizen/backend/filesystem.py
- kaizen/backend/milvus.py
- kaizen/backend/postgres.py
- kaizen/config/kaizen.py
- kaizen/config/postgres.py
- kaizen/frontend/client/kaizen_client.py
- kaizen/utils/utils.py
- pyproject.toml
- tests/unit/test_milvus_backend.py
- tests/unit/test_postgres_backend.py
🚧 Files skipped from review as they are similar to previous changes (3)
- kaizen/frontend/client/kaizen_client.py
- kaizen/config/kaizen.py
- pyproject.toml
Actionable comments posted: 2
♻️ Duplicate comments (5)
kaizen/backend/postgres.py (4)
47-60: ⚠️ Potential issue | 🟠 Major
Resolved config not stored; global settings still used.
The resolved `settings` variable is computed but not stored as an instance attribute. Consequently:
- Line 60 uses `postgres_db_settings.embedding_model` instead of `settings.embedding_model`
- Line 93 uses `postgres_db_settings.host/port` instead of resolved settings
This means a custom config passed to the constructor will be partially ignored.
🐛 Proposed fix
     def __init__(self, config: BaseSettings | None = None):
         super().__init__(config)
         settings = config if isinstance(config, type(postgres_db_settings)) else postgres_db_settings
+        self.settings = settings
         self.conn = psycopg.connect(
             host=settings.host,
             port=settings.port,
             user=settings.user,
             password=settings.password,
             dbname=settings.dbname,
             autocommit=True,
         )
         register_vector(self.conn)
         self._ensure_pgvector_extension()
-        self.embedding_model = SentenceTransformer(postgres_db_settings.embedding_model)
+        self.embedding_model = SentenceTransformer(self.settings.embedding_model)
And update `details()`:
     def details(self) -> dict:
         """Return details about the backend."""
-        return {"backend": "postgres", "host": postgres_db_settings.host, "port": postgres_db_settings.port}
+        return {"backend": "postgres", "host": self.settings.host, "port": self.settings.port}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 47 - 60, The constructor of the Postgres class computes a local settings variable but doesn't store it, causing later uses to reference the global postgres_db_settings (e.g., embedding_model and host/port in details()); update __init__ to assign the resolved settings to an instance attribute (e.g., self.settings = settings) and change any places that use postgres_db_settings (such as the SentenceTransformer initialization in __init__ and the host/port references used in details()) to use self.settings.embedding_model and self.settings.host/port respectively so the passed config is honored throughout the class.
160-173: ⚠️ Potential issue | 🟡 Minor
Don't return sentinel `"0"` when insert returns no row.
Returning `"0"` masks insertion anomalies and propagates an invalid entity ID into later update/delete flows.
🐛 Proposed fix
         row = cur.fetchone()
-        return str(row[0]) if row else "0"
+        if not row:
+            raise KaizenException(f"Insert failed: PostgreSQL did not return a new entity ID for table {table}.")
+        return str(row[0])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 160 - 173, The _add_entity method currently returns the sentinel string "0" when cur.fetchone() is None; instead detect the missing RETURNING id and raise a clear exception so callers don't receive an invalid ID. In _add_entity, after executing the INSERT ... RETURNING id and calling cur.fetchone(), if row is falsy raise a RuntimeError or ValueError (include context such as table/namepace_id/entity_type) rather than returning "0"; keep the successful path returning str(row[0]). This ensures insertion anomalies surface to callers of _add_entity and downstream update/delete flows won't receive a bogus ID.
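The difference between a sentinel and an explicit failure is easy to see in isolation. A minimal sketch: `InsertError` is a hypothetical stand-in for `KaizenException`, and `new_id_from` is a hypothetical helper that receives whatever `cur.fetchone()` returned (a row tuple, or `None` when there is no row).

```python
class InsertError(RuntimeError):
    """Hypothetical stand-in for KaizenException."""


def new_id_from(row, table: str) -> str:
    """Turn the result of an INSERT ... RETURNING id fetch into an entity ID."""
    if row is None:
        # Fail loudly so a bogus ID never reaches later update/delete calls.
        raise InsertError(f"Insert failed: no id returned for table {table}.")
    return str(row[0])


ok_id = new_id_from((42,), "ns_entities")
try:
    new_id_from(None, "ns_entities")
    failed = False
except InsertError:
    failed = True
```

Callers either get a real ID or an exception naming the table, never a value that looks valid but points at nothing.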
116-117: ⚠️ Potential issue | 🟠 Major
SQLiteManager called without configured path.
Unlike the Milvus backend, which passes self.sqlite_uri, the Postgres backend calls SQLiteManager() with no arguments at lines 116, 123, 135, and 155. This relies on environment variables or defaults, potentially breaking namespace isolation when a non-default SQLite path is configured.
Note: PostgresDBSettings (from context snippet) doesn't define a sqlite_uri field, so you'd need to either add it to the settings class or use an environment variable fallback like Milvus does.
🐛 Proposed fix
In __init__:
+import os
 ...
 def __init__(self, config: BaseSettings | None = None):
     ...
     self.settings = settings
+    self.sqlite_uri = os.getenv("KAIZEN_SQLITE_PATH") or getattr(self.settings, "sqlite_uri", None)
Then update all SQLiteManager calls:
-    with SQLiteManager() as db_manager:
+    with SQLiteManager(self.sqlite_uri) as db_manager:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 116 - 117, The Postgres backend is instantiating SQLiteManager without a configured path which breaks isolation; update the Postgres DB class to obtain a sqlite_uri (either add a sqlite_uri field to PostgresDBSettings or read the same env fallback used by Milvus) in __init__ and store it as self.sqlite_uri, then replace all bare SQLiteManager() calls (e.g., in create_namespace, delete_namespace, list_namespaces, get_namespace) with SQLiteManager(self.sqlite_uri) so the same configured SQLite path is always used.
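A minimal sketch of the lookup order described here, assuming the `KAIZEN_SQLITE_PATH` variable named in the proposed fix and an optional `sqlite_uri` attribute on the settings object. `resolve_sqlite_uri` is a hypothetical helper name, not an existing function in the repository.

```python
import os
from typing import Any, Optional


def resolve_sqlite_uri(settings: Any, env_var: str = "KAIZEN_SQLITE_PATH") -> Optional[str]:
    """Prefer the environment variable, then an optional settings attribute.

    Returns None when neither is set, leaving the default to the caller.
    """
    return os.getenv(env_var) or getattr(settings, "sqlite_uri", None)
```

The backend could compute this once in `__init__`, store it as `self.sqlite_uri`, and pass it to every `SQLiteManager(...)` call so that all namespace operations share one SQLite location.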
204-209: ⚠️ Potential issue | 🟠 Major
Filter handling treats all keys as column names, breaking metadata filters.
Every filter key is treated as a direct column identifier. Keys like metadata.user_id (used by retrieve_user_facts in kaizen_client.py) will generate invalid SQL since there's no column named metadata.user_id.
🐛 Proposed fix - distinguish schema fields from metadata filters
+    schema_fields = {"id", "type", "content", "created_at"}
     for k, v in filters.items():
-        where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
-        params.append(v)
+        if k in schema_fields:
+            where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
+            params.append(v)
+        else:
+            # Handle metadata.key or plain key as JSONB access
+            meta_key = k.split(".", 1)[1] if k.startswith("metadata.") else k
+            where_parts.append(sql.SQL("metadata ->> %s = %s"))
+            params.extend([meta_key, str(v)])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 204 - 209, The loop building where_parts treats every filter key as a column identifier (in the for k, v in filters.items() block), which breaks keys like "metadata.user_id"; fix by detecting keys with a dot, splitting into base column and JSON path (e.g., "metadata" and "user_id"), and generate JSONB access SQL for those (use ->> for single-key access or #>> for multi-segment paths) instead of sql.Identifier(k); keep using sql.Identifier for the base column and add the JSON path and filter value to params so the resulting clause becomes something like sql.SQL("{} ->> %s = %s").format(sql.Identifier(base)) for single-segment metadata filters, while leaving plain column names unchanged.
tests/unit/test_postgres_backend.py (1)
208-248: ⚠️ Potential issue | 🟠 Major
Mock bypasses row_factory transformation, causing type mismatch.
The test mocks cursor.fetchall() to return raw dicts, but the actual search_entities implementation uses row_factory=_entity_row_factory, which transforms rows into RecordedEntity instances. When mocking fetchall, the row factory is bypassed, so the mock should return RecordedEntity objects directly to match the expected behavior.
🐛 Proposed fix
+from kaizen.schema.core import Entity, Namespace, RecordedEntity
+
 @pytest.mark.unit
 def test_search_entities(postgres_backend: PostgresEntityBackend, monkeypatch):
     """Test searching entities with and without a query string."""
     now_ts = int(datetime.datetime.now(datetime.UTC).timestamp())
-    sample_rows = [
-        {
-            "id": 123,
-            "type": "fact",
-            "content": "Test content",
-            "created_at": now_ts,
-            "metadata": {},
-        }
-    ]
+    sample_rows = [
+        RecordedEntity(
+            id="123",
+            type="fact",
+            content="Test content",
+            created_at=datetime.datetime.fromtimestamp(now_ts, datetime.UTC),
+            metadata={},
+        )
+    ]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/test_postgres_backend.py` around lines 208 - 248, The test currently mocks cursor.fetchall() to return raw dicts which bypasses the row_factory transformation used by PostgresEntityBackend.search_entities; update the mock to return objects that match the transformed output (i.e., instances produced by _entity_row_factory / RecordedEntity) so assertions on .id, .type, .content match; specifically change mock_cursor.fetchall.return_value to a list containing RecordedEntity(...) (constructed with the same sample_rows data) or apply _entity_row_factory to the sample dicts before returning, keeping the mock on postgres_backend.conn.cursor as-is.
🧹 Nitpick comments (2)
kaizen/utils/utils.py (1)
12-17: Add return type annotation for consistency.
The function lacks a return type annotation. Since it can return either the parsed JSON (dict, list, etc.) or the original string, consider adding -> str | dict | Any or simply -> Any.
✨ Proposed fix
-def deserialize_content(content: str):
+def deserialize_content(content: str) -> Any:
     """Deserialize content from storage."""
Also add Any to imports:
+from typing import Any
+
 import json
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/utils/utils.py` around lines 12 - 17, The function deserialize_content currently has no return type annotation; update its signature (deserialize_content) to include an appropriate return type such as -> Any or -> str | dict | list | Any and import Any from typing (add "from typing import Any" to the imports) so the annotation is valid and consistent with the possible return values (parsed JSON or original string).
kaizen/backend/postgres.py (1)
26-40: Add defensive handling for malformed created_at values.
The row factory assumes row["created_at"] is always a valid integer. If the value is None, empty, or malformed, datetime.fromtimestamp() will raise an exception. The Milvus backend's parse_milvus_entity handles this with try/except and falls back to the current time.
✨ Proposed defensive handling
 def make_row(values: Sequence[Any]) -> RecordedEntity:
     row = dict(zip(cols, values))
+    created_at_value = row.get("created_at")
+    if created_at_value is not None and created_at_value != "":
+        try:
+            created_at = datetime.datetime.fromtimestamp(int(created_at_value), datetime.UTC)
+        except (TypeError, ValueError, OSError):
+            created_at = datetime.datetime.now(datetime.UTC)
+    else:
+        created_at = datetime.datetime.now(datetime.UTC)
+
     return RecordedEntity(
         id=str(row["id"]),
         type=row["type"],
         content=deserialize_content(row["content"]),
-        created_at=datetime.datetime.fromtimestamp(row["created_at"], datetime.UTC),
+        created_at=created_at,
         metadata=row.get("metadata", {}),
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 26 - 40, The make_row inside _entity_row_factory currently assumes row["created_at"] is a valid timestamp; wrap the timestamp conversion in defensive logic: in make_row, detect if row.get("created_at") is None or not an int/float (or already a datetime) and use a try/except around datetime.datetime.fromtimestamp(row["created_at"], datetime.UTC), falling back to datetime.datetime.now(datetime.UTC) on TypeError/ValueError/OverflowError or if the value is None/malformed; keep other fields (id, type, content via deserialize_content, metadata) the same and ensure RecordedEntity.created_at always gets a datetime instance.
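The defensive conversion can also be pulled out into a standalone function, which keeps `make_row` short. The sketch below is one possible shape; `parse_created_at` is a hypothetical name, and falling back to the current time mirrors what the comment says the Milvus backend does.

```python
import datetime
from typing import Any


def parse_created_at(value: Any) -> datetime.datetime:
    """Convert a stored created_at value into a datetime.

    Existing datetime objects are returned unchanged. Anything that cannot
    be read as a Unix timestamp falls back to the current UTC time.
    """
    if isinstance(value, datetime.datetime):
        return value
    try:
        return datetime.datetime.fromtimestamp(int(value), datetime.timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        # None, empty strings, non-numeric text, and out-of-range timestamps end up here.
        return datetime.datetime.now(datetime.timezone.utc)
```

`datetime.timezone.utc` is used instead of `datetime.UTC` only so the sketch also runs on Python versions before 3.11.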
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/test_postgres_backend.py`:
- Around line 167-192: The test creates an EntityUpdate with its type and
content swapped; inside test_update_entities replace the EntityUpdate
construction so that the type is "fact" and the content is "Test entity content"
(i.e., change EntityUpdate(id="12345", type="Test entity content",
content="fact", ...) to EntityUpdate(id="12345", type="fact", content="Test
entity content", ...)) so downstream calls in update_entities and the
resolve_conflicts stub operate on the correct fields.
- Around line 16-28: The fixture currently creates patches inside a with block
that exits before tests run, so change the postgres_backend fixture (and keep
PostgresEntityBackend) to start patchers and yield the backend so patches remain
active for the test lifetime: create patchers for
"kaizen.backend.postgres.psycopg", "kaizen.backend.postgres.register_vector",
and "kaizen.backend.postgres.SentenceTransformer" (e.g., patchers = [patch(...),
...]; started = [p.start() for p in patchers]), set
started[0].connect.return_value = mock_conn, instantiate PostgresEntityBackend,
yield the backend, then stop all patchers after the yield to restore state; this
preserves mocks for psycopg.connect, register_vector, and SentenceTransformer
during tests.
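The start/yield/stop pattern described in the fixture comment above can be sketched with standard-library targets. The patch targets here (`os.getcwd`, `os.getpid`) are stand-ins chosen so the example runs anywhere; the real fixture would patch the `kaizen.backend.postgres` names listed in the comment and would be decorated with `@pytest.fixture`.

```python
import os
from unittest.mock import patch


def patched_env():
    """Generator in the shape of a pytest fixture body."""
    patchers = [patch("os.getcwd"), patch("os.getpid")]
    started = [p.start() for p in patchers]
    started[0].return_value = "/mocked"
    try:
        # Patches remain active for as long as the consumer holds the generator here.
        yield started
    finally:
        # Always restore the originals, even if the consumer raised.
        for p in patchers:
            p.stop()
```

Because the `yield` sits between `start()` and `stop()`, the mocks stay in place for the whole test, which is the behavior the original `with` block lost by exiting before the test body ran.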
---
Duplicate comments:
In `@kaizen/backend/postgres.py`:
- Around line 47-60: The constructor of the Postgres class computes a local
settings variable but doesn't store it, causing later uses to reference the
global postgres_db_settings (e.g., embedding_model and host/port in details());
update __init__ to assign the resolved settings to an instance attribute (e.g.,
self.settings = settings) and change any places that use postgres_db_settings
(such as the SentenceTransformer initialization in __init__ and the host/port
references used in details()) to use self.settings.embedding_model and
self.settings.host/port respectively so the passed config is honored throughout
the class.
- Around line 160-173: The _add_entity method currently returns the sentinel
string "0" when cur.fetchone() is None; instead detect the missing RETURNING id
and raise a clear exception so callers don't receive an invalid ID. In
_add_entity, after executing the INSERT ... RETURNING id and calling
cur.fetchone(), if row is falsy raise a RuntimeError or ValueError (include
context such as table/namespace_id/entity_type) rather than returning "0"; keep
the successful path returning str(row[0]). This ensures insertion anomalies
surface to callers of _add_entity and downstream update/delete flows won't
receive a bogus ID.
- Around line 116-117: The Postgres backend is instantiating SQLiteManager
without a configured path which breaks isolation; update the Postgres DB class
to obtain a sqlite_uri (either add a sqlite_uri field to PostgresDBSettings or
read the same env fallback used by Milvus) in __init__ and store it as
self.sqlite_uri, then replace all bare SQLiteManager() calls (e.g., in
create_namespace, delete_namespace, list_namespaces, get_namespace) with
SQLiteManager(self.sqlite_uri) so the same configured SQLite path is always
used.
- Around line 204-209: The loop building where_parts treats every filter key as
a column identifier (in the for k, v in filters.items() block), which breaks
keys like "metadata.user_id"; fix by detecting keys with a dot, splitting into
base column and JSON path (e.g., "metadata" and "user_id"), and generate JSONB
access SQL for those (use ->> for single-key access or #>> for multi-segment
paths) instead of sql.Identifier(k); keep using sql.Identifier for the base
column and add the JSON path and filter value to params so the resulting clause
becomes something like sql.SQL("{} ->> %s = %s").format(sql.Identifier(base))
for single-segment metadata filters, while leaving plain column names unchanged.
In `@tests/unit/test_postgres_backend.py`:
- Around line 208-248: The test currently mocks cursor.fetchall() to return raw
dicts which bypasses the row_factory transformation used by
PostgresEntityBackend.search_entities; update the mock to return objects that
match the transformed output (i.e., instances produced by _entity_row_factory /
RecordedEntity) so assertions on .id, .type, .content match; specifically change
mock_cursor.fetchall.return_value to a list containing RecordedEntity(...)
(constructed with the same sample_rows data) or apply _entity_row_factory to the
sample dicts before returning, keeping the mock on postgres_backend.conn.cursor
as-is.
---
Nitpick comments:
In `@kaizen/backend/postgres.py`:
- Around line 26-40: The make_row inside _entity_row_factory currently assumes
row["created_at"] is a valid timestamp; wrap the timestamp conversion in
defensive logic: in make_row, detect if row.get("created_at") is None or not an
int/float (or already a datetime) and use a try/except around
datetime.datetime.fromtimestamp(row["created_at"], datetime.UTC), falling back
to datetime.datetime.now(datetime.UTC) on TypeError/ValueError/OverflowError or
if the value is None/malformed; keep other fields (id, type, content via
deserialize_content, metadata) the same and ensure RecordedEntity.created_at
always gets a datetime instance.
In `@kaizen/utils/utils.py`:
- Around line 12-17: The function deserialize_content currently has no return
type annotation; update its signature (deserialize_content) to include an
appropriate return type such as -> Any or -> str | dict | list | Any and import
Any from typing (add "from typing import Any" to the imports) so the annotation
is valid and consistent with the possible return values (parsed JSON or original
string).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dcba9509-1639-4abd-a985-9a43925b5e9c
📒 Files selected for processing (5)
kaizen/backend/milvus.py
kaizen/backend/postgres.py
kaizen/frontend/client/kaizen_client.py
kaizen/utils/utils.py
tests/unit/test_postgres_backend.py
@pytest.mark.unit
def test_update_entities(postgres_backend: PostgresEntityBackend, monkeypatch):
    """Test updating entities."""
    entity_update = EntityUpdate(id="12345", type="Test entity content", content="fact", event="ADD")

    def search_entities(self, namespace_id, query, filters=None, limit=10):
        return []

    def resolve_conflicts(old_entities, new_entities):
        return [entity_update]

    monkeypatch.setattr(postgres_backend, "_table_exists", make_table_exists(True))
    monkeypatch.setattr(postgres_backend.embedding_model, "encode", arbitrary_embedding)
    monkeypatch.setattr(postgres_backend, "search_entities", search_entities.__get__(postgres_backend, PostgresEntityBackend))

    mock_cursor = MagicMock()
    mock_cursor.fetchone.return_value = (12345,)
    postgres_backend.conn.cursor.return_value.__enter__ = Mock(return_value=mock_cursor)
    postgres_backend.conn.cursor.return_value.__exit__ = Mock(return_value=False)

    with patch("kaizen.llm.conflict_resolution.conflict_resolution.resolve_conflicts", resolve_conflicts):
        entities = [Entity(type=entity_update.type, content=entity_update.content, metadata={"key": "value"})]
        result = postgres_backend.update_entities(namespace_id="test_namespace", entities=entities, enable_conflict_resolution=True)

    assert len(result) == 1
    assert result[0].event == "ADD"
EntityUpdate fields are swapped.
At line 170, type and content values appear to be swapped:
EntityUpdate(id="12345", type="Test entity content", content="fact", event="ADD")
Based on the schema, type should be "fact" and content should be "Test entity content".
🐛 Proposed fix
-    entity_update = EntityUpdate(id="12345", type="Test entity content", content="fact", event="ADD")
+    entity_update = EntityUpdate(id="12345", type="fact", content="Test entity content", event="ADD")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/test_postgres_backend.py` around lines 167 - 192, The test creates
an EntityUpdate with its type and content swapped; inside test_update_entities
replace the EntityUpdate construction so that the type is "fact" and the content
is "Test entity content" (i.e., change EntityUpdate(id="12345", type="Test
entity content", content="fact", ...) to EntityUpdate(id="12345", type="fact",
content="Test entity content", ...)) so downstream calls in update_entities and
the resolve_conflicts stub operate on the correct fields.
Actionable comments posted: 3
♻️ Duplicate comments (3)
kaizen/backend/postgres.py (3)
47-60: ⚠️ Potential issue | 🟠 Major
Use the resolved backend settings everywhere.
__init__() connects with settings, but model initialization and details() still read the module-global postgres_db_settings. One backend instance can therefore connect with one config and report/load another.
🐛 Proposed fix
 class PostgresEntityBackend(BaseEntityBackend):
     conn: psycopg.Connection
     embedding_model: SentenceTransformer
     def __init__(self, config: BaseSettings | None = None):
         super().__init__(config)
-        settings = config if isinstance(config, type(postgres_db_settings)) else postgres_db_settings
+        self.settings = config if isinstance(config, type(postgres_db_settings)) else postgres_db_settings
         self.conn = psycopg.connect(
-            host=settings.host,
-            port=settings.port,
-            user=settings.user,
-            password=settings.password,
-            dbname=settings.dbname,
+            host=self.settings.host,
+            port=self.settings.port,
+            user=self.settings.user,
+            password=self.settings.password,
+            dbname=self.settings.dbname,
             autocommit=True,
         )
-        self.embedding_model = SentenceTransformer(postgres_db_settings.embedding_model)
+        self.embedding_model = SentenceTransformer(self.settings.embedding_model)
@@
     def details(self) -> dict:
         """Return details about the backend."""
-        return {"backend": "postgres", "host": postgres_db_settings.host, "port": postgres_db_settings.port}
+        return {"backend": "postgres", "host": self.settings.host, "port": self.settings.port}
Also applies to: 91-93
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 47 - 60, The constructor uses the local variable settings for the DB connection but still reads the module-global postgres_db_settings when initializing embedding_model and in details(); update all uses to consistently use the resolved settings instance (the local settings variable) so the backend reports and loads the same config it connected with—specifically change references in __init__ (where embedding_model = SentenceTransformer(postgres_db_settings.embedding_model)) and in details() (and any other reads at lines ~91-93) to use settings.embedding_model / settings.<field> instead of postgres_db_settings, ensuring _ensure_pgvector_extension, register_vector and any subsequent initialization consistently use the resolved settings object.
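To make the "resolve once, read from the instance" point concrete, here is a reduced sketch with no database or model involved. `DemoSettings`, `DEFAULT_SETTINGS`, and `DemoBackend` are hypothetical stand-ins for `PostgresDBSettings`, `postgres_db_settings`, and `PostgresEntityBackend`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for PostgresDBSettings."""
    host: str = "localhost"
    port: int = 5432
    embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"


# Plays the role of the module-global default settings object.
DEFAULT_SETTINGS = DemoSettings()


class DemoBackend:
    def __init__(self, config: object = None):
        # Resolve the settings once and keep them on the instance;
        # every later read goes through self.settings, never the global.
        self.settings = config if isinstance(config, DemoSettings) else DEFAULT_SETTINGS

    def details(self) -> dict:
        return {"backend": "postgres", "host": self.settings.host, "port": self.settings.port}
```

With this shape, `DemoBackend(DemoSettings(host="db.internal")).details()` reports the host that was passed in rather than the global default.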
206-211: ⚠️ Potential issue | 🟠 Major
Validate filter keys before converting them into column identifiers.
Every filter key becomes sql.Identifier(k). metadata.foo and plain typos both turn into invalid SQL today, and there is no path for JSONB metadata filtering.
🐛 Proposed fix
-    for k, v in filters.items():
-        where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
-        params.append(v)
+    schema_fields = {"id", "type", "content", "created_at"}
+    for k, v in filters.items():
+        if k in schema_fields:
+            where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(k)))
+            params.append(v)
+        else:
+            meta_key = k.split(".", 1)[1] if k.startswith("metadata.") else k
+            where_parts.append(sql.SQL("metadata ->> %s = %s"))
+            params.extend([meta_key, str(v)])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 206 - 211, Validate and whitelist filter keys before turning them into sql.Identifier or JSONB expressions: ensure keys in filters are either a known column name (present in an allowed_columns set) or a metadata path like "metadata.someKey"; for plain columns build the condition using sql.Identifier(k) and %s param as before (referencing where_parts, params), for metadata.* keys translate into a JSONB expression such as (sql.Identifier('metadata') ->> sql.Literal('someKey')) = %s and append the value to params, and for any key not in the whitelist raise an error or skip it explicitly to avoid injecting invalid identifiers into where_clause.
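The whitelist-plus-metadata rule can be sketched without psycopg so it is easy to test in isolation. This is an illustration under assumptions: `build_filter_clause` is a hypothetical helper, the column set is taken from the proposed fix, unknown keys are rejected rather than skipped, and the real backend would likely compose the clause with `psycopg.sql` objects instead of plain strings.

```python
from typing import Any

# Columns taken from the proposed fix; anything else must be a metadata.* key.
SCHEMA_FIELDS = {"id", "type", "content", "created_at"}


def build_filter_clause(filters: dict[str, Any]) -> tuple[str, list[Any]]:
    """Return a WHERE fragment with %s placeholders and its parameter list."""
    parts: list[str] = []
    params: list[Any] = []
    for key, value in filters.items():
        if key in SCHEMA_FIELDS:
            # The key comes from the whitelist, so embedding it as an identifier is safe.
            parts.append(f'"{key}" = %s')
            params.append(value)
        elif key.startswith("metadata.") and len(key) > len("metadata."):
            # JSONB text access: the metadata key and the value are both parameters.
            parts.append("metadata ->> %s = %s")
            params.extend([key.split(".", 1)[1], str(value)])
        else:
            raise ValueError(f"Unsupported filter key: {key!r}")
    return " AND ".join(parts), params
```

Raising on unknown keys is a design choice made for this sketch; it surfaces typos immediately instead of silently returning unfiltered results.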
116-117: ⚠️ Potential issue | 🟠 Major
Don’t hardwire namespace metadata to SQLiteManager() defaults.
These calls leave no path for a resolved SQLite URI, so Postgres tables can be created in one backend instance while namespace metadata is written and read from a different SQLite DB.
Also applies to: 123-124, 135-137, 155-156
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@kaizen/backend/postgres.py` around lines 116 - 117, The PostgreSQL backend is instantiating SQLiteManager() with defaults which can point to a different SQLite DB than the one used for namespace metadata; update each call that currently does "with SQLiteManager() as db_manager" (seen in methods like create_namespace, get_namespace, update_namespace and delete_namespace — the occurrences at the commented lines 116-117, 123-124, 135-137, 155-156) to pass the resolved SQLite URI used by this Postgres backend (e.g., self._sqlite_uri or a helper like self._get_sqlite_uri()) into SQLiteManager so both table creation and namespace metadata operations use the same SQLite file/URI.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@kaizen/backend/postgres.py`:
- Around line 58-60: Call _ensure_pgvector_extension() before register_vector()
because register_vector() requires the pgvector extension to already exist;
modify the initialization sequence in the class so that
_ensure_pgvector_extension() runs first, then call register_vector(self.conn),
and then continue with setting self.embedding_model (SentenceTransformer) to
avoid the ProgrammingError on fresh databases.
- Line 23: The hard-coded EMBEDDING_DIM = 384 causes schema mismatches when
KAIZEN_PG_EMBEDDING_MODEL is changed; replace the constant with a dynamic dim
derived from the configured embedding model (embedding_model /
KAIZEN_PG_EMBEDDING_MODEL) at init time and use that value when creating tables
(the CREATE TABLE block that uses sql.Literal(EMBEDDING_DIM)); specifically,
compute the embedding dimension by loading or querying the chosen embedding
model (or by generating one sample embedding via the same function that creates
embeddings) and set EMBEDDING_DIM from that runtime value before any calls to
the table creation logic so embedding vector({dim}) matches actual embedding
length.
- Around line 101-114: The table creation in create_namespace (the cur.execute
block that formats sql.Identifier(table)) creates the embedding column but no
vector index, causing search_entities (which uses ORDER BY embedding <=> ...) to
perform full table scans; modify the table creation routine to also create an
HNSW vector index for the embedding column (e.g., run a CREATE INDEX ON {table}
USING hnsw (embedding vector_cosine_ops) or vector_l2_ops depending on distance
metric) — either append the index DDL in the same cur.execute transaction or
execute a separate cur.execute after the table create, ensuring to use
sql.Identifier(table) when formatting the index statement and handle errors if
the index already exists.
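For the index suggestion in the last item, a rough sketch of the DDL follows. The helper names and the index naming scheme are hypothetical; `vector_cosine_ops` is chosen because the comment says `search_entities` orders by `<=>`, which is pgvector's cosine distance operator. In the backend itself, `sql.Identifier` would be the more natural way to quote names.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier by doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def hnsw_index_ddl(table: str, column: str = "embedding") -> str:
    """Build an idempotent CREATE INDEX statement for an HNSW cosine index."""
    # Naming the index explicitly is what allows IF NOT EXISTS to be used.
    index_name = f"{table}_{column}_hnsw_idx"
    return (
        f"CREATE INDEX IF NOT EXISTS {quote_ident(index_name)} "
        f"ON {quote_ident(table)} USING hnsw ({quote_ident(column)} vector_cosine_ops)"
    )
```

Note that PostgreSQL truncates identifiers longer than 63 bytes, so long namespace table names may need a shorter index name than this scheme produces.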
---
Duplicate comments:
In `@kaizen/backend/postgres.py`:
- Around line 47-60: The constructor uses the local variable settings for the DB
connection but still reads the module-global postgres_db_settings when
initializing embedding_model and in details(); update all uses to consistently
use the resolved settings instance (the local settings variable) so the backend
reports and loads the same config it connected with—specifically change
references in __init__ (where embedding_model =
SentenceTransformer(postgres_db_settings.embedding_model)) and in details() (and
any other reads at lines ~91-93) to use settings.embedding_model /
settings.<field> instead of postgres_db_settings, ensuring
_ensure_pgvector_extension, register_vector and any subsequent initialization
consistently use the resolved settings object.
- Around line 206-211: Validate and whitelist filter keys before turning them
into sql.Identifier or JSONB expressions: ensure keys in filters are either a
known column name (present in an allowed_columns set) or a metadata path like
"metadata.someKey"; for plain columns build the condition using
sql.Identifier(k) and %s param as before (referencing where_parts, params), for
metadata.* keys translate into a JSONB expression such as
(sql.Identifier('metadata') ->> sql.Literal('someKey')) = %s and append the
value to params, and for any key not in the whitelist raise an error or skip it
explicitly to avoid injecting invalid identifiers into where_clause.
- Around line 116-117: The PostgreSQL backend is instantiating SQLiteManager()
with defaults which can point to a different SQLite DB than the one used for
namespace metadata; update each call that currently does "with SQLiteManager()
as db_manager" (seen in methods like create_namespace, get_namespace,
update_namespace and delete_namespace — the occurrences at the commented lines
116-117, 123-124, 135-137, 155-156) to pass the resolved SQLite URI used by this
Postgres backend (e.g., self._sqlite_uri or a helper like
self._get_sqlite_uri()) into SQLiteManager so both table creation and namespace
metadata operations use the same SQLite file/URI.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 60db7d83-f877-400e-a924-7fa623c8cfa7
📒 Files selected for processing (1)
kaizen/backend/postgres.py
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("entities-db.pgvector")

EMBEDDING_DIM = 384
🧩 Analysis chain
🏁 Script executed:
# First, let's get a sense of the file size and structure
wc -l kaizen/backend/postgres.py
Repository: AgentToolkit/kaizen
Length of output: 93
🏁 Script executed:
# Read the relevant sections to understand the embedding dimension issue
cat -n kaizen/backend/postgres.py | head -120
Repository: AgentToolkit/kaizen
Length of output: 5508
🏁 Script executed:
# Read the rest of the file to see how embeddings are used
cat -n kaizen/backend/postgres.py | tail -130
Repository: AgentToolkit/kaizen
Length of output: 6911
🏁 Script executed:
# Check the postgres config to understand what embedding_model is and if it's configurable
find . -name "postgres.py" -path "*/config/*" | head -5
Repository: AgentToolkit/kaizen
Length of output: 90
🏁 Script executed:
# Look for postgres config file
fd -e py -p "config.*postgres" 2>/dev/null | head -5
Repository: AgentToolkit/kaizen
Length of output: 88
🏁 Script executed:
# Read the postgres config file to understand embedding_model configuration
cat -n kaizen/config/postgres.py
Repository: AgentToolkit/kaizen
Length of output: 750
🌐 Web query:
sentence-transformers/all-MiniLM-L6-v2 embedding dimension size
💡 Result:
sentence-transformers/all-MiniLM-L6-v2 produces 384-dimensional embedding vectors. [1], [2]
Sources: Hugging Face model card (MiniLM-L6-H384 base) [1]; ONNX package description listing “Embedding dimension: 384” [2].
Tie the vector dimension to the configured embedding model.
embedding_model is configurable via KAIZEN_PG_EMBEDDING_MODEL, but all namespace tables are hard-coded with vector(384). While the default model (sentence-transformers/all-MiniLM-L6-v2) produces 384-dimensional embeddings, any other embedding model with a different output dimension will fail at insertion time with a dimension mismatch error.
Affected code
Line 23:
EMBEDDING_DIM = 384
Line 104–113:
CREATE TABLE IF NOT EXISTS {table} (
...
embedding vector({dim}),
...
).format(table=sql.Identifier(table), dim=sql.Literal(EMBEDDING_DIM))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@kaizen/backend/postgres.py` at line 23, The hard-coded EMBEDDING_DIM = 384
causes schema mismatches when KAIZEN_PG_EMBEDDING_MODEL is changed; replace the
constant with a dynamic dim derived from the configured embedding model
(embedding_model / KAIZEN_PG_EMBEDDING_MODEL) at init time and use that value
when creating tables (the CREATE TABLE block that uses
sql.Literal(EMBEDDING_DIM)); specifically, compute the embedding dimension by
loading or querying the chosen embedding model (or by generating one sample
embedding via the same function that creates embeddings) and set EMBEDDING_DIM
from that runtime value before any calls to the table creation logic so
embedding vector({dim}) matches actual embedding length.
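One way to derive the dimension at runtime is sketched below. `resolve_embedding_dim` is a hypothetical helper; it relies on `SentenceTransformer.get_sentence_embedding_dimension()` when the model provides it and otherwise measures a single sample embedding, as the prompt above suggests. The function is duck-typed so it can be exercised without loading a real model.

```python
from typing import Any


def resolve_embedding_dim(model: Any) -> int:
    """Return the embedding length produced by an already-loaded model."""
    getter = getattr(model, "get_sentence_embedding_dimension", None)
    dim = getter() if callable(getter) else None
    if dim is None:
        # Fallback: encode one short sample and measure the vector length.
        dim = len(model.encode("dimension probe"))
    return int(dim)
```

The result would then replace the module-level `EMBEDDING_DIM` wherever the `CREATE TABLE` statement formats `vector({dim})`. Tables created earlier with a different dimension would still need a migration, which this sketch does not address.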
register_vector(self.conn)
self._ensure_pgvector_extension()
self.embedding_model = SentenceTransformer(postgres_db_settings.embedding_model)
🧩 Analysis chain
🌐 Web query:
For pgvector's Psycopg 3 integration, should register_vector(conn) be called only after CREATE EXTENSION IF NOT EXISTS vector has succeeded for that database?
💡 Result:
Yes. register_vector(conn) should be called only after the vector extension is installed/enabled in that database (i.e., after CREATE EXTENSION IF NOT EXISTS vector has succeeded, or if you already know it’s installed).
- The pgvector-python Psycopg 3 docs show the intended order: enable the extension first, then register the types. [1]
- Implementation-wise, register_vector() calls TypeInfo.fetch(..., "vector") and then register_vector_info(...); if the vector type isn’t present, it raises psycopg.ProgrammingError("vector type not found in the database"). [2] [3]
Sources: [1] (github.com) [2] (raw.githubusercontent.com) [3] (raw.githubusercontent.com)
Citations:
- 1: https://github.com/pgvector/pgvector-python
- 2: https://raw.githubusercontent.com/pgvector/pgvector-python/809287f92847e1c609a9c395891da76f674379ea/pgvector/psycopg/register.py
- 3: https://raw.githubusercontent.com/pgvector/pgvector-python/809287f92847e1c609a9c395891da76f674379ea/pgvector/psycopg/vector.py
🏁 Script executed:
# Locate and examine the postgres.py file
find . -name "postgres.py" -path "*/backend/*" 2>/dev/null | head -5
Repository: AgentToolkit/kaizen
Length of output: 91
🏁 Script executed:
# Read the relevant section of the file
cat -n kaizen/backend/postgres.py | sed -n '50,75p'
Repository: AgentToolkit/kaizen
Length of output: 1236
🏁 Script executed:
# Find the _ensure_pgvector_extension method implementation
rg -A 10 "_ensure_pgvector_extension" kaizen/backend/postgres.py
Repository: AgentToolkit/kaizen
Length of output: 659
Swap the order: call _ensure_pgvector_extension() before register_vector().
The pgvector Psycopg 3 integration requires the vector extension to exist in the database before register_vector() is invoked. On a fresh database, the current order causes register_vector() to fail with a ProgrammingError (vector type not found) before _ensure_pgvector_extension() has a chance to create it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@kaizen/backend/postgres.py` around lines 58 - 60, Call
_ensure_pgvector_extension() before register_vector() because register_vector()
requires the pgvector extension to already exist; modify the initialization
sequence in the class so that _ensure_pgvector_extension() runs first, then call
register_vector(self.conn), and then continue with setting self.embedding_model
(SentenceTransformer) to avoid the ProgrammingError on fresh databases.
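The required ordering can be captured in a small helper. This is a sketch under assumptions: `init_vector_support` is a hypothetical name, the connection is assumed to be in autocommit mode as in the backend's `psycopg.connect(..., autocommit=True)` call, and the registration function is injected so the example runs without pgvector installed.

```python
from typing import Any, Callable


def init_vector_support(conn: Any, register: Callable[[Any], None]) -> None:
    """Create the extension first, then register the vector type adapters."""
    with conn.cursor() as cur:
        # Must succeed before registration, which looks up the "vector" type.
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
    register(conn)
```

In the backend this would be called as `init_vector_support(self.conn, register_vector)`, with `register_vector` imported from `pgvector.psycopg`.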
cur.execute(
    sql.SQL(
        """
        CREATE TABLE IF NOT EXISTS {table} (
            id BIGSERIAL PRIMARY KEY,
            type VARCHAR(128) NOT NULL,
            content TEXT NOT NULL,
            created_at BIGINT NOT NULL,
            embedding vector({dim}),
            metadata JSONB DEFAULT '{{}}'::jsonb
        )
        """
    ).format(table=sql.Identifier(table), dim=sql.Literal(EMBEDDING_DIM))
)
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the complete context of kaizen/backend/postgres.py
# Get file size to understand how to read it safely
wc -l kaizen/backend/postgres.py
Repository: AgentToolkit/kaizen
Length of output: 93
🏁 Script executed:
# Read the file to examine the relevant functions
cat -n kaizen/backend/postgres.py | head -250
Repository: AgentToolkit/kaizen
Length of output: 12303
🏁 Script executed:
# Search for any INDEX creation in the file
grep -n "INDEX\|index" kaizen/backend/postgres.py
Repository: AgentToolkit/kaizen
Length of output: 45
🌐 Web query:
pgvector HNSW IVFFlat index approximate nearest neighbor search default behavior
💡 Result:
- Default behavior (no vector index): pgvector does an exact nearest-neighbor search (perfect recall / full scan). Adding an HNSW or IVFFlat index switches you to approximate nearest-neighbor search, so results can change after the index is created. [1]
- When Postgres will use the pgvector index: the query must have `ORDER BY` + `LIMIT`, and the `ORDER BY` expression must be a distance operator result (e.g., `<->`, `<=>`, etc.) in ascending order (not wrapped in other expressions). [1]
- IVFFlat “how approximate” defaults: `ivfflat.probes` controls how many inverted lists are searched; it is `1` by default (fastest, lowest recall). Setting `probes` higher improves recall but costs latency; setting `probes = lists` makes it effectively exact, and at that point the planner won’t use the index. [1][3]
- HNSW “how approximate” defaults: query-time candidate list size is controlled by `hnsw.ef_search`, which is `40` by default (higher = better recall, slower). Key build defaults include `m = 16` and `ef_construction = 64`. [2]
- Filtering + ANN indexes (common “gotcha”): with approximate indexes, filtering may not be fully satisfied by the first index pass; pgvector 0.8.0 added iterative scanning (`hnsw.iterative_scan`, `ivfflat.iterative_scan`) so it can continue searching until thresholds like `hnsw.max_scan_tuples` / `ivfflat.max_probes`. [4]
Sources: GitHub pgvector README [1]; Crunchy Data pgvector docs (latest PDF) [2]; Crunchy Data pgvector docs (0.6.1 PDF) [3]; PostgreSQL.org pgvector 0.8.0 release notes [4].
Similarity search performs full table scans instead of approximate nearest-neighbor search.
search_entities() uses ORDER BY embedding <=> ... LIMIT (line 221), but create_namespace() only creates the table without an index. pgvector defaults to exact nearest-neighbor search without an HNSW or IVFFlat index. Add a vector index during table creation to enable approximate search:
Index creation
CREATE INDEX ON {table} USING hnsw (embedding vector_cosine_ops)
-- or for Euclidean: vector_l2_ops
Also applies to: 219-223
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@kaizen/backend/postgres.py` around lines 101 - 114, The table creation in
create_namespace (the cur.execute block that formats sql.Identifier(table))
creates the embedding column but no vector index, causing search_entities (which
uses ORDER BY embedding <=> ...) to perform full table scans; modify the table
creation routine to also create an HNSW vector index for the embedding column
(e.g., run a CREATE INDEX ON {table} USING hnsw (embedding vector_cosine_ops) or
vector_l2_ops depending on distance metric) — either append the index DDL in the
same cur.execute transaction or execute a separate cur.execute after the table
create, ensuring to use sql.Identifier(table) when formatting the index
statement and handle errors if the index already exists.
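One way to make the index creation idempotent is to name the index and use `CREATE INDEX IF NOT EXISTS`, which avoids a separate "already exists" error path. The sketch below only builds the DDL string with the standard library so it can be checked offline; `quote_ident`, `hnsw_index_ddl`, and the `<table>_<column>_hnsw_idx` naming scheme are assumptions for illustration. In the backend itself, composing the statement with `sql.Identifier`, as the table DDL already does, would be the more natural choice.

```python
# Operator classes pgvector provides for the `vector` type; the choice must
# match the distance operator used in the query (`<=>` is cosine distance).
ALLOWED_OPCLASSES = {"vector_cosine_ops", "vector_l2_ops", "vector_ip_ops"}


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier by doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def hnsw_index_ddl(table: str, column: str = "embedding",
                   opclass: str = "vector_cosine_ops") -> str:
    """Build an idempotent CREATE INDEX statement for an HNSW vector index."""
    if opclass not in ALLOWED_OPCLASSES:
        raise ValueError(f"unsupported operator class: {opclass}")
    # Hypothetical naming scheme; IF NOT EXISTS requires an explicit index name.
    index_name = f"{table}_{column}_hnsw_idx"
    return (
        f"CREATE INDEX IF NOT EXISTS {quote_ident(index_name)} "
        f"ON {quote_ident(table)} USING hnsw ({quote_ident(column)} {opclass})"
    )


print(hnsw_index_ddl("ns_demo"))
```

Because adding the index switches the query from exact to approximate search, results for the same query may differ slightly before and after the index exists.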
Actionable comments posted: 1
♻️ Duplicate comments (1)
tests/unit/test_postgres_backend.py (1)
168-170: ⚠️ Potential issue | 🟡 Minor
Use the correct `type`/`content` values in this ADD-path fixture.
`EntityUpdate` is initialized with the fields reversed here, so this test currently drives `update_entities()` with `"Test entity content"` as the entity type and `"fact"` as the content. It still passes, but it no longer verifies the schema mapping you actually want.
🔧 Suggested change
- entity_update = EntityUpdate(id="12345", type="Test entity content", content="fact", event="ADD")
+ entity_update = EntityUpdate(id="12345", type="fact", content="Test entity content", event="ADD")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/test_postgres_backend.py` around lines 168 - 170, The test fixture in test_update_entities uses EntityUpdate with the type and content values reversed; update the EntityUpdate instantiation in test_update_entities so the type argument contains the actual schema type (e.g., "fact") and the content argument contains the entity text (e.g., "Test entity content") so the test drives update_entities() with the correct mapping.
🧹 Nitpick comments (1)
tests/e2e/test_mcp.py (1)
15-15: Consider adding `"pgvector"` to the fixture params as a follow-up.
The fixture currently tests `["milvus", "filesystem"]`. Once the pending Postgres e2e integration is ready, this should be extended to include `"pgvector"` to ensure the new backend is covered by MCP transport tests.
Do you want me to open a tracking issue for adding pgvector e2e test coverage?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/e2e/test_mcp.py` at line 15, Update the pytest fixture parameter list used for MCP transport tests (the `@pytest.fixture`(params=["milvus", "filesystem"]) declaration) to include "pgvector" once the Postgres e2e integration is ready; modify the params array to ["milvus", "filesystem", "pgvector"] so the new backend is exercised by the existing tests (do this as a follow-up when the pgvector backend and any necessary test setup/mocks are available).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/test_postgres_backend.py`:
- Around line 7-10: Guard the test module's dependency on optional Postgres
packages by calling pytest.importorskip for the optional packages (e.g.,
'psycopg' and 'pgvector') before importing PostgresEntityBackend (or
alternatively wrap the import of PostgresEntityBackend in a try/except
ImportError and call pytest.skip). This ensures the PostgresEntityBackend class
import is only attempted when the optional dependencies are present and prevents
test collection failures in environments without the [pgvector] extra.
---
Duplicate comments:
In `@tests/unit/test_postgres_backend.py`:
- Around line 168-170: The test fixture in test_update_entities uses
EntityUpdate with the type and content values reversed; update the EntityUpdate
instantiation in test_update_entities so the type argument contains the actual
schema type (e.g., "fact") and the content argument contains the entity text
(e.g., "Test entity content") so the test drives update_entities() with the
correct mapping.
---
Nitpick comments:
In `@tests/e2e/test_mcp.py`:
- Line 15: Update the pytest fixture parameter list used for MCP transport tests
(the `@pytest.fixture`(params=["milvus", "filesystem"]) declaration) to include
"pgvector" once the Postgres e2e integration is ready; modify the params array
to ["milvus", "filesystem", "pgvector"] so the new backend is exercised by the
existing tests (do this as a follow-up when the pgvector backend and any
necessary test setup/mocks are available).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b34fdebf-d461-45f1-8fcb-7c4c923bc259
📒 Files selected for processing (2)
- tests/e2e/test_mcp.py
- tests/unit/test_postgres_backend.py
import pytest
from unittest.mock import Mock, MagicMock, patch

from kaizen.backend.postgres import PostgresEntityBackend
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -n '^\[project\.optional-dependencies\]|pgvector|psycopg|sentence-transformers' pyproject.toml
rg -n 'importorskip|from kaizen\.backend\.postgres import PostgresEntityBackend' tests/unit/test_postgres_backend.py
Repository: AgentToolkit/kaizen
Length of output: 304
Guard this test module behind importorskip for the optional Postgres dependencies.
Line 10 imports PostgresEntityBackend unconditionally, but this PR defines Postgres dependencies (psycopg and pgvector) as an optional extra in pyproject.toml. In environments running unit tests without the [pgvector] extra, test collection will fail before pytest can apply markers or load fixtures. Add import guards for the missing optional dependencies.
🛡️ Suggested change
import datetime
import pytest
from unittest.mock import Mock, MagicMock, patch
+pytest.importorskip("psycopg")
+pytest.importorskip("pgvector.psycopg")
+
from kaizen.backend.postgres import PostgresEntityBackend
from kaizen.schema.core import Entity, Namespace, RecordedEntity
from kaizen.schema.conflict_resolution import EntityUpdate
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/test_postgres_backend.py` around lines 7 - 10, Guard the test
module's dependency on optional Postgres packages by calling pytest.importorskip
for the optional packages (e.g., 'psycopg' and 'pgvector') before importing
PostgresEntityBackend (or alternatively wrap the import of PostgresEntityBackend
in a try/except ImportError and call pytest.skip). This ensures the
PostgresEntityBackend class import is only attempted when the optional
dependencies are present and prevents test collection failures in environments
without the [pgvector] extra.
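For illustration, the availability check behind such a guard can be written with the standard library alone. This is a sketch under assumptions: `missing_modules` is a hypothetical helper, and `pytest.importorskip` remains the simpler option inside a test module. If the try/except route is taken instead, `pytest.skip` has to be called with `allow_module_level=True` to skip at import time.

```python
import importlib.util


def missing_modules(names):
    """Return the dotted module names from `names` that cannot be located."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # find_spec imports the parent package of a dotted name and
            # raises if that parent is not installed.
            found = False
        if not found:
            missing.append(name)
    return missing


# "json" ships with Python; the second name is deliberately bogus.
print(missing_modules(["json", "no_such_pkg_for_demo.psycopg"]))
```

A test module could call this with `["psycopg", "pgvector.psycopg"]` and skip itself when the returned list is non-empty.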
…mocks
- Update serialize_content() signature to accept list type matching Entity.content
- Replace direct cursor method assignment with patch.object() in postgres tests
- Fixes 19 mypy errors across backend/base.py and test_postgres_backend.py
feat: add PostgreSQL/pgvector backend
Introduces a new `PostgresEntityBackend` backed by PostgreSQL with the `pgvector` extension for approximate-nearest-neighbour similarity search.
Embeddings are generated with `sentence-transformers` (default model: `all-MiniLM-L6-v2`, 384-d) and stored as `VECTOR` columns alongside the standard entity fields.
New files
- `kaizen/backend/postgres.py`: full `PostgresEntityBackend` implementation
- `kaizen/config/postgres.py`: `PostgresDBSettings` (env prefix `KAIZEN_PG_`)
- `tests/unit/test_postgres_backend.py`: unit tests with mocked psycopg connection
Changed files
- `kaizen/config/kaizen.py`: adds `"pgvector"` to the backend `Literal`
- `kaizen/frontend/client/kaizen_client.py`: wires up `PostgresEntityBackend`
- `pyproject.toml`: new optional dependency group `[pgvector]`: `psycopg[binary]>=3.1`, `pgvector>=0.3`
refactor: extract update_entities template method into BaseEntityBackend
Eliminates ~120 lines of duplicated conflict-resolution dispatch spread across `MilvusEntityBackend`, `FilesystemEntityBackend`, and the new `PostgresEntityBackend` by lifting the shared logic into `BaseEntityBackend`.
Pattern
`BaseEntityBackend.update_entities` is now a concrete template method that:
- calls `_validate_namespace` (abstract)
- calls `resolve_conflicts` (lazy-imported to avoid circular deps)
- dispatches to `_add_entity` / `_update_entity` / `_delete_entity` (all abstract) or skips on `NONE`
- calls `_post_update` (concrete no-op default, overridable)
Changed files
- `kaizen/backend/base.py`
- `kaizen/backend/milvus.py`
- `kaizen/backend/filesystem.py`
- `kaizen/backend/postgres.py`
- `tests/unit/test_milvus_backend.py`: updated `resolve_conflicts` mock path to `kaizen.llm.conflict_resolution.conflict_resolution.resolve_conflicts`
- `tests/unit/test_postgres_backend.py`: same mock path; scalar query mocks return tuples instead of dicts to match the default `TupleRow` cursor
Test plan
- `pytest -m unit tests/unit/test_postgres_backend.py`: all postgres unit tests pass
- `pytest -m unit tests/unit/test_milvus_backend.py`: existing milvus unit tests still pass
- `uvx ruff check` / `uvx ruff format`: no lint or formatting issues
- `pgvector` extension and run `KAIZEN_BACKEND=pgvector pytest -m e2e` against the postgres backend
Summary by CodeRabbit
New Features
Refactor
Chores
Tests
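The template-method refactor described in the PR description can be sketched roughly as below. This is a simplified illustration, not the code in `kaizen/backend/base.py`: `Update`, `SketchBackend`, and `InMemoryBackend` are hypothetical stand-ins, the conflict-resolution step is omitted, and the `"UPDATE"` and `"DELETE"` event names are assumed (only `"ADD"` and `NONE` appear in the discussion above).

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Update:
    """Hypothetical stand-in for the PR's EntityUpdate."""
    id: str
    type: str
    content: str
    event: str  # "ADD", "UPDATE", "DELETE", or "NONE" (last three partly assumed)


class SketchBackend(ABC):
    def update_entities(self, namespace_id, updates):
        """Template method: shared control flow, per-backend storage hooks."""
        self._validate_namespace(namespace_id)
        for update in updates:
            if update.event == "ADD":
                update.id = self._add_entity(namespace_id, update)
            elif update.event == "UPDATE":
                self._update_entity(namespace_id, update)
            elif update.event == "DELETE":
                self._delete_entity(namespace_id, update)
            elif update.event != "NONE":
                raise ValueError(f"unknown event: {update.event}")
        self._post_update(namespace_id)
        return updates

    @abstractmethod
    def _validate_namespace(self, namespace_id): ...

    @abstractmethod
    def _add_entity(self, namespace_id, update): ...

    @abstractmethod
    def _update_entity(self, namespace_id, update): ...

    @abstractmethod
    def _delete_entity(self, namespace_id, update): ...

    def _post_update(self, namespace_id):
        """No-op by default; a backend may override it to commit or flush."""


class InMemoryBackend(SketchBackend):
    """Toy backend that keeps rows in a dict keyed by string id."""

    def __init__(self):
        self.rows = {}
        self.next_id = 1

    def _validate_namespace(self, namespace_id):
        if not namespace_id:
            raise ValueError("namespace_id must be non-empty")

    def _add_entity(self, namespace_id, update):
        new_id = str(self.next_id)
        self.next_id += 1
        self.rows[new_id] = (update.type, update.content)
        return new_id

    def _update_entity(self, namespace_id, update):
        self.rows[update.id] = (update.type, update.content)

    def _delete_entity(self, namespace_id, update):
        self.rows.pop(update.id, None)


backend = InMemoryBackend()
result = backend.update_entities(
    "demo",
    [Update("", "fact", "a", "ADD"), Update("", "fact", "b", "NONE")],
)
print(backend.rows, result[0].id)
```

The point of the pattern is that each backend implements only the storage hooks, while validation, dispatch, and the post-update step live in one place.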