Skip to content

Commit ed2616f

Browse files
committed
Merge branch 'main' of github.com:linkall-labs/vanus into clientstream-v2
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
2 parents 023a19f + d8249cc commit ed2616f

64 files changed

Lines changed: 2761 additions & 1046 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type client struct {
4040
}
4141

4242
func (c *client) Eventbus(ctx context.Context, ebName string) api.Eventbus {
43-
_, span := c.tracer.Start(ctx, "Eventbus")
43+
_, span := c.tracer.Start(ctx, "EventbusService")
4444
defer span.End()
4545

4646
bus := func() api.Eventbus {

client/internal/vanus/eventbus/name_service.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ package eventbus
1717
import (
1818
// standard libraries
1919
"context"
20+
"github.com/linkall-labs/vanus/pkg/cluster"
2021

2122
"github.com/linkall-labs/vanus/observability/tracing"
2223
"go.opentelemetry.io/otel/trace"
2324

24-
// third-party libraries
25-
"github.com/linkall-labs/vanus/pkg/controller"
2625
"google.golang.org/grpc/credentials/insecure"
2726

2827
// first-party libraries
@@ -33,7 +32,7 @@ import (
3332

3433
func NewNameService(endpoints []string) *NameService {
3534
return &NameService{
36-
client: controller.NewEventbusClient(endpoints, insecure.NewCredentials()),
35+
client: cluster.NewClusterController(endpoints, insecure.NewCredentials()).EventbusService().RawClient(),
3736
tracer: tracing.NewTracer("internal.discovery.eventbus", trace.SpanKindClient),
3837
}
3938
}

client/internal/vanus/eventlog/name_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
// first-party libraries.
2828
"github.com/linkall-labs/vanus/observability/log"
2929
"github.com/linkall-labs/vanus/observability/tracing"
30-
"github.com/linkall-labs/vanus/pkg/controller"
30+
"github.com/linkall-labs/vanus/pkg/cluster"
3131
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
3232
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
3333

@@ -38,7 +38,7 @@ import (
3838

3939
func NewNameService(endpoints []string) *NameService {
4040
return &NameService{
41-
client: controller.NewEventlogClient(endpoints, insecure.NewCredentials()),
41+
client: cluster.NewClusterController(endpoints, insecure.NewCredentials()).EventlogService().RawClient(),
4242
tracer: tracing.NewTracer("internal.discovery.eventlog", trace.SpanKindClient),
4343
}
4444
}

client/mock_client.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/controller/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func main() {
8787

8888
segmentCtrl := eventbus.NewController(cfg.GetEventbusCtrlConfig(), etcd)
8989
if err = segmentCtrl.Start(ctx); err != nil {
90-
log.Error(ctx, "start Eventbus Controller failed", map[string]interface{}{
90+
log.Error(ctx, "start EventbusService Controller failed", map[string]interface{}{
9191
log.KeyError: err,
9292
})
9393
os.Exit(-1)

cmd/trigger/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func main() {
5454
os.Exit(-1)
5555
}
5656
ctx := signal.SetupSignalContext()
57-
metrics.RegisterTriggerMetrics()
5857
_ = observability.Initialize(cfg.Observability, metrics.RegisterTriggerMetrics)
5958
var opts []grpc.ServerOption
6059
grpcServer := grpc.NewServer(opts...)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ require (
2323
github.com/iceber/iouring-go v0.0.0-20220609112130-b1dc8dd9fbfd
2424
github.com/jedib0t/go-pretty/v6 v6.3.1
2525
github.com/json-iterator/go v1.1.12
26-
github.com/linkall-labs/embed-etcd v0.1.1
26+
github.com/linkall-labs/embed-etcd v0.1.2
2727
github.com/linkall-labs/vanus/client v0.5.1
2828
github.com/linkall-labs/vanus/observability v0.5.1
2929
github.com/linkall-labs/vanus/pkg v0.5.1
3030
github.com/linkall-labs/vanus/proto v0.5.1
3131
github.com/linkall-labs/vanus/raft v0.5.1
3232
github.com/ncw/directio v1.0.5
33-
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
33+
github.com/ohler55/ojg v1.14.5
3434
github.com/pkg/errors v0.9.1
3535
github.com/prashantv/gostub v1.1.0
3636
github.com/prometheus/client_golang v1.13.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
292292
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
293293
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
294294
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
295-
github.com/linkall-labs/embed-etcd v0.1.1 h1:WxV9wbnRtNf7DMW8SJauVYqhFLXzRfY5wpplFypXK9k=
296-
github.com/linkall-labs/embed-etcd v0.1.1/go.mod h1:dmleSy0Myllw6W5awwjyDMipgICVDHTHuTcRT4cqaIc=
295+
github.com/linkall-labs/embed-etcd v0.1.2 h1:1mTdXLwVvn9gi3XWh/PGhaEAfG8Zmxvjqwnfontb+fA=
296+
github.com/linkall-labs/embed-etcd v0.1.2/go.mod h1:QnecHaKt3WQBO9YGBckCDUTBd44VBR2VO8220BtWZ5U=
297297
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
298298
github.com/mattn/go-colorable v0.1.11 h1:nQ+aFkoE2TMGc0b68U2OKSexC+eq46+XwZzWXHRmPYs=
299299
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
@@ -320,8 +320,8 @@ github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t
320320
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
321321
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
322322
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
323-
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
324-
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
323+
github.com/ohler55/ojg v1.14.5 h1:xCX2oyh/ZaoesbLH6fwVHStSJpk4o4eJs8ttXutzdg0=
324+
github.com/ohler55/ojg v1.14.5/go.mod h1:7Ghirupn8NC8hSSDpI0gcjorPxj+vSVIONDWfliHR1k=
325325
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
326326
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
327327
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=

internal/controller/eventbus/controller.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/linkall-labs/vanus/internal/controller/eventbus/volume"
3333
"github.com/linkall-labs/vanus/internal/kv"
3434
"github.com/linkall-labs/vanus/internal/kv/etcd"
35+
"github.com/linkall-labs/vanus/internal/primitive"
3536
"github.com/linkall-labs/vanus/internal/primitive/vanus"
3637
"github.com/linkall-labs/vanus/observability/log"
3738
"github.com/linkall-labs/vanus/observability/metrics"
@@ -94,7 +95,6 @@ func (ctrl *controller) Start(_ context.Context) error {
9495
return err
9596
}
9697
ctrl.kvStore = store
97-
9898
ctrl.cancelCtx, ctrl.cancelFunc = context.WithCancel(context.Background())
9999
go ctrl.member.RegisterMembershipChangedProcessor(ctrl.membershipChangedProcessor)
100100
return nil
@@ -113,9 +113,48 @@ func (ctrl *controller) StopNotify() <-chan error {
113113
}
114114

115115
func (ctrl *controller) CreateEventBus(ctx context.Context,
116+
req *ctrlpb.CreateEventBusRequest) (*metapb.EventBus, error) {
117+
if err := isValidEventbusName(req.Name); err != nil {
118+
return nil, err
119+
}
120+
return ctrl.createEventBus(ctx, req)
121+
}
122+
123+
func isValidEventbusName(name string) error {
124+
name = strings.ToLower(name)
125+
for _, v := range name {
126+
if v == '.' || v == '_' || v == '-' {
127+
continue
128+
}
129+
c := v - 'a'
130+
if c >= 0 || c <= 26 {
131+
continue
132+
} else {
133+
c = v - '0'
134+
if c >= 0 || c <= 9 {
135+
continue
136+
}
137+
return errors.ErrInvalidRequest.WithMessage("eventbus name must be insist of 0-9a-zA-Z.-_")
138+
}
139+
}
140+
return nil
141+
}
142+
143+
func (ctrl *controller) CreateSystemEventBus(ctx context.Context,
144+
req *ctrlpb.CreateEventBusRequest) (*metapb.EventBus, error) {
145+
if !strings.HasPrefix(req.Name, primitive.SystemEventbusNamePrefix) {
146+
return nil, errors.ErrInvalidRequest.WithMessage("system eventbus must start with __")
147+
}
148+
return ctrl.createEventBus(ctx, req)
149+
}
150+
151+
func (ctrl *controller) createEventBus(ctx context.Context,
116152
req *ctrlpb.CreateEventBusRequest) (*metapb.EventBus, error) {
117153
ctrl.mutex.Lock()
118154
defer ctrl.mutex.Unlock()
155+
if !ctrl.isReady(ctx) {
156+
return nil, errors.ErrServerNotRunning.WithMessage("the cluster isn't ready to create eventbus")
157+
}
119158
logNum := req.LogNumber
120159
if logNum == 0 {
121160
logNum = 1
@@ -451,12 +490,23 @@ func (ctrl *controller) ReportSegmentBlockIsFull(ctx context.Context,
451490
return &emptypb.Empty{}, nil
452491
}
453492

454-
func (ctrl *controller) Ping(_ context.Context, _ *emptypb.Empty) (*ctrlpb.PingResponse, error) {
493+
func (ctrl *controller) Ping(ctx context.Context, _ *emptypb.Empty) (*ctrlpb.PingResponse, error) {
455494
return &ctrlpb.PingResponse{
456-
LeaderAddr: ctrl.member.GetLeaderAddr(),
495+
LeaderAddr: ctrl.member.GetLeaderAddr(),
496+
IsEventbusReady: ctrl.isReady(ctx),
457497
}, nil
458498
}
459499

500+
func (ctrl *controller) isReady(ctx context.Context) bool {
501+
if ctrl.member == nil {
502+
return false
503+
}
504+
if !ctrl.member.IsLeader() && !ctrl.member.IsReady() || ctrl.member.GetLeaderAddr() == "" {
505+
return false
506+
}
507+
return ctrl.ssMgr.CanCreateEventbus(ctx, int(ctrl.cfg.Replicas))
508+
}
509+
460510
func (ctrl *controller) ReportSegmentLeader(ctx context.Context,
461511
req *ctrlpb.ReportSegmentLeaderRequest) (*emptypb.Empty, error) {
462512
err := ctrl.eventLogMgr.UpdateSegmentReplicas(ctx, vanus.NewIDFromUint64(req.LeaderId), req.Term)

internal/controller/eventbus/controller_test.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ import (
2020
"sort"
2121
"testing"
2222

23+
"github.com/golang/mock/gomock"
24+
embedetcd "github.com/linkall-labs/embed-etcd"
2325
"github.com/linkall-labs/vanus/internal/controller/eventbus/eventlog"
2426
"github.com/linkall-labs/vanus/internal/controller/eventbus/metadata"
2527
"github.com/linkall-labs/vanus/internal/kv"
2628
"github.com/linkall-labs/vanus/internal/primitive/vanus"
2729
"github.com/linkall-labs/vanus/pkg/errors"
2830
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
31+
errpb "github.com/linkall-labs/vanus/proto/pkg/errors"
2932
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
30-
31-
"github.com/golang/mock/gomock"
3233
. "github.com/smartystreets/goconvey/convey"
3334
)
3435

@@ -44,6 +45,12 @@ func TestController_CreateEventBus(t *testing.T) {
4445
ctrl.eventLogMgr = elMgr
4546
ctx := stdCtx.Background()
4647

48+
mockMember := embedetcd.NewMockMember(mockCtrl)
49+
ctrl.member = mockMember
50+
mockMember.EXPECT().IsLeader().AnyTimes().Return(true)
51+
mockMember.EXPECT().IsReady().AnyTimes().Return(true)
52+
mockMember.EXPECT().GetLeaderAddr().AnyTimes().Return("test")
53+
4754
Convey("test create a eventbus two times", func() {
4855
kvCli.EXPECT().Exists(ctx, metadata.GetEventbusMetadataKey("test-1")).Times(1).Return(false, nil)
4956
kvCli.EXPECT().Set(ctx, metadata.GetEventbusMetadataKey("test-1"), gomock.Any()).
@@ -88,7 +95,7 @@ func TestController_CreateEventBus(t *testing.T) {
8895
So(res, ShouldBeNil)
8996
et, ok := err.(*errors.ErrorType)
9097
So(ok, ShouldBeTrue)
91-
So(et.Code, ShouldEqual, errors.ErrorCode_RESOURCE_EXIST)
98+
So(et.Code, ShouldEqual, errpb.ErrorCode_RESOURCE_EXIST)
9299
So(et.Description, ShouldEqual, "resource already exist")
93100
So(et.Message, ShouldEqual, "the eventbus already exist")
94101
})
@@ -113,7 +120,7 @@ func TestController_DeleteEventBus(t *testing.T) {
113120
So(res, ShouldBeNil)
114121
et, ok := err.(*errors.ErrorType)
115122
So(ok, ShouldBeTrue)
116-
So(et.Code, ShouldEqual, errors.ErrorCode_RESOURCE_NOT_FOUND)
123+
So(et.Code, ShouldEqual, errpb.ErrorCode_RESOURCE_EXIST)
117124
So(et.Description, ShouldEqual, "resource not found")
118125
So(et.Message, ShouldEqual, "the eventbus doesn't exist")
119126
})
@@ -142,7 +149,7 @@ func TestController_DeleteEventBus(t *testing.T) {
142149
So(res, ShouldBeNil)
143150
et, ok := err.(*errors.ErrorType)
144151
So(ok, ShouldBeTrue)
145-
So(et.Code, ShouldEqual, errors.ErrorCode_INTERNAL)
152+
So(et.Code, ShouldEqual, errpb.ErrorCode_INTERNAL)
146153
So(et.Description, ShouldEqual, "internal error")
147154
So(et.Message, ShouldEqual, "delete eventbus metadata in kv failed")
148155
})

0 commit comments

Comments
 (0)