Skip to content

Commit f7887ea

Browse files
authored
fix: optimize dispatch task && update log level (#1437)
1 parent 3069462 commit f7887ea

File tree

5 files changed

+108
-31
lines changed

5 files changed

+108
-31
lines changed

src/memos/mem_scheduler/base_mixins/queue_ops.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
6565
logger.warning("status_tracker.task_submitted failed", exc_info=True)
6666

6767
if self.disabled_handlers and msg.label in self.disabled_handlers:
68-
logger.info("Skipping disabled handler: %s - %s", msg.label, msg.content)
68+
logger.debug(
69+
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
70+
msg.label,
71+
msg.item_id,
72+
msg.user_id,
73+
msg.mem_cube_id,
74+
)
6975
continue
7076

7177
task_priority = self.orchestrator.get_task_priority(task_label=msg.label)
@@ -74,6 +80,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
7480
else:
7581
queued_msgs.append(msg)
7682

83+
logger.info(
84+
"Submit scheduler messages summary. total=%s immediate=%s queued=%s queue_backend=%s",
85+
len(messages),
86+
len(immediate_msgs),
87+
len(queued_msgs),
88+
"redis_queue" if self.use_redis_queue else "local_queue",
89+
)
90+
7791
if immediate_msgs:
7892
for m in immediate_msgs:
7993
emit_monitor_event(
@@ -199,6 +213,15 @@ def _message_consumer(self) -> None:
199213
if messages:
200214
self.dispatcher.on_messages_enqueued(messages)
201215

216+
if len(messages) >= self.consume_batch:
217+
unique_labels = sorted({msg.label for msg in messages})
218+
logger.debug(
219+
"Consumer dequeued batch. batch_size=%s consume_batch=%s unique_labels=%s queue_backend=%s",
220+
len(messages),
221+
self.consume_batch,
222+
unique_labels,
223+
"redis_queue" if self.use_redis_queue else "local_queue",
224+
)
202225
self.dispatcher.dispatch(messages)
203226
except Exception as e:
204227
logger.error("Error dispatching messages: %s", e)

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
226226
if task_item.item_id in self._running_tasks:
227227
task_item.mark_completed(result)
228228
del self._running_tasks[task_item.item_id]
229-
logger.info(f"Task completed: {task_item.get_execution_info()}")
229+
logger.debug(f"Task completed: {task_item.get_execution_info()}")
230230
return result
231231

232232
except Exception as e:
@@ -630,12 +630,12 @@ def execute_task(
630630
with self._task_lock:
631631
self._futures.add(future)
632632
future.add_done_callback(self._handle_future_result)
633-
logger.info(
633+
logger.debug(
634634
f"Dispatch {len(msgs)} message(s) to {task_label} handler for user {user_id} and mem_cube {mem_cube_id}."
635635
)
636636
else:
637637
# For synchronous execution, the wrapper will run and remove the task upon completion
638-
logger.info(
638+
logger.debug(
639639
f"Execute {len(msgs)} message(s) synchronously for {task_label} for user {user_id} and mem_cube {mem_cube_id}."
640640
)
641641
wrapped_handler(msgs)
@@ -653,6 +653,12 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]):
653653

654654
# Group messages by user_id and mem_cube_id first
655655
user_cube_groups = group_messages_by_user_and_mem_cube(msg_list)
656+
logger.info(
657+
"Dispatcher received batch. total_messages=%s user_groups=%s unique_labels=%s",
658+
len(msg_list),
659+
len(user_cube_groups),
660+
sorted({msg.label for msg in msg_list}),
661+
)
656662

657663
# Process each user and mem_cube combination
658664
for user_id, cube_groups in user_cube_groups.items():

src/memos/mem_scheduler/task_schedule_modules/local_queue.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,12 @@ def put(
9595

9696
try:
9797
self.queue_streams[stream_key].put(item=message, block=block, timeout=timeout)
98-
logger.info(
99-
f"Message successfully put into queue '{stream_key}'. Current size: {self.queue_streams[stream_key].qsize()}"
98+
logger.debug(
99+
"Local queue enqueued. stream=%s size=%s label=%s item_id=%s",
100+
stream_key,
101+
self.queue_streams[stream_key].qsize(),
102+
message.label,
103+
message.item_id,
100104
)
101105
except Exception as e:
102106
logger.error(f"Failed to put message into queue '{stream_key}': {e}", exc_info=True)
@@ -117,7 +121,7 @@ def get(
117121

118122
# Return empty list if queue does not exist
119123
if stream_key not in self.queue_streams:
120-
logger.error(f"Stream {stream_key} does not exist when trying to get messages.")
124+
logger.debug("Stream %s does not exist when trying to get messages", stream_key)
121125
return []
122126

123127
# Ensure we always request a batch so we get a list back
@@ -174,6 +178,14 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
174178
fetched = self.get_nowait(stream_key=stream_key, batch_size=needed)
175179
messages.extend(fetched)
176180

181+
if messages and len(messages) >= batch_size:
182+
logger.debug(
183+
"Local queue dequeued batch. batch_size=%s requested_batch_size=%s active_streams=%s",
184+
len(messages),
185+
batch_size,
186+
len(stream_keys),
187+
)
188+
177189
return messages
178190

179191
def qsize(self) -> dict:
@@ -196,9 +208,11 @@ def clear(self, stream_key: str | None = None) -> None:
196208
if stream_key:
197209
if stream_key in self.queue_streams:
198210
self.queue_streams[stream_key].clear()
211+
logger.info("Cleared local queue stream: %s", stream_key)
199212
else:
200213
for queue in self.queue_streams.values():
201214
queue.clear()
215+
logger.info("Cleared all local queue streams. stream_count=%s", len(self.queue_streams))
202216

203217
@property
204218
def unfinished_tasks(self) -> int:

src/memos/mem_scheduler/task_schedule_modules/redis_queue.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,16 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
384384
if len(self.message_pack_cache) == 0:
385385
return []
386386
else:
387-
return self.message_pack_cache.popleft()
387+
batch = self.message_pack_cache.popleft()
388+
if len(batch) >= batch_size:
389+
logger.debug(
390+
"[REDIS_QUEUE] Dequeued batch. batch_size=%s requested_batch_size=%s cache_packs_remaining=%s stream_count=%s",
391+
len(batch),
392+
batch_size,
393+
len(self.message_pack_cache),
394+
len(self.get_stream_keys()),
395+
)
396+
return batch
388397

389398
def _ensure_consumer_group(self, stream_key) -> None:
390399
"""Ensure the consumer group exists for the stream."""
@@ -449,9 +458,13 @@ def put(
449458
message_id = self._redis_conn.xadd(
450459
stream_key, message_data, maxlen=self.max_len, approximate=True
451460
)
452-
453-
logger.info(
454-
f"Added message {message_id} to Redis stream: {message.label} - {message.content[:100]}..."
461+
logger.debug(
462+
"[REDIS_QUEUE] Enqueued message. message_id=%s stream=%s label=%s item_id=%s stream_cache_size=%s",
463+
message_id,
464+
stream_key,
465+
message.label,
466+
message.item_id,
467+
len(self._stream_keys_cache),
455468
)
456469

457470
except Exception as e:
@@ -494,7 +507,11 @@ def ack_message(
494507
# Optionally delete the message from the stream to keep it clean
495508
try:
496509
self._redis_conn.xdel(stream_key, redis_message_id)
497-
logger.info(f"Successfully delete acknowledged message {redis_message_id}")
510+
logger.debug(
511+
"[REDIS_QUEUE] Ack/delete message. redis_message_id=%s stream=%s",
512+
redis_message_id,
513+
stream_key,
514+
)
498515
except Exception as e:
499516
logger.warning(f"Failed to delete acknowledged message {redis_message_id}: {e}")
500517

@@ -989,7 +1006,7 @@ def show_task_status(self, stream_key_prefix: str | None = None) -> dict[str, di
9891006
)
9901007
stream_keys = self.get_stream_keys(stream_key_prefix=effective_prefix)
9911008
if not stream_keys:
992-
logger.info(f"No Redis streams found for the configured prefix: {effective_prefix}")
1009+
logger.debug(f"No Redis streams found for the configured prefix: {effective_prefix}")
9931010
return {}
9941011

9951012
grouped: dict[str, dict[str, int]] = {}
@@ -1157,7 +1174,7 @@ def connect(self) -> None:
11571174
self._redis_conn.ping()
11581175
self._is_connected = True
11591176
self._check_xautoclaim_support()
1160-
logger.debug("Redis connection established successfully")
1177+
logger.info("Redis connection established successfully")
11611178
# Start stream keys refresher when connected
11621179
self._start_stream_keys_refresh_thread()
11631180
except Exception as e:
@@ -1174,7 +1191,7 @@ def disconnect(self) -> None:
11741191
self._stop_stream_keys_refresh_thread()
11751192
if self._is_listening:
11761193
self.stop_listening()
1177-
logger.debug("Disconnected from Redis")
1194+
logger.info("Disconnected from Redis")
11781195

11791196
def __enter__(self):
11801197
"""Context manager entry."""
@@ -1379,7 +1396,7 @@ def _update_stream_cache_with_log(
13791396
self._stream_keys_cache = active_stream_keys
13801397
self._stream_keys_last_refresh = time.time()
13811398
cache_count = len(self._stream_keys_cache)
1382-
logger.info(
1399+
logger.debug(
13831400
f"Refreshed stream keys cache: {cache_count} active keys, "
13841401
f"{deleted_count} deleted, {len(candidate_keys)} candidates examined."
13851402
)

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
9393
"""Submit messages to the message queue (either local queue or Redis)."""
9494
if isinstance(messages, ScheduleMessageItem):
9595
messages = [messages]
96+
if len(messages) < 1:
97+
logger.error("submit_messages called with empty payload")
98+
return
9699

97100
current_trace_id = get_current_trace_id()
98101

@@ -104,18 +107,25 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
104107
user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, task_label=msg.label
105108
)
106109

107-
if len(messages) < 1:
108-
logger.error("Submit empty")
109-
elif len(messages) == 1:
110+
if len(messages) == 1:
110111
if getattr(messages[0], "timestamp", None) is None:
111112
messages[0].timestamp = get_utc_now()
112-
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
113-
emit_monitor_event(
114-
"enqueue",
115-
messages[0],
116-
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
117-
)
118-
self.memos_message_queue.put(messages[0])
113+
if self.disabled_handlers and messages[0].label in self.disabled_handlers:
114+
logger.debug(
115+
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
116+
messages[0].label,
117+
messages[0].item_id,
118+
messages[0].user_id,
119+
messages[0].mem_cube_id,
120+
)
121+
else:
122+
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
123+
emit_monitor_event(
124+
"enqueue",
125+
messages[0],
126+
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
127+
)
128+
self.memos_message_queue.put(messages[0])
119129
else:
120130
user_cube_groups = group_messages_by_user_and_mem_cube(messages)
121131

@@ -132,8 +142,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
132142
message.timestamp = get_utc_now()
133143

134144
if self.disabled_handlers and message.label in self.disabled_handlers:
135-
logger.info(
136-
f"Skipping disabled handler: {message.label} - {message.content}"
145+
logger.debug(
146+
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
147+
message.label,
148+
message.item_id,
149+
message.user_id,
150+
message.mem_cube_id,
137151
)
138152
continue
139153

@@ -148,9 +162,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
148162
},
149163
)
150164
self.memos_message_queue.put(message)
151-
logger.info(
152-
f"Submitted message to local queue: {message.label} - {message.content}"
153-
)
165+
166+
logger.info(
167+
"Queue submit completed. backend=%s total=%s",
168+
"redis_queue" if self.use_redis_queue else "local_queue",
169+
len(messages),
170+
)
154171

155172
def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
156173
return self.memos_message_queue.get_messages(batch_size=batch_size)

0 commit comments

Comments
 (0)