Skip to content

Commit 01f41ee

Browse files
authored
fix(kafka/sarama_broker): drain async producer Successes and Errors (#72) (#120)
The async Sarama producer was configured with Return.Successes=true and Return.Errors=true but no goroutine drained those channels, so they filled up and the producer blocked indefinitely. Two drainer goroutines now consume both channels for the producer's lifetime. Errors are logged and counted via the existing arcade_kafka_produce_errors_total metric. Closes F-014.
1 parent 63ab3fd commit 01f41ee

2 files changed

Lines changed: 266 additions & 5 deletions

File tree

kafka/sarama_broker.go

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,49 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78

89
"github.com/IBM/sarama"
10+
"go.uber.org/zap"
11+
12+
"github.com/bsv-blockchain/arcade/metrics"
913
)
1014

1115
// saramaBroker is the production Broker backed by IBM Sarama. It owns both a
1216
// sync and async producer so Send/SendAsync/SendBatch can pick the appropriate
1317
// one without the caller caring.
18+
//
19+
// The async producer is configured with Return.Successes=true and
20+
// Return.Errors=true. Sarama routes every produced message's outcome onto
21+
// those channels, and if no goroutine drains them they fill up and the
22+
// producer blocks indefinitely on Input(). To keep SendAsync non-blocking,
23+
// the broker spawns two drain goroutines for the producer's lifetime: one
24+
// discards successes (the SendAsync caller already counted the produce in
25+
// the Producer wrapper) and one logs/counts errors via metrics.
26+
// Close() waits for both drainers to exit after closing the underlying
27+
// async producer, which closes both channels in turn.
1428
type saramaBroker struct {
1529
syncProducer sarama.SyncProducer
1630
asyncProducer sarama.AsyncProducer
1731
brokers []string
1832
consumerGroup string
33+
34+
logger *zap.Logger
35+
drainersWG sync.WaitGroup
1936
}
2037

2138
// NewSaramaBroker constructs a Sarama-backed Broker with sensible defaults
22-
// (WaitForAll on sync, WaitForLocal on async, 5 retries).
39+
// (WaitForAll on sync, WaitForLocal on async, 5 retries). Errors from the
40+
// async producer are logged via the package-global zap logger; callers that
41+
// want a custom logger should use NewSaramaBrokerWithLogger.
2342
func NewSaramaBroker(brokers []string, consumerGroup string) (Broker, error) {
43+
return NewSaramaBrokerWithLogger(brokers, consumerGroup, nil)
44+
}
45+
46+
// NewSaramaBrokerWithLogger is like NewSaramaBroker but lets callers inject a
47+
// zap logger for async-producer error logging. A nil logger falls back to
48+
// zap.NewNop() so the broker is always safe to construct.
49+
func NewSaramaBrokerWithLogger(brokers []string, consumerGroup string, logger *zap.Logger) (Broker, error) {
2450
syncCfg := sarama.NewConfig()
2551
syncCfg.Producer.RequiredAcks = sarama.WaitForAll
2652
syncCfg.Producer.Retry.Max = 5
@@ -44,12 +70,69 @@ func NewSaramaBroker(brokers []string, consumerGroup string) (Broker, error) {
4470
return nil, fmt.Errorf("creating async producer: %w", err)
4571
}
4672

47-
return &saramaBroker{
48-
syncProducer: syncProducer,
49-
asyncProducer: asyncProducer,
73+
return newSaramaBrokerFromProducers(syncProducer, asyncProducer, brokers, consumerGroup, logger), nil
74+
}
75+
76+
// newSaramaBrokerFromProducers wires the broker around already-constructed
77+
// sync and async producers. Extracted so tests can substitute Sarama mocks
78+
// without standing up a real Kafka cluster. It also starts the async-producer
79+
// drainer goroutines, which is the only place those should be spawned —
80+
// duplicating that elsewhere would race for ownership of Successes/Errors.
81+
func newSaramaBrokerFromProducers(
82+
sync sarama.SyncProducer,
83+
async sarama.AsyncProducer,
84+
brokers []string,
85+
consumerGroup string,
86+
logger *zap.Logger,
87+
) *saramaBroker {
88+
if logger == nil {
89+
logger = zap.NewNop()
90+
}
91+
b := &saramaBroker{
92+
syncProducer: sync,
93+
asyncProducer: async,
5094
brokers: brokers,
5195
consumerGroup: consumerGroup,
52-
}, nil
96+
logger: logger,
97+
}
98+
b.startAsyncDrainers()
99+
return b
100+
}
101+
102+
// startAsyncDrainers spawns the two goroutines that consume the async
103+
// producer's Successes and Errors channels for the producer's lifetime.
104+
// They return when the underlying channels close, which Sarama does as
105+
// part of asyncProducer.Close(). Close() then waits on drainersWG so the
106+
// broker does not return from Close until both goroutines have exited —
107+
// otherwise a test or a process restart could observe partial shutdown.
108+
func (b *saramaBroker) startAsyncDrainers() {
109+
b.drainersWG.Add(2)
110+
go func() {
111+
defer b.drainersWG.Done()
112+
// Successes are already accounted for by Producer.SendAsync at
113+
// enqueue time, so we just discard them here. Draining is the
114+
// whole point — a full Successes channel blocks Input().
115+
successes := b.asyncProducer.Successes()
116+
for {
117+
if _, ok := <-successes; !ok {
118+
return
119+
}
120+
}
121+
}()
122+
go func() {
123+
defer b.drainersWG.Done()
124+
for produceErr := range b.asyncProducer.Errors() {
125+
topic := ""
126+
if produceErr.Msg != nil {
127+
topic = produceErr.Msg.Topic
128+
}
129+
metrics.KafkaProduceErrors.WithLabelValues(topic).Inc()
130+
b.logger.Error("async kafka produce failed",
131+
zap.String("topic", topic),
132+
zap.Error(produceErr.Err),
133+
)
134+
}
135+
}()
53136
}
54137

55138
func (b *saramaBroker) Send(_ context.Context, topic, key string, value []byte) error {
@@ -137,9 +220,13 @@ func (b *saramaBroker) Close() error {
137220
if err := b.syncProducer.Close(); err != nil {
138221
errs = append(errs, err)
139222
}
223+
// Closing the async producer closes its Successes/Errors channels once
224+
// in-flight messages have been flushed, which is what unblocks the
225+
// drain goroutines below.
140226
if err := b.asyncProducer.Close(); err != nil {
141227
errs = append(errs, err)
142228
}
229+
b.drainersWG.Wait()
143230
if len(errs) > 0 {
144231
return fmt.Errorf("closing producers: %v", errs)
145232
}

kafka/sarama_broker_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/IBM/sarama"
11+
"github.com/IBM/sarama/mocks"
12+
"go.uber.org/zap"
13+
"go.uber.org/zap/zaptest"
14+
)
15+
16+
// fakeSyncProducer satisfies sarama.SyncProducer so the saramaBroker can be
17+
// constructed without a real Kafka cluster. The async-channel-drainer fix
18+
// only exercises the async producer side, but Close() drives both producers.
19+
type fakeSyncProducer struct {
20+
closed atomic.Bool
21+
}
22+
23+
func (f *fakeSyncProducer) SendMessage(*sarama.ProducerMessage) (int32, int64, error) {
24+
return 0, 0, nil
25+
}
26+
27+
func (f *fakeSyncProducer) SendMessages([]*sarama.ProducerMessage) error { return nil }
28+
29+
func (f *fakeSyncProducer) Close() error { f.closed.Store(true); return nil }
30+
31+
func (f *fakeSyncProducer) AbortTxn() error { return nil }
32+
33+
func (f *fakeSyncProducer) AddMessageToTxn(*sarama.ConsumerMessage, string, *string) error {
34+
return nil
35+
}
36+
37+
func (f *fakeSyncProducer) AddOffsetsToTxn(map[string][]*sarama.PartitionOffsetMetadata, string) error {
38+
return nil
39+
}
40+
41+
func (f *fakeSyncProducer) BeginTxn() error { return nil }
42+
43+
func (f *fakeSyncProducer) CommitTxn() error { return nil }
44+
45+
func (f *fakeSyncProducer) IsTransactional() bool { return false }
46+
47+
func (f *fakeSyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag { return 0 }
48+
49+
func (f *fakeSyncProducer) TxnAddOffsetsToTxn(map[string][]*sarama.PartitionOffsetMetadata, string) error {
50+
return nil
51+
}
52+
53+
// newAsyncProducerMock builds a Sarama mock async producer with the same
54+
// Return.Successes/Return.Errors flags the production broker uses, so the
55+
// drainer goroutines see the same channel behavior they would in a real
56+
// deployment.
57+
func newAsyncProducerMock(t *testing.T) *mocks.AsyncProducer {
58+
t.Helper()
59+
cfg := sarama.NewConfig()
60+
cfg.Producer.Return.Successes = true
61+
cfg.Producer.Return.Errors = true
62+
return mocks.NewAsyncProducer(t, cfg)
63+
}
64+
65+
// TestSaramaBroker_AsyncDrainer_DoesNotBlockOnSuccesses sends N messages
66+
// through SendAsync. Without a Successes drainer the mock's success channel
67+
// (capacity = ChannelBufferSize, default 256) would fill and the next send
68+
// would block on Input(). N is chosen well above that buffer to make the
69+
// regression mode unambiguous.
70+
func TestSaramaBroker_AsyncDrainer_DoesNotBlockOnSuccesses(t *testing.T) {
71+
mp := newAsyncProducerMock(t)
72+
const n = 1024
73+
for range n {
74+
mp.ExpectInputAndSucceed()
75+
}
76+
77+
b := newSaramaBrokerFromProducers(&fakeSyncProducer{}, mp, nil, "", zaptest.NewLogger(t))
78+
79+
done := make(chan error, 1)
80+
go func() {
81+
ctx := context.Background()
82+
for i := 0; i < n; i++ {
83+
if err := b.SendAsync(ctx, "tx.validated", "k", []byte("v")); err != nil {
84+
done <- err
85+
return
86+
}
87+
}
88+
done <- nil
89+
}()
90+
91+
select {
92+
case err := <-done:
93+
if err != nil {
94+
t.Fatalf("SendAsync returned error: %v", err)
95+
}
96+
case <-time.After(5 * time.Second):
97+
t.Fatal("SendAsync blocked — Successes channel was not drained")
98+
}
99+
100+
if err := b.Close(); err != nil {
101+
t.Fatalf("Close returned error: %v", err)
102+
}
103+
}
104+
105+
// TestSaramaBroker_AsyncDrainer_HandlesErrors verifies that messages routed
106+
// to the Errors channel are drained (not blocking the producer) and logged
107+
// without panicking. The mock fails every input, so without an Errors
108+
// drainer the channel would fill and SendAsync would deadlock.
109+
func TestSaramaBroker_AsyncDrainer_HandlesErrors(t *testing.T) {
110+
mp := newAsyncProducerMock(t)
111+
const n = 512
112+
produceErr := errors.New("simulated produce failure")
113+
for range n {
114+
mp.ExpectInputAndFail(produceErr)
115+
}
116+
117+
b := newSaramaBrokerFromProducers(&fakeSyncProducer{}, mp, nil, "", zaptest.NewLogger(t))
118+
119+
done := make(chan error, 1)
120+
go func() {
121+
ctx := context.Background()
122+
for i := 0; i < n; i++ {
123+
if err := b.SendAsync(ctx, "tx.validated", "k", []byte("v")); err != nil {
124+
done <- err
125+
return
126+
}
127+
}
128+
done <- nil
129+
}()
130+
131+
select {
132+
case err := <-done:
133+
if err != nil {
134+
t.Fatalf("SendAsync returned error: %v", err)
135+
}
136+
case <-time.After(5 * time.Second):
137+
t.Fatal("SendAsync blocked — Errors channel was not drained")
138+
}
139+
140+
if err := b.Close(); err != nil {
141+
t.Fatalf("Close returned error: %v", err)
142+
}
143+
}
144+
145+
// TestSaramaBroker_Close_WaitsForDrainers asserts that Close() blocks until
146+
// the drainer goroutines exit. After Close returns, no goroutine should
147+
// still be reading from Successes/Errors, which we approximate by checking
148+
// that the broker's WaitGroup has zero counter (drainersWG.Wait() returns
149+
// immediately on a second call).
150+
func TestSaramaBroker_Close_WaitsForDrainers(t *testing.T) {
151+
mp := newAsyncProducerMock(t)
152+
mp.ExpectInputAndSucceed()
153+
154+
b := newSaramaBrokerFromProducers(&fakeSyncProducer{}, mp, nil, "", zap.NewNop())
155+
156+
if err := b.SendAsync(context.Background(), "t", "k", []byte("v")); err != nil {
157+
t.Fatalf("SendAsync: %v", err)
158+
}
159+
if err := b.Close(); err != nil {
160+
t.Fatalf("Close: %v", err)
161+
}
162+
163+
// A second Wait must be a no-op; if it blocks the drainers leaked.
164+
waitDone := make(chan struct{})
165+
go func() {
166+
b.drainersWG.Wait()
167+
close(waitDone)
168+
}()
169+
select {
170+
case <-waitDone:
171+
case <-time.After(time.Second):
172+
t.Fatal("drainer goroutines leaked past Close()")
173+
}
174+
}

0 commit comments

Comments
 (0)