Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.PageSorter;
import io.trino.spi.VersionEmbedder;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorExpressionEvaluator;
import io.trino.spi.connector.MetadataProvider;
import io.trino.spi.function.FunctionBundleFactory;
import io.trino.spi.type.TypeManager;
Expand All @@ -38,6 +39,7 @@ public class ConnectorContextInstance
private final PageSorter pageSorter;
private final PageIndexerFactory pageIndexerFactory;
private final FunctionBundleFactory functionBundleFactory;
private final ConnectorExpressionEvaluator connectorExpressionEvaluator;

public ConnectorContextInstance(
OpenTelemetry openTelemetry,
Expand All @@ -48,7 +50,8 @@ public ConnectorContextInstance(
MetadataProvider metadataProvider,
PageSorter pageSorter,
PageIndexerFactory pageIndexerFactory,
FunctionBundleFactory functionBundleFactory)
FunctionBundleFactory functionBundleFactory,
ConnectorExpressionEvaluator connectorExpressionEvaluator)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
Expand All @@ -59,6 +62,7 @@ public ConnectorContextInstance(
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.functionBundleFactory = requireNonNull(functionBundleFactory, "functionBundleFactory is null");
this.connectorExpressionEvaluator = requireNonNull(connectorExpressionEvaluator, "connectorExpressionEvaluator is null");
}

@Override
Expand Down Expand Up @@ -114,4 +118,10 @@ public FunctionBundleFactory getFunctionBundleFactory()
{
return functionBundleFactory;
}

/**
 * Returns the {@link ConnectorExpressionEvaluator} supplied at construction time.
 * Guaranteed non-null: the constructor validates it with {@code requireNonNull}.
 */
@Override
public ConnectorExpressionEvaluator getConnectorExpressionEvaluator()
{
return connectorExpressionEvaluator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorExpressionEvaluator;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorName;
import io.trino.spi.type.TypeManager;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class DefaultCatalogFactory

private final ConcurrentMap<ConnectorName, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
private final SecretsResolver secretsResolver;
private final ConnectorExpressionEvaluator connectorExpressionEvaluator;

@Inject
public DefaultCatalogFactory(
Expand All @@ -88,7 +90,8 @@ public DefaultCatalogFactory(
TypeManager typeManager,
NodeSchedulerConfig nodeSchedulerConfig,
OptimizerConfig optimizerConfig,
SecretsResolver secretsResolver)
SecretsResolver secretsResolver,
ConnectorExpressionEvaluator connectorExpressionEvaluator)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
Expand All @@ -103,6 +106,7 @@ public DefaultCatalogFactory(
this.schedulerIncludeCoordinator = nodeSchedulerConfig.isIncludeCoordinator();
this.maxPrefetchedInformationSchemaPrefixes = optimizerConfig.getMaxPrefetchedInformationSchemaPrefixes();
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
this.connectorExpressionEvaluator = requireNonNull(connectorExpressionEvaluator, "connectorExpressionEvaluator is null");
}

@Override
Expand Down Expand Up @@ -154,7 +158,8 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
currentNode,
metadata,
accessControl,
maxPrefetchedInformationSchemaPrefixes));
maxPrefetchedInformationSchemaPrefixes,
connectorExpressionEvaluator));

SystemTablesProvider systemTablesProvider = new SystemTablesProvider(
transactionManager,
Expand Down Expand Up @@ -194,7 +199,8 @@ private Connector createConnector(CatalogName catalogName, ConnectorFactory conn
new InternalMetadataProvider(metadata, typeManager),
pageSorter,
pageIndexerFactory,
new InternalFunctionBundleFactory());
new InternalFunctionBundleFactory(),
connectorExpressionEvaluator);

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
// TODO: connector factory should take CatalogName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.metadata.Metadata;
import io.trino.node.InternalNode;
import io.trino.security.AccessControl;
import io.trino.spi.connector.ConnectorExpressionEvaluator;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -35,11 +36,16 @@ public class InformationSchemaConnector
private final ConnectorPageSourceProvider pageSourceProvider;

/**
 * Convenience constructor that delegates to the full constructor, passing
 * {@code ConnectorExpressionEvaluator.NOOP} as the evaluator — for callers that
 * do not supply one. NOTE(review): presumably NOOP means "no expression
 * filtering is applied"; confirm against the ConnectorExpressionEvaluator SPI.
 */
public InformationSchemaConnector(String catalogName, InternalNode currentNode, Metadata metadata, AccessControl accessControl, int maxPrefetchedInformationSchemaPrefixes)
{
this(catalogName, currentNode, metadata, accessControl, maxPrefetchedInformationSchemaPrefixes, ConnectorExpressionEvaluator.NOOP);
}

public InformationSchemaConnector(String catalogName, InternalNode currentNode, Metadata metadata, AccessControl accessControl, int maxPrefetchedInformationSchemaPrefixes, ConnectorExpressionEvaluator connectorExpressionEvaluator)
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(metadata, "metadata is null");

this.metadata = new InformationSchemaMetadata(catalogName, metadata, maxPrefetchedInformationSchemaPrefixes);
this.metadata = new InformationSchemaMetadata(catalogName, metadata, maxPrefetchedInformationSchemaPrefixes, connectorExpressionEvaluator);
this.splitManager = new InformationSchemaSplitManager(currentNode.getHostAndPort());
this.pageSourceProvider = new InformationSchemaPageSourceProvider(metadata, accessControl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorExpressionEvaluator;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
Expand All @@ -36,6 +37,7 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.expression.Constant;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.EquatableValueSet;
import io.trino.spi.predicate.NullableValue;
Expand All @@ -49,7 +51,6 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

Expand Down Expand Up @@ -84,12 +85,14 @@ public class InformationSchemaMetadata
private final String catalogName;
private final Metadata metadata;
private final int maxPrefetchedInformationSchemaPrefixes;
private final ConnectorExpressionEvaluator connectorExpressionEvaluator;

public InformationSchemaMetadata(String catalogName, Metadata metadata, int maxPrefetchedInformationSchemaPrefixes)
public InformationSchemaMetadata(String catalogName, Metadata metadata, int maxPrefetchedInformationSchemaPrefixes, ConnectorExpressionEvaluator connectorExpressionEvaluator)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.maxPrefetchedInformationSchemaPrefixes = maxPrefetchedInformationSchemaPrefixes;
this.connectorExpressionEvaluator = requireNonNull(connectorExpressionEvaluator, "connectorExpressionEvaluator is null");
}

@Override
Expand Down Expand Up @@ -232,8 +235,8 @@ private Set<QualifiedTablePrefix> getPrefixes(ConnectorSession session, Informat
}

InformationSchemaTable informationSchemaTable = table.table();
Set<QualifiedTablePrefix> schemaPrefixes = calculatePrefixesWithSchemaName(session, constraint.getSummary(), constraint.predicate());
Set<QualifiedTablePrefix> tablePrefixes = calculatePrefixesWithTableName(informationSchemaTable, session, schemaPrefixes, constraint.getSummary(), constraint.predicate());
Set<QualifiedTablePrefix> schemaPrefixes = calculatePrefixesWithSchemaName(session, constraint);
Set<QualifiedTablePrefix> tablePrefixes = calculatePrefixesWithTableName(informationSchemaTable, session, schemaPrefixes, constraint);
verify(tablePrefixes.size() <= maxPrefetchedInformationSchemaPrefixes, "calculatePrefixesWithTableName returned too many prefixes: %s", tablePrefixes.size());
return tablePrefixes;
}
Expand All @@ -245,14 +248,13 @@ public static boolean isTablesEnumeratingTable(InformationSchemaTable table)

private Set<QualifiedTablePrefix> calculatePrefixesWithSchemaName(
ConnectorSession connectorSession,
TupleDomain<ColumnHandle> constraint,
Optional<Predicate<Map<ColumnHandle, NullableValue>>> predicate)
Constraint constraint)
{
Optional<Set<String>> schemas = filterString(constraint, SCHEMA_COLUMN_HANDLE);
Optional<Set<String>> schemas = filterString(constraint.getSummary(), SCHEMA_COLUMN_HANDLE);
if (schemas.isPresent()) {
Set<QualifiedTablePrefix> schemasFromPredicate = schemas.get().stream()
.filter(this::isLowerCase)
.filter(schema -> predicate.isEmpty() || predicate.get().test(schemaAsFixedValues(schema)))
.filter(schema -> connectorExpressionEvaluator.evaluate(constraint.getExpression(), connectorSession, schemaAsFixedValues(schema), constraint.getEvaluationAssignments()).orElse(true))
.map(schema -> new QualifiedTablePrefix(catalogName, schema))
.collect(toImmutableSet());
if (schemasFromPredicate.size() > maxPrefetchedInformationSchemaPrefixes) {
Expand All @@ -261,13 +263,13 @@ private Set<QualifiedTablePrefix> calculatePrefixesWithSchemaName(
return schemasFromPredicate;
}

if (predicate.isEmpty()) {
if (Constant.TRUE.equals(constraint.getExpression()) || constraint.getEvaluationAssignments().isEmpty()) {
return ImmutableSet.of(new QualifiedTablePrefix(catalogName));
}

Session session = ((FullConnectorSession) connectorSession).getSession();
Set<QualifiedTablePrefix> schemaPrefixes = listSchemaNames(session)
.filter(prefix -> predicate.get().test(schemaAsFixedValues(prefix.getSchemaName().get())))
.filter(prefix -> connectorExpressionEvaluator.evaluate(constraint.getExpression(), connectorSession, schemaAsFixedValues(prefix.getSchemaName().get()), constraint.getEvaluationAssignments()).orElse(true))
.collect(toImmutableSet());
if (schemaPrefixes.size() > maxPrefetchedInformationSchemaPrefixes) {
// in case of high number of prefixes it is better to populate all data and then filter
Expand All @@ -281,12 +283,11 @@ private Set<QualifiedTablePrefix> calculatePrefixesWithTableName(
InformationSchemaTable informationSchemaTable,
ConnectorSession connectorSession,
Set<QualifiedTablePrefix> prefixes,
TupleDomain<ColumnHandle> constraint,
Optional<Predicate<Map<ColumnHandle, NullableValue>>> predicate)
Constraint constraint)
{
Session session = ((FullConnectorSession) connectorSession).getSession();

Optional<Set<String>> tables = filterString(constraint, TABLE_NAME_COLUMN_HANDLE);
Optional<Set<String>> tables = filterString(constraint.getSummary(), TABLE_NAME_COLUMN_HANDLE);
if (tables.isPresent()) {
Set<QualifiedTablePrefix> tablePrefixes = prefixes.stream()
.peek(prefix -> verify(prefix.asQualifiedObjectName().isEmpty()))
Expand All @@ -296,7 +297,7 @@ private Set<QualifiedTablePrefix> calculatePrefixesWithTableName(
.flatMap(prefix -> tables.get().stream()
.filter(this::isLowerCase)
.map(table -> new QualifiedObjectName(catalogName, prefix.getSchemaName().get(), table)))
.filter(objectName -> predicate.isEmpty() || predicate.get().test(asFixedValues(objectName)))
.filter(objectName -> connectorExpressionEvaluator.evaluate(constraint.getExpression(), connectorSession, asFixedValues(objectName), constraint.getEvaluationAssignments()).orElse(true))
.map(QualifiedObjectName::asQualifiedTablePrefix)
.distinct()
.limit(maxPrefetchedInformationSchemaPrefixes + 1)
Expand All @@ -310,13 +311,13 @@ private Set<QualifiedTablePrefix> calculatePrefixesWithTableName(
return tablePrefixes;
}

if (predicate.isEmpty() || !isColumnsEnumeratingTable(informationSchemaTable)) {
if (constraint.getEvaluationAssignments().isEmpty() || !isColumnsEnumeratingTable(informationSchemaTable)) {
return prefixes;
}

Set<QualifiedTablePrefix> tablePrefixes = prefixes.stream()
.flatMap(prefix -> metadata.listTables(session, prefix).stream())
.filter(objectName -> predicate.get().test(asFixedValues(objectName)))
.filter(objectName -> connectorExpressionEvaluator.evaluate(constraint.getExpression(), connectorSession, asFixedValues(objectName), constraint.getEvaluationAssignments()).orElse(true))
.map(QualifiedObjectName::asQualifiedTablePrefix)
.distinct()
.limit(maxPrefetchedInformationSchemaPrefixes + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.connector.system;

import com.google.common.collect.Sets;
import io.trino.connector.system.jdbc.JdbcTable;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
Expand All @@ -31,6 +30,7 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemColumnHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.expression.Constant;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.TupleDomain;

Expand Down Expand Up @@ -150,13 +150,13 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

TupleDomain<ColumnHandle> oldDomain = table.constraint();
TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(constraint.getSummary());
if (oldDomain.equals(newDomain) && constraint.predicate().isEmpty()) {
if (oldDomain.equals(newDomain) && (Constant.TRUE.equals(constraint.getExpression()) || constraint.getEvaluationAssignments().isEmpty())) {
return Optional.empty();
}

SystemTable systemTable = checkAndGetTable(session, table);
if (systemTable instanceof JdbcTable jdbcTable) {
TupleDomain<ColumnHandle> filtered = jdbcTable.applyFilter(session, effectiveConstraint(oldDomain, constraint, newDomain));
TupleDomain<ColumnHandle> filtered = jdbcTable.applyFilter(session, effectiveConstraint(constraint, newDomain));
newDomain = newDomain.intersect(filtered);
}

Expand Down Expand Up @@ -190,16 +190,8 @@ public Optional<ConnectorTableCredentials> getTableCredentials(ConnectorSession
throw new TrinoException(NOT_SUPPORTED, "This connector does not support table credentials");
}

private Constraint effectiveConstraint(TupleDomain<ColumnHandle> oldDomain, Constraint newConstraint, TupleDomain<ColumnHandle> effectiveDomain)
private Constraint effectiveConstraint(Constraint newConstraint, TupleDomain<ColumnHandle> effectiveDomain)
{
if (effectiveDomain.isNone() || newConstraint.predicate().isEmpty()) {
return new Constraint(effectiveDomain);
}
return new Constraint(
effectiveDomain,
oldDomain.asPredicate().and(newConstraint.predicate().get()),
Sets.union(
oldDomain.getDomains().orElseThrow().keySet(),
newConstraint.getPredicateColumns().orElseThrow()));
return new Constraint(effectiveDomain, newConstraint.getExpression(), newConstraint.getAssignments(), newConstraint.getEvaluationAssignments());
}
}
Loading