Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public enum AggFunctionType {

// Roaring bitmap aggregation
RBM32,
RBM64;
RBM64,

// Apache DataSketches aggregation
HLL_SKETCH;

// ------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -120,6 +123,7 @@ public DataTypeRoot[] getSupportedDataTypeRoots() {
return new DataTypeRoot[] {DataTypeRoot.BOOLEAN};
case RBM32:
case RBM64:
case HLL_SKETCH:
return new DataTypeRoot[] {DataTypeRoot.BYTES};
case LISTAGG:
case STRING_AGG:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,22 @@ public static AggFunction RBM64() {
return new AggFunction(AggFunctionType.RBM64, null);
}

// ===================================================================================
// Apache DataSketches Aggregation Functions
// ===================================================================================

/**
* Creates a HLL_SKETCH aggregation function that merges serialized Apache DataSketches HLL
* sketches.
*
* <p>Supported data types: BYTES
*
* @return a HLL_SKETCH aggregation function
*/
public static AggFunction HLL_SKETCH() {
return new AggFunction(AggFunctionType.HLL_SKETCH, null);
}

// ===================================================================================
// Internal Factory Methods
// ===================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,19 @@ void testValidateAggFunctionWithDataType() {
.hasMessageContaining(
Arrays.deepToString(AggFunctionType.MAX.getSupportedDataTypeRoots()));

assertThatThrownBy(
() ->
AggFunctions.of(AggFunctionType.HLL_SKETCH, params)
.validateDataType(DataTypes.STRING()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("column must be part of")
.hasMessageContaining(
Arrays.deepToString(
AggFunctionType.HLL_SKETCH.getSupportedDataTypeRoots()));

// valid case
AggFunctions.of(AggFunctionType.LAST_VALUE, params).validateDataType(DataTypes.STRING());
AggFunctions.of(AggFunctionType.LISTAGG, params).validateDataType(DataTypes.STRING());
AggFunctions.of(AggFunctionType.HLL_SKETCH, params).validateDataType(DataTypes.BYTES());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ void testParseFunctionWithoutParameters() {
assertThat(result.get().hasParameters()).isFalse();
}

@Test
void testParseHllSketchFunction() {
Configuration options = new Configuration();
options.setString("fields.user_hll.agg", "hll_sketch");

Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("user_hll", DataTypes.BYTES(), options);

assertThat(result).isPresent();
assertThat(result.get()).isEqualTo(AggFunctions.HLL_SKETCH());
}

@Test
void testParseFunctionWithParameters() {
Configuration options = new Configuration();
Expand Down Expand Up @@ -131,6 +143,20 @@ void testParseInvalidFunctionName() {
.hasMessageContaining("invalid_function");
}

@Test
void testParseHllSketchFunctionWithInvalidType() {
Configuration options = new Configuration();
options.setString("fields.user_hll.agg", "hll_sketch");

assertThatThrownBy(
() ->
FlinkAggFunctionParser.parseAggFunction(
"user_hll", DataTypes.STRING(), options))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Invalid aggregation function configuration")
.hasMessageContaining("hll_sketch");
}

@Test
void testParseEmptyFunctionName() {
Configuration options = new Configuration();
Expand Down
8 changes: 7 additions & 1 deletion fluss-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
<version>${roaringbitmap.version}</version>
</dependency>

<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>${datasketches-java.version}</version>
</dependency>

<!--
we will start zookeeper server when start a local cluster,
need include this module for starting zookeeper requires it -->
Expand Down Expand Up @@ -168,4 +174,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.rowmerger.aggregate.factory;

import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.AggFunctionType;
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldHllSketchAgg;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeRoot;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/** Factory for {@link FieldHllSketchAgg}. */
public class FieldHllSketchAggFactory implements FieldAggregatorFactory {

@Override
public FieldHllSketchAgg create(DataType fieldType, AggFunction aggFunction) {
checkArgument(
fieldType.getTypeRoot() == DataTypeRoot.BYTES,
"Data type for hll_sketch column must be 'BytesType' but was '%s'.",
fieldType);
return new FieldHllSketchAgg(fieldType);
}

@Override
public String identifier() {
return AggFunctionType.HLL_SKETCH.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.rowmerger.aggregate.functions;

import org.apache.fluss.types.DataType;

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;

/** HLL sketch aggregator for serialized Apache DataSketches HLL sketches. */
public class FieldHllSketchAgg extends FieldAggregator {

private static final long serialVersionUID = 1L;

public FieldHllSketchAgg(DataType dataType) {
super(dataType);
}

@Override
public Object agg(Object accumulator, Object inputField) {
if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}

try {
HllSketch accumulatorSketch = HllSketch.heapify((byte[]) accumulator);
HllSketch inputSketch = HllSketch.heapify((byte[]) inputField);
int lgMaxK = Math.max(accumulatorSketch.getLgConfigK(), inputSketch.getLgConfigK());
Union union = new Union(lgMaxK);
union.update(accumulatorSketch);
union.update(inputSketch);
return union.getResult().toCompactByteArray();
} catch (RuntimeException e) {
throw new RuntimeException("Unable to deserialize or merge HLL sketch bytes.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldListaggAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldStringAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap32AggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap64AggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldHllSketchAggFactory
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.server.kv.rowmerger.AggregateRowMerger;
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldHllSketchAgg;
import org.apache.fluss.server.utils.RoaringBitmapUtils;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DataTypes;

import org.apache.datasketches.hll.HllSketch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -52,6 +54,8 @@

import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.within;

/** Parameterized tests for all aggregation functions with different data types. */
class FieldAggregatorParameterizedTest {
Expand Down Expand Up @@ -786,6 +790,72 @@ void testRbm64Aggregation() throws IOException {
assertThat(merged.row.getBinary(1, expectedBytes.length)).isEqualTo(expectedBytes);
}

@Test
void testHllSketchAggregation() {
Schema schema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("value", DataTypes.BYTES(), AggFunctions.HLL_SKETCH())
.primaryKey("id")
.build();

TableConfig tableConfig = new TableConfig(new Configuration());
AggregateRowMerger merger = createMerger(schema, tableConfig);

HllSketch sketch1 = new HllSketch(12);
sketch1.update(1L);
sketch1.update(2L);
HllSketch sketch2 = new HllSketch(12);
sketch2.update(2L);
sketch2.update(3L);

BinaryRow row1 =
compactedRow(schema.getRowType(), new Object[] {1, sketch1.toCompactByteArray()});
BinaryRow row2 =
compactedRow(schema.getRowType(), new Object[] {1, sketch2.toCompactByteArray()});

BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2));

HllSketch mergedSketch = HllSketch.heapify(merged.row.getBytes(1));
assertThat(mergedSketch.getEstimate()).isCloseTo(3.0, within(0.01));
}

@Test
void testHllSketchAggregationWithNull() {
Schema schema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("value", DataTypes.BYTES(), AggFunctions.HLL_SKETCH())
.primaryKey("id")
.build();

TableConfig tableConfig = new TableConfig(new Configuration());
AggregateRowMerger merger = createMerger(schema, tableConfig);

HllSketch sketch = new HllSketch(12);
sketch.update(42L);
byte[] sketchBytes = sketch.toCompactByteArray();

BinaryRow row1 = compactedRow(schema.getRowType(), new Object[] {1, sketchBytes});
BinaryRow row2 = compactedRow(schema.getRowType(), new Object[] {1, null});

BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2));

assertThat(merged.row.getBytes(1)).isEqualTo(sketchBytes);
}

@Test
void testHllSketchAggregationWithInvalidPayload() {
FieldHllSketchAgg aggregator = new FieldHllSketchAgg(DataTypes.BYTES());

HllSketch sketch = new HllSketch(12);
sketch.update(1L);

assertThatThrownBy(() -> aggregator.agg(sketch.toCompactByteArray(), new byte[] {1, 2, 3}))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Unable to deserialize or merge HLL sketch bytes");
}

// ===================================================================================
// Helper Methods
// ===================================================================================
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
<iceberg.version>1.10.1</iceberg.version>
<hudi.version>1.1.0</hudi.version>
<roaringbitmap.version>1.3.0</roaringbitmap.version>
<datasketches-java.version>6.2.0</datasketches-java.version>

<!-- spark & scala -->
<scala212.version>2.12.18</scala212.version>
Expand Down
54 changes: 54 additions & 0 deletions website/docs/table-design/merge-engines/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,60 @@ TableDescriptor.builder()
</TabItem>
</Tabs>

### hll_sketch

Aggregates serialized Apache DataSketches HLL sketch values by union.

- **Supported Data Types**: `BYTES`
- **Behavior**: Merges incoming HLL sketches with the accumulator and stores compact serialized sketch bytes
- **Null Handling**: Null values are ignored

**Example:**
<Tabs>
<TabItem value="flink-sql" label="Flink SQL" default>

```sql
CREATE TABLE daily_user_metrics (
metric_day STRING,
user_hll BYTES,
PRIMARY KEY (metric_day) NOT ENFORCED
) WITH (
'table.merge-engine' = 'aggregation',
'fields.user_hll.agg' = 'hll_sketch'
);
```

:::note
`hll_sketch` expects values to be serialized Apache DataSketches HLL sketches. It does not build sketches from raw input values by itself. Flink/Spark SQL helper functions for building and estimating sketches can be added separately.
:::

</TabItem>
<TabItem value="java-client" label="Java Client">

```java
Schema schema = Schema.newBuilder()
.column("metric_day", DataTypes.STRING())
.column("user_hll", DataTypes.BYTES(), AggFunctions.HLL_SKETCH())
.primaryKey("metric_day")
.build();

TableDescriptor.builder()
.schema(schema)
.property("table.merge-engine", "aggregation")
.build();

// Serialize HLL sketches using the Apache DataSketches library
// HllSketch sketch1 = new HllSketch(12);
// sketch1.update(userId1);
// byte[] bytes1 = sketch1.toCompactByteArray();

// Input: (2026-05-16, hll{user_a,user_b}), (2026-05-16, hll{user_b,user_c})
// Result: (2026-05-16, hll{user_a,user_b,user_c}) -- union of the two sketches
```

</TabItem>
</Tabs>

### bool_and

Evaluates whether all boolean values in a set are true (logical AND).
Expand Down