Skip to content

Commit f2b4fac

Browse files
committed
CHASM telemetry
1 parent 44a3f6b commit f2b4fac

11 files changed

Lines changed: 147 additions & 69 deletions

File tree

chasm/statemachine.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import (
44
"fmt"
55
"slices"
66

7+
"go.opentelemetry.io/otel/attribute"
8+
"go.opentelemetry.io/otel/trace"
79
"go.temporal.io/api/serviceerror"
10+
"go.temporal.io/server/common/telemetry"
811
)
912

1013
// ErrInvalidTransition is returned from [Transition.Apply] on an invalid state transition.
@@ -44,8 +47,24 @@ func (t Transition[S, SM, E]) Possible(sm SM) bool {
4447

4548
// Apply applies a transition event to the given state machine changing the state machine's state to the transition's
4649
// Destination on success.
47-
func (t Transition[S, SM, E]) Apply(sm SM, ctx MutableContext, event E) error {
50+
func (t Transition[S, SM, E]) Apply(sm SM, ctx MutableContext, event E) (retErr error) {
4851
prevState := sm.StateMachineState()
52+
53+
// Defer to always emit the transition telemetry event.
54+
if telemetry.DebugMode() {
55+
defer func() {
56+
attrs := []attribute.KeyValue{
57+
attribute.String("chasm.transition.source", fmt.Sprintf("%v", prevState)),
58+
attribute.String("chasm.transition.destination", fmt.Sprintf("%v", t.Destination)),
59+
}
60+
if retErr != nil {
61+
attrs = append(attrs, attribute.String("chasm.transition.error", retErr.Error()))
62+
}
63+
span := trace.SpanFromContext(ctx.goContext())
64+
span.AddEvent("chasm.transition", trace.WithAttributes(attrs...))
65+
}()
66+
}
67+
4968
if !t.Possible(sm) {
5069
return fmt.Errorf("%w from %v", ErrInvalidTransition, prevState)
5170
}

cmd/tools/genrpcserverinterceptors/main.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,6 @@ func workflowTagGetters(messageType reflect.Type, depth int) messageData {
137137
case messageType.AssignableTo(workflowExecutionGetterT):
138138
pd.WorkflowIDGetter = "GetWorkflowExecution().GetWorkflowId()"
139139
pd.RunIDGetter = "GetWorkflowExecution().GetRunId()"
140-
case messageType.AssignableTo(taskTokenGetterT):
141-
for _, ert := range excludeTaskTokenTypes {
142-
if messageType.AssignableTo(ert) {
143-
return pd
144-
}
145-
}
146-
pd.TaskTokenGetter = "GetTaskToken()"
147140
default:
148141
// Might have any combination of these, or none.
149142
if messageType.AssignableTo(workflowIDGetterT) {
@@ -160,6 +153,25 @@ func workflowTagGetters(messageType reflect.Type, depth int) messageData {
160153
}
161154
}
162155

156+
if pd.ActivityIDGetter == "" && messageType.AssignableTo(activityIDGetterT) {
157+
pd.ActivityIDGetter = "GetActivityId()"
158+
}
159+
if pd.OperationIDGetter == "" && messageType.AssignableTo(operationIDGetterT) {
160+
pd.OperationIDGetter = "GetOperationId()"
161+
}
162+
if messageType.AssignableTo(taskTokenGetterT) {
163+
excluded := false
164+
for _, ert := range excludeTaskTokenTypes {
165+
if messageType.AssignableTo(ert) {
166+
excluded = true
167+
break
168+
}
169+
}
170+
if !excluded {
171+
pd.TaskTokenGetter = "GetTaskToken()"
172+
}
173+
}
174+
163175
// Iterates over fields in order they defined in proto file, not proto index.
164176
// Order is important because the first match wins.
165177
for fieldNum := 0; fieldNum < messageType.Elem().NumField(); fieldNum++ {

cmd/tools/genrpcserverinterceptors/server_interceptors.tmpl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ func (wt *WorkflowTags) extractFrom{{.Server}}Message(message any) []tag.Tag {
1515
{{- range .Messages}}
1616
case {{.Type}}:
1717
{{- if or .TaskTokenGetter .WorkflowIDGetter .RunIDGetter .ActivityIDGetter .OperationIDGetter .ChasmRunIDGetter}}
18-
{{- if .TaskTokenGetter}}
19-
return wt.fromTaskToken(r.{{ .TaskTokenGetter}})
20-
{{- else}}
18+
{{- if or .WorkflowIDGetter .RunIDGetter .ActivityIDGetter .OperationIDGetter .ChasmRunIDGetter}}
2119
return []tag.Tag{
2220
{{if .WorkflowIDGetter}} tag.WorkflowID(r.{{.WorkflowIDGetter}}),
2321
{{end -}}
@@ -30,6 +28,8 @@ func (wt *WorkflowTags) extractFrom{{.Server}}Message(message any) []tag.Tag {
3028
{{if .ChasmRunIDGetter}} tag.ChasmRunID(r.{{.ChasmRunIDGetter}}),
3129
{{end -}}
3230
}
31+
{{- else if .TaskTokenGetter}}
32+
return wt.fromTaskToken(r.{{ .TaskTokenGetter}})
3333
{{- end}}
3434
{{- else}}
3535
return nil

common/log/tag/tags.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121

2222
// LoggingCallAtKey is reserved tag
2323
const (
24+
ActivityIDKey = "activity-id"
25+
ChasmRunIDKey = "run-id"
2426
LoggingCallAtKey = "logging-call-at"
2527
WorkflowIDKey = "wf-id"
2628
WorkflowRunIDKey = "wf-run-id"
@@ -914,7 +916,7 @@ func ActivityInfo(activityInfo any) ZapTag {
914916

915917
// ActivityID returns tag for an activity ID
916918
func ActivityID(id string) ZapTag {
917-
return NewStringTag("activity-id", id)
919+
return NewStringTag(ActivityIDKey, id)
918920
}
919921

920922
// OperationID returns tag for a nexus operation ID
@@ -924,7 +926,7 @@ func OperationID(id string) ZapTag {
924926

925927
// ChasmRunID returns tag for an entity run ID
926928
func ChasmRunID(id string) ZapTag {
927-
return NewStringTag("run-id", id)
929+
return NewStringTag(ChasmRunIDKey, id)
928930
}
929931

930932
// ActivitySize returns a tag for a standalone activity size

common/rpc/interceptor/logtags/matching_service_server_gen.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/rpc/interceptor/logtags/workflow_service_server_gen.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/rpc/interceptor/logtags/workflow_tags.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,22 @@ func (wt *WorkflowTags) fromTaskToken(taskTokenBytes []byte) []tag.Tag {
5454
if len(taskTokenBytes) == 0 {
5555
return nil
5656
}
57+
5758
taskToken, err := wt.serializer.Deserialize(taskTokenBytes)
5859
if err != nil {
5960
wt.logger.Warn("unable to deserialize task token while getting workflow tags", tag.Error(err))
6061
return nil
6162
}
62-
return []tag.Tag{tag.WorkflowID(taskToken.WorkflowId), tag.WorkflowRunID(taskToken.RunId)}
63+
64+
var tags []tag.Tag
65+
if taskToken.WorkflowId != "" {
66+
tags = append(tags, tag.WorkflowID(taskToken.WorkflowId))
67+
}
68+
if taskToken.RunId != "" {
69+
tags = append(tags, tag.WorkflowRunID(taskToken.RunId))
70+
}
71+
if len(taskToken.ComponentRef) > 0 && taskToken.ActivityId != "" {
72+
tags = append(tags, tag.ActivityID(taskToken.ActivityId))
73+
}
74+
return tags
6375
}

common/rpc/interceptor/logtags/workflow_tags_test.go

Lines changed: 57 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ import (
1818

1919
func TestExtract(t *testing.T) {
2020
serializer := tasktoken.NewSerializer()
21+
mustSerialize := func(token *tokenspb.Task) []byte {
22+
t.Helper()
23+
taskTokenBytes, err := serializer.Serialize(token)
24+
assert.NoError(t, err)
25+
return taskTokenBytes
26+
}
2127

2228
wt := logtags.NewWorkflowTags(
2329
serializer,
@@ -26,32 +32,34 @@ func TestExtract(t *testing.T) {
2632

2733
tv := testvars.New(t)
2834
tv = tv.WithRunID(tv.Any().RunID())
29-
taskToken := tokenspb.Task{
35+
taskTokenBytes := mustSerialize(&tokenspb.Task{
3036
WorkflowId: tv.WorkflowID(),
3137
RunId: tv.RunID(),
38+
})
39+
40+
type expectation struct {
41+
workflowID string
42+
activityID string
43+
runID string
3244
}
33-
taskTokenBytes, err := serializer.Serialize(&taskToken)
34-
assert.NoError(t, err)
3545

3646
testCases := []struct {
3747
name string
3848
req any
3949
fullMethod string
40-
workflowID string
41-
runID string
50+
expect expectation
4251
}{
4352
{
4453
name: "Frontend StartWorkflowExecutionRequest with only workflowID",
4554
req: &workflowservice.StartWorkflowExecutionRequest{WorkflowId: tv.WorkflowID()},
4655
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution",
47-
workflowID: tv.WorkflowID(),
56+
expect: expectation{workflowID: tv.WorkflowID()},
4857
},
4958
{
5059
name: "Frontend RecordActivityTaskHeartbeatByIdRequest with workflowID and runID",
5160
req: &workflowservice.RecordActivityTaskHeartbeatByIdRequest{WorkflowId: tv.WorkflowID(), RunId: tv.RunID()},
5261
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RecordActivityTaskHeartbeatById",
53-
workflowID: tv.WorkflowID(),
54-
runID: tv.RunID(),
62+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
5563
},
5664
{
5765
name: "Frontend GetWorkflowExecutionHistoryRequest with execution",
@@ -62,8 +70,7 @@ func TestExtract(t *testing.T) {
6270
},
6371
},
6472
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory",
65-
workflowID: tv.WorkflowID(),
66-
runID: tv.RunID(),
73+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
6774
},
6875
{
6976
name: "Frontend RequestCancelWorkflowExecutionRequest with workflow_execution",
@@ -74,17 +81,39 @@ func TestExtract(t *testing.T) {
7481
},
7582
},
7683
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RequestCancelWorkflowExecution",
77-
workflowID: tv.WorkflowID(),
78-
runID: tv.RunID(),
84+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
7985
},
8086
{
8187
name: "Frontend RespondActivityTaskCompletedRequest with task_token",
8288
req: &workflowservice.RespondActivityTaskCompletedRequest{
8389
TaskToken: taskTokenBytes,
8490
},
8591
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
86-
workflowID: tv.WorkflowID(),
87-
runID: tv.RunID(),
92+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
93+
},
94+
{
95+
name: "Frontend RespondActivityTaskCompletedRequest with workflow activity task_token",
96+
req: &workflowservice.RespondActivityTaskCompletedRequest{
97+
TaskToken: mustSerialize(&tokenspb.Task{
98+
WorkflowId: tv.WorkflowID(),
99+
RunId: tv.RunID(),
100+
ActivityId: "workflow-activity-id",
101+
}),
102+
},
103+
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
104+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
105+
},
106+
{
107+
name: "Frontend RespondActivityTaskCompletedRequest with CHASM task_token",
108+
req: &workflowservice.RespondActivityTaskCompletedRequest{
109+
TaskToken: mustSerialize(&tokenspb.Task{
110+
RunId: tv.RunID(),
111+
ActivityId: "activity-id",
112+
ComponentRef: []byte("component-ref"),
113+
}),
114+
},
115+
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
116+
expect: expectation{activityID: "activity-id", runID: tv.RunID()},
88117
},
89118
{
90119
name: "Frontend RespondQueryTaskCompletedRequest (task_token is ignored)",
@@ -104,8 +133,7 @@ func TestExtract(t *testing.T) {
104133
},
105134
},
106135
fullMethod: "/temporal.server.api.historyservice.v1.HistoryService/DescribeWorkflowExecution",
107-
workflowID: tv.WorkflowID(),
108-
runID: tv.RunID(),
136+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
109137
},
110138
{
111139
name: "History RespondWorkflowTaskCompletedRequest",
@@ -115,8 +143,7 @@ func TestExtract(t *testing.T) {
115143
},
116144
},
117145
fullMethod: "/temporal.server.api.historyservice.v1.HistoryService/RespondWorkflowTaskCompleted",
118-
workflowID: tv.WorkflowID(),
119-
runID: tv.RunID(),
146+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
120147
},
121148
{
122149
name: "Matching QueryWorkflowRequest",
@@ -129,8 +156,7 @@ func TestExtract(t *testing.T) {
129156
},
130157
},
131158
fullMethod: "/temporal.server.api.matchingservice.v1.MatchingService/QueryWorkflow",
132-
workflowID: tv.WorkflowID(),
133-
runID: tv.RunID(),
159+
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
134160
},
135161
{
136162
name: "Matching RespondWorkflowTaskCompletedRequest",
@@ -149,32 +175,19 @@ func TestExtract(t *testing.T) {
149175

150176
for _, tt := range testCases {
151177
t.Run(tt.name, func(t *testing.T) {
152-
tags := wt.Extract(tt.req, tt.fullMethod)
153-
var (
154-
workflowIDTag tag.Tag
155-
runIDTag tag.Tag
156-
)
157-
for _, tg := range tags {
158-
if tg.Key() == tag.WorkflowID("").Key() {
159-
workflowIDTag = tg
160-
}
161-
if tg.Key() == tag.WorkflowRunID("").Key() {
162-
runIDTag = tg
178+
var got expectation
179+
for _, tg := range wt.Extract(tt.req, tt.fullMethod) {
180+
switch tg.Key() {
181+
case tag.WorkflowID("").Key():
182+
got.workflowID = tg.Value().(string)
183+
case tag.ActivityID("").Key():
184+
got.activityID = tg.Value().(string)
185+
case tag.WorkflowRunID("").Key():
186+
got.runID = tg.Value().(string)
187+
default:
163188
}
164189
}
165-
166-
if tt.workflowID != "" {
167-
assert.NotNil(t, workflowIDTag)
168-
assert.Equal(t, workflowIDTag.Value(), tt.workflowID)
169-
} else {
170-
assert.Nil(t, workflowIDTag)
171-
}
172-
if tt.runID != "" {
173-
assert.NotNil(t, runIDTag)
174-
assert.Equal(t, runIDTag.Value(), tt.runID)
175-
} else {
176-
assert.Nil(t, runIDTag)
177-
}
190+
assert.Equal(t, tt.expect, got)
178191
})
179192
}
180193
}

common/telemetry/grpc.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,12 @@ func (c *customServerStatsHandler) annotateTags(
172172
for _, logTag := range c.tags.Extract(payload, methodName) {
173173
var k string
174174
switch logTag.Key() {
175+
case tag.ActivityIDKey:
176+
k = ActivityIDKey
175177
case tag.WorkflowIDKey:
176178
k = WorkflowIDKey
177-
case tag.WorkflowRunIDKey:
178-
k = WorkflowRunIDKey
179+
case tag.ChasmRunIDKey, tag.WorkflowRunIDKey:
180+
k = RunIDKey
179181
default:
180182
continue
181183
}

common/telemetry/tags.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
ComponentQueueVisibility = "queue.visibility"
1111
ComponentUpdateRegistry = "update.registry"
1212

13-
WorkflowIDKey = "temporalWorkflowID"
14-
WorkflowRunIDKey = "temporalRunID"
13+
ActivityIDKey = "temporalActivityID"
14+
RunIDKey = "temporalRunID"
15+
WorkflowIDKey = "temporalWorkflowID"
1516
)

0 commit comments

Comments
 (0)