From 2bce13f1d490529095b25ebf5f1b6a3d1a0a9b45 Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Tue, 16 Jun 2026 16:07:31 +0000 Subject: [PATCH] [FIP-37] Add bitmap scalar functions and register via FlussCatalog --- .../fluss/flink/catalog/FlinkCatalog.java | 14 ++ .../flink/functions/bitmap/RbAndFunction.java | 51 ++++ .../functions/bitmap/RbBuildFunction.java | 54 +++++ .../bitmap/RbCardinalityFunction.java | 47 ++++ .../functions/bitmap/RbContainsFunction.java | 48 ++++ .../flink/functions/bitmap/RbOrFunction.java | 56 +++++ .../functions/bitmap/RbToArrayFunction.java | 51 ++++ .../fluss/flink/catalog/FlinkCatalogTest.java | 40 ++- .../bitmap/RbFunctionsCatalogITCase.java | 126 ++++++++++ .../bitmap/RbScalarFunctionsTest.java | 229 ++++++++++++++++++ 10 files changed, 707 insertions(+), 9 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbAndFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbBuildFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbCardinalityFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbContainsFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbOrFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbToArrayFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbScalarFunctionsTest.java diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 103bf311b4..17194c05b8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -26,8 +26,14 @@ import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.adapter.CatalogTableAdapter; import org.apache.fluss.flink.functions.bitmap.RbAndAggFunction; +import org.apache.fluss.flink.functions.bitmap.RbAndFunction; import org.apache.fluss.flink.functions.bitmap.RbBuildAggFunction; +import org.apache.fluss.flink.functions.bitmap.RbBuildFunction; +import org.apache.fluss.flink.functions.bitmap.RbCardinalityFunction; +import org.apache.fluss.flink.functions.bitmap.RbContainsFunction; import org.apache.fluss.flink.functions.bitmap.RbOrAggFunction; +import org.apache.fluss.flink.functions.bitmap.RbOrFunction; +import org.apache.fluss.flink.functions.bitmap.RbToArrayFunction; import org.apache.fluss.flink.lake.LakeFlinkCatalog; import org.apache.fluss.flink.procedure.ProcedureManager; import org.apache.fluss.flink.utils.CatalogExceptionUtils; @@ -144,9 +150,17 @@ public class FlinkCatalog extends AbstractCatalog { static { Map map = new HashMap<>(); + // aggregate functions map.put("rb_build_agg", RbBuildAggFunction.class.getName()); map.put("rb_or_agg", RbOrAggFunction.class.getName()); map.put("rb_and_agg", RbAndAggFunction.class.getName()); + // scalar functions + map.put("rb_cardinality", RbCardinalityFunction.class.getName()); + map.put("rb_build", RbBuildFunction.class.getName()); + map.put("rb_contains", RbContainsFunction.class.getName()); + map.put("rb_to_array", RbToArrayFunction.class.getName()); + map.put("rb_or", RbOrFunction.class.getName()); + map.put("rb_and", RbAndFunction.class.getName()); BUILTIN_BITMAP_FUNCTIONS = Collections.unmodifiableMap(map); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbAndFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbAndFunction.java new file mode 100644 index 0000000000..5b95f93a1c --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbAndFunction.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.table.functions.ScalarFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * {@code rb_and(left BYTES, right BYTES) -> BYTES} + * + *

Returns the bitwise AND (intersection) of two serialized {@link RoaringBitmap} values. Returns + * {@code null} if either argument is null. Returns the intersection bytes even if the result is an + * empty bitmap, since both inputs were explicitly provided by the caller. + */ +public class RbAndFunction extends ScalarFunction { + + /** + * @param leftBytes serialized left bitmap + * @param rightBytes serialized right bitmap + * @return intersection of left and right, or null if either argument is null + */ + @Nullable + public byte[] eval(@Nullable byte[] leftBytes, @Nullable byte[] rightBytes) throws IOException { + if (leftBytes == null || rightBytes == null) { + return null; + } + RoaringBitmap left = BitmapUtils.fromBytes(leftBytes); + RoaringBitmap right = BitmapUtils.fromBytes(rightBytes); + left.and(right); + return BitmapUtils.toBytes(left); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbBuildFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbBuildFunction.java new file mode 100644 index 0000000000..97e59753a9 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbBuildFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.table.functions.ScalarFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * {@code rb_build(v1 INT, v2 INT, ...) -> BYTES} + * + *

Builds a serialized {@link RoaringBitmap} from a variadic list of 32-bit integer values within + * a single row. Unlike {@code rb_build_agg}, this function operates on individual column values in + * the same row rather than aggregating across rows. Null values are ignored. Returns {@code null} + * if all inputs are null or no inputs are provided. + */ +public class RbBuildFunction extends ScalarFunction { + + /** + * @param values variadic integer values to add to the bitmap; null values are ignored + * @return serialized bitmap, or null if all inputs are null + */ + @Nullable + public byte[] eval(@Nullable Integer... values) throws IOException { + if (values == null || values.length == 0) { + return null; + } + RoaringBitmap bitmap = new RoaringBitmap(); + for (Integer v : values) { + if (v != null) { + bitmap.add(v); + } + } + return bitmap.isEmpty() ? null : BitmapUtils.toBytes(bitmap); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbCardinalityFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbCardinalityFunction.java new file mode 100644 index 0000000000..3c1364d583 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbCardinalityFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.table.functions.ScalarFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * {@code rb_cardinality(bitmap BYTES) -> BIGINT} + * + *

Returns the number of distinct integers in the serialized {@link RoaringBitmap}. Returns + * {@code null} for null or empty input. + */ +public class RbCardinalityFunction extends ScalarFunction { + + /** + * @param bitmapBytes serialized RoaringBitmap; null returns null + * @return number of distinct integers, or null if input is null + */ + @Nullable + public Long eval(@Nullable byte[] bitmapBytes) throws IOException { + if (bitmapBytes == null || bitmapBytes.length == 0) { + return null; + } + RoaringBitmap bitmap = BitmapUtils.fromBytes(bitmapBytes); + return bitmap.getLongCardinality(); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbContainsFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbContainsFunction.java new file mode 100644 index 0000000000..c77f1b5c6d --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbContainsFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.table.functions.ScalarFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * {@code rb_contains(bitmap BYTES, value INT) -> BOOLEAN} + * + *

Returns {@code true} if the serialized {@link RoaringBitmap} contains the given integer. + * Returns {@code null} if either argument is null. + */ +public class RbContainsFunction extends ScalarFunction { + + /** + * @param bitmapBytes serialized RoaringBitmap + * @param value the integer to check + * @return true if the bitmap contains the value, null if either argument is null + */ + @Nullable + public Boolean eval(@Nullable byte[] bitmapBytes, @Nullable Integer value) throws IOException { + if (bitmapBytes == null || value == null) { + return null; + } + RoaringBitmap bitmap = BitmapUtils.fromBytes(bitmapBytes); + return bitmap.contains(value); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbOrFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbOrFunction.java new file mode 100644 index 0000000000..0e7e0fc039 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbOrFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.table.functions.ScalarFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * {@code rb_or(left BYTES, right BYTES) -> BYTES} + * + *

Returns the bitwise OR (union) of two serialized {@link RoaringBitmap} values. If one argument + * is null, the other is returned as-is. Returns {@code null} only if both are null. + */ +public class RbOrFunction extends ScalarFunction { + + /** + * @param leftBytes serialized left bitmap + * @param rightBytes serialized right bitmap + * @return union of left and right, or null if both are null + */ + @Nullable + public byte[] eval(@Nullable byte[] leftBytes, @Nullable byte[] rightBytes) throws IOException { + if (leftBytes == null && rightBytes == null) { + return null; + } + if (leftBytes == null) { + return rightBytes; + } + if (rightBytes == null) { + return leftBytes; + } + RoaringBitmap left = BitmapUtils.fromBytes(leftBytes); + RoaringBitmap right = BitmapUtils.fromBytes(rightBytes); + left.or(right); + return BitmapUtils.toBytes(left); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbToArrayFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbToArrayFunction.java new file mode 100644 index 0000000000..175a6fd825 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbToArrayFunction.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.table.functions.ScalarFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * {@code rb_to_array(bitmap BYTES) -> ARRAY} + * + *

Expands a serialized {@link RoaringBitmap} into an array of its integer values in ascending + * order. Returns {@code null} for null input. Returns an empty array for an empty bitmap. + * + *

Note: The return type must be {@code Integer[]} (boxed), not {@code int[]} (primitive). + * Flink's type system maps {@code int[]} to {@code BYTES}, whereas {@code Integer[]} correctly maps + * to {@code ARRAY}. + */ +public class RbToArrayFunction extends ScalarFunction { + + /** + * @param bitmapBytes serialized RoaringBitmap; null returns null + * @return array of integers in ascending order, or null if input is null + */ + @Nullable + public Integer[] eval(@Nullable byte[] bitmapBytes) throws IOException { + if (bitmapBytes == null) { + return null; + } + RoaringBitmap bitmap = BitmapUtils.fromBytes(bitmapBytes); + return bitmap.stream().boxed().toArray(Integer[]::new); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 8354fbf20c..6b08780390 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -1007,7 +1007,17 @@ void testViewsAndFunctions() throws Exception { // Test functions operations List functions = catalog.listFunctions(DEFAULT_DB); - assertThat(functions).contains("rb_build_agg", "rb_or_agg", "rb_and_agg"); + assertThat(functions) + .contains( + "rb_build_agg", + "rb_or_agg", + "rb_and_agg", + "rb_cardinality", + "rb_build", + "rb_contains", + "rb_to_array", + "rb_or", + "rb_and"); ObjectPath functionPath = new ObjectPath(DEFAULT_DB, "testFunction"); assertThat(catalog.functionExists(functionPath)).isFalse(); @@ -1105,17 +1115,29 @@ void registerLakeTable(ObjectPath tablePath, CatalogTable table) @Test void testBitmapFunctionsRegistered() throws Exception { - List functions = catalog.listFunctions(DEFAULT_DB); + // aggregate functions assertThat(functions).contains("rb_build_agg", "rb_or_agg", "rb_and_agg"); - - assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_build_agg"))).isTrue(); - - assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_or_agg"))).isTrue(); - - assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_and_agg"))).isTrue(); - + // scalar functions + assertThat(functions) + .contains( + "rb_cardinality", + "rb_build", + "rb_contains", + "rb_to_array", + "rb_or", + "rb_and"); + + // verify each function exists and resolves to the correct class + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_cardinality"))).isTrue(); + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_build"))).isTrue(); + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_contains"))).isTrue(); + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_to_array"))).isTrue(); + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_or"))).isTrue(); + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_and"))).isTrue(); + + // verify unknown still returns false assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "unknown_fn"))).isFalse(); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbFunctionsCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbFunctionsCatalogITCase.java index 251d3a255c..700222ba14 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbFunctionsCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbFunctionsCatalogITCase.java @@ -194,4 +194,130 @@ void testAllThreeFunctionsInSingleQuery() throws Exception { assertThat(rows.get(0).getField(2)).isNotNull(); // rb_or_agg assertThat(rows.get(0).getField(3)).isNotNull(); // rb_and_agg } + + @Test + void testRbCardinalityResolvedFromCatalog() throws Exception { + byte[] bitmap = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(10, 20, 30, 40, 50)); + + Table source = + tEnv.fromValues( + DataTypes.ROW(DataTypes.FIELD("bmap", DataTypes.BYTES())), Row.of(bitmap)); + tEnv.createTemporaryView("bitmaps", source); + + TableResult result = tEnv.executeSql("SELECT rb_cardinality(bmap) FROM bitmaps"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getField(0)).isEqualTo(5L); + } + + @Test + void testRbBuildResolvedFromCatalog() throws Exception { + TableResult result = tEnv.executeSql("SELECT rb_cardinality(rb_build(1, 2, 3, 2))"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getField(0)).isEqualTo(3L); // duplicate 2 ignored + } + + @Test + void testRbContainsResolvedFromCatalog() throws Exception { + byte[] bitmap = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3)); + + Table source = + tEnv.fromValues( + DataTypes.ROW(DataTypes.FIELD("bmap", DataTypes.BYTES())), Row.of(bitmap)); + tEnv.createTemporaryView("bitmaps", source); + + TableResult result = + tEnv.executeSql("SELECT rb_contains(bmap, 2), rb_contains(bmap, 99) FROM bitmaps"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getField(0)).isEqualTo(true); + assertThat(rows.get(0).getField(1)).isEqualTo(false); + } + + @Test + void testRbToArrayResolvedFromCatalog() throws Exception { + byte[] bitmap = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 1, 2)); + + Table source = + tEnv.fromValues( + DataTypes.ROW(DataTypes.FIELD("bmap", DataTypes.BYTES())), Row.of(bitmap)); + tEnv.createTemporaryView("bitmaps", source); + + TableResult result = tEnv.executeSql("SELECT rb_to_array(bmap) FROM bitmaps"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + assertThat(rows).hasSize(1); + Integer[] arr = (Integer[]) rows.get(0).getField(0); + assertThat(arr).containsExactly(1, 2, 3); // ascending order + } + + @Test + void testRbOrScalarResolvedFromCatalog() throws Exception { + byte[] left = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3)); + byte[] right = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 4, 5)); + + Table source = + tEnv.fromValues( + DataTypes.ROW( + DataTypes.FIELD("l", DataTypes.BYTES()), + DataTypes.FIELD("r", DataTypes.BYTES())), + Row.of(left, right)); + tEnv.createTemporaryView("bitmaps", source); + + TableResult result = tEnv.executeSql("SELECT rb_cardinality(rb_or(l, r)) FROM bitmaps"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getField(0)).isEqualTo(5L); + } + + @Test + void testRbAndScalarResolvedFromCatalog() throws Exception { + byte[] left = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3, 4)); + byte[] right = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(2, 3, 5)); + + Table source = + tEnv.fromValues( + DataTypes.ROW( + DataTypes.FIELD("l", DataTypes.BYTES()), + DataTypes.FIELD("r", DataTypes.BYTES())), + Row.of(left, right)); + tEnv.createTemporaryView("bitmaps", source); + + TableResult result = tEnv.executeSql("SELECT rb_cardinality(rb_and(l, r)) FROM bitmaps"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getField(0)).isEqualTo(2L); // {2, 3} + } + + @Test + void testScalarAndAggFunctionsCombined() throws Exception { + // Validates the full Phase 1 function set works together: + // build a bitmap via rb_build_agg, then query it with scalar functions + Table source = + tEnv.fromValues( + DataTypes.ROW( + DataTypes.FIELD("k", DataTypes.INT()), + DataTypes.FIELD("v", DataTypes.INT())), + Row.of(1, 10), + Row.of(1, 20), + Row.of(1, 30)); + tEnv.createTemporaryView("events", source); + + TableResult result = + tEnv.executeSql( + "SELECT rb_cardinality(rb_build_agg(v)), " + + "rb_contains(rb_build_agg(v), 20) " + + "FROM events GROUP BY k"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getField(0)).isEqualTo(3L); + assertThat(rows.get(0).getField(1)).isEqualTo(true); + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbScalarFunctionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbScalarFunctionsTest.java new file mode 100644 index 0000000000..2ab1261b11 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbScalarFunctionsTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.junit.jupiter.api.Test; +import org.roaringbitmap.RoaringBitmap; + +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the six Phase 1 scalar bitmap functions. */ +class RbScalarFunctionsTest { + + // ------------------------------------------------------------------------- + // rb_cardinality + // ------------------------------------------------------------------------- + + @Test + void testCardinalityBasic() throws IOException { + RbCardinalityFunction fn = new RbCardinalityFunction(); + byte[] bytes = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3, 4, 5)); + assertThat(fn.eval(bytes)).isEqualTo(5L); + } + + @Test + void testCardinalityNullInput() throws IOException { + assertThat(new RbCardinalityFunction().eval(null)).isNull(); + } + + @Test + void testCardinalityEmptyInput() throws IOException { + assertThat(new RbCardinalityFunction().eval(new byte[0])).isNull(); + } + + @Test + void testCardinalityEmptyBitmap() throws IOException { + RbCardinalityFunction fn = new RbCardinalityFunction(); + byte[] bytes = BitmapUtils.toBytes(new RoaringBitmap()); + assertThat(fn.eval(bytes)).isEqualTo(0L); + } + + // ------------------------------------------------------------------------- + // rb_build + // ------------------------------------------------------------------------- + + @Test + void testBuildBasic() throws IOException { + RbBuildFunction fn = new RbBuildFunction(); + byte[] result = fn.eval(1, 2, 3, 2); // duplicate 2 ignored + assertThat(result).isNotNull(); + RoaringBitmap bitmap = BitmapUtils.fromBytes(result); + assertThat(bitmap.getLongCardinality()).isEqualTo(3L); + assertThat(bitmap.contains(1)).isTrue(); + assertThat(bitmap.contains(3)).isTrue(); + } + + @Test + void testBuildNullInputs() throws IOException { + RbBuildFunction fn = new RbBuildFunction(); + assertThat(fn.eval((Integer[]) null)).isNull(); + assertThat(fn.eval(new Integer[0])).isNull(); + assertThat(fn.eval(null, null)).isNull(); // all null values + } + + @Test + void testBuildNullValuesIgnored() throws IOException { + RbBuildFunction fn = new RbBuildFunction(); + byte[] result = fn.eval(1, null, 3); + RoaringBitmap bitmap = BitmapUtils.fromBytes(result); + assertThat(bitmap.getLongCardinality()).isEqualTo(2L); + assertThat(bitmap.contains(2)).isFalse(); + } + + // ------------------------------------------------------------------------- + // rb_contains + // ------------------------------------------------------------------------- + + @Test + void testContainsTrue() throws IOException { + RbContainsFunction fn = new RbContainsFunction(); + byte[] bytes = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(10, 20, 30)); + assertThat(fn.eval(bytes, 20)).isTrue(); + } + + @Test + void testContainsFalse() throws IOException { + RbContainsFunction fn = new RbContainsFunction(); + byte[] bytes = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(10, 20, 30)); + assertThat(fn.eval(bytes, 99)).isFalse(); + } + + @Test + void testContainsNullBitmap() throws IOException { + assertThat(new RbContainsFunction().eval(null, 1)).isNull(); + } + + @Test + void testContainsNullValue() throws IOException { + byte[] bytes = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2)); + assertThat(new RbContainsFunction().eval(bytes, null)).isNull(); + } + + // ------------------------------------------------------------------------- + // rb_to_array + // ------------------------------------------------------------------------- + + @Test + void testToArrayBasic() throws IOException { + RbToArrayFunction fn = new RbToArrayFunction(); + byte[] bytes = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 1, 2)); + Integer[] result = fn.eval(bytes); + assertThat(result).isNotNull(); + // RoaringBitmap returns values in ascending order + assertThat(Arrays.asList(result)).containsExactly(1, 2, 3); + } + + @Test + void testToArrayNullInput() throws IOException { + assertThat(new RbToArrayFunction().eval(null)).isNull(); + } + + @Test + void testToArrayEmptyBitmap() throws IOException { + RbToArrayFunction fn = new RbToArrayFunction(); + byte[] bytes = BitmapUtils.toBytes(new RoaringBitmap()); + Integer[] result = fn.eval(bytes); + assertThat(result).isNotNull().isEmpty(); + } + + @Test + void testToArrayReturnTypeIsIntegerArray() throws IOException { + // Verifies the return type is Integer[] (ARRAY), not int[] (BYTES) + RbToArrayFunction fn = new RbToArrayFunction(); + byte[] bytes = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(5)); + Object result = fn.eval(bytes); + assertThat(result).isInstanceOf(Integer[].class); + } + + // ------------------------------------------------------------------------- + // rb_or (scalar) + // ------------------------------------------------------------------------- + + @Test + void testOrBasic() throws IOException { + RbOrFunction fn = new RbOrFunction(); + byte[] left = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3)); + byte[] right = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 4, 5)); + byte[] result = fn.eval(left, right); + RoaringBitmap union = BitmapUtils.fromBytes(result); + assertThat(union.getLongCardinality()).isEqualTo(5L); + } + + @Test + void testOrLeftNull() throws IOException { + RbOrFunction fn = new RbOrFunction(); + byte[] right = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2)); + assertThat(fn.eval(null, right)).isEqualTo(right); + } + + @Test + void testOrRightNull() throws IOException { + RbOrFunction fn = new RbOrFunction(); + byte[] left = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2)); + assertThat(fn.eval(left, null)).isEqualTo(left); + } + + @Test + void testOrBothNull() throws IOException { + assertThat(new RbOrFunction().eval(null, null)).isNull(); + } + + // ------------------------------------------------------------------------- + // rb_and (scalar) + // ------------------------------------------------------------------------- + + @Test + void testAndBasic() throws IOException { + RbAndFunction fn = new RbAndFunction(); + byte[] left = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3)); + byte[] right = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(2, 3, 4)); + byte[] result = fn.eval(left, right); + RoaringBitmap intersection = BitmapUtils.fromBytes(result); + assertThat(intersection.getLongCardinality()).isEqualTo(2L); + assertThat(intersection.contains(2)).isTrue(); + assertThat(intersection.contains(3)).isTrue(); + } + + @Test + void testAndLeftNull() throws IOException { + byte[] right = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2)); + assertThat(new RbAndFunction().eval(null, right)).isNull(); + } + + @Test + void testAndRightNull() throws IOException { + byte[] left = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2)); + assertThat(new RbAndFunction().eval(left, null)).isNull(); + } + + @Test + void testAndEmptyIntersectionReturnsBytes() throws IOException { + // Unlike rb_and_agg, scalar rb_and returns bytes even for empty intersection + // because both inputs were explicitly provided + RbAndFunction fn = new RbAndFunction(); + byte[] left = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2)); + byte[] right = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 4)); + byte[] result = fn.eval(left, right); + assertThat(result).isNotNull(); + RoaringBitmap intersection = BitmapUtils.fromBytes(result); + assertThat(intersection.isEmpty()).isTrue(); + } +}