Allow distributed table procedures to output metrics in result (#28992)
📝 Walkthrough

This pull request refactors table execution to return structured metrics instead of void.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/SymbolMapper.java (1)
611-617: ⚠️ Potential issue | 🔴 Critical

Map `TableFinishNode` outputs before rebuilding the node.

This change stopped rewriting the node's own output symbols. The old code mapped the single output symbol, and leaving `node.getOutputSymbols()` untouched here can produce a `TableFinishNode` whose advertised outputs no longer match the remapped symbols seen by parent nodes.

Proposed fix

```diff
 return new TableFinishNode(
         node.getId(),
         source,
         node.getTarget(),
-        node.getOutputSymbols(),
+        map(node.getOutputSymbols()),
         node.getStatisticsAggregation().map(this::map),
         node.getStatisticsAggregationDescriptor().map(descriptor -> descriptor.map(this::map)));
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/sql/planner/optimizations/SymbolMapper.java` around lines 611-617, the new TableFinishNode is rebuilt without remapping its own output symbols, causing a mismatch; update the TableFinishNode construction in SymbolMapper so that node.getOutputSymbols() is transformed via this::map (i.e., map each output symbol through SymbolMapper.map) and pass those mapped output symbols into the TableFinishNode constructor (keeping the existing statisticsAggregation and descriptor mappings).
🧹 Nitpick comments (4)
core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java (1)
689-697: Add a list-based `tableFinish` helper as well.

Line 695 still hardcodes the old single-symbol shape, so planner tests can't build the new multi-column `TableFinishNode` result through this helper. Keeping a `List<Symbol>` overload and delegating the single-symbol convenience method to it would make the new `(metric_name, metric_value)` path testable.

♻️ Suggested helper overload

```diff
+    public TableFinishNode tableFinish(PlanNode source, WriterTarget target, List<Symbol> outputSymbols)
+    {
+        return new TableFinishNode(
+                idAllocator.getNextId(),
+                source,
+                target,
+                ImmutableList.copyOf(outputSymbols),
+                Optional.empty(),
+                Optional.empty());
+    }
+
     public TableFinishNode tableFinish(PlanNode source, WriterTarget target, Symbol rowCountSymbol)
     {
-        return new TableFinishNode(
-                idAllocator.getNextId(),
-                source,
-                target,
-                ImmutableList.of(rowCountSymbol),
-                Optional.empty(),
-                Optional.empty());
+        return tableFinish(source, target, ImmutableList.of(rowCountSymbol));
     }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java` around lines 689-697, add a new overload of PlanBuilder.tableFinish that accepts List<Symbol> outputSymbols and constructs the TableFinishNode using that list (passing idAllocator.getNextId(), source, target, ImmutableList.copyOf(outputSymbols), Optional.empty(), Optional.empty()); then change the existing tableFinish(PlanNode, WriterTarget, Symbol) convenience method to delegate to the new list-based overload. Update references so tests can build multi-column TableFinishNode results; keep TableFinishNode and idAllocator usage unchanged.

core/trino-main/src/test/java/io/trino/connector/MockConnector.java (1)
905-908: Make `finishTableExecute` metrics configurable in the mock.

Always returning `ImmutableMap.of()` means tests using `MockConnector` can only exercise the empty-metrics path, so this new result-row behavior is hard to validate through the standard test connector. Consider threading a callback through `MockConnectorFactory`, like the other metadata hooks, so tests can assert non-empty metric propagation.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/test/java/io/trino/connector/MockConnector.java` around lines 905-908, the finishTableExecute method always returns ImmutableMap.of(), preventing tests from exercising non-empty metrics; modify MockConnector to accept a configurable callback/supplier (e.g., Function<ConnectorTableExecuteHandle, Map<String,Long>> or Supplier<Map<String,Long>>) passed in from MockConnectorFactory (similar to other metadata hooks), store it as a field on MockConnector, and change finishTableExecute to call that callback and return its result (falling back to ImmutableMap.of() if the callback is null) so tests can inject and assert non-empty metric maps.

core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java (1)
398-403: Snapshot connector metrics before returning them.

This map now leaves the connector boundary and becomes query output. Returning it directly exposes connector-owned mutable state; a defensive copy gives callers a stable snapshot and fails fast on null keys or values.

♻️ Proposed fix

```diff
 @Override
 public Map<String, Long> finishTableExecute(Session session, TableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState)
 {
     CatalogHandle catalogHandle = tableExecuteHandle.catalogHandle();
     ConnectorMetadata metadata = getMetadata(session, catalogHandle);
-    return metadata.finishTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle(), fragments, tableExecuteState);
+    return ImmutableMap.copyOf(metadata.finishTableExecute(
+            session.toConnectorSession(catalogHandle),
+            tableExecuteHandle.connectorHandle(),
+            fragments,
+            tableExecuteState));
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java` around lines 398-403, the map returned by metadata.finishTableExecute leaks connector-owned mutable state; capture the returned Map, create a defensive snapshot (e.g., immutable copy or new HashMap), validate there are no null keys or values, and return that snapshot instead so callers get a stable, independent result.

core/trino-main/src/main/java/io/trino/sql/planner/plan/TableFinishNode.java (1)
45-55: Reject empty `outputSymbols` up front.

Moving from a single `rowCountSymbol` to a list removes the old "at least one output" invariant. Failing fast here would keep malformed finish nodes from slipping through with a zero-column schema.

♻️ Proposed fix

```diff
-        this.outputSymbols = ImmutableList.copyOf(outputSymbols);
+        this.outputSymbols = ImmutableList.copyOf(requireNonNull(outputSymbols, "outputSymbols is null"));
+        checkArgument(!this.outputSymbols.isEmpty(), "outputSymbols is empty");
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/sql/planner/plan/TableFinishNode.java` around lines 45-55, the constructor should reject an empty outputSymbols list to preserve the "at least one output" invariant: after validating non-null, add a guard to check that outputSymbols is not empty (e.g., checkArgument(!outputSymbols.isEmpty(), "outputSymbols is empty") or equivalent) so malformed finish nodes with zero output columns are rejected early.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java`:
- Around line 172-176: The change replaced the original void finishTableExecute
signature and breaks binary compatibility; restore the original void
finishTableExecute(ConnectorSession, ConnectorTableExecuteHandle,
Collection<Slice>, List<Object>) as a default method (preserving existing
behavior) and add a new default method finishTableExecuteWithMetrics or
finishTableExecute(ConnectorSession, ConnectorTableExecuteHandle,
Collection<Slice>, List<Object>) returning Map<String, Long> that by default
delegates to the void method (or calls it and then returns
Collections.emptyMap()) so existing connectors still implement the original
method while new code can use the metrics-returning variant; update
ConnectorMetadata to include both methods with the default delegating behavior.
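The delegation pattern suggested above can be sketched with a minimal, self-contained interface. This is an illustration only, not the real Trino SPI: `TableExecuteMetadata`, `SpiCompatSketch`, and the `Object` parameters are simplified stand-ins for `ConnectorMetadata` and its `ConnectorSession`/`ConnectorTableExecuteHandle`/`Collection<Slice>`/`List<Object>` arguments. The point is that the original void method keeps its default body, while a new metrics-returning default delegates to it, so connectors compiled against the old SPI keep working.

```java
import java.util.Collections;
import java.util.Map;

// Simplified stand-in for the SPI interface (not the real ConnectorMetadata).
interface TableExecuteMetadata
{
    // Original void method: existing connectors keep overriding this one.
    default void finishTableExecute(Object session, Object handle)
    {
        // no-op by default
    }

    // New metrics-returning variant: by default it delegates to the void
    // method and reports no metrics, preserving binary compatibility.
    default Map<String, Long> finishTableExecuteWithMetrics(Object session, Object handle)
    {
        finishTableExecute(session, handle);
        return Collections.emptyMap();
    }
}

public class SpiCompatSketch
{
    public static void main(String[] args)
    {
        // A "legacy" connector that implements neither method still works
        // through the new entry point and reports empty metrics.
        TableExecuteMetadata legacy = new TableExecuteMetadata() {};
        System.out.println(legacy.finishTableExecuteWithMetrics("session", "handle").isEmpty()); // prints "true"
    }
}
```

Connectors that want to publish metrics override only the new default; the engine can then call the metrics-returning variant unconditionally.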
In
`@plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java`:
- Around line 2941-2947: The finishTableExecute currently returns an empty
ImmutableMap for OPTIMIZE; update it to return real metrics by having
finishOptimize produce and return a metrics map (or populate metrics on
DeltaLakeTableExecuteHandle) that includes counts like rewritten files and added
files derived from scannedDataFiles and dataFileInfos; change finishOptimize
signature to return Map<String,Long> (or set fields on
DeltaLakeTableExecuteHandle) and in finishTableExecute call Map<String,Long>
metrics = finishOptimize(session, executeHandle, fragments, splitSourceInfo);
then return that metrics map instead of ImmutableMap.of(), ensuring keys are
descriptive (e.g., "rewritten_data_files", "added_data_files") and values are
Long.
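A minimal sketch of the metrics map suggested above, assuming `finishOptimize` can see the scanned and newly written files. The method and parameter names here are hypothetical stand-ins (the real `DeltaLakeMetadata` types are not reproduced); only the shape of the returned `Map<String, Long>` with descriptive keys is the point.

```java
import java.util.List;
import java.util.Map;

public class OptimizeMetricsSketch
{
    // Hypothetical: derive metric counts from the files OPTIMIZE scanned
    // (rewritten) and the files it produced (added).
    static Map<String, Long> optimizeMetrics(List<String> scannedDataFiles, List<String> dataFileInfos)
    {
        return Map.of(
                "rewritten_data_files", (long) scannedDataFiles.size(),
                "added_data_files", (long) dataFileInfos.size());
    }

    public static void main(String[] args)
    {
        Map<String, Long> metrics = optimizeMetrics(
                List.of("a.parquet", "b.parquet"), // files rewritten
                List.of("c.parquet"));             // files added
        System.out.println(metrics.get("rewritten_data_files") + " " + metrics.get("added_data_files")); // prints "2 1"
    }
}
```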
In
`@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java`:
- Around line 5706-5708: The test currently asserts a fixed
('added_data_files_count', 1) in the assertUpdate call that executes OPTIMIZE
for tableName (invoked via withSingleWriterPerTask(getSession())), but the
actual added file count can fan out up to workerCount and cause flakiness;
update the assertUpdate assertion to accept a range or matcher for
added_data_files_count (e.g., 1..workerCount) or compute the expectedAddedFiles
based on workerCount and use that value in the assertUpdate result check so the
test tolerates multi-worker fanout.
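The range-tolerant check suggested above can be sketched as plain Java, independent of the test framework. `workerCount` and `addedDataFilesCount` are hypothetical stand-ins for values the real test would obtain from the query runner; with AssertJ one would typically write `assertThat(addedDataFilesCount).isBetween(1L, (long) workerCount)` instead of a fixed equality.

```java
public class RangeAssertionSketch
{
    // Instead of asserting exactly 1 added file, accept anything in the
    // range 1..workerCount to tolerate multi-worker write fanout.
    static boolean isWithinFanout(long addedDataFilesCount, int workerCount)
    {
        return addedDataFilesCount >= 1 && addedDataFilesCount <= workerCount;
    }

    public static void main(String[] args)
    {
        int workerCount = 3;
        System.out.println(isWithinFanout(1, workerCount)); // single-writer case still passes: "true"
        System.out.println(isWithinFanout(3, workerCount)); // full fanout also passes: "true"
    }
}
```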
📒 Files selected for processing (23)
- core/trino-main/src/main/java/io/trino/execution/TableExecuteContext.java
- core/trino-main/src/main/java/io/trino/metadata/Metadata.java
- core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java
- core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java
- core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java
- core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
- core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java
- core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java
- core/trino-main/src/main/java/io/trino/sql/planner/optimizations/SymbolMapper.java
- core/trino-main/src/main/java/io/trino/sql/planner/plan/TableFinishNode.java
- core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java
- core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java
- core/trino-main/src/test/java/io/trino/connector/MockConnector.java
- core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java
- core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java
- core/trino-spi/pom.xml
- core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
- lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java
- plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
- plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
- plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java
raunaqmorarka
left a comment
Let's update the documentation as well
Description
Allow distributed table procedures to output metrics in result.
The Iceberg OPTIMIZE procedure returns the output below:
We can handle other table procedures as a follow-up.
Release notes