diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java index f0026fbba7..4156a55c58 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java @@ -59,7 +59,10 @@ public enum AggFunctionType { // Roaring bitmap aggregation RBM32, - RBM64; + RBM64, + + // Apache DataSketches aggregation + HLL_SKETCH; // ------------------------------------------------------------------------------------------ @@ -120,6 +123,7 @@ public DataTypeRoot[] getSupportedDataTypeRoots() { return new DataTypeRoot[] {DataTypeRoot.BOOLEAN}; case RBM32: case RBM64: + case HLL_SKETCH: return new DataTypeRoot[] {DataTypeRoot.BYTES}; case LISTAGG: case STRING_AGG: diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java index 86a704ef64..4a233db7fa 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java @@ -300,6 +300,22 @@ public static AggFunction RBM64() { return new AggFunction(AggFunctionType.RBM64, null); } + // =================================================================================== + // Apache DataSketches Aggregation Functions + // =================================================================================== + + /** + * Creates a HLL_SKETCH aggregation function that merges serialized Apache DataSketches HLL + * sketches. + * + *

Supported data types: BYTES + * + * @return a HLL_SKETCH aggregation function + */ + public static AggFunction HLL_SKETCH() { + return new AggFunction(AggFunctionType.HLL_SKETCH, null); + } + // =================================================================================== // Internal Factory Methods // =================================================================================== diff --git a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java index bd99610752..69222c853f 100644 --- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java @@ -384,8 +384,19 @@ void testValidateAggFunctionWithDataType() { .hasMessageContaining( Arrays.deepToString(AggFunctionType.MAX.getSupportedDataTypeRoots())); + assertThatThrownBy( + () -> + AggFunctions.of(AggFunctionType.HLL_SKETCH, params) + .validateDataType(DataTypes.STRING())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("column must be part of") + .hasMessageContaining( + Arrays.deepToString( + AggFunctionType.HLL_SKETCH.getSupportedDataTypeRoots())); + // valid case AggFunctions.of(AggFunctionType.LAST_VALUE, params).validateDataType(DataTypes.STRING()); AggFunctions.of(AggFunctionType.LISTAGG, params).validateDataType(DataTypes.STRING()); + AggFunctions.of(AggFunctionType.HLL_SKETCH, params).validateDataType(DataTypes.BYTES()); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java index fe019a08ad..6a769fbedc 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java @@ -55,6 +55,18 @@ void testParseFunctionWithoutParameters() { assertThat(result.get().hasParameters()).isFalse(); } + @Test + void testParseHllSketchFunction() { + Configuration options = new Configuration(); + options.setString("fields.user_hll.agg", "hll_sketch"); + + Optional result = + FlinkAggFunctionParser.parseAggFunction("user_hll", DataTypes.BYTES(), options); + + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(AggFunctions.HLL_SKETCH()); + } + @Test void testParseFunctionWithParameters() { Configuration options = new Configuration(); @@ -131,6 +143,20 @@ void testParseInvalidFunctionName() { .hasMessageContaining("invalid_function"); } + @Test + void testParseHllSketchFunctionWithInvalidType() { + Configuration options = new Configuration(); + options.setString("fields.user_hll.agg", "hll_sketch"); + + assertThatThrownBy( + () -> + FlinkAggFunctionParser.parseAggFunction( + "user_hll", DataTypes.STRING(), options)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid aggregation function configuration") + .hasMessageContaining("hll_sketch"); + } + @Test void testParseEmptyFunctionName() { Configuration options = new Configuration(); diff --git a/fluss-server/pom.xml b/fluss-server/pom.xml index 9664611f69..d4bea0b43d 100644 --- a/fluss-server/pom.xml +++ b/fluss-server/pom.xml @@ -78,6 +78,12 @@ ${roaringbitmap.version} + + org.apache.datasketches + datasketches-java + ${datasketches-java.version} + + @@ -168,4 +174,4 @@ - \ No newline at end of file + diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldHllSketchAggFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldHllSketchAggFactory.java new file mode 100644 index 0000000000..923f26cc96 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldHllSketchAggFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.rowmerger.aggregate.factory; + +import org.apache.fluss.metadata.AggFunction; +import org.apache.fluss.metadata.AggFunctionType; +import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldHllSketchAgg; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeRoot; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** Factory for {@link FieldHllSketchAgg}. */ +public class FieldHllSketchAggFactory implements FieldAggregatorFactory { + + @Override + public FieldHllSketchAgg create(DataType fieldType, AggFunction aggFunction) { + checkArgument( + fieldType.getTypeRoot() == DataTypeRoot.BYTES, + "Data type for hll_sketch column must be 'BytesType' but was '%s'.", + fieldType); + return new FieldHllSketchAgg(fieldType); + } + + @Override + public String identifier() { + return AggFunctionType.HLL_SKETCH.toString(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldHllSketchAgg.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldHllSketchAgg.java new file mode 100644 index 0000000000..682363bc72 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldHllSketchAgg.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.rowmerger.aggregate.functions; + +import org.apache.fluss.types.DataType; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.Union; + +/** HLL sketch aggregator for serialized Apache DataSketches HLL sketches. */ +public class FieldHllSketchAgg extends FieldAggregator { + + private static final long serialVersionUID = 1L; + + public FieldHllSketchAgg(DataType dataType) { + super(dataType); + } + + @Override + public Object agg(Object accumulator, Object inputField) { + if (accumulator == null || inputField == null) { + return accumulator == null ? inputField : accumulator; + } + + try { + HllSketch accumulatorSketch = HllSketch.heapify((byte[]) accumulator); + HllSketch inputSketch = HllSketch.heapify((byte[]) inputField); + int lgMaxK = Math.max(accumulatorSketch.getLgConfigK(), inputSketch.getLgConfigK()); + Union union = new Union(lgMaxK); + union.update(accumulatorSketch); + union.update(inputSketch); + return union.getResult().toCompactByteArray(); + } catch (RuntimeException e) { + throw new RuntimeException("Unable to deserialize or merge HLL sketch bytes.", e); + } + } +} diff --git a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory index cb4475ff3a..07e68bae14 100644 --- a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory +++ b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory @@ -30,3 +30,4 @@ org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldListaggAggFactory org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldStringAggFactory org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap32AggFactory org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap64AggFactory +org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldHllSketchAggFactory diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java index 0968dffa08..53a5a2f57f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java @@ -29,11 +29,13 @@ import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.server.kv.rowmerger.AggregateRowMerger; +import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldHllSketchAgg; import org.apache.fluss.server.utils.RoaringBitmapUtils; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeChecks; import org.apache.fluss.types.DataTypes; +import org.apache.datasketches.hll.HllSketch; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -52,6 +54,8 @@ import static org.apache.fluss.testutils.DataTestUtils.compactedRow; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.within; /** Parameterized tests for all aggregation functions with different data types. */ class FieldAggregatorParameterizedTest { @@ -786,6 +790,72 @@ void testRbm64Aggregation() throws IOException { assertThat(merged.row.getBinary(1, expectedBytes.length)).isEqualTo(expectedBytes); } + @Test + void testHllSketchAggregation() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("value", DataTypes.BYTES(), AggFunctions.HLL_SKETCH()) + .primaryKey("id") + .build(); + + TableConfig tableConfig = new TableConfig(new Configuration()); + AggregateRowMerger merger = createMerger(schema, tableConfig); + + HllSketch sketch1 = new HllSketch(12); + sketch1.update(1L); + sketch1.update(2L); + HllSketch sketch2 = new HllSketch(12); + sketch2.update(2L); + sketch2.update(3L); + + BinaryRow row1 = + compactedRow(schema.getRowType(), new Object[] {1, sketch1.toCompactByteArray()}); + BinaryRow row2 = + compactedRow(schema.getRowType(), new Object[] {1, sketch2.toCompactByteArray()}); + + BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2)); + + HllSketch mergedSketch = HllSketch.heapify(merged.row.getBytes(1)); + assertThat(mergedSketch.getEstimate()).isCloseTo(3.0, within(0.01)); + } + + @Test + void testHllSketchAggregationWithNull() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("value", DataTypes.BYTES(), AggFunctions.HLL_SKETCH()) + .primaryKey("id") + .build(); + + TableConfig tableConfig = new TableConfig(new Configuration()); + AggregateRowMerger merger = createMerger(schema, tableConfig); + + HllSketch sketch = new HllSketch(12); + sketch.update(42L); + byte[] sketchBytes = sketch.toCompactByteArray(); + + BinaryRow row1 = compactedRow(schema.getRowType(), new Object[] {1, sketchBytes}); + BinaryRow row2 = compactedRow(schema.getRowType(), new Object[] {1, null}); + + BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2)); + + assertThat(merged.row.getBytes(1)).isEqualTo(sketchBytes); + } + + @Test + void testHllSketchAggregationWithInvalidPayload() { + FieldHllSketchAgg aggregator = new FieldHllSketchAgg(DataTypes.BYTES()); + + HllSketch sketch = new HllSketch(12); + sketch.update(1L); + + assertThatThrownBy(() -> aggregator.agg(sketch.toCompactByteArray(), new byte[] {1, 2, 3})) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Unable to deserialize or merge HLL sketch bytes"); + } + // =================================================================================== // Helper Methods // =================================================================================== diff --git a/pom.xml b/pom.xml index dc5ec59d62..215be9ccb2 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ 1.10.1 1.1.0 1.3.0 + 6.2.0 2.12.18 diff --git a/website/docs/table-design/merge-engines/aggregation.md b/website/docs/table-design/merge-engines/aggregation.md index 4375404af0..99a20e7fee 100644 --- a/website/docs/table-design/merge-engines/aggregation.md +++ b/website/docs/table-design/merge-engines/aggregation.md @@ -970,6 +970,60 @@ TableDescriptor.builder() +### hll_sketch + +Aggregates serialized Apache DataSketches HLL sketch values by union. + +- **Supported Data Types**: `BYTES` +- **Behavior**: Merges incoming HLL sketches with the accumulator and stores compact serialized sketch bytes +- **Null Handling**: Null values are ignored + +**Example:** + + + +```sql +CREATE TABLE daily_user_metrics ( + metric_day STRING, + user_hll BYTES, + PRIMARY KEY (metric_day) NOT ENFORCED +) WITH ( + 'table.merge-engine' = 'aggregation', + 'fields.user_hll.agg' = 'hll_sketch' +); +``` + +:::note +`hll_sketch` expects values to be serialized Apache DataSketches HLL sketches. It does not build sketches from raw input values by itself. Flink/Spark SQL helper functions for building and estimating sketches can be added separately. +::: + + + + +```java +Schema schema = Schema.newBuilder() + .column("metric_day", DataTypes.STRING()) + .column("user_hll", DataTypes.BYTES(), AggFunctions.HLL_SKETCH()) + .primaryKey("metric_day") + .build(); + +TableDescriptor.builder() + .schema(schema) + .property("table.merge-engine", "aggregation") + .build(); + +// Serialize HLL sketches using the Apache DataSketches library +// HllSketch sketch1 = new HllSketch(12); +// sketch1.update(userId1); +// byte[] bytes1 = sketch1.toCompactByteArray(); + +// Input: (2026-05-16, hll{user_a,user_b}), (2026-05-16, hll{user_b,user_c}) +// Result: (2026-05-16, hll{user_a,user_b,user_c}) -- union of the two sketches +``` + + + + ### bool_and Evaluates whether all boolean values in a set are true (logical AND).