Skip to content

Commit 1693593

Browse files
authored
[feat][backend]: response API context refill (#415)
* feat(backend): response id context refill
* feat(backend): lint & mock
* feat(backend): fix UT
* feat(backend): add some log
* feat(backend): log
* feat(backend): log
* feat(backend): log
* feat(backend): time range
* feat(backend): debug log
* feat(backend): debug log
* feat(backend): bug fix
* feat(backend): bug fix
* feat(backend): some log
* feat(backend): some log
* feat(backend): some log
* feat(backend): add current span
* feat(backend): multi sub fix
* feat(backend): loop span resp id
* feat(backend): raw span resp id
* feat(backend): solve cmts
* feat(backend): remove logs
* feat(backend):
* feat(backend): UT
* feat(backend): log info
* feat(backend): remove handle resp id
1 parent 3122c69 commit 1693593

19 files changed

Lines changed: 2005 additions & 54 deletions

File tree

backend/api/handler/coze/loop/apis/wire_gen.go

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/modules/observability/application/wire.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ func InitTaskApplication(
462462
configFactory conf.IConfigLoaderFactory,
463463
benefit benefit.IBenefitService,
464464
ckDb ck.Provider,
465+
meter metrics.Meter,
465466
redis redis.Cmdable,
466467
mqFactory mq.IFactory,
467468
userClient userservice.Client,

backend/modules/observability/application/wire_gen.go

Lines changed: 15 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,10 @@ func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans
412412
}
413413

414414
batch := spans[i:end]
415+
err = h.traceService.MergeHistoryMessagesByRespIDBatch(ctx, spans, sub.t.GetPlatformType())
416+
if err != nil {
417+
return err, false
418+
}
415419
err, shouldFinish = h.processBatchSpans(ctx, batch, sub)
416420
if err != nil {
417421
logs.CtxError(ctx, "process batch spans failed, task_id=%d, batch_start=%d, err=%v",

backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,14 @@ func TestTraceHubServiceImpl_ProcessBatchSpans_DispatchError(t *testing.T) {
173173
t.Cleanup(ctrl.Finish)
174174

175175
mockRepo := repo_mocks.NewMockITaskRepo(ctrl)
176+
mockTraceService := builder_mocks.NewMockITraceService(ctrl)
176177
proc := &stubProcessor{invokeErr: errors.New("invoke fail")}
177178

178-
impl := &TraceHubServiceImpl{taskRepo: mockRepo}
179+
impl := &TraceHubServiceImpl{taskRepo: mockRepo, traceService: mockTraceService}
180+
mockTraceService.EXPECT().
181+
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
182+
Return(nil).
183+
AnyTimes()
179184

180185
now := time.Now()
181186
sampler := &entity.Sampler{
@@ -202,12 +207,13 @@ func TestTraceHubServiceImpl_ProcessBatchSpans_DispatchError(t *testing.T) {
202207
RunEndAt: now.Add(time.Minute),
203208
}
204209
sub := &spanSubscriber{
205-
taskID: 1,
206-
t: taskDO,
207-
tr: taskRun,
208-
processor: proc,
209-
runType: entity.TaskRunTypeNewData,
210-
taskRepo: mockRepo,
210+
taskID: 1,
211+
t: taskDO,
212+
tr: taskRun,
213+
processor: proc,
214+
traceService: mockTraceService,
215+
runType: entity.TaskRunTypeNewData,
216+
taskRepo: mockRepo,
211217
}
212218

213219
spanRun := &entity.TaskRun{
@@ -310,12 +316,14 @@ func TestTraceHubServiceImpl_ListAndSendSpans_WithoutLastSpanPageToken(t *testin
310316
mockTenant := tenant_mocks.NewMockITenantProvider(ctrl)
311317
mockBuilder := builder_mocks.NewMockTraceFilterProcessorBuilder(ctrl)
312318
filterMock := spanfilter_mocks.NewMockFilter(ctrl)
319+
mockTraceService := builder_mocks.NewMockITraceService(ctrl)
313320

314321
impl := &TraceHubServiceImpl{
315322
taskRepo: mockTaskRepo,
316323
traceRepo: mockTraceRepo,
317324
tenantProvider: mockTenant,
318325
buildHelper: mockBuilder,
326+
traceService: mockTraceService,
319327
}
320328

321329
now := time.Now()
@@ -329,6 +337,10 @@ func TestTraceHubServiceImpl_ListAndSendSpans_WithoutLastSpanPageToken(t *testin
329337
filterMock.EXPECT().BuildRootSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, nil)
330338
mockBuilder.EXPECT().BuildGetTraceProcessors(gomock.Any(), gomock.Any()).Return([]span_processor.Processor(nil), nil).Times(2)
331339
mockTenant.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).Return([]string{"tenant"}, nil)
340+
mockTraceService.EXPECT().
341+
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
342+
Return(nil).
343+
Times(2)
332344

333345
mockTraceRepo.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) {
334346
switch param.PageToken {
@@ -370,12 +382,14 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
370382
mockTenant := tenant_mocks.NewMockITenantProvider(ctrl)
371383
mockBuilder := builder_mocks.NewMockTraceFilterProcessorBuilder(ctrl)
372384
filterMock := spanfilter_mocks.NewMockFilter(ctrl)
385+
mockTraceService := builder_mocks.NewMockITraceService(ctrl)
373386

374387
impl := &TraceHubServiceImpl{
375388
taskRepo: mockTaskRepo,
376389
traceRepo: mockTraceRepo,
377390
tenantProvider: mockTenant,
378391
buildHelper: mockBuilder,
392+
traceService: mockTraceService,
379393
}
380394

381395
now := time.Now()
@@ -390,6 +404,10 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
390404
filterMock.EXPECT().BuildRootSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, nil)
391405
mockBuilder.EXPECT().BuildGetTraceProcessors(gomock.Any(), gomock.Any()).Return([]span_processor.Processor(nil), nil)
392406
mockTenant.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).Return([]string{"tenant"}, nil)
407+
mockTraceService.EXPECT().
408+
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
409+
Return(nil).
410+
Times(1)
393411

394412
mockTraceRepo.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) {
395413
require.Equal(t, "tenant", param.Tenants[0])
@@ -470,7 +488,8 @@ func TestTraceHubServiceImpl_DoFlush_NoMoreFinishError(t *testing.T) {
470488
t.Cleanup(ctrl.Finish)
471489

472490
mockTaskRepo := repo_mocks.NewMockITaskRepo(ctrl)
473-
impl := &TraceHubServiceImpl{taskRepo: mockTaskRepo}
491+
mockTraceService := builder_mocks.NewMockITraceService(ctrl)
492+
impl := &TraceHubServiceImpl{taskRepo: mockTaskRepo, traceService: mockTraceService}
474493

475494
now := time.Now()
476495
sub, proc := newBackfillSubscriber(mockTaskRepo, now)
@@ -480,6 +499,10 @@ func TestTraceHubServiceImpl_DoFlush_NoMoreFinishError(t *testing.T) {
480499

481500
mockTaskRepo.EXPECT().GetTaskCount(gomock.Any(), int64(1)).Return(int64(0), nil)
482501
mockTaskRepo.EXPECT().GetBackfillTaskRun(gomock.Any(), gomock.Nil(), int64(1)).Return(domainRun, nil)
502+
mockTraceService.EXPECT().
503+
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
504+
Return(nil).
505+
Times(1)
483506

484507
// Call flushSpans, then manually call OnTaskFinished to trigger the finish error
485508
err, _ := impl.flushSpans(context.Background(), []*loop_span.Span{span}, sub)

backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ func (h *TraceHubServiceImpl) withTaskRunCreateLock(
6666

6767
func (h *TraceHubServiceImpl) SpanTrigger(ctx context.Context, span *loop_span.Span) error {
6868
logSuffix := fmt.Sprintf("log_id=%s, trace_id=%s, span_id=%s", span.LogID, span.TraceID, span.SpanID)
69-
7069
// 1. perform initial filtering based on space_id
7170
// 1.1 Filter out spans that do not belong to any space or bot
7271
cacheInfo := h.localCache.LoadTaskCache(ctx)
@@ -144,13 +143,14 @@ func (h *TraceHubServiceImpl) buildSubscriberOfSpan(ctx context.Context, span *l
144143
return nil, err
145144
}
146145
subscribers = append(subscribers, &spanSubscriber{
147-
taskID: taskDO.ID,
148-
t: taskDO,
149-
processor: proc,
150-
taskRepo: h.taskRepo,
151-
runType: entity.TaskRunTypeNewData,
152-
buildHelper: h.buildHelper,
153-
tenants: tenants,
146+
taskID: taskDO.ID,
147+
t: taskDO,
148+
processor: proc,
149+
taskRepo: h.taskRepo,
150+
runType: entity.TaskRunTypeNewData,
151+
buildHelper: h.buildHelper,
152+
tenants: tenants,
153+
traceService: h.traceService,
154154
})
155155
}
156156

backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
6666
mockFilter := span_filter_mocks.NewMockFilter(ctrl)
6767
configLoader := config_mocks.NewMockITraceConfig(ctrl)
6868
tenantProvider := tenant_mocks.NewMockITenantProvider(ctrl)
69+
mockTraceService := trace_service_mocks.NewMockITraceService(ctrl)
6970

7071
now := time.Now()
7172
workspaceID := int64(1)
@@ -119,6 +120,10 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
119120
mockFilter.EXPECT().BuildALLSpanFilter(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
120121
mockBuilder.EXPECT().BuildPlatformRelatedFilter(gomock.Any(), gomock.Any()).Return(mockFilter, nil).AnyTimes()
121122
tenantProvider.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformDefault, gomock.Any()).Return([]string{"tenant"}, nil).AnyTimes()
123+
mockTraceService.EXPECT().
124+
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
125+
Return(nil).
126+
AnyTimes()
122127

123128
spanRun := &entity.TaskRun{
124129
ID: 201,
@@ -145,6 +150,7 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
145150
localCache: NewLocalCache(),
146151
config: configLoader,
147152
tenantProvider: tenantProvider,
153+
traceService: mockTraceService,
148154
}
149155
impl.localCache.taskCache.Store("ObjListWithTask", TaskCacheInfo{WorkspaceIDs: []string{"space-1"}, Tasks: []*entity.ObservabilityTask{taskDO}})
150156

backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ import (
2121
)
2222

2323
type spanSubscriber struct {
24-
taskID int64
25-
t *entity.ObservabilityTask
26-
tr *entity.TaskRun
27-
processor taskexe.Processor
28-
tenants []string
29-
taskRepo repo.ITaskRepo
30-
runType entity.TaskRunType
31-
buildHelper service.TraceFilterProcessorBuilder
24+
taskID int64
25+
t *entity.ObservabilityTask
26+
tr *entity.TaskRun
27+
processor taskexe.Processor
28+
tenants []string
29+
taskRepo repo.ITaskRepo
30+
runType entity.TaskRunType
31+
buildHelper service.TraceFilterProcessorBuilder
32+
traceService service.ITraceService
3233
}
3334

3435
// Sampled determines whether a span is sampled based on the sampling rate; the sample size will be validated during flush.
@@ -209,6 +210,15 @@ func (s *spanSubscriber) AddSpan(ctx context.Context, span *loop_span.Span) erro
209210
}
210211
trigger := &taskexe.Trigger{Task: s.t, Span: span, TaskRun: taskRunConfig}
211212
logs.CtxDebug(ctx, "invoke processor, trigger: %v", trigger)
213+
// New Data runs are handled here
214+
// Back fill runs are handled beforehand, in a batch pre-processing step
215+
if s.runType == entity.TaskRunTypeNewData {
216+
err := s.traceService.MergeHistoryMessagesByRespIDBatch(ctx, []*loop_span.Span{span}, s.t.GetPlatformType())
217+
if err != nil {
218+
logs.CtxError(ctx, "merge history messages failed, task_id=%d, span_id=%s err: %v", s.t.ID, span.SpanID, err)
219+
return err
220+
}
221+
}
212222
err = s.processor.Invoke(ctx, trigger)
213223
if err != nil {
214224
logs.CtxWarn(ctx, "invoke processor failed, trace_id=%s, span_id=%s, err: %v", span.TraceID, span.SpanID, err)

backend/modules/observability/domain/task/service/taskexe/tracehub/trace_hub.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func NewTraceHubImpl(
3636
backfillProducer mq.IBackfillProducer,
3737
locker lock.ILocker,
3838
config config.ITraceConfig,
39+
traceService service.ITraceService,
3940
) (ITraceHubService, error) {
4041
impl := &TraceHubServiceImpl{
4142
taskRepo: tRepo,
@@ -48,6 +49,7 @@ func NewTraceHubImpl(
4849
locker: locker,
4950
config: config,
5051
localCache: NewLocalCache(),
52+
traceService: traceService,
5153
}
5254
return impl, nil
5355
}
@@ -61,7 +63,7 @@ type TraceHubServiceImpl struct {
6163
backfillProducer mq.IBackfillProducer
6264
locker lock.ILocker
6365
config config.ITraceConfig
64-
66+
traceService service.ITraceService
6567
// Local cache - caching non-terminal task information
6668
localCache *LocalCache
6769

0 commit comments

Comments
 (0)