Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -52,7 +50,6 @@ public ConnectorPageSource createPageSource(
ConnectorSession session,
ConnectorSplit split,
ConnectorTableHandle tableHandle,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
Expand Down Expand Up @@ -81,7 +80,6 @@ public ConnectorPageSource createPageSource(
ConnectorSession session,
ConnectorSplit split,
ConnectorTableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
Expand All @@ -91,7 +89,7 @@ public ConnectorPageSource createPageSource(
// if the split is not a SystemSplit, we immediately delegate to the Connector to build a PageSource
if (!(split instanceof SystemSplit systemSplit)) {
return connectorPageSourceProvider.orElseThrow()
.createPageSource(systemTransaction.getConnectorTransactionHandle(), session, split, table, tableCredentials, columns, dynamicFilter);
.createPageSource(systemTransaction.getConnectorTransactionHandle(), session, split, table, columns, dynamicFilter);
}

SchemaTableName tableName = ((SystemTableHandle) table).schemaTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.ConnectorWritableTableHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemColumnHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.TupleDomain;

import java.util.Iterator;
Expand Down Expand Up @@ -171,25 +168,6 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
return Optional.of(new ConstraintApplicationResult<>(table, constraint.getSummary(), constraint.getExpression(), false));
}

@Override
public Optional<ConnectorTableCredentials> getTableCredentials(ConnectorSession session, ConnectorTableHandle tableHandle)
{
SystemTable systemTable = checkAndGetTable(session, tableHandle);
return systemTable.getTableCredentials(session);
}

@Override
public Optional<ConnectorTableCredentials> getTableCredentials(ConnectorSession session, ConnectorWritableTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support table credentials");
}

@Override
public Optional<ConnectorTableCredentials> getTableCredentials(ConnectorSession session, ConnectorTableFunctionHandle tableFunctionHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support table credentials");
}

private Constraint effectiveConstraint(TupleDomain<ColumnHandle> oldDomain, Constraint newConstraint, TupleDomain<ColumnHandle> effectiveDomain)
{
if (effectiveDomain.isNone() || newConstraint.predicate().isEmpty()) {
Expand Down
10 changes: 0 additions & 10 deletions core/trino-main/src/main/java/io/trino/execution/SqlStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.trino.metadata.Metadata;
import io.trino.metadata.Split;
import io.trino.metadata.TableFunctionHandle;
import io.trino.metadata.TableHandle;
import io.trino.node.InternalNode;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.metrics.Metrics;
Expand All @@ -43,7 +42,6 @@
import io.trino.sql.planner.plan.SimplePlanRewriter;
import io.trino.sql.planner.plan.TableExecuteNode;
import io.trino.sql.planner.plan.TableFunctionProcessorNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TableWriterNode;

import java.util.HashSet;
Expand Down Expand Up @@ -499,14 +497,6 @@ public Void visitTableWriter(TableWriterNode node, Void context)
return null;
}

@Override
public Void visitTableScan(TableScanNode node, Void context)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about connectors that require, or can operate with, table-level credentials, in that case we probably still need this method

{
TableHandle table = node.getTable();
extract(builder, node, metadata.getTableCredentials(session, table.catalogHandle(), table.connectorHandle()));
return null;
}

@Override
public Void visitTableFunctionProcessor(TableFunctionProcessorNode node, Void context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.SourcePage;
Expand Down Expand Up @@ -90,7 +89,6 @@ private ScanFilterAndProjectOperator(
PageSourceProvider pageSourceProvider,
PageProcessor pageProcessor,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter,
List<Type> types,
Expand All @@ -105,7 +103,6 @@ private ScanFilterAndProjectOperator(
pageSourceProvider,
pageProcessor,
table,
tableCredentials,
columns,
dynamicFilter,
types,
Expand Down Expand Up @@ -190,7 +187,6 @@ private class SplitToPages
final PageSourceProvider pageSourceProvider;
final PageProcessor pageProcessor;
final TableHandle table;
final Optional<ConnectorTableCredentials> tableCredentials;
final List<ColumnHandle> columns;
final DynamicFilter dynamicFilter;
final List<Type> types;
Expand All @@ -207,7 +203,6 @@ private class SplitToPages
PageSourceProvider pageSourceProvider,
PageProcessor pageProcessor,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter,
List<Type> types,
Expand All @@ -220,7 +215,6 @@ private class SplitToPages
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.pageProcessor = requireNonNull(pageProcessor, "pageProcessor is null");
this.table = requireNonNull(table, "table is null");
this.tableCredentials = requireNonNull(tableCredentials, "tableCredentials is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
Expand Down Expand Up @@ -251,7 +245,7 @@ public TransformationState<WorkProcessor<Page>> process(Split split)
source = new EmptyPageSource();
}
else {
source = pageSourceProvider.createPageSource(session, split, table, tableCredentials, columns, dynamicFilter);
source = pageSourceProvider.createPageSource(session, split, table, columns, dynamicFilter);
}

pageSource = source;
Expand Down Expand Up @@ -365,7 +359,6 @@ public static class ScanFilterAndProjectOperatorFactory
private final PlanNodeId sourceId;
private final PageSourceProvider pageSourceProvider;
private final TableHandle table;
private final Optional<ConnectorTableCredentials> tableCredentials;
private final List<ColumnHandle> columns;
private final DynamicFilter dynamicFilter;
private final List<Type> types;
Expand All @@ -380,7 +373,6 @@ public ScanFilterAndProjectOperatorFactory(
PageSourceProviderFactory pageSourceProvider,
Function<DynamicFilter, PageProcessor> pageProcessor,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter,
List<Type> types,
Expand All @@ -392,7 +384,6 @@ public ScanFilterAndProjectOperatorFactory(
this.pageProcessor = requireNonNull(pageProcessor, "pageProcessor is null");
this.sourceId = requireNonNull(sourceId, "sourceId is null");
this.table = requireNonNull(table, "table is null");
this.tableCredentials = requireNonNull(tableCredentials, "tableCredentials is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.dynamicFilter = dynamicFilter;
this.types = requireNonNull(types, "types is null");
Expand Down Expand Up @@ -457,7 +448,6 @@ public WorkProcessorSourceOperator create(
pageSourceProvider,
pageProcessor.apply(dynamicFilter),
table,
tableCredentials,
columns,
dynamicFilter,
types,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.trino.spi.Page;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.SourcePage;
Expand Down Expand Up @@ -57,7 +56,6 @@ public static class TableScanOperatorFactory
private final PlanNodeId sourceId;
private final PageSourceProvider pageSourceProvider;
private final TableHandle table;
private final Optional<ConnectorTableCredentials> tableCredentials;
private final List<ColumnHandle> columns;
private final List<Type> columnTypes;
private boolean closed;
Expand All @@ -68,15 +66,13 @@ public TableScanOperatorFactory(
PlanNodeId sourceId,
PageSourceProviderFactory pageSourceProvider,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
List<Type> columnTypes)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.sourceId = requireNonNull(sourceId, "sourceId is null");
this.table = requireNonNull(table, "table is null");
this.tableCredentials = requireNonNull(tableCredentials, "tableCredentials is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
this.pageSourceProvider = pageSourceProvider.createPageSourceProvider(table.catalogHandle());
Expand All @@ -99,7 +95,6 @@ public SourceOperator createOperator(DriverContext driverContext)
sourceId,
pageSourceProvider,
table,
tableCredentials,
columns);

if (isSourcePagesValidationEnabled(operatorContext.getSession())) {
Expand All @@ -122,7 +117,6 @@ public void noMoreOperators()
private final PlanNodeId sourceId;
private final PageSourceProvider pageSourceProvider;
private final TableHandle table;
private final Optional<ConnectorTableCredentials> tableCredentials;
private final List<ColumnHandle> columns;
private final LocalMemoryContext memoryContext;
private final SettableFuture<Void> blocked = SettableFuture.create();
Expand All @@ -143,14 +137,12 @@ public TableScanOperator(
PlanNodeId sourceId,
PageSourceProvider pageSourceProvider,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.sourceId = requireNonNull(sourceId, "planNodeId is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.table = requireNonNull(table, "table is null");
this.tableCredentials = requireNonNull(tableCredentials, "tableCredentials is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.memoryContext = operatorContext.newLocalUserMemoryContext(TableScanOperator.class.getSimpleName());
}
Expand Down Expand Up @@ -268,7 +260,7 @@ public Page getOutput()
return null;
}
if (source == null) {
source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, tableCredentials, columns, DynamicFilter.EMPTY);
source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, columns, DynamicFilter.EMPTY);
}

SourcePage sourcePage = source.getNextSourcePage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
Expand Down Expand Up @@ -66,7 +64,6 @@ public ConnectorPageSource createPageSource(
Session session,
Split split,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
Expand All @@ -85,7 +82,6 @@ public ConnectorPageSource createPageSource(
session.toConnectorSession(table.catalogHandle()),
split.getConnectorSplit(),
table.connectorHandle(),
tableCredentials,
columns,
dynamicFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.DynamicFilter;

import java.util.List;
import java.util.Optional;

public interface PageSourceProvider
{
ConnectorPageSource createPageSource(
Session session,
Split split,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableCredentials;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.RecordPageSource;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -46,7 +44,6 @@ public ConnectorPageSource createPageSource(
ConnectorSession session,
ConnectorSplit split,
ConnectorTableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2044,12 +2044,10 @@ private PhysicalOperation visitScanFilterAndProject(
// otherwise we plan it as a normal operator
Map<Symbol, Integer> sourceLayout;
TableHandle table = null;
Optional<ConnectorTableCredentials> tableCredentials = Optional.empty();
List<ColumnHandle> columns = null;
PhysicalOperation source = null;
if (sourceNode instanceof TableScanNode tableScanNode) {
table = tableScanNode.getTable();
tableCredentials = context.getTaskContext().getTableCredentials(tableScanNode.getId());
// extract the column handles and channel to type mapping
sourceLayout = new LinkedHashMap<>();
columns = new ArrayList<>();
Expand Down Expand Up @@ -2129,7 +2127,6 @@ else if (sourceNode instanceof SampleNode sampleNode) {
pageSourceManager,
pageProcessor,
table,
tableCredentials,
columns,
dynamicFilter,
getTypes(projections),
Expand Down Expand Up @@ -2173,8 +2170,7 @@ public PhysicalOperation visitTableScan(TableScanNode node, LocalExecutionPlanCo
columnTypes.add(symbol.type());
}

Optional<ConnectorTableCredentials> tableCredentials = context.getTaskContext().getTableCredentials(node.getId());
OperatorFactory operatorFactory = new TableScanOperatorFactory(context.getNextOperatorId(), planNodeId, node.getId(), pageSourceManager, node.getTable(), tableCredentials, columns.build(), columnTypes.build());
OperatorFactory operatorFactory = new TableScanOperatorFactory(context.getNextOperatorId(), planNodeId, node.getId(), pageSourceManager, node.getTable(), columns.build(), columnTypes.build());
return new PhysicalOperation(operatorFactory, makeLayout(node));
}

Expand Down
Loading
Loading