Skip to content

Commit 86a6f0e

Browse files
committed
fix review comments
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
1 parent 98dc0f9 commit 86a6f0e

8 files changed

Lines changed: 52 additions & 94 deletions

File tree

client/internal/vanus/store/block_store.go

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ import (
2222
"sync/atomic"
2323
"time"
2424

25-
"github.com/linkall-labs/vanus/observability/tracing"
26-
"go.opentelemetry.io/otel/trace"
27-
25+
// third-party libraries
2826
ce "github.com/cloudevents/sdk-go/v2"
27+
"go.opentelemetry.io/otel/trace"
2928
"google.golang.org/grpc"
3029

3130
// first-party libraries
32-
33-
// third-party libraries
31+
"github.com/linkall-labs/vanus/observability/log"
32+
"github.com/linkall-labs/vanus/observability/tracing"
33+
"github.com/linkall-labs/vanus/pkg/errors"
3434
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
3535
errpb "github.com/linkall-labs/vanus/proto/pkg/errors"
3636
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
@@ -40,8 +40,6 @@ import (
4040
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
4141
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
4242
"github.com/linkall-labs/vanus/client/pkg/primitive"
43-
"github.com/linkall-labs/vanus/observability/log"
44-
"github.com/linkall-labs/vanus/pkg/errors"
4543
)
4644

4745
func newBlockStore(endpoint string) (*BlockStore, error) {
@@ -53,13 +51,20 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
5351
})),
5452
tracer: tracing.NewTracer("internal.store.BlockStore", trace.SpanKindClient),
5553
}
54+
// TODO(jiangkai): delay creating streams to reduce invalid overhead.
5655
_, err = s.connectAppendStream(context.Background())
5756
if err != nil {
57+
// close client
58+
s.client.Close()
5859
// TODO: check error
5960
return nil, err
6061
}
6162
_, err = s.connectReadStream(context.Background())
6263
if err != nil {
64+
// close append stream
65+
s.append.releaseStream()
66+
// close client
67+
s.client.Close()
6368
// TODO: check error
6469
return nil, err
6570
}
@@ -333,7 +338,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
333338
}
334339

335340
// generate unique opaqueID
336-
atomic.AddUint64(&append.opaqueID, 1)
341+
opaqueID := atomic.AddUint64(&append.opaqueID, 1)
337342

338343
//TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
339344
eventpbs := make([]*cepb.CloudEvent, len(events))
@@ -346,13 +351,13 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
346351
}
347352

348353
donec := make(chan struct{})
349-
append.callbacks.Store(append.opaqueID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
354+
append.callbacks.Store(opaqueID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
350355
resp = res
351356
close(donec)
352357
}))
353358

354359
req := &segpb.AppendToBlockStreamRequest{
355-
Id: append.opaqueID,
360+
Id: opaqueID,
356361
BlockId: block,
357362
Events: &cepb.CloudEventBatch{
358363
Events: eventpbs,
@@ -367,10 +372,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
367372
append.releaseStream()
368373
// reset new stream connections
369374
s.connectAppendStream(ctx)
370-
c, _ := append.callbacks.LoadAndDelete(append.opaqueID)
375+
c, _ := append.callbacks.LoadAndDelete(opaqueID)
371376
if c != nil {
372377
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
373-
Id: append.opaqueID,
378+
Id: opaqueID,
374379
ResponseCode: errpb.ErrorCode_CLOSED,
375380
ResponseMsg: "append stream closed",
376381
Offsets: []int64{},
@@ -382,10 +387,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
382387
select {
383388
case <-donec:
384389
case <-_ctx.Done():
385-
c, _ := append.callbacks.LoadAndDelete(append.opaqueID)
390+
c, _ := append.callbacks.LoadAndDelete(opaqueID)
386391
if c != nil {
387392
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
388-
Id: append.opaqueID,
393+
Id: opaqueID,
389394
ResponseCode: errpb.ErrorCode_CONTEXT_CANCELED,
390395
ResponseMsg: "append stream context canceled",
391396
Offsets: []int64{},
@@ -463,15 +468,16 @@ func (s *BlockStore) ReadStream(
463468
}
464469

465470
// generate unique RequestId
466-
atomic.AddUint64(&read.opaqueID, 1)
471+
opaqueID := atomic.AddUint64(&read.opaqueID, 1)
467472

468473
donec := make(chan struct{})
469-
read.callbacks.Store(read.opaqueID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
474+
read.callbacks.Store(opaqueID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
470475
resp = res
471476
close(donec)
472477
}))
473478

474479
req := &segpb.ReadFromBlockStreamRequest{
480+
Id: opaqueID,
475481
BlockId: block,
476482
Offset: offset,
477483
Number: int64(size),
@@ -484,10 +490,10 @@ func (s *BlockStore) ReadStream(
484490
})
485491
read.releaseStream()
486492
s.connectReadStream(ctx)
487-
c, _ := read.callbacks.LoadAndDelete(read.opaqueID)
493+
c, _ := read.callbacks.LoadAndDelete(opaqueID)
488494
if c != nil {
489495
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
490-
Id: read.opaqueID,
496+
Id: opaqueID,
491497
ResponseCode: errpb.ErrorCode_CLOSED,
492498
ResponseMsg: "read stream closed",
493499
Events: &cepb.CloudEventBatch{
@@ -501,10 +507,10 @@ func (s *BlockStore) ReadStream(
501507
select {
502508
case <-donec:
503509
case <-_ctx.Done():
504-
c, _ := read.callbacks.LoadAndDelete(read.opaqueID)
510+
c, _ := read.callbacks.LoadAndDelete(opaqueID)
505511
if c != nil {
506512
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
507-
Id: read.opaqueID,
513+
Id: opaqueID,
508514
ResponseCode: errpb.ErrorCode_CONTEXT_CANCELED,
509515
ResponseMsg: "read stream context canceled",
510516
Events: &cepb.CloudEventBatch{

client/pkg/eventlog/eventlog_impl.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ import (
2222
"sync"
2323
"time"
2424

25-
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
26-
27-
"github.com/linkall-labs/vanus/observability/tracing"
28-
"go.opentelemetry.io/otel/trace"
29-
3025
// third-party libraries.
3126
ce "github.com/cloudevents/sdk-go/v2"
27+
"go.opentelemetry.io/otel/trace"
28+
29+
// first-party libraries
30+
vlog "github.com/linkall-labs/vanus/observability/log"
31+
"github.com/linkall-labs/vanus/observability/tracing"
32+
"github.com/linkall-labs/vanus/pkg/errors"
33+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
3234

3335
// this project.
3436
el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog"
3537
"github.com/linkall-labs/vanus/client/pkg/record"
36-
vlog "github.com/linkall-labs/vanus/observability/log"
37-
"github.com/linkall-labs/vanus/pkg/errors"
3838
)
3939

4040
const (

client/pkg/eventlog/log_segment.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,19 @@ import (
2222
"sync"
2323
"time"
2424

25-
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
26-
27-
"github.com/linkall-labs/vanus/observability/tracing"
28-
"go.opentelemetry.io/otel/trace"
29-
3025
// third-party libraries.
3126
ce "github.com/cloudevents/sdk-go/v2"
27+
"go.opentelemetry.io/otel/trace"
3228
"go.uber.org/atomic"
3329

3430
// first-party libraries.
31+
"github.com/linkall-labs/vanus/observability/tracing"
32+
"github.com/linkall-labs/vanus/pkg/errors"
33+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
3534
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3635

3736
// this project.
38-
3937
"github.com/linkall-labs/vanus/client/pkg/record"
40-
"github.com/linkall-labs/vanus/pkg/errors"
4138
)
4239

4340
func newSegment(ctx context.Context, r *record.Segment, towrite bool) (*segment, error) {
@@ -70,7 +67,7 @@ func newBlockExt(ctx context.Context, r *record.Segment, leaderOnly bool) (*bloc
7067
id := r.LeaderBlockID
7168
if id == 0 {
7269
if leaderOnly {
73-
return nil, errors.ErrNotLeader.WithMessage("the block is not leader")
70+
return nil, errors.ErrNoLeader.WithMessage("the block no leader")
7471
}
7572
for _, b := range r.Blocks {
7673
if b.Endpoint != "" {
@@ -283,7 +280,7 @@ func (s *segment) ReadStream(ctx context.Context, from int64, size int16, pollin
283280
}
284281
off, ok := v.(int32)
285282
if !ok {
286-
return events, errors.ErrInvalidRequest.WithMessage("corrupted event")
283+
return events, errors.ErrCorruptedEvent.WithMessage("corrupted event")
287284
}
288285
offset := s.startOffset + int64(off)
289286
buf := make([]byte, 8)

internal/store/block/raft/appender.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ func NewAppender(
104104
raftLog *raftlog.Log,
105105
host transport.Host,
106106
listener LeaderChangedListener,
107-
callbackC chan func(),
108107
) Appender {
109108
ctx, cancel := context.WithCancel(ctx)
110109

internal/store/segment/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ import (
1919
"context"
2020

2121
// third-party libraries.
22-
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
22+
2323
"google.golang.org/protobuf/types/known/emptypb"
2424

2525
// first-party libraries.
2626
"github.com/linkall-labs/vanus/observability/log"
2727
"github.com/linkall-labs/vanus/pkg/errors"
28+
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
2829
errpb "github.com/linkall-labs/vanus/proto/pkg/errors"
2930
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3031

internal/store/segment/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *server) recoverBlocks(ctx context.Context, logs map[vanus.ID]*raftlog.L
7878
return err
7979
}
8080
}
81-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
81+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged)
8282
s.replicas.Store(id, &replica{
8383
id: id,
8484
idStr: id.String(),

internal/store/segment/replica.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (s *server) createBlock(ctx context.Context, id vanus.ID, size int64) (Repl
116116

117117
// Create replica.
118118
l := raftlog.NewLog(id, s.wal, s.metaStore, s.offsetStore, nil)
119-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
119+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged)
120120

121121
return &replica{
122122
id: id,

internal/store/segment/server.go

Lines changed: 9 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/linkall-labs/vanus/observability/metrics"
4545
"github.com/linkall-labs/vanus/observability/tracing"
4646
"github.com/linkall-labs/vanus/pkg/cluster"
47+
"github.com/linkall-labs/vanus/pkg/errors"
4748
"github.com/linkall-labs/vanus/pkg/util"
4849
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
4950
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
@@ -64,7 +65,6 @@ import (
6465
ceschema "github.com/linkall-labs/vanus/internal/store/schema/ce"
6566
ceconv "github.com/linkall-labs/vanus/internal/store/schema/ce/convert"
6667
"github.com/linkall-labs/vanus/internal/store/vsb"
67-
"github.com/linkall-labs/vanus/pkg/errors"
6868
)
6969

7070
const (
@@ -136,29 +136,6 @@ type leaderInfo struct {
136136
term uint64
137137
}
138138

139-
type appendResult struct {
140-
seqs []int64
141-
err error
142-
}
143-
144-
type appendFuture chan appendResult
145-
146-
func newAppendFuture() appendFuture {
147-
return make(appendFuture, 1)
148-
}
149-
150-
func (af appendFuture) onAppended(seqs []int64, err error) {
151-
af <- appendResult{
152-
seqs: seqs,
153-
err: err,
154-
}
155-
}
156-
157-
func (af appendFuture) wait() ([]int64, error) {
158-
res := <-af
159-
return res.seqs, res.err
160-
}
161-
162139
type server struct {
163140
replicas sync.Map // vanus.ID, Replica
164141

@@ -712,37 +689,15 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb.
712689

713690
metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(len(events)))
714691
metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(size))
715-
716-
future := newAppendFuture()
717-
b.Append(ctx, entries, future.onAppended)
718-
seqs, err := future.wait()
719-
if err != nil {
720-
cb(nil, s.processAppendError(ctx, b, err))
721-
return
722-
}
723-
724-
// TODO(weihe.yin) make this method deep to code
725-
s.pm.NewMessageArrived(id)
726-
cb(seqs, nil)
727-
}
728-
729-
func (s *server) processAppendError(ctx context.Context, b Replica, err error) error {
730-
if stderr.As(err, &errors.ErrorType{}) {
731-
return err
732-
}
733-
734-
if errors.Is(err, errors.ErrFull) {
735-
log.Debug(ctx, "Append failed: block is full.", map[string]interface{}{
736-
"block_id": b.ID(),
737-
})
738-
return errors.ErrFull
739-
}
740-
741-
log.Warning(ctx, "Append failed.", map[string]interface{}{
742-
"block_id": b.ID(),
743-
log.KeyError: err,
692+
b.Append(ctx, entries, func(seqs []int64, err error) {
693+
s.callbackC <- func() {
694+
if err == nil {
695+
// TODO(weihe.yin) make this method deep to code
696+
s.pm.NewMessageArrived(id)
697+
}
698+
cb(seqs, err)
699+
}
744700
})
745-
return errors.ErrInternal.WithMessage("write to storage failed").Wrap(err)
746701
}
747702

748703
func (s *server) onBlockArchived(stat block.Statistics) {

0 commit comments

Comments
 (0)