diff --git a/api/src/main/java/org/apache/iceberg/expressions/BoundExtract.java b/api/src/main/java/org/apache/iceberg/expressions/BoundExtract.java index 8c333f5cf4c9..803a895871f4 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/BoundExtract.java +++ b/api/src/main/java/org/apache/iceberg/expressions/BoundExtract.java @@ -26,9 +26,13 @@ public class BoundExtract implements BoundTerm { private final String path; private final Type type; + /** + * @param path normalized extract path, as from {@link UnboundExtract#path()}; already validated + * and canonical in {@link UnboundExtract#bind}. + */ BoundExtract(BoundReference ref, String path, Type type) { this.ref = ref; - this.path = PathUtil.toNormalizedPath(PathUtil.parse(path)); + this.path = path; this.type = type; } diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java index af24ce40cac8..37ab44fa82d2 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java +++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java @@ -243,6 +243,24 @@ public static String describe(Term term) { return ((NamedReference) term).name(); } else if (term instanceof BoundReference) { return ((BoundReference) term).name(); + } else if (term instanceof UnboundExtract) { + UnboundExtract unboundExtract = (UnboundExtract) term; + return "extract(" + + describe(unboundExtract.ref()) + + ", " + + unboundExtract.path() + + ", " + + unboundExtract.type() + + ")"; + } else if (term instanceof BoundExtract) { + BoundExtract boundExtract = (BoundExtract) term; + return "extract(" + + describe(boundExtract.ref()) + + ", " + + boundExtract.path() + + ", " + + boundExtract.type() + + ")"; } else { throw new UnsupportedOperationException("Unsupported term: " + term); } @@ -254,6 +272,9 @@ public static UnboundTerm unbind(BoundTerm term) { return Expressions.transform(bound.ref().name(), 
bound.transform()); } else if (term instanceof BoundReference) { return Expressions.ref(((BoundReference) term).name()); + } else if (term instanceof BoundExtract) { + BoundExtract bound = (BoundExtract) term; + return Expressions.extract(bound.ref().name(), bound.path(), bound.type().toString()); } throw new UnsupportedOperationException("Cannot unbind unsupported term: " + term); diff --git a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java index 78012def5a58..7f080b870a28 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java +++ b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java @@ -102,6 +102,12 @@ public static UnboundTerm truncate(String name, int width) { return new UnboundTransform<>(ref(name), Transforms.truncate(width)); } + /** + * Extract a field from a variant column. {@code path} is a small RFC 9535-style JSONPath: root + * {@code $}, then steps are {@code .name}, {@code ['name']} (RFC 9535 escapes inside quotes), or + * {@code [n]} for a zero-based array index. You can mix these ({@code $.a['b.c']}, {@code + * $.items[0].tags[1]}). {@link UnboundExtract#path()} returns the normalized string. 
+ */ public static UnboundTerm extract(String name, String path, String type) { return new UnboundExtract<>(ref(name), path, type); } diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java index 81cbbe785519..18c26c0bdd5d 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java +++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Collection; import java.util.Comparator; import java.util.Map; @@ -631,7 +632,15 @@ private boolean isNonNullPreserving(Bound term) { } } + /** + * Build a variant from the buffer, regardless of the ordering of the incoming buffer. + * + * @param buffer source data + * @return variant instance + */ private static VariantObject parseBounds(ByteBuffer buffer) { - return Variant.from(buffer).value().asObject(); + // Explicitly use little-endian encoding for reading buffer + ByteBuffer littleEndian = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN); + return Variant.from(littleEndian).value().asObject(); } } diff --git a/api/src/main/java/org/apache/iceberg/expressions/PathUtil.java b/api/src/main/java/org/apache/iceberg/expressions/PathUtil.java index 2e943bfdbfbd..b27a926f9c46 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/PathUtil.java +++ b/api/src/main/java/org/apache/iceberg/expressions/PathUtil.java @@ -27,13 +27,23 @@ import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; public class PathUtil { private PathUtil() {} + /** + * One step in a variant JSONPath: an object member name or a zero-based array index (RFC 9535 + * {@code [n]} selector). + */ + sealed interface PathSegment permits PathSegment.Name, PathSegment.Index { + record Name(String name) implements PathSegment {} + + record Index(int index) implements PathSegment {} + } + private static final String RFC9535_NAME_FIRST = "[A-Za-z_\\x{0080}-\\x{D7FF}\\x{E000}-\\x{10FFFF}]"; private static final String RFC9535_NAME_CHARS = @@ -41,46 +51,263 @@ private PathUtil() {} private static final Predicate RFC9535_MEMBER_NAME_SHORTHAND = Pattern.compile(RFC9535_NAME_FIRST + RFC9535_NAME_CHARS).asMatchPredicate(); + /** Letters that follow {@code \} for control-character escapes in RFC 9535 quoted segments. */ + private static final String RFC9535_SIMPLE_ESCAPE_LETTERS = "btnfr"; + + private static final String RFC9535_SIMPLE_ESCAPE_CHARS = "\b\t\n\f\r"; + private static final Pattern RFC9535_REQUIRES_ESCAPE = Pattern.compile( "[^\\x{0020}-\\x{0026}\\x{0028}-\\x{005B}\\x{005D}-\\x{D7FF}\\x{E000}-\\x{10FFFF}]"); + /** + * Matches one bracket segment {@code ['...']} where inner text may contain RFC 9535 escapes + * (quote, backslash, control characters, and four-digit hex escapes). + */ + private static final Pattern BRACKET_SEGMENT = Pattern.compile("\\['((?:[^'\\\\]|\\\\.)*)'\\]"); + private static final Map RFC9535_ESCAPE_REPLACEMENTS = buildReplacementMap(); - private static final Splitter DOT = Splitter.on("."); private static final String ROOT = "$"; - static List parse(String path) { + /** + * Parses a path into segments. After the root {@code $}, each segment is either dot shorthand + * ({@code .name} per RFC 9535), a single-quoted bracket name ({@code ['...']}) with RFC 9535 + * escapes, or a numeric array index ({@code [n]}). 
Forms may be mixed (e.g. {@code $.a['b.c']}, + * {@code $.items[0].tags}, {@code $.matrix[0][1]}). Wildcards and recursive descent are not + * supported. + * + *

The root path {@code $} yields an empty segment list. + */ + static List parse(String path) { Preconditions.checkArgument(path != null, "Invalid path: null"); + Preconditions.checkArgument(!path.isEmpty(), "Invalid path: empty"); + Preconditions.checkArgument( + path.startsWith(ROOT), "Invalid path, does not start with %s: %s", ROOT, path); + + if (path.equals(ROOT)) { + return Lists.newArrayList(); + } + + return parseAfterRoot(path); + } + + /** Normalizes object field names only (no array indices). */ + public static String toNormalizedPath(Iterable fields) { + return toNormalizedPath( + Streams.stream(fields).map(PathSegment.Name::new).collect(Collectors.toList())); + } + + static String toNormalizedPath(List segments) { + StringBuilder builder = new StringBuilder(ROOT); + for (PathSegment segment : segments) { + if (segment instanceof PathSegment.Name) { + String name = ((PathSegment.Name) segment).name(); + builder.append("['").append(rfc9535escape(name)).append("']"); + } else if (segment instanceof PathSegment.Index) { + int index = ((PathSegment.Index) segment).index(); + Preconditions.checkArgument(index >= 0, "Invalid path, negative array index: %s", index); + builder.append('[').append(index).append(']'); + } else { + throw new IllegalStateException("Unknown segment: " + segment); + } + } + return builder.toString(); + } + + private static List parseAfterRoot(String path) { + List segments = Lists.newArrayList(); + Matcher bracketMatcher = BRACKET_SEGMENT.matcher(path); + int len = path.length(); + int pos = ROOT.length(); + + while (pos < len) { + char ch = path.charAt(pos); + pos = + switch (ch) { + case '.' -> appendDotSegment(segments, path, pos); + case '[' -> appendBracketOrIndexSegment(segments, path, pos, bracketMatcher); + default -> + throw new IllegalArgumentException( + String.format( + "Invalid path, expected '.' 
or '[' at position %s: %s", pos, path)); + }; + } + + return segments; + } + + /** + * Appends a dot-style segment to {@code segments} by reading from {@code path[dotPos]}: a single + * leading {@code .} then an RFC 9535 shorthand name until the next {@code .} or {@code [}. + * + * @param segments output; segments parsed so far, updated in place + * @param path full path + * @param dotPos index of the {@code .} starting the segment + */ + private static int appendDotSegment(List segments, String path, int dotPos) { + int pos = dotPos + 1; + int pathLen = path.length(); + Preconditions.checkArgument(pos < pathLen, "Invalid path, trailing dot: %s", path); + int start = pos; + while (pos < pathLen) { + char ch = path.charAt(pos); + if (ch == '.' || ch == '[') { + break; + } + pos++; + } + + Preconditions.checkArgument(pos > start, "Invalid path, empty segment after '.': %s", path); + String name = path.substring(start, pos); Preconditions.checkArgument( - !path.contains("[") && !path.contains("]"), "Unsupported path, contains bracket: %s", path); + RFC9535_MEMBER_NAME_SHORTHAND.test(name), + "Invalid path: %s (%s has invalid characters)", + path, + name); + segments.add(new PathSegment.Name(name)); + return pos; + } + + /** + * Appends a bracket segment to {@code segments} starting at {@code path[bracketPos]}. If the next + * character is a digit, consumes a numeric array index {@code [n]}; otherwise consumes a quoted + * name {@code ['...']}. A lone {@code [} with no following quoted form (e.g. the path ends at + * {@code $[}) is rejected in {@link #appendQuotedBracketSegment} when the pattern does not match. 
+ * + * @param segments output; segments parsed so far, updated in place + * @param path full path + * @param bracketPos index of the opening {@code [} + */ + private static int appendBracketOrIndexSegment( + List segments, String path, int bracketPos, Matcher bracketMatcher) { Preconditions.checkArgument( - !path.contains("*"), "Unsupported path, contains wildcard: %s", path); + bracketPos < path.length() && path.charAt(bracketPos) == '[', "Invalid path: %s", path); + if (bracketPos + 1 < path.length() && isAsciiDigit(path.charAt(bracketPos + 1))) { + return appendArrayIndexSegment(segments, path, bracketPos); + } + return appendQuotedBracketSegment(segments, path, bracketPos, bracketMatcher); + } + + private static boolean isAsciiDigit(char ch) { + return ch >= '0' && ch <= '9'; + } + + /** + * Appends a non-negative array index from {@code [n]} to {@code segments}, starting with {@code + * [} at {@code path[bracketPos]}. + * + * @param segments output; segments parsed so far, updated in place + * @param path full path + * @param bracketPos index of the opening {@code [} before the digits + */ + private static int appendArrayIndexSegment( + List segments, String path, int bracketPos) { + int pos = bracketPos + 1; + int len = path.length(); + int start = pos; + while (pos < len && isAsciiDigit(path.charAt(pos))) { + pos++; + } + Preconditions.checkArgument(pos > start, "Invalid path, empty array index in: %s", path); Preconditions.checkArgument( - !path.contains(".."), "Unsupported path, contains recursive descent: %s", path); + pos < len && path.charAt(pos) == ']', "Invalid path, unclosed array index in: %s", path); + int index; + String digits = path.substring(start, pos); + try { + index = Integer.parseInt(digits); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format("Invalid path, array index out of int range: %s", path), e); + } + Preconditions.checkArgument(index >= 0, "Invalid path, negative array index in: %s", path); 
+ segments.add(new PathSegment.Index(index)); + return pos + 1; + } - List parts = DOT.splitToList(path); + /** + * Appends a name from a {@code ['...']} segment to {@code segments} using the bracket matcher + * (inner text may use RFC 9535 escapes). Expects a full quoted bracket token at {@code + * path[bracketPos]}; otherwise the matcher or alignment checks throw. + * + * @param segments output; segments parsed so far, updated in place + * @param path full path + * @param bracketPos index of the opening {@code [} that must begin {@code ['} + */ + private static int appendQuotedBracketSegment( + List segments, String path, int bracketPos, Matcher bracketMatcher) { Preconditions.checkArgument( - ROOT.equals(parts.get(0)), "Invalid path, does not start with %s: %s", ROOT, path); - - List names = parts.subList(1, parts.size()); - for (String name : names) { - Preconditions.checkArgument( - RFC9535_MEMBER_NAME_SHORTHAND.test(name), - "Invalid path: %s (%s has invalid characters)", - path, - name); + bracketMatcher.find(bracketPos), "Invalid path, malformed bracket segment: %s", path); + Preconditions.checkArgument( + bracketMatcher.start() == bracketPos, + "Invalid path, unexpected characters at position %s: %s", + bracketPos, + path); + segments.add(new PathSegment.Name(rfc9535unescape(bracketMatcher.group(1)))); + return bracketMatcher.end(); + } + + /** Unescapes the inner text of a {@code ['...']} segment (inverse of {@link #rfc9535escape}). 
*/ + @VisibleForTesting + @SuppressWarnings("StatementSwitchToExpressionSwitch") + static String rfc9535unescape(String escaped) { + if (!escaped.contains("\\")) { + return escaped; + } + + StringBuilder builder = new StringBuilder(escaped.length()); + int cursor = 0; + while (cursor < escaped.length()) { + char ch = escaped.charAt(cursor); + if (ch != '\\') { + builder.append(ch); + cursor += 1; + } else { + Preconditions.checkArgument( + cursor + 1 < escaped.length(), "Invalid escape sequence at end of: %s", escaped); + char next = escaped.charAt(cursor + 1); + switch (next) { + case 'u': + Preconditions.checkArgument( + cursor + 5 < escaped.length(), + "Invalid \\uXXXX escape at position %s in: %s", + cursor, + escaped); + builder.append((char) Integer.parseInt(escaped.substring(cursor + 2, cursor + 6), 16)); + cursor += 6; + break; + case 'b': + case 't': + case 'f': + case 'n': + case 'r': + case '\'': + case '\\': + builder.append(rfc9535SimpleEscapedChar(next)); + cursor += 2; + break; + default: + throw new IllegalArgumentException( + "Invalid escape sequence \\" + next + " in: " + escaped); + } + } } - return names; + return builder.toString(); } - public static String toNormalizedPath(Iterable fields) { - return ROOT - + Streams.stream(fields) - .map(PathUtil::rfc9535escape) - .map(name -> "['" + name + "']") - .collect(Collectors.joining("")); + private static char rfc9535SimpleEscapedChar(char next) { + int idx = RFC9535_SIMPLE_ESCAPE_LETTERS.indexOf(next); + if (idx >= 0) { + return RFC9535_SIMPLE_ESCAPE_CHARS.charAt(idx); + } + if (next == '\'') { + return '\''; + } + if (next == '\\') { + return '\\'; + } + throw new IllegalArgumentException("Invalid simple escape: \\" + next); } @VisibleForTesting diff --git a/api/src/main/java/org/apache/iceberg/expressions/UnboundExtract.java b/api/src/main/java/org/apache/iceberg/expressions/UnboundExtract.java index 1f29650c7411..5c5cc2e84899 100644 --- 
a/api/src/main/java/org/apache/iceberg/expressions/UnboundExtract.java +++ b/api/src/main/java/org/apache/iceberg/expressions/UnboundExtract.java @@ -29,10 +29,8 @@ public class UnboundExtract implements UnboundTerm { public UnboundExtract(NamedReference ref, String path, String type) { this.ref = ref; - this.path = path; + this.path = PathUtil.toNormalizedPath(PathUtil.parse(path)); this.type = Types.fromPrimitiveString(type); - // verify that the path is well-formed - PathUtil.parse(path); } @Override diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java index 24e58ad1e808..90aba3a1cece 100644 --- a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java +++ b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java @@ -64,6 +64,12 @@ public class TestExpressionBinding { optional(5, "nullable", Types.IntegerType.get()), optional(6, "always_null", Types.UnknownType.get())); + private static void assertBoundVariantLongExtractPath(String extractPath, String expectedPath) { + Expression bound = Binder.bind(STRUCT, lessThan(extract("var", extractPath, "long"), 100)); + BoundExtract term = (BoundExtract) TestHelpers.assertAndUnwrap(bound).term(); + assertThat(term.path()).isEqualTo(expectedPath); + } + @Test public void testMissingReference() { Expression expr = and(equal("t", 5), equal("x", 7)); @@ -337,7 +343,7 @@ public void testNotNullWithRequiredVariant() { } @Test - public void testExtractExpressionBinding() { + public void testVariantExtractBindingNormalizesDotPathToBracket() { Expression bound = Binder.bind(STRUCT, lessThan(extract("var", "$.event_id", "long"), 100)); TestHelpers.assertAllReferencesBound("BoundExtract", bound); BoundPredicate pred = TestHelpers.assertAndUnwrap(bound); @@ -350,6 +356,61 @@ public void testExtractExpressionBinding() { 
assertThat(boundExtract.type()).isEqualTo(Types.LongType.get()); } + @Test + public void testVariantExtractBindingPreservesBracketInputPath() { + assertBoundVariantLongExtractPath("$['event_id']", "$['event_id']"); + } + + @Test + public void testVariantExtractBindingMixedDotAndBracketPath() { + assertBoundVariantLongExtractPath("$.employee['user.name']", "$['employee']['user.name']"); + } + + @Test + public void testVariantExtractBindingBracketPathWithRfc9535Unescape() { + // Inner segment is a'b after unescaping; normalized bracket form uses \' for the quote. + assertBoundVariantLongExtractPath("$['a\\'b']", "$['a\\'b']"); + } + + @Test + public void testVariantExtractBindingBracketPathWithUnicodeEscape() { + assertBoundVariantLongExtractPath("$['\\u00e9']", "$['é']"); + } + + @Test + public void testVariantExtractUnbindPreservesNormalizedPathForDotInSegmentName() { + Expression boundExpr = lessThan(extract("var", "$['a.b']", "long"), 100L).bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + assertThat(((UnboundExtract) unbound).path()).isEqualTo("$['a.b']"); + } + + @Test + public void testVariantExtractUnbindPreservesNormalizedPathAfterMixedPath() { + Expression boundExpr = lessThan(extract("var", "$.x['y.z']", "long"), 100L).bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + assertThat(((UnboundExtract) unbound).path()).isEqualTo("$['x']['y.z']"); + } + + @Test + public void testVariantExtractUnbindPreservesNormalizedPathAfterArrayAccesses() { + Expression boundExpr = + lessThan(extract("var", "$.items[0].tags[1]", "long"), 100L).bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + assertThat(((UnboundExtract) 
unbound).path()).isEqualTo("$['items'][0]['tags'][1]"); + } + + @Test + public void testVariantExtractUnbindPreservesNormalizedPathAfterBracketStyleArrayAccesses() { + Expression boundExpr = + lessThan(extract("var", "$['items'][0]['tags'][1]", "long"), 100L).bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + assertThat(((UnboundExtract) unbound).path()).isEqualTo("$['items'][0]['tags'][1]"); + } + @Test public void testExtractExpressionNonVariant() { assertThatThrownBy(() -> Binder.bind(STRUCT, lessThan(extract("x", "$.event_id", "long"), 100))) @@ -357,16 +418,30 @@ public void testExtractExpressionNonVariant() { .hasMessage("Cannot bind extract, not a variant: x"); } + /** + * Param smoke list for "extract binds". Dot path normalization, keys with dots, mixed bracket + * form, and array index paths are covered in dedicated tests above (e.g. {@link + * #testVariantExtractBindingNormalizesDotPathToBracket}, {@link + * #testVariantExtractBindingBracketPathWithRfc9535Unescape}, array cases from {@link + * #testVariantExtractUnbindPreservesNormalizedPathAfterArrayAccesses}). 
+ */ private static final String[] VALID_PATHS = new String[] { "$", // root path "$.event_id", - "$.event.id" + "$.event.id", + "$['event_id']", + "$.a['b.c']", + "$.matrix[0][1]", + "$.basket[0][2].a", + "$.items[0].tags[1]", + "$['items'][0]['tags'][1]", + "$.events[0].event_id" }; @ParameterizedTest @FieldSource("VALID_PATHS") - public void testExtractExpressionBindingPaths(String path) { + public void testVariantExtractBindingAcceptsPaths(String path) { Expression bound = Binder.bind(STRUCT, lessThan(extract("var", path, "long"), 100)); TestHelpers.assertAllReferencesBound("BoundExtract", bound); BoundPredicate pred = TestHelpers.assertAndUnwrap(bound); @@ -378,15 +453,13 @@ public void testExtractExpressionBindingPaths(String path) { null, "", "event_id", // missing root - "$['event_id']", // uses bracket notation "$..event_id", // uses recursive descent - "$.events[0].event_id", // uses position accessor "$.events.*" // uses wildcard }; @ParameterizedTest @FieldSource("UNSUPPORTED_PATHS") - public void testExtractBindingWithInvalidPath(String path) { + public void testVariantExtractBindingRejectsInvalidPaths(String path) { assertThatThrownBy(() -> Binder.bind(STRUCT, lessThan(extract("var", path, "long"), 100))) .isInstanceOf(IllegalArgumentException.class) .hasMessageMatching("(Unsupported|Invalid) path.*"); diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java index fdf3d9dcd1b0..f7983e879f42 100644 --- a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java +++ b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java @@ -373,6 +373,96 @@ public void testSanitizeNotStartsWith() { .isEqualTo("data NOT STARTS WITH (hash-34d05fb7)"); } + @Test + public void testSanitizeExtractTerm() { + Expression sanitized = + ExpressionUtil.sanitize( + Expressions.equal(Expressions.extract("var", "$.city", "string"), "Boston")); + 
assertThat(sanitized).isInstanceOf(UnboundPredicate.class); + UnboundPredicate pred = (UnboundPredicate) sanitized; + assertThat(pred.term()).isInstanceOf(UnboundExtract.class); + assertThat(pred.op()).isEqualTo(Expression.Operation.EQ); + assertThat(pred.literal().value().toString()).startsWith("(hash-").endsWith(")"); + + Expression boundSanitized = + ExpressionUtil.sanitize( + STRUCT, + Expressions.equal(Expressions.extract("var", "$.city", "string"), "Boston"), + true); + assertThat(boundSanitized).isInstanceOf(UnboundPredicate.class); + UnboundPredicate boundPred = (UnboundPredicate) boundSanitized; + assertThat(boundPred.term()).isInstanceOf(UnboundExtract.class); + + assertThat( + ExpressionUtil.toSanitizedString( + Expressions.equal(Expressions.extract("var", "$.city", "string"), "Boston"))) + .as("Sanitized string for extract term") + .startsWith("extract(var, $['city'], string) = (hash-") + .endsWith(")"); + + // Bound extract uses normalized path $['city'] in describe output + assertThat( + ExpressionUtil.toSanitizedString( + STRUCT, + Expressions.equal(Expressions.extract("var", "$.city", "string"), "Boston"), + true)) + .as("Sanitized string for extract term (bound)") + .startsWith("extract(var, $['city'], string) = (hash-") + .endsWith(")"); + } + + @Test + public void testUnbindBoundExtract() { + Expression boundExpr = + Expressions.greaterThan(Expressions.extract("var", "$.city", "string"), "Boston") + .bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + UnboundExtract extract = (UnboundExtract) unbound; + assertThat(extract.ref().name()).isEqualTo("var"); + assertThat(extract.path()).isEqualTo("$['city']"); + assertThat(extract.type().toString()).isEqualTo("string"); + } + + @Test + public void testUnbindBoundExtractBracketStyle() { + Expression boundExpr = + Expressions.greaterThan(Expressions.extract("var", "$['user.name']", "string"), 
"x") + .bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + UnboundExtract extract = (UnboundExtract) unbound; + assertThat(extract.ref().name()).isEqualTo("var"); + assertThat(extract.path()).isEqualTo("$['user.name']"); + assertThat(extract.type().toString()).isEqualTo("string"); + } + + @Test + public void testUnbindBoundExtractMixedStyle() { + Expression boundExpr = + Expressions.greaterThan(Expressions.extract("var", "$.a['b.c']", "long"), 1L) + .bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + UnboundExtract extract = (UnboundExtract) unbound; + assertThat(extract.ref().name()).isEqualTo("var"); + assertThat(extract.path()).isEqualTo("$['a']['b.c']"); + assertThat(extract.type().toString()).isEqualTo("long"); + } + + @Test + public void testUnbindBoundExtractArrayIndex() { + Expression boundExpr = + Expressions.greaterThan(Expressions.extract("var", "$.items[0].tags[1]", "long"), 100L) + .bind(STRUCT, true); + UnboundTerm unbound = ExpressionUtil.unbind(((BoundPredicate) boundExpr).term()); + assertThat(unbound).isInstanceOf(UnboundExtract.class); + UnboundExtract extract = (UnboundExtract) unbound; + assertThat(extract.ref().name()).isEqualTo("var"); + assertThat(extract.path()).isEqualTo("$['items'][0]['tags'][1]"); + assertThat(extract.type().toString()).isEqualTo("long"); + } + @Test public void testSanitizeTransformedTerm() { assertEquals( diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestPathUtil.java b/api/src/test/java/org/apache/iceberg/expressions/TestPathUtil.java index 9115a8fcd2dd..4f4e92db22a5 100644 --- a/api/src/test/java/org/apache/iceberg/expressions/TestPathUtil.java +++ b/api/src/test/java/org/apache/iceberg/expressions/TestPathUtil.java @@ -18,11 +18,14 @@ */ package org.apache.iceberg.expressions; 
+import static org.apache.iceberg.expressions.PathUtil.PathSegment.Index; +import static org.apache.iceberg.expressions.PathUtil.PathSegment.Name; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; +import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.FieldSource; @@ -30,9 +33,13 @@ @SuppressWarnings({"AvoidEscapedUnicodeCharacters", "IllegalTokenText"}) public class TestPathUtil { + private static List names(String... names) { + return java.util.Arrays.stream(names).map(Name::new).collect(Collectors.toList()); + } + @Test public void testSimplePath() { - assertThat(PathUtil.parse("$.event.id")).isEqualTo(List.of("event", "id")); + assertThat(PathUtil.parse("$.event.id")).isEqualTo(names("event", "id")); } private static final String[] VALID_PATHS = @@ -40,8 +47,18 @@ public void testSimplePath() { "$", // root path "$.event_id", "$.event.id", + "$['event_id']", // bracket form + "$.event['x.y']", // mixed: dot then bracket + "$['event']['id']", // bracket then bracket + "$['a'].b", // bracket then dot "$.\u2603", // snowman "$.\uD834\uDD1E", // surrogate pair, U+1D11E + "$.matrix[0][1]", + "$.basket[0][2].a", + "$.items[0].tags[1]", + "$['matrix'][0][1]", + "$['items'][0]['tags'][1]", + "$['basket'][0][2]['a']", }; @ParameterizedTest @@ -55,9 +72,7 @@ public void testExtractExpressionBindingPaths(String path) { null, "", "event_id", // missing root - "$['event_id']", // uses bracket notation "$..event_id", // uses recursive descent - "$.events[0].event_id", // uses position accessor "$.events.*", // uses wildcard "$.0invalid", // starts with a digit "$._\uD834", // dangling high surrogate @@ -79,6 +94,10 @@ public void testExtractBindingWithInvalidPath(String path) { new String[] {"$.a.b.c", 
"$['a']['b']['c']"}, new String[] {"$.\u2603", "$['☃']"}, new String[] {"$.a\uD834\uDD1Eb.x", "$['a\uD834\uDD1Eb']['x']"}, + // Dot shorthand vs bracket form for names; array steps use unquoted [n] in both. + new String[] {"$.matrix[0][1]", "$['matrix'][0][1]"}, + new String[] {"$.items[0].tags[1]", "$['items'][0]['tags'][1]"}, + new String[] {"$.basket[0][2].a", "$['basket'][0][2]['a']"}, }; @ParameterizedTest @@ -123,4 +142,163 @@ public void testNormalizedFieldLists(List fields, String normalizedPath) public void testPathEscaping(String name, String escaped) { assertThat(PathUtil.rfc9535escape(name)).isEqualTo(escaped); } + + @ParameterizedTest + @FieldSource("ESCAPE_CASES") + public void testPathUnescapeRoundTrip(String name, String escaped) { + assertThat(PathUtil.rfc9535unescape(escaped)).isEqualTo(name); + } + + @ParameterizedTest + @FieldSource("NORMALIZED_PATHS") + public void testParseDotAndBracketAgree(String dotPath, String normalizedPath) { + assertThat(PathUtil.parse(dotPath)).isEqualTo(PathUtil.parse(normalizedPath)); + } + + @Test + public void testParseSingleSegmentWithDot() { + assertThat(PathUtil.parse("$['a.b']")).isEqualTo(names("a.b")); + assertThat(PathUtil.parse("$['user.name']")).isEqualTo(names("user.name")); + } + + @Test + public void testParseDistinctFromDotSplit() { + assertThat(PathUtil.parse("$.a.b")).isEqualTo(names("a", "b")); + assertThat(PathUtil.parse("$['a.b']")).isEqualTo(names("a.b")); + } + + @Test + public void testParseNormalizedRoundTrip() { + for (Object[] row : NORMALIZED_FIELD_LISTS) { + @SuppressWarnings("unchecked") + List fields = (List) row[0]; + String normalized = (String) row[1]; + assertThat(PathUtil.toNormalizedPath(PathUtil.parse(normalized))).isEqualTo(normalized); + assertThat(PathUtil.parse(normalized)) + .isEqualTo(fields.stream().map(Name::new).collect(Collectors.toList())); + } + } + + @Test + public void testParseRoot() { + assertThat(PathUtil.parse("$")).isEqualTo(List.of()); + 
assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$"))).isEqualTo("$"); + } + + /** + * Bracket paths can represent keys that dot notation cannot (empty name, {@code []}, {@code ..}). + */ + @Test + public void testParseSpecialFieldNames() { + assertThat(PathUtil.parse("$['']")).isEqualTo(names("")); + assertThat(PathUtil.parse("$['[]']")).isEqualTo(names("[]")); + assertThat(PathUtil.parse("$['..']")).isEqualTo(names("..")); + assertThat(PathUtil.parse("$['*']")).isEqualTo(names("*")); + assertThat(PathUtil.parse("$['[']")).isEqualTo(names("[")); + assertThat(PathUtil.parse("$[']']")).isEqualTo(names("]")); + // RFC 9535 simple escape \f in a quoted segment (also covered in ESCAPE_CASES round-trips) + assertThat(PathUtil.parse("$['\\f']")).isEqualTo(List.of(new Name("\f"))); + + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$['']"))).isEqualTo("$['']"); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$['[]']"))).isEqualTo("$['[]']"); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$['..']"))).isEqualTo("$['..']"); + } + + @Test + public void testParseNestedWithSpecialMiddleSegment() { + assertThat(PathUtil.parse("$['a']['..']['b']")).isEqualTo(names("a", "..", "b")); + assertThat(PathUtil.toNormalizedPath(List.of("a", "..", "b"))).isEqualTo("$['a']['..']['b']"); + } + + @Test + public void testParseMixedDotAndBracket() { + assertThat(PathUtil.parse("$.a['b.c']")).isEqualTo(names("a", "b.c")); + assertThat(PathUtil.parse("$.events['user.name'].id")) + .isEqualTo(names("events", "user.name", "id")); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$.a['b.c']"))).isEqualTo("$['a']['b.c']"); + } + + @Test + public void testParseMixedBracketThenDot() { + assertThat(PathUtil.parse("$['x.y'].z")).isEqualTo(names("x.y", "z")); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$['x.y'].z"))).isEqualTo("$['x.y']['z']"); + } + + @Test + public void testParseArrayIndexSegments() { + assertThat(PathUtil.parse("$.matrix[0][1]")) + 
.isEqualTo(List.of(new Name("matrix"), new Index(0), new Index(1))); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$.matrix[0][1]"))) + .isEqualTo("$['matrix'][0][1]"); + + assertThat(PathUtil.parse("$.basket[0][2].a")) + .isEqualTo(List.of(new Name("basket"), new Index(0), new Index(2), new Name("a"))); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$.basket[0][2].a"))) + .isEqualTo("$['basket'][0][2]['a']"); + + assertThat(PathUtil.parse("$.items[0].tags[1]")) + .isEqualTo(List.of(new Name("items"), new Index(0), new Name("tags"), new Index(1))); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$.items[0].tags[1]"))) + .isEqualTo("$['items'][0]['tags'][1]"); + + assertThat(PathUtil.parse("$.events[0].event_id")) + .isEqualTo(List.of(new Name("events"), new Index(0), new Name("event_id"))); + assertThat(PathUtil.toNormalizedPath(PathUtil.parse("$.events[0].event_id"))) + .isEqualTo("$['events'][0]['event_id']"); + + assertThat(PathUtil.parse("$['items'][0]['tags'][1]")) + .isEqualTo(PathUtil.parse("$.items[0].tags[1]")); + assertThat(PathUtil.parse("$['matrix'][0][1]")).isEqualTo(PathUtil.parse("$.matrix[0][1]")); + } + + @Test + public void testParseArrayIndexRoundTrip() { + // parse(toNormalizedPath(parse(x))) == parse(x) for paths with [n] segments + for (String[] pair : NORMALIZED_PATHS) { + String normalized = pair[1]; + if (normalized.contains("[") && !normalized.contains("['")) { + // normalized path has array indices; round-trip must be stable + assertThat(PathUtil.parse(PathUtil.toNormalizedPath(PathUtil.parse(normalized)))) + .isEqualTo(PathUtil.parse(normalized)); + } + } + // explicit cases to be clear about what is covered + for (String path : + new String[] { + "$['matrix'][0][1]", + "$['items'][0]['tags'][1]", + "$['basket'][0][2]['a']", + "$['events'][0]['event_id']", + }) { + assertThat(PathUtil.parse(PathUtil.toNormalizedPath(PathUtil.parse(path)))) + .isEqualTo(PathUtil.parse(path)); + } + } + + private static final 
String[] INVALID_PARSE_PATHS = + new String[] { + null, + "", + "event_id", + "$.a..b", + "$.events.*", + "$$", // second $ is not a path step after root + "$['a'", // unclosed + "$[", // [ without quoted ['...'] or index + "$['a']x", // trailing junk + "$.['a']", // empty dot segment before bracket + "$a.b", // missing separator after $ + "$.a[]", // empty array index + "$.a[2147483648]", // index does not fit in int + "$.a[-1]", // negative index (not valid JSONPath index syntax here) + }; + + @ParameterizedTest + @FieldSource("INVALID_PARSE_PATHS") + public void testParseInvalid(String path) { + assertThatThrownBy(() -> PathUtil.parse(path)) + .as("parse of %s", path) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageMatching("(Unsupported|Invalid) path.*"); + } } diff --git a/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithExtract.java b/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithExtract.java index 03c1c12f4fba..1cc98fe5587a 100644 --- a/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithExtract.java +++ b/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithExtract.java @@ -42,6 +42,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import java.util.Map; import java.util.UUID; @@ -193,6 +194,57 @@ public void testIsNaNAndNotNaN() { .isTrue(); } + @Test + public void testBoundsWithBigEndianByteBuffer() { + // Simulate manifest bounds: bytes are little-endian but ByteBuffer has default (big-endian) + // order + ByteBuffer leLower = + VariantTestUtil.variantBuffer( + Map.of( + "$['event_id']", Variants.of(INT_MIN_VALUE), + "$['str']", Variants.of("abc"))); + ByteBuffer leUpper = + VariantTestUtil.variantBuffer( + Map.of( + "$['event_id']", Variants.of(INT_MAX_VALUE), + "$['str']", Variants.of("abe"))); + byte[] 
lowerBytes = new byte[leLower.remaining()]; + leLower.duplicate().get(lowerBytes); + byte[] upperBytes = new byte[leUpper.remaining()]; + leUpper.duplicate().get(upperBytes); + ByteBuffer beLower = ByteBuffer.wrap(lowerBytes); + ByteBuffer beUpper = ByteBuffer.wrap(upperBytes); + assertThat(beLower.order()).isEqualTo(ByteOrder.BIG_ENDIAN); + + DataFile fileWithBigEndianBounds = + new TestDataFile( + "file.avro", + Row.of(), + 50, + ImmutableMap.builder() + .put(1, 50L) + .put(2, 50L) + .put(3, 50L) + .buildOrThrow(), + ImmutableMap.builder().put(1, 0L).put(2, 0L).put(3, 50L).buildOrThrow(), + null, + Map.of(2, beLower), + Map.of(2, beUpper)); + + // Should not throw; file skipping should work with big-endian buffer (fix applies + // LITTLE_ENDIAN) + assertThat( + shouldRead( + greaterThan(extract("variant", "$.event_id", "long"), 50), fileWithBigEndianBounds)) + .as("Should read: 50 is within [30, 79]") + .isTrue(); + assertThat( + shouldRead( + lessThan(extract("variant", "$.event_id", "long"), 20), fileWithBigEndianBounds)) + .as("Should skip: 20 is below min 30") + .isFalse(); + } + @Test public void testMissingColumn() { assertThatThrownBy(() -> shouldRead(lessThan(extract("missing", "$", "int"), 5))) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java index cae9447513c0..f5aefbe56726 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java @@ -21,19 +21,26 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Binder; 
import org.apache.iceberg.expressions.Bound; +import org.apache.iceberg.expressions.BoundExtract; +import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundReference; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Type; @@ -42,16 +49,42 @@ import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ParquetMetricsRowGroupFilter { - private static final int IN_PREDICATE_LIMIT = 200; + public static final Logger LOG = LoggerFactory.getLogger(ParquetMetricsRowGroupFilter.class); + private static final int IN_PREDICATE_LIMIT = 200; private final Schema schema; private final Expression expr; + /** Is special handling of variants needed? */ + private final boolean hasVariantPredicates; + + /** + * variantColumnNames is file-schema-scoped (same for all row groups in a file), so cache it. + */ + private MessageType cachedVariantFileSchema = null; + + /** Map of Iceberg field ID to the top-level Parquet column name of each variant column.
*/ + private Map variantColumnNames = null; + + /** Set of variant top-level column names, derived from variantColumnNames values. */ + private Set variantTopLevelNames = null; + + /** + * For testing, especially while there are requirements for predecessor patches to be applied. + * + *

This permits assertions to be made that variant predicate pushdown reached this far and + * processed shredded columns. + */ + private static final AtomicLong VARIANT_PREDICATES_SHREDDED_METRICS = new AtomicLong(); + public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) { this(schema, unbound, true); } @@ -60,27 +93,44 @@ public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound, boolean c this.schema = schema; StructType struct = schema.asStruct(); this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), caseSensitive); + this.hasVariantPredicates = hasVariantPredicates(expr); } /** - * Test whether the file may contain records that match the expression. + * Test whether the row group may contain records that match the expression. * * @param fileSchema schema for the Parquet file * @param rowGroup metadata for a row group * @return false if the file cannot contain rows that match the expression, true otherwise. */ + @SuppressWarnings("ReferenceEquality") public boolean shouldRead(MessageType fileSchema, BlockMetaData rowGroup) { + // identity check: same object means same file schema, no need to recompute variant column names + // REVISIT: would the cached schema ever change within a file? + if (hasVariantPredicates && fileSchema != cachedVariantFileSchema) { + cachedVariantFileSchema = fileSchema; + variantColumnNames = buildVariantColumnNames(fileSchema); + variantTopLevelNames = ImmutableSet.copyOf(variantColumnNames.values()); + } return new MetricsEvalVisitor().eval(fileSchema, rowGroup); } private static final boolean ROWS_MIGHT_MATCH = true; private static final boolean ROWS_CANNOT_MATCH = false; + private record VariantColumnInfo(PrimitiveType type, int id, ColumnChunkMetaData chunkMetaData) {} + + /** Evaluate metrics, including variants.
*/ private class MetricsEvalVisitor extends BoundExpressionVisitor { private Map> stats = null; private Map valueCounts = null; private Map> conversions = null; + // ID-less columns collected during the main column scan for lazy variantInfoByColumnPath build + private List shreddedVariantColumns = null; + // Built lazily on the first compareVariant() call; null means not yet built + private Map variantInfoByColumnPath = null; + private boolean eval(MessageType fileSchema, BlockMetaData rowGroup) { if (rowGroup.getRowCount() <= 0) { return ROWS_CANNOT_MATCH; @@ -89,6 +139,8 @@ private boolean eval(MessageType fileSchema, BlockMetaData rowGroup) { this.stats = Maps.newHashMap(); this.valueCounts = Maps.newHashMap(); this.conversions = Maps.newHashMap(); + this.shreddedVariantColumns = hasVariantPredicates ? Lists.newArrayList() : null; + this.variantInfoByColumnPath = null; for (ColumnChunkMetaData col : rowGroup.getColumns()) { PrimitiveType colType = fileSchema.getType(col.getPath().toArray()).asPrimitiveType(); if (colType.getId() != null) { @@ -97,12 +149,28 @@ private boolean eval(MessageType fileSchema, BlockMetaData rowGroup) { stats.put(id, col.getStatistics()); valueCounts.put(id, col.getValueCount()); conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType)); + } else if (shreddedVariantColumns != null) { + // Shredded variant typed_value columns have no Iceberg field ID, just parquet ones. + // Pre-filter to only those under known variant top-level column names; buildVariantInfo() + // then just copies the list into a map on the first compareVariant() call. + ColumnPath columnPath = col.getPath(); + String[] pathParts = columnPath.toArray(); + if (pathParts.length > 0 && variantTopLevelNames.contains(pathParts[0])) { + shreddedVariantColumns.add(new VariantColumnInfo(colType, -1, col)); + } } } - return ExpressionVisitors.visitEvaluator(expr, this); } + /** Build variantInfoByColumnPath lazily on the first compareVariant() call. 
*/ + private void buildVariantInfo() { + variantInfoByColumnPath = Maps.newHashMap(); + for (VariantColumnInfo colInfo : shreddedVariantColumns) { + variantInfoByColumnPath.put(colInfo.chunkMetaData().getPath(), colInfo); + } + } + @Override public Boolean alwaysTrue() { return ROWS_MIGHT_MATCH; // all rows match @@ -560,6 +628,281 @@ private T max(Statistics statistics, int id) { return (T) conversions.get(id).apply(statistics.genericGetMax()); } + @Override + public Boolean predicate(BoundPredicate pred) { + if (pred.term() instanceof BoundExtract term) { + // it's a variant predicate: process accordingly. + return compareVariant(pred, term); + } else { + return super.predicate(pred); + } + } + + /** + * Compare a variant with the predicate. For floats and doubles, expects the parquet writer to + * have normalized -0.0 to +0.0. + * + * <p>Statistics are only consulted when {@code prunableShreddedColumn()} finds a shredded column + * whose statistics can be trusted; otherwise the row group is reported as a possible match. + * + * @param pred predicate + * @param extract extracted variant reference + * @param type of predicate + * @return true if the file rows should be read (i.e. false iff we are confident they can be + * skipped) + */ + private boolean compareVariant(BoundPredicate pred, BoundExtract extract) { + if (variantInfoByColumnPath == null) { + // TODO: concurrency ? + buildVariantInfo(); + } + int fieldId = extract.ref().fieldId(); + LOG.debug("comparing variant {}", extract); + String colName = variantColumnNames.get(fieldId); + if (colName == null) { + // not in the variant columns + return ROWS_MIGHT_MATCH; + } + final VariantColumnInfo columnInfo = prunableShreddedColumn(colName, extract.path()); + if (columnInfo == null) { + // no shredded column whose statistics can be trusted for this path + return ROWS_MIGHT_MATCH; + } + + // now do the evaluation.
+ final ColumnChunkMetaData col = columnInfo.chunkMetaData(); + LOG.debug("Evaluating column {} with info {}", col.getPath(), columnInfo); + PrimitiveType parquetType = columnInfo.type(); + Statistics colStats = col.getStatistics(); + long valueCount = col.getValueCount(); + if (parquetType == null || colStats == null) { + // no type info or column stats + return ROWS_MIGHT_MATCH; + } + + // everything here onwards expects colStats != null + if (pred.isUnaryPredicate()) { + return evalUnaryPredicate(pred, colStats, valueCount); + } + final Boolean outcome = evalShreddedColumnStats(colStats, valueCount); + if (outcome != null) { + return outcome; + } + if (pred.isSetPredicate()) { + // set check + return evalMembershipPredicateOnShreddedVariant(pred, extract, parquetType, colStats); + } + if (!pred.isLiteralPredicate()) { + return ROWS_MIGHT_MATCH; + } + // get this far: it's a shredded variant with column statistics + return evalBinaryPredicateOnShreddedVariant(pred, extract, parquetType, colStats); + } + + /** + * Find the shredded {@code typed_value} column for an extract path, provided its statistics + * can be trusted for pruning. + * + * @param colName name of the top-level variant column in the file + * @param path normalized extract path + * @return the shredded column, or null if the path is not shredded in this file or its + * statistics must not be used + */ + private VariantColumnInfo prunableShreddedColumn(String colName, String path) { + // shreddedColumnPath() only translates plain ['name'] steps: array indexes, empty names and + // escaped names would resolve to a different column, so never prune on those paths. + if (!path.matches("\\$(\\['[^'\\\\]+'\\])*")) { + return null; + } + // Build the column path that a shredded field would have + ColumnPath columnPath = ParquetVariantUtil.shreddedColumnPath(colName, path); + VariantColumnInfo columnInfo = variantInfoByColumnPath.get(columnPath); + if (columnInfo == null) { + // the column isn't shredded in this file, so no statistics are available. + return null; + } + // increment shredded metrics counter. + VARIANT_PREDICATES_SHREDDED_METRICS.incrementAndGet(); + + // typed_value statistics do not cover rows whose value stayed in the sibling residual + // "value" column, so only trust them when that column is missing or provably all null. + // Copy the array before editing it: it may be the ColumnPath's own backing array. + String[] residualParts = columnPath.toArray().clone(); + residualParts[residualParts.length - 1] = "value"; + VariantColumnInfo residual = variantInfoByColumnPath.get(ColumnPath.get(residualParts)); + if (residual != null) { + ColumnChunkMetaData residualChunk = residual.chunkMetaData(); + Statistics<?> residualStats = residualChunk.getStatistics(); + if (residualStats == null + || residualStats.isEmpty() + || !allNulls(residualStats, residualChunk.getValueCount())) { + return null; + } + } + return columnInfo; + } + + /** + * Evaluate the statistics, return a Boolean value if there was enough information to make a + * decision. + * + *

This is a bit contrived, but it keeps the complexity of {@code compareVariant()} below the + * checkstyle limit. + * + * @param colStats column statistics + * @param valueCount number of values. + * @return the decision, or null if no conclusion is reached. + */ + private Boolean evalShreddedColumnStats(Statistics colStats, long valueCount) { + if (colStats.isEmpty()) { + return ROWS_MIGHT_MATCH; + } + if (allNulls(colStats, valueCount)) { + // no non-null typed values, so no comparison can be true. NOTE(review): presumes nothing is held in the residual value column - confirm + return ROWS_CANNOT_MATCH; + } + if (minMaxUndefined(colStats)) { + // min or max undefined, so ranged comparisons not possible + return ROWS_MIGHT_MATCH; + } + return null; + } + + /** + * Evaluate a literal comparison against a shredded column's min/max: should the rowgroup be read? + * + * @param pred predicate + * @param extract the bound extract term + * @param parquetType the parquet type of the column + * @param colStats column statistics. + * @param type of the arguments + * @return true if the rowgroup should be read. + */ + @SuppressWarnings("unchecked") + private boolean evalBinaryPredicateOnShreddedVariant( + BoundPredicate pred, + BoundExtract extract, + PrimitiveType parquetType, + Statistics colStats) { + // it's a binary predicate with valid results from comparisons with + // the stats. So get their min and max, compare them with the literal value. + Literal lit = pred.asLiteralPredicate().literal(); + // get the type converter for the evaluation + Function converter = + ParquetConversions.converterFromParquet(parquetType, extract.type()); + T min = (T) converter.apply(colStats.genericGetMin()); + T max = (T) converter.apply(colStats.genericGetMax()); + + final int minVsLiteral = lit.comparator().compare(min, lit.value()); + final int literalVsMax = lit.comparator().compare(lit.value(), max); + + // can any value in [min, max] satisfy the predicate?
+ boolean inRange = + switch (pred.op()) { + // min value is less than the literal + case LT -> minVsLiteral < 0; + // min value is less than or equal to the literal + case LT_EQ -> minVsLiteral <= 0; + // max value is > the literal + case GT -> literalVsMax < 0; + // max value is > or == to the literal + case GT_EQ -> literalVsMax <= 0; + // min <= lit and max >= lit so one element + // may match lit. + case EQ -> minVsLiteral <= 0 && literalVsMax <= 0; + // anything else: no pruning + default -> true; + }; + + return inRange; + } + + /** + * Evaluate an IN predicate against a shredded variant column's min/max statistics. + * + *

A row group can be skipped if no value in the IN set falls within [min, max]. + * + * @param pred IN predicate + * @param extract the bound extract term + * @param parquetType the Parquet type of the shredded column + * @param colStats column statistics + * @param type of the predicate values + * @return true if the row group might match, false if it cannot match + */ + @SuppressWarnings("unchecked") + private boolean evalMembershipPredicateOnShreddedVariant( + BoundPredicate pred, + BoundExtract extract, + PrimitiveType parquetType, + Statistics colStats) { + + if (pred.op() != Expression.Operation.IN) { + // not looking at other set member operations. + return ROWS_MIGHT_MATCH; + } + Set literalSet = pred.asSetPredicate().literalSet(); + + if (literalSet.size() > IN_PREDICATE_LIMIT) { + return ROWS_MIGHT_MATCH; + } + LOG.debug("Set membership evaluation"); + Function converter = + ParquetConversions.converterFromParquet(parquetType, extract.type()); + T min = (T) converter.apply(colStats.genericGetMin()); + T max = (T) converter.apply(colStats.genericGetMax()); + + // keep only values that are >= min + Collection candidates = + literalSet.stream().filter(v -> pred.term().comparator().compare(min, v) <= 0).toList(); + + // nothing is above the minimum + if (candidates.isEmpty()) { + return ROWS_CANNOT_MATCH; + } + + // second filter: keep only values that are <= max + candidates = + candidates.stream().filter(v -> pred.term().comparator().compare(max, v) >= 0).toList(); + + final boolean match = candidates.isEmpty() ? ROWS_CANNOT_MATCH : ROWS_MIGHT_MATCH; + LOG.debug("Outcome match={}", match); + return match; + } + + /** + * Evaluate a Unary Predicate. Pulled out of the main compareVariant() call due to checkstyle + * rejecting the complexity of that method. + * + * @param pred predicate being evaluated.
+ * @param colStats column statistics + * @param valueCount number of elements in the rowgroup + * @param type of predicate + * @return true if the rowgroup should be read. + */ + private boolean evalUnaryPredicate( + BoundPredicate pred, Statistics colStats, long valueCount) { + LOG.debug("Evaluating unary predicate: {}", pred.op()); + switch (pred.op()) { + case IS_NULL -> { + // If every row has a non-null typed value, no row can match IS_NULL + if (!colStats.isEmpty() && colStats.isNumNullsSet() && colStats.getNumNulls() == 0) { + return ROWS_CANNOT_MATCH; + } + return ROWS_MIGHT_MATCH; + } + case NOT_NULL -> { + // If every row has a null typed value, no row can match NOT_NULL + if (!colStats.isEmpty() && allNulls(colStats, valueCount)) { + return ROWS_CANNOT_MATCH; + } + return ROWS_MIGHT_MATCH; + } + default -> { + return ROWS_MIGHT_MATCH; + } + } + } + @Override public Boolean handleNonReference(Bound term) { return ROWS_MIGHT_MATCH; @@ -584,6 +885,8 @@ static boolean nullMinMax(Statistics statistics) { /** * The internal logic of Parquet-MR says that if numNulls is set but hasNonNull value is false, * then the min/max of the column are undefined. + * + *

Parquet also sets this for a float/double if there is a NaN in the row group. */ static boolean minMaxUndefined(Statistics statistics) { return (statistics.isNumNullsSet() && !statistics.hasNonNullValue()) || nullMinMax(statistics); @@ -596,4 +899,97 @@ static boolean allNulls(Statistics statistics, long valueCount) { private static boolean mayContainNull(Statistics statistics) { return !statistics.isNumNullsSet() || statistics.getNumNulls() > 0; } + + /** + * Scan the top-level fields of the file schema and map each variant column's field ID to its name. + * + * @param fileSchema file schema. + * @return map of field ID to column name for the variant columns found, may be empty. + */ + private Map buildVariantColumnNames(MessageType fileSchema) { + LOG.debug("Building variant column names..."); + Map names = Maps.newHashMap(); + for (org.apache.parquet.schema.Type field : fileSchema.getFields()) { + if (field.getId() != null) { + int id = field.getId().intValue(); + Type icebergType = schema.findType(id); + if (icebergType != null && icebergType.isVariantType()) { + names.put(id, field.getName()); + } + } + } + LOG.debug("Found {} names", names.size()); + return names; + } + + /** + * Does an expression have variant predicates? + * + * @param expr expression to evaluate. + * @return true if any part of the expression refers to a variant. + */ + private static boolean hasVariantPredicates(Expression expr) { + return ExpressionVisitors.visit(expr, HasVariantPredicateVisitor.INSTANCE); + } + + /** + * Visitor for scanning an expression for variants. + * + *

This isn't trying to evaluate the expression, so and/or/not aggregate the results, rather + * than apply boolean predicates on child nodes. + */ + private static final class HasVariantPredicateVisitor extends BoundExpressionVisitor { + static final HasVariantPredicateVisitor INSTANCE = new HasVariantPredicateVisitor(); + + @Override + public Boolean alwaysTrue() { + return false; + } + + @Override + public Boolean alwaysFalse() { + return false; + } + + @Override + public Boolean not(Boolean result) { + return result; + } + + @Override + public Boolean and(Boolean leftResult, Boolean rightResult) { + return leftResult || rightResult; + } + + @Override + public Boolean or(Boolean leftResult, Boolean rightResult) { + return leftResult || rightResult; + } + + @Override + public Boolean predicate(BoundPredicate pred) { + return pred.term() instanceof BoundExtract; + } + + @Override + public Boolean handleNonReference(Bound term) { + return false; + } + } + + /** + * The number of times shredded metric predicates have been evaluated. + * + * @return zero or a positive integer + */ + @VisibleForTesting + static long variantPredicatesShreddedMetrics() { + return VARIANT_PREDICATES_SHREDDED_METRICS.get(); + } + + /** Reset the shredded metrics counter. 
*/ + @VisibleForTesting + static void resetShreddedMetricsCounter() { + VARIANT_PREDICATES_SHREDDED_METRICS.set(0); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java index ac418a1127bd..ad298232b09c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.iceberg.expressions.PathUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -42,6 +44,7 @@ import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.variants.VariantVisitor; import org.apache.iceberg.variants.Variants; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -548,4 +551,48 @@ private static Type shreddedPrimitive( return Types.optional(primitive).as(annotation).length(length).named("typed_value"); } } + + /** Matches bracket-notation path segments like {@code ['fieldName']} in normalized paths. */ + private static final Pattern PATH_SEGMENT_PATTERN = Pattern.compile("\\['([^']+)'\\]"); + + /** + * Build the Parquet column path for a shredded variant field. + * + *

For a variant column named {@code v} and path {@code $['price']}, the physical Parquet + * column path is {@code ["v", "typed_value", "price", "typed_value"]}. For nested paths like + * {@code $['user']['name']}, the path is {@code ["v", "typed_value", "user", "typed_value", + * "name", "typed_value"]}. For the root path {@code $}, it is {@code ["v", "typed_value"]}. + *

TODO: This doesn't handle numeric indices in the list, unless that becomes part of the schema. + * + * + */ + static ColumnPath shreddedColumnPath(String colName, String normalizedPath) { + List segments = parsePathSegments(normalizedPath); + String[] pathArray = new String[(1 + segments.size()) * 2]; + pathArray[0] = colName; + pathArray[1] = "typed_value"; + for (int i = 0, offset = 2; i < segments.size(); i++, offset += 2) { + pathArray[offset] = segments.get(i); + pathArray[offset + 1] = "typed_value"; + } + return ColumnPath.get(pathArray); + } + + /** + * Collect the {@code ['name']} segments of a normalized bracket-notation path. The root path + * {@code $} returns an empty list. Only non-empty names without a single quote are matched and + * they are returned verbatim (escapes are not decoded); anything else in the path, such as an + * array index step {@code [n]} or an empty name {@code ['']}, is silently skipped. + * + * @param normalizedPath path to parse + * @return the path segments. + */ + static List parsePathSegments(String normalizedPath) { + List segments = Lists.newArrayList(); + Matcher matcher = PATH_SEGMENT_PATTERN.matcher(normalizedPath); + while (matcher.find()) { + segments.add(matcher.group(1)); + } + return segments; + } } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetVariantUtil.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetVariantUtil.java new file mode 100644 index 000000000000..9234b925b3d2 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetVariantUtil.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; + +/** Tests for {@link ParquetVariantUtil}. */ +public class TestParquetVariantUtil { + + @ParameterizedTest + @FieldSource("PATH_MAPPINGS") + public void testParsePathSegments(Parsing args) { + assertThat(ParquetVariantUtil.parsePathSegments(args.input)) + .describedAs("Parsing of path segments %s", args.input) + .containsExactly(args.output); + } + + /** + * A variant path parses to a sequence of column names. + * + * @param input input string + * @param output varargs list of result + */ + private record Parsing(String input, String... output) {} + + private static final List PATH_MAPPINGS = + List.of( + new Parsing("$"), + new Parsing("$['a']['b']", "a", "b"), + new Parsing("$['0']['1']", "0", "1"), + new Parsing("$['user']['firstName']", "user", "firstName"), + new Parsing("$['_under_score']", "_under_score")); +} diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestShreddedVariantRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestShreddedVariantRowGroupFilter.java new file mode 100644 index 000000000000..367025c0b1b6 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestShreddedVariantRowGroupFilter.java @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.extract; +import static org.apache.iceberg.expressions.Expressions.greaterThan; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.UnboundTerm; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import 
org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.schema.MessageType; +import org.assertj.core.api.AbstractBooleanAssert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for shredded variant row-group skipping in {@link ParquetMetricsRowGroupFilter}. + * + *

Verifies that when a variant column has shredded fields (e.g. {@code $.price}), the filter can + * use column chunk statistics to skip row groups that cannot match a predicate. + */ +class TestShreddedVariantRowGroupFilter { + + private static final Logger LOG = + LoggerFactory.getLogger(TestShreddedVariantRowGroupFilter.class); + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "var", Types.VariantType.get())); + + private static final VariantMetadata METADATA = + VariantMetadata.from(VariantTestUtil.createMetadata(Set.of("price", "name", "user"), true)); + + private static final VariantMetadata UUID_METADATA = + VariantMetadata.from(VariantTestUtil.createMetadata(Set.of("deviceid"), true)); + + /** Price field evaluated as an integer. */ + private static final UnboundTerm PRICE = extract("var", "$.price", "int"); + + /** Name field evaluated as a string. */ + private static final UnboundTerm NAME = extract("var", "$.name", "string"); + + /** Device ID as a UUID. */ + private static final UnboundTerm DEVICEID_UUID = extract("var", "$.deviceid", "uuid"); + + /** Zero UUID. */ + private static final UUID UUID_ZERO = UUID.fromString("00000000-0000-0000-0000-000000000000"); + + private static final UUID UUID_LOW = UUID.fromString("00000000-0000-0000-0000-000000000001"); + private static final UUID UUID_MID = UUID.fromString("00000000-0000-0000-0000-000000000005"); + private static final UUID UUID_HIGH = UUID.fromString("00000000-0000-0000-0000-00000000000a"); + + /** IUnknown is above the high marker.
*/ + private static final UUID UUID_ABOVE_HIGH = + UUID.fromString("00000000-0000-0000-C000-000000000046"); + + @BeforeEach + void before() { + ParquetMetricsRowGroupFilter.resetShreddedMetricsCounter(); + } + + @Test + void testShreddedIntLessThan() throws IOException { + // Row group has prices 10..14; predicate price < 10 should skip + List variants = intPriceVariants(10, 11, 12, 13, 14); + assertThat(shouldRead(lessThan(PRICE, 10), variants)) + .as("Should skip: all prices >= 10, predicate requires price < 10") + .isFalse(); + } + + @Test + void testShreddedIntLessThanOverlapping() throws IOException { + // Row group has prices 10..14; predicate price < 11 should read + List variants = intPriceVariants(10, 11, 12, 13, 14); + + assertThat(shouldRead(lessThan(PRICE, 11), variants)) + .as("Should read: price range [10,14] overlaps price < 11") + .isTrue(); + } + + @Test + void testShreddedIntLessThanOrEqual() throws IOException { + List variants = intPriceVariants(10, 11, 12, 13, 14); + assertThat(shouldRead(lessThanOrEqual(PRICE, 9), variants)) + .as("Should skip: all prices >= 10, predicate requires price <= 9") + .isFalse(); + + assertThat(shouldRead(lessThanOrEqual(PRICE, 10), variants)) + .as("Should read: min price == 10 matches price <= 10") + .isTrue(); + } + + @Test + void testShreddedIntGreaterThan() throws IOException { + List variants = intPriceVariants(10, 11, 12, 13, 14); + assertThat(shouldRead(greaterThan(PRICE, 14), variants)) + .as("Should skip: all prices <= 14, predicate requires price > 14") + .isFalse(); + + assertThat(shouldRead(greaterThan(PRICE, 13), variants)) + .as("Should read: price range [10,14] overlaps price > 13") + .isTrue(); + } + + @Test + void testShreddedIntGreaterThanOrEqual() throws IOException { + List variants = intPriceVariants(10, 11, 12, 13, 14); + assertThat(shouldRead(greaterThanOrEqual(PRICE, 15), variants)) + .as("Should skip: all prices <= 14, predicate requires price >= 15") + .isFalse(); + + 
assertThat(shouldRead(greaterThanOrEqual(PRICE, 14), variants)) + .as("Should read: max price == 14 matches price >= 14") + .isTrue(); + } + + @Test + void testShreddedIntEqual() throws IOException { + List variants = intPriceVariants(10, 11, 12, 13, 14); + assertThat(shouldRead(equal(PRICE, 9), variants)).as("Should skip: 9 < min price 10").isFalse(); + assertThat(shouldRead(equal(PRICE, 15), variants)) + .as("Should skip: 15 > max price 14") + .isFalse(); + assertThat(shouldRead(equal(PRICE, 12), variants)) + .as("Should read: 12 is within price range [10,14]") + .isTrue(); + } + + @Test + void testShreddedIsNullNoNulls() throws IOException { + // All rows have the typed price value (no nulls in typed_value) + List variants = intPriceVariants(10, 11, 12); + assertThat(shouldRead(isNull(PRICE), variants)) + .as("Should skip: all rows have typed price, IS_NULL cannot match") + .isFalse(); + } + + @Test + void testShreddedNotNullAllNulls() throws IOException { + // All rows have $.price as variant null; typed_value will be null for all rows + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0; i < 3; i++) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.ofNull()); // explicit variant null → typed_value is null + builder.add(Variant.of(METADATA, obj)); + } + List variants = builder.build(); + + // Shredding function says price is an int; but values are variant nulls → typed_value all null + ShreddedObject example = Variants.object(METADATA); + example.put("price", Variants.of(0)); + VariantShreddingFunction shreddingFunc = + (id, name) -> ParquetVariantUtil.toParquetSchema(example); + + assertThat(shouldRead(notNull(PRICE), variants, shreddingFunc)) + .as("Should skip: no rows have typed price, NOT_NULL cannot match") + .isFalse(); + } + + @Test + void testUnshreddedPathMightMatch() throws IOException { + // $.name is not shredded; filter should return MIGHT_MATCH (true) + List variants = intPriceVariants(10, 11, 
12); + // Use the price shredding function — $.name is not shredded so falls back + assertThat(shouldRead(equal(NAME, "foo"), variants)) + .as("Should read: unshredded path must fall back to MIGHT_MATCH") + .isTrue(); + } + + @Test + void testNestedShreddedPath() throws IOException { + // Shred $.user.name as a string + ShreddedObject user1 = Variants.object(METADATA); + user1.put("name", Variants.of("alice")); + ShreddedObject row1 = Variants.object(METADATA); + row1.put("user", user1); + + ShreddedObject user2 = Variants.object(METADATA); + user2.put("name", Variants.of("bob")); + ShreddedObject row2 = Variants.object(METADATA); + row2.put("user", user2); + + List variants = + ImmutableList.of(Variant.of(METADATA, row1), Variant.of(METADATA, row2)); + + // Shred $.user as an object with a shredded .name inside + VariantShreddingFunction shreddingFunc = (id, name) -> ParquetVariantUtil.toParquetSchema(row1); + + // The path $.user is shredded but $.user.name is nested deeper — + // the filter should handle multi-level typed_value paths correctly + final UnboundTerm username = extract("var", "$.user.name", "string"); + assertThat(shouldRead(equal(username, "charlie"), variants, shreddingFunc)) + .as("Should skip: 'charlie' is outside the [alice, bob] range") + .isFalse(); + + assertThat(shouldRead(equal(username, "alice"), variants, shreddingFunc)) + .as("Should read: 'alice' is within the range") + .isTrue(); + } + + @Test + void testShreddedLongPredicates() throws IOException { + List variants = longPriceVariants(100L, 101L, 102L, 103L, 104L); + + final UnboundTerm term = extract("var", "$.price", "long"); + assertNotRead(lessThan(term, 100L), variants, "< 100"); + assertIsRead(lessThanOrEqual(term, 100L), variants, "<= 100"); + assertIsRead(lessThan(term, 105L), variants, "< 105"); + assertIsRead(lessThan(term, 1119L), variants, "< 1119"); + + assertNotRead(greaterThan(term, 104L), variants, "> 104"); + assertIsRead(greaterThanOrEqual(term, 104L), variants, " >= 
104"); + assertIsRead(greaterThan(term, 103L), variants, "> 103"); + assertIsRead(greaterThan(term, 99L), variants, "> 99"); + + assertNotRead(equal(term, 99L), variants, "= 99"); + assertNotRead(equal(term, 105L), variants, "= 105"); + assertIsRead(equal(term, 102L), variants, "= 102"); + assertNotRead(equal(term, 99L), variants, "= 99 (repeated)"); + // some negated terms to see how they come out + assertIsRead(not(equal(term, 108L)), variants, "!(= 108)"); + assertIsRead(not((greaterThan(term, 104L))), variants, "!(> 104)"); + } + + private AbstractBooleanAssert assertExpression( + Expression expr, List variants, String text) throws IOException { + return assertThat(shouldRead(expr, variants)) + .as( + "Predicate '%s' on range [%s, %s]", + text, variants.get(0).value(), variants.get(variants.size() - 1).value()); + } + + private void assertIsRead(Expression expr, List variants, String text) + throws IOException { + assertExpression(expr, variants, text).isTrue(); + LOG.info("Predicate '{}' succeeded", text); + } + + private void assertNotRead(Expression expr, List variants, String text) + throws IOException { + assertExpression(expr, variants, text).isFalse(); + LOG.info("Predicate '{}' successfully rejected", text); + } + + @Test + void testShreddedFloatPredicates() throws IOException { + List variants = floatPriceVariants(1.0F, 2.0F, 3.0F); + + final UnboundTerm term = extract("var", "$.price", "float"); + assertNotRead(lessThan(term, 1.0F), variants, "< 1.0F"); + assertNotRead(greaterThan(term, 3.0F), variants, "> 3.0F"); + // float equality is always dubious + assertIsRead(equal(term, 2.0F), variants, "= 2.0F"); + } + + @Test + void testShreddedFloatNaNDropsMinMax() throws IOException { + // When any value is NaN, Parquet drops min/max → filter must return MIGHT_MATCH + List variants = floatPriceVariants(1.0F, Float.NaN, 3.0F); + + // Even though 99.0 is nowhere near [1.0, NaN, 3.0], stats are dropped so must read + final UnboundTerm term = extract("var", "$.price", "float"); +
assertIsRead(equal(term, 99.0F), variants, "= 99.0F with a NaN in the row group"); + assertIsRead( + greaterThan(term, 1000.0F), + variants, + "> 1000; NaN in row group causes Parquet to drop min/max → MIGHT_MATCH"); + } + + @Test + void testShreddedDoublePredicates() throws IOException { + List variants = doublePriceVariants(1.0D, 2.0D, 3.0D); + + final UnboundTerm term = extract("var", "$.price", "double"); + assertNotRead(lessThan(term, 1.0D), variants, "< 1.0D"); + assertNotRead(greaterThan(term, 3.0D), variants, "> 3.0D"); + assertIsRead(equal(term, 2.0D), variants, "= 2.0D"); + } + + @Test + void testShreddedDoubleNaNDropsMinMax() throws IOException { + // When any value is NaN, Parquet drops min/max → filter MUST return MIGHT_MATCH + List variants = doublePriceVariants(1.0D, Double.NaN, 3.0D); + VariantShreddingFunction shreddingFunc = doublePriceShreddingFunc(); + + final UnboundTerm term = extract("var", "$.price", "double"); + assertThat(shouldRead(equal(term, 99.0D), variants, shreddingFunc)) + .as("Should read: NaN in row group causes Parquet to drop min/max → MIGHT_MATCH") + .isTrue(); + assertThat(shouldRead(greaterThan(term, 1000.0D), variants, shreddingFunc)) + .as("Should read: NaN in row group causes Parquet to drop min/max → MIGHT_MATCH") + .isTrue(); + } + + @Test + void testShreddedIntNotEqual() throws IOException { + // NOT_EQ always returns MIGHT_MATCH + final UnboundTerm term = PRICE; + + assertIsRead(not(equal(term, 10)), intPriceVariants(10, 10, 10), "!(= 10) all values are 10"); + assertIsRead( + not(equal(term, 10)), intPriceVariants(10, 11, 12, 13, 14), "!(= 10) range [10,14]"); + assertIsRead( + not(equal(term, 99)), intPriceVariants(10, 11, 12, 13, 14), "!(= 99) range [10,14]"); + } + + @Test + void testShreddedStringPredicates() throws IOException { + // $.name shredded as a string field; names span "alice".."charlie" + List variants = nameStringVariants("alice", "bob", "charlie"); + VariantShreddingFunction shreddingFunc = 
nameStringShreddingFunc(); + + final UnboundTerm term = NAME; + + // Nothing is lexicographically less than the minimum "alice" + assertThat(shouldRead(lessThan(term, "alice"), variants, shreddingFunc)) + .as("Should skip: 'alice' is min, nothing is < 'alice'") + .isFalse(); + + // Nothing is greater than the maximum "charlie" + assertThat(shouldRead(greaterThan(term, "charlie"), variants, shreddingFunc)) + .as("Should skip: 'charlie' is max, nothing is > 'charlie'") + .isFalse(); + + // "david" sorts after max "charlie" → EQ cannot match + assertThat(shouldRead(equal(term, "david"), variants, shreddingFunc)) + .as("Should skip: 'david' > max 'charlie'") + .isFalse(); + + // "aardvark" sorts before min "alice" → EQ cannot match + assertThat(shouldRead(equal(term, "aardvark"), variants, shreddingFunc)) + .as("Should skip: 'aardvark' < min 'alice'") + .isFalse(); + + // "bob" is within [alice, charlie] → EQ might match + assertThat(shouldRead(equal(term, "bob"), variants, shreddingFunc)) + .as("Should read: 'bob' is within [alice, charlie]") + .isTrue(); + + // GT "alice" → might match (max "charlie" > "alice") + assertThat(shouldRead(greaterThan(term, "alice"), variants, shreddingFunc)) + .as("Should read: range [alice, charlie] overlaps > 'alice'") + .isTrue(); + } + + @Test + void testShreddedLiteralAllNulls() throws IOException { + // All typed_value entries are null: any literal predicate must return CANNOT_MATCH + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0; i < 3; i++) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.ofNull()); // variant null → typed_value written as null + builder.add(Variant.of(METADATA, obj)); + } + List variants = builder.build(); + + // Shredding function defines price as int so the typed_value column exists in the schema + ShreddedObject example = Variants.object(METADATA); + example.put("price", Variants.of(0)); + VariantShreddingFunction shreddingFunc = + (id, name) -> 
ParquetVariantUtil.toParquetSchema(example); + + final UnboundTerm term = PRICE; + assertThat(shouldRead(equal(term, 5), variants, shreddingFunc)) + .as("Should skip: all typed_value are null, EQ cannot match") + .isFalse(); + assertThat(shouldRead(greaterThan(term, 0), variants, shreddingFunc)) + .as("Should skip: all typed_value are null, GT cannot match") + .isFalse(); + assertThat(shouldRead(lessThan(term, 100), variants, shreddingFunc)) + .as("Should skip: all typed_value are null, LT cannot match") + .isFalse(); + } + + @Test + void testShreddedIsNullSomeNulls() throws IOException { + // Mix of typed_value (non-null) and null typed_value — IS_NULL should read + ShreddedObject example = Variants.object(METADATA); + example.put("price", Variants.of(0)); + VariantShreddingFunction shreddingFunc = + (id, name) -> ParquetVariantUtil.toParquetSchema(example); + + ImmutableList.Builder builder = ImmutableList.builder(); + for (int price : new int[] {10, 11, 12}) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.of(price)); + builder.add(Variant.of(METADATA, obj)); + } + for (int i = 0; i < 2; i++) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.ofNull()); // null typed_value + builder.add(Variant.of(METADATA, obj)); + } + + assertThat(shouldRead(isNull(PRICE), builder.build(), shreddingFunc)) + .as("Should read: some rows have null typed_value, IS_NULL might match") + .isTrue(); + } + + @Test + void testShreddedNotNullSomeValues() throws IOException { + // Mix of typed_value (non-null) and null typed_value — NOT_NULL should read + ShreddedObject example = Variants.object(METADATA); + example.put("price", Variants.of(0)); + VariantShreddingFunction shreddingFunc = + (id, name) -> ParquetVariantUtil.toParquetSchema(example); + + ImmutableList.Builder builder = ImmutableList.builder(); + for (int price : new int[] {10, 11, 12}) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", 
Variants.of(price)); + builder.add(Variant.of(METADATA, obj)); + } + for (int i = 0; i < 2; i++) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.ofNull()); // null typed_value + builder.add(Variant.of(METADATA, obj)); + } + + assertThat(shouldRead(notNull(PRICE), builder.build(), shreddingFunc)) + .as("Should read: some rows have non-null typed_value, NOT_NULL might match") + .isTrue(); + } + + @Test + void testShreddedStringMaxIsAllMaxCodepoints() throws IOException { + // U+10FFFF is the highest Unicode code point; its UTF-8 encoding ends in bytes 0xBF 0xBF, + // which approach the 0xFF boundary. A string of 17 such characters is longer than the + // Iceberg-metrics truncation limit of 16 code points, so UnicodeUtil.truncateStringMax + // returns null (no valid upper bound can be computed by incrementing any code point because + // every code point is already at the maximum). This tests that the Parquet column chunk + // statistics — which store the exact bytes, not a truncated approximation — still allow the + // filter to make correct skip decisions. 
+ final String maxCodepoint = "\uDBFF\uDFFF"; // U+10FFFF as a Java surrogate pair + final String allMax = maxCodepoint.repeat(17); // > 16 code points: truncation returns null + List variants = nameStringVariants("alpha", allMax); + VariantShreddingFunction shreddingFunc = nameStringShreddingFunc(); + + final UnboundTerm term = NAME; + + // Exact Parquet stats: max is known, so GT(max) can skip + assertThat(shouldRead(greaterThan(term, allMax), variants, shreddingFunc)) + .as("Should skip: nothing is greater than the all-U+10FFFF max") + .isFalse(); + + // EQ at the exact max: value is present, cannot skip + assertThat(shouldRead(equal(term, allMax), variants, shreddingFunc)) + .as("Should read: all-U+10FFFF string is the max value") + .isTrue(); + + // One extra U+10FFFF pushes the literal beyond the stored max + assertThat(shouldRead(equal(term, allMax + maxCodepoint), variants, shreddingFunc)) + .as("Should skip: literal is strictly greater than the max") + .isFalse(); + + // LT at min: nothing below "alpha" + assertThat(shouldRead(lessThan(term, "alpha"), variants, shreddingFunc)) + .as("Should skip: nothing is less than min 'alpha'") + .isFalse(); + + // Mid-range value: within [alpha, allMax] + assertThat(shouldRead(equal(term, "beta"), variants, shreddingFunc)) + .as("Should read: 'beta' is within [alpha, all-U+10FFFF]") + .isTrue(); + } + + @Test + void testShreddedSingleValueAtExactBoundary() throws IOException { + // When a row group contains only one distinct value (min == max), predicates at that exact + // boundary must be evaluated correctly: LT and GT should skip (the value cannot satisfy them), + // while LT_EQ, GT_EQ, and EQ should read (the value might satisfy them). 
+ List variants = intPriceVariants(10, 10, 10); // all rows have price = 10 + final UnboundTerm term = PRICE; + + // min=10, LT(10): minVsLiteral=0, 0 < 0 is false → CANNOT_MATCH + assertNotRead(lessThan(term, 10), variants, "< 10 when all values are 10"); + // max=10, GT(10): literalVsMax=0, 0 < 0 is false → CANNOT_MATCH + assertNotRead(greaterThan(term, 10), variants, "> 10 when all values are 10"); + // min=10, LT_EQ(10): minVsLiteral=0, 0 <= 0 is true → MIGHT_MATCH + assertIsRead(lessThanOrEqual(term, 10), variants, "<= 10 when all values are 10"); + // max=10, GT_EQ(10): literalVsMax=0, 0 <= 0 is true → MIGHT_MATCH + assertIsRead(greaterThanOrEqual(term, 10), variants, ">= 10 when all values are 10"); + // EQ(10): both comparisons are 0 → MIGHT_MATCH + assertIsRead(equal(term, 10), variants, "= 10 when all values are 10"); + } + + @Test + void testShreddedAndCompoundPredicate() throws IOException { + // AND over two variant extract predicates: the row group can be skipped if either arm cannot + // match, and must be read only when both arms might match. + List variants = intPriceVariants(10, 11, 12); // min=10, max=12 + final UnboundTerm term = PRICE; + + // Both arms overlap the range [10,12] → MIGHT_MATCH + assertIsRead( + and(greaterThan(term, 5), lessThan(term, 15)), variants, "5 < price < 15 overlaps [10,12]"); + // two predicates will be evaluated here, min and max. + // this highlights that an in-range query will be expensive on unshredded numbers + // as they will need to be read twice.
+ assertShreddedMetricsProcessed(2); + + // GT(20) cannot match (max=12 < 20) → AND short-circuits to CANNOT_MATCH + assertNotRead( + and(greaterThan(term, 20), lessThan(term, 25)), + variants, + "20 < price < 25 is above the range [10,12]"); + + // LT(8) cannot match (min=10 > 8) → AND short-circuits to CANNOT_MATCH + assertNotRead( + and(greaterThan(term, 5), lessThan(term, 8)), + variants, + "5 < price < 8 is below the range [10,12]"); + } + + @Test + void testShreddedSetMembership() throws IOException { + List variants = intPriceVariants(10, 11, 12); // min=10, max=12 + final UnboundTerm term = PRICE; + + // All set values are below the range — row group can be skipped + assertNotRead(in(term, 1, 2, 3), variants, "IN {1,2,3} with range [10..12]"); + assertShreddedMetricsProcessed(1); + + // All set values are above the range — row group can be skipped + assertNotRead(in(term, 20, 30), variants, "IN {20,30} with range [10..12]"); + // Single value that is below the range — row group can be skipped + assertNotRead(in(term, 5), variants, "IN {5} with range [10..12]"); + assertNotRead(in(term, -100, 400), variants, "IN {-100, 400} with range [10..12]"); + // At least one set value is within the range — row group must be read + assertIsRead(in(term, 9, 10), variants, "IN {9,10} with range [10..12]"); + assertIsRead(in(term, 12, 13), variants, "IN {12,13} with range [10..12]"); + assertIsRead(in(term, 10, 11, 12), variants, "IN {10,11,12} with range [10..12]"); + // Exact boundary matches — row group must be read + assertIsRead(in(term, 10), variants, "IN {10} with range [10..12]"); + assertIsRead(in(term, 12), variants, "IN {12} with range [10..12]"); + } + + @Test + void testShreddedUUIDEqual() throws IOException { + // Row group has deviceid range [UUID_LOW, UUID_HIGH] + List variants = uuidDeviceIdVariants(UUID_LOW, UUID_MID, UUID_HIGH); + final UnboundTerm term = DEVICEID_UUID; + + // EQ with a value strictly below the range — row group can be skipped + 
assertNotRead(equal(term, UUID_ZERO), variants, "= nil UUID, range [LOW, HIGH]"); + + // EQ with a value strictly above the range — row group can be skipped + assertNotRead(equal(term, UUID_ABOVE_HIGH), variants, "= UUID 100, range [LOW, HIGH]"); + + // EQ with a value within the range — row group must be read + assertIsRead(equal(term, UUID_MID), variants, "= UUID_MID within [LOW, HIGH]"); + + // EQ with exact boundary values — row group must be read + assertIsRead(equal(term, UUID_LOW), variants, "= UUID_LOW, lower boundary"); + assertIsRead(equal(term, UUID_HIGH), variants, "= UUID_HIGH, upper boundary"); + } + + @Test + void testShreddedUUIDNotEqual() throws IOException { + List variants = uuidDeviceIdVariants(UUID_LOW, UUID_MID, UUID_HIGH); + final UnboundTerm term = DEVICEID_UUID; + + // NOT_EQ never uses min/max to skip — always MIGHT_MATCH + assertIsRead(not(equal(term, UUID_MID)), variants, "!= UUID_MID with range [LOW, HIGH]"); + assertIsRead( + not(equal(term, UUID_MID)), + uuidDeviceIdVariants(UUID_MID, UUID_MID, UUID_MID), + "!= UUID_MID, all values equal UUID_MID"); + } + + @Test + void testShreddedUUIDIn() throws IOException { + ParquetMetricsRowGroupFilter.resetShreddedMetricsCounter(); + + // Row group has deviceid range [UUID_LOW, UUID_HIGH] + List variants = uuidDeviceIdVariants(UUID_LOW, UUID_MID, UUID_HIGH); + final UnboundTerm term = DEVICEID_UUID; + + // All set values are below the range — row group can be skipped + assertNotRead(in(term, UUID_ZERO), variants, "IN {nil UUID}, range [LOW, HIGH]"); + int expected = 1; + assertShreddedMetricsProcessed(expected++); + + // All set values are above the range — row group can be skipped + assertNotRead(in(term, UUID_ABOVE_HIGH), variants, "IN {UUID 100}, range [LOW, HIGH]"); + assertShreddedMetricsProcessed(expected++); + + assertNotRead( + in(term, UUID_ABOVE_HIGH, UUID.fromString("00000000-0000-0000-0000-0000000000c8")), + variants, + "IN {UUID 100, UUID 200}, range [LOW, HIGH]"); + 
assertShreddedMetricsProcessed(expected++); + + // At least one set value is within the range — row group must be read + assertIsRead(in(term, UUID_MID), variants, "IN {UUID_MID} within [LOW, HIGH]"); + assertShreddedMetricsProcessed(expected++); + assertIsRead( + in(term, UUID_ZERO, UUID_MID), variants, "IN {below, UUID_MID} straddles [LOW, HIGH]"); + assertShreddedMetricsProcessed(expected++); + assertIsRead( + in(term, UUID_LOW, UUID_MID, UUID_HIGH), + variants, + "IN {LOW, MID, HIGH} covers [LOW, HIGH]"); + assertShreddedMetricsProcessed(expected++); + + // Exact boundary values — row group must be read + assertIsRead(in(term, UUID_LOW), variants, "IN {UUID_LOW}, lower boundary"); + assertShreddedMetricsProcessed(expected++); + assertIsRead(in(term, UUID_HIGH), variants, "IN {UUID_HIGH}, upper boundary"); + assertShreddedMetricsProcessed(expected++); + } + + private static void assertShreddedMetricsProcessed(final int expected) { + assertThat(ParquetMetricsRowGroupFilter.variantPredicatesShreddedMetrics()) + .describedAs("Count of shredded metrics filtered on in predicates") + .isEqualTo(expected); + } + + // --- helpers --- + + private List intPriceVariants(int... prices) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (int price : prices) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.of(price)); + builder.add(Variant.of(METADATA, obj)); + } + return builder.build(); + } + + private List longPriceVariants(long... prices) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (long price : prices) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.of(price)); + builder.add(Variant.of(METADATA, obj)); + } + return builder.build(); + } + + private List floatPriceVariants(float... 
prices) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (float price : prices) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.of(price)); + builder.add(Variant.of(METADATA, obj)); + } + return builder.build(); + } + + private List doublePriceVariants(double... prices) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (double price : prices) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("price", Variants.of(price)); + builder.add(Variant.of(METADATA, obj)); + } + return builder.build(); + } + + private VariantShreddingFunction floatPriceShreddingFunc() { + ShreddedObject example = Variants.object(METADATA); + example.put("price", Variants.of(0.0F)); + return (id, name) -> ParquetVariantUtil.toParquetSchema(example); + } + + private VariantShreddingFunction doublePriceShreddingFunc() { + ShreddedObject example = Variants.object(METADATA); + example.put("price", Variants.of(0.0D)); + return (id, name) -> ParquetVariantUtil.toParquetSchema(example); + } + + private List nameStringVariants(String... names) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (String name : names) { + ShreddedObject obj = Variants.object(METADATA); + obj.put("name", Variants.of(name)); + builder.add(Variant.of(METADATA, obj)); + } + return builder.build(); + } + + private VariantShreddingFunction nameStringShreddingFunc() { + ShreddedObject example = Variants.object(METADATA); + example.put("name", Variants.of("x")); + return (id, name) -> ParquetVariantUtil.toParquetSchema(example); + } + + private List uuidDeviceIdVariants(UUID... 
uuids) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (UUID uuid : uuids) { + ShreddedObject obj = Variants.object(UUID_METADATA); + obj.put("deviceid", Variants.ofUUID(uuid)); + builder.add(Variant.of(UUID_METADATA, obj)); + } + return builder.build(); + } + + /** + * Should the expression, when evaluated against the fully shredded set of variants, require the + * RowGroup to be read? + * + * @param expr expression + * @param variants list of variants; the first one defines the shredding schema + */ + private boolean shouldRead(Expression expr, List variants) throws IOException { + // Derive the shredding schema from the first variant's structure + VariantShreddingFunction shreddingFunc = + (id, name) -> ParquetVariantUtil.toParquetSchema(variants.get(0).value()); + return shouldRead(expr, variants, shreddingFunc); + } + + /** + * Should a set of variants, shredded with the supplied shredding function, be read? + * + * @param expr expression + * @param variants list of variants + * @param shreddingFunc shredding function + * @return true if the first row group of a file containing only these variants should be read.
+ */ + private boolean shouldRead( + Expression expr, List variants, VariantShreddingFunction shreddingFunc) + throws IOException { + OutputFile out = new InMemoryOutputFile(); + GenericRecord record = GenericRecord.create(SCHEMA); + + FileAppender writer = + Parquet.write(out) + .schema(SCHEMA) + .variantShreddingFunc(shreddingFunc) + .createWriterFunc(fileSchema -> InternalWriter.create(SCHEMA.asStruct(), fileSchema)) + .build(); + + try (writer) { + for (int i = 0; i < variants.size(); i++) { + record.setField("id", (long) i); + record.setField("var", variants.get(i)); + writer.add(record); + } + } + + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(out.toInputFile()))) { + BlockMetaData rowGroup = reader.getRowGroups().get(0); + MessageType fileSchema = reader.getFileMetaData().getSchema(); + return new ParquetMetricsRowGroupFilter(SCHEMA, expr, true).shouldRead(fileSchema, rowGroup); + } + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java index 57b9d61e38bd..9ca5d6f61b7a 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java @@ -354,7 +354,7 @@ private static T childAtIndex(Predicate predicate, int index) { private static boolean canConvertToTerm( org.apache.spark.sql.connector.expressions.Expression expr) { - return isRef(expr) || isSystemFunc(expr); + return isRef(expr) || isSystemFunc(expr) || isVariantGetFunc(expr); } private static boolean isRef(org.apache.spark.sql.connector.expressions.Expression expr) { @@ -440,12 +440,50 @@ private static UnboundTerm toTerm(T input) { if (input instanceof NamedReference) { return Expressions.ref(SparkUtil.toColumnName((NamedReference) input)); } else if (input instanceof UserDefinedScalarFunc) { - return udfToTerm((UserDefinedScalarFunc) input); + 
UserDefinedScalarFunc udf = (UserDefinedScalarFunc) input; + if (isVariantGetFunc(udf)) { + return variantGetToTerm(udf); + } + return udfToTerm(udf); } else { return null; } } + private static boolean isVariantGetFunc( + org.apache.spark.sql.connector.expressions.Expression expr) { + if (!(expr instanceof UserDefinedScalarFunc udf)) { + return false; + } + String name = udf.name().toLowerCase(Locale.ROOT); + return ("variant_get".equals(name) || "try_variant_get".equals(name)) + && udf.children().length == 3 + && isRef(udf.children()[0]) + && isLiteral(udf.children()[1]) + && isLiteral(udf.children()[2]); + } + + private static UnboundTerm variantGetToTerm(UserDefinedScalarFunc udf) { + org.apache.spark.sql.connector.expressions.Expression[] children = udf.children(); + String colName = SparkUtil.toColumnName((NamedReference) children[0]); + String path = convertLiteral((Literal) children[1]).toString(); + String sparkTypeName = convertLiteral((Literal) children[2]).toString(); + String icebergTypeName = sparkTypeNameToIceberg(sparkTypeName); + try { + return Expressions.extract(colName, path, icebergTypeName); + } catch (IllegalArgumentException e) { + return null; + } + } + + private static String sparkTypeNameToIceberg(String sparkTypeName) { + return switch (sparkTypeName.toLowerCase(Locale.ROOT)) { + case "bigint" -> "long"; + case "tinyint", "smallint" -> "int"; + default -> sparkTypeName; + }; + } + @SuppressWarnings("checkstyle:CyclomaticComplexity") private static UnboundTerm udfToTerm(UserDefinedScalarFunc udf) { org.apache.spark.sql.connector.expressions.Expression[] children = udf.children(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/parquet/TestSparkVariantFilterPushDown.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/parquet/TestSparkVariantFilterPushDown.java new file mode 100644 index 000000000000..79eb62a3ef94 --- /dev/null +++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/parquet/TestSparkVariantFilterPushDown.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.apache.iceberg.PlanningMode.DISTRIBUTED; +import static org.apache.iceberg.PlanningMode.LOCAL; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.TestBaseWithCatalog; +import org.apache.spark.sql.execution.SparkPlan; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 
Integration tests for Spark SQL variant predicate pushdown via the Iceberg data source. Each + * test creates a table with variant columns (parameterized over shredded and unshredded variants), + * then issues queries against it. + * + *

<p>Each test verifies both the rows returned (correctness) and which Iceberg scan filters were + * pushed down, as shown in the physical plan. + * + *

They also make assertions on how many times {@code + * ParquetMetricsRowGroupFilter.compareVariant()} has been invoked on shredded data, which is why it + * needs to be in the same package as that class. + */ +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkVariantFilterPushDown extends TestBaseWithCatalog { + + public static final String SPARK_VARIANT_GET = + "variant_get(nested, $.varcategory, IntegerType, true, Some(UTC))"; + public static final String ICEBERG_VARCAT = "variant_get(nested, '$.varcategory', 'int')"; + public static final String SPARK_ISNOTNULL_NESTED = "isnotnull(nested)"; + public static final String ICEBERG_NESTED_ISNOTNULL = "nested IS NOT NULL"; + + @Parameters( + name = + "catalogName = {0}, implementation = {1}, config = {2}, planningMode = {3} shredded= {4}") + public static Object[][] parameters() { + return new Object[][] { + // unshredded: the reference + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + LOCAL, + false + }, + // local planning and shredded + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + LOCAL, + true + }, + // distributed planning and shredded + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + DISTRIBUTED, + true + }, + }; + } + + public static final Logger LOG = LoggerFactory.getLogger(TestSparkVariantFilterPushDown.class); + + @Parameter(index = 3) + private PlanningMode planningMode; + + /** Should the variant be shredded? */ + @Parameter(index = 4) + private boolean shredded; + + /** Number of categories; each row has {@code category = id}. 
*/ + private static final int NUM_CATEGORIES = 20; + + public TestSparkVariantFilterPushDown() {} + + @BeforeEach + public void createTable() { + LOG.info("Creating Spark Table with shredding {}", shredded); + if (shredded) { + spark.conf().set(SparkSQLProperties.SHRED_VARIANTS, "true"); + } + sql( + """ + CREATE TABLE %s (id BIGINT, category INT, nested VARIANT, arr VARIANT) + USING iceberg + TBLPROPERTIES ('format-version'='3', + 'read.parquet.vectorization.enabled'='false', + 'write.parquet.shred-variants'='%s')""", + selectTarget(), shredded); + configurePlanningMode(planningMode); + buildDataset(); + } + + @AfterEach + public void dropTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + /** + * Build a dataset in sql, using parse_json to create the variant data. Records are {@code (id, + * category, parse_json(nested_json), parse_json(arr_json))} + */ + private void buildDataset() { + + String values = + IntStream.range(0, NUM_CATEGORIES) + .mapToObj( + n -> + String.format( + "(%d, %d, parse_json('{\"varid\": %d, \"varcategory\": %d}'), parse_json('[%d]'))", + (long) n, n, n, n, n)) + .collect(Collectors.joining(", ")); + + sql("INSERT INTO %s VALUES %s", selectTarget(), values); + } + + /** + * Baseline: filter on a plain INT column, project the id. Iceberg pushes the predicate fully to + * the scan; Spark still evaluates the predicate post-scan as a safety check. + */ + @TestTemplate + public void filterCategoryProjectId() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + "category = 5", + "isnotnull(category) AND (category = 5)", + "category IS NOT NULL, category = 5", + 0, 0, + ImmutableList.of(row(5L)))); + } + + /** + * Filter using variant field extraction (equality). Iceberg pushes both a null check and the + * equality predicate on the variant column; Spark also evaluates the filter post-scan. 
+ */ + @TestTemplate + public void filterVariantCategoryProjectId() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " = 5", + SPARK_ISNOTNULL_NESTED + " AND (" + SPARK_VARIANT_GET + " = 5)", + ICEBERG_NESTED_ISNOTNULL + ", " + ICEBERG_VARCAT + " = 5", + 2, 2, + ImmutableList.of(row(5L)))); + } + + /** Use the greater than and less than predicates in a query. Doubles the number of scans. */ + @TestTemplate + public void filterVariantCategoryInRange() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " > 4 AND " + ICEBERG_VARCAT + " < 7", + "(" + + SPARK_ISNOTNULL_NESTED + + " AND (" + + SPARK_VARIANT_GET + + " > 4))" + + " AND (" + + SPARK_VARIANT_GET + + " < 7)", + ICEBERG_NESTED_ISNOTNULL + + ", " + + ICEBERG_VARCAT + + " > 4, " + + ICEBERG_VARCAT + + " < 7", + 4, 4, + ImmutableList.of(row(5L), row(6L)))); + } + + /** Use the greater than and less than predicates in a query. Doubles the number of scans. */ + @TestTemplate + public void filterVariantCategoryGreateThanEquals() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " >= 4 AND " + ICEBERG_VARCAT + " <= 7", + "(" + + SPARK_ISNOTNULL_NESTED + + " AND (" + + SPARK_VARIANT_GET + + " >= 4))" + + " AND (" + + SPARK_VARIANT_GET + + " <= 7)", + ICEBERG_NESTED_ISNOTNULL + + ", " + + ICEBERG_VARCAT + + " >= 4, " + + ICEBERG_VARCAT + + " <= 7", + 4, 4, + ImmutableList.of(row(4L), row(5L), row(6L), row(7L)))); + } + + /** + * Project a variant field and filter on a different variant field. Iceberg pushes both a null + * check and the equality predicate; the projection is evaluated post-scan. 
+ */ + @TestTemplate + public void filterVariantCategoryProjectVariantId() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "variant_get(nested, '$.varid', 'int')", + ICEBERG_VARCAT + " = 5", + SPARK_ISNOTNULL_NESTED + " AND (" + SPARK_VARIANT_GET + " = 5)", + ICEBERG_NESTED_ISNOTNULL + ", " + ICEBERG_VARCAT + " = 5", + 2, 2, + ImmutableList.of(row(5)))); + } + + /** IN predicate on a variant field using {@code variant_get}. */ + @TestTemplate + public void filterVariantCategorySetMembership() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " IN (5, 10)", + SPARK_VARIANT_GET + " IN (5,10)", + ICEBERG_VARCAT + " IN (5, 10)", // no null check + 4, 4, + ImmutableList.of(row(5L), row(10L)))); + } + + /** + * Set members are all above or below the categories. The filter string this produces is slightly + * different from that of {@link #filterVariantCategorySetMembership()}. + */ + @TestTemplate + public void filterVariantCategorySetMembership2() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " IN (100, 400)", + SPARK_VARIANT_GET + " IN (100,400)", + "", + 0, 0, + ImmutableList.of())); + } + + /** + * Set membership of a single element is remapped to equality, filtering takes place in + * planning. + */ + @TestTemplate + public void filterVariantCategorySetMembership3() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " IN (100)", + SPARK_ISNOTNULL_NESTED + " AND (" + SPARK_VARIANT_GET + " = 100)", + "", + 0, 0, + ImmutableList.of())); + } + + /** + * Set membership of a single element is remapped to equality, and if that element is in range, a + * pushed down predicate is evaluated. 
+ */ + @TestTemplate + public void filterVariantCategorySetMembership4() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " IN (4)", + SPARK_ISNOTNULL_NESTED + " AND (" + SPARK_VARIANT_GET + " = 4)", + ICEBERG_NESTED_ISNOTNULL + ", " + ICEBERG_VARCAT + " = 4", + 2, 2, + ImmutableList.of(row(4L)))); + } + + /** + * Evaluation of the IS NULL predicate. + */ + @TestTemplate + public void filterVariantCategoryIsNull() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " IS NULL", + "isnull(" + SPARK_VARIANT_GET + ")", + "", + 4, 4, + ImmutableList.of())); + } + + /** + * Evaluation of the IS NOT NULL predicate; this finds everything. + */ + @TestTemplate + public void filterVariantCategoryIsNotNull() { + List rows = LongStream.rangeClosed(0, 19) + .mapToObj(this::row) + .toList(); + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + ICEBERG_VARCAT + " IS NOT NULL", + SPARK_ISNOTNULL_NESTED + " AND isnotnull(" + SPARK_VARIANT_GET + ")", + "nested IS NOT NULL, variant_get(nested, '$.varcategory', 'int') IS NOT NULL", + 4, 4, + rows)); + } + + /** + * Filter on element 0 of an array variant. Iceberg pushes a null check on the array column; the + * actual element comparison is done post-scan. + */ + @TestTemplate + public void filterArrayElementProjectId() { + withDefaultTimeZone( + "UTC", + () -> + checkFilters( + "id", + "variant_get(arr, '$[0]', 'int') = 5", + "isnotnull(arr) AND (variant_get(arr, $[0], IntegerType, true, Some(UTC)) = 5)", + "arr IS NOT NULL", + 0, 0, + ImmutableList.of(row(5L)))); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + /** + * Run {@code SELECT FROM WHERE ORDER BY id}, assert the returned + * rows, and verify that the physical plan contains the expected Spark post-scan filter and + * Iceberg scan filters. 
+ * + * @param projection column expression(s) to select + * @param predicate SQL WHERE clause (no "WHERE" keyword) + * @param sparkFilter expected post-scan Spark Filter node text + * @param icebergFilters expected {@code filters=...} value from the Iceberg scan node; empty + * string means no Iceberg pushdown shall take places. + * @param expectedPlanningEvaluations expected number of evaluations during planning + * @param expectedExecutionEvaluations number of evaluations of a rowgroup filter predicate + * on shredded column. + * @param expectedRows expected result rows in id order + */ + private void checkFilters( + String projection, + String predicate, + String sparkFilter, + String icebergFilters, + int expectedPlanningEvaluations, + int expectedExecutionEvaluations, + List expectedRows) { + + ParquetMetricsRowGroupFilter.resetShreddedMetricsCounter(); + String query = + String.format("SELECT %s FROM %s WHERE %s ORDER BY id", projection, tableName, predicate); + + SparkPlan plan = executeAndKeepPlan(query); + long planShredCount = ParquetMetricsRowGroupFilter.variantPredicatesShreddedMetrics(); + String planString = plan.toString().replaceAll("#\\d+L?", ""); + String summary = String.format("%s with plan shred count %d", query, planShredCount); + + assertThat(planString) + .as("Post-scan Spark filter of %s", summary) + .containsAnyOf("Filter (" + sparkFilter + ")", "Filter " + sparkFilter); + + if (!icebergFilters.isEmpty()) { + assertThat(planString).as("No iceberg scan generated from %s", summary).contains("IcebergScan"); + assertThat(planString) + .as("Iceberg pushed filters of must match from %s", summary) + .contains(", filters=" + icebergFilters + ","); + } else { + assertThat(planString) + .as("No iceberg scan generated from %s", summary) + .doesNotContain("IcebergScan"); + } + + if (shredded) { + assertThat(ParquetMetricsRowGroupFilter.variantPredicatesShreddedMetrics()) + .describedAs("Count of shredded metrics filtered during planning of of %s to 
plan %s", + summary, planString) + .isEqualTo(expectedPlanningEvaluations); + } + ParquetMetricsRowGroupFilter.resetShreddedMetricsCounter(); + final List rows = sql("SELECT %s FROM %s WHERE %s ORDER BY id", projection, selectTarget(), predicate); + assertEquals( + "Execution of " + summary + " to plan " + planString, + expectedRows, + rows); + if (shredded) { + assertThat(ParquetMetricsRowGroupFilter.variantPredicatesShreddedMetrics()) + .describedAs("Count of shredded metrics filtered during execution of of %s to plan %s", + summary, planString) + .isEqualTo(expectedExecutionEvaluations); + } + } +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java index e4e66abfefa0..596375809a23 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.expressions.Expressions.bucket; import static org.apache.iceberg.expressions.Expressions.day; import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.extract; import static org.apache.iceberg.expressions.Expressions.greaterThan; import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; import static org.apache.iceberg.expressions.Expressions.hour; @@ -165,6 +166,21 @@ public void testDescribeExpression() { assertThat(Spark3Util.describe(andExpression)).isEqualTo("(id = 1 AND year(ts) > 10)"); } + @Test + public void testDescribeExtractExpression() { + Expression extractGt = greaterThan(extract("v", "$.city", "string"), "Boston"); + assertThat(Spark3Util.describe(extractGt)) + .isEqualTo("variant_get(v, '$.city', 'string') > 'Boston'"); + + Expression extractEq = equal(extract("v", "$.event.id", "long"), 42L); + assertThat(Spark3Util.describe(extractEq)) + 
.isEqualTo("variant_get(v, '$.event.id', 'long') = 42"); + + Expression extractIn = in(extract("v", "$.city", "string"), "NYC", "LA"); + assertThat(Spark3Util.describe(extractIn)) + .isEqualTo("variant_get(v, '$.city', 'string') IN ('NYC', 'LA')"); + } + private SortOrder buildSortOrder(String transform, Schema schema, int sourceId) { String jsonString = "{\n" diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkV2Filters.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkV2Filters.java index e0b590e5a6e8..0538b8130771 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkV2Filters.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkV2Filters.java @@ -652,6 +652,66 @@ public void testTruncate() { testUDF(udf, Expressions.truncate("strCol", 6), "prefix", DataTypes.StringType); } + @Test + public void testVariantGet() { + UserDefinedScalarFunc udf = + new UserDefinedScalarFunc( + "variant_get", + "variant_get", + expressions( + FieldReference.apply("v"), + LiteralValue.apply(UTF8String.fromString("$.city"), DataTypes.StringType), + LiteralValue.apply(UTF8String.fromString("string"), DataTypes.StringType))); + testUDF(udf, Expressions.extract("v", "$.city", "string"), "NYC", DataTypes.StringType); + } + + @Test + public void testTryVariantGet() { + UserDefinedScalarFunc udf = + new UserDefinedScalarFunc( + "try_variant_get", + "try_variant_get", + expressions( + FieldReference.apply("v"), + LiteralValue.apply(UTF8String.fromString("$.city"), DataTypes.StringType), + LiteralValue.apply(UTF8String.fromString("string"), DataTypes.StringType))); + testUDF(udf, Expressions.extract("v", "$.city", "string"), "NYC", DataTypes.StringType); + } + + @Test + public void testVariantGetNestedPath() { + UserDefinedScalarFunc udf = + new UserDefinedScalarFunc( + "variant_get", + "variant_get", + expressions( + FieldReference.apply("v"), + 
LiteralValue.apply(UTF8String.fromString("$.event.id"), DataTypes.StringType), + LiteralValue.apply(UTF8String.fromString("long"), DataTypes.StringType))); + testUDF(udf, Expressions.extract("v", "$.event.id", "long"), 42L, DataTypes.LongType); + } + + @Test + public void testVariantGetInPredicate() { + UserDefinedScalarFunc udf = + new UserDefinedScalarFunc( + "variant_get", + "variant_get", + expressions( + FieldReference.apply("v"), + LiteralValue.apply(UTF8String.fromString("$.city"), DataTypes.StringType), + LiteralValue.apply(UTF8String.fromString("string"), DataTypes.StringType))); + org.apache.spark.sql.connector.expressions.Expression[] attrAndValues = + expressions( + udf, + LiteralValue.apply(UTF8String.fromString("NYC"), DataTypes.StringType), + LiteralValue.apply(UTF8String.fromString("LA"), DataTypes.StringType)); + Predicate in = new Predicate("IN", attrAndValues); + Expression actual = SparkV2Filters.convert(in); + Expression expected = Expressions.in(Expressions.extract("v", "$.city", "string"), "NYC", "LA"); + assertEquals(expected, actual); + } + @Test public void testUnsupportedUDFConvert() { ScalarFunction icebergVersionFunc =