-
Notifications
You must be signed in to change notification settings - Fork 542
[FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction #3319
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction #3319
Changes from all commits
3530dbd
28d260f
b8cd74a
fbe6484
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.flink.functions.bitmap; | ||
|
|
||
| import org.apache.fluss.exception.FlussRuntimeException; | ||
|
|
||
| import org.apache.flink.api.common.typeinfo.TypeInformation; | ||
| import org.apache.flink.table.annotation.DataTypeHint; | ||
| import org.apache.flink.table.annotation.FunctionHint; | ||
| import org.apache.flink.table.functions.AggregateFunction; | ||
| import org.roaringbitmap.RoaringBitmap; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| /** | ||
| * Shared base for bitmap aggregate UDFs that use {@link RoaringBitmap} as the accumulator. | ||
| * | ||
| * <p>The {@code @FunctionHint} annotation with {@code accumulator = @DataTypeHint("RAW")} tells | ||
| * Flink's Table planner to skip reflection-based POJO extraction and instead use the {@link | ||
| * TypeInformation} returned by {@link #getAccumulatorType()}, which provides the custom {@link | ||
| * RoaringBitmapSerializer}. Without this annotation, Flink attempts POJO field extraction on | ||
| * RoaringBitmap and fails. | ||
| */ | ||
| @FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = RoaringBitmap.class)) | ||
| abstract class AbstractRbAggFunction extends AggregateFunction<byte[], RoaringBitmap> { | ||
|
|
||
| @Override | ||
| public RoaringBitmap createAccumulator() { | ||
| return new RoaringBitmap(); | ||
| } | ||
|
|
||
| /** Merges partial accumulators, required for two-phase aggregation in the Flink Table API. */ | ||
| public void merge(RoaringBitmap acc, Iterable<RoaringBitmap> it) { | ||
| for (RoaringBitmap other : it) { | ||
| if (other != null) { | ||
| acc.or(other); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public void resetAccumulator(RoaringBitmap acc) { | ||
| acc.clear(); | ||
| } | ||
|
|
||
| @Override | ||
| @Nullable | ||
| public byte[] getValue(RoaringBitmap acc) { | ||
| if (acc == null || acc.isEmpty()) { | ||
| return null; | ||
| } | ||
| try { | ||
| return BitmapUtils.toBytes(acc); | ||
| } catch (IOException e) { | ||
| throw new FlussRuntimeException("Failed to serialize bitmap accumulator.", e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public TypeInformation<RoaringBitmap> getAccumulatorType() { | ||
| return RoaringBitmapTypeInfo.INSTANCE; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.flink.functions.bitmap; | ||
|
|
||
| import org.roaringbitmap.RoaringBitmap; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
|
|
||
| /** | ||
| * Utility methods for serializing and deserializing {@link RoaringBitmap}. | ||
| * | ||
| * <p>Uses the ByteBuffer-based serialization approach, which is the preferred method recommended by | ||
| * the RoaringBitmap library. This format is compatible with the server-side {@code | ||
| * RoaringBitmapUtils.serializeRoaringBitmap32} used by {@code FieldRoaringBitmap32Agg}. | ||
| */ | ||
| public final class BitmapUtils { | ||
|
|
||
| private BitmapUtils() {} | ||
|
|
||
| /** | ||
| * Serializes a {@link RoaringBitmap} to a byte array. | ||
| * | ||
| * @param bitmap the bitmap to serialize; null returns null | ||
| * @return serialized byte array, or null if input is null | ||
| */ | ||
| @Nullable | ||
| public static byte[] toBytes(@Nullable RoaringBitmap bitmap) throws IOException { | ||
| if (bitmap == null) { | ||
| return null; | ||
| } | ||
| bitmap.runOptimize(); | ||
| ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes()); | ||
| bitmap.serialize(buffer); | ||
| return buffer.array(); | ||
| } | ||
|
|
||
| /** | ||
| * Deserializes a {@link RoaringBitmap} from a byte array. | ||
| * | ||
| * @param bytes the serialized bitmap bytes; null returns null | ||
| * @return deserialized RoaringBitmap, or null if input is null | ||
| */ | ||
| @Nullable | ||
| public static RoaringBitmap fromBytes(@Nullable byte[] bytes) throws IOException { | ||
| if (bytes == null) { | ||
| return null; | ||
| } | ||
| RoaringBitmap bitmap = new RoaringBitmap(); | ||
| bitmap.deserialize(ByteBuffer.wrap(bytes)); | ||
| return bitmap; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.flink.functions.bitmap; | ||
|
|
||
| import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; | ||
| import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; | ||
| import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; | ||
| import org.apache.flink.core.memory.DataInputView; | ||
| import org.apache.flink.core.memory.DataOutputView; | ||
| import org.roaringbitmap.RoaringBitmap; | ||
|
|
||
| import javax.annotation.concurrent.ThreadSafe; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| /** | ||
| * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link RoaringBitmap}. | ||
| * | ||
| * <p>Used as the accumulator serializer for bitmap aggregate functions to ensure correct | ||
| * checkpoint/savepoint behavior. Without a custom serializer, Flink falls back to Kryo which is | ||
| * sensitive to internal class layout changes across RoaringBitmap library versions. | ||
| */ | ||
| @ThreadSafe | ||
| public final class RoaringBitmapSerializer extends TypeSerializerSingleton<RoaringBitmap> { | ||
|
|
||
| public static final RoaringBitmapSerializer INSTANCE = new RoaringBitmapSerializer(); | ||
|
|
||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| private RoaringBitmapSerializer() {} | ||
|
|
||
| @Override | ||
| public boolean isImmutableType() { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public RoaringBitmap createInstance() { | ||
| return new RoaringBitmap(); | ||
| } | ||
|
|
||
| @Override | ||
| public RoaringBitmap copy(RoaringBitmap from) { | ||
| return from.clone(); | ||
| } | ||
|
|
||
| @Override | ||
| public RoaringBitmap copy(RoaringBitmap from, RoaringBitmap reuse) { | ||
| return from.clone(); | ||
| } | ||
|
|
||
| @Override | ||
| public int getLength() { | ||
| return -1; | ||
| } | ||
|
|
||
| @Override | ||
| public void serialize(RoaringBitmap record, DataOutputView target) throws IOException { | ||
| int size = record.serializedSizeInBytes(); | ||
| target.writeInt(size); | ||
| byte[] bytes = BitmapUtils.toBytes(record); | ||
| target.write(bytes); | ||
| } | ||
|
|
||
| @Override | ||
| public RoaringBitmap deserialize(DataInputView source) throws IOException { | ||
| int size = source.readInt(); | ||
| byte[] bytes = new byte[size]; | ||
| source.readFully(bytes); | ||
| return BitmapUtils.fromBytes(bytes); | ||
| } | ||
|
|
||
| @Override | ||
| public RoaringBitmap deserialize(RoaringBitmap reuse, DataInputView source) throws IOException { | ||
| return deserialize(source); | ||
| } | ||
|
|
||
| @Override | ||
| public void copy(DataInputView source, DataOutputView target) throws IOException { | ||
| int size = source.readInt(); | ||
| target.writeInt(size); | ||
| byte[] buffer = new byte[size]; | ||
| source.readFully(buffer); | ||
| target.write(buffer); | ||
| } | ||
|
|
||
| @Override | ||
| public TypeSerializerSnapshot<RoaringBitmap> snapshotConfiguration() { | ||
| return new RoaringBitmapSerializerSnapshot(); | ||
| } | ||
|
|
||
| /** Snapshot for {@link RoaringBitmapSerializer}. */ | ||
| public static final class RoaringBitmapSerializerSnapshot | ||
| extends SimpleTypeSerializerSnapshot<RoaringBitmap> { | ||
|
|
||
| public RoaringBitmapSerializerSnapshot() { | ||
| super(() -> INSTANCE); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.flink.functions.bitmap; | ||
|
|
||
| import org.apache.flink.api.common.ExecutionConfig; | ||
| import org.apache.flink.api.common.typeinfo.TypeInformation; | ||
| import org.apache.flink.api.common.typeutils.TypeSerializer; | ||
| import org.roaringbitmap.RoaringBitmap; | ||
|
|
||
| import javax.annotation.concurrent.ThreadSafe; | ||
|
|
||
| import java.util.Objects; | ||
|
|
||
| /** | ||
| * {@link TypeInformation} for {@link RoaringBitmap}. | ||
| * | ||
| * <p>Provides the custom {@link RoaringBitmapSerializer} to Flink's type system, ensuring correct | ||
| * checkpoint and savepoint behavior for bitmap aggregate function accumulators. | ||
| */ | ||
| @ThreadSafe | ||
| public final class RoaringBitmapTypeInfo extends TypeInformation<RoaringBitmap> { | ||
|
|
||
| public static final RoaringBitmapTypeInfo INSTANCE = new RoaringBitmapTypeInfo(); | ||
|
|
||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| private RoaringBitmapTypeInfo() {} | ||
|
|
||
| @Override | ||
| public boolean isBasicType() { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isTupleType() { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public int getArity() { | ||
| return 1; | ||
| } | ||
|
|
||
| @Override | ||
| public int getTotalFields() { | ||
| return 1; | ||
| } | ||
|
|
||
| @Override | ||
| public Class<RoaringBitmap> getTypeClass() { | ||
| return RoaringBitmap.class; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isKeyType() { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public TypeSerializer<RoaringBitmap> createSerializer(ExecutionConfig config) { | ||
| return RoaringBitmapSerializer.INSTANCE; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "RoaringBitmapTypeInfo"; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object obj) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. equals doesn't delegate through canEqual. Either: return obj instanceof RoaringBitmapTypeInfo && ((RoaringBitmapTypeInfo) obj).canEqual(this); or simplify the whole class to reference equality since the constructor is private and INSTANCE is the only instance. Also worth adding @ThreadSafe here and on RoaringBitmapSerializer.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed equals to delegate through canEqual. Added @ThreadSafe to both classes. |
||
| return obj instanceof RoaringBitmapTypeInfo && ((RoaringBitmapTypeInfo) obj).canEqual(this); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(getTypeClass()); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean canEqual(Object obj) { | ||
| return obj instanceof RoaringBitmapTypeInfo; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
record.runOptimize() is called here and then BitmapUtils.toBytes(record) also calls runOptimize(). There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, removed the redundant runOptimize() call from serialize(); BitmapUtils.toBytes() handles it.