Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions docs/src/main/sphinx/connector/pinot.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,20 @@ If `pinot.controller-urls` uses `https` scheme then TLS is enabled for all conne

### gRPC configuration properties

| Property name | Required | Description |
| ------------------------------------- | -------- | -------------------------------------------------------------------- |
| `pinot.grpc.port` | No | Pinot gRPC port, default to `8090`. |
| `pinot.grpc.max-inbound-message-size` | No | Max inbound message bytes when init gRPC client, default is `128MB`. |
| `pinot.grpc.use-plain-text` | No | Use plain text for gRPC communication, default to `true`. |
| `pinot.grpc.tls.keystore-type` | No | TLS keystore type for gRPC connection, default is `JKS`. |
| `pinot.grpc.tls.keystore-path` | No | TLS keystore file location for gRPC connection, default is empty. |
| `pinot.grpc.tls.keystore-password` | No | TLS keystore password, default is empty. |
| `pinot.grpc.tls.truststore-type` | No | TLS truststore type for gRPC connection, default is `JKS`. |
| `pinot.grpc.tls.truststore-path` | No | TLS truststore file location for gRPC connection, default is empty. |
| `pinot.grpc.tls.truststore-password` | No | TLS truststore password, default is empty. |
| `pinot.grpc.tls.ssl-provider` | No | SSL provider, default is `JDK`. |
| `pinot.grpc.proxy-uri` | No | Pinot Rest Proxy gRPC endpoint URI, default is null. |

| Property name | Required | Description |
| ------------------------------------- | -------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `pinot.grpc.port` | No | Pinot gRPC port, default to `8090`. |
| `pinot.grpc.max-inbound-message-size` | No | Max inbound message bytes when init gRPC client, default is `128MB`. |
| `pinot.grpc.use-plain-text` | No | Use plain text for gRPC communication, default to `true`. |
| `pinot.grpc.tls.keystore-type` | No | TLS keystore type for gRPC connection, default is `JKS`. |
| `pinot.grpc.tls.keystore-path` | No | TLS keystore file location for gRPC connection, default is empty. |
| `pinot.grpc.tls.keystore-password` | No | TLS keystore password, default is empty. |
| `pinot.grpc.tls.truststore-type` | No | TLS truststore type for gRPC connection, default is `JKS`. |
| `pinot.grpc.tls.truststore-path` | No | TLS truststore file location for gRPC connection, default is empty. |
| `pinot.grpc.tls.truststore-password` | No | TLS truststore password, default is empty. |
| `pinot.grpc.tls.ssl-provider` | No | SSL provider, default is `JDK`. |
| `pinot.grpc.proxy-uri` | No | Pinot Rest Proxy gRPC endpoint URI, default is null. |
| `pinot.grpc.query.enforce-metadata-exception` | No | When enabled, metadata exceptions from Pinot gRPC queries will be enforced, causing query failures instead of returning empty/partial results. Default is `false`. |
For more Apache Pinot TLS configurations, please also refer to [Configuring TLS/SSL](https://docs.pinot.apache.org/operators/tutorials/configuring-tls-ssl).

You can use {doc}`secrets </security/secrets>` to avoid actual values in the catalog properties files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class PinotConfig
private boolean countDistinctPushdownEnabled = true;
private boolean proxyEnabled;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);
private boolean grpcQueryEnforceMetadataException;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of making exception handling configurable based on a configuration.

@elonazoulay Do you have any other idea?

Copy link
Copy Markdown
Member

@elonazoulay elonazoulay May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, especially since it's a choice between empty results masking an exception or a query failure with detailed exception (so it can be investigated and resolved).

Is there any issue with always surfacing exceptions when the query fails?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing! One reason we were considering making this behavior configurable is that, today, there are cases where partial results are returned instead of raising an exception—for example, when one or a few segments are missing for a table. Some users may be fine with receiving most of the query's result, while others, especially those with stricter consistency requirements, might prefer to get an explicit segment-missing exception.

That said, if we can align on always surfacing exceptions, it would certainly simplify the logic.

(cc @xiangfu0)


@NotEmpty(message = "pinot.controller-urls cannot be empty")
public List<URI> getControllerUrls()
Expand Down Expand Up @@ -270,4 +271,17 @@ public boolean allUrlSchemesEqual()
.distinct()
.count() == 1;
}

public boolean isGrpcQueryEnforceMetadataException()
{
return grpcQueryEnforceMetadataException;
}

@Config("pinot.grpc.query.enforce-metadata-exception")
Comment thread
ebyhr marked this conversation as resolved.
@ConfigDescription("When enabled, metadata exceptions from Pinot gRPC queries will be enforced, causing query failures instead of returning empty/partial results")
public PinotConfig setGrpcQueryEnforceMetadataException(boolean grpcQueryEnforceMetadataException)
Comment thread
ebyhr marked this conversation as resolved.
{
this.grpcQueryEnforceMetadataException = grpcQueryEnforceMetadataException;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ConnectorPageSource createPageSource(
switch (pinotSplit.getSplitType()) {
case SEGMENT:
String segmentQuery = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries);
PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(segmentQuery, pinotSplit);
PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(session, segmentQuery, pinotSplit);
return new PinotSegmentPageSource(
targetSegmentPageSizeBytes,
handles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class PinotSessionProperties
private static final String SEGMENTS_PER_SPLIT = "segments_per_split";
private static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled";
private static final String COUNT_DISTINCT_PUSHDOWN_ENABLED = "count_distinct_pushdown_enabled";
private static final String GRPC_QUERY_ENFORCE_METADATA_EXCEPTION = "grpc_query_enforce_metadata_exception";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -84,6 +85,11 @@ public PinotSessionProperties(PinotConfig pinotConfig)
COUNT_DISTINCT_PUSHDOWN_ENABLED,
"Enable count distinct pushdown",
pinotConfig.isCountDistinctPushdownEnabled(),
false),
booleanProperty(
GRPC_QUERY_ENFORCE_METADATA_EXCEPTION,
"When true, enforce metadata exception in gRPC query even when response type is metadata",
pinotConfig.isGrpcQueryEnforceMetadataException(),
false));
}

Expand Down Expand Up @@ -130,6 +136,11 @@ public static boolean isCountDistinctPushdownEnabled(ConnectorSession session)
return session.getProperty(COUNT_DISTINCT_PUSHDOWN_ENABLED, Boolean.class);
}

public static boolean isGrpcQueryEnforceMetadataException(ConnectorSession session)
{
return session.getProperty(GRPC_QUERY_ENFORCE_METADATA_EXCEPTION, Boolean.class);
}

public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.datatable.DataTable;

import java.util.List;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void checkTooManyRows(DataTable dataTable)

interface Factory
{
PinotDataFetcher create(String query, PinotSplit split);
PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split);

int getRowLimit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder;
import io.trino.spi.connector.ConnectorSession;
import jakarta.annotation.PreDestroy;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.datatable.DataTable;
Expand All @@ -42,6 +43,7 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static io.trino.plugin.pinot.PinotSessionProperties.isGrpcQueryEnforceMetadataException;
import static java.lang.Boolean.FALSE;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand All @@ -60,9 +62,16 @@ public class PinotGrpcDataFetcher
private boolean isPinotDataFetched;
private final RowCountChecker rowCountChecker;
private long estimatedMemoryUsageInBytes;

public PinotGrpcDataFetcher(PinotGrpcServerQueryClient pinotGrpcClient, PinotSplit split, String query, RowCountChecker rowCountChecker)
private final ConnectorSession session;

public PinotGrpcDataFetcher(
ConnectorSession session,
PinotGrpcServerQueryClient pinotGrpcClient,
PinotSplit split,
String query,
RowCountChecker rowCountChecker)
{
this.session = requireNonNull(session, "session is null");
this.pinotGrpcClient = requireNonNull(pinotGrpcClient, "pinotGrpcClient is null");
this.split = requireNonNull(split, "split is null");
this.query = requireNonNull(query, "query is null");
Expand Down Expand Up @@ -98,7 +107,7 @@ public void fetchData()
{
long startTimeNanos = System.nanoTime();
String serverHost = split.getSegmentHost().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the host"));
this.responseIterator = pinotGrpcClient.queryPinot(query, serverHost, split.getSegments());
this.responseIterator = pinotGrpcClient.queryPinot(query, serverHost, split.getSegments(), session);
readTimeNanos += System.nanoTime() - startTimeNanos;
isPinotDataFetched = true;
}
Expand Down Expand Up @@ -136,9 +145,9 @@ public void shutdown()
}

@Override
public PinotDataFetcher create(String query, PinotSplit split)
public PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split)
{
return new PinotGrpcDataFetcher(queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query));
return new PinotGrpcDataFetcher(session, queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query));
}

@Override
Expand Down Expand Up @@ -239,7 +248,7 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer
this.proxyUri = pinotGrpcServerQueryClientConfig.getProxyUri();
}

public Iterator<PinotDataTableWithSize> queryPinot(String query, String serverHost, List<String> segments)
public Iterator<PinotDataTableWithSize> queryPinot(String query, String serverHost, List<String> segments, ConnectorSession session)
{
HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort);
// GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default).
Expand All @@ -257,19 +266,21 @@ public Iterator<PinotDataTableWithSize> queryPinot(String query, String serverHo
grpcRequestBuilder.setHostName(mappedHostAndPort.getHost()).setPort(grpcPort);
}
Server.ServerRequest serverRequest = grpcRequestBuilder.build();
return new ResponseIterator(client.submit(serverRequest), query);
return new ResponseIterator(session, client.submit(serverRequest), query);
}

public static class ResponseIterator
extends AbstractIterator<PinotDataTableWithSize>
{
private final Iterator<Server.ServerResponse> responseIterator;
private final String query;
private final ConnectorSession session;

public ResponseIterator(Iterator<Server.ServerResponse> responseIterator, String query)
public ResponseIterator(ConnectorSession session, Iterator<Server.ServerResponse> responseIterator, String query)
{
this.responseIterator = requireNonNull(responseIterator, "responseIterator is null");
this.query = requireNonNull(query, "query is null");
this.session = requireNonNull(session, "session is null");
}

@Override
Expand All @@ -280,7 +291,7 @@ protected PinotDataTableWithSize computeNext()
}
Server.ServerResponse response = responseIterator.next();
String responseType = response.getMetadataMap().get(MetadataKeys.RESPONSE_TYPE);
if (responseType.equals(ResponseType.METADATA)) {
if (responseType.equals(ResponseType.METADATA) && !isGrpcQueryEnforceMetadataException(session)) {
return endOfData();
}
ByteBuffer buffer = response.getPayload().asReadOnlyByteBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true)
.setProxyEnabled(false)
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)));
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE))
.setGrpcQueryEnforceMetadataException(false));
}

@Test
Expand All @@ -70,6 +71,7 @@ public void testExplicitPropertyMappings()
.put("pinot.count-distinct-pushdown.enabled", "false")
.put("pinot.proxy.enabled", "true")
.put("pinot.target-segment-page-size", "2MB")
.put("pinot.grpc.query.enforce-metadata-exception", "true")
.buildOrThrow();

PinotConfig expected = new PinotConfig()
Expand All @@ -86,7 +88,8 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false)
.setProxyEnabled(true)
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE));
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE))
.setGrpcQueryEnforceMetadataException(true);

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down