Skip to content

Commit 8364da7

Browse files
Handle empty batches in Python worker counters (#38748)
* Handle empty batches in Python worker counters * Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 8ac0348 commit 8364da7

2 files changed

Lines changed: 18 additions & 0 deletions

File tree

sdks/python/apache_beam/runners/worker/opcounters.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ def update_from_batch(self, windowed_batch):
217217
batch_length = self.producer_batch_converter.get_length(
218218
windowed_batch.values)
219219
self.element_counter.update(batch_length)
220+
if batch_length == 0:
221+
return
220222

221223
mean_element_size = self.producer_batch_converter.estimate_byte_size(
222224
windowed_batch.values) / batch_length

sdks/python/apache_beam/runners/worker/opcounters_test.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,22 @@ def test_update_batch(self):
184184

185185
self.verify_counters(opcounts, 200, size_per_element)
186186

187+
def test_update_empty_batch(self):
188+
opcounts = OperationCounters(
189+
CounterFactory(),
190+
'some-name',
191+
coders.FastPrimitivesCoder(),
192+
0,
193+
producer_batch_converter=typehints.batch.BatchConverter.from_typehints(
194+
element_type=typehints.Any,
195+
batch_type=typehints.List[typehints.Any]))
196+
197+
self.verify_counters(opcounts, 0, math.nan)
198+
199+
opcounts.update_from_batch(GlobalWindows.windowed_batch([]))
200+
201+
self.verify_counters(opcounts, 0, math.nan)
202+
187203
def test_should_sample(self):
188204
# Order of magnitude more buckets than highest constant in code under test.
189205
buckets = [0] * 300

0 commit comments

Comments
 (0)