diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java
index be5ffb2c0..fc96da69e 100644
--- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java
+++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java
@@ -496,7 +496,12 @@ static Schema addPartitionColsToSchema(@Nonnull Schema schema, @Nonnull Table ta
         convertFieldSchemaToAvroSchema("partitionCols", "partitionCols", true, tableOrView.getPartitionKeys());
 
     List<Schema.Field> fieldsWithPartitionColumns = cloneFieldList(schema.getFields());
-    fieldsWithPartitionColumns.addAll(cloneFieldList(partitionColumnsSchema.getFields(), true));
+    Set<String> existingFieldNames = schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toSet());
+    for (Schema.Field partitionField : cloneFieldList(partitionColumnsSchema.getFields(), true)) {
+      if (!existingFieldNames.contains(partitionField.name())) {
+        fieldsWithPartitionColumns.add(partitionField);
+      }
+    }
 
     Schema schemaWithPartitionColumns =
         Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java
index 0a53c87f1..83ae7a34f 100644
--- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java
+++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java
@@ -6,6 +6,7 @@
 package com.linkedin.coral.schema.avro;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -15,6 +16,9 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -483,4 +487,50 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() {
     Assert.assertTrue(metadataNamespace.contains("IntermediateRecord"),
         "Metadata namespace should follow hierarchical naming. Got: " + metadataNamespace);
   }
+
+  /**
+   * Test that addPartitionColsToSchema does not add a duplicate field when the schema
+   * already contains a field with the same name as a partition column.
+   * This can happen when a Hive view projects a partition column as a regular column.
+   */
+  @Test
+  public void testAddPartitionColsToSchemaSkipsDuplicates() {
+    Schema schemaWithPartCol = SchemaBuilder.record("testRecord").namespace("com.test").fields().name("id").type()
+        .nullable().intType().noDefault().name("date_col").type().nullable().stringType().noDefault().endRecord();
+
+    Table table = new Table();
+    table.setDbName("default");
+    table.setTableName("test_table");
+    table.setPartitionKeys(Collections.singletonList(new FieldSchema("date_col", "string", null)));
+    table.setSd(new StorageDescriptor());
+
+    // Before the fix, this would throw: "Duplicate field date_col in record testRecord"
+    Schema result = SchemaUtilities.addPartitionColsToSchema(schemaWithPartCol, table);
+
+    Assert.assertEquals(result.getFields().size(), 2, "Should have 2 fields: id and date_col (no duplicate)");
+    Assert.assertNotNull(result.getField("id"));
+    Assert.assertNotNull(result.getField("date_col"));
+  }
+
+  /**
+   * Test that addPartitionColsToSchema still adds partition columns when they don't already exist.
+   */
+  @Test
+  public void testAddPartitionColsToSchemaAddsNewPartitionCol() {
+    Schema schemaWithoutPartCol = SchemaBuilder.record("testRecord").namespace("com.test").fields().name("id").type()
+        .nullable().intType().noDefault().name("value").type().nullable().stringType().noDefault().endRecord();
+
+    Table table = new Table();
+    table.setDbName("default");
+    table.setTableName("test_table");
+    table.setPartitionKeys(Collections.singletonList(new FieldSchema("date_col", "string", null)));
+    table.setSd(new StorageDescriptor());
+
+    Schema result = SchemaUtilities.addPartitionColsToSchema(schemaWithoutPartCol, table);
+
+    Assert.assertEquals(result.getFields().size(), 3, "Should have 3 fields: id, value, and date_col");
+    Assert.assertNotNull(result.getField("id"));
+    Assert.assertNotNull(result.getField("value"));
+    Assert.assertNotNull(result.getField("date_col"));
+  }
 }