Skip to content

Commit 1b94998

Browse files
authored
fix: BatchWriter: Ensure flushed batch has at least one record (#2489)
#### Summary Ensure only flushing batches that have more than 0 records. Issue originally was discovered in cloudquery/cloudquery#22750
1 parent 16dc4cd commit 1b94998

2 files changed

Lines changed: 54 additions & 2 deletions

File tree

writers/batchwriter/batchwriter.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,9 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m
161161
limit.AddSlice(add)
162162
}
163163
if len(toFlush) > 0 || rest != nil || limit.ReachedLimit() {
164-
// flush current batch
165-
send()
164+
if limit.Rows() > 0 {
165+
send()
166+
}
166167
ticker.Reset(w.batchTimeout)
167168
}
168169
for _, sliceToFlush := range toFlush {

writers/batchwriter/batchwriter_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,57 @@ func TestBatchUpserts(t *testing.T) {
233233
}
234234
}
235235

236+
// noEmptyBatchClient wraps testBatchClient and fails the test if WriteTableBatch
237+
// is called with an empty messages slice.
238+
type noEmptyBatchClient struct {
239+
testBatchClient
240+
t *testing.T
241+
}
242+
243+
func (c *noEmptyBatchClient) WriteTableBatch(ctx context.Context, name string, messages message.WriteInserts) error {
244+
if len(messages) == 0 {
245+
// Use t.Error (not t.Fatal) because this may be called from a worker goroutine.
246+
c.t.Error("WriteTableBatch called with empty messages slice")
247+
return nil
248+
}
249+
return c.testBatchClient.WriteTableBatch(ctx, name, messages)
250+
}
251+
252+
// TestBatchNoEmptyFlush is a regression test ensuring WriteTableBatch is never called with
253+
// an empty messages slice. This can happen when batchSizeBytes is so small that no row fits
254+
// in the initial batch (SliceRecord returns add==nil while toFlush is non-empty), which
255+
// previously caused send() to call WriteTableBatch with an empty resources slice.
256+
func TestBatchNoEmptyFlush(t *testing.T) {
257+
ctx := context.Background()
258+
259+
testClient := &noEmptyBatchClient{t: t}
260+
// batchSizeBytes=1 ensures that no single row fits in the initial batch:
261+
// SliceRecord returns add==nil, toFlush=[one record per row], rest=nil.
262+
wr, err := New(testClient, WithBatchSizeBytes(1))
263+
if err != nil {
264+
t.Fatal(err)
265+
}
266+
267+
table := schema.Table{Name: "table1", Columns: []schema.Column{{Name: "id", Type: arrow.PrimitiveTypes.Int64}}}
268+
const numRows = 5
269+
record := getRecord(table.ToArrowSchema(), numRows)
270+
if err := wr.writeAll(ctx, []message.WriteMessage{&message.WriteInsert{Record: record}}); err != nil {
271+
t.Fatal(err)
272+
}
273+
274+
if err := wr.Flush(ctx); err != nil {
275+
t.Fatal(err)
276+
}
277+
if err := wr.Close(ctx); err != nil {
278+
t.Fatal(err)
279+
}
280+
281+
// All rows must have been written via at least one non-empty batch.
282+
if testClient.InsertsLen() == 0 {
283+
t.Fatalf("expected at least 1 insert message, got 0")
284+
}
285+
}
286+
236287
func getRecord(sc *arrow.Schema, rows int) arrow.RecordBatch {
237288
builder := array.NewRecordBuilder(memory.DefaultAllocator, sc)
238289
defer builder.Release()

0 commit comments

Comments
 (0)