diff --git a/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java new file mode 100644 index 000000000000..79130d7ac138 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.BatchElements; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// beam-playground: +// name: BatchElements +// description: Demonstration of BatchElements transform usage. +// multifile: false +// default_example: false +// context_line: 47 +// categories: +// - Core Transforms +// complexity: BASIC +// tags: +// - transforms +// - batch + +public class BatchElementsExample { + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.create(); + + Pipeline pipeline = Pipeline.create(options); + + // [START main_section] + // Create input + + PCollection inputs = + pipeline.apply(Create.of("apple", "strawberry", "orange", "peach", "cherry", "pear")); + + // Create Batch Config + BatchElements.BatchConfig config = + BatchElements.BatchConfig.builder().withMinBatchSize(2).withMaxBatchSize(4).build(); + // Batch Elements + PCollection> result = inputs.apply(BatchElements.withConfig(config)); + // [END main_section] + result.apply(ParDo.of(new LogOutput())); + pipeline.run(); + } + + static class LogOutput extends DoFn, String> { + private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class); + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + List batch = c.element(); + + LOG.info("Batch Contents: {}", batch); + + for (String element : batch) { + c.output(element); + } + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java index 35796d1b1385..d410c5be6007 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms; +import static java.util.Collections.singleton; + import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; @@ -27,6 +29,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.WindowingStrategy; @@ -568,7 +571,11 @@ public void processElement( try (BatchSizeEstimator.Stopwatch sw = estimator.recordTime(targetBatch.size)) { - receiver.outputWithTimestamp(targetBatch.elements, targetWindow.maxTimestamp()); + receiver.outputWindowedValue( + targetBatch.elements, + targetWindow.maxTimestamp(), + singleton(targetWindow), + PaneInfo.NO_FIRING); } batches.remove(targetWindow); diff --git a/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md new file mode 100644 index 000000000000..f842d303c365 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md @@ -0,0 +1,31 @@ +--- +title: "BatchElements" +--- + + +# BatchElements + +BatchElements transform groups individual elements into batches before processing them downstream. +It is designed for operations where each call has a fixed overhead regardless of how many elements are processed and the transform amortizes that cost across multiple elements at once. +The transform takes a `PCollection` as input and produces a `PCollection>`, where each output element is a batch containing multiple input elements. +Batch sizes are chosen dynamically between the configured minimum and maximum values by measuring the execution time of downstream operations. + +Batching is performed per window. Each emitted batch belongs to the same window as its input elements. + +## Examples + +{{< playground height="700px" >}} +{{< playground_snippet language="java" path="SDK_JAVA_BatchElements" show="main_section" >}} +{{< /playground >}} diff --git a/website/www/site/content/en/documentation/transforms/java/overview.md b/website/www/site/content/en/documentation/transforms/java/overview.md index 59aa93930fbe..adb4da51a836 100644 --- a/website/www/site/content/en/documentation/transforms/java/overview.md +++ b/website/www/site/content/en/documentation/transforms/java/overview.md @@ -55,6 +55,7 @@ limitations under the License. GroupByKeyTakes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. GroupIntoBatchesBatches values associated with keys into Iterable batches of some size. Each batch contains elements associated with a specific key. + BatchElementsGroups individual elements into batches to amortize fixed processing costs, using dynamically estimated batch sizes. HllCountEstimates the number of distinct elements and creates re-aggregatable sketches using the HyperLogLog++ algorithm. LatestSelects the latest element within each aggregation according to the implicit timestamp. MaxOutputs the maximum element within each aggregation. diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index 57514935825e..41100270d827 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -404,6 +404,7 @@
  • Distinct
  • GroupByKey
  • GroupIntoBatches
  • +
  • BatchElements
  • HllCount
  • Latest
  • Max