Skip to content

Commit 9fae93b

Browse files
authored
Merge branch 'main' into rtinianov_teeAndRestrictions
2 parents 805f05b + fff78a2 commit 9fae93b

2 files changed

Lines changed: 10 additions & 9 deletions

File tree

pkg/durableemitter/durable_emitter.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ func DefaultConfig() Config {
124124
PublishTimeout: 5 * time.Second,
125125
PurgeInterval: 250 * time.Millisecond,
126126
PurgeBatchSize: 500,
127-
InsertBatchFlushInterval: 100 * time.Millisecond,
127+
InsertBatchFlushInterval: 500 * time.Millisecond,
128128
InsertBatchSize: 100,
129129
MarkBatchSize: 100,
130-
MarkBatchFlushInterval: 100 * time.Millisecond,
130+
MarkBatchFlushInterval: 500 * time.Millisecond,
131131
MarkBatchWorkers: 2,
132132
// Metrics is opt-in: callers who want instrumentation must set this
133133
// and pass a metric.Meter to NewDurableEmitter.
@@ -654,13 +654,13 @@ func (d *DurableEmitter) insertBatchLoop() {
654654
for i, r := range batch {
655655
payloads[i] = r.payload
656656
}
657-
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout)
658-
ids, batchErr := d.batchInserter.InsertBatch(ctx, payloads)
659-
cancel()
660-
if batchErr == nil {
661-
d.eng.Debugw("DurableEmitter: coalesced insert flushed", "count", len(payloads))
662-
}
663-
for i, r := range batch {
657+
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout)
658+
ids, batchErr := d.batchInserter.InsertBatch(ctx, payloads)
659+
cancel()
660+
if batchErr == nil {
661+
d.eng.Debugw("DurableEmitter: coalesced insert flushed", "count", len(payloads))
662+
}
663+
for i, r := range batch {
664664
if batchErr != nil {
665665
r.result <- insertResult{err: batchErr}
666666
} else {

pkg/durableemitter/durable_emitter_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ func TestDurableEmitter_MarkCoalescingBatchesIds(t *testing.T) {
197197

198198
cfg := DefaultConfig()
199199
cfg.DisablePruning = true
200+
cfg.InsertBatchSize = 0 // disable insert coalescing so deliveries arrive in a burst
200201
cfg.MarkBatchSize = 100
201202
cfg.MarkBatchWorkers = 1
202203
cfg.MarkBatchFlushInterval = 200 * time.Millisecond

0 commit comments

Comments
 (0)