Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,7 @@ static Schema getAvroSchemaForTable(@Nonnull final Table table, boolean strictMo
resultTableSchema = originalTableSchema;
} else {
final List<FieldSchema> cols = new ArrayList<>(table.getSd().getCols());
// Add partition columns if table partitioned
if (isPartitioned(table)) {
cols.addAll(getPartitionCols(table));
}
appendMissingPartitionCols(cols, table);

resultTableSchema = MergeHiveSchemaWithAvro.visit(structTypeInfoFromCols(cols), originalTableSchema);
}
Expand All @@ -135,9 +132,7 @@ static Schema convertHiveSchemaToAvro(@Nonnull final Table table) {
String recordNamespace = table.getDbName() + "." + recordName;

final List<FieldSchema> cols = new ArrayList<>(table.getSd().getCols());
if (isPartitioned(table)) {
cols.addAll(getPartitionCols(table));
}
appendMissingPartitionCols(cols, table);

return convertFieldSchemaToAvroSchema(recordName, recordNamespace, true, cols);
}
Expand Down Expand Up @@ -1025,6 +1020,23 @@ private static List<FieldSchema> getPartitionCols(@Nonnull Table tableOrView) {
return partKeys;
}

/**
* Appends the table's partition columns to {@code cols}, skipping any whose name is already present.
* A partition column may coexist as a regular column in the table's schema; re-adding it would
* produce a duplicate field when the combined list is converted to Avro.
*/
private static void appendMissingPartitionCols(@Nonnull List<FieldSchema> cols, @Nonnull Table tableOrView) {
if (!isPartitioned(tableOrView)) {
return;
}
Set<String> existingNames = cols.stream().map(FieldSchema::getName).collect(Collectors.toSet());
for (FieldSchema partCol : getPartitionCols(tableOrView)) {
if (!existingNames.contains(partCol.getName())) {
cols.add(partCol);
}
}
}

private static String getCompleteName(@Nonnull Table table) {
Preconditions.checkNotNull(table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.linkedin.coral.schema.avro;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -14,6 +15,9 @@

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -346,4 +350,58 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() {
Assert.assertTrue(metadataNamespace.contains("IntermediateRecord"),
"Metadata namespace should follow hierarchical naming. Got: " + metadataNamespace);
}

/**
* Test that convertHiveSchemaToAvro does not duplicate a partition column when it already appears
* in the table's regular column list. This can happen when a partition column is materialized in
* the underlying data and also declared as a Hive partition key.
*/
@Test
public void testConvertHiveSchemaToAvroSkipsDuplicatePartitionCol() {
List<FieldSchema> regularCols = new ArrayList<>();
regularCols.add(new FieldSchema("id", "int", null));
regularCols.add(new FieldSchema("datepartition", "string", null));

StorageDescriptor sd = new StorageDescriptor();
sd.setCols(regularCols);

Table table = new Table();
table.setDbName("default");
table.setTableName("test_table");
table.setSd(sd);
table.setPartitionKeys(Collections.singletonList(new FieldSchema("datepartition", "string", null)));

Schema result = SchemaUtilities.convertHiveSchemaToAvro(table);

Assert.assertEquals(result.getFields().size(), 2,
"Should have 2 fields: id and datepartition (partition col not duplicated)");
Assert.assertNotNull(result.getField("id"));
Assert.assertNotNull(result.getField("datepartition"));
}

/**
* Test that convertHiveSchemaToAvro still appends the partition column when it's not already present.
*/
@Test
public void testConvertHiveSchemaToAvroAppendsPartitionCol() {
List<FieldSchema> regularCols = new ArrayList<>();
regularCols.add(new FieldSchema("id", "int", null));
regularCols.add(new FieldSchema("value", "string", null));

StorageDescriptor sd = new StorageDescriptor();
sd.setCols(regularCols);

Table table = new Table();
table.setDbName("default");
table.setTableName("test_table");
table.setSd(sd);
table.setPartitionKeys(Collections.singletonList(new FieldSchema("datepartition", "string", null)));

Schema result = SchemaUtilities.convertHiveSchemaToAvro(table);

Assert.assertEquals(result.getFields().size(), 3, "Should have 3 fields: id, value, datepartition");
Assert.assertNotNull(result.getField("id"));
Assert.assertNotNull(result.getField("value"));
Assert.assertNotNull(result.getField("datepartition"));
}
}
Loading