diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/ToLowercaseSchemaVisitor.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/ToLowercaseSchemaVisitor.java index ad2ca8c87..d1c32bc9c 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/ToLowercaseSchemaVisitor.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/ToLowercaseSchemaVisitor.java @@ -1,16 +1,21 @@ /** - * Copyright 2022-2023 LinkedIn Corporation. All rights reserved. + * Copyright 2022-2026 LinkedIn Corporation. All rights reserved. * Licensed under the BSD-2 Clause license. * See LICENSE in the project root for license information. */ package com.linkedin.coral.schema.avro; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import com.google.common.collect.Lists; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; /** @@ -62,12 +67,163 @@ public Schema primitive(Schema primitive) { return primitive; } + /** + * Lowercases a field, including lowercasing any field names within its default value. + * @param field The original field + * @param schema The lowercased schema for this field + * @return A new field with lowercased name and lowercased default value + */ private Schema.Field lowercaseField(Schema.Field field, Schema schema) { + Object originalDefaultValue = SchemaUtilities.defaultValue(field); + Object lowercasedDefaultValue = lowercaseDefaultValue(originalDefaultValue, schema); + Schema.Field lowercasedField = AvroCompatibilityHelper.createSchemaField(field.name().toLowerCase(), schema, - field.doc(), SchemaUtilities.defaultValue(field), field.order()); + field.doc(), lowercasedDefaultValue, field.order()); SchemaUtilities.replicateFieldProps(field, lowercasedField); return lowercasedField; } + + /** + * Recursively lowercases field names within default values based on the schema structure. + * This handles complex types like records, maps, and arrays where field names appear in default values. + * + * @param fieldDefaultValue The original default value for a field (can be null, primitive, Map, List, etc.) + * @param fieldSchema The schema that describes the structure of this field's default value + * @return The default value with all field names lowercased + */ + @SuppressWarnings("unchecked") + private Object lowercaseDefaultValue(Object fieldDefaultValue, Schema fieldSchema) { + if (fieldDefaultValue == null) { + return null; + } + + // Handle union types to get the actual schema for processing the default value + // For nullable unions, extract the non-null type since we know defaultValue is non-null + Schema actualSchema = SchemaUtilities.extractIfOption(fieldSchema); + // If still a union after extracting nullable option (i.e., multi-type non-nullable union), + // the default value corresponds to the first type per Avro specification + if (actualSchema.getType() == Schema.Type.UNION) { + actualSchema = actualSchema.getTypes().get(0); + } + + switch (actualSchema.getType()) { + case RECORD: + // For records, the default value can be either a Map or GenericData.Record + if (fieldDefaultValue instanceof GenericData.Record) { + GenericData.Record record = (GenericData.Record) fieldDefaultValue; + return lowercaseRecordDefaultValue(actualSchema, lowercasedFieldName -> { + // Find the matching field in the original record's schema (case-insensitive) + Schema.Field originalField = record.getSchema().getField(lowercasedFieldName); + if (originalField == null) { + for (Schema.Field f : record.getSchema().getFields()) { + if (f.name().equalsIgnoreCase(lowercasedFieldName)) { + originalField = f; + break; + } + } + } + return originalField != null ? record.get(originalField.pos()) : null; + }); + } else if (fieldDefaultValue instanceof Map) { + Map recordMap = (Map) fieldDefaultValue; + return lowercaseRecordDefaultValue(actualSchema, lowercasedFieldName -> { + // Find the matching key in the original map (case-insensitive) + String matchingKey = findMatchingKeyForLowercased(recordMap, lowercasedFieldName); + return matchingKey != null ? recordMap.get(matchingKey) : null; + }); + } + // If neither Map nor GenericData.Record, return as-is + return fieldDefaultValue; + + case MAP: + // For maps, lowercase the keys and recursively process values + if (fieldDefaultValue instanceof Map) { + Map mapValue = (Map) fieldDefaultValue; // Use wildcards to handle Utf8 keys + Map lowercasedMap = new LinkedHashMap<>(); + Schema valueSchema = actualSchema.getValueType(); + + for (Map.Entry entry : mapValue.entrySet()) { + String originalKey = entry.getKey().toString(); // Handle both String and Utf8 + String lowercasedKey = originalKey.toLowerCase(); + Object lowercasedValue = lowercaseDefaultValue(entry.getValue(), valueSchema); + lowercasedMap.put(lowercasedKey, lowercasedValue); + } + return lowercasedMap; + } + return fieldDefaultValue; + + case ARRAY: + // For arrays, recursively process each element + if (fieldDefaultValue instanceof List) { + List arrayValue = (List) fieldDefaultValue; + Schema elementSchema = actualSchema.getElementType(); + + return arrayValue.stream().map(element -> lowercaseDefaultValue(element, elementSchema)) + .collect(Collectors.toList()); + } + return fieldDefaultValue; + + case NULL: + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BYTES: + case STRING: + case ENUM: + case FIXED: + default: + // Primitive types and others: return as-is + return fieldDefaultValue; + } + } + + /** + * Helper method that extracts the common logic for lowercasing record default values. + * This handles both GenericData.Record and Map-based default values. + * + * @param actualSchema The lowercased schema for the record + * @param valueExtractor Function that retrieves the original field value given a lowercased field name + * @return A Map with lowercased field names and recursively lowercased values + */ + private Map lowercaseRecordDefaultValue(Schema actualSchema, + Function valueExtractor) { + Map lowercasedRecordMap = new LinkedHashMap<>(); + + // Iterate through the lowercased schema fields + for (Schema.Field field : actualSchema.getFields()) { + String lowercasedFieldName = field.name(); + Object fieldValue = valueExtractor.apply(lowercasedFieldName); + + if (fieldValue != null) { + Object lowercasedFieldValue = lowercaseDefaultValue(fieldValue, field.schema()); + lowercasedRecordMap.put(lowercasedFieldName, lowercasedFieldValue); + } + } + + return lowercasedRecordMap; + } + + /** + * Finds a key in the original default value map that matches the lowercased field name. + * This is needed because the original default value may have field names in mixed case. + * + * @param map The map containing the original default value + * @param lowercasedFieldName The lowercased field name from the transformed schema + * @return The matching key from the original map, or null if not found + */ + private String findMatchingKeyForLowercased(Map map, String lowercasedFieldName) { + // Try case-insensitive match to find the original key + for (Object keyObj : map.keySet()) { + String key = keyObj.toString(); // Handle both String and Utf8 + if (key.equalsIgnoreCase(lowercasedFieldName)) { + return key; + } + } + + return null; + } } diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java index 303b8cd52..b7e36fa74 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java @@ -271,4 +271,22 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { Assert.assertTrue(metadataNamespace.contains("IntermediateRecord"), "Metadata namespace should follow hierarchical naming. Got: " + metadataNamespace); } + + /** + * Test that ToLowercaseSchemaVisitor properly lowercases field names in default values. + * This test demonstrates the bug where complex default values (records, maps, arrays) + * retain their original casing while the schema itself is lowercased. + */ + @Test + public void testLowercaseSchemaWithComplexDefaultValues() { + Schema inputSchema = + AvroCompatibilityHelper.parse(TestUtils.loadSchema("testLowercaseSchemaWithDefaultValues-input.avsc")); + Schema outputSchema = ToLowercaseSchemaVisitor.visit(inputSchema); + + // Compare with expected output, trimming whitespace for comparison + String expected = TestUtils.loadSchema("testLowercaseSchemaWithDefaultValues-expected.avsc").trim(); + String actual = outputSchema.toString(true).trim(); + + Assert.assertEquals(actual, expected); + } } diff --git a/coral-schema/src/test/resources/testLowercaseSchemaWithDefaultValues-expected.avsc b/coral-schema/src/test/resources/testLowercaseSchemaWithDefaultValues-expected.avsc new file mode 100644 index 000000000..025500a5e --- /dev/null +++ b/coral-schema/src/test/resources/testLowercaseSchemaWithDefaultValues-expected.avsc @@ -0,0 +1,60 @@ +{ + "type" : "record", + "name" : "testrecord", + "namespace" : "com.test", + "fields" : [ { + "name" : "simple_field", + "type" : "int", + "default" : 42 + }, { + "name" : "struct_field", + "type" : { + "type" : "record", + "name" : "nestedrecord", + "fields" : [ { + "name" : "firstname", + "type" : "string" + }, { + "name" : "lastname", + "type" : "string" + }, { + "name" : "age", + "type" : "int" + } ] + }, + "default" : { + "firstname" : "John", + "lastname" : "Doe", + "age" : 30 + } + }, { + "name" : "map_field", + "type" : { + "type" : "map", + "values" : "string" + }, + "default" : { + "key_one" : "value1", + "key_two" : "value2" + } + }, { + "name" : "array_field", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "arrayitem", + "fields" : [ { + "name" : "item_name", + "type" : "string" + } ] + } + }, + "default" : [ { + "item_name" : "item1" + }, { + "item_name" : "item2" + } ] + } ] +} + diff --git a/coral-schema/src/test/resources/testLowercaseSchemaWithDefaultValues-input.avsc b/coral-schema/src/test/resources/testLowercaseSchemaWithDefaultValues-input.avsc new file mode 100644 index 000000000..c4b928aab --- /dev/null +++ b/coral-schema/src/test/resources/testLowercaseSchemaWithDefaultValues-input.avsc @@ -0,0 +1,60 @@ +{ + "type" : "record", + "name" : "TestRecord", + "namespace" : "com.test", + "fields" : [ { + "name" : "Simple_Field", + "type" : "int", + "default" : 42 + }, { + "name" : "Struct_Field", + "type" : { + "type" : "record", + "name" : "NestedRecord", + "fields" : [ { + "name" : "firstName", + "type" : "string" + }, { + "name" : "lastName", + "type" : "string" + }, { + "name" : "Age", + "type" : "int" + } ] + }, + "default" : { + "firstName" : "John", + "lastName" : "Doe", + "Age" : 30 + } + }, { + "name" : "Map_Field", + "type" : { + "type" : "map", + "values" : "string" + }, + "default" : { + "Key_One" : "value1", + "Key_Two" : "value2" + } + }, { + "name" : "Array_Field", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "ArrayItem", + "fields" : [ { + "name" : "Item_Name", + "type" : "string" + } ] + } + }, + "default" : [ { + "Item_Name" : "item1" + }, { + "Item_Name" : "item2" + } ] + } ] +} +