Skip to content

Commit be5693e

Browse files
lsy357SharkeyChenEv4nFengtpfz
authored
[feat][backend] volcengine agentkit (#427)
* fix(evaluation): idl anno # Conflicts: # idl/thrift/coze/loop/evaluation/coze.loop.evaluation.evaluator.thrift * fix(evaluation): idl param anno * [feat][backend] add model param * [feat][backend] add model param * [feat][backend] add model param * [feat][backend] fix * feat(backend): add some log * fix(evaluation): EvaluatorIDVersionItem version tag * [feat][backend] add protocol * [feat][backend] fix idl * [feat][backend] support agentkit: code gen * [feat][backend] support agentkit: code gen * [feat][backend] add do field * fix(evaluation): create evaluator with workspace_id * fix(evaluation): CreateEvaluatorRequest workspace_id * fix(evaluation): update idl * feat(llm): pick idl * feat(backend): remove some log * [feat][backend] add thinking * [feat][backend] add param option * [feat][backend] add undefined protocol * [feat][backend] add convert * [feat][backend] add convert * [feat][backend] add param value * feat(evaluation): evaluator ModelConfig * fix(evaluation): llm chat modelconfig * [feat][backend] fix convert model resp * [feat][backend] add family * feat(infra): rmq config * feat(infra): rmq config * fix(evaluation): rmq config * fix(evaluation): evaluator ModelID optional * fix auth * [feat][backend] add convert * fix 主从延迟 * [feat][backend] fix reaction * 新增target type * 新增EvalTargetType * add VolcengineAgentAgentkit * [feat][backend] add family enum * [feat][backend] merge from feat/model_agent * [feat][backend] regenerate llm idl * [feat][backend] fix doubao * [feat][backend] generate ability * fix(infra): add registry close * fix dto * fix(evaluation): ListEvaluatorTemplate repo err * fix(evaluation): GetExptResultExportRecordResponse * [feat][backend] add model url * [feat][backend] add model url * feat(evaluation): expttpl api in volcagentkit * fix(evaluation): targetpo EvalTargetTypeVolcengineAgentAgentkit * fix(evaluation): CheckExperimentTemplateNameRequest * [feat][backend] add preset model * [feat][backend] add preset model * [feat][backend] add preset model * [feat][backend] add preset model * [feat][backend] add preset model * fix(evaluation): evaluator modelconfig with preset_model * fix(evaluation): evaluator llm preset_model * fix(evaluation): BatchGetExperimentAggrResultResponse ExptAggregateResult * fix(evaluation): VolcengineAgent runtimeid * fix(evaluation): evaluator debug timeout * fix(evaluation): MockEvalTargetOutput tag * fix(evaluation): DebugBuiltinEvaluator timeout * feat(backend): ut * fix(evaluation): ut * fix(evaluation): golint * [feat][backend] add ut for ConvertToParamValues * [feat][backend] add ut for GetAbilityEnums * fix(evaluation): mq close * fix(evaluation): api opentag --------- Co-authored-by: chenzhe.29 <chenzhe.29@bytedance.com> Co-authored-by: fengboyun.evan <fengboyun.evan@bytedance.com> Co-authored-by: tpfz <wangziqi.9425@bytedance.com>
1 parent 795a4f1 commit be5693e

90 files changed

Lines changed: 15707 additions & 2212 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/cmd/main.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"fmt"
1010
"net/url"
1111
"os"
12+
"os/signal"
13+
"syscall"
1214
"time"
1315

1416
"github.com/bytedance/gg/gptr"
@@ -60,12 +62,21 @@ func main() {
6062
if err := initTracer(handler); err != nil {
6163
panic(err)
6264
}
63-
consumerWorkers := MustInitConsumerWorkers(c.cfgFactory, handler, handler, handler, handler)
64-
if err := registry.NewConsumerRegistry(c.mqFactory).Register(consumerWorkers).StartAll(ctx); err != nil {
65+
66+
signalCtx, signalCancel := signal.NotifyContext(ctx, syscall.SIGTERM, syscall.SIGINT)
67+
defer signalCancel()
68+
69+
r := registry.NewConsumerRegistryWithShutdown(signalCtx, c.mqFactory).Register(MustInitConsumerWorkers(c.cfgFactory, handler, handler, handler, handler))
70+
if err := r.StartAll(ctx); err != nil {
6571
panic(err)
6672
}
6773

68-
api.Start(handler)
74+
go api.Start(handler)
75+
<-signalCtx.Done()
76+
77+
stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second)
78+
defer stopCancel()
79+
_ = r.StopAll(stopCtx)
6980
}
7081

7182
type ComponentConfig struct {

backend/infra/mq/factory.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type ProducerConfig struct {
3131
FlushFrequency time.Duration
3232
// How long to wait for the cluster to settle between retries
3333
RetryBackoff time.Duration
34+
35+
AccessKey *string
36+
AccessSecret *string
3437
}
3538

3639
type ConsumerConfig struct {
@@ -50,6 +53,9 @@ type ConsumerConfig struct {
5053
ConsumeTimeout time.Duration
5154
EnablePPE *bool
5255
IsEnabled *bool
56+
57+
AccessKey *string
58+
AccessSecret *string
5359
}
5460

5561
type CompressionCodec int

backend/infra/mq/mocks/registry.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/infra/mq/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
type ConsumerRegistry interface {
1313
Register(worker []IConsumerWorker) ConsumerRegistry
1414
StartAll(ctx context.Context) error
15+
StopAll(ctx context.Context) error
1516
}
1617

1718
type IConsumerWorker interface {

backend/infra/mq/registry/registry.go

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package registry
55

66
import (
77
"context"
8+
"errors"
89

910
"github.com/coze-dev/coze-loop/backend/infra/mq"
1011
"github.com/coze-dev/coze-loop/backend/pkg/errorx"
@@ -14,20 +15,27 @@ import (
1415
)
1516

1617
type defaultConsumerRegistry struct {
17-
factory mq.IFactory
18-
workers []mq.IConsumerWorker
18+
factory mq.IFactory
19+
workers []mq.IConsumerWorker
20+
consumers []mq.IConsumer
21+
shutdownCtx context.Context
1922
}
2023

2124
func NewConsumerRegistry(factory mq.IFactory) mq.ConsumerRegistry {
2225
return &defaultConsumerRegistry{factory: factory}
2326
}
2427

28+
func NewConsumerRegistryWithShutdown(shutdownCtx context.Context, factory mq.IFactory) mq.ConsumerRegistry {
29+
return &defaultConsumerRegistry{factory: factory, shutdownCtx: shutdownCtx}
30+
}
31+
2532
func (d *defaultConsumerRegistry) Register(worker []mq.IConsumerWorker) mq.ConsumerRegistry {
2633
d.workers = append(d.workers, worker...)
2734
return d
2835
}
2936

3037
func (d *defaultConsumerRegistry) StartAll(ctx context.Context) error {
38+
d.consumers = nil
3139
for _, worker := range d.workers {
3240
cfg, err := worker.ConsumerCfg(ctx)
3341
if err != nil {
@@ -39,14 +47,47 @@ func (d *defaultConsumerRegistry) StartAll(ctx context.Context) error {
3947
return errorx.Wrapf(err, "NewConsumer fail, cfg: %v", json.Jsonify(cfg))
4048
}
4149

42-
consumer.RegisterHandler(newSafeConsumerWrapper(worker))
50+
handler := newSafeConsumerWrapper(worker)
51+
if d.shutdownCtx != nil {
52+
handler = newShutdownContextWrapper(handler, d.shutdownCtx)
53+
}
54+
consumer.RegisterHandler(handler)
4355
if err := consumer.Start(); err != nil {
4456
return errorx.Wrapf(err, "StartConsumer fail, cfg: %v", json.Jsonify(cfg))
4557
}
58+
d.consumers = append(d.consumers, consumer)
4659
}
4760
return nil
4861
}
4962

63+
func (d *defaultConsumerRegistry) StopAll(ctx context.Context) error {
64+
if len(d.consumers) == 0 {
65+
return nil
66+
}
67+
var errs []error
68+
for i := len(d.consumers) - 1; i >= 0; i-- {
69+
select {
70+
case <-ctx.Done():
71+
errs = append(errs, ctx.Err())
72+
return errors.Join(errs...)
73+
default:
74+
consumer := d.consumers[i]
75+
done := make(chan error, 1)
76+
go func(c mq.IConsumer) { done <- c.Close() }(consumer)
77+
select {
78+
case err := <-done:
79+
if err != nil {
80+
errs = append(errs, err)
81+
}
82+
case <-ctx.Done():
83+
errs = append(errs, ctx.Err())
84+
return errors.Join(errs...)
85+
}
86+
}
87+
}
88+
return errors.Join(errs...)
89+
}
90+
5091
type safeConsumerHandlerDecorator struct {
5192
handler mq.IConsumerHandler
5293
}
@@ -59,3 +100,25 @@ func (s *safeConsumerHandlerDecorator) HandleMessage(ctx context.Context, msg *m
59100
func newSafeConsumerWrapper(h mq.IConsumerHandler) mq.IConsumerHandler {
60101
return &safeConsumerHandlerDecorator{handler: h}
61102
}
103+
104+
type shutdownContextDecorator struct {
105+
handler mq.IConsumerHandler
106+
shutdownCtx context.Context
107+
}
108+
109+
func (s *shutdownContextDecorator) HandleMessage(ctx context.Context, msg *mq.MessageExt) error {
110+
nctx, cancel := context.WithCancel(ctx)
111+
go func() {
112+
defer goroutine.Recovery(ctx)
113+
select {
114+
case <-ctx.Done():
115+
case <-s.shutdownCtx.Done():
116+
}
117+
cancel()
118+
}()
119+
return s.handler.HandleMessage(nctx, msg)
120+
}
121+
122+
func newShutdownContextWrapper(h mq.IConsumerHandler, shutdownCtx context.Context) mq.IConsumerHandler {
123+
return &shutdownContextDecorator{handler: h, shutdownCtx: shutdownCtx}
124+
}

backend/infra/mq/registry/registry_test.go

Lines changed: 154 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
2020
name string
2121
workers []mq.IConsumerWorker
2222
setupMocks func(*mocks.MockIFactory, []*mocks.MockIConsumer, []*mocks.MockIConsumerWorker)
23+
shutdownCtx context.Context
2324
expectedError error
2425
}{
2526
{
@@ -39,6 +40,21 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
3940
},
4041
expectedError: nil,
4142
},
43+
{
44+
name: "successfully start all workers with shutdown ctx",
45+
workers: []mq.IConsumerWorker{
46+
mocks.NewMockIConsumerWorker(gomock.NewController(t)),
47+
},
48+
setupMocks: func(factory *mocks.MockIFactory, consumers []*mocks.MockIConsumer, workers []*mocks.MockIConsumerWorker) {
49+
cfg := &mq.ConsumerConfig{}
50+
workers[0].EXPECT().ConsumerCfg(gomock.Any()).Return(cfg, nil)
51+
consumers[0].EXPECT().RegisterHandler(gomock.Any()).Return()
52+
consumers[0].EXPECT().Start().Return(nil)
53+
factory.EXPECT().NewConsumer(gomock.Any()).Return(consumers[0], nil)
54+
},
55+
shutdownCtx: context.Background(),
56+
expectedError: nil,
57+
},
4258
{
4359
name: "fail to get consumer config",
4460
workers: []mq.IConsumerWorker{
@@ -92,9 +108,12 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
92108
}
93109

94110
tt.setupMocks(factory, consumers, workers)
95-
96-
registry := NewConsumerRegistry(factory).Register(tt.workers)
97-
111+
var registry mq.ConsumerRegistry
112+
if tt.shutdownCtx != nil {
113+
registry = NewConsumerRegistryWithShutdown(tt.shutdownCtx, factory).Register(tt.workers)
114+
} else {
115+
registry = NewConsumerRegistry(factory).Register(tt.workers)
116+
}
98117
err := registry.StartAll(context.Background())
99118
if tt.expectedError != nil {
100119
assert.Error(t, err)
@@ -106,6 +125,69 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
106125
}
107126
}
108127

128+
func TestDefaultConsumerRegistry_StopAll(t *testing.T) {
129+
t.Run("no consumers", func(t *testing.T) {
130+
ctrl := gomock.NewController(t)
131+
defer ctrl.Finish()
132+
factory := mocks.NewMockIFactory(ctrl)
133+
registry := NewConsumerRegistry(factory)
134+
err := registry.StopAll(context.Background())
135+
assert.NoError(t, err)
136+
})
137+
138+
t.Run("successfully stop all consumers", func(t *testing.T) {
139+
ctrl := gomock.NewController(t)
140+
defer ctrl.Finish()
141+
factory := mocks.NewMockIFactory(ctrl)
142+
workers := []mq.IConsumerWorker{
143+
mocks.NewMockIConsumerWorker(ctrl),
144+
mocks.NewMockIConsumerWorker(ctrl),
145+
}
146+
consumers := []*mocks.MockIConsumer{
147+
mocks.NewMockIConsumer(ctrl),
148+
mocks.NewMockIConsumer(ctrl),
149+
}
150+
cfg := &mq.ConsumerConfig{}
151+
for i := range workers {
152+
workers[i].(*mocks.MockIConsumerWorker).EXPECT().ConsumerCfg(gomock.Any()).Return(cfg, nil)
153+
factory.EXPECT().NewConsumer(gomock.Any()).Return(consumers[i], nil)
154+
consumers[i].EXPECT().RegisterHandler(gomock.Any())
155+
consumers[i].EXPECT().Start().Return(nil)
156+
}
157+
registry := NewConsumerRegistry(factory).Register(workers)
158+
err := registry.StartAll(context.Background())
159+
assert.NoError(t, err)
160+
161+
// StopAll 按逆序关闭,先关 consumers[1] 再关 consumers[0]
162+
consumers[1].EXPECT().Close().Return(nil)
163+
consumers[0].EXPECT().Close().Return(nil)
164+
err = registry.StopAll(context.Background())
165+
assert.NoError(t, err)
166+
})
167+
168+
t.Run("context cancelled during stop", func(t *testing.T) {
169+
ctrl := gomock.NewController(t)
170+
defer ctrl.Finish()
171+
factory := mocks.NewMockIFactory(ctrl)
172+
worker := mocks.NewMockIConsumerWorker(ctrl)
173+
consumer := mocks.NewMockIConsumer(ctrl)
174+
cfg := &mq.ConsumerConfig{}
175+
worker.EXPECT().ConsumerCfg(gomock.Any()).Return(cfg, nil)
176+
factory.EXPECT().NewConsumer(gomock.Any()).Return(consumer, nil)
177+
consumer.EXPECT().RegisterHandler(gomock.Any())
178+
consumer.EXPECT().Start().Return(nil)
179+
registry := NewConsumerRegistry(factory).Register([]mq.IConsumerWorker{worker})
180+
err := registry.StartAll(context.Background())
181+
assert.NoError(t, err)
182+
183+
ctx, cancel := context.WithCancel(context.Background())
184+
cancel()
185+
err = registry.StopAll(ctx)
186+
assert.Error(t, err)
187+
assert.True(t, errors.Is(err, context.Canceled))
188+
})
189+
}
190+
109191
func TestSafeConsumerHandlerDecorator_HandleMessage(t *testing.T) {
110192
tests := []struct {
111193
name string
@@ -150,3 +232,72 @@ func TestSafeConsumerHandlerDecorator_HandleMessage(t *testing.T) {
150232
})
151233
}
152234
}
235+
236+
func TestNewConsumerRegistryWithShutdown(t *testing.T) {
237+
ctrl := gomock.NewController(t)
238+
defer ctrl.Finish()
239+
240+
factory := mocks.NewMockIFactory(ctrl)
241+
shutdownCtx, cancel := context.WithCancel(context.Background())
242+
defer cancel()
243+
244+
registry := NewConsumerRegistryWithShutdown(shutdownCtx, factory).(*defaultConsumerRegistry)
245+
assert.Equal(t, factory, registry.factory)
246+
assert.Equal(t, shutdownCtx, registry.shutdownCtx)
247+
}
248+
249+
func TestShutdownContextDecorator_HandleMessage(t *testing.T) {
250+
ctrl := gomock.NewController(t)
251+
defer ctrl.Finish()
252+
253+
mockHandler := mocks.NewMockIConsumerWorker(ctrl)
254+
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
255+
256+
decorator := &shutdownContextDecorator{
257+
handler: mockHandler,
258+
shutdownCtx: shutdownCtx,
259+
}
260+
261+
tests := []struct {
262+
name string
263+
setupMock func()
264+
triggerCancel func()
265+
ctx context.Context
266+
}{
267+
{
268+
name: "normal execution",
269+
setupMock: func() {
270+
mockHandler.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).Return(nil)
271+
},
272+
triggerCancel: func() {},
273+
ctx: context.Background(),
274+
},
275+
{
276+
name: "shutdown context cancelled",
277+
setupMock: func() {
278+
mockHandler.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, msg *mq.MessageExt) error {
279+
<-ctx.Done()
280+
return ctx.Err()
281+
})
282+
},
283+
triggerCancel: func() {
284+
shutdownCancel()
285+
},
286+
ctx: context.Background(),
287+
},
288+
}
289+
290+
for _, tt := range tests {
291+
t.Run(tt.name, func(t *testing.T) {
292+
tt.setupMock()
293+
go tt.triggerCancel()
294+
err := decorator.HandleMessage(tt.ctx, &mq.MessageExt{})
295+
if tt.name == "shutdown context cancelled" {
296+
assert.Error(t, err)
297+
assert.True(t, errors.Is(err, context.Canceled))
298+
} else {
299+
assert.NoError(t, err)
300+
}
301+
})
302+
}
303+
}

0 commit comments

Comments
 (0)