Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti

### New Features

- Added a Table Metrics REST API (`/api/metrics-reports/v1/`) that exposes persisted Iceberg scan and commit metrics reports over HTTP. Querying requires the new `TABLE_READ_METRICS` privilege on the target table.
Comment thread
obelix74 marked this conversation as resolved.
Outdated
- Added `deploymentAnnotations` support in Helm chart.
- Added KMS properties (optional) to catalog storage config to enable S3 data encryption.
- Added `topologySpreadConstraints` support in Helm chart.
Expand Down
93 changes: 93 additions & 0 deletions api/metrics-reports-service/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.openapitools.generator.gradle.plugin.tasks.GenerateTask

plugins {
alias(libs.plugins.openapi.generator)
id("polaris-client")
id("org.kordamp.gradle.jandex")
}

dependencies {
implementation(project(":polaris-core"))

compileOnly(platform(libs.jackson.bom))
compileOnly("com.fasterxml.jackson.core:jackson-annotations")

compileOnly(libs.jakarta.annotation.api)
compileOnly(libs.jakarta.inject.api)
compileOnly(libs.jakarta.validation.api)
compileOnly(libs.microprofile.fault.tolerance.api)
compileOnly(libs.swagger.annotations)

implementation(libs.jakarta.servlet.api)
implementation(libs.jakarta.ws.rs.api)

compileOnly(platform(libs.micrometer.bom))
compileOnly("io.micrometer:micrometer-core")

implementation(libs.slf4j.api)
}

val rootDir = rootProject.layout.projectDirectory
val specsDir = rootDir.dir("spec")
val templatesDir = rootDir.dir("server-templates")
val generatedDir = project.layout.buildDirectory.dir("generated-openapi")
val generatedOpenApiSrcDir = project.layout.buildDirectory.dir("generated-openapi/src/main/java")

openApiGenerate {
inputSpec = provider { specsDir.file("metrics-reports-service.yml").asFile.absolutePath }
generatorName = "jaxrs-resteasy"
outputDir = provider { generatedDir.get().asFile.absolutePath }
apiPackage = "org.apache.polaris.service.metrics.api"
modelPackage = "org.apache.polaris.core.metrics.api.model"
ignoreFileOverride.set(provider { rootDir.file(".openapi-generator-ignore").asFile.absolutePath })
removeOperationIdPrefix.set(true)
templateDir.set(provider { templatesDir.asFile.absolutePath })
globalProperties.put("apis", "")
globalProperties.put("models", "")
globalProperties.put("apiDocs", "false")
globalProperties.put("modelTests", "false")
configOptions.put("openApiNullable", "false")
configOptions.put("useBeanValidation", "true")
configOptions.put("sourceFolder", "src/main/java")
configOptions.put("useJakartaEe", "true")
configOptions.put("generateBuilders", "true")
configOptions.put("generateConstructorWithAllArgs", "true")
configOptions.put("hideGenerationTimestamp", "true")
additionalProperties.put("apiNamePrefix", "Polaris")
additionalProperties.put("apiNameSuffix", "Api")
additionalProperties.put("metricsPrefix", "polaris")
serverVariables.put("basePath", "api/metrics-reports/v1")
}

listOf("sourcesJar", "compileJava", "processResources").forEach { task ->
tasks.named(task) { dependsOn("openApiGenerate") }
}

sourceSets { main { java { srcDir(generatedOpenApiSrcDir) } } }

tasks.named<GenerateTask>("openApiGenerate") {
inputs.dir(templatesDir)
inputs.dir(specsDir)
actions.addFirst { delete { delete(generatedDir) } }
}

tasks.named("javadoc") { dependsOn("jandex") }
1 change: 1 addition & 0 deletions gradle/projects.main.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ polaris-core=polaris-core
polaris-api-iceberg-service=api/iceberg-service
polaris-api-management-model=api/management-model
polaris-api-management-service=api/management-service
polaris-api-metrics-reports-service=api/metrics-reports-service
polaris-api-catalog-service=api/polaris-catalog-service
polaris-runtime-defaults=runtime/defaults
polaris-runtime-service=runtime/service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.MetricsReportToken;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
Expand Down Expand Up @@ -1336,4 +1337,141 @@ private void writeCommitMetricsReport(@Nonnull ModelCommitMetricsReport report)
String.format("Failed to write commit metrics report due to %s", e.getMessage()), e);
}
}

@Override
public Page<ScanMetricsRecord> listScanReports(
long catalogId,
long tableId,
@Nullable Long snapshotId,
@Nullable String principalName,
@Nullable Long timestampFrom,
@Nullable Long timestampTo,
@Nonnull PageToken pageToken) {
try {
PreparedQuery query =
buildMetricsQuery(
ModelScanMetricsReport.TABLE_NAME,
catalogId,
tableId,
snapshotId,
principalName,
timestampFrom,
timestampTo,
pageToken);
List<ModelScanMetricsReport> rows =
datasourceOperations.executeSelect(query, ModelScanMetricsReport.CONVERTER);
return Page.mapped(
pageToken,
rows.stream().map(ModelScanMetricsReport::toRecord),
Function.identity(),
MetricsReportToken::fromRecord);
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to list scan metrics reports: %s", e.getMessage()), e);
}
}

@Override
public Page<CommitMetricsRecord> listCommitReports(
long catalogId,
long tableId,
@Nullable Long snapshotId,
@Nullable String principalName,
@Nullable Long timestampFrom,
@Nullable Long timestampTo,
@Nonnull PageToken pageToken) {
try {
PreparedQuery query =
buildMetricsQuery(
ModelCommitMetricsReport.TABLE_NAME,
catalogId,
tableId,
snapshotId,
principalName,
timestampFrom,
timestampTo,
pageToken);
List<ModelCommitMetricsReport> rows =
datasourceOperations.executeSelect(query, ModelCommitMetricsReport.CONVERTER);
return Page.mapped(
pageToken,
rows.stream().map(ModelCommitMetricsReport::toRecord),
Function.identity(),
MetricsReportToken::fromRecord);
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to list commit metrics reports: %s", e.getMessage()), e);
}
}

/**
* Builds a parameterized SELECT query for a metrics report table using keyset pagination.
*
* <p>Rows are ordered by {@code (timestamp_ms DESC, report_id DESC)}. The cursor from {@link
* MetricsReportToken} drives the keyset predicate: {@code (timestamp_ms < cursorTs) OR
* (timestamp_ms = cursorTs AND report_id < cursorId)}.
*/
private PreparedQuery buildMetricsQuery(
Comment thread
obelix74 marked this conversation as resolved.
String tableName,
long catalogId,
long tableId,
@Nullable Long snapshotId,
@Nullable String principalName,
@Nullable Long timestampFrom,
@Nullable Long timestampTo,
PageToken pageToken) {
StringBuilder sql = new StringBuilder("SELECT * FROM ");
sql.append(QueryGenerator.getFullyQualifiedTableName(tableName));
sql.append(" WHERE realm_id = ? AND catalog_id = ? AND table_id = ?");

List<Object> params = new ArrayList<>();
params.add(realmId);
params.add(catalogId);
params.add(tableId);

if (snapshotId != null) {
sql.append(" AND snapshot_id = ?");
params.add(snapshotId);
}
if (principalName != null) {
sql.append(" AND principal_name = ?");
params.add(principalName);
}
if (timestampFrom != null) {
sql.append(" AND timestamp_ms >= ?");
params.add(timestampFrom);
}
if (timestampTo != null) {
sql.append(" AND timestamp_ms < ?");
params.add(timestampTo);
}

if (pageToken.paginationRequested()) {
// If a token value is present it must be a MetricsReportToken; any other type indicates a
// caller bug (e.g. recycling a token from a different list operation).
if (pageToken.value().isPresent() && pageToken.valueAs(MetricsReportToken.class).isEmpty()) {
throw new IllegalArgumentException(
"pageToken contains a cursor of an unexpected type; expected MetricsReportToken");
}
pageToken
.valueAs(MetricsReportToken.class)
.ifPresent(
cursor -> {
sql.append(" AND (timestamp_ms < ? OR (timestamp_ms = ? AND report_id < ?))");
params.add(cursor.timestampMs());
params.add(cursor.timestampMs());
params.add(cursor.reportId());
});
}

sql.append(" ORDER BY timestamp_ms DESC, report_id DESC");

// Fetch one extra row so Page.mapped() can determine whether a next page exists.
// Always apply a LIMIT: use the requested pageSize, or default to 100 if absent.
int limit = pageToken.pageSize().orElse(100);
sql.append(" LIMIT ?");
params.add(limit + 1);

return new PreparedQuery(sql.toString(), params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.persistence.relational.jdbc.models;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Shared utilities for metrics model classes. */
public final class MetricsModelUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(MetricsModelUtils.class);

public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private MetricsModelUtils() {}

/**
* Parses a JSON string into a {@code Map<String, String>}.
*
* @param json the JSON string to parse; may be null or empty
* @return the parsed map, or an empty map if the input is null, empty, or unparseable
*/
public static Map<String, String> parseMetadata(String json) {
if (json == null || json.isEmpty() || json.equals("{}")) {
return Map.of();
}
try {
return OBJECT_MAPPER.readValue(json, new TypeReference<Map<String, String>>() {});
} catch (JsonProcessingException e) {
LOGGER.warn("Failed to parse metadata JSON: {}", e.getMessage());
return Map.of();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package org.apache.polaris.persistence.relational.jdbc.models;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Nullable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
import org.apache.polaris.immutables.PolarisImmutable;
import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
Expand Down Expand Up @@ -257,8 +258,6 @@ default Map<String, Object> toMap(DatabaseType databaseType) {

// === Static conversion methods (following ModelEntity pattern) ===

ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Converts a CommitMetricsRecord (SPI) to ModelCommitMetricsReport (JDBC).
*
Expand Down Expand Up @@ -316,12 +315,54 @@ private static String toJsonString(Map<String, String> map) {
return "{}";
}
try {
return OBJECT_MAPPER.writeValueAsString(map);
return MetricsModelUtils.OBJECT_MAPPER.writeValueAsString(map);
} catch (JsonProcessingException e) {
return "{}";
}
}

/**
* Converts this JDBC model back to the backend-agnostic SPI record.
*
* @return a CommitMetricsRecord built from this model's fields
*/
default CommitMetricsRecord toRecord() {
return CommitMetricsRecord.builder()
.reportId(getReportId())
.catalogId(getCatalogId())
.tableId(getTableId())
.timestamp(Instant.ofEpochMilli(getTimestampMs()))
.metadata(MetricsModelUtils.parseMetadata(getMetadata()))
.principalName(getPrincipalName())
.requestId(getRequestId())
.otelTraceId(getOtelTraceId())
.otelSpanId(getOtelSpanId())
.snapshotId(getSnapshotId())
.sequenceNumber(Optional.ofNullable(getSequenceNumber()))
.operation(getOperation())
.addedDataFiles(getAddedDataFiles())
.removedDataFiles(getRemovedDataFiles())
.totalDataFiles(getTotalDataFiles())
.addedDeleteFiles(getAddedDeleteFiles())
.removedDeleteFiles(getRemovedDeleteFiles())
.totalDeleteFiles(getTotalDeleteFiles())
.addedEqualityDeleteFiles(getAddedEqualityDeleteFiles())
.removedEqualityDeleteFiles(getRemovedEqualityDeleteFiles())
.addedPositionalDeleteFiles(getAddedPositionalDeleteFiles())
.removedPositionalDeleteFiles(getRemovedPositionalDeleteFiles())
.addedRecords(getAddedRecords())
.removedRecords(getRemovedRecords())
.totalRecords(getTotalRecords())
.addedFileSizeBytes(getAddedFileSizeBytes())
.removedFileSizeBytes(getRemovedFileSizeBytes())
.totalFileSizeBytes(getTotalFileSizeBytes())
// The write path stores Optional.empty() as 0L; treat 0 as "unknown" on read.
.totalDurationMs(
getTotalDurationMs() == 0L ? Optional.empty() : Optional.of(getTotalDurationMs()))
.attempts(getAttempts())
.build();
}

/** Dummy instance to be used as a Converter when calling fromResultSet(). */
ModelCommitMetricsReport CONVERTER =
ImmutableModelCommitMetricsReport.builder()
Expand Down
Loading
Loading