Skip to content

Commit ab493cd

Browse files
[FIP-37] Add bitmap aggregate functions: rb_build_agg, rb_or_agg, rb_and_agg and register via FlussCatalog (#3398)
* Add bitmap aggregate functions: rb_build_agg, rb_or_agg, rb_and_agg and register via FlussCatalog * Improve coverage for RbAndAggFunction inner classes * add RbFunctionsCatalogITCase and address review comments * add some small improvements --------- Co-authored-by: ipolyzos <ipolyzos@apache.org>
1 parent 0f5909a commit ab493cd

8 files changed

Lines changed: 1016 additions & 8 deletions

File tree

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.apache.fluss.exception.InvalidTableException;
2626
import org.apache.fluss.flink.FlinkConnectorOptions;
2727
import org.apache.fluss.flink.adapter.CatalogTableAdapter;
28+
import org.apache.fluss.flink.functions.bitmap.RbAndAggFunction;
29+
import org.apache.fluss.flink.functions.bitmap.RbBuildAggFunction;
30+
import org.apache.fluss.flink.functions.bitmap.RbOrAggFunction;
2831
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
2932
import org.apache.fluss.flink.procedure.ProcedureManager;
3033
import org.apache.fluss.flink.utils.CatalogExceptionUtils;
@@ -48,11 +51,13 @@
4851
import org.apache.flink.table.catalog.CatalogDatabase;
4952
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
5053
import org.apache.flink.table.catalog.CatalogFunction;
54+
import org.apache.flink.table.catalog.CatalogFunctionImpl;
5155
import org.apache.flink.table.catalog.CatalogMaterializedTable;
5256
import org.apache.flink.table.catalog.CatalogPartition;
5357
import org.apache.flink.table.catalog.CatalogPartitionSpec;
5458
import org.apache.flink.table.catalog.CatalogTable;
5559
import org.apache.flink.table.catalog.CatalogView;
60+
import org.apache.flink.table.catalog.FunctionLanguage;
5661
import org.apache.flink.table.catalog.ObjectPath;
5762
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
5863
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -83,6 +88,7 @@
8388
import java.util.HashMap;
8489
import java.util.HashSet;
8590
import java.util.List;
91+
import java.util.Locale;
8692
import java.util.Map;
8793
import java.util.Objects;
8894
import java.util.Optional;
@@ -134,6 +140,16 @@ public class FlinkCatalog extends AbstractCatalog {
134140
protected Connection connection;
135141
protected Admin admin;
136142

143+
private static final Map<String, String> BUILTIN_BITMAP_FUNCTIONS;
144+
145+
static {
146+
Map<String, String> map = new HashMap<>();
147+
map.put("rb_build_agg", RbBuildAggFunction.class.getName());
148+
map.put("rb_or_agg", RbOrAggFunction.class.getName());
149+
map.put("rb_and_agg", RbAndAggFunction.class.getName());
150+
BUILTIN_BITMAP_FUNCTIONS = Collections.unmodifiableMap(map);
151+
}
152+
137153
public FlinkCatalog(
138154
String name,
139155
String defaultDatabase,
@@ -746,19 +762,26 @@ public void alterPartition(
746762
}
747763

748764
@Override
749-
public List<String> listFunctions(String s) throws DatabaseNotExistException, CatalogException {
750-
return Collections.emptyList();
765+
public List<String> listFunctions(String dbName)
766+
throws DatabaseNotExistException, CatalogException {
767+
return new ArrayList<>(BUILTIN_BITMAP_FUNCTIONS.keySet());
751768
}
752769

753770
@Override
754-
public CatalogFunction getFunction(ObjectPath functionPath)
755-
throws FunctionNotExistException, CatalogException {
756-
throw new FunctionNotExistException(getName(), functionPath);
771+
public boolean functionExists(ObjectPath objectPath) throws CatalogException {
772+
return BUILTIN_BITMAP_FUNCTIONS.containsKey(
773+
objectPath.getObjectName().toLowerCase(Locale.ROOT));
757774
}
758775

759776
@Override
760-
public boolean functionExists(ObjectPath objectPath) throws CatalogException {
761-
return false;
777+
public CatalogFunction getFunction(ObjectPath functionPath)
778+
throws FunctionNotExistException, CatalogException {
779+
String className =
780+
BUILTIN_BITMAP_FUNCTIONS.get(functionPath.getObjectName().toLowerCase(Locale.ROOT));
781+
if (className == null) {
782+
throw new FunctionNotExistException(getName(), functionPath);
783+
}
784+
return new CatalogFunctionImpl(className, FunctionLanguage.JAVA);
762785
}
763786

764787
@Override
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.functions.bitmap;
19+
20+
import org.apache.fluss.exception.FlussRuntimeException;
21+
22+
import org.apache.flink.api.common.ExecutionConfig;
23+
import org.apache.flink.api.common.typeinfo.TypeInformation;
24+
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
25+
import org.apache.flink.api.common.typeutils.TypeSerializer;
26+
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
27+
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
28+
import org.apache.flink.core.memory.DataInputView;
29+
import org.apache.flink.core.memory.DataOutputView;
30+
import org.apache.flink.table.annotation.DataTypeHint;
31+
import org.apache.flink.table.annotation.FunctionHint;
32+
import org.apache.flink.table.functions.AggregateFunction;
33+
import org.roaringbitmap.RoaringBitmap;
34+
35+
import javax.annotation.Nullable;
36+
import javax.annotation.concurrent.ThreadSafe;
37+
38+
import java.io.IOException;
39+
import java.util.Objects;
40+
41+
/**
42+
* {@code rb_and_agg(bitmap BYTES) -> BYTES}
43+
*
44+
* <p>Intersects multiple serialized {@link RoaringBitmap} values using bitwise AND across rows.
45+
*
46+
* <p>Unlike {@link RbOrAggFunction}, this function requires a custom {@link Accumulator} that
47+
* carries an {@code initialized} flag alongside the bitmap. This is necessary because an empty
48+
* bitmap cannot be used as a sentinel for "no input received yet": once the AND result becomes the
49+
* empty set, it must stay empty even if further inputs arrive, whereas a truly uninitialized
50+
* accumulator must be seeded with the first input via OR before AND can proceed.
51+
*
52+
* <p>Note: there is no server-side {@code FieldRoaringBitmapAndAgg} counterpart. This function
53+
* executes entirely in Flink. Users should be aware that combining it with {@code
54+
* table.merge-engine=aggregation} may produce unexpected results during server-side compaction.
55+
*
56+
* <p>Returns {@code null} when all inputs are null or the input set is empty.
57+
*/
58+
@FunctionHint(
59+
accumulator = @DataTypeHint(value = "RAW", bridgedTo = RbAndAggFunction.Accumulator.class))
60+
public class RbAndAggFunction extends AggregateFunction<byte[], RbAndAggFunction.Accumulator> {
61+
62+
// -------------------------------------------------------------------------
63+
// Accumulator
64+
// -------------------------------------------------------------------------
65+
66+
/** Mutable accumulator that tracks initialization state for AND aggregation. */
67+
public static final class Accumulator {
68+
69+
/** True after the first non-null input has been accumulated. */
70+
public boolean initialized = false;
71+
72+
/** Current AND result; meaningless if {@code initialized} is false. */
73+
public RoaringBitmap value = new RoaringBitmap();
74+
}
75+
76+
// -------------------------------------------------------------------------
77+
// AggregateFunction implementation
78+
// -------------------------------------------------------------------------
79+
80+
@Override
81+
public Accumulator createAccumulator() {
82+
return new Accumulator();
83+
}
84+
85+
/**
86+
* Intersects the input bitmap into the accumulator.
87+
*
88+
* @param acc the running accumulator
89+
* @param bitmapBytes serialized RoaringBitmap bytes; null and empty arrays are ignored
90+
*/
91+
public void accumulate(Accumulator acc, @Nullable byte[] bitmapBytes) throws IOException {
92+
if (bitmapBytes == null || bitmapBytes.length == 0) {
93+
return;
94+
}
95+
RoaringBitmap input = BitmapUtils.fromBytes(bitmapBytes);
96+
if (!acc.initialized) {
97+
acc.value.or(input);
98+
acc.initialized = true;
99+
} else {
100+
acc.value.and(input);
101+
}
102+
}
103+
104+
/** Merges partial accumulators, required for two-phase aggregation in the Flink Table API. */
105+
public void merge(Accumulator acc, Iterable<Accumulator> it) {
106+
for (Accumulator other : it) {
107+
if (!other.initialized) {
108+
continue;
109+
}
110+
if (!acc.initialized) {
111+
acc.value.or(other.value);
112+
acc.initialized = true;
113+
} else {
114+
acc.value.and(other.value);
115+
}
116+
}
117+
}
118+
119+
public void resetAccumulator(Accumulator acc) {
120+
acc.initialized = false;
121+
acc.value.clear();
122+
}
123+
124+
public void retract(Accumulator acc, byte[] bitmapBytes) {
125+
throw new UnsupportedOperationException(
126+
"rb_and_agg does not support retraction. " + "Use it only on append-only streams.");
127+
}
128+
129+
@Override
130+
@Nullable
131+
public byte[] getValue(Accumulator acc) {
132+
if (!acc.initialized || acc.value.isEmpty()) {
133+
return null;
134+
}
135+
try {
136+
return BitmapUtils.toBytes(acc.value);
137+
} catch (IOException e) {
138+
throw new FlussRuntimeException("Failed to serialize rb_and_agg accumulator.", e);
139+
}
140+
}
141+
142+
@Override
143+
public TypeInformation<Accumulator> getAccumulatorType() {
144+
return AccumulatorTypeInfo.INSTANCE;
145+
}
146+
147+
// -------------------------------------------------------------------------
148+
// TypeSerializer and TypeInformation for Accumulator
149+
// -------------------------------------------------------------------------
150+
151+
/** {@link TypeInformation} for {@link Accumulator}. */
152+
@ThreadSafe
153+
public static final class AccumulatorTypeInfo extends TypeInformation<Accumulator> {
154+
155+
public static final AccumulatorTypeInfo INSTANCE = new AccumulatorTypeInfo();
156+
157+
private static final long serialVersionUID = 1L;
158+
159+
private AccumulatorTypeInfo() {}
160+
161+
@Override
162+
public boolean isBasicType() {
163+
return false;
164+
}
165+
166+
@Override
167+
public boolean isTupleType() {
168+
return false;
169+
}
170+
171+
@Override
172+
public int getArity() {
173+
return 1;
174+
}
175+
176+
@Override
177+
public int getTotalFields() {
178+
return 1;
179+
}
180+
181+
@Override
182+
public Class<Accumulator> getTypeClass() {
183+
return Accumulator.class;
184+
}
185+
186+
@Override
187+
public boolean isKeyType() {
188+
return false;
189+
}
190+
191+
@Override
192+
public TypeSerializer<Accumulator> createSerializer(ExecutionConfig config) {
193+
return AccumulatorSerializer.INSTANCE;
194+
}
195+
196+
@Override
197+
public String toString() {
198+
return "RbAndAccumulatorTypeInfo";
199+
}
200+
201+
@Override
202+
public boolean equals(Object obj) {
203+
return obj instanceof AccumulatorTypeInfo && ((AccumulatorTypeInfo) obj).canEqual(this);
204+
}
205+
206+
@Override
207+
public int hashCode() {
208+
return Objects.hash(getTypeClass());
209+
}
210+
211+
@Override
212+
public boolean canEqual(Object obj) {
213+
return obj instanceof AccumulatorTypeInfo;
214+
}
215+
}
216+
217+
/** {@link TypeSerializer} for {@link Accumulator}. */
218+
@ThreadSafe
219+
public static final class AccumulatorSerializer extends TypeSerializerSingleton<Accumulator> {
220+
221+
public static final AccumulatorSerializer INSTANCE = new AccumulatorSerializer();
222+
223+
private static final long serialVersionUID = 1L;
224+
225+
private AccumulatorSerializer() {}
226+
227+
@Override
228+
public boolean isImmutableType() {
229+
return false;
230+
}
231+
232+
@Override
233+
public Accumulator createInstance() {
234+
return new Accumulator();
235+
}
236+
237+
@Override
238+
public Accumulator copy(Accumulator from) {
239+
Accumulator copy = new Accumulator();
240+
copy.initialized = from.initialized;
241+
copy.value = from.value.clone();
242+
return copy;
243+
}
244+
245+
@Override
246+
public Accumulator copy(Accumulator from, Accumulator reuse) {
247+
return copy(from);
248+
}
249+
250+
@Override
251+
public int getLength() {
252+
return -1;
253+
}
254+
255+
@Override
256+
public void serialize(Accumulator record, DataOutputView target) throws IOException {
257+
target.writeBoolean(record.initialized);
258+
if (record.initialized) {
259+
byte[] bytes = BitmapUtils.toBytes(record.value);
260+
target.writeInt(bytes.length);
261+
target.write(bytes);
262+
}
263+
}
264+
265+
@Override
266+
public Accumulator deserialize(DataInputView source) throws IOException {
267+
Accumulator acc = new Accumulator();
268+
acc.initialized = source.readBoolean();
269+
if (acc.initialized) {
270+
int size = source.readInt();
271+
byte[] bytes = new byte[size];
272+
source.readFully(bytes);
273+
acc.value = BitmapUtils.fromBytes(bytes);
274+
}
275+
return acc;
276+
}
277+
278+
@Override
279+
public Accumulator deserialize(Accumulator reuse, DataInputView source) throws IOException {
280+
return deserialize(source);
281+
}
282+
283+
@Override
284+
public void copy(DataInputView source, DataOutputView target) throws IOException {
285+
boolean initialized = source.readBoolean();
286+
target.writeBoolean(initialized);
287+
if (initialized) {
288+
int size = source.readInt();
289+
target.writeInt(size);
290+
byte[] buffer = new byte[size];
291+
source.readFully(buffer);
292+
target.write(buffer);
293+
}
294+
}
295+
296+
@Override
297+
public TypeSerializerSnapshot<Accumulator> snapshotConfiguration() {
298+
return new AccumulatorSerializerSnapshot();
299+
}
300+
301+
/** Snapshot for {@link AccumulatorSerializer}. */
302+
public static final class AccumulatorSerializerSnapshot
303+
extends SimpleTypeSerializerSnapshot<Accumulator> {
304+
305+
public AccumulatorSerializerSnapshot() {
306+
super(() -> INSTANCE);
307+
}
308+
}
309+
}
310+
}

0 commit comments

Comments
 (0)