-
Notifications
You must be signed in to change notification settings - Fork 268
Expand file tree
/
Copy pathaggregation.go
More file actions
137 lines (113 loc) · 3.9 KB
/
Copy pathaggregation.go
File metadata and controls
137 lines (113 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package block
import (
"context"
"fmt"
"time"
)
// AggregationLoop is responsible for aggregating transactions into blocks.
func (m *Manager) AggregationLoop(ctx context.Context, errCh chan<- error) {
initialHeight := m.genesis.InitialHeight //nolint:gosec
height, err := m.store.Height(ctx)
if err != nil {
m.logger.Error().Err(err).Msg("error while getting store height")
return
}
var delay time.Duration
if height < initialHeight {
delay = time.Until(m.genesis.StartTime.Add(m.config.Node.BlockTime.Duration))
} else {
lastBlockTime := m.getLastBlockTime()
delay = time.Until(lastBlockTime.Add(m.config.Node.BlockTime.Duration))
}
if delay > 0 {
m.logger.Info().Dur("delay", delay).Msg("waiting to produce block")
time.Sleep(delay)
}
// blockTimer is used to signal when to build a block based on the
// chain block time. A timer is used so that the time to build a block
// can be taken into account.
blockTimer := time.NewTimer(0)
defer blockTimer.Stop()
// Lazy Sequencer mode.
// In Lazy Sequencer mode, blocks are built only when there are
// transactions or every LazyBlockTime.
if m.config.Node.LazyMode {
if err := m.lazyAggregationLoop(ctx, blockTimer); err != nil {
errCh <- fmt.Errorf("error in lazy aggregation loop: %w", err)
}
return
}
if err := m.normalAggregationLoop(ctx, blockTimer); err != nil {
errCh <- fmt.Errorf("error in normal aggregation loop: %w", err)
}
}
func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) error {
// lazyTimer triggers block publication even during inactivity
lazyTimer := time.NewTimer(0)
defer lazyTimer.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-lazyTimer.C:
m.logger.Debug().Msg("Lazy timer triggered block production")
if err := m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer); err != nil {
return err
}
case <-blockTimer.C:
if m.txsAvailable {
if err := m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer); err != nil {
return err
}
m.txsAvailable = false
} else {
// Ensure we keep ticking even when there are no txs
blockTimer.Reset(m.config.Node.BlockTime.Duration)
}
case <-m.txNotifyCh:
m.txsAvailable = true
}
}
}
// produceBlock handles the common logic for producing a block and resetting timers
func (m *Manager) produceBlock(ctx context.Context, mode string, lazyTimer, blockTimer *time.Timer) error {
start := time.Now()
// Attempt to publish the block
if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil {
return fmt.Errorf("error while publishing block: %w", err)
}
m.logger.Debug().Str("mode", mode).Msg("Successfully published block")
// Reset both timers for the next aggregation window
lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration))
blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration))
return nil
}
func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Timer) error {
for {
select {
case <-ctx.Done():
return nil
case <-blockTimer.C:
// Define the start time for the block production period
start := time.Now()
if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil {
return fmt.Errorf("error while publishing block: %w", err)
}
// Reset the blockTimer to signal the next block production
// period based on the block time.
blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration))
case <-m.txNotifyCh:
// Transaction notifications are intentionally ignored in normal mode
// to avoid triggering block production outside the scheduled intervals.
// We just update the txsAvailable flag for tracking purposes
m.txsAvailable = true
}
}
}
func getRemainingSleep(start time.Time, interval time.Duration) time.Duration {
elapsed := time.Since(start)
if elapsed < interval {
return interval - elapsed
}
return time.Millisecond
}