Skip to content

Commit f285759

Browse files
committed
feat: transport layer performance optimization
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
1 parent 6e9856b commit f285759

12 files changed

Lines changed: 1225 additions & 338 deletions

File tree

client/internal/vanus/store/block_store.go

Lines changed: 225 additions & 84 deletions
Large diffs are not rendered by default.

client/pkg/api/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ type Eventbus interface {
3737
type BusWriter interface {
3838
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
3939
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (eid string, err error)
40-
AppendOneStream(ctx context.Context, event *ce.Event, cb Callback, opts ...WriteOption)
40+
SyncAppendOneStream(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
4141
}
4242

4343
type BusReader interface {
4444
Read(ctx context.Context, opts ...ReadOption) ([]*ce.Event, int64, uint64, error)
45+
SyncReadStream(ctx context.Context, opts ...ReadOption) ([]*ce.Event, int64, uint64, error)
4546
}
4647

4748
type Eventlog interface {

client/pkg/eventbus/eventbus.go

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,8 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.
478478
return encoded, nil
479479
}
480480

481-
func (w *busWriter) AppendOneStream(ctx context.Context, event *ce.Event, cb api.Callback, opts ...api.WriteOption) {
482-
_ctx, span := w.tracer.Start(ctx, "AppendOneStream")
481+
func (w *busWriter) SyncAppendOneStream(ctx context.Context, event *ce.Event, opts ...api.WriteOption) (eid string, err error) {
482+
_ctx, span := w.tracer.Start(ctx, "SyncAppendOneStream")
483483
defer span.End()
484484

485485
var writeOpts *api.WriteOptions = w.opts
@@ -493,12 +493,22 @@ func (w *busWriter) AppendOneStream(ctx context.Context, event *ce.Event, cb api
493493
// 1. pick a writer of eventlog
494494
lw, err := w.pickWritableLog(_ctx, writeOpts)
495495
if err != nil {
496-
cb(err)
497-
return
496+
return "", err
498497
}
499498

500499
// 2. append the event to the eventlog
501-
lw.AppendStream(_ctx, event, cb)
500+
off, err := lw.SyncAppendStream(_ctx, event)
501+
if err != nil {
502+
return "", err
503+
}
504+
505+
// 3. generate event ID
506+
var buf [16]byte
507+
binary.BigEndian.PutUint64(buf[0:8], lw.Log().ID())
508+
binary.BigEndian.PutUint64(buf[8:16], uint64(off))
509+
encoded := base64.StdEncoding.EncodeToString(buf[:])
510+
511+
return encoded, nil
502512
}
503513

504514
func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) (eid string, err error) {
@@ -567,6 +577,38 @@ func (r *busReader) Read(ctx context.Context, opts ...api.ReadOption) ([]*ce.Eve
567577
return events, off, lr.Log().ID(), nil
568578
}
569579

580+
func (r *busReader) SyncReadStream(ctx context.Context, opts ...api.ReadOption) ([]*ce.Event, int64, uint64, error) {
581+
_ctx, span := r.tracer.Start(ctx, "Read")
582+
defer span.End()
583+
584+
var readOpts *api.ReadOptions = r.opts
585+
if len(opts) > 0 {
586+
readOpts = r.opts.Copy()
587+
for _, opt := range opts {
588+
opt(readOpts)
589+
}
590+
}
591+
592+
// 1. pick a reader of eventlog
593+
lr, err := r.pickReadableLog(_ctx, readOpts)
594+
if err != nil {
595+
return []*ce.Event{}, 0, 0, err
596+
}
597+
598+
// TODO(jiangkai): refactor eventlog interface to avoid seek every time, by jiangkai, 2022.10.24
599+
off, err := lr.Seek(_ctx, readOpts.Policy.Offset(), io.SeekStart)
600+
if err != nil {
601+
return []*ce.Event{}, 0, 0, err
602+
}
603+
604+
// 2. read the event to the eventlog
605+
events, err := lr.SyncReadStream(_ctx, int16(readOpts.BatchSize))
606+
if err != nil {
607+
return []*ce.Event{}, 0, 0, err
608+
}
609+
return events, off, lr.Log().ID(), nil
610+
}
611+
570612
func (r *busReader) Bus() api.Eventbus {
571613
return r.ebus
572614
}

client/pkg/eventlog/eventlog.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type LogWriter interface {
5151

5252
Append(ctx context.Context, event *ce.Event) (off int64, err error)
5353

54-
AppendStream(ctx context.Context, event *ce.Event, cb api.Callback)
54+
SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error)
5555
}
5656

5757
type LogReader interface {
@@ -62,6 +62,8 @@ type LogReader interface {
6262
// TODO: async
6363
Read(ctx context.Context, size int16) (events []*ce.Event, err error)
6464

65+
SyncReadStream(ctx context.Context, size int16) (events []*ce.Event, err error)
66+
6567
// Seek sets the offset for the next Read to offset,
6668
// interpreted according to whence.
6769
//

client/pkg/eventlog/eventlog_impl.go

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030

3131
// this project.
3232
el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog"
33-
"github.com/linkall-labs/vanus/client/pkg/api"
3433
"github.com/linkall-labs/vanus/client/pkg/record"
3534
vlog "github.com/linkall-labs/vanus/observability/log"
3635
"github.com/linkall-labs/vanus/pkg/errors"
@@ -386,13 +385,43 @@ func (w *logWriter) doAppend(ctx context.Context, event *ce.Event) (int64, error
386385
return offset, nil
387386
}
388387

389-
func (w *logWriter) AppendStream(ctx context.Context, event *ce.Event, cb api.Callback) {
388+
func (w *logWriter) SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
389+
// TODO: async for throughput
390+
391+
retryTimes := defaultRetryTimes
392+
for i := 1; i <= retryTimes; i++ {
393+
offset, err := w.doSyncAppendStream(ctx, event)
394+
if err == nil {
395+
return offset, nil
396+
}
397+
vlog.Warning(ctx, "failed to Append", map[string]interface{}{
398+
vlog.KeyError: err,
399+
"offset": offset,
400+
})
401+
if errors.Is(err, errors.ErrSegmentFull) {
402+
if i < retryTimes {
403+
continue
404+
}
405+
}
406+
return -1, err
407+
}
408+
409+
return -1, errors.ErrUnknown
410+
}
411+
412+
func (w *logWriter) doSyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
390413
segment, err := w.selectWritableSegment(ctx)
391414
if err != nil {
392-
cb(err)
393-
return
415+
return -1, err
394416
}
395-
segment.AppendStream(ctx, event, cb)
417+
offset, err := segment.SyncAppendStream(ctx, event)
418+
if err != nil {
419+
if errors.Is(err, errors.ErrSegmentFull) {
420+
segment.SetNotWritable()
421+
}
422+
return -1, err
423+
}
424+
return offset, nil
396425
}
397426

398427
func (w *logWriter) selectWritableSegment(ctx context.Context) (*segment, error) {
@@ -470,6 +499,38 @@ func (r *logReader) Read(ctx context.Context, size int16) ([]*ce.Event, error) {
470499
return events, nil
471500
}
472501

502+
func (r *logReader) SyncReadStream(ctx context.Context, size int16) ([]*ce.Event, error) {
503+
if r.cur == nil {
504+
segment, err := r.elog.selectReadableSegment(ctx, r.pos)
505+
if errors.Is(err, errors.ErrOffsetOnEnd) {
506+
r.elog.refreshReadableSegments(ctx)
507+
segment, err = r.elog.selectReadableSegment(ctx, r.pos)
508+
}
509+
if err != nil {
510+
return nil, err
511+
}
512+
r.cur = segment
513+
}
514+
515+
events, err := r.cur.SyncReadStream(ctx, r.pos, size, uint32(r.pollingTimeout(ctx)))
516+
if err != nil {
517+
if errors.Is(err, errors.ErrOffsetOverflow) {
518+
r.elog.refreshReadableSegments(ctx)
519+
if r.switchSegment(ctx) {
520+
return nil, errors.ErrTryAgain
521+
}
522+
}
523+
return nil, err
524+
}
525+
526+
r.pos += int64(len(events))
527+
if r.pos == r.cur.EndOffset() {
528+
r.switchSegment(ctx)
529+
}
530+
531+
return events, nil
532+
}
533+
473534
func (r *logReader) pollingTimeout(ctx context.Context) int64 {
474535
if r.cfg.PollingTimeout == 0 {
475536
return 0

client/pkg/eventlog/log_segment.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3434

3535
// this project.
36-
"github.com/linkall-labs/vanus/client/pkg/api"
36+
3737
"github.com/linkall-labs/vanus/client/pkg/record"
3838
"github.com/linkall-labs/vanus/pkg/errors"
3939
)
@@ -171,16 +171,19 @@ func (s *segment) Append(ctx context.Context, event *ce.Event) (int64, error) {
171171
return off + s.startOffset, nil
172172
}
173173

174-
func (s *segment) AppendStream(ctx context.Context, event *ce.Event, cb api.Callback) {
174+
func (s *segment) SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
175175
_ctx, span := s.tracer.Start(ctx, "AppendStream")
176176
defer span.End()
177177

178178
b := s.preferSegmentBlock()
179179
if b == nil {
180-
cb(errors.ErrNotLeader)
181-
return
180+
return -1, errors.ErrNotLeader
181+
}
182+
off, err := b.SyncAppendStream(_ctx, event)
183+
if err != nil {
184+
return -1, err
182185
}
183-
b.AppendStream(_ctx, event, cb)
186+
return off + s.startOffset, nil
184187
}
185188

186189
func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
@@ -227,6 +230,50 @@ func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeo
227230
return events, err
228231
}
229232

233+
func (s *segment) SyncReadStream(ctx context.Context, from int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
234+
if from < s.startOffset {
235+
return nil, errors.ErrOffsetUnderflow
236+
}
237+
ctx, span := s.tracer.Start(ctx, "Read")
238+
defer span.End()
239+
240+
if eo := s.endOffset.Load(); eo >= 0 {
241+
if from > eo {
242+
return nil, errors.ErrOffsetOverflow
243+
}
244+
if int64(size) > eo-from {
245+
size = int16(eo - from)
246+
}
247+
}
248+
// TODO: cached read
249+
b := s.preferSegmentBlock()
250+
if b == nil {
251+
return nil, errors.ErrBlockNotFound
252+
}
253+
events, err := b.SyncReadStream(ctx, from-s.startOffset, size, pollingTimeout)
254+
if err != nil {
255+
return nil, err
256+
}
257+
258+
for _, e := range events {
259+
v, ok := e.Extensions()[segpb.XVanusBlockOffset]
260+
if !ok {
261+
continue
262+
}
263+
off, ok := v.(int32)
264+
if !ok {
265+
return events, errors.ErrCorruptedEvent
266+
}
267+
offset := s.startOffset + int64(off)
268+
buf := make([]byte, 8)
269+
binary.BigEndian.PutUint64(buf, uint64(offset))
270+
e.SetExtension(XVanusLogOffset, buf)
271+
e.SetExtension(segpb.XVanusBlockOffset, nil)
272+
}
273+
274+
return events, err
275+
}
276+
230277
func (s *segment) preferSegmentBlock() *block {
231278
s.mu.RLock()
232279
defer s.mu.RUnlock()

client/pkg/eventlog/segment_block.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323

2424
// this project
2525
"github.com/linkall-labs/vanus/client/internal/vanus/store"
26-
"github.com/linkall-labs/vanus/client/pkg/api"
2726
"github.com/linkall-labs/vanus/client/pkg/record"
2827
"github.com/linkall-labs/vanus/pkg/errors"
2928
)
@@ -57,8 +56,8 @@ func (s *block) Append(ctx context.Context, event *ce.Event) (int64, error) {
5756
return s.store.Append(ctx, s.id, event)
5857
}
5958

60-
func (s *block) AppendStream(ctx context.Context, event *ce.Event, cb api.Callback) {
61-
s.store.AppendStream(ctx, s.id, event, cb)
59+
func (s *block) SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
60+
return s.store.SyncAppendStream(ctx, s.id, event)
6261
}
6362

6463
func (s *block) Read(ctx context.Context, offset int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
@@ -74,3 +73,17 @@ func (s *block) Read(ctx context.Context, offset int64, size int16, pollingTimeo
7473
}
7574
return s.store.Read(ctx, s.id, offset, size, pollingTimeout)
7675
}
76+
77+
func (s *block) SyncReadStream(ctx context.Context, offset int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
78+
if offset < 0 {
79+
return nil, errors.ErrOffsetUnderflow
80+
}
81+
if size > 0 {
82+
// doRead
83+
} else if size == 0 {
84+
return make([]*ce.Event, 0), nil
85+
} else if size < 0 {
86+
return nil, errors.ErrInvalidArgument
87+
}
88+
return s.store.SyncReadStream(ctx, s.id, offset, size, pollingTimeout)
89+
}

internal/gateway/gateway.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (ga *ceGateway) receive(ctx context.Context, event v2.Event) (*v2.Event, pr
141141
v, _ = ga.busWriter.LoadOrStore(ebName, ga.client.Eventbus(ctx, ebName).Writer())
142142
}
143143
writer, _ := v.(api.BusWriter)
144-
eventID, err := writer.AppendOne(_ctx, &event)
144+
eventID, err := writer.SyncAppendOneStream(_ctx, &event)
145145
if err != nil {
146146
log.Warning(_ctx, "append to failed", map[string]interface{}{
147147
log.KeyError: err,

0 commit comments

Comments
 (0)