From 9ff254e2524b9ad98c476a3a1b37a71b4683ab09 Mon Sep 17 00:00:00 2001 From: Ganeshsivakumar Date: Tue, 12 May 2026 13:04:29 +0530 Subject: [PATCH 1/6] update transform catalogue --- .../beam/examples/BatchElementsExample.java | 83 +++++++++++++++++++ .../java/aggregation/batchelements.md | 30 +++++++ .../documentation/transforms/java/overview.md | 1 + 3 files changed, 114 insertions(+) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java create mode 100644 website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md diff --git a/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java new file mode 100644 index 000000000000..1af72889dc79 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.util.List; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.BatchElements; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// beam-playground: +// name: BatchElements +// description: Demonstration of BatchElements transform usage. +// multifile: false +// default_example: false +// context_line: 47 +// categories: +// - Core Transforms +// complexity: BASIC +// tags: +// - transforms +// - batch + +public class BatchElementsExample { + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.create(); + + Pipeline pipeline = Pipeline.create(options); + + // [START main_section] + // Create input + + PCollection inputs = pipeline.apply(Create.of("apple", "strawberry", "orange", "peach", "cherry", "pear")); + + //Create Batch Config + BatchElements.BatchConfig config = BatchElements.BatchConfig.builder() + .withMinBatchSize(2) + .withMaxBatchSize(4) + .build(); + // Batch Elements + PCollection> result = inputs.apply(BatchElements.withConfig(config)); + // [END main_section] + result.apply(ParDo.of(new LogOutput())); + pipeline.run(); + } + + static class LogOutput extends DoFn, String> { + private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class); + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + List batch = c.element(); + + LOG.info("Batch Contents: {}", batch); + + for (String element : batch) { + c.output(element); + } + } + } +} diff --git a/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md new file mode 100644 index 000000000000..6c1726d0d114 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md @@ -0,0 +1,30 @@ +--- +title: "BatchElements" +--- + + +# BatchElements + +BatchElements transform groups individual elements into batches before processing them downstream. +The transform takes a `PCollection` as input and produces a `PCollection>`, where each output element is a batch containing multiple input elements. +Batch sizes are chosen dynamically between the configured minimum and maximum values by measuring the execution time of downstream operations. + +Batching is performed per window. Each emitted batch belongs to the same window as its input elements + +## Examples + +{{< playground height="700px" >}} +{{< playground_snippet language="java" path="SDK_JAVA_BatchElements" show="main_section" >}} +{{< /playground >}} diff --git a/website/www/site/content/en/documentation/transforms/java/overview.md b/website/www/site/content/en/documentation/transforms/java/overview.md index 59aa93930fbe..adb4da51a836 100644 --- a/website/www/site/content/en/documentation/transforms/java/overview.md +++ b/website/www/site/content/en/documentation/transforms/java/overview.md @@ -55,6 +55,7 @@ limitations under the License. GroupByKeyTakes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. GroupIntoBatchesBatches values associated with keys into Iterable batches of some size. Each batch contains elements associated with a specific key. + BatchElementsGroups individual elements into batches to amortize fixed processing costs, using dynamically estimated batch sizes. HllCountEstimates the number of distinct elements and creates re-aggregatable sketches using the HyperLogLog++ algorithm. LatestSelects the latest element within each aggregation according to the implicit timestamp. MaxOutputs the maximum element within each aggregation. From 05d4ef87da801e43cd622b38c427037753c45d06 Mon Sep 17 00:00:00 2001 From: Ganeshsivakumar Date: Tue, 12 May 2026 15:17:10 +0530 Subject: [PATCH 2/6] spotless --- .../beam/examples/BatchElementsExample.java | 54 +++++++++---------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java index 1af72889dc79..79130d7ac138 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java @@ -18,7 +18,6 @@ package org.apache.beam.examples; import java.util.List; - import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -44,40 +43,39 @@ // - batch public class BatchElementsExample { - public static void main(String[] args) { - PipelineOptions options = PipelineOptionsFactory.create(); + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.create(); - Pipeline pipeline = Pipeline.create(options); + Pipeline pipeline = Pipeline.create(options); - // [START main_section] - // Create input + // [START main_section] + // Create input - PCollection inputs = pipeline.apply(Create.of("apple", "strawberry", "orange", "peach", "cherry", "pear")); + PCollection inputs = + pipeline.apply(Create.of("apple", "strawberry", "orange", "peach", "cherry", "pear")); - //Create Batch Config - BatchElements.BatchConfig config = BatchElements.BatchConfig.builder() - .withMinBatchSize(2) - .withMaxBatchSize(4) - .build(); - // Batch Elements - PCollection> result = inputs.apply(BatchElements.withConfig(config)); - // [END main_section] - result.apply(ParDo.of(new LogOutput())); - pipeline.run(); - } + // Create Batch Config + BatchElements.BatchConfig config = + BatchElements.BatchConfig.builder().withMinBatchSize(2).withMaxBatchSize(4).build(); + // Batch Elements + PCollection> result = inputs.apply(BatchElements.withConfig(config)); + // [END main_section] + result.apply(ParDo.of(new LogOutput())); + pipeline.run(); + } - static class LogOutput extends DoFn, String> { - private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class); + static class LogOutput extends DoFn, String> { + private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class); - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - List batch = c.element(); + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + List batch = c.element(); - LOG.info("Batch Contents: {}", batch); + LOG.info("Batch Contents: {}", batch); - for (String element : batch) { - c.output(element); - } - } + for (String element : batch) { + c.output(element); + } } + } } From 480cc3c3a30e966585a0910bd2d9af8b5e48d70d Mon Sep 17 00:00:00 2001 From: Ganeshsivakumar Date: Tue, 12 May 2026 17:02:29 +0530 Subject: [PATCH 3/6] fix for window bug --- .../org/apache/beam/sdk/transforms/BatchElements.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java index 35796d1b1385..d410c5be6007 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms; +import static java.util.Collections.singleton; + import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; @@ -27,6 +29,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.WindowingStrategy; @@ -568,7 +571,11 @@ public void processElement( try (BatchSizeEstimator.Stopwatch sw = estimator.recordTime(targetBatch.size)) { - receiver.outputWithTimestamp(targetBatch.elements, targetWindow.maxTimestamp()); + receiver.outputWindowedValue( + targetBatch.elements, + targetWindow.maxTimestamp(), + singleton(targetWindow), + PaneInfo.NO_FIRING); } batches.remove(targetWindow); From 08efcd424259f1fb6733c5f16dd2c5d75cd768ca Mon Sep 17 00:00:00 2001 From: Ganesh Sivakumar Date: Tue, 12 May 2026 17:29:57 +0530 Subject: [PATCH 4/6] Update website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../documentation/transforms/java/aggregation/batchelements.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md index 6c1726d0d114..aff952164d1d 100644 --- a/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md +++ b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md @@ -21,7 +21,7 @@ BatchElements transform groups individual elements into batches before processin The transform takes a `PCollection` as input and produces a `PCollection>`, where each output element is a batch containing multiple input elements. Batch sizes are chosen dynamically between the configured minimum and maximum values by measuring the execution time of downstream operations. -Batching is performed per window. Each emitted batch belongs to the same window as its input elements +Batching is performed per window. Each emitted batch belongs to the same window as its input elements. ## Examples From af1aa24a306e0b7c5b2180063045406bc5ae46ce Mon Sep 17 00:00:00 2001 From: Ganeshsivakumar Date: Tue, 12 May 2026 19:17:46 +0530 Subject: [PATCH 5/6] update website sidebar --- .../www/site/layouts/partials/section-menu/en/documentation.html | 1 + 1 file changed, 1 insertion(+) diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index 57514935825e..41100270d827 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -404,6 +404,7 @@
  • Distinct
  • GroupByKey
  • GroupIntoBatches
  • +
  • BatchElements
  • HllCount
  • Latest
  • Max
  • From d8df95d488bd9ceadb3425413236bbc104404be1 Mon Sep 17 00:00:00 2001 From: Ganeshsivakumar Date: Wed, 13 May 2026 13:04:00 +0530 Subject: [PATCH 6/6] update doc --- .../documentation/transforms/java/aggregation/batchelements.md | 1 + 1 file changed, 1 insertion(+) diff --git a/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md index aff952164d1d..f842d303c365 100644 --- a/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md +++ b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md @@ -18,6 +18,7 @@ limitations under the License. # BatchElements BatchElements transform groups individual elements into batches before processing them downstream. +It is designed for operations where each call has a fixed overhead regardless of how many elements are processed and the transform amortizes that cost across multiple elements at once. The transform takes a `PCollection` as input and produces a `PCollection>`, where each output element is a batch containing multiple input elements. Batch sizes are chosen dynamically between the configured minimum and maximum values by measuring the execution time of downstream operations.