From a7eb1682f8903a0d70cf28b5547dbc763c48aef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Sun, 28 Sep 2025 20:32:54 +0800 Subject: [PATCH 01/22] feat: define mem-read schedular message&consumer; add async mem-reader mode in core; --- src/memos/mem_os/core.py | 14 +++++++ src/memos/mem_reader/base.py | 2 +- src/memos/mem_reader/simple_struct.py | 4 +- src/memos/mem_scheduler/general_scheduler.py | 5 +++ .../mem_scheduler/schemas/general_schemas.py | 1 + src/memos/memories/textual/base.py | 3 ++ src/memos/memories/textual/general.py | 2 + src/memos/memories/textual/naive.py | 2 + src/memos/memories/textual/tree.py | 30 +++---------- .../tree_text_memory/organize/manager.py | 42 ++++++++----------- 10 files changed, 54 insertions(+), 51 deletions(-) diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index 54e507b50..0055e8953 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -17,6 +17,7 @@ from memos.mem_scheduler.schemas.general_schemas import ( ADD_LABEL, ANSWER_LABEL, + MEM_READ_LABEL, QUERY_LABEL, ) from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem @@ -695,6 +696,7 @@ def add( logger.info( f"time add: messages is not None and enable_textual_memory and text_mem is not None time user_id: {target_user_id} time is: {time.time() - time_start_1}" ) + sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode if self.mem_cubes[mem_cube_id].config.text_mem.backend != "tree_text": add_memory = [] metadata = TextualMemoryMetadata( @@ -712,6 +714,7 @@ def add( messages_list, type="chat", info={"user_id": target_user_id, "session_id": target_session_id}, + mode="fast" if sync_mode == "async" else "fine", ) logger.info( f"time add: get mem_reader time user_id: {target_user_id} time is: {time.time() - time_start_2}" @@ -724,6 +727,17 @@ def add( f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_id_list}" ) + if sync_mode == "async" and self.mem_scheduler is not None: + 
message_item = ScheduleMessageItem( + user_id=target_user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.mem_cubes[mem_cube_id], + label=MEM_READ_LABEL, + content={json.dumps(mem_ids)}, + timestamp=datetime.utcnow(), + ) + self.mem_scheduler.submit_messages(messages=[message_item]) + # submit messages for scheduler if self.enable_mem_scheduler and self.mem_scheduler is not None: mem_cube = self.mem_cubes[mem_cube_id] diff --git a/src/memos/mem_reader/base.py b/src/memos/mem_reader/base.py index f092c3870..ba8be8652 100644 --- a/src/memos/mem_reader/base.py +++ b/src/memos/mem_reader/base.py @@ -18,7 +18,7 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]: @abstractmethod def get_memory( - self, scene_data: list, type: str, info: dict[str, Any] + self, scene_data: list, type: str, info: dict[str, Any], mode: str = "fast" ) -> list[list[TextualMemoryItem]]: """Various types of memories extracted from scene_data""" diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index b439cb2b2..2d20453ab 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -202,7 +202,7 @@ def _process_chat_data(self, scene_data_info, info): return chat_read_nodes def get_memory( - self, scene_data: list, type: str, info: dict[str, Any] + self, scene_data: list, type: str, info: dict[str, Any], mode: str = "fast" ) -> list[list[TextualMemoryItem]]: """ Extract and classify memory content from scene_data. 
@@ -219,6 +219,8 @@ def get_memory( - topic_chunk_overlap: Overlap for large topic chunks (default: 100) - chunk_size: Size for small chunks (default: 256) - chunk_overlap: Overlap for small chunks (default: 50) + mode: mem-reader mode, fast for quick process while fine for + better understanding via calling llm Returns: list[list[TextualMemoryItem]] containing memory content with summaries as keys and original text as values Raises: diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 340400abf..c85bdb756 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -8,6 +8,7 @@ ADD_LABEL, ANSWER_LABEL, DEFAULT_MAX_QUERY_KEY_WORDS, + MEM_READ_LABEL, QUERY_LABEL, WORKING_MEMORY_TYPE, MemCubeID, @@ -34,6 +35,7 @@ def __init__(self, config: GeneralSchedulerConfig): QUERY_LABEL: self._query_message_consumer, ANSWER_LABEL: self._answer_message_consumer, ADD_LABEL: self._add_message_consumer, + MEM_READ_LABEL: self._mem_read_message_consumer, } self.dispatcher.register_handlers(handlers) @@ -216,6 +218,9 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: except Exception as e: logger.error(f"Error: {e}", exc_info=True) + def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: + logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") + def process_session_turn( self, queries: str | list[str], diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index a81caf5a8..ac24cd2ee 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -8,6 +8,7 @@ QUERY_LABEL = "query" ANSWER_LABEL = "answer" ADD_LABEL = "add" +MEM_READ_LABEL = "mem_read" TreeTextMemory_SEARCH_METHOD = "tree_text_memory_search" TreeTextMemory_FINE_SEARCH_METHOD = "tree_text_memory_fine_search" diff --git 
a/src/memos/memories/textual/base.py b/src/memos/memories/textual/base.py index 8171fadce..26efb1cb3 100644 --- a/src/memos/memories/textual/base.py +++ b/src/memos/memories/textual/base.py @@ -10,6 +10,9 @@ class BaseTextMemory(BaseMemory): """Base class for all textual memory implementations.""" + # Default mode configuration - can be overridden by subclasses + mode: str = "sync" # Default mode: 'async' or 'sync' + @abstractmethod def __init__(self, config: BaseTextMemoryConfig): """Initialize memory with the given configuration.""" diff --git a/src/memos/memories/textual/general.py b/src/memos/memories/textual/general.py index 9793224b5..d71a86d2e 100644 --- a/src/memos/memories/textual/general.py +++ b/src/memos/memories/textual/general.py @@ -26,6 +26,8 @@ class GeneralTextMemory(BaseTextMemory): def __init__(self, config: GeneralTextMemoryConfig): """Initialize memory with the given configuration.""" + # Set mode from class default or override if needed + self.mode = getattr(self.__class__, "mode", "sync") self.config: GeneralTextMemoryConfig = config self.extractor_llm: OpenAILLM | OllamaLLM | AzureLLM = LLMFactory.from_config( config.extractor_llm diff --git a/src/memos/memories/textual/naive.py b/src/memos/memories/textual/naive.py index f8684729a..7bc49e767 100644 --- a/src/memos/memories/textual/naive.py +++ b/src/memos/memories/textual/naive.py @@ -61,6 +61,8 @@ class NaiveTextMemory(BaseTextMemory): def __init__(self, config: NaiveTextMemoryConfig): """Initialize memory with the given configuration.""" + # Set mode from class default or override if needed + self.mode = getattr(self.__class__, "mode", "sync") self.config = config self.extractor_llm = LLMFactory.from_config(config.extractor_llm) self.memories = [] diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index f324f41c9..7196738d8 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -2,7 +2,6 @@ import os import shutil 
import tempfile -import time from datetime import datetime from pathlib import Path @@ -31,30 +30,22 @@ class TreeTextMemory(BaseTextMemory): """General textual memory implementation for storing and retrieving memories.""" + # Override the default mode to async for TreeTextMemory + mode: str = "async" + def __init__(self, config: TreeTextMemoryConfig): """Initialize memory with the given configuration.""" - time_start = time.time() + # Set mode from class default or override if needed + self.mode = getattr(self.__class__, "mode", "async") self.config: TreeTextMemoryConfig = config self.extractor_llm: OpenAILLM | OllamaLLM | AzureLLM = LLMFactory.from_config( config.extractor_llm ) - logger.info(f"time init: extractor_llm time is: {time.time() - time_start}") - - time_start_ex = time.time() self.dispatcher_llm: OpenAILLM | OllamaLLM | AzureLLM = LLMFactory.from_config( config.dispatcher_llm ) - logger.info(f"time init: dispatcher_llm time is: {time.time() - time_start_ex}") - - time_start_em = time.time() self.embedder: OllamaEmbedder = EmbedderFactory.from_config(config.embedder) - logger.info(f"time init: embedder time is: {time.time() - time_start_em}") - - time_start_gs = time.time() self.graph_store: Neo4jGraphDB = GraphStoreFactory.from_config(config.graph_db) - logger.info(f"time init: graph_store time is: {time.time() - time_start_gs}") - - time_start_rr = time.time() if config.reranker is None: default_cfg = RerankerConfigFactory.model_validate( { @@ -68,10 +59,7 @@ def __init__(self, config: TreeTextMemoryConfig): self.reranker = RerankerFactory.from_config(default_cfg) else: self.reranker = RerankerFactory.from_config(config.reranker) - logger.info(f"time init: reranker time is: {time.time() - time_start_rr}") self.is_reorganize = config.reorganize - - time_start_mm = time.time() self.memory_manager: MemoryManager = MemoryManager( self.graph_store, self.embedder, @@ -84,8 +72,6 @@ def __init__(self, config: TreeTextMemoryConfig): }, 
is_reorganize=self.is_reorganize, ) - logger.info(f"time init: memory_manager time is: {time.time() - time_start_mm}") - time_start_ir = time.time() # Create internet retriever if configured self.internet_retriever = None if config.internet_retriever is not None: @@ -97,17 +83,11 @@ def __init__(self, config: TreeTextMemoryConfig): ) else: logger.info("No internet retriever configured") - logger.info(f"time init: internet_retriever time is: {time.time() - time_start_ir}") def add(self, memories: list[TextualMemoryItem | dict[str, Any]]) -> list[str]: """Add memories. Args: memories: List of TextualMemoryItem objects or dictionaries to add. - Later: - memory_items = [TextualMemoryItem(**m) if isinstance(m, dict) else m for m in memories] - metadata = extract_metadata(memory_items, self.extractor_llm) - plan = plan_memory_operations(memory_items, metadata, self.graph_store) - execute_plan(memory_items, metadata, plan, self.graph_store) """ return self.memory_manager.add(memories) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 5cc714806..0e86fe41f 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -1,3 +1,4 @@ +import asyncio import traceback import uuid @@ -54,7 +55,7 @@ def __init__( def add(self, memories: list[TextualMemoryItem]) -> list[str]: """ - Add new memories in parallel to different memory types (WorkingMemory, LongTermMemory, UserMemory). + Add new memories in parallel to different memory types. 
""" added_ids: list[str] = [] @@ -66,29 +67,6 @@ def add(self, memories: list[TextualMemoryItem]) -> list[str]: added_ids.extend(ids) except Exception as e: logger.exception("Memory processing error: ", exc_info=e) - - try: - self.graph_store.remove_oldest_memory( - memory_type="WorkingMemory", keep_latest=self.memory_size["WorkingMemory"] - ) - except Exception: - logger.warning(f"Remove WorkingMemory error: {traceback.format_exc()}") - - try: - self.graph_store.remove_oldest_memory( - memory_type="LongTermMemory", keep_latest=self.memory_size["LongTermMemory"] - ) - except Exception: - logger.warning(f"Remove LongTermMemory error: {traceback.format_exc()}") - - try: - self.graph_store.remove_oldest_memory( - memory_type="UserMemory", keep_latest=self.memory_size["UserMemory"] - ) - except Exception: - logger.warning(f"Remove UserMemory error: {traceback.format_exc()}") - - self._refresh_memory_size() return added_ids def replace_working_memory(self, memories: list[TextualMemoryItem]) -> None: @@ -266,6 +244,22 @@ def _ensure_structure_path( # Step 3: Return this structure node ID as the parent_id return node_id + async def _remove_and_refresh_memory(self): + remove_tasks = [ + self._remove_oldest_memory_async("WorkingMemory", self.memory_size["WorkingMemory"]), + self._remove_oldest_memory_async("LongTermMemory", self.memory_size["LongTermMemory"]), + self._remove_oldest_memory_async("UserMemory", self.memory_size["UserMemory"]), + ] + await asyncio.gather(*remove_tasks) + await asyncio.to_thread(self._refresh_memory_size) + print("finished remove and refresh memory") + + async def _remove_oldest_memory_async(self, memory_type: str, memory_size: int): + try: + await asyncio.to_thread(self.graph_store.remove_oldest_memory, memory_type, memory_size) + except Exception: + logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}") + def wait_reorganizer(self): """ Wait for the reorganizer to finish processing all messages. 
From 8a24ec741e8a4c21850a4800fe3dc39585f3862c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Sun, 28 Sep 2025 21:10:44 +0800 Subject: [PATCH 02/22] feat: add fast/fine mode in mem-reader; --- src/memos/mem_reader/simple_struct.py | 10 ++++-- src/memos/mem_scheduler/general_scheduler.py | 36 +++++++++++++++++++- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 2d20453ab..a0e4c061e 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -128,7 +128,10 @@ def __init__(self, config: SimpleStructMemReaderConfig): self.chunker = ChunkerFactory.from_config(config.chunker) @timed - def _process_chat_data(self, scene_data_info, info): + def _process_chat_data(self, scene_data_info, info, **kwargs): + mode = kwargs.get("mode", "fine") + if mode == "fast": + raise NotImplementedError mem_list = [] for item in scene_data_info: if "chat_time" in item: @@ -255,7 +258,7 @@ def get_memory( # Process Q&A pairs concurrently with context propagation with ContextThreadPoolExecutor() as executor: futures = [ - executor.submit(processing_func, scene_data_info, info) + executor.submit(processing_func, scene_data_info, info, mode) for scene_data_info in list_scene_data_info ] for future in concurrent.futures.as_completed(futures): @@ -319,6 +322,9 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]: return results def _process_doc_data(self, scene_data_info, info, **kwargs): + mode = kwargs.get("mode", "fine") + if mode == "fast": + raise NotImplementedError chunks = self.chunker.chunk(scene_data_info["text"]) messages = [] for chunk in chunks: diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index c85bdb756..a0056d98c 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -219,7 +219,41 
@@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.error(f"Error: {e}", exc_info=True) def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: - logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") + logger.info(f"Messages {messages} assigned to {MEM_READ_LABEL} handler.") + + for message in messages: + try: + user_id = message.user_id + mem_cube_id = message.mem_cube_id + mem_cube = message.mem_cube + content = message.content + + # Parse the memory IDs from content + mem_ids = json.loads(content) if isinstance(content, str) else content + + logger.info( + f"Processing mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}, mem_ids={mem_ids}" + ) + + # Get the text memory from the mem_cube + text_mem = mem_cube.text_mem + if not isinstance(text_mem, TreeTextMemory): + logger.error(f"Expected TreeTextMemory but got {type(text_mem).__name__}") + continue + + # Process the memory reading/retrieval logic here + # This could include: + # 1. Triggering memory reorganization + # 2. Updating memory relationships + # 3. 
Performing additional memory processing + + # For now, just log the successful processing + logger.info( + f"Successfully processed mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}" + ) + + except Exception as e: + logger.error(f"Error processing mem_read message: {e}", exc_info=True) def process_session_turn( self, From 81915a33f1899050be3b0e0a104dd4057d8a22bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Mon, 29 Sep 2025 11:55:16 +0800 Subject: [PATCH 03/22] feat: add mem-reader in scheduler --- src/memos/mem_os/core.py | 3 +- src/memos/mem_scheduler/base_scheduler.py | 1 + src/memos/mem_scheduler/general_scheduler.py | 138 ++++++++++++++++++- 3 files changed, 135 insertions(+), 7 deletions(-) diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index 0055e8953..261a18d12 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -71,6 +71,7 @@ def __init__(self, config: MOSConfig, user_manager: UserManager | None = None): if self.enable_mem_scheduler: self._mem_scheduler = self._initialize_mem_scheduler() self._mem_scheduler.mem_cubes = self.mem_cubes + self._mem_scheduler.mem_reader = self.mem_reader else: self._mem_scheduler: GeneralScheduler = None @@ -733,7 +734,7 @@ def add( mem_cube_id=mem_cube_id, mem_cube=self.mem_cubes[mem_cube_id], label=MEM_READ_LABEL, - content={json.dumps(mem_ids)}, + content=json.dumps(mem_ids), timestamp=datetime.utcnow(), ) self.mem_scheduler.submit_messages(messages=[message_item]) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index b6ef00d8d..0a8c43d78 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -67,6 +67,7 @@ def __init__(self, config: BaseSchedulerConfig): self.db_engine: Engine | None = None self.monitor: SchedulerGeneralMonitor | None = None self.dispatcher_monitor: SchedulerDispatcherMonitor | None = None + self.mem_reader = None # Will be set by 
MOSCore self.dispatcher = SchedulerDispatcher( max_workers=self.thread_pool_max_workers, enable_parallel_dispatch=self.enable_parallel_dispatch, diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index a0056d98c..2058f1c87 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -241,13 +241,15 @@ def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non logger.error(f"Expected TreeTextMemory but got {type(text_mem).__name__}") continue - # Process the memory reading/retrieval logic here - # This could include: - # 1. Triggering memory reorganization - # 2. Updating memory relationships - # 3. Performing additional memory processing + # Use mem_reader to process the memories + self._process_memories_with_reader( + mem_ids=mem_ids, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + text_mem=text_mem, + ) - # For now, just log the successful processing logger.info( f"Successfully processed mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}" ) @@ -255,6 +257,130 @@ def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non except Exception as e: logger.error(f"Error processing mem_read message: {e}", exc_info=True) + def _process_memories_with_reader( + self, + mem_ids: list[str], + user_id: str, + mem_cube_id: str, + mem_cube: GeneralMemCube, + text_mem: TreeTextMemory, + ) -> None: + """ + Process memories using mem_reader for enhanced memory processing. 
+ + Args: + mem_ids: List of memory IDs to process + user_id: User ID + mem_cube_id: Memory cube ID + mem_cube: Memory cube instance + text_mem: Text memory instance + """ + try: + # Get the mem_reader from the parent MOSCore + if not hasattr(self, "mem_reader") or self.mem_reader is None: + logger.warning( + "mem_reader not available in scheduler, skipping enhanced processing" + ) + return + + # Get the original memory items + memory_items = [] + for mem_id in mem_ids: + try: + memory_item = text_mem.get(mem_id) + memory_items.append(memory_item) + except Exception as e: + logger.warning(f"Failed to get memory {mem_id}: {e}") + continue + + if not memory_items: + logger.warning("No valid memory items found for processing") + return + + # Prepare scene data for mem_reader + scene_data = [] + for memory_item in memory_items: + scene_data.append( + { + "role": "user", # or determine from metadata + "content": memory_item.memory, + "chat_time": memory_item.metadata.updated_at + or memory_item.metadata.created_at, + } + ) + + # Use mem_reader to process the memories + logger.info(f"Processing {len(scene_data)} memories with mem_reader") + + # Extract memories using mem_reader + processed_memories = self.mem_reader.get_memory( + scene_data=scene_data, + type="chat", + info={"user_id": user_id, "session_id": "", "mem_cube_id": mem_cube_id}, + mode="fast", # Use fast mode for async processing + ) + + if processed_memories and len(processed_memories) > 0: + # Flatten the results (mem_reader returns list of lists) + flattened_memories = [] + for memory_list in processed_memories: + flattened_memories.extend(memory_list) + + logger.info(f"mem_reader processed {len(flattened_memories)} enhanced memories") + + # Add the enhanced memories back to the memory system + if flattened_memories: + enhanced_mem_ids = text_mem.add(flattened_memories) + logger.info( + f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}" + ) + + # Trigger memory reorganization if needed 
+ self._trigger_memory_reorganization( + text_mem=text_mem, user_id=user_id, mem_cube_id=mem_cube_id + ) + else: + logger.info("No enhanced memories generated by mem_reader") + else: + logger.info("mem_reader returned no processed memories") + + except Exception as e: + logger.error(f"Error in _process_memories_with_reader: {e}", exc_info=True) + + def _trigger_memory_reorganization( + self, + text_mem: TreeTextMemory, + user_id: str, + mem_cube_id: str, + ) -> None: + """ + Trigger memory reorganization after enhanced processing. + + Args: + text_mem: Text memory instance + user_id: User ID + mem_cube_id: Memory cube ID + """ + try: + # Check if reorganization is enabled + if hasattr(text_mem, "is_reorganize") and text_mem.is_reorganize: + logger.info( + f"Triggering memory reorganization for user_id={user_id}, mem_cube_id={mem_cube_id}" + ) + + # Get current working memory size + current_sizes = text_mem.get_current_memory_size() + logger.info(f"Current memory sizes: {current_sizes}") + + # The reorganization will be handled by the memory manager + # This is just a trigger point for logging and monitoring + logger.info("Memory reorganization triggered successfully") + else: + logger.info("Memory reorganization is disabled, skipping reorganization trigger") + + except Exception as e: + logger.error(f"Error in _trigger_memory_reorganization: {e}", exc_info=True) + def process_session_turn( self, queries: str | list[str], From b5086e77c8a4a1c12aae3d94a321d19cf06a5856 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Mon, 13 Oct 2025 20:58:18 +0800 Subject: [PATCH 04/22] feat: change async remove --- .../tree_text_memory/organize/manager.py | 58 +++++++------------ 1 file changed, 21 insertions(+), 37 deletions(-) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 6be610761..53b01fd78 100644 --- 
a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -67,33 +67,8 @@ def add(self, memories: list[TextualMemoryItem]) -> list[str]: added_ids.extend(ids) except Exception as e: logger.exception("Memory processing error: ", exc_info=e) - - # Only clean up if we're close to or over the limit - self._cleanup_memories_if_needed() - self._refresh_memory_size() return added_ids - def _cleanup_memories_if_needed(self) -> None: - """ - Only clean up memories if we're close to or over the limit. - This reduces unnecessary database operations. - """ - cleanup_threshold = 0.8 # Clean up when 80% full - - for memory_type, limit in self.memory_size.items(): - current_count = self.current_memory_size.get(memory_type, 0) - threshold = int(limit * cleanup_threshold) - - # Only clean up if we're at or above the threshold - if current_count >= threshold: - try: - self.graph_store.remove_oldest_memory( - memory_type=memory_type, keep_latest=limit - ) - logger.debug(f"Cleaned up {memory_type}: {current_count} -> {limit}") - except Exception: - logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}") - def replace_working_memory(self, memories: list[TextualMemoryItem]) -> None: """ Replace WorkingMemory @@ -270,20 +245,29 @@ def _ensure_structure_path( return node_id async def _remove_and_refresh_memory(self): - remove_tasks = [ - self._remove_oldest_memory_async("WorkingMemory", self.memory_size["WorkingMemory"]), - self._remove_oldest_memory_async("LongTermMemory", self.memory_size["LongTermMemory"]), - self._remove_oldest_memory_async("UserMemory", self.memory_size["UserMemory"]), - ] - await asyncio.gather(*remove_tasks) + await asyncio.to_thread(self._cleanup_memories_if_needed) await asyncio.to_thread(self._refresh_memory_size) - print("finished remove and refresh memory") - async def _remove_oldest_memory_async(self, memory_type: str, memory_size: int): - try: - await 
asyncio.to_thread(self.graph_store.remove_oldest_memory, memory_type, memory_size) - except Exception: - logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}") + def _cleanup_memories_if_needed(self) -> None: + """ + Only clean up memories if we're close to or over the limit. + This reduces unnecessary database operations. + """ + cleanup_threshold = 0.8 # Clean up when 80% full + + for memory_type, limit in self.memory_size.items(): + current_count = self.current_memory_size.get(memory_type, 0) + threshold = int(limit * cleanup_threshold) + + # Only clean up if we're at or above the threshold + if current_count >= threshold: + try: + self.graph_store.remove_oldest_memory( + memory_type=memory_type, keep_latest=limit + ) + logger.debug(f"Cleaned up {memory_type}: {current_count} -> {limit}") + except Exception: + logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}") def wait_reorganizer(self): """ From 0b2649d7934d325ffc0660064e86074969b18847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 14 Oct 2025 14:39:02 +0800 Subject: [PATCH 05/22] feat: modify async-add in core.py --- src/memos/mem_os/core.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index 261a18d12..b536ec5b2 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -698,6 +698,11 @@ def add( f"time add: messages is not None and enable_textual_memory and text_mem is not None time user_id: {target_user_id} time is: {time.time() - time_start_1}" ) sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode + if sync_mode == "async": + assert self.mem_scheduler is not None, ( + "Mem-Scheduler must be working when use synchronous memory adding." 
+ ) + if self.mem_cubes[mem_cube_id].config.text_mem.backend != "tree_text": add_memory = [] metadata = TextualMemoryMetadata( @@ -728,20 +733,20 @@ def add( f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_id_list}" ) - if sync_mode == "async" and self.mem_scheduler is not None: - message_item = ScheduleMessageItem( - user_id=target_user_id, - mem_cube_id=mem_cube_id, - mem_cube=self.mem_cubes[mem_cube_id], - label=MEM_READ_LABEL, - content=json.dumps(mem_ids), - timestamp=datetime.utcnow(), - ) - self.mem_scheduler.submit_messages(messages=[message_item]) - # submit messages for scheduler if self.enable_mem_scheduler and self.mem_scheduler is not None: mem_cube = self.mem_cubes[mem_cube_id] + if sync_mode == "async": + message_item = ScheduleMessageItem( + user_id=target_user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + label=MEM_READ_LABEL, + content=json.dumps(mem_ids), + timestamp=datetime.utcnow(), + ) + self.mem_scheduler.submit_messages(messages=[message_item]) + message_item = ScheduleMessageItem( user_id=target_user_id, mem_cube_id=mem_cube_id, From 9dd632f0aadf237a331ee1400d2d6ea4f6be2dd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 14 Oct 2025 17:26:05 +0800 Subject: [PATCH 06/22] feat: add 'remove and refresh memory in schedular' --- src/memos/mem_reader/simple_struct.py | 2 +- src/memos/mem_scheduler/general_scheduler.py | 7 ++++++- .../memories/textual/tree_text_memory/organize/manager.py | 7 +++---- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index a0e4c061e..3ecc15e80 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -258,7 +258,7 @@ def get_memory( # Process Q&A pairs concurrently with context propagation with ContextThreadPoolExecutor() as executor: futures = [ - executor.submit(processing_func, scene_data_info, info, mode) + 
executor.submit(processing_func, scene_data_info, info, mode=mode) for scene_data_info in list_scene_data_info ] for future in concurrent.futures.as_completed(futures): diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 2058f1c87..9e64e7311 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -317,7 +317,7 @@ def _process_memories_with_reader( scene_data=scene_data, type="chat", info={"user_id": user_id, "session_id": "", "mem_cube_id": mem_cube_id}, - mode="fast", # Use fast mode for async processing + mode="fine", # Use fine mode for async processing ) if processed_memories and len(processed_memories) > 0: @@ -344,6 +344,11 @@ def _process_memories_with_reader( else: logger.info("mem_reader returned no processed memories") + text_mem.delete(mem_ids) + logger.info("Delete raw mem_ids") + text_mem.memory_manager.remove_and_refresh_memory() + logger.info("Remove and Refresh Memories") + except Exception as e: logger.error(f"Error in _process_memories_with_reader: {e}", exc_info=True) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 53b01fd78..1dd4aee5e 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -1,4 +1,3 @@ -import asyncio import traceback import uuid @@ -244,9 +243,9 @@ def _ensure_structure_path( # Step 3: Return this structure node ID as the parent_id return node_id - async def _remove_and_refresh_memory(self): - await asyncio.to_thread(self._cleanup_memories_if_needed) - await asyncio.to_thread(self._refresh_memory_size) + def remove_and_refresh_memory(self): + self._cleanup_memories_if_needed() + self._refresh_memory_size() def _cleanup_memories_if_needed(self) -> None: """ From 8c970587f92030e8564033100a9a9f1c6161b975 Mon Sep 17 00:00:00
2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 14 Oct 2025 21:56:33 +0800 Subject: [PATCH 07/22] feat: add naive fast mode in mem-reader --- src/memos/chunkers/sentence_chunker.py | 5 +- src/memos/mem_reader/simple_struct.py | 217 ++++++++++++++++++++++--- 2 files changed, 201 insertions(+), 21 deletions(-) diff --git a/src/memos/chunkers/sentence_chunker.py b/src/memos/chunkers/sentence_chunker.py index 4de0cf32b..c499a49d2 100644 --- a/src/memos/chunkers/sentence_chunker.py +++ b/src/memos/chunkers/sentence_chunker.py @@ -28,8 +28,11 @@ def __init__(self, config: SentenceChunkerConfig): ) logger.info(f"Initialized SentenceChunker with config: {config}") - def chunk(self, text: str) -> list[Chunk]: + def chunk(self, text: str) -> list[str] | list[Chunk]: """Chunk the given text into smaller chunks based on sentences.""" + if len(text) <= self.config.chunk_size: + return [text] + chonkie_chunks = self.chunker.chunk(text) chunks = [] diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 3ecc15e80..cc979bb41 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -112,6 +112,14 @@ def _build_node(idx, message, info, scene_file, llm, parse_json_result, embedder return None +def _derive_key(text: str, max_len: int = 80) -> str: + """default key when without LLM: first max_len words""" + if not text: + return "" + sent = re.split(r"[。!?!?]\s*|\n", text.strip())[0] + return (sent[:max_len]).strip() + + class SimpleStructMemReader(BaseMemReader, ABC): """Naive implementation of MemReader.""" @@ -127,11 +135,186 @@ def __init__(self, config: SimpleStructMemReaderConfig): self.embedder = EmbedderFactory.from_config(config.embedder) self.chunker = ChunkerFactory.from_config(config.chunker) + def _make_memory_item( + self, + value: str, + info: dict, + memory_type: str, + tags: list[str] | None = None, + key: str | None = None, + sources: list | None = None, + background: str = 
"", + type_: str = "fact", + confidence: float = 0.99, + ) -> TextualMemoryItem: + """construct memory item""" + return TextualMemoryItem( + memory=value, + metadata=TreeNodeTextualMemoryMetadata( + user_id=info.get("user_id", ""), + session_id=info.get("session_id", ""), + memory_type=memory_type, + status="activated", + tags=tags or [], + key=key if key is not None else _derive_key(value), + embedding=self.embedder.embed([value])[0], + usage=[], + sources=sources or [], + background=background, + confidence=confidence, + type=type_, + ), + ) + @timed def _process_chat_data(self, scene_data_info, info, **kwargs): mode = kwargs.get("mode", "fine") if mode == "fast": - raise NotImplementedError + # 使用合并逻辑处理短消息 + raw_content_list = [] + current_content = "" + current_roles = set() + current_sources = [] + current_idx = 0 + + for idx, item in enumerate(scene_data_info): + try: + role = item.get("role", "user") + content = item.get("content", "") + chat_time = item.get("chat_time", None) + + prefix = f"{role}: " + (f"[{chat_time}]: " if chat_time else "") + mem = f"{prefix}{content}\n" + + if len(mem) > 2000: + if current_content: + raw_content_list.append( + { + "text": current_content, + "roles": current_roles, + "sources": current_sources, + "start_idx": current_idx, + } + ) + current_content = "" + current_roles = set() + current_sources = [] + + try: + chunks = self.chunker.chunk(content) or [] + except Exception as e: + logger.warning(f"[ChatFast] chunker failed on item {idx}: {e}") + chunks = [] + + if not chunks: + chunks = [type("C", (), {"text": content})] + + for chunk in chunks: + chunk_text = f"{prefix}{chunk.text}" + raw_content_list.append( + { + "text": chunk_text, + "roles": {role}, + "sources": [ + { + "type": "chat", + "index": idx, + "role": role, + "chat_time": chat_time, + } + ], + "start_idx": idx, + } + ) + else: + if len(current_content + mem) > 2000: + if current_content: + raw_content_list.append( + { + "text": current_content, + "roles": 
current_roles, + "sources": current_sources, + "start_idx": current_idx, + } + ) + current_content = mem + current_roles = {role} + current_sources = [ + { + "type": "chat", + "index": idx, + "role": role, + "chat_time": chat_time, + } + ] + current_idx = idx + else: + current_content += mem + current_roles.add(role) + current_sources.append( + { + "type": "chat", + "index": idx, + "role": role, + "chat_time": chat_time, + } + ) + + except Exception as e: + logger.error(f"[ChatFast] Error preparing item {idx}: {e}") + + if current_content: + raw_content_list.append( + { + "text": current_content, + "roles": current_roles, + "sources": current_sources, + "start_idx": current_idx, + } + ) + + chat_nodes = [] + + def _process_single_item(item_data): + try: + text = item_data["text"] + roles = item_data["roles"] + sources = item_data["sources"] + + mem_type = "UserMemory" if (roles and roles == {"user"}) else "LongTermMemory" + tags = ["mode:fast", f"lang:{detect_lang(text)}"] + [ + f"role:{r}" for r in sorted(roles) + ] + + node = self._make_memory_item( + value=text, + info=info, + memory_type=mem_type, + tags=tags, + key=None, + sources=sources, + background="", + type_="fact", + confidence=0.99, + ) + return node + except Exception as e: + logger.error(f"[ChatFast] Error processing item: {e}") + return None + + with ContextThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(_process_single_item, item) for item in raw_content_list] + + for future in concurrent.futures.as_completed(futures): + try: + node = future.result() + if node: + chat_nodes.append(node) + except Exception as e: + logger.error(f"[ChatFast] Future result error: {e}") + + return chat_nodes + mem_list = [] for item in scene_data_info: if "chat_time" in item: @@ -179,24 +362,18 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): if memory_type not in ["LongTermMemory", "UserMemory"]: memory_type = "LongTermMemory" - node_i = TextualMemoryItem( - 
memory=memory_i_raw.get("value", ""), - metadata=TreeNodeTextualMemoryMetadata( - user_id=info.get("user_id"), - session_id=info.get("session_id"), - memory_type=memory_type, - status="activated", - tags=memory_i_raw.get("tags", []) - if type(memory_i_raw.get("tags", [])) is list - else [], - key=memory_i_raw.get("key", ""), - embedding=self.embedder.embed([memory_i_raw.get("value", "")])[0], - usage=[], - sources=scene_data_info, - background=response_json.get("summary", ""), - confidence=0.99, - type="fact", - ), + node_i = self._make_memory_item( + value=memory_i_raw.get("value", ""), + info=info, + memory_type=memory_type, + tags=memory_i_raw.get("tags", []) + if isinstance(memory_i_raw.get("tags", []), list) + else [], + key=memory_i_raw.get("key", ""), + sources=scene_data_info, + background=response_json.get("summary", ""), + type_="fact", + confidence=0.99, ) chat_read_nodes.append(node_i) except Exception as e: @@ -205,7 +382,7 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): return chat_read_nodes def get_memory( - self, scene_data: list, type: str, info: dict[str, Any], mode: str = "fast" + self, scene_data: list, type: str, info: dict[str, Any], mode: str = "fine" ) -> list[list[TextualMemoryItem]]: """ Extract and classify memory content from scene_data. 
From 3e08a82540b62f30d0d5fbab108835c0997d0edf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 15 Oct 2025 15:13:32 +0800 Subject: [PATCH 08/22] feat: finish fast mode in mem-reader --- examples/mem_reader/reader.py | 83 ++++++++++++++++++++------ src/memos/chunkers/sentence_chunker.py | 3 - src/memos/mem_reader/simple_struct.py | 29 +++------ 3 files changed, 72 insertions(+), 43 deletions(-) diff --git a/examples/mem_reader/reader.py b/examples/mem_reader/reader.py index e26d00a67..30e42d497 100644 --- a/examples/mem_reader/reader.py +++ b/examples/mem_reader/reader.py @@ -11,7 +11,7 @@ def main(): ) reader = SimpleStructMemReader(reader_config) - # 3. Define scene data + # 2. Define scene data scene_data = [ [ {"role": "user", "chat_time": "3 May 2025", "content": "I'm feeling a bit down today."}, @@ -187,32 +187,77 @@ def main(): ], ] - # 4. Acquiring memories + print("=== Mem-Reader Fast vs Fine Mode Comparison ===\n") + + # 3. Test Fine Mode (default) + print("🔄 Testing FINE mode (default, with LLM processing)...") start_time = time.time() - chat_memory = reader.get_memory( - scene_data, type="chat", info={"user_id": "user1", "session_id": "session1"} + fine_memory = reader.get_memory( + scene_data, type="chat", info={"user_id": "user1", "session_id": "session1"}, mode="fine" ) - print("\nChat Memory:\n", chat_memory) + fine_time = time.time() - start_time + print(f"✅ Fine mode completed in {fine_time:.2f} seconds") + print(f"📊 Fine mode generated {sum(len(mem_list) for mem_list in fine_memory)} memory items") + + # 4. 
Test Fast Mode + print("\n⚡ Testing FAST mode (quick processing, no LLM calls)...") + start_time = time.time() + fast_memory = reader.get_memory( + scene_data, type="chat", info={"user_id": "user1", "session_id": "session1"}, mode="fast" + ) + fast_time = time.time() - start_time + print(f"✅ Fast mode completed in {fast_time:.2f} seconds") + print(f"📊 Fast mode generated {sum(len(mem_list) for mem_list in fast_memory)} memory items") + + # 5. Performance Comparison + print("\n📈 Performance Comparison:") + print(f" Fine mode: {fine_time:.2f}s") + print(f" Fast mode: {fast_time:.2f}s") + print(f" Speed improvement: {fine_time / fast_time:.1f}x faster") - # 5. Example of processing documents - print("\n=== Processing Documents ===") + # 6. Show sample results from both modes + print("\n🔍 Sample Results Comparison:") + print("\n--- FINE Mode Results (first 3 items) ---") + for i, mem_list in enumerate(fine_memory[:3]): + for j, mem_item in enumerate(mem_list[:2]): # Show first 2 items from each list + print(f" [{i}][{j}] {mem_item.memory[:100]}...") + + print("\n--- FAST Mode Results (first 3 items) ---") + for i, mem_list in enumerate(fast_memory[:3]): + for j, mem_item in enumerate(mem_list[:2]): # Show first 2 items from each list + print(f" [{i}][{j}] {mem_item.memory[:100]}...") + + # 7. Example of processing documents (only in fine mode) + print("\n=== Processing Documents (Fine Mode Only) ===") # Example document paths (you should replace these with actual document paths) doc_paths = [ "examples/mem_reader/text1.txt", "examples/mem_reader/text2.txt", ] - # 6. Acquiring memories from documents - doc_memory = reader.get_memory( - doc_paths, - "doc", - info={ - "user_id": "1111", - "session_id": "2222", - }, - ) - print("\nDocument Memory:\n", doc_memory) - end_time = time.time() - print(f"The runtime is {end_time - start_time} seconds.") + + try: + # 6. 
Acquiring memories from documents + doc_memory = reader.get_memory( + doc_paths, + "doc", + info={ + "user_id": "1111", + "session_id": "2222", + }, + mode="fine", + ) + print( + f"\n📄 Document Memory generated {sum(len(mem_list) for mem_list in doc_memory)} items" + ) + except Exception as e: + print(f"⚠️ Document processing failed: {e}") + print(" (This is expected if document files don't exist)") + + print("\n🎯 Summary:") + print(f" • Fast mode: {fast_time:.2f}s - Quick processing, no LLM calls") + print(f" • Fine mode: {fine_time:.2f}s - Full LLM processing for better understanding") + print(" • Use fast mode for: Real-time applications, high-throughput scenarios") + print(" • Use fine mode for: Quality analysis, detailed memory extraction") if __name__ == "__main__": diff --git a/src/memos/chunkers/sentence_chunker.py b/src/memos/chunkers/sentence_chunker.py index c499a49d2..080962482 100644 --- a/src/memos/chunkers/sentence_chunker.py +++ b/src/memos/chunkers/sentence_chunker.py @@ -30,9 +30,6 @@ def __init__(self, config: SentenceChunkerConfig): def chunk(self, text: str) -> list[str] | list[Chunk]: """Chunk the given text into smaller chunks based on sentences.""" - if len(text) <= self.config.chunk_size: - return [text] - chonkie_chunks = self.chunker.chunk(text) chunks = [] diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index cc979bb41..fc0e81e52 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -134,6 +134,7 @@ def __init__(self, config: SimpleStructMemReaderConfig): self.llm = LLMFactory.from_config(config.llm) self.embedder = EmbedderFactory.from_config(config.embedder) self.chunker = ChunkerFactory.from_config(config.chunker) + self.memory_max_length = 8000 def _make_memory_item( self, @@ -170,7 +171,6 @@ def _make_memory_item( def _process_chat_data(self, scene_data_info, info, **kwargs): mode = kwargs.get("mode", "fine") if mode == "fast": - # 使用合并逻辑处理短消息 
raw_content_list = [] current_content = "" current_roles = set() @@ -185,8 +185,7 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): prefix = f"{role}: " + (f"[{chat_time}]: " if chat_time else "") mem = f"{prefix}{content}\n" - - if len(mem) > 2000: + if len(mem) > self.memory_max_length: if current_content: raw_content_list.append( { @@ -196,9 +195,7 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): "start_idx": current_idx, } ) - current_content = "" - current_roles = set() - current_sources = [] + current_content, current_roles, current_sources = "", set(), [] try: chunks = self.chunker.chunk(content) or [] @@ -209,8 +206,8 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): if not chunks: chunks = [type("C", (), {"text": content})] - for chunk in chunks: - chunk_text = f"{prefix}{chunk.text}" + for c in chunks: + chunk_text = c.text if hasattr(c, "text") else c raw_content_list.append( { "text": chunk_text, @@ -227,7 +224,7 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): } ) else: - if len(current_content + mem) > 2000: + if len(current_content + mem) > self.memory_max_length: if current_content: raw_content_list.append( { @@ -240,24 +237,14 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): current_content = mem current_roles = {role} current_sources = [ - { - "type": "chat", - "index": idx, - "role": role, - "chat_time": chat_time, - } + {"type": "chat", "index": idx, "role": role, "chat_time": chat_time} ] current_idx = idx else: current_content += mem current_roles.add(role) current_sources.append( - { - "type": "chat", - "index": idx, - "role": role, - "chat_time": chat_time, - } + {"type": "chat", "index": idx, "role": role, "chat_time": chat_time} ) except Exception as e: From 37fcff81b475be581f18abfb44a6055bf71c78ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 16 Oct 2025 16:51:33 +0800 Subject: [PATCH 09/22] feat: add token-based window 
splitting and concurrency improvements --- src/memos/mem_reader/simple_struct.py | 227 +++++++++++++++++++------- 1 file changed, 172 insertions(+), 55 deletions(-) diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index fc0e81e52..0bd2932a0 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -3,6 +3,7 @@ import json import os import re +import traceback from abc import ABC from typing import Any @@ -41,6 +42,26 @@ "doc": {"en": SIMPLE_STRUCT_DOC_READER_PROMPT, "zh": SIMPLE_STRUCT_DOC_READER_PROMPT_ZH}, } +try: + import tiktoken + + try: + _ENC = tiktoken.encoding_for_model("gpt-4o-mini") + except Exception: + _ENC = tiktoken.get_encoding("cl100k_base") + + def _count_tokens_text(s: str) -> int: + return len(_ENC.encode(s or "")) +except Exception: + # Heuristic fallback: zh chars ~1 token, others ~1 token per ~4 chars + def _count_tokens_text(s: str) -> int: + if not s: + return 0 + zh_chars = re.findall(r"[\u4e00-\u9fff]", s) + zh = len(zh_chars) + rest = len(s) - zh + return zh + max(1, rest // 4) + def detect_lang(text): try: @@ -135,6 +156,9 @@ def __init__(self, config: SimpleStructMemReaderConfig): self.embedder = EmbedderFactory.from_config(config.embedder) self.chunker = ChunkerFactory.from_config(config.chunker) self.memory_max_length = 8000 + # Use token-based windowing; default to ~5000 tokens if not configured + self.chat_window_max_tokens = getattr(self.config, "chat_window_max_tokens", 5000) + self._count_tokens = _count_tokens_text def _make_memory_item( self, @@ -167,10 +191,37 @@ def _make_memory_item( ), ) + def _get_llm_response(self, mem_str: str) -> dict: + lang = detect_lang(mem_str) + template = PROMPT_DICT["chat"][lang] + examples = PROMPT_DICT["chat"][f"{lang}_example"] + prompt = template.replace("${conversation}", mem_str) + if self.config.remove_prompt_example: + prompt = prompt.replace(examples, "") + messages = [{"role": "user", "content": prompt}] 
+ try: + response_text = self.llm.generate(messages) + response_json = self.parse_json_result(response_text) + except Exception as e: + logger.error(f"[LLM] Exception during chat generation: {e}") + response_json = { + "memory list": [ + { + "key": mem_str[:10], + "memory_type": "UserMemory", + "value": mem_str, + "tags": [], + } + ], + "summary": mem_str, + } + return response_json + @timed def _process_chat_data(self, scene_data_info, info, **kwargs): mode = kwargs.get("mode", "fine") if mode == "fast": + logger.debug("Using Fast Mode") raw_content_list = [] current_content = "" current_roles = set() @@ -179,13 +230,19 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): for idx, item in enumerate(scene_data_info): try: - role = item.get("role", "user") + role = item.get("role", "") content = item.get("content", "") chat_time = item.get("chat_time", None) - prefix = f"{role}: " + (f"[{chat_time}]: " if chat_time else "") + prefix = ( + f"{role}: " + if (role and role != "mix") + else f"[{chat_time}]: " + if chat_time + else "" + ) mem = f"{prefix}{content}\n" - if len(mem) > self.memory_max_length: + if self._count_tokens(mem) > self.chat_window_max_tokens: if current_content: raw_content_list.append( { @@ -207,7 +264,8 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): chunks = [type("C", (), {"text": content})] for c in chunks: - chunk_text = c.text if hasattr(c, "text") else c + chunk_body = c.text if hasattr(c, "text") else c + chunk_text = f"{prefix}{chunk_body}" raw_content_list.append( { "text": chunk_text, @@ -224,7 +282,7 @@ def _process_chat_data(self, scene_data_info, info, **kwargs): } ) else: - if len(current_content + mem) > self.memory_max_length: + if self._count_tokens(current_content + mem) > self.chat_window_max_tokens: if current_content: raw_content_list.append( { @@ -290,53 +348,73 @@ def _process_single_item(item_data): return None with ContextThreadPoolExecutor(max_workers=8) as executor: - futures = 
[executor.submit(_process_single_item, item) for item in raw_content_list] - - for future in concurrent.futures.as_completed(futures): + futures = { + executor.submit(_process_single_item, item): i + for i, item in enumerate(raw_content_list) + } + + chat_nodes = [None] * len(futures) + for fut in concurrent.futures.as_completed(futures): + i = futures[fut] try: - node = future.result() + node = fut.result() if node: - chat_nodes.append(node) + chat_nodes[i] = node except Exception as e: logger.error(f"[ChatFast] Future result error: {e}") + chat_nodes = [n for n in chat_nodes if n is not None] return chat_nodes + else: + logger.debug("Using Fine Mode") + mem_list = [] + for item in scene_data_info: + role = item.get("role", "") + content = item.get("content", "") + chat_time = item.get("chat_time", "") + prefix = ( + f"{role}: " + if (role and role != "mix") + else f"[{chat_time}]: " + if chat_time + else "" + ) + mem_list.append(f"{prefix}{content}\n") + response_json = self._get_llm_response("\n".join(mem_list)) + chat_read_nodes = [] + for memory_i_raw in response_json.get("memory list", []): + try: + memory_type = ( + memory_i_raw.get("memory_type", "LongTermMemory") + .replace("长期记忆", "LongTermMemory") + .replace("用户记忆", "UserMemory") + ) - mem_list = [] - for item in scene_data_info: - if "chat_time" in item: - mem = item["role"] + ": " + f"[{item['chat_time']}]: " + item["content"] - mem_list.append(mem) - else: - mem = item["role"] + ":" + item["content"] - mem_list.append(mem) - lang = detect_lang("\n".join(mem_list)) - template = PROMPT_DICT["chat"][lang] - examples = PROMPT_DICT["chat"][f"{lang}_example"] - - prompt = template.replace("${conversation}", "\n".join(mem_list)) - if self.config.remove_prompt_example: - prompt = prompt.replace(examples, "") + if memory_type not in ["LongTermMemory", "UserMemory"]: + memory_type = "LongTermMemory" - messages = [{"role": "user", "content": prompt}] + node_i = self._make_memory_item( + 
value=memory_i_raw.get("value", ""), + info=info, + memory_type=memory_type, + tags=memory_i_raw.get("tags", []) + if isinstance(memory_i_raw.get("tags", []), list) + else [], + key=memory_i_raw.get("key", ""), + sources=scene_data_info, + background=response_json.get("summary", ""), + type_="fact", + confidence=0.99, + ) + chat_read_nodes.append(node_i) + except Exception as e: + logger.error(f"[ChatReader] Error parsing memory item: {e}") - try: - response_text = self.llm.generate(messages) - response_json = self.parse_json_result(response_text) - except Exception as e: - logger.error(f"[LLM] Exception during chat generation: {e}") - response_json = { - "memory list": [ - { - "key": "\n".join(mem_list)[:10], - "memory_type": "UserMemory", - "value": "\n".join(mem_list), - "tags": [], - } - ], - "summary": "\n".join(mem_list), - } + return chat_read_nodes + def _process_transfer_chat_data(self, raw_node: TextualMemoryItem): + raw_memory = raw_node.memory + response_json = self._get_llm_response(raw_memory) chat_read_nodes = [] for memory_i_raw in response_json.get("memory list", []): try: @@ -345,19 +423,20 @@ def _process_single_item(item_data): .replace("长期记忆", "LongTermMemory") .replace("用户记忆", "UserMemory") ) - if memory_type not in ["LongTermMemory", "UserMemory"]: memory_type = "LongTermMemory" - node_i = self._make_memory_item( value=memory_i_raw.get("value", ""), - info=info, + info={ + "user_id": raw_node.metadata.user_id, + "session_id": raw_node.metadata.session_id, + }, memory_type=memory_type, tags=memory_i_raw.get("tags", []) if isinstance(memory_i_raw.get("tags", []), list) else [], key=memory_i_raw.get("key", ""), - sources=scene_data_info, + sources=raw_node.metadata.sources, background=response_json.get("summary", ""), type_="fact", confidence=0.99, @@ -426,9 +505,44 @@ def get_memory( for scene_data_info in list_scene_data_info ] for future in concurrent.futures.as_completed(futures): - res_memory = future.result() - 
memory_list.append(res_memory) + try: + res_memory = future.result() + if res_memory is not None: + memory_list.append(res_memory) + except Exception as e: + logger.error(f"Task failed with exception: {e}") + logger.error(traceback.format_exc()) + return memory_list + + def fine_transfer_simple_mem( + self, input_memories: list[list[TextualMemoryItem]], type: str + ) -> list[list[TextualMemoryItem]]: + if not input_memories: + return [] + + memory_list = [] + if type == "chat": + processing_func = self._process_transfer_chat_data + elif type == "doc": + processing_func = self._process_transfer_doc_data + else: + processing_func = self._process_transfer_doc_data + + # Process Q&A pairs concurrently with context propagation + with ContextThreadPoolExecutor() as executor: + futures = [ + executor.submit(processing_func, scene_data_info) + for scene_data_info in input_memories + ] + for future in concurrent.futures.as_completed(futures): + try: + res_memory = future.result() + if res_memory is not None: + memory_list.append(res_memory) + except Exception as e: + logger.error(f"Task failed with exception: {e}") + logger.error(traceback.format_exc()) return memory_list def get_scene_data_info(self, scene_data: list, type: str) -> list[str]: @@ -444,13 +558,6 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]: List of strings containing the processed scene data """ results = [] - parser_config = ParserConfigFactory.model_validate( - { - "backend": "markitdown", - "config": {}, - } - ) - parser = ParserFactory.from_config(parser_config) if type == "chat": for items in scene_data: @@ -468,6 +575,13 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]: if result: results.append(result) elif type == "doc": + parser_config = ParserConfigFactory.model_validate( + { + "backend": "markitdown", + "config": {}, + } + ) + parser = ParserFactory.from_config(parser_config) for item in scene_data: try: if os.path.exists(item): @@ -529,6 
+643,9 @@ def _process_doc_data(self, scene_data_info, info, **kwargs): logger.error(f"[DocReader] Future task failed: {e}") return doc_nodes + def _process_transfer_doc_data(self, raw_node: TextualMemoryItem): + raise NotImplementedError + def parse_json_result(self, response_text): try: json_start = response_text.find("{") From 5f7e8e0528c22c2467b1da90043526cf074744e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 16 Oct 2025 17:11:41 +0800 Subject: [PATCH 10/22] feat: add split chunker into mode in simple struct mem reader --- src/memos/mem_reader/simple_struct.py | 254 ++++++++------------------ 1 file changed, 76 insertions(+), 178 deletions(-) diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 0bd2932a0..1197ef04d 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -217,199 +217,97 @@ def _get_llm_response(self, mem_str: str) -> dict: } return response_json - @timed - def _process_chat_data(self, scene_data_info, info, **kwargs): - mode = kwargs.get("mode", "fine") - if mode == "fast": - logger.debug("Using Fast Mode") - raw_content_list = [] - current_content = "" - current_roles = set() - current_sources = [] - current_idx = 0 - - for idx, item in enumerate(scene_data_info): - try: - role = item.get("role", "") - content = item.get("content", "") - chat_time = item.get("chat_time", None) - - prefix = ( - f"{role}: " - if (role and role != "mix") - else f"[{chat_time}]: " - if chat_time - else "" - ) - mem = f"{prefix}{content}\n" - if self._count_tokens(mem) > self.chat_window_max_tokens: - if current_content: - raw_content_list.append( - { - "text": current_content, - "roles": current_roles, - "sources": current_sources, - "start_idx": current_idx, - } - ) - current_content, current_roles, current_sources = "", set(), [] - - try: - chunks = self.chunker.chunk(content) or [] - except Exception as e: - logger.warning(f"[ChatFast] 
chunker failed on item {idx}: {e}") - chunks = [] - - if not chunks: - chunks = [type("C", (), {"text": content})] - - for c in chunks: - chunk_body = c.text if hasattr(c, "text") else c - chunk_text = f"{prefix}{chunk_body}" - raw_content_list.append( - { - "text": chunk_text, - "roles": {role}, - "sources": [ - { - "type": "chat", - "index": idx, - "role": role, - "chat_time": chat_time, - } - ], - "start_idx": idx, - } - ) - else: - if self._count_tokens(current_content + mem) > self.chat_window_max_tokens: - if current_content: - raw_content_list.append( - { - "text": current_content, - "roles": current_roles, - "sources": current_sources, - "start_idx": current_idx, - } - ) - current_content = mem - current_roles = {role} - current_sources = [ - {"type": "chat", "index": idx, "role": role, "chat_time": chat_time} - ] - current_idx = idx - else: - current_content += mem - current_roles.add(role) - current_sources.append( - {"type": "chat", "index": idx, "role": role, "chat_time": chat_time} - ) + def _iter_chat_windows(self, scene_data_info, max_tokens=None, overlap=200): + """ + use token counter to get a slide window generator + """ + max_tokens = max_tokens or self.chat_window_max_tokens + buf, sources, start_idx = [], [], 0 + cur_text = "" + + for idx, item in enumerate(scene_data_info): + role = item.get("role", "") + content = item.get("content", "") + chat_time = item.get("chat_time", None) + prefix = ( + f"{role}: " + if (role and role != "mix") + else (f"[{chat_time}]: " if chat_time else "") + ) + line = f"{prefix}{content}\n" - except Exception as e: - logger.error(f"[ChatFast] Error preparing item {idx}: {e}") + if self._count_tokens(cur_text + line) > max_tokens and cur_text: + text = "".join(buf) + yield {"text": text, "sources": sources.copy(), "start_idx": start_idx} + while buf and self._count_tokens("".join(buf)) > overlap: + buf.pop(0) + sources.pop(0) + start_idx = idx + cur_text = "".join(buf) - if current_content: - 
raw_content_list.append( - { - "text": current_content, - "roles": current_roles, - "sources": current_sources, - "start_idx": current_idx, - } - ) + buf.append(line) + sources.append({"type": "chat", "index": idx, "role": role, "chat_time": chat_time}) + cur_text = "".join(buf) - chat_nodes = [] + if buf: + yield {"text": "".join(buf), "sources": sources.copy(), "start_idx": start_idx} - def _process_single_item(item_data): - try: - text = item_data["text"] - roles = item_data["roles"] - sources = item_data["sources"] - - mem_type = "UserMemory" if (roles and roles == {"user"}) else "LongTermMemory" - tags = ["mode:fast", f"lang:{detect_lang(text)}"] + [ - f"role:{r}" for r in sorted(roles) - ] - - node = self._make_memory_item( - value=text, - info=info, - memory_type=mem_type, - tags=tags, - key=None, - sources=sources, - background="", - type_="fact", - confidence=0.99, - ) - return node - except Exception as e: - logger.error(f"[ChatFast] Error processing item: {e}") - return None + @timed + def _process_chat_data(self, scene_data_info, info, **kwargs): + mode = kwargs.get("mode", "fine") + windows = list(self._iter_chat_windows(scene_data_info)) - with ContextThreadPoolExecutor(max_workers=8) as executor: - futures = { - executor.submit(_process_single_item, item): i - for i, item in enumerate(raw_content_list) - } + if mode == "fast": + logger.debug("Using unified Fast Mode") + + def _build_fast_node(w): + text = w["text"] + roles = {s.get("role", "") for s in w["sources"] if s.get("role")} + mem_type = "UserMemory" if roles == {"user"} else "LongTermMemory" + tags = ["mode:fast", f"lang:{detect_lang(text)}"] + [ + f"role:{r}" for r in sorted(roles) + ] + return self._make_memory_item( + value=text, info=info, memory_type=mem_type, tags=tags, sources=w["sources"] + ) - chat_nodes = [None] * len(futures) + with ContextThreadPoolExecutor(max_workers=8) as ex: + futures = {ex.submit(_build_fast_node, w): i for i, w in enumerate(windows)} + results = [None] * 
len(futures) for fut in concurrent.futures.as_completed(futures): i = futures[fut] try: node = fut.result() if node: - chat_nodes[i] = node + results[i] = node except Exception as e: - logger.error(f"[ChatFast] Future result error: {e}") - - chat_nodes = [n for n in chat_nodes if n is not None] + logger.error(f"[ChatFast] error: {e}") + chat_nodes = [r for r in results if r] return chat_nodes else: - logger.debug("Using Fine Mode") - mem_list = [] - for item in scene_data_info: - role = item.get("role", "") - content = item.get("content", "") - chat_time = item.get("chat_time", "") - prefix = ( - f"{role}: " - if (role and role != "mix") - else f"[{chat_time}]: " - if chat_time - else "" - ) - mem_list.append(f"{prefix}{content}\n") - response_json = self._get_llm_response("\n".join(mem_list)) + logger.debug("Using unified Fine Mode") chat_read_nodes = [] - for memory_i_raw in response_json.get("memory list", []): - try: - memory_type = ( - memory_i_raw.get("memory_type", "LongTermMemory") - .replace("长期记忆", "LongTermMemory") - .replace("用户记忆", "UserMemory") - ) - - if memory_type not in ["LongTermMemory", "UserMemory"]: - memory_type = "LongTermMemory" - - node_i = self._make_memory_item( - value=memory_i_raw.get("value", ""), - info=info, - memory_type=memory_type, - tags=memory_i_raw.get("tags", []) - if isinstance(memory_i_raw.get("tags", []), list) - else [], - key=memory_i_raw.get("key", ""), - sources=scene_data_info, - background=response_json.get("summary", ""), - type_="fact", - confidence=0.99, - ) - chat_read_nodes.append(node_i) - except Exception as e: - logger.error(f"[ChatReader] Error parsing memory item: {e}") - + for w in windows: + resp = self._get_llm_response(w["text"]) + for m in resp.get("memory list", []): + try: + memory_type = ( + m.get("memory_type", "LongTermMemory") + .replace("长期记忆", "LongTermMemory") + .replace("用户记忆", "UserMemory") + ) + node = self._make_memory_item( + value=m.get("value", ""), + info=info, + 
memory_type=memory_type, + tags=m.get("tags", []), + key=m.get("key", ""), + sources=w["sources"], + background=resp.get("summary", ""), + ) + chat_read_nodes.append(node) + except Exception as e: + logger.error(f"[ChatFine] parse error: {e}") return chat_read_nodes def _process_transfer_chat_data(self, raw_node: TextualMemoryItem): From 23555275ef22f450d45bf31385661ec88fe7ab74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 16 Oct 2025 20:36:36 +0800 Subject: [PATCH 11/22] feat: update async-mode add --- examples/mem_reader/reader.py | 317 ++++++++++++++++++ src/memos/graph_dbs/neo4j.py | 2 +- src/memos/mem_os/core.py | 23 +- src/memos/mem_reader/base.py | 7 + src/memos/mem_reader/simple_struct.py | 2 +- src/memos/mem_scheduler/base_scheduler.py | 68 +++- src/memos/mem_scheduler/general_scheduler.py | 27 +- src/memos/memories/textual/tree.py | 9 +- .../tree_text_memory/organize/manager.py | 3 +- .../tree_text_memory/retrieve/recall.py | 2 + .../tree_text_memory/retrieve/searcher.py | 2 +- 11 files changed, 426 insertions(+), 36 deletions(-) diff --git a/examples/mem_reader/reader.py b/examples/mem_reader/reader.py index 30e42d497..3da5d5e76 100644 --- a/examples/mem_reader/reader.py +++ b/examples/mem_reader/reader.py @@ -2,6 +2,11 @@ from memos.configs.mem_reader import SimpleStructMemReaderConfig from memos.mem_reader.simple_struct import SimpleStructMemReader +from memos.memories.textual.item import ( + SourceMessage, + TextualMemoryItem, + TreeNodeTextualMemoryMetadata, +) def main(): @@ -227,6 +232,318 @@ def main(): for j, mem_item in enumerate(mem_list[:2]): # Show first 2 items from each list print(f" [{i}][{j}] {mem_item.memory[:100]}...") + # 7. Example of transfer fast mode result into fine result + fast_mode_memories = [ + TextualMemoryItem( + id="4553141b-3a33-4548-b779-e677ec797a9f", + memory="user: Nate:Oh cool! I might check that one out some time soon! 
I do love watching classics.\nassistant: Joanna:Yep, that movie is awesome. I first watched it around 3 years ago. I even went out and got a physical copy!\nuser: Nate:Sounds cool! Have you seen it a lot? sounds like you know the movie well!\nassistant: Joanna:A few times. It's one of my favorites! I really like the idea and the acting.\nuser: Nate:Cool! I'll definitely check it out. Thanks for the recommendation!\nassistant: Joanna:No problem, Nate! Let me know if you like it!\n", + metadata=TreeNodeTextualMemoryMetadata( + user_id="nate_test", + session_id="root_session", + status="activated", + type="fact", + key="user: Nate:Oh cool", + confidence=0.9900000095367432, + source=None, + tags=["mode:fast", "lang:en", "role:assistant", "role:user"], + visibility=None, + updated_at="2025-10-16T17:16:30.094877+08:00", + memory_type="LongTermMemory", + sources=[ + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=0, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=1, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=2, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=3, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=4, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=5, + ), + ], + embedding=None, + created_at="2025-10-16T17:16:30.094919+08:00", + usage=[], + background="", + ), + ), + TextualMemoryItem( + 
id="752e42fa-92b6-491a-a430-6864a7730fba", + memory="user: Nate:It was! How about you? Do you have any hobbies you love?\nassistant: Joanna:Yeah! Besides writing, I also enjoy reading, watching movies, and exploring nature. Anything else you enjoy doing, Nate?\nuser: Nate:Playing video games and watching movies are my main hobbies.\nassistant: Joanna:Cool, Nate! So we both have similar interests. What type of movies do you like best?\nuser: Nate:I love action and sci-fi movies, the effects are so cool! What about you, what's your favorite genre?\nassistant: Joanna:I'm all about dramas and romcoms. I love getting immersed in the feelings and plots.\nuser: Nate:Wow, movies can be so powerful! Do you have any recommendations for me?\nassistant: Joanna:Yeah, totally! Have you seen this romantic drama that's all about memory and relationships? It's such a good one.\nuser: Nate:Oh cool! I might check that one out some time soon! I do love watching classics.\nassistant: Joanna:Yep, that movie is awesome. I first watched it around 3 years ago. 
I even went out and got a physical copy!\n", + metadata=TreeNodeTextualMemoryMetadata( + user_id="nate_test", + session_id="root_session", + status="activated", + type="fact", + key="user: Nate:It was", + confidence=0.9900000095367432, + source=None, + tags=["mode:fast", "lang:en", "role:assistant", "role:user"], + visibility=None, + updated_at="2025-10-16T17:16:30.095726+08:00", + memory_type="LongTermMemory", + sources=[ + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=0, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=1, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=2, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=3, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=4, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=5, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=6, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=7, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=8, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, 
+ doc_path=None, + index=9, + ), + ], + embedding=None, + created_at="2025-10-16T17:16:30.095767+08:00", + usage=[], + background="", + ), + ), + TextualMemoryItem( + id="c9cf448c-deee-43a8-bafd-eb15fde535b2", + memory="user: Nate:Hey Joanna! Long time no see! What's up? Anything fun going on?\nassistant: Joanna:Hey Nate! Long time no see! I've been working on a project lately - it's been pretty cool. What about you - any fun projects or hobbies?\nuser: Nate:Hey Joanna! That's cool! I won my first video game tournament last week - so exciting!\nassistant: Joanna:Wow Nate! Congrats on winning! Tell me more - what game was it?\nuser: Nate:Thanks! it's a team shooter game.\nassistant: Joanna:Wow, great job! What was is called?\nuser: Nate:The game was called Counter-Strike: Global Offensive, and me and my team had a blast to the very end!\nassistant: Joanna:Cool, Nate! Sounds like a fun experience, even if I'm not into games.\nuser: Nate:It was! How about you? Do you have any hobbies you love?\nassistant: Joanna:Yeah! Besides writing, I also enjoy reading, watching movies, and exploring nature. 
Anything else you enjoy doing, Nate?\n", + metadata=TreeNodeTextualMemoryMetadata( + user_id="nate_test", + session_id="root_session", + status="activated", + type="fact", + key="user: Nate:Hey Joanna", + confidence=0.9900000095367432, + source=None, + tags=["mode:fast", "lang:en", "role:assistant", "role:user"], + visibility=None, + updated_at="2025-10-16T17:16:30.098208+08:00", + memory_type="LongTermMemory", + sources=[ + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=0, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=1, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=2, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=3, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=4, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=5, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=6, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=7, + ), + SourceMessage( + type="chat", + role="user", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, + doc_path=None, + index=8, + ), + SourceMessage( + type="chat", + role="assistant", + chat_time="7:31 pm on 21 January, 2022", + message_id=None, + content=None, 
+ doc_path=None, + index=9, + ), + ], + embedding=None, + created_at="2025-10-16T17:16:30.098246+08:00", + usage=[], + background="", + ), + ), + ] + fine_memories = reader.fine_transfer_simple_mem(fast_mode_memories, type="chat") + print("\n--- Transfer Mode Results (first 3 items) ---") + for i, mem_list in enumerate(fine_memories[:3]): + for j, mem_item in enumerate(mem_list[:2]): # Show first 2 items from each list + print(f" [{i}][{j}] {mem_item.memory[:100]}...") + # 7. Example of processing documents (only in fine mode) print("\n=== Processing Documents (Fine Mode Only) ===") # Example document paths (you should replace these with actual document paths) diff --git a/src/memos/graph_dbs/neo4j.py b/src/memos/graph_dbs/neo4j.py index 96908913d..b109fb001 100644 --- a/src/memos/graph_dbs/neo4j.py +++ b/src/memos/graph_dbs/neo4j.py @@ -623,7 +623,7 @@ def search_by_embedding( vector (list[float]): The embedding vector representing query semantics. top_k (int): Number of top similar nodes to retrieve. scope (str, optional): Memory type filter (e.g., 'WorkingMemory', 'LongTermMemory'). - status (str, optional): Node status filter (e.g., 'active', 'archived'). + status (str, optional): Node status filter (e.g., 'activated', 'archived'). If provided, restricts results to nodes with matching status. threshold (float, optional): Minimum similarity score threshold (0 ~ 1). search_filter (dict, optional): Additional metadata filters for search results. diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index b536ec5b2..0487540fb 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -688,6 +688,12 @@ def add( logger.info( f"time add: get mem_cube_id check in mem_cubes time user_id: {target_user_id} time is: {time.time() - time_start_0}" ) + sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode + if sync_mode == "async": + assert self.mem_scheduler is not None, ( + "Mem-Scheduler must be working when use synchronous memory adding." 
+ ) + logger.debug(f"Mem-reader mode is: {sync_mode}") time_start_1 = time.time() if ( (messages is not None) @@ -697,11 +703,6 @@ def add( logger.info( f"time add: messages is not None and enable_textual_memory and text_mem is not None time user_id: {target_user_id} time is: {time.time() - time_start_1}" ) - sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode - if sync_mode == "async": - assert self.mem_scheduler is not None, ( - "Mem-Scheduler must be working when use synchronous memory adding." - ) if self.mem_cubes[mem_cube_id].config.text_mem.backend != "tree_text": add_memory = [] @@ -774,10 +775,12 @@ def add( messages_list = [ [{"role": "user", "content": memory_content}] ] # for only user-str input and convert message + memories = self.mem_reader.get_memory( messages_list, type="chat", info={"user_id": target_user_id, "session_id": target_session_id}, + mode="fast" if sync_mode == "async" else "fine", ) mem_ids = [] @@ -791,6 +794,16 @@ def add( # submit messages for scheduler if self.enable_mem_scheduler and self.mem_scheduler is not None: mem_cube = self.mem_cubes[mem_cube_id] + if sync_mode == "async": + message_item = ScheduleMessageItem( + user_id=target_user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + label=MEM_READ_LABEL, + content=json.dumps(mem_ids), + timestamp=datetime.utcnow(), + ) + self.mem_scheduler.submit_messages(messages=[message_item]) message_item = ScheduleMessageItem( user_id=target_user_id, mem_cube_id=mem_cube_id, diff --git a/src/memos/mem_reader/base.py b/src/memos/mem_reader/base.py index ba8be8652..3095a0bc6 100644 --- a/src/memos/mem_reader/base.py +++ b/src/memos/mem_reader/base.py @@ -25,3 +25,10 @@ def get_memory( @abstractmethod def transform_memreader(self, data: dict) -> list[TextualMemoryItem]: """Transform the memory data into a list of TextualMemoryItem objects.""" + + @abstractmethod + def fine_transfer_simple_mem( + self, input_memories: list[list[TextualMemoryItem]], type: str + ) -> 
list[list[TextualMemoryItem]]: + """Fine Transform TextualMemoryItem List into another list of + TextualMemoryItem objects via calling llm to better understand users.""" diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 1197ef04d..caf4c19df 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -413,7 +413,7 @@ def get_memory( return memory_list def fine_transfer_simple_mem( - self, input_memories: list[list[TextualMemoryItem]], type: str + self, input_memories: list[TextualMemoryItem], type: str ) -> list[list[TextualMemoryItem]]: if not input_memories: return [] diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 0a8c43d78..550aee1f6 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -125,12 +125,17 @@ def initialize_modules( self.dispatcher_monitor.start() # initialize with auth_config - if self.auth_config_path is not None and Path(self.auth_config_path).exists(): - self.auth_config = AuthConfig.from_local_config(config_path=self.auth_config_path) - elif AuthConfig.default_config_exists(): - self.auth_config = AuthConfig.from_local_config() - else: - self.auth_config = AuthConfig.from_local_env() + try: + if self.auth_config_path is not None and Path(self.auth_config_path).exists(): + self.auth_config = AuthConfig.from_local_config( + config_path=self.auth_config_path + ) + elif AuthConfig.default_config_exists(): + self.auth_config = AuthConfig.from_local_config() + else: + self.auth_config = AuthConfig.from_local_env() + except Exception: + pass if self.auth_config is not None: self.rabbitmq_config = self.auth_config.rabbitmq @@ -637,3 +642,54 @@ def _cleanup_queues(self) -> None: self._web_log_message_queue.get_nowait() except queue.Empty: pass + + def mem_scheduler_wait(self, timeout: float = 180.0, poll: float = 0.1) -> bool: + """ + Block until the scheduler 
has finished processing all submitted messages. + + Strategy: + 1) Wait for the internal memos_message_queue to drain + - Prefer "unfinished_tasks" if available; otherwise fallback to empty() polling. + 2) If parallel dispatch is enabled, wait for all dispatched futures to complete via dispatcher.join(). + + Args: + timeout: Maximum seconds to wait in total. + poll: Polling interval when falling back to checks. + Returns: + True if drained before timeout, otherwise False. + """ + deadline = time.time() + timeout + + # 1) Wait for internal queue to drain + while True: + try: + unfinished = getattr(self.memos_message_queue, "unfinished_tasks", None) + if unfinished is not None: + if int(unfinished) == 0: + break + else: + if self.memos_message_queue.empty(): + break + except Exception: + # Be conservative: if any issue reading metrics, fallback to empty() + if self.memos_message_queue.empty(): + break + + if time.time() >= deadline: + logger.warning("mem_scheduler_wait: queue did not drain before timeout") + return False + time.sleep(poll) + + # 2) Wait for dispatcher futures (if running in parallel mode) + remaining = max(0.0, deadline - time.time()) + if self.enable_parallel_dispatch and self.dispatcher is not None: + try: + ok = self.dispatcher.join(timeout=remaining if remaining > 0 else 0) + except TypeError: + # Some implementations may not accept timeout + ok = self.dispatcher.join() + if not ok: + logger.warning("mem_scheduler_wait: dispatcher did not complete before timeout") + return False + + return True diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 9e64e7311..13e92715d 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -1,4 +1,5 @@ import json +import traceback from memos.configs.mem_scheduler import GeneralSchedulerConfig from memos.log import get_logger @@ -297,27 +298,13 @@ def _process_memories_with_reader( logger.warning("No 
valid memory items found for processing") return - # Prepare scene data for mem_reader - scene_data = [] - for memory_item in memory_items: - scene_data.append( - { - "role": "user", # or determine from metadata - "content": memory_item.memory, - "chat_time": memory_item.metadata.updated_at - or memory_item.metadata.created_at, - } - ) - # Use mem_reader to process the memories - logger.info(f"Processing {len(scene_data)} memories with mem_reader") + logger.info(f"Processing {len(memory_items)} memories with mem_reader") # Extract memories using mem_reader - processed_memories = self.mem_reader.get_memory( - scene_data=scene_data, + processed_memories = self.mem_reader.fine_transfer_simple_mem( + memory_items, type="chat", - info={"user_id": user_id, "session_id": "", "mem_cube_id": mem_cube_id}, - mode="fine", # Use fast mode for async processing ) if processed_memories and len(processed_memories) > 0: @@ -349,8 +336,10 @@ def _process_memories_with_reader( text_mem.memory_manager.remove_and_refresh_memory() logger.info("Remove and Refresh Memories") - except Exception as e: - logger.error(f"Error in _process_memories_with_reader: {e}", exc_info=True) + except Exception: + logger.error( + f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True + ) def _trigger_memory_reorganization( self, diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index 311bdd87a..ea087eac9 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -274,7 +274,14 @@ def get_all(self) -> dict: return all_items def delete(self, memory_ids: list[str]) -> None: - raise NotImplementedError + """Hard delete: permanently remove nodes and their edges from the graph.""" + if not memory_ids: + return + for mid in memory_ids: + try: + self.graph_store.delete_node(mid) + except Exception as e: + logger.warning(f"TreeTextMemory.delete_hard: failed to delete {mid}: {e}") def delete_all(self) -> None: """Delete 
all memories and their relationships from the graph store.""" diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 1dd4aee5e..e8dc5c250 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -112,8 +112,7 @@ def _process_memory(self, memory: TextualMemoryItem): ids = [] # Add to WorkingMemory - working_id = self._add_memory_to_db(memory, "WorkingMemory") - ids.append(working_id) + self._add_memory_to_db(memory, "WorkingMemory") # Add to LongTermMemory and UserMemory if memory.metadata.memory_type in ["LongTermMemory", "UserMemory"]: diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py index 84cc8ecb3..c6de85b0a 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py @@ -192,6 +192,7 @@ def _vector_recall( memory_scope: str, top_k: int = 20, max_num: int = 3, + status: str = "activated", cube_name: str | None = None, search_filter: dict | None = None, ) -> list[TextualMemoryItem]: @@ -207,6 +208,7 @@ def search_single(vec, filt=None): self.graph_store.search_by_embedding( vector=vec, top_k=top_k, + status=status, scope=memory_scope, cube_name=cube_name, search_filter=filt, diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py index df154f23a..78bd73007 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py @@ -118,7 +118,7 @@ def _parse_task(self, query, info, mode, top_k=5, search_filter: dict | None = N related_nodes = [ self.graph_store.get_node(n["id"]) for n in self.graph_store.search_by_embedding( - query_embedding, 
top_k=top_k, search_filter=search_filter + query_embedding, top_k=top_k, status="activated", search_filter=search_filter ) ] memories = [] From 2ee4c4cfe0205e5abea20eb26a570eb0aada85d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 16 Oct 2025 21:06:19 +0800 Subject: [PATCH 12/22] chore: update gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index ae7bdc4d6..8319a4d2f 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ evaluation/.env !evaluation/configs-example/*.json evaluation/configs/* **tree_textual_memory_locomo** +**script.py** .env evaluation/scripts/personamem From 593faa503c97f25460a16ee42e5fca762ced3258 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Fri, 17 Oct 2025 14:42:06 +0800 Subject: [PATCH 13/22] feat: improve database note write performance --- src/memos/mem_os/core.py | 13 +++---- .../tree_text_memory/organize/manager.py | 34 ++++++++++++------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index 0487540fb..efb2d0622 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -726,14 +726,11 @@ def add( logger.info( f"time add: get mem_reader time user_id: {target_user_id} time is: {time.time() - time_start_2}" ) - mem_ids = [] - for mem in memories: - mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem) - mem_ids.extend(mem_id_list) - logger.info( - f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_id_list}" - ) - + memories_flatten = [m for m_list in memories for m in m_list] + mem_ids: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(memories_flatten) + logger.info( + f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_ids}" + ) # submit messages for scheduler if self.enable_mem_scheduler and self.mem_scheduler is not None: mem_cube = self.mem_cubes[mem_cube_id] diff --git 
a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index e8dc5c250..0abefa19e 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -58,7 +58,7 @@ def add(self, memories: list[TextualMemoryItem]) -> list[str]: """ added_ids: list[str] = [] - with ContextThreadPoolExecutor(max_workers=8) as executor: + with ContextThreadPoolExecutor(max_workers=20) as executor: futures = {executor.submit(self._process_memory, m): m for m in memories} for future in as_completed(futures, timeout=60): try: @@ -109,18 +109,28 @@ def _process_memory(self, memory: TextualMemoryItem): Process and add memory to different memory types (WorkingMemory, LongTermMemory, UserMemory). This method runs asynchronously to process each memory item. """ - ids = [] - - # Add to WorkingMemory - self._add_memory_to_db(memory, "WorkingMemory") + ids: list[str] = [] + futures = [] + + with ContextThreadPoolExecutor(max_workers=2, thread_name_prefix="mem") as ex: + f_working = ex.submit(self._add_memory_to_db, memory, "WorkingMemory") + futures.append(f_working) + + if memory.metadata.memory_type in ("LongTermMemory", "UserMemory"): + f_graph = ex.submit( + self._add_to_graph_memory, + memory=memory, + memory_type=memory.metadata.memory_type, + ) + futures.append(f_graph) - # Add to LongTermMemory and UserMemory - if memory.metadata.memory_type in ["LongTermMemory", "UserMemory"]: - added_id = self._add_to_graph_memory( - memory=memory, - memory_type=memory.metadata.memory_type, - ) - ids.append(added_id) + for fut in as_completed(futures): + try: + res = fut.result() + if isinstance(res, str) and res: + ids.append(res) + except Exception: + logger.warning("Parallel memory processing failed:\n%s", traceback.format_exc()) return ids From 8d2263a1f8e1e5c65e5b9ddc7ea423878bcf90ab Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Mon, 20 Oct 2025 15:42:47 +0800 Subject: [PATCH 14/22] feat: fix mem-read scheduler --- src/memos/mem_reader/simple_struct.py | 49 ++++-- src/memos/mem_scheduler/base_scheduler.py | 143 ++++++++++++++---- .../mem_scheduler/general_modules/misc.py | 23 ++- src/memos/mem_scheduler/general_scheduler.py | 15 +- .../tree_text_memory/organize/manager.py | 1 - 5 files changed, 182 insertions(+), 49 deletions(-) diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index caf4c19df..5e2ffef59 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -264,9 +264,7 @@ def _build_fast_node(w): text = w["text"] roles = {s.get("role", "") for s in w["sources"] if s.get("role")} mem_type = "UserMemory" if roles == {"user"} else "LongTermMemory" - tags = ["mode:fast", f"lang:{detect_lang(text)}"] + [ - f"role:{r}" for r in sorted(roles) - ] + tags = ["mode:fast"] return self._make_memory_item( value=text, info=info, memory_type=mem_type, tags=tags, sources=w["sources"] ) @@ -544,19 +542,42 @@ def _process_doc_data(self, scene_data_info, info, **kwargs): def _process_transfer_doc_data(self, raw_node: TextualMemoryItem): raise NotImplementedError - def parse_json_result(self, response_text): + def parse_json_result(self, response_text: str) -> dict: + s = (response_text or "").strip() + + m = re.search(r"```(?:json)?\s*([\s\S]*?)```", s, flags=re.I) + s = (m.group(1) if m else s.replace("```", "")).strip() + + i = s.find("{") + if i == -1: + return {} + s = s[i:].strip() + + try: + return json.loads(s) + except json.JSONDecodeError: + pass + + j = max(s.rfind("}"), s.rfind("]")) + if j != -1: + try: + return json.loads(s[: j + 1]) + except json.JSONDecodeError: + pass + + def _cheap_close(t: str) -> str: + t += "}" * max(0, t.count("{") - t.count("}")) + t += "]" * max(0, t.count("[") - t.count("]")) + return t + + t = _cheap_close(s) try: - json_start = 
response_text.find("{") - response_text = response_text[json_start:] - response_text = response_text.replace("```", "").strip() - if not response_text.endswith("}"): - response_text += "}" - return json.loads(response_text) + return json.loads(t) except json.JSONDecodeError as e: - logger.error(f"[JSONParse] Failed to decode JSON: {e}\nRaw:\n{response_text}") - return {} - except Exception as e: - logger.error(f"[JSONParse] Unexpected error: {e}") + logger.error( + f"[JSONParse] Failed to decode JSON: {e}\nTail: Raw {response_text} \ + n{s[-400:]}" + ) return {} def transform_memreader(self, data: dict) -> list[TextualMemoryItem]: diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 550aee1f6..ab657bdfa 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -643,50 +643,135 @@ def _cleanup_queues(self) -> None: except queue.Empty: pass - def mem_scheduler_wait(self, timeout: float = 180.0, poll: float = 0.1) -> bool: + def mem_scheduler_wait( + self, timeout: float = 180.0, poll: float = 0.1, log_every: float = 1.0 + ) -> bool: """ - Block until the scheduler has finished processing all submitted messages. - - Strategy: - 1) Wait for the internal memos_message_queue to drain - - Prefer "unfinished_tasks" if available; otherwise fallback to empty() polling. - 2) If parallel dispatch is enabled, wait for all dispatched futures to complete via dispatcher.join(). - - Args: - timeout: Maximum seconds to wait in total. - poll: Polling interval when falling back to checks. - Returns: - True if drained before timeout, otherwise False. + Uses EWMA throughput, detects leaked `unfinished_tasks`, and waits for dispatcher. 
""" - deadline = time.time() + timeout + deadline = time.monotonic() + timeout + + # --- helpers (local, no external deps) --- + def _unfinished() -> int: + """Prefer `unfinished_tasks`; fallback to `qsize()`.""" + try: + u = getattr(self.memos_message_queue, "unfinished_tasks", None) + if u is not None: + return int(u) + except Exception: + pass + try: + return int(self.memos_message_queue.qsize()) + except Exception: + return 0 + + def _fmt_eta(seconds: float | None) -> str: + """Format seconds to human-readable string.""" + if seconds is None or seconds != seconds or seconds == float("inf"): + return "unknown" + s = max(0, int(seconds)) + h, s = divmod(s, 3600) + m, s = divmod(s, 60) + if h > 0: + return f"{h:d}h{m:02d}m{s:02d}s" + if m > 0: + return f"{m:d}m{s:02d}s" + return f"{s:d}s" + + # --- EWMA throughput state (tasks/s) --- + alpha = 0.3 + rate = 0.0 + last_t = None # type: float | None + last_done = 0 + + # --- dynamic totals & stuck detection --- + init_unfinished = _unfinished() + done_total = 0 + last_unfinished = None + stuck_ticks = 0 + next_log = 0.0 - # 1) Wait for internal queue to drain while True: + # 1) read counters + curr_unfinished = _unfinished() try: - unfinished = getattr(self.memos_message_queue, "unfinished_tasks", None) - if unfinished is not None: - if int(unfinished) == 0: - break - else: - if self.memos_message_queue.empty(): - break + qsz = int(self.memos_message_queue.qsize()) except Exception: - # Be conservative: if any issue reading metrics, fallback to empty() - if self.memos_message_queue.empty(): - break + qsz = -1 + + pend = run = 0 + stats_fn = getattr(self.dispatcher, "stats", None) + if self.enable_parallel_dispatch and self.dispatcher is not None and callable(stats_fn): + try: + st = ( + stats_fn() + ) # expected: {'pending':int,'running':int,'done':int?,'rate':float?} + pend = int(st.get("pending", 0)) + run = int(st.get("running", 0)) + except Exception: + pass + + # 2) dynamic total (allows new tasks queued while 
waiting) + total_now = max(init_unfinished, done_total + curr_unfinished) + done_total = max(0, total_now - curr_unfinished) + + # 3) update EWMA throughput + now = time.monotonic() + if last_t is None: + last_t = now + else: + dt = max(1e-6, now - last_t) + dc = max(0, done_total - last_done) + inst = dc / dt + rate = inst if rate == 0.0 else alpha * inst + (1 - alpha) * rate + last_t = now + last_done = done_total + + eta = None if rate <= 1e-9 else (curr_unfinished / rate) + + # 4) progress log (throttled) + if now >= next_log: + print( + f"[mem_scheduler_wait] remaining≈{curr_unfinished} | throughput≈{rate:.2f} msg/s | ETA≈{_fmt_eta(eta)} " + f"| qsize={qsz} pending={pend} running={run}" + ) + next_log = now + max(0.2, log_every) + + # 5) exit / stuck detection + idle_dispatcher = ( + (pend == 0 and run == 0) + if (self.enable_parallel_dispatch and self.dispatcher is not None) + else True + ) + if curr_unfinished == 0: + break + if curr_unfinished > 0 and qsz == 0 and idle_dispatcher: + if last_unfinished == curr_unfinished: + stuck_ticks += 1 + else: + stuck_ticks = 0 + else: + stuck_ticks = 0 + last_unfinished = curr_unfinished + + if stuck_ticks >= 3: + logger.warning( + "mem_scheduler_wait: detected leaked 'unfinished_tasks' -> treating queue as drained" + ) + break - if time.time() >= deadline: + if now >= deadline: logger.warning("mem_scheduler_wait: queue did not drain before timeout") return False + time.sleep(poll) - # 2) Wait for dispatcher futures (if running in parallel mode) - remaining = max(0.0, deadline - time.time()) + # 6) wait dispatcher (second stage) + remaining = max(0.0, deadline - time.monotonic()) if self.enable_parallel_dispatch and self.dispatcher is not None: try: ok = self.dispatcher.join(timeout=remaining if remaining > 0 else 0) except TypeError: - # Some implementations may not accept timeout ok = self.dispatcher.join() if not ok: logger.warning("mem_scheduler_wait: dispatcher did not complete before timeout") diff --git 
a/src/memos/mem_scheduler/general_modules/misc.py b/src/memos/mem_scheduler/general_modules/misc.py index 3c7116b74..abc41ca36 100644 --- a/src/memos/mem_scheduler/general_modules/misc.py +++ b/src/memos/mem_scheduler/general_modules/misc.py @@ -173,7 +173,9 @@ def put(self, item: T, block: bool = False, timeout: float | None = None) -> Non """Put an item into the queue. If the queue is full, the oldest item will be automatically removed to make space. - This operation is thread-safe. + IMPORTANT: When we drop an item we also call `task_done()` to keep + the internal `unfinished_tasks` counter consistent (the dropped task + will never be processed). Args: item: The item to be put into the queue @@ -184,19 +186,34 @@ def put(self, item: T, block: bool = False, timeout: float | None = None) -> Non # First try non-blocking put super().put(item, block=block, timeout=timeout) except Full: + # Remove oldest item and mark it done to avoid leaking unfinished_tasks with suppress(Empty): - self.get_nowait() # Remove oldest item + _ = self.get_nowait() + # If the removed item had previously incremented unfinished_tasks, + # we must decrement here since it will never be processed. + with suppress(ValueError): + self.task_done() # Retry putting the new item super().put(item, block=block, timeout=timeout) def get_queue_content_without_pop(self) -> list[T]: """Return a copy of the queue's contents without modifying it.""" - return list(self.queue) + # Ensure a consistent snapshot by holding the mutex + with self.mutex: + return list(self.queue) def clear(self) -> None: """Remove all items from the queue. This operation is thread-safe. + IMPORTANT: We also decrement `unfinished_tasks` by the number of + items cleared, since those tasks will never be processed. """ with self.mutex: + dropped = len(self.queue) self.queue.clear() + # Call task_done() outside of the mutex to avoid deadlocks because + # Queue.task_done() acquires the same condition bound to `self.mutex`. 
+ for _ in range(dropped): + with suppress(ValueError): + self.task_done() diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 13e92715d..688396c54 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -1,3 +1,4 @@ +import concurrent.futures import json import traceback @@ -222,7 +223,7 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {MEM_READ_LABEL} handler.") - for message in messages: + def process_message(message: ScheduleMessageItem): try: user_id = message.user_id mem_cube_id = message.mem_cube_id @@ -231,6 +232,8 @@ def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non # Parse the memory IDs from content mem_ids = json.loads(content) if isinstance(content, str) else content + if not mem_ids: + return logger.info( f"Processing mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}, mem_ids={mem_ids}" @@ -240,7 +243,7 @@ def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non text_mem = mem_cube.text_mem if not isinstance(text_mem, TreeTextMemory): logger.error(f"Expected TreeTextMemory but got {type(text_mem).__name__}") - continue + return # Use mem_reader to process the memories self._process_memories_with_reader( @@ -258,6 +261,14 @@ def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non except Exception as e: logger.error(f"Error processing mem_read message: {e}", exc_info=True) + with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: + futures = [executor.submit(process_message, msg) for msg in messages] + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + logger.error(f"Thread task failed: {e}", 
exc_info=True) + def _process_memories_with_reader( self, mem_ids: list[str], diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 0abefa19e..49d01e841 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -146,7 +146,6 @@ def _add_memory_to_db(self, memory: TextualMemoryItem, memory_type: str) -> str: # Insert node into graph self.graph_store.add_node(working_memory.id, working_memory.memory, metadata) - return working_memory.id def _add_to_graph_memory(self, memory: TextualMemoryItem, memory_type: str): """ From e250ab801e4e415a5e8aa06ea771a7eb243ae160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Mon, 20 Oct 2025 16:07:54 +0800 Subject: [PATCH 15/22] fix: nebula group-by bug --- src/memos/graph_dbs/nebular.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 38f08ff8d..3eed41b69 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -1144,10 +1144,9 @@ def get_grouped_counts( group_by_fields.append(alias) # Full GQL query construction gql = f""" - MATCH (n) + MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */) {where_clause} RETURN {", ".join(return_fields)}, COUNT(n) AS count - GROUP BY {", ".join(group_by_fields)} """ result = self.execute_query(gql) # Pure GQL string execution From 14e986e893c90320240da4765c819f7042d9e2a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Mon, 20 Oct 2025 16:25:16 +0800 Subject: [PATCH 16/22] fix: bug in adding mem scheduler --- src/memos/mem_scheduler/general_scheduler.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 688396c54..e3fe5a7de 
100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -201,7 +201,15 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: mem_cube = msg.mem_cube for memory_id in userinput_memory_ids: - mem_item: TextualMemoryItem = mem_cube.text_mem.get(memory_id=memory_id) + try: + mem_item: TextualMemoryItem = mem_cube.text_mem.get( + memory_id=memory_id + ) + except Exception: + logger.warning( + f"This MemoryItem {memory_id} has already been deleted." + ) + continue mem_type = mem_item.metadata.memory_type mem_content = mem_item.memory From 31adec0f96246bdeeca692025d4c1d5520d87284 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 21 Oct 2025 11:24:10 +0800 Subject: [PATCH 17/22] fix: nebula index; mem-reader chat-time; --- src/memos/graph_dbs/nebular.py | 13 ++++++++++--- src/memos/mem_os/core.py | 2 +- src/memos/mem_reader/simple_struct.py | 15 ++++++++------- src/memos/mem_scheduler/base_scheduler.py | 2 +- src/memos/mem_scheduler/general_scheduler.py | 12 ++++++++---- 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 3eed41b69..96151b441 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -445,7 +445,7 @@ def remove_oldest_memory(self, memory_type: str, keep_latest: int) -> None: count = self.count_nodes(memory_type) if count > keep_latest: delete_query = f""" - MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */) + MATCH (n@Memory /*+ INDEX(idx_memory_user_name_memory_type) */) WHERE n.memory_type = '{memory_type}' {optional_condition} ORDER BY n.updated_at DESC @@ -605,7 +605,7 @@ def get_memory_count(self, memory_type: str) -> int: @timed def count_nodes(self, scope: str | None = None) -> int: - query = "MATCH (n@Memory)" + query = "MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */)" conditions = [] if scope: @@ -1584,7 +1584,14 @@ def 
_create_basic_property_indexes(self) -> None: Create standard B-tree indexes on user_name when use Shared Database Multi-Tenant Mode. """ - fields = ["status", "memory_type", "created_at", "updated_at", "user_name"] + fields = [ + "status", + "memory_type", + "created_at", + "updated_at", + "user_name", + "user_name_memory_type", + ] for field in fields: index_name = f"idx_memory_{field}" diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index efb2d0622..71f85fc86 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -691,7 +691,7 @@ def add( sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode if sync_mode == "async": assert self.mem_scheduler is not None, ( - "Mem-Scheduler must be working when use synchronous memory adding." + "Mem-Scheduler must be working when use asynchronous memory adding." ) logger.debug(f"Mem-reader mode is: {sync_mode}") time_start_1 = time.time() diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 5e2ffef59..9ec8ca166 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -157,7 +157,7 @@ def __init__(self, config: SimpleStructMemReaderConfig): self.chunker = ChunkerFactory.from_config(config.chunker) self.memory_max_length = 8000 # Use token-based windowing; default to ~5000 tokens if not configured - self.chat_window_max_tokens = getattr(self.config, "chat_window_max_tokens", 5000) + self.chat_window_max_tokens = getattr(self.config, "chat_window_max_tokens", 1024) self._count_tokens = _count_tokens_text def _make_memory_item( @@ -229,11 +229,12 @@ def _iter_chat_windows(self, scene_data_info, max_tokens=None, overlap=200): role = item.get("role", "") content = item.get("content", "") chat_time = item.get("chat_time", None) - prefix = ( - f"{role}: " - if (role and role != "mix") - else (f"[{chat_time}]: " if chat_time else "") - ) + parts = [] + if role and str(role).lower() != "mix": + parts.append(f"{role}: ") + 
if chat_time: + parts.append(f"[{chat_time}]: ") + prefix = "".join(parts) line = f"{prefix}{content}\n" if self._count_tokens(cur_text + line) > max_tokens and cur_text: @@ -576,7 +577,7 @@ def _cheap_close(t: str) -> str: except json.JSONDecodeError as e: logger.error( f"[JSONParse] Failed to decode JSON: {e}\nTail: Raw {response_text} \ - n{s[-400:]}" + json: {s}" ) return {} diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index ab657bdfa..9ec976405 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -644,7 +644,7 @@ def _cleanup_queues(self) -> None: pass def mem_scheduler_wait( - self, timeout: float = 180.0, poll: float = 0.1, log_every: float = 1.0 + self, timeout: float = 180.0, poll: float = 0.1, log_every: float = 0.01 ) -> bool: """ Uses EWMA throughput, detects leaked `unfinished_tasks`, and waits for dispatcher. diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index e3fe5a7de..5c6152b32 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -321,10 +321,14 @@ def _process_memories_with_reader( logger.info(f"Processing {len(memory_items)} memories with mem_reader") # Extract memories using mem_reader - processed_memories = self.mem_reader.fine_transfer_simple_mem( - memory_items, - type="chat", - ) + try: + processed_memories = self.mem_reader.fine_transfer_simple_mem( + memory_items, + type="chat", + ) + except Exception as e: + logger.warning(f"{e}: Fail to transfer mem: {memory_items}") + processed_memories = [] if processed_memories and len(processed_memories) > 0: # Flatten the results (mem_reader returns list of lists) From 18f3cc8b49168d626e7b8773bd174689c99ab166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 21 Oct 2025 11:36:10 +0800 Subject: [PATCH 18/22] format: searcher --- 
.../memories/textual/tree_text_memory/retrieve/searcher.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py index 9263c34dc..96c6c97f1 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py @@ -134,7 +134,11 @@ def _parse_task( related_nodes = [ self.graph_store.get_node(n["id"]) for n in self.graph_store.search_by_embedding( - query_embedding, top_k=top_k, status="activated", search_filter=search_filter, user_name=user_name + query_embedding, + top_k=top_k, + status="activated", + search_filter=search_filter, + user_name=user_name, ) ] memories = [] From 4e0133e57c7fdb0e55c8c633a81e80eacad379f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 21 Oct 2025 16:13:31 +0800 Subject: [PATCH 19/22] fix: some bug in shceduler and mem-reader --- src/memos/graph_dbs/nebular.py | 3 +-- src/memos/mem_reader/simple_struct.py | 3 +++ src/memos/mem_scheduler/general_scheduler.py | 22 ++-------------- src/memos/memories/textual/tree.py | 2 +- .../tree_text_memory/organize/manager.py | 25 +++++++++++-------- 5 files changed, 21 insertions(+), 34 deletions(-) diff --git a/src/memos/graph_dbs/nebular.py b/src/memos/graph_dbs/nebular.py index 9f0b38635..12b493e58 100644 --- a/src/memos/graph_dbs/nebular.py +++ b/src/memos/graph_dbs/nebular.py @@ -446,7 +446,7 @@ def remove_oldest_memory( count = self.count_nodes(memory_type, user_name) if count > keep_latest: delete_query = f""" - MATCH (n@Memory /*+ INDEX(idx_memory_user_name_memory_type) */) + MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */) WHERE n.memory_type = '{memory_type}' {optional_condition} ORDER BY n.updated_at DESC @@ -1627,7 +1627,6 @@ def _create_basic_property_indexes(self) -> None: "created_at", "updated_at", "user_name", - 
"user_name_memory_type", ] for field in fields: diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 9ec8ca166..9f5eb9832 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -575,6 +575,9 @@ def _cheap_close(t: str) -> str: try: return json.loads(t) except json.JSONDecodeError as e: + if "Invalid \\escape" in str(e): + s = s.replace("\\", "\\\\") + return json.loads(s) logger.error( f"[JSONParse] Failed to decode JSON: {e}\nTail: Raw {response_text} \ json: {s}" diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index e1c940290..e25858dc7 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -184,7 +184,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") # Process the query in a session turn - grouped_messages = self.dispatcher.group_messages_by_user_and_cube(messages=messages) + grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages) self.validate_schedule_messages(messages=messages, label=ADD_LABEL) try: @@ -364,6 +364,7 @@ def _process_memories_with_reader( logger.info("Delete raw mem_ids") text_mem.memory_manager.remove_and_refresh_memory() logger.info("Remove and Refresh Memories") + logger.debug(f"Finished add {user_id} memory: {mem_ids}") except Exception: logger.error( @@ -384,25 +385,6 @@ def _trigger_memory_reorganization( user_id: User ID mem_cube_id: Memory cube ID """ - try: - # Check if reorganization is enabled - if hasattr(text_mem, "is_reorganize") and text_mem.is_reorganize: - logger.info( - f"Triggering memory reorganization for user_id={user_id}, mem_cube_id={mem_cube_id}" - ) - - # Get current working memory size - current_sizes = 
text_mem.get_current_memory_size() - logger.info(f"Current memory sizes: {current_sizes}") - - # The reorganization will be handled by the memory manager - # This is just a trigger point for logging and monitoring - logger.info("Memory reorganization triggered successfully") - else: - logger.info("Memory reorganization is disabled, skipping reorganization trigger") - - except Exception as e: - logger.error(f"Error in _trigger_memory_reorganization: {e}", exc_info=True) def process_session_turn( self, diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index ea087eac9..23244d98e 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -89,7 +89,7 @@ def add(self, memories: list[TextualMemoryItem | dict[str, Any]]) -> list[str]: Args: memories: List of TextualMemoryItem objects or dictionaries to add. """ - return self.memory_manager.add(memories) + return self.memory_manager.add(memories, mode=self.mode) def replace_working_memory(self, memories: list[TextualMemoryItem]) -> None: self.memory_manager.replace_working_memory(memories) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index d014dcd47..54776134b 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -52,7 +52,9 @@ def __init__( ) self._merged_threshold = merged_threshold - def add(self, memories: list[TextualMemoryItem], user_name: str | None = None) -> list[str]: + def add( + self, memories: list[TextualMemoryItem], user_name: str | None = None, mode: str = "sync" + ) -> list[str]: """ Add new memories in parallel to different memory types. 
""" @@ -67,17 +69,18 @@ def add(self, memories: list[TextualMemoryItem], user_name: str | None = None) - except Exception as e: logger.exception("Memory processing error: ", exc_info=e) - for mem_type in ["WorkingMemory", "LongTermMemory", "UserMemory"]: - try: - self.graph_store.remove_oldest_memory( - memory_type="WorkingMemory", - keep_latest=self.memory_size[mem_type], - user_name=user_name, - ) - except Exception: - logger.warning(f"Remove {mem_type} error: {traceback.format_exc()}") + if mode == "sync": + for mem_type in ["WorkingMemory", "LongTermMemory", "UserMemory"]: + try: + self.graph_store.remove_oldest_memory( + memory_type="WorkingMemory", + keep_latest=self.memory_size[mem_type], + user_name=user_name, + ) + except Exception: + logger.warning(f"Remove {mem_type} error: {traceback.format_exc()}") - self._refresh_memory_size(user_name=user_name) + self._refresh_memory_size(user_name=user_name) return added_ids def replace_working_memory( From 6653beafc55c8d8dd9ccd1c71735e33d32d66226 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 21 Oct 2025 16:57:36 +0800 Subject: [PATCH 20/22] feat: add mem-organize in scheduler --- src/memos/mem_scheduler/general_scheduler.py | 100 ++++++++++++++++-- .../mem_scheduler/schemas/general_schemas.py | 1 + 2 files changed, 92 insertions(+), 9 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index e25858dc7..f47cc0cc5 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -10,6 +10,7 @@ ADD_LABEL, ANSWER_LABEL, DEFAULT_MAX_QUERY_KEY_WORDS, + MEM_ORGANIZE_LABEL, MEM_READ_LABEL, QUERY_LABEL, WORKING_MEMORY_TYPE, @@ -38,6 +39,7 @@ def __init__(self, config: GeneralSchedulerConfig): ANSWER_LABEL: self._answer_message_consumer, ADD_LABEL: self._add_message_consumer, MEM_READ_LABEL: self._mem_read_message_consumer, + MEM_ORGANIZE_LABEL: 
self._mem_reorganize_message_consumer, } self.dispatcher.register_handlers(handlers) @@ -350,11 +352,6 @@ def _process_memories_with_reader( logger.info( f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}" ) - - # Trigger memory reorganization if needed - self._trigger_memory_reorganization( - text_mem=text_mem, user_id=user_id, mem_cube_id=mem_cube_id - ) else: logger.info("No enhanced memories generated by mem_reader") else: @@ -371,20 +368,105 @@ def _process_memories_with_reader( f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True ) - def _trigger_memory_reorganization( + def _mem_reorganize_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: + logger.info(f"Messages {messages} assigned to {MEM_READ_LABEL} handler.") + + def process_message(message: ScheduleMessageItem): + try: + user_id = message.user_id + mem_cube_id = message.mem_cube_id + mem_cube = message.mem_cube + content = message.content + + # Parse the memory IDs from content + mem_ids = json.loads(content) if isinstance(content, str) else content + if not mem_ids: + return + + logger.info( + f"Processing mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}, mem_ids={mem_ids}" + ) + + # Get the text memory from the mem_cube + text_mem = mem_cube.text_mem + if not isinstance(text_mem, TreeTextMemory): + logger.error(f"Expected TreeTextMemory but got {type(text_mem).__name__}") + return + + # Use mem_reader to process the memories + self._process_memories_with_reorganize( + mem_ids=mem_ids, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + text_mem=text_mem, + ) + + logger.info( + f"Successfully processed mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}" + ) + + except Exception as e: + logger.error(f"Error processing mem_read message: {e}", exc_info=True) + + with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: + futures = [executor.submit(process_message, msg) for 
msg in messages] + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + logger.error(f"Thread task failed: {e}", exc_info=True) + + def _process_memories_with_reorganize( self, - text_mem: TreeTextMemory, + mem_ids: list[str], user_id: str, mem_cube_id: str, + mem_cube: GeneralMemCube, + text_mem: TreeTextMemory, ) -> None: """ - Trigger memory reorganization after enhanced processing. + Process memories using mem_reorganize for enhanced memory processing. Args: - text_mem: Text memory instance + mem_ids: List of memory IDs to process user_id: User ID mem_cube_id: Memory cube ID + mem_cube: Memory cube instance + text_mem: Text memory instance """ + try: + # Get the mem_reader from the parent MOSCore + if not hasattr(self, "mem_reader") or self.mem_reader is None: + logger.warning( + "mem_reader not available in scheduler, skipping enhanced processing" + ) + return + + # Get the original memory items + memory_items = [] + for mem_id in mem_ids: + try: + memory_item = text_mem.get(mem_id) + memory_items.append(memory_item) + except Exception as e: + logger.warning(f"Failed to get memory {mem_id}: {e}") + continue + + if not memory_items: + logger.warning("No valid memory items found for processing") + return + + # Use mem_reader to process the memories + logger.info(f"Processing {len(memory_items)} memories with mem_reader") + text_mem.memory_manager.remove_and_refresh_memory() + logger.info("Remove and Refresh Memories") + logger.debug(f"Finished add {user_id} memory: {mem_ids}") + + except Exception: + logger.error( + f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True + ) def process_session_turn( self, diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index 6599a012e..248c42e80 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -9,6 +9,7 @@ 
ANSWER_LABEL = "answer" ADD_LABEL = "add" MEM_READ_LABEL = "mem_read" +MEM_ORGANIZE_LABEL = "mem_organize" TreeTextMemory_SEARCH_METHOD = "tree_text_memory_search" TreeTextMemory_FINE_SEARCH_METHOD = "tree_text_memory_fine_search" From af5c940abbb737c91288b5ea60578a49dfbf02af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 21 Oct 2025 20:16:52 +0800 Subject: [PATCH 21/22] feat: add tree.mode to config; modify scheduler config --- src/memos/configs/mem_scheduler.py | 2 -- src/memos/configs/memory.py | 5 +++++ src/memos/mem_scheduler/base_scheduler.py | 2 +- src/memos/memories/textual/tree.py | 5 +---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/memos/configs/mem_scheduler.py b/src/memos/configs/mem_scheduler.py index 39586081c..2d6155ec2 100644 --- a/src/memos/configs/mem_scheduler.py +++ b/src/memos/configs/mem_scheduler.py @@ -28,13 +28,11 @@ class BaseSchedulerConfig(BaseConfig): thread_pool_max_workers: int = Field( default=DEFAULT_THREAD_POOL_MAX_WORKERS, gt=1, - lt=20, description=f"Maximum worker threads in pool (default: {DEFAULT_THREAD_POOL_MAX_WORKERS})", ) consume_interval_seconds: float = Field( default=DEFAULT_CONSUME_INTERVAL_SECONDS, gt=0, - le=60, description=f"Interval for consuming messages from queue in seconds (default: {DEFAULT_CONSUME_INTERVAL_SECONDS})", ) auth_config_path: str | None = Field( diff --git a/src/memos/configs/memory.py b/src/memos/configs/memory.py index 237450e15..2c3a715f7 100644 --- a/src/memos/configs/memory.py +++ b/src/memos/configs/memory.py @@ -179,6 +179,11 @@ class TreeTextMemoryConfig(BaseTextMemoryConfig): ), ) + mode: str | None = Field( + default="sync", + description=("whether use asynchronous mode in memory add"), + ) + class SimpleTreeTextMemoryConfig(TreeTextMemoryConfig): """Simple tree text memory configuration class.""" diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 4950f87bb..1e8b042b1 100644 
--- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -88,7 +88,7 @@ def __init__(self, config: BaseSchedulerConfig): # internal message queue self.max_internal_message_queue_size = self.config.get( - "max_internal_message_queue_size", 100 + "max_internal_message_queue_size", 10000 ) self.memos_message_queue: Queue[ScheduleMessageItem] = Queue( maxsize=self.max_internal_message_queue_size diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index 23244d98e..74a52eb8e 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -30,13 +30,10 @@ class TreeTextMemory(BaseTextMemory): """General textual memory implementation for storing and retrieving memories.""" - # Override the default mode to async for TreeTextMemory - mode: str = "async" - def __init__(self, config: TreeTextMemoryConfig): """Initialize memory with the given configuration.""" # Set mode from class default or override if needed - self.mode = getattr(self.__class__, "mode", "async") + self.mode = config.mode self.config: TreeTextMemoryConfig = config self.extractor_llm: OpenAILLM | OllamaLLM | AzureLLM = LLMFactory.from_config( config.extractor_llm From 28a20e98060c86ab76161ef86398896d80164e9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Tue, 21 Oct 2025 20:40:26 +0800 Subject: [PATCH 22/22] fix: test bug --- src/memos/memories/textual/tree.py | 2 +- tests/memories/textual/test_tree.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index 74a52eb8e..fccd83fa6 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -81,7 +81,7 @@ def __init__(self, config: TreeTextMemoryConfig): else: logger.info("No internet retriever configured") - def add(self, memories: list[TextualMemoryItem | dict[str, Any]]) -> list[str]: + def add(self, 
memories: list[TextualMemoryItem | dict[str, Any]], **kwargs) -> list[str]: """Add memories. Args: memories: List of TextualMemoryItem objects or dictionaries to add. diff --git a/tests/memories/textual/test_tree.py b/tests/memories/textual/test_tree.py index f3e662992..772a79d78 100644 --- a/tests/memories/textual/test_tree.py +++ b/tests/memories/textual/test_tree.py @@ -66,7 +66,7 @@ def test_add_calls_manager(mock_tree_text_memory): metadata=TreeNodeTextualMemoryMetadata(updated_at=None), ) mock_tree_text_memory.add([mock_item]) - mock_tree_text_memory.memory_manager.add.assert_called_once() + mock_tree_text_memory.memory_manager.add.assert_called_once_with([mock_item], mode="sync") def test_get_working_memory_sorted(mock_tree_text_memory): @@ -161,4 +161,4 @@ def test_add_returns_ids(mock_tree_text_memory): result = mock_tree_text_memory.add(mock_items) assert result == dummy_ids - mock_tree_text_memory.memory_manager.add.assert_called_once_with(mock_items) + mock_tree_text_memory.memory_manager.add.assert_called_once_with(mock_items, mode="sync")