feat: expand date/time expression support using codegen dispatcher#4417
Conversation
…l short-circuit CometBatchKernelCodegen.defaultBody emitted this.col$ord.isNull(i) for every NullIntolerant input, but primitive Arrow vectors (timestamp / int / float / date / bool / ...) are wrapped in CometPlainVector at input-cast time and expose isNullAt rather than the raw Arrow isNull. The short-circuit therefore failed to compile for any primitive-typed column with a Janino "method isNull not declared" error. Share the existing nullCheckMethod helper between emitTypedGetters and defaultBody so both sites pick the right method name per column. Add a source test that pins the chosen method for TimeStampMicroTZVector inputs.
CometDateFormat keeps the native to_char path for UTC sessions with a format
literal in the strftime-mappable whitelist, and now routes every other case
through the Arrow-direct codegen dispatcher (CometScalaUDFCodegen) so that
non-UTC sessions, non-literal formats, and formats outside the whitelist
stay inside the Comet pipeline running Spark's own DateFormatClass.doGenCode.
Refactor: extract the closure-serialize + JvmScalarUdf-proto emission from
CometScalaUDF.convert into a reusable CometScalaUDF.emitJvmCodegenDispatch
helper. Any serde that wants to fall back to a Spark built-in expression
through the dispatcher can call it. Gated by COMET_SCALA_UDF_CODEGEN_ENABLED
so the default remains a clean Spark fallback for those cases until the
dispatcher graduates from experimental.
Reasoning notes:
- DateFormatClass already has a proper doGenCode (not CodegenFallback),
NullIntolerant, and ResolveTimeZone stamps the timeZoneId on it during
analysis. Closure-serializing the bound tree therefore reproduces
Spark-identical behavior for every timezone.
- The kernel cache key already encodes the literal format and timezone via
the serialized expression bytes, so (format, tz) combinations get
distinct cached kernels just like a bespoke (format, tz) -> formatter
cache would. Saves an entire DateFormatUDF.scala class.
Tests:
- date_format - timestamp_ntz input: now runs checkSparkAnswerAndOperator
for every timezone under the codegen flag instead of falling back for
non-UTC.
- Split each previous "falls back to Spark" Scala test into two: one
asserting the codegen-on path stays in Comet, one asserting the
codegen-off path falls back with the dispatcher flag as the reason.
- date_format.sql now pins a non-UTC session timezone and enables the
codegen flag at file scope; all queries are plain query and assert
in-Comet execution.
The CometScalaUDF fallback message was generalized from 'ScalaUDF has no native path' to 'expression has no native path' when the dispatcher helper was extracted for reuse by CometDateFormat.
- Drop the getCompatibleNotes override on CometCodegenDispatch. The docs generator emits compat notes under a heading promising 'no additional configuration', which contradicts a note describing the dispatcher flag. Keep getSupportLevel=Compatible and surface the flag dependency via withInfo / EXPLAIN instead. - Add a sentinel non-error query to each *_ansi.sql fixture. The expect_error semantics pass vacuously when the dispatcher silently falls back to Spark (both paths throw identical exceptions); the sentinel uses checkSparkAnswerAndOperator and fails if Comet did not run the expression natively. - Pin spark.sql.legacy.timeParserPolicy=CORRECTED in to_unix_timestamp_ansi.sql so the JDK java.time formatter is exercised regardless of runtime default; LEGACY policy uses SimpleDateFormat with a different exception class. - Annotate the three ANSI fixtures with MinSparkVersion: 3.5 since the DATETIME_FIELD_OUT_OF_BOUNDS and CANNOT_PARSE_TIMESTAMP error classes were standardized in Spark 3.5. Spark 3.4 coverage is delivered separately.
Mirror the existing MinSparkVersion gate with a MaxSparkVersion gate so SQL fixtures can pair a 3.5+ variant (using post-3.5 error class names) with a 3.4 variant (using the pre-classification JDK java.time exception text). The make_timestamp and to_unix_timestamp ANSI exception paths produce different exception wording on Spark 3.4 versus 3.5+; before this commit only the 3.5+ side had coverage and 3.4 ANSI behavior went untested. Framework: - SqlTestFile gains maxSparkVersion: Option[String]. - SqlFileTestParser recognizes -- MaxSparkVersion: lines. - CometSqlFileTestSuite gains meetsMaxSparkVersion / skipReason helpers; the skip-and-log path now reports whether the constraint was a floor or ceiling. Coverage: - make_timestamp_ansi_spark34.sql: MaxSparkVersion: 3.4, expect_error patterns target the JDK DateTimeException field-name text (MonthOfYear, Invalid date, HourOfDay) which is stable in 3.4's pre-classification error path. - to_unix_timestamp_ansi_spark34.sql: MaxSparkVersion: 3.4, expect_error pattern targets the JDK DateTimeParseException 'could not be parsed' wording.
a3923d4 to
1b952e7
Compare
CI failed on Spark 3.5.8 because the executor-thrown SparkDateTimeException's
getMessage() does NOT preserve the driver-formatted '[DATETIME_FIELD_OUT_OF_BOUNDS]'
error-class prefix; only the inner JDK message ('Invalid value for MonthOfYear ...',
'Invalid date FEBRUARY 30', 'Invalid value for HourOfDay ...') survives the
'Job aborted ... Lost task ... SparkDateTimeException: <inner>' wrapping that
shows up in the test's caught exception.
Switching to the JDK java.time field-name substrings (MonthOfYear, Invalid date,
HourOfDay) makes the assertions stable across Spark 3.4, 3.5.x, and 4.x without
needing a MinSparkVersion gate, so the make_timestamp_ansi_spark34.sql variant
becomes redundant and is deleted in the same commit.
Verified locally: passes under -Pspark-3.4 (3.4.3) and -Pspark-3.5 (3.5.8).
|
Awesome to see the codegen framework being put to good use, and bugs being found and fixed! A couple of things on the broader dispatcher pattern worth thinking about before the second wave of expressions lands. The sentinel On the cache-key side, three of the new expressions are |
Address review feedback on Bucket 4 codegen PR: - `CometSqlFileTestSuite.requireSentinelForCodegenExpectError` rejects any fixture combining `expect_error` with the codegen flag unless at least one non-error sentinel query is present. Documents the rule on `ExpectError`. Without the sentinel a silent dispatcher fallback to Spark would let the `expect_error` queries pass vacuously. - New `CometCodegenSourceSuite` test serializes pairs of bound expressions that differ only in `failOnError` (MakeTimestamp, ToUnixTimestamp), `roundOff` (MonthsBetween), or `timeZoneId` and asserts the closure- serialized bytes diverge. Pins the invariant that the dispatcher caches these variants under distinct keys rather than colliding.
mbutrovich
left a comment
There was a problem hiding this comment.
I think expressions.md needs updating, but that's just a docs PR, no reasont to kick CI again on this PR. Awesome to see more use for the codegen path!
|
Merged. Thanks @mbutrovich. I will follow up with docs PR. |
…of hand-written UDFs Replace the six hand-written `RegExp*UDF` / `StringSplitUDF` JVM UDF implementations with the Arrow-direct codegen dispatcher introduced in PR apache#4417 (`CometScalaUDF.emitJvmCodegenDispatch`). The dispatcher Janino-compiles Spark's own `doGenCode` for the expression, so the regex family inherits Spark-identical semantics with no per-expression glue code. Changes: - Delete `spark/src/main/scala/org/apache/comet/udf/RegExp*UDF.scala` and `StringSplitUDF.scala`. Their behavior is now provided by Spark's `doGenCode` running inside the dispatcher. - Rewrite the regex serdes in `strings.scala`. Expressions with no native Rust path (`RegExpExtract`, `RegExpExtractAll`, `RegExpInStr`) share a new `CometRegexpCodegenOnly` base; expressions with a native path (`RLike`, `RegExpReplace`, `StringSplit`) keep an explicit route table where the JVM arm now delegates to `CometScalaUDF.emitJvmCodegenDispatch`. - Drop the `spark.comet.jvmUdf.enabled` config. The codegen dispatcher already has its own master switch (`spark.comet.exec.scalaUDF.codegen.enabled`); gating the regex family on the same flag avoids two flags for the same path. `spark.comet.exec.regexp.engine` keeps the `java`/`rust` selector semantics, and `engine=java` now requires the codegen flag. - Revert the native Rust additions in `jvm_udf/mod.rs` and `jni-bridge/src/lib.rs`. The codegen dispatcher constructs Arrow output fields JVM-side via `CometBatchKernelCodegenOutput.toFfiArrowField`, so the list-vector field-name normalization cast is unnecessary. - Update `CometRegExpJvmSuite`, `CometRegExpBenchmark`, the regex SQL test fixtures, and the regex compatibility doc to reflect the new gating. Test plan: - `CometRegExpJvmSuite`: 45/45 pass (covers all six regex expressions through the codegen dispatcher). - `CometSqlFileTestSuite`: 289/289 pass. - `CometStringExpressionSuite`: 33/33 pass. - `CometCodegenSuite`: 60/60 pass. - `cargo clippy --all-targets --workspace -- -D warnings`: clean.
…f hand-written UDFs Replace the three hand-written `GetJsonObjectUDF` / `FromJsonUDF` / `ToJsonUDF` JVM UDF implementations and the `CometLambdaRegistry` indirection with the Arrow-direct codegen dispatcher introduced in PR apache#4417 (`CometScalaUDF.emitJvmCodegenDispatch`). The dispatcher Janino-compiles Spark's own `doGenCode` (or `eval(row)` for CodegenFallback expressions) so the JSON family inherits Spark-identical semantics with no per-expression glue. Changes: - Delete the three hand-written UDF files under `spark/src/main/scala/org/apache/comet/udf/` and their unit-test suites. The codegen dispatcher's per-task `kernelCache` provides the same per-thread isolation that `CometLambdaRegistry` was working around. - Rewrite the JSON serdes (`CometGetJsonObject` in `strings.scala`, `CometStructsToJson` and `CometJsonToStructs` in `structs.scala`) to go through a new `JsonRoute` helper. `engine=rust` keeps the native path; `engine=java` delegates to `CometScalaUDF.emitJvmCodegenDispatch` when `spark.comet.exec.scalaUDF.codegen.enabled=true`. - Generalize the codegen dispatcher to accept `CodegenFallback` expressions. `CodegenFallback.doGenCode` emits `references[N].eval(row)`, the same shape the `HigherOrderFunction` carve-out already relied on; lifting the rejection lets `JsonToStructs` and `StructsToJson` (which are `CodegenFallback` in Spark 4) ride the same path. - Unwrap `RuntimeReplaceable` expressions inside `CometScalaUDF.emitJvmCodegenDispatch` before binding. Spark 4's `StructsToJson` is `RuntimeReplaceable` and its `doGenCode` throws "Cannot generate code for expression"; calling `.replacement` gives the `Invoke(StructsToJsonEvaluator, ...)` form that does codegen. - Update the JSON compatibility doc and the `CometJsonJvmSuite` config to reference the codegen flag. Test plan: - `CometJsonJvmSuite`: 3/3 pass (get_json_object, from_json, to_json round-trip via the codegen dispatcher). - `CometJsonExpressionSuite`: 8/8 pass on the unchanged native path. - `CometStringExpressionSuite`: 33/33, `CometCodegenSuite`: 60/60, `CometCodegenSourceSuite`: 50/50, `CometSqlFileTestSuite`: 284/284. - `cargo clippy --all-targets --workspace -- -D warnings`: clean.
|
Some post merge comments -
|
This is just adding a cheaper fallback for expressions that are not yet implemented. I'm happy to review PRs to add native implementations and remove the fallbacks. |
Which issue does this PR close?
Part of #3202. Supersedes and consolidates #4373.
Expressions covered
Ten Spark date/time expressions are routed through the Arrow-direct codegen dispatcher in this PR. All run inside the Comet pipeline (no operator-level Spark fallback) when
spark.comet.exec.scalaUDF.codegen.enabled=trueis set. Default behavior is unchanged when the flag is off.date_format(originally feat: route date_format through codegen dispatcher for non-native cases #4373; nativeto_charpath retained for UTC + whitelisted-format cases, dispatcher path for everything else)add_monthsmonths_betweenmake_timestamptimestamp_millis(MillisToTimestamp)timestamp_micros(MicrosToTimestamp)unix_secondsunix_millisunix_microsto_unix_timestampRationale for this change
Comet's plan rules fall back to Spark for any expression that lacks a native serde, breaking up the Comet pipeline at the operator boundary. The Arrow-direct codegen dispatcher already in tree (
CometScalaUDFCodegen, behindspark.comet.exec.scalaUDF.codegen.enabled) closure-serializes a bound Catalyst expression, ships it through aJvmScalarUdfproto, and Janino-compiles Spark's owndoGenCodeinto a per-batch kernel that reads Arrow vectors and writes an Arrow output vector directly. For any expression whosedoGenCodeis real (notCodegenFallback) and whose input/output types fitCometBatchKernelCodegen.isSupportedDataType, routing through this dispatcher reproduces Spark behavior exactly without a bespoke UDF class.This PR establishes a reusable helper for that routing pattern and applies it to the ten expressions above in two waves:
DateFormatClassfirst (originally #4373, folded in here), then nine currently-unsupported expressions that share the same shape.What changes are included in this PR?
fix(codegen)— Pre-existing bug surfaced while wiringdate_format.CometBatchKernelCodegen.defaultBodyemittedthis.col$ord.isNull(i)for everyNullIntolerantinput, but primitive Arrow vectors are wrapped inCometPlainVectorat input-cast time and exposeisNullAt, notisNull. Janino rejected the kernel with "methodisNullnot declared".emitTypedGettersalready knew the right method name vianullCheckMethod; the fix exposes that helper sodefaultBodypicks the same name per column ordinal. New source test pins the chosen method forTimeStampMicroTZVectorso the regression can't recur.feat— dispatcher helper extraction — Extract the closure-serialize +JvmScalarUdfemission fromCometScalaUDF.convertintoCometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)so other serdes can reuse the path.CometDateFormat.convertkeeps the nativeto_charpath for UTC + whitelisted-format cases and now calls the helper for everything else. Gated byspark.comet.exec.scalaUDF.codegen.enabled(default false, experimental); default behavior is unchanged.feat—CometCodegenDispatch[T]base class — A one-class wrapper aroundemitJvmCodegenDispatchlets any expression with a realdoGenCodeslot in with a single-lineobjectdeclaration. The helper marks the expressionCompatible()because the dispatcher runs Spark's owndoGenCodeinside the kernel; behavior matches Spark exactly when the flag is on, and the operator falls back cleanly when it is off.feat— nine new datetime serdes — Each of the nine new expressions becomes a one-line singletonextends CometCodegenDispatch[T]and is registered intemporalExpressions. Three are ANSI-sensitive (MakeTimestamp,MillisToTimestamp,ToUnixTimestampcarryfailOnError); the dispatcher inherits the throw site from Spark's owndoGenCode, so exception semantics propagate without any serde-level branching.test—MaxSparkVersionannotation — Mirror the existingMinSparkVersionparser gate with aMaxSparkVersionceiling. TheCometSqlFileTestSuiteskip logic now reports whether a constraint was a floor or ceiling. This lets fixtures pair a 3.5+ variant against a 3.4 variant where the expected error class wording differs.Interval-producing expressions (
MakeInterval/MakeYMInterval/MakeDTInterval) are explicitly out of scope: the dispatcher'sisSupportedDataTypedoes not include Spark's interval types. Version-conditional expressions (TimestampAdd/TimestampDiff3.4+,DayName3.5+,MonthName4.0+) are deferred to a follow-on so this PR avoids touching theCometExprShimfiles.Scaffolded with the
superpowers:brainstormingandsuperpowers:writing-plansskills.How are these changes tested?
CometTemporalExpressionSuitedate_formattests: 10/10 pass. Three "falls back to Spark" tests are paired with a "routes via codegen dispatcher" sibling that enables the flag and asserts in-Comet execution.date_format - timestamp_ntz inputrunscheckSparkAnswerAndOperatorfor every timezone under the codegen flag.CometSqlFileTestSuite: nine new per-expression SQL fixtures (add_months.sql,months_between.sql,make_timestamp.sql,timestamp_millis.sql,timestamp_micros.sql,unix_seconds.sql,unix_millis.sql,unix_micros.sql,to_unix_timestamp.sql) pin a non-UTC session timezone and the codegen flag at file scope.date_format.sqlfrom feat: route date_format through codegen dispatcher for non-native cases #4373 is included.make_timestamp_ansi.sql/make_timestamp_ansi_spark34.sql,to_unix_timestamp_ansi.sql/to_unix_timestamp_ansi_spark34.sql, plustimestamp_millis_ansi.sql. The Spark 3.5+ files match theDATETIME_FIELD_OUT_OF_BOUNDS/CANNOT_PARSE_TIMESTAMPerror classes; the 3.4 files match the underlying JDKjava.timeexception text (MonthOfYear,Invalid date,HourOfDay,could not be parsed). Each ANSI file includes a sentinel non-errorquerythat usescheckSparkAnswerAndOperatorso a silent dispatcher fallback would fail the file (theexpect_errorpath alone passes vacuously because Spark fallback also throws).CometCodegenSourceSuite: includes the newNullIntolerant short-circuit uses isNullAt for CometPlainVector-wrapped columnsregression test plus a parameterizedBucket 4 datetime expressions produce non-empty generated kernel sourcetest covering all nine new expressions.CometCodegenSuite: no regressions in the dispatcher's existing surface.-Pspark-3.4: 284/284 SQL fixtures pass (3.5+ variants correctly skip).