diff --git a/service/history/replication/stream_sender.go b/service/history/replication/stream_sender.go
index ed67d3e9bf9..928034359f5 100644
--- a/service/history/replication/stream_sender.go
+++ b/service/history/replication/stream_sender.go
@@ -559,7 +559,7 @@ Loop:
 			}()
 			task, err := s.taskConverter.Convert(item, s.clientShardKey.ClusterID, priority)
 			if err != nil {
-				return err
+				return s.recordRetry(item, attempt, fmt.Errorf("convert: %w", err))
 			}
 			if task == nil {
 				return nil
@@ -590,7 +590,7 @@ Loop:
 					0,
 					"",
 				)); err != nil {
-					return err
+					return s.recordRetry(item, attempt, fmt.Errorf("rate_limit: %w", err))
 				}
 				metrics.ReplicationRateLimitLatency.With(s.metrics).Record(time.Since(rlStartTime), metrics.OperationTag(TaskOperationTag(task)))
 			}
@@ -604,7 +604,7 @@ Loop:
 					},
 				},
 			}); err != nil {
-				return err
+				return s.recordRetry(item, attempt, fmt.Errorf("send: %w", err))
 			}
 			skipCount = 0
 			metrics.ReplicationTasksSend.With(s.metrics).Record(
@@ -731,3 +731,18 @@ func (s *StreamSenderImpl) getTaskTargetCluster(task tasks.Task) []string {
 		return nil
 	}
 }
+
+func (s *StreamSenderImpl) recordRetry(
+	item tasks.Task,
+	attempt int64,
+	err error,
+) error {
+	s.shardContext.GetThrottledLogger().Warn("Replication task send retry",
+		tag.TaskID(item.GetTaskID()),
+		tag.WorkflowNamespaceID(item.GetNamespaceID()),
+		tag.WorkflowID(item.GetWorkflowID()),
+		tag.Counter(int(attempt)),
+		tag.Error(err),
+	)
+	return err
+}