Skip to content

Commit 1913655

Browse files
committed
fix review comments
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
1 parent b4423f7 commit 1913655

9 files changed

Lines changed: 172 additions & 108 deletions

File tree

client/internal/vanus/store/block_store.go

Lines changed: 69 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,17 @@ import (
2828
"google.golang.org/grpc"
2929

3030
// first-party libraries
31-
"github.com/linkall-labs/vanus/client/pkg/codec"
3231
"github.com/linkall-labs/vanus/observability/log"
3332
"github.com/linkall-labs/vanus/observability/tracing"
3433
"github.com/linkall-labs/vanus/pkg/errors"
3534
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
3635
errpb "github.com/linkall-labs/vanus/proto/pkg/errors"
3736
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3837

38+
// this project
3939
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
4040
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
41+
"github.com/linkall-labs/vanus/client/pkg/codec"
4142
"github.com/linkall-labs/vanus/client/pkg/primitive"
4243
)
4344

@@ -80,19 +81,31 @@ var (
8081
type appendStreamCache struct {
8182
opaqueID uint64
8283
stream segpb.SegmentServer_AppendToBlockStreamClient
84+
reqs chan *segpb.AppendToBlockStreamRequest
8385
callbacks sync.Map
8486
state streamState
8587
once sync.Once
88+
mu sync.RWMutex
8689
}
8790

8891
func (a *appendStreamCache) isRunning() bool {
92+
a.mu.RLock()
93+
defer a.mu.RUnlock()
8994
return a.state == stateRunning
9095
}
9196

9297
func (a *appendStreamCache) isClosed() bool {
98+
a.mu.RLock()
99+
defer a.mu.RUnlock()
93100
return a.state == stateClosed
94101
}
95102

103+
func (a *appendStreamCache) setState(state streamState) {
104+
a.mu.Lock()
105+
defer a.mu.Unlock()
106+
a.state = state
107+
}
108+
96109
func (a *appendStreamCache) release() {
97110
a.releaseStream()
98111
a.releaseCallbacks()
@@ -101,6 +114,7 @@ func (a *appendStreamCache) release() {
101114
func (a *appendStreamCache) releaseStream() {
102115
a.once.Do(func() {
103116
a.stream.CloseSend()
117+
close(a.reqs)
104118
a.state = stateClosed
105119
})
106120
}
@@ -119,19 +133,31 @@ func (a *appendStreamCache) releaseCallbacks() {
119133
type readStreamCache struct {
120134
opaqueID uint64
121135
stream segpb.SegmentServer_ReadFromBlockStreamClient
136+
reqs chan *segpb.ReadFromBlockStreamRequest
122137
callbacks sync.Map
123138
state streamState
124139
once sync.Once
140+
mu sync.RWMutex
125141
}
126142

127143
func (r *readStreamCache) isRunning() bool {
144+
r.mu.RLock()
145+
defer r.mu.RUnlock()
128146
return r.state == stateRunning
129147
}
130148

131149
func (r *readStreamCache) isClosed() bool {
150+
r.mu.RLock()
151+
defer r.mu.RUnlock()
132152
return r.state == stateClosed
133153
}
134154

155+
func (r *readStreamCache) setState(state streamState) {
156+
r.mu.Lock()
157+
defer r.mu.Unlock()
158+
r.state = state
159+
}
160+
135161
func (r *readStreamCache) release() {
136162
r.releaseStream()
137163
r.releaseCallbacks()
@@ -140,6 +166,7 @@ func (r *readStreamCache) release() {
140166
func (r *readStreamCache) releaseStream() {
141167
r.once.Do(func() {
142168
r.stream.CloseSend()
169+
close(r.reqs)
143170
r.state = stateClosed
144171
})
145172
}
@@ -182,6 +209,7 @@ func (s *BlockStore) runAppendStreamRecv(ctx context.Context, append *appendStre
182209
append.releaseCallbacks()
183210
return
184211
}
212+
append.setState(stateClosed)
185213
break
186214
}
187215
c, _ := append.callbacks.LoadAndDelete(res.Id)
@@ -191,6 +219,21 @@ func (s *BlockStore) runAppendStreamRecv(ctx context.Context, append *appendStre
191219
}
192220
}
193221

222+
func (s *BlockStore) runAppendStreamSend(ctx context.Context, append *appendStreamCache) {
223+
for req := range append.reqs {
224+
if err := append.stream.Send(req); err != nil {
225+
log.Error(ctx, "append stream send failed", map[string]interface{}{
226+
log.KeyError: err,
227+
})
228+
// close the current stream connection
229+
append.releaseStream()
230+
// reset new stream connections
231+
s.connectAppendStream(ctx)
232+
return
233+
}
234+
}
235+
}
236+
194237
func (s *BlockStore) runReadStreamRecv(ctx context.Context, read *readStreamCache) {
195238
for {
196239
res, err := read.stream.Recv()
@@ -203,6 +246,7 @@ func (s *BlockStore) runReadStreamRecv(ctx context.Context, read *readStreamCach
203246
read.releaseCallbacks()
204247
return
205248
}
249+
read.setState(stateClosed)
206250
break
207251
}
208252
c, _ := read.callbacks.LoadAndDelete(res.Id)
@@ -212,6 +256,21 @@ func (s *BlockStore) runReadStreamRecv(ctx context.Context, read *readStreamCach
212256
}
213257
}
214258

259+
func (s *BlockStore) runReadStreamSend(ctx context.Context, read *readStreamCache) {
260+
for req := range read.reqs {
261+
if err := read.stream.Send(req); err != nil {
262+
log.Error(ctx, "read stream send failed", map[string]interface{}{
263+
log.KeyError: err,
264+
})
265+
// close the current stream connection
266+
read.releaseStream()
267+
// reset new stream connections
268+
s.connectReadStream(ctx)
269+
return
270+
}
271+
}
272+
}
273+
215274
func (s *BlockStore) connectAppendStream(ctx context.Context) (*appendStreamCache, error) {
216275
s.appendMu.RLock()
217276
if s.append != nil && s.append.isRunning() {
@@ -240,11 +299,13 @@ func (s *BlockStore) connectAppendStream(ctx context.Context) (*appendStreamCach
240299

241300
cache := &appendStreamCache{
242301
stream: stream,
302+
reqs: make(chan *segpb.AppendToBlockStreamRequest, 100),
243303
state: stateRunning,
244304
callbacks: sync.Map{},
245305
}
246306
s.append = cache
247307
go s.runAppendStreamRecv(ctx, cache)
308+
go s.runAppendStreamSend(ctx, cache)
248309
return cache, nil
249310
}
250311

@@ -282,6 +343,7 @@ func (s *BlockStore) connectReadStream(ctx context.Context) (*readStreamCache, e
282343
}
283344
s.read = cache
284345
go s.runReadStreamRecv(ctx, cache)
346+
go s.runReadStreamSend(ctx, cache)
285347
return cache, nil
286348
}
287349

@@ -355,34 +417,14 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
355417
close(donec)
356418
}))
357419

358-
req := &segpb.AppendToBlockStreamRequest{
420+
append.reqs <- &segpb.AppendToBlockStreamRequest{
359421
Id: opaqueID,
360422
BlockId: block,
361423
Events: &cepb.CloudEventBatch{
362424
Events: eventpbs,
363425
},
364426
}
365427

366-
if err = append.stream.Send(req); err != nil {
367-
log.Error(ctx, "append stream send failed", map[string]interface{}{
368-
log.KeyError: err,
369-
})
370-
// close the current stream connection
371-
append.releaseStream()
372-
// reset new stream connections
373-
s.connectAppendStream(ctx)
374-
c, _ := append.callbacks.LoadAndDelete(opaqueID)
375-
if c != nil {
376-
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
377-
Id: opaqueID,
378-
ResponseCode: errpb.ErrorCode_CLOSED,
379-
ResponseMsg: "append stream closed",
380-
Offsets: []int64{},
381-
})
382-
}
383-
return nil, err
384-
}
385-
386428
select {
387429
case <-donec:
388430
case <-_ctx.Done():
@@ -394,6 +436,8 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
394436
ResponseMsg: "append stream context canceled",
395437
Offsets: []int64{},
396438
})
439+
} else {
440+
<-donec
397441
}
398442
}
399443

@@ -475,34 +519,14 @@ func (s *BlockStore) ReadStream(
475519
close(donec)
476520
}))
477521

478-
req := &segpb.ReadFromBlockStreamRequest{
522+
read.reqs <- &segpb.ReadFromBlockStreamRequest{
479523
Id: opaqueID,
480524
BlockId: block,
481525
Offset: offset,
482526
Number: int64(size),
483527
PollingTimeoutInMillisecond: pollingTimeout,
484528
}
485529

486-
if err = s.read.stream.Send(req); err != nil {
487-
log.Error(ctx, "read stream send failed", map[string]interface{}{
488-
log.KeyError: err,
489-
})
490-
read.releaseStream()
491-
s.connectReadStream(ctx)
492-
c, _ := read.callbacks.LoadAndDelete(opaqueID)
493-
if c != nil {
494-
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
495-
Id: opaqueID,
496-
ResponseCode: errpb.ErrorCode_CLOSED,
497-
ResponseMsg: "read stream closed",
498-
Events: &cepb.CloudEventBatch{
499-
Events: []*cepb.CloudEvent{},
500-
},
501-
})
502-
}
503-
return []*ce.Event{}, err
504-
}
505-
506530
select {
507531
case <-donec:
508532
case <-_ctx.Done():
@@ -516,6 +540,8 @@ func (s *BlockStore) ReadStream(
516540
Events: []*cepb.CloudEvent{},
517541
},
518542
})
543+
} else {
544+
<-donec
519545
}
520546
}
521547

client/pkg/api/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type Eventbus interface {
3333

3434
type BusWriter interface {
3535
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
36+
// TODO(jiangkai): just keep one or make their difference more obvious.
3637
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (eid []string, err error)
3738
AppendBatch(ctx context.Context, events *cloudevents.CloudEventBatch, opts ...WriteOption) (err error)
3839
}

client/pkg/eventlog/eventlog.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ import (
1919
// standard libraries.
2020
"context"
2121

22-
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
23-
2422
// third-party libraries.
2523
ce "github.com/cloudevents/sdk-go/v2"
2624

2725
// first-party libraries.
28-
"github.com/linkall-labs/vanus/client/pkg/api"
26+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
2927
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
28+
3029
// this project.
30+
"github.com/linkall-labs/vanus/client/pkg/api"
3131
)
3232

3333
const (
@@ -55,6 +55,7 @@ type LogWriter interface {
5555

5656
AppendMany(ctx context.Context, events *cloudevents.CloudEventBatch) (off int64, err error)
5757

58+
// TODO(jiangkai): unified event data format and maybe make stream as a option of writer is better
5859
AppendManyStream(ctx context.Context, events []*ce.Event) ([]int64, error)
5960
}
6061

client/pkg/eventlog/eventlog_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ func (r *logReader) selectReadableSegment(ctx context.Context) (*segment, error)
579579
segment = r.cur
580580
if segment == nil { // double check
581581
var err error
582-
segment, err := r.elog.selectReadableSegment(ctx, r.pos)
582+
segment, err = r.elog.selectReadableSegment(ctx, r.pos)
583583
if errors.Is(err, errors.ErrOffsetOnEnd) {
584584
r.elog.refreshReadableSegments(ctx)
585585
segment, err = r.elog.selectReadableSegment(ctx, r.pos)

0 commit comments

Comments
 (0)