Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles,
List<String> sourceTableFunctions);
List<String> sourceTableFunctions,
boolean hasNonDeterministicFunctions);

/**
* Push update into connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,8 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles,
List<String> sourceTableFunctions)
List<String> sourceTableFunctions,
boolean hasNonDeterministicFunctions)
{
CatalogHandle catalogHandle = insertHandle.catalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
Expand All @@ -1337,7 +1338,8 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
computedStatistics,
sourceConnectorHandles,
sourceConnectorHandles.size() < sourceTableHandles.size(),
!sourceTableFunctions.isEmpty());
!sourceTableFunctions.isEmpty(),
hasNonDeterministicFunctions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.sql.tree.FunctionCall;
import io.trino.sql.tree.LocalTime;
import io.trino.sql.tree.LocalTimestamp;
import io.trino.sql.tree.Node;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand Down Expand Up @@ -66,12 +67,12 @@ protected Void visitFunctionCall(FunctionCall node, AtomicBoolean deterministic)
}
}

public static boolean containsCurrentTimeFunctions(Expression expression)
public static boolean containsCurrentTimeFunctions(Node node)
{
requireNonNull(expression, "expression is null");
requireNonNull(node, "node is null");

AtomicBoolean hasTemporalFunction = new AtomicBoolean(false);
new DeterminismEvaluator.TemporalFunctionVisitor().process(expression, hasTemporalFunction);
new DeterminismEvaluator.TemporalFunctionVisitor().process(node, hasTemporalFunction);
return hasTemporalFunction.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4199,7 +4199,8 @@ private static TableFinisher createTableFinisher(Session session, TableFinishNod
fragments,
statistics,
refreshTarget.getSourceTableHandles(),
refreshTarget.getSourceTableFunctions());
refreshTarget.getSourceTableFunctions(),
refreshTarget.hasNonDeterministicFunctions());
}
if (target instanceof TableExecuteTarget tableExecuteTarget) {
TableExecuteHandle tableExecuteHandle = tableExecuteTarget.getExecuteHandle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.analyzer.DeterminismEvaluator.containsCurrentTimeFunctions;
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
import static io.trino.sql.analyzer.TypeSignatureProvider.fromTypes;
import static io.trino.sql.ir.Booleans.TRUE;
Expand Down Expand Up @@ -605,7 +606,9 @@ private RelationPlan getInsertPlan(
TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, tableHandle.catalogHandle(), tableMetadata.metadata(), false);

if (materializedViewRefreshWriterTarget.isPresent()) {
RefreshType refreshType = IncrementalRefreshVisitor.canIncrementallyRefresh(plan.getRoot());
RefreshType refreshType = materializedViewRefreshWriterTarget.get().hasNonDeterministicFunctions()
? RefreshType.FULL
Comment thread
hashhar marked this conversation as resolved.
: IncrementalRefreshVisitor.canIncrementallyRefresh(plan.getRoot());
WriterTarget writerTarget = materializedViewRefreshWriterTarget.get().withRefreshType(refreshType);
return createTableWriterPlan(
analysis,
Expand Down Expand Up @@ -688,11 +691,16 @@ private RelationPlan createRefreshMaterializedViewPlan(Analysis analysis)
List<String> tableFunctions = analysis.getPolymorphicTableFunctions().stream()
.map(polymorphicTableFunction -> polymorphicTableFunction.getNode().getName().toString())
.collect(toImmutableList());
// TODO: For time-based functions (current_date, current_timestamp) smarter freshness tracking
// could avoid treating the MV as stale when the time hasn't meaningfully changed. See https://github.com/trinodb/trino/issues/28731
boolean hasNonDeterministicFunctions = analysis.getResolvedFunctions().stream().anyMatch(function -> !function.deterministic())
|| containsCurrentTimeFunctions(query);
Comment thread
hashhar marked this conversation as resolved.
RefreshMaterializedViewReference writerTarget = new RefreshMaterializedViewReference(
viewAnalysis.getTable().toString(),
tableHandle,
ImmutableList.copyOf(analysis.getTables()),
tableFunctions,
hasNonDeterministicFunctions,
// this is a placeholder value - refresh type will be determined by getInsertPlan based on the plan tree
RefreshType.FULL);
return getInsertPlan(analysis, viewAnalysis.getTable(), query, tableHandle, viewAnalysis.getColumns(), newTableLayout, Optional.of(writerTarget));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ private WriterTarget createWriterTarget(WriterTarget target, PlanNode planNode)
metadata.getTableName(session, refreshMV.getStorageTableHandle()).getSchemaTableName(),
refreshMV.getSourceTableHandles(),
refreshMV.getSourceTableFunctions(),
refreshMV.hasNonDeterministicFunctions(),
refreshMV.getWriterScalingOptions(metadata, session));
}
if (target instanceof TableExecuteTarget tableExecute) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,19 +489,22 @@ public static class RefreshMaterializedViewReference
private final TableHandle storageTableHandle;
private final List<TableHandle> sourceTableHandles;
private final List<String> sourceTableFunctions;
private final boolean hasNonDeterministicFunctions;
private final RefreshType refreshType;

public RefreshMaterializedViewReference(
String table,
TableHandle storageTableHandle,
List<TableHandle> sourceTableHandles,
List<String> sourceTableFunctions,
boolean hasNonDeterministicFunctions,
RefreshType refreshType)
{
this.table = requireNonNull(table, "table is null");
this.storageTableHandle = requireNonNull(storageTableHandle, "storageTableHandle is null");
this.sourceTableHandles = ImmutableList.copyOf(sourceTableHandles);
this.sourceTableFunctions = ImmutableList.copyOf(sourceTableFunctions);
this.hasNonDeterministicFunctions = hasNonDeterministicFunctions;
this.refreshType = requireNonNull(refreshType, "refreshType is null");
}

Expand Down Expand Up @@ -540,6 +543,11 @@ public List<String> getSourceTableFunctions()
return sourceTableFunctions;
}

public boolean hasNonDeterministicFunctions()
{
return hasNonDeterministicFunctions;
}

@Override
public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session session)
{
Expand All @@ -553,7 +561,7 @@ public RefreshType getRefreshType()

public RefreshMaterializedViewReference withRefreshType(RefreshType refreshType)
{
return new RefreshMaterializedViewReference(table, storageTableHandle, sourceTableHandles, sourceTableFunctions, refreshType);
return new RefreshMaterializedViewReference(table, storageTableHandle, sourceTableHandles, sourceTableFunctions, hasNonDeterministicFunctions, refreshType);
}
}

Expand All @@ -565,6 +573,7 @@ public static class RefreshMaterializedViewTarget
private final SchemaTableName schemaTableName;
private final List<TableHandle> sourceTableHandles;
private final List<String> sourceTableFunctions;
private final boolean hasNonDeterministicFunctions;
private final WriterScalingOptions writerScalingOptions;

@JsonCreator
Expand All @@ -574,13 +583,15 @@ public RefreshMaterializedViewTarget(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles,
@JsonProperty("sourceTableFunctions") List<String> sourceTableFunctions,
@JsonProperty("hasNonDeterministicFunctions") boolean hasNonDeterministicFunctions,
@JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.insertHandle = requireNonNull(insertHandle, "insertHandle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.sourceTableHandles = ImmutableList.copyOf(sourceTableHandles);
this.sourceTableFunctions = ImmutableList.copyOf(sourceTableFunctions);
this.hasNonDeterministicFunctions = hasNonDeterministicFunctions;
this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null");
}

Expand Down Expand Up @@ -614,6 +625,12 @@ public List<String> getSourceTableFunctions()
return sourceTableFunctions;
}

@JsonProperty
public boolean hasNonDeterministicFunctions()
{
return hasNonDeterministicFunctions;
}

@JsonProperty
public WriterScalingOptions getWriterScalingOptions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,11 +748,11 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession
}

@Override
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics, List<ConnectorTableHandle> sourceTableHandles, boolean hasForeignSourceTables, boolean hasSourceTableFunctions)
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics, List<ConnectorTableHandle> sourceTableHandles, boolean hasForeignSourceTables, boolean hasSourceTableFunctions, boolean hasNonDeterministicFunctions)
{
Span span = startSpan("finishRefreshMaterializedView", tableHandle);
try (var _ = scopedSpan(span)) {
return delegate.finishRefreshMaterializedView(session, tableHandle, insertHandle, fragments, computedStatistics, sourceTableHandles, hasForeignSourceTables, hasSourceTableFunctions);
return delegate.finishRefreshMaterializedView(session, tableHandle, insertHandle, fragments, computedStatistics, sourceTableHandles, hasForeignSourceTables, hasSourceTableFunctions, hasNonDeterministicFunctions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,8 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles,
List<String> sourceTableFunctions)
List<String> sourceTableFunctions,
boolean hasNonDeterministicFunctions)
{
Span span = startSpan("finishRefreshMaterializedView", tableHandle);
try (var _ = scopedSpan(span)) {
Expand All @@ -802,7 +803,8 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
fragments,
computedStatistics,
sourceTableHandles,
sourceTableFunctions);
sourceTableFunctions,
hasNonDeterministicFunctions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession
}

@Override
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics, List<ConnectorTableHandle> sourceTableHandles, boolean hasForeignSourceTables, boolean hasSourceTableFunctions)
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics, List<ConnectorTableHandle> sourceTableHandles, boolean hasForeignSourceTables, boolean hasSourceTableFunctions, boolean hasNonDeterministicFunctions)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles,
List<String> sourceTableFunctions)
List<String> sourceTableFunctions,
boolean hasNonDeterministicFunctions)
{
throw new UnsupportedOperationException();
}
Expand Down
Loading