Skip to content

Commit fd13b5b

Browse files
committed
CHASM telemetry
1 parent 44a3f6b commit fd13b5b

11 files changed

Lines changed: 182 additions & 25 deletions

File tree

chasm/registrable_task.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55
"fmt"
66
"reflect"
7+
"strings"
78
)
89

910
type (
1011
RegistrableTask struct {
1112
taskType string
13+
telemetryTypeName string
1214
goType reflect.Type
1315
componentGoType reflect.Type // It is not clear how this one is used.
1416
validateFn validateFn
@@ -155,6 +157,7 @@ func (rt *RegistrableTask) registerToLibrary(
155157

156158
fqn := rt.fqType()
157159
rt.taskTypeID = GenerateTypeID(fqn)
160+
rt.telemetryTypeName = deriveTelemetryTypeName(rt.goType, fqn)
158161
return fqn, rt.taskTypeID, nil
159162
}
160163

@@ -173,3 +176,23 @@ func (rt *RegistrableTask) fqType() string {
173176
}
174177
return FullyQualifiedName(rt.library.Name(), rt.taskType)
175178
}
179+
180+
func (rt *RegistrableTask) telemetryType() string {
181+
if rt.telemetryTypeName != "" {
182+
return rt.telemetryTypeName
183+
}
184+
return rt.fqType()
185+
}
186+
187+
func deriveTelemetryTypeName(taskType reflect.Type, fallback string) string {
188+
for taskType.Kind() == reflect.Pointer {
189+
taskType = taskType.Elem()
190+
}
191+
// Remove common prefix/suffix from type name
192+
if name := taskType.Name(); name != "" {
193+
name = strings.TrimPrefix(name, "Chasm")
194+
name = strings.TrimSuffix(name, "Data")
195+
return name
196+
}
197+
return fallback
198+
}

chasm/registry.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,15 @@ func (r *Registry) TaskFqnByID(id uint32) (string, bool) {
168168
return rt.fqType(), true
169169
}
170170

171+
// TaskTelemetryNameByID returns the human-readable telemetry label for a task type ID.
172+
func (r *Registry) TaskTelemetryNameByID(id uint32) (string, bool) {
173+
rt, ok := r.rtByID[id]
174+
if !ok {
175+
return "", false
176+
}
177+
return rt.telemetryType(), true
178+
}
179+
171180
// TaskIDFor converts registered task instance to task type ID.
172181
// This method should only be used by CHASM framework internal code,
173182
// NOT CHASM library developers.

chasm/registry_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,18 @@ func (s *RegistryTestSuite) TestRegistry_RegisterTasks_Success() {
147147
require.True(s.T(), ok)
148148
require.Equal(s.T(), "TestLibrary.Task2", rt2.FqType())
149149

150+
taskID1, ok := r.TaskIDFor(testTask1{})
151+
require.True(s.T(), ok)
152+
name1, ok := r.TaskTelemetryNameByID(taskID1)
153+
require.True(s.T(), ok)
154+
require.Equal(s.T(), "testTask1", name1)
155+
156+
taskID2, ok := r.TaskIDFor(testTask2{})
157+
require.True(s.T(), ok)
158+
name2, ok := r.TaskTelemetryNameByID(taskID2)
159+
require.True(s.T(), ok)
160+
require.Equal(s.T(), "testTask2", name2)
161+
150162
tInstance2 := "invalid task instance"
151163
rt3, ok := r.TaskFor(tInstance2)
152164
require.False(s.T(), ok)

chasm/statemachine.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import (
44
"fmt"
55
"slices"
66

7+
"go.opentelemetry.io/otel/attribute"
8+
"go.opentelemetry.io/otel/trace"
79
"go.temporal.io/api/serviceerror"
10+
"go.temporal.io/server/common/telemetry"
811
)
912

1013
// ErrInvalidTransition is returned from [Transition.Apply] on an invalid state transition.
@@ -44,8 +47,24 @@ func (t Transition[S, SM, E]) Possible(sm SM) bool {
4447

4548
// Apply applies a transition event to the given state machine changing the state machine's state to the transition's
4649
// Destination on success.
47-
func (t Transition[S, SM, E]) Apply(sm SM, ctx MutableContext, event E) error {
50+
func (t Transition[S, SM, E]) Apply(sm SM, ctx MutableContext, event E) (retErr error) {
4851
prevState := sm.StateMachineState()
52+
53+
// Defer to always emit the transition telemetry event.
54+
if telemetry.DebugMode() {
55+
defer func() {
56+
attrs := []attribute.KeyValue{
57+
attribute.String("chasm.transition.source", fmt.Sprintf("%v", prevState)),
58+
attribute.String("chasm.transition.destination", fmt.Sprintf("%v", t.Destination)),
59+
}
60+
if retErr != nil {
61+
attrs = append(attrs, attribute.String("chasm.transition.error", retErr.Error()))
62+
}
63+
span := trace.SpanFromContext(ctx.goContext())
64+
span.AddEvent("chasm.transition", trace.WithAttributes(attrs...))
65+
}()
66+
}
67+
4968
if !t.Possible(sm) {
5069
return fmt.Errorf("%w from %v", ErrInvalidTransition, prevState)
5170
}

common/log/tag/tags.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121

2222
// LoggingCallAtKey is reserved tag
2323
const (
24+
ActivityIDKey = "activity-id"
25+
ChasmRunIDKey = "run-id"
2426
LoggingCallAtKey = "logging-call-at"
2527
WorkflowIDKey = "wf-id"
2628
WorkflowRunIDKey = "wf-run-id"
@@ -914,7 +916,7 @@ func ActivityInfo(activityInfo any) ZapTag {
914916

915917
// ActivityID returns tag for an activity ID
916918
func ActivityID(id string) ZapTag {
917-
return NewStringTag("activity-id", id)
919+
return NewStringTag(ActivityIDKey, id)
918920
}
919921

920922
// OperationID returns tag for a nexus operation ID
@@ -924,7 +926,7 @@ func OperationID(id string) ZapTag {
924926

925927
// ChasmRunID returns tag for an entity run ID
926928
func ChasmRunID(id string) ZapTag {
927-
return NewStringTag("run-id", id)
929+
return NewStringTag(ChasmRunIDKey, id)
928930
}
929931

930932
// ActivitySize returns a tag for a standalone activity size

common/rpc/interceptor/logtags/workflow_tags.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,26 @@ func (wt *WorkflowTags) fromTaskToken(taskTokenBytes []byte) []tag.Tag {
5454
if len(taskTokenBytes) == 0 {
5555
return nil
5656
}
57+
5758
taskToken, err := wt.serializer.Deserialize(taskTokenBytes)
5859
if err != nil {
5960
wt.logger.Warn("unable to deserialize task token while getting workflow tags", tag.Error(err))
6061
return nil
6162
}
62-
return []tag.Tag{tag.WorkflowID(taskToken.WorkflowId), tag.WorkflowRunID(taskToken.RunId)}
63+
64+
var tags []tag.Tag
65+
if taskToken.WorkflowId != "" {
66+
tags = append(tags, tag.WorkflowID(taskToken.WorkflowId))
67+
}
68+
if taskToken.RunId != "" {
69+
tags = append(tags, tag.WorkflowRunID(taskToken.RunId))
70+
}
71+
if len(taskToken.ComponentRef) > 0 && taskToken.ActivityId != "" {
72+
tags = append(tags, tag.ActivityID(taskToken.ActivityId))
73+
}
74+
75+
if len(tags) == 0 {
76+
return nil
77+
}
78+
return tags
6379
}

common/rpc/interceptor/logtags/workflow_tags_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,30 @@ func TestExtract(t *testing.T) {
3333
taskTokenBytes, err := serializer.Serialize(&taskToken)
3434
assert.NoError(t, err)
3535

36+
activityTaskToken := tokenspb.Task{
37+
WorkflowId: tv.WorkflowID(),
38+
RunId: tv.RunID(),
39+
ActivityId: "workflow-activity-id",
40+
}
41+
activityTaskTokenBytes, err := serializer.Serialize(&activityTaskToken)
42+
assert.NoError(t, err)
43+
44+
chasmTaskToken := tokenspb.Task{
45+
WorkflowId: tv.WorkflowID(),
46+
RunId: tv.RunID(),
47+
ActivityId: "activity-id",
48+
ComponentRef: []byte("component-ref"),
49+
}
50+
chasmTaskTokenBytes, err := serializer.Serialize(&chasmTaskToken)
51+
assert.NoError(t, err)
52+
53+
chasmTaskTokenNoActivityID := tokenspb.Task{
54+
RunId: tv.RunID(),
55+
ComponentRef: []byte("component-ref"),
56+
}
57+
chasmTaskTokenNoActivityIDBytes, err := serializer.Serialize(&chasmTaskTokenNoActivityID)
58+
assert.NoError(t, err)
59+
3660
testCases := []struct {
3761
name string
3862
req any
@@ -86,6 +110,32 @@ func TestExtract(t *testing.T) {
86110
workflowID: tv.WorkflowID(),
87111
runID: tv.RunID(),
88112
},
113+
{
114+
name: "Frontend RespondActivityTaskCompletedRequest with workflow activity task_token",
115+
req: &workflowservice.RespondActivityTaskCompletedRequest{
116+
TaskToken: activityTaskTokenBytes,
117+
},
118+
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
119+
workflowID: tv.WorkflowID(),
120+
runID: tv.RunID(),
121+
},
122+
{
123+
name: "Frontend RespondActivityTaskCompletedRequest with CHASM task_token",
124+
req: &workflowservice.RespondActivityTaskCompletedRequest{
125+
TaskToken: chasmTaskTokenBytes,
126+
},
127+
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
128+
workflowID: tv.WorkflowID(),
129+
runID: tv.RunID(),
130+
},
131+
{
132+
name: "Frontend RespondActivityTaskCompletedRequest with CHASM task_token and empty activity_id",
133+
req: &workflowservice.RespondActivityTaskCompletedRequest{
134+
TaskToken: chasmTaskTokenNoActivityIDBytes,
135+
},
136+
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
137+
runID: tv.RunID(),
138+
},
89139
{
90140
name: "Frontend RespondQueryTaskCompletedRequest (task_token is ignored)",
91141
req: &workflowservice.RespondQueryTaskCompletedRequest{
@@ -152,12 +202,16 @@ func TestExtract(t *testing.T) {
152202
tags := wt.Extract(tt.req, tt.fullMethod)
153203
var (
154204
workflowIDTag tag.Tag
205+
activityIDTag tag.Tag
155206
runIDTag tag.Tag
156207
)
157208
for _, tg := range tags {
158209
if tg.Key() == tag.WorkflowID("").Key() {
159210
workflowIDTag = tg
160211
}
212+
if tg.Key() == tag.ActivityID("").Key() && tg.Value() != "" {
213+
activityIDTag = tg
214+
}
161215
if tg.Key() == tag.WorkflowRunID("").Key() {
162216
runIDTag = tg
163217
}
@@ -169,6 +223,12 @@ func TestExtract(t *testing.T) {
169223
} else {
170224
assert.Nil(t, workflowIDTag)
171225
}
226+
if tt.name == "Frontend RespondActivityTaskCompletedRequest with CHASM task_token" {
227+
assert.NotNil(t, activityIDTag)
228+
assert.Equal(t, "activity-id", activityIDTag.Value())
229+
} else {
230+
assert.Nil(t, activityIDTag)
231+
}
172232
if tt.runID != "" {
173233
assert.NotNil(t, runIDTag)
174234
assert.Equal(t, runIDTag.Value(), tt.runID)

common/telemetry/grpc.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,12 @@ func (c *customServerStatsHandler) annotateTags(
172172
for _, logTag := range c.tags.Extract(payload, methodName) {
173173
var k string
174174
switch logTag.Key() {
175+
case tag.ActivityIDKey:
176+
k = BusinessIDKey
175177
case tag.WorkflowIDKey:
176178
k = WorkflowIDKey
177-
case tag.WorkflowRunIDKey:
178-
k = WorkflowRunIDKey
179+
case tag.ChasmRunIDKey, tag.WorkflowRunIDKey:
180+
k = RunIDKey
179181
default:
180182
continue
181183
}

common/telemetry/tags.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
ComponentQueueVisibility = "queue.visibility"
1111
ComponentUpdateRegistry = "update.registry"
1212

13-
WorkflowIDKey = "temporalWorkflowID"
14-
WorkflowRunIDKey = "temporalRunID"
13+
BusinessIDKey = "temporalBusinessID"
14+
RunIDKey = "temporalRunID"
15+
WorkflowIDKey = "temporalWorkflowID"
1516
)

service/history/queues/executable.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ func NewExecutable(
227227
}
228228

229229
func (e *executableImpl) Execute() (retErr error) {
230-
231230
startTime := e.timeSource.Now()
232231
e.scheduleLatency = startTime.Sub(e.scheduledTime)
233232

@@ -257,15 +256,29 @@ func (e *executableImpl) Execute() (retErr error) {
257256
// Wrapped in if block to avoid unnecessary allocations when OTEL is disabled.
258257
if telemetry.IsEnabled(e.tracer) {
259258
var span trace.Span
259+
// tasks.Task is workflow-centric, but CHASM tasks reuse the same getter to
260+
// carry the root entity business ID (e.g. workflow ID, activity ID, schedule ID).
261+
entityID := e.GetWorkflowID()
262+
idKey := telemetry.WorkflowIDKey
263+
taskLabel := e.GetType().String()
264+
if task, ok := e.GetTask().(tasks.HasArchetypeID); ok {
265+
idKey = telemetry.BusinessIDKey
266+
if ct, ok := task.(*tasks.ChasmTask); ok {
267+
if name, ok := e.chasmRegistry.TaskTelemetryNameByID(ct.Info.GetTypeId()); ok {
268+
taskLabel = name
269+
}
270+
}
271+
}
260272
ctx, span = e.tracer.Start(
261273
ctx,
262-
fmt.Sprintf("queue.Execute/%v", e.GetType().String()),
274+
fmt.Sprintf("queue.Execute/%v", taskLabel),
263275
trace.WithSpanKind(trace.SpanKindConsumer),
264-
trace.WithAttributes(
265-
attribute.Key(telemetry.WorkflowIDKey).String(e.GetWorkflowID()),
266-
attribute.Key(telemetry.WorkflowRunIDKey).String(e.GetRunID()),
267-
attribute.Key("queue.task.type").String(e.GetType().String()),
268-
attribute.Key("queue.task.id").Int64(e.GetTaskID())))
276+
)
277+
span.SetAttributes(
278+
attribute.Key(idKey).String(entityID),
279+
attribute.Key(telemetry.RunIDKey).String(e.GetRunID()),
280+
attribute.Key("queue.task.type").String(e.GetType().String()),
281+
attribute.Key("queue.task.id").Int64(e.GetTaskID()))
269282

270283
if telemetry.DebugMode() {
271284
if taskPayload, err := json.Marshal(e.GetTask()); err != nil {

0 commit comments

Comments
 (0)