Skip to content

Commit 05c7c11

Browse files
authored
feat(gossip): implement result reporting for payloads buffer push (#5443)
Signed-off-by: Shubham Singh <shubhsoch@gmail.com>
1 parent 4298d72 commit 05c7c11

3 files changed

Lines changed: 19 additions & 12 deletions

File tree

gossip/state/payloads_buffer.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import (
2020
// sequence numbers. It also will provide the capability
2121
// to signal whenever expected block has arrived.
2222
type PayloadsBuffer interface {
23-
// Adds new block into the buffer
24-
Push(payload *proto.Payload)
23+
// Adds new block into the buffer, returns true if added
24+
Push(payload *proto.Payload) bool
2525

2626
// Returns next expected sequence number
2727
Next() uint64
@@ -73,16 +73,16 @@ func (b *PayloadsBufferImpl) Ready() chan struct{} {
7373
// Push new payload into the buffer structure in case new arrived payload
7474
// sequence number is below the expected next block number payload will be
7575
// thrown away.
76-
// TODO return bool to indicate if payload was added or not, so that caller can log result.
77-
func (b *PayloadsBufferImpl) Push(payload *proto.Payload) {
76+
// Returns true if payload was added, false otherwise.
77+
func (b *PayloadsBufferImpl) Push(payload *proto.Payload) bool {
7878
b.mutex.Lock()
7979
defer b.mutex.Unlock()
8080

8181
seqNum := payload.SeqNum
8282

8383
if seqNum < b.next || b.buf[seqNum] != nil {
8484
b.logger.Debugf("Payload with sequence number = %d has been already processed", payload.SeqNum)
85-
return
85+
return false
8686
}
8787

8888
b.buf[seqNum] = payload
@@ -91,6 +91,7 @@ func (b *PayloadsBufferImpl) Push(payload *proto.Payload) {
9191
if seqNum == b.next && len(b.readyChan) == 0 {
9292
b.readyChan <- struct{}{}
9393
}
94+
return true
9495
}
9596

9697
// Next function provides the number of the next expected block
@@ -153,9 +154,10 @@ type metricsBuffer struct {
153154
chainID string
154155
}
155156

156-
func (mb *metricsBuffer) Push(payload *proto.Payload) {
157-
mb.PayloadsBuffer.Push(payload)
157+
func (mb *metricsBuffer) Push(payload *proto.Payload) bool {
158+
res := mb.PayloadsBuffer.Push(payload)
158159
mb.reportSize()
160+
return res
159161
}
160162

161163
func (mb *metricsBuffer) Pop() *proto.Payload {

gossip/state/payloads_buffer_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ func TestPayloadsBufferImpl_Push(t *testing.T) {
4747
t.Fatal("Wasn't able to generate random payload for test")
4848
}
4949

50-
t.Log("Pushing new payload into buffer")
51-
buffer.Push(payload)
50+
t.Log("Pushing old payload into buffer (should return false)")
51+
require.False(t, buffer.Push(payload))
5252

5353
// Payloads with sequence number less than buffer top
5454
// index should not be accepted
@@ -64,8 +64,8 @@ func TestPayloadsBufferImpl_Push(t *testing.T) {
6464
t.Fatal("Wasn't able to generate random payload for test")
6565
}
6666

67-
t.Log("Pushing new payload into buffer")
68-
buffer.Push(payload)
67+
t.Log("Pushing valid payload into buffer (should return true)")
68+
require.True(t, buffer.Push(payload))
6969
t.Log("Getting next block sequence number")
7070
require.Equal(t, buffer.Next(), uint64(5))
7171
t.Log("Check block buffer size")
@@ -141,6 +141,9 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
141141
require.Equal(t, int32(1), atomic.LoadInt32(&ready))
142142
// Buffer size has to be only one
143143
require.Equal(t, 1, buffer.Size())
144+
145+
// Check that we can't push it again
146+
require.False(t, buffer.Push(payload))
144147
}
145148

146149
// Tests the scenario where payload pushes and pops are interleaved after a Ready() signal.

gossip/state/state.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,9 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod
775775
time.Sleep(enqueueRetryInterval)
776776
}
777777

778-
s.payloads.Push(payload)
778+
if !s.payloads.Push(payload) {
779+
s.logger.Debugf("Payload with sequence number %d was not added to buffer (already processed or outdated)", payload.SeqNum)
780+
}
779781
s.logger.Debugf("Blocks payloads buffer size for channel [%s] is %d blocks", s.chainID, s.payloads.Size())
780782
return nil
781783
}

0 commit comments

Comments
 (0)