-
Notifications
You must be signed in to change notification settings - Fork 324
feat(experimental): ScalaUDF and Java UDF support via Janino codegen #4267
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 85 commits
Commits
Show all changes
93 commits
Select commit
Hold shift + click to select a range
1746bcc
feat: Arrow-direct codegen dispatcher for Spark expressions and Scala…
mbutrovich 08d6b78
prettier, add new suites to CI checks.
mbutrovich 557752e
make format, fix shims for 4.0+
mbutrovich 896f61f
make format, fix shims for 4.0+
mbutrovich a82e160
Merge branch 'main' into codegen_scala_udf
mbutrovich 2a158f4
strengthen tests for composed expressions
mbutrovich 654bbad
make format, again.
mbutrovich 10df7e0
fix pr_benchmark_check.yml
mbutrovich 7afe69f
fix arrow shading issue in CI.
mbutrovich 0dc5855
fix Spark 4.0 collation expression shim
mbutrovich 43a7b0c
apply common subexpression elimination, add tests for subqueries in UDFs
mbutrovich 9640897
make format
mbutrovich f0c8296
decimal fast path. document 64KB limitation right now
mbutrovich 2173f40
pass through task context to get around tokio worker pool calling ove…
mbutrovich 2f9585b
fix compilation on scala 2.12, fix format issue
mbutrovich 582cd17
Merge branch 'main' into codegen_scala_udf
mbutrovich 22f3256
decimal output, utf8 output, non-nullable output optimizations
mbutrovich 7666715
optimization menu
mbutrovich 0a34636
estimate binaryview and binary size
mbutrovich e94b6db
fix "CSE collapses a repeated subtree to one evaluation in the genera…
mbutrovich d0f1f27
Merge remote-tracking branch 'origin/codegen_scala_udf' into codegen_…
mbutrovich 07e37ea
add some complex type support, remove #4239 code. update docs.
mbutrovich ebf77c4
split codegen input and output, basic struct WIP
mbutrovich 6836c30
split massive codegen file, handle recursive nested types
mbutrovich 5d91a8f
map input
mbutrovich 2a28aaf
more struct support
mbutrovich 0c6586a
revert some benchmark changes
mbutrovich 8497fe7
cleanup part 1
mbutrovich 8d703c3
cleanup part 2
mbutrovich 5ec0e3f
cleanup part 3
mbutrovich a22051e
remove view support, it's dead code right now
mbutrovich 421c60c
use cometplainvector part 1
mbutrovich 0705dff
use cometplainvector part 2
mbutrovich 9a00874
make generated class final
mbutrovich d7b43fc
clean up test names
mbutrovich 034e1f5
fix format
mbutrovich 317feaf
Merge branch 'main' into codegen_scala_udf
mbutrovich db1f1f2
Merge branch 'main' into codegen_scala_udf
mbutrovich caffed9
fix 2.12 mapvalues usage
mbutrovich 4be8144
Remove code related to #4239.
mbutrovich 6fcd81c
Merge remote-tracking branch 'apache/main' into codegen_scala_udf
mbutrovich 9f8aa07
fix after merging in upstream/main.
mbutrovich 17b2714
switch to taskid-keyed state for CometUDFs.
mbutrovich ff8ee79
Merge branch 'main' into codegen_scala_udf
mbutrovich 7ed806a
reduce the scope to just ScalaUDF instead of general spark expression…
mbutrovich 6ff5aa0
update docs
mbutrovich 935aec6
reorg codegen
mbutrovich cbf96df
more tests
mbutrovich 5966055
cleanup
mbutrovich 748f943
document optimizations
mbutrovich f9318d8
fix tests
mbutrovich 19ac9f6
try to trim comments a bit
mbutrovich 13270bf
update two tests
mbutrovich 1111c6f
revert unintended diff from main
mbutrovich 61ae5b7
add Java UDF test
mbutrovich 6643208
update stale TODO references
mbutrovich 965c2ba
better input fuzz coverage
mbutrovich 948f3b9
better input fuzz coverage
mbutrovich 41fc046
better input fuzz coverage
mbutrovich 25c2511
simplify input logic
mbutrovich a057687
fix format
mbutrovich 650f619
add fallback for too many args and a test, clean up printing code
mbutrovich b1e1c55
stronger tests
mbutrovich 0f6f68c
Merge branch 'main' into codegen_scala_udf
mbutrovich d967143
fix(udf): scope the dispatcher's compile cache per task to isolate bo…
mbutrovich 10da742
update docs
mbutrovich 23df354
add missing suite
mbutrovich b161169
synchronize per-task UDF evaluation
mbutrovich f86e70b
Merge branch 'main' into codegen_scala_udf
mbutrovich dca8b22
update spark diffs
mbutrovich b1fbbb8
Merge branch 'main' into codegen_scala_udf
mbutrovich 2be5f73
upmerge main, regenerate diffs
mbutrovich 4d471e1
Merge branch 'main' into codegen_scala_udf
mbutrovich e19683e
cleanup round 1
mbutrovich ec42809
cleanup round 2
mbutrovich 9089fa1
remove benchmark
mbutrovich 2259ff6
remove cast from JNI layer that was a bandaid for List types
mbutrovich 83096e7
Merge branch 'main' into codegen_scala_udf
mbutrovich 5ee1ddf
fix scala 2.12
mbutrovich 2102f62
Merge remote-tracking branch 'origin/codegen_scala_udf' into codegen_…
mbutrovich e98164c
set config to false by default since it's experimental
mbutrovich ca4cd41
Update fallback message.
mbutrovich c12096e
a159357
roll back diff changes
mbutrovich a68ba53
Merge branch 'main' into codegen_scala_udf
mbutrovich 8a651e5
Merge branch 'main' into codegen_scala_udf
mbutrovich 63573ba
address PR feedback
mbutrovich c9d2960
tighten comments, fix planner.rs builder changes to align to codebase…
mbutrovich 41ea025
Merge branch 'main' into codegen_scala_udf
mbutrovich 3edba99
swap init and process in CometBatchKernel
mbutrovich 79a4e98
fix format
mbutrovich 58757cb
update shading comments after #4325
mbutrovich 0b57f11
clean up more comments
mbutrovich File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| <!--- | ||
| Licensed to the Apache Software Foundation (ASF) under one | ||
| or more contributor license agreements. See the NOTICE file | ||
| distributed with this work for additional information | ||
| regarding copyright ownership. The ASF licenses this file | ||
| to you under the Apache License, Version 2.0 (the | ||
| "License"); you may not use this file except in compliance | ||
| with the License. You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| KIND, either express or implied. See the License for the | ||
| specific language governing permissions and limitations | ||
| under the License. | ||
| --> | ||
|
|
||
| # Scala UDF and Java UDF Support | ||
|
|
||
| Comet executes user-defined scalar functions written against the Scala or Java UDF APIs on the native Comet path. Surrounding native operators stay native; the operator no longer falls back to Spark just because a UDF is present. | ||
|
|
||
| This page covers `ScalaUDF` (Scala `udf(...)`, `spark.udf.register(...)` over Scala or Java functional interfaces, and SQL `CREATE FUNCTION ... AS 'com.example.MyUDF'`). Other UDF kinds (Python / Pandas, Hive, aggregate) are out of scope and continue to fall back to Spark. | ||
|
|
||
| This feature is experimental and disabled by default. | ||
|
|
||
| ## Configuration | ||
|
|
||
| | Key | Default | Description | | ||
| | ------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------ | | ||
| | `spark.comet.exec.scalaUDF.codegen.enabled` | `false` | When `true`, eligible `ScalaUDF`s run on the Comet path. When `false`, the enclosing operator falls back to Spark. | | ||
|
|
||
| ## Supported | ||
|
|
||
| - User functions registered via `udf(...)`, `spark.udf.register(...)` (Scala or Java functional interfaces), or SQL `CREATE FUNCTION ... AS 'com.example.MyUDF'`. | ||
| - Scalar input/output types: `Boolean`, `Byte`, `Short`, `Int`, `Long`, `Float`, `Double`, `Decimal`, `String`, `Binary`, `Date`, `Timestamp`, `TimestampNTZ`. | ||
| - Complex input/output types with arbitrary nesting: `ArrayType`, `StructType`, `MapType`. | ||
| - Composition with other Catalyst expressions inside the argument tree (e.g. `myUdf(upper(s))` runs as one native unit). | ||
| - Higher-order functions (`transform`, `filter`, `exists`, `aggregate`, `zip_with`, `map_filter`, `map_zip_with`, etc.) inside the argument tree. | ||
|
|
||
| ## Not supported | ||
|
|
||
| - Aggregate UDFs (`ScalaAggregator`, `TypedImperativeAggregate`, the legacy `UserDefinedAggregateFunction`). | ||
| - Table UDFs and generators. | ||
| - Python `@udf` and Pandas `@pandas_udf`. | ||
| - Hive `GenericUDF` and `SimpleUDF`. | ||
| - `CalendarIntervalType`, `NullType`, and `UserDefinedType` arguments and return types. | ||
| - Trees whose total nested-field count (output plus all input columns the UDF tree references) exceeds `spark.sql.codegen.maxFields` (default 100). Comet refuses these at plan time and the operator falls back to Spark. | ||
|
|
||
| When a UDF is rejected, the reason surfaces through Comet's standard fallback diagnostics; the query still runs on Spark. | ||
|
|
||
| ### Working around UDT arguments | ||
|
|
||
| Spark `UserDefinedType`s (e.g. MLlib's `VectorUDT`) wrap an underlying SQL representation, typically a struct or array of supported scalar types. To run a UDF over a UDT-typed column on the Comet path, register the function over the underlying representation instead of the UDT class and reconstruct the UDT object inside the function body. Convert back to the underlying representation on output. The same pattern works for the return type: produce a struct / array of supported scalars instead of returning the UDT directly, and rehydrate at the call site if needed. | ||
|
|
||
| This is awkward but unblocks UDT use cases without losing native execution of the surrounding plan. | ||
|
|
||
| ## Behavior | ||
|
|
||
| - Non-deterministic expressions referenced from the argument tree (`rand`, `uuid`, `monotonically_increasing_id`) produce per-partition sequences consistent with Spark. | ||
| - `TaskContext.get()` inside the user function returns the driving Spark task's context. | ||
| - The user function must be closure-serializable; the same function that works with Spark's executor execution works here. | ||
|
|
||
| ## Known limitations | ||
|
|
||
| - Comet specializes the UDF once per query. Spark's analyzer produces a fresh `ScalaUDF` instance per query, so two structurally identical queries do not share a specialization. Within one query, batches of the same shape reuse the specialization. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
spark/src/main/java/org/apache/comet/codegen/CometBatchKernel.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.codegen; | ||
|
|
||
| import org.apache.arrow.vector.FieldVector; | ||
| import org.apache.arrow.vector.ValueVector; | ||
|
|
||
| /** | ||
| * Abstract base extended by the Janino-compiled batch kernel emitted by {@code | ||
| * CometBatchKernelCodegen}. The generated subclass extends {@code CometInternalRow} (so Spark's | ||
| * {@code BoundReference.genCode} can call {@code this.getUTF8String(ord)} directly) and carries | ||
| * typed input fields baked at codegen time, one per input column. Expression evaluation plus Arrow | ||
| * read/write fuse into one method per expression tree. | ||
| */ | ||
| public abstract class CometBatchKernel extends CometInternalRow { | ||
|
|
||
| protected final Object[] references; | ||
|
|
||
| protected CometBatchKernel(Object[] references) { | ||
| this.references = references; | ||
| } | ||
|
|
||
| /** | ||
| * Process one batch. | ||
| * | ||
| * @param inputs Arrow input vectors; length and concrete classes match the schema the kernel was | ||
| * compiled against | ||
| * @param output Arrow output vector; caller allocates to the expression's {@code dataType} | ||
| * @param numRows number of rows in this batch | ||
| */ | ||
| public abstract void process(ValueVector[] inputs, FieldVector output, int numRows); | ||
|
|
||
| /** | ||
| * Run partition-dependent initialization. The generated subclass overrides this to execute | ||
| * statements collected via {@code CodegenContext.addPartitionInitializationStatement}, e.g. | ||
| * reseeding {@code Rand}'s {@code XORShiftRandom} from {@code seed + partitionIndex}. | ||
| * Deterministic expressions leave this as a no-op. | ||
| * | ||
| * <p>The caller invokes this before the first {@code process} call of each partition. The | ||
| * generated subclass is not thread-safe across concurrent {@code process} calls; the dispatcher | ||
| * allocates one per partition and serializes calls. | ||
| */ | ||
| public void init(int partitionIndex) {} | ||
|
mbutrovich marked this conversation as resolved.
|
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.