|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, software |
| 13 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | + * See the License for the specific language governing permissions and |
| 16 | + * limitations under the License. |
| 17 | + */ |
| 18 | + |
| 19 | +package org.apache.flink.table.runtime.operators.aggregate; |
| 20 | + |
| 21 | +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; |
| 22 | +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; |
| 23 | +import org.apache.flink.table.data.GenericRowData; |
| 24 | +import org.apache.flink.table.data.RowData; |
| 25 | +import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; |
| 26 | +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; |
| 27 | +import org.apache.flink.table.runtime.generated.RecordEqualiser; |
| 28 | +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; |
| 29 | +import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator; |
| 30 | +import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger; |
| 31 | +import org.apache.flink.table.runtime.operators.over.SumAggsHandleFunction; |
| 32 | +import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; |
| 33 | +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; |
| 34 | +import org.apache.flink.table.types.logical.BigIntType; |
| 35 | +import org.apache.flink.table.types.logical.LogicalType; |
| 36 | +import org.apache.flink.table.types.logical.RowType; |
| 37 | +import org.apache.flink.table.types.logical.VarCharType; |
| 38 | +import org.apache.flink.table.utils.HandwrittenSelectorUtil; |
| 39 | + |
| 40 | +import org.junit.jupiter.api.Test; |
| 41 | + |
| 42 | +import java.util.ArrayList; |
| 43 | +import java.util.List; |
| 44 | + |
| 45 | +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; |
| 46 | +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; |
| 47 | + |
| 48 | +/** |
| 49 | + * Tests for {@link MiniBatchGroupAggFunction}. |
| 50 | + * |
| 51 | + * <p>This test covers the scenario where MiniBatchGroupAggFunction.finishBundle() encounters a key |
| 52 | + * with only retraction messages and no state. |
| 53 | + */ |
| 54 | +class MiniBatchGroupAggFunctionTest { |
| 55 | + |
| 56 | + // Input row: (key: String, value: Long) |
| 57 | + private final LogicalType[] inputFieldTypes = |
| 58 | + new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType()}; |
| 59 | + |
| 60 | + private final RowType inputRowType = RowType.of(inputFieldTypes, new String[] {"key", "value"}); |
| 61 | + |
| 62 | + // Accumulator: (sum: Long) |
| 63 | + private final LogicalType[] accTypes = new LogicalType[] {new BigIntType()}; |
| 64 | + |
| 65 | + // Output row: (key: String, sum: Long) |
| 66 | + private final LogicalType[] outputTypes = |
| 67 | + new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType()}; |
| 68 | + |
| 69 | + private final int keyIdx = 0; |
| 70 | + private final RowDataKeySelector keySelector = |
| 71 | + HandwrittenSelectorUtil.getRowDataSelector(new int[] {keyIdx}, inputFieldTypes); |
| 72 | + |
| 73 | + private final RowDataHarnessAssertor assertor = |
| 74 | + new RowDataHarnessAssertor( |
| 75 | + outputTypes, new GenericRowRecordSortComparator(keyIdx, outputTypes[keyIdx])); |
| 76 | + |
| 77 | + private final GeneratedAggsHandleFunction genAggsHandler = |
| 78 | + new GeneratedAggsHandleFunction("SumAgg", "", new Object[0]) { |
| 79 | + @Override |
| 80 | + public SumAggsHandleFunction newInstance(ClassLoader classLoader) { |
| 81 | + return new SumAggsHandleFunction(1); // inputIndex = 1 (the value field) |
| 82 | + } |
| 83 | + }; |
| 84 | + |
| 85 | + private final GeneratedRecordEqualiser genRecordEqualiser = |
| 86 | + new GeneratedRecordEqualiser("Equaliser", "", new Object[0]) { |
| 87 | + @Override |
| 88 | + public RecordEqualiser newInstance(ClassLoader classLoader) { |
| 89 | + return (row1, row2) -> { |
| 90 | + if (row1 instanceof GenericRowData && row2 instanceof GenericRowData) { |
| 91 | + return row1.equals(row2); |
| 92 | + } |
| 93 | + return false; |
| 94 | + }; |
| 95 | + } |
| 96 | + }; |
| 97 | + |
| 98 | + private MiniBatchGroupAggFunction createFunction(boolean generateUpdateBefore) { |
| 99 | + return new MiniBatchGroupAggFunction( |
| 100 | + genAggsHandler, |
| 101 | + genRecordEqualiser, |
| 102 | + accTypes, |
| 103 | + inputRowType, |
| 104 | + -1, // no COUNT(*) for this test |
| 105 | + generateUpdateBefore, |
| 106 | + 0); // no state retention |
| 107 | + } |
| 108 | + |
| 109 | + @SuppressWarnings({"unchecked", "rawtypes"}) |
| 110 | + private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( |
| 111 | + MiniBatchGroupAggFunction function) throws Exception { |
| 112 | + CountBundleTrigger trigger = new CountBundleTrigger<>(10); |
| 113 | + KeyedMapBundleOperator operator = new KeyedMapBundleOperator(function, trigger); |
| 114 | + return new KeyedOneInputStreamOperatorTestHarness<>( |
| 115 | + operator, keySelector, keySelector.getProducedType()); |
| 116 | + } |
| 117 | + |
| 118 | + /** |
| 119 | + * Verifies that when finishBundle processes a key with only retraction messages (which gets |
| 120 | + * filtered out because there's no accumulator state), the method continues to process |
| 121 | + * subsequent keys in the bundle instead of returning early. |
| 122 | + */ |
| 123 | + @Test |
| 124 | + void testFinishBundleContinuesAfterEmptyInputRows() throws Exception { |
| 125 | + MiniBatchGroupAggFunction function = createFunction(false); |
| 126 | + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = |
| 127 | + createTestHarness(function); |
| 128 | + testHarness.open(); |
| 129 | + |
| 130 | + // Process a DELETE for key "aaa" (no existing state, will be filtered out) |
| 131 | + testHarness.processElement(deleteRecord("aaa", 1L)); |
| 132 | + |
| 133 | + // Process INSERTs for keys "bbb" and "ccc" |
| 134 | + testHarness.processElement(insertRecord("bbb", 2L)); |
| 135 | + testHarness.processElement(insertRecord("ccc", 3L)); |
| 136 | + |
| 137 | + // Close to trigger finishBundle |
| 138 | + testHarness.close(); |
| 139 | + |
| 140 | + // Verify that keys "bbb" and "ccc" were processed correctly |
| 141 | + List<Object> expectedOutput = new ArrayList<>(); |
| 142 | + expectedOutput.add(insertRecord("bbb", 2L)); |
| 143 | + expectedOutput.add(insertRecord("ccc", 3L)); |
| 144 | + |
| 145 | + assertor.assertOutputEqualsSorted( |
| 146 | + "Keys after retraction-only key should still be processed", |
| 147 | + expectedOutput, |
| 148 | + testHarness.getOutput()); |
| 149 | + } |
| 150 | +} |
0 commit comments