Skip to content

Commit fec9c1b

Browse files
Ev4nFengHearyShen
authored and committed
[fix][backend]: Task filter tenant (#386)
* feat(backend): Task filter tenant
* feat(backend): lint
* feat(backend): lint
* feat(backend): lint
* feat(backend): fix
* feat(backend): fix
* feat(backend): fornax by default
* feat(backend): delete debug logs
* feat(backend): delete debug logs
* feat(backend): fix UT
* feat(backend): rm default tenant `fornax`
* feat(backend): add schema key
1 parent 74732fa commit fec9c1b

8 files changed

Lines changed: 243 additions & 21 deletions

File tree

backend/modules/observability/domain/task/entity/event.go

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -145,6 +145,11 @@ func (s *RawSpan) RawSpanConvertToLoopSpan() *loop_span.Span {
145145
callType := tagsString["call_type"]
146146
spanType := tagsString["span_type"]
147147

148+
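// In a RawSpan, the tenant usually lives in the Tags field,
149+
// whereas a LoopSpan keeps it in systemTagsString.
150+
// Copy it over here so that, once converted to a loopSpan object, the getTenant method can be used uniformly.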
151+
systemTagsString["tenant"] = tagsString["tenant"]
152+
148153
result := &loop_span.Span{
149154
StartTime: s.StartTimeInUs,
150155
SpanID: s.SpanID,

backend/modules/observability/domain/task/service/taskexe/processor/auto_evaluate.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -120,7 +120,7 @@ func (p *AutoEvaluateProcessor) Invoke(ctx context.Context, trigger *taskexe.Tri
120120
_ = p.taskRepo.IncrTaskRunCount(ctx, trigger.Task.ID, taskRun.ID, taskTTL)
121121
taskCount, _ := p.taskRepo.GetTaskCount(ctx, trigger.Task.ID)
122122
taskRunCount, _ := p.taskRepo.GetTaskRunCount(ctx, trigger.Task.ID, taskRun.ID)
123-
if (trigger.Task.Sampler.CycleCount != 0 && taskRunCount > trigger.Task.Sampler.CycleCount) ||
123+
if (trigger.Task.Sampler.IsCycle && trigger.Task.Sampler.CycleCount != 0 && taskRunCount > trigger.Task.Sampler.CycleCount) ||
124124
(taskCount > trigger.Task.Sampler.SampleSize) {
125125
logs.CtxInfo(ctx, "[task-debug] AutoEvaluateProcessor Invoke, subCount:%v,taskCount:%v", taskRunCount, taskCount)
126126
_ = p.taskRepo.DecrTaskCount(ctx, trigger.Task.ID, taskTTL)

backend/modules/observability/domain/task/service/taskexe/processor/utils.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -120,12 +120,14 @@ func convertDatasetSchemaDTO2DO(schema *dataset0.DatasetSchema) entity.DatasetSc
120120
name := fs.GetName()
121121
description := fs.GetDescription()
122122
textSchema := fs.GetTextSchema()
123+
schemaKey := fs.GetSchemaKey()
123124
result.FieldSchemas[i] = entity.FieldSchema{
124125
Key: &key,
125126
Name: name,
126127
Description: description,
127128
ContentType: convertContentTypeDTO2DO(fs.GetContentType()),
128129
TextSchema: textSchema,
130+
SchemaKey: entity.SchemaKey(schemaKey),
129131
}
130132
}
131133
}

backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,6 @@ func (h *TraceHubServiceImpl) buildSubscriberOfSpan(ctx context.Context, span *l
6666
logs.CtxError(ctx, "Failed to get consumer listening config, err: %v", err)
6767
return nil, err
6868
}
69-
7069
var subscribers []*spanSubscriber
7170
taskDOs, err := h.listNonFinalTaskByRedis(ctx, span.WorkspaceID)
7271
if err != nil {
@@ -86,13 +85,19 @@ func (h *TraceHubServiceImpl) buildSubscriberOfSpan(ctx context.Context, span *l
8685
}
8786

8887
proc := h.taskProcessor.GetTaskProcessor(taskDO.TaskType)
88+
tenants, err := h.getTenants(ctx, taskDO.GetPlatformType())
89+
if err != nil {
90+
logs.CtxError(ctx, "Failed to get tenants, err: %v", err)
91+
return nil, err
92+
}
8993
subscribers = append(subscribers, &spanSubscriber{
9094
taskID: taskDO.ID,
9195
t: taskDO,
9296
processor: proc,
9397
taskRepo: h.taskRepo,
9498
runType: entity.TaskRunTypeNewData,
9599
buildHelper: h.buildHelper,
100+
tenants: tenants,
96101
})
97102
}
98103

backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger_test.go

Lines changed: 13 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ import (
1616
taskconvertor "github.com/coze-dev/coze-loop/backend/modules/observability/application/convertor/task"
1717
componentconfig "github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config"
1818
config_mocks "github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config/mocks"
19+
tenant_mocks "github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/tenant/mocks"
1920
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
2021
repo_mocks "github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/repo/mocks"
2122
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/service/taskexe/processor"
@@ -60,6 +61,7 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
6061
mockBuilder := trace_service_mocks.NewMockTraceFilterProcessorBuilder(ctrl)
6162
mockFilter := span_filter_mocks.NewMockFilter(ctrl)
6263
configLoader := config_mocks.NewMockITraceConfig(ctrl)
64+
tenantProvider := tenant_mocks.NewMockITenantProvider(ctrl)
6365

6466
now := time.Now()
6567
workspaceID := int64(1)
@@ -112,6 +114,7 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
112114
mockFilter.EXPECT().BuildBasicSpanFilter(gomock.Any(), gomock.Any()).Return(nil, false, nil).AnyTimes()
113115
mockFilter.EXPECT().BuildALLSpanFilter(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
114116
mockBuilder.EXPECT().BuildPlatformRelatedFilter(gomock.Any(), gomock.Any()).Return(mockFilter, nil).AnyTimes()
117+
tenantProvider.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformDefault, gomock.Any()).Return([]string{"tenant"}, nil).AnyTimes()
115118

116119
spanRun := &entity.TaskRun{
117120
ID: 201,
@@ -132,11 +135,12 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
132135
taskProcessor.Register(entity.TaskTypeAutoEval, procMock)
133136

134137
impl := &TraceHubServiceImpl{
135-
taskRepo: mockRepo,
136-
buildHelper: mockBuilder,
137-
taskProcessor: taskProcessor,
138-
localCache: NewLocalCache(),
139-
config: configLoader,
138+
taskRepo: mockRepo,
139+
buildHelper: mockBuilder,
140+
taskProcessor: taskProcessor,
141+
localCache: NewLocalCache(),
142+
config: configLoader,
143+
tenantProvider: tenantProvider,
140144
}
141145
impl.localCache.taskCache.Store("ObjListWithTask", TaskCacheInfo{WorkspaceIDs: []string{"space-1"}, Tasks: []*entity.ObservabilityTask{taskDO}})
142146

@@ -146,13 +150,12 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
146150
LogID: "log",
147151
StartTimeInUs: now.UnixMicro(),
148152
Tags: map[string]any{
149-
"fornax_space_id": "space-1",
150-
"call_type": "",
151-
"bot_id": "bot-1",
152-
},
153-
SystemTags: map[string]any{
153+
"fornax_space_id": "space-1",
154+
"call_type": "",
155+
"bot_id": "bot-1",
154156
loop_span.SpanFieldTenant: "tenant",
155157
},
158+
SystemTags: map[string]any{},
156159
SensitiveTags: &entity.SensitiveTags{},
157160
ServerEnv: &entity.ServerInRawSpan{},
158161
}

backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber.go

Lines changed: 27 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -21,11 +21,11 @@ import (
2121
)
2222

2323
type spanSubscriber struct {
24-
taskID int64
25-
t *entity.ObservabilityTask
26-
tr *entity.TaskRun
27-
processor taskexe.Processor
28-
24+
taskID int64
25+
t *entity.ObservabilityTask
26+
tr *entity.TaskRun
27+
processor taskexe.Processor
28+
tenants []string
2929
taskRepo repo.ITaskRepo
3030
runType entity.TaskRunType
3131
buildHelper service.TraceFilterProcessorBuilder
@@ -66,15 +66,15 @@ func (s *spanSubscriber) Match(ctx context.Context, span *loop_span.Span) (bool,
6666
return false, nil
6767
}
6868

69-
filters := s.buildSpanFilters(ctx, task)
69+
filters := s.buildSpanFilters(ctx, task, span)
7070
if !filters.Satisfied(span) {
7171
return false, nil
7272
}
7373

7474
return true, nil
7575
}
7676

77-
func (s *spanSubscriber) buildSpanFilters(ctx context.Context, taskDO *entity.ObservabilityTask) *loop_span.FilterFields {
77+
func (s *spanSubscriber) buildSpanFilters(ctx context.Context, taskDO *entity.ObservabilityTask, span *loop_span.Span) *loop_span.FilterFields {
7878
// Additional filters can be constructed based on task configuration if needed.
7979
// Simplified handling here: start from an empty (non-nil) FilterFields, i.e. no extra filters are applied yet.
8080
filters := &loop_span.FilterFields{}
@@ -93,11 +93,30 @@ func (s *spanSubscriber) buildSpanFilters(ctx context.Context, taskDO *entity.Ob
9393
logs.CtxError(ctx, "traverse filter fields failed, task_id=%d, err=%v", taskDO.ID, err)
9494
return filters
9595
}
96-
filters = combineFilters(builtinFilter, &taskDO.SpanFilter.Filters)
96+
var tenantFilter *loop_span.FilterFields = nil
97+
if len(span.GetTenant()) > 0 {
98+
tenantFilter = buildTenantFilter(s.tenants)
99+
}
100+
filters = combineFilters(builtinFilter, &taskDO.SpanFilter.Filters, tenantFilter)
97101

98102
return filters
99103
}
100104

105+
func buildTenantFilter(tenants []string) *loop_span.FilterFields {
106+
return &loop_span.FilterFields{
107+
FilterFields: []*loop_span.FilterField{
108+
{
109+
FieldName: loop_span.SpanFieldTenant,
110+
FieldType: loop_span.FieldTypeString,
111+
Values: tenants,
112+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
113+
IsSystem: true,
114+
},
115+
},
116+
QueryAndOr: ptr.Of(loop_span.QueryAndOrEnumAnd),
117+
}
118+
}
119+
101120
func buildBuiltinFilters(ctx context.Context, f span_filter.Filter, req *ListSpansReq) (*loop_span.FilterFields, error) {
102121
filters := make([]*loop_span.FilterField, 0)
103122
env := &span_filter.SpanEnv{

backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber_test.go

Lines changed: 186 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,9 @@ import (
1515
repo_mocks "github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/repo/mocks"
1616
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/service/taskexe"
1717
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
18+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/service/trace/span_filter"
19+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/service/trace/span_processor"
20+
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
1821
)
1922

2023
type noopProcessor struct{ invoked bool }
@@ -45,6 +48,60 @@ func (n *noopProcessor) OnTaskCreated(ctx context.Context, currentTask *entity.O
4548
return nil
4649
}
4750

51+
type fakeSpanFilter struct {
52+
basic []*loop_span.FilterField
53+
root []*loop_span.FilterField
54+
llm []*loop_span.FilterField
55+
all []*loop_span.FilterField
56+
force bool
57+
}
58+
59+
func (f *fakeSpanFilter) BuildBasicSpanFilter(ctx context.Context, env *span_filter.SpanEnv) ([]*loop_span.FilterField, bool, error) {
60+
return f.basic, f.force, nil
61+
}
62+
63+
func (f *fakeSpanFilter) BuildRootSpanFilter(ctx context.Context, env *span_filter.SpanEnv) ([]*loop_span.FilterField, error) {
64+
return f.root, nil
65+
}
66+
67+
func (f *fakeSpanFilter) BuildLLMSpanFilter(ctx context.Context, env *span_filter.SpanEnv) ([]*loop_span.FilterField, error) {
68+
return f.llm, nil
69+
}
70+
71+
func (f *fakeSpanFilter) BuildALLSpanFilter(ctx context.Context, env *span_filter.SpanEnv) ([]*loop_span.FilterField, error) {
72+
return f.all, nil
73+
}
74+
75+
type fakeBuilder struct{ f span_filter.Filter }
76+
77+
func (b *fakeBuilder) BuildPlatformRelatedFilter(ctx context.Context, pt loop_span.PlatformType) (span_filter.Filter, error) {
78+
return b.f, nil
79+
}
80+
81+
func (b *fakeBuilder) BuildGetTraceProcessors(ctx context.Context, set span_processor.Settings) ([]span_processor.Processor, error) {
82+
return nil, nil
83+
}
84+
85+
func (b *fakeBuilder) BuildListSpansProcessors(ctx context.Context, set span_processor.Settings) ([]span_processor.Processor, error) {
86+
return nil, nil
87+
}
88+
89+
func (b *fakeBuilder) BuildAdvanceInfoProcessors(ctx context.Context, set span_processor.Settings) ([]span_processor.Processor, error) {
90+
return nil, nil
91+
}
92+
93+
func (b *fakeBuilder) BuildIngestTraceProcessors(ctx context.Context, set span_processor.Settings) ([]span_processor.Processor, error) {
94+
return nil, nil
95+
}
96+
97+
func (b *fakeBuilder) BuildSearchTraceOApiProcessors(ctx context.Context, set span_processor.Settings) ([]span_processor.Processor, error) {
98+
return nil, nil
99+
}
100+
101+
func (b *fakeBuilder) BuildListSpansOApiProcessors(ctx context.Context, set span_processor.Settings) ([]span_processor.Processor, error) {
102+
return nil, nil
103+
}
104+
48105
func TestSpanSubscriber_AddSpan_SkipNonRunning(t *testing.T) {
49106
t.Parallel()
50107
ctrl := gomock.NewController(t)
@@ -78,3 +135,132 @@ func TestSpanSubscriber_AddSpan_SkipNonRunning(t *testing.T) {
78135
assert.NoError(t, err)
79136
assert.False(t, proc.invoked, "Invoke should not be called for non-running TaskRun")
80137
}
138+
139+
func TestSpanSubscriber_Match_PlatformAndTenant_Positive(t *testing.T) {
140+
t.Parallel()
141+
basic := []*loop_span.FilterField{
142+
{
143+
FieldName: loop_span.SpanFieldPSM,
144+
FieldType: loop_span.FieldTypeString,
145+
Values: []string{"coze-loop"},
146+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
147+
},
148+
}
149+
f := &fakeSpanFilter{basic: basic, root: nil, force: true}
150+
task := &entity.ObservabilityTask{
151+
ID: 1,
152+
WorkspaceID: 7,
153+
TaskStatus: entity.TaskStatusRunning,
154+
SpanFilter: &entity.SpanFilterFields{
155+
PlatformType: loop_span.PlatformCozeLoop,
156+
SpanListType: loop_span.SpanListTypeRootSpan,
157+
},
158+
}
159+
sub := &spanSubscriber{
160+
taskID: task.ID,
161+
t: task,
162+
processor: &noopProcessor{},
163+
taskRepo: nil,
164+
runType: entity.TaskRunTypeNewData,
165+
buildHelper: &fakeBuilder{f: f},
166+
tenants: []string{"tenant1", "tenant2"},
167+
}
168+
span := &loop_span.Span{
169+
SpanID: "s1",
170+
TraceID: "t1",
171+
PSM: "coze-loop",
172+
SystemTagsString: map[string]string{
173+
loop_span.SpanFieldTenant: "tenant1",
174+
},
175+
StartTime: time.Now().UnixMilli(),
176+
}
177+
matched, err := sub.Match(context.Background(), span)
178+
assert.NoError(t, err)
179+
assert.True(t, matched)
180+
}
181+
182+
func TestSpanSubscriber_Match_NegativePSM(t *testing.T) {
183+
t.Parallel()
184+
basic := []*loop_span.FilterField{
185+
{
186+
FieldName: loop_span.SpanFieldPSM,
187+
FieldType: loop_span.FieldTypeString,
188+
Values: []string{"coze-loop"},
189+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
190+
},
191+
}
192+
f := &fakeSpanFilter{basic: basic, root: nil, force: true}
193+
task := &entity.ObservabilityTask{
194+
ID: 2,
195+
WorkspaceID: 7,
196+
TaskStatus: entity.TaskStatusRunning,
197+
SpanFilter: &entity.SpanFilterFields{
198+
PlatformType: loop_span.PlatformCozeLoop,
199+
SpanListType: loop_span.SpanListTypeRootSpan,
200+
},
201+
}
202+
sub := &spanSubscriber{
203+
taskID: task.ID,
204+
t: task,
205+
processor: &noopProcessor{},
206+
taskRepo: nil,
207+
runType: entity.TaskRunTypeNewData,
208+
buildHelper: &fakeBuilder{f: f},
209+
tenants: []string{"tenant1"},
210+
}
211+
span := &loop_span.Span{
212+
SpanID: "s2",
213+
TraceID: "t2",
214+
PSM: "other",
215+
SystemTagsString: map[string]string{
216+
loop_span.SpanFieldTenant: "tenant1",
217+
},
218+
StartTime: time.Now().UnixMilli(),
219+
}
220+
matched, err := sub.Match(context.Background(), span)
221+
assert.NoError(t, err)
222+
assert.False(t, matched)
223+
}
224+
225+
func TestSpanSubscriber_Match_NegativeTenant(t *testing.T) {
226+
t.Parallel()
227+
basic := []*loop_span.FilterField{
228+
{
229+
FieldName: loop_span.SpanFieldPSM,
230+
FieldType: loop_span.FieldTypeString,
231+
Values: []string{"coze-loop"},
232+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
233+
},
234+
}
235+
f := &fakeSpanFilter{basic: basic, root: nil, force: true}
236+
task := &entity.ObservabilityTask{
237+
ID: 3,
238+
WorkspaceID: 7,
239+
TaskStatus: entity.TaskStatusRunning,
240+
SpanFilter: &entity.SpanFilterFields{
241+
PlatformType: loop_span.PlatformCozeLoop,
242+
SpanListType: loop_span.SpanListTypeRootSpan,
243+
},
244+
}
245+
sub := &spanSubscriber{
246+
taskID: task.ID,
247+
t: task,
248+
processor: &noopProcessor{},
249+
taskRepo: nil,
250+
runType: entity.TaskRunTypeNewData,
251+
buildHelper: &fakeBuilder{f: f},
252+
tenants: []string{"tenantX"},
253+
}
254+
span := &loop_span.Span{
255+
SpanID: "s3",
256+
TraceID: "t3",
257+
PSM: "coze-loop",
258+
SystemTagsString: map[string]string{
259+
loop_span.SpanFieldTenant: "tenant1",
260+
},
261+
StartTime: time.Now().UnixMilli(),
262+
}
263+
matched, err := sub.Match(context.Background(), span)
264+
assert.NoError(t, err)
265+
assert.False(t, matched)
266+
}

0 commit comments

Comments
 (0)