Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
package io.trino.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.concurrent.GuardedBy;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class TableExecuteContext
{
@GuardedBy("this")
private List<Object> splitsInfo;
@GuardedBy("this")
private Map<String, Long> metrics;

public synchronized void setSplitsInfo(List<Object> splitsInfo)
{
Expand All @@ -41,4 +47,15 @@ public synchronized List<Object> getSplitsInfo()
}
return splitsInfo;
}

/**
 * Returns the metrics recorded via {@code setMetrics}, or {@code Optional.empty()}
 * when no metrics have been recorded yet.
 */
public synchronized Optional<Map<String, Long>> getMetrics()
{
    if (metrics == null) {
        return Optional.empty();
    }
    return Optional.of(metrics);
}

/**
 * Records the metrics produced when the table execute operation finished
 * (see {@code finishTableExecute} callers). May be called at most once;
 * the supplied map is defensively copied.
 *
 * @throws IllegalStateException if metrics were already set
 * @throws NullPointerException if {@code newMetrics} is null (from {@code ImmutableMap.copyOf})
 */
public synchronized void setMetrics(Map<String, Long> newMetrics)
{
checkState(metrics == null, "metrics already set to %s", metrics);
metrics = ImmutableMap.copyOf(newMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Optional<TableExecuteHandle> getTableHandleForExecute(

BeginTableExecuteResult<TableExecuteHandle, TableHandle> beginTableExecute(Session session, TableExecuteHandle handle, TableHandle updatedSourceTableHandle);

/**
 * Finishes a table execute operation.
 *
 * @return metrics reported by the connector for the finished operation
 */
Map<String, Long> finishTableExecute(Session session, TableExecuteHandle handle, Collection<Slice> fragments, List<Object> tableExecuteState);

Map<String, Long> executeTableExecute(Session session, TableExecuteHandle handle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,11 @@ public BeginTableExecuteResult<TableExecuteHandle, TableHandle> beginTableExecut
}

@Override
public void finishTableExecute(Session session, TableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState)
public Map<String, Long> finishTableExecute(Session session, TableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState)
{
CatalogHandle catalogHandle = tableExecuteHandle.catalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
metadata.finishTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle(), fragments, tableExecuteState);
return metadata.finishTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle(), fragments, tableExecuteState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.QueryId;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.Type;
Expand All @@ -34,16 +35,20 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.operator.TableWriterOperator.FRAGMENT_CHANNEL;
import static io.trino.operator.TableWriterOperator.ROW_COUNT_CHANNEL;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand Down Expand Up @@ -314,6 +319,21 @@ public Page getOutput()

this.outputMetadata.set(tableFinisher.finishTable(fragmentBuilder.build(), computedStatisticsBuilder.build(), tableExecuteContext));

// Check if table execute context has metrics to output
Optional<Map<String, Long>> metricsOptional = tableExecuteContext.getMetrics();
if (metricsOptional.isPresent()) {
Map<String, Long> metrics = metricsOptional.get();
PageBuilder pageBuilder = new PageBuilder(ImmutableList.of(VARCHAR, BIGINT));
BlockBuilder metricNameBuilder = pageBuilder.getBlockBuilder(0);
BlockBuilder metricValueBuilder = pageBuilder.getBlockBuilder(1);
for (Entry<String, Long> entry : metrics.entrySet()) {
VARCHAR.writeSlice(metricNameBuilder, utf8Slice(entry.getKey()));
BIGINT.writeLong(metricValueBuilder, entry.getValue());
pageBuilder.declarePosition();
}
return pageBuilder.build();
}

// output page will only be constructed once,
// so a new PageBuilder is constructed (instead of using PageBuilder.reset)
PageBuilder page = new PageBuilder(1, TYPES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1338,10 +1338,7 @@ protected Scope visitTableExecute(TableExecute node, Optional<Scope> scope)
analysis.setUpdateType("ALTER TABLE EXECUTE");
analysis.setUpdateTarget(executeHandle.catalogHandle().getVersion(), tableName, Optional.of(table), Optional.empty());

if (!procedureMetadata.getExecutionMode().isReadsData()) {
return createAndAssignScope(node, scope, Field.newUnqualified("metric_name", VARCHAR), Field.newUnqualified("metric_value", BIGINT));
}
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
return createAndAssignScope(node, scope, Field.newUnqualified("metric_name", VARCHAR), Field.newUnqualified("metric_value", BIGINT));
}

private List<Property> processTableExecuteArguments(TableExecute node, TableProcedureMetadata procedureMetadata, Optional<Scope> scope)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3472,7 +3472,7 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl
tableExecuteContextManager,
shouldOutputRowCount(node),
session);
Map<Symbol, Integer> layout = ImmutableMap.of(getOnlyElement(node.getOutputSymbols()), 0);
Map<Symbol, Integer> layout = makeLayoutFromOutputSymbols(node.getOutputSymbols());

return new PhysicalOperation(operatorFactory, layout, source);
}
Expand Down Expand Up @@ -4206,7 +4206,8 @@ private static TableFinisher createTableFinisher(Session session, TableFinishNod
}
if (target instanceof TableExecuteTarget tableExecuteTarget) {
TableExecuteHandle tableExecuteHandle = tableExecuteTarget.getExecuteHandle();
metadata.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteContext.getSplitsInfo());
Map<String, Long> metrics = metadata.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteContext.getSplitsInfo());
tableExecuteContext.setMetrics(metrics);
return Optional.empty();
}
if (target instanceof MergeTarget mergeTarget) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ else if (isUsePreferredWritePartitioning(session)) {
Optional.of(partialAggregation),
Optional.of(result.getDescriptor().map(aggregations.getMappings()::get))),
target,
symbolAllocator.newSymbol("rows", BIGINT),
ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)),
Optional.of(aggregations.getFinalAggregation()),
Optional.of(result.getDescriptor()));

Expand All @@ -800,7 +800,7 @@ else if (isUsePreferredWritePartitioning(session)) {
Optional.empty(),
Optional.empty()),
target,
symbolAllocator.newSymbol("rows", BIGINT),
ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)),
Optional.empty(),
Optional.empty());

Expand Down Expand Up @@ -865,7 +865,7 @@ private RelationPlan createDeletePlan(Analysis analysis, Delete node)
idAllocator.getNextId(),
planNode,
target,
symbolAllocator.newSymbol("rows", BIGINT),
ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)),
Optional.empty(),
Optional.empty());

Expand All @@ -882,7 +882,7 @@ private RelationPlan createUpdatePlan(Analysis analysis, Update node)
idAllocator.getNextId(),
planNode,
target,
symbolAllocator.newSymbol("rows", BIGINT),
ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)),
Optional.empty(),
Optional.empty());

Expand All @@ -898,7 +898,7 @@ private RelationPlan createMergePlan(Analysis analysis, Merge node)
idAllocator.getNextId(),
mergeNode,
mergeNode.getTarget(),
symbolAllocator.newSymbol("rows", BIGINT),
ImmutableList.of(symbolAllocator.newSymbol("rows", BIGINT)),
Optional.empty(),
Optional.empty());

Expand Down Expand Up @@ -980,7 +980,7 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat
symbolAllocator.newSymbol("metricName", VARCHAR),
symbolAllocator.newSymbol("metricValue", BIGINT)),
executeHandle);
return new RelationPlan(node, analysis.getRootScope(), node.getOutputSymbols(), Optional.empty());
return new RelationPlan(node, analysis.getScope(statement), node.getOutputSymbols(), Optional.empty());
}

TableHandle tableHandle = analysis.getTableHandle(table);
Expand Down Expand Up @@ -1056,6 +1056,12 @@ else if (isUsePreferredWritePartitioning(session)) {
}

verify(columnNames.size() == symbols.size(), "columnNames.size() != symbols.size(): %s and %s", columnNames, symbols);

List<Symbol> outputSymbols = ImmutableList.<Symbol>builder()
.add(symbolAllocator.newSymbol("metricName", VARCHAR))
.add(symbolAllocator.newSymbol("metricValue", BIGINT))
.build();

TableFinishNode commitNode = new TableFinishNode(
idAllocator.getNextId(),
new TableExecuteNode(
Expand All @@ -1068,11 +1074,11 @@ else if (isUsePreferredWritePartitioning(session)) {
columnNames,
partitioningScheme),
tableExecuteTarget,
symbolAllocator.newSymbol("rows", BIGINT),
outputSymbols,
Optional.empty(),
Optional.empty());

return new RelationPlan(commitNode, analysis.getRootScope(), commitNode.getOutputSymbols(), Optional.empty());
return new RelationPlan(commitNode, analysis.getScope(statement), outputSymbols, Optional.empty());
}

private static class Key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.sql.ir.Constant;
import io.trino.sql.ir.Expression;
import io.trino.sql.ir.Row;
import io.trino.sql.planner.iterative.Lookup;
import io.trino.sql.planner.iterative.Rule;
Expand All @@ -26,10 +27,11 @@
import io.trino.sql.planner.plan.TableFinishNode;
import io.trino.sql.planner.plan.ValuesNode;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static io.trino.spi.type.BigintType.BIGINT;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.sql.planner.plan.Patterns.tableFinish;

/**
Expand Down Expand Up @@ -76,11 +78,15 @@ public Result apply(TableFinishNode finishNode, Captures captures, Context conte
}
verify(valuesNode.getRowCount() == 0, "Unexpected non-empty Values as source of TableExecuteNode");

List<Expression> rowValues = finishNode.getOutputSymbols().stream()
.map(symbol -> new Constant(symbol.type(), null))
.collect(toImmutableList());

return Result.ofPlanNode(
new ValuesNode(
finishNode.getId(),
finishNode.getOutputSymbols(),
ImmutableList.of(new Row(ImmutableList.of(new Constant(BIGINT, null))))));
ImmutableList.of(new Row(rowValues))));
}

private Optional<PlanNode> getSingleSourceSkipExchange(PlanNode node, Lookup lookup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Optional<W
node.getId(),
child,
newTarget,
node.getRowCountSymbol(),
node.getOutputSymbols(),
node.getStatisticsAggregation(),
node.getStatisticsAggregationDescriptor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ public TableFinishNode map(TableFinishNode node, PlanNode source)
node.getId(),
source,
node.getTarget(),
map(node.getRowCountSymbol()),
node.getOutputSymbols(),
node.getStatisticsAggregation().map(this::map),
node.getStatisticsAggregationDescriptor().map(descriptor -> descriptor.map(this::map)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TableFinishNode
{
private final PlanNode source;
private final WriterTarget target;
private final Symbol rowCountSymbol;
private final List<Symbol> outputSymbols;
private final Optional<StatisticAggregations> statisticsAggregation;
private final Optional<StatisticAggregationsDescriptor<Symbol>> statisticsAggregationDescriptor;

Expand All @@ -42,7 +42,7 @@ public TableFinishNode(
@JsonProperty("id") PlanNodeId id,
@JsonProperty("source") PlanNode source,
@JsonProperty("target") WriterTarget target,
@JsonProperty("rowCountSymbol") Symbol rowCountSymbol,
@JsonProperty("outputSymbols") List<Symbol> outputSymbols,
@JsonProperty("statisticsAggregation") Optional<StatisticAggregations> statisticsAggregation,
@JsonProperty("statisticsAggregationDescriptor") Optional<StatisticAggregationsDescriptor<Symbol>> statisticsAggregationDescriptor)
{
Expand All @@ -51,7 +51,7 @@ public TableFinishNode(
checkArgument(target != null || source instanceof TableWriterNode);
this.source = requireNonNull(source, "source is null");
this.target = requireNonNull(target, "target is null");
this.rowCountSymbol = requireNonNull(rowCountSymbol, "rowCountSymbol is null");
this.outputSymbols = ImmutableList.copyOf(outputSymbols);
this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null");
this.statisticsAggregationDescriptor = requireNonNull(statisticsAggregationDescriptor, "statisticsAggregationDescriptor is null");
checkArgument(statisticsAggregation.isPresent() == statisticsAggregationDescriptor.isPresent(), "statisticsAggregation and statisticsAggregationDescriptor must both be either present or absent");
Expand All @@ -69,12 +69,6 @@ public WriterTarget getTarget()
return target;
}

@JsonProperty
public Symbol getRowCountSymbol()
{
return rowCountSymbol;
}

@JsonProperty
public Optional<StatisticAggregations> getStatisticsAggregation()
{
Expand All @@ -93,10 +87,11 @@ public List<PlanNode> getSources()
return ImmutableList.of(source);
}

// Output symbols of this node; serialized so the plan can be round-tripped as JSON.
@JsonProperty
@Override
public List<Symbol> getOutputSymbols()
{
    return outputSymbols;
}

@Override
Expand All @@ -112,7 +107,7 @@ public PlanNode replaceChildren(List<PlanNode> newChildren)
getId(),
Iterables.getOnlyElement(newChildren),
target,
rowCountSymbol,
outputSymbols,
statisticsAggregation,
statisticsAggregationDescriptor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle
}

@Override
public Map<String, Long> finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState)
{
    Span span = startSpan("finishTableExecute", tableExecuteHandle);
    try (var _ = scopedSpan(span)) {
        // Delegate exactly once inside the tracing span and propagate the
        // connector-reported metrics to the caller.
        return delegate.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ public BeginTableExecuteResult<TableExecuteHandle, TableHandle> beginTableExecut
}

@Override
public Map<String, Long> finishTableExecute(Session session, TableExecuteHandle handle, Collection<Slice> fragments, List<Object> tableExecuteState)
{
    Span span = startSpan("finishTableExecute", handle);
    try (var _ = scopedSpan(span)) {
        // Delegate exactly once inside the tracing span and propagate the
        // connector-reported metrics to the caller.
        return delegate.finishTableExecute(session, handle, fragments, tableExecuteState);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,10 @@ public Map<String, Long> executeTableExecute(ConnectorSession session, Connector
}

/**
 * No-op stub: performs no work on finish and reports no metrics.
 */
@Override
public Map<String, Long> finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState)
{
    return ImmutableMap.of();
}

@Override
public Collection<FunctionMetadata> listFunctions(ConnectorSession session, String schemaName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public BeginTableExecuteResult<TableExecuteHandle, TableHandle> beginTableExecut
}

@Override
public void finishTableExecute(Session session, TableExecuteHandle handle, Collection<Slice> fragments, List<Object> tableExecuteState)
public Map<String, Long> finishTableExecute(Session session, TableExecuteHandle handle, Collection<Slice> fragments, List<Object> tableExecuteState)
{
throw new UnsupportedOperationException();
}
Expand Down
Loading