diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java index 366bbfa9171b..1b79edcced75 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java @@ -44,6 +44,8 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.events.Event; +import org.apache.iceberg.rest.events.EventParser; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CommitTransactionRequestParser; import org.apache.iceberg.rest.requests.CreateViewRequest; @@ -51,12 +53,15 @@ import org.apache.iceberg.rest.requests.FetchScanTasksRequest; import org.apache.iceberg.rest.requests.FetchScanTasksRequestParser; import org.apache.iceberg.rest.requests.ImmutableCreateViewRequest; +import org.apache.iceberg.rest.requests.ImmutableQueryEventsRequest; import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; import org.apache.iceberg.rest.requests.ImmutableRegisterViewRequest; import org.apache.iceberg.rest.requests.ImmutableRemoteSignRequest; import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest; import org.apache.iceberg.rest.requests.PlanTableScanRequest; import org.apache.iceberg.rest.requests.PlanTableScanRequestParser; +import org.apache.iceberg.rest.requests.QueryEventsRequest; +import org.apache.iceberg.rest.requests.QueryEventsRequestParser; import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RegisterTableRequestParser; import org.apache.iceberg.rest.requests.RegisterViewRequest; @@ -77,6 +82,7 @@ import org.apache.iceberg.rest.responses.FetchScanTasksResponseParser; import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse; +import org.apache.iceberg.rest.responses.ImmutableQueryEventsResponse; import org.apache.iceberg.rest.responses.ImmutableRemoteSignResponse; import org.apache.iceberg.rest.responses.LoadCredentialsResponse; import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser; @@ -87,6 +93,8 @@ import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponseParser; +import org.apache.iceberg.rest.responses.QueryEventsResponse; +import org.apache.iceberg.rest.responses.QueryEventsResponseParser; import org.apache.iceberg.rest.responses.RemoteSignResponse; import org.apache.iceberg.rest.responses.RemoteSignResponseParser; import org.apache.iceberg.util.JsonUtil; @@ -174,7 +182,18 @@ public static void registerAll(ObjectMapper mapper) { .addSerializer(RemoteSignResponse.class, new RemoteSignResponseSerializer<>()) .addSerializer(ImmutableRemoteSignResponse.class, new RemoteSignResponseSerializer<>()) .addDeserializer(RemoteSignResponse.class, new RemoteSignResponseDeserializer<>()) - .addDeserializer(ImmutableRemoteSignResponse.class, new RemoteSignResponseDeserializer<>()); + .addDeserializer(ImmutableRemoteSignResponse.class, new RemoteSignResponseDeserializer<>()) + .addSerializer(QueryEventsRequest.class, new QueryEventsRequestSerializer<>()) + .addSerializer(ImmutableQueryEventsRequest.class, new QueryEventsRequestSerializer<>()) + .addDeserializer(QueryEventsRequest.class, new QueryEventsRequestDeserializer<>()) + .addDeserializer(ImmutableQueryEventsRequest.class, new QueryEventsRequestDeserializer<>()) + .addSerializer(QueryEventsResponse.class, new QueryEventsResponseSerializer<>()) + .addSerializer(ImmutableQueryEventsResponse.class, new QueryEventsResponseSerializer<>()) + .addDeserializer(QueryEventsResponse.class, new QueryEventsResponseDeserializer<>()) + .addDeserializer( + ImmutableQueryEventsResponse.class, new QueryEventsResponseDeserializer<>()) + .addSerializer(Event.class, new EventSerializer()) + .addDeserializer(Event.class, new EventDeserializer()); mapper.registerModule(module); } @@ -699,4 +718,56 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce return (T) RemoteSignResponseParser.fromJson(jsonNode); } } + + static class QueryEventsRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + QueryEventsRequestParser.toJson(request, gen); + } + } + + static class QueryEventsRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) QueryEventsRequestParser.fromJson(jsonNode); + } + } + + static class QueryEventsResponseSerializer + extends JsonSerializer { + @Override + public void serialize(T response, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + QueryEventsResponseParser.toJson(response, gen); + } + } + + static class QueryEventsResponseDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) QueryEventsResponseParser.fromJson(jsonNode); + } + } + + static class EventSerializer extends JsonSerializer { + @Override + public void serialize(Event event, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + EventParser.toJson(event, gen); + } + } + + static class EventDeserializer extends JsonDeserializer { + @Override + public Event deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return EventParser.fromJson(jsonNode); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/events/CatalogOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/CatalogOperation.java new file mode 100644 index 000000000000..20be15680374 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/CatalogOperation.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +/** + * Base interface for all catalog operations in the IRC Events endpoint. + * + *

Each operation type represents a specific catalog change (e.g., creating a table, dropping a + * namespace). The {@link #operationType()} is used as a discriminator for deserialization. + */ +public interface CatalogOperation { + + /** The type of operation, used as a discriminator for polymorphic deserialization. */ + String operationType(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/CatalogOperationParser.java b/core/src/main/java/org/apache/iceberg/rest/events/CatalogOperationParser.java new file mode 100644 index 000000000000..07f513574e79 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/CatalogOperationParser.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.MetadataUpdateParser; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.UpdateRequirementParser; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.JsonUtil; + +/** + * Parser for {@link CatalogOperation} objects, handling polymorphic serialization and + * deserialization based on the operation-type discriminator. + */ +public class CatalogOperationParser { + + private static final String OPERATION_TYPE = "operation-type"; + private static final String NAMESPACE = "namespace"; + private static final String IDENTIFIER = "identifier"; + private static final String TABLE_UUID = "table-uuid"; + private static final String VIEW_UUID = "view-uuid"; + private static final String PURGE = "purge"; + private static final String PROPERTIES = "properties"; + private static final String UPDATES = "updates"; + private static final String REQUIREMENTS = "requirements"; + private static final String UPDATED = "updated"; + private static final String REMOVED = "removed"; + private static final String MISSING = "missing"; + private static final String SOURCE = "source"; + private static final String DESTINATION = "destination"; + private static final String CUSTOM_TYPE = "custom-type"; + private static final String ADDITIONAL_PROPERTIES = "additional-properties"; + + // Operation type constants + static final String CREATE_NAMESPACE = "create-namespace"; + static final String DROP_NAMESPACE = "drop-namespace"; + static final String UPDATE_NAMESPACE_PROPERTIES = "update-namespace-properties"; + static final String CREATE_TABLE = "create-table"; + static final String REGISTER_TABLE = "register-table"; + static final String DROP_TABLE = "drop-table"; + static final String UPDATE_TABLE = "update-table"; + static final String RENAME_TABLE = "rename-table"; + static final String CREATE_VIEW = "create-view"; + static final String DROP_VIEW = "drop-view"; + static final String UPDATE_VIEW = "update-view"; + static final String RENAME_VIEW = "rename-view"; + static final String CUSTOM = "custom"; + + private CatalogOperationParser() {} + + public static void toJson(CatalogOperation operation, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != operation, "Invalid operation: null"); + + gen.writeStartObject(); + gen.writeStringField(OPERATION_TYPE, operation.operationType()); + + switch (operation.operationType()) { + case CREATE_NAMESPACE: + createNamespaceToJson((CreateNamespaceOperation) operation, gen); + break; + case DROP_NAMESPACE: + dropNamespaceToJson((DropNamespaceOperation) operation, gen); + break; + case UPDATE_NAMESPACE_PROPERTIES: + updateNamespacePropertiesToJson((UpdateNamespacePropertiesOperation) operation, gen); + break; + case CREATE_TABLE: + createTableToJson((CreateTableOperation) operation, gen); + break; + case REGISTER_TABLE: + registerTableToJson((RegisterTableOperation) operation, gen); + break; + case DROP_TABLE: + dropTableToJson((DropTableOperation) operation, gen); + break; + case UPDATE_TABLE: + updateTableToJson((UpdateTableOperation) operation, gen); + break; + case RENAME_TABLE: + renameTableToJson((RenameTableOperation) operation, gen); + break; + case CREATE_VIEW: + createViewToJson((CreateViewOperation) operation, gen); + break; + case DROP_VIEW: + dropViewToJson((DropViewOperation) operation, gen); + break; + case UPDATE_VIEW: + updateViewToJson((UpdateViewOperation) operation, gen); + break; + case RENAME_VIEW: + renameViewToJson((RenameViewOperation) operation, gen); + break; + case CUSTOM: + customToJson((CustomOperation) operation, gen); + break; + default: + throw new UnsupportedOperationException( + "Cannot serialize unknown operation type: " + operation.operationType()); + } + + gen.writeEndObject(); + } + + public static CatalogOperation fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Cannot parse operation from null object"); + Preconditions.checkArgument( + json.isObject(), "Cannot parse operation from non-object: %s", json); + + String operationType = JsonUtil.getString(OPERATION_TYPE, json); + + switch (operationType) { + case CREATE_NAMESPACE: + return createNamespaceFromJson(json); + case DROP_NAMESPACE: + return dropNamespaceFromJson(json); + case UPDATE_NAMESPACE_PROPERTIES: + return updateNamespacePropertiesFromJson(json); + case CREATE_TABLE: + return createTableFromJson(json); + case REGISTER_TABLE: + return registerTableFromJson(json); + case DROP_TABLE: + return dropTableFromJson(json); + case UPDATE_TABLE: + return updateTableFromJson(json); + case RENAME_TABLE: + return renameTableFromJson(json); + case CREATE_VIEW: + return createViewFromJson(json); + case DROP_VIEW: + return dropViewFromJson(json); + case UPDATE_VIEW: + return updateViewFromJson(json); + case RENAME_VIEW: + return renameViewFromJson(json); + case CUSTOM: + return customFromJson(json); + default: + // Forward compatibility: return a custom operation for unknown types + // so that clients can gracefully handle new operation types + return ImmutableCustomOperation.builder() + .operationType(operationType) + .customType(operationType) + .build(); + } + } + + // --- Namespace operations --- + + private static void createNamespaceToJson(CreateNamespaceOperation op, JsonGenerator gen) + throws IOException { + writeNamespace(op.namespace(), gen); + if (!op.properties().isEmpty()) { + JsonUtil.writeStringMap(PROPERTIES, op.properties(), gen); + } + } + + private static CatalogOperation createNamespaceFromJson(JsonNode json) { + ImmutableCreateNamespaceOperation.Builder builder = + ImmutableCreateNamespaceOperation.builder().namespace(namespaceFromJson(json)); + if (json.has(PROPERTIES)) { + builder.properties(JsonUtil.getStringMap(PROPERTIES, json)); + } + return builder.build(); + } + + private static void dropNamespaceToJson(DropNamespaceOperation op, JsonGenerator gen) + throws IOException { + writeNamespace(op.namespace(), gen); + } + + private static CatalogOperation dropNamespaceFromJson(JsonNode json) { + return ImmutableDropNamespaceOperation.builder().namespace(namespaceFromJson(json)).build(); + } + + private static void updateNamespacePropertiesToJson( + UpdateNamespacePropertiesOperation op, JsonGenerator gen) throws IOException { + writeNamespace(op.namespace(), gen); + JsonUtil.writeStringArray(UPDATED, op.updated(), gen); + JsonUtil.writeStringArray(REMOVED, op.removed(), gen); + if (!op.missing().isEmpty()) { + JsonUtil.writeStringArray(MISSING, op.missing(), gen); + } + } + + private static CatalogOperation updateNamespacePropertiesFromJson(JsonNode json) { + ImmutableUpdateNamespacePropertiesOperation.Builder builder = + ImmutableUpdateNamespacePropertiesOperation.builder() + .namespace(namespaceFromJson(json)) + .updated(ImmutableList.copyOf(JsonUtil.getStringList(UPDATED, json))) + .removed(ImmutableList.copyOf(JsonUtil.getStringList(REMOVED, json))); + if (json.has(MISSING)) { + builder.missing(ImmutableList.copyOf(JsonUtil.getStringList(MISSING, json))); + } + return builder.build(); + } + + // --- Table operations --- + + private static void createTableToJson(CreateTableOperation op, JsonGenerator gen) + throws IOException { + writeIdentifier(op.identifier(), gen); + gen.writeStringField(TABLE_UUID, op.tableUuid()); + writeMetadataUpdates(op.updates(), gen); + } + + private static CatalogOperation createTableFromJson(JsonNode json) { + return ImmutableCreateTableOperation.builder() + .identifier(identifierFromJson(json)) + .tableUuid(JsonUtil.getString(TABLE_UUID, json)) + .updates(metadataUpdatesFromJson(json)) + .build(); + } + + private static void registerTableToJson(RegisterTableOperation op, JsonGenerator gen) + throws IOException { + writeIdentifier(op.identifier(), gen); + gen.writeStringField(TABLE_UUID, op.tableUuid()); + if (op.updates() != null) { + writeMetadataUpdates(op.updates(), gen); + } + } + + private static CatalogOperation registerTableFromJson(JsonNode json) { + ImmutableRegisterTableOperation.Builder builder = + ImmutableRegisterTableOperation.builder() + .identifier(identifierFromJson(json)) + .tableUuid(JsonUtil.getString(TABLE_UUID, json)); + if (json.has(UPDATES)) { + builder.updates(metadataUpdatesFromJson(json)); + } + return builder.build(); + } + + private static void dropTableToJson(DropTableOperation op, JsonGenerator gen) throws IOException { + writeIdentifier(op.identifier(), gen); + gen.writeStringField(TABLE_UUID, op.tableUuid()); + if (op.purge() != null) { + gen.writeBooleanField(PURGE, op.purge()); + } + } + + private static CatalogOperation dropTableFromJson(JsonNode json) { + ImmutableDropTableOperation.Builder builder = + ImmutableDropTableOperation.builder() + .identifier(identifierFromJson(json)) + .tableUuid(JsonUtil.getString(TABLE_UUID, json)); + if (json.has(PURGE)) { + builder.purge(json.get(PURGE).asBoolean()); + } + return builder.build(); + } + + private static void updateTableToJson(UpdateTableOperation op, JsonGenerator gen) + throws IOException { + writeIdentifier(op.identifier(), gen); + gen.writeStringField(TABLE_UUID, op.tableUuid()); + writeMetadataUpdates(op.updates(), gen); + if (op.requirements() != null) { + writeRequirements(op.requirements(), gen); + } + } + + private static CatalogOperation updateTableFromJson(JsonNode json) { + ImmutableUpdateTableOperation.Builder builder = + ImmutableUpdateTableOperation.builder() + .identifier(identifierFromJson(json)) + .tableUuid(JsonUtil.getString(TABLE_UUID, json)) + .updates(metadataUpdatesFromJson(json)); + if (json.has(REQUIREMENTS)) { + builder.requirements(requirementsFromJson(json)); + } + return builder.build(); + } + + private static void renameTableToJson(RenameTableOperation op, JsonGenerator gen) + throws IOException { + gen.writeFieldName(SOURCE); + TableIdentifierParser.toJson(op.source(), gen); + gen.writeFieldName(DESTINATION); + TableIdentifierParser.toJson(op.destination(), gen); + gen.writeStringField(TABLE_UUID, op.tableUuid()); + } + + private static CatalogOperation renameTableFromJson(JsonNode json) { + return ImmutableRenameTableOperation.builder() + .source(TableIdentifierParser.fromJson(JsonUtil.get(SOURCE, json))) + .destination(TableIdentifierParser.fromJson(JsonUtil.get(DESTINATION, json))) + .tableUuid(JsonUtil.getString(TABLE_UUID, json)) + .build(); + } + + // --- View operations --- + + private static void createViewToJson(CreateViewOperation op, JsonGenerator gen) + throws IOException { + writeIdentifier(op.identifier(), gen); + gen.writeStringField(VIEW_UUID, op.viewUuid()); + writeMetadataUpdates(op.updates(), gen); + } + + private static CatalogOperation createViewFromJson(JsonNode json) { + return ImmutableCreateViewOperation.builder() + .identifier(identifierFromJson(json)) + .viewUuid(JsonUtil.getString(VIEW_UUID, json)) + .updates(metadataUpdatesFromJson(json)) + .build(); + } + + private static void dropViewToJson(DropViewOperation op, JsonGenerator gen) throws IOException { + writeIdentifier(op.identifier(), gen); + gen.writeStringField(VIEW_UUID, op.viewUuid()); + } + + private static CatalogOperation dropViewFromJson(JsonNode json) { + return ImmutableDropViewOperation.builder() + .identifier(identifierFromJson(json)) + .viewUuid(JsonUtil.getString(VIEW_UUID, json)) + .build(); + } + + private static void updateViewToJson(UpdateViewOperation op, JsonGenerator gen) + throws IOException { + writeIdentifier(op.identifier(), gen); + gen.writeStringField(VIEW_UUID, op.viewUuid()); + writeMetadataUpdates(op.updates(), gen); + if (op.requirements() != null) { + writeRequirements(op.requirements(), gen); + } + } + + private static CatalogOperation updateViewFromJson(JsonNode json) { + ImmutableUpdateViewOperation.Builder builder = + ImmutableUpdateViewOperation.builder() + .identifier(identifierFromJson(json)) + .viewUuid(JsonUtil.getString(VIEW_UUID, json)) + .updates(metadataUpdatesFromJson(json)); + if (json.has(REQUIREMENTS)) { + builder.requirements(requirementsFromJson(json)); + } + return builder.build(); + } + + private static void renameViewToJson(RenameViewOperation op, JsonGenerator gen) + throws IOException { + gen.writeFieldName(SOURCE); + TableIdentifierParser.toJson(op.source(), gen); + gen.writeFieldName(DESTINATION); + TableIdentifierParser.toJson(op.destination(), gen); + gen.writeStringField(VIEW_UUID, op.viewUuid()); + } + + private static CatalogOperation renameViewFromJson(JsonNode json) { + return ImmutableRenameViewOperation.builder() + .source(TableIdentifierParser.fromJson(JsonUtil.get(SOURCE, json))) + .destination(TableIdentifierParser.fromJson(JsonUtil.get(DESTINATION, json))) + .viewUuid(JsonUtil.getString(VIEW_UUID, json)) + .build(); + } + + // --- Custom operation --- + + private static void customToJson(CustomOperation op, JsonGenerator gen) throws IOException { + gen.writeStringField(CUSTOM_TYPE, op.customType()); + if (op.identifier() != null) { + writeIdentifier(op.identifier(), gen); + } + if (op.namespace() != null) { + writeNamespace(op.namespace(), gen); + } + if (op.tableUuid() != null) { + gen.writeStringField(TABLE_UUID, op.tableUuid()); + } + if (op.viewUuid() != null) { + gen.writeStringField(VIEW_UUID, op.viewUuid()); + } + if (!op.additionalProperties().isEmpty()) { + gen.writeObjectFieldStart(ADDITIONAL_PROPERTIES); + for (Map.Entry entry : op.additionalProperties().entrySet()) { + gen.writeFieldName(entry.getKey()); + gen.writeObject(entry.getValue()); + } + gen.writeEndObject(); + } + } + + private static CatalogOperation customFromJson(JsonNode json) { + ImmutableCustomOperation.Builder builder = + ImmutableCustomOperation.builder().customType(JsonUtil.getString(CUSTOM_TYPE, json)); + if (json.has(IDENTIFIER)) { + builder.identifier(identifierFromJson(json)); + } + if (json.has(NAMESPACE)) { + builder.namespace(namespaceFromJson(json)); + } + if (json.has(TABLE_UUID)) { + builder.tableUuid(JsonUtil.getString(TABLE_UUID, json)); + } + if (json.has(VIEW_UUID)) { + builder.viewUuid(JsonUtil.getString(VIEW_UUID, json)); + } + if (json.has(ADDITIONAL_PROPERTIES)) { + ImmutableMap.Builder props = ImmutableMap.builder(); + json.get(ADDITIONAL_PROPERTIES) + .properties() + .forEach( + entry -> { + JsonNode value = entry.getValue(); + if (value.isTextual()) { + props.put(entry.getKey(), value.asText()); + } else if (value.isNumber()) { + props.put(entry.getKey(), value.numberValue()); + } else if (value.isBoolean()) { + props.put(entry.getKey(), value.asBoolean()); + } else { + props.put(entry.getKey(), value.toString()); + } + }); + builder.additionalProperties(props.build()); + } + return builder.build(); + } + + // --- Shared helpers --- + + private static void writeNamespace(Namespace namespace, JsonGenerator gen) throws IOException { + gen.writeArrayFieldStart(NAMESPACE); + for (String level : namespace.levels()) { + gen.writeString(level); + } + gen.writeEndArray(); + } + + private static Namespace namespaceFromJson(JsonNode json) { + String[] levels = JsonUtil.getStringArray(JsonUtil.get(NAMESPACE, json)); + return Namespace.of(levels); + } + + private static void writeIdentifier(TableIdentifier identifier, JsonGenerator gen) + throws IOException { + gen.writeFieldName(IDENTIFIER); + TableIdentifierParser.toJson(identifier, gen); + } + + private static TableIdentifier identifierFromJson(JsonNode json) { + return TableIdentifierParser.fromJson(JsonUtil.get(IDENTIFIER, json)); + } + + private static void writeMetadataUpdates(List updates, JsonGenerator gen) + throws IOException { + gen.writeArrayFieldStart(UPDATES); + for (MetadataUpdate update : updates) { + MetadataUpdateParser.toJson(update, gen); + } + gen.writeEndArray(); + } + + private static List metadataUpdatesFromJson(JsonNode json) { + ImmutableList.Builder updates = ImmutableList.builder(); + for (JsonNode node : json.get(UPDATES)) { + updates.add(MetadataUpdateParser.fromJson(node)); + } + return updates.build(); + } + + private static void writeRequirements(List requirements, JsonGenerator gen) + throws IOException { + gen.writeArrayFieldStart(REQUIREMENTS); + for (UpdateRequirement requirement : requirements) { + UpdateRequirementParser.toJson(requirement, gen); + } + gen.writeEndArray(); + } + + private static List requirementsFromJson(JsonNode json) { + ImmutableList.Builder requirements = ImmutableList.builder(); + for (JsonNode node : json.get(REQUIREMENTS)) { + requirements.add(UpdateRequirementParser.fromJson(node)); + } + return requirements.build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/CreateNamespaceOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/CreateNamespaceOperation.java new file mode 100644 index 000000000000..1956ce11d060 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/CreateNamespaceOperation.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.Collections; +import java.util.Map; +import org.apache.iceberg.catalog.Namespace; +import org.immutables.value.Value; + +/** Operation representing the creation of a new namespace. */ +@Value.Immutable +public interface CreateNamespaceOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "create-namespace"; + } + + /** The namespace that was created. */ + Namespace namespace(); + + /** Properties stored on the namespace. */ + @Value.Default + default Map properties() { + return Collections.emptyMap(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/CreateTableOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/CreateTableOperation.java new file mode 100644 index 000000000000..8486464bd544 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/CreateTableOperation.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.List; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing the creation of a new table. */ +@Value.Immutable +public interface CreateTableOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "create-table"; + } + + TableIdentifier identifier(); + + String tableUuid(); + + List updates(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/CreateViewOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/CreateViewOperation.java new file mode 100644 index 000000000000..27a71cfa4604 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/CreateViewOperation.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.List; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing the creation of a new view. */ +@Value.Immutable +public interface CreateViewOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "create-view"; + } + + TableIdentifier identifier(); + + String viewUuid(); + + List updates(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/CustomOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/CustomOperation.java new file mode 100644 index 000000000000..7a561f3d2c2f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/CustomOperation.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** + * Extension point for catalog-specific operations not defined in the standard. + * + *

Custom operations use a type prefixed with "x-" (e.g., "x-compact-table") and may include + * arbitrary additional properties. + */ +@Value.Immutable +public interface CustomOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "custom"; + } + + /** The custom operation type identifier, must start with "x-". */ + String customType(); + + @Nullable + TableIdentifier identifier(); + + @Nullable + Namespace namespace(); + + @Nullable + String tableUuid(); + + @Nullable + String viewUuid(); + + /** Additional properties specific to this custom operation. */ + Map additionalProperties(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/DropNamespaceOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/DropNamespaceOperation.java new file mode 100644 index 000000000000..b2db8478de3c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/DropNamespaceOperation.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import org.apache.iceberg.catalog.Namespace; +import org.immutables.value.Value; + +/** Operation representing the dropping of a namespace. */ +@Value.Immutable +public interface DropNamespaceOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "drop-namespace"; + } + + /** The namespace that was dropped. */ + Namespace namespace(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/DropTableOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/DropTableOperation.java new file mode 100644 index 000000000000..b10e4337a5dc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/DropTableOperation.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import javax.annotation.Nullable; +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing the dropping of a table. */ +@Value.Immutable +public interface DropTableOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "drop-table"; + } + + /** The identifier of the table that was dropped. */ + TableIdentifier identifier(); + + /** The UUID of the table that was dropped. */ + String tableUuid(); + + /** Whether the purge flag was set. */ + @Nullable + Boolean purge(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/DropViewOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/DropViewOperation.java new file mode 100644 index 000000000000..20e0dae086dc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/DropViewOperation.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing the dropping of a view. */ +@Value.Immutable +public interface DropViewOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "drop-view"; + } + + TableIdentifier identifier(); + + String viewUuid(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/Event.java b/core/src/main/java/org/apache/iceberg/rest/events/Event.java new file mode 100644 index 000000000000..67781644d0e9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/Event.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.immutables.value.Value; + +/** + * Represents a catalog change event in the IRC Events endpoint. + * + *

Each event describes a single operation that occurred in the catalog, such as creating a + * table, dropping a namespace, or updating a view. + */ +@Value.Immutable +public interface Event { + + /** Unique ID of this event. Clients should perform deduplication based on this ID. */ + String eventId(); + + /** Opaque ID of the request this event belongs to. Events from the same request share this ID. */ + String requestId(); + + /** Total number of events generated by the same request. */ + int requestEventCount(); + + /** Timestamp when this event occurred (epoch milliseconds). */ + long timestampMs(); + + /** + * The actor who performed the operation. The content is implementation-specific and may include + * fields such as user name, service account, or other identifying information. + */ + @Nullable + Map actor(); + + /** The operation that was performed. */ + CatalogOperation operation(); + + @Value.Check + default void validate() { + Preconditions.checkArgument(eventId() != null, "Invalid event-id: null"); + Preconditions.checkArgument(requestId() != null, "Invalid request-id: null"); + Preconditions.checkArgument(requestEventCount() > 0, "request-event-count must be > 0"); + Preconditions.checkArgument(operation() != null, "Invalid operation: null"); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/EventParser.java b/core/src/main/java/org/apache/iceberg/rest/events/EventParser.java new file mode 100644 index 000000000000..ab33fd573522 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/EventParser.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.JsonUtil; + +public class EventParser { + + private static final String EVENT_ID = "event-id"; + private static final String REQUEST_ID = "request-id"; + private static final String REQUEST_EVENT_COUNT = "request-event-count"; + private static final String TIMESTAMP_MS = "timestamp-ms"; + private static final String ACTOR = "actor"; + private static final String OPERATION = "operation"; + + private EventParser() {} + + public static String toJson(Event event) { + return toJson(event, false); + } + + public static String toJson(Event event, boolean pretty) { + return JsonUtil.generate(gen -> toJson(event, gen), pretty); + } + + public static void toJson(Event event, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != event, "Invalid event: null"); + + gen.writeStartObject(); + + gen.writeStringField(EVENT_ID, event.eventId()); + gen.writeStringField(REQUEST_ID, event.requestId()); + gen.writeNumberField(REQUEST_EVENT_COUNT, event.requestEventCount()); + gen.writeNumberField(TIMESTAMP_MS, event.timestampMs()); + + if (event.actor() != null && !event.actor().isEmpty()) { + gen.writeObjectFieldStart(ACTOR); + for (Map.Entry entry : event.actor().entrySet()) { + gen.writeFieldName(entry.getKey()); + gen.writeObject(entry.getValue()); + } + gen.writeEndObject(); + } + + gen.writeFieldName(OPERATION); + CatalogOperationParser.toJson(event.operation(), gen); + + gen.writeEndObject(); + } + + public static Event fromJson(String json) { + return JsonUtil.parse(json, EventParser::fromJson); + } + + public static Event fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Cannot parse event from null object"); + Preconditions.checkArgument(json.isObject(), "Cannot parse event from non-object: %s", json); + + ImmutableEvent.Builder builder = + ImmutableEvent.builder() + .eventId(JsonUtil.getString(EVENT_ID, json)) + .requestId(JsonUtil.getString(REQUEST_ID, json)) + .requestEventCount(JsonUtil.getInt(REQUEST_EVENT_COUNT, json)) + .timestampMs(JsonUtil.getLong(TIMESTAMP_MS, json)); + + if (json.has(ACTOR)) { + ImmutableMap.Builder actor = ImmutableMap.builder(); + json.get(ACTOR) + .properties() + .forEach( + entry -> { + JsonNode value = entry.getValue(); + if (value.isTextual()) { + actor.put(entry.getKey(), value.asText()); + } else if (value.isNumber()) { + actor.put(entry.getKey(), value.numberValue()); + } else if (value.isBoolean()) { + actor.put(entry.getKey(), value.asBoolean()); + } else { + actor.put(entry.getKey(), value.toString()); + } + }); + builder.actor(actor.build()); + } + + builder.operation(CatalogOperationParser.fromJson(JsonUtil.get(OPERATION, json))); + + return builder.build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/RegisterTableOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/RegisterTableOperation.java new file mode 100644 index 000000000000..bddc021c422f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/RegisterTableOperation.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing the registration of an existing table. */ +@Value.Immutable +public interface RegisterTableOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "register-table"; + } + + TableIdentifier identifier(); + + String tableUuid(); + + @Nullable + List updates(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/RenameTableOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/RenameTableOperation.java new file mode 100644 index 000000000000..56fe7d681363 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/RenameTableOperation.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing a table rename. */ +@Value.Immutable +public interface RenameTableOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "rename-table"; + } + + TableIdentifier source(); + + TableIdentifier destination(); + + String tableUuid(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/RenameViewOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/RenameViewOperation.java new file mode 100644 index 000000000000..54a9db8f5c07 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/RenameViewOperation.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing a view rename. */ +@Value.Immutable +public interface RenameViewOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "rename-view"; + } + + TableIdentifier source(); + + TableIdentifier destination(); + + String viewUuid(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/UpdateNamespacePropertiesOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/UpdateNamespacePropertiesOperation.java new file mode 100644 index 000000000000..d4613b7c6ffa --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/UpdateNamespacePropertiesOperation.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.catalog.Namespace; +import org.immutables.value.Value; + +/** Operation representing an update to namespace properties. */ +@Value.Immutable +public interface UpdateNamespacePropertiesOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "update-namespace-properties"; + } + + Namespace namespace(); + + List updated(); + + List removed(); + + @Value.Default + default List missing() { + return Collections.emptyList(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/UpdateTableOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/UpdateTableOperation.java new file mode 100644 index 000000000000..09c9145a53e6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/UpdateTableOperation.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing an update to a table. */ +@Value.Immutable +public interface UpdateTableOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "update-table"; + } + + TableIdentifier identifier(); + + String tableUuid(); + + List updates(); + + @Nullable + List requirements(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/events/UpdateViewOperation.java b/core/src/main/java/org/apache/iceberg/rest/events/UpdateViewOperation.java new file mode 100644 index 000000000000..135337a72d66 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/events/UpdateViewOperation.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.catalog.TableIdentifier; +import org.immutables.value.Value; + +/** Operation representing an update to a view. */ +@Value.Immutable +public interface UpdateViewOperation extends CatalogOperation { + + @Override + @Value.Default + default String operationType() { + return "update-view"; + } + + TableIdentifier identifier(); + + String viewUuid(); + + List updates(); + + @Nullable + List requirements(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/QueryEventsRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/QueryEventsRequest.java new file mode 100644 index 000000000000..bc57af47a8e6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/QueryEventsRequest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.iceberg.rest.RESTRequest; +import org.immutables.value.Value; + +/** + * A request to query catalog change events. + * + *

All fields are optional and serve as filters. If no filters are provided, all events are + * returned. + */ +@Value.Immutable +public interface QueryEventsRequest extends RESTRequest { + + /** Continuation token to resume fetching events from a previous request. */ + @Nullable + String continuationToken(); + + /** Maximum number of events to return in a single response. */ + @Nullable + Integer pageSize(); + + /** Timestamp in milliseconds to start consuming events from (inclusive). */ + @Nullable + Long afterTimestampMs(); + + /** Filter events by the type of operation. */ + @Nullable + List operationTypes(); + + /** Filter events by catalog objects referenced by name (namespaces, tables, views). */ + @Nullable + List> catalogObjectsByName(); + + /** Filter events by catalog objects referenced by UUID. */ + @Nullable + List catalogObjectsById(); + + /** Filter events by the type of catalog object (namespace, table, view). */ + @Nullable + List objectTypes(); + + /** Implementation-specific filter extensions. */ + @Nullable + Map customFilters(); + + @Override + default void validate() {} + + /** Identifies a catalog object by UUID and type. */ + @Value.Immutable + interface CatalogObjectUuid { + String uuid(); + + String type(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/QueryEventsRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/QueryEventsRequestParser.java new file mode 100644 index 000000000000..b438abec69ec --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/QueryEventsRequestParser.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.JsonUtil; + +public class QueryEventsRequestParser { + + private static final String CONTINUATION_TOKEN = "continuation-token"; + private static final String PAGE_SIZE = "page-size"; + private static final String AFTER_TIMESTAMP_MS = "after-timestamp-ms"; + private static final String OPERATION_TYPES = "operation-types"; + private static final String CATALOG_OBJECTS_BY_NAME = "catalog-objects-by-name"; + private static final String CATALOG_OBJECTS_BY_ID = "catalog-objects-by-id"; + private static final String OBJECT_TYPES = "object-types"; + private static final String CUSTOM_FILTERS = "custom-filters"; + private static final String UUID_FIELD = "uuid"; + private static final String TYPE_FIELD = "type"; + + private QueryEventsRequestParser() {} + + public static String toJson(QueryEventsRequest request) { + return toJson(request, false); + } + + public static String toJson(QueryEventsRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + public static void toJson(QueryEventsRequest request, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != request, "Invalid query events request: null"); + + gen.writeStartObject(); + writeScalarFields(request, gen); + writeFilterFields(request, gen); + gen.writeEndObject(); + } + + public static QueryEventsRequest fromJson(String json) { + return JsonUtil.parse(json, QueryEventsRequestParser::fromJson); + } + + public static QueryEventsRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Cannot parse query events request from null object"); + Preconditions.checkArgument( + json.isObject(), "Cannot parse query events request from non-object: %s", json); + + ImmutableQueryEventsRequest.Builder builder = ImmutableQueryEventsRequest.builder(); + parseScalarFields(json, builder); + parseFilterFields(json, builder); + return builder.build(); + } + + private static void writeScalarFields(QueryEventsRequest request, JsonGenerator gen) + throws IOException { + if (request.continuationToken() != null) { + gen.writeStringField(CONTINUATION_TOKEN, request.continuationToken()); + } + + if (request.pageSize() != null) { + gen.writeNumberField(PAGE_SIZE, request.pageSize()); + } + + if (request.afterTimestampMs() != null) { + gen.writeNumberField(AFTER_TIMESTAMP_MS, request.afterTimestampMs()); + } + } + + private static void writeFilterFields(QueryEventsRequest request, JsonGenerator gen) + throws IOException { + if (request.operationTypes() != null) { + JsonUtil.writeStringArray(OPERATION_TYPES, request.operationTypes(), gen); + } + + if (request.catalogObjectsByName() != null) { + catalogObjectsByNameToJson(request.catalogObjectsByName(), gen); + } + + if (request.catalogObjectsById() != null) { + catalogObjectsByIdToJson(request.catalogObjectsById(), gen); + } + + if (request.objectTypes() != null) { + JsonUtil.writeStringArray(OBJECT_TYPES, request.objectTypes(), gen); + } + + if (request.customFilters() != null) { + customFiltersToJson(request.customFilters(), gen); + } + } + + private static void catalogObjectsByNameToJson(List> names, JsonGenerator gen) + throws IOException { + gen.writeArrayFieldStart(CATALOG_OBJECTS_BY_NAME); + for (List name : names) { + gen.writeStartArray(); + for (String part : name) { + gen.writeString(part); + } + gen.writeEndArray(); + } + gen.writeEndArray(); + } + + private static void catalogObjectsByIdToJson( + List objects, JsonGenerator gen) throws IOException { + gen.writeArrayFieldStart(CATALOG_OBJECTS_BY_ID); + for (QueryEventsRequest.CatalogObjectUuid obj : objects) { + gen.writeStartObject(); + gen.writeStringField(UUID_FIELD, obj.uuid()); + gen.writeStringField(TYPE_FIELD, obj.type()); + gen.writeEndObject(); + } + gen.writeEndArray(); + } + + private static void customFiltersToJson(Map filters, JsonGenerator gen) + throws IOException { + gen.writeObjectFieldStart(CUSTOM_FILTERS); + for (Map.Entry entry : filters.entrySet()) { + gen.writeFieldName(entry.getKey()); + gen.writeObject(entry.getValue()); + } + gen.writeEndObject(); + } + + private static void parseScalarFields( + JsonNode json, ImmutableQueryEventsRequest.Builder builder) { + if (json.has(CONTINUATION_TOKEN)) { + builder.continuationToken(JsonUtil.getString(CONTINUATION_TOKEN, json)); + } + + if (json.has(PAGE_SIZE)) { + builder.pageSize(JsonUtil.getInt(PAGE_SIZE, json)); + } + + if (json.has(AFTER_TIMESTAMP_MS)) { + builder.afterTimestampMs(JsonUtil.getLong(AFTER_TIMESTAMP_MS, json)); + } + } + + private static void parseFilterFields( + JsonNode json, ImmutableQueryEventsRequest.Builder builder) { + if (json.has(OPERATION_TYPES)) { + builder.operationTypes(ImmutableList.copyOf(JsonUtil.getStringList(OPERATION_TYPES, json))); + } + + if (json.has(CATALOG_OBJECTS_BY_NAME)) { + builder.catalogObjectsByName(catalogObjectsByNameFromJson(json)); + } + + if (json.has(CATALOG_OBJECTS_BY_ID)) { + builder.catalogObjectsById(catalogObjectsByIdFromJson(json)); + } + + if (json.has(OBJECT_TYPES)) { + builder.objectTypes(ImmutableList.copyOf(JsonUtil.getStringList(OBJECT_TYPES, json))); + } + + if (json.has(CUSTOM_FILTERS)) { + builder.customFilters(customFiltersFromJson(json)); + } + } + + private static List> catalogObjectsByNameFromJson(JsonNode json) { + ImmutableList.Builder> names = ImmutableList.builder(); + for (JsonNode nameNode : json.get(CATALOG_OBJECTS_BY_NAME)) { + names.add(ImmutableList.copyOf(JsonUtil.getStringArray(nameNode))); + } + return names.build(); + } + + private static List catalogObjectsByIdFromJson( + JsonNode json) { + ImmutableList.Builder objects = ImmutableList.builder(); + for (JsonNode objNode : json.get(CATALOG_OBJECTS_BY_ID)) { + objects.add( + ImmutableCatalogObjectUuid.builder() + .uuid(JsonUtil.getString(UUID_FIELD, objNode)) + .type(JsonUtil.getString(TYPE_FIELD, objNode)) + .build()); + } + return objects.build(); + } + + private static Map customFiltersFromJson(JsonNode json) { + ImmutableMap.Builder filters = ImmutableMap.builder(); + json.get(CUSTOM_FILTERS) + .properties() + .forEach( + entry -> { + JsonNode value = entry.getValue(); + if (value.isTextual()) { + filters.put(entry.getKey(), value.asText()); + } else if (value.isNumber()) { + filters.put(entry.getKey(), value.numberValue()); + } else if (value.isBoolean()) { + filters.put(entry.getKey(), value.asBoolean()); + } else { + filters.put(entry.getKey(), value.toString()); + } + }); + return filters.build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/QueryEventsResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/QueryEventsResponse.java new file mode 100644 index 000000000000..4a03995e5a3a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/QueryEventsResponse.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTResponse; +import org.apache.iceberg.rest.events.Event; +import org.immutables.value.Value; + +/** Response containing a page of catalog change events. */ +@Value.Immutable +public interface QueryEventsResponse extends RESTResponse { + + /** + * Opaque continuation token to fetch the next page of events. Clients should pass this unmodified + * in subsequent requests. + */ + String nextPageToken(); + + /** The highest event timestamp processed when generating this response. */ + long highestProcessedTimestampMs(); + + /** The list of events in this response page. */ + List events(); + + @Override + default void validate() { + Preconditions.checkArgument(nextPageToken() != null, "Invalid next-page-token: null"); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/QueryEventsResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/QueryEventsResponseParser.java new file mode 100644 index 000000000000..eedae45c5063 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/QueryEventsResponseParser.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.rest.events.Event; +import org.apache.iceberg.rest.events.EventParser; +import org.apache.iceberg.util.JsonUtil; + +public class QueryEventsResponseParser { + + private static final String NEXT_PAGE_TOKEN = "next-page-token"; + private static final String HIGHEST_PROCESSED_TIMESTAMP_MS = "highest-processed-timestamp-ms"; + private static final String EVENTS = "events"; + + private QueryEventsResponseParser() {} + + public static String toJson(QueryEventsResponse response) { + return toJson(response, false); + } + + public static String toJson(QueryEventsResponse response, boolean pretty) { + return JsonUtil.generate(gen -> toJson(response, gen), pretty); + } + + public static void toJson(QueryEventsResponse response, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != response, "Invalid query events response: null"); + + gen.writeStartObject(); + + gen.writeStringField(NEXT_PAGE_TOKEN, response.nextPageToken()); + gen.writeNumberField(HIGHEST_PROCESSED_TIMESTAMP_MS, response.highestProcessedTimestampMs()); + + gen.writeArrayFieldStart(EVENTS); + for (Event event : response.events()) { + EventParser.toJson(event, gen); + } + gen.writeEndArray(); + + gen.writeEndObject(); + } + + public static QueryEventsResponse fromJson(String json) { + return JsonUtil.parse(json, QueryEventsResponseParser::fromJson); + } + + public static QueryEventsResponse fromJson(JsonNode json) { + Preconditions.checkArgument( + null != json, "Cannot parse query events response from null object"); + Preconditions.checkArgument( + json.isObject(), "Cannot parse query events response from non-object: %s", json); + + ImmutableList.Builder events = ImmutableList.builder(); + for (JsonNode eventNode : json.get(EVENTS)) { + events.add(EventParser.fromJson(eventNode)); + } + + return ImmutableQueryEventsResponse.builder() + .nextPageToken(JsonUtil.getString(NEXT_PAGE_TOKEN, json)) + .highestProcessedTimestampMs(JsonUtil.getLong(HIGHEST_PROCESSED_TIMESTAMP_MS, json)) + .events(events.build()) + .build(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/events/TestEventParser.java b/core/src/test/java/org/apache/iceberg/rest/events/TestEventParser.java new file mode 100644 index 000000000000..e11b06834453 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/events/TestEventParser.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.events; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +public class TestEventParser { + + @Test + public void testRoundTripCreateNamespaceEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-001") + .requestId("req-001") + .requestEventCount(1) + .timestampMs(1714000000000L) + .actor(ImmutableMap.of("user", "alice")) + .operation( + ImmutableCreateNamespaceOperation.builder() + .namespace(Namespace.of("accounting", "tax")) + .putProperties("owner", "alice") + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.eventId()).isEqualTo(event.eventId()); + assertThat(parsed.requestId()).isEqualTo(event.requestId()); + assertThat(parsed.requestEventCount()).isEqualTo(event.requestEventCount()); + assertThat(parsed.timestampMs()).isEqualTo(event.timestampMs()); + assertThat(parsed.actor()).isEqualTo(event.actor()); + assertThat(parsed.operation().operationType()).isEqualTo("create-namespace"); + + CreateNamespaceOperation op = (CreateNamespaceOperation) parsed.operation(); + assertThat(op.namespace()).isEqualTo(Namespace.of("accounting", "tax")); + assertThat(op.properties()).containsEntry("owner", "alice"); + } + + @Test + public void testRoundTripDropNamespaceEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-002") + .requestId("req-002") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableDropNamespaceOperation.builder() + .namespace(Namespace.of("old_namespace")) + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.eventId()).isEqualTo("evt-002"); + assertThat(parsed.actor()).isNull(); + assertThat(parsed.operation().operationType()).isEqualTo("drop-namespace"); + + DropNamespaceOperation op = (DropNamespaceOperation) parsed.operation(); + assertThat(op.namespace()).isEqualTo(Namespace.of("old_namespace")); + } + + @Test + public void testRoundTripDropTableEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-003") + .requestId("req-003") + .requestEventCount(2) + .timestampMs(1714000000000L) + .actor(ImmutableMap.of("user", "bob", "role", "admin")) + .operation( + ImmutableDropTableOperation.builder() + .identifier(TableIdentifier.of(Namespace.of("db"), "users")) + .tableUuid("123e4567-e89b-12d3-a456-426614174000") + .purge(true) + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.eventId()).isEqualTo("evt-003"); + assertThat(parsed.requestEventCount()).isEqualTo(2); + assertThat(parsed.operation().operationType()).isEqualTo("drop-table"); + + DropTableOperation op = (DropTableOperation) parsed.operation(); + assertThat(op.identifier()).isEqualTo(TableIdentifier.of(Namespace.of("db"), "users")); + assertThat(op.tableUuid()).isEqualTo("123e4567-e89b-12d3-a456-426614174000"); + assertThat(op.purge()).isTrue(); + } + + @Test + public void testDropTableWithoutPurge() { + Event event = + ImmutableEvent.builder() + .eventId("evt-004") + .requestId("req-004") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableDropTableOperation.builder() + .identifier(TableIdentifier.of(Namespace.of("db"), "temp")) + .tableUuid("abc-def-123") + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + DropTableOperation op = (DropTableOperation) parsed.operation(); + assertThat(op.purge()).isNull(); + } + + @Test + public void testRoundTripRenameTableEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-005") + .requestId("req-005") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableRenameTableOperation.builder() + .source(TableIdentifier.of(Namespace.of("db"), "old_name")) + .destination(TableIdentifier.of(Namespace.of("db"), "new_name")) + .tableUuid("uuid-123") + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.operation().operationType()).isEqualTo("rename-table"); + RenameTableOperation op = (RenameTableOperation) parsed.operation(); + assertThat(op.source()).isEqualTo(TableIdentifier.of(Namespace.of("db"), "old_name")); + assertThat(op.destination()).isEqualTo(TableIdentifier.of(Namespace.of("db"), "new_name")); + assertThat(op.tableUuid()).isEqualTo("uuid-123"); + } + + @Test + public void testRoundTripDropViewEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-006") + .requestId("req-006") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableDropViewOperation.builder() + .identifier(TableIdentifier.of(Namespace.of("analytics"), "daily_report")) + .viewUuid("view-uuid-456") + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.operation().operationType()).isEqualTo("drop-view"); + DropViewOperation op = (DropViewOperation) parsed.operation(); + assertThat(op.identifier()) + .isEqualTo(TableIdentifier.of(Namespace.of("analytics"), "daily_report")); + assertThat(op.viewUuid()).isEqualTo("view-uuid-456"); + } + + @Test + public void testRoundTripRenameViewEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-007") + .requestId("req-007") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableRenameViewOperation.builder() + .source(TableIdentifier.of(Namespace.of("db"), "old_view")) + .destination(TableIdentifier.of(Namespace.of("db"), "new_view")) + .viewUuid("view-uuid-789") + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.operation().operationType()).isEqualTo("rename-view"); + RenameViewOperation op = (RenameViewOperation) parsed.operation(); + assertThat(op.source()).isEqualTo(TableIdentifier.of(Namespace.of("db"), "old_view")); + assertThat(op.destination()).isEqualTo(TableIdentifier.of(Namespace.of("db"), "new_view")); + assertThat(op.viewUuid()).isEqualTo("view-uuid-789"); + } + + @Test + public void testRoundTripUpdateNamespacePropertiesEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-008") + .requestId("req-008") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableUpdateNamespacePropertiesOperation.builder() + .namespace(Namespace.of("production")) + .updated(ImmutableList.of("owner", "team")) + .removed(ImmutableList.of("deprecated")) + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.operation().operationType()).isEqualTo("update-namespace-properties"); + UpdateNamespacePropertiesOperation op = (UpdateNamespacePropertiesOperation) parsed.operation(); + assertThat(op.namespace()).isEqualTo(Namespace.of("production")); + assertThat(op.updated()).containsExactly("owner", "team"); + assertThat(op.removed()).containsExactly("deprecated"); + assertThat(op.missing()).isEmpty(); + } + + @Test + public void testRoundTripCustomOperationEvent() { + Event event = + ImmutableEvent.builder() + .eventId("evt-009") + .requestId("req-009") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableCustomOperation.builder() + .customType("x-compact-table") + .identifier(TableIdentifier.of(Namespace.of("db"), "events")) + .tableUuid("table-uuid-123") + .putAdditionalProperties("compaction-strategy", "binpack") + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + assertThat(parsed.operation().operationType()).isEqualTo("custom"); + CustomOperation op = (CustomOperation) parsed.operation(); + assertThat(op.customType()).isEqualTo("x-compact-table"); + assertThat(op.identifier()).isEqualTo(TableIdentifier.of(Namespace.of("db"), "events")); + assertThat(op.tableUuid()).isEqualTo("table-uuid-123"); + assertThat(op.namespace()).isNull(); + assertThat(op.viewUuid()).isNull(); + assertThat(op.additionalProperties()).containsEntry("compaction-strategy", "binpack"); + } + + @Test + public void testForwardCompatibilityUnknownOperationType() { + // Unknown operation types should not throw — they should be deserialized as CustomOperation + String json = + "{\"event-id\":\"evt-010\",\"request-id\":\"req-010\"," + + "\"request-event-count\":1,\"timestamp-ms\":1714000000000," + + "\"operation\":{\"operation-type\":\"x-future-operation\"}}"; + + Event parsed = EventParser.fromJson(json); + assertThat(parsed.operation()).isInstanceOf(CustomOperation.class); + assertThat(parsed.operation().operationType()).isEqualTo("x-future-operation"); + } + + @Test + public void testCreateNamespaceWithEmptyProperties() { + Event event = + ImmutableEvent.builder() + .eventId("evt-011") + .requestId("req-011") + .requestEventCount(1) + .timestampMs(1714000000000L) + .operation( + ImmutableCreateNamespaceOperation.builder() + .namespace(Namespace.of("empty_ns")) + .build()) + .build(); + + String json = EventParser.toJson(event); + Event parsed = EventParser.fromJson(json); + + CreateNamespaceOperation op = (CreateNamespaceOperation) parsed.operation(); + assertThat(op.properties()).isEmpty(); + } + + @Test + public void testNullEventThrows() { + assertThatThrownBy(() -> EventParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid event: null"); + } + + @Test + public void testNullJsonThrows() { + assertThatThrownBy(() -> EventParser.fromJson((String) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("argument \"content\" is null"); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestQueryEventsRequestParser.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestQueryEventsRequestParser.java new file mode 100644 index 000000000000..6c8d9f584fb3 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestQueryEventsRequestParser.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +public class TestQueryEventsRequestParser { + + @Test + public void testRoundTripEmptyRequest() { + QueryEventsRequest request = ImmutableQueryEventsRequest.builder().build(); + + String json = QueryEventsRequestParser.toJson(request); + QueryEventsRequest parsed = QueryEventsRequestParser.fromJson(json); + + assertThat(parsed.continuationToken()).isNull(); + assertThat(parsed.pageSize()).isNull(); + assertThat(parsed.afterTimestampMs()).isNull(); + assertThat(parsed.operationTypes()).isNull(); + assertThat(parsed.catalogObjectsByName()).isNull(); + assertThat(parsed.catalogObjectsById()).isNull(); + assertThat(parsed.objectTypes()).isNull(); + assertThat(parsed.customFilters()).isNull(); + } + + @Test + public void testRoundTripWithContinuationToken() { + QueryEventsRequest request = + ImmutableQueryEventsRequest.builder() + .continuationToken("token-abc-123") + .pageSize(100) + .build(); + + String json = QueryEventsRequestParser.toJson(request); + QueryEventsRequest parsed = QueryEventsRequestParser.fromJson(json); + + assertThat(parsed.continuationToken()).isEqualTo("token-abc-123"); + assertThat(parsed.pageSize()).isEqualTo(100); + } + + @Test + public void testRoundTripWithAllFields() { + QueryEventsRequest request = + ImmutableQueryEventsRequest.builder() + .continuationToken("token-xyz") + .pageSize(50) + .afterTimestampMs(1714000000000L) + .operationTypes(ImmutableList.of("create-table", "drop-table")) + .catalogObjectsByName( + ImmutableList.of( + ImmutableList.of("accounting", "tax"), ImmutableList.of("marketing"))) + .catalogObjectsById( + ImmutableList.of( + ImmutableCatalogObjectUuid.builder() + .uuid("123e4567-e89b-12d3-a456-426614174000") + .type("table") + .build())) + .objectTypes(ImmutableList.of("table", "view")) + .customFilters(ImmutableMap.of("region", "us-east-1")) + .build(); + + String json = QueryEventsRequestParser.toJson(request); + QueryEventsRequest parsed = QueryEventsRequestParser.fromJson(json); + + assertThat(parsed.continuationToken()).isEqualTo("token-xyz"); + assertThat(parsed.pageSize()).isEqualTo(50); + assertThat(parsed.afterTimestampMs()).isEqualTo(1714000000000L); + assertThat(parsed.operationTypes()).containsExactly("create-table", "drop-table"); + assertThat(parsed.catalogObjectsByName()).hasSize(2); + assertThat(parsed.catalogObjectsByName().get(0)).containsExactly("accounting", "tax"); + assertThat(parsed.catalogObjectsByName().get(1)).containsExactly("marketing"); + assertThat(parsed.catalogObjectsById()).hasSize(1); + assertThat(parsed.catalogObjectsById().get(0).uuid()) + .isEqualTo("123e4567-e89b-12d3-a456-426614174000"); + assertThat(parsed.catalogObjectsById().get(0).type()).isEqualTo("table"); + assertThat(parsed.objectTypes()).containsExactly("table", "view"); + assertThat(parsed.customFilters()).containsEntry("region", "us-east-1"); + } + + @Test + public void testRoundTripWithOperationTypesOnly() { + QueryEventsRequest request = + ImmutableQueryEventsRequest.builder() + .operationTypes(ImmutableList.of("create-namespace", "drop-namespace")) + .build(); + + String json = QueryEventsRequestParser.toJson(request); + QueryEventsRequest parsed = QueryEventsRequestParser.fromJson(json); + + assertThat(parsed.operationTypes()).containsExactly("create-namespace", "drop-namespace"); + assertThat(parsed.continuationToken()).isNull(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestQueryEventsResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestQueryEventsResponseParser.java new file mode 100644 index 000000000000..53c6017a923c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestQueryEventsResponseParser.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.events.CreateNamespaceOperation; +import org.apache.iceberg.rest.events.DropTableOperation; +import org.apache.iceberg.rest.events.Event; +import org.apache.iceberg.rest.events.ImmutableCreateNamespaceOperation; +import org.apache.iceberg.rest.events.ImmutableDropTableOperation; +import org.apache.iceberg.rest.events.ImmutableEvent; +import org.junit.jupiter.api.Test; + +public class TestQueryEventsResponseParser { + + @Test + public void testRoundTripWithMultipleEvents() { + Event createNsEvent = + ImmutableEvent.builder() + .eventId("evt-001") + .requestId("req-001") + .requestEventCount(1) + .timestampMs(1714000000000L) + .actor(ImmutableMap.of("user", "alice")) + .operation( + ImmutableCreateNamespaceOperation.builder() + .namespace(Namespace.of("analytics")) + .putProperties("owner", "data-team") + .build()) + .build(); + + Event dropTableEvent = + ImmutableEvent.builder() + .eventId("evt-002") + .requestId("req-002") + .requestEventCount(1) + .timestampMs(1714000001000L) + .operation( + ImmutableDropTableOperation.builder() + .identifier(TableIdentifier.of(Namespace.of("analytics"), "old_metrics")) + .tableUuid("abc-123") + .purge(false) + .build()) + .build(); + + QueryEventsResponse response = + ImmutableQueryEventsResponse.builder() + .nextPageToken("next-token-xyz") + .highestProcessedTimestampMs(1714000001000L) + .events(ImmutableList.of(createNsEvent, dropTableEvent)) + .build(); + + String json = QueryEventsResponseParser.toJson(response); + QueryEventsResponse parsed = QueryEventsResponseParser.fromJson(json); + + assertThat(parsed.nextPageToken()).isEqualTo("next-token-xyz"); + assertThat(parsed.highestProcessedTimestampMs()).isEqualTo(1714000001000L); + assertThat(parsed.events()).hasSize(2); + + Event parsedEvent1 = parsed.events().get(0); + assertThat(parsedEvent1.eventId()).isEqualTo("evt-001"); + assertThat(parsedEvent1.operation()).isInstanceOf(CreateNamespaceOperation.class); + + Event parsedEvent2 = parsed.events().get(1); + assertThat(parsedEvent2.eventId()).isEqualTo("evt-002"); + assertThat(parsedEvent2.operation()).isInstanceOf(DropTableOperation.class); + assertThat(((DropTableOperation) parsedEvent2.operation()).purge()).isFalse(); + } + + @Test + public void testRoundTripEmptyEvents() { + QueryEventsResponse response = + ImmutableQueryEventsResponse.builder() + .nextPageToken("end-token") + .highestProcessedTimestampMs(1714000000000L) + .events(ImmutableList.of()) + .build(); + + String json = QueryEventsResponseParser.toJson(response); + QueryEventsResponse parsed = QueryEventsResponseParser.fromJson(json); + + assertThat(parsed.nextPageToken()).isEqualTo("end-token"); + assertThat(parsed.highestProcessedTimestampMs()).isEqualTo(1714000000000L); + assertThat(parsed.events()).isEmpty(); + } +}