Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
ea43e8f
fix(evaluation): idl anno
lsy357 Jan 15, 2026
37545bc
fix(evaluation): idl param anno
lsy357 Jan 19, 2026
da1fcf8
[feat][backend] add model param
SharkeyChen Jan 19, 2026
60bb3d4
[feat][backend] add model param
SharkeyChen Jan 19, 2026
fe4a629
[feat][backend] add model param
SharkeyChen Jan 19, 2026
3944afa
[feat][backend] fix
SharkeyChen Jan 19, 2026
91788a2
feat(backend): add some log
Ev4nFeng Jan 20, 2026
b7b79b8
fix(evaluation): EvaluatorIDVersionItem version tag
lsy357 Jan 21, 2026
7ba393e
[feat][backend] add protocol
SharkeyChen Jan 21, 2026
7e23ad4
[feat][backend] fix idl
SharkeyChen Jan 21, 2026
57efe4f
[feat][backend] support agentkit: code gen
SharkeyChen Jan 21, 2026
e2a05d9
[feat][backend] support agentkit: code gen
SharkeyChen Jan 21, 2026
0bab4e7
[feat][backend] add do field
SharkeyChen Jan 21, 2026
9c19a53
fix(evaluation): create evaluator with workspace_id
lsy357 Jan 22, 2026
2993c6c
fix(evaluation): CreateEvaluatorRequest workspace_id
lsy357 Jan 22, 2026
da29e41
fix(evaluation): update idl
lsy357 Jan 22, 2026
59e730b
feat(llm): pick idl
lsy357 Jan 23, 2026
b833774
feat(backend): remove some log
Ev4nFeng Jan 23, 2026
34a5033
[feat][backend] add thinking
SharkeyChen Jan 25, 2026
9ef244d
[feat][backend] add param option
SharkeyChen Jan 25, 2026
5154a1d
[feat][backend] add undefined protocol
SharkeyChen Jan 25, 2026
bd9d9bd
[feat][backend] add convert
SharkeyChen Jan 25, 2026
bfe92dd
[feat][backend] add convert
SharkeyChen Jan 25, 2026
b3931aa
[feat][backend] add param value
SharkeyChen Jan 25, 2026
734b85b
feat(evaluation): evaluator ModelConfig
lsy357 Jan 26, 2026
c1f35e5
fix(evaluation): llm chat modelconfig
lsy357 Jan 26, 2026
d6adf9d
Merge branch 'main' into feat/agentkit
lsy357 Jan 26, 2026
4a29d52
[feat][backend] fix convert model resp
SharkeyChen Jan 26, 2026
2182c02
[feat][backend] add family
SharkeyChen Jan 27, 2026
3fcd191
feat(infra): rmq config
lsy357 Jan 27, 2026
1aaa67e
feat(infra): rmq config
lsy357 Jan 27, 2026
540f95d
fix(evaluation): rmq config
lsy357 Jan 27, 2026
41d03a3
fix(evaluation): evaluator ModelID optional
lsy357 Jan 29, 2026
89792ab
Merge branch 'main' into feat/agentkit
lsy357 Jan 29, 2026
613df0a
fix auth
tpfz Jan 29, 2026
52d1e68
[feat][backend] add convert
SharkeyChen Jan 27, 2026
0a58ad4
fix 主从延迟
tpfz Jan 29, 2026
f1ee948
[feat][backend] fix reaction
SharkeyChen Jan 29, 2026
0a75f9b
新增target type
tpfz Jan 29, 2026
6d31236
新增EvalTargetType
tpfz Jan 29, 2026
6d36c1b
add VolcengineAgentAgentkit
tpfz Jan 29, 2026
3c7a879
[feat][backend] add family enum
SharkeyChen Jan 29, 2026
f3c4625
Merge branch 'feat/model_agentkit' into feat/agentkit
SharkeyChen Jan 29, 2026
1136f30
[feat][backend] merge from feat/model_agent
SharkeyChen Jan 29, 2026
baaee1c
[feat][backend] regenerate llm idl
SharkeyChen Jan 29, 2026
a062a2d
[feat][backend] fix doubao
SharkeyChen Jan 29, 2026
53c4f06
[feat][backend] generate ability
SharkeyChen Jan 29, 2026
a63a006
fix(infra): add registry close
lsy357 Jan 29, 2026
79f73e9
fix dto
tpfz Jan 30, 2026
a5af492
fix(evaluation): ListEvaluatorTemplate repo err
lsy357 Jan 31, 2026
5b2769e
fix(evaluation): GetExptResultExportRecordResponse
lsy357 Jan 31, 2026
a6d71e2
[feat][backend] add model url
SharkeyChen Feb 1, 2026
fc4d302
[feat][backend] add model url
SharkeyChen Feb 1, 2026
305096f
feat(evaluation): expttpl api in volcagentkit
lsy357 Feb 2, 2026
5af762d
fix(evaluation): targetpo EvalTargetTypeVolcengineAgentAgentkit
lsy357 Feb 2, 2026
1fd03e5
fix(evaluation): CheckExperimentTemplateNameRequest
lsy357 Feb 2, 2026
23350a9
[feat][backend] add preset model
SharkeyChen Feb 3, 2026
6df2db5
Merge remote-tracking branch 'origin/feat/agentkit' into feat/agentkit
SharkeyChen Feb 3, 2026
d5ae362
[feat][backend] add preset model
SharkeyChen Feb 3, 2026
d76d51f
[feat][backend] add preset model
SharkeyChen Feb 3, 2026
60b40a4
[feat][backend] add preset model
SharkeyChen Feb 3, 2026
f09a6df
[feat][backend] add preset model
SharkeyChen Feb 3, 2026
4294f10
fix(evaluation): evaluator modelconfig with preset_model
lsy357 Feb 3, 2026
4764d9d
fix(evaluation): evaluator llm preset_model
lsy357 Feb 3, 2026
ed66bff
fix(evaluation): BatchGetExperimentAggrResultResponse ExptAggregateRe…
lsy357 Feb 3, 2026
3d1dc95
fix(evaluation): VolcengineAgent runtimeid
lsy357 Feb 3, 2026
66a36b4
fix(evaluation): evaluator debug timeout
lsy357 Feb 4, 2026
60256f4
Merge branch 'main' into feat/agentkit
lsy357 Feb 4, 2026
1e90e83
fix(evaluation): MockEvalTargetOutput tag
lsy357 Feb 5, 2026
bca49ef
fix(evaluation): DebugBuiltinEvaluator timeout
lsy357 Feb 5, 2026
4d87b1c
feat(backend): ut
lsy357 Feb 6, 2026
8a4229c
fix(evaluation): ut
lsy357 Feb 6, 2026
b57af9a
fix(evaluation): golint
lsy357 Feb 6, 2026
79e65cb
[feat][backend] add ut for ConvertToParamValues
SharkeyChen Feb 6, 2026
489b9cb
[feat][backend] add ut for GetAbilityEnums
SharkeyChen Feb 6, 2026
6aaa0a3
fix(evaluation): mq close
lsy357 Jan 29, 2026
e9052e7
Merge branch 'main' into feat/agentkit
lsy357 Feb 6, 2026
927dccc
fix(evaluation): api opentag
lsy357 Feb 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions backend/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"net/url"
"os"
"os/signal"
"syscall"
"time"

"github.com/bytedance/gg/gptr"
Expand Down Expand Up @@ -60,12 +62,21 @@ func main() {
if err := initTracer(handler); err != nil {
panic(err)
}
consumerWorkers := MustInitConsumerWorkers(c.cfgFactory, handler, handler, handler, handler)
if err := registry.NewConsumerRegistry(c.mqFactory).Register(consumerWorkers).StartAll(ctx); err != nil {

signalCtx, signalCancel := signal.NotifyContext(ctx, syscall.SIGTERM, syscall.SIGINT)
defer signalCancel()

r := registry.NewConsumerRegistryWithShutdown(signalCtx, c.mqFactory).Register(MustInitConsumerWorkers(c.cfgFactory, handler, handler, handler, handler))
if err := r.StartAll(ctx); err != nil {
panic(err)
}

api.Start(handler)
go api.Start(handler)
<-signalCtx.Done()

stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer stopCancel()
_ = r.StopAll(stopCtx)
}

type ComponentConfig struct {
Expand Down
6 changes: 6 additions & 0 deletions backend/infra/mq/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type ProducerConfig struct {
FlushFrequency time.Duration
// How long to wait for the cluster to settle between retries
RetryBackoff time.Duration

AccessKey *string
AccessSecret *string
}

type ConsumerConfig struct {
Expand All @@ -50,6 +53,9 @@ type ConsumerConfig struct {
ConsumeTimeout time.Duration
EnablePPE *bool
IsEnabled *bool

AccessKey *string
AccessSecret *string
}

type CompressionCodec int
Expand Down
14 changes: 14 additions & 0 deletions backend/infra/mq/mocks/registry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/infra/mq/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type ConsumerRegistry interface {
Register(worker []IConsumerWorker) ConsumerRegistry
StartAll(ctx context.Context) error
StopAll(ctx context.Context) error
}

type IConsumerWorker interface {
Expand Down
69 changes: 66 additions & 3 deletions backend/infra/mq/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package registry

import (
"context"
"errors"

"github.com/coze-dev/coze-loop/backend/infra/mq"
"github.com/coze-dev/coze-loop/backend/pkg/errorx"
Expand All @@ -14,20 +15,27 @@ import (
)

type defaultConsumerRegistry struct {
factory mq.IFactory
workers []mq.IConsumerWorker
factory mq.IFactory
workers []mq.IConsumerWorker
consumers []mq.IConsumer
shutdownCtx context.Context
}

func NewConsumerRegistry(factory mq.IFactory) mq.ConsumerRegistry {
return &defaultConsumerRegistry{factory: factory}
}

func NewConsumerRegistryWithShutdown(shutdownCtx context.Context, factory mq.IFactory) mq.ConsumerRegistry {
return &defaultConsumerRegistry{factory: factory, shutdownCtx: shutdownCtx}
}

func (d *defaultConsumerRegistry) Register(worker []mq.IConsumerWorker) mq.ConsumerRegistry {
d.workers = append(d.workers, worker...)
return d
}

func (d *defaultConsumerRegistry) StartAll(ctx context.Context) error {
d.consumers = nil
for _, worker := range d.workers {
cfg, err := worker.ConsumerCfg(ctx)
if err != nil {
Expand All @@ -39,14 +47,47 @@ func (d *defaultConsumerRegistry) StartAll(ctx context.Context) error {
return errorx.Wrapf(err, "NewConsumer fail, cfg: %v", json.Jsonify(cfg))
}

consumer.RegisterHandler(newSafeConsumerWrapper(worker))
handler := newSafeConsumerWrapper(worker)
if d.shutdownCtx != nil {
handler = newShutdownContextWrapper(handler, d.shutdownCtx)
}
consumer.RegisterHandler(handler)
if err := consumer.Start(); err != nil {
return errorx.Wrapf(err, "StartConsumer fail, cfg: %v", json.Jsonify(cfg))
}
d.consumers = append(d.consumers, consumer)
}
return nil
}

func (d *defaultConsumerRegistry) StopAll(ctx context.Context) error {
if len(d.consumers) == 0 {
return nil
}
var errs []error
for i := len(d.consumers) - 1; i >= 0; i-- {
select {
case <-ctx.Done():
errs = append(errs, ctx.Err())
return errors.Join(errs...)
default:
consumer := d.consumers[i]
done := make(chan error, 1)
go func(c mq.IConsumer) { done <- c.Close() }(consumer)
select {
case err := <-done:
if err != nil {
errs = append(errs, err)
}
case <-ctx.Done():
errs = append(errs, ctx.Err())
return errors.Join(errs...)
}
}
}
return errors.Join(errs...)
}

type safeConsumerHandlerDecorator struct {
handler mq.IConsumerHandler
}
Expand All @@ -59,3 +100,25 @@ func (s *safeConsumerHandlerDecorator) HandleMessage(ctx context.Context, msg *m
func newSafeConsumerWrapper(h mq.IConsumerHandler) mq.IConsumerHandler {
return &safeConsumerHandlerDecorator{handler: h}
}

type shutdownContextDecorator struct {
handler mq.IConsumerHandler
shutdownCtx context.Context
}

func (s *shutdownContextDecorator) HandleMessage(ctx context.Context, msg *mq.MessageExt) error {
nctx, cancel := context.WithCancel(ctx)
go func() {
defer goroutine.Recovery(ctx)
select {
case <-ctx.Done():
case <-s.shutdownCtx.Done():
}
cancel()
}()
return s.handler.HandleMessage(nctx, msg)
}

func newShutdownContextWrapper(h mq.IConsumerHandler, shutdownCtx context.Context) mq.IConsumerHandler {
return &shutdownContextDecorator{handler: h, shutdownCtx: shutdownCtx}
}
157 changes: 154 additions & 3 deletions backend/infra/mq/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
name string
workers []mq.IConsumerWorker
setupMocks func(*mocks.MockIFactory, []*mocks.MockIConsumer, []*mocks.MockIConsumerWorker)
shutdownCtx context.Context
expectedError error
}{
{
Expand All @@ -39,6 +40,21 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
},
expectedError: nil,
},
{
name: "successfully start all workers with shutdown ctx",
workers: []mq.IConsumerWorker{
mocks.NewMockIConsumerWorker(gomock.NewController(t)),
},
setupMocks: func(factory *mocks.MockIFactory, consumers []*mocks.MockIConsumer, workers []*mocks.MockIConsumerWorker) {
cfg := &mq.ConsumerConfig{}
workers[0].EXPECT().ConsumerCfg(gomock.Any()).Return(cfg, nil)
consumers[0].EXPECT().RegisterHandler(gomock.Any()).Return()
consumers[0].EXPECT().Start().Return(nil)
factory.EXPECT().NewConsumer(gomock.Any()).Return(consumers[0], nil)
},
shutdownCtx: context.Background(),
expectedError: nil,
},
{
name: "fail to get consumer config",
workers: []mq.IConsumerWorker{
Expand Down Expand Up @@ -92,9 +108,12 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
}

tt.setupMocks(factory, consumers, workers)

registry := NewConsumerRegistry(factory).Register(tt.workers)

var registry mq.ConsumerRegistry
if tt.shutdownCtx != nil {
registry = NewConsumerRegistryWithShutdown(tt.shutdownCtx, factory).Register(tt.workers)
} else {
registry = NewConsumerRegistry(factory).Register(tt.workers)
}
err := registry.StartAll(context.Background())
if tt.expectedError != nil {
assert.Error(t, err)
Expand All @@ -106,6 +125,69 @@ func TestDefaultConsumerRegistry_StartAll(t *testing.T) {
}
}

func TestDefaultConsumerRegistry_StopAll(t *testing.T) {
t.Run("no consumers", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
factory := mocks.NewMockIFactory(ctrl)
registry := NewConsumerRegistry(factory)
err := registry.StopAll(context.Background())
assert.NoError(t, err)
})

t.Run("successfully stop all consumers", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
factory := mocks.NewMockIFactory(ctrl)
workers := []mq.IConsumerWorker{
mocks.NewMockIConsumerWorker(ctrl),
mocks.NewMockIConsumerWorker(ctrl),
}
consumers := []*mocks.MockIConsumer{
mocks.NewMockIConsumer(ctrl),
mocks.NewMockIConsumer(ctrl),
}
cfg := &mq.ConsumerConfig{}
for i := range workers {
workers[i].(*mocks.MockIConsumerWorker).EXPECT().ConsumerCfg(gomock.Any()).Return(cfg, nil)
factory.EXPECT().NewConsumer(gomock.Any()).Return(consumers[i], nil)
consumers[i].EXPECT().RegisterHandler(gomock.Any())
consumers[i].EXPECT().Start().Return(nil)
}
registry := NewConsumerRegistry(factory).Register(workers)
err := registry.StartAll(context.Background())
assert.NoError(t, err)

// StopAll 按逆序关闭,先关 consumers[1] 再关 consumers[0]
consumers[1].EXPECT().Close().Return(nil)
consumers[0].EXPECT().Close().Return(nil)
err = registry.StopAll(context.Background())
assert.NoError(t, err)
})

t.Run("context cancelled during stop", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
factory := mocks.NewMockIFactory(ctrl)
worker := mocks.NewMockIConsumerWorker(ctrl)
consumer := mocks.NewMockIConsumer(ctrl)
cfg := &mq.ConsumerConfig{}
worker.EXPECT().ConsumerCfg(gomock.Any()).Return(cfg, nil)
factory.EXPECT().NewConsumer(gomock.Any()).Return(consumer, nil)
consumer.EXPECT().RegisterHandler(gomock.Any())
consumer.EXPECT().Start().Return(nil)
registry := NewConsumerRegistry(factory).Register([]mq.IConsumerWorker{worker})
err := registry.StartAll(context.Background())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()
err = registry.StopAll(ctx)
assert.Error(t, err)
assert.True(t, errors.Is(err, context.Canceled))
})
}

func TestSafeConsumerHandlerDecorator_HandleMessage(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -150,3 +232,72 @@ func TestSafeConsumerHandlerDecorator_HandleMessage(t *testing.T) {
})
}
}

func TestNewConsumerRegistryWithShutdown(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

factory := mocks.NewMockIFactory(ctrl)
shutdownCtx, cancel := context.WithCancel(context.Background())
defer cancel()

registry := NewConsumerRegistryWithShutdown(shutdownCtx, factory).(*defaultConsumerRegistry)
assert.Equal(t, factory, registry.factory)
assert.Equal(t, shutdownCtx, registry.shutdownCtx)
}

func TestShutdownContextDecorator_HandleMessage(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockHandler := mocks.NewMockIConsumerWorker(ctrl)
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())

decorator := &shutdownContextDecorator{
handler: mockHandler,
shutdownCtx: shutdownCtx,
}

tests := []struct {
name string
setupMock func()
triggerCancel func()
ctx context.Context
}{
{
name: "normal execution",
setupMock: func() {
mockHandler.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).Return(nil)
},
triggerCancel: func() {},
ctx: context.Background(),
},
{
name: "shutdown context cancelled",
setupMock: func() {
mockHandler.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, msg *mq.MessageExt) error {
<-ctx.Done()
return ctx.Err()
})
},
triggerCancel: func() {
shutdownCancel()
},
ctx: context.Background(),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.setupMock()
go tt.triggerCancel()
err := decorator.HandleMessage(tt.ctx, &mq.MessageExt{})
if tt.name == "shutdown context cancelled" {
assert.Error(t, err)
assert.True(t, errors.Is(err, context.Canceled))
} else {
assert.NoError(t, err)
}
})
}
}
Loading
Loading