Skip to content

Commit 8081cac

Browse files
authored
[fix] auto task (#235)
* add switch auto_task * add hidden * add hidden proc * fix hidden * fix * fix * add * add hidden proc * add * add debug log * add * fix * add source * add redis proc * add hindden proc * delete useless code * fix ut * fix * fix redisdao * add ut mock * delete useless code * add ctx * fix redis * fix redis * fix * fix redis proc * go fmt * fix redis proc * fix * redis * add ut * add ut * add ut * go fmt * add logs * delete hidden idl * fix * add filter * fix * fix panic * fix * fix redis * fix ut * delete useless code * delete useless code * mq err * fix ut * add * fix cache * add switch * fix lock * fix cache * add cache * fix ut
1 parent 98db15a commit 8081cac

7 files changed

Lines changed: 81 additions & 61 deletions

File tree

backend/modules/observability/domain/task/service/taskexe/tracehub/scheduled_task.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/domain/filter"
1414
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/domain/task"
15+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config"
1516
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
1617
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/service/taskexe"
1718
"github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/mysql"
@@ -45,6 +46,7 @@ const (
4546

4647
// startScheduledTask launches the scheduled task goroutine
4748
func (h *TraceHubServiceImpl) startScheduledTask() {
49+
h.syncTaskCache()
4850
go func() {
4951
for {
5052
select {
@@ -75,6 +77,15 @@ func (h *TraceHubServiceImpl) startScheduledTask() {
7577
}
7678

7779
func (h *TraceHubServiceImpl) transformTaskStatus() {
80+
const key = "consumer_listening"
81+
cfg := &config.ConsumerListening{}
82+
if err := h.loader.UnmarshalKey(context.Background(), key, cfg); err != nil {
83+
return
84+
}
85+
if !cfg.IsEnabled || !cfg.IsAllSpace {
86+
return
87+
}
88+
7889
if slices.Contains([]string{TracehubClusterName, InjectClusterName}, os.Getenv(TceCluster)) {
7990
return
8091
}
@@ -91,11 +102,6 @@ func (h *TraceHubServiceImpl) transformTaskStatus() {
91102
logs.CtxInfo(ctx, "transformTaskStatus lock held by others, skip execution")
92103
return
93104
}
94-
defer func() {
95-
if _, err := h.locker.Unlock(transformTaskStatusLockKey); err != nil {
96-
logs.CtxWarn(ctx, "transformTaskStatus release lock failed", "err", err)
97-
}
98-
}()
99105
}
100106
logs.CtxInfo(ctx, "Scheduled task started...")
101107

@@ -264,11 +270,6 @@ func (h *TraceHubServiceImpl) syncTaskRunCounts() {
264270
logs.CtxInfo(ctx, "syncTaskRunCounts lock held by others, skip execution")
265271
return
266272
}
267-
defer func() {
268-
if _, err := h.locker.Unlock(syncTaskRunCountsLockKey); err != nil {
269-
logs.CtxWarn(ctx, "syncTaskRunCounts release lock failed", "err", err)
270-
}
271-
}()
272273
}
273274
logs.CtxInfo(ctx, "Start syncing TaskRunCounts to database...")
274275
// 1. Retrieve non-final task list
@@ -339,11 +340,8 @@ func (h *TraceHubServiceImpl) syncTaskCache() {
339340
h.taskCacheLock.Lock()
340341
defer h.taskCacheLock.Unlock()
341342

342-
// Clear old cache
343-
h.taskCache.Delete("ObjListWithTask")
344-
345343
// 4. Write new cache into local cache
346-
h.taskCache.Store("ObjListWithTask", &newCache)
344+
h.taskCache.Store("ObjListWithTask", newCache)
347345

348346
logs.CtxInfo(ctx, "Task cache sync completed, taskCount:%d, updateTime:%s", len(tasks), newCache.UpdateTime.Format(time.RFC3339))
349347
}

backend/modules/observability/domain/task/service/taskexe/tracehub/scheduled_task_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func TestTraceHubServiceImpl_transformTaskStatus(t *testing.T) {
100100
impl := &TraceHubServiceImpl{
101101
taskRepo: mockRepo,
102102
taskProcessor: tp,
103+
loader: newEnabledConsumerLoader(),
103104
}
104105
return impl, proc
105106
},
@@ -133,6 +134,7 @@ func TestTraceHubServiceImpl_transformTaskStatus(t *testing.T) {
133134
impl := &TraceHubServiceImpl{
134135
taskRepo: mockRepo,
135136
taskProcessor: tp,
137+
loader: newEnabledConsumerLoader(),
136138
}
137139
return impl, proc
138140
},
@@ -172,6 +174,7 @@ func TestTraceHubServiceImpl_transformTaskStatus(t *testing.T) {
172174
impl := &TraceHubServiceImpl{
173175
taskRepo: mockRepo,
174176
taskProcessor: tp,
177+
loader: newEnabledConsumerLoader(),
175178
}
176179
return impl, proc
177180
},
@@ -217,7 +220,6 @@ func TestTraceHubServiceImpl_transformTaskStatus(t *testing.T) {
217220
mockRepo.EXPECT().ListTasks(gomock.Any(), gomock.Any()).Return([]*entity.ObservabilityTask{taskPO}, int64(1), nil)
218221
locker.EXPECT().LockWithRenew(gomock.Any(), gomock.Any(), transformTaskStatusLockTTL, backfillLockMaxHold).
219222
Return(false, context.Background(), func() {}, errors.New("lock failed"))
220-
locker.EXPECT().Unlock(transformTaskStatusLockKey).Return(true, nil)
221223

222224
proc := newTrackingProcessor()
223225
tp := processor.NewTaskProcessor()
@@ -229,6 +231,7 @@ func TestTraceHubServiceImpl_transformTaskStatus(t *testing.T) {
229231
taskProcessor: tp,
230232
locker: locker,
231233
backfillProducer: producer,
234+
loader: newEnabledConsumerLoader(),
232235
}
233236
return impl, proc
234237
},
@@ -331,7 +334,7 @@ func TestTraceHubServiceImpl_syncTaskCache(t *testing.T) {
331334

332335
mockRepo := repo_mocks.NewMockITaskRepo(ctrl)
333336
impl := &TraceHubServiceImpl{taskRepo: mockRepo}
334-
impl.taskCache.Store("ObjListWithTask", &TaskCacheInfo{})
337+
impl.taskCache.Store("ObjListWithTask", TaskCacheInfo{})
335338

336339
workspaceIDs := []string{"space-1"}
337340
botIDs := []string{"bot-1"}
@@ -343,7 +346,7 @@ func TestTraceHubServiceImpl_syncTaskCache(t *testing.T) {
343346

344347
val, ok := impl.taskCache.Load("ObjListWithTask")
345348
require.True(t, ok)
346-
cache, ok := val.(*TaskCacheInfo)
349+
cache, ok := val.(TaskCacheInfo)
347350
require.True(t, ok)
348351
require.Equal(t, workspaceIDs, cache.WorkspaceIDs)
349352
require.Equal(t, botIDs, cache.BotIDs)

backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,14 +276,14 @@ func (h *TraceHubServiceImpl) getObjListWithTaskFromCache(ctx context.Context) (
276276
objListWithTask, ok := h.taskCache.Load("ObjListWithTask")
277277
if !ok {
278278
// Cache is empty, fallback to the database
279-
logs.CtxInfo(ctx, "Cache is empty, retrieving task list from database")
280-
return h.taskRepo.GetObjListWithTask(ctx)
279+
logs.CtxError(ctx, "Cache is empty, retrieving task list from database")
280+
return nil, nil, nil
281281
}
282282

283-
cacheInfo, ok := objListWithTask.(*TaskCacheInfo)
283+
cacheInfo, ok := objListWithTask.(TaskCacheInfo)
284284
if !ok {
285285
logs.CtxError(ctx, "Cache data type mismatch")
286-
return h.taskRepo.GetObjListWithTask(ctx)
286+
return nil, nil, nil
287287
}
288288

289289
logs.CtxInfo(ctx, "Retrieve task list from cache, taskCount=%d, spaceCount=%d, botCount=%d", len(cacheInfo.Tasks), len(cacheInfo.WorkspaceIDs), len(cacheInfo.BotIDs))

backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestTraceHubServiceImpl_SpanTriggerSkipNoWorkspace(t *testing.T) {
3030
t.Parallel()
3131

3232
impl := &TraceHubServiceImpl{}
33-
impl.taskCache.Store("ObjListWithTask", &TaskCacheInfo{})
33+
impl.taskCache.Store("ObjListWithTask", TaskCacheInfo{})
3434

3535
raw := &entity.RawSpan{
3636
TraceID: "trace",
@@ -132,7 +132,7 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
132132
taskProcessor: taskProcessor,
133133
loader: configLoader,
134134
}
135-
impl.taskCache.Store("ObjListWithTask", &TaskCacheInfo{WorkspaceIDs: []string{"space-1"}, Tasks: []*entity.ObservabilityTask{taskDO}})
135+
impl.taskCache.Store("ObjListWithTask", TaskCacheInfo{WorkspaceIDs: []string{"space-1"}, Tasks: []*entity.ObservabilityTask{taskDO}})
136136

137137
raw := &entity.RawSpan{
138138
TraceID: "trace",

backend/modules/observability/domain/task/service/taskexe/tracehub/test_helpers_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ package tracehub
55

66
import (
77
"context"
8+
"encoding/json"
89

10+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config"
911
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
1012
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/service/taskexe"
13+
"github.com/coze-dev/coze-loop/backend/pkg/conf"
1114
)
1215

1316
func floatPtr(v float64) *float64 { return &v }
@@ -75,3 +78,47 @@ func (s *stubProcessor) OnCreateTaskRunChange(_ context.Context, req taskexe.OnC
7578
func (s *stubProcessor) OnFinishTaskRunChange(context.Context, taskexe.OnFinishTaskRunChangeReq) error {
7679
return s.finishTaskRunErr
7780
}
81+
82+
type stubConfigLoader struct {
83+
values map[string]any
84+
}
85+
86+
func newStubConfigLoader() *stubConfigLoader {
87+
return &stubConfigLoader{values: make(map[string]any)}
88+
}
89+
90+
func (s *stubConfigLoader) Set(key string, value any) {
91+
s.values[key] = value
92+
}
93+
94+
func (s *stubConfigLoader) Get(_ context.Context, key string) any {
95+
if s.values == nil {
96+
return nil
97+
}
98+
return s.values[key]
99+
}
100+
101+
func (s *stubConfigLoader) UnmarshalKey(_ context.Context, key string, value any, _ ...conf.DecodeOptionFn) error {
102+
v, ok := s.values[key]
103+
if !ok {
104+
return nil
105+
}
106+
data, err := json.Marshal(v)
107+
if err != nil {
108+
return err
109+
}
110+
return json.Unmarshal(data, value)
111+
}
112+
113+
func (s *stubConfigLoader) Unmarshal(context.Context, any, ...conf.DecodeOptionFn) error {
114+
return nil
115+
}
116+
117+
func newEnabledConsumerLoader() *stubConfigLoader {
118+
loader := newStubConfigLoader()
119+
loader.Set("consumer_listening", &config.ConsumerListening{
120+
IsEnabled: true,
121+
IsAllSpace: true,
122+
})
123+
return loader
124+
}

backend/modules/observability/domain/task/service/taskexe/tracehub/trace_hub.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/coze-dev/coze-loop/backend/infra/external/benefit"
1212
"github.com/coze-dev/coze-loop/backend/infra/lock"
13-
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config"
1413
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/mq"
1514
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/tenant"
1615
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
@@ -63,14 +62,7 @@ func NewTraceHubImpl(
6362
}
6463

6564
// Start the scheduled tasks immediately
66-
const key = "consumer_listening"
67-
cfg := &config.ConsumerListening{}
68-
if err := loader.UnmarshalKey(context.Background(), key, cfg); err != nil {
69-
return nil, err
70-
}
71-
if cfg.IsEnabled && cfg.IsAllSpace {
72-
impl.startScheduledTask()
73-
}
65+
impl.startScheduledTask()
7466

7567
// default+lane?+新集群?——定时任务和任务处理分开——内场
7668
return impl, nil

backend/modules/observability/domain/task/service/taskexe/tracehub/trace_hub_test.go

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,13 @@ import (
2222
func TestTraceHubServiceImpl_getObjListWithTaskFromCache_Fallback(t *testing.T) {
2323
t.Parallel()
2424

25-
ctrl := gomock.NewController(t)
26-
t.Cleanup(ctrl.Finish)
27-
28-
mockRepo := repo_mocks.NewMockITaskRepo(ctrl)
29-
impl := &TraceHubServiceImpl{taskRepo: mockRepo}
30-
3125
ctx := context.Background()
32-
spaceIDs := []string{"space-1"}
33-
botIDs := []string{"bot-1"}
34-
tasks := []*entity.ObservabilityTask{{}}
35-
36-
mockRepo.EXPECT().GetObjListWithTask(gomock.Any()).Return(spaceIDs, botIDs, tasks)
26+
impl := &TraceHubServiceImpl{}
3727

3828
gotSpaces, gotBots, gotTasks := impl.getObjListWithTaskFromCache(ctx)
39-
require.Equal(t, spaceIDs, gotSpaces)
40-
require.Equal(t, botIDs, gotBots)
41-
require.Equal(t, tasks, gotTasks)
29+
require.Nil(t, gotSpaces)
30+
require.Nil(t, gotBots)
31+
require.Nil(t, gotTasks)
4232
}
4333

4434
func TestTraceHubServiceImpl_getObjListWithTaskFromCache_FromCache(t *testing.T) {
@@ -50,7 +40,7 @@ func TestTraceHubServiceImpl_getObjListWithTaskFromCache_FromCache(t *testing.T)
5040
mockRepo := repo_mocks.NewMockITaskRepo(ctrl)
5141
impl := &TraceHubServiceImpl{taskRepo: mockRepo}
5242

53-
expected := &TaskCacheInfo{
43+
expected := TaskCacheInfo{
5444
WorkspaceIDs: []string{"space-2"},
5545
BotIDs: []string{"bot-2"},
5646
Tasks: []*entity.ObservabilityTask{{}},
@@ -66,24 +56,14 @@ func TestTraceHubServiceImpl_getObjListWithTaskFromCache_FromCache(t *testing.T)
6656
func TestTraceHubServiceImpl_getObjListWithTaskFromCache_TypeMismatch(t *testing.T) {
6757
t.Parallel()
6858

69-
ctrl := gomock.NewController(t)
70-
t.Cleanup(ctrl.Finish)
71-
72-
mockRepo := repo_mocks.NewMockITaskRepo(ctrl)
73-
impl := &TraceHubServiceImpl{taskRepo: mockRepo}
59+
impl := &TraceHubServiceImpl{}
7460

7561
impl.taskCache.Store("ObjListWithTask", "invalid")
7662

77-
spaceIDs := []string{"fallback-space"}
78-
botIDs := []string{"fallback-bot"}
79-
tasks := []*entity.ObservabilityTask{{}}
80-
81-
mockRepo.EXPECT().GetObjListWithTask(gomock.Any()).Return(spaceIDs, botIDs, tasks)
82-
8363
gotSpaces, gotBots, gotTasks := impl.getObjListWithTaskFromCache(context.Background())
84-
require.Equal(t, spaceIDs, gotSpaces)
85-
require.Equal(t, botIDs, gotBots)
86-
require.Equal(t, tasks, gotTasks)
64+
require.Nil(t, gotSpaces)
65+
require.Nil(t, gotBots)
66+
require.Nil(t, gotTasks)
8767
}
8868

8969
func TestTraceHubServiceImpl_applySampling(t *testing.T) {

0 commit comments

Comments
 (0)