Skip to content

Commit 800df06

Browse files
committed
ben comment
1 parent 22cc073 commit 800df06

4 files changed

Lines changed: 23 additions & 19 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ By default, the worker runs tasks without a concurrency limit. Use `--max-concur
5252
oz-agent-worker --api-key "wk-abc123" --worker-id "my-worker" --max-concurrent-tasks 4
5353
```
5454

55-
When the limit is reached, incoming tasks are queued and will execute once a slot becomes available. A value of `0` (the default) means unlimited.
55+
When the limit is reached, incoming tasks are rejected so the server can reassign them to other available workers. A value of `0` (the default) means unlimited.
5656

5757
## Environment Variables for Task Containers
5858

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ require (
5050
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 // indirect
5151
go.opentelemetry.io/otel/metric v1.39.0 // indirect
5252
go.opentelemetry.io/otel/trace v1.39.0 // indirect
53+
golang.org/x/sync v0.19.0 // indirect
5354
golang.org/x/sys v0.39.0 // indirect
5455
golang.org/x/time v0.14.0 // indirect
5556
google.golang.org/protobuf v1.36.10 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
216216
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
217217
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
218218
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
219+
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
220+
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
219221
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
220222
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
221223
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

internal/worker/worker.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/distribution/reference"
1515
cliconfig "github.com/docker/cli/cli/config"
16+
"golang.org/x/sync/semaphore"
1617
"github.com/docker/docker/api/types/container"
1718
"github.com/docker/docker/api/types/image"
1819
"github.com/docker/docker/api/types/volume"
@@ -58,7 +59,7 @@ type Worker struct {
5859
sendChan chan []byte
5960
activeTasks map[string]context.CancelFunc
6061
tasksMutex sync.Mutex
61-
taskSemaphore chan struct{} // nil when unlimited; buffered channel used as counting semaphore
62+
taskSemaphore *semaphore.Weighted // nil when unlimited
6263
dockerClient *client.Client
6364
platform string // Docker daemon platform (e.g., "linux/amd64" or "linux/arm64")
6465
}
@@ -107,9 +108,9 @@ func New(ctx context.Context, config Config) (*Worker, error) {
107108

108109
log.Debugf(ctx, "Docker daemon is reachable, platform: %s", platform)
109110

110-
var taskSemaphore chan struct{}
111+
var taskSemaphore *semaphore.Weighted
111112
if config.MaxConcurrentTasks > 0 {
112-
taskSemaphore = make(chan struct{}, config.MaxConcurrentTasks)
113+
taskSemaphore = semaphore.NewWeighted(int64(config.MaxConcurrentTasks))
113114
log.Infof(ctx, "Concurrency limit set to %d", config.MaxConcurrentTasks)
114115
}
115116

@@ -334,6 +335,18 @@ func (w *Worker) handleMessage(message []byte) {
334335
func (w *Worker) handleTaskAssignment(assignment *types.TaskAssignmentMessage) {
335336
log.Infof(w.ctx, "Received task assignment: taskID=%s, title=%s", assignment.TaskID, assignment.Task.Title)
336337

338+
// If a concurrency limit is configured, try to acquire a slot without blocking.
339+
// Failing fast lets the server reroute the task to another worker.
340+
if w.taskSemaphore != nil {
341+
if !w.taskSemaphore.TryAcquire(1) {
342+
log.Warnf(w.ctx, "At max concurrency (%d), rejecting task: taskID=%s", w.config.MaxConcurrentTasks, assignment.TaskID)
343+
if err := w.sendTaskFailed(assignment.TaskID, "worker at maximum concurrency"); err != nil {
344+
log.Errorf(w.ctx, "Failed to send task failed message: %v", err)
345+
}
346+
return
347+
}
348+
}
349+
337350
// It's important to update the task state to claimed as the task lifecycle treats this as a dependency to advance to further states.
338351
if err := w.sendTaskClaimed(assignment.TaskID); err != nil {
339352
log.Errorf(w.ctx, "Failed to send task claimed message: %v", err)
@@ -349,29 +362,17 @@ func (w *Worker) handleTaskAssignment(assignment *types.TaskAssignmentMessage) {
349362
}
350363

351364
func (w *Worker) executeTask(ctx context.Context, assignment *types.TaskAssignmentMessage) {
352-
acquiredSlot := false
353365
defer func() {
354366
w.tasksMutex.Lock()
355367
delete(w.activeTasks, assignment.TaskID)
356368
w.tasksMutex.Unlock()
357369

358-
// Release the semaphore slot if we acquired one.
359-
if w.taskSemaphore != nil && acquiredSlot {
360-
<-w.taskSemaphore
370+
// Release the semaphore slot if concurrency is limited.
371+
if w.taskSemaphore != nil {
372+
w.taskSemaphore.Release(1)
361373
}
362374
}()
363375

364-
// If a concurrency limit is configured, wait for an available slot.
365-
if w.taskSemaphore != nil {
366-
log.Infof(ctx, "Waiting for concurrency slot: taskID=%s", assignment.TaskID)
367-
select {
368-
case w.taskSemaphore <- struct{}{}:
369-
acquiredSlot = true
370-
case <-ctx.Done():
371-
return
372-
}
373-
}
374-
375376
taskID := assignment.TaskID
376377
log.Infof(ctx, "Starting task execution: taskID=%s, title=%s", taskID, assignment.Task.Title)
377378

0 commit comments

Comments
 (0)