diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index c9d3ba35413..8b0419c7890 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -793,7 +793,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( } snapshot := attribute.State if localMutableState == nil { - return r.applySnapshotWhenWorkflowNotExist( + err := r.applySnapshotWhenWorkflowNotExist( ctx, namespaceID, workflowID, @@ -807,6 +807,31 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( true, versionedTransition.IsCloseTransferTaskAcked && versionedTransition.IsForceReplication, ) + if err != nil { + if !errors.Is(err, consts.ErrDuplicate) { + return err + } + // Handle duplicate error + ms, msErr := wfCtx.LoadMutableState(ctx, r.shardContext) + switch msErr.(type) { + case *serviceerror.NotFound: + return err + case nil: + // The previous run may replicate the first batch of workflow + // and this mutable state may update after acquire the workflow lock. + // Retry to apply snapshot with mutable state + localMutableState = ms + r.logger.Warn("duplicate error applying snapshot for new workflow; mutable state already exists, retrying with existing state", + tag.WorkflowNamespaceID(namespaceID.String()), + tag.WorkflowID(workflowID), + tag.WorkflowRunID(runID), + tag.ArchetypeID(archetypeID), + tag.Error(err), + ) + default: + return err + } + } } return r.applySnapshotWhenWorkflowExist( ctx, diff --git a/service/history/replication/raw_task_converter.go b/service/history/replication/raw_task_converter.go index eb7cb451669..7089f55cf79 100644 --- a/service/history/replication/raw_task_converter.go +++ b/service/history/replication/raw_task_converter.go @@ -685,7 +685,7 @@ func (c *syncVersionedTransitionTaskConverter) convert( progress := c.replicationCache.Get(taskInfo.RunID, targetClusterID) - if progress.VersionedTransitionSent(taskInfo.VersionedTransition) { + if progress != nil && progress.VersionedTransitionSent(taskInfo.VersionedTransition) { return c.generateVerifyVersionedTransitionTask(taskInfo, mutableState) } diff --git a/service/history/replication/sequential_queue.go b/service/history/replication/sequential_queue.go index 984c7bb3db2..21da8213058 100644 --- a/service/history/replication/sequential_queue.go +++ b/service/history/replication/sequential_queue.go @@ -18,16 +18,6 @@ type ( } ) -func NewSequentialTaskQueue(task TrackableExecutableTask) ctasks.SequentialTaskQueue[TrackableExecutableTask] { - return &SequentialTaskQueue{ - id: task.QueueID(), - - taskQueue: collection.NewPriorityQueue[TrackableExecutableTask]( - SequentialTaskQueueCompareLess, - ), - } -} - func NewSequentialTaskQueueWithID(id any) ctasks.SequentialTaskQueue[TrackableExecutableTask] { return &SequentialTaskQueue{ id: id,