Unordered kafka reader: data loss on cooperative rebalance (MarkCommitRecords after revoke) #4010

@TayPark

We found a data loss bug in the unordered kafka reader during cooperative-sticky rebalancing. This write-up covers the background, the exact mechanism, and a fix (PR #4009).

Background: Why we're here

We run ~20 Kafka-to-S3 pipelines. We were using Vector, but kept hitting two problems:

                      Vector (current)
                      ┌──────────────────────────────────────┐
  Kafka ──────────►   │  adaptive_concurrency                │
  (multi-partition)   │  ┌─────────┐     ┌──────────────┐    │
                      │  │ consume ├────►│ S3 sink      │    │   Problems:
                      │  └─────────┘     │              │    │   1. IRSA token expiry → batch drop
                      │                  │ (token=IRSA) │    │   2. adaptive concurrency → OOM
                      │                  └──────────────┘    │
                      └──────────────────────────────────────┘
  1. S3 token expiry: Vector does not reliably refresh IRSA tokens. When a token expires mid-upload, the batch is silently dropped.
  2. OOM: Under bursty load, the adaptive concurrency controller sometimes allocates more memory than expected, crashing the pod.

So we started a PoC with Redpanda Connect:

                      Redpanda Connect (PoC)
                      ┌──────────────────────────────────────────────┐
  Kafka ──────────►   │  consumer_group + unordered_processing       │
  (multi-partition)   │  ┌─────────────────┐    ┌──────────────┐    │
                      │  │ per-partition    │    │ S3 sink      │    │
                      │  │ input batcher   ├───►│              │    │
                      │  │ (50MB or 1min)  │    │ deterministic│    │
                      │  └─────────────────┘    │ path naming  │    │
                      │                         └──────────────┘    │
                      └──────────────────────────────────────────────┘

Deterministic sink: the key design choice

We configured input batching per partition with deterministic S3 paths so that reprocessing the same offset range always writes to the same file:

input:
  redpanda:
    consumer_group: my-group
    unordered_processing:
      enabled: true
      checkpoint_limit: 1
      batching:
        byte_size: 52428800  # 50 MiB per partition
        period: "1m"

output:
  aws_s3:
    path: '${!meta("kafka_topic")}+${!meta("kafka_partition")}+${!meta("kafka_offset")}.jsonl.zst'
    max_in_flight: 10

This produces files like:

S3 path structure (one file = one batch = one partition):

  my-topic+0+00000042.jsonl.zst    ← partition 0, batch starting at offset 42
  my-topic+0+00001387.jsonl.zst    ← partition 0, next batch at offset 1387
  my-topic+1+00000099.jsonl.zst    ← partition 1, batch starting at offset 99
  ...

  If the same batch is reprocessed, it overwrites the same file → idempotent.

The goal: at-least-once delivery with idempotent writes. A rebalance should either commit the offset after the sink confirms, or not commit it at all (so the new owner reprocesses and overwrites the same file).
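
The idempotency argument rests on the object key being a pure function of topic, partition, and the batch's first offset. A minimal Go sketch of that mapping follows. The helper name `objectKey` is hypothetical, and the 8-digit zero padding is an assumption taken from the example listing above rather than something the config shown here spells out.

```go
package main

import "fmt"

// objectKey is a hypothetical helper. It derives the S3 key purely from the
// batch's topic, partition, and first offset, so a replayed batch that starts
// at the same offset maps to the same key and overwrites the earlier object.
// The 8-digit zero padding is an assumption based on the example listing.
func objectKey(topic string, partition int32, firstOffset int64) string {
	return fmt.Sprintf("%s+%d+%08d.jsonl.zst", topic, partition, firstOffset)
}
```

Because nothing time-dependent or random enters the key, a new partition owner that resumes from the last committed offset produces the same key as the previous owner would have.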

The problem: rebalance drops a whole batch

During rebalance testing (an HPA scale-up triggering a cooperative-sticky rebalance), we observed:

Partition 31 — S3 files and committed offsets:

  ...
  my-topic+31+00045291.jsonl.zst   ← last file before rebalance
                                      (contains offsets 45291..46603)

  ┌──────────────────────────────────────────────────────────────┐
  │  OFFSET GAP: 46604 .. 47891  (1,288 messages)               │
  │                                                              │
  │  committed offset = 47892                                    │
  │  no S3 file exists for this range                            │
  │  these messages are permanently lost                         │
  └──────────────────────────────────────────────────────────────┘

  my-topic+31+00047892.jsonl.zst   ← first file after rebalance
                                      (new consumer starts here)

All 7 revoked partitions showed the same pattern. Per-partition loss was roughly 1,300 to 1,600 contiguous messages (about one batcher window). Total: 10,433 messages.
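
A check like the one behind the diagram above can be sketched in Go as follows. This is an illustration, not the tooling we actually ran: `offsetRange`, `findGaps`, and `count` are hypothetical names, and the sketch assumes the first and last offset of every S3 object are already known (for example, by reading each file).

```go
package main

import "sort"

// offsetRange is a hypothetical record of one S3 object: the first and last
// Kafka offsets (inclusive) that it contains.
type offsetRange struct{ First, Last int64 }

// count returns how many offsets the range spans.
func (r offsetRange) count() int64 { return r.Last - r.First + 1 }

// findGaps returns the offset ranges not covered by any object, looking only
// between the smallest First and the largest Last in the input.
func findGaps(files []offsetRange) []offsetRange {
	if len(files) == 0 {
		return nil
	}
	// Sort a copy by starting offset so the caller's slice is left untouched.
	sorted := append([]offsetRange(nil), files...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].First < sorted[j].First })

	var gaps []offsetRange
	covered := sorted[0].Last // highest offset covered so far
	for _, f := range sorted[1:] {
		if f.First > covered+1 {
			gaps = append(gaps, offsetRange{First: covered + 1, Last: f.First - 1})
		}
		if f.Last > covered {
			covered = f.Last
		}
	}
	return gaps
}
```

For partition 31, a file ending at offset 46603 followed by a file starting at 47892 yields the single gap 46604..47891, which is 1,288 messages.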

Root cause

File: internal/impl/kafka/franz_reader_unordered.go

The race happens because OnPartitionsRevoked returns before in-flight batches complete, and nothing prevents MarkCommitRecords from being called on revoked partitions afterward.

 Timeline         Consumer A                         Kafka broker
 ────────────────────────────────────────────────────────────────────

 T0  normal        sendBatch()
                     Track(r)               ← checkpointer tracks batch
                     batchChan ← batch      ← batch enters pipeline
                     pipeline processing... ← S3 upload in progress

 T1  rebalance     OnPartitionsRevoked(rctx, revoked=[partition 0]):
                     CommitMarkedOffsets()   ← commits what's marked so far
                                               (in-flight batch NOT marked yet)
                     removeTopicPartitions()
                       close(rctx)          ← SoftStop loop, batcher.Close()
                       delete from map      ← partitionTracker gone
                     return                 ─────► broker proceeds with rebalance
                                                    assigns partition 0 to Consumer B

 T2  pipeline      ...still running...
     completes     onAck()
                     releaseFn()            ← checkpointer releases
                     commitFn(record)
                       MarkCommitRecords(r) ← franz-go: NO ownership check!
                                               re-inserts into g.uncommitted

 T3  auto-commit   AutoCommitMarks tick    ─────► broker accepts commit
                     commits offset 47892           (no ownership validation
                     for partition 0                 within session epoch)

 T4  Consumer B    starts at offset 47892  ─────► offsets 46604..47891 = LOST
                   (never sees the gap)

The core issue is the T2→T3 window. franz-go's MarkCommitRecords does not validate partition ownership; it writes to g.uncommitted unconditionally. So after revoke() deletes the partition from g.uncommitted, a late MarkCommitRecords call re-inserts it, and the next loopCommit tick commits it.
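
The ordering problem can be reproduced with a toy model. The sketch below is deliberately not franz-go's code: `toyGroup` and its methods are hypothetical stand-ins that model only the sequence mark → revoke → late mark → auto-commit described in the timeline.

```go
package main

// toyGroup is a simplified stand-in for a consumer group's commit state.
// It is NOT franz-go's implementation; only the ordering of operations
// from the timeline is modeled.
type toyGroup struct {
	uncommitted map[int32]int64 // partition -> next offset waiting to be committed
	committed   map[int32]int64 // partition -> offset the "broker" has accepted
}

func newToyGroup() *toyGroup {
	return &toyGroup{uncommitted: map[int32]int64{}, committed: map[int32]int64{}}
}

// mark records an offset with no ownership check, as the issue describes.
func (g *toyGroup) mark(partition int32, nextOffset int64) {
	g.uncommitted[partition] = nextOffset
}

// revoke drops any pending mark for the partition.
func (g *toyGroup) revoke(partition int32) { delete(g.uncommitted, partition) }

// autoCommit pushes every pending mark to the "broker".
func (g *toyGroup) autoCommit() {
	for p, off := range g.uncommitted {
		g.committed[p] = off
		delete(g.uncommitted, p)
	}
}

// replayRace walks through T0..T3 for partition 0 and returns the offset
// that ends up committed.
func replayRace() int64 {
	g := newToyGroup()
	g.mark(0, 46604) // T0: an earlier batch was acked and marked
	g.autoCommit()   //     and committed normally
	g.revoke(0)      // T1: partition 0 is revoked, pending marks dropped
	g.mark(0, 47892) // T2: late ack re-inserts the revoked partition
	g.autoCommit()   // T3: the periodic commit publishes it
	return g.committed[0]
}
```

In this model `replayRace` returns 47892 even though the partition was revoked at T1, which is the same shape as the committed offset jumping past the gap.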

Secondary issue: batcher silent discard

 loop() receives SoftStop signal:

   Before fix                          After fix
   ─────────────────────────           ─────────────────────────
   case <-SoftStopChan():              case <-SoftStopChan():
     return  ← batcher has               batcher.Flush()  ← drain
              3 buffered msgs             sendBatch()      ← deliver
              they are gone               return

When the batcher hasn't hit its count/byte threshold yet (only the period timer would flush it), SoftStop discards whatever is buffered. This is the other half of the data loss.

Fix (PR #4009)

Two changes:

1. Block MarkCommitRecords for revoked partitions

// checkpointTracker gains a revoked set:

type checkpointTracker struct {
    mut     sync.Mutex
    topics  map[string]map[int32]*partitionTracker
    revoked map[string]map[int32]struct{}       // ← NEW
    ...
}

// removeTopicPartitions marks before deleting:

    revoked[topic][partition] = struct{}{}       // mark
    delete(trackedTopic, partition)              // then delete

// commitFn skips revoked partitions:

    commitFn = func(r *kgo.Record) {
        if checkpoints.isRevoked(r.Topic, r.Partition) {
            return                                // ← skip
        }
        cl.MarkCommitRecords(r)
    }

// OnPartitionsAssigned clears the revoked set:

    kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
        checkpoints.clearRevoked(m)              // ← reset for re-assigned partitions
    })
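
The excerpts above call `isRevoked` and `clearRevoked` without showing their bodies. One way they could look is sketched below as a standalone type. `revokedSet`, `newRevokedSet`, and `markRevoked` are hypothetical names, and the method bodies are an assumption about the behavior, not a copy of the PR.

```go
package main

import "sync"

// revokedSet is a hypothetical, standalone version of the "revoked" field
// shown above. It remembers which topic/partition pairs have been revoked.
type revokedSet struct {
	mut     sync.Mutex
	revoked map[string]map[int32]struct{}
}

func newRevokedSet() *revokedSet {
	return &revokedSet{revoked: map[string]map[int32]struct{}{}}
}

// markRevoked flags a partition so that later acks can be ignored.
func (s *revokedSet) markRevoked(topic string, partition int32) {
	s.mut.Lock()
	defer s.mut.Unlock()
	if s.revoked[topic] == nil {
		s.revoked[topic] = map[int32]struct{}{}
	}
	s.revoked[topic][partition] = struct{}{}
}

// isRevoked reports whether the partition is currently flagged.
func (s *revokedSet) isRevoked(topic string, partition int32) bool {
	s.mut.Lock()
	defer s.mut.Unlock()
	_, ok := s.revoked[topic][partition] // reading a nil inner map is safe
	return ok
}

// clearRevoked removes the flag for every newly assigned partition.
func (s *revokedSet) clearRevoked(assigned map[string][]int32) {
	s.mut.Lock()
	defer s.mut.Unlock()
	for topic, partitions := range assigned {
		for _, p := range partitions {
			delete(s.revoked[topic], p) // delete on a nil map is a no-op
		}
		if len(s.revoked[topic]) == 0 {
			delete(s.revoked, topic)
		}
	}
}
```

Note that this sketch keys only on topic and partition, so it cannot distinguish acks from an earlier ownership of a partition that has since been re-assigned to the same consumer.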

2. Flush batcher on SoftStop

// In loop(), before returning on SoftStop:

    case <-p.shutSig.SoftStopChan():
        if p.batcher != nil {
            p.batcherLock.Lock()
            if batch, _ := p.batcher.Flush(context.Background()); len(batch) > 0 {
                record := p.topBatchRecord
                p.topBatchRecord = nil
                p.batcherLock.Unlock()
                _ = p.sendBatch(context.Background(), batch, record)
            } else {
                p.batcherLock.Unlock()
            }
        }
        return
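
The flush-before-return pattern can also be shown in isolation. The sketch below is a toy loop, not the reader's actual `loop()`: `runBatcher`, its parameters, and the count-only threshold are all hypothetical simplifications (the real batcher also flushes on byte size and period).

```go
package main

// runBatcher is a toy stand-in for the partition loop. It buffers messages
// until `threshold` is reached, and on stop it delivers whatever is still
// buffered instead of dropping it.
func runBatcher(in <-chan string, stop <-chan struct{}, threshold int, deliver func([]string)) {
	var buf []string
	for {
		select {
		case msg := <-in:
			buf = append(buf, msg)
			if len(buf) >= threshold {
				deliver(buf)
				buf = nil
			}
		case <-stop:
			// Before the fix the equivalent branch returned immediately,
			// losing the contents of buf.
			if len(buf) > 0 {
				deliver(buf)
			}
			return
		}
	}
}
```

With an unbuffered `in` channel, three sent messages, a threshold of 100, and `stop` closed afterwards, `deliver` is called once with all three messages. Without the flush in the stop branch it would never be called.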

How they work together

 With the fix applied:

 T0  sendBatch → Track(r) → pipeline processing...

 T1  OnPartitionsRevoked:
       CommitMarkedOffsets()
       removeTopicPartitions()
         SoftStop → batcher.Flush() → sendBatch()  ← buffered msgs delivered
         revoked[partition] = marked                ← partition flagged
         delete from map

 T2  pipeline completes → onAck() → commitFn(r)
       isRevoked(r.Topic, r.Partition) == true
       return (skip MarkCommitRecords)              ← offset NOT advanced

 T3  Consumer B starts at last committed offset
       reprocesses the same range
       writes to same S3 path (deterministic)       ← idempotent, no loss

Version / Environment

  • Redpanda Connect: verified against main at v4.81.0
  • Consumer group protocol: cooperative-sticky
  • Batching: input batching (byte_size or period)
  • Sink: S3 with non-trivial latency

Suggested labels

bug, kafka, inputs
