Skip to content

Commit dd2fb12

Browse files
committed
Add context metadata provider interface for root components to implement
1 parent e4bc001 commit dd2fb12

3 files changed

Lines changed: 519 additions & 23 deletions

File tree

chasm/context_metadata.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package chasm
2+
3+
// ContextMetadataProvider if implemented by the root Component, allows the CHASM
4+
// framework to export execution metadata into the request context. This is typically used for
5+
// observability use cases eg. metrics requiring high cardinality for metering.
6+
type ContextMetadataProvider interface {
7+
ContextMetadata(Context) map[string]string
8+
}

service/history/chasm_engine.go

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
enumsspb "go.temporal.io/server/api/enums/v1"
1212
"go.temporal.io/server/chasm"
1313
"go.temporal.io/server/common"
14+
"go.temporal.io/server/common/contextutil"
1415
"go.temporal.io/server/common/convert"
1516
"go.temporal.io/server/common/definition"
1617
"go.temporal.io/server/common/headers"
@@ -108,6 +109,63 @@ func (e *ChasmEngine) NotifyExecution(key chasm.ExecutionKey) {
108109
e.notifier.Notify(key)
109110
}
110111

112+
func (e *ChasmEngine) setContextMetadata(
113+
ctx context.Context,
114+
chasmTree *chasm.Node,
115+
) chasm.Context {
116+
chasmContext := chasm.NewContext(ctx, chasmTree)
117+
118+
rootComponent, err := chasmTree.Component(chasmContext, chasm.ComponentRef{})
119+
if err != nil {
120+
executionKey := chasmContext.ExecutionKey()
121+
e.logger.Error(
122+
"Failed to resolve CHASM root component for context metadata",
123+
tag.WorkflowNamespaceID(executionKey.NamespaceID),
124+
tag.WorkflowID(executionKey.BusinessID),
125+
tag.WorkflowRunID(executionKey.RunID),
126+
tag.Error(err),
127+
)
128+
return chasmContext
129+
}
130+
131+
provider, ok := rootComponent.(chasm.ContextMetadataProvider)
132+
if !ok {
133+
return chasmContext
134+
}
135+
136+
for key, value := range provider.ContextMetadata(chasmContext) {
137+
contextutil.ContextMetadataSet(ctx, key, value)
138+
}
139+
140+
return chasmContext
141+
}
142+
143+
func chasmTreeFromMutableState(
144+
mutableState historyi.MutableState,
145+
) (*chasm.Node, error) {
146+
chasmTree, ok := mutableState.ChasmTree().(*chasm.Node)
147+
if !ok {
148+
return nil, serviceerror.NewInternalf(
149+
"CHASM tree implementation not properly wired up, encountered type: %T, expected type: %T",
150+
mutableState.ChasmTree(),
151+
&chasm.Node{},
152+
)
153+
}
154+
return chasmTree, nil
155+
}
156+
157+
func (e *ChasmEngine) setContextMetadataFromMutableState(
158+
ctx context.Context,
159+
mutableState historyi.MutableState,
160+
) {
161+
chasmTree, err := chasmTreeFromMutableState(mutableState)
162+
if err != nil {
163+
e.logger.Error("Failed to resolve CHASM tree for context metadata", tag.Error(err))
164+
return
165+
}
166+
e.setContextMetadata(ctx, chasmTree)
167+
}
168+
111169
func (e *ChasmEngine) StartExecution(
112170
ctx context.Context,
113171
executionRef chasm.ComponentRef,
@@ -177,6 +235,7 @@ func (e *ChasmEngine) startExecution(
177235
return chasm.StartExecutionResult{}, err
178236
}
179237
if !hasCurrentRun {
238+
e.setContextMetadataFromMutableState(ctx, newExecutionParams.mutableState)
180239
serializedRef, err := newExecutionParams.executionRef.Serialize(e.registry)
181240
if err != nil {
182241
// Created is true here because persistAsBrandNew succeeded, but we failed to serialize the ref.
@@ -354,13 +413,9 @@ func (e *ChasmEngine) applyUpdateWithLease(
354413
updateFn func(chasm.MutableContext, chasm.Component) error,
355414
) ([]byte, error) {
356415
mutableState := executionLease.GetMutableState()
357-
chasmTree, ok := mutableState.ChasmTree().(*chasm.Node)
358-
if !ok {
359-
return nil, serviceerror.NewInternalf(
360-
"CHASM tree implementation not properly wired up, encountered type: %T, expected type: %T",
361-
mutableState.ChasmTree(),
362-
&chasm.Node{},
363-
)
416+
chasmTree, err := chasmTreeFromMutableState(mutableState)
417+
if err != nil {
418+
return nil, err
364419
}
365420

366421
mutableContext := chasm.NewMutableContext(ctx, chasmTree)
@@ -375,6 +430,8 @@ func (e *ChasmEngine) applyUpdateWithLease(
375430

376431
// TODO: Support WithSpeculative() TransitionOption.
377432

433+
e.setContextMetadata(ctx, chasmTree)
434+
378435
if err := executionLease.GetContext().UpdateWorkflowExecutionAsActive(
379436
ctx,
380437
shardContext,
@@ -438,6 +495,8 @@ func (e *ChasmEngine) startAndUpdateExecution(
438495
return chasm.ExecutionKey{}, nil, false, currentRunInfo.CurrentWorkflowConditionFailedError
439496
}
440497

498+
e.setContextMetadataFromMutableState(ctx, newExecutionParams.mutableState)
499+
441500
serializedRef, err := newExecutionParams.executionRef.Serialize(e.registry)
442501

443502
return newExecutionParams.executionRef.ExecutionKey, serializedRef, true, err
@@ -502,6 +561,8 @@ func (e *ChasmEngine) deleteExecution(
502561
mutableState := executionLease.GetMutableState()
503562
we := mutableState.GetWorkflowKey()
504563

564+
e.setContextMetadataFromMutableState(ctx, mutableState)
565+
505566
log.With(shardContext.GetLogger(),
506567
tag.WorkflowNamespaceID(ref.NamespaceID),
507568
tag.WorkflowID(we.WorkflowID),
@@ -580,16 +641,12 @@ func (e *ChasmEngine) readComponent(
580641
executionLease.GetReleaseFn()(nil)
581642
}()
582643

583-
chasmTree, ok := executionLease.GetMutableState().ChasmTree().(*chasm.Node)
584-
if !ok {
585-
return serviceerror.NewInternalf(
586-
"CHASM tree implementation not properly wired up, encountered type: %T, expected type: %T",
587-
executionLease.GetMutableState().ChasmTree(),
588-
&chasm.Node{},
589-
)
644+
chasmTree, err := chasmTreeFromMutableState(executionLease.GetMutableState())
645+
if err != nil {
646+
return err
590647
}
591648

592-
chasmContext := chasm.NewContext(ctx, chasmTree)
649+
chasmContext := e.setContextMetadata(ctx, chasmTree)
593650
component, err := chasmTree.Component(chasmContext, ref)
594651
if err != nil {
595652
return err
@@ -686,16 +743,13 @@ func (e *ChasmEngine) predicateSatisfied(
686743
ref chasm.ComponentRef,
687744
executionLease api.WorkflowLease,
688745
) ([]byte, error) {
689-
chasmTree, ok := executionLease.GetMutableState().ChasmTree().(*chasm.Node)
690-
if !ok {
691-
return nil, serviceerror.NewInternalf(
692-
"CHASM tree implementation not properly wired up, encountered type: %T, expected type: %T",
693-
executionLease.GetMutableState().ChasmTree(),
694-
&chasm.Node{},
695-
)
746+
chasmTree, err := chasmTreeFromMutableState(executionLease.GetMutableState())
747+
if err != nil {
748+
return nil, err
696749
}
697750

698-
chasmContext := chasm.NewContext(ctx, chasmTree)
751+
chasmContext := e.setContextMetadata(ctx, chasmTree)
752+
699753
component, err := chasmTree.Component(chasmContext, ref)
700754
if err != nil {
701755
return nil, err
@@ -1052,6 +1106,8 @@ func (e *ChasmEngine) handleReusePolicy(
10521106
return chasm.StartExecutionResult{}, err
10531107
}
10541108

1109+
e.setContextMetadataFromMutableState(ctx, newExecutionParams.mutableState)
1110+
10551111
serializedRef, err := newExecutionParams.executionRef.Serialize(e.registry)
10561112
if err != nil {
10571113
return chasm.StartExecutionResult{ExecutionKey: newExecutionParams.executionRef.ExecutionKey, Created: true}, err

0 commit comments

Comments
 (0)