diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java index 8b5bf292b..7a0231627 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java @@ -116,10 +116,7 @@ static Schema getAvroSchemaForTable(@Nonnull final Table table, boolean strictMo resultTableSchema = originalTableSchema; } else { final List cols = new ArrayList<>(table.getSd().getCols()); - // Add partition columns if table partitioned - if (isPartitioned(table)) { - cols.addAll(getPartitionCols(table)); - } + appendMissingPartitionCols(cols, table); resultTableSchema = MergeHiveSchemaWithAvro.visit(structTypeInfoFromCols(cols), originalTableSchema); } @@ -135,9 +132,7 @@ static Schema convertHiveSchemaToAvro(@Nonnull final Table table) { String recordNamespace = table.getDbName() + "." + recordName; final List cols = new ArrayList<>(table.getSd().getCols()); - if (isPartitioned(table)) { - cols.addAll(getPartitionCols(table)); - } + appendMissingPartitionCols(cols, table); return convertFieldSchemaToAvroSchema(recordName, recordNamespace, true, cols); } @@ -1025,6 +1020,23 @@ private static List getPartitionCols(@Nonnull Table tableOrView) { return partKeys; } + /** + * Appends the table's partition columns to {@code cols}, skipping any whose name is already present. + * A partition column may coexist as a regular column in the table's schema; re-adding it would + * produce a duplicate field when the combined list is converted to Avro. + */ + private static void appendMissingPartitionCols(@Nonnull List cols, @Nonnull Table tableOrView) { + if (!isPartitioned(tableOrView)) { + return; + } + Set existingNames = cols.stream().map(FieldSchema::getName).collect(Collectors.toSet()); + for (FieldSchema partCol : getPartitionCols(tableOrView)) { + if (!existingNames.contains(partCol.getName())) { + cols.add(partCol); + } + } + } + private static String getCompleteName(@Nonnull Table table) { Preconditions.checkNotNull(table); diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java index 0dc5d94ca..71b52379e 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java @@ -6,6 +6,7 @@ package com.linkedin.coral.schema.avro; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -14,6 +15,9 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.testng.Assert; import org.testng.annotations.Test; @@ -346,4 +350,58 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { Assert.assertTrue(metadataNamespace.contains("IntermediateRecord"), "Metadata namespace should follow hierarchical naming. Got: " + metadataNamespace); } + + /** + * Test that convertHiveSchemaToAvro does not duplicate a partition column when it already appears + * in the table's regular column list. This can happen when a partition column is materialized in + * the underlying data and also declared as a Hive partition key. + */ + @Test + public void testConvertHiveSchemaToAvroSkipsDuplicatePartitionCol() { + List regularCols = new ArrayList<>(); + regularCols.add(new FieldSchema("id", "int", null)); + regularCols.add(new FieldSchema("datepartition", "string", null)); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(regularCols); + + Table table = new Table(); + table.setDbName("default"); + table.setTableName("test_table"); + table.setSd(sd); + table.setPartitionKeys(Collections.singletonList(new FieldSchema("datepartition", "string", null))); + + Schema result = SchemaUtilities.convertHiveSchemaToAvro(table); + + Assert.assertEquals(result.getFields().size(), 2, + "Should have 2 fields: id and datepartition (partition col not duplicated)"); + Assert.assertNotNull(result.getField("id")); + Assert.assertNotNull(result.getField("datepartition")); + } + + /** + * Test that convertHiveSchemaToAvro still appends the partition column when it's not already present. + */ + @Test + public void testConvertHiveSchemaToAvroAppendsPartitionCol() { + List regularCols = new ArrayList<>(); + regularCols.add(new FieldSchema("id", "int", null)); + regularCols.add(new FieldSchema("value", "string", null)); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(regularCols); + + Table table = new Table(); + table.setDbName("default"); + table.setTableName("test_table"); + table.setSd(sd); + table.setPartitionKeys(Collections.singletonList(new FieldSchema("datepartition", "string", null))); + + Schema result = SchemaUtilities.convertHiveSchemaToAvro(table); + + Assert.assertEquals(result.getFields().size(), 3, "Should have 3 fields: id, value, datepartition"); + Assert.assertNotNull(result.getField("id")); + Assert.assertNotNull(result.getField("value")); + Assert.assertNotNull(result.getField("datepartition")); + } }