Skip to content

Commit 79c3cac

Browse files
Merge pull request #382 from codefresh-io/CF-1821-repo-sync-2
chore: sync release-3.7 branch from argoproj/argo-workflows
2 parents db327f0 + 05bea5f commit 79c3cac

19 files changed

Lines changed: 1070 additions & 172 deletions

ui/src/cron-workflows/cron-workflow-list.tsx

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,13 @@ export function CronWorkflowList({match, location, history}: RouteComponentProps
5151

5252
// save history
5353
useEffect(() => {
54-
if (isFirstRender.current) {
55-
isFirstRender.current = false;
56-
return;
57-
}
58-
history.push(
54+
(isFirstRender.current ? history.replace : history.push)(
5955
historyUrl('cron-workflows' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {
6056
namespace,
6157
sidePanel
6258
})
6359
);
60+
isFirstRender.current = false;
6461
}, [namespace, sidePanel]);
6562

6663
// internal state

ui/src/workflow-templates/workflow-template-list.tsx

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,13 @@ export function WorkflowTemplateList({match, location, history}: RouteComponentP
5858
);
5959

6060
useEffect(() => {
61-
if (isFirstRender.current) {
62-
isFirstRender.current = false;
63-
return;
64-
}
65-
history.push(
61+
(isFirstRender.current ? history.replace : history.push)(
6662
historyUrl('workflow-templates' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {
6763
namespace,
6864
sidePanel
6965
})
7066
);
67+
isFirstRender.current = false;
7168
}, [namespace, sidePanel]);
7269

7370
// internal state

ui/src/workflows/components/workflows-list/workflows-list.tsx

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,6 @@ export function WorkflowsList({match, location, history}: RouteComponentProps<an
126126

127127
// save history and localStorage
128128
useEffect(() => {
129-
if (isFirstRender.current) {
130-
isFirstRender.current = false;
131-
return;
132-
}
133129
// add empty selectedPhases + selectedLabels for forward-compat w/ old version: previous code relies on them existing, so if you move up a version and back down, it breaks
134130
const options = {selectedPhases: [], selectedLabels: []} as unknown as WorkflowListRenderOptions;
135131
options.phases = phases;
@@ -157,7 +153,10 @@ export function WorkflowsList({match, location, history}: RouteComponentProps<an
157153
if (finishedBefore) {
158154
params.append('finishedBefore', finishedBefore.toISOString());
159155
}
160-
history.push(historyUrl('workflows' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {namespace, extraSearchParams: params}));
156+
(isFirstRender.current ? history.replace : history.push)(
157+
historyUrl('workflows' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {namespace, extraSearchParams: params})
158+
);
159+
isFirstRender.current = false;
161160
}, [namespace, phases.toString(), labels.toString(), pagination.limit, pagination.offset, nameValue, nameFilter, createdAfter, finishedBefore]); // referential equality, so use values, not refs
162161

163162
useEffect(() => {

workflow/artifacts/s3/errors.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package s3
22

33
import (
4+
stderrors "errors"
5+
6+
"github.com/minio/minio-go/v7"
47
log "github.com/sirupsen/logrus"
58

69
"github.com/argoproj/argo-workflows/v3/util/errors"
@@ -31,5 +34,15 @@ func isTransientS3Err(err error) bool {
3134
return true
3235
}
3336
}
37+
// When the response body is not a parsable S3 XML document (e.g. a proxy
38+
// or load balancer returned a bare 5xx response), minio-go sets Code to
39+
// the raw HTTP status string ("503 Service Unavailable"), which does not
40+
// match any entry in s3TransientErrorCodes. Fall back to StatusCode so
41+
// 5xx responses are still treated as transient per S3 retry semantics.
42+
var minioErr minio.ErrorResponse
43+
if stderrors.As(err, &minioErr) && minioErr.StatusCode >= 500 && minioErr.StatusCode < 600 {
44+
log.Errorf("Transient S3 error: %v", err)
45+
return true
46+
}
3447
return errors.IsTransientErr(err)
3548
}

workflow/artifacts/s3/errors_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,16 @@ func TestIsTransientOSSErr(t *testing.T) {
2323
requestErr := minio.ErrorResponse{Code: "RequestError"}
2424
assert.True(t, isTransientS3Err(requestErr))
2525
}
26+
27+
func TestIsTransientS3Err_BareHTTPStatus(t *testing.T) {
28+
// minio-go falls back to resp.Status as Code when the error body is not
29+
// parsable S3 XML (e.g. a load balancer returned a plain 5xx response).
30+
bare503 := minio.ErrorResponse{Code: "503 Service Unavailable", StatusCode: 503}
31+
assert.True(t, isTransientS3Err(bare503))
32+
33+
bare500 := minio.ErrorResponse{Code: "500 Internal Server Error", StatusCode: 500}
34+
assert.True(t, isTransientS3Err(bare500))
35+
36+
bare404 := minio.ErrorResponse{Code: "404 Not Found", StatusCode: 404}
37+
assert.False(t, isTransientS3Err(bare404))
38+
}

workflow/controller/controller.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,23 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error {
458458
return err
459459
}
460460

461-
wfc.syncManager.Initialize(ctx, wfList.Items)
461+
// A non-nil error means a recorded lock holder could not be re-established
462+
// (undecodable lock name, or an unavailable database session). This is fatal
463+
// by design: we fail closed rather than risk a silent double-acquire.
464+
staleHolds, err := wfc.syncManager.Initialize(ctx, wfList.Items)
465+
if err != nil {
466+
return err
467+
}
468+
// Stale holds are workflows whose recorded hold on a database-backed lock
469+
// the database no longer has (e.g. expired while the controller was down,
470+
// possibly acquired by someone else since). The database is the source of
471+
// truth, so these workflows must not keep running on a hold it does not
472+
// back: fail them; persistUpdates releases any locks they still hold.
473+
for _, stale := range staleHolds {
474+
woc := newWorkflowOperationCtx(stale.WF, wfc)
475+
woc.markWorkflowFailed(ctx, fmt.Sprintf("Failed to re-establish synchronization lock at controller startup: %s", stale.Reason))
476+
woc.persistUpdates(ctx)
477+
}
462478

463479
if err := wfc.throttler.Init(wfList.Items); err != nil {
464480
return err

workflow/controller/dag.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,11 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
280280
for _, taskName := range targetTasks {
281281
woc.executeDAGTask(ctx, dagCtx, taskName)
282282

283-
// It is possible that target tasks are not reconsidered (i.e. executeDAGTask is not called on them) once they are
284-
// complete (since the DAG itself will have succeeded). To ensure that their exit handlers are run we also run them here. Note that
285-
// calls to runOnExitNode are idempotent: it is fine if they are called more than once for the same task.
283+
// The exit hook for each target task is started by executeDAGTask -> processTask.
284+
// We only inspect the onExit node's status here to decide whether the DAG can be
285+
// considered complete; calling runOnExitNode (and therefore executeTemplate) a second
286+
// time on the same onExit node would re-run checkParallelism against the count this
287+
// very pass just bumped.
286288
taskNode := dagCtx.getTaskNode(taskName)
287289

288290
if taskNode != nil {
@@ -298,15 +300,10 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
298300
woc.markNodeError(node.Name, err)
299301
return node, err
300302
}
301-
if taskNode.Fulfilled() {
302-
if taskNode.Completed() {
303-
hasOnExitNode, onExitNode, err := woc.runOnExitNode(ctx, dagCtx.GetTask(taskName).GetExitHook(woc.execWf.Spec.Arguments), taskNode, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName, scope)
304-
if err != nil {
305-
return node, err
306-
}
307-
if hasOnExitNode && (onExitNode == nil || !onExitNode.Fulfilled()) {
308-
onExitCompleted = false
309-
}
303+
if taskNode.Fulfilled() && taskNode.Completed() {
304+
onExitNodeName := common.GenerateOnExitNodeName(taskNode.Name)
305+
if onExitNode, onExitErr := woc.wf.GetNodeByName(onExitNodeName); onExitErr == nil && onExitNode != nil && !onExitNode.Fulfilled() {
306+
onExitCompleted = false
310307
}
311308
}
312309
}

workflow/sync/common.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,23 @@ import (
55
)
66

77
type semaphore interface {
8-
acquire(holderKey string, tx *transaction) bool
8+
acquire(holderKey string, tx *transaction) (bool, error)
9+
// reacquire re-establishes a recorded holder at controller startup.
10+
//
11+
// For an in-memory lock it force-registers the holder, ignoring the current
12+
// limit, so the in-memory count reflects persisted reality even when recorded
13+
// holders exceed a (since lowered) limit - new acquisitions then correctly
14+
// wait until the count drains below the limit, rather than dropping a holder
15+
// (a double-acquire) or poisoning the lock over a routine limit change.
16+
//
17+
// For a database-backed lock the database is the single source of truth:
18+
// reacquire mutates nothing and only asserts the recorded hold still exists
19+
// there. An error means the hold could not be verified - either the held row
20+
// is gone (e.g. expired while the controller was down) or the database could
21+
// not be queried - and the caller fails the holding workflow.
22+
reacquire(holderKey string, tx *transaction) error
923
checkAcquire(holderKey string, tx *transaction) (bool, bool, string)
10-
tryAcquire(holderKey string, tx *transaction) (bool, string)
24+
tryAcquire(holderKey string, tx *transaction) (bool, string, error)
1125
release(key string) bool
1226
addToQueue(holderKey string, priority int32, creationTime time.Time) error
1327
removeFromQueue(holderKey string) error

workflow/sync/database_mutex_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,21 @@ func TestDatabaseMutexAcquireRelease(t *testing.T) {
3939
require.NoError(t, mutex.addToQueue("default/workflow2", 0, now.Add(time.Second)))
4040

4141
// First acquisition should succeed
42-
acquired, _ := mutex.tryAcquire("default/workflow1", tx)
42+
acquired, _, err := mutex.tryAcquire("default/workflow1", tx)
43+
require.NoError(t, err)
4344
assert.True(t, acquired, "First acquisition should succeed")
4445

4546
// Second acquisition should fail
46-
acquired, _ = mutex.tryAcquire("default/workflow2", tx)
47+
acquired, _, err = mutex.tryAcquire("default/workflow2", tx)
48+
require.NoError(t, err)
4749
assert.False(t, acquired, "Second acquisition should fail")
4850

4951
// Release the mutex
5052
mutex.release("default/workflow1")
5153

5254
// Now acquisition should succeed again
53-
acquired, _ = mutex.tryAcquire("default/workflow2", tx)
55+
acquired, _, err = mutex.tryAcquire("default/workflow2", tx)
56+
require.NoError(t, err)
5457
assert.True(t, acquired, "Acquisition after release should succeed")
5558
})
5659
}
@@ -73,11 +76,13 @@ func TestDatabaseMutexQueueOrder(t *testing.T) {
7376
require.NoError(t, mutex.addToQueue("default/workflow1", 0, now))
7477
require.NoError(t, mutex.addToQueue("default/workflow2", 0, now.Add(time.Second)))
7578

76-
acquired, _ := mutex.tryAcquire("default/workflow2", tx)
79+
acquired, _, err := mutex.tryAcquire("default/workflow2", tx)
80+
require.NoError(t, err)
7781
assert.False(t, acquired, "Second workflow should not acquire the mutex")
7882

7983
// Acquire the first one
80-
acquired, _ = mutex.tryAcquire("default/workflow1", tx)
84+
acquired, _, err = mutex.tryAcquire("default/workflow1", tx)
85+
require.NoError(t, err)
8186
assert.True(t, acquired, "First workflow should acquire the mutex")
8287

8388
// Release it - this should notify the next one

workflow/sync/database_semaphore.go

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -416,12 +416,12 @@ func (s *databaseSemaphore) checkAcquire(holderKey string, tx *transaction) (boo
416416
return true, false, ""
417417
}
418418

419-
func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
419+
func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) (bool, error) {
420420
limit := s.getLimit()
421421
existing, err := s.currentHoldersSession(*tx.db)
422422
if err != nil {
423423
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
424-
return false
424+
return false, err
425425
}
426426
if len(existing) < limit {
427427
var pending []stateRecord
@@ -435,7 +435,7 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
435435
All(&pending)
436436
if err != nil {
437437
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
438-
return false
438+
return false, err
439439
}
440440
if len(pending) > 0 {
441441
_, err := (*tx.db).SQL().Update(s.info.config.stateTable).
@@ -447,7 +447,7 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
447447
Exec()
448448
if err != nil {
449449
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
450-
return false
450+
return false, err
451451
}
452452
} else {
453453
record := &stateRecord{
@@ -459,14 +459,14 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
459459
_, err := (*tx.db).Collection(s.info.config.stateTable).Insert(record)
460460
if err != nil {
461461
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
462-
return false
462+
return false, err
463463
}
464464
}
465465
s.log.WithFields(log.Fields{
466466
"key": holderKey,
467467
"result": true,
468468
}).Info("Acquire succeeded")
469-
return true
469+
return true, nil
470470
}
471471
s.log.WithFields(log.Fields{
472472
"key": holderKey,
@@ -475,41 +475,62 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
475475
"current_holders": len(existing),
476476
"limit": limit,
477477
}).Info("Acquire failed")
478-
return false
478+
return false, nil
479+
}
480+
481+
// reacquire asserts at startup that the recorded holder still holds this lock
482+
// in the database. The database is the single source of truth for a
483+
// database-backed lock: the held row is durable and survives the controller
484+
// restart, so nothing is inserted or mutated here. A missing row means the
485+
// hold no longer exists - e.g. it was expired by ExpireInactiveLocks while the
486+
// controller was down and may since have been acquired by another holder - so
487+
// the workflow's recorded hold is stale and the caller fails the workflow
488+
// rather than resurrect a hold the database does not back.
489+
func (s *databaseSemaphore) reacquire(holderKey string, tx *transaction) error {
490+
holders, err := s.currentHoldersSession(*tx.db)
491+
if err != nil {
492+
return fmt.Errorf("could not verify hold on %s for %s: %w", s.longDBKey(), holderKey, err)
493+
}
494+
if !slices.Contains(holders, holderKey) {
495+
return fmt.Errorf("hold on %s for %s is not present in the database", s.longDBKey(), holderKey)
496+
}
497+
return nil
479498
}
480499

481-
func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string) {
500+
func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string, error) {
482501
acq, already, msg := s.checkAcquire(holderKey, tx)
483502
if already {
484503
s.log.WithFields(log.Fields{
485504
"key": holderKey,
486505
"result": true,
487506
"message": msg,
488507
}).Info("tryAcquire - already held")
489-
return true, msg
508+
return true, msg, nil
490509
}
491510
if !acq {
492511
s.log.WithFields(log.Fields{
493512
"key": holderKey,
494513
"result": false,
495514
"message": msg,
496515
}).Info("tryAcquire - cannot acquire")
497-
return false, msg
516+
return false, msg, nil
498517
}
499-
if s.acquire(holderKey, tx) {
518+
acquired, err := s.acquire(holderKey, tx)
519+
if acquired {
500520
s.log.WithFields(log.Fields{
501521
"key": holderKey,
502522
"result": true,
503523
}).Info("tryAcquire succeeded")
504524
s.notifyWaiters()
505-
return true, ""
525+
return true, "", nil
506526
}
507527
s.log.WithFields(log.Fields{
508528
"key": holderKey,
509529
"result": false,
510530
"message": msg,
531+
"error": err,
511532
}).Info("tryAcquire failed")
512-
return false, msg
533+
return false, msg, err
513534
}
514535

515536
func (s *databaseSemaphore) expireLocks() {

0 commit comments

Comments
 (0)