[FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction#3319
[FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction#3319Prajwal-banakar wants to merge 4 commits into
Conversation
|
Hi @polyzos @wuchong @platinumhamburg could you please help review here? |
| try { | ||
| return BitmapUtils.toBytes(acc); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to serialize bitmap accumulator.", e); |
There was a problem hiding this comment.
Done switched to FlussRuntimeException
| return new RoaringBitmap(); | ||
| } | ||
|
|
||
| /** Merges multiple accumulators — required for session window aggregation. */ |
There was a problem hiding this comment.
"required for session window aggregation" is misleading.
The merge operation is required for any two-phase / batch / merge-capable aggregation in the Flink Table API, not specifically session windows
There was a problem hiding this comment.
Updated the Javadoc
| } | ||
|
|
||
| @Override | ||
| public void serialize(RoaringBitmap record, DataOutputView target) throws IOException { |
There was a problem hiding this comment.
record.runOptimize() is called here and then BitmapUtils.toBytes(record) also calls runOptimize().
There was a problem hiding this comment.
Fixed, removed the redundant runOptimize() call from serialize(); BitmapUtils.toBytes() handles it.
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object obj) { |
There was a problem hiding this comment.
equals doesn't delegate through canEqual, which breaks Flink's documented symmetry contract for TypeInformation subclasses.
Either: return obj instanceof RoaringBitmapTypeInfo && ((RoaringBitmapTypeInfo) obj).canEqual(this); or simplify the whole class to reference equality since the constructor is private and INSTANCE is the only instance. Also worth adding @threadsafe here and on RoaringBitmapSerializer
There was a problem hiding this comment.
Fixed equals to delegate through canEqual. Added @threadsafe to both classes.
| <exclude>org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint</exclude> | ||
| <exclude>org.apache.fluss.flink.tiering.FlussLakeTiering</exclude> | ||
| <!-- end exclude for flink tiering service --> | ||
| <exclude> |
There was a problem hiding this comment.
instead of excluding can we add some test, for example for
merge / getValue / resetAccumulator? maybe ship with just only one concrete implementation?
without a consumer, there's no way to validate that the planner actually accepts the RAW(... bridgedTo = RoaringBitmap.class) hint together with the custom TypeInformation.
WDYT?
There was a problem hiding this comment.
Added a minimal concrete TestRbAggFunction in the test class and covered merge, getValue, resetAccumulator, and getAccumulatorType. Removed the jacoco exclude.
|
@Prajwal-banakar great work so far .. I just added a few comments, as small impovements, but it's almost there |
|
Hi @polyzos Thanks for the review, addresses all the comments, Please take another look when you get a time! |
Purpose
Linked issue: Part of #3289
This PR adds the foundational infrastructure for FIP-37 RoaringBitmap SQL function implementation. It provides the serialization utilities, custom Flink type serializer, and base aggregate function class that will be used by the bitmap SQL functions (
rb_build_agg,rb_or_agg,rb_and_agg, etc.) in subsequent PRs.Brief change log
Added the following infrastructure files in
fluss-flink/fluss-flink-common:RoaringBitmapusing the ByteBuffer-based approach, which matches the server-sideRoaringBitmapUtils.serializeRoaringBitmap32format used byFieldRoaringBitmap32Aggfor wire compatibility.TypeSerializerforRoaringBitmapaccumulators to ensure correct checkpoint/savepoint behavior. Without this, Flink falls back to Kryo which is sensitive to internal class layout changes across RoaringBitmap library versions.TypeInformationwrapper that provides the custom serializer to Flink's type system.@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = RoaringBitmap.class))annotation. This tells Flink's Table planner to skip reflection-based POJO field extraction on RoaringBitmap and use the customTypeInformationinstead.RoaringBitmapSerializerandRoaringBitmapTypeInfobehavior, including serialization round-trip, copy methods, snapshot configuration, and type information checks.AbstractRbAggFunction, since it is abstract and cannot be directly covered by unit tests.RoaringBitmapdependency (version 1.3.0 from root pom).The aggregate functions (
rb_build_agg,rb_or_agg,rb_and_agg) and catalog registration will follow in subsequent PRs linked to this issue.Tests
Unit tests added and passing:
BitmapUtilsTest.testNullInputToBytes()- null handling.BitmapUtilsTest.testNullInputFromBytes()- null handling.BitmapUtilsTest.testEmptyBitmapRoundTrip()- empty bitmap serialization.BitmapUtilsTest.testKnownValuesRoundTrip()- correctness with known values.BitmapUtilsTest.testLargeCardinality()- performance with 100K elements.BitmapUtilsTest.testFormatCompatibleWithServerSerialization()- wire compatibility.Tests are still running and this PR description will be updated with the final verification results once the full validation completes.
Verified with:
./mvnw spotless:apply -pl fluss-flink/fluss-flink-common- BUILD SUCCESS../mvnw test -pl fluss-flink/fluss-flink-common -Dtest=BitmapUtilsTest- BUILD SUCCESS../mvnw clean install -pl fluss-flink/fluss-flink-common -DskipTests- BUILD SUCCESS../mvnw clean package -DskipTests(full project build) - BUILD SUCCESS.API and Format
This change does not affect any public API or storage format. It adds internal infrastructure utilities that will be used by future bitmap SQL functions.
Documentation
This change does not introduce new user-facing features yet. The bitmap SQL functions (
rb_build_agg,rb_or_agg,rb_and_agg) and their documentation will be added in follow-up PRs.