@@ -18,8 +18,8 @@ import (
1818 // standard libraries
1919 "context"
2020 "io"
21- "math/rand"
2221 "sync"
22+ "sync/atomic"
2323 "time"
2424
2525 "github.com/linkall-labs/vanus/observability/tracing"
@@ -66,17 +66,18 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
6666 return s , nil
6767}
6868
69- type streamstate string
69+ type streamState string
7070
7171var (
72- stateRunning streamstate = "running"
73- stateCLosed streamstate = "closed"
72+ stateRunning streamState = "running"
73+ stateClosed streamState = "closed"
7474)
7575
7676type appendStreamCache struct {
77+ opaqueID uint64
7778 stream segpb.SegmentServer_AppendToBlockStreamClient
7879 callbacks sync.Map
79- state streamstate
80+ state streamState
8081 once sync.Once
8182}
8283
@@ -85,7 +86,7 @@ func (a *appendStreamCache) isRunning() bool {
8586}
8687
8788func (a * appendStreamCache ) isClosed () bool {
88- return a .state == stateCLosed
89+ return a .state == stateClosed
8990}
9091
9192func (a * appendStreamCache ) release () {
@@ -96,27 +97,26 @@ func (a *appendStreamCache) release() {
9697func (a * appendStreamCache ) releaseStream () {
9798 a .once .Do (func () {
9899 a .stream .CloseSend ()
99- a .state = stateCLosed
100+ a .state = stateClosed
100101 })
101102}
102103
103104func (a * appendStreamCache ) releaseCallbacks () {
104105 a .callbacks .Range (func (key , value interface {}) bool {
105- if value != nil {
106- value .(appendCallback )(& segpb.AppendToBlockStreamResponse {
107- ResponseCode : errpb .ErrorCode_CLOSED ,
108- ResponseMsg : "append stream closed" ,
109- Offsets : []int64 {},
110- })
111- }
106+ value .(appendCallback )(& segpb.AppendToBlockStreamResponse {
107+ ResponseCode : errpb .ErrorCode_CLOSED ,
108+ ResponseMsg : "append stream closed" ,
109+ Offsets : []int64 {},
110+ })
112111 return true
113112 })
114113}
115114
116115type readStreamCache struct {
116+ opaqueID uint64
117117 stream segpb.SegmentServer_ReadFromBlockStreamClient
118118 callbacks sync.Map
119- state streamstate
119+ state streamState
120120 once sync.Once
121121}
122122
@@ -125,7 +125,7 @@ func (r *readStreamCache) isRunning() bool {
125125}
126126
127127func (r * readStreamCache ) isClosed () bool {
128- return r .state == stateCLosed
128+ return r .state == stateClosed
129129}
130130
131131func (r * readStreamCache ) release () {
@@ -136,21 +136,19 @@ func (r *readStreamCache) release() {
136136func (r * readStreamCache ) releaseStream () {
137137 r .once .Do (func () {
138138 r .stream .CloseSend ()
139- r .state = stateCLosed
139+ r .state = stateClosed
140140 })
141141}
142142
143143func (r * readStreamCache ) releaseCallbacks () {
144144 r .callbacks .Range (func (key , value interface {}) bool {
145- if value != nil {
146- value .(readCallback )(& segpb.ReadFromBlockStreamResponse {
147- ResponseCode : errpb .ErrorCode_CLOSED ,
148- ResponseMsg : "read stream closed" ,
149- Events : & cepb.CloudEventBatch {
150- Events : []* cepb.CloudEvent {},
151- },
152- })
153- }
145+ value .(readCallback )(& segpb.ReadFromBlockStreamResponse {
146+ ResponseCode : errpb .ErrorCode_CLOSED ,
147+ ResponseMsg : "read stream closed" ,
148+ Events : & cepb.CloudEventBatch {
149+ Events : []* cepb.CloudEvent {},
150+ },
151+ })
154152 return true
155153 })
156154}
@@ -335,7 +333,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
335333 }
336334
337335 // generate unique opaqueID
338- opaqueID := rand . New ( rand . NewSource ( time . Now (). UnixNano ())). Uint64 ( )
336+ atomic . AddUint64 ( & append . opaqueID , 1 )
339337
340338 //TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
341339 eventpbs := make ([]* cepb.CloudEvent , len (events ))
@@ -348,13 +346,13 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
348346 }
349347
350348 donec := make (chan struct {})
351- append .callbacks .Store (opaqueID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
349+ append .callbacks .Store (append . opaqueID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
352350 resp = res
353351 close (donec )
354352 }))
355353
356354 req := & segpb.AppendToBlockStreamRequest {
357- Id : opaqueID ,
355+ Id : append . opaqueID ,
358356 BlockId : block ,
359357 Events : & cepb.CloudEventBatch {
360358 Events : eventpbs ,
@@ -369,10 +367,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
369367 append .releaseStream ()
370368 // reset new stream connections
371369 s .connectAppendStream (ctx )
372- c , _ := append .callbacks .LoadAndDelete (opaqueID )
370+ c , _ := append .callbacks .LoadAndDelete (append . opaqueID )
373371 if c != nil {
374372 c .(appendCallback )(& segpb.AppendToBlockStreamResponse {
375- Id : opaqueID ,
373+ Id : append . opaqueID ,
376374 ResponseCode : errpb .ErrorCode_CLOSED ,
377375 ResponseMsg : "append stream closed" ,
378376 Offsets : []int64 {},
@@ -384,10 +382,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
384382 select {
385383 case <- donec :
386384 case <- _ctx .Done ():
387- c , _ := append .callbacks .LoadAndDelete (opaqueID )
385+ c , _ := append .callbacks .LoadAndDelete (append . opaqueID )
388386 if c != nil {
389387 c .(appendCallback )(& segpb.AppendToBlockStreamResponse {
390- Id : opaqueID ,
388+ Id : append . opaqueID ,
391389 ResponseCode : errpb .ErrorCode_CONTEXT_CANCELED ,
392390 ResponseMsg : "append stream context canceled" ,
393391 Offsets : []int64 {},
@@ -465,10 +463,10 @@ func (s *BlockStore) ReadStream(
465463 }
466464
467465 // generate unique RequestId
468- opaqueID := rand . New ( rand . NewSource ( time . Now (). UnixNano ())). Uint64 ( )
466+ atomic . AddUint64 ( & read . opaqueID , 1 )
469467
470468 donec := make (chan struct {})
471- read .callbacks .Store (opaqueID , readCallback (func (res * segpb.ReadFromBlockStreamResponse ) {
469+ read .callbacks .Store (read . opaqueID , readCallback (func (res * segpb.ReadFromBlockStreamResponse ) {
472470 resp = res
473471 close (donec )
474472 }))
@@ -486,10 +484,10 @@ func (s *BlockStore) ReadStream(
486484 })
487485 read .releaseStream ()
488486 s .connectReadStream (ctx )
489- c , _ := read .callbacks .LoadAndDelete (opaqueID )
487+ c , _ := read .callbacks .LoadAndDelete (read . opaqueID )
490488 if c != nil {
491489 c .(readCallback )(& segpb.ReadFromBlockStreamResponse {
492- Id : opaqueID ,
490+ Id : read . opaqueID ,
493491 ResponseCode : errpb .ErrorCode_CLOSED ,
494492 ResponseMsg : "read stream closed" ,
495493 Events : & cepb.CloudEventBatch {
@@ -503,10 +501,10 @@ func (s *BlockStore) ReadStream(
503501 select {
504502 case <- donec :
505503 case <- _ctx .Done ():
506- c , _ := read .callbacks .LoadAndDelete (opaqueID )
504+ c , _ := read .callbacks .LoadAndDelete (read . opaqueID )
507505 if c != nil {
508506 c .(readCallback )(& segpb.ReadFromBlockStreamResponse {
509- Id : opaqueID ,
507+ Id : read . opaqueID ,
510508 ResponseCode : errpb .ErrorCode_CONTEXT_CANCELED ,
511509 ResponseMsg : "read stream context canceled" ,
512510 Events : & cepb.CloudEventBatch {
0 commit comments