diff --git a/core/trino-main/src/main/java/io/trino/execution/TableExecuteContext.java b/core/trino-main/src/main/java/io/trino/execution/TableExecuteContext.java index 5f73ce670ae5..c113ffcc5e38 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TableExecuteContext.java +++ b/core/trino-main/src/main/java/io/trino/execution/TableExecuteContext.java @@ -14,16 +14,22 @@ package io.trino.execution; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.List; +import java.util.Map; +import java.util.Optional; +import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; public class TableExecuteContext { @GuardedBy("this") private List splitsInfo; + @GuardedBy("this") + private Map metrics; public synchronized void setSplitsInfo(List splitsInfo) { @@ -41,4 +47,15 @@ public synchronized List getSplitsInfo() } return splitsInfo; } + + public synchronized Optional> getMetrics() + { + return Optional.ofNullable(metrics); + } + + public synchronized void setMetrics(Map newMetrics) + { + checkState(metrics == null, "metrics already set to %s", metrics); + metrics = ImmutableMap.copyOf(newMetrics); + } } diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index 96e904f72558..c572f9a94f4f 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -128,7 +128,7 @@ Optional getTableHandleForExecute( BeginTableExecuteResult beginTableExecute(Session session, TableExecuteHandle handle, TableHandle updatedSourceTableHandle); - void finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState); + Map finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState); Map executeTableExecute(Session session, TableExecuteHandle handle); diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 1978754120ac..eb933d89301d 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -395,11 +395,11 @@ public BeginTableExecuteResult beginTableExecut } @Override - public void finishTableExecute(Session session, TableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) + public Map finishTableExecute(Session session, TableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) { CatalogHandle catalogHandle = tableExecuteHandle.catalogHandle(); ConnectorMetadata metadata = getMetadata(session, catalogHandle); - metadata.finishTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle(), fragments, tableExecuteState); + return metadata.finishTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle(), fragments, tableExecuteState); } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java index 463fc4c0b989..d4910fe20ba5 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java @@ -26,6 +26,7 @@ import io.trino.spi.PageBuilder; import io.trino.spi.QueryId; import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.type.Type; @@ -34,16 +35,20 @@ import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; +import static io.airlift.slice.Slices.utf8Slice; import static io.trino.operator.TableWriterOperator.FRAGMENT_CHANNEL; import static io.trino.operator.TableWriterOperator.ROW_COUNT_CHANNEL; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.VarbinaryType.VARBINARY; +import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -314,6 +319,21 @@ public Page getOutput() this.outputMetadata.set(tableFinisher.finishTable(fragmentBuilder.build(), computedStatisticsBuilder.build(), tableExecuteContext)); + // Check if table execute context has metrics to output + Optional> metricsOptional = tableExecuteContext.getMetrics(); + if (metricsOptional.isPresent()) { + Map metrics = metricsOptional.get(); + PageBuilder pageBuilder = new PageBuilder(ImmutableList.of(VARCHAR, BIGINT)); + BlockBuilder metricNameBuilder = pageBuilder.getBlockBuilder(0); + BlockBuilder metricValueBuilder = pageBuilder.getBlockBuilder(1); + for (Entry entry : metrics.entrySet()) { + VARCHAR.writeSlice(metricNameBuilder, utf8Slice(entry.getKey())); + BIGINT.writeLong(metricValueBuilder, entry.getValue()); + pageBuilder.declarePosition(); + } + return pageBuilder.build(); + } + // output page will only be constructed once, // so a new PageBuilder is constructed (instead of using PageBuilder.reset) PageBuilder page = new PageBuilder(1, TYPES); diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 0b97bef4a5c8..cac970de0786 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -1338,10 +1338,7 @@ protected Scope visitTableExecute(TableExecute node, Optional scope) analysis.setUpdateType("ALTER TABLE EXECUTE"); analysis.setUpdateTarget(executeHandle.catalogHandle().getVersion(), tableName, Optional.of(table), Optional.empty()); - if (!procedureMetadata.getExecutionMode().isReadsData()) { - return createAndAssignScope(node, scope, Field.newUnqualified("metric_name", VARCHAR), Field.newUnqualified("metric_value", BIGINT)); - } - return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT)); + return createAndAssignScope(node, scope, Field.newUnqualified("metric_name", VARCHAR), Field.newUnqualified("metric_value", BIGINT)); } private List processTableExecuteArguments(TableExecute node, TableProcedureMetadata procedureMetadata, Optional scope) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 0b6056306680..5d8fe6b24de6 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -3472,7 +3472,7 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl tableExecuteContextManager, shouldOutputRowCount(node), session); - Map layout = ImmutableMap.of(getOnlyElement(node.getOutputSymbols()), 0); + Map layout = makeLayoutFromOutputSymbols(node.getOutputSymbols()); return new PhysicalOperation(operatorFactory, layout, source); } @@ -4206,7 +4206,8 @@ private static TableFinisher createTableFinisher(Session session, TableFinishNod } if (target instanceof TableExecuteTarget tableExecuteTarget) { TableExecuteHandle tableExecuteHandle = tableExecuteTarget.getExecuteHandle(); - metadata.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteContext.getSplitsInfo()); + Map metrics = metadata.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteContext.getSplitsInfo()); + tableExecuteContext.setMetrics(metrics); return Optional.empty(); } if (target instanceof MergeTarget mergeTarget) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index ca5a8bde3b15..9d4714117698 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -779,7 +779,7 @@ else if (isUsePreferredWritePartitioning(session)) { Optional.of(partialAggregation), Optional.of(result.getDescriptor().map(aggregations.getMappings()::get))), target, - symbolAllocator.newSymbol("rows", BIGINT), + ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)), Optional.of(aggregations.getFinalAggregation()), Optional.of(result.getDescriptor())); @@ -800,7 +800,7 @@ else if (isUsePreferredWritePartitioning(session)) { Optional.empty(), Optional.empty()), target, - symbolAllocator.newSymbol("rows", BIGINT), + ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)), Optional.empty(), Optional.empty()); @@ -865,7 +865,7 @@ private RelationPlan createDeletePlan(Analysis analysis, Delete node) idAllocator.getNextId(), planNode, target, - symbolAllocator.newSymbol("rows", BIGINT), + ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)), Optional.empty(), Optional.empty()); @@ -882,7 +882,7 @@ private RelationPlan createUpdatePlan(Analysis analysis, Update node) idAllocator.getNextId(), planNode, target, - symbolAllocator.newSymbol("rows", BIGINT), + ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)), Optional.empty(), Optional.empty()); @@ -898,7 +898,7 @@ private RelationPlan createMergePlan(Analysis analysis, Merge node) idAllocator.getNextId(), mergeNode, mergeNode.getTarget(), - symbolAllocator.newSymbol("rows", BIGINT), + ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)), Optional.empty(), Optional.empty()); @@ -980,7 +980,7 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat symbolAllocator.newSymbol("metricName", VARCHAR), symbolAllocator.newSymbol("metricValue", BIGINT)), executeHandle); - return new RelationPlan(node, analysis.getRootScope(), node.getOutputSymbols(), Optional.empty()); + return new RelationPlan(node, analysis.getScope(statement), node.getOutputSymbols(), Optional.empty()); } TableHandle tableHandle = analysis.getTableHandle(table); @@ -1056,6 +1056,12 @@ else if (isUsePreferredWritePartitioning(session)) { } verify(columnNames.size() == symbols.size(), "columnNames.size() != symbols.size(): %s and %s", columnNames, symbols); + + List outputSymbols = ImmutableList.builder() + .add(symbolAllocator.newSymbol("metricName", VARCHAR)) + .add(symbolAllocator.newSymbol("metricValue", BIGINT)) + .build(); + TableFinishNode commitNode = new TableFinishNode( idAllocator.getNextId(), new TableExecuteNode( @@ -1068,11 +1074,11 @@ else if (isUsePreferredWritePartitioning(session)) { columnNames, partitioningScheme), tableExecuteTarget, - symbolAllocator.newSymbol("rows", BIGINT), + outputSymbols, Optional.empty(), Optional.empty()); - return new RelationPlan(commitNode, analysis.getRootScope(), commitNode.getOutputSymbols(), Optional.empty()); + return new RelationPlan(commitNode, analysis.getScope(statement), outputSymbols, Optional.empty()); } private static class Key diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveEmptyTableExecute.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveEmptyTableExecute.java index aaa7885749b7..d23e4f92fcbb 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveEmptyTableExecute.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveEmptyTableExecute.java @@ -17,6 +17,7 @@ import io.trino.matching.Captures; import io.trino.matching.Pattern; import io.trino.sql.ir.Constant; +import io.trino.sql.ir.Expression; import io.trino.sql.ir.Row; import io.trino.sql.planner.iterative.Lookup; import io.trino.sql.planner.iterative.Rule; @@ -26,10 +27,11 @@ import io.trino.sql.planner.plan.TableFinishNode; import io.trino.sql.planner.plan.ValuesNode; +import java.util.List; import java.util.Optional; import static com.google.common.base.Verify.verify; -import static io.trino.spi.type.BigintType.BIGINT; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.sql.planner.plan.Patterns.tableFinish; /** @@ -76,11 +78,15 @@ public Result apply(TableFinishNode finishNode, Captures captures, Context conte } verify(valuesNode.getRowCount() == 0, "Unexpected non-empty Values as source of TableExecuteNode"); + List rowValues = finishNode.getOutputSymbols().stream() + .map(symbol -> new Constant(symbol.type(), null)) + .collect(toImmutableList()); + return Result.ofPlanNode( new ValuesNode( finishNode.getId(), finishNode.getOutputSymbols(), - ImmutableList.of(new Row(ImmutableList.of(new Constant(BIGINT, null)))))); + ImmutableList.of(new Row(rowValues)))); } private Optional getSingleSourceSkipExchange(PlanNode node, Lookup lookup) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java index effa7ed25cac..bae8522b5d0f 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java @@ -180,7 +180,7 @@ public PlanNode visitTableFinish(TableFinishNode node, RewriteContext descriptor.map(this::map))); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableFinishNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableFinishNode.java index 6a9017af8e3d..a68b904030ff 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableFinishNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableFinishNode.java @@ -33,7 +33,7 @@ public class TableFinishNode { private final PlanNode source; private final WriterTarget target; - private final Symbol rowCountSymbol; + private final List outputSymbols; private final Optional statisticsAggregation; private final Optional> statisticsAggregationDescriptor; @@ -42,7 +42,7 @@ public TableFinishNode( @JsonProperty("id") PlanNodeId id, @JsonProperty("source") PlanNode source, @JsonProperty("target") WriterTarget target, - @JsonProperty("rowCountSymbol") Symbol rowCountSymbol, + @JsonProperty("outputSymbols") List outputSymbols, @JsonProperty("statisticsAggregation") Optional statisticsAggregation, @JsonProperty("statisticsAggregationDescriptor") Optional> statisticsAggregationDescriptor) { @@ -51,7 +51,7 @@ public TableFinishNode( checkArgument(target != null || source instanceof TableWriterNode); this.source = requireNonNull(source, "source is null"); this.target = requireNonNull(target, "target is null"); - this.rowCountSymbol = requireNonNull(rowCountSymbol, "rowCountSymbol is null"); + this.outputSymbols = ImmutableList.copyOf(outputSymbols); this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null"); this.statisticsAggregationDescriptor = requireNonNull(statisticsAggregationDescriptor, "statisticsAggregationDescriptor is null"); checkArgument(statisticsAggregation.isPresent() == statisticsAggregationDescriptor.isPresent(), "statisticsAggregation and statisticsAggregationDescriptor must both be either present or absent"); @@ -69,12 +69,6 @@ public WriterTarget getTarget() return target; } - @JsonProperty - public Symbol getRowCountSymbol() - { - return rowCountSymbol; - } - @JsonProperty public Optional getStatisticsAggregation() { @@ -93,10 +87,11 @@ public List getSources() return ImmutableList.of(source); } + @JsonProperty @Override public List getOutputSymbols() { - return ImmutableList.of(rowCountSymbol); + return outputSymbols; } @Override @@ -112,7 +107,7 @@ public PlanNode replaceChildren(List newChildren) getId(), Iterables.getOnlyElement(newChildren), target, - rowCountSymbol, + outputSymbols, statisticsAggregation, statisticsAggregationDescriptor); } diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index 58e5d52094a3..4ff7d28c5c81 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -183,11 +183,11 @@ public BeginTableExecuteResult fragments, List tableExecuteState) + public Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) { Span span = startSpan("finishTableExecute", tableExecuteHandle); try (var _ = scopedSpan(span)) { - delegate.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); + return delegate.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); } } diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index 70b0ca009c2a..9b71a87914c2 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -242,11 +242,11 @@ public BeginTableExecuteResult beginTableExecut } @Override - public void finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState) + public Map finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState) { Span span = startSpan("finishTableExecute", handle); try (var _ = scopedSpan(span)) { - delegate.finishTableExecute(session, handle, fragments, tableExecuteState); + return delegate.finishTableExecute(session, handle, fragments, tableExecuteState); } } diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 4b57c05d4929..ef8b16f5357e 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -902,7 +902,10 @@ public Map executeTableExecute(ConnectorSession session, Connector } @Override - public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) {} + public Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) + { + return ImmutableMap.of(); + } @Override public Collection listFunctions(ConnectorSession session, String schemaName) diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 28d343dcb681..056a98aeb6ca 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -177,7 +177,7 @@ public BeginTableExecuteResult beginTableExecut } @Override - public void finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState) + public Map finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState) { throw new UnsupportedOperationException(); } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index d5f2af82fb75..904dbbe471d9 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -692,7 +692,7 @@ public TableFinishNode tableFinish(PlanNode source, WriterTarget target, Symbol idAllocator.getNextId(), source, target, - rowCountSymbol, + ImmutableList.of(rowCountSymbol), Optional.empty(), Optional.empty()); } diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index c764cf503520..e1e8ac799f12 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -270,6 +270,11 @@ method java.util.Optional<io.trino.spi.connector.ConnectorOutputMetadata> io.trino.spi.connector.ConnectorMetadata::finishRefreshMaterializedView(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableHandle, io.trino.spi.connector.ConnectorInsertTableHandle, java.util.Collection<io.airlift.slice.Slice>, java.util.Collection<io.trino.spi.statistics.ComputedStatistics>, java.util.List<io.trino.spi.connector.ConnectorTableHandle>, boolean, boolean, boolean) Add hasNonDeterministicFunctions parameter to detect stale materialized views + + java.method.returnTypeChanged + method void io.trino.spi.connector.ConnectorMetadata::finishTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle, java.util.Collection<io.airlift.slice.Slice>, java.util.List<java.lang.Object>) + method java.util.Map<java.lang.String, java.lang.Long> io.trino.spi.connector.ConnectorMetadata::finishTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle, java.util.Collection<io.airlift.slice.Slice>, java.util.List<java.lang.Object>) + diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 8bf5ab4bfeec..7e68c2502ea0 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -169,8 +169,9 @@ default BeginTableExecuteResult fragments, List tableExecuteState) + default Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata getTableHandleForExecute() is implemented without finishTableExecute()"); } diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 9bb3cd7e88ad..736c45cc2066 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -869,6 +869,14 @@ following conditions are met per partition: ALTER TABLE test_table EXECUTE optimize ``` +```text + metric_name | metric_value +----------------------------+-------------- + rewritten_data_files_count | 1 + removed_delete_files_count | 1 + added_data_files_count | 2 +``` + The following statement merges files in a table that are under 128 megabytes in size: diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 818b5605bb4f..9c787b2164da 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -245,10 +245,10 @@ public BeginTableExecuteResult fragments, List tableExecuteState) + public Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) { try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) { - delegate.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); + return delegate.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 32c724dabeb1..9e7b1ab30b66 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -2938,13 +2938,13 @@ private BeginTableExecuteResult fragments, List splitSourceInfo) + public Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List splitSourceInfo) { DeltaLakeTableExecuteHandle executeHandle = (DeltaLakeTableExecuteHandle) tableExecuteHandle; switch (executeHandle.procedureId()) { case OPTIMIZE: finishOptimize(session, executeHandle, fragments, splitSourceInfo); - return; + return ImmutableMap.of(); } throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 7b28516ecb80..c5c50651f80d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -2633,13 +2633,13 @@ private BeginTableExecuteResult fragments, List splitSourceInfo) + public Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List splitSourceInfo) { String procedureName = ((HiveTableExecuteHandle) tableExecuteHandle).getProcedureName(); if (procedureName.equals(OptimizeTableProcedure.NAME)) { finishOptimize(session, tableExecuteHandle, fragments, splitSourceInfo); - return; + return ImmutableMap.of(); } throw new IllegalArgumentException("Unknown procedure '" + procedureName + "'"); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index c9e256a38549..2a264142036b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -2107,13 +2107,12 @@ private BeginTableExecuteResult fragments, List splitSourceInfo) + public Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List splitSourceInfo) { IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle; switch (executeHandle.procedureId()) { case OPTIMIZE: - finishOptimize(session, executeHandle, fragments, splitSourceInfo); - return; + return finishOptimize(session, executeHandle, fragments, splitSourceInfo); case OPTIMIZE_MANIFESTS: case DROP_EXTENDED_STATS: case ROLLBACK_TO_SNAPSHOT: @@ -2126,7 +2125,7 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); } - private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle, Collection fragments, List splitSourceInfo) + private Map finishOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle, Collection fragments, List splitSourceInfo) { IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.procedureHandle(); Table icebergTable = transaction.table(); @@ -2175,7 +2174,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle if (optimizeHandle.snapshotId().isEmpty() || scannedDataFiles.isEmpty() && fullyAppliedDeleteFiles.isEmpty() && newFiles.isEmpty()) { // Either the table is empty, or the table scan turned out to be empty, nothing to commit transaction = null; - return; + return new OptimizeResult(0L, 0L, 0L).toMap(); } RewriteFiles rewriteFiles = transaction.newRewrite(); @@ -2199,6 +2198,20 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle commitTransaction(transaction, "optimize"); transaction = null; + + return new OptimizeResult(scannedDataFiles.size(), fullyAppliedDeleteFiles.size(), newFiles.size()).toMap(); + } + + private record OptimizeResult(long rewrittenDataFiles, long removedDeleteFiles, long addedDataFiles) + { + Map toMap() + { + return ImmutableMap.builder() + .put("rewritten_data_files_count", rewrittenDataFiles) + .put("removed_delete_files_count", removedDeleteFiles) + .put("added_data_files_count", addedDataFiles) + .buildOrThrow(); + } } private static void commitUpdateAndTransaction(SnapshotUpdate update, ConnectorSession session, Transaction transaction, String operation) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index d0c2d1dda464..2b36a0c77144 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -5703,7 +5703,9 @@ public void testOptimize() .hasSizeGreaterThan(workerCount); // For optimize we need to set task_min_writer_count to 1, otherwise it will create more than one file. - computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertUpdate( + withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE", + "VALUES ('rewritten_data_files_count', 5), ('removed_delete_files_count', 0), ('added_data_files_count', 1)"); assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) .matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')"); List updatedFiles = getActiveFiles(tableName); @@ -5908,7 +5910,9 @@ public void testOptimizeTableAfterDeleteWithFormatVersion2() "VALUES '1'"); // For optimize we need to set task_min_writer_count to 1, otherwise it will create more than one file. - computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertUpdate( + withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE", + "VALUES ('rewritten_data_files_count', 1), ('removed_delete_files_count', 1), ('added_data_files_count', 1)"); List updatedFiles = getActiveFiles(tableName); assertThat(updatedFiles) @@ -6035,6 +6039,9 @@ void testOptimizeOnlyOneFileShouldHaveNoEffect() assertThat(initialFiles).hasSize(1); computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertUpdate( + "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE", + "VALUES ('rewritten_data_files_count', 0), ('removed_delete_files_count', 0), ('added_data_files_count', 0)"); assertThat(query("SELECT a FROM " + tableName)) .matches("VALUES 1, 2"); assertThat(getActiveFiles(tableName)) diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java index 6d7bb516517d..dea2a8b15829 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java @@ -205,9 +205,9 @@ public BeginTableExecuteResult fragments, List tableExecuteState) + public Map finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) { - forHandle(tableExecuteHandle).finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); + return forHandle(tableExecuteHandle).finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); } @Override