Skip to content

Commit 39a3140

Browse files
committed
Merge branch 'main' of github.com:linkall-labs/vanus into clientstream-v2
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
2 parents 83363a1 + b3f905f commit 39a3140

118 files changed

Lines changed: 4219 additions & 2366 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.golangci.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,9 @@ issues:
396396
- source: "strconv|make|len|math"
397397
linters:
398398
- gomnd
399+
- path: "action"
400+
linters:
401+
- dupl
399402
- path: "convert.go"
400403
linters:
401404
- dupl
@@ -412,7 +415,7 @@ issues:
412415
linters:
413416
- gosec
414417
- gomnd
415-
- path: "action.go"
418+
- path: "init.go"
416419
linters:
417420
- gochecknoinits
418421
- path: "^vsctl"

client/go.mod

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module github.com/linkall-labs/vanus/client
33
go 1.18
44

55
require (
6-
cloudevents.io/genproto v1.0.2
76
github.com/cloudevents/sdk-go/v2 v2.11.0
87
github.com/golang/mock v1.6.0
98
github.com/linkall-labs/vanus/observability v0.5.1
@@ -29,9 +28,7 @@ require (
2928
github.com/modern-go/reflect2 v1.0.2 // indirect
3029
github.com/pkg/errors v0.9.1 // indirect
3130
github.com/sirupsen/logrus v1.9.0 // indirect
32-
github.com/stretchr/objx v0.4.0 // indirect
3331
go.opentelemetry.io/otel v1.11.1 // indirect
34-
go.opentelemetry.io/otel/exporters/jaeger v1.9.0 // indirect
3532
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 // indirect
3633
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0 // indirect
3734
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0 // indirect
@@ -49,7 +46,7 @@ require (
4946
)
5047

5148
replace (
52-
cloudevents.io/genproto => ../proto/include/cloudevents/pkg
49+
cloud.google.com/go => cloud.google.com/go v0.100.2
5350
github.com/linkall-labs/vanus/observability => ../observability
5451
github.com/linkall-labs/vanus/pkg => ../pkg
5552
github.com/linkall-labs/vanus/proto => ../proto

client/go.sum

Lines changed: 38 additions & 262 deletions
Large diffs are not rendered by default.

client/internal/vanus/codec/protobuf.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import (
1212
stdtime "time"
1313

1414
// third-party libraries
15-
cepb "cloudevents.io/genproto/v1"
1615
"github.com/cloudevents/sdk-go/v2/event"
1716
"github.com/cloudevents/sdk-go/v2/types"
17+
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
1818
"google.golang.org/protobuf/types/known/anypb"
1919
"google.golang.org/protobuf/types/known/timestamppb"
2020
)
@@ -35,14 +35,14 @@ var (
3535
zeroTime = stdtime.Time{}
3636
)
3737

38-
// convert an SDK event to a protobuf variant of the event that can be marshaled.
38+
// ToProto convert an SDK event to a protobuf variant of the event that can be marshaled.
3939
func ToProto(e *event.Event) (*cepb.CloudEvent, error) {
4040
container := &cepb.CloudEvent{
4141
Id: e.ID(),
4242
Source: e.Source(),
4343
SpecVersion: e.SpecVersion(),
4444
Type: e.Type(),
45-
Attributes: make(map[string]*cepb.CloudEventAttributeValue),
45+
Attributes: make(map[string]*cepb.CloudEvent_CloudEventAttributeValue),
4646
}
4747
if e.DataContentType() != "" {
4848
container.Attributes[datacontenttype], _ = attributeFor(e.DataContentType())
@@ -78,39 +78,39 @@ func ToProto(e *event.Event) (*cepb.CloudEvent, error) {
7878
return container, nil
7979
}
8080

81-
func attributeFor(v interface{}) (*cepb.CloudEventAttributeValue, error) {
81+
func attributeFor(v interface{}) (*cepb.CloudEvent_CloudEventAttributeValue, error) {
8282
vv, err := types.Validate(v)
8383
if err != nil {
8484
return nil, err
8585
}
86-
attr := &cepb.CloudEventAttributeValue{}
86+
attr := &cepb.CloudEvent_CloudEventAttributeValue{}
8787
switch vt := vv.(type) {
8888
case bool:
89-
attr.Attr = &cepb.CloudEventAttributeValue_CeBoolean{
89+
attr.Attr = &cepb.CloudEvent_CloudEventAttributeValue_CeBoolean{
9090
CeBoolean: vt,
9191
}
9292
case int32:
93-
attr.Attr = &cepb.CloudEventAttributeValue_CeInteger{
93+
attr.Attr = &cepb.CloudEvent_CloudEventAttributeValue_CeInteger{
9494
CeInteger: vt,
9595
}
9696
case string:
97-
attr.Attr = &cepb.CloudEventAttributeValue_CeString{
97+
attr.Attr = &cepb.CloudEvent_CloudEventAttributeValue_CeString{
9898
CeString: vt,
9999
}
100100
case []byte:
101-
attr.Attr = &cepb.CloudEventAttributeValue_CeBytes{
101+
attr.Attr = &cepb.CloudEvent_CloudEventAttributeValue_CeBytes{
102102
CeBytes: vt,
103103
}
104104
case types.URI:
105-
attr.Attr = &cepb.CloudEventAttributeValue_CeUri{
105+
attr.Attr = &cepb.CloudEvent_CloudEventAttributeValue_CeUri{
106106
CeUri: vt.String(),
107107
}
108108
case types.URIRef:
109-
attr.Attr = &cepb.CloudEventAttributeValue_CeUriRef{
109+
attr.Attr = &cepb.CloudEvent_CloudEventAttributeValue_CeUriRef{
110110
CeUriRef: vt.String(),
111111
}
112112
case types.Timestamp:
113-
attr.Attr = &cepb.CloudEventAttributeValue_CeTimestamp{
113+
attr.Attr = &cepb.CloudEvent_CloudEventAttributeValue_CeTimestamp{
114114
CeTimestamp: timestamppb.New(vt.Time),
115115
}
116116
default:
@@ -119,38 +119,38 @@ func attributeFor(v interface{}) (*cepb.CloudEventAttributeValue, error) {
119119
return attr, nil
120120
}
121121

122-
func valueFrom(attr *cepb.CloudEventAttributeValue) (interface{}, error) {
122+
func valueFrom(attr *cepb.CloudEvent_CloudEventAttributeValue) (interface{}, error) {
123123
var v interface{}
124124
switch vt := attr.Attr.(type) {
125-
case *cepb.CloudEventAttributeValue_CeBoolean:
125+
case *cepb.CloudEvent_CloudEventAttributeValue_CeBoolean:
126126
v = vt.CeBoolean
127-
case *cepb.CloudEventAttributeValue_CeInteger:
127+
case *cepb.CloudEvent_CloudEventAttributeValue_CeInteger:
128128
v = vt.CeInteger
129-
case *cepb.CloudEventAttributeValue_CeString:
129+
case *cepb.CloudEvent_CloudEventAttributeValue_CeString:
130130
v = vt.CeString
131-
case *cepb.CloudEventAttributeValue_CeBytes:
131+
case *cepb.CloudEvent_CloudEventAttributeValue_CeBytes:
132132
v = vt.CeBytes
133-
case *cepb.CloudEventAttributeValue_CeUri:
133+
case *cepb.CloudEvent_CloudEventAttributeValue_CeUri:
134134
uri, err := url.Parse(vt.CeUri)
135135
if err != nil {
136136
return nil, fmt.Errorf("failed to parse URI value %s: %s", vt.CeUri, err.Error())
137137
}
138138
v = uri
139-
case *cepb.CloudEventAttributeValue_CeUriRef:
139+
case *cepb.CloudEvent_CloudEventAttributeValue_CeUriRef:
140140
uri, err := url.Parse(vt.CeUriRef)
141141
if err != nil {
142142
return nil, fmt.Errorf("failed to parse URIRef value %s: %s", vt.CeUriRef, err.Error())
143143
}
144144
v = types.URIRef{URL: *uri}
145-
case *cepb.CloudEventAttributeValue_CeTimestamp:
145+
case *cepb.CloudEvent_CloudEventAttributeValue_CeTimestamp:
146146
v = vt.CeTimestamp.AsTime()
147147
default:
148148
return nil, fmt.Errorf("unsupported attribute type: %T", vt)
149149
}
150150
return types.Validate(v)
151151
}
152152

153-
// Convert from a protobuf variant into the generic, SDK event.
153+
// FromProto Convert from a protobuf variant into the generic, SDK event.
154154
func FromProto(container *cepb.CloudEvent) (*event.Event, error) {
155155
e := event.New()
156156
e.SetID(container.Id)
@@ -179,7 +179,7 @@ func FromProto(container *cepb.CloudEvent) (*event.Event, error) {
179179
if container.Attributes != nil {
180180
attr := container.Attributes[datacontenttype]
181181
if attr != nil {
182-
if stattr, ok := attr.Attr.(*cepb.CloudEventAttributeValue_CeString); ok {
182+
if stattr, ok := attr.Attr.(*cepb.CloudEvent_CloudEventAttributeValue_CeString); ok {
183183
contentType = stattr.CeString
184184
}
185185
}

client/internal/vanus/store/block_store.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ import (
2525
"github.com/linkall-labs/vanus/observability/tracing"
2626
"go.opentelemetry.io/otel/trace"
2727

28-
// third-party libraries
29-
cepb "cloudevents.io/genproto/v1"
3028
ce "github.com/cloudevents/sdk-go/v2"
3129
"google.golang.org/grpc"
3230

3331
// first-party libraries
32+
33+
// third-party libraries
34+
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
3435
errpb "github.com/linkall-labs/vanus/proto/pkg/errors"
3536
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3637

@@ -560,3 +561,25 @@ func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Ti
560561
}
561562
return res.Offset, nil
562563
}
564+
565+
func (s *BlockStore) AppendBatch(ctx context.Context, block uint64, event *cepb.CloudEventBatch) (int64, error) {
566+
_ctx, span := s.tracer.Start(ctx, "AppendBatch")
567+
defer span.End()
568+
569+
req := &segpb.AppendToBlockRequest{
570+
BlockId: block,
571+
Events: event,
572+
}
573+
574+
client, err := s.client.Get(_ctx)
575+
if err != nil {
576+
return -1, err
577+
}
578+
579+
res, err := client.(segpb.SegmentServerClient).AppendToBlock(_ctx, req)
580+
if err != nil {
581+
return -1, err
582+
}
583+
// TODO(Y. F. Zhang): batch events
584+
return res.GetOffsets()[0], nil
585+
}

client/pkg/api/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020

2121
ce "github.com/cloudevents/sdk-go/v2"
22+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
2223
)
2324

2425
type Eventbus interface {
@@ -33,6 +34,7 @@ type Eventbus interface {
3334
type BusWriter interface {
3435
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
3536
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (eid []string, err error)
37+
AppendBatch(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) (err error)
3638
}
3739

3840
type BusReader interface {

client/pkg/api/mock_client.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/pkg/eventbus/eventbus.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"encoding/base64"
2121
"encoding/binary"
2222
stderrors "errors"
23+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
2324
"io"
2425
"sync"
2526

@@ -444,6 +445,29 @@ type busWriter struct {
444445
tracer *tracing.Tracer
445446
}
446447

448+
func (w *busWriter) AppendBatch(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...api.WriteOption) (err error) {
449+
_ctx, span := w.tracer.Start(ctx, "CloudEventBatch")
450+
defer span.End()
451+
452+
var writeOpts *api.WriteOptions = w.opts
453+
if len(opts) > 0 {
454+
writeOpts = w.opts.Copy()
455+
for _, opt := range opts {
456+
opt(writeOpts)
457+
}
458+
}
459+
460+
// 1. pick a writer of eventlog
461+
lw, err := w.pickWritableLog(_ctx, writeOpts)
462+
if err != nil {
463+
return err
464+
}
465+
466+
// 2. append the event to the eventlog
467+
_, err = lw.AppendMany(_ctx, events)
468+
return err
469+
}
470+
447471
var _ api.BusWriter = (*busWriter)(nil)
448472

449473
func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.WriteOption) (eid string, err error) {

client/pkg/eventlog/eventlog.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
// standard libraries.
2020
"context"
2121

22+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
23+
2224
// third-party libraries.
2325
ce "github.com/cloudevents/sdk-go/v2"
2426

@@ -51,6 +53,8 @@ type LogWriter interface {
5153

5254
Append(ctx context.Context, event *ce.Event) (off int64, err error)
5355

56+
AppendMany(ctx context.Context, events *cloudevents.CloudEventBatch) (off int64, err error)
57+
5458
AppendManyStream(ctx context.Context, events []*ce.Event) ([]int64, error)
5559
}
5660

client/pkg/eventlog/eventlog_impl.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"time"
2424

25+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
26+
2527
"github.com/linkall-labs/vanus/observability/tracing"
2628
"go.opentelemetry.io/otel/trace"
2729

@@ -329,6 +331,10 @@ func (l *eventlog) refreshReadableSegments(ctx context.Context) {
329331
_ = l.readableWatcher.Refresh(ctx)
330332
}
331333

334+
var (
335+
_ LogWriter = &logWriter{}
336+
)
337+
332338
// logWriter is the writer of eventlog.
333339
//
334340
// Append is thread-safety.
@@ -338,6 +344,28 @@ type logWriter struct {
338344
mu sync.RWMutex
339345
}
340346

347+
func (w *logWriter) AppendMany(ctx context.Context, events *cloudevents.CloudEventBatch) (off int64, err error) {
348+
retryTimes := defaultRetryTimes
349+
for i := 1; i <= retryTimes; i++ {
350+
offset, err := w.doAppendBatch(ctx, events)
351+
if err == nil {
352+
return offset, nil
353+
}
354+
vlog.Warning(ctx, "failed to Append", map[string]interface{}{
355+
vlog.KeyError: err,
356+
"offset": offset,
357+
})
358+
if errors.Is(err, errors.ErrFull) {
359+
if i < retryTimes {
360+
continue
361+
}
362+
}
363+
return -1, err
364+
}
365+
366+
return -1, errors.ErrUnknown
367+
}
368+
341369
func (w *logWriter) Log() Eventlog {
342370
return w.elog
343371
}
@@ -428,6 +456,21 @@ func (w *logWriter) doSyncAppendStream(ctx context.Context, events []*ce.Event)
428456
return offsets, nil
429457
}
430458

459+
func (w *logWriter) doAppendBatch(ctx context.Context, event *cloudevents.CloudEventBatch) (int64, error) {
460+
segment, err := w.selectWritableSegment(ctx)
461+
if err != nil {
462+
return -1, err
463+
}
464+
offset, err := segment.AppendBatch(ctx, event)
465+
if err != nil {
466+
if errors.Is(err, errors.ErrFull) {
467+
segment.SetNotWritable()
468+
}
469+
return -1, err
470+
}
471+
return offset, nil
472+
}
473+
431474
func (w *logWriter) selectWritableSegment(ctx context.Context) (*segment, error) {
432475
segment := func() *segment {
433476
w.mu.RLock()

0 commit comments

Comments
 (0)