@@ -48,7 +48,6 @@ const (
4848 defaultHeartbeatTick = 3
4949 defaultMaxSizePerMsg = 4096
5050 defaultMaxInflightMsgs = 256
51- defaultWaiterWorker = 8
5251)
5352
5453type Peer struct {
@@ -68,13 +67,17 @@ type peer struct {
6867
6968type LeaderChangedListener func (block , leader vanus.ID , term uint64 )
7069
71- type commitWaiter struct {
70+ type CommitWaiter struct {
7271 seqs []int64
7372 offset int64
7473 err error
7574 callback func ([]int64 , error )
7675}
7776
77+ func (w CommitWaiter ) Do () {
78+ w .callback (w .seqs , w .err )
79+ }
80+
7881type Appender interface {
7982 block.Appender
8083
@@ -89,8 +92,8 @@ type appender struct {
8992 actx block.AppendContext
9093 appendMu sync.RWMutex
9194
92- waiters []commitWaiter
93- waiterC chan commitWaiter
95+ waiters []CommitWaiter
96+ waiterC chan CommitWaiter
9497 commitIndex uint64
9598 commitOffset int64
9699 waitMu sync.Mutex
@@ -114,14 +117,19 @@ type appender struct {
114117var _ Appender = (* appender )(nil )
115118
116119func NewAppender (
117- ctx context.Context , raw block.Raw , raftLog * raftlog.Log , host transport.Host , listener LeaderChangedListener ,
120+ ctx context.Context ,
121+ raw block.Raw ,
122+ raftLog * raftlog.Log ,
123+ host transport.Host ,
124+ listener LeaderChangedListener ,
125+ waiterC chan CommitWaiter ,
118126) Appender {
119127 ctx , cancel := context .WithCancel (ctx )
120128
121129 a := & appender {
122130 raw : raw ,
123- waiters : make ([]commitWaiter , 0 ),
124- waiterC : make ( chan commitWaiter ) ,
131+ waiters : make ([]CommitWaiter , 0 ),
132+ waiterC : waiterC ,
125133 listener : listener ,
126134 log : raftLog ,
127135 host : host ,
@@ -200,29 +208,11 @@ func (a *appender) Delete(ctx context.Context) {
200208 a .log .Delete (ctx )
201209}
202210
203- func (a * appender ) runWaiterWorker (ctx context.Context ) {
204- for i := 0 ; i < defaultWaiterWorker ; i ++ {
205- go func () {
206- for {
207- select {
208- case waiter := <- a .waiterC :
209- waiter .callback (waiter .seqs , waiter .err )
210- case <- ctx .Done ():
211- close (a .waiterC )
212- return
213- }
214- }
215- }()
216- }
217- }
218-
219211func (a * appender ) run (ctx context.Context ) {
220212 // TODO(james.yin): reduce Ticker
221213 t := time .NewTicker (defaultTickInterval )
222214 defer t .Stop ()
223215
224- a .runWaiterWorker (ctx )
225-
226216 for {
227217 select {
228218 case <- t .C :
@@ -482,18 +472,24 @@ func (a *appender) reset(ctx context.Context) {
482472}
483473
484474// Append implements async block.raw.
485- func (a * appender ) Append (ctx context.Context , cb func ([]int64 , error ), entries ... block. Entry ) {
475+ func (a * appender ) Append (ctx context.Context , entries []block. Entry , cb func ([]int64 , error )) {
486476 ctx , span := a .tracer .Start (ctx , "Append" )
487477 defer span .End ()
488478
479+ span .AddEvent ("Acquiring append lock" )
480+ a .appendMu .Lock ()
481+ span .AddEvent ("Got append lock" )
482+
483+ defer a .appendMu .Unlock ()
484+
489485 seqs , offset , err := a .append (ctx , entries )
490486 if err != nil && ! errors .Is (err , errors .ErrFull ) {
491487 cb (nil , err )
492488 return
493489 }
494490
495491 // register callback and wait until entries is committed.
496- a .registerCommitWaiter (ctx , commitWaiter {
492+ a .registerCommitWaiter (ctx , CommitWaiter {
497493 seqs : seqs ,
498494 offset : offset ,
499495 err : err ,
@@ -505,12 +501,6 @@ func (a *appender) append(ctx context.Context, entries []block.Entry) ([]int64,
505501 ctx , span := a .tracer .Start (ctx , "append" )
506502 defer span .End ()
507503
508- span .AddEvent ("Acquiring append lock" )
509- a .appendMu .Lock ()
510- span .AddEvent ("Got append lock" )
511-
512- defer a .appendMu .Unlock ()
513-
514504 if ! a .isLeader () {
515505 return nil , 0 , errors .ErrNotLeader .WithMessage ("the appender is not leader" )
516506 }
@@ -548,7 +538,7 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
548538 _ , _ = a .raw .CommitAppend (ctx , frags ... )
549539}
550540
551- func (a * appender ) registerCommitWaiter (ctx context.Context , waiter commitWaiter ) {
541+ func (a * appender ) registerCommitWaiter (ctx context.Context , waiter CommitWaiter ) {
552542 _ , span := a .tracer .Start (ctx , "waitCommit" )
553543 defer span .End ()
554544
0 commit comments