diff --git a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java index 7634f69f5636..02075e4ff852 100644 --- a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java +++ b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java @@ -538,8 +538,21 @@ private static void extractPlanNodeStats(StageInfo stageInfo, ImmutableMultimap. return; } + List operatorSummaries = stageInfo.stageStats().getOperatorSummaries(); + Map splitSourceMetrics = stageInfo.stageStats().getSplitSourceMetrics(); + ImmutableList.Builder operatorStats = ImmutableList.builderWithExpectedSize(operatorSummaries.size()); + for (OperatorStats stats : operatorSummaries) { + if (stats.getSourceId().isPresent()) { + Metrics metrics = splitSourceMetrics.get(stats.getSourceId().get()); + if (metrics != null && metrics != Metrics.EMPTY) { + stats = stats.withConnectorSplitSourceMetrics(metrics); + } + } + operatorStats.add(stats); + } + // Note: a plan node may be mapped to multiple operators - Map> allOperatorStats = Multimaps.index(stageInfo.stageStats().getOperatorSummaries(), OperatorStats::getPlanNodeId).asMap(); + Map> allOperatorStats = Multimaps.index(operatorStats.build(), OperatorStats::getPlanNodeId).asMap(); // Sometimes a plan node is merged with other nodes into a single operator, and in that case, // use the stats of the nearest parent node with stats. diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 4b57c05d4929..b3aa8f30e55c 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -180,6 +180,7 @@ public class MockConnector private final Optional accessControl; private final Function>> data; private final Function metrics; + private final Function splitSourceMetrics; private final Set procedures; private final Set tableProcedures; private final Set tableFunctions; @@ -236,6 +237,7 @@ public class MockConnector Optional accessControl, Function>> data, Function metrics, + Function splitSourceMetrics, Set procedures, Set tableProcedures, Set tableFunctions, @@ -291,6 +293,7 @@ public class MockConnector this.accessControl = requireNonNull(accessControl, "accessControl is null"); this.data = requireNonNull(data, "data is null"); this.metrics = requireNonNull(metrics, "metrics is null"); + this.splitSourceMetrics = requireNonNull(splitSourceMetrics, "splitSourceMetrics is null"); this.procedures = requireNonNull(procedures, "procedures is null"); this.tableProcedures = requireNonNull(tableProcedures, "tableProcedures is null"); this.tableFunctions = requireNonNull(tableFunctions, "tableFunctions is null"); @@ -351,7 +354,14 @@ public ConnectorSplitSource getSplits( DynamicFilter dynamicFilter, Constraint constraint) { - return new FixedSplitSource(MOCK_CONNECTOR_SPLIT); + SchemaTableName tableName = ((MockConnectorTableHandle) table).getTableName(); + return new FixedSplitSource(MOCK_CONNECTOR_SPLIT) { + @Override + public Metrics getMetrics() + { + return splitSourceMetrics.apply(tableName); + } + }; } @Override diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index 55542585dd32..36a86b71be58 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -132,6 +132,7 @@ public class MockConnectorFactory private final Collection branches; private final Function>> data; private final Function metrics; + private final Function splitSourceMetrics; private final Set procedures; private final Set tableProcedures; private final Set tableFunctions; @@ -192,6 +193,7 @@ private MockConnectorFactory( Collection branches, Function>> data, Function metrics, + Function splitSourceMetrics, Set procedures, Set tableProcedures, Set tableFunctions, @@ -255,6 +257,7 @@ private MockConnectorFactory( this.accessControl = requireNonNull(accessControl, "accessControl is null"); this.data = requireNonNull(data, "data is null"); this.metrics = requireNonNull(metrics, "metrics is null"); + this.splitSourceMetrics = requireNonNull(splitSourceMetrics, "splitSourceMetrics is null"); this.procedures = requireNonNull(procedures, "procedures is null"); this.tableProcedures = requireNonNull(tableProcedures, "tableProcedures is null"); this.tableFunctions = requireNonNull(tableFunctions, "tableFunctions is null"); @@ -317,6 +320,7 @@ public Connector create(String catalogName, Map config, Connecto accessControl, data, metrics, + splitSourceMetrics, procedures, tableProcedures, tableFunctions, @@ -469,6 +473,7 @@ public static final class Builder private BiFunction> redirectTable = (session, tableName) -> Optional.empty(); private Function>> data = schemaTableName -> ImmutableList.of(); private Function metrics = schemaTableName -> EMPTY; + private Function splitSourceMetrics = schemaTableName -> EMPTY; private Set procedures = ImmutableSet.of(); private Set tableProcedures = ImmutableSet.of(); private Set tableFunctions = ImmutableSet.of(); @@ -735,6 +740,12 @@ public Builder withMetrics(Function metrics) return this; } + public Builder withSplitSourceMetrics(Function splitSourceMetrics) + { + this.splitSourceMetrics = requireNonNull(splitSourceMetrics, "splitSourceMetrics is null"); + return this; + } + public Builder withProcedures(Iterable procedures) { this.procedures = ImmutableSet.copyOf(procedures); @@ -904,6 +915,7 @@ public MockConnectorFactory build() branches, data, metrics, + splitSourceMetrics, procedures, tableProcedures, tableFunctions, diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java index 9de6f5b60a88..3b1261a37dc0 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java @@ -118,6 +118,7 @@ public class TestEventListenerBasic private static final String VARCHAR_TYPE = "varchar(15)"; private static final String BIGINT_TYPE = BIGINT.getDisplayName(); private static final Metrics TEST_METRICS = new Metrics(ImmutableMap.of("test_metrics", new LongCount(1))); + private static final Metrics TEST_SPLIT_SOURCE_METRICS = new Metrics(ImmutableMap.of("test_split_source_metrics", new LongCount(2))); private EventsAwaitingQueries queries; @@ -266,6 +267,12 @@ public Iterable getConnectorFactories() } return EMPTY; }) + .withSplitSourceMetrics(schemaTableName -> { + if (schemaTableName.equals(new SchemaTableName("tiny", "nation"))) { + return TEST_SPLIT_SOURCE_METRICS; + } + return EMPTY; + }) .withRowFilter(schemaTableName -> { if (schemaTableName.getTableName().equals("test_table_with_row_filter")) { return ViewExpression.builder() @@ -1428,7 +1435,7 @@ public void testConnectorMetrics() List connectorMetrics = event.getIoMetadata().getInputs().stream() .map(QueryInputMetadata::getConnectorMetrics) .collect(toImmutableList()); - assertThat(connectorMetrics).containsExactly(TEST_METRICS); + assertThat(connectorMetrics).containsExactly(TEST_METRICS.mergeWith(TEST_SPLIT_SOURCE_METRICS)); } @Test