88
99from __future__ import annotations
1010
11+ import json
1112import os
1213import sqlite3
14+ import threading
1315import uuid
1416from collections .abc import Callable
1517from dataclasses import dataclass , field
@@ -201,6 +203,7 @@ def __init__(self, db_path: str | None = None):
201203
202204 self .db_path = db_path
203205 self ._is_memory = db_path == ":memory:"
206+ self ._owner_thread_id = threading .get_ident ()
204207 self ._persistent_conn : sqlite3 .Connection | None = None
205208
206209 # Python SQLite for auxiliary tables (hyperedges, confidence_updates)
@@ -216,6 +219,7 @@ def __init__(self, db_path: str | None = None):
216219 self ._core_store = rlm_core .MemoryStore .in_memory ()
217220 else :
218221 self ._core_store = rlm_core .MemoryStore .open (db_path )
222+ self ._core_supports_update_fields = hasattr (self ._core_store , "update_fields" )
219223
220224 @property
221225 def uses_rlm_core (self ) -> bool :
@@ -388,9 +392,15 @@ def _get_connection(self) -> sqlite3.Connection:
388392 Returns a non-closeable wrapper around a persistent connection
389393 to avoid opening multiple connections that corrupt rlm_core's WAL state.
390394 """
391- if self ._persistent_conn is not None :
395+ if (
396+ self ._persistent_conn is not None
397+ and threading .get_ident () == self ._owner_thread_id
398+ ):
392399 self ._persistent_conn .row_factory = sqlite3 .Row
393400 return _NoCloseConnection (self ._persistent_conn )
401+
402+ # Use thread-local connections for non-owner threads to avoid sqlite
403+ # thread-affinity errors when tests exercise concurrent session access.
394404 conn = sqlite3 .connect (self .db_path )
395405 conn .row_factory = sqlite3 .Row
396406 return conn
@@ -435,13 +445,51 @@ def _rfc3339_to_epoch_ms(rfc3339_str: str) -> int:
435445 """Convert RFC3339 timestamp string to epoch milliseconds."""
436446 from datetime import datetime
437447
448+ if isinstance (rfc3339_str , int | float ):
449+ return int (rfc3339_str )
438450 try :
439451 # Handle various RFC3339 formats
440452 dt = datetime .fromisoformat (rfc3339_str .replace ("Z" , "+00:00" ))
441453 return int (dt .timestamp () * 1000 )
442- except (ValueError , AttributeError ):
454+ except (ValueError , AttributeError , TypeError ):
443455 return 0
444456
457+ @staticmethod
458+ def _map_db_tier_to_str (tier_value : Any ) -> str :
459+ """Map integer/string DB tier representation to Python tier string."""
460+ if isinstance (tier_value , int ):
461+ mapping = {0 : "task" , 1 : "session" , 2 : "longterm" , 3 : "archive" }
462+ return mapping .get (tier_value , "task" )
463+ tier_name = str (tier_value ).split ("." )[- 1 ].lower ()
464+ return tier_name if tier_name in {"task" , "session" , "longterm" , "archive" } else "task"
465+
466+ def _db_row_to_python_node (self , row : sqlite3 .Row ) -> Node :
467+ """Convert sqlite row from nodes table to Python Node dataclass."""
468+ raw_metadata = row ["metadata" ]
469+ metadata : dict [str , Any ] = {}
470+ if raw_metadata :
471+ try :
472+ metadata = json .loads (raw_metadata )
473+ except (TypeError , ValueError ):
474+ metadata = {}
475+ provenance = metadata .pop ("_provenance" , None ) or row ["provenance_ref" ]
476+ confidence = row ["confidence" ] if row ["confidence" ] is not None else 0.5
477+ return Node (
478+ id = row ["id" ],
479+ type = row ["node_type" ] if row ["node_type" ] in self .VALID_NODE_TYPES else "fact" ,
480+ content = row ["content" ],
481+ tier = self ._map_db_tier_to_str (row ["tier" ]),
482+ confidence = confidence ,
483+ subtype = row ["subtype" ],
484+ embedding = row ["embedding" ],
485+ provenance = provenance ,
486+ metadata = metadata ,
487+ created_at = self ._rfc3339_to_epoch_ms (row ["created_at" ]),
488+ updated_at = self ._rfc3339_to_epoch_ms (row ["updated_at" ]),
489+ last_accessed = self ._rfc3339_to_epoch_ms (row ["last_accessed" ]),
490+ access_count = row ["access_count" ] if row ["access_count" ] is not None else 0 ,
491+ )
492+
445493 def _core_node_to_python (self , node : Any ) -> Node :
446494 """Convert rlm_core.Node to Python Node dataclass."""
447495 meta = node .metadata or {}
@@ -503,6 +551,16 @@ def _create_node_core(
503551
504552 def _get_node_core (self , node_id : str ) -> Node | None :
505553 """Get a node using rlm_core backend."""
554+ if not self ._core_supports_update_fields :
555+ conn = self ._get_connection ()
556+ try :
557+ row = conn .execute ("SELECT * FROM nodes WHERE id = ?" , (node_id ,)).fetchone ()
558+ finally :
559+ conn .close ()
560+ if row is None :
561+ return None
562+ return self ._db_row_to_python_node (row )
563+
506564 try :
507565 result = self ._core_store .get_node (node_id )
508566 except (ValueError , RuntimeError ):
@@ -526,6 +584,29 @@ def _query_nodes_core(
526584 limit : int = 100 ,
527585 ) -> list [Node ]:
528586 """Query nodes using rlm_core backend."""
587+ if not self ._core_supports_update_fields :
588+ where_clauses : list [str ] = []
589+ params : list [Any ] = []
590+ if node_type is not None :
591+ where_clauses .append ("node_type = ?" )
592+ params .append (node_type )
593+ if tier is not None :
594+ where_clauses .append ("tier = ?" )
595+ params .append (self ._core_tier_to_db_value (tier ))
596+ where_sql = f"WHERE { ' AND ' .join (where_clauses )} " if where_clauses else ""
597+ query = (
598+ "SELECT * FROM nodes "
599+ f"{ where_sql } "
600+ "ORDER BY updated_at DESC "
601+ "LIMIT ?"
602+ )
603+ params .append (limit )
604+ conn = self ._get_connection ()
605+ try :
606+ rows = conn .execute (query , params ).fetchall ()
607+ finally :
608+ conn .close ()
609+ return [self ._db_row_to_python_node (row ) for row in rows ]
529610
530611 if node_type is not None :
531612 results = self ._core_store .query_by_type (self ._map_node_type_to_core (node_type ), limit )
@@ -550,6 +631,37 @@ def _query_nodes_core(
550631
551632 def _search_core (self , query : str , limit : int ) -> list [SearchResult ]:
552633 """Search using rlm_core backend."""
634+ if not self ._core_supports_update_fields :
635+ sql = """
636+ SELECT
637+ n.id AS node_id,
638+ n.content AS content,
639+ n.node_type AS node_type,
640+ n.confidence AS confidence,
641+ snippet(nodes_fts, 0, '', '', '...', 10) AS snippet
642+ FROM nodes_fts
643+ JOIN nodes AS n ON n.rowid = nodes_fts.rowid
644+ WHERE nodes_fts MATCH ?
645+ ORDER BY bm25(nodes_fts)
646+ LIMIT ?
647+ """
648+ conn = self ._get_connection ()
649+ try :
650+ rows = conn .execute (sql , (query , limit )).fetchall ()
651+ finally :
652+ conn .close ()
653+ return [
654+ SearchResult (
655+ node_id = row ["node_id" ],
656+ content = row ["content" ],
657+ node_type = row ["node_type" ],
658+ bm25_score = row ["confidence" ] if row ["confidence" ] is not None else 0.5 ,
659+ snippet = row ["snippet" ]
660+ if row ["snippet" ] is not None
661+ else (row ["content" ][:100 ] if len (row ["content" ]) > 100 else row ["content" ]),
662+ )
663+ for row in rows
664+ ]
553665
554666 results = self ._core_store .search_content (query , limit )
555667 return [
@@ -563,6 +675,104 @@ def _search_core(self, query: str, limit: int) -> list[SearchResult]:
563675 for node in results
564676 ]
565677
678+ @staticmethod
679+ def _now_rfc3339 () -> str :
680+ """Return current timestamp in RFC3339 format."""
681+ from datetime import datetime
682+
683+ return datetime .now (UTC ).isoformat (timespec = "microseconds" )
684+
685+ @staticmethod
686+ def _core_tier_to_db_value (tier : Any ) -> int :
687+ """Map rlm_core tier enum/string/int to integer DB representation."""
688+ if isinstance (tier , int ):
689+ if 0 <= tier <= 3 :
690+ return tier
691+ raise ValueError (f"Invalid tier value: { tier } " )
692+ tier_name = str (tier ).split ("." )[- 1 ].lower ()
693+ mapping = {
694+ "task" : 0 ,
695+ "session" : 1 ,
696+ "longterm" : 2 ,
697+ "archive" : 3 ,
698+ }
699+ if tier_name not in mapping :
700+ raise ValueError (f"Invalid tier value: { tier } " )
701+ return mapping [tier_name ]
702+
703+ def _update_core_fields_via_sql (self , node_id : str , ** fields : Any ) -> bool :
704+ """
705+ Update node fields directly in SQLite for older rlm_core builds.
706+
707+ This preserves full update behavior when rlm_core does not expose
708+ MemoryStore.update_fields (e.g., rlm_core 0.1.x).
709+ """
710+ conn = self ._get_connection ()
711+ try :
712+ exists = conn .execute ("SELECT 1 FROM nodes WHERE id = ?" , (node_id ,)).fetchone ()
713+ if exists is None :
714+ return False
715+
716+ assignments : list [str ] = []
717+ params : list [Any ] = []
718+
719+ if "content" in fields :
720+ assignments .append ("content = ?" )
721+ params .append (fields ["content" ])
722+ if "confidence" in fields :
723+ assignments .append ("confidence = ?" )
724+ params .append (fields ["confidence" ])
725+ if "tier" in fields :
726+ assignments .append ("tier = ?" )
727+ params .append (self ._core_tier_to_db_value (fields ["tier" ]))
728+ if "subtype" in fields :
729+ assignments .append ("subtype = ?" )
730+ params .append (fields ["subtype" ])
731+ if "metadata" in fields :
732+ metadata = fields ["metadata" ] if fields ["metadata" ] is not None else {}
733+ assignments .append ("metadata = ?" )
734+ params .append (json .dumps (metadata , separators = ("," , ":" )))
735+ if "provenance_source" in fields :
736+ assignments .append ("provenance_source = ?" )
737+ params .append (fields ["provenance_source" ])
738+ if "provenance_ref" in fields :
739+ assignments .append ("provenance_ref = ?" )
740+ params .append (fields ["provenance_ref" ])
741+ if "last_accessed" in fields :
742+ assignments .append ("last_accessed = ?" )
743+ params .append (fields ["last_accessed" ])
744+ if "access_count" in fields :
745+ assignments .append ("access_count = ?" )
746+ params .append (fields ["access_count" ])
747+
748+ if not assignments :
749+ return True
750+
751+ assignments .append ("updated_at = ?" )
752+ params .append (self ._now_rfc3339 ())
753+ params .append (node_id )
754+
755+ conn .execute (f"UPDATE nodes SET { ', ' .join (assignments )} WHERE id = ?" , params )
756+ conn .commit ()
757+ # Ensure sequential store instances see updates even if their
758+ # SQLite reader configuration does not consume WAL directly.
759+ try :
760+ conn .execute ("PRAGMA wal_checkpoint(FULL)" )
761+ except sqlite3 .OperationalError :
762+ pass
763+ return True
764+ finally :
765+ conn .close ()
766+
767+ def _update_core_fields (self , node_id : str , ** fields : Any ) -> bool :
768+ """Update node fields via rlm_core API or SQL compatibility fallback."""
769+ if not fields :
770+ return True
771+ if self ._core_supports_update_fields :
772+ self ._core_store .update_fields (node_id , ** fields )
773+ return True
774+ return self ._update_core_fields_via_sql (node_id , ** fields )
775+
566776 # =========================================================================
567777 # Node CRUD Operations (SPEC-02.20-24)
568778 # =========================================================================
@@ -673,7 +883,7 @@ def get_node(self, node_id: str, include_archived: bool = False) -> Node | None:
673883 return None
674884 # Track access (best-effort, don't fail reads)
675885 try :
676- self ._core_store . update_fields (
886+ self ._update_core_fields (
677887 node_id ,
678888 access_count = node .access_count + 1 ,
679889 )
@@ -730,7 +940,7 @@ def update_node(self, node_id: str, **kwargs: Any) -> bool:
730940 if "metadata" in kwargs :
731941 update_kwargs ["metadata" ] = kwargs ["metadata" ]
732942
733- self ._core_store . update_fields (node_id , ** update_kwargs )
943+ self ._update_core_fields (node_id , ** update_kwargs )
734944
735945 # Log tier transition if tier changed (SPEC-02.19)
736946 if "tier" in kwargs and kwargs ["tier" ] != old_tier :
@@ -1900,7 +2110,7 @@ def _set_last_accessed(self, node_id: str, timestamp: Any) -> bool:
19002110 rfc3339 = timestamp .astimezone (UTC ).strftime ("%Y-%m-%dT%H:%M:%S.000000+00:00" )
19012111 else :
19022112 rfc3339 = str (timestamp )
1903- return self ._core_store . update_fields (node_id , last_accessed = rfc3339 )
2113+ return self ._update_core_fields (node_id , last_accessed = rfc3339 )
19042114
19052115 def get_nodes_by_metadata (
19062116 self ,
0 commit comments