@@ -24,7 +24,6 @@ import (
2424 "time"
2525
2626 // third-party libraries.
27- "go.opentelemetry.io/otel/attribute"
2827 "go.opentelemetry.io/otel/trace"
2928
3029 // first-party libraries.
@@ -67,11 +66,6 @@ type peer struct {
6766
6867type LeaderChangedListener func (block , leader vanus.ID , term uint64 )
6968
70- type commitWaiter struct {
71- offset int64
72- c chan struct {}
73- }
74-
7569type Appender interface {
7670 block.Appender
7771
@@ -86,11 +80,6 @@ type appender struct {
8680 actx block.AppendContext
8781 appendMu sync.RWMutex
8882
89- waiters []commitWaiter
90- commitIndex uint64
91- commitOffset int64
92- waitMu sync.Mutex
93-
9483 leaderID vanus.ID
9584 listener LeaderChangedListener
9685
@@ -116,7 +105,6 @@ func NewAppender(
116105
117106 a := & appender {
118107 raw : raw ,
119- waiters : make ([]commitWaiter , 0 ),
120108 listener : listener ,
121109 log : raftLog ,
122110 host : host ,
@@ -126,7 +114,6 @@ func NewAppender(
126114 tracer : tracing .NewTracer ("store.block.raft.appender" , trace .SpanKindInternal ),
127115 }
128116 a .actx = a .raw .NewAppendContext (nil )
129- a .commitOffset = a .actx .WriteOffset ()
130117
131118 a .log .SetSnapshotOperator (a )
132119 a .host .Register (a .ID ().Uint64 (), a )
@@ -145,9 +132,6 @@ func NewAppender(
145132 }
146133 a .node = raft .RestartNode (c )
147134
148- // Access Commit after raft.RestartNode to ensure raft state is initialized.
149- a .commitIndex = a .log .HardState ().Commit
150-
151135 go a .run (ctx )
152136
153137 return a
@@ -207,13 +191,6 @@ func (a *appender) run(ctx context.Context) {
207191 case rd := <- a .node .Ready ():
208192 rCtx , span := a .tracer .Start (ctx , "RaftReady" , trace .WithNewRoot ())
209193
210- var partial bool
211- stateChanged := ! raft .IsEmptyHardState (rd .HardState )
212- if stateChanged {
213- // Wake up fast before writing logs.
214- partial = a .wakeup (rCtx , rd .HardState .Commit )
215- }
216-
217194 if len (rd .Entries ) != 0 {
218195 log .Debug (rCtx , "Append entries to raft log." , map [string ]interface {}{
219196 "node_id" : a .ID (),
@@ -238,11 +215,7 @@ func (a *appender) run(ctx context.Context) {
238215 })
239216 }
240217
241- if stateChanged {
242- // Wake up after writing logs.
243- if partial {
244- _ = a .wakeup (rCtx , rd .HardState .Commit )
245- }
218+ if ! raft .IsEmptyHardState (rd .HardState ) {
246219 log .Debug (rCtx , "Persist raft hard state." , map [string ]interface {}{
247220 "node_id" : a .ID (),
248221 "hard_state" : rd .HardState ,
@@ -336,40 +309,6 @@ func (a *appender) applyEntries(ctx context.Context, committedEntries []raftpb.E
336309 return committedEntries [num - 1 ].Index
337310}
338311
339- // wakeup wakes up append requests to the smaller of the committed or last index.
340- func (a * appender ) wakeup (ctx context.Context , commit uint64 ) (partial bool ) {
341- _ , span := a .tracer .Start (ctx , "wakeup" , trace .WithAttributes (
342- attribute .Int64 ("commit" , int64 (commit ))))
343- defer span .End ()
344-
345- li , _ := a .log .LastIndex ()
346- if commit > li {
347- commit = li
348- partial = true
349- }
350-
351- if commit <= a .commitIndex {
352- return
353- }
354- a .commitIndex = commit
355-
356- for off := commit ; off > 0 ; off -- {
357- pbEntries , err := a .log .Entries (off , off + 1 , 0 )
358- if err != nil {
359- return
360- }
361-
362- pbEntry := pbEntries [0 ]
363- if pbEntry .Type == raftpb .EntryNormal && len (pbEntry .Data ) > 0 {
364- frag := block .NewFragment (pbEntry .Data )
365- a .doWakeup (ctx , frag .EndOffset ())
366- return
367- }
368- }
369-
370- return partial
371- }
372-
373312func (a * appender ) becomeLeader (ctx context.Context ) {
374313 ctx , span := a .tracer .Start (ctx , "becomeLeader" )
375314 defer span .End ()
@@ -459,65 +398,65 @@ func (a *appender) reset(ctx context.Context) {
459398}
460399
461400// Append implements block.raw.
462- func (a * appender ) Append (ctx context.Context , entries ... block.Entry ) ([] int64 , error ) {
401+ func (a * appender ) Append (ctx context.Context , entries [] block.Entry , cb block. AppendCallback ) {
463402 ctx , span := a .tracer .Start (ctx , "Append" )
464403 defer span .End ()
465404
466- seqs , offset , err := a .append (ctx , entries )
467- if err != nil {
468- if errors .Is (err , errors .ErrSegmentFull ) {
469- _ = a .waitCommit (ctx , offset )
470- }
471- return nil , err
472- }
473-
474- // Wait until entries is committed.
475- err = a .waitCommit (ctx , offset )
476- if err != nil {
477- return nil , err
478- }
479-
480- return seqs , nil
481- }
482-
483- func (a * appender ) append (ctx context.Context , entries []block.Entry ) ([]int64 , int64 , error ) {
484- ctx , span := a .tracer .Start (ctx , "append" )
485- defer span .End ()
486-
487405 span .AddEvent ("Acquiring append lock" )
488406 a .appendMu .Lock ()
489407 span .AddEvent ("Got append lock" )
490408
491- defer a .appendMu .Unlock ()
492-
493409 if ! a .isLeader () {
494- return nil , 0 , errors .ErrNotLeader
410+ a .appendMu .Unlock ()
411+ cb (nil , errors .ErrNotLeader )
412+ return
495413 }
496414
497415 if a .actx .Archived () {
498- return nil , a .actx .WriteOffset (), errors .ErrSegmentFull
416+ a .appendMu .Unlock ()
417+ cb (nil , errors .ErrSegmentFull )
418+ return
499419 }
500420
501421 seqs , frag , enough , err := a .raw .PrepareAppend (ctx , a .actx , entries ... )
502422 if err != nil {
503- return nil , 0 , err
423+ a .appendMu .Unlock ()
424+ cb (nil , err )
425+ return
504426 }
505- off := a .actx .WriteOffset ()
506427
507428 data , _ := block .MarshalFragment (ctx , frag )
508- if err = a .node .Propose (ctx , data ); err != nil {
509- return nil , 0 , err
510- }
511429
430+ var pds []raft.ProposeData
512431 if enough {
513- if frag , err = a .raw .PrepareArchive (ctx , a .actx ); err == nil {
514- data , _ := block .MarshalFragment (ctx , frag )
515- _ = a . node . Propose ( ctx , data )
432+ if frag , err : = a .raw .PrepareArchive (ctx , a .actx ); err == nil {
433+ archivedData , _ := block .MarshalFragment (ctx , frag )
434+ pds = make ([]raft. ProposeData , 2 )
516435 // FIXME(james.yin): revert archived if propose failed.
436+ pds [1 ] = raft.ProposeData {
437+ Data : archivedData ,
438+ }
439+ } else {
440+ pds = make ([]raft.ProposeData , 1 )
517441 }
442+ } else {
443+ pds = make ([]raft.ProposeData , 1 )
518444 }
519445
520- return seqs , off , nil
446+ pds [0 ] = raft.ProposeData {
447+ Data : data ,
448+ Callback : func (err error ) {
449+ if err != nil {
450+ cb (nil , err )
451+ } else {
452+ cb (seqs , nil )
453+ }
454+ },
455+ }
456+
457+ a .node .Propose (ctx , pds ... )
458+
459+ a .appendMu .Unlock ()
521460}
522461
523462func (a * appender ) doAppend (ctx context.Context , frags ... block.Fragment ) {
@@ -527,57 +466,6 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
527466 _ , _ = a .raw .CommitAppend (ctx , frags ... )
528467}
529468
530- func (a * appender ) waitCommit (ctx context.Context , offset int64 ) error {
531- ctx , span := a .tracer .Start (ctx , "waitCommit" )
532- defer span .End ()
533-
534- span .AddEvent ("Acquiring wait lock" )
535- a .waitMu .Lock ()
536- span .AddEvent ("Got wait lock" )
537-
538- if offset <= a .commitOffset {
539- a .waitMu .Unlock ()
540- return nil
541- }
542-
543- ch := make (chan struct {})
544- a .waiters = append (a .waiters , commitWaiter {
545- offset : offset ,
546- c : ch ,
547- })
548-
549- a .waitMu .Unlock ()
550-
551- // FIXME(james.yin): lost leader
552- select {
553- case <- ch :
554- return nil
555- case <- ctx .Done ():
556- return ctx .Err ()
557- }
558- }
559-
560- func (a * appender ) doWakeup (ctx context.Context , commit int64 ) {
561- _ , span := a .tracer .Start (ctx , "doWakeup" )
562- defer span .End ()
563-
564- span .AddEvent ("Acquiring wait lock" )
565- a .waitMu .Lock ()
566- span .AddEvent ("Got wait lock" )
567-
568- defer a .waitMu .Unlock ()
569-
570- for len (a .waiters ) != 0 {
571- waiter := a .waiters [0 ]
572- if waiter .offset > commit {
573- break
574- }
575- close (waiter .c )
576- a .waiters = a .waiters [1 :]
577- }
578- a .commitOffset = commit
579- }
580-
581469func (a * appender ) Status () ClusterStatus {
582470 leader , term := a .leaderInfo ()
583471 return ClusterStatus {
0 commit comments