From 304d21abec893d12489ee9a0b746c79e2d8a3f26 Mon Sep 17 00:00:00 2001
From: jiechenz
Date: Thu, 2 Apr 2026 21:48:01 -0700
Subject: [PATCH 1/2] Add retry logging for replication task send attempts

---
 service/history/replication/stream_sender.go | 23 +++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/service/history/replication/stream_sender.go b/service/history/replication/stream_sender.go
index ed67d3e9bf9..bdcc3340fb9 100644
--- a/service/history/replication/stream_sender.go
+++ b/service/history/replication/stream_sender.go
@@ -51,6 +51,7 @@ type (
 		taskConverter SourceTaskConverter
 		metrics metrics.Handler
 		logger log.Logger
+		throttledLogger log.Logger
 		status int32
 		clientClusterName string
 		clientShardKey ClusterShardKey
@@ -92,6 +93,7 @@ func NewStreamSender(
 		taskConverter: taskConverter,
 		metrics: shardContext.GetMetricsHandler(),
 		logger: logger,
+		throttledLogger: log.NewThrottledLogger(logger, func() float64 { return float64(config.ThrottledLogRPS()) }),
 		status: common.DaemonStatusInitialized,
 		clientClusterName: clientClusterName,
 		clientShardKey: clientShardKey,
@@ -559,7 +561,7 @@ Loop:
 			}()
 			task, err := s.taskConverter.Convert(item, s.clientShardKey.ClusterID, priority)
 			if err != nil {
-				return err
+				return s.recordRetry(item, attempt, fmt.Errorf("convert: %w", err))
 			}
 			if task == nil {
 				return nil
@@ -590,7 +592,7 @@ Loop:
 					0,
 					"",
 				)); err != nil {
-					return err
+					return s.recordRetry(item, attempt, fmt.Errorf("rate_limit: %w", err))
 				}
 				metrics.ReplicationRateLimitLatency.With(s.metrics).Record(time.Since(rlStartTime), metrics.OperationTag(TaskOperationTag(task)))
 			}
@@ -604,7 +606,7 @@ Loop:
 					},
 				},
 			}); err != nil {
-				return err
+				return s.recordRetry(item, attempt, fmt.Errorf("send: %w", err))
 			}
 			skipCount = 0
 			metrics.ReplicationTasksSend.With(s.metrics).Record(
@@ -731,3 +733,18 @@ func (s *StreamSenderImpl) getTaskTargetCluster(task tasks.Task) []string {
 		return nil
 	}
 }
+
+func (s *StreamSenderImpl) recordRetry(
+	item tasks.Task,
+	attempt int64,
+	err error,
+) error {
+	s.throttledLogger.Warn("Replication task send retry",
+		tag.TaskID(item.GetTaskID()),
+		tag.WorkflowNamespaceID(item.GetNamespaceID()),
+		tag.WorkflowID(item.GetWorkflowID()),
+		tag.Counter(int(attempt)),
+		tag.Error(err),
+	)
+	return err
+}

From 71adddf4dd7df3589749dcd3bb4690f8f38b4bc6 Mon Sep 17 00:00:00 2001
From: jiechenz
Date: Fri, 3 Apr 2026 11:04:18 -0700
Subject: [PATCH 2/2] use throttledLogger from shardContext

---
 service/history/replication/stream_sender.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/service/history/replication/stream_sender.go b/service/history/replication/stream_sender.go
index bdcc3340fb9..928034359f5 100644
--- a/service/history/replication/stream_sender.go
+++ b/service/history/replication/stream_sender.go
@@ -51,7 +51,6 @@ type (
 		taskConverter SourceTaskConverter
 		metrics metrics.Handler
 		logger log.Logger
-		throttledLogger log.Logger
 		status int32
 		clientClusterName string
 		clientShardKey ClusterShardKey
@@ -93,7 +92,6 @@ func NewStreamSender(
 		taskConverter: taskConverter,
 		metrics: shardContext.GetMetricsHandler(),
 		logger: logger,
-		throttledLogger: log.NewThrottledLogger(logger, func() float64 { return float64(config.ThrottledLogRPS()) }),
 		status: common.DaemonStatusInitialized,
 		clientClusterName: clientClusterName,
 		clientShardKey: clientShardKey,
@@ -739,7 +737,7 @@ func (s *StreamSenderImpl) recordRetry(
 	attempt int64,
 	err error,
 ) error {
-	s.throttledLogger.Warn("Replication task send retry",
+	s.shardContext.GetThrottledLogger().Warn("Replication task send retry",
 		tag.TaskID(item.GetTaskID()),
 		tag.WorkflowNamespaceID(item.GetNamespaceID()),
 		tag.WorkflowID(item.GetWorkflowID()),