diff --git a/src/memos/mem_scheduler/base_mixins/queue_ops.py b/src/memos/mem_scheduler/base_mixins/queue_ops.py
index 590189c24..13de79b3d 100644
--- a/src/memos/mem_scheduler/base_mixins/queue_ops.py
+++ b/src/memos/mem_scheduler/base_mixins/queue_ops.py
@@ -65,7 +65,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                 logger.warning("status_tracker.task_submitted failed", exc_info=True)
 
             if self.disabled_handlers and msg.label in self.disabled_handlers:
-                logger.info("Skipping disabled handler: %s - %s", msg.label, msg.content)
+                logger.debug(
+                    "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
+                    msg.label,
+                    msg.item_id,
+                    msg.user_id,
+                    msg.mem_cube_id,
+                )
                 continue
 
             task_priority = self.orchestrator.get_task_priority(task_label=msg.label)
@@ -74,6 +80,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
             else:
                 queued_msgs.append(msg)
 
+        logger.info(
+            "Submit scheduler messages summary. total=%s immediate=%s queued=%s queue_backend=%s",
+            len(messages),
+            len(immediate_msgs),
+            len(queued_msgs),
+            "redis_queue" if self.use_redis_queue else "local_queue",
+        )
+
         if immediate_msgs:
             for m in immediate_msgs:
                 emit_monitor_event(
@@ -199,6 +213,15 @@ def _message_consumer(self) -> None:
 
                 if messages:
                     self.dispatcher.on_messages_enqueued(messages)
+                    if len(messages) >= self.consume_batch:
+                        unique_labels = sorted({msg.label for msg in messages})
+                        logger.debug(
+                            "Consumer dequeued batch. batch_size=%s consume_batch=%s unique_labels=%s queue_backend=%s",
+                            len(messages),
+                            self.consume_batch,
+                            unique_labels,
+                            "redis_queue" if self.use_redis_queue else "local_queue",
+                        )
                     self.dispatcher.dispatch(messages)
             except Exception as e:
                 logger.error("Error dispatching messages: %s", e)
diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
index 74ab15209..690c8d123 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
@@ -226,7 +226,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
                 if task_item.item_id in self._running_tasks:
                     task_item.mark_completed(result)
                     del self._running_tasks[task_item.item_id]
-                    logger.info(f"Task completed: {task_item.get_execution_info()}")
+                    logger.debug(f"Task completed: {task_item.get_execution_info()}")
 
                 return result
             except Exception as e:
@@ -630,12 +630,12 @@ def execute_task(
             with self._task_lock:
                 self._futures.add(future)
                 future.add_done_callback(self._handle_future_result)
-            logger.info(
+            logger.debug(
                 f"Dispatch {len(msgs)} message(s) to {task_label} handler for user {user_id} and mem_cube {mem_cube_id}."
             )
         else:
             # For synchronous execution, the wrapper will run and remove the task upon completion
-            logger.info(
+            logger.debug(
                 f"Execute {len(msgs)} message(s) synchronously for {task_label} for user {user_id} and mem_cube {mem_cube_id}."
             )
             wrapped_handler(msgs)
@@ -653,6 +653,12 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]):
 
         # Group messages by user_id and mem_cube_id first
         user_cube_groups = group_messages_by_user_and_mem_cube(msg_list)
+        logger.info(
+            "Dispatcher received batch. total_messages=%s user_groups=%s unique_labels=%s",
+            len(msg_list),
+            len(user_cube_groups),
+            sorted({msg.label for msg in msg_list}),
+        )
 
         # Process each user and mem_cube combination
         for user_id, cube_groups in user_cube_groups.items():
diff --git a/src/memos/mem_scheduler/task_schedule_modules/local_queue.py b/src/memos/mem_scheduler/task_schedule_modules/local_queue.py
index 791cedf41..33d007313 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/local_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/local_queue.py
@@ -95,8 +95,12 @@ def put(
         try:
             self.queue_streams[stream_key].put(item=message, block=block, timeout=timeout)
 
-            logger.info(
-                f"Message successfully put into queue '{stream_key}'. Current size: {self.queue_streams[stream_key].qsize()}"
+            logger.debug(
+                "Local queue enqueued. stream=%s size=%s label=%s item_id=%s",
+                stream_key,
+                self.queue_streams[stream_key].qsize(),
+                message.label,
+                message.item_id,
             )
         except Exception as e:
             logger.error(f"Failed to put message into queue '{stream_key}': {e}", exc_info=True)
@@ -117,7 +121,7 @@ def get(
 
         # Return empty list if queue does not exist
         if stream_key not in self.queue_streams:
-            logger.error(f"Stream {stream_key} does not exist when trying to get messages.")
+            logger.debug("Stream %s does not exist when trying to get messages", stream_key)
             return []
 
         # Ensure we always request a batch so we get a list back
@@ -174,6 +178,14 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
             fetched = self.get_nowait(stream_key=stream_key, batch_size=needed)
             messages.extend(fetched)
 
+        if messages and len(messages) >= batch_size:
+            logger.debug(
+                "Local queue dequeued batch. batch_size=%s requested_batch_size=%s active_streams=%s",
+                len(messages),
+                batch_size,
+                len(stream_keys),
+            )
+
         return messages
 
     def qsize(self) -> dict:
@@ -196,9 +208,11 @@ def clear(self, stream_key: str | None = None) -> None:
         if stream_key:
             if stream_key in self.queue_streams:
                 self.queue_streams[stream_key].clear()
+                logger.info("Cleared local queue stream: %s", stream_key)
         else:
             for queue in self.queue_streams.values():
                 queue.clear()
+            logger.info("Cleared all local queue streams. stream_count=%s", len(self.queue_streams))
 
     @property
     def unfinished_tasks(self) -> int:
diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
index dc7d86752..1277c5465 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
@@ -384,7 +384,16 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
         if len(self.message_pack_cache) == 0:
             return []
         else:
-            return self.message_pack_cache.popleft()
+            batch = self.message_pack_cache.popleft()
+            if len(batch) >= batch_size:
+                logger.debug(
+                    "[REDIS_QUEUE] Dequeued batch. batch_size=%s requested_batch_size=%s cache_packs_remaining=%s stream_count=%s",
+                    len(batch),
+                    batch_size,
+                    len(self.message_pack_cache),
+                    len(self.get_stream_keys()),
+                )
+            return batch
 
     def _ensure_consumer_group(self, stream_key) -> None:
         """Ensure the consumer group exists for the stream."""
@@ -449,9 +458,13 @@ def put(
             message_id = self._redis_conn.xadd(
                 stream_key, message_data, maxlen=self.max_len, approximate=True
             )
-
-            logger.info(
-                f"Added message {message_id} to Redis stream: {message.label} - {message.content[:100]}..."
+            logger.debug(
+                "[REDIS_QUEUE] Enqueued message. message_id=%s stream=%s label=%s item_id=%s stream_cache_size=%s",
+                message_id,
+                stream_key,
+                message.label,
+                message.item_id,
+                len(self._stream_keys_cache),
             )
 
         except Exception as e:
@@ -494,7 +507,11 @@ def ack_message(
 
             # Optionally delete the message from the stream to keep it clean
             try:
                 self._redis_conn.xdel(stream_key, redis_message_id)
-                logger.info(f"Successfully delete acknowledged message {redis_message_id}")
+                logger.debug(
+                    "[REDIS_QUEUE] Ack/delete message. redis_message_id=%s stream=%s",
+                    redis_message_id,
+                    stream_key,
+                )
             except Exception as e:
                 logger.warning(f"Failed to delete acknowledged message {redis_message_id}: {e}")
@@ -989,7 +1006,7 @@ def show_task_status(self, stream_key_prefix: str | None = None) -> dict[str, di
         )
         stream_keys = self.get_stream_keys(stream_key_prefix=effective_prefix)
         if not stream_keys:
-            logger.info(f"No Redis streams found for the configured prefix: {effective_prefix}")
+            logger.debug(f"No Redis streams found for the configured prefix: {effective_prefix}")
             return {}
 
         grouped: dict[str, dict[str, int]] = {}
@@ -1157,7 +1174,7 @@ def connect(self) -> None:
             self._redis_conn.ping()
             self._is_connected = True
             self._check_xautoclaim_support()
-            logger.debug("Redis connection established successfully")
+            logger.info("Redis connection established successfully")
             # Start stream keys refresher when connected
             self._start_stream_keys_refresh_thread()
         except Exception as e:
@@ -1174,7 +1191,7 @@ def disconnect(self) -> None:
         self._stop_stream_keys_refresh_thread()
         if self._is_listening:
             self.stop_listening()
-        logger.debug("Disconnected from Redis")
+        logger.info("Disconnected from Redis")
 
     def __enter__(self):
         """Context manager entry."""
@@ -1379,7 +1396,7 @@ def _update_stream_cache_with_log(
             self._stream_keys_cache = active_stream_keys
             self._stream_keys_last_refresh = time.time()
             cache_count = len(self._stream_keys_cache)
-            logger.info(
+            logger.debug(
                 f"Refreshed stream keys cache: {cache_count} active keys, "
                 f"{deleted_count} deleted, {len(candidate_keys)} candidates examined."
             )
diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py
index b49db2b36..1f2e81bef 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py
@@ -93,6 +93,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
         """Submit messages to the message queue (either local queue or Redis)."""
         if isinstance(messages, ScheduleMessageItem):
             messages = [messages]
+        if len(messages) < 1:
+            logger.error("submit_messages called with empty payload")
+            return
 
         current_trace_id = get_current_trace_id()
 
@@ -104,18 +107,25 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                 user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, task_label=msg.label
             )
 
-        if len(messages) < 1:
-            logger.error("Submit empty")
-        elif len(messages) == 1:
+        if len(messages) == 1:
             if getattr(messages[0], "timestamp", None) is None:
                 messages[0].timestamp = get_utc_now()
-            enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
-            emit_monitor_event(
-                "enqueue",
-                messages[0],
-                {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
-            )
-            self.memos_message_queue.put(messages[0])
+            if self.disabled_handlers and messages[0].label in self.disabled_handlers:
+                logger.debug(
+                    "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
+                    messages[0].label,
+                    messages[0].item_id,
+                    messages[0].user_id,
+                    messages[0].mem_cube_id,
+                )
+            else:
+                enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
+                emit_monitor_event(
+                    "enqueue",
+                    messages[0],
+                    {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
+                )
+                self.memos_message_queue.put(messages[0])
         else:
             user_cube_groups = group_messages_by_user_and_mem_cube(messages)
 
@@ -132,8 +142,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                         message.timestamp = get_utc_now()
 
                     if self.disabled_handlers and message.label in self.disabled_handlers:
-                        logger.info(
-                            f"Skipping disabled handler: {message.label} - {message.content}"
+                        logger.debug(
+                            "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
+                            message.label,
+                            message.item_id,
+                            message.user_id,
+                            message.mem_cube_id,
                         )
                         continue
 
@@ -148,9 +162,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
                         },
                     )
                     self.memos_message_queue.put(message)
-                    logger.info(
-                        f"Submitted message to local queue: {message.label} - {message.content}"
-                    )
+
+        logger.info(
+            "Queue submit completed. backend=%s total=%s",
+            "redis_queue" if self.use_redis_queue else "local_queue",
+            len(messages),
+        )
 
     def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
         return self.memos_message_queue.get_messages(batch_size=batch_size)