Support retrieving clustering information of Delta Lake tables. #27052
yangshangqing95 wants to merge 1 commit into
Conversation
Reviewer's Guide
This PR adds support for retrieving clustering information from Delta Lake tables behind a new feature flag. It introduces a session/catalog property to toggle visibility, extends the metadata layer and table handles to carry an optional list of clustered columns, implements logic to parse clustering info from the transaction log (via a new utility and an Operation enum), and updates tests and documentation accordingly.

Sequence diagram for retrieving clustered columns from Delta Lake table
sequenceDiagram
participant Session
participant DeltaLakeMetadata
participant TransactionLogAccess
participant TableSnapshot
participant ClusteringMetadataUtil
Session->>DeltaLakeMetadata: getTableHandle(session, ...)
DeltaLakeMetadata->>TransactionLogAccess: getClusteredColumns(fileSystem, tableSnapshot)
TransactionLogAccess->>TableSnapshot: getCachedClusteredColumns()
alt Not cached
TransactionLogAccess->>ClusteringMetadataUtil: getLatestClusteredColumns(fileSystem, tableSnapshot)
ClusteringMetadataUtil-->>TransactionLogAccess: clusteredColumns
TransactionLogAccess->>TableSnapshot: setCachedClusteredColumns(clusteredColumns)
end
TransactionLogAccess-->>DeltaLakeMetadata: clusteredColumns
DeltaLakeMetadata-->>Session: LocatedTableHandle(clusteredColumns)
ER diagram for Delta Lake table properties with clustering info
erDiagram
DELTA_LAKE_TABLE_PROPERTIES {
string location
list partitioned_by
list clustered_by
long checkpoint_interval
string change_data_feed_enabled
string column_mapping_mode
}
DELTA_LAKE_TABLE_HANDLE {
string location
object metadata_entry
object protocol_entry
list clustered_columns
}
DELTA_LAKE_TABLE_PROPERTIES ||--|| DELTA_LAKE_TABLE_HANDLE : "table handle for properties"
Class diagram for Delta Lake clustering metadata support
classDiagram
class DeltaLakeConfig {
- boolean showClusteredColumns
+ boolean isShowClusteredColumns()
+ DeltaLakeConfig setShowClusteredColumns(boolean)
}
class DeltaLakeSessionProperties {
+ static boolean ifShowClusteredColumns(ConnectorSession)
}
class DeltaLakeTableProperties {
+ static final String CLUSTER_BY_PROPERTY
+ static List<String> getClusteredBy(Map<String, Object>)
}
class DeltaLakeTableHandle {
- Optional<List<String>> clusteredColumns
+ Optional<List<String>> getClusteredColumns()
}
class TableSnapshot {
- Optional<List<String>> cachedClusteredColumns
+ Optional<List<String>> getCachedClusteredColumns()
+ void setCachedClusteredColumns(Optional<List<String>>)
}
class TransactionLogAccess {
+ Optional<List<String>> getClusteredColumns(TrinoFileSystem, TableSnapshot)
}
class ClusteringMetadataUtil {
+ static Optional<List<String>> getLatestClusteredColumns(TrinoFileSystem, TableSnapshot)
}
class Operation {
<<enum>>
+ static Operation fromString(String)
}
DeltaLakeConfig --> DeltaLakeSessionProperties
DeltaLakeSessionProperties --> DeltaLakeTableProperties
DeltaLakeTableProperties --> DeltaLakeTableHandle
DeltaLakeTableHandle --> TableSnapshot
TableSnapshot --> TransactionLogAccess
TransactionLogAccess --> ClusteringMetadataUtil
ClusteringMetadataUtil --> Operation
File-Level Changes
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/clustering/ClusteringMetadataUtil.java:94` </location>
<code_context>
+ REPLACE_TABLE_KEYWORD, CLUSTERING_PARAMETER_KEY,
+ CLUSTER_BY, NEW_CLUSTERING_PARAMETER_KEY);
+
+ private static final ThreadLocal<Map<String, String>> OLD_TO_NEW_RENAMED_COLUMNS = ThreadLocal.withInitial(HashMap::new);
+
+ private ClusteringMetadataUtil()
</code_context>
<issue_to_address>
**issue (bug_risk):** ThreadLocal usage for OLD_TO_NEW_RENAMED_COLUMNS may leak memory if not cleared in all code paths.
If an exception occurs before ThreadLocal removal, it may not be cleared. Use a try-finally block to guarantee cleanup and prevent memory leaks.
</issue_to_address>
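
A standalone sketch of the cleanup pattern the comment asks for, assuming a thread-local map like the PR's OLD_TO_NEW_RENAMED_COLUMNS; the class and method names here are illustrative, not the actual ones in ClusteringMetadataUtil:

```java
import java.util.HashMap;
import java.util.Map;

final class RenamedColumnsTracker
{
    // thread-local scratch map, mirroring OLD_TO_NEW_RENAMED_COLUMNS in the PR
    private static final ThreadLocal<Map<String, String>> OLD_TO_NEW_RENAMED_COLUMNS =
            ThreadLocal.withInitial(HashMap::new);

    private RenamedColumnsTracker() {}

    static Map<String, String> currentRenames()
    {
        return OLD_TO_NEW_RENAMED_COLUMNS.get();
    }

    static void withRenameTracking(Runnable parseTransactionLog)
    {
        try {
            // the caller populates the thread-local map while walking commit entries
            parseTransactionLog.run();
        }
        finally {
            // always clear the thread-local, even if parsing throws, so pooled
            // worker threads do not retain stale rename mappings
            OLD_TO_NEW_RENAMED_COLUMNS.remove();
        }
    }
}
```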
### Comment 2
<location> `plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/clustering/ClusteringMetadataUtil.java:249` </location>
<code_context>
+ }
+
+ @VisibleForTesting
+ static void recordRenamedColumns(CommitInfoEntry commitInfoEntry)
+ {
+ String oldName = commitInfoEntry.operationParameters().get(RENAMED_OLD_COLUMN_KEY);
</code_context>
<issue_to_address>
**suggestion:** The logic for updating OLD_TO_NEW_RENAMED_COLUMNS may not handle multiple renames correctly.
The current approach may lose information if a column is renamed multiple times. Please consider tracking all previous names to ensure the mapping remains accurate.
</issue_to_address>
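
A small self-contained sketch of one way to keep chained renames consistent, in the spirit of the suggestion above (a -> b followed by b -> c should still resolve the original name to c); the class and method names are hypothetical and this is not the PR's implementation:

```java
import java.util.HashMap;
import java.util.Map;

final class RenameChainExample
{
    private RenameChainExample() {}

    // Record a rename and rewrite earlier mappings so chained renames
    // (a -> b, then b -> c) resolve every historical name to the current one
    static void recordRename(Map<String, String> oldToNew, String oldName, String newName)
    {
        for (Map.Entry<String, String> entry : oldToNew.entrySet()) {
            if (entry.getValue().equals(oldName)) {
                entry.setValue(newName);
            }
        }
        oldToNew.put(oldName, newName);
    }

    public static void main(String[] args)
    {
        Map<String, String> mapping = new HashMap<>();
        recordRename(mapping, "a", "b");
        recordRename(mapping, "b", "c");
        System.out.println(mapping); // prints {a=c, b=c}
    }
}
```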
### Comment 3
<location> `plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/clustering/Operation.java:96` </location>
<code_context>
+ CREATE_TABLE_KEYWORD.getOperationName().toLowerCase(), CREATE_TABLE_KEYWORD,
+ REPLACE_TABLE_KEYWORD.getOperationName().toLowerCase(), REPLACE_TABLE_KEYWORD);
+
+ public static Operation fromString(String operationName)
+ {
+ Operation operation = LOWERCASE_NAME_TO_OPERATION.get(operationName.toLowerCase());
</code_context>
<issue_to_address>
**suggestion:** fromString may return UNKNOW_OPERATION for valid but differently-cased or formatted operation names.
Currently, only exact matches are supported, so inputs with extra whitespace or formatting may not be recognized. Consider trimming whitespace or using regex to improve matching robustness.
</issue_to_address>
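
A minimal sketch of the suggested normalization before the lookup; the constant names and operation strings below are assumptions for illustration and differ from the enum in the PR:

```java
import java.util.Locale;
import java.util.Map;

enum Operation
{
    CREATE_TABLE("CREATE TABLE"),
    REPLACE_TABLE("REPLACE TABLE"),
    UNKNOWN_OPERATION("UNKNOWN");

    private static final Map<String, Operation> LOWERCASE_NAME_TO_OPERATION = Map.of(
            CREATE_TABLE.operationName.toLowerCase(Locale.ENGLISH), CREATE_TABLE,
            REPLACE_TABLE.operationName.toLowerCase(Locale.ENGLISH), REPLACE_TABLE);

    private final String operationName;

    Operation(String operationName)
    {
        this.operationName = operationName;
    }

    static Operation fromString(String operationName)
    {
        // Trim and collapse internal whitespace before the case-insensitive lookup,
        // so values such as " Create  Table " still resolve to CREATE_TABLE
        String normalized = operationName.trim()
                .replaceAll("\\s+", " ")
                .toLowerCase(Locale.ENGLISH);
        return LOWERCASE_NAME_TO_OPERATION.getOrDefault(normalized, UNKNOWN_OPERATION);
    }
}
```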
### Comment 4
<location> `plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java:2735-2736` </location>
<code_context>
assertQuery("SELECT * FROM " + sourceTable, sourceTableValues);
}
+ @Test
+ void testShowCreateTableWithClusteredInfo()
+ {
+ Session session = Session.builder(getSession())
</code_context>
<issue_to_address>
**suggestion (testing):** Test for clustered columns in SHOW CREATE TABLE covers both enabled and disabled states.
Please also add a test case for the default configuration (without setting the session property) to confirm expected default behavior.
</issue_to_address>
9163402 to 660241a
Is this PR preparatory work for future performance improvements, with no immediate benefit?
Hi @ebyhr |
Could you please share the final solution (= how to improve performance eventually)?
Sure @ebyhr
In simple terms, compared with partitions, Liquid Clustering provides:
Compared with Parquet column statistics, which only store per-file min/max values without any global organization, Liquid Clustering provides a higher-level layout strategy. It ensures that values of clustering keys are physically localized across files, making Parquet’s per-file statistics far more effective for data skipping and reducing the number of files scanned. Once Trino integrates with this metadata, it will be able to perform more accurate file pruning and data skipping, enabling more flexible data organization and clustering, thereby significantly improving the performance of queries filtering on clustering keys.
Thanks for explaining the details. I already know about liquid clustering. I wanted to know the actual follow-up plan (especially the read part) for the Delta Lake connector.
The connector already uses stats from the transaction logs. Are you planning to read different metadata in the future? If so, could you elaborate on that?
@yangshangqing95 Would you mind sharing the plan for supporting the write path of liquid clustering? I suspect the read path won’t change much, since Trino already supports pruning with statistics, so reading clustering information doesn’t seem to provide additional benefit, in my opinion.
Haha, please ignore my long-winded message above.
Coming back to this PR itself: it is mostly about test data and test code, and I hope it covers enough cases. My idea is to break down the larger system into smaller modules or features, which makes it easier to review and identify issues during testing. Open to any discussions or suggestions.
660241a to f0d770b
Hi @chenjian2664, any advice on this? Looking forward to proceeding ~
@yangshangqing95 I thought liquid clustering mainly affects the write side; the read part shouldn't need too many changes if we already support pruning scan files using field stats. Please help educate me: does liquid clustering write a different form of stats compared to non-clustered tables (for both primitive and struct columns)? If we already support reading complex type stats, do we actually need the clustering info (since we don't write it)? Why would that be needed?
Hi @chenjian2664, good questions. What I mentioned earlier was too general, so let me share some ideas and make it clearer with a simple example.
I can set the cluster key for this table in Databricks by using
Then, during a query, when we use person.age as a filter condition, Databricks can leverage the clustered column statistics it maintains internally to perform pruning. But when the same query is executed in Trino, things are different: because the subfield cannot be pushed down to the table-scan stage, the statistics of clustered fields cannot be used for pruning, and the result is a full table scan. This is also an issue I have encountered in practice. For the same query, Trino takes significantly longer to execute, and this performance degradation becomes more pronounced as the data volume grows. Of course, in practice, the actual table structure and data volume are much more complex than in the example above. Now back to your questions:
No, these stats are still stored in the AddFile entry, for both primitive and struct columns.
We support reading complex-type stats, but we are not actually using them yet, since we don't know whether a subfield is a clustered key or whether its stats have been recorded. My thought is that we can extract the clustered key separately for filtering, elevating it to the level of a partition, which then allows us to make full use of all statistical information for more thorough pruning. The first step is to identify which field/column is the clustered key.
{
public static final String LOCATION_PROPERTY = "location";
public static final String PARTITIONED_BY_PROPERTY = "partitioned_by";
public static final String CLUSTER_BY_PROPERTY = "clustered_by";
We shouldn't expose this property until we support creating a table with this property. Otherwise, the SHOW CREATE TABLE result becomes non-reusable (an exception would happen). The current implementation ignores the property in CREATE TABLE - this is also a no-go.
Makes sense, removed.
.collect(toImmutableList());
}
else {
LOG.error(String.format("Unknown clustering key: %s", clusteredKey));
Avoid using String.format in logger.
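
A minimal illustration of the suggested change, assuming the io.airlift.log.Logger that Trino plugins typically use, which accepts a format string plus arguments; the class and method names below are hypothetical:

```java
import io.airlift.log.Logger;

final class ClusteringLogExample
{
    private static final Logger LOG = Logger.get(ClusteringLogExample.class);

    private ClusteringLogExample() {}

    static void reportUnknownClusteringKey(String clusteredKey)
    {
        // Pass the format string and argument directly; the logger formats the
        // message itself instead of eagerly building it with String.format
        LOG.error("Unknown clustering key: %s", clusteredKey);
    }
}
```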
public static final String VARIANT_TYPE_FEATURE_NAME = "variantType";
public static final String VARIANT_TYPE_PREVIEW_FEATURE_NAME = "variantType-preview";
public static final String V2_CHECKPOINT_FEATURE_NAME = "v2Checkpoint";
public static final String CLUSTERED_TABLES_FEATURE_NAME = "clustering";
Move this constant under CHECK_CONSTRAINTS_FEATURE_NAME to keep the list ordered alphabetically.
public static int getTemporalTimeTravelLinearSearchMaxSize()
{
    return TEMPORAL_TIME_TRAVEL_LINEAR_SEARCH_MAX_SIZE;
}
Make TEMPORAL_TIME_TRAVEL_LINEAR_SEARCH_MAX_SIZE public, and remove this method.
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class OperationTest
assertThat(result.isPresent()).isTrue();
assertThat(ImmutableList.of()).isEqualTo(result.get());
AssertJ provides a contains method for optional types:
assertThat(result).contains(ImmutableList.of());
Also, the order of parameters is wrong: assertThat() should take the actual value, not the expected one.
Optional<CommitInfoEntry> commitInfo = Optional.of(commitInfoEntry);
List<String> result = ClusteringMetadataUtil.extractClusteredColumns(commitInfo);

assertThat(result.isEmpty()).isTrue();

assertThat(2).isEqualTo(result.size());
assertThat(result.containsAll(ImmutableList.of("col1", "col2"))).isTrue();
Please use helper methods in AssertJ as much as possible:
assertThat(result).containsExactly("col1", "col2");

@@ -0,0 +1 @@
| {"commitInfo":{"timestamp":1760712838006,"userId":"user1","userName":"user1","operation":"OPTIMIZE","operationParameters":{"clusterBy":"[]","zOrderBy":"[]","batchId":"0","predicate":"[]","auto":true},"notebook":{"notebookId":"xxxxxx"},"clusterId":"cluster","readVersion":9,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"2","numRemovedBytes":"5398","p25FileSize":"3647","numDeletionVectorsRemoved":"1","minFileSize":"3647","p75FileSize":"3647","p50FileSize":"3647","numAddedBytes":"3647","numAddedFiles":"1","maxFileSize":"3647"},"tags":{"delta.rowTracking.preserved":"true"},"engineInfo":"Databricks-Runtime/17.2.x-photon-scala2.13","txnId":"xxxxxx"}} | |||
Please add a README.md on how the data was created - see the other resource directories in the module as a point of reference.
5a7a547 to 199ad1c
In that example above, does the Parquet file have min/max statistics on the struct subfields?
Hi @raunaqmorarka I don’t think there’s any relationship between them.
Hi @ebyhr @findinpath, I've fixed/updated all the above comments. Any new thoughts on this? I’m very much looking forward to starting the next phase of work. Thank you all!
This pull request has gone a while without any activity. Ask for help on #core-dev on Trino slack.
Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time. |
795de15 to f5c8230
f3ae6e2 to f5c8230
f5c8230 to d2911cf
This pull request has gone a while without any activity. Ask for help on #core-dev on Trino slack. |






Description
Support retrieving clustering information from Delta Lake tables, controlled by session and configuration settings.
By default, retrieving clustering information is disabled (false).
This serves as a foundation for future integration with the Delta Lake Liquid Clustering feature.
The work to fully support Delta Lake’s Liquid Clustering capability is already planned and in progress.
Additional context and related issues
About Delta Lake Liquid Clustering: https://delta.io/blog/liquid-clustering/
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:
Summary by Sourcery
Enable optional retrieval of clustering information for Delta Lake tables and expose it as a new table property to support future Liquid Clustering integration.
New Features:
Enhancements:
Documentation:
Tests: