@@ -32,6 +32,7 @@ import (
3232 "google.golang.org/grpc"
3333
3434 // first-party libraries
35+ errpb "github.com/linkall-labs/vanus/proto/pkg/errors"
3536 segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3637
3738 // this project
@@ -62,8 +63,6 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
6263 // TODO: check error
6364 return nil , err
6465 }
65- s .runAppendStreamRecv (context .Background (), s .appendStream )
66- s .runReadStreamRecv (context .Background (), s .readStream )
6766 return s , nil
6867}
6968
@@ -142,6 +141,8 @@ func (s *BlockStore) connectAppendStream(ctx context.Context) (segpb.SegmentServ
142141 })
143142 return nil , err
144143 }
144+
145+ s .runAppendStreamRecv (context .Background (), stream )
145146 return stream , nil
146147}
147148
@@ -169,6 +170,8 @@ func (s *BlockStore) connectReadStream(ctx context.Context) (segpb.SegmentServer
169170 })
170171 return nil , err
171172 }
173+
174+ s .runReadStreamRecv (ctx , stream )
172175 return stream , nil
173176}
174177
@@ -207,8 +210,8 @@ func (s *BlockStore) Append(ctx context.Context, block uint64, event *ce.Event)
207210 return res .GetOffsets ()[0 ], nil
208211}
209212
210- func (s * BlockStore ) SyncAppendStream (ctx context.Context , block uint64 , event * ce.Event ) (int64 , error ) {
211- _ctx , span := s .tracer .Start (ctx , "SyncAppendStream " )
213+ func (s * BlockStore ) AppendManyStream (ctx context.Context , block uint64 , events [] * ce.Event ) ([] int64 , error ) {
214+ _ctx , span := s .tracer .Start (ctx , "AppendManyStream " )
212215 defer span .End ()
213216
214217 var (
@@ -220,19 +223,23 @@ func (s *BlockStore) SyncAppendStream(ctx context.Context, block uint64, event *
220223 if s .appendStream == nil {
221224 s .appendStream , err = s .connectAppendStream (_ctx )
222225 if err != nil {
223- return - 1 , err
226+ return nil , err
224227 }
225- s .runAppendStreamRecv (_ctx , s .appendStream )
226228 }
227229
228230 // generate unique RequestId
229- requestID := rand .Uint64 ()
231+ requestID := rand .New ( rand . NewSource ( time . Now (). UnixNano ())). Uint64 ()
230232
231233 wg .Add (1 )
232234
233- eventpb , err := codec .ToProto (event )
234- if err != nil {
235- return - 1 , err
235+ //TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
236+ eventpbs := make ([]* cepb.CloudEvent , len (events ))
237+ for idx := range events {
238+ eventpb , err := codec .ToProto (events [idx ])
239+ if err != nil {
240+ return nil , err
241+ }
242+ eventpbs = append (eventpbs , eventpb )
236243 }
237244
238245 s .appendCallbacks .Store (requestID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
@@ -244,7 +251,7 @@ func (s *BlockStore) SyncAppendStream(ctx context.Context, block uint64, event *
244251 RequestId : requestID ,
245252 BlockId : block ,
246253 Events : & cepb.CloudEventBatch {
247- Events : [] * cepb. CloudEvent { eventpb } ,
254+ Events : eventpbs ,
248255 },
249256 }
250257
@@ -259,27 +266,28 @@ func (s *BlockStore) SyncAppendStream(ctx context.Context, block uint64, event *
259266 if c != nil {
260267 c .(appendCallback )(& segpb.AppendToBlockStreamResponse {
261268 ResponseId : requestID ,
262- ResponseCode : segpb .ResponseCode_UNKNOWN ,
269+ ResponseCode : errpb .ErrorCode_CLOSED ,
270+ ResponseMsg : "append stream closed" ,
263271 Offsets : []int64 {},
264272 })
265273 }
266274 }
267- return - 1 , err
275+ return nil , err
268276 }
269277
270278 wg .Wait ()
271279
272- if resp .ResponseCode == segpb . ResponseCode_SegmentFull {
280+ if resp .ResponseCode == errpb . ErrorCode_FULL {
273281 log .Warning (ctx , "block append failed cause the segment is full" , nil )
274- return - 1 , errors .ErrSegmentFull
282+ return nil , errors .ErrFull . WithMessage ( "segment is full" )
275283 }
276284
277- if resp .ResponseCode != segpb . ResponseCode_SUCCESS {
285+ if resp .ResponseCode != errpb . ErrorCode_SUCCESS {
278286 log .Warning (ctx , "block append failed cause unknown error" , nil )
279- return - 1 , errors .ErrUnknown
287+ return nil , errors .ErrUnknown . WithMessage ( "append many stream failed" )
280288 }
281289
282- return resp .Offsets [ 0 ] , nil
290+ return resp .Offsets , nil
283291}
284292
285293func (s * BlockStore ) Read (
@@ -289,10 +297,10 @@ func (s *BlockStore) Read(
289297 defer span .End ()
290298
291299 req := & segpb.ReadFromBlockRequest {
292- BlockId : block ,
293- Offset : offset ,
294- Number : int64 (size ),
295- PollingTimeout : pollingTimeout ,
300+ BlockId : block ,
301+ Offset : offset ,
302+ Number : int64 (size ),
303+ PollingTimeoutInMillisecond : pollingTimeout ,
296304 }
297305
298306 client , err := s .client .Get (ctx )
@@ -323,10 +331,10 @@ func (s *BlockStore) Read(
323331 return []* ce.Event {}, err
324332}
325333
326- func (s * BlockStore ) SyncReadStream (
334+ func (s * BlockStore ) ReadStream (
327335 ctx context.Context , block uint64 , offset int64 , size int16 , pollingTimeout uint32 ,
328336) ([]* ce.Event , error ) {
329- _ctx , span := s .tracer .Start (ctx , "SyncReadStream " )
337+ _ctx , span := s .tracer .Start (ctx , "ReadStream " )
330338 defer span .End ()
331339
332340 var (
@@ -340,7 +348,6 @@ func (s *BlockStore) SyncReadStream(
340348 if err != nil {
341349 return []* ce.Event {}, err
342350 }
343- s .runReadStreamRecv (_ctx , s .readStream )
344351 }
345352
346353 // generate unique RequestId
@@ -354,10 +361,10 @@ func (s *BlockStore) SyncReadStream(
354361 }))
355362
356363 req := & segpb.ReadFromBlockStreamRequest {
357- BlockId : block ,
358- Offset : offset ,
359- Number : int64 (size ),
360- PollingTimeout : pollingTimeout ,
364+ BlockId : block ,
365+ Offset : offset ,
366+ Number : int64 (size ),
367+ PollingTimeoutInMillisecond : pollingTimeout ,
361368 }
362369
363370 if err = s .readStream .Send (req ); err != nil {
@@ -371,7 +378,8 @@ func (s *BlockStore) SyncReadStream(
371378 if c != nil {
372379 c .(readCallback )(& segpb.ReadFromBlockStreamResponse {
373380 ResponseId : requestID ,
374- ResponseCode : segpb .ResponseCode_UNKNOWN ,
381+ ResponseCode : errpb .ErrorCode_CLOSED ,
382+ ResponseMsg : "read stream closed" ,
375383 Events : & cepb.CloudEventBatch {
376384 Events : []* cepb.CloudEvent {},
377385 },
@@ -383,9 +391,9 @@ func (s *BlockStore) SyncReadStream(
383391
384392 wg .Wait ()
385393
386- if resp .ResponseCode != segpb . ResponseCode_SUCCESS {
387- log .Warning (ctx , "block append failed cause unknown error" , nil )
388- return []* ce.Event {}, errors .ErrUnknown
394+ if resp .ResponseCode != errpb . ErrorCode_SUCCESS {
395+ log .Warning (ctx , "block read failed cause unknown error" , nil )
396+ return []* ce.Event {}, errors .ErrUnknown . WithMessage ( "read stream failed" )
389397 }
390398
391399 if batch := resp .GetEvents (); batch != nil {
0 commit comments