Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions backend/api/handler/coze/loop/apis/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/modules/observability/application/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func InitTaskApplication(
configFactory conf.IConfigLoaderFactory,
benefit benefit.IBenefitService,
ckDb ck.Provider,
meter metrics.Meter,
redis redis.Cmdable,
mqFactory mq.IFactory,
userClient userservice.Client,
Expand Down
17 changes: 15 additions & 2 deletions backend/modules/observability/application/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans
}

batch := spans[i:end]
err = h.traceService.MergeHistoryMessagesByRespIDBatch(ctx, spans, sub.t.GetPlatformType())
if err != nil {
return err, false
}
err, shouldFinish = h.processBatchSpans(ctx, batch, sub)
if err != nil {
logs.CtxError(ctx, "process batch spans failed, task_id=%d, batch_start=%d, err=%v",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,14 @@ func TestTraceHubServiceImpl_ProcessBatchSpans_DispatchError(t *testing.T) {
t.Cleanup(ctrl.Finish)

mockRepo := repo_mocks.NewMockITaskRepo(ctrl)
mockTraceService := builder_mocks.NewMockITraceService(ctrl)
proc := &stubProcessor{invokeErr: errors.New("invoke fail")}

impl := &TraceHubServiceImpl{taskRepo: mockRepo}
impl := &TraceHubServiceImpl{taskRepo: mockRepo, traceService: mockTraceService}
mockTraceService.EXPECT().
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()

now := time.Now()
sampler := &entity.Sampler{
Expand All @@ -202,12 +207,13 @@ func TestTraceHubServiceImpl_ProcessBatchSpans_DispatchError(t *testing.T) {
RunEndAt: now.Add(time.Minute),
}
sub := &spanSubscriber{
taskID: 1,
t: taskDO,
tr: taskRun,
processor: proc,
runType: entity.TaskRunTypeNewData,
taskRepo: mockRepo,
taskID: 1,
t: taskDO,
tr: taskRun,
processor: proc,
traceService: mockTraceService,
runType: entity.TaskRunTypeNewData,
taskRepo: mockRepo,
}

spanRun := &entity.TaskRun{
Expand Down Expand Up @@ -310,12 +316,14 @@ func TestTraceHubServiceImpl_ListAndSendSpans_WithoutLastSpanPageToken(t *testin
mockTenant := tenant_mocks.NewMockITenantProvider(ctrl)
mockBuilder := builder_mocks.NewMockTraceFilterProcessorBuilder(ctrl)
filterMock := spanfilter_mocks.NewMockFilter(ctrl)
mockTraceService := builder_mocks.NewMockITraceService(ctrl)

impl := &TraceHubServiceImpl{
taskRepo: mockTaskRepo,
traceRepo: mockTraceRepo,
tenantProvider: mockTenant,
buildHelper: mockBuilder,
traceService: mockTraceService,
}

now := time.Now()
Expand All @@ -329,6 +337,10 @@ func TestTraceHubServiceImpl_ListAndSendSpans_WithoutLastSpanPageToken(t *testin
filterMock.EXPECT().BuildRootSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, nil)
mockBuilder.EXPECT().BuildGetTraceProcessors(gomock.Any(), gomock.Any()).Return([]span_processor.Processor(nil), nil).Times(2)
mockTenant.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).Return([]string{"tenant"}, nil)
mockTraceService.EXPECT().
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
Times(2)

mockTraceRepo.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) {
switch param.PageToken {
Expand Down Expand Up @@ -370,12 +382,14 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
mockTenant := tenant_mocks.NewMockITenantProvider(ctrl)
mockBuilder := builder_mocks.NewMockTraceFilterProcessorBuilder(ctrl)
filterMock := spanfilter_mocks.NewMockFilter(ctrl)
mockTraceService := builder_mocks.NewMockITraceService(ctrl)

impl := &TraceHubServiceImpl{
taskRepo: mockTaskRepo,
traceRepo: mockTraceRepo,
tenantProvider: mockTenant,
buildHelper: mockBuilder,
traceService: mockTraceService,
}

now := time.Now()
Expand All @@ -390,6 +404,10 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
filterMock.EXPECT().BuildRootSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, nil)
mockBuilder.EXPECT().BuildGetTraceProcessors(gomock.Any(), gomock.Any()).Return([]span_processor.Processor(nil), nil)
mockTenant.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).Return([]string{"tenant"}, nil)
mockTraceService.EXPECT().
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
Times(1)

mockTraceRepo.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) {
require.Equal(t, "tenant", param.Tenants[0])
Expand Down Expand Up @@ -470,7 +488,8 @@ func TestTraceHubServiceImpl_DoFlush_NoMoreFinishError(t *testing.T) {
t.Cleanup(ctrl.Finish)

mockTaskRepo := repo_mocks.NewMockITaskRepo(ctrl)
impl := &TraceHubServiceImpl{taskRepo: mockTaskRepo}
mockTraceService := builder_mocks.NewMockITraceService(ctrl)
impl := &TraceHubServiceImpl{taskRepo: mockTaskRepo, traceService: mockTraceService}

now := time.Now()
sub, proc := newBackfillSubscriber(mockTaskRepo, now)
Expand All @@ -480,6 +499,10 @@ func TestTraceHubServiceImpl_DoFlush_NoMoreFinishError(t *testing.T) {

mockTaskRepo.EXPECT().GetTaskCount(gomock.Any(), int64(1)).Return(int64(0), nil)
mockTaskRepo.EXPECT().GetBackfillTaskRun(gomock.Any(), gomock.Nil(), int64(1)).Return(domainRun, nil)
mockTraceService.EXPECT().
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
Times(1)

// 调用flushSpans,然后手动调用OnTaskFinished来触发finish错误
err, _ := impl.flushSpans(context.Background(), []*loop_span.Span{span}, sub)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

func (h *TraceHubServiceImpl) SpanTrigger(ctx context.Context, span *loop_span.Span) error {
logSuffix := fmt.Sprintf("log_id=%s, trace_id=%s, span_id=%s", span.LogID, span.TraceID, span.SpanID)

// 1. perform initial filtering based on space_id
// 1.1 Filter out spans that do not belong to any space or bot
cacheInfo := h.localCache.LoadTaskCache(ctx)
Expand Down Expand Up @@ -91,13 +90,14 @@ func (h *TraceHubServiceImpl) buildSubscriberOfSpan(ctx context.Context, span *l
return nil, err
}
subscribers = append(subscribers, &spanSubscriber{
taskID: taskDO.ID,
t: taskDO,
processor: proc,
taskRepo: h.taskRepo,
runType: entity.TaskRunTypeNewData,
buildHelper: h.buildHelper,
tenants: tenants,
taskID: taskDO.ID,
t: taskDO,
processor: proc,
taskRepo: h.taskRepo,
runType: entity.TaskRunTypeNewData,
buildHelper: h.buildHelper,
tenants: tenants,
traceService: h.traceService,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
mockFilter := span_filter_mocks.NewMockFilter(ctrl)
configLoader := config_mocks.NewMockITraceConfig(ctrl)
tenantProvider := tenant_mocks.NewMockITenantProvider(ctrl)
mockTraceService := trace_service_mocks.NewMockITraceService(ctrl)

now := time.Now()
workspaceID := int64(1)
Expand Down Expand Up @@ -115,6 +116,10 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
mockFilter.EXPECT().BuildALLSpanFilter(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockBuilder.EXPECT().BuildPlatformRelatedFilter(gomock.Any(), gomock.Any()).Return(mockFilter, nil).AnyTimes()
tenantProvider.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformDefault, gomock.Any()).Return([]string{"tenant"}, nil).AnyTimes()
mockTraceService.EXPECT().
MergeHistoryMessagesByRespIDBatch(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()

spanRun := &entity.TaskRun{
ID: 201,
Expand All @@ -141,6 +146,7 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
localCache: NewLocalCache(),
config: configLoader,
tenantProvider: tenantProvider,
traceService: mockTraceService,
}
impl.localCache.taskCache.Store("ObjListWithTask", TaskCacheInfo{WorkspaceIDs: []string{"space-1"}, Tasks: []*entity.ObservabilityTask{taskDO}})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
)

type spanSubscriber struct {
taskID int64
t *entity.ObservabilityTask
tr *entity.TaskRun
processor taskexe.Processor
tenants []string
taskRepo repo.ITaskRepo
runType entity.TaskRunType
buildHelper service.TraceFilterProcessorBuilder
taskID int64
t *entity.ObservabilityTask
tr *entity.TaskRun
processor taskexe.Processor
tenants []string
taskRepo repo.ITaskRepo
runType entity.TaskRunType
buildHelper service.TraceFilterProcessorBuilder
traceService service.ITraceService
}

// Sampled determines whether a span is sampled based on the sampling rate; the sample size will be validated during flush.
Expand Down Expand Up @@ -209,6 +210,15 @@ func (s *spanSubscriber) AddSpan(ctx context.Context, span *loop_span.Span) erro
}
trigger := &taskexe.Trigger{Task: s.t, Span: span, TaskRun: taskRunConfig}
logs.CtxDebug(ctx, "invoke processor, trigger: %v", trigger)
// New Data 在这里处理
// Back fill 在前置批量处理
if s.runType == entity.TaskRunTypeNewData {
err := s.traceService.MergeHistoryMessagesByRespIDBatch(ctx, []*loop_span.Span{span}, s.t.GetPlatformType())
if err != nil {
logs.CtxError(ctx, "merge history messages failed, task_id=%d, span_id=%s err: %v", s.t.ID, span.SpanID, err)
return err
}
}
err = s.processor.Invoke(ctx, trigger)
if err != nil {
logs.CtxWarn(ctx, "invoke processor failed, trace_id=%s, span_id=%s, err: %v", span.TraceID, span.SpanID, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewTraceHubImpl(
backfillProducer mq.IBackfillProducer,
locker lock.ILocker,
config config.ITraceConfig,
traceService service.ITraceService,
) (ITraceHubService, error) {
impl := &TraceHubServiceImpl{
taskRepo: tRepo,
Expand All @@ -48,6 +49,7 @@ func NewTraceHubImpl(
locker: locker,
config: config,
localCache: NewLocalCache(),
traceService: traceService,
}
return impl, nil
}
Expand All @@ -61,7 +63,7 @@ type TraceHubServiceImpl struct {
backfillProducer mq.IBackfillProducer
locker lock.ILocker
config config.ITraceConfig

traceService service.ITraceService
// Local cache - caching non-terminal task information
localCache *LocalCache

Expand Down
Loading
Loading