Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class PinotConfig
private boolean countDistinctPushdownEnabled = true;
private boolean proxyEnabled;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);
private boolean grpcQueryEnforceMetadataException;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of making exception handling configurable based on a configuration.

@elonazoulay Do you have any other idea?

Copy link
Copy Markdown
Member

@elonazoulay elonazoulay May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, especially since it's a choice between empty results masking an exception or a query failure with detailed exception (so it can be investigated and resolved).

Is there any issue with always surfacing exceptions when the query fails?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing! One reason we were considering making this behavior configurable is that, today, there are cases where partial results are returned instead of raising an exception—for example, when one or a few segments are missing for a table. Some users may be fine with receiving most of the query's result, while others, especially those with stricter consistency requirements, might prefer to get an explicit segment-missing exception.

That said, if we can align on always surfacing exceptions, it would certainly simplify the logic.

(cc @xiangfu0)


@NotEmpty(message = "pinot.controller-urls cannot be empty")
public List<URI> getControllerUrls()
Expand Down Expand Up @@ -270,4 +271,16 @@ public boolean allUrlSchemesEqual()
.distinct()
.count() == 1;
}

public boolean isGrpcQueryEnforceMetadataException()
{
return grpcQueryEnforceMetadataException;
}

@Config("pinot.grpc.query.enforce-metadata-exception")
Comment thread
ebyhr marked this conversation as resolved.
public PinotConfig setGrpcQueryEnforceMetadataException(boolean grpcQueryEnforceMetadataException)
Comment thread
ebyhr marked this conversation as resolved.
{
this.grpcQueryEnforceMetadataException = grpcQueryEnforceMetadataException;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ConnectorPageSource createPageSource(
switch (pinotSplit.getSplitType()) {
case SEGMENT:
String segmentQuery = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries);
PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(segmentQuery, pinotSplit);
PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(session, segmentQuery, pinotSplit);
return new PinotSegmentPageSource(
targetSegmentPageSizeBytes,
handles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ public class PinotSessionProperties
private static final String SEGMENTS_PER_SPLIT = "segments_per_split";
private static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled";
private static final String COUNT_DISTINCT_PUSHDOWN_ENABLED = "count_distinct_pushdown_enabled";
public static final String GRPC_QUERY_ENFORCE_METADATA_EXCEPTION = "grpc_query_enforce_metadata_exception";
Comment thread
ebyhr marked this conversation as resolved.
Outdated

private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public PinotSessionProperties(PinotConfig pinotConfig)
{
sessionProperties = ImmutableList.of(
booleanProperty(
GRPC_QUERY_ENFORCE_METADATA_EXCEPTION,
"When true, enforce metadata exception in gRPC query even when response type is metadata",
pinotConfig.isGrpcQueryEnforceMetadataException(),
false),
Comment thread
ebyhr marked this conversation as resolved.
Outdated
booleanProperty(
PREFER_BROKER_QUERIES,
"Prefer queries to broker even when parallel scan is enabled for aggregation queries",
Expand Down Expand Up @@ -130,6 +136,11 @@ public static boolean isCountDistinctPushdownEnabled(ConnectorSession session)
return session.getProperty(COUNT_DISTINCT_PUSHDOWN_ENABLED, Boolean.class);
}

public static boolean isGrpcQueryEnforceMetadataException(ConnectorSession session)
{
return session.getProperty(GRPC_QUERY_ENFORCE_METADATA_EXCEPTION, Boolean.class);
}

public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.datatable.DataTable;

import java.util.List;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void checkTooManyRows(DataTable dataTable)

interface Factory
{
PinotDataFetcher create(String query, PinotSplit split);
PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split);

int getRowLimit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.inject.Inject;
import io.trino.plugin.pinot.PinotErrorCode;
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSessionProperties;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder;
import io.trino.spi.connector.ConnectorSession;
import jakarta.annotation.PreDestroy;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.datatable.DataTable;
Expand Down Expand Up @@ -60,9 +62,16 @@ public class PinotGrpcDataFetcher
private boolean isPinotDataFetched;
private final RowCountChecker rowCountChecker;
private long estimatedMemoryUsageInBytes;

public PinotGrpcDataFetcher(PinotGrpcServerQueryClient pinotGrpcClient, PinotSplit split, String query, RowCountChecker rowCountChecker)
private final ConnectorSession session;

public PinotGrpcDataFetcher(
ConnectorSession session,
PinotGrpcServerQueryClient pinotGrpcClient,
PinotSplit split,
String query,
RowCountChecker rowCountChecker)
{
this.session = requireNonNull(session, "session is null");
this.pinotGrpcClient = requireNonNull(pinotGrpcClient, "pinotGrpcClient is null");
this.split = requireNonNull(split, "split is null");
this.query = requireNonNull(query, "query is null");
Expand Down Expand Up @@ -98,7 +107,7 @@ public void fetchData()
{
long startTimeNanos = System.nanoTime();
String serverHost = split.getSegmentHost().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the host"));
this.responseIterator = pinotGrpcClient.queryPinot(query, serverHost, split.getSegments());
this.responseIterator = pinotGrpcClient.queryPinot(query, serverHost, split.getSegments(), session);
readTimeNanos += System.nanoTime() - startTimeNanos;
isPinotDataFetched = true;
}
Expand Down Expand Up @@ -136,9 +145,9 @@ public void shutdown()
}

@Override
public PinotDataFetcher create(String query, PinotSplit split)
public PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split)
{
return new PinotGrpcDataFetcher(queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query));
return new PinotGrpcDataFetcher(session, queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query));
}

@Override
Expand Down Expand Up @@ -239,7 +248,7 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer
this.proxyUri = pinotGrpcServerQueryClientConfig.getProxyUri();
}

public Iterator<PinotDataTableWithSize> queryPinot(String query, String serverHost, List<String> segments)
public Iterator<PinotDataTableWithSize> queryPinot(String query, String serverHost, List<String> segments, ConnectorSession session)
{
HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort);
// GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default).
Expand All @@ -257,19 +266,21 @@ public Iterator<PinotDataTableWithSize> queryPinot(String query, String serverHo
grpcRequestBuilder.setHostName(mappedHostAndPort.getHost()).setPort(grpcPort);
}
Server.ServerRequest serverRequest = grpcRequestBuilder.build();
return new ResponseIterator(client.submit(serverRequest), query);
return new ResponseIterator(client.submit(serverRequest), query, session);
}

public static class ResponseIterator
extends AbstractIterator<PinotDataTableWithSize>
{
private final Iterator<Server.ServerResponse> responseIterator;
private final String query;
private final ConnectorSession session;

public ResponseIterator(Iterator<Server.ServerResponse> responseIterator, String query)
public ResponseIterator(Iterator<Server.ServerResponse> responseIterator, String query, ConnectorSession session)
Comment thread
ebyhr marked this conversation as resolved.
Outdated
{
this.responseIterator = requireNonNull(responseIterator, "responseIterator is null");
this.query = requireNonNull(query, "query is null");
this.session = requireNonNull(session, "session is null");
}

@Override
Expand All @@ -280,7 +291,8 @@ protected PinotDataTableWithSize computeNext()
}
Server.ServerResponse response = responseIterator.next();
String responseType = response.getMetadataMap().get(MetadataKeys.RESPONSE_TYPE);
if (responseType.equals(ResponseType.METADATA)) {
if (responseType.equals(ResponseType.METADATA)
&& !PinotSessionProperties.isGrpcQueryEnforceMetadataException(session)) {
Copy link
Copy Markdown
Author

@rhodo rhodo Mar 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that instead of using metadata type to signal the end of data, we rely on response iterator check here to signal the end of data

Comment thread
ebyhr marked this conversation as resolved.
Outdated
return endOfData();
}
ByteBuffer buffer = response.getPayload().asReadOnlyByteBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true)
.setProxyEnabled(false)
.setGrpcQueryEnforceMetadataException(false)
Comment thread
ebyhr marked this conversation as resolved.
Outdated
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)));
}

Expand All @@ -70,6 +71,7 @@ public void testExplicitPropertyMappings()
.put("pinot.count-distinct-pushdown.enabled", "false")
.put("pinot.proxy.enabled", "true")
.put("pinot.target-segment-page-size", "2MB")
.put("pinot.grpc.query.enforce-metadata-exception", "true")
.buildOrThrow();

PinotConfig expected = new PinotConfig()
Expand All @@ -86,6 +88,7 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false)
.setProxyEnabled(true)
.setGrpcQueryEnforceMetadataException(true)
Comment thread
ebyhr marked this conversation as resolved.
Outdated
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
Expand Down