Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,21 @@ private static void extractPlanNodeStats(StageInfo stageInfo, ImmutableMultimap.
return;
}

List<OperatorStats> operatorSummaries = stageInfo.stageStats().getOperatorSummaries();
Map<PlanNodeId, Metrics> splitSourceMetrics = stageInfo.stageStats().getSplitSourceMetrics();
ImmutableList.Builder<OperatorStats> operatorStats = ImmutableList.builderWithExpectedSize(operatorSummaries.size());
for (OperatorStats stats : operatorSummaries) {
if (stats.getSourceId().isPresent()) {
Metrics metrics = splitSourceMetrics.get(stats.getSourceId().get());
if (metrics != null && metrics != Metrics.EMPTY) {
stats = stats.withConnectorSplitSourceMetrics(metrics);
}
}
operatorStats.add(stats);
}

// Note: a plan node may be mapped to multiple operators
Map<PlanNodeId, Collection<OperatorStats>> allOperatorStats = Multimaps.index(stageInfo.stageStats().getOperatorSummaries(), OperatorStats::getPlanNodeId).asMap();
Map<PlanNodeId, Collection<OperatorStats>> allOperatorStats = Multimaps.index(operatorStats.build(), OperatorStats::getPlanNodeId).asMap();

// Sometimes a plan node is merged with other nodes into a single operator, and in that case,
// use the stats of the nearest parent node with stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public class MockConnector
private final Optional<ConnectorAccessControl> accessControl;
private final Function<SchemaTableName, List<List<?>>> data;
private final Function<SchemaTableName, Metrics> metrics;
private final Function<SchemaTableName, Metrics> splitSourceMetrics;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;
private final Set<ConnectorTableFunction> tableFunctions;
Expand Down Expand Up @@ -236,6 +237,7 @@ public class MockConnector
Optional<ConnectorAccessControl> accessControl,
Function<SchemaTableName, List<List<?>>> data,
Function<SchemaTableName, Metrics> metrics,
Function<SchemaTableName, Metrics> splitSourceMetrics,
Set<Procedure> procedures,
Set<TableProcedureMetadata> tableProcedures,
Set<ConnectorTableFunction> tableFunctions,
Expand Down Expand Up @@ -291,6 +293,7 @@ public class MockConnector
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.data = requireNonNull(data, "data is null");
this.metrics = requireNonNull(metrics, "metrics is null");
this.splitSourceMetrics = requireNonNull(splitSourceMetrics, "splitSourceMetrics is null");
this.procedures = requireNonNull(procedures, "procedures is null");
this.tableProcedures = requireNonNull(tableProcedures, "tableProcedures is null");
this.tableFunctions = requireNonNull(tableFunctions, "tableFunctions is null");
Expand Down Expand Up @@ -351,7 +354,14 @@ public ConnectorSplitSource getSplits(
DynamicFilter dynamicFilter,
Constraint constraint)
{
return new FixedSplitSource(MOCK_CONNECTOR_SPLIT);
SchemaTableName tableName = ((MockConnectorTableHandle) table).getTableName();
return new FixedSplitSource(MOCK_CONNECTOR_SPLIT) {
@Override
public Metrics getMetrics()
{
return splitSourceMetrics.apply(tableName);
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public class MockConnectorFactory
private final Collection<String> branches;
private final Function<SchemaTableName, List<List<?>>> data;
private final Function<SchemaTableName, Metrics> metrics;
private final Function<SchemaTableName, Metrics> splitSourceMetrics;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;
private final Set<ConnectorTableFunction> tableFunctions;
Expand Down Expand Up @@ -192,6 +193,7 @@ private MockConnectorFactory(
Collection<String> branches,
Function<SchemaTableName, List<List<?>>> data,
Function<SchemaTableName, Metrics> metrics,
Function<SchemaTableName, Metrics> splitSourceMetrics,
Set<Procedure> procedures,
Set<TableProcedureMetadata> tableProcedures,
Set<ConnectorTableFunction> tableFunctions,
Expand Down Expand Up @@ -255,6 +257,7 @@ private MockConnectorFactory(
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.data = requireNonNull(data, "data is null");
this.metrics = requireNonNull(metrics, "metrics is null");
this.splitSourceMetrics = requireNonNull(splitSourceMetrics, "splitSourceMetrics is null");
this.procedures = requireNonNull(procedures, "procedures is null");
this.tableProcedures = requireNonNull(tableProcedures, "tableProcedures is null");
this.tableFunctions = requireNonNull(tableFunctions, "tableFunctions is null");
Expand Down Expand Up @@ -317,6 +320,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
accessControl,
data,
metrics,
splitSourceMetrics,
procedures,
tableProcedures,
tableFunctions,
Expand Down Expand Up @@ -469,6 +473,7 @@ public static final class Builder
private BiFunction<ConnectorSession, SchemaTableName, Optional<CatalogSchemaTableName>> redirectTable = (session, tableName) -> Optional.empty();
private Function<SchemaTableName, List<List<?>>> data = schemaTableName -> ImmutableList.of();
private Function<SchemaTableName, Metrics> metrics = schemaTableName -> EMPTY;
private Function<SchemaTableName, Metrics> splitSourceMetrics = schemaTableName -> EMPTY;
private Set<Procedure> procedures = ImmutableSet.of();
private Set<TableProcedureMetadata> tableProcedures = ImmutableSet.of();
private Set<ConnectorTableFunction> tableFunctions = ImmutableSet.of();
Expand Down Expand Up @@ -735,6 +740,12 @@ public Builder withMetrics(Function<SchemaTableName, Metrics> metrics)
return this;
}

public Builder withSplitSourceMetrics(Function<SchemaTableName, Metrics> splitSourceMetrics)
{
this.splitSourceMetrics = requireNonNull(splitSourceMetrics, "splitSourceMetrics is null");
return this;
}

public Builder withProcedures(Iterable<Procedure> procedures)
{
this.procedures = ImmutableSet.copyOf(procedures);
Expand Down Expand Up @@ -904,6 +915,7 @@ public MockConnectorFactory build()
branches,
data,
metrics,
splitSourceMetrics,
procedures,
tableProcedures,
tableFunctions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public class TestEventListenerBasic
private static final String VARCHAR_TYPE = "varchar(15)";
private static final String BIGINT_TYPE = BIGINT.getDisplayName();
private static final Metrics TEST_METRICS = new Metrics(ImmutableMap.of("test_metrics", new LongCount(1)));
private static final Metrics TEST_SPLIT_SOURCE_METRICS = new Metrics(ImmutableMap.of("test_split_source_metrics", new LongCount(2)));

private EventsAwaitingQueries queries;

Expand Down Expand Up @@ -266,6 +267,12 @@ public Iterable<ConnectorFactory> getConnectorFactories()
}
return EMPTY;
})
.withSplitSourceMetrics(schemaTableName -> {
if (schemaTableName.equals(new SchemaTableName("tiny", "nation"))) {
return TEST_SPLIT_SOURCE_METRICS;
}
return EMPTY;
})
.withRowFilter(schemaTableName -> {
if (schemaTableName.getTableName().equals("test_table_with_row_filter")) {
return ViewExpression.builder()
Expand Down Expand Up @@ -1428,7 +1435,7 @@ public void testConnectorMetrics()
List<Metrics> connectorMetrics = event.getIoMetadata().getInputs().stream()
.map(QueryInputMetadata::getConnectorMetrics)
.collect(toImmutableList());
assertThat(connectorMetrics).containsExactly(TEST_METRICS);
assertThat(connectorMetrics).containsExactly(TEST_METRICS.mergeWith(TEST_SPLIT_SOURCE_METRICS));
}

@Test
Expand Down