diff --git a/CHANGELOG.md b/CHANGELOG.md index bce6cbbb66..c7091ff9cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -107,6 +107,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### New Features +- Added a **beta** Table Metrics REST API (`/api/metrics-reports/v1/`) that exposes persisted Iceberg scan and commit metrics reports over HTTP. Querying requires the new `TABLE_READ_METRICS` privilege on the target table. +- Added `envFrom` support in Helm chart. - Added `deploymentAnnotations` support in Helm chart. - Added KMS properties (optional) to catalog storage config to enable S3 data encryption. - Added `topologySpreadConstraints` support in Helm chart. diff --git a/api/metrics-reports-service/build.gradle.kts b/api/metrics-reports-service/build.gradle.kts new file mode 100644 index 0000000000..ce80aceb23 --- /dev/null +++ b/api/metrics-reports-service/build.gradle.kts @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.openapitools.generator.gradle.plugin.tasks.GenerateTask + +plugins { + alias(libs.plugins.openapi.generator) + id("polaris-client") + id("org.kordamp.gradle.jandex") +} + +dependencies { + implementation(project(":polaris-core")) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.microprofile.fault.tolerance.api) + compileOnly(libs.swagger.annotations) + + implementation(libs.jakarta.servlet.api) + implementation(libs.jakarta.ws.rs.api) + + compileOnly(platform(libs.micrometer.bom)) + compileOnly("io.micrometer:micrometer-core") + + implementation(libs.slf4j.api) +} + +val rootDir = rootProject.layout.projectDirectory +val specsDir = rootDir.dir("spec") +val templatesDir = rootDir.dir("server-templates") +val generatedDir = project.layout.buildDirectory.dir("generated-openapi") +val generatedOpenApiSrcDir = project.layout.buildDirectory.dir("generated-openapi/src/main/java") + +openApiGenerate { + inputSpec = provider { specsDir.file("metrics-reports-service.yml").asFile.absolutePath } + generatorName = "jaxrs-resteasy" + outputDir = provider { generatedDir.get().asFile.absolutePath } + apiPackage = "org.apache.polaris.service.metrics.api" + modelPackage = "org.apache.polaris.core.metrics.api.model" + ignoreFileOverride.set(provider { rootDir.file(".openapi-generator-ignore").asFile.absolutePath }) + removeOperationIdPrefix.set(true) + templateDir.set(provider { templatesDir.asFile.absolutePath }) + globalProperties.put("apis", "") + globalProperties.put("models", "") + globalProperties.put("apiDocs", "false") + globalProperties.put("modelTests", "false") + configOptions.put("openApiNullable", "false") + configOptions.put("useBeanValidation", "true") + configOptions.put("sourceFolder", "src/main/java") + configOptions.put("useJakartaEe", "true") + 
configOptions.put("generateBuilders", "true") + configOptions.put("generateConstructorWithAllArgs", "true") + configOptions.put("hideGenerationTimestamp", "true") + additionalProperties.put("apiNamePrefix", "Polaris") + additionalProperties.put("apiNameSuffix", "Api") + additionalProperties.put("metricsPrefix", "polaris") + serverVariables.put("basePath", "api/metrics-reports/v1") +} + +listOf("sourcesJar", "compileJava", "processResources").forEach { task -> + tasks.named(task) { dependsOn("openApiGenerate") } +} + +sourceSets { main { java { srcDir(generatedOpenApiSrcDir) } } } + +tasks.named("openApiGenerate") { + inputs.dir(templatesDir) + inputs.dir(specsDir) + actions.addFirst { delete { delete(generatedDir) } } +} + +tasks.named("javadoc") { dependsOn("jandex") } diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 8a1fa16b90..0769a60fb1 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -90,6 +90,7 @@ dependencies { api(project(":polaris-relational-jdbc")) api(project(":polaris-extensions-auth-opa")) + api(project(":polaris-extensions-metrics-reports")) api(project(":polaris-admin")) api(project(":polaris-runtime-common")) diff --git a/extensions/metrics-reports/impl/build.gradle.kts b/extensions/metrics-reports/impl/build.gradle.kts new file mode 100644 index 0000000000..08e060822c --- /dev/null +++ b/extensions/metrics-reports/impl/build.gradle.kts @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("polaris-server") + id("org.kordamp.gradle.jandex") +} + +dependencies { + implementation(project(":polaris-core")) + implementation(project(":polaris-api-metrics-reports-service")) + + implementation(platform(libs.iceberg.bom)) + implementation("org.apache.iceberg:iceberg-api") + + implementation(libs.jakarta.enterprise.cdi.api) + implementation(libs.jakarta.inject.api) + implementation(libs.jakarta.ws.rs.api) + compileOnly(libs.jakarta.annotation.api) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + + testImplementation(platform(libs.junit.bom)) + testImplementation("org.junit.jupiter:junit-jupiter") + testImplementation(libs.assertj.core) + testImplementation(libs.mockito.core) + testImplementation(libs.jakarta.ws.rs.api) + + // Provides jakarta.ws.rs.ext.RuntimeDelegate needed to build Response objects in unit tests + testRuntimeOnly(enforcedPlatform(libs.quarkus.bom)) + testRuntimeOnly("io.quarkus.resteasy.reactive:resteasy-reactive") +} diff --git a/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/CommitMetricsReport.java b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/CommitMetricsReport.java new file mode 100644 index 0000000000..d3c42bf1a4 --- /dev/null +++ b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/CommitMetricsReport.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.metrics.reports; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** A single commit metrics report entry returned by the list API. */ +public record CommitMetricsReport( + @JsonProperty("id") String id, + @JsonProperty("timestampMs") long timestampMs, + @JsonProperty("actor") MetricsReportActor actor, + @JsonProperty("request") MetricsReportRequest request, + @JsonProperty("object") TableObject object, + @JsonProperty("payload") Payload payload) { + + /** The Iceberg object (table snapshot) associated with the commit. */ + public record TableObject(@JsonProperty("snapshotId") long snapshotId) {} + + /** The commit metrics payload envelope. */ + public record Payload( + @JsonProperty("type") String type, + @JsonProperty("version") int version, + @JsonProperty("data") Data data) { + + /** Iceberg commit metrics data fields. 
*/ + @JsonInclude(JsonInclude.Include.NON_NULL) + public record Data( + @JsonProperty("sequenceNumber") Long sequenceNumber, + @JsonProperty("operation") String operation, + @JsonProperty("addedDataFiles") long addedDataFiles, + @JsonProperty("removedDataFiles") long removedDataFiles, + @JsonProperty("totalDataFiles") long totalDataFiles, + @JsonProperty("addedDeleteFiles") long addedDeleteFiles, + @JsonProperty("removedDeleteFiles") long removedDeleteFiles, + @JsonProperty("totalDeleteFiles") long totalDeleteFiles, + @JsonProperty("addedEqualityDeleteFiles") long addedEqualityDeleteFiles, + @JsonProperty("removedEqualityDeleteFiles") long removedEqualityDeleteFiles, + @JsonProperty("addedPositionalDeleteFiles") long addedPositionalDeleteFiles, + @JsonProperty("removedPositionalDeleteFiles") long removedPositionalDeleteFiles, + @JsonProperty("addedRecords") long addedRecords, + @JsonProperty("removedRecords") long removedRecords, + @JsonProperty("totalRecords") long totalRecords, + @JsonProperty("addedFileSizeBytes") long addedFileSizeBytes, + @JsonProperty("removedFileSizeBytes") long removedFileSizeBytes, + @JsonProperty("totalFileSizeBytes") long totalFileSizeBytes, + @JsonProperty("totalDurationMs") Long totalDurationMs, + @JsonProperty("attempts") int attempts) {} + } +} diff --git a/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsListResponse.java b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsListResponse.java new file mode 100644 index 0000000000..d0be274913 --- /dev/null +++ b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsListResponse.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.metrics.reports; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +/** Paginated list response for metrics reports. */ +public record MetricsListResponse( + @JsonProperty("metricType") String metricType, + @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty("nextPageToken") String nextPageToken, + @JsonProperty("reports") List reports) {} diff --git a/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportActor.java b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportActor.java new file mode 100644 index 0000000000..95827ecdf8 --- /dev/null +++ b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportActor.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.metrics.reports; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** The principal that triggered the metrics report. */ +public record MetricsReportActor(@JsonProperty("principalName") String principalName) {} diff --git a/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportRequest.java b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportRequest.java new file mode 100644 index 0000000000..038c8a17dc --- /dev/null +++ b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportRequest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.extension.metrics.reports; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** Request context associated with a metrics report. */ +public record MetricsReportRequest( + @JsonProperty("requestId") String requestId, + @JsonProperty("otelTraceId") String otelTraceId, + @JsonProperty("otelSpanId") String otelSpanId) {} diff --git a/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportsService.java b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportsService.java new file mode 100644 index 0000000000..faf85c3458 --- /dev/null +++ b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/MetricsReportsService.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.extension.metrics.reports; + +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.polaris.core.auth.PolarisAuthorizableOperation; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.catalog.PolarisCatalogHelpers; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord; +import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; +import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; +import org.apache.polaris.core.persistence.resolver.ResolvedPathKey; +import org.apache.polaris.core.persistence.resolver.ResolverPath; +import org.apache.polaris.core.persistence.resolver.ResolverStatus; +import org.apache.polaris.service.metrics.api.PolarisCatalogsApiService; + +/** + * Service implementation for the Metrics Reports API. + * + *

Resolves catalog/namespace/table names to internal IDs, performs authorization, and delegates + * to the {@link org.apache.polaris.core.persistence.metrics.MetricsPersistence} layer to retrieve + * persisted metrics reports. + */ +@RequestScoped +public class MetricsReportsService implements PolarisCatalogsApiService { + + private final CallContext callContext; + private final PolarisMetaStoreManager metaStoreManager; + private final PolarisAuthorizer authorizer; + private final PolarisPrincipal polarisPrincipal; + private final ResolutionManifestFactory resolutionManifestFactory; + + @Inject + public MetricsReportsService( + @Nonnull CallContext callContext, + @Nonnull PolarisMetaStoreManager metaStoreManager, + @Nonnull PolarisAuthorizer authorizer, + @Nonnull PolarisPrincipal polarisPrincipal, + @Nonnull ResolutionManifestFactory resolutionManifestFactory) { + this.callContext = callContext; + this.metaStoreManager = metaStoreManager; + this.authorizer = authorizer; + this.polarisPrincipal = polarisPrincipal; + this.resolutionManifestFactory = resolutionManifestFactory; + } + + @Override + public Response listTableMetrics( + String catalogName, + String namespace, + String table, + String metricType, + String pageToken, + Integer pageSize, + Long snapshotId, + String principalName, + Long timestampFrom, + Long timestampTo, + RealmContext realmContext, + SecurityContext securityContext) { + + Namespace ns = decodeNamespace(namespace); + TableIdentifier identifier = TableIdentifier.of(ns, table); + + // Resolve and authorize + PolarisResolvedPathWrapper tableWrapper = + resolveAndAuthorizeTableMetrics(catalogName, identifier); + + PolarisEntity tableEntity = tableWrapper.getResolvedLeafEntity().getEntity(); + long catalogId = tableEntity.getCatalogId(); + long tableId = tableEntity.getId(); + + PageToken token = PageToken.build(pageToken, pageSize, () -> true); + + return switch (metricType) { + case "scan" -> { + Page page = + metaStoreManager.listScanMetrics( + 
callContext.getPolarisCallContext(), + catalogId, + tableId, + snapshotId, + principalName, + timestampFrom, + timestampTo, + token); + yield Response.ok(buildScanResponse(page)).build(); + } + case "commit" -> { + Page page = + metaStoreManager.listCommitMetrics( + callContext.getPolarisCallContext(), + catalogId, + tableId, + snapshotId, + principalName, + timestampFrom, + timestampTo, + token); + yield Response.ok(buildCommitResponse(page)).build(); + } + default -> + throw new IllegalArgumentException( + "Invalid metricType: " + metricType + "; must be 'scan' or 'commit'"); + }; + } + + private PolarisResolvedPathWrapper resolveAndAuthorizeTableMetrics( + String catalogName, TableIdentifier identifier) { + PolarisResolutionManifest manifest = + resolutionManifestFactory.createResolutionManifest(polarisPrincipal, catalogName); + manifest.addPassthroughPath( + new ResolverPath( + Arrays.asList(identifier.namespace().levels()), PolarisEntityType.NAMESPACE)); + manifest.addPassthroughPath( + new ResolverPath( + PolarisCatalogHelpers.tableIdentifierToList(identifier), PolarisEntityType.TABLE_LIKE)); + ResolverStatus status = manifest.resolveAll(); + + if (status.getStatus() == ResolverStatus.StatusEnum.ENTITY_COULD_NOT_BE_RESOLVED) { + throw new NotFoundException( + "TopLevelEntity of type %s does not exist: %s", + status.getFailedToResolvedEntityType(), status.getFailedToResolvedEntityName()); + } + if (status.getStatus() == ResolverStatus.StatusEnum.PATH_COULD_NOT_BE_FULLY_RESOLVED) { + throw new NotFoundException("Table not found: %s", identifier); + } + + PolarisResolvedPathWrapper tableWrapper = + manifest.getResolvedPath( + ResolvedPathKey.ofTableLike(identifier), PolarisEntitySubType.ANY_SUBTYPE, true); + + if (tableWrapper == null) { + throw new NotFoundException("Table not found: %s", identifier); + } + + authorizer.authorizeOrThrow( + polarisPrincipal, + manifest.getAllActivatedCatalogRoleAndPrincipalRoles(), + 
PolarisAuthorizableOperation.LIST_TABLE_METRICS, + tableWrapper, + null); + + return tableWrapper; + } + + private static Namespace decodeNamespace(String encodedNamespace) { + if (encodedNamespace == null || encodedNamespace.isEmpty()) { + throw new IllegalArgumentException("namespace must not be empty"); + } + // Multi-level namespaces use unit separator (0x1F / %1F) between levels + String[] levels = encodedNamespace.split("\u001F", -1); + return Namespace.of(levels); + } + + private static MetricsListResponse buildScanResponse( + Page page) { + List reports = + page.items().stream().map(MetricsReportsService::toScanReport).toList(); + return new MetricsListResponse<>("scan", page.encodedResponseToken(), reports); + } + + private static MetricsListResponse buildCommitResponse( + Page page) { + List reports = + page.items().stream().map(MetricsReportsService::toCommitReport).toList(); + return new MetricsListResponse<>("commit", page.encodedResponseToken(), reports); + } + + private static ScanMetricsReport toScanReport(ScanMetricsRecord r) { + MetricsReportActor actor = new MetricsReportActor(r.principalName()); + MetricsReportRequest request = + new MetricsReportRequest(r.requestId(), r.otelTraceId(), r.otelSpanId()); + ScanMetricsReport.TableObject object = + new ScanMetricsReport.TableObject(r.snapshotId().orElse(null)); + ScanMetricsReport.Payload.Data data = + new ScanMetricsReport.Payload.Data( + r.schemaId().orElse(null), + r.filterExpression().orElse(null), + r.projectedFieldIds().isEmpty() + ? null + : r.projectedFieldIds().stream() + .map(Object::toString) + .collect(Collectors.joining(",")), + r.projectedFieldNames().isEmpty() ? 
null : String.join(",", r.projectedFieldNames()), + r.resultDataFiles(), + r.resultDeleteFiles(), + r.totalFileSizeBytes(), + r.totalDataManifests(), + r.totalDeleteManifests(), + r.scannedDataManifests(), + r.scannedDeleteManifests(), + r.skippedDataManifests(), + r.skippedDeleteManifests(), + r.skippedDataFiles(), + r.skippedDeleteFiles(), + r.totalPlanningDurationMs(), + r.equalityDeleteFiles(), + r.positionalDeleteFiles(), + r.indexedDeleteFiles(), + r.totalDeleteFileSizeBytes()); + ScanMetricsReport.Payload payload = + new ScanMetricsReport.Payload("iceberg.metrics.scan", 1, data); + return new ScanMetricsReport( + r.reportId(), r.timestamp().toEpochMilli(), actor, request, object, payload); + } + + private static CommitMetricsReport toCommitReport(CommitMetricsRecord r) { + MetricsReportActor actor = new MetricsReportActor(r.principalName()); + MetricsReportRequest request = + new MetricsReportRequest(r.requestId(), r.otelTraceId(), r.otelSpanId()); + CommitMetricsReport.TableObject object = new CommitMetricsReport.TableObject(r.snapshotId()); + CommitMetricsReport.Payload.Data data = + new CommitMetricsReport.Payload.Data( + r.sequenceNumber().orElse(null), + r.operation(), + r.addedDataFiles(), + r.removedDataFiles(), + r.totalDataFiles(), + r.addedDeleteFiles(), + r.removedDeleteFiles(), + r.totalDeleteFiles(), + r.addedEqualityDeleteFiles(), + r.removedEqualityDeleteFiles(), + r.addedPositionalDeleteFiles(), + r.removedPositionalDeleteFiles(), + r.addedRecords(), + r.removedRecords(), + r.totalRecords(), + r.addedFileSizeBytes(), + r.removedFileSizeBytes(), + r.totalFileSizeBytes(), + r.totalDurationMs().orElse(null), + r.attempts()); + CommitMetricsReport.Payload payload = + new CommitMetricsReport.Payload("iceberg.metrics.commit", 1, data); + return new CommitMetricsReport( + r.reportId(), r.timestamp().toEpochMilli(), actor, request, object, payload); + } +} diff --git 
a/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/ScanMetricsReport.java b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/ScanMetricsReport.java new file mode 100644 index 0000000000..f4bd2f1f7c --- /dev/null +++ b/extensions/metrics-reports/impl/src/main/java/org/apache/polaris/extension/metrics/reports/ScanMetricsReport.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.metrics.reports; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** A single scan metrics report entry returned by the list API. */ +public record ScanMetricsReport( + @JsonProperty("id") String id, + @JsonProperty("timestampMs") long timestampMs, + @JsonProperty("actor") MetricsReportActor actor, + @JsonProperty("request") MetricsReportRequest request, + @JsonProperty("object") TableObject object, + @JsonProperty("payload") Payload payload) { + + /** The Iceberg object (table snapshot) associated with the scan. 
*/ + @JsonInclude(JsonInclude.Include.NON_NULL) + public record TableObject(@JsonProperty("snapshotId") Long snapshotId) {} + + /** The scan metrics payload envelope. */ + public record Payload( + @JsonProperty("type") String type, + @JsonProperty("version") int version, + @JsonProperty("data") Data data) { + + /** Iceberg scan metrics data fields. */ + @JsonInclude(JsonInclude.Include.NON_NULL) + public record Data( + @JsonProperty("schemaId") Integer schemaId, + @JsonProperty("filterExpression") String filterExpression, + @JsonProperty("projectedFieldIds") String projectedFieldIds, + @JsonProperty("projectedFieldNames") String projectedFieldNames, + @JsonProperty("resultDataFiles") long resultDataFiles, + @JsonProperty("resultDeleteFiles") long resultDeleteFiles, + @JsonProperty("totalFileSizeBytes") long totalFileSizeBytes, + @JsonProperty("totalDataManifests") long totalDataManifests, + @JsonProperty("totalDeleteManifests") long totalDeleteManifests, + @JsonProperty("scannedDataManifests") long scannedDataManifests, + @JsonProperty("scannedDeleteManifests") long scannedDeleteManifests, + @JsonProperty("skippedDataManifests") long skippedDataManifests, + @JsonProperty("skippedDeleteManifests") long skippedDeleteManifests, + @JsonProperty("skippedDataFiles") long skippedDataFiles, + @JsonProperty("skippedDeleteFiles") long skippedDeleteFiles, + @JsonProperty("totalPlanningDurationMs") long totalPlanningDurationMs, + @JsonProperty("equalityDeleteFiles") long equalityDeleteFiles, + @JsonProperty("positionalDeleteFiles") long positionalDeleteFiles, + @JsonProperty("indexedDeleteFiles") long indexedDeleteFiles, + @JsonProperty("totalDeleteFileSizeBytes") long totalDeleteFileSizeBytes) {} + } +} diff --git a/extensions/metrics-reports/impl/src/test/java/org/apache/polaris/extension/metrics/reports/MetricsReportsServiceTest.java b/extensions/metrics-reports/impl/src/test/java/org/apache/polaris/extension/metrics/reports/MetricsReportsServiceTest.java new file mode 
100644 index 0000000000..1865688e0f --- /dev/null +++ b/extensions/metrics-reports/impl/src/test/java/org/apache/polaris/extension/metrics/reports/MetricsReportsServiceTest.java @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.extension.metrics.reports; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import jakarta.ws.rs.ForbiddenException; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.auth.PolarisAuthorizableOperation; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord; +import org.apache.polaris.core.persistence.metrics.MetricsPersistence; +import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; +import 
org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; +import org.apache.polaris.core.persistence.resolver.ResolvedPathKey; +import org.apache.polaris.core.persistence.resolver.ResolverPath; +import org.apache.polaris.core.persistence.resolver.ResolverStatus; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link MetricsReportsService}. + * + *

All heavy dependencies (resolution manifest, authorizer, persistence) are mocked so these + * tests run without any database or CDI container. + */ +class MetricsReportsServiceTest { + + private static final String CATALOG = "test-catalog"; + private static final String NAMESPACE = "db\u001Fschema"; // multi-level: db.schema + private static final String TABLE = "events"; + private static final long CATALOG_ID = 100L; + private static final long TABLE_ID = 200L; + + private MetricsPersistence persistence; + private PolarisMetaStoreManager metaStoreManager; + private PolarisAuthorizer authorizer; + private PolarisResolutionManifest manifest; + private PolarisPrincipal principal; + private ResolutionManifestFactory factory; + private MetricsReportsService service; + private RealmContext realmContext; + private SecurityContext securityContext; + + @BeforeEach + void setUp() { + persistence = mock(MetricsPersistence.class); + + metaStoreManager = mock(PolarisMetaStoreManager.class); + when(metaStoreManager.listScanMetrics( + any(PolarisCallContext.class), anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenAnswer( + inv -> + persistence.listScanReports( + inv.getArgument(1), + inv.getArgument(2), + inv.getArgument(3), + inv.getArgument(4), + inv.getArgument(5), + inv.getArgument(6), + inv.getArgument(7))); + when(metaStoreManager.listCommitMetrics( + any(PolarisCallContext.class), anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenAnswer( + inv -> + persistence.listCommitReports( + inv.getArgument(1), + inv.getArgument(2), + inv.getArgument(3), + inv.getArgument(4), + inv.getArgument(5), + inv.getArgument(6), + inv.getArgument(7))); + + PolarisCallContext polarisCallContext = mock(PolarisCallContext.class); + + CallContext callContext = mock(CallContext.class); + when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext); + + authorizer = mock(PolarisAuthorizer.class); + doNothing() + .when(authorizer) + .authorizeOrThrow( + 
any(PolarisPrincipal.class), + any(Set.class), + any(PolarisAuthorizableOperation.class), + any(), + (org.apache.polaris.core.persistence.PolarisResolvedPathWrapper) isNull()); + + principal = mock(PolarisPrincipal.class); + + // Wire a resolution manifest that always succeeds for CATALOG/NAMESPACE/TABLE. + PolarisEntity tableEntity = mock(PolarisEntity.class); + when(tableEntity.getCatalogId()).thenReturn(CATALOG_ID); + when(tableEntity.getId()).thenReturn(TABLE_ID); + + ResolvedPolarisEntity resolvedLeaf = mock(ResolvedPolarisEntity.class); + when(resolvedLeaf.getEntity()).thenReturn(tableEntity); + + PolarisResolvedPathWrapper tableWrapper = mock(PolarisResolvedPathWrapper.class); + when(tableWrapper.getResolvedLeafEntity()).thenReturn(resolvedLeaf); + + manifest = mock(PolarisResolutionManifest.class); + when(manifest.resolveAll()).thenReturn(new ResolverStatus(ResolverStatus.StatusEnum.SUCCESS)); + when(manifest.getResolvedPath( + any(ResolvedPathKey.class), eq(PolarisEntitySubType.ANY_SUBTYPE), eq(true))) + .thenReturn(tableWrapper); + when(manifest.getAllActivatedCatalogRoleAndPrincipalRoles()).thenReturn(Set.of()); + + factory = mock(ResolutionManifestFactory.class); + when(factory.createResolutionManifest(eq(principal), eq(CATALOG))).thenReturn(manifest); + + service = + new MetricsReportsService(callContext, metaStoreManager, authorizer, principal, factory); + realmContext = mock(RealmContext.class); + securityContext = mock(SecurityContext.class); + } + + // ── scan ───────────────────────────────────────────────────────────────── + + @Test + @SuppressWarnings("unchecked") + void listScanMetricsReturns200WithReports() { + ScanMetricsRecord r = scanRecord("r-1"); + when(persistence.listScanReports( + eq(CATALOG_ID), + eq(TABLE_ID), + isNull(), + isNull(), + isNull(), + isNull(), + any(PageToken.class))) + .thenReturn(Page.fromItems(List.of(r))); + + Response response = + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + 10, + 
null, + null, + null, + null, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + MetricsListResponse body = (MetricsListResponse) response.getEntity(); + assertThat(body.metricType()).isEqualTo("scan"); + assertThat(body.reports()).hasSize(1); + } + + @Test + @SuppressWarnings("unchecked") + void listScanMetricsPaginationTokenPropagated() { + when(persistence.listScanReports( + eq(CATALOG_ID), + eq(TABLE_ID), + isNull(), + isNull(), + isNull(), + isNull(), + any(PageToken.class))) + .thenReturn(Page.fromItems(List.of())); + + Response response = + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + null, + null, + null, + null, + null, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + MetricsListResponse body = (MetricsListResponse) response.getEntity(); + assertThat(body.nextPageToken()).isNull(); + } + + // ── commit ──────────────────────────────────────────────────────────────── + + @Test + @SuppressWarnings("unchecked") + void listCommitMetricsReturns200WithReports() { + CommitMetricsRecord r = commitRecord("c-1"); + when(persistence.listCommitReports( + eq(CATALOG_ID), + eq(TABLE_ID), + isNull(), + isNull(), + isNull(), + isNull(), + any(PageToken.class))) + .thenReturn(Page.fromItems(List.of(r))); + + Response response = + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "commit", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + MetricsListResponse body = (MetricsListResponse) response.getEntity(); + assertThat(body.metricType()).isEqualTo("commit"); + assertThat(body.reports()).hasSize(1); + } + + // ── bad requests ────────────────────────────────────────────────────────── + + @Test + void invalidMetricTypeThrowsIllegalArgument() { + // No persistence stub needed — invalid metricType is rejected before querying persistence. 
+ org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "bogus", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bogus"); + } + + @Test + void emptyNamespaceThrowsIllegalArgument() { + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + "", + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("namespace"); + } + + @Test + void nullNamespaceThrowsIllegalArgument() { + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + null, + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(IllegalArgumentException.class); + } + + // ── namespace decoding ───────────────────────────────────────────────────── + + @Test + @SuppressWarnings("unchecked") + void singleLevelNamespaceDecoded() { + when(persistence.listScanReports(anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenReturn(Page.fromItems(List.of())); + + // Single-level namespace — no unit separator. + Response response = + service.listTableMetrics( + CATALOG, + "mydb", + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext); + + // Reaches persistence → namespace was decoded correctly. + assertThat(response.getStatus()).isEqualTo(200); + } + + @Test + @SuppressWarnings("unchecked") + void multiLevelNamespaceWithUnitSeparatorDecoded() { + when(persistence.listScanReports(anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenReturn(Page.fromItems(List.of())); + + // Two-level namespace separated by unit separator (0x1F), same as NAMESPACE constant. 
+ Response response = + service.listTableMetrics( + CATALOG, + "level1\u001Flevel2", + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + } + + // ── filter propagation ──────────────────────────────────────────────────── + + @Test + @SuppressWarnings("unchecked") + void scanFiltersPropagateToPersistence() { + long snapshotId = 99L; + long tsFrom = 1_000L; + long tsTo = 9_000L; + String principal = "alice"; + + when(persistence.listScanReports( + eq(CATALOG_ID), + eq(TABLE_ID), + eq(snapshotId), + eq(principal), + eq(tsFrom), + eq(tsTo), + any(PageToken.class))) + .thenReturn(Page.fromItems(List.of())); + + Response response = + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + 10, + snapshotId, + principal, + tsFrom, + tsTo, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + verify(persistence) + .listScanReports( + eq(CATALOG_ID), + eq(TABLE_ID), + eq(snapshotId), + eq(principal), + eq(tsFrom), + eq(tsTo), + any(PageToken.class)); + } + + @Test + @SuppressWarnings("unchecked") + void commitFiltersPropagateToPersistence() { + long snapshotId = 77L; + long tsFrom = 500L; + long tsTo = 5_000L; + String principal = "bob"; + + when(persistence.listCommitReports( + eq(CATALOG_ID), + eq(TABLE_ID), + eq(snapshotId), + eq(principal), + eq(tsFrom), + eq(tsTo), + any(PageToken.class))) + .thenReturn(Page.fromItems(List.of())); + + Response response = + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "commit", + null, + 10, + snapshotId, + principal, + tsFrom, + tsTo, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + verify(persistence) + .listCommitReports( + eq(CATALOG_ID), + eq(TABLE_ID), + eq(snapshotId), + eq(principal), + eq(tsFrom), + eq(tsTo), + any(PageToken.class)); + } + + // ── wrong token type → 400 
──────────────────────────────────────────────── + + @Test + @SuppressWarnings("unchecked") + void wrongPageTokenTypeScanPropagatesIllegalArgument() { + // Persistence throws IllegalArgumentException when the page token carries a cursor of the + // wrong type (e.g. EntityIdToken recycled from a different list operation). + // IcebergExceptionMapper maps IllegalArgumentException to HTTP 400. + when(persistence.listScanReports(anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenThrow( + new IllegalArgumentException( + "pageToken contains a cursor of an unexpected type; expected MetricsReportToken")); + + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("unexpected type"); + } + + @Test + void wrongPageTokenTypeCommitPropagatesIllegalArgument() { + when(persistence.listCommitReports(anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenThrow( + new IllegalArgumentException( + "pageToken contains a cursor of an unexpected type; expected MetricsReportToken")); + + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "commit", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("unexpected type"); + } + + // ── envelope shape ─────────────────────────────────────────────────────── + + @Test + @SuppressWarnings("unchecked") + void scanReportHasEnvelopeStructure() { + ScanMetricsRecord r = scanRecord("r-envelope"); + when(persistence.listScanReports(anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenReturn(Page.fromItems(List.of(r))); + + Response response = + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, 
+ 10, + null, + null, + null, + null, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + @SuppressWarnings("unchecked") + MetricsListResponse body = + (MetricsListResponse) response.getEntity(); + ScanMetricsReport report = body.reports().get(0); + + assertThat(report.id()).isEqualTo("r-envelope"); + assertThat(report.timestampMs()).isPositive(); + assertThat(report.actor().principalName()).isEqualTo("alice"); + assertThat(report.request().requestId()).isEqualTo("req-1"); + assertThat(report.object()).isNotNull(); + assertThat(report.payload().type()).isEqualTo("iceberg.metrics.scan"); + assertThat(report.payload().version()).isEqualTo(1); + assertThat(report.payload().data()).isNotNull(); + } + + @Test + @SuppressWarnings("unchecked") + void commitReportHasEnvelopeStructure() { + CommitMetricsRecord r = commitRecord("c-envelope"); + when(persistence.listCommitReports(anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenReturn(Page.fromItems(List.of(r))); + + Response response = + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "commit", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext); + + assertThat(response.getStatus()).isEqualTo(200); + @SuppressWarnings("unchecked") + MetricsListResponse body = + (MetricsListResponse) response.getEntity(); + CommitMetricsReport report = body.reports().get(0); + + assertThat(report.id()).isEqualTo("c-envelope"); + assertThat(report.actor()).isNotNull(); + assertThat(report.request()).isNotNull(); + assertThat(report.object().snapshotId()).isEqualTo(42L); + assertThat(report.payload().type()).isEqualTo("iceberg.metrics.commit"); + assertThat(report.payload().version()).isEqualTo(1); + assertThat(report.payload().data().operation()).isEqualTo("append"); + } + + // ── authorization (403) ─────────────────────────────────────────────────── + + @Test + void unauthorizedPrincipalPropagatesForbidden() { + // Configure persistence so 
resolution succeeds, but the authorizer denies. + when(persistence.listScanReports(anyLong(), anyLong(), any(), any(), any(), any(), any())) + .thenReturn(Page.fromItems(List.of())); + doThrow(new ForbiddenException("insufficient privileges")) + .when(authorizer) + .authorizeOrThrow( + any(PolarisPrincipal.class), + any(Set.class), + any(PolarisAuthorizableOperation.class), + any(), + (org.apache.polaris.core.persistence.PolarisResolvedPathWrapper) isNull()); + + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(ForbiddenException.class); + } + + // ── not found (404) ─────────────────────────────────────────────────────── + + @Test + void tableNotFoundPropagatesNotFoundException() { + // Manifest resolves but getResolvedPath returns null (table not found). + when(manifest.getResolvedPath( + any(ResolvedPathKey.class), eq(PolarisEntitySubType.ANY_SUBTYPE), eq(true))) + .thenReturn(null); + + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(org.apache.iceberg.exceptions.NotFoundException.class) + .hasMessageContaining(TABLE); + } + + @Test + void catalogNotFoundPropagatesNotFoundException() { + // Top-level entity (catalog) could not be resolved. 
+ when(manifest.resolveAll()).thenReturn(new ResolverStatus(PolarisEntityType.CATALOG, CATALOG)); + + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(org.apache.iceberg.exceptions.NotFoundException.class) + .hasMessageContaining(CATALOG); + } + + @Test + void namespaceOrTablePathNotFoundPropagatesNotFoundException() { + // PATH_COULD_NOT_BE_FULLY_RESOLVED — namespace or table segment not found. + ResolverPath failedPath = new ResolverPath(List.of(NAMESPACE), PolarisEntityType.NAMESPACE); + when(manifest.resolveAll()).thenReturn(new ResolverStatus(failedPath, 0)); + + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + service.listTableMetrics( + CATALOG, + NAMESPACE, + TABLE, + "scan", + null, + 10, + null, + null, + null, + null, + realmContext, + securityContext)) + .isInstanceOf(org.apache.iceberg.exceptions.NotFoundException.class); + } + + // ── helpers ─────────────────────────────────────────────────────────────── + + private static ScanMetricsRecord scanRecord(String reportId) { + return ScanMetricsRecord.builder() + .reportId(reportId) + .catalogId(CATALOG_ID) + .tableId(TABLE_ID) + .timestamp(Instant.ofEpochMilli(1_000_000L)) + .metadata(Map.of()) + .principalName("alice") + .requestId("req-1") + .otelTraceId(null) + .otelSpanId(null) + .snapshotId(Optional.empty()) + .schemaId(Optional.empty()) + .filterExpression(Optional.empty()) + .projectedFieldIds(List.of()) + .projectedFieldNames(List.of()) + .resultDataFiles(1L) + .resultDeleteFiles(0L) + .totalFileSizeBytes(1024L) + .totalDataManifests(1L) + .totalDeleteManifests(0L) + .scannedDataManifests(1L) + .scannedDeleteManifests(0L) + .skippedDataManifests(0L) + .skippedDeleteManifests(0L) + .skippedDataFiles(0L) + .skippedDeleteFiles(0L) + .totalPlanningDurationMs(50L) + .equalityDeleteFiles(0L) + 
.positionalDeleteFiles(0L) + .indexedDeleteFiles(0L) + .totalDeleteFileSizeBytes(0L) + .build(); + } + + private static CommitMetricsRecord commitRecord(String reportId) { + return CommitMetricsRecord.builder() + .reportId(reportId) + .catalogId(CATALOG_ID) + .tableId(TABLE_ID) + .timestamp(Instant.ofEpochMilli(2_000_000L)) + .metadata(Map.of()) + .principalName("bob") + .requestId("req-2") + .otelTraceId(null) + .otelSpanId(null) + .snapshotId(42L) + .sequenceNumber(Optional.empty()) + .operation("append") + .addedDataFiles(1L) + .removedDataFiles(0L) + .totalDataFiles(10L) + .addedDeleteFiles(0L) + .removedDeleteFiles(0L) + .totalDeleteFiles(0L) + .addedEqualityDeleteFiles(0L) + .removedEqualityDeleteFiles(0L) + .addedPositionalDeleteFiles(0L) + .removedPositionalDeleteFiles(0L) + .addedRecords(100L) + .removedRecords(0L) + .totalRecords(1000L) + .addedFileSizeBytes(2048L) + .removedFileSizeBytes(0L) + .totalFileSizeBytes(20480L) + .totalDurationMs(Optional.empty()) + .attempts(1) + .build(); + } +} diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 83327ffa08..0c5f36131c 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -23,6 +23,7 @@ polaris-core=polaris-core polaris-api-iceberg-service=api/iceberg-service polaris-api-management-model=api/management-model polaris-api-management-service=api/management-service +polaris-api-metrics-reports-service=api/metrics-reports-service polaris-api-catalog-service=api/polaris-catalog-service polaris-runtime-defaults=runtime/defaults polaris-runtime-service=runtime/service @@ -47,6 +48,7 @@ polaris-extensions-federation-hive=extensions/federation/hive polaris-extensions-federation-bigquery=extensions/federation/bigquery polaris-extensions-auth-opa=extensions/auth/opa/impl polaris-extensions-auth-opa-tests=extensions/auth/opa/tests +polaris-extensions-metrics-reports=extensions/metrics-reports/impl polaris-extensions-auth-ranger=extensions/auth/ranger/impl 
polaris-extensions-auth-ranger-tests=extensions/auth/ranger/tests diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index bbc917bc0b..643c358cb6 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -78,6 +78,7 @@ import org.apache.polaris.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport; import org.apache.polaris.persistence.relational.jdbc.models.SchemaVersion; +import org.apache.polaris.persistence.relational.jdbc.pagination.MetricsReportToken; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -1347,4 +1348,141 @@ private void writeCommitMetricsReport(@NonNull ModelCommitMetricsReport report) String.format("Failed to write commit metrics report due to %s", e.getMessage()), e); } } + + @Override + public Page listScanReports( + long catalogId, + long tableId, + @Nullable Long snapshotId, + @Nullable String principalName, + @Nullable Long timestampFrom, + @Nullable Long timestampTo, + @NonNull PageToken pageToken) { + try { + PreparedQuery query = + buildMetricsQuery( + ModelScanMetricsReport.TABLE_NAME, + catalogId, + tableId, + snapshotId, + principalName, + timestampFrom, + timestampTo, + pageToken); + List rows = + datasourceOperations.executeSelect(query, ModelScanMetricsReport.CONVERTER); + return Page.mapped( + pageToken, + rows.stream().map(ModelScanMetricsReport::toRecord), + Function.identity(), + MetricsReportToken::fromRecord); + } catch (SQLException e) { + throw new RuntimeException( + 
String.format("Failed to list scan metrics reports: %s", e.getMessage()), e); + } + } + + @Override + public Page listCommitReports( + long catalogId, + long tableId, + @Nullable Long snapshotId, + @Nullable String principalName, + @Nullable Long timestampFrom, + @Nullable Long timestampTo, + @NonNull PageToken pageToken) { + try { + PreparedQuery query = + buildMetricsQuery( + ModelCommitMetricsReport.TABLE_NAME, + catalogId, + tableId, + snapshotId, + principalName, + timestampFrom, + timestampTo, + pageToken); + List rows = + datasourceOperations.executeSelect(query, ModelCommitMetricsReport.CONVERTER); + return Page.mapped( + pageToken, + rows.stream().map(ModelCommitMetricsReport::toRecord), + Function.identity(), + MetricsReportToken::fromRecord); + } catch (SQLException e) { + throw new RuntimeException( + String.format("Failed to list commit metrics reports: %s", e.getMessage()), e); + } + } + + /** + * Builds a parameterized SELECT query for a metrics report table using keyset pagination. + * + *

Rows are ordered by {@code (timestamp_ms DESC, report_id DESC)}. The cursor from {@link + * MetricsReportToken} drives the keyset predicate: {@code (timestamp_ms < cursorTs) OR + * (timestamp_ms = cursorTs AND report_id < cursorId)}. + */ + private PreparedQuery buildMetricsQuery( + String tableName, + long catalogId, + long tableId, + @Nullable Long snapshotId, + @Nullable String principalName, + @Nullable Long timestampFrom, + @Nullable Long timestampTo, + PageToken pageToken) { + StringBuilder sql = new StringBuilder("SELECT * FROM "); + sql.append(QueryGenerator.getFullyQualifiedTableName(tableName)); + sql.append(" WHERE realm_id = ? AND catalog_id = ? AND table_id = ?"); + + List params = new ArrayList<>(); + params.add(realmId); + params.add(catalogId); + params.add(tableId); + + if (snapshotId != null) { + sql.append(" AND snapshot_id = ?"); + params.add(snapshotId); + } + if (principalName != null) { + sql.append(" AND principal_name = ?"); + params.add(principalName); + } + if (timestampFrom != null) { + sql.append(" AND timestamp_ms >= ?"); + params.add(timestampFrom); + } + if (timestampTo != null) { + sql.append(" AND timestamp_ms < ?"); + params.add(timestampTo); + } + + if (pageToken.paginationRequested()) { + // If a token value is present it must be a MetricsReportToken; any other type indicates a + // caller bug (e.g. recycling a token from a different list operation). + if (pageToken.value().isPresent() && pageToken.valueAs(MetricsReportToken.class).isEmpty()) { + throw new IllegalArgumentException( + "pageToken contains a cursor of an unexpected type; expected MetricsReportToken"); + } + pageToken + .valueAs(MetricsReportToken.class) + .ifPresent( + cursor -> { + sql.append(" AND (timestamp_ms < ? OR (timestamp_ms = ? 
AND report_id < ?))"); + params.add(cursor.timestampMs()); + params.add(cursor.timestampMs()); + params.add(cursor.reportId()); + }); + } + + sql.append(" ORDER BY timestamp_ms DESC, report_id DESC"); + + // Fetch one extra row so Page.mapped() can determine whether a next page exists. + // Always apply a LIMIT: use the requested pageSize, or default to 100 if absent. + int limit = pageToken.pageSize().orElse(100); + sql.append(" LIMIT ?"); + params.add(limit + 1); + + return new PreparedQuery(sql.toString(), params); + } } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/MetricsModelUtils.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/MetricsModelUtils.java new file mode 100644 index 0000000000..511416775b --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/MetricsModelUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.relational.jdbc.models; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Shared utilities for metrics model classes. */ +public final class MetricsModelUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsModelUtils.class); + + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private MetricsModelUtils() {} + + /** + * Parses a JSON string into a {@code Map}. + * + * @param json the JSON string to parse; may be null or empty + * @return the parsed map, or an empty map if the input is null, empty, or unparseable + */ + public static Map parseMetadata(String json) { + if (json == null || json.isEmpty() || json.equals("{}")) { + return Map.of(); + } + try { + return OBJECT_MAPPER.readValue(json, new TypeReference>() {}); + } catch (JsonProcessingException e) { + LOGGER.warn("Failed to parse metadata JSON: {}", e.getMessage()); + return Map.of(); + } + } +} diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReport.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReport.java index 03a9096fae..86c2d97e38 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReport.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReport.java @@ -19,12 +19,13 @@ package org.apache.polaris.persistence.relational.jdbc.models; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import java.sql.ResultSet; import java.sql.SQLException; +import 
java.time.Instant; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord; import org.apache.polaris.immutables.PolarisImmutable; import org.apache.polaris.persistence.relational.jdbc.DatabaseType; @@ -250,8 +251,6 @@ default Map toMap(DatabaseType databaseType) { // === Static conversion methods (following ModelEntity pattern) === - ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - /** * Converts a CommitMetricsRecord (SPI) to ModelCommitMetricsReport (JDBC). * @@ -309,12 +308,54 @@ private static String toJsonString(Map map) { return "{}"; } try { - return OBJECT_MAPPER.writeValueAsString(map); + return MetricsModelUtils.OBJECT_MAPPER.writeValueAsString(map); } catch (JsonProcessingException e) { return "{}"; } } + /** + * Converts this JDBC model back to the backend-agnostic SPI record. + * + * @return a CommitMetricsRecord built from this model's fields + */ + default CommitMetricsRecord toRecord() { + return CommitMetricsRecord.builder() + .reportId(getReportId()) + .catalogId(getCatalogId()) + .tableId(getTableId()) + .timestamp(Instant.ofEpochMilli(getTimestampMs())) + .metadata(MetricsModelUtils.parseMetadata(getMetadata())) + .principalName(getPrincipalName()) + .requestId(getRequestId()) + .otelTraceId(getOtelTraceId()) + .otelSpanId(getOtelSpanId()) + .snapshotId(getSnapshotId()) + .sequenceNumber(Optional.ofNullable(getSequenceNumber())) + .operation(getOperation()) + .addedDataFiles(getAddedDataFiles()) + .removedDataFiles(getRemovedDataFiles()) + .totalDataFiles(getTotalDataFiles()) + .addedDeleteFiles(getAddedDeleteFiles()) + .removedDeleteFiles(getRemovedDeleteFiles()) + .totalDeleteFiles(getTotalDeleteFiles()) + .addedEqualityDeleteFiles(getAddedEqualityDeleteFiles()) + .removedEqualityDeleteFiles(getRemovedEqualityDeleteFiles()) + .addedPositionalDeleteFiles(getAddedPositionalDeleteFiles()) + 
.removedPositionalDeleteFiles(getRemovedPositionalDeleteFiles()) + .addedRecords(getAddedRecords()) + .removedRecords(getRemovedRecords()) + .totalRecords(getTotalRecords()) + .addedFileSizeBytes(getAddedFileSizeBytes()) + .removedFileSizeBytes(getRemovedFileSizeBytes()) + .totalFileSizeBytes(getTotalFileSizeBytes()) + // The write path stores Optional.empty() as 0L; treat 0 as "unknown" on read. + .totalDurationMs( + getTotalDurationMs() == 0L ? Optional.empty() : Optional.of(getTotalDurationMs())) + .attempts(getAttempts()) + .build(); + } + /** Dummy instance to be used as a Converter when calling fromResultSet(). */ ModelCommitMetricsReport CONVERTER = ImmutableModelCommitMetricsReport.builder() diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReport.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReport.java index 93115aeac1..6d6022cdab 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReport.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReport.java @@ -19,12 +19,15 @@ package org.apache.polaris.persistence.relational.jdbc.models; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord; import org.apache.polaris.immutables.PolarisImmutable; @@ -251,8 +254,6 @@ default Map toMap(DatabaseType databaseType) { // === Static conversion methods (following ModelEntity 
pattern) === - ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - /** * Converts a ScanMetricsRecord (SPI) to ModelScanMetricsReport (JDBC). * @@ -317,12 +318,76 @@ private static String toJsonString(Map map) { return "{}"; } try { - return OBJECT_MAPPER.writeValueAsString(map); + return MetricsModelUtils.OBJECT_MAPPER.writeValueAsString(map); } catch (JsonProcessingException e) { return "{}"; } } + /** + * Converts this JDBC model back to the backend-agnostic SPI record. + * + * @return a ScanMetricsRecord built from this model's fields + */ + default ScanMetricsRecord toRecord() { + String rawFieldIds = getProjectedFieldIds(); + String rawFieldNames = getProjectedFieldNames(); + List fieldIds = + rawFieldIds == null || rawFieldIds.isEmpty() + ? Collections.emptyList() + : Arrays.stream(rawFieldIds.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .flatMap( + s -> { + try { + return java.util.stream.Stream.of(Integer.parseInt(s)); + } catch (NumberFormatException e) { + return java.util.stream.Stream.empty(); + } + }) + .collect(Collectors.toList()); + List fieldNames = + rawFieldNames == null || rawFieldNames.isEmpty() + ? 
Collections.emptyList() + : Arrays.stream(rawFieldNames.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + return ScanMetricsRecord.builder() + .reportId(getReportId()) + .catalogId(getCatalogId()) + .tableId(getTableId()) + .timestamp(Instant.ofEpochMilli(getTimestampMs())) + .metadata(MetricsModelUtils.parseMetadata(getMetadata())) + .principalName(getPrincipalName()) + .requestId(getRequestId()) + .otelTraceId(getOtelTraceId()) + .otelSpanId(getOtelSpanId()) + .snapshotId(Optional.ofNullable(getSnapshotId())) + .schemaId(Optional.ofNullable(getSchemaId())) + .filterExpression(Optional.ofNullable(getFilterExpression())) + .projectedFieldIds(fieldIds) + .projectedFieldNames(fieldNames) + .resultDataFiles(getResultDataFiles()) + .resultDeleteFiles(getResultDeleteFiles()) + .totalFileSizeBytes(getTotalFileSizeBytes()) + .totalDataManifests(getTotalDataManifests()) + .totalDeleteManifests(getTotalDeleteManifests()) + .scannedDataManifests(getScannedDataManifests()) + .scannedDeleteManifests(getScannedDeleteManifests()) + .skippedDataManifests(getSkippedDataManifests()) + .skippedDeleteManifests(getSkippedDeleteManifests()) + .skippedDataFiles(getSkippedDataFiles()) + .skippedDeleteFiles(getSkippedDeleteFiles()) + .totalPlanningDurationMs(getTotalPlanningDurationMs()) + .equalityDeleteFiles(getEqualityDeleteFiles()) + .positionalDeleteFiles(getPositionalDeleteFiles()) + .indexedDeleteFiles(getIndexedDeleteFiles()) + .totalDeleteFileSizeBytes(getTotalDeleteFileSizeBytes()) + .build(); + } + /** Dummy instance to be used as a Converter when calling fromResultSet(). 
*/ ModelScanMetricsReport CONVERTER = ImmutableModelScanMetricsReport.builder() diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/pagination/MetricsReportToken.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/pagination/MetricsReportToken.java new file mode 100644 index 0000000000..a438b72cff --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/pagination/MetricsReportToken.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.pagination; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import jakarta.annotation.Nullable; +import org.apache.polaris.core.persistence.metrics.MetricsRecordIdentity; +import org.apache.polaris.core.persistence.pagination.Token; +import org.apache.polaris.immutables.PolarisImmutable; + +/** + * Pagination {@linkplain Token token} for metrics reports, backed by {@code (timestamp_ms, + * report_id)}. + * + *

Metrics reports are sorted by {@code (timestamp_ms DESC, report_id DESC)}. The cursor encodes + * the last-seen {@code (timestamp_ms, report_id)} pair, enabling stable keyset pagination that + * remains correct under concurrent inserts. + */ +@PolarisImmutable +@JsonSerialize(as = ImmutableMetricsReportToken.class) +@JsonDeserialize(as = ImmutableMetricsReportToken.class) +public interface MetricsReportToken extends Token { + // Registered token type IDs (must be unique across all Token implementations): + // "e" - EntityIdToken + // "m" - MetricsReportToken (this class) + String ID = "m"; + + @JsonProperty("ts") + long timestampMs(); + + @JsonProperty("id") + String reportId(); + + @Override + default String getT() { + return ID; + } + + static @Nullable MetricsReportToken fromRecord(@Nullable MetricsRecordIdentity record) { + if (record == null) { + return null; + } + return ImmutableMetricsReportToken.builder() + .timestampMs(record.timestamp().toEpochMilli()) + .reportId(record.reportId()) + .build(); + } + + final class MetricsReportTokenType implements TokenType { + @Override + public String id() { + return ID; + } + + @Override + public Class javaType() { + return MetricsReportToken.class; + } + } +} diff --git a/persistence/relational-jdbc/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType b/persistence/relational-jdbc/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType new file mode 100644 index 0000000000..05b16a4e39 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.persistence.relational.jdbc.pagination.MetricsReportToken$MetricsReportTokenType diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReportTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReportTest.java index bdbb540853..22574a46bc 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReportTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReportTest.java @@ -239,6 +239,97 @@ public void testConverterFromResultSet() throws SQLException { assertEquals(TEST_METADATA, result.getMetadata()); } + @Test + public void testToRecordAllFieldsRoundTrip() { + ModelCommitMetricsReport model = createTestReport(); + + org.apache.polaris.core.persistence.metrics.CommitMetricsRecord record = model.toRecord(); + + assertEquals(TEST_REPORT_ID, record.reportId()); + assertEquals(TEST_CATALOG_ID, record.catalogId()); + assertEquals(TEST_TABLE_ID, record.tableId()); + assertEquals(TEST_TIMESTAMP_MS, record.timestamp().toEpochMilli()); + assertEquals(TEST_PRINCIPAL, record.principalName()); + assertEquals(TEST_REQUEST_ID, record.requestId()); + assertEquals(TEST_OTEL_TRACE_ID, 
record.otelTraceId()); + assertEquals(TEST_SNAPSHOT_ID, record.snapshotId()); + assertEquals(java.util.Optional.of(TEST_SEQUENCE_NUMBER), record.sequenceNumber()); + assertEquals(TEST_OPERATION, record.operation()); + assertEquals(TEST_ADDED_DATA_FILES, record.addedDataFiles()); + assertEquals(TEST_REMOVED_DATA_FILES, record.removedDataFiles()); + assertEquals(TEST_TOTAL_DATA_FILES, record.totalDataFiles()); + assertEquals(TEST_ADDED_RECORDS, record.addedRecords()); + assertEquals(TEST_REMOVED_RECORDS, record.removedRecords()); + assertEquals(TEST_TOTAL_RECORDS, record.totalRecords()); + assertEquals(TEST_ADDED_FILE_SIZE, record.addedFileSizeBytes()); + assertEquals(TEST_REMOVED_FILE_SIZE, record.removedFileSizeBytes()); + assertEquals(TEST_TOTAL_FILE_SIZE, record.totalFileSizeBytes()); + // Non-zero totalDurationMs must be present in the Optional. + assertEquals(java.util.Optional.of(TEST_TOTAL_DURATION), record.totalDurationMs()); + assertEquals(TEST_ATTEMPTS, record.attempts()); + } + + @Test + public void testToRecordZeroDurationIsUnknown() { + // The write path stores Optional.empty() as 0L. On read, 0 must come back as empty, not 0. 
+ ModelCommitMetricsReport model = + ImmutableModelCommitMetricsReport.builder() + .from(createTestReport()) + .totalDurationMs(0L) + .build(); + + org.apache.polaris.core.persistence.metrics.CommitMetricsRecord record = model.toRecord(); + + assertEquals(java.util.Optional.empty(), record.totalDurationMs()); + } + + @Test + public void testToRecordNullSequenceNumber() { + ModelCommitMetricsReport model = + ImmutableModelCommitMetricsReport.builder() + .from(createTestReport()) + .sequenceNumber(null) + .build(); + + org.apache.polaris.core.persistence.metrics.CommitMetricsRecord record = model.toRecord(); + + assertEquals(java.util.Optional.empty(), record.sequenceNumber()); + } + + @Test + public void testToRecordMetadataDeserialized() { + ModelCommitMetricsReport model = + ImmutableModelCommitMetricsReport.builder() + .from(createTestReport()) + .metadata("{\"env\":\"staging\",\"team\":\"data\"}") + .build(); + + org.apache.polaris.core.persistence.metrics.CommitMetricsRecord record = model.toRecord(); + + assertEquals("staging", record.metadata().get("env")); + assertEquals("data", record.metadata().get("team")); + } + + @Test + public void testToRecordEmptyMetadataReturnsEmptyMap() { + ModelCommitMetricsReport model = + ImmutableModelCommitMetricsReport.builder().from(createTestReport()).metadata("{}").build(); + + org.apache.polaris.core.persistence.metrics.CommitMetricsRecord record = model.toRecord(); + + assertEquals(java.util.Map.of(), record.metadata()); + } + + @Test + public void testToRecordNullMetadataReturnsEmptyMap() { + ModelCommitMetricsReport model = + ImmutableModelCommitMetricsReport.builder().from(createTestReport()).metadata(null).build(); + + org.apache.polaris.core.persistence.metrics.CommitMetricsRecord record = model.toRecord(); + + assertEquals(java.util.Map.of(), record.metadata()); + } + private ModelCommitMetricsReport createTestReport() { return ImmutableModelCommitMetricsReport.builder() .reportId(TEST_REPORT_ID) diff --git 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReportTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReportTest.java index f8068d9e04..1e456e51bf 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReportTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReportTest.java @@ -237,6 +237,127 @@ public void testConverterFromResultSet() throws SQLException { assertEquals(TEST_METADATA, result.getMetadata()); } + @Test + public void testToRecordAllFieldsRoundTrip() { + // Build a model with ALL optional fields populated so every toRecord() branch is exercised. + ModelScanMetricsReport model = + ImmutableModelScanMetricsReport.builder() + .from(createTestReport()) + .schemaId(TEST_SCHEMA_ID) + .filterExpression(TEST_FILTER) + .projectedFieldIds(TEST_PROJECTED_IDS) + .projectedFieldNames(TEST_PROJECTED_NAMES) + .otelSpanId(TEST_OTEL_SPAN_ID) + .build(); + + org.apache.polaris.core.persistence.metrics.ScanMetricsRecord record = model.toRecord(); + + assertEquals(TEST_REPORT_ID, record.reportId()); + assertEquals(TEST_CATALOG_ID, record.catalogId()); + assertEquals(TEST_TABLE_ID, record.tableId()); + assertEquals(TEST_TIMESTAMP_MS, record.timestamp().toEpochMilli()); + assertEquals(TEST_PRINCIPAL, record.principalName()); + assertEquals(TEST_REQUEST_ID, record.requestId()); + assertEquals(TEST_OTEL_TRACE_ID, record.otelTraceId()); + assertEquals(java.util.Optional.of(TEST_SNAPSHOT_ID), record.snapshotId()); + assertEquals(java.util.Optional.of(TEST_SCHEMA_ID), record.schemaId()); + assertEquals(java.util.Optional.of(TEST_FILTER), record.filterExpression()); + assertEquals(java.util.List.of(1, 2, 3), record.projectedFieldIds()); + assertEquals(java.util.List.of("id", "name", "value"), 
record.projectedFieldNames()); + assertEquals(TEST_RESULT_DATA_FILES, record.resultDataFiles()); + assertEquals(TEST_TOTAL_FILE_SIZE, record.totalFileSizeBytes()); + assertEquals(TEST_PLANNING_DURATION, record.totalPlanningDurationMs()); + assertEquals(TEST_EQUALITY_DELETE_FILES, record.equalityDeleteFiles()); + assertEquals(TEST_POSITIONAL_DELETE_FILES, record.positionalDeleteFiles()); + assertEquals(TEST_INDEXED_DELETE_FILES, record.indexedDeleteFiles()); + assertEquals(TEST_DELETE_FILE_SIZE, record.totalDeleteFileSizeBytes()); + } + + @Test + public void testToRecordNullProjectedFields() { + // When projected_field_ids / projected_field_names are null, toRecord() returns empty lists. + ModelScanMetricsReport model = + ImmutableModelScanMetricsReport.builder() + .from(createTestReport()) + .projectedFieldIds(null) + .projectedFieldNames(null) + .build(); + + org.apache.polaris.core.persistence.metrics.ScanMetricsRecord record = model.toRecord(); + + assertEquals(java.util.List.of(), record.projectedFieldIds()); + assertEquals(java.util.List.of(), record.projectedFieldNames()); + } + + @Test + public void testToRecordEmptyProjectedFields() { + // Empty-string projected fields also produce empty lists. + ModelScanMetricsReport model = + ImmutableModelScanMetricsReport.builder() + .from(createTestReport()) + .projectedFieldIds("") + .projectedFieldNames("") + .build(); + + org.apache.polaris.core.persistence.metrics.ScanMetricsRecord record = model.toRecord(); + + assertEquals(java.util.List.of(), record.projectedFieldIds()); + assertEquals(java.util.List.of(), record.projectedFieldNames()); + } + + @Test + public void testToRecordNullOptionalFields() { + // snapshotId, schemaId, filterExpression are all nullable — must come back as Optional.empty(). 
+ ModelScanMetricsReport model = + ImmutableModelScanMetricsReport.builder() + .from(createTestReport()) + .snapshotId(null) + .schemaId(null) + .filterExpression(null) + .build(); + + org.apache.polaris.core.persistence.metrics.ScanMetricsRecord record = model.toRecord(); + + assertEquals(java.util.Optional.empty(), record.snapshotId()); + assertEquals(java.util.Optional.empty(), record.schemaId()); + assertEquals(java.util.Optional.empty(), record.filterExpression()); + } + + @Test + public void testToRecordMetadataDeserialized() { + // Metadata stored as JSON must be deserialized back to a Map on read. + ModelScanMetricsReport model = + ImmutableModelScanMetricsReport.builder() + .from(createTestReport()) + .metadata("{\"env\":\"prod\",\"region\":\"us-east-1\"}") + .build(); + + org.apache.polaris.core.persistence.metrics.ScanMetricsRecord record = model.toRecord(); + + assertEquals("prod", record.metadata().get("env")); + assertEquals("us-east-1", record.metadata().get("region")); + } + + @Test + public void testToRecordEmptyMetadataReturnsEmptyMap() { + ModelScanMetricsReport model = + ImmutableModelScanMetricsReport.builder().from(createTestReport()).metadata("{}").build(); + + org.apache.polaris.core.persistence.metrics.ScanMetricsRecord record = model.toRecord(); + + assertEquals(java.util.Map.of(), record.metadata()); + } + + @Test + public void testToRecordNullMetadataReturnsEmptyMap() { + ModelScanMetricsReport model = + ImmutableModelScanMetricsReport.builder().from(createTestReport()).metadata(null).build(); + + org.apache.polaris.core.persistence.metrics.ScanMetricsRecord record = model.toRecord(); + + assertEquals(java.util.Map.of(), record.metadata()); + } + private ModelScanMetricsReport createTestReport() { return ImmutableModelScanMetricsReport.builder() .reportId(TEST_REPORT_ID) diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/pagination/MetricsReportTokenTest.java 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/pagination/MetricsReportTokenTest.java new file mode 100644 index 0000000000..d9bbcf22e4 --- /dev/null +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/pagination/MetricsReportTokenTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.persistence.relational.jdbc.pagination; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; +import org.apache.polaris.core.persistence.metrics.MetricsRecordIdentity; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.junit.jupiter.api.Test; + +class MetricsReportTokenTest { + + @Test + void tokenIdIsM() { + assertThat(MetricsReportToken.ID).isEqualTo("m"); + MetricsReportToken token = + ImmutableMetricsReportToken.builder().timestampMs(1000L).reportId("abc").build(); + assertThat(token.getT()).isEqualTo("m"); + } + + @Test + void tokenTypeRegistration() { + // TokenType must be reachable via ServiceLoader for deserialization to work. + MetricsReportToken.MetricsReportTokenType type = + new MetricsReportToken.MetricsReportTokenType(); + assertThat(type.id()).isEqualTo(MetricsReportToken.ID); + assertThat(type.javaType()).isEqualTo(MetricsReportToken.class); + } + + @Test + void fromRecordNull() { + assertThat(MetricsReportToken.fromRecord(null)).isNull(); + } + + @Test + void fromRecordBuildsToken() { + MetricsRecordIdentity identity = stubIdentity("report-1", 5000L); + MetricsReportToken token = MetricsReportToken.fromRecord(identity); + + assertThat(token).isNotNull(); + assertThat(token.reportId()).isEqualTo("report-1"); + assertThat(token.timestampMs()).isEqualTo(5000L); + } + + @Test + void serializeRoundTrip() { + // Build a PageToken carrying a MetricsReportToken, encode it, then decode. + MetricsRecordIdentity seed = stubIdentity("round-trip-id", 9_000_000L); + MetricsReportToken cursor = MetricsReportToken.fromRecord(seed); + assertThat(cursor).isNotNull(); + + // Produce a Page with one item so we get an encodedResponseToken back. 
+ PageToken firstPage = PageToken.fromLimit(1); + Page page = + Page.mapped( + firstPage, Stream.of("item-a", "item-b"), Function.identity(), ignored -> cursor); + + String encoded = page.encodedResponseToken(); + assertThat(encoded).isNotNull(); + + // Decode as a new PageToken and extract cursor. + PageToken decoded = PageToken.build(encoded, 1, () -> true); + assertThat(decoded.paginationRequested()).isTrue(); + assertThat(decoded.valueAs(MetricsReportToken.class)).isPresent(); + + MetricsReportToken recovered = decoded.valueAs(MetricsReportToken.class).get(); + assertThat(recovered.reportId()).isEqualTo("round-trip-id"); + assertThat(recovered.timestampMs()).isEqualTo(9_000_000L); + } + + @Test + void paginationWithCursorStopsAtPageSize() { + PageToken request = PageToken.fromLimit(2); + List items = List.of("a", "b", "c", "d"); + MetricsRecordIdentity identity = stubIdentity("id-b", 2000L); + + Page page = + Page.mapped( + request, + items.stream(), + Function.identity(), + item -> item.equals("b") ? MetricsReportToken.fromRecord(identity) : null); + + assertThat(page.items()).hasSize(2).containsExactly("a", "b"); + assertThat(page.encodedResponseToken()).isNotNull(); + } + + @Test + void lastPageReturnsNullToken() { + PageToken request = PageToken.fromLimit(10); + Page page = + Page.mapped( + request, + Stream.of("x", "y"), + Function.identity(), + // tokenBuilder receives null when there is no next page; must return null in that case. + item -> item != null ? MetricsReportToken.fromRecord(stubIdentity("id", 1L)) : null); + + // Fewer items than page size — no next page. + assertThat(page.items()).hasSize(2); + assertThat(page.encodedResponseToken()).isNull(); + } + + // Minimal stub so we don't need a full ScanMetricsRecord/CommitMetricsRecord. 
+ private static MetricsRecordIdentity stubIdentity(String reportId, long timestampMs) { + return new MetricsRecordIdentity() { + @Override + public String reportId() { + return reportId; + } + + @Override + public long catalogId() { + return 0; + } + + @Override + public long tableId() { + return 0; + } + + @Override + public Instant timestamp() { + return Instant.ofEpochMilli(timestampMs); + } + + @Override + public java.util.Map metadata() { + return java.util.Map.of(); + } + + @Override + public String principalName() { + return null; + } + + @Override + public String requestId() { + return null; + } + + @Override + public String otelTraceId() { + return null; + } + + @Override + public String otelSpanId() { + return null; + } + }; + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java index d579b131fb..615f60994b 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java @@ -51,6 +51,7 @@ public enum PolarisAuthorizableOperation { VIEW_EXISTS, RENAME_VIEW, REPORT_READ_METRICS, + LIST_TABLE_METRICS, REPORT_WRITE_METRICS, SEND_NOTIFICATIONS, LIST_CATALOGS, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java index 7e49e875bc..903ecbf661 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java @@ -98,6 +98,7 @@ import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_MANAGE_GRANTS_ON_SECURABLE; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_MANAGE_STRUCTURE; import static 
org.apache.polaris.core.entity.PolarisPrivilege.TABLE_READ_DATA; +import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_READ_METRICS; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_READ_PROPERTIES; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_REMOVE_PARTITION_SPECS; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_REMOVE_PROPERTIES; @@ -458,6 +459,9 @@ public class PolarisAuthorizerImpl implements PolarisAuthorizer { SUPER_PRIVILEGES.putAll( TABLE_READ_DATA, List.of(CATALOG_MANAGE_CONTENT, TABLE_READ_DATA, TABLE_WRITE_DATA)); SUPER_PRIVILEGES.putAll(TABLE_WRITE_DATA, List.of(CATALOG_MANAGE_CONTENT, TABLE_WRITE_DATA)); + SUPER_PRIVILEGES.putAll( + TABLE_READ_METRICS, + List.of(CATALOG_MANAGE_CONTENT, TABLE_FULL_METADATA, TABLE_READ_DATA, TABLE_READ_METRICS)); SUPER_PRIVILEGES.putAll( NAMESPACE_FULL_METADATA, List.of(CATALOG_MANAGE_CONTENT, CATALOG_MANAGE_METADATA, NAMESPACE_FULL_METADATA)); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/auth/RbacOperationSemantics.java b/polaris-core/src/main/java/org/apache/polaris/core/auth/RbacOperationSemantics.java index cfd2253fcc..f71ff77468 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/auth/RbacOperationSemantics.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/auth/RbacOperationSemantics.java @@ -77,6 +77,7 @@ import static org.apache.polaris.core.auth.PolarisAuthorizableOperation.LIST_PRINCIPAL_ROLES; import static org.apache.polaris.core.auth.PolarisAuthorizableOperation.LIST_PRINCIPAL_ROLES_ASSIGNED; import static org.apache.polaris.core.auth.PolarisAuthorizableOperation.LIST_TABLES; +import static org.apache.polaris.core.auth.PolarisAuthorizableOperation.LIST_TABLE_METRICS; import static org.apache.polaris.core.auth.PolarisAuthorizableOperation.LIST_VIEWS; import static org.apache.polaris.core.auth.PolarisAuthorizableOperation.LOAD_NAMESPACE_METADATA; import static 
org.apache.polaris.core.auth.PolarisAuthorizableOperation.LOAD_POLICY; @@ -187,6 +188,7 @@ import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_LIST; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_MANAGE_GRANTS_ON_SECURABLE; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_READ_DATA; +import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_READ_METRICS; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_READ_PROPERTIES; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_REMOVE_PARTITION_SPECS; import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_REMOVE_PROPERTIES; @@ -310,6 +312,7 @@ private static void register( // Metrics and notifications register(REPORT_READ_METRICS, TABLE_READ_DATA); + register(LIST_TABLE_METRICS, TABLE_READ_METRICS); register(REPORT_WRITE_METRICS, TABLE_WRITE_DATA); register( SEND_NOTIFICATIONS, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java index ff7e029ba5..483d18b2e4 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java @@ -256,6 +256,15 @@ public enum PolarisPrivilege { PolarisEntityType.TABLE_LIKE, List.of(PolarisEntitySubType.ICEBERG_TABLE, PolarisEntitySubType.GENERIC_TABLE), PolarisEntityType.CATALOG_ROLE), + /** + * Read-only access to table scan and commit metrics reports. Does not grant access to table data. + * Implied by TABLE_READ_DATA and TABLE_FULL_METADATA. 
+ */ + TABLE_READ_METRICS( + 103, + PolarisEntityType.TABLE_LIKE, + List.of(PolarisEntitySubType.ICEBERG_TABLE, PolarisEntitySubType.GENERIC_TABLE), + PolarisEntityType.CATALOG_ROLE), ; /** diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsPersistence.java index bec34a97a1..0bb87a516b 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsPersistence.java @@ -19,7 +19,11 @@ package org.apache.polaris.core.persistence.metrics; import com.google.common.annotations.Beta; +import java.util.List; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; /** * Service Provider Interface (SPI) for persisting Iceberg metrics reports. @@ -73,4 +77,56 @@ default void writeScanReport(@NonNull ScanMetricsRecord record) { default void writeCommitReport(@NonNull CommitMetricsRecord record) { // No-op by default - backends that don't support metrics silently ignore } + + /** + * Lists scan metrics reports for a given table, ordered by timestamp descending. + * + *

Default implementation returns an empty page. Override in implementations that support + * metrics reads. + * + * @param catalogId the internal catalog ID + * @param tableId the internal table entity ID + * @param snapshotId optional filter by snapshot ID + * @param principalName optional filter by principal name + * @param timestampFrom optional inclusive lower bound on timestamp (epoch ms) + * @param timestampTo optional exclusive upper bound on timestamp (epoch ms) + * @param pageToken pagination token + * @return a page of scan metrics records + */ + default Page listScanReports( + long catalogId, + long tableId, + @Nullable Long snapshotId, + @Nullable String principalName, + @Nullable Long timestampFrom, + @Nullable Long timestampTo, + @NonNull PageToken pageToken) { + return Page.fromItems(List.of()); + } + + /** + * Lists commit metrics reports for a given table, ordered by timestamp descending. + * + *

Default implementation returns an empty page. Override in implementations that support + * metrics reads. + * + * @param catalogId the internal catalog ID + * @param tableId the internal table entity ID + * @param snapshotId optional filter by snapshot ID + * @param principalName optional filter by principal name + * @param timestampFrom optional inclusive lower bound on timestamp (epoch ms) + * @param timestampTo optional exclusive upper bound on timestamp (epoch ms) + * @param pageToken pagination token + * @return a page of commit metrics records + */ + default Page listCommitReports( + long catalogId, + long tableId, + @Nullable Long snapshotId, + @Nullable String principalName, + @Nullable Long timestampFrom, + @Nullable Long timestampTo, + @NonNull PageToken pageToken) { + return Page.fromItems(List.of()); + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/PolarisMetricsManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/PolarisMetricsManager.java index e3919e997a..8722339e1d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/PolarisMetricsManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/PolarisMetricsManager.java @@ -18,8 +18,12 @@ */ package org.apache.polaris.core.persistence.metrics; +import com.google.common.annotations.Beta; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; /** * Interface for managing Iceberg metrics persistence through the metastore manager layer. @@ -40,6 +44,7 @@ * support this interface. Backends that want actual metrics persistence (e.g., JDBC) override the * methods; others use the default no-op behavior. 
*/ +@Beta public interface PolarisMetricsManager { /** @@ -75,4 +80,34 @@ default void writeCommitMetrics( @NonNull PolarisCallContext callCtx, @NonNull CommitMetricsRecord record) { callCtx.getMetaStore().writeCommitReport(record); } + + default Page listScanMetrics( + @NonNull PolarisCallContext callCtx, + long catalogId, + long tableId, + @Nullable Long snapshotId, + @Nullable String principalName, + @Nullable Long timestampFrom, + @Nullable Long timestampTo, + @NonNull PageToken pageToken) { + return callCtx + .getMetaStore() + .listScanReports( + catalogId, tableId, snapshotId, principalName, timestampFrom, timestampTo, pageToken); + } + + default Page listCommitMetrics( + @NonNull PolarisCallContext callCtx, + long catalogId, + long tableId, + @Nullable Long snapshotId, + @Nullable String principalName, + @Nullable Long timestampFrom, + @Nullable Long timestampTo, + @NonNull PageToken pageToken) { + return callCtx + .getMetaStore() + .listCommitReports( + catalogId, tableId, snapshotId, principalName, timestampFrom, timestampTo, pageToken); + } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/entity/PolarisPrivilegeTest.java b/polaris-core/src/test/java/org/apache/polaris/core/entity/PolarisPrivilegeTest.java index 14596911fd..048f9ce889 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/entity/PolarisPrivilegeTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/entity/PolarisPrivilegeTest.java @@ -131,7 +131,8 @@ static Stream polarisPrivileges() { Arguments.of(100, PolarisPrivilege.TABLE_REMOVE_STATISTICS), Arguments.of(101, PolarisPrivilege.TABLE_REMOVE_PARTITION_SPECS), Arguments.of(102, PolarisPrivilege.TABLE_MANAGE_STRUCTURE), - Arguments.of(103, null)); + Arguments.of(103, PolarisPrivilege.TABLE_READ_METRICS), + Arguments.of(104, null)); } @ParameterizedTest diff --git a/runtime/server/build.gradle.kts b/runtime/server/build.gradle.kts index 587bc2f2d0..5a5c22e8ce 100644 --- a/runtime/server/build.gradle.kts 
+++ b/runtime/server/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { runtimeOnly("io.quarkus:quarkus-jdbc-postgresql") runtimeOnly(project(":polaris-extensions-federation-hadoop")) runtimeOnly(project(":polaris-extensions-auth-opa")) + runtimeOnly(project(":polaris-extensions-metrics-reports")) runtimeOnly(project(":polaris-extensions-auth-ranger")) if ((project.findProperty("NonRESTCatalogs") as String?)?.contains("HIVE") == true) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index c621be2a5d..4390ddb5ff 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.reporting; +import com.google.common.annotations.Beta; import java.time.Instant; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; @@ -37,6 +38,7 @@ * @see DefaultMetricsReporter * @see MetricsReportingConfiguration */ +@Beta public interface PolarisMetricsReporter { /** diff --git a/site/content/in-dev/unreleased/managing-security/access-control.md b/site/content/in-dev/unreleased/managing-security/access-control.md index 40752651b3..6bb4588864 100644 --- a/site/content/in-dev/unreleased/managing-security/access-control.md +++ b/site/content/in-dev/unreleased/managing-security/access-control.md @@ -115,6 +115,7 @@ To grant the full set of privileges (drop, list, read, write, etc.) on an object | TABLE_WRITE_PROPERTIES | Enables configuring properties for the table. | | TABLE_READ_DATA | Enables reading data from the table by receiving short-lived read-only storage credentials from the catalog. 
| | TABLE_WRITE_DATA | Enables writing data to the table by receiving short-lived read+write storage credentials from the catalog. | +| TABLE_READ_METRICS | Enables reading persisted Iceberg scan and commit metrics reports for the table via the Metrics Reports API. | | TABLE_FULL_METADATA | Grants all table privileges, except TABLE_READ_DATA and TABLE_WRITE_DATA, which need to be granted individually. | | TABLE_ATTACH_POLICY | Enables attaching policy to a table. | | TABLE_DETACH_POLICY | Enables detaching policy from a table. | diff --git a/site/content/in-dev/unreleased/polaris-api-specs/polaris-metrics-reports-api.md b/site/content/in-dev/unreleased/polaris-api-specs/polaris-metrics-reports-api.md new file mode 100644 index 0000000000..c4cc1f983a --- /dev/null +++ b/site/content/in-dev/unreleased/polaris-api-specs/polaris-metrics-reports-api.md @@ -0,0 +1,27 @@ +--- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +title: 'Apache Polaris Metrics Reports Service OpenAPI Specification' +linkTitle: 'Metrics Reports API ↗' +weight: 300 +params: + show_page_toc: false +--- + +{{< redoc-polaris "metrics-reports-service.yml" >}} diff --git a/site/content/in-dev/unreleased/telemetry.md b/site/content/in-dev/unreleased/telemetry.md index 59a9b763ee..ffb4723c6f 100644 --- a/site/content/in-dev/unreleased/telemetry.md +++ b/site/content/in-dev/unreleased/telemetry.md @@ -191,6 +191,72 @@ polaris.log.mdc.region=us-west-2 MDC context is propagated across threads, including in `TaskExecutor` threads. +## Iceberg Metrics Reports API + +Polaris can persist Iceberg scan and commit metrics reports submitted by clients to the database. +To enable persistence, set the following property: + +```properties +polaris.iceberg-metrics.reporting.type=persisting +``` + +Once enabled, persisted reports are queryable via the Metrics Reports API at +`/api/metrics-reports/v1/catalogs/{catalogName}/namespaces/{namespace}/tables/{table}`. + +### Prerequisites + +The caller must hold the `TABLE_READ_METRICS` privilege on the target table (or a privilege that +implies it, such as `TABLE_READ_DATA`, `TABLE_FULL_METADATA`, or `CATALOG_MANAGE_CONTENT`). + +### Query parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `metricType` | string (required) | Either `scan` or `commit`. | +| `pageToken` | string | Pagination cursor from a previous response's `nextPageToken`. | +| `pageSize` | integer | Number of records per page (default: 100). | +| `snapshotId` | long | Filter to reports for a specific snapshot. | +| `principalName` | string | Filter to reports submitted by a specific principal. | +| `timestampFrom` | long (epoch ms) | Include only reports at or after this timestamp. | +| `timestampTo` | long (epoch ms) | Include only reports before this timestamp. | + +Results are returned in descending timestamp order. 
The response includes a `nextPageToken` field; +pass its value as `pageToken` in the next request to retrieve the following page. A `null` or absent +`nextPageToken` indicates the last page. + +### Response envelope + +Each report follows a stable envelope structure that separates identity fields from type-specific +metrics. This allows the payload schema to evolve independently without breaking clients that only +need the envelope fields for pagination or correlation. + +```json +{ + "metricType": "scan", + "nextPageToken": null, + "reports": [{ + "id": "report-uuid", + "timestampMs": 1709337612345, + "actor": { "principalName": "alice" }, + "request": { "requestId": "req-1", "otelTraceId": "abc123", "otelSpanId": "def456" }, + "object": { "snapshotId": 99 }, + "payload": { + "type": "iceberg.metrics.scan", + "version": 1, + "data": { + "resultDataFiles": 150, + "totalFileSizeBytes": 1073741824, + "totalPlanningDurationMs": 250, + "..." + } + } + }] +} +``` + +See the [Metrics Reports API specification]({{% relref "polaris-api-specs/polaris-metrics-reports-api" %}}) +for the full schema reference. + ## Links Visit [Using Polaris with telemetry tools]({{% relref "getting-started/using-polaris/telemetry-tools" %}}) to see sample Polaris config with Prometheus and Jaeger. diff --git a/spec/metrics-reports-service.yml b/spec/metrics-reports-service.yml new file mode 100644 index 0000000000..66976c2ff8 --- /dev/null +++ b/spec/metrics-reports-service.yml @@ -0,0 +1,479 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +openapi: 3.0.3 +info: + title: Apache Polaris Metrics Reports API + description: > + **Beta**: Read-only API for querying persisted Iceberg table metrics (scan and commit reports) + from Apache Polaris. This API is in beta and may change in future releases. + Requires TABLE_READ_METRICS privilege on the target table. + version: 0.1.0 + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0.html + +servers: + - url: "{scheme}://{host}/api/metrics-reports/v1" + variables: + scheme: + default: https + host: + default: localhost + +paths: + /catalogs/{catalogName}/namespaces/{namespace}/tables/{table}: + parameters: + - $ref: '#/components/parameters/catalogName' + - $ref: '#/components/parameters/namespace' + - $ref: '#/components/parameters/table' + get: + operationId: listTableMetrics + summary: List metrics reports for a table + description: > + Returns persisted metrics reports for the specified table. The required `metricType` + parameter selects between scan reports (produced during table reads) and commit reports + (produced during table writes). Results are ordered by timestamp descending. + Requires TABLE_READ_METRICS privilege on the target table. 
+ tags: + - Metrics + parameters: + - name: metricType + in: query + required: true + description: Type of metrics to retrieve + schema: + type: string + enum: [scan, commit] + - name: pageToken + in: query + required: false + schema: + type: string + description: Opaque cursor from a previous response's nextPageToken field + - name: pageSize + in: query + required: false + schema: + type: integer + minimum: 1 + default: 100 + description: Maximum number of results to return per page + - name: snapshotId + in: query + required: false + schema: + type: integer + format: int64 + description: Filter results to a specific snapshot ID + - name: principalName + in: query + required: false + schema: + type: string + description: Filter results to a specific principal (e.g. service account name) + - name: timestampFrom + in: query + required: false + schema: + type: integer + format: int64 + description: Inclusive lower bound on report timestamp (Unix epoch milliseconds) + - name: timestampTo + in: query + required: false + schema: + type: integer + format: int64 + description: Exclusive upper bound on report timestamp (Unix epoch milliseconds) + responses: + '200': + description: Paginated list of metrics reports + content: + application/json: + schema: + $ref: '#/components/schemas/ListMetricsResponse' + '400': + description: Bad request (missing or invalid parameters) + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '403': + description: Insufficient privileges (TABLE_READ_METRICS required) + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Catalog, namespace, or table not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + +components: + parameters: + catalogName: + name: catalogName + in: path + required: true + schema: + type: string + namespace: + name: namespace + in: path + required: true + description: > + Namespace path 
encoded using the same convention as the Polaris Iceberg REST API: + multi-level namespaces use the unit separator (0x1F) between levels, URL-encoded as %1F. + schema: + type: string + table: + name: table + in: path + required: true + schema: + type: string + + schemas: + ListMetricsResponse: + description: > + Polymorphic response for metrics queries. The concrete type is determined by the + metricType discriminator field, which echoes the requested metricType query parameter. + oneOf: + - $ref: '#/components/schemas/ListScanMetricsResponse' + - $ref: '#/components/schemas/ListCommitMetricsResponse' + discriminator: + propertyName: metricType + mapping: + scan: '#/components/schemas/ListScanMetricsResponse' + commit: '#/components/schemas/ListCommitMetricsResponse' + + ListScanMetricsResponse: + type: object + required: + - metricType + - reports + properties: + nextPageToken: + type: string + nullable: true + description: > + Opaque cursor for fetching the next page. Null or absent when no further pages exist. + metricType: + type: string + enum: [scan] + description: Discriminator — always "scan" for this response type + reports: + type: array + items: + $ref: '#/components/schemas/ScanMetricsReport' + + ListCommitMetricsResponse: + type: object + required: + - metricType + - reports + properties: + nextPageToken: + type: string + nullable: true + description: > + Opaque cursor for fetching the next page. Null or absent when no further pages exist. 
+ metricType: + type: string + enum: [commit] + description: Discriminator — always "commit" for this response type + reports: + type: array + items: + $ref: '#/components/schemas/CommitMetricsReport' + + MetricsActor: + type: object + description: Identity of the principal who triggered the operation + properties: + principalName: + type: string + nullable: true + + MetricsRequest: + type: object + description: Request context for correlation with logs and traces + properties: + requestId: + type: string + nullable: true + otelTraceId: + type: string + nullable: true + description: OpenTelemetry trace ID + otelSpanId: + type: string + nullable: true + description: OpenTelemetry span ID + + ScanMetricsObject: + type: object + description: Resource context for the scanned table operation + properties: + snapshotId: + type: integer + format: int64 + nullable: true + + CommitMetricsObject: + type: object + description: Resource context for the committed table operation + required: + - snapshotId + properties: + snapshotId: + type: integer + format: int64 + + ScanPayloadData: + type: object + description: Iceberg scan metrics data + properties: + schemaId: + type: integer + nullable: true + filterExpression: + type: string + nullable: true + projectedFieldIds: + type: string + nullable: true + description: Comma-separated projected field IDs + projectedFieldNames: + type: string + nullable: true + description: Comma-separated projected field names + resultDataFiles: + type: integer + format: int64 + resultDeleteFiles: + type: integer + format: int64 + totalFileSizeBytes: + type: integer + format: int64 + totalDataManifests: + type: integer + format: int64 + totalDeleteManifests: + type: integer + format: int64 + scannedDataManifests: + type: integer + format: int64 + scannedDeleteManifests: + type: integer + format: int64 + skippedDataManifests: + type: integer + format: int64 + skippedDeleteManifests: + type: integer + format: int64 + skippedDataFiles: + type: integer 
+ format: int64 + skippedDeleteFiles: + type: integer + format: int64 + totalPlanningDurationMs: + type: integer + format: int64 + equalityDeleteFiles: + type: integer + format: int64 + positionalDeleteFiles: + type: integer + format: int64 + indexedDeleteFiles: + type: integer + format: int64 + totalDeleteFileSizeBytes: + type: integer + format: int64 + + ScanPayload: + type: object + required: + - type + - version + - data + properties: + type: + type: string + enum: [iceberg.metrics.scan] + version: + type: integer + enum: [1] + data: + $ref: '#/components/schemas/ScanPayloadData' + + CommitPayloadData: + type: object + description: Iceberg commit metrics data + properties: + sequenceNumber: + type: integer + format: int64 + nullable: true + operation: + type: string + description: Commit operation (append, overwrite, delete, replace) + addedDataFiles: + type: integer + format: int64 + removedDataFiles: + type: integer + format: int64 + totalDataFiles: + type: integer + format: int64 + addedDeleteFiles: + type: integer + format: int64 + removedDeleteFiles: + type: integer + format: int64 + totalDeleteFiles: + type: integer + format: int64 + addedEqualityDeleteFiles: + type: integer + format: int64 + removedEqualityDeleteFiles: + type: integer + format: int64 + addedPositionalDeleteFiles: + type: integer + format: int64 + removedPositionalDeleteFiles: + type: integer + format: int64 + addedRecords: + type: integer + format: int64 + removedRecords: + type: integer + format: int64 + totalRecords: + type: integer + format: int64 + addedFileSizeBytes: + type: integer + format: int64 + removedFileSizeBytes: + type: integer + format: int64 + totalFileSizeBytes: + type: integer + format: int64 + totalDurationMs: + type: integer + format: int64 + nullable: true + attempts: + type: integer + + CommitPayload: + type: object + required: + - type + - version + - data + properties: + type: + type: string + enum: [iceberg.metrics.commit] + version: + type: integer + enum: [1] 
+ data: + $ref: '#/components/schemas/CommitPayloadData' + + ScanMetricsReport: + type: object + description: Stable envelope for a persisted Iceberg scan metrics report + required: + - id + - timestampMs + - object + - payload + properties: + id: + type: string + description: Unique identifier for this report + timestampMs: + type: integer + format: int64 + description: Server-side timestamp when the report was received (Unix epoch milliseconds) + actor: + $ref: '#/components/schemas/MetricsActor' + nullable: true + request: + $ref: '#/components/schemas/MetricsRequest' + nullable: true + object: + $ref: '#/components/schemas/ScanMetricsObject' + payload: + $ref: '#/components/schemas/ScanPayload' + + CommitMetricsReport: + type: object + description: Stable envelope for a persisted Iceberg commit metrics report + required: + - id + - timestampMs + - object + - payload + properties: + id: + type: string + description: Unique identifier for this report + timestampMs: + type: integer + format: int64 + description: Server-side timestamp when the report was received (Unix epoch milliseconds) + actor: + $ref: '#/components/schemas/MetricsActor' + nullable: true + request: + $ref: '#/components/schemas/MetricsRequest' + nullable: true + object: + $ref: '#/components/schemas/CommitMetricsObject' + payload: + $ref: '#/components/schemas/CommitPayload' + + ErrorResponse: + type: object + properties: + message: + type: string + type: + type: string + code: + type: integer