Skip to content

Commit 3cff2cc

Browse files
committed
FIxes linting
1 parent a005109 commit 3cff2cc

3 files changed

Lines changed: 23 additions & 36 deletions

File tree

pkg/chipingress/batch/client.go

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"context"
55
"time"
66

7-
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
87
"go.uber.org/zap"
8+
9+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
910
)
1011

11-
type BatchClient struct {
12+
type Client struct {
1213
client chipingress.Client
1314
batchSize int
1415
maxConcurrentSends chan struct{}
@@ -19,11 +20,10 @@ type BatchClient struct {
1920
log *zap.SugaredLogger
2021
}
2122

22-
type Opt func(*BatchClient)
23+
type Opt func(*Client)
2324

24-
func NewBatchClient(client chipingress.Client, opts ...Opt) (*BatchClient, error) {
25-
26-
c := &BatchClient{
25+
func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
26+
c := &Client{
2727
client: client,
2828
batchSize: 1,
2929
maxConcurrentSends: make(chan struct{}, 1),
@@ -40,9 +40,8 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*BatchClient, error
4040
return c, nil
4141
}
4242

43-
func (b *BatchClient) Start(ctx context.Context) {
43+
func (b *Client) Start(ctx context.Context) {
4444
go func() {
45-
4645
batch := make([]*chipingress.CloudEventPb, 0, b.batchSize)
4746
timer := time.NewTimer(b.batchTimeout)
4847
timer.Stop()
@@ -57,7 +56,6 @@ func (b *BatchClient) Start(ctx context.Context) {
5756
b.flush(batch)
5857
return
5958
case event := <-b.messageBuffer:
60-
6159
if len(batch) == 0 {
6260
timer.Reset(b.batchTimeout)
6361
}
@@ -81,7 +79,7 @@ func (b *BatchClient) Start(ctx context.Context) {
8179
}()
8280
}
8381

84-
func (b *BatchClient) Stop() {
82+
func (b *Client) Stop() {
8583
close(b.shutdownChan)
8684
// wait for pending sends by getting all semaphore slots
8785
for range cap(b.maxConcurrentSends) {
@@ -92,8 +90,7 @@ func (b *BatchClient) Stop() {
9290
// QueueMessage queues a single message to the batch client.
9391
// Returns immediately with no blocking - drops message if channel is full.
9492
// Returns true if message was queued, false if it was dropped.
95-
func (b *BatchClient) QueueMessage(event *chipingress.CloudEventPb) bool {
96-
93+
func (b *Client) QueueMessage(event *chipingress.CloudEventPb) bool {
9794
if event == nil {
9895
return false
9996
}
@@ -106,8 +103,7 @@ func (b *BatchClient) QueueMessage(event *chipingress.CloudEventPb) bool {
106103
}
107104
}
108105

109-
func (b *BatchClient) sendBatch(ctx context.Context, events []*chipingress.CloudEventPb) {
110-
106+
func (b *Client) sendBatch(ctx context.Context, events []*chipingress.CloudEventPb) {
111107
if len(events) == 0 {
112108
return
113109
}
@@ -123,8 +119,7 @@ func (b *BatchClient) sendBatch(ctx context.Context, events []*chipingress.Cloud
123119
}()
124120
}
125121

126-
func (b *BatchClient) flush(batch []*chipingress.CloudEventPb) {
127-
122+
func (b *Client) flush(batch []*chipingress.CloudEventPb) {
128123
if len(batch) == 0 {
129124
return
130125
}
@@ -136,31 +131,31 @@ func (b *BatchClient) flush(batch []*chipingress.CloudEventPb) {
136131
}
137132

138133
func WithBatchSize(batchSize int) Opt {
139-
return func(c *BatchClient) {
134+
return func(c *Client) {
140135
c.batchSize = batchSize
141136
}
142137
}
143138

144139
func WithMaxConcurrentSends(maxConcurrentSends int) Opt {
145-
return func(c *BatchClient) {
140+
return func(c *Client) {
146141
c.maxConcurrentSends = make(chan struct{}, maxConcurrentSends)
147142
}
148143
}
149144

150145
func WithBatchTimeout(batchTimeout time.Duration) Opt {
151-
return func(c *BatchClient) {
146+
return func(c *Client) {
152147
c.batchTimeout = batchTimeout
153148
}
154149
}
155150

156151
func WithCompressionType(compressionType string) Opt {
157-
return func(c *BatchClient) {
152+
return func(c *Client) {
158153
c.compressionType = compressionType
159154
}
160155
}
161156

162157
func WithMessageBuffer(messageBufferSize int) Opt {
163-
return func(c *BatchClient) {
158+
return func(c *Client) {
164159
c.messageBuffer = make(chan *chipingress.CloudEventPb, messageBufferSize)
165160
}
166161
}

pkg/chipingress/batch/client_test.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
10-
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
119
"github.com/stretchr/testify/assert"
1210
"github.com/stretchr/testify/mock"
1311
"github.com/stretchr/testify/require"
12+
13+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
14+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
1415
)
1516

1617
func TestNewBatchClient(t *testing.T) {
17-
1818
t.Run("NewBatchClient", func(t *testing.T) {
1919
client, err := NewBatchClient(nil)
2020
require.NoError(t, err)
@@ -53,9 +53,7 @@ func TestNewBatchClient(t *testing.T) {
5353
}
5454

5555
func TestQueueMessage(t *testing.T) {
56-
5756
t.Run("successfully queues a message", func(t *testing.T) {
58-
5957
client, err := NewBatchClient(nil, WithMessageBuffer(5))
6058
require.NoError(t, err)
6159

@@ -67,7 +65,7 @@ func TestQueueMessage(t *testing.T) {
6765

6866
client.QueueMessage(event)
6967

70-
assert.Equal(t, 1, len(client.messageBuffer))
68+
assert.Len(t, client.messageBuffer, 1)
7169

7270
received := <-client.messageBuffer
7371
assert.Equal(t, event.Id, received.Id)
@@ -76,7 +74,6 @@ func TestQueueMessage(t *testing.T) {
7674
})
7775

7876
t.Run("drops message if buffer is full", func(t *testing.T) {
79-
8077
client, err := NewBatchClient(nil, WithMessageBuffer(1))
8178
require.NoError(t, err)
8279
require.NotNil(t, client)
@@ -90,7 +87,7 @@ func TestQueueMessage(t *testing.T) {
9087
client.QueueMessage(event)
9188
client.QueueMessage(event)
9289

93-
assert.Equal(t, 1, len(client.messageBuffer))
90+
assert.Len(t, client.messageBuffer, 1)
9491

9592
received := <-client.messageBuffer
9693
assert.Equal(t, event.Id, received.Id)
@@ -103,14 +100,12 @@ func TestQueueMessage(t *testing.T) {
103100
require.NoError(t, err)
104101

105102
client.QueueMessage(nil)
106-
assert.Equal(t, 0, len(client.messageBuffer))
103+
assert.Empty(t, client.messageBuffer)
107104
})
108105
}
109106

110107
func TestSendBatch(t *testing.T) {
111-
112108
t.Run("successfully sends a batch", func(t *testing.T) {
113-
114109
mockClient := mocks.NewClient(t)
115110
done := make(chan struct{})
116111

@@ -150,7 +145,6 @@ func TestSendBatch(t *testing.T) {
150145
})
151146

152147
t.Run("doesn't publish empty batch", func(t *testing.T) {
153-
154148
mockClient := mocks.NewClient(t)
155149

156150
client, err := NewBatchClient(mockClient, WithMessageBuffer(5))
@@ -208,7 +202,6 @@ func TestSendBatch(t *testing.T) {
208202
}
209203

210204
func TestStart(t *testing.T) {
211-
212205
t.Run("batch size trigger", func(t *testing.T) {
213206
mockClient := mocks.NewClient(t)
214207
done := make(chan struct{})
@@ -366,7 +359,6 @@ func TestStart(t *testing.T) {
366359
})
367360

368361
t.Run("no flush when batch is empty", func(t *testing.T) {
369-
370362
mockClient := mocks.NewClient(t)
371363

372364
client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second))
@@ -385,7 +377,6 @@ func TestStart(t *testing.T) {
385377
})
386378

387379
t.Run("multiple batches via size trigger", func(t *testing.T) {
388-
389380
mockClient := mocks.NewClient(t)
390381
done := make(chan struct{})
391382
callCount := 0

pkg/chipingress/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@ require (
3636
gopkg.in/yaml.v3 v3.0.1 // indirect
3737
)
3838

39+
// Retracted due to initial development versions
3940
retract [v1.0.0, v1.0.1]

0 commit comments

Comments
 (0)