Skip to content

Commit b4cb3d4

Browse files
committed
defer free()
1 parent 868f609 commit b4cb3d4

4 files changed

Lines changed: 7 additions & 17 deletions

File tree

pkg/workflows/wasm/host/execution.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ type execution[T any] struct {
2323
capabilityResponses map[int32]<-chan *sdkpb.CapabilityResponse
2424
secretsResponses map[int32]<-chan *secretsResponse
2525
pendingCallsLimiter limits.ResourcePoolLimiter[int]
26-
pendingCallsFree map[int32]func()
2726
lock sync.RWMutex
2827
module *module
2928
executor ExecutionHelper
@@ -51,9 +50,10 @@ func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRe
5150
e.lock.Lock()
5251
defer e.lock.Unlock()
5352
e.capabilityResponses[req.CallbackId] = ch
54-
e.pendingCallsFree[req.CallbackId] = free
5553

5654
go func() {
55+
defer free()
56+
5757
resp, err := e.executor.CallCapability(ctx, req)
5858

5959
if err != nil {
@@ -92,10 +92,6 @@ func (e *execution[T]) awaitCapabilities(ctx context.Context, acr *sdkpb.AwaitCa
9292
}
9393

9494
delete(e.capabilityResponses, callId)
95-
if free, ok := e.pendingCallsFree[callId]; ok {
96-
free()
97-
delete(e.pendingCallsFree, callId)
98-
}
9995
}
10096

10197
return &sdkpb.AwaitCapabilitiesResponse{
@@ -119,9 +115,10 @@ func (e *execution[T]) getSecretsAsync(ctx context.Context, req *sdkpb.GetSecret
119115
e.lock.Lock()
120116
defer e.lock.Unlock()
121117
e.secretsResponses[req.CallbackId] = ch
122-
e.pendingCallsFree[req.CallbackId] = free
123118

124119
go func() {
120+
defer free()
121+
125122
resp, err := e.executor.GetSecrets(ctx, req)
126123
sr := &secretsResponse{responses: resp, err: err}
127124

@@ -157,10 +154,6 @@ func (e *execution[T]) awaitSecrets(ctx context.Context, acr *sdkpb.AwaitSecrets
157154
}
158155

159156
delete(e.secretsResponses, callId)
160-
if free, ok := e.pendingCallsFree[callId]; ok {
161-
free()
162-
delete(e.pendingCallsFree, callId)
163-
}
164157
}
165158

166159
return &sdkpb.AwaitSecretsResponse{

pkg/workflows/wasm/host/execution_await_order_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) {
7171
ctx: t.Context(),
7272
capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse),
7373
pendingCallsLimiter: limits.GlobalResourcePoolLimiter(defaultMaxPendingCalls),
74-
pendingCallsFree: map[int32]func(){},
7574
executor: stub,
7675
}
7776

pkg/workflows/wasm/host/module.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ type ModuleConfig struct {
7272
IsUncompressed bool
7373
Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
7474
MaxFetchRequests int
75-
// MaxPendingCalls bounds the number of concurrent in-flight capability call
76-
// goroutines per execution. Additional calls block until a slot is freed.
77-
MaxPendingCalls int
75+
// MaxPendingCalls bounds concurrent in-flight capability calls per workflow.
76+
MaxPendingCalls int
77+
// When PendingCallsLimiter is set, it enforces a separate pending calls pool per workflow ID.
7878
PendingCallsLimiter limits.ResourcePoolLimiter[int] // supersedes MaxPendingCalls if set
7979
MaxCompressedBinarySize uint64
8080
MaxCompressedBinaryLimiter limits.BoundLimiter[config.Size] // supersedes MaxCompressedBinarySize if set
@@ -711,7 +711,6 @@ func runWasm[I, O proto.Message](
711711
capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{},
712712
secretsResponses: map[int32]<-chan *secretsResponse{},
713713
pendingCallsLimiter: m.cfg.PendingCallsLimiter,
714-
pendingCallsFree: map[int32]func(){},
715714
module: m,
716715
executor: helper,
717716
donSeed: donSeed,

pkg/workflows/wasm/host/module_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,6 @@ func Test_CallAwaitRace(t *testing.T) {
635635
module: m,
636636
capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{},
637637
pendingCallsLimiter: limits.GlobalResourcePoolLimiter[int](defaultMaxPendingCalls),
638-
pendingCallsFree: map[int32]func(){},
639638
ctx: t.Context(),
640639
executor: mockExecHelper,
641640
}

0 commit comments

Comments
 (0)