diff --git a/docs/src/main/sphinx/connector/pinot.md b/docs/src/main/sphinx/connector/pinot.md index 7c11b331c164..087853ae0830 100644 --- a/docs/src/main/sphinx/connector/pinot.md +++ b/docs/src/main/sphinx/connector/pinot.md @@ -66,20 +66,20 @@ If `pinot.controller-urls` uses `https` scheme then TLS is enabled for all conne ### gRPC configuration properties -| Property name | Required | Description | -| ------------------------------------- | -------- | -------------------------------------------------------------------- | -| `pinot.grpc.port` | No | Pinot gRPC port, default to `8090`. | -| `pinot.grpc.max-inbound-message-size` | No | Max inbound message bytes when init gRPC client, default is `128MB`. | -| `pinot.grpc.use-plain-text` | No | Use plain text for gRPC communication, default to `true`. | -| `pinot.grpc.tls.keystore-type` | No | TLS keystore type for gRPC connection, default is `JKS`. | -| `pinot.grpc.tls.keystore-path` | No | TLS keystore file location for gRPC connection, default is empty. | -| `pinot.grpc.tls.keystore-password` | No | TLS keystore password, default is empty. | -| `pinot.grpc.tls.truststore-type` | No | TLS truststore type for gRPC connection, default is `JKS`. | -| `pinot.grpc.tls.truststore-path` | No | TLS truststore file location for gRPC connection, default is empty. | -| `pinot.grpc.tls.truststore-password` | No | TLS truststore password, default is empty. | -| `pinot.grpc.tls.ssl-provider` | No | SSL provider, default is `JDK`. | -| `pinot.grpc.proxy-uri` | No | Pinot Rest Proxy gRPC endpoint URI, default is null. | - +| Property name | Required | Description | +| ------------------------------------- | -------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `pinot.grpc.port` | No | Pinot gRPC port, default to `8090`. | +| `pinot.grpc.max-inbound-message-size` | No | Max inbound message bytes when init gRPC client, default is `128MB`. | +| `pinot.grpc.use-plain-text` | No | Use plain text for gRPC communication, default to `true`. | +| `pinot.grpc.tls.keystore-type` | No | TLS keystore type for gRPC connection, default is `JKS`. | +| `pinot.grpc.tls.keystore-path` | No | TLS keystore file location for gRPC connection, default is empty. | +| `pinot.grpc.tls.keystore-password` | No | TLS keystore password, default is empty. | +| `pinot.grpc.tls.truststore-type` | No | TLS truststore type for gRPC connection, default is `JKS`. | +| `pinot.grpc.tls.truststore-path` | No | TLS truststore file location for gRPC connection, default is empty. | +| `pinot.grpc.tls.truststore-password` | No | TLS truststore password, default is empty. | +| `pinot.grpc.tls.ssl-provider` | No | SSL provider, default is `JDK`. | +| `pinot.grpc.proxy-uri` | No | Pinot Rest Proxy gRPC endpoint URI, default is null. | +| `pinot.grpc.query.enforce-metadata-exception` | No | When enabled, metadata exceptions from Pinot gRPC queries will be enforced, causing query failures instead of returning empty/partial results. Default is `false`. | For more Apache Pinot TLS configurations, please also refer to [Configuring TLS/SSL](https://docs.pinot.apache.org/operators/tutorials/configuring-tls-ssl). You can use {doc}`secrets ` to avoid actual values in the catalog properties files. diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java index ae070687d3c9..6b420a4d4d31 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java @@ -64,6 +64,7 @@ public class PinotConfig private boolean countDistinctPushdownEnabled = true; private boolean proxyEnabled; private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE); + private boolean grpcQueryEnforceMetadataException; @NotEmpty(message = "pinot.controller-urls cannot be empty") public List getControllerUrls() @@ -270,4 +271,17 @@ public boolean allUrlSchemesEqual() .distinct() .count() == 1; } + + public boolean isGrpcQueryEnforceMetadataException() + { + return grpcQueryEnforceMetadataException; + } + + @Config("pinot.grpc.query.enforce-metadata-exception") + @ConfigDescription("When enabled, metadata exceptions from Pinot gRPC queries will be enforced, causing query failures instead of returning empty/partial results") + public PinotConfig setGrpcQueryEnforceMetadataException(boolean grpcQueryEnforceMetadataException) + { + this.grpcQueryEnforceMetadataException = grpcQueryEnforceMetadataException; + return this; + } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java index 5188f4997fa4..edc6a0d69ece 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java @@ -78,7 +78,7 @@ public ConnectorPageSource createPageSource( switch (pinotSplit.getSplitType()) { case SEGMENT: String segmentQuery = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries); - PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(segmentQuery, pinotSplit); + PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(session, segmentQuery, pinotSplit); return new PinotSegmentPageSource( targetSegmentPageSizeBytes, handles, diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java index feb82c1cd4cf..40f120c8dacc 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java @@ -37,6 +37,7 @@ public class PinotSessionProperties private static final String SEGMENTS_PER_SPLIT = "segments_per_split"; private static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled"; private static final String COUNT_DISTINCT_PUSHDOWN_ENABLED = "count_distinct_pushdown_enabled"; + private static final String GRPC_QUERY_ENFORCE_METADATA_EXCEPTION = "grpc_query_enforce_metadata_exception"; private final List> sessionProperties; @@ -84,6 +85,11 @@ public PinotSessionProperties(PinotConfig pinotConfig) COUNT_DISTINCT_PUSHDOWN_ENABLED, "Enable count distinct pushdown", pinotConfig.isCountDistinctPushdownEnabled(), + false), + booleanProperty( + GRPC_QUERY_ENFORCE_METADATA_EXCEPTION, + "When true, enforce metadata exception in gRPC query even when response type is metadata", + pinotConfig.isGrpcQueryEnforceMetadataException(), false)); } @@ -130,6 +136,11 @@ public static boolean isCountDistinctPushdownEnabled(ConnectorSession session) return session.getProperty(COUNT_DISTINCT_PUSHDOWN_ENABLED, Boolean.class); } + public static boolean isGrpcQueryEnforceMetadataException(ConnectorSession session) + { + return session.getProperty(GRPC_QUERY_ENFORCE_METADATA_EXCEPTION, Boolean.class); + } + public List> getSessionProperties() { return sessionProperties; diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataFetcher.java index bf75a2b35f44..8b26437cc7de 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataFetcher.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataFetcher.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.plugin.pinot.PinotException; import io.trino.plugin.pinot.PinotSplit; +import io.trino.spi.connector.ConnectorSession; import org.apache.pinot.common.datatable.DataTable; import java.util.List; @@ -70,7 +71,7 @@ public void checkTooManyRows(DataTable dataTable) interface Factory { - PinotDataFetcher create(String query, PinotSplit split); + PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split); int getRowLimit(); } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java index 9a3483c10e7c..7a9f050401a0 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java @@ -22,6 +22,7 @@ import io.trino.plugin.pinot.PinotException; import io.trino.plugin.pinot.PinotSplit; import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder; +import io.trino.spi.connector.ConnectorSession; import jakarta.annotation.PreDestroy; import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.datatable.DataTable; @@ -42,6 +43,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; +import static io.trino.plugin.pinot.PinotSessionProperties.isGrpcQueryEnforceMetadataException; import static java.lang.Boolean.FALSE; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -60,9 +62,16 @@ public class PinotGrpcDataFetcher private boolean isPinotDataFetched; private final RowCountChecker rowCountChecker; private long estimatedMemoryUsageInBytes; - - public PinotGrpcDataFetcher(PinotGrpcServerQueryClient pinotGrpcClient, PinotSplit split, String query, RowCountChecker rowCountChecker) + private final ConnectorSession session; + + public PinotGrpcDataFetcher( + ConnectorSession session, + PinotGrpcServerQueryClient pinotGrpcClient, + PinotSplit split, + String query, + RowCountChecker rowCountChecker) { + this.session = requireNonNull(session, "session is null"); this.pinotGrpcClient = requireNonNull(pinotGrpcClient, "pinotGrpcClient is null"); this.split = requireNonNull(split, "split is null"); this.query = requireNonNull(query, "query is null"); @@ -98,7 +107,7 @@ public void fetchData() { long startTimeNanos = System.nanoTime(); String serverHost = split.getSegmentHost().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the host")); - this.responseIterator = pinotGrpcClient.queryPinot(query, serverHost, split.getSegments()); + this.responseIterator = pinotGrpcClient.queryPinot(query, serverHost, split.getSegments(), session); readTimeNanos += System.nanoTime() - startTimeNanos; isPinotDataFetched = true; } @@ -136,9 +145,9 @@ public void shutdown() } @Override - public PinotDataFetcher create(String query, PinotSplit split) + public PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split) { - return new PinotGrpcDataFetcher(queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query)); + return new PinotGrpcDataFetcher(session, queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query)); } @Override @@ -239,7 +248,7 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer this.proxyUri = pinotGrpcServerQueryClientConfig.getProxyUri(); } - public Iterator queryPinot(String query, String serverHost, List segments) + public Iterator queryPinot(String query, String serverHost, List segments, ConnectorSession session) { HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort); // GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default). @@ -257,7 +266,7 @@ public Iterator queryPinot(String query, String serverHo grpcRequestBuilder.setHostName(mappedHostAndPort.getHost()).setPort(grpcPort); } Server.ServerRequest serverRequest = grpcRequestBuilder.build(); - return new ResponseIterator(client.submit(serverRequest), query); + return new ResponseIterator(session, client.submit(serverRequest), query); } public static class ResponseIterator @@ -265,11 +274,13 @@ public static class ResponseIterator { private final Iterator responseIterator; private final String query; + private final ConnectorSession session; - public ResponseIterator(Iterator responseIterator, String query) + public ResponseIterator(ConnectorSession session, Iterator responseIterator, String query) { this.responseIterator = requireNonNull(responseIterator, "responseIterator is null"); this.query = requireNonNull(query, "query is null"); + this.session = requireNonNull(session, "session is null"); } @Override @@ -280,7 +291,7 @@ protected PinotDataTableWithSize computeNext() } Server.ServerResponse response = responseIterator.next(); String responseType = response.getMetadataMap().get(MetadataKeys.RESPONSE_TYPE); - if (responseType.equals(ResponseType.METADATA)) { + if (responseType.equals(ResponseType.METADATA) && !isGrpcQueryEnforceMetadataException(session)) { return endOfData(); } ByteBuffer buffer = response.getPayload().asReadOnlyByteBuffer(); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java index 36d08af59a49..a90a43be7a17 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java @@ -49,7 +49,8 @@ public void testDefaults() .setAggregationPushdownEnabled(true) .setCountDistinctPushdownEnabled(true) .setProxyEnabled(false) - .setTargetSegmentPageSize(DataSize.of(1, MEGABYTE))); + .setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)) + .setGrpcQueryEnforceMetadataException(false)); } @Test @@ -70,6 +71,7 @@ public void testExplicitPropertyMappings() .put("pinot.count-distinct-pushdown.enabled", "false") .put("pinot.proxy.enabled", "true") .put("pinot.target-segment-page-size", "2MB") + .put("pinot.grpc.query.enforce-metadata-exception", "true") .buildOrThrow(); PinotConfig expected = new PinotConfig() @@ -86,7 +88,8 @@ public void testExplicitPropertyMappings() .setAggregationPushdownEnabled(false) .setCountDistinctPushdownEnabled(false) .setProxyEnabled(true) - .setTargetSegmentPageSize(DataSize.of(2, MEGABYTE)); + .setTargetSegmentPageSize(DataSize.of(2, MEGABYTE)) + .setGrpcQueryEnforceMetadataException(true); ConfigAssertions.assertFullMapping(properties, expected); }