Skip to content

Commit 6ce31f8

Browse files
committed
fix review comments
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
1 parent 98dc0f9 commit 6ce31f8

6 files changed

Lines changed: 42 additions & 39 deletions

File tree

client/internal/vanus/store/block_store.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,24 @@ import (
2222
"sync/atomic"
2323
"time"
2424

25-
"github.com/linkall-labs/vanus/observability/tracing"
26-
"go.opentelemetry.io/otel/trace"
27-
28-
ce "github.com/cloudevents/sdk-go/v2"
29-
"google.golang.org/grpc"
30-
3125
// first-party libraries
32-
33-
// third-party libraries
26+
"github.com/linkall-labs/vanus/observability/log"
27+
"github.com/linkall-labs/vanus/observability/tracing"
28+
"github.com/linkall-labs/vanus/pkg/errors"
3429
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
3530
errpb "github.com/linkall-labs/vanus/proto/pkg/errors"
3631
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3732

33+
// third-party libraries
34+
ce "github.com/cloudevents/sdk-go/v2"
35+
"go.opentelemetry.io/otel/trace"
36+
"google.golang.org/grpc"
37+
3838
// this project
3939
"github.com/linkall-labs/vanus/client/internal/vanus/codec"
4040
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
4141
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
4242
"github.com/linkall-labs/vanus/client/pkg/primitive"
43-
"github.com/linkall-labs/vanus/observability/log"
44-
"github.com/linkall-labs/vanus/pkg/errors"
4543
)
4644

4745
func newBlockStore(endpoint string) (*BlockStore, error) {
@@ -53,13 +51,20 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
5351
})),
5452
tracer: tracing.NewTracer("internal.store.BlockStore", trace.SpanKindClient),
5553
}
54+
// TODO(jiangkai): delay creating streams to reduce invalid overhead.
5655
_, err = s.connectAppendStream(context.Background())
5756
if err != nil {
57+
// close client
58+
s.client.Close()
5859
// TODO: check error
5960
return nil, err
6061
}
6162
_, err = s.connectReadStream(context.Background())
6263
if err != nil {
64+
// close append stream
65+
s.append.releaseStream()
66+
// close client
67+
s.client.Close()
6368
// TODO: check error
6469
return nil, err
6570
}
@@ -333,7 +338,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
333338
}
334339

335340
// generate unique opaqueID
336-
atomic.AddUint64(&append.opaqueID, 1)
341+
opaqueID := atomic.AddUint64(&append.opaqueID, 1)
337342

338343
//TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
339344
eventpbs := make([]*cepb.CloudEvent, len(events))
@@ -346,13 +351,13 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
346351
}
347352

348353
donec := make(chan struct{})
349-
append.callbacks.Store(append.opaqueID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
354+
append.callbacks.Store(opaqueID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
350355
resp = res
351356
close(donec)
352357
}))
353358

354359
req := &segpb.AppendToBlockStreamRequest{
355-
Id: append.opaqueID,
360+
Id: opaqueID,
356361
BlockId: block,
357362
Events: &cepb.CloudEventBatch{
358363
Events: eventpbs,
@@ -367,10 +372,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
367372
append.releaseStream()
368373
// reset new stream connections
369374
s.connectAppendStream(ctx)
370-
c, _ := append.callbacks.LoadAndDelete(append.opaqueID)
375+
c, _ := append.callbacks.LoadAndDelete(opaqueID)
371376
if c != nil {
372377
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
373-
Id: append.opaqueID,
378+
Id: opaqueID,
374379
ResponseCode: errpb.ErrorCode_CLOSED,
375380
ResponseMsg: "append stream closed",
376381
Offsets: []int64{},
@@ -382,10 +387,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
382387
select {
383388
case <-donec:
384389
case <-_ctx.Done():
385-
c, _ := append.callbacks.LoadAndDelete(append.opaqueID)
390+
c, _ := append.callbacks.LoadAndDelete(opaqueID)
386391
if c != nil {
387392
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
388-
Id: append.opaqueID,
393+
Id: opaqueID,
389394
ResponseCode: errpb.ErrorCode_CONTEXT_CANCELED,
390395
ResponseMsg: "append stream context canceled",
391396
Offsets: []int64{},
@@ -463,15 +468,16 @@ func (s *BlockStore) ReadStream(
463468
}
464469

465470
// generate unique RequestId
466-
atomic.AddUint64(&read.opaqueID, 1)
471+
opaqueID := atomic.AddUint64(&read.opaqueID, 1)
467472

468473
donec := make(chan struct{})
469-
read.callbacks.Store(read.opaqueID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
474+
read.callbacks.Store(opaqueID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
470475
resp = res
471476
close(donec)
472477
}))
473478

474479
req := &segpb.ReadFromBlockStreamRequest{
480+
Id: opaqueID,
475481
BlockId: block,
476482
Offset: offset,
477483
Number: int64(size),
@@ -484,10 +490,10 @@ func (s *BlockStore) ReadStream(
484490
})
485491
read.releaseStream()
486492
s.connectReadStream(ctx)
487-
c, _ := read.callbacks.LoadAndDelete(read.opaqueID)
493+
c, _ := read.callbacks.LoadAndDelete(opaqueID)
488494
if c != nil {
489495
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
490-
Id: read.opaqueID,
496+
Id: opaqueID,
491497
ResponseCode: errpb.ErrorCode_CLOSED,
492498
ResponseMsg: "read stream closed",
493499
Events: &cepb.CloudEventBatch{
@@ -501,10 +507,10 @@ func (s *BlockStore) ReadStream(
501507
select {
502508
case <-donec:
503509
case <-_ctx.Done():
504-
c, _ := read.callbacks.LoadAndDelete(read.opaqueID)
510+
c, _ := read.callbacks.LoadAndDelete(opaqueID)
505511
if c != nil {
506512
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
507-
Id: read.opaqueID,
513+
Id: opaqueID,
508514
ResponseCode: errpb.ErrorCode_CONTEXT_CANCELED,
509515
ResponseMsg: "read stream context canceled",
510516
Events: &cepb.CloudEventBatch{

client/pkg/eventlog/log_segment.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func newBlockExt(ctx context.Context, r *record.Segment, leaderOnly bool) (*bloc
7070
id := r.LeaderBlockID
7171
if id == 0 {
7272
if leaderOnly {
73-
return nil, errors.ErrNotLeader.WithMessage("the block is not leader")
73+
return nil, errors.ErrNoLeader.WithMessage("the block no leader")
7474
}
7575
for _, b := range r.Blocks {
7676
if b.Endpoint != "" {
@@ -283,7 +283,7 @@ func (s *segment) ReadStream(ctx context.Context, from int64, size int16, pollin
283283
}
284284
off, ok := v.(int32)
285285
if !ok {
286-
return events, errors.ErrInvalidRequest.WithMessage("corrupted event")
286+
return events, errors.ErrCorruptedEvent.WithMessage("corrupted event")
287287
}
288288
offset := s.startOffset + int64(off)
289289
buf := make([]byte, 8)

internal/store/block/raft/appender.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ func NewAppender(
104104
raftLog *raftlog.Log,
105105
host transport.Host,
106106
listener LeaderChangedListener,
107-
callbackC chan func(),
108107
) Appender {
109108
ctx, cancel := context.WithCancel(ctx)
110109

internal/store/segment/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *server) recoverBlocks(ctx context.Context, logs map[vanus.ID]*raftlog.L
7878
return err
7979
}
8080
}
81-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
81+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged)
8282
s.replicas.Store(id, &replica{
8383
id: id,
8484
idStr: id.String(),

internal/store/segment/replica.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (s *server) createBlock(ctx context.Context, id vanus.ID, size int64) (Repl
116116

117117
// Create replica.
118118
l := raftlog.NewLog(id, s.wal, s.metaStore, s.offsetStore, nil)
119-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
119+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged)
120120

121121
return &replica{
122122
id: id,

internal/store/segment/server.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -713,17 +713,15 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb.
713713
metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(len(events)))
714714
metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(size))
715715

716-
future := newAppendFuture()
717-
b.Append(ctx, entries, future.onAppended)
718-
seqs, err := future.wait()
719-
if err != nil {
720-
cb(nil, s.processAppendError(ctx, b, err))
721-
return
722-
}
723-
724-
// TODO(weihe.yin) make this method deep to code
725-
s.pm.NewMessageArrived(id)
726-
cb(seqs, nil)
716+
b.Append(ctx, entries, func(seqs []int64, err error) {
717+
s.callbackC <- func() {
718+
if err == nil {
719+
// TODO(weihe.yin) make this method deep to code
720+
s.pm.NewMessageArrived(id)
721+
}
722+
cb(seqs, err)
723+
}
724+
})
727725
}
728726

729727
func (s *server) processAppendError(ctx context.Context, b Replica, err error) error {

0 commit comments

Comments
 (0)