From 227b8ea59a360f1ca72b8b0c8ea164497973ab0f Mon Sep 17 00:00:00 2001 From: CaralHsi Date: Thu, 25 Sep 2025 11:24:42 +0800 Subject: [PATCH 1/4] feat: add memory size in product api (#348) * feat: add memory size config in product api * fix: memory_size config bug --- src/memos/api/config.py | 12 ++++++++++++ .../textual/tree_text_memory/organize/manager.py | 1 + 2 files changed, 13 insertions(+) diff --git a/src/memos/api/config.py b/src/memos/api/config.py index 355ee0385..c9ff70d4e 100644 --- a/src/memos/api/config.py +++ b/src/memos/api/config.py @@ -518,6 +518,13 @@ def create_user_config(user_name: str, user_id: str) -> tuple[MOSConfig, General "embedder": APIConfig.get_embedder_config(), "internet_retriever": internet_config, "reranker": APIConfig.get_reranker_config(), + "reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower() + == "true", + "memory_size": { + "WorkingMemory": os.getenv("NEBULAR_WORKING_MEMORY", 20), + "LongTermMemory": os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6), + "UserMemory": os.getenv("NEBULAR_USER_MEMORY", 1e6), + }, }, }, "act_mem": {} @@ -575,6 +582,11 @@ def get_default_cube_config() -> GeneralMemCubeConfig | None: "reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower() == "true", "internet_retriever": internet_config, + "memory_size": { + "WorkingMemory": os.getenv("NEBULAR_WORKING_MEMORY", 20), + "LongTermMemory": os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6), + "UserMemory": os.getenv("NEBULAR_USER_MEMORY", 1e6), + }, }, }, "act_mem": {} diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index c9cd4de8a..5cc714806 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -44,6 +44,7 @@ def __init__( "LongTermMemory": 1500, "UserMemory": 480, } + logger.info(f"MemorySize is {self.memory_size}") self._threshold = threshold self.is_reorganize = is_reorganize self.reorganizer = GraphStructureReorganizer( From ea8e631b84f94adca7dfe3d2a7b4ecda88087407 Mon Sep 17 00:00:00 2001 From: CaralHsi Date: Fri, 10 Oct 2025 20:35:51 +0800 Subject: [PATCH 2/4] Fix/remove bug (#356) * fix: nebula search bug * fix: nebula search bug * fix: auto create bug * feat: add single-db-only assertion * feat: make count_nodes support optional memory_type filtering * fix: dim_field when filter non-embedding nodes * feat: add optional whether include embedding when export graph * fix[WIP]: remove oldest memory update * feat: modify nebula search embedding efficiency * fix: modify nebula remove old memory --- src/memos/graph_dbs/nebular.py | 88 ++++++++++++++----- src/memos/memories/textual/tree.py | 4 +- .../tree_text_memory/organize/manager.py | 43 ++++----- 3 files changed, 89 insertions(+), 46 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 66ad894ad..45656b770 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -188,6 +188,19 @@ def _get_or_create_shared_client(cls, cfg: NebulaGraphDBConfig) -> tuple[str, "N client = cls._CLIENT_CACHE.get(key) if client is None: # Connection setting + + tmp_client = NebulaClient( + hosts=cfg.uri, + username=cfg.user, + password=cfg.password, + session_config=SessionConfig(graph=None), + session_pool_config=SessionPoolConfig(size=1, wait_timeout=3000), + ) + try: + cls._ensure_space_exists(tmp_client, cfg) + finally: + tmp_client.close() + conn_conf: ConnectionConfig | None = getattr(cfg, "conn_config", None) if conn_conf is None: conn_conf = ConnectionConfig.from_defults( @@ -318,6 +331,7 @@ def __init__(self, config: NebulaGraphDBConfig): } """ + assert config.use_multi_db is False, "Multi-DB MODE IS NOT SUPPORTED" self.config = config self.db_name = config.space self.user_name = config.user_name @@ -429,15 +443,21 @@ def remove_oldest_memory(self, memory_type: str, keep_latest: int) -> None: if not self.config.use_multi_db and self.config.user_name: optional_condition = f"AND n.user_name = '{self.config.user_name}'" - query = f""" - MATCH (n@Memory) - WHERE n.memory_type = '{memory_type}' - {optional_condition} - ORDER BY n.updated_at DESC - OFFSET {keep_latest} - DETACH DELETE n - """ - self.execute_query(query) + count = self.count_nodes(memory_type) + + if count > keep_latest: + delete_query = f""" + MATCH (n@Memory) + WHERE n.memory_type = '{memory_type}' + {optional_condition} + ORDER BY n.updated_at DESC + OFFSET {keep_latest} + DETACH DELETE n + """ + try: + self.execute_query(delete_query) + except Exception as e: + logger.warning(f"Delete old mem error: {e}") @timed def add_node(self, id: str, memory: str, metadata: dict[str, Any]) -> None: @@ -597,14 +617,19 @@ def get_memory_count(self, memory_type: str) -> int: return -1 @timed - def count_nodes(self, scope: str) -> int: - query = f""" - MATCH (n@Memory) - WHERE n.memory_type = "{scope}" - """ + def count_nodes(self, scope: str | None = None) -> int: + query = "MATCH (n@Memory)" + conditions = [] + + if scope: + conditions.append(f'n.memory_type = "{scope}"') if not self.config.use_multi_db and self.config.user_name: user_name = self.config.user_name - query += f"\nAND n.user_name = '{user_name}'" + conditions.append(f"n.user_name = '{user_name}'") + + if conditions: + query += "\nWHERE " + " AND ".join(conditions) + query += "\nRETURN count(n) AS count" result = self.execute_query(query) @@ -985,8 +1010,7 @@ def search_by_embedding( dim = len(vector) vector_str = ",".join(f"{float(x)}" for x in vector) gql_vector = f"VECTOR<{dim}, FLOAT>([{vector_str}])" - - where_clauses = [] + where_clauses = [f"n.{self.dim_field} IS NOT NULL"] if scope: where_clauses.append(f'n.memory_type = "{scope}"') if status: @@ -1008,15 +1032,12 @@ def search_by_embedding( where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else "" gql = f""" - MATCH (n@Memory) + let a = {gql_vector} + MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */) {where_clause} - ORDER BY inner_product(n.{self.dim_field}, {gql_vector}) DESC - APPROXIMATE + ORDER BY inner_product(n.{self.dim_field}, a) DESC LIMIT {top_k} - OPTIONS {{ METRIC: IP, TYPE: IVF, NPROBE: 8 }} - RETURN n.id AS id, inner_product(n.{self.dim_field}, {gql_vector}) AS score - """ - + RETURN n.id AS id, inner_product(n.{self.dim_field}, a) AS score""" try: result = self.execute_query(gql) except Exception as e: @@ -1471,6 +1492,25 @@ def merge_nodes(self, id1: str, id2: str) -> str: """ raise NotImplementedError + @classmethod + def _ensure_space_exists(cls, tmp_client, cfg): + """Lightweight check to ensure target graph (space) exists.""" + db_name = getattr(cfg, "space", None) + if not db_name: + logger.warning("[NebulaGraphDBSync] No `space` specified in cfg.") + return + + try: + res = tmp_client.execute("SHOW GRAPHS;") + existing = {row.values()[0].as_string() for row in res} + if db_name not in existing: + tmp_client.execute(f"CREATE GRAPH IF NOT EXISTS `{db_name}` TYPED MemOSBgeM3Type;") + logger.info(f"✅ Graph `{db_name}` created before session binding.") + else: + logger.debug(f"Graph `{db_name}` already exists.") + except Exception: + logger.exception("[NebulaGraphDBSync] Failed to ensure space exists") + @timed def _ensure_database_exists(self): graph_type_name = "MemOSBgeM3Type" diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index f324f41c9..0048f4a59 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -326,10 +326,10 @@ def load(self, dir: str) -> None: except Exception as e: logger.error(f"An error occurred while loading memories: {e}") - def dump(self, dir: str) -> None: + def dump(self, dir: str, include_embedding: bool = False) -> None: """Dump memories to os.path.join(dir, self.config.memory_filename)""" try: - json_memories = self.graph_store.export_graph() + json_memories = self.graph_store.export_graph(include_embedding=include_embedding) os.makedirs(dir, exist_ok=True) memory_file = os.path.join(dir, self.config.memory_filename) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 5cc714806..b0224655c 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -67,30 +67,33 @@ def add(self, memories: list[TextualMemoryItem]) -> list[str]: except Exception as e: logger.exception("Memory processing error: ", exc_info=e) - try: - self.graph_store.remove_oldest_memory( - memory_type="WorkingMemory", keep_latest=self.memory_size["WorkingMemory"] - ) - except Exception: - logger.warning(f"Remove WorkingMemory error: {traceback.format_exc()}") - - try: - self.graph_store.remove_oldest_memory( - memory_type="LongTermMemory", keep_latest=self.memory_size["LongTermMemory"] - ) - except Exception: - logger.warning(f"Remove LongTermMemory error: {traceback.format_exc()}") - - try: - self.graph_store.remove_oldest_memory( - memory_type="UserMemory", keep_latest=self.memory_size["UserMemory"] - ) - except Exception: - logger.warning(f"Remove UserMemory error: {traceback.format_exc()}") + # Only clean up if we're close to or over the limit + self._cleanup_memories_if_needed() self._refresh_memory_size() return added_ids + def _cleanup_memories_if_needed(self) -> None: + """ + Only clean up memories if we're close to or over the limit. + This reduces unnecessary database operations. + """ + cleanup_threshold = 0.8 # Clean up when 80% full + + for memory_type, limit in self.memory_size.items(): + current_count = self.current_memory_size.get(memory_type, 0) + threshold = int(limit * cleanup_threshold) + + # Only clean up if we're at or above the threshold + if current_count >= threshold: + try: + self.graph_store.remove_oldest_memory( + memory_type=memory_type, keep_latest=limit + ) + logger.debug(f"Cleaned up {memory_type}: {current_count} -> {limit}") + except Exception: + logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}") + def replace_working_memory(self, memories: list[TextualMemoryItem]) -> None: """ Replace WorkingMemory From bf8d458e356ab77aba74a4714c5f8803c481d6ee Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sat, 11 Oct 2025 13:29:52 +0800 Subject: [PATCH 3/4] fix: api client get_message models --- src/memos/api/client.py | 12 +++++++++--- src/memos/api/product_models.py | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/memos/api/client.py b/src/memos/api/client.py index d45276f2c..83b0b6f11 100644 --- a/src/memos/api/client.py +++ b/src/memos/api/client.py @@ -26,7 +26,10 @@ def __init__(self, api_key: str | None = None, base_url: str | None = None): if not api_key: raise ValueError("MemOS API key is required") - self.headers = {"Content-Type": "application/json", "Authorization": f"Token {api_key}"} + self.headers = { + "Content-Type": "application/json", + "Authorization": f"Token {api_key}" + } def _validate_required_params(self, **params): """Validate required parameters - if passed, they must not be empty""" @@ -42,7 +45,7 @@ def get_message( self._validate_required_params(user_id=user_id) url = f"{self.base_url}/get/message" - payload = {"user_id": user_id, "conversation_id": conversation_id} + payload = { "user_id": user_id, "conversation_id": conversation_id } for retry in range(MAX_RETRY_COUNT): try: response = requests.post( @@ -50,6 +53,7 @@ def get_message( ) response.raise_for_status() response_data = response.json() + return MemOSGetMessagesResponse(**response_data) except Exception as e: logger.error(f"Failed to get messages (retry {retry + 1}/3): {e}") @@ -66,7 +70,7 @@ def add_message( ) url = f"{self.base_url}/add/message" - payload = {"messages": messages, "user_id": user_id, "conversation_id": conversation_id} + payload = { "messages": messages, "user_id": user_id, "conversation_id": conversation_id } for retry in range(MAX_RETRY_COUNT): try: response = requests.post( @@ -74,6 +78,7 @@ def add_message( ) response.raise_for_status() response_data = response.json() + return MemOSAddResponse(**response_data) except Exception as e: logger.error(f"Failed to add memory (retry {retry + 1}/3): {e}") @@ -102,6 +107,7 @@ def search_memory( ) response.raise_for_status() response_data = response.json() + return MemOSSearchResponse(**response_data) except Exception as e: logger.error(f"Failed to search memory (retry {retry + 1}/3): {e}") diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 7e425415b..2d03d2946 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -191,7 +191,7 @@ class GetMessagesData(BaseModel): """Data model for get messages response based on actual API.""" message_detail_list: list[MessageDetail] = Field( - default_factory=list, alias="memory_detail_list", description="List of message details" + default_factory=list, alias="message_detail_list", description="List of message details" ) From b35ec49e956e4e49efcaf6fb2ac7047b1d67649b Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sat, 11 Oct 2025 13:38:53 +0800 Subject: [PATCH 4/4] fix: format error --- src/memos/api/client.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/memos/api/client.py b/src/memos/api/client.py index 83b0b6f11..912f883a7 100644 --- a/src/memos/api/client.py +++ b/src/memos/api/client.py @@ -26,10 +26,7 @@ def __init__(self, api_key: str | None = None, base_url: str | None = None): if not api_key: raise ValueError("MemOS API key is required") - self.headers = { - "Content-Type": "application/json", - "Authorization": f"Token {api_key}" - } + self.headers = {"Content-Type": "application/json", "Authorization": f"Token {api_key}"} def _validate_required_params(self, **params): """Validate required parameters - if passed, they must not be empty""" @@ -45,7 +42,7 @@ def get_message( self._validate_required_params(user_id=user_id) url = f"{self.base_url}/get/message" - payload = { "user_id": user_id, "conversation_id": conversation_id } + payload = {"user_id": user_id, "conversation_id": conversation_id} for retry in range(MAX_RETRY_COUNT): try: response = requests.post( @@ -70,7 +67,7 @@ def add_message( ) url = f"{self.base_url}/add/message" - payload = { "messages": messages, "user_id": user_id, "conversation_id": conversation_id } + payload = {"messages": messages, "user_id": user_id, "conversation_id": conversation_id} for retry in range(MAX_RETRY_COUNT): try: response = requests.post(