Skip to content

Commit 8cbe238

Browse files
Joibel and isubasinghe authored
fix: address semaphore/mutex unsoundness for Initialize (cherry-pick argoproj#16160 for 3.7) (argoproj#16254)
Signed-off-by: isubasinghe <isitha@pipekit.io> Signed-off-by: Alan Clucas <alan@clucas.org> Co-authored-by: Isitha Subasinghe <isitha@pipekit.io>
1 parent d67c1c4 commit 8cbe238

8 files changed

Lines changed: 776 additions & 57 deletions

File tree

workflow/controller/controller.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,23 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error {
458458
return err
459459
}
460460

461-
wfc.syncManager.Initialize(ctx, wfList.Items)
461+
// A non-nil error means a recorded lock holder could not be re-established
462+
// (undecodable lock name, or an unavailable database session). This is fatal
463+
// by design: we fail closed rather than risk a silent double-acquire.
464+
staleHolds, err := wfc.syncManager.Initialize(ctx, wfList.Items)
465+
if err != nil {
466+
return err
467+
}
468+
// Stale holds are workflows whose recorded hold on a database-backed lock
469+
// the database no longer has (e.g. expired while the controller was down,
470+
// possibly acquired by someone else since). The database is the source of
471+
// truth, so these workflows must not keep running on a hold it does not
472+
// back: fail them; persistUpdates releases any locks they still hold.
473+
for _, stale := range staleHolds {
474+
woc := newWorkflowOperationCtx(stale.WF, wfc)
475+
woc.markWorkflowFailed(ctx, fmt.Sprintf("Failed to re-establish synchronization lock at controller startup: %s", stale.Reason))
476+
woc.persistUpdates(ctx)
477+
}
462478

463479
if err := wfc.throttler.Init(wfList.Items); err != nil {
464480
return err

workflow/sync/common.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ import (
66

77
type semaphore interface {
88
acquire(holderKey string, tx *transaction) (bool, error)
9+
// reacquire re-establishes a recorded holder at controller startup.
10+
//
11+
// For an in-memory lock it force-registers the holder, ignoring the current
12+
// limit, so the in-memory count reflects persisted reality even when recorded
13+
// holders exceed a (since lowered) limit - new acquisitions then correctly
14+
// wait until the count drains below the limit, rather than dropping a holder
15+
// (a double-acquire) or poisoning the lock over a routine limit change.
16+
//
17+
// For a database-backed lock the database is the single source of truth:
18+
// reacquire mutates nothing and only asserts the recorded hold still exists
19+
// there. An error means the hold could not be verified - either the held row
20+
// is gone (e.g. expired while the controller was down) or the database could
21+
// not be queried - and the caller fails the holding workflow.
22+
reacquire(holderKey string, tx *transaction) error
923
checkAcquire(holderKey string, tx *transaction) (bool, bool, string)
1024
tryAcquire(holderKey string, tx *transaction) (bool, string, error)
1125
release(key string) bool

workflow/sync/database_semaphore.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,25 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) (bool, er
478478
return false, nil
479479
}
480480

481+
// reacquire asserts at startup that the recorded holder still holds this lock
482+
// in the database. The database is the single source of truth for a
483+
// database-backed lock: the held row is durable and survives the controller
484+
// restart, so nothing is inserted or mutated here. A missing row means the
485+
// hold no longer exists - e.g. it was expired by ExpireInactiveLocks while the
486+
// controller was down and may since have been acquired by another holder - so
487+
// the workflow's recorded hold is stale and the caller fails the workflow
488+
// rather than resurrect a hold the database does not back.
489+
func (s *databaseSemaphore) reacquire(holderKey string, tx *transaction) error {
490+
holders, err := s.currentHoldersSession(*tx.db)
491+
if err != nil {
492+
return fmt.Errorf("could not verify hold on %s for %s: %w", s.longDBKey(), holderKey, err)
493+
}
494+
if !slices.Contains(holders, holderKey) {
495+
return fmt.Errorf("hold on %s for %s is not present in the database", s.longDBKey(), holderKey)
496+
}
497+
return nil
498+
}
499+
481500
func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string, error) {
482501
acq, already, msg := s.checkAcquire(holderKey, tx)
483502
if already {

workflow/sync/mutex_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ func TestMutexLock(t *testing.T) {
122122
ctx := context.Background()
123123
wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{})
124124
require.NoError(t, err)
125-
syncManager.Initialize(ctx, wfList.Items)
125+
staleHolds, err := syncManager.Initialize(ctx, wfList.Items)
126+
require.NoError(t, err)
127+
require.Empty(t, staleHolds)
126128
assert.Len(t, syncManager.syncLockMap, 1)
127129
})
128130
t.Run("WfLevelMutexAcquireAndRelease", func(t *testing.T) {

workflow/sync/poison.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package sync
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
// poisonedLock is a sentinel lock installed into the Manager's syncLockMap when,
9+
// during Initialize, the controller cannot re-establish a hold that a Running
10+
// workflow's status claims to have.
11+
//
12+
// The soundness invariant is: if a Workflow's status records that it is holding
13+
// a lock, the in-memory lock map must reflect that hold after Initialize.
14+
// Otherwise a racing workflow's TryAcquire would find the lock absent, create a
15+
// fresh one, and acquire a lock that is - per persisted state - already held.
16+
// For a mutex that means two workflows running concurrently under the same
17+
// mutex.
18+
//
19+
// Rather than silently dropping the holder (the previous behaviour), we install
20+
// this lock, which refuses every acquire and reports a poisoned-state message.
21+
// That message surfaces on the waiting node's synchronization status, marking
22+
// the node/workflow as blocked by a poisoned lock so an operator can intervene.
23+
//
24+
// The poison is in-memory only and is cleared on the next controller restart,
25+
// at which point Initialize re-evaluates: if the offending workflow is no longer
26+
// Running the lock is recreated clean; if it is still Running and still
27+
// unresolvable, it is poisoned again.
28+
type poisonedLock struct {
29+
name string
30+
reason string
31+
}
32+
33+
var _ semaphore = &poisonedLock{}
34+
35+
func newPoisonedLock(name, reason string) *poisonedLock {
36+
return &poisonedLock{name: name, reason: reason}
37+
}
38+
39+
func (p *poisonedLock) message() string {
40+
return fmt.Sprintf("lock %s is in a poisoned state: %s; manual intervention required", p.name, p.reason)
41+
}
42+
43+
func (p *poisonedLock) acquire(_ string, _ *transaction) (bool, error) {
44+
return false, nil
45+
}
46+
47+
// reacquire is a no-op: a poisoned lock refuses all holds until restart. It
48+
// returns nil because the poison already protects the recorded hold; failing
49+
// the holding workflow on top of that would punish it for an unrelated
50+
// holder's poisoning.
51+
func (p *poisonedLock) reacquire(_ string, _ *transaction) error {
52+
return nil
53+
}
54+
55+
func (p *poisonedLock) checkAcquire(_ string, _ *transaction) (bool, bool, string) {
56+
return false, false, p.message()
57+
}
58+
59+
func (p *poisonedLock) tryAcquire(_ string, _ *transaction) (bool, string, error) {
60+
return false, p.message(), nil
61+
}
62+
63+
func (p *poisonedLock) release(_ string) bool { return false }
64+
65+
func (p *poisonedLock) addToQueue(_ string, _ int32, _ time.Time) error {
66+
return nil
67+
}
68+
69+
func (p *poisonedLock) removeFromQueue(_ string) error { return nil }
70+
71+
func (p *poisonedLock) getCurrentHolders() ([]string, error) { return nil, nil }
72+
73+
func (p *poisonedLock) getCurrentPending() ([]string, error) { return nil, nil }
74+
75+
func (p *poisonedLock) getName() string { return p.name }
76+
77+
func (p *poisonedLock) getLimit() int { return 0 }
78+
79+
func (p *poisonedLock) probeWaiting() {}
80+
81+
// lock returns true so that tryAcquireImpl proceeds to checkAcquire, which
82+
// returns the poisoned-state message rather than a generic "failed to lock()".
83+
func (p *poisonedLock) lock() bool { return true }
84+
85+
func (p *poisonedLock) unlock() {}

workflow/sync/semaphore.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,24 @@ func (s *prioritySemaphore) acquire(holderKey string, _ *transaction) (bool, err
168168
return false, nil
169169
}
170170

171+
// reacquire re-establishes a recorded holder at startup, ignoring the limit. It
172+
// always registers the holder, even when the recorded holders already exceed the
173+
// current limit (e.g. the limit was lowered while held). The weighted semaphore
174+
// is capped at the limit, so a slot is only taken when one is free; the excess is
175+
// tracked solely in lockHolder, exactly as a downward resize leaves it. release()
176+
// already tolerates len(lockHolder) > limit and only frees a weighted slot once
177+
// the count drops below the limit, so new acquisitions wait until enough recorded
178+
// holders have drained. It never fails: the in-memory map is the source of truth
179+
// here, so registering the holder is always possible.
180+
func (s *prioritySemaphore) reacquire(holderKey string, _ *transaction) error {
181+
if _, ok := s.lockHolder[holderKey]; ok {
182+
return nil
183+
}
184+
s.semaphore.TryAcquire(1) // best effort: take a slot if one is free
185+
s.lockHolder[holderKey] = true
186+
return nil
187+
}
188+
171189
func isSameWorkflowNodeKeys(firstKey, secondKey string) bool {
172190
firstItems := strings.Split(firstKey, "/")
173191
secondItems := strings.Split(secondKey, "/")

0 commit comments

Comments
 (0)