Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions ui/src/cron-workflows/cron-workflow-list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,13 @@ export function CronWorkflowList({match, location, history}: RouteComponentProps

// save history
useEffect(() => {
if (isFirstRender.current) {
isFirstRender.current = false;
return;
}
history.push(
(isFirstRender.current ? history.replace : history.push)(
historyUrl('cron-workflows' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {
namespace,
sidePanel
})
);
isFirstRender.current = false;
}, [namespace, sidePanel]);

// internal state
Expand Down
7 changes: 2 additions & 5 deletions ui/src/workflow-templates/workflow-template-list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ export function WorkflowTemplateList({match, location, history}: RouteComponentP
);

useEffect(() => {
if (isFirstRender.current) {
isFirstRender.current = false;
return;
}
history.push(
(isFirstRender.current ? history.replace : history.push)(
historyUrl('workflow-templates' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {
namespace,
sidePanel
})
);
isFirstRender.current = false;
}, [namespace, sidePanel]);

// internal state
Expand Down
9 changes: 4 additions & 5 deletions ui/src/workflows/components/workflows-list/workflows-list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ export function WorkflowsList({match, location, history}: RouteComponentProps<an

// save history and localStorage
useEffect(() => {
if (isFirstRender.current) {
isFirstRender.current = false;
return;
}
// add empty selectedPhases + selectedLabels for forward-compat w/ old version: previous code relies on them existing, so if you move up a version and back down, it breaks
const options = {selectedPhases: [], selectedLabels: []} as unknown as WorkflowListRenderOptions;
options.phases = phases;
Expand Down Expand Up @@ -157,7 +153,10 @@ export function WorkflowsList({match, location, history}: RouteComponentProps<an
if (finishedBefore) {
params.append('finishedBefore', finishedBefore.toISOString());
}
history.push(historyUrl('workflows' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {namespace, extraSearchParams: params}));
(isFirstRender.current ? history.replace : history.push)(
historyUrl('workflows' + (nsUtils.getManagedNamespace() ? '' : '/{namespace}'), {namespace, extraSearchParams: params})
);
isFirstRender.current = false;
}, [namespace, phases.toString(), labels.toString(), pagination.limit, pagination.offset, nameValue, nameFilter, createdAfter, finishedBefore]); // referential equality, so use values, not refs

useEffect(() => {
Expand Down
13 changes: 13 additions & 0 deletions workflow/artifacts/s3/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package s3

import (
stderrors "errors"

"github.com/minio/minio-go/v7"
log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-workflows/v3/util/errors"
Expand Down Expand Up @@ -31,5 +34,15 @@ func isTransientS3Err(err error) bool {
return true
}
}
// When the response body is not a parsable S3 XML document (e.g. a proxy
// or load balancer returned a bare 5xx response), minio-go sets Code to
// the raw HTTP status string ("503 Service Unavailable"), which does not
// match any entry in s3TransientErrorCodes. Fall back to StatusCode so
// 5xx responses are still treated as transient per S3 retry semantics.
var minioErr minio.ErrorResponse
if stderrors.As(err, &minioErr) && minioErr.StatusCode >= 500 && minioErr.StatusCode < 600 {
log.Errorf("Transient S3 error: %v", err)
return true
}
return errors.IsTransientErr(err)
}
13 changes: 13 additions & 0 deletions workflow/artifacts/s3/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,16 @@ func TestIsTransientOSSErr(t *testing.T) {
requestErr := minio.ErrorResponse{Code: "RequestError"}
assert.True(t, isTransientS3Err(requestErr))
}

// TestIsTransientS3Err_BareHTTPStatus covers error responses whose Code is the
// raw HTTP status line rather than an S3 error code. minio-go produces these
// when the response body is not parsable S3 XML (for example a load balancer
// answering with a plain 5xx). 5xx statuses must be classified as transient;
// a 4xx status must not.
func TestIsTransientS3Err_BareHTTPStatus(t *testing.T) {
	cases := []struct {
		code      string
		status    int
		transient bool
	}{
		{code: "503 Service Unavailable", status: 503, transient: true},
		{code: "500 Internal Server Error", status: 500, transient: true},
		{code: "404 Not Found", status: 404, transient: false},
	}

	for _, tc := range cases {
		resp := minio.ErrorResponse{Code: tc.code, StatusCode: tc.status}
		assert.Equal(t, tc.transient, isTransientS3Err(resp), tc.code)
	}
}
18 changes: 17 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,23 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error {
return err
}

wfc.syncManager.Initialize(ctx, wfList.Items)
// A non-nil error means a recorded lock holder could not be re-established
// (undecodable lock name, or an unavailable database session). This is fatal
// by design: we fail closed rather than risk a silent double-acquire.
staleHolds, err := wfc.syncManager.Initialize(ctx, wfList.Items)
if err != nil {
return err
}
// Stale holds are workflows whose recorded hold on a database-backed lock
// the database no longer has (e.g. expired while the controller was down,
// possibly acquired by someone else since). The database is the source of
// truth, so these workflows must not keep running on a hold it does not
// back: fail them; persistUpdates releases any locks they still hold.
for _, stale := range staleHolds {
woc := newWorkflowOperationCtx(stale.WF, wfc)
woc.markWorkflowFailed(ctx, fmt.Sprintf("Failed to re-establish synchronization lock at controller startup: %s", stale.Reason))
woc.persistUpdates(ctx)
}

if err := wfc.throttler.Init(wfList.Items); err != nil {
return err
Expand Down
21 changes: 9 additions & 12 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,11 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
for _, taskName := range targetTasks {
woc.executeDAGTask(ctx, dagCtx, taskName)

// It is possible that target tasks are not reconsidered (i.e. executeDAGTask is not called on them) once they are
// complete (since the DAG itself will have succeeded). To ensure that their exit handlers are run we also run them here. Note that
// calls to runOnExitNode are idempotent: it is fine if they are called more than once for the same task.
// The exit hook for each target task is started by executeDAGTask -> processTask.
// We only inspect the onExit node's status here to decide whether the DAG can be
// considered complete; calling runOnExitNode (and therefore executeTemplate) a second
// time on the same onExit node would re-run checkParallelism against the count this
// very pass just bumped.
taskNode := dagCtx.getTaskNode(taskName)

if taskNode != nil {
Expand All @@ -298,15 +300,10 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
woc.markNodeError(node.Name, err)
return node, err
}
if taskNode.Fulfilled() {
if taskNode.Completed() {
hasOnExitNode, onExitNode, err := woc.runOnExitNode(ctx, dagCtx.GetTask(taskName).GetExitHook(woc.execWf.Spec.Arguments), taskNode, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName, scope)
if err != nil {
return node, err
}
if hasOnExitNode && (onExitNode == nil || !onExitNode.Fulfilled()) {
onExitCompleted = false
}
if taskNode.Fulfilled() && taskNode.Completed() {
onExitNodeName := common.GenerateOnExitNodeName(taskNode.Name)
if onExitNode, onExitErr := woc.wf.GetNodeByName(onExitNodeName); onExitErr == nil && onExitNode != nil && !onExitNode.Fulfilled() {
onExitCompleted = false
}
}
}
Expand Down
18 changes: 16 additions & 2 deletions workflow/sync/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,23 @@ import (
)

type semaphore interface {
acquire(holderKey string, tx *transaction) bool
acquire(holderKey string, tx *transaction) (bool, error)
// reacquire re-establishes a recorded holder at controller startup.
//
// For an in-memory lock it force-registers the holder, ignoring the current
// limit, so the in-memory count reflects persisted reality even when recorded
// holders exceed a (since lowered) limit - new acquisitions then correctly
// wait until the count drains below the limit, rather than dropping a holder
// (a double-acquire) or poisoning the lock over a routine limit change.
//
// For a database-backed lock the database is the single source of truth:
// reacquire mutates nothing and only asserts the recorded hold still exists
// there. An error means the hold could not be verified - either the held row
// is gone (e.g. expired while the controller was down) or the database could
// not be queried - and the caller fails the holding workflow.
reacquire(holderKey string, tx *transaction) error
checkAcquire(holderKey string, tx *transaction) (bool, bool, string)
tryAcquire(holderKey string, tx *transaction) (bool, string)
tryAcquire(holderKey string, tx *transaction) (bool, string, error)
release(key string) bool
addToQueue(holderKey string, priority int32, creationTime time.Time) error
removeFromQueue(holderKey string) error
Expand Down
15 changes: 10 additions & 5 deletions workflow/sync/database_mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@ func TestDatabaseMutexAcquireRelease(t *testing.T) {
require.NoError(t, mutex.addToQueue("default/workflow2", 0, now.Add(time.Second)))

// First acquisition should succeed
acquired, _ := mutex.tryAcquire("default/workflow1", tx)
acquired, _, err := mutex.tryAcquire("default/workflow1", tx)
require.NoError(t, err)
assert.True(t, acquired, "First acquisition should succeed")

// Second acquisition should fail
acquired, _ = mutex.tryAcquire("default/workflow2", tx)
acquired, _, err = mutex.tryAcquire("default/workflow2", tx)
require.NoError(t, err)
assert.False(t, acquired, "Second acquisition should fail")

// Release the mutex
mutex.release("default/workflow1")

// Now acquisition should succeed again
acquired, _ = mutex.tryAcquire("default/workflow2", tx)
acquired, _, err = mutex.tryAcquire("default/workflow2", tx)
require.NoError(t, err)
assert.True(t, acquired, "Acquisition after release should succeed")
})
}
Expand All @@ -73,11 +76,13 @@ func TestDatabaseMutexQueueOrder(t *testing.T) {
require.NoError(t, mutex.addToQueue("default/workflow1", 0, now))
require.NoError(t, mutex.addToQueue("default/workflow2", 0, now.Add(time.Second)))

acquired, _ := mutex.tryAcquire("default/workflow2", tx)
acquired, _, err := mutex.tryAcquire("default/workflow2", tx)
require.NoError(t, err)
assert.False(t, acquired, "Second workflow should not acquire the mutex")

// Acquire the first one
acquired, _ = mutex.tryAcquire("default/workflow1", tx)
acquired, _, err = mutex.tryAcquire("default/workflow1", tx)
require.NoError(t, err)
assert.True(t, acquired, "First workflow should acquire the mutex")

// Release it - this should notify the next one
Expand Down
47 changes: 34 additions & 13 deletions workflow/sync/database_semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,12 @@ func (s *databaseSemaphore) checkAcquire(holderKey string, tx *transaction) (boo
return true, false, ""
}

func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) (bool, error) {
limit := s.getLimit()
existing, err := s.currentHoldersSession(*tx.db)
if err != nil {
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
return false
return false, err
}
if len(existing) < limit {
var pending []stateRecord
Expand All @@ -435,7 +435,7 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
All(&pending)
if err != nil {
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
return false
return false, err
}
if len(pending) > 0 {
_, err := (*tx.db).SQL().Update(s.info.config.stateTable).
Expand All @@ -447,7 +447,7 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
Exec()
if err != nil {
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
return false
return false, err
}
} else {
record := &stateRecord{
Expand All @@ -459,14 +459,14 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
_, err := (*tx.db).Collection(s.info.config.stateTable).Insert(record)
if err != nil {
s.log.WithField("key", holderKey).WithError(err).Error("Failed to acquire lock")
return false
return false, err
}
}
s.log.WithFields(log.Fields{
"key": holderKey,
"result": true,
}).Info("Acquire succeeded")
return true
return true, nil
}
s.log.WithFields(log.Fields{
"key": holderKey,
Expand All @@ -475,41 +475,62 @@ func (s *databaseSemaphore) acquire(holderKey string, tx *transaction) bool {
"current_holders": len(existing),
"limit": limit,
}).Info("Acquire failed")
return false
return false, nil
}

// reacquire checks, at controller startup, that holderKey is still recorded as
// a holder of this lock in the database. It is a read-only verification: the
// current holders are queried and nothing is inserted or updated, because for
// a database-backed lock the database is the single source of truth and the
// held row survives a controller restart.
//
// It returns nil when holderKey is among the current holders. It returns an
// error when the holders cannot be queried (the underlying error is wrapped)
// or when holderKey is absent. An absent row means the hold no longer exists -
// e.g. it was expired by ExpireInactiveLocks while the controller was down and
// may since have been taken by another holder - so the caller is expected to
// fail the workflow rather than resurrect a hold the database does not back.
func (s *databaseSemaphore) reacquire(holderKey string, tx *transaction) error {
	current, err := s.currentHoldersSession(*tx.db)
	switch {
	case err != nil:
		// The hold could not be verified at all; surface the query failure.
		return fmt.Errorf("could not verify hold on %s for %s: %w", s.longDBKey(), holderKey, err)
	case slices.Contains(current, holderKey):
		return nil
	default:
		// The query succeeded but the recorded hold is gone: it is stale.
		return fmt.Errorf("hold on %s for %s is not present in the database", s.longDBKey(), holderKey)
	}
}

func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string) {
func (s *databaseSemaphore) tryAcquire(holderKey string, tx *transaction) (bool, string, error) {
acq, already, msg := s.checkAcquire(holderKey, tx)
if already {
s.log.WithFields(log.Fields{
"key": holderKey,
"result": true,
"message": msg,
}).Info("tryAcquire - already held")
return true, msg
return true, msg, nil
}
if !acq {
s.log.WithFields(log.Fields{
"key": holderKey,
"result": false,
"message": msg,
}).Info("tryAcquire - cannot acquire")
return false, msg
return false, msg, nil
}
if s.acquire(holderKey, tx) {
acquired, err := s.acquire(holderKey, tx)
if acquired {
s.log.WithFields(log.Fields{
"key": holderKey,
"result": true,
}).Info("tryAcquire succeeded")
s.notifyWaiters()
return true, ""
return true, "", nil
}
s.log.WithFields(log.Fields{
"key": holderKey,
"result": false,
"message": msg,
"error": err,
}).Info("tryAcquire failed")
return false, msg
return false, msg, err
}

func (s *databaseSemaphore) expireLocks() {
Expand Down
Loading
Loading