Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions cmd/tools/genrpcserverinterceptors/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,6 @@ func workflowTagGetters(messageType reflect.Type, depth int) messageData {
case messageType.AssignableTo(workflowExecutionGetterT):
pd.WorkflowIDGetter = "GetWorkflowExecution().GetWorkflowId()"
pd.RunIDGetter = "GetWorkflowExecution().GetRunId()"
case messageType.AssignableTo(taskTokenGetterT):
for _, ert := range excludeTaskTokenTypes {
if messageType.AssignableTo(ert) {
return pd
}
}
pd.TaskTokenGetter = "GetTaskToken()"
default:
// Might have any combination of these, or none.
if messageType.AssignableTo(workflowIDGetterT) {
Expand All @@ -160,6 +153,25 @@ func workflowTagGetters(messageType reflect.Type, depth int) messageData {
}
}

if pd.ActivityIDGetter == "" && messageType.AssignableTo(activityIDGetterT) {
pd.ActivityIDGetter = "GetActivityId()"
}
if pd.OperationIDGetter == "" && messageType.AssignableTo(operationIDGetterT) {
pd.OperationIDGetter = "GetOperationId()"
}
if messageType.AssignableTo(taskTokenGetterT) {
excluded := false
for _, ert := range excludeTaskTokenTypes {
if messageType.AssignableTo(ert) {
excluded = true
break
}
}
if !excluded {
pd.TaskTokenGetter = "GetTaskToken()"
}
}
Copy link
Copy Markdown
Contributor Author

@stephanos stephanos Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main change (2): also extract activity ID when workflow ID is present


// Iterates over fields in order they defined in proto file, not proto index.
// Order is important because the first match wins.
for fieldNum := 0; fieldNum < messageType.Elem().NumField(); fieldNum++ {
Expand Down
6 changes: 3 additions & 3 deletions cmd/tools/genrpcserverinterceptors/server_interceptors.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ func (wt *WorkflowTags) extractFrom{{.Server}}Message(message any) []tag.Tag {
{{- range .Messages}}
case {{.Type}}:
{{- if or .TaskTokenGetter .WorkflowIDGetter .RunIDGetter .ActivityIDGetter .OperationIDGetter .ChasmRunIDGetter}}
{{- if .TaskTokenGetter}}
return wt.fromTaskToken(r.{{ .TaskTokenGetter}})
{{- else}}
{{- if or .WorkflowIDGetter .RunIDGetter .ActivityIDGetter .OperationIDGetter .ChasmRunIDGetter}}
return []tag.Tag{
{{if .WorkflowIDGetter}} tag.WorkflowID(r.{{.WorkflowIDGetter}}),
{{end -}}
Expand All @@ -30,6 +28,8 @@ func (wt *WorkflowTags) extractFrom{{.Server}}Message(message any) []tag.Tag {
{{if .ChasmRunIDGetter}} tag.ChasmRunID(r.{{.ChasmRunIDGetter}}),
{{end -}}
}
{{- else if .TaskTokenGetter}}
return wt.fromTaskToken(r.{{ .TaskTokenGetter}})
{{- end}}
{{- else}}
return nil
Expand Down
6 changes: 4 additions & 2 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

// LoggingCallAtKey is reserved tag
const (
ActivityIDKey = "activity-id"
ChasmRunIDKey = "run-id"
LoggingCallAtKey = "logging-call-at"
WorkflowIDKey = "wf-id"
WorkflowRunIDKey = "wf-run-id"
Expand Down Expand Up @@ -914,7 +916,7 @@ func ActivityInfo(activityInfo any) ZapTag {

// ActivityID returns tag for an activity ID
func ActivityID(id string) ZapTag {
return NewStringTag("activity-id", id)
return NewStringTag(ActivityIDKey, id)
}

// OperationID returns tag for a nexus operation ID
Expand All @@ -924,7 +926,7 @@ func OperationID(id string) ZapTag {

// ChasmRunID returns tag for an entity run ID
func ChasmRunID(id string) ZapTag {
return NewStringTag("run-id", id)
return NewStringTag(ChasmRunIDKey, id)
}

// ActivitySize returns a tag for a standalone activity size
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion common/rpc/interceptor/logtags/workflow_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,22 @@ func (wt *WorkflowTags) fromTaskToken(taskTokenBytes []byte) []tag.Tag {
if len(taskTokenBytes) == 0 {
return nil
}

taskToken, err := wt.serializer.Deserialize(taskTokenBytes)
if err != nil {
wt.logger.Warn("unable to deserialize task token while getting workflow tags", tag.Error(err))
return nil
}
return []tag.Tag{tag.WorkflowID(taskToken.WorkflowId), tag.WorkflowRunID(taskToken.RunId)}

var tags []tag.Tag
if taskToken.WorkflowId != "" {
tags = append(tags, tag.WorkflowID(taskToken.WorkflowId))
}
if taskToken.RunId != "" {
tags = append(tags, tag.WorkflowRunID(taskToken.RunId))
}
if len(taskToken.ComponentRef) > 0 && taskToken.ActivityId != "" {
tags = append(tags, tag.ActivityID(taskToken.ActivityId))
}
return tags
Copy link
Copy Markdown
Contributor Author

@stephanos stephanos Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main change (1): allow multiple tags and draw them from ComponentRef, too

(OTEL re-uses the log tagging and converts them to OTEL tags)

Copy link
Copy Markdown
Contributor Author

@stephanos stephanos Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: bonus side effect - this also now tags SAA with an activity ID cc @dandavison

}
101 changes: 57 additions & 44 deletions common/rpc/interceptor/logtags/workflow_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ import (

func TestExtract(t *testing.T) {
serializer := tasktoken.NewSerializer()
mustSerialize := func(token *tokenspb.Task) []byte {
t.Helper()
taskTokenBytes, err := serializer.Serialize(token)
assert.NoError(t, err)
return taskTokenBytes
}

wt := logtags.NewWorkflowTags(
serializer,
Expand All @@ -26,32 +32,34 @@ func TestExtract(t *testing.T) {

tv := testvars.New(t)
tv = tv.WithRunID(tv.Any().RunID())
taskToken := tokenspb.Task{
taskTokenBytes := mustSerialize(&tokenspb.Task{
WorkflowId: tv.WorkflowID(),
RunId: tv.RunID(),
})

type expectation struct {
workflowID string
activityID string
runID string
}
taskTokenBytes, err := serializer.Serialize(&taskToken)
assert.NoError(t, err)

testCases := []struct {
name string
req any
fullMethod string
workflowID string
runID string
expect expectation
}{
{
name: "Frontend StartWorkflowExecutionRequest with only workflowID",
req: &workflowservice.StartWorkflowExecutionRequest{WorkflowId: tv.WorkflowID()},
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution",
workflowID: tv.WorkflowID(),
expect: expectation{workflowID: tv.WorkflowID()},
},
{
name: "Frontend RecordActivityTaskHeartbeatByIdRequest with workflowID and runID",
req: &workflowservice.RecordActivityTaskHeartbeatByIdRequest{WorkflowId: tv.WorkflowID(), RunId: tv.RunID()},
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RecordActivityTaskHeartbeatById",
workflowID: tv.WorkflowID(),
runID: tv.RunID(),
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "Frontend GetWorkflowExecutionHistoryRequest with execution",
Expand All @@ -62,8 +70,7 @@ func TestExtract(t *testing.T) {
},
},
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory",
workflowID: tv.WorkflowID(),
runID: tv.RunID(),
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "Frontend RequestCancelWorkflowExecutionRequest with workflow_execution",
Expand All @@ -74,17 +81,39 @@ func TestExtract(t *testing.T) {
},
},
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RequestCancelWorkflowExecution",
workflowID: tv.WorkflowID(),
runID: tv.RunID(),
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "Frontend RespondActivityTaskCompletedRequest with task_token",
req: &workflowservice.RespondActivityTaskCompletedRequest{
TaskToken: taskTokenBytes,
},
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
workflowID: tv.WorkflowID(),
runID: tv.RunID(),
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "Frontend RespondActivityTaskCompletedRequest with workflow activity task_token",
req: &workflowservice.RespondActivityTaskCompletedRequest{
TaskToken: mustSerialize(&tokenspb.Task{
WorkflowId: tv.WorkflowID(),
RunId: tv.RunID(),
ActivityId: "workflow-activity-id",
}),
},
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "Frontend RespondActivityTaskCompletedRequest with CHASM task_token",
req: &workflowservice.RespondActivityTaskCompletedRequest{
TaskToken: mustSerialize(&tokenspb.Task{
RunId: tv.RunID(),
ActivityId: "activity-id",
ComponentRef: []byte("component-ref"),
}),
},
fullMethod: "/temporal.api.workflowservice.v1.WorkflowService/RespondActivityTaskCompleted",
expect: expectation{activityID: "activity-id", runID: tv.RunID()},
},
{
name: "Frontend RespondQueryTaskCompletedRequest (task_token is ignored)",
Expand All @@ -104,8 +133,7 @@ func TestExtract(t *testing.T) {
},
},
fullMethod: "/temporal.server.api.historyservice.v1.HistoryService/DescribeWorkflowExecution",
workflowID: tv.WorkflowID(),
runID: tv.RunID(),
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "History RespondWorkflowTaskCompletedRequest",
Expand All @@ -115,8 +143,7 @@ func TestExtract(t *testing.T) {
},
},
fullMethod: "/temporal.server.api.historyservice.v1.HistoryService/RespondWorkflowTaskCompleted",
workflowID: tv.WorkflowID(),
runID: tv.RunID(),
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "Matching QueryWorkflowRequest",
Expand All @@ -129,8 +156,7 @@ func TestExtract(t *testing.T) {
},
},
fullMethod: "/temporal.server.api.matchingservice.v1.MatchingService/QueryWorkflow",
workflowID: tv.WorkflowID(),
runID: tv.RunID(),
expect: expectation{workflowID: tv.WorkflowID(), runID: tv.RunID()},
},
{
name: "Matching RespondWorkflowTaskCompletedRequest",
Expand All @@ -149,32 +175,19 @@ func TestExtract(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
tags := wt.Extract(tt.req, tt.fullMethod)
var (
workflowIDTag tag.Tag
runIDTag tag.Tag
)
for _, tg := range tags {
if tg.Key() == tag.WorkflowID("").Key() {
workflowIDTag = tg
}
if tg.Key() == tag.WorkflowRunID("").Key() {
runIDTag = tg
var got expectation
for _, tg := range wt.Extract(tt.req, tt.fullMethod) {
switch tg.Key() {
case tag.WorkflowID("").Key():
got.workflowID = tg.Value().(string)
case tag.ActivityID("").Key():
got.activityID = tg.Value().(string)
case tag.WorkflowRunID("").Key():
got.runID = tg.Value().(string)
default:
}
}

if tt.workflowID != "" {
assert.NotNil(t, workflowIDTag)
assert.Equal(t, workflowIDTag.Value(), tt.workflowID)
} else {
assert.Nil(t, workflowIDTag)
}
if tt.runID != "" {
assert.NotNil(t, runIDTag)
assert.Equal(t, runIDTag.Value(), tt.runID)
} else {
assert.Nil(t, runIDTag)
}
assert.Equal(t, tt.expect, got)
})
}
}
6 changes: 4 additions & 2 deletions common/telemetry/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ func (c *customServerStatsHandler) annotateTags(
for _, logTag := range c.tags.Extract(payload, methodName) {
var k string
switch logTag.Key() {
case tag.ActivityIDKey:
k = ActivityIDKey
case tag.WorkflowIDKey:
k = WorkflowIDKey
case tag.WorkflowRunIDKey:
k = WorkflowRunIDKey
case tag.ChasmRunIDKey, tag.WorkflowRunIDKey:
k = RunIDKey
default:
continue
}
Expand Down
5 changes: 3 additions & 2 deletions common/telemetry/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
ComponentQueueVisibility = "queue.visibility"
ComponentUpdateRegistry = "update.registry"

WorkflowIDKey = "temporalWorkflowID"
WorkflowRunIDKey = "temporalRunID"
ActivityIDKey = "temporalActivityID"
RunIDKey = "temporalRunID"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-using this for all entities.

WorkflowIDKey = "temporalWorkflowID"
)
Loading