Skip to content

Commit 19b982d

Browse files
twmbclaude
andauthored
public/service: expose collapsed count helpers for plugin developers (#396)
* public/service: expose collapsed count helpers for plugin developers Adds Message.WithCollapsedCount and MessageBatch.CollapsedCount methods to the public service package so that plugin developers can properly set collapsed counts for output metrics when implementing batch-collapsing processors like archive. Closes redpanda-data/connect#1877 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * public/service: add test for CollapsedCount helpers Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * public/service: clarify WithCollapsedCount accumulation semantics in doc Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5a065ba commit 19b982d

2 files changed

Lines changed: 49 additions & 0 deletions

File tree

public/service/message.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"errors"
88

9+
"github.com/redpanda-data/benthos/v4/internal/batch"
910
"github.com/redpanda-data/benthos/v4/internal/bloblang/mapping"
1011
"github.com/redpanda-data/benthos/v4/internal/bloblang/query"
1112
"github.com/redpanda-data/benthos/v4/internal/message"
@@ -215,6 +216,32 @@ func (m *Message) WithContext(ctx context.Context) *Message {
215216
}
216217
}
217218

219+
// WithCollapsedCount returns a new message indicating that it is the result of
220+
// collapsing count messages into one. The count is accumulated: if a message
221+
// already has a collapsed count of 3 and WithCollapsedCount(2) is called, the
222+
// result has a collapsed count of 4 (the existing count plus count-1 to avoid
223+
// double-counting the message itself). The count parameter must be >= 1.
224+
//
225+
// This allows downstream components to know how many total messages were
226+
// combined, which is important for accurate output metrics (e.g. output_sent).
227+
// This is useful when implementing processors that combine multiple messages
228+
// into one (such as archive).
229+
func (m *Message) WithCollapsedCount(count int) *Message {
230+
ctx := batch.CtxWithCollapsedCount(m.Context(), count)
231+
return m.WithContext(ctx)
232+
}
233+
234+
// CollapsedCount returns the actual number of messages that were collapsed into
235+
// the resulting message batch. This value could differ from len(batch) when
236+
// processors that archive batched message parts have been applied.
237+
func (b MessageBatch) CollapsedCount() int {
238+
total := 0
239+
for _, m := range b {
240+
total += batch.CtxCollapsedCount(m.Context())
241+
}
242+
return total
243+
}
244+
218245
// AsBytes returns the underlying byte array contents of a message or, if the
219246
// contents are a structured type, attempts to marshal the contents as a JSON
220247
// document and returns either the byte array result or an error.

public/service/message_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,3 +1083,25 @@ func TestSyncResponseBatched(t *testing.T) {
10831083
assert.Equal(t, c, string(data))
10841084
}
10851085
}
1086+
1087+
func TestMessageWithCollapsedCount(t *testing.T) {
1088+
m1 := NewMessage([]byte("foo"))
1089+
1090+
// Default collapsed count is 1
1091+
b1 := MessageBatch{m1}
1092+
assert.Equal(t, 1, b1.CollapsedCount())
1093+
1094+
// Setting collapsed count to 3
1095+
m2 := m1.WithCollapsedCount(3)
1096+
b2 := MessageBatch{m2}
1097+
assert.Equal(t, 3, b2.CollapsedCount())
1098+
1099+
// Chaining collapsed counts accumulates
1100+
m3 := m2.WithCollapsedCount(2)
1101+
b3 := MessageBatch{m3}
1102+
assert.Equal(t, 4, b3.CollapsedCount())
1103+
1104+
// Multiple messages in a batch sum their collapsed counts
1105+
bAll := MessageBatch{m1, m2, m3}
1106+
assert.Equal(t, 8, bAll.CollapsedCount())
1107+
}

0 commit comments

Comments
 (0)