Skip to content

Commit 6630770

Browse files
(fix): release workflow binary memory after use during init (#21525)
1 parent caba9cd commit 6630770

2 files changed

Lines changed: 30 additions & 17 deletions

File tree

core/services/workflows/syncer/v2/handler.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,8 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
757757
if err != nil {
758758
return fmt.Errorf("failed to decode workflow spec binary: %w", err)
759759
}
760+
// Free the hex-encoded binary string as it is not needed beyond this decode
761+
spec.Workflow = ""
760762

761763
// Workflow Registry version >2 no longer handles secrets
762764
secretsURL := ""
@@ -767,7 +769,8 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
767769
if err != nil {
768770
return fmt.Errorf("failed to decode owner: %w", err)
769771
}
770-
hash, err := pkgworkflows.GenerateWorkflowID(ownerBytes, spec.WorkflowName, decodedBinary, []byte(spec.Config), secretsURL)
772+
configBytes := []byte(spec.Config)
773+
hash, err := pkgworkflows.GenerateWorkflowID(ownerBytes, spec.WorkflowName, decodedBinary, configBytes, secretsURL)
771774
if err != nil {
772775
return fmt.Errorf("failed to generate workflow id: %w", err)
773776
}
@@ -790,16 +793,19 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
790793
// before emitting the workflowActivated event, ensuring the event accurately reflects deployment status.
791794
initDone := make(chan error, 1)
792795

793-
engine, err := h.engineFactory(
794-
ctx,
795-
spec.WorkflowID,
796-
spec.WorkflowOwner,
797-
workflowName,
798-
spec.WorkflowTag,
799-
[]byte(spec.Config),
800-
decodedBinary,
801-
initDone,
802-
)
796+
// Scope the engineFactory call so that decodedBinary goes out of scope immediately after the factory returns
797+
engine, err := func() (services.Service, error) {
798+
return h.engineFactory(
799+
ctx,
800+
spec.WorkflowID,
801+
spec.WorkflowOwner,
802+
workflowName,
803+
spec.WorkflowTag,
804+
configBytes,
805+
decodedBinary,
806+
initDone,
807+
)
808+
}()
803809
if err != nil {
804810
return fmt.Errorf("failed to create workflow engine: %w", err)
805811
}

core/services/workflows/syncer/v2/workflow_registry.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"maps"
1111
"math/big"
12+
"runtime"
1213
"slices"
1314
"strings"
1415
"sync"
@@ -445,14 +446,14 @@ func (w *workflowRegistry) generateReconciliationEvents(
445446
) ([]*reconciliationEvent, error) {
446447
var events []*reconciliationEvent
447448
localHead := toLocalHead(head)
448-
// workflowMetadataMap is only used for lookups; disregard when reading the state machine.
449-
workflowMetadataMap := make(map[string]WorkflowMetadataView)
449+
// workflowMetadataIDs is a set of workflow IDs present in this tick's metadata
450+
workflowMetadataIDs := make(map[string]struct{}, len(workflowMetadata))
450451
for _, wfMeta := range workflowMetadata {
451-
workflowMetadataMap[wfMeta.WorkflowID.Hex()] = wfMeta
452+
workflowMetadataIDs[wfMeta.WorkflowID.Hex()] = struct{}{}
452453
}
453454

454-
// Keep track of which of the engines in the engineRegistry have been touched
455-
workflowsSeen := map[string]bool{}
455+
// Keep track of which of the engines in the engineRegistry have been touched.
456+
workflowsSeen := make(map[string]bool, len(workflowMetadata))
456457
for _, wfMeta := range workflowMetadata {
457458
id := wfMeta.WorkflowID.Hex()
458459
engineFound := w.engineRegistry.Contains(wfMeta.WorkflowID)
@@ -596,7 +597,7 @@ func (w *workflowRegistry) generateReconciliationEvents(
596597
// the workflow no longer exists in this source's metadata
597598
for id, event := range pendingEvents {
598599
if event.Name == WorkflowActivated {
599-
if _, ok := workflowMetadataMap[event.Data.(WorkflowActivatedEvent).WorkflowID.Hex()]; !ok {
600+
if _, ok := workflowMetadataIDs[event.Data.(WorkflowActivatedEvent).WorkflowID.Hex()]; !ok {
600601
delete(pendingEvents, id)
601602
}
602603
}
@@ -818,6 +819,12 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context)
818819
}
819820
wg.Wait()
820821

822+
// prompt the GC to reclaim transient allocations from event handling
823+
// that would otherwise be delayed because the dominant CGo/wasmtime memory is invisible to the Go GC
824+
if dispatched > 0 {
825+
runtime.GC()
826+
}
827+
821828
batchDuration := time.Since(batchStart)
822829
w.metrics.recordReconcileBatch(ctx, sourceName, dispatched, batchDuration)
823830
if backoffCount > 0 {

0 commit comments

Comments
 (0)