[lake/hudi] Introduce HudiBucketingFunction for bucket strategy#3316
[lake/hudi] Introduce HudiBucketingFunction for bucket strategy#3316fhan688 wants to merge 9 commits into
Conversation
…coder,add four test cases in HudiBucketingFunctionTest
…tingFunctionTest(composite keys、multi data type).
|
please help review, thanks! |
There was a problem hiding this comment.
Pull request overview
This PR introduces Hudi-specific bucket-key encoding and bucket-id calculation so Fluss can route lake-tiered records consistently with Hudi bucket indexing.
Changes:
- Adds
HudiBucketingFunctionand wires it intoBucketingFunction.of(...). - Adds
HudiKeyEncoderand wires it intoKeyEncoder.of(...). - Adds Hudi-based unit tests and a test-scoped Hudi bundle dependency.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java |
Routes HUDI lake format to the new bucketing function. |
fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java |
Implements Hudi-style bucket id calculation from encoded hash bytes. |
fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java |
Routes HUDI lake format to the new key encoder. |
fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java |
Encodes Hudi bucket keys as a 4-byte hash of stringified key fields. |
fluss-lake/fluss-lake-hudi/pom.xml |
Adds Hudi bundle as a test-scoped dependency. |
fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java |
Adds Hudi cross-validation and edge-case tests for encoding and bucketing. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| public HudiKeyEncoder(RowType rowType, List<String> keys) { | ||
| // for getting key fields out of fluss internal row | ||
| fieldGetters = new InternalRow.FieldGetter[keys.size()]; |
There was a problem hiding this comment.
The encoded hash here is values.hashCode() over a List<String> built directly from each key field's toString(). However, Hudi's production path goes through BucketIdentifier#getBucketId(HoodieKey, indexKeyFields, numBuckets), which parses the record-key string by splitting on : and ,. As soon as a key field's string form contains : or , (very common for TIMESTAMP_LTZ, e.g. 2023-10-25T10:01:13.182Z, or any user string with a comma), the List<String> Hudi reconstructs differs from the one we hash here, and the resulting bucket id will diverge from Hudi's.
Note that HudiBucketingFunctionTest#testTimestampLtzType only validates against the BucketIdentifier.getBucketId(List<String>, int) overload, which sidesteps this parsing step. Please add an end-to-end test that goes through the HoodieKey overload, and either escape : / , inside stringifyForRecordKey, or document the limitation explicitly in the class Javadoc.
There was a problem hiding this comment.
Thanks — re-read Hudi's KeyGenUtils.extractRecordKeysByFields in Hudi master/release-1.x. The parser has two modes that we now mirror in HudiKeyEncoder:
Single-part recordKey (no , and no :) → returned verbatim, placeholder NOT round-tripped.
Composite recordKey → split on :/,, with "__null__" → null and "__empty__" → "" before List#hashCode().
Two findings while implementing the alignment:
The :-in-value concern in your comment is actually handled correctly by Hudi's look-ahead loop (the commaPosition < keyValueSep1 retry), so TIMESTAMP_LTZ values like 2023-10-25T10:01:13.182Z are fine — confirmed by a new end-to-end test that goes through BucketIdentifier.getBucketId(recordKey, "f1,f2", n).
The real divergence sources are (a) values containing , (Hudi doesn't escape) and (b) null/empty placeholder round-trip in composite mode. Both are now handled: the encoder produces List<String> elements element-wise equal to Hudi's parsed list, and rejects unrepresentable inputs (, in value, literal placeholder collision) up front.
Result: bucket id is bit-for-bit equal to Hudi's BucketIdentifier across all 21 tests, including the recordKey-string overload that exercises Hudi's production parse path.
Purpose
Linked issue: #3274
Introduce Hudi's bucketing strategy into Fluss so that the Fluss server/client can compute the same bucket id as Hudi's BucketIdentifier when tiering data into a Hudi table with bucket index. This is a prerequisite for the upcoming HudiLakeWriter and HudiCompaction PRs, which need to route records to the correct Hudi bucket file.
Brief change log
fluss-common (production code)
BucketingFunction.of(...) — add DataLakeFormat.HUDI branch that returns HudiBucketingFunction.
HudiBucketingFunction — implements BucketingFunction. Decodes a 4-byte big-endian int produced by HudiKeyEncoder and computes (hash & Integer.MAX_VALUE) % numBuckets, matching Hudi's BucketIdentifier.getBucketId(List, int). Includes strict input validation (bucketKey must be exactly 4 bytes, numBuckets must be positive).
KeyEncoder.createKeyEncoder(...) — add DataLakeFormat.HUDI branch that returns HudiKeyEncoder.
HudiKeyEncoder — implements KeyEncoder. Computes List.hashCode() inline (h = 31*h + elementStringHash) over the stringified key fields, avoiding intermediate ArrayList/String.valueOf allocations on the hot path. For common numeric types (int, long, byte, short, boolean) the string hash code is computed without materializing the string. Null fields are encoded as "null" placeholder (aligned with Hudi's KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER) to avoid collision with the literal string "null".
fluss-lake-hudi (test & build)
Single-field types: INT, BIGINT, STRING, DECIMAL, TIMESTAMP_NTZ
Additional types: BOOLEAN, TINYINT, SMALLINT, FLOAT, DATE, TIME, TIMESTAMP_LTZ
Composite (multi-field) bucket keys with and without null fields
Null field uses placeholder (not literal "null") — regression test
Illegal input: bucketKey null / wrong length / numBuckets ≤ 0
Boundary: numBuckets=1, Integer.MIN_VALUE hash, negative hash sign-bit handling
All tests cross-validate against Hudi's BucketIdentifier.getBucketId(List, int)
Tests
HudiBucketingFunctionTest (13 test cases, all passing):
testIntegerHash / testLongHash / testStringHash / testDecimalHash / testTimestampEncodingHash — original single-field coverage
testNullFieldUsesPlaceholder / testNullFieldDoesNotCollideWithLiteralNullString — null handling
testBucketingRejectsInvalidBucketKey / testBucketingRejectsNonPositiveNumBuckets — input validation
testCompositeBucketKeyMatchesHudiFieldValueRecordKey / testCompositeBucketKeyWithNullFieldUsesPlaceholder — multi-field keys
testBooleanAndIntegralTypes / testDateAndTimeTypes / testTimestampLtzType — type coverage
testBucketingNumBucketsBoundaryValues — boundary conditions
API and Format
No API or storage format changes. This PR only adds new implementations behind existing interfaces (BucketingFunction and KeyEncoder) for a new DataLakeFormat.HUDI enum value that was already defined.
Documentation
No new user-facing documentation required. This is an internal bucketing strategy implementation.