Skip to content

Commit 804f744

Browse files
committed
Merge branch 'main' of github.com:linkall-labs/vanus into clientstream-v2
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
2 parents 023a19f + d8249cc commit 804f744

71 files changed

Lines changed: 3052 additions & 1154 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type client struct {
4040
}
4141

4242
func (c *client) Eventbus(ctx context.Context, ebName string) api.Eventbus {
43-
_, span := c.tracer.Start(ctx, "Eventbus")
43+
_, span := c.tracer.Start(ctx, "EventbusService")
4444
defer span.End()
4545

4646
bus := func() api.Eventbus {

client/internal/vanus/eventbus/name_service.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ package eventbus
1717
import (
1818
// standard libraries
1919
"context"
20+
"github.com/linkall-labs/vanus/pkg/cluster"
2021

2122
"github.com/linkall-labs/vanus/observability/tracing"
2223
"go.opentelemetry.io/otel/trace"
2324

24-
// third-party libraries
25-
"github.com/linkall-labs/vanus/pkg/controller"
2625
"google.golang.org/grpc/credentials/insecure"
2726

2827
// first-party libraries
@@ -33,7 +32,7 @@ import (
3332

3433
func NewNameService(endpoints []string) *NameService {
3534
return &NameService{
36-
client: controller.NewEventbusClient(endpoints, insecure.NewCredentials()),
35+
client: cluster.NewClusterController(endpoints, insecure.NewCredentials()).EventbusService().RawClient(),
3736
tracer: tracing.NewTracer("internal.discovery.eventbus", trace.SpanKindClient),
3837
}
3938
}

client/internal/vanus/eventlog/name_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
// first-party libraries.
2828
"github.com/linkall-labs/vanus/observability/log"
2929
"github.com/linkall-labs/vanus/observability/tracing"
30-
"github.com/linkall-labs/vanus/pkg/controller"
30+
"github.com/linkall-labs/vanus/pkg/cluster"
3131
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
3232
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
3333

@@ -38,7 +38,7 @@ import (
3838

3939
func NewNameService(endpoints []string) *NameService {
4040
return &NameService{
41-
client: controller.NewEventlogClient(endpoints, insecure.NewCredentials()),
41+
client: cluster.NewClusterController(endpoints, insecure.NewCredentials()).EventlogService().RawClient(),
4242
tracer: tracing.NewTracer("internal.discovery.eventlog", trace.SpanKindClient),
4343
}
4444
}

client/internal/vanus/store/block_store.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
214214
_ctx, span := s.tracer.Start(ctx, "AppendManyStream")
215215
defer span.End()
216216

217+
log.Error(ctx, "===jk1===", nil)
217218
var (
218219
err error
219220
wg sync.WaitGroup
@@ -239,7 +240,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
239240
if err != nil {
240241
return nil, err
241242
}
242-
eventpbs = append(eventpbs, eventpb)
243+
eventpbs[idx] = eventpb
243244
}
244245

245246
s.appendCallbacks.Store(requestID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
@@ -255,6 +256,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
255256
},
256257
}
257258

259+
log.Error(ctx, "===jk2===", map[string]interface{}{
260+
"RequestId": requestID,
261+
})
262+
258263
if err = s.appendStream.Send(req); err != nil {
259264
log.Error(ctx, "append stream send failed", map[string]interface{}{
260265
log.KeyError: err,
@@ -277,6 +282,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
277282

278283
wg.Wait()
279284

285+
log.Error(ctx, "===jk3===", map[string]interface{}{
286+
"ResponseId": resp.ResponseId,
287+
})
288+
280289
if resp.ResponseCode == errpb.ErrorCode_FULL {
281290
log.Warning(ctx, "block append failed cause the segment is full", nil)
282291
return nil, errors.ErrFull.WithMessage("segment is full")

client/mock_client.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/pkg/eventbus/eventbus.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...
512512
return encoded
513513
}
514514

515-
eventIDs := make([]string, len(eid))
515+
eventIDs := make([]string, len(offsets))
516516
for idx := range offsets {
517517
eventIDs[idx] = genFunc(offsets[idx])
518518
}

cmd/controller/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func main() {
8787

8888
segmentCtrl := eventbus.NewController(cfg.GetEventbusCtrlConfig(), etcd)
8989
if err = segmentCtrl.Start(ctx); err != nil {
90-
log.Error(ctx, "start Eventbus Controller failed", map[string]interface{}{
90+
log.Error(ctx, "start EventbusService Controller failed", map[string]interface{}{
9191
log.KeyError: err,
9292
})
9393
os.Exit(-1)

cmd/trigger/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func main() {
5454
os.Exit(-1)
5555
}
5656
ctx := signal.SetupSignalContext()
57-
metrics.RegisterTriggerMetrics()
5857
_ = observability.Initialize(cfg.Observability, metrics.RegisterTriggerMetrics)
5958
var opts []grpc.ServerOption
6059
grpcServer := grpc.NewServer(opts...)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ require (
2323
github.com/iceber/iouring-go v0.0.0-20220609112130-b1dc8dd9fbfd
2424
github.com/jedib0t/go-pretty/v6 v6.3.1
2525
github.com/json-iterator/go v1.1.12
26-
github.com/linkall-labs/embed-etcd v0.1.1
26+
github.com/linkall-labs/embed-etcd v0.1.2
2727
github.com/linkall-labs/vanus/client v0.5.1
2828
github.com/linkall-labs/vanus/observability v0.5.1
2929
github.com/linkall-labs/vanus/pkg v0.5.1
3030
github.com/linkall-labs/vanus/proto v0.5.1
3131
github.com/linkall-labs/vanus/raft v0.5.1
3232
github.com/ncw/directio v1.0.5
33-
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
33+
github.com/ohler55/ojg v1.14.5
3434
github.com/pkg/errors v0.9.1
3535
github.com/prashantv/gostub v1.1.0
3636
github.com/prometheus/client_golang v1.13.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
292292
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
293293
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
294294
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
295-
github.com/linkall-labs/embed-etcd v0.1.1 h1:WxV9wbnRtNf7DMW8SJauVYqhFLXzRfY5wpplFypXK9k=
296-
github.com/linkall-labs/embed-etcd v0.1.1/go.mod h1:dmleSy0Myllw6W5awwjyDMipgICVDHTHuTcRT4cqaIc=
295+
github.com/linkall-labs/embed-etcd v0.1.2 h1:1mTdXLwVvn9gi3XWh/PGhaEAfG8Zmxvjqwnfontb+fA=
296+
github.com/linkall-labs/embed-etcd v0.1.2/go.mod h1:QnecHaKt3WQBO9YGBckCDUTBd44VBR2VO8220BtWZ5U=
297297
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
298298
github.com/mattn/go-colorable v0.1.11 h1:nQ+aFkoE2TMGc0b68U2OKSexC+eq46+XwZzWXHRmPYs=
299299
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
@@ -320,8 +320,8 @@ github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t
320320
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
321321
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
322322
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
323-
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
324-
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
323+
github.com/ohler55/ojg v1.14.5 h1:xCX2oyh/ZaoesbLH6fwVHStSJpk4o4eJs8ttXutzdg0=
324+
github.com/ohler55/ojg v1.14.5/go.mod h1:7Ghirupn8NC8hSSDpI0gcjorPxj+vSVIONDWfliHR1k=
325325
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
326326
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
327327
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=

0 commit comments

Comments
 (0)