From 585f841c8a6ed95d89109cf413369c69f446ba7a Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Thu, 16 Apr 2026 17:44:57 -0700 Subject: [PATCH] [Coral-Schema] Skip duplicate partition columns when building Avro schema from Hive cols getAvroSchemaForTable and convertHiveSchemaToAvro both appended the table's partition columns to the regular column list before converting to Avro. When the regular column list already contained a partition column by name, the resulting Avro record failed with "Duplicate field X in record". This mirrors the dedup already applied in addPartitionColsToSchema. Extracted into a shared appendMissingPartitionCols helper and covered by tests for convertHiveSchemaToAvro. --- .../coral/schema/avro/SchemaUtilities.java | 26 ++++++--- .../schema/avro/SchemaUtilitiesTests.java | 58 +++++++++++++++++++ 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java index 8b5bf292b..7a0231627 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java @@ -116,10 +116,7 @@ static Schema getAvroSchemaForTable(@Nonnull final Table table, boolean strictMo resultTableSchema = originalTableSchema; } else { final List cols = new ArrayList<>(table.getSd().getCols()); - // Add partition columns if table partitioned - if (isPartitioned(table)) { - cols.addAll(getPartitionCols(table)); - } + appendMissingPartitionCols(cols, table); resultTableSchema = MergeHiveSchemaWithAvro.visit(structTypeInfoFromCols(cols), originalTableSchema); } @@ -135,9 +132,7 @@ static Schema convertHiveSchemaToAvro(@Nonnull final Table table) { String recordNamespace = table.getDbName() + "." + recordName; final List cols = new ArrayList<>(table.getSd().getCols()); - if (isPartitioned(table)) { - cols.addAll(getPartitionCols(table)); - } + appendMissingPartitionCols(cols, table); return convertFieldSchemaToAvroSchema(recordName, recordNamespace, true, cols); } @@ -1025,6 +1020,23 @@ private static List getPartitionCols(@Nonnull Table tableOrView) { return partKeys; } + /** + * Appends the table's partition columns to {@code cols}, skipping any whose name is already present. + * A partition column may coexist as a regular column in the table's schema; re-adding it would + * produce a duplicate field when the combined list is converted to Avro. + */ + private static void appendMissingPartitionCols(@Nonnull List cols, @Nonnull Table tableOrView) { + if (!isPartitioned(tableOrView)) { + return; + } + Set existingNames = cols.stream().map(FieldSchema::getName).collect(Collectors.toSet()); + for (FieldSchema partCol : getPartitionCols(tableOrView)) { + if (!existingNames.contains(partCol.getName())) { + cols.add(partCol); + } + } + } + private static String getCompleteName(@Nonnull Table table) { Preconditions.checkNotNull(table); diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java index 0dc5d94ca..71b52379e 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java @@ -6,6 +6,7 @@ package com.linkedin.coral.schema.avro; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -14,6 +15,9 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.testng.Assert; import org.testng.annotations.Test; @@ -346,4 +350,58 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { Assert.assertTrue(metadataNamespace.contains("IntermediateRecord"), "Metadata namespace should follow hierarchical naming. Got: " + metadataNamespace); } + + /** + * Test that convertHiveSchemaToAvro does not duplicate a partition column when it already appears + * in the table's regular column list. This can happen when a partition column is materialized in + * the underlying data and also declared as a Hive partition key. + */ + @Test + public void testConvertHiveSchemaToAvroSkipsDuplicatePartitionCol() { + List regularCols = new ArrayList<>(); + regularCols.add(new FieldSchema("id", "int", null)); + regularCols.add(new FieldSchema("datepartition", "string", null)); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(regularCols); + + Table table = new Table(); + table.setDbName("default"); + table.setTableName("test_table"); + table.setSd(sd); + table.setPartitionKeys(Collections.singletonList(new FieldSchema("datepartition", "string", null))); + + Schema result = SchemaUtilities.convertHiveSchemaToAvro(table); + + Assert.assertEquals(result.getFields().size(), 2, + "Should have 2 fields: id and datepartition (partition col not duplicated)"); + Assert.assertNotNull(result.getField("id")); + Assert.assertNotNull(result.getField("datepartition")); + } + + /** + * Test that convertHiveSchemaToAvro still appends the partition column when it's not already present. + */ + @Test + public void testConvertHiveSchemaToAvroAppendsPartitionCol() { + List regularCols = new ArrayList<>(); + regularCols.add(new FieldSchema("id", "int", null)); + regularCols.add(new FieldSchema("value", "string", null)); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(regularCols); + + Table table = new Table(); + table.setDbName("default"); + table.setTableName("test_table"); + table.setSd(sd); + table.setPartitionKeys(Collections.singletonList(new FieldSchema("datepartition", "string", null))); + + Schema result = SchemaUtilities.convertHiveSchemaToAvro(table); + + Assert.assertEquals(result.getFields().size(), 3, "Should have 3 fields: id, value, datepartition"); + Assert.assertNotNull(result.getField("id")); + Assert.assertNotNull(result.getField("value")); + Assert.assertNotNull(result.getField("datepartition")); + } }