@@ -275,9 +275,13 @@ public void assign(Collection<MessageQueue> mqs) {
275275
276276 void assign (Map <MessageQueue , FilterExpression > subscriptions ) {
277277 assign0 (subscriptions );
278+ // Notify the scanner to scan the assigned queue immediately.
278279 scanner .signal ();
279280 }
280281
282+ /**
283+ * Replace the current subscriptions with the latest subscriptions.
284+ */
281285 void assign0 (Map <MessageQueue , FilterExpression > subscriptions ) {
282286 checkNotNull (subscriptions , "subscriptions to be assigned should not be null" );
283287 checkArgument (!subscriptions .isEmpty (), "subscription should not be empty" );
@@ -502,7 +506,7 @@ protected void startUp() throws Exception {
502506 try {
503507 commit ();
504508 } catch (Throwable t ) {
505- log .error ("Failed to commit offset, clientId={}" , clientId , t );
509+ log .error ("Failed to commit offset for pull consumer , clientId={}" , clientId , t );
506510 }
507511 }, AUTO_COMMIT_DELAY .toNanos (), autoCommitInterval .toNanos (), TimeUnit .NANOSECONDS );
508512 }
@@ -560,7 +564,6 @@ public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData)
560564 public void doStats () {
561565 final long pullTimes = this .pullTimes .getAndSet (0 );
562566 final long pulledMessagesQuantity = this .pulledMessagesQuantity .getAndSet (0 );
563-
564567 log .info ("clientId={}, consumerGroup={}, pullTimes={}, pulledMessageQuantity={}" , clientId , consumerGroup ,
565568 pullTimes , pulledMessagesQuantity );
566569 processQueueTable .values ().forEach (ProcessQueue ::doStats );
0 commit comments