diff --git a/ui/src/cron-workflows/cron-workflow-list.tsx b/ui/src/cron-workflows/cron-workflow-list.tsx index 0b4521729adf..0502fd6bfe10 100644 --- a/ui/src/cron-workflows/cron-workflow-list.tsx +++ b/ui/src/cron-workflows/cron-workflow-list.tsx @@ -51,16 +51,13 @@ export function CronWorkflowList({match, location, history}: RouteComponentProps // save history useEffect(() => { - if (isFirstRender.current) { - isFirstRender.current = false; - return; - } - history.push( + (isFirstRender.current ? history.replace : history.push)( historyUrl('cron-workflows' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), { namespace, sidePanel }) ); + isFirstRender.current = false; }, [namespace, sidePanel]); // internal state diff --git a/ui/src/workflow-templates/workflow-template-list.tsx b/ui/src/workflow-templates/workflow-template-list.tsx index 26dc9df01637..101bd4fc972c 100644 --- a/ui/src/workflow-templates/workflow-template-list.tsx +++ b/ui/src/workflow-templates/workflow-template-list.tsx @@ -58,16 +58,13 @@ export function WorkflowTemplateList({match, location, history}: RouteComponentP ); useEffect(() => { - if (isFirstRender.current) { - isFirstRender.current = false; - return; - } - history.push( + (isFirstRender.current ? history.replace : history.push)( historyUrl('workflow-templates' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), { namespace, sidePanel }) ); + isFirstRender.current = false; }, [namespace, sidePanel]); // internal state diff --git a/ui/src/workflows/components/workflows-list/workflows-list.tsx b/ui/src/workflows/components/workflows-list/workflows-list.tsx index 3c87c8b9da3d..fe15d7d0f1c5 100644 --- a/ui/src/workflows/components/workflows-list/workflows-list.tsx +++ b/ui/src/workflows/components/workflows-list/workflows-list.tsx @@ -126,10 +126,6 @@ export function WorkflowsList({match, location, history}: RouteComponentProps { - if (isFirstRender.current) { - isFirstRender.current = false; - return; - } // add empty selectedPhases + selectedLabels for forward-compat w/ old version: previous code relies on them existing, so if you move up a version and back down, it breaks const options = {selectedPhases: [], selectedLabels: []} as unknown as WorkflowListRenderOptions; options.phases = phases; @@ -157,7 +153,10 @@ export function WorkflowsList({match, location, history}: RouteComponentProps { diff --git a/workflow/artifacts/s3/errors.go b/workflow/artifacts/s3/errors.go index 43b17b03e1c1..4cbc8856fc9c 100644 --- a/workflow/artifacts/s3/errors.go +++ b/workflow/artifacts/s3/errors.go @@ -1,6 +1,9 @@ package s3 import ( + stderrors "errors" + + "github.com/minio/minio-go/v7" log "github.com/sirupsen/logrus" "github.com/argoproj/argo-workflows/v3/util/errors" @@ -31,5 +34,15 @@ func isTransientS3Err(err error) bool { return true } } + // When the response body is not a parsable S3 XML document (e.g. a proxy + // or load balancer returned a bare 5xx response), minio-go sets Code to + // the raw HTTP status string ("503 Service Unavailable"), which does not + // match any entry in s3TransientErrorCodes. Fall back to StatusCode so + // 5xx responses are still treated as transient per S3 retry semantics. + var minioErr minio.ErrorResponse + if stderrors.As(err, &minioErr) && minioErr.StatusCode >= 500 && minioErr.StatusCode < 600 { + log.Errorf("Transient S3 error: %v", err) + return true + } return errors.IsTransientErr(err) } diff --git a/workflow/artifacts/s3/errors_test.go b/workflow/artifacts/s3/errors_test.go index e26f8117bada..10c0447a586e 100644 --- a/workflow/artifacts/s3/errors_test.go +++ b/workflow/artifacts/s3/errors_test.go @@ -23,3 +23,16 @@ func TestIsTransientOSSErr(t *testing.T) { requestErr := minio.ErrorResponse{Code: "RequestError"} assert.True(t, isTransientS3Err(requestErr)) } + +func TestIsTransientS3Err_BareHTTPStatus(t *testing.T) { + // minio-go falls back to resp.Status as Code when the error body is not + // parsable S3 XML (e.g. a load balancer returned a plain 5xx response). + bare503 := minio.ErrorResponse{Code: "503 Service Unavailable", StatusCode: 503} + assert.True(t, isTransientS3Err(bare503)) + + bare500 := minio.ErrorResponse{Code: "500 Internal Server Error", StatusCode: 500} + assert.True(t, isTransientS3Err(bare500)) + + bare404 := minio.ErrorResponse{Code: "404 Not Found", StatusCode: 404} + assert.False(t, isTransientS3Err(bare404)) +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 0bbb9c2c9fa3..5b995566f0f9 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -458,7 +458,23 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error { return err } - wfc.syncManager.Initialize(ctx, wfList.Items) + // A non-nil error means a recorded lock holder could not be re-established + // (undecodable lock name, or an unavailable database session). This is fatal + // by design: we fail closed rather than risk a silent double-acquire. + staleHolds, err := wfc.syncManager.Initialize(ctx, wfList.Items) + if err != nil { + return err + } + // Stale holds are workflows whose recorded hold on a database-backed lock + // the database no longer has (e.g. expired while the controller was down, + // possibly acquired by someone else since). The database is the source of + // truth, so these workflows must not keep running on a hold it does not + // back: fail them; persistUpdates releases any locks they still hold. + for _, stale := range staleHolds { + woc := newWorkflowOperationCtx(stale.WF, wfc) + woc.markWorkflowFailed(ctx, fmt.Sprintf("Failed to re-establish synchronization lock at controller startup: %s", stale.Reason)) + woc.persistUpdates(ctx) + } if err := wfc.throttler.Init(wfList.Items); err != nil { return err diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 101f7fe2c817..5d39faec051e 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -280,9 +280,11 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl for _, taskName := range targetTasks { woc.executeDAGTask(ctx, dagCtx, taskName) - // It is possible that target tasks are not reconsidered (i.e. executeDAGTask is not called on them) once they are - // complete (since the DAG itself will have succeeded). To ensure that their exit handlers are run we also run them here. Note that - // calls to runOnExitNode are idempotent: it is fine if they are called more than once for the same task. + // The exit hook for each target task is started by executeDAGTask -> processTask. + // We only inspect the onExit node's status here to decide whether the DAG can be + // considered complete; calling runOnExitNode (and therefore executeTemplate) a second + // time on the same onExit node would re-run checkParallelism against the count this + // very pass just bumped. taskNode := dagCtx.getTaskNode(taskName) if taskNode != nil { @@ -298,15 +300,10 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl woc.markNodeError(node.Name, err) return node, err } - if taskNode.Fulfilled() { - if taskNode.Completed() { - hasOnExitNode, onExitNode, err := woc.runOnExitNode(ctx, dagCtx.GetTask(taskName).GetExitHook(woc.execWf.Spec.Arguments), taskNode, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName, scope) - if err != nil { - return node, err - } - if hasOnExitNode && (onExitNode == nil || !onExitNode.Fulfilled()) { - onExitCompleted = false - } + if taskNode.Fulfilled() && taskNode.Completed() { + onExitNodeName := common.GenerateOnExitNodeName(taskNode.Name) + if onExitNode, onExitErr := woc.wf.GetNodeByName(onExitNodeName); onExitErr == nil && onExitNode != nil && !onExitNode.Fulfilled() { + onExitCompleted = false } } } diff --git a/workflow/sync/common.go b/workflow/sync/common.go index 4f07f5ad7d65..f5a4a29a754a 100644 --- a/workflow/sync/common.go +++ b/workflow/sync/common.go @@ -5,9 +5,23 @@ import ( ) type semaphore interface { - acquire(holderKey string, tx *transaction) bool + acquire(holderKey string, tx *transaction) (bool, error) + // reacquire re-establishes a recorded holder at controller startup. + // + // For an in-memory lock it force-registers the holder, ignoring the current + // limit, so the in-memory count reflects persisted reality even when recorded + // holders exceed a (since lowered) limit - new acquisitions then correctly + // wait until the count drains below the limit, rather than dropping a holder + // (a double-acquire) or poisoning the lock over a routine limit change. + // + // For a database-backed lock the database is the single source of truth: + // reacquire mutates nothing and only asserts the recorded hold still exists + // there. An error means the hold could not be verified - either the held row + // is gone (e.g. expired while the controller was down) or the database could + // not be queried - and the caller fails the holding workflow. + reacquire(holderKey string, tx *transaction) error checkAcquire(holderKey string, tx *transaction) (bool, bool, string) - tryAcquire(holderKey string, tx *transaction) (bool, string) + tryAcquire(holderKey string, tx *transaction) (bool, string, error) release(key string) bool addToQueue(holderKey string, priority int32, creationTime time.Time) error removeFromQueue(holderKey string) error diff --git a/workflow/sync/database_mutex_test.go b/workflow/sync/database_mutex_test.go index 042faf962b08..0d3a4802e446 100644 --- a/workflow/sync/database_mutex_test.go +++ b/workflow/sync/database_mutex_test.go @@ -39,18 +39,21 @@ func TestDatabaseMutexAcquireRelease(t *testing.T) { require.NoError(t, mutex.addToQueue("default/workflow2", 0, now.Add(time.Second))) // First acquisition should succeed - acquired, _ := mutex.tryAcquire("default/workflow1", tx) + acquired, _, err := mutex.tryAcquire("default/workflow1", tx) + require.NoError(t, err) assert.True(t, acquired, "First acquisition should succeed") // Second acquisition should fail - acquired, _ = mutex.tryAcquire("default/workflow2", tx) + acquired, _, err = mutex.tryAcquire("default/workflow2", tx) + require.NoError(t, err) assert.False(t, acquired, "Second acquisition should fail") // Release the mutex mutex.release("default/workflow1") // Now acquisition should succeed again - acquired, _ = mutex.tryAcquire("default/workflow2", tx) + acquired, _, err = mutex.tryAcquire("default/workflow2", tx) + require.NoError(t, err) assert.True(t, acquired, "Acquisition after release should succeed") }) } @@ -73,11 +76,13 @@ func TestDatabaseMutexQueueOrder(t *testing.T) { require.NoError(t, mutex.addToQueue("default/workflow1", 0, now)) require.NoError(t, mutex.addToQueue("default/workflow2", 0, now.Add(time.Second))) - acquired, _ := mutex.tryAcquire("default/workflow2", tx) + acquired, _, err := mutex.tryAcquire("default/workflow2", tx) + require.NoError(t, err) assert.False(t, acquired, "Second workflow should not acquire the mutex") // Acquire the first one - acquired, _ = mutex.tryAcquire("default/workflow1", tx) + acquired, _, err = mutex.tryAcquire("default/workflow1", tx) + require.NoError(t, err) assert.True(t, acquired, "First workflow should acquire the mutex") // Release it - this should notify the next one diff --git a/workflow/sync/database_semaphore.go b/workflow/sync/database_semaphore.go index 16b14c4e6d8d..4ed603f53ffc 100644 --- a/workflow/sync/database_semaphore.go +++ b/workflow/sync/database_semaphore.go @@ -416,12 +416,12 @@ func (s *databaseSemaphore) checkAcquire(holderKey string, tx *transaction) (boo return true, false, "" } -func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool { +func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) (bool, error) { limit := s.getLimit() existing, err := s.currentHoldersSession(*tx.db) if err != nil { s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock") - return false + return false, err } if len(existing) < limit { var pending []stateRecord @@ -435,7 +435,7 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool { All(&pending) if err != nil { s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock") - return false + return false, err } if len(pending) > 0 { _, err := (*tx.db).SQL().Update(s.info.config.stateTable). @@ -447,7 +447,7 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool { Exec() if err != nil { s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock") - return false + return false, err } } else { record := &stateRecord{ @@ -459,14 +459,14 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool { _, err := (*tx.db).Collection(s.info.config.stateTable).Insert(record) if err != nil { s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock") - return false + return false, err } } s.log.WithFields(log.Fields{ "key": holderKey, "result": true, }).Info("Acquire succeeded") - return true + return true, nil } s.log.WithFields(log.Fields{ "key": holderKey, @@ -475,10 +475,29 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool { "current_holders": len(existing), "limit": limit, }).Info("Acquire failed") - return false + return false, nil +} + +// reacquire asserts at startup that the recorded holder still holds this lock +// in the database. The database is the single source of truth for a +// database-backed lock: the held row is durable and survives the controller +// restart, so nothing is inserted or mutated here. A missing row means the +// hold no longer exists - e.g. it was expired by ExpireInactiveLocks while the +// controller was down and may since have been acquired by another holder - so +// the workflow's recorded hold is stale and the caller fails the workflow +// rather than resurrect a hold the database does not back. +func (s *databaseSemaphore) reacquire(holderKey string, tx *transaction) error { + holders, err := s.currentHoldersSession(*tx.db) + if err != nil { + return fmt.Errorf("could not verify hold on %s for %s: %w", s.longDBKey(), holderKey, err) + } + if !slices.Contains(holders, holderKey) { + return fmt.Errorf("hold on %s for %s is not present in the database", s.longDBKey(), holderKey) + } + return nil } -func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string) { +func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string, error) { acq, already, msg := s.checkAcquire(holderKey, tx) if already { s.log.WithFields(log.Fields{ @@ -486,7 +505,7 @@ func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, "result": true, "message": msg, }).Info("tryAcquire - already held") - return true, msg + return true, msg, nil } if !acq { s.log.WithFields(log.Fields{ @@ -494,22 +513,24 @@ func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, "result": false, "message": msg, }).Info("tryAcquire - cannot acquire") - return false, msg + return false, msg, nil } - if s.acquire(holderKey, tx) { + acquired, err := s.acquire(holderKey, tx) + if acquired { s.log.WithFields(log.Fields{ "key": holderKey, "result": true, }).Info("tryAcquire succeeded") s.notifyWaiters() - return true, "" + return true, "", nil } s.log.WithFields(log.Fields{ "key": holderKey, "result": false, "message": msg, + "error": err, }).Info("tryAcquire failed") - return false, msg + return false, msg, err } func (s *databaseSemaphore) expireLocks() { diff --git a/workflow/sync/database_semaphore_test.go b/workflow/sync/database_semaphore_test.go index bc0a7fe2137f..78bd97c1a68d 100644 --- a/workflow/sync/database_semaphore_test.go +++ b/workflow/sync/database_semaphore_test.go @@ -51,7 +51,7 @@ func TestInactiveControllerDBSemaphore(t *testing.T) { // Try to acquire - this should fail because the controller is considered inactive tx := &transaction{db: &info.session} - acquired, _ := s.tryAcquire("foo/wf-01", tx) + acquired, _, _ := s.tryAcquire("foo/wf-01", tx) assert.False(t, acquired, "Semaphore should not be acquired when controller is marked as inactive") // Now update the controller heartbeat to be current @@ -62,7 +62,7 @@ func TestInactiveControllerDBSemaphore(t *testing.T) { require.NoError(t, err) // Try again - now it should work - acquired, _ = s.tryAcquire("foo/wf-01", tx) + acquired, _, _ = s.tryAcquire("foo/wf-01", tx) assert.True(t, acquired, "Semaphore should be acquired when controller is alive") }) } @@ -105,7 +105,7 @@ func TestOtherControllerDBSemaphore(t *testing.T) { // Try to acquire - this should fail because the other controller's item is first in line tx := &transaction{db: &info.session} - acquired, _ := s.tryAcquire("foo/our-wf-01", tx) + acquired, _, _ := s.tryAcquire("foo/our-wf-01", tx) assert.False(t, acquired, "Semaphore should not be acquired when another controller's item is first in queue") // Now mark the other controller as inactive by setting its timestamp to be old @@ -117,7 +117,7 @@ func TestOtherControllerDBSemaphore(t *testing.T) { require.NoError(t, err) // Try again - now it should work because the other controller is considered inactive - acquired, _ = s.tryAcquire("foo/our-wf-01", tx) + acquired, _, _ = s.tryAcquire("foo/our-wf-01", tx) assert.True(t, acquired, "Semaphore should be acquired when other controller is marked as inactive") // Verify the semaphore is now held by our workflow @@ -166,7 +166,7 @@ func TestDifferentSemaphoreDBSemaphore(t *testing.T) { // Try to acquire - this should succeed because the other cluster's item is for a different semaphore tx := &transaction{db: &info.session} - acquired, _ := s.tryAcquire("foo/our-wf-01", tx) + acquired, _, _ := s.tryAcquire("foo/our-wf-01", tx) assert.True(t, acquired, "Semaphore should be acquired when another cluster's item is for a different semaphore") // Verify the semaphore is now held by our workflow @@ -200,38 +200,38 @@ func TestMutexAndSemaphoreWithSameName(t *testing.T) { // Mutex workflow 1 tx := &transaction{db: &info.session} require.NoError(t, mutex.addToQueue("foo/wf-mutex-1", 0, now)) - mutexAcquired1, _ := mutex.tryAcquire("foo/wf-mutex-1", tx) + mutexAcquired1, _, _ := mutex.tryAcquire("foo/wf-mutex-1", tx) assert.True(t, mutexAcquired1, "Mutex should be acquired by first workflow") // Semaphore workflow 1 require.NoError(t, semaphore.addToQueue("foo/wf-sem-1", 0, now)) - semAcquired1, _ := semaphore.tryAcquire("foo/wf-sem-1", tx) + semAcquired1, _, _ := semaphore.tryAcquire("foo/wf-sem-1", tx) assert.True(t, semAcquired1, "Semaphore should be acquired by first workflow") // Verify the mutex can't be acquired again require.NoError(t, mutex.addToQueue("foo/wf-mutex-2", 0, now)) - mutexAcquired2, _ := mutex.tryAcquire("foo/wf-mutex-2", tx) + mutexAcquired2, _, _ := mutex.tryAcquire("foo/wf-mutex-2", tx) assert.False(t, mutexAcquired2, "Mutex should not be acquired by second workflow") // But the semaphore can still be acquired (limit=2) require.NoError(t, semaphore.addToQueue("foo/wf-sem-2", 0, now)) - semAcquired2, _ := semaphore.tryAcquire("foo/wf-sem-2", tx) + semAcquired2, _, _ := semaphore.tryAcquire("foo/wf-sem-2", tx) assert.True(t, semAcquired2, "Semaphore should be acquired by second workflow") // But not a third time (because limit=2) require.NoError(t, semaphore.addToQueue("foo/wf-sem-3", 0, now)) - semAcquired3, _ := semaphore.tryAcquire("foo/wf-sem-3", tx) + semAcquired3, _, _ := semaphore.tryAcquire("foo/wf-sem-3", tx) assert.False(t, semAcquired3, "Semaphore should not be acquired by third workflow (at capacity)") // Now release the mutex mutex.release("foo/wf-mutex-1") // The mutex should be acquirable now - mutexAcquired2Again, _ := mutex.tryAcquire("foo/wf-mutex-2", tx) + mutexAcquired2Again, _, _ := mutex.tryAcquire("foo/wf-mutex-2", tx) assert.True(t, mutexAcquired2Again, "Mutex should be acquired after release") // But this shouldn't affect the semaphore's capacity - semAcquired3Again, _ := semaphore.tryAcquire("foo/wf-sem-3", tx) + semAcquired3Again, _, _ := semaphore.tryAcquire("foo/wf-sem-3", tx) assert.False(t, semAcquired3Again, "Semaphore should still be at capacity") // Now release one of the semaphore holders @@ -239,16 +239,16 @@ func TestMutexAndSemaphoreWithSameName(t *testing.T) { assert.True(t, released, "Semaphore should be released successfully") // Now we should be able to acquire the semaphore once - semAcquired3Again, _ = semaphore.tryAcquire("foo/wf-sem-3", tx) + semAcquired3Again, _, _ = semaphore.tryAcquire("foo/wf-sem-3", tx) assert.True(t, semAcquired3Again, "Semaphore should be acquired after release") // But not a fourth time (still at capacity with 2 holders) require.NoError(t, semaphore.addToQueue("foo/wf-sem-4", 0, now)) - semAcquired4, _ := semaphore.tryAcquire("foo/wf-sem-4", tx) + semAcquired4, _, _ := semaphore.tryAcquire("foo/wf-sem-4", tx) assert.False(t, semAcquired4, "Semaphore should not be acquired fourth time (at capacity again)") // The mutex should still be held - mutexAcquired3, _ := mutex.tryAcquire("foo/wf-mutex-3", tx) + mutexAcquired3, _, _ := mutex.tryAcquire("foo/wf-mutex-3", tx) assert.False(t, mutexAcquired3, "Mutex should still be held by another workflow") // Verify by checking the database directly diff --git a/workflow/sync/mutex_test.go b/workflow/sync/mutex_test.go index 91b55e90fa54..6a75d50341ce 100644 --- a/workflow/sync/mutex_test.go +++ b/workflow/sync/mutex_test.go @@ -122,7 +122,9 @@ func TestMutexLock(t *testing.T) { ctx := context.Background() wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(ctx, wfList.Items) + staleHolds, err := syncManager.Initialize(ctx, wfList.Items) + require.NoError(t, err) + require.Empty(t, staleHolds) assert.Len(t, syncManager.syncLockMap, 1) }) t.Run("WfLevelMutexAcquireAndRelease", func(t *testing.T) { diff --git a/workflow/sync/poison.go b/workflow/sync/poison.go new file mode 100644 index 000000000000..d78096a5f572 --- /dev/null +++ b/workflow/sync/poison.go @@ -0,0 +1,85 @@ +package sync + +import ( + "fmt" + "time" +) + +// poisonedLock is a sentinel lock installed into the Manager's syncLockMap when, +// during Initialize, the controller cannot re-establish a holder that a Running +// workflow's status claims to hold. +// +// The soundness invariant is: if a Workflow's status records that it is holding +// a lock, the in-memory lock map must reflect that hold after Initialize. +// Otherwise a racing workflow's TryAcquire would find the lock absent, create a +// fresh one, and acquire a lock that is - per persisted state - already held. +// For a mutex that means two workflows running concurrently under the same +// mutex. +// +// Rather than silently dropping the holder (the previous behaviour), we install +// this lock, which refuses every acquire and reports a poisoned-state message. +// That message surfaces on the waiting node's synchronization status, marking +// the node/workflow as blocked by a poisoned lock so an operator can intervene. +// +// The poison is in-memory only and is cleared on the next controller restart, +// at which point Initialize re-evaluates: if the offending workflow is no longer +// Running the lock is recreated clean; if it is still Running and still +// unresolvable, it is poisoned again. +type poisonedLock struct { + name string + reason string +} + +var _ semaphore = &poisonedLock{} + +func newPoisonedLock(name, reason string) *poisonedLock { + return &poisonedLock{name: name, reason: reason} +} + +func (p *poisonedLock) message() string { + return fmt.Sprintf("lock %s is in a poisoned state: %s; manual intervention required", p.name, p.reason) +} + +func (p *poisonedLock) acquire(_ string, _ *transaction) (bool, error) { + return false, nil +} + +// reacquire is a no-op: a poisoned lock refuses all holds until restart. It +// returns nil because the poison already protects the recorded hold; failing +// the holding workflow on top of that would punish it for an unrelated +// holder's poisoning. +func (p *poisonedLock) reacquire(_ string, _ *transaction) error { + return nil +} + +func (p *poisonedLock) checkAcquire(_ string, _ *transaction) (bool, bool, string) { + return false, false, p.message() +} + +func (p *poisonedLock) tryAcquire(_ string, _ *transaction) (bool, string, error) { + return false, p.message(), nil +} + +func (p *poisonedLock) release(_ string) bool { return false } + +func (p *poisonedLock) addToQueue(_ string, _ int32, _ time.Time) error { + return nil +} + +func (p *poisonedLock) removeFromQueue(_ string) error { return nil } + +func (p *poisonedLock) getCurrentHolders() ([]string, error) { return nil, nil } + +func (p *poisonedLock) getCurrentPending() ([]string, error) { return nil, nil } + +func (p *poisonedLock) getName() string { return p.name } + +func (p *poisonedLock) getLimit() int { return 0 } + +func (p *poisonedLock) probeWaiting() {} + +// lock returns true so that tryAcquireImpl proceeds to checkAcquire, which +// returns the poisoned-state message rather than a generic "failed to lock()". +func (p *poisonedLock) lock() bool { return true } + +func (p *poisonedLock) unlock() {} diff --git a/workflow/sync/semaphore.go b/workflow/sync/semaphore.go index 22dddbe7db1b..c7d72de0774e 100644 --- a/workflow/sync/semaphore.go +++ b/workflow/sync/semaphore.go @@ -160,12 +160,30 @@ func (s *prioritySemaphore) removeFromQueue(holderKey string) error { return nil } -func (s *prioritySemaphore) acquire(holderKey string, _ *transaction) bool { +func (s *prioritySemaphore) acquire(holderKey string, _ *transaction) (bool, error) { if s.semaphore.TryAcquire(1) { s.lockHolder[holderKey] = true - return true + return true, nil } - return false + return false, nil +} + +// reacquire re-establishes a recorded holder at startup, ignoring the limit. It +// always registers the holder, even when the recorded holders already exceed the +// current limit (e.g. the limit was lowered while held). The weighted semaphore +// is capped at the limit, so a slot is only taken when one is free; the excess is +// tracked solely in lockHolder, exactly as a downward resize leaves it. release() +// already tolerates len(lockHolder) > limit and only frees a weighted slot once +// the count drops below the limit, so new acquisitions wait until every recorded +// holder has drained. It never fails: the in-memory map is the source of truth +// here, so registering the holder is always possible. +func (s *prioritySemaphore) reacquire(holderKey string, _ *transaction) error { + if _, ok := s.lockHolder[holderKey]; ok { + return nil + } + s.semaphore.TryAcquire(1) // best effort: take a slot if one is free + s.lockHolder[holderKey] = true + return nil } func isSameWorkflowNodeKeys(firstKey, secondKey string) bool { @@ -226,23 +244,24 @@ func (s *prioritySemaphore) checkAcquire(holderKey string, _ *transaction) (bool return false, false, waitingMsg } -func (s *prioritySemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string) { +func (s *prioritySemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string, error) { acq, already, msg := s.checkAcquire(holderKey, tx) if already { - return true, msg + return true, msg, nil } if !acq { - return false, msg + return false, msg, nil } - if s.acquire(holderKey, tx) { + acquired, _ := s.acquire(holderKey, tx) + if acquired { s.pending.pop() limit := s.getLimit() s.log.Infof("%s acquired by %s. Lock availability: %d/%d", s.name, holderKey, limit-len(s.lockHolder), limit) s.notifyWaiters() - return true, "" + return true, "", nil } s.log.Debugf("Current semaphore Holders. %v", s.lockHolder) - return false, msg + return false, msg, nil } func (s *prioritySemaphore) probeWaiting() {} diff --git a/workflow/sync/semaphore_test.go b/workflow/sync/semaphore_test.go index d1b6b2e58495..6c192143d209 100644 --- a/workflow/sync/semaphore_test.go +++ b/workflow/sync/semaphore_test.go @@ -105,20 +105,20 @@ func testTryAcquireSemaphore(t *testing.T, factory semaphoreFactory) { require.NoError(t, s.addToQueue("default/wf-04", 0, now.Add(3*time.Second))) // verify only the first in line is allowed to acquired the semaphore var acquired bool - acquired, _ = s.tryAcquire("default/wf-04", tx) + acquired, _, _ = s.tryAcquire("default/wf-04", tx) assert.False(t, acquired) - acquired, _ = s.tryAcquire("default/wf-03", tx) + acquired, _, _ = s.tryAcquire("default/wf-03", tx) assert.False(t, acquired) - acquired, _ = s.tryAcquire("default/wf-02", tx) + acquired, _, _ = s.tryAcquire("default/wf-02", tx) assert.False(t, acquired) - acquired, _ = s.tryAcquire("default/wf-01", tx) + acquired, _, _ = s.tryAcquire("default/wf-01", tx) assert.True(t, acquired) // now that wf-01 obtained it, wf-02 can - acquired, _ = s.tryAcquire("default/wf-02", tx) + acquired, _, _ = s.tryAcquire("default/wf-02", tx) assert.True(t, acquired) - acquired, _ = s.tryAcquire("default/wf-03", tx) + acquired, _, _ = s.tryAcquire("default/wf-03", tx) assert.False(t, acquired) - acquired, _ = s.tryAcquire("default/wf-04", tx) + acquired, _, _ = s.tryAcquire("default/wf-04", tx) assert.False(t, acquired) } @@ -151,7 +151,7 @@ func testNotifyWaitersAcquire(t *testing.T, factory semaphoreFactory) { require.NoError(t, s.addToQueue("default/wf-03", 0, now.Add(2*time.Second))) tx := &transaction{db: &dbSession} - acquired, _ := s.tryAcquire("default/wf-01", tx) + acquired, _, _ := s.tryAcquire("default/wf-01", tx) assert.True(t, acquired) assert.Len(t, notified, 2) @@ -196,7 +196,7 @@ func testNotifyWorkflowFromTemplateSemaphore(t *testing.T, factory semaphoreFact require.NoError(t, s.addToQueue("foo/wf-02/nodeid-456", 0, now.Add(time.Second))) tx := &transaction{db: &dbSession} - acquired, _ := s.tryAcquire("foo/wf-01/nodeid-123", tx) + acquired, _, _ := s.tryAcquire("foo/wf-01/nodeid-123", tx) assert.True(t, acquired) assert.Len(t, notified, 1) diff --git a/workflow/sync/sync_manager.go b/workflow/sync/sync_manager.go index 3390377ccce2..52a1618d9dcf 100644 --- a/workflow/sync/sync_manager.go +++ b/workflow/sync/sync_manager.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "slices" "strings" "sync" "time" @@ -13,6 +14,7 @@ import ( runtimeutil "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" "github.com/argoproj/argo-workflows/v3/config" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -128,6 +130,26 @@ func getUpgradedKey(wf *wfv1.Workflow, key string, level SyncLevelType) string { return key } +// upgradeHolderKey resolves a holder key recorded in a Workflow's +// synchronization status into the key form the in-memory lock expects. +// +// V2 keys are self-describing - they already encode whether the hold is at the +// workflow level (ns/wfname) or the template level (ns/wfname/nodeID) - so they +// are returned verbatim and no spec lookup is needed. Only legacy V1 keys are +// ambiguous and require getWorkflowSyncLevelByName to determine the level. This +// matters for workflowTemplateRef workflows, whose wf.Spec is empty: a V2 key +// can be re-established without ever resolving a level from the spec. +func upgradeHolderKey(ctx context.Context, wf *wfv1.Workflow, holderKey, lockName string) (string, error) { + if wfv1.CheckHolderKeyVersion(holderKey) != wfv1.HoldingNameV1 { + return holderKey, nil + } + level, err := getWorkflowSyncLevelByName(ctx, wf, lockName) + if err != nil { + return "", err + } + return getUpgradedKey(wf, holderKey, level), nil +} + type SyncLevelType int const ( @@ -158,8 +180,25 @@ const ( // and at the workflow level -> impossible to upgrade correctly // due to ambiguity. Currently we just assume workflow level. func getWorkflowSyncLevelByName(ctx context.Context, wf *wfv1.Workflow, lockName string) (SyncLevelType, error) { - if wf.Spec.Synchronization != nil { - syncItems, err := allSyncItems(ctx, wf.Spec.Synchronization) + // For workflowTemplateRef workflows wf.Spec.Synchronization and + // wf.Spec.Templates are empty; the rendered spec lives in + // wf.Status.StoredWorkflowSpec. Inspect both so the level can be resolved + // regardless of where the synchronization block was declared. + syncBlocks := []*wfv1.Synchronization{wf.Spec.Synchronization} + templates := wf.Spec.Templates + if wf.Status.StoredWorkflowSpec != nil { + syncBlocks = append(syncBlocks, wf.Status.StoredWorkflowSpec.Synchronization) + // slices.Concat allocates a fresh backing array; a plain append could + // write into wf.Spec.Templates' spare capacity and corrupt the caller's + // slice (which aliases the workflow in wfs). + templates = slices.Concat(wf.Spec.Templates, wf.Status.StoredWorkflowSpec.Templates) + } + + for _, sync := range syncBlocks { + if sync == nil { + continue + } + syncItems, err := allSyncItems(ctx, sync) if err != nil { return ErrorLevel, err } @@ -175,7 +214,7 @@ func getWorkflowSyncLevelByName(ctx context.Context, wf *wfv1.Workflow, lockName } var lastErr error - for _, template := range wf.Spec.Templates { + for _, template := range templates { if template.Synchronization != nil { syncItems, err := allSyncItems(ctx, template.Synchronization) if err != nil { @@ -199,67 +238,177 @@ func getWorkflowSyncLevelByName(ctx context.Context, wf *wfv1.Workflow, lockName return ErrorLevel, lastErr } -func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) { - for _, wf := range wfs { +// initFailureFatal reports whether a failure to (re)establish a lock at startup +// is unrecoverable and must fail closed (crashloop). Only two cases qualify: +// - the lock name is undecodable, so there is no key to poison under and no way +// to prove the workflow's spec re-acquires the same lock; and +// - the lock is database-backed but no database session is configured, so +// nothing can back the lock. +// +// Everything else - a transient ConfigMap/DB read failure, a limit fetch +// returning 0 - is recoverable: the name decodes, so the lock can be poisoned +// (the poison key matches what a racer would compute) without halting the whole +// controller. +func (sm *Manager) initFailureFatal(lockName string) bool { + decoded, err := DecodeLockName(lockName) + if err != nil { + return true + } + return decoded.Kind == lockKindDatabase && sm.dbInfo.session == nil +} + +// poison installs a poisoned lock, refusing all acquires until the next restart. +func (sm *Manager) poison(lockName, reason string) { + log.WithFields(log.Fields{"lock": lockName, "reason": reason}).Warn("poisoning lock") + sm.syncLockMap[lockName] = newPoisonedLock(lockName, reason) +} + +// reestablishHolder re-establishes a single recorded holder of lockName in the +// in-memory lock map. lockType is "semaphore" or "mutex" (for logging) and +// initLock builds the backing lock when it is not yet present. +// +// It always reads the current lock from the map (never a stale local), so a lock +// poisoned by a previous holder stays poisoned and is not acquired on an orphaned +// object. A returned error is fatal (see initFailureFatal). An init failure or an +// unresolvable holder key poisons the lock. A non-empty staleReason means the +// hold could not be verified against its backing store (database-backed locks +// only); the caller fails the workflow, whose teardown releases its locks. +// +// Poisoning, not leaving absent, is required for the init-failure case: a +// ConfigMap-backed semaphore keeps its holders only in memory, so if we left the +// lock absent, prepAcquire would later rebuild it with zero holders once the +// backend recovered and let a racer acquire the slot this holder still owns. The +// poison is lock-scoped and clears on the next controller restart. +func (sm *Manager) reestablishHolder(ctx context.Context, wf *wfv1.Workflow, lockType, lockName, holder string, initLock func(string) (semaphore, error)) (staleReason string, fatalErr error) { + if sm.syncLockMap[lockName] == nil { + lock, err := initLock(lockName) + if err != nil { + if sm.initFailureFatal(lockName) { + // Undecodable name or a database hold with no session: we cannot + // poison to protect the recorded hold, so halt for an operator + // rather than risk a silent double-acquire. + log.WithField(lockType, lockName).WithError(err).Error("cannot initialize lock, failing closed") + return "", fmt.Errorf("cannot re-establish %s %q held by workflow %s/%s at startup: %w", lockType, lockName, wf.Namespace, wf.Name, err) + } + // Recoverable (e.g. transient ConfigMap unavailability) but the name + // decodes, so poison protects the recorded hold without crashlooping. + // Leaving the lock absent would be unsound: an in-memory semaphore + // rebuilt later would have zero holders and let a racer double-acquire. + sm.poison(lockName, fmt.Sprintf("controller could not initialize lock at startup: %v", err)) + return "", nil + } + sm.syncLockMap[lockName] = lock + } + + if holder == "" { + return "", nil + } + + key, err := upgradeHolderKey(ctx, wf, holder, lockName) + if err != nil { + sm.poison(lockName, fmt.Sprintf("controller could not re-establish recorded holder %q at startup: %v", holder, err)) + return "", nil + } + + // Re-read from the map: a previous holder of this same lock may have poisoned + // it, in which case reacquire is a no-op and we must not resurrect it. + lock := sm.syncLockMap[lockName] + // For in-memory locks reacquire force-registers the hold ignoring the limit, + // so the recorded hold is always represented: dropping it would let a racer + // double-acquire a semaphore whose holders exceed a lowered limit, and + // poisoning would block the whole shared lock over a routine limit change. + // For database-backed locks the database is the single source of truth and + // reacquire only asserts the hold is still recorded there; if it is not, the + // workflow's recorded hold is stale and the workflow is failed rather than + // left to run on a hold the database no longer backs. + if err := lock.reacquire(key, &transaction{db: &sm.dbInfo.session}); err != nil { + log.WithFields(log.Fields{"key": key, lockType: lockName}).WithError(err).Warn("could not re-establish recorded holder, failing the workflow") + return fmt.Sprintf("could not re-establish %s %q at controller startup: %v", lockType, lockName, err), nil + } + log.WithFields(log.Fields{"key": key, lockType: lockName}).Info("re-established recorded holder") + return "", nil +} + +// StaleHold records a workflow whose recorded hold on a database-backed lock +// could not be verified against the database during Initialize. The database +// is the single source of truth for such locks, so the workflow is running on +// a hold the database no longer backs (e.g. it was expired while the +// controller was down and may since have been acquired by another holder). +// The controller fails these workflows; their teardown releases any locks +// they still hold. +type StaleHold struct { + WF *wfv1.Workflow + Reason string +} + +// Initialize re-establishes, in the in-memory lock map, the holds that Running +// workflows record in their status. +// +// It fails closed only when a holder is genuinely unrecoverable (see +// initFailureFatal): an undecodable lock name, or a database-backed hold with no +// database session. Those return an error the controller treats as fatal, +// because we can neither poison the lock nor prove the spec re-acquires it, so +// continuing risks a silent double-acquire. +// +// Recoverable failures never crashloop: a lock that cannot be built (transient +// ConfigMap/DB read) or whose holder key is unresolvable is poisoned (lock-scoped, +// clears on restart). A database-backed hold that the database no longer records +// is returned as a StaleHold (at most one per workflow) for the controller to +// fail the workflow. +func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) ([]StaleHold, error) { + // Hold the lock for the whole pass: a DB-backed Manager starts its + // backgroundNotifier goroutine in createLockManager (before initManagers + // calls Initialize), and that goroutine iterates syncLockMap under sm.lock. + sm.lock.Lock() + defer sm.lock.Unlock() + + var staleHolds []StaleHold + for i := range wfs { + wf := &wfs[i] if wf.Status.Synchronization == nil { continue } + // Record only the first stale hold per workflow (failing it once is + // enough), but keep re-establishing its remaining holds: they are real + // until the failed workflow's teardown releases them, and dropping one + // from the in-memory map would let a racer double-acquire it. + stale := func(reason string) { + // neat way to prevent double counting workflows, this works because we iterate workflow by workflow + // we only need to ask was the last stale workflow the same as this one as a result. + if len(staleHolds) == 0 || staleHolds[len(staleHolds)-1].WF != wf { + staleHolds = append(staleHolds, StaleHold{WF: wf, Reason: reason}) + } + } + if wf.Status.Synchronization.Semaphore != nil { for _, holding := range wf.Status.Synchronization.Semaphore.Holding { - semaphore := sm.syncLockMap[holding.Semaphore] - if semaphore == nil { - var err error - semaphore, err = sm.initializeSemaphore(holding.Semaphore) - if err != nil { - log.Warnf("cannot initialize semaphore '%s': %v", holding.Semaphore, err) - continue - } - sm.syncLockMap[holding.Semaphore] = semaphore - } - - for _, holders := range holding.Holders { - level, err := getWorkflowSyncLevelByName(ctx, &wf, holding.Semaphore) + for _, holder := range holding.Holders { + reason, err := sm.reestablishHolder(ctx, wf, "semaphore", holding.Semaphore, holder, sm.initializeSemaphore) if err != nil { - log.Warnf("cannot obtain lock level for '%s' : %v", holding.Semaphore, err) - continue + return nil, err } - key := getUpgradedKey(&wf, holders, level) - tx := &transaction{db: &sm.dbInfo.session} - if semaphore != nil && semaphore.acquire(key, tx) { - log.Infof("Lock acquired by %s from %s", key, holding.Semaphore) + if reason != "" { + stale(reason) } } - } } if wf.Status.Synchronization.Mutex != nil { for _, holding := range wf.Status.Synchronization.Mutex.Holding { - mutex := sm.syncLockMap[holding.Mutex] - if mutex == nil { - var err error - mutex, err = sm.initializeMutex(holding.Mutex) - if err != nil { - log.Warnf("cannot initialize mutex '%s': %v", holding.Mutex, err) - continue - } - if holding.Holder != "" { - level, err := getWorkflowSyncLevelByName(ctx, &wf, holding.Mutex) - if err != nil { - log.Warnf("cannot obtain lock level for '%s' : %v", holding.Mutex, err) - continue - } - key := getUpgradedKey(&wf, holding.Holder, level) - tx := &transaction{db: &sm.dbInfo.session} - mutex.acquire(key, tx) - } - sm.syncLockMap[holding.Mutex] = mutex + reason, err := sm.reestablishHolder(ctx, wf, "mutex", holding.Mutex, holding.Holder, sm.initializeMutex) + if err != nil { + return nil, err + } + if reason != "" { + stale(reason) } } } } log.Infof("Manager initialized successfully") + return staleHolds, nil } // TryAcquire tries to acquire the lock from semaphore. @@ -304,56 +453,62 @@ func (sm *Manager) TryAcquire(ctx context.Context, wf *wfv1.Workflow, nodeName s var updated bool var already bool var msg string - var failedLockName string - var lastErr error - for retryCounter := range 5 { - err := sm.dbInfo.session.TxContext(ctx, func(sess db.Session) error { - log.WithFields(log.Fields{ - "holderKey": holderKey, - "attempt": retryCounter + 1, - }).Info("TryAcquire - starting transaction") - var err error + // Backoff bounds: sm.lock is held for the whole loop, so cap each sleep + // modestly. Jitter prevents a fleet of replicas from retrying in lockstep + // after a shared conflict burst. + backoff := wait.Backoff{ + Steps: 5, + Duration: 10 * time.Millisecond, + Factor: 2.0, + Jitter: 0.5, + Cap: 600 * time.Millisecond, + } + attempt := 0 + err = retry.OnError(backoff, isRetryableSyncError, func() error { + attempt++ + log.WithFields(log.Fields{ + "holderKey": holderKey, + "attempt": attempt, + }).Info("TryAcquire - starting transaction") + txErr := sm.dbInfo.session.TxContext(ctx, func(sess db.Session) error { + var implErr error tx := &transaction{db: &sess} - already, updated, msg, failedLockName, err = sm.tryAcquireImpl(wf, tx, holderKey, failedLockName, syncItems, lockKeys) + already, updated, msg, failedLockName, implErr = sm.tryAcquireImpl(wf, tx, holderKey, failedLockName, syncItems, lockKeys) + return implErr + }, &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false}) + if txErr != nil { log.WithFields(log.Fields{ "holderKey": holderKey, - "attempt": retryCounter + 1, - }).Info("TryAcquire - transaction completed") - return err - }, &sql.TxOptions{ - Isolation: sql.LevelSerializable, - ReadOnly: false, - }) - if err == nil { - return already, updated, msg, failedLockName, nil + "attempt": attempt, + "error": txErr, + "retryable": isRetryableSyncError(txErr), + }).Info("TryAcquire - transaction failed") } - lastErr = err - // Check if this is a serialization error - if strings.Contains(err.Error(), "serialization") || - strings.Contains(err.Error(), "dependencies") || - strings.Contains(err.Error(), "deadlock") || - strings.Contains(err.Error(), "rollback") { - log.WithFields(log.Fields{ - "holderKey": holderKey, - "attempt": retryCounter + 1, - "error": err, - }).Info("TryAcquire - serialization conflict, retrying") - continue - } else { - log.WithFields(log.Fields{ - "holderKey": holderKey, - "attempt": retryCounter + 1, - "error": err, - }).Info("TryAcquire - tx failed") - } - // For other errors, return immediately + return txErr + }) + if err != nil { return false, false, "", failedLockName, err } - return false, false, "", failedLockName, fmt.Errorf("failed after %d retries: %w", 5, lastErr) + return already, updated, msg, failedLockName, nil } return sm.tryAcquireImpl(wf, nil, holderKey, failedLockName, syncItems, lockKeys) } +// isRetryableSyncError reports whether a TryAcquire transaction failure should +// be retried. Matches PostgreSQL SERIALIZABLE conflict (40001), deadlock +// (40P01), and explicit rollback messages by substring against the driver's +// text. +func isRetryableSyncError(err error) bool { + if err == nil { + return false + } + s := strings.ToLower(err.Error()) + return strings.Contains(s, "serialization") || + strings.Contains(s, "dependencies") || + strings.Contains(s, "deadlock") || + strings.Contains(s, "rollback") +} + func (sm *Manager) prepAcquire(wf *wfv1.Workflow, holderKey string, syncItems []*syncItem, lockKeys []string) (bool, string, string, error) { for i, lockKey := range lockKeys { lock, found := sm.syncLockMap[lockKey] @@ -422,7 +577,16 @@ func (sm *Manager) tryAcquireImpl(wf *wfv1.Workflow, tx *transaction, holderKey updated := false for i, lockKey := range lockKeys { lock := sm.syncLockMap[lockKey] - acquired, msg := lock.tryAcquire(holderKey, tx) + var acquired bool + var acquireErr error + acquired, msg, acquireErr = lock.tryAcquire(holderKey, tx) + if acquireErr != nil { + // Surface the underlying error so callers (e.g. TryAcquire's + // retry loop) can decide whether it is retryable. Transient + // database errors like PostgreSQL SQLSTATE 40001 must reach + // the retry detector untouched. + return false, false, "", failedLockName, acquireErr + } if !acquired { return false, false, "", failedLockName, fmt.Errorf("bug: failed to acquire something that should have been checked: %s", msg) } diff --git a/workflow/sync/sync_manager_test.go b/workflow/sync/sync_manager_test.go index f62e9f7c4a94..163b6de672b5 100644 --- a/workflow/sync/sync_manager_test.go +++ b/workflow/sync/sync_manager_test.go @@ -368,7 +368,9 @@ func TestSemaphoreWfLevel(t *testing.T) { wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(ctx, wfList.Items) + staleHolds, err := syncManager.Initialize(ctx, wfList.Items) + require.NoError(t, err) + require.Empty(t, staleHolds) assert.Len(t, syncManager.syncLockMap, 1) }) t.Run("InitializeSynchronizationWithInvalid", func(t *testing.T) { @@ -380,7 +382,11 @@ func TestSemaphoreWfLevel(t *testing.T) { wfclientset := fakewfclientset.NewSimpleClientset(wf) wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(ctx, wfList.Items) + // An invalid/undecodable lock name now fails closed rather than being + // silently skipped: we can't poison it or prove the spec re-acquires it, + // so the controller halts for an operator instead of risking a double-acquire. + _, err = syncManager.Initialize(ctx, wfList.Items) + require.Error(t, err) assert.Empty(t, syncManager.syncLockMap) }) t.Run("InitializeMultipleWorkflowsHolding", func(t *testing.T) { @@ -411,7 +417,9 @@ func TestSemaphoreWfLevel(t *testing.T) { wf2.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/hello-world-two"} // Initialize with both workflows - syncManager.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2}) + staleHolds, err := syncManager.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2}) + require.NoError(t, err) + require.Empty(t, staleHolds) // Verify the semaphore was created assert.Len(t, syncManager.syncLockMap, 1) @@ -426,6 +434,222 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.Contains(t, holders, "default/hello-world-two") }) + t.Run("InitializeConfigMapInitFailurePoisonsNotFatal", func(t *testing.T) { + // The ConfigMap backing the semaphore limit is absent, so the limit fetch + // fails and initializeSemaphore errors. The name decodes (kind ConfigMap), + // so this is recoverable - it must not crashloop the controller. It must + // POISON the lock rather than leave it absent: a ConfigMap semaphore keeps + // its holders only in memory, so leaving it absent would let a later + // rebuild start with zero holders and a racer double-acquire this hold. + kubeClient := fake.NewSimpleClientset() // no ConfigMap created + syncManager := NewLockManager(ctx, kubeClient, "", nil, GetSyncLimitFunc(kubeClient), func(key string) { + }, WorkflowExistenceFunc) + + wf := wfv1.MustUnmarshalWorkflow(wfWithStatus) + wf.Name = "cm-holder" + wf.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/cm-holder"} + + staleHolds, err := syncManager.Initialize(ctx, []wfv1.Workflow{*wf}) + require.NoError(t, err, "a recoverable ConfigMap failure must not be fatal") + require.Empty(t, staleHolds) + lock := syncManager.syncLockMap["default/ConfigMap/my-config/workflow"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.True(t, poisoned, "decodable lock that cannot be initialized must be poisoned, not left absent") + }) + + t.Run("InitializeLoweredLimitForceRegistersAllHolders", func(t *testing.T) { + // Two workflows are recorded as holders but the limit is now 1 (lowered + // from 2). Lowering a limit is routine: it must NOT poison the shared + // semaphore (which would block every contender until restart), NOR drop + // the over-limit holder (which would let a racer double-acquire once the + // other holder releases). Both holders must be force-registered so the + // in-memory count reflects reality and new acquisitions wait until the + // count drains below the limit. + kubeClient := fake.NewSimpleClientset() + _, err := kubeClient.CoreV1().ConfigMaps("default").Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "my-config"}, + Data: map[string]string{"workflow": "1"}, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + syncManager := NewLockManager(ctx, kubeClient, "", nil, GetSyncLimitFunc(kubeClient), func(key string) { + }, WorkflowExistenceFunc) + + wf1 := wfv1.MustUnmarshalWorkflow(wfWithStatus) + wf1.Name = "holder-one" + wf1.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/holder-one"} + wf2 := wfv1.MustUnmarshalWorkflow(wfWithStatus) + wf2.Name = "holder-two" + wf2.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/holder-two"} + + staleHolds, err := syncManager.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2}) + require.NoError(t, err) + require.Empty(t, staleHolds) + + lock := syncManager.syncLockMap["default/ConfigMap/my-config/workflow"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.False(t, poisoned, "lowering a limit must not poison the semaphore") + holders, err := lock.getCurrentHolders() + require.NoError(t, err) + assert.Len(t, holders, 2, "both recorded holders must be force-registered, even over the lowered limit") + assert.Contains(t, holders, "default/holder-one") + assert.Contains(t, holders, "default/holder-two") + + // A new contender must NOT be able to acquire: the count (2) is over the + // limit (1), so the lock is correctly unavailable until holders drain. + racer := wfv1.MustUnmarshalWorkflow(wfWithStatus) + racer.Name = "racer" + acquired, _, _, _, err := syncManager.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "a racer must wait while recorded holders exceed the lowered limit") + }) + + t.Run("InitializeMutexWithWorkflowTemplateRef", func(t *testing.T) { + // A workflowTemplateRef workflow has empty Spec.Synchronization and + // Spec.Templates; its rendered mutex lives in Status.StoredWorkflowSpec. + // Its recorded holder uses a self-describing V2 key, so Initialize must + // re-establish the hold WITHOUT needing to resolve a level from wf.Spec. + // Before the fix, getWorkflowSyncLevelByName could not find the mutex in + // the empty spec, Initialize hit "cannot obtain lock level" and dropped + // the holder, leaving the lock free for a racing workflow to acquire. + syncManager := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc) + + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tmplref-holder", Namespace: "default"}, + Spec: wfv1.WorkflowSpec{ + WorkflowTemplateRef: &wfv1.WorkflowTemplateRef{Name: "my-template"}, + }, + Status: wfv1.WorkflowStatus{ + StoredWorkflowSpec: &wfv1.WorkflowSpec{ + Synchronization: &wfv1.Synchronization{ + Mutexes: []*wfv1.Mutex{{Name: "my-mutex"}}, + }, + }, + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "default/Mutex/my-mutex", + Holder: "default/tmplref-holder", + }}, + }, + }, + }, + } + + staleHolds, err := syncManager.Initialize(ctx, []wfv1.Workflow{*wf}) + require.NoError(t, err) + require.Empty(t, staleHolds) + + lock := syncManager.syncLockMap["default/Mutex/my-mutex"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.False(t, poisoned, "holder should be re-established, not poisoned") + holders, err := lock.getCurrentHolders() + require.NoError(t, err) + assert.Equal(t, []string{"default/tmplref-holder"}, holders) + }) + + t.Run("InitializePoisonsUnresolvableHolder", func(t *testing.T) { + // A legacy V1 holder key that cannot be resolved to a level (no matching + // synchronization block in spec or stored spec) must poison the lock, + // rather than silently dropping the holder. Otherwise a racing workflow + // could acquire a mutex that persisted state says is already held. + syncManager := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc) + + holder := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "ghost-holder", Namespace: "default"}, + Status: wfv1.WorkflowStatus{ + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "default/Mutex/my-mutex", + Holder: "ghost-holder", // V1 key, unresolvable against empty spec + }}, + }, + }, + }, + } + + // A decodable name whose holder can't be re-established is poisoned, not + // fatal: the poison key matches what a racer computes, so soundness holds. + staleHolds, err := syncManager.Initialize(ctx, []wfv1.Workflow{*holder}) + require.NoError(t, err) + require.Empty(t, staleHolds) + + lock := syncManager.syncLockMap["default/Mutex/my-mutex"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.True(t, poisoned, "unresolvable holder must poison the lock") + + // A racing workflow must not be able to acquire the poisoned mutex. + racer := wfv1.MustUnmarshalWorkflow(wfWithMutex) + racer.Name = "racer" + acquired, _, msg, _, err := syncManager.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "racing workflow must not acquire a poisoned lock") + assert.Contains(t, msg, "poisoned state") + }) + + t.Run("InitializeFailsClosedOnUndecodableLockName", func(t *testing.T) { + // A holder whose lock name cannot even be decoded is an unknowable hold: + // we can neither poison the lock (no key) nor prove the spec re-acquires + // it, so continuing risks a silent double-acquire. Initialize must fail + // closed (return an error) so the controller halts for an operator. + syncManager := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc) + + holder := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "broken-holder", Namespace: "default"}, + Status: wfv1.WorkflowStatus{ + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "garbage-undecodable-name", + Holder: "default/broken-holder", + }}, + }, + }, + }, + } + + _, err = syncManager.Initialize(ctx, []wfv1.Workflow{*holder}) + require.Error(t, err) + assert.Contains(t, err.Error(), "broken-holder") + assert.NotContains(t, syncManager.syncLockMap, "garbage-undecodable-name") + }) + + t.Run("InitializeFailsClosedWhenDBUnavailable", func(t *testing.T) { + // A Running workflow holds a database-backed lock, but the manager has no + // database session (e.g. the DB was unreachable at startup). The lock + // cannot be re-established, so Initialize must fail closed and let the + // controller crashloop until the database is reachable. + syncManager := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc) + require.Nil(t, syncManager.dbInfo.session) + + holder := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "db-holder", Namespace: "default"}, + Status: wfv1.WorkflowStatus{ + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "default/Database/my-db-lock", + Holder: "default/db-holder", + }}, + }, + }, + }, + } + + _, err = syncManager.Initialize(ctx, []wfv1.Workflow{*holder}) + require.Error(t, err) + assert.Contains(t, err.Error(), "database session") + }) + t.Run("WfLevelAcquireAndRelease", func(t *testing.T) { var nextKey string syncManager := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { @@ -1541,7 +1765,9 @@ func TestMutexMigration(t *testing.T) { syncMgr.syncLockMap = make(map[string]semaphore) wfs := []wfv1.Workflow{*wfMutex2.DeepCopy()} - syncMgr.Initialize(ctx, wfs) + staleHolds, err := syncMgr.Initialize(ctx, wfs) + require.NoError(err) + require.Empty(staleHolds) syncItems, err := allSyncItems(ctx, wfMutex2.Spec.Synchronization) require.NoError(err) @@ -1585,7 +1811,9 @@ func TestMutexMigration(t *testing.T) { assert.Equal(1, numFound) wfs := []wfv1.Workflow{*wfMutex3.DeepCopy()} - syncMgr.Initialize(ctx, wfs) + staleHolds, err := syncMgr.Initialize(ctx, wfs) + require.NoError(err) + require.Empty(staleHolds) syncItems, err := allSyncItems(ctx, wfMutex3.Spec.Templates[1].Synchronization) require.NoError(err) @@ -1858,3 +2086,198 @@ func TestUnconfiguredSemaphores(t *testing.T) { } }) } + +// TestDatabaseInitializeLoweredLimitAfterRestart drives the full restart path +// through Manager.Initialize for a database-backed semaphore. +// +// Scenario: a controller acquires a database semaphore for two workflows under +// limit 2, the controller is turned off, the limit is lowered to 1, and the +// controller restarts while both workflows are still Running. On restart +// Initialize re-establishes the recorded holders from the workflows' status. +// +// For a database semaphore the holds live in durable rows, so the holder count +// survives the restart regardless of the lowered limit. We assert that: +// - the lock is live (not poisoned - lowering a limit is routine); +// - both holders are still counted, even over the lowered limit; +// - each still-running workflow keeps its lock when it re-reconciles; +// - a new contender waits until the over-subscription drains below the limit. +func TestDatabaseInitializeLoweredLimitAfterRestart(t *testing.T) { + for _, dbType := range testDBTypes { + t.Run(string(dbType), func(t *testing.T) { + const dbLimitKey = "default/my-database-sem" + const lockName = "default/Database/my-database-sem" + + info, cleanup, syncConfig, err := createTestDBSession(t, dbType) + require.NoError(t, err) + defer cleanup() + + // Cancel the context before cleanup closes the database session so + // the managers' background notifier goroutines stop first (upstream + // these tests rely on t.Context() being canceled at test end). + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Original limit: 2. + _, err = info.session.SQL().Exec("INSERT INTO sync_limit (name, sizelimit) VALUES (?, ?)", dbLimitKey, 2) + require.NoError(t, err) + + // --- Controller instance #1: two workflows acquire the semaphore. --- + mgr1 := createLockManager(ctx, info.session, &syncConfig, nil, func(key string) {}, WorkflowExistenceFunc) + + creationTime := metav1.NewTime(time.Now()) + wf1 := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + wf1.Name = "holder-one" + wf1.CreationTimestamp = creationTime + wf2 := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + wf2.Name = "holder-two" + wf2.CreationTimestamp = creationTime + + acquired, _, _, _, err := mgr1.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) + require.NoError(t, err) + require.True(t, acquired, "holder-one should acquire under limit 2") + acquired, _, _, _, err = mgr1.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) + require.NoError(t, err) + require.True(t, acquired, "holder-two should acquire under limit 2") + + // Both workflows now record the hold in their status - this is what the + // informer persists and re-lists into Initialize after a restart. + require.NotNil(t, wf1.Status.Synchronization) + require.NotNil(t, wf2.Status.Synchronization) + + // --- Limit lowered to 1 while the controller is down. --- + _, err = info.session.SQL().Exec("UPDATE sync_limit SET sizelimit = ? WHERE name = ?", 1, dbLimitKey) + require.NoError(t, err) + + // --- Controller instance #2: restart. Initialize from the running workflows. --- + mgr2 := createLockManager(ctx, info.session, &syncConfig, nil, func(key string) {}, WorkflowExistenceFunc) + staleHolds, err := mgr2.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2}) + require.NoError(t, err) + require.Empty(t, staleHolds, "holders backed by database rows must not be reported stale") + + lock := mgr2.syncLockMap[lockName] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.False(t, poisoned, "lowering a DB semaphore limit must not poison the lock") + + holders, err := lock.getCurrentHolders() + require.NoError(t, err) + assert.Len(t, holders, 2, "both recorded holders must survive the restart, even over the lowered limit") + + // Each still-running workflow re-reconciles and re-acquires the lock it + // already holds. Both must succeed regardless of the lowered limit, since + // the already-held check precedes the limit check. + acquired, _, _, _, err = mgr2.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) + require.NoError(t, err) + assert.True(t, acquired, "holder-one must keep its lock after the limit was lowered") + acquired, _, _, _, err = mgr2.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) + require.NoError(t, err) + assert.True(t, acquired, "holder-two must keep its lock after the limit was lowered") + + // A new contender must wait: holders (2) exceed the lowered limit (1). + racer := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + racer.Name = "racer" + racer.CreationTimestamp = creationTime + acquired, _, _, _, err = mgr2.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "a racer must wait while holders exceed the lowered limit") + + // Drain one holder. Still at the limit (1 holder, limit 1) - racer waits. + mgr2.Release(ctx, wf1, "", wf1.Spec.Synchronization) + acquired, _, _, _, err = mgr2.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "racer still waits: one holder remains at limit 1") + + // Drain the last holder - now below the limit, so the racer is admitted. + mgr2.Release(ctx, wf2, "", wf2.Spec.Synchronization) + acquired, _, _, _, err = mgr2.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.True(t, acquired, "racer acquires once holders drain below the lowered limit") + }) + } +} + +// TestDatabaseInitializeStaleHoldFailsWorkflow drives the restart path through +// Manager.Initialize for a database-backed semaphore whose held row has gone +// away while the controller was down (e.g. expired by ExpireInactiveLocks and +// possibly acquired by another holder since). +// +// The database is the single source of truth for a database-backed lock, so +// Initialize must not resurrect the hold: it asserts the row exists, and when +// it does not it reports the workflow as a StaleHold for the controller to +// fail. We assert that: +// - the stale workflow is reported, with a reason naming the lock; +// - the lock is live (not poisoned) - the database still arbitrates it; +// - the hold is not re-inserted into the database; +// - a workflow whose row survived is re-established normally, not reported. +func TestDatabaseInitializeStaleHoldFailsWorkflow(t *testing.T) { + for _, dbType := range testDBTypes { + t.Run(string(dbType), func(t *testing.T) { + const dbLimitKey = "default/my-database-sem" + const lockName = "default/Database/my-database-sem" + + info, cleanup, syncConfig, err := createTestDBSession(t, dbType) + require.NoError(t, err) + defer cleanup() + + // Cancel the context before cleanup closes the database session so + // the managers' background notifier goroutines stop first (upstream + // these tests rely on t.Context() being canceled at test end). + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err = info.session.SQL().Exec("INSERT INTO sync_limit (name, sizelimit) VALUES (?, ?)", dbLimitKey, 2) + require.NoError(t, err) + + // --- Controller instance #1: two workflows acquire the semaphore. --- + mgr1 := createLockManager(ctx, info.session, &syncConfig, nil, func(key string) {}, WorkflowExistenceFunc) + + creationTime := metav1.NewTime(time.Now()) + wf1 := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + wf1.Name = "stale-holder" + wf1.CreationTimestamp = creationTime + wf2 := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + wf2.Name = "live-holder" + wf2.CreationTimestamp = creationTime + + acquired, _, _, _, err := mgr1.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) + require.NoError(t, err) + require.True(t, acquired, "stale-holder should acquire under limit 2") + acquired, _, _, _, err = mgr1.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) + require.NoError(t, err) + require.True(t, acquired, "live-holder should acquire under limit 2") + require.NotNil(t, wf1.Status.Synchronization) + require.NotNil(t, wf2.Status.Synchronization) + + // --- wf1's held row disappears while the controller is down. --- + _, err = info.session.SQL().Exec("DELETE FROM sync_state WHERE name = ? AND workflowkey = ?", "sem/"+dbLimitKey, "default/stale-holder") + require.NoError(t, err) + + // --- Controller instance #2: restart. Initialize from the running workflows. --- + mgr2 := createLockManager(ctx, info.session, &syncConfig, nil, func(key string) {}, WorkflowExistenceFunc) + staleHolds, err := mgr2.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2}) + require.NoError(t, err, "a stale hold is not fatal: the workflow is failed, not the controller") + + require.Len(t, staleHolds, 1, "only the workflow whose row is gone is stale") + assert.Equal(t, "stale-holder", staleHolds[0].WF.Name) + assert.Contains(t, staleHolds[0].Reason, lockName) + + // The lock stays live: the database arbitrates it, and the live holder + // (and any future contender) must not be blocked by wf1's stale status. + lock := mgr2.syncLockMap[lockName] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.False(t, poisoned, "a stale hold must not poison the lock") + + // The stale hold was asserted, not re-acquired: only the surviving row + // is held. + holders, err := lock.getCurrentHolders() + require.NoError(t, err) + assert.Equal(t, []string{"default/live-holder"}, holders, "the stale hold must not be re-inserted into the database") + + // The surviving holder keeps its lock when it re-reconciles. + acquired, _, _, _, err = mgr2.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) + require.NoError(t, err) + assert.True(t, acquired, "live-holder must keep its lock") + }) + } +} diff --git a/workflow/util/merge.go b/workflow/util/merge.go index d58b96f537b9..6a6b4005a5b3 100644 --- a/workflow/util/merge.go +++ b/workflow/util/merge.go @@ -53,6 +53,12 @@ func ValidateUserOverrides(userSpec *wfv1.WorkflowSpec) error { violations = append(violations, fieldName) } } + // ArtifactGC is allow-listed so that its benign fields (Strategy, + // ForceFinalizerRemoval) may be set, but its nested ServiceAccountName, + // PodSpecPatch and PodMetadata reach the artifact-GC Pod and would otherwise + // re-open the privilege escalation that the top-level ServiceAccountName / + // PodSpecPatch / PodMetadata blocks are meant to close, so reject them here. + violations = append(violations, artifactGCOverrideViolations(userSpec.ArtifactGC)...) if len(violations) > 0 { sort.Strings(violations) return fmt.Errorf("fields %v are not permitted when using workflowTemplateRef with templateReferencing restriction", violations) @@ -76,9 +82,50 @@ func SanitizeUserWorkflowSpec(userSpec *wfv1.WorkflowSpec) *wfv1.WorkflowSpec { dst.Field(i).Set(src.Field(i)) } } + // ArtifactGC is allow-listed wholesale above, which copies the pointer and + // so would carry through the user's ServiceAccountName/PodSpecPatch/PodMetadata. + // Strip those security-sensitive fields (on a copy, so the caller's spec is + // left untouched) while preserving the benign ones. + sanitized.ArtifactGC = sanitizeArtifactGC(sanitized.ArtifactGC) return sanitized } +// artifactGCOverrideViolations reports the security-sensitive fields set within a +// user-supplied workflow-level ArtifactGC. These reach the artifact-GC Pod and +// must not be user-controllable when a hardened WorkflowTemplate is referenced +// under Strict/Secure mode. +func artifactGCOverrideViolations(agc *wfv1.WorkflowLevelArtifactGC) []string { + if agc == nil { + return nil + } + var violations []string + if agc.PodSpecPatch != "" { + violations = append(violations, "ArtifactGC.PodSpecPatch") + } + if agc.ServiceAccountName != "" { + violations = append(violations, "ArtifactGC.ServiceAccountName") + } + if agc.PodMetadata != nil { + violations = append(violations, "ArtifactGC.PodMetadata") + } + return violations +} + +// sanitizeArtifactGC returns a deep copy of the workflow-level ArtifactGC with the +// security-sensitive override fields removed, leaving the benign fields (Strategy, +// ForceFinalizerRemoval) intact. A copy is returned so the caller's original spec +// is never mutated. +func sanitizeArtifactGC(agc *wfv1.WorkflowLevelArtifactGC) *wfv1.WorkflowLevelArtifactGC { + if agc == nil { + return nil + } + clean := agc.DeepCopy() + clean.PodSpecPatch = "" + clean.ServiceAccountName = "" + clean.PodMetadata = nil + return clean +} + // MergeTo will merge one workflow (the "patch" workflow) into another (the "target" workflow. // If the target workflow defines a field, this take precedence over the patch. func MergeTo(patch, target *wfv1.Workflow) error { diff --git a/workflow/util/merge_test.go b/workflow/util/merge_test.go index 6f0df688e9bd..8c84d22652ac 100644 --- a/workflow/util/merge_test.go +++ b/workflow/util/merge_test.go @@ -710,6 +710,92 @@ func TestSanitizeUserWorkflowSpec_Nil(t *testing.T) { assert.Nil(t, SanitizeUserWorkflowSpec(nil)) } +// TestValidateUserOverrides_ArtifactGCNestedFields guards against the +// GHSA-3775-99mw-8rp4 (CVE-2026-42296) bug class: ArtifactGC is allow-listed, +// but its nested ServiceAccountName/PodSpecPatch/PodMetadata reach the +// artifact-GC Pod and so must be rejected under Strict/Secure mode. +func TestValidateUserOverrides_ArtifactGCNestedFields(t *testing.T) { + t.Run("benign ArtifactGC fields are allowed", func(t *testing.T) { + spec := &wfv1.WorkflowSpec{ + WorkflowTemplateRef: &wfv1.WorkflowTemplateRef{Name: "my-template"}, + ArtifactGC: &wfv1.WorkflowLevelArtifactGC{ + ArtifactGC: wfv1.ArtifactGC{Strategy: wfv1.ArtifactGCOnWorkflowCompletion}, + ForceFinalizerRemoval: true, + }, + } + assert.NoError(t, ValidateUserOverrides(spec)) + }) + + tests := []struct { + name string + agc *wfv1.WorkflowLevelArtifactGC + field string + }{ + { + name: "ServiceAccountName", + agc: &wfv1.WorkflowLevelArtifactGC{ArtifactGC: wfv1.ArtifactGC{ServiceAccountName: "privileged"}}, + field: "ArtifactGC.ServiceAccountName", + }, + { + name: "PodSpecPatch", + agc: &wfv1.WorkflowLevelArtifactGC{PodSpecPatch: `{"containers":[{"name":"main","image":"attacker/x"}]}`}, + field: "ArtifactGC.PodSpecPatch", + }, + { + name: "PodMetadata", + agc: &wfv1.WorkflowLevelArtifactGC{ArtifactGC: wfv1.ArtifactGC{PodMetadata: &wfv1.Metadata{Labels: map[string]string{"a": "b"}}}}, + field: "ArtifactGC.PodMetadata", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + spec := &wfv1.WorkflowSpec{ + WorkflowTemplateRef: &wfv1.WorkflowTemplateRef{Name: "my-template"}, + ArtifactGC: tt.agc, + } + err := ValidateUserOverrides(spec) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.field) + assert.Contains(t, err.Error(), "not permitted") + }) + } +} + +// TestSanitizeUserWorkflowSpec_ArtifactGC verifies defense-in-depth: the +// security-sensitive nested ArtifactGC fields are stripped while benign ones are +// kept, and the caller's original spec is not mutated. +func TestSanitizeUserWorkflowSpec_ArtifactGC(t *testing.T) { + spec := &wfv1.WorkflowSpec{ + WorkflowTemplateRef: &wfv1.WorkflowTemplateRef{Name: "my-template"}, + ArtifactGC: &wfv1.WorkflowLevelArtifactGC{ + ArtifactGC: wfv1.ArtifactGC{ + Strategy: wfv1.ArtifactGCOnWorkflowCompletion, + ServiceAccountName: "privileged", + PodMetadata: &wfv1.Metadata{Labels: map[string]string{"a": "b"}}, + }, + ForceFinalizerRemoval: true, + PodSpecPatch: `{"containers":[]}`, + }, + } + + sanitized := SanitizeUserWorkflowSpec(spec) + + // Benign fields are preserved. + require.NotNil(t, sanitized.ArtifactGC) + assert.Equal(t, wfv1.ArtifactGCOnWorkflowCompletion, sanitized.ArtifactGC.Strategy) + assert.True(t, sanitized.ArtifactGC.ForceFinalizerRemoval) + + // Security-sensitive fields are stripped. + assert.Empty(t, sanitized.ArtifactGC.ServiceAccountName) + assert.Empty(t, sanitized.ArtifactGC.PodSpecPatch) + assert.Nil(t, sanitized.ArtifactGC.PodMetadata) + + // The original spec must not be mutated. + assert.Equal(t, "privileged", spec.ArtifactGC.ServiceAccountName) + assert.JSONEq(t, `{"containers":[]}`, spec.ArtifactGC.PodSpecPatch) + assert.NotNil(t, spec.ArtifactGC.PodMetadata) +} + // TestAllWorkflowSpecFieldsAccountedFor is a compile-time safety net. // It ensures that every field in WorkflowSpec appears in either the // allowed or blocked list, so new fields force a conscious decision.