From 3069462e67dd41ee64894eba5bd084c329c36363 Mon Sep 17 00:00:00 2001
From: CaralHsi
Date: Tue, 7 Apr 2026 15:14:02 +0800
Subject: [PATCH 1/4] feat: support per-cube filter routing in multi-cube search (#1431)

---
 src/memos/multi_mem_cube/single_cube.py      |  9 ++-
 src/memos/search/__init__.py                 | 14 +++-
 src/memos/search/search_service.py           | 29 ++++++++
 tests/search/test_resolve_filter_for_cube.py | 78 ++++++++++++++++++++
 4 files changed, 127 insertions(+), 3 deletions(-)
 create mode 100644 tests/search/test_resolve_filter_for_cube.py

diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py
index 6a91f436f..355cb8cee 100644
--- a/src/memos/multi_mem_cube/single_cube.py
+++ b/src/memos/multi_mem_cube/single_cube.py
@@ -22,7 +22,7 @@
 )
 from memos.memories.textual.item import TextualMemoryItem
 from memos.multi_mem_cube.views import MemCubeView
-from memos.search import search_text_memories
+from memos.search import resolve_filter_for_cube, search_text_memories
 from memos.templates.mem_reader_prompts import PROMPT_MAPPING
 from memos.types.general_types import (
     FINE_STRATEGY,
@@ -91,6 +91,13 @@ def search_memories(self, search_req: APISearchRequest) -> dict[str, Any]:
         Unified memory search handling (text + preference memories).
         Preference memories are now searched through the same _search_text flow.
         """
+        cube_filter = resolve_filter_for_cube(search_req.filter, self.cube_id)
+        if cube_filter is not search_req.filter:
+            import copy
+
+            search_req = copy.copy(search_req)
+            search_req.filter = cube_filter
+
         # Create UserContext object
         user_context = UserContext(
             user_id=search_req.user_id,
diff --git a/src/memos/search/__init__.py b/src/memos/search/__init__.py
index 71388c62b..d2c197403 100644
--- a/src/memos/search/__init__.py
+++ b/src/memos/search/__init__.py
@@ -1,4 +1,14 @@
-from .search_service import SearchContext, build_search_context, search_text_memories
+from .search_service import (
+    SearchContext,
+    build_search_context,
+    resolve_filter_for_cube,
+    search_text_memories,
+)
 
-__all__ = ["SearchContext", "build_search_context", "search_text_memories"]
+__all__ = [
+    "SearchContext",
+    "build_search_context",
+    "resolve_filter_for_cube",
+    "search_text_memories",
+]
 
diff --git a/src/memos/search/search_service.py b/src/memos/search/search_service.py
index fa713a7d1..f4092d168 100644
--- a/src/memos/search/search_service.py
+++ b/src/memos/search/search_service.py
@@ -36,6 +36,35 @@ def build_search_context(
     )
 
 
+def resolve_filter_for_cube(
+    raw_filter: dict[str, Any] | None, cube_id: str
+) -> dict[str, Any] | None:
+    """Resolve a multi-cube filter dict into the sub-filter for a single cube.
+
+    Supported forms:
+    - None → None (no filter)
+    - {"and": [...]} / {"or": [...]} → returned as-is (unified, all cubes share)
+    - {"cube_A": {...}, "cube_B": {...}} → return raw_filter[cube_id] or None
+    Mixed top-level (and/or + cube keys) is rejected.
+    """
+    if raw_filter is None:
+        return None
+
+    has_logic_key = "and" in raw_filter or "or" in raw_filter
+    other_keys = {k for k in raw_filter if k not in ("and", "or")}
+
+    if has_logic_key and other_keys:
+        raise ValueError(
+            "Invalid filter: top-level 'and'/'or' cannot coexist with per-cube keys "
+            f"{other_keys}. Use either a unified filter or per-cube filter, not both."
+        )
+
+    if has_logic_key:
+        return raw_filter
+
+    return raw_filter.get(cube_id)
+
+
 def search_text_memories(
     text_mem: Any,
     search_req: APISearchRequest,
diff --git a/tests/search/test_resolve_filter_for_cube.py b/tests/search/test_resolve_filter_for_cube.py
new file mode 100644
index 000000000..ab6be71a3
--- /dev/null
+++ b/tests/search/test_resolve_filter_for_cube.py
@@ -0,0 +1,78 @@
+import pytest
+
+from memos.search.search_service import resolve_filter_for_cube
+
+
+class TestResolveFilterForCube:
+    """Tests for resolve_filter_for_cube — multi-cube filter routing."""
+
+    # ── None passthrough ──
+
+    def test_none_returns_none(self):
+        assert resolve_filter_for_cube(None, "cube_001") is None
+
+    # ── Unified filter (filter2): top-level and/or ──
+
+    def test_unified_and_returns_same_for_any_cube(self):
+        f = {"and": [{"tags": {"contains": "阅读"}}, {"created_at": {"gte": "2025-01-01"}}]}
+        assert resolve_filter_for_cube(f, "cube_001") is f
+        assert resolve_filter_for_cube(f, "cube_999") is f
+
+    def test_unified_or_returns_same_for_any_cube(self):
+        f = {"or": [{"tags": {"contains": "A"}}, {"tags": {"contains": "B"}}]}
+        assert resolve_filter_for_cube(f, "cube_001") is f
+
+    # ── Per-cube filter (filter1 / filter4) ──
+
+    def test_per_cube_returns_matching_sub_filter(self):
+        sub_a = {"and": [{"tags": {"contains": "阅读"}}]}
+        sub_b = {"and": [{"tags": {"contains": "工作"}}]}
+        f = {"cube_A": sub_a, "cube_B": sub_b}
+
+        assert resolve_filter_for_cube(f, "cube_A") is sub_a
+        assert resolve_filter_for_cube(f, "cube_B") is sub_b
+
+    def test_per_cube_missing_key_returns_none(self):
+        f = {
+            "cube_A": {"and": [{"tags": {"contains": "阅读"}}]},
+            "cube_B": {"and": [{"tags": {"contains": "工作"}}]},
+        }
+        assert resolve_filter_for_cube(f, "cube_C") is None
+
+    def test_per_cube_single_key(self):
+        sub = {"and": [{"created_at": {"gte": "2025-01-01"}}]}
+        f = {"cube_only": sub}
+        assert resolve_filter_for_cube(f, "cube_only") is sub
+        assert resolve_filter_for_cube(f, "other") is None
+
+    # ── Mixed (filter3): illegal ──
+
+    def test_mixed_and_with_cube_key_raises(self):
+        f = {
+            "and": [{"tags": {"contains": "阅读"}}],
+            "cube_A": {"and": [{"tags": {"contains": "工作"}}]},
+        }
+        with pytest.raises(ValueError, match="cannot coexist"):
+            resolve_filter_for_cube(f, "cube_A")
+
+    def test_mixed_or_with_cube_key_raises(self):
+        f = {
+            "or": [{"tags": {"contains": "阅读"}}],
+            "cube_B": {"and": [{"tags": {"contains": "工作"}}]},
+        }
+        with pytest.raises(ValueError, match="cannot coexist"):
+            resolve_filter_for_cube(f, "cube_B")
+
+    # ── Edge cases ──
+
+    def test_empty_dict_returns_none(self):
+        assert resolve_filter_for_cube({}, "cube_001") is None
+
+    def test_per_cube_with_empty_sub_filter(self):
+        f = {"cube_A": {}}
+        result = resolve_filter_for_cube(f, "cube_A")
+        assert result == {}
+
+    def test_unified_and_empty_list(self):
+        f = {"and": []}
+        assert resolve_filter_for_cube(f, "any") is f
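
For reference, the three accepted filter shapes behave as follows — a minimal usage sketch derived from the function and tests above (cube ids and tag values are illustrative):

    from memos.search.search_service import resolve_filter_for_cube

    # Unified filter: one logic tree shared by every cube in the request.
    unified = {"and": [{"tags": {"contains": "reading"}}, {"created_at": {"gte": "2025-01-01"}}]}
    assert resolve_filter_for_cube(unified, "cube_A") is unified  # returned untouched

    # Per-cube filter: top-level keys are cube ids, each mapped to its own sub-filter.
    per_cube = {"cube_A": {"and": [{"tags": {"contains": "work"}}]}}
    assert resolve_filter_for_cube(per_cube, "cube_A") == {"and": [{"tags": {"contains": "work"}}]}
    assert resolve_filter_for_cube(per_cube, "cube_B") is None  # unknown cube gets no filter

    # Mixing both forms at the top level is rejected.
    try:
        resolve_filter_for_cube({"and": [], "cube_A": {}}, "cube_A")
    except ValueError as exc:
        print(exc)  # Invalid filter: top-level 'and'/'or' cannot coexist ...

Note the identity contract: search_memories only shallow-copies the request when resolve_filter_for_cube returns a different object, so unified filters skip the copy entirely.
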
From f7887eaba7778d178b879a9e6d6f5da8ae0edf83 Mon Sep 17 00:00:00 2001
From: Hustzdy <67457465+wustzdy@users.noreply.github.com>
Date: Wed, 8 Apr 2026 19:12:47 +0800
Subject: [PATCH 2/4] fix: optimize dispatch task && update log level (#1437)

---
 .../mem_scheduler/base_mixins/queue_ops.py    | 25 +++++++++-
 .../task_schedule_modules/dispatcher.py       | 12 +++--
 .../task_schedule_modules/local_queue.py      | 20 ++++++--
 .../task_schedule_modules/redis_queue.py      | 35 ++++++++----
 .../task_schedule_modules/task_queue.py       | 47 +++++++++++------
 5 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/src/memos/mem_scheduler/base_mixins/queue_ops.py b/src/memos/mem_scheduler/base_mixins/queue_ops.py
index 590189c24..13de79b3d 100644
--- a/src/memos/mem_scheduler/base_mixins/queue_ops.py
+++ b/src/memos/mem_scheduler/base_mixins/queue_ops.py
@@ -65,7 +65,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                 logger.warning("status_tracker.task_submitted failed", exc_info=True)
 
             if self.disabled_handlers and msg.label in self.disabled_handlers:
-                logger.info("Skipping disabled handler: %s - %s", msg.label, msg.content)
+                logger.debug(
+                    "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
+                    msg.label,
+                    msg.item_id,
+                    msg.user_id,
+                    msg.mem_cube_id,
+                )
                 continue
 
             task_priority = self.orchestrator.get_task_priority(task_label=msg.label)
@@ -74,6 +80,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
             else:
                 queued_msgs.append(msg)
 
+        logger.info(
+            "Submit scheduler messages summary. total=%s immediate=%s queued=%s queue_backend=%s",
+            len(messages),
+            len(immediate_msgs),
+            len(queued_msgs),
+            "redis_queue" if self.use_redis_queue else "local_queue",
+        )
+
         if immediate_msgs:
             for m in immediate_msgs:
                 emit_monitor_event(
@@ -199,6 +213,15 @@ def _message_consumer(self) -> None:
 
                 if messages:
                     self.dispatcher.on_messages_enqueued(messages)
+                    if len(messages) >= self.consume_batch:
+                        unique_labels = sorted({msg.label for msg in messages})
+                        logger.debug(
+                            "Consumer dequeued batch. batch_size=%s consume_batch=%s unique_labels=%s queue_backend=%s",
+                            len(messages),
+                            self.consume_batch,
+                            unique_labels,
+                            "redis_queue" if self.use_redis_queue else "local_queue",
+                        )
                     self.dispatcher.dispatch(messages)
             except Exception as e:
                 logger.error("Error dispatching messages: %s", e)
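
The logging changes above follow two conventions worth noting: per-message noise moves to DEBUG with %s-style lazy formatting, while one INFO summary line is kept per batch. A minimal sketch of the difference, using only the standard library:

    import logging

    logger = logging.getLogger("memos.mem_scheduler")
    logging.basicConfig(level=logging.INFO)

    # Lazy %s formatting: the message string is never built if DEBUG is filtered out,
    # so any expensive __str__/__repr__ on the arguments is deferred too.
    logger.debug("Skip disabled handler. label=%s item_id=%s", "mem_read", "abc-123")

    # One aggregate INFO line per batch keeps production logs scannable.
    logger.info("Submit scheduler messages summary. total=%s immediate=%s queued=%s", 10, 2, 8)

Unlike an f-string, the %s form hands raw arguments to the logging framework, which formats them only when a handler actually emits the record.
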
diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
index 74ab15209..690c8d123 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
@@ -226,7 +226,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
                 if task_item.item_id in self._running_tasks:
                     task_item.mark_completed(result)
                     del self._running_tasks[task_item.item_id]
-                    logger.info(f"Task completed: {task_item.get_execution_info()}")
+                    logger.debug(f"Task completed: {task_item.get_execution_info()}")
 
                 return result
             except Exception as e:
@@ -630,12 +630,12 @@ def execute_task(
             with self._task_lock:
                 self._futures.add(future)
             future.add_done_callback(self._handle_future_result)
-            logger.info(
+            logger.debug(
                 f"Dispatch {len(msgs)} message(s) to {task_label} handler for user {user_id} and mem_cube {mem_cube_id}."
             )
         else:
             # For synchronous execution, the wrapper will run and remove the task upon completion
-            logger.info(
+            logger.debug(
                 f"Execute {len(msgs)} message(s) synchronously for {task_label} for user {user_id} and mem_cube {mem_cube_id}."
             )
             wrapped_handler(msgs)
@@ -653,6 +653,12 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]):
 
         # Group messages by user_id and mem_cube_id first
        user_cube_groups = group_messages_by_user_and_mem_cube(msg_list)
+        logger.info(
+            "Dispatcher received batch. total_messages=%s user_groups=%s unique_labels=%s",
+            len(msg_list),
+            len(user_cube_groups),
+            sorted({msg.label for msg in msg_list}),
+        )
 
         # Process each user and mem_cube combination
         for user_id, cube_groups in user_cube_groups.items():
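
The dispatch() entry point relies on group_messages_by_user_and_mem_cube to fan the batch out per (user_id, mem_cube_id) pair. That helper is not part of this diff; a plausible equivalent, consistent with how dispatch() iterates the result (user → cube → messages), would be:

    from collections import defaultdict

    def group_messages_by_user_and_mem_cube(msg_list):
        # Nested mapping: user_id -> mem_cube_id -> list of messages.
        groups = defaultdict(lambda: defaultdict(list))
        for msg in msg_list:
            groups[msg.user_id][msg.mem_cube_id].append(msg)
        return groups

The new INFO line then gives one batch-level summary (message count, group count, labels) in place of the per-task lines, which the logger.debug calls above now cover.
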
diff --git a/src/memos/mem_scheduler/task_schedule_modules/local_queue.py b/src/memos/mem_scheduler/task_schedule_modules/local_queue.py
index 791cedf41..33d007313 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/local_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/local_queue.py
@@ -95,8 +95,12 @@ def put(
 
         try:
             self.queue_streams[stream_key].put(item=message, block=block, timeout=timeout)
-            logger.info(
-                f"Message successfully put into queue '{stream_key}'. Current size: {self.queue_streams[stream_key].qsize()}"
+            logger.debug(
+                "Local queue enqueued. stream=%s size=%s label=%s item_id=%s",
+                stream_key,
+                self.queue_streams[stream_key].qsize(),
+                message.label,
+                message.item_id,
             )
         except Exception as e:
             logger.error(f"Failed to put message into queue '{stream_key}': {e}", exc_info=True)
@@ -117,7 +121,7 @@ def get(
 
         # Return empty list if queue does not exist
         if stream_key not in self.queue_streams:
-            logger.error(f"Stream {stream_key} does not exist when trying to get messages.")
+            logger.debug("Stream %s does not exist when trying to get messages", stream_key)
             return []
 
         # Ensure we always request a batch so we get a list back
@@ -174,6 +178,14 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
             fetched = self.get_nowait(stream_key=stream_key, batch_size=needed)
             messages.extend(fetched)
 
+        if messages and len(messages) >= batch_size:
+            logger.debug(
+                "Local queue dequeued batch. batch_size=%s requested_batch_size=%s active_streams=%s",
+                len(messages),
+                batch_size,
+                len(stream_keys),
+            )
+
         return messages
 
     def qsize(self) -> dict:
@@ -196,9 +208,11 @@ def clear(self, stream_key: str | None = None) -> None:
         if stream_key:
             if stream_key in self.queue_streams:
                 self.queue_streams[stream_key].clear()
+                logger.info("Cleared local queue stream: %s", stream_key)
         else:
             for queue in self.queue_streams.values():
                 queue.clear()
+            logger.info("Cleared all local queue streams. stream_count=%s", len(self.queue_streams))
 
     @property
     def unfinished_tasks(self) -> int:
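
get_messages drains several per-stream queues into one batch via get_nowait; the underlying pattern is a standard non-blocking drain on queue.Queue, sketched here (LocalQueue's real get_nowait wraps this with stream bookkeeping):

    import queue

    def drain_nowait(q: queue.Queue, limit: int) -> list:
        # Pop up to `limit` items without blocking; stop as soon as the queue is empty.
        items = []
        while len(items) < limit:
            try:
                items.append(q.get_nowait())
            except queue.Empty:
                break
        return items

Because the drain never blocks, a partially filled batch returns immediately; that is why the new debug line fires only on full batches (len(messages) >= batch_size), the interesting case for spotting backlog.
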
diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
index dc7d86752..1277c5465 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
@@ -384,7 +384,16 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
             if len(self.message_pack_cache) == 0:
                 return []
             else:
-                return self.message_pack_cache.popleft()
+                batch = self.message_pack_cache.popleft()
+                if len(batch) >= batch_size:
+                    logger.debug(
+                        "[REDIS_QUEUE] Dequeued batch. batch_size=%s requested_batch_size=%s cache_packs_remaining=%s stream_count=%s",
+                        len(batch),
+                        batch_size,
+                        len(self.message_pack_cache),
+                        len(self.get_stream_keys()),
+                    )
+                return batch
 
     def _ensure_consumer_group(self, stream_key) -> None:
         """Ensure the consumer group exists for the stream."""
@@ -449,9 +458,13 @@ def put(
             message_id = self._redis_conn.xadd(
                 stream_key, message_data, maxlen=self.max_len, approximate=True
             )
-
-            logger.info(
-                f"Added message {message_id} to Redis stream: {message.label} - {message.content[:100]}..."
+            logger.debug(
+                "[REDIS_QUEUE] Enqueued message. message_id=%s stream=%s label=%s item_id=%s stream_cache_size=%s",
+                message_id,
+                stream_key,
+                message.label,
+                message.item_id,
+                len(self._stream_keys_cache),
             )
 
         except Exception as e:
@@ -494,7 +507,11 @@ def ack_message(
                 # Optionally delete the message from the stream to keep it clean
                 try:
                     self._redis_conn.xdel(stream_key, redis_message_id)
-                    logger.info(f"Successfully delete acknowledged message {redis_message_id}")
+                    logger.debug(
+                        "[REDIS_QUEUE] Ack/delete message. redis_message_id=%s stream=%s",
+                        redis_message_id,
+                        stream_key,
+                    )
                 except Exception as e:
                     logger.warning(f"Failed to delete acknowledged message {redis_message_id}: {e}")
 
@@ -989,7 +1006,7 @@ def show_task_status(self, stream_key_prefix: str | None = None) -> dict[str, di
         )
         stream_keys = self.get_stream_keys(stream_key_prefix=effective_prefix)
         if not stream_keys:
-            logger.info(f"No Redis streams found for the configured prefix: {effective_prefix}")
+            logger.debug(f"No Redis streams found for the configured prefix: {effective_prefix}")
             return {}
 
         grouped: dict[str, dict[str, int]] = {}
@@ -1157,7 +1174,7 @@ def connect(self) -> None:
             self._redis_conn.ping()
             self._is_connected = True
             self._check_xautoclaim_support()
-            logger.debug("Redis connection established successfully")
+            logger.info("Redis connection established successfully")
             # Start stream keys refresher when connected
             self._start_stream_keys_refresh_thread()
         except Exception as e:
@@ -1174,7 +1191,7 @@ def disconnect(self) -> None:
             self._stop_stream_keys_refresh_thread()
         if self._is_listening:
             self.stop_listening()
-        logger.debug("Disconnected from Redis")
+        logger.info("Disconnected from Redis")
 
     def __enter__(self):
         """Context manager entry."""
@@ -1379,7 +1396,7 @@ def _update_stream_cache_with_log(
             self._stream_keys_cache = active_stream_keys
             self._stream_keys_last_refresh = time.time()
             cache_count = len(self._stream_keys_cache)
-            logger.info(
+            logger.debug(
                 f"Refreshed stream keys cache: {cache_count} active keys, "
                 f"{deleted_count} deleted, {len(candidate_keys)} candidates examined."
            )
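
The Redis queue builds on Redis Streams with consumer groups; the XADD/XREADGROUP/XACK/XDEL calls that the methods above wrap look like this in bare redis-py (stream and group names here are placeholders, not the queue's real key scheme):

    import redis

    r = redis.Redis(decode_responses=True)
    stream, group = "demo:stream", "demo-group"

    try:
        r.xgroup_create(stream, group, id="0", mkstream=True)
    except redis.ResponseError:
        pass  # BUSYGROUP: the group already exists

    # Enqueue with a capped stream length, mirroring xadd(..., maxlen=self.max_len, approximate=True).
    msg_id = r.xadd(stream, {"label": "mem_read", "payload": "..."}, maxlen=10_000, approximate=True)

    # Dequeue a batch for this consumer, then ack and delete, as ack_message does.
    for _stream, entries in r.xreadgroup(group, "consumer-1", {stream: ">"}, count=10) or []:
        for entry_id, fields in entries:
            r.xack(stream, group, entry_id)
            r.xdel(stream, entry_id)

With approximate=True, MAXLEN trims whole macro-nodes rather than exact counts, which is cheaper and matches the put() call above.
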
diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py
index b49db2b36..1f2e81bef 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py
@@ -93,6 +93,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
         """Submit messages to the message queue (either local queue or Redis)."""
         if isinstance(messages, ScheduleMessageItem):
             messages = [messages]
+        if len(messages) < 1:
+            logger.error("submit_messages called with empty payload")
+            return
 
         current_trace_id = get_current_trace_id()
 
@@ -104,18 +107,25 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                 user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, task_label=msg.label
             )
 
-        if len(messages) < 1:
-            logger.error("Submit empty")
-        elif len(messages) == 1:
+        if len(messages) == 1:
             if getattr(messages[0], "timestamp", None) is None:
                 messages[0].timestamp = get_utc_now()
-            enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
-            emit_monitor_event(
-                "enqueue",
-                messages[0],
-                {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
-            )
-            self.memos_message_queue.put(messages[0])
+            if self.disabled_handlers and messages[0].label in self.disabled_handlers:
+                logger.debug(
+                    "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
+                    messages[0].label,
+                    messages[0].item_id,
+                    messages[0].user_id,
+                    messages[0].mem_cube_id,
+                )
+            else:
+                enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
+                emit_monitor_event(
+                    "enqueue",
+                    messages[0],
+                    {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
+                )
+                self.memos_message_queue.put(messages[0])
         else:
             user_cube_groups = group_messages_by_user_and_mem_cube(messages)
 
@@ -132,8 +142,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                         message.timestamp = get_utc_now()
 
                     if self.disabled_handlers and message.label in self.disabled_handlers:
-                        logger.info(
-                            f"Skipping disabled handler: {message.label} - {message.content}"
+                        logger.debug(
+                            "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
+                            message.label,
+                            message.item_id,
+                            message.user_id,
+                            message.mem_cube_id,
                         )
                         continue
 
@@ -148,9 +162,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                         },
                     )
                     self.memos_message_queue.put(message)
-                    logger.info(
-                        f"Submitted message to local queue: {message.label} - {message.content}"
-                    )
+
+        logger.info(
+            "Queue submit completed. backend=%s total=%s",
+            "redis_queue" if self.use_redis_queue else "local_queue",
+            len(messages),
+        )
 
     def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
         return self.memos_message_queue.get_messages(batch_size=batch_size)

From 07c6fb6febd60307bdce3d1a95b8d3cb9e121202 Mon Sep 17 00:00:00 2001
From: Wenqiang Wei <46308778+endxxxx@users.noreply.github.com>
Date: Fri, 10 Apr 2026 14:17:55 +0800
Subject: [PATCH 3/4] chore: change version number to v2.0.13

---
 pyproject.toml        | 2 +-
 src/memos/__init__.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index e7fca38ff..b04319f7d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@
 ##############################################################################
 
 name = "MemoryOS"
-version = "2.0.12"
+version = "2.0.13"
 description = "Intelligence Begins with Memory"
 license = {text = "Apache-2.0"}
 readme = "README.md"
diff --git a/src/memos/__init__.py b/src/memos/__init__.py
index 8687b9b9a..89e86a9df 100644
--- a/src/memos/__init__.py
+++ b/src/memos/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "2.0.12"
+__version__ = "2.0.13"
 
 from memos.configs.mem_cube import GeneralMemCubeConfig
 from memos.configs.mem_os import MOSConfig

From d2e7a04fe3c0c5e45b136f6093bf384402141ea7 Mon Sep 17 00:00:00 2001
From: Wenqiang Wei <46308778+endxxxx@users.noreply.github.com>
Date: Fri, 10 Apr 2026 15:27:12 +0800
Subject: [PATCH 4/4] style: apply ruff format to hermes adapters scripts

---
 apps/memos-local-plugin/adapters/hermes/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/memos-local-plugin/adapters/hermes/__init__.py b/apps/memos-local-plugin/adapters/hermes/__init__.py
index 4e2f2d514..59e12c7c8 100644
--- a/apps/memos-local-plugin/adapters/hermes/__init__.py
+++ b/apps/memos-local-plugin/adapters/hermes/__init__.py
@@ -78,7 +78,7 @@ def _is_trivial(text: str) -> bool:
     if keys <= {"ok", "success", "status", "result", "error", "message"}:
         vals = list(obj.values())
         if all(
-            isinstance(v, (bool, type(None))) or (isinstance(v, str) and len(v) < 20)
+            isinstance(v, bool | type(None)) or (isinstance(v, str) and len(v) < 20)
             for v in vals
         ):
             return True
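
The change in patch 4 is the kind of rewrite ruff's pyupgrade rules (UP038) produce: since Python 3.10 (PEP 604), isinstance accepts a union type directly, making the tuple form redundant. Behaviour is identical:

    v = None
    assert isinstance(v, bool | type(None))       # new union form
    assert isinstance(v, (bool, type(None)))      # old tuple form, same result
    assert not isinstance("long string here", bool | type(None))

Note that bool | type(None) evaluates to a types.UnionType at call time, so this form requires Python >= 3.10 at runtime, not merely for annotations.
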