1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -403,6 +403,7 @@ jobs:
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometCsvExpressionSuite
org.apache.comet.CometJsonExpressionSuite
+org.apache.comet.CometJsonJvmSuite
org.apache.comet.SparkErrorConverterSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -243,6 +243,7 @@ jobs:
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometCsvExpressionSuite
org.apache.comet.CometJsonExpressionSuite
+org.apache.comet.CometJsonJvmSuite
org.apache.comet.SparkErrorConverterSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
45 changes: 45 additions & 0 deletions docs/source/user-guide/latest/compatibility/json.md
@@ -0,0 +1,45 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# JSON Compatibility

Comet supports two engines for evaluating JSON expressions, selected by the
`spark.comet.exec.json.engine` configuration entry:

- `rust` (default): native DataFusion implementation. Fast, but has known
compatibility gaps with Spark on certain inputs.
- `java` (experimental): routes evaluation through Comet's Arrow-direct codegen
dispatcher so Spark's own `doGenCode` for the expression runs inside the Comet
pipeline. Byte-exact compatibility, at the cost of a JNI roundtrip per batch.
Requires `spark.comet.exec.scalaUDF.codegen.enabled=true`; otherwise the
operator falls back to Spark.

## Expression coverage

| SQL | `rust` engine | `java` engine |
| ----------------- | ---------------------------------------------------------------------------- | ------------- |
| `get_json_object` | Supported, with gaps on single-quoted JSON and unescaped control characters | Compatible |
| `from_json` | Supported with restrictions (PERMISSIVE mode only, simple schema types only) | Compatible |
| `to_json` | Supported for struct inputs only, no options | Compatible |

## When to use the `java` engine

- You hit a compatibility gap in the `rust` engine and need exact Spark output.
- You can absorb the JNI overhead, which is typically negligible relative to JSON parse
cost; verify this with your own benchmarks.
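
As a minimal sketch of the configuration described above, the snippet below collects the two settings this page names and renders them as `--conf key=value` arguments for `spark-submit` or `spark-shell`. The object and method names (`JavaJsonEngineConf`, `asSubmitArgs`) are illustrative assumptions and are not part of Comet.

```scala
// Illustrative only: `JavaJsonEngineConf` and `asSubmitArgs` are hypothetical names.
object JavaJsonEngineConf {
  // Both keys and values are taken from the text above.
  val settings: Seq[(String, String)] = Seq(
    "spark.comet.exec.json.engine" -> "java",
    "spark.comet.exec.scalaUDF.codegen.enabled" -> "true")

  // Render each setting as a `--conf key=value` pair of command-line arguments.
  def asSubmitArgs: Seq[String] =
    settings.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

Joining the result with spaces (`JavaJsonEngineConf.asSubmitArgs.mkString(" ")`) gives the flags to append to a launch command. Setting only the first key would, per the text above, make the operator fall back to Spark.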
8 changes: 5 additions & 3 deletions docs/source/user-guide/latest/expressions.md
@@ -93,9 +93,11 @@ of expressions that be disabled.

## JSON Functions

-| Expression |
-| ------------- |
-| GetJsonObject |
+| Expression | SQL |
+| ------------- | ----------------- |
+| GetJsonObject | `get_json_object` |
+| JsonToStructs | `from_json` |
+| StructsToJson | `to_json` |

## Date/Time Functions

18 changes: 18 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometConf.scala
@@ -362,6 +362,24 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

+val JSON_ENGINE_RUST = "rust"
+val JSON_ENGINE_JAVA = "java"
+
+val COMET_JSON_ENGINE: ConfigEntry[String] =
+conf("spark.comet.exec.json.engine")
+.category(CATEGORY_EXEC)
+.doc(
+"Selects the engine used to evaluate supported JSON expressions. " +
+s"`$JSON_ENGINE_RUST` uses the native DataFusion JSON implementation. " +
+s"`$JSON_ENGINE_JAVA` is experimental and routes through a JVM-side UDF " +
+"that delegates to Spark's expression classes for byte-exact compatibility, " +
+"at the cost of JNI roundtrips per batch. Expressions routed when set to java: " +
+"get_json_object, from_json, to_json.")
+.stringConf
+.transform(_.toLowerCase(Locale.ROOT))
+.checkValues(Set(JSON_ENGINE_RUST, JSON_ENGINE_JAVA))
+.createWithDefault(JSON_ENGINE_RUST)
+
val COMET_SCALA_UDF_CODEGEN_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.scalaUDF.codegen.enabled")
.category(CATEGORY_EXEC)
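
Note on the `COMET_JSON_ENGINE` entry: the chain lower-cases the value and then checks it against the two allowed engine names. The sketch below shows that behaviour in plain Scala, assuming the transform runs before the check, as the chaining order suggests. `JsonEngineValue` and `normalize` are hypothetical names for illustration and do not use Comet's config builder.

```scala
import java.util.Locale

// Illustrative only: mirrors the transform-then-check order of the config entry,
// without depending on Comet's ConfigEntry machinery.
object JsonEngineValue {
  val Rust = "rust"
  val Java = "java"
  private val allowed: Set[String] = Set(Rust, Java)

  // Lower-case with a fixed locale first, then validate against the allowed set.
  def normalize(raw: String): Either[String, String] = {
    val value = raw.toLowerCase(Locale.ROOT)
    if (allowed.contains(value)) Right(value)
    else Left(s"unsupported JSON engine '$raw'; expected one of: " +
      allowed.toSeq.sorted.mkString(", "))
  }
}
```

Under this reading, `JAVA` and `Rust` are accepted and normalized, while any other value is rejected when the configuration is read.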
@@ -23,7 +23,7 @@ import org.apache.arrow.vector._
import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
import org.apache.arrow.vector.types.pojo.Field
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, HigherOrderFunction, LambdaFunction, Literal, NamedLambdaVariable, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, Literal, Unevaluable}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -107,9 +107,8 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
* rather than crashing the Janino compile at execute time.
*
* Checks every `BoundReference`'s data type and the root `expr.dataType` against
-* [[isSupportedDataType]], rejects aggregates / generators / `CodegenFallback` (other than
-* HOFs, which are admitted), and gates total nested-field count on
-* `spark.sql.codegen.maxFields`.
+* [[isSupportedDataType]], rejects aggregates / generators / `Unevaluable`, and gates total
+* nested-field count on `spark.sql.codegen.maxFields`.
*/
def canHandle(boundExpr: Expression): Option[String] = {
if (!isSupportedDataType(boundExpr.dataType)) {
@@ -127,12 +126,15 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
s"codegen dispatch: too many nested fields ($totalFields > " +
s"spark.sql.codegen.maxFields=$maxFields)")
}
-// HOFs are `CodegenFallback` but admitted: `CodegenFallback.doGenCode` emits one
-// `((Expression) references[N]).eval(row)` call site per HOF. The kernel dispatches to the
-// HOF's interpreted `eval`, which mutates `NamedLambdaVariable.value` per element and reads
-// the input array through the kernel's typed Arrow getters. Per-task `boundExpr` isolation
-// in `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on the
-// lambda variable's `AtomicReference`. See `CometCodegenHOFSuite`.
+// `CodegenFallback` expressions are admitted. `CodegenFallback.doGenCode` emits one
+// `((Expression) references[N]).eval(row)` call site per expression. The kernel dispatches
+// to the expression's interpreted `eval` against `row` aliased to `this`, so the eval reads
+// through the kernel's typed Arrow getters. This covers `HigherOrderFunction` (which mutates
+// `NamedLambdaVariable.value` per element; see `CometCodegenHOFSuite`) as well as other
+// CodegenFallback expressions like `JsonToStructs` / `StructsToJson` whose `eval(row)`
+// simply calls `row.get(0, dataType)`. Per-task `boundExpr` isolation in
+// `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on shared
+// state inside the expression.
//
// Nondeterministic / stateful expressions are accepted: each cache entry holds one kernel
// instance with a single `init(partitionIndex)` call, so `Rand` / `MonotonicallyIncreasingID`
@@ -150,18 +152,14 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
boundExpr.find {
case _: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction => true
case _: org.apache.spark.sql.catalyst.expressions.Generator => true
-case _: HigherOrderFunction => false
-case _: LambdaFunction => false
-case _: NamedLambdaVariable => false
-case _: CodegenFallback => true
case u: Unevaluable if isCodegenInertUnevaluable(u) => false
case _: Unevaluable => true
case _ => false
} match {
case Some(bad) =>
return Some(
s"codegen dispatch: expression ${bad.getClass.getSimpleName} not supported " +
-"(aggregate, generator, codegen-fallback, or unevaluable)")
+"(aggregate, generator, or unevaluable)")
case None =>
}
val badRef = boundExpr.collectFirst {
16 changes: 13 additions & 3 deletions spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala
@@ -20,7 +20,7 @@
package org.apache.comet.serde

import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, RuntimeReplaceable, ScalaUDF}
import org.apache.spark.sql.types.BinaryType

import org.apache.comet.CometConf
@@ -77,10 +77,20 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] {
return None
}

+// `RuntimeReplaceable` expressions (e.g. Spark 4's `StructsToJson`) have a `doGenCode` that
+// always throws "Cannot generate code for expression". Catalyst's `ReplaceExpressions` rule
+// normally rewrites them to their `replacement` form before codegen runs. Comet's serde
+// sometimes works with the pre-rewrite form (via shim reconstruction) for matching purposes,
+// so unwrap to the replacement here before binding so the kernel compiles.
+val target = expr match {
+case rr: RuntimeReplaceable => rr.replacement
+case other => other
+}
+
// Bind against only the AttributeReferences the tree actually reads, so ordinals align with
// the data args we ship.
-val attrs = expr.collect { case a: AttributeReference => a }.distinct
-val boundExpr = BindReferences.bindReference(expr, AttributeSeq(attrs))
+val attrs = target.collect { case a: AttributeReference => a }.distinct
+val boundExpr = BindReferences.bindReference(target, AttributeSeq(attrs))

// Gate at plan time. Surface the reason via withInfo rather than crashing Janino at execute.
CometBatchKernelCodegen.canHandle(boundExpr) match {
48 changes: 30 additions & 18 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -445,28 +445,40 @@ object CometGetJsonObject extends CometExpressionSerde[GetJsonObject] {

override def getIncompatibleReasons(): Seq[String] = Seq(
"Spark allows single-quoted JSON and unescaped control characters which Comet does not" +
-" support")
-
-override def getSupportLevel(expr: GetJsonObject): SupportLevel =
-Incompatible(
-Some(
-"Spark allows single-quoted JSON and unescaped control characters " +
-"which Comet does not support"))
+" support when using engine=rust")
+
+override def getSupportLevel(expr: GetJsonObject): SupportLevel = {
+JsonRoute.choose("get_json_object") match {
+case JsonRoute.Native =>
+Incompatible(
+Some(
+"Spark allows single-quoted JSON and unescaped control characters " +
+"which Comet does not support"))
+case JsonRoute.JvmCodegen => Compatible(None)
+case JsonRoute.Fallback(reason) => Unsupported(Some(reason))
+}
+}

override def convert(
expr: GetJsonObject,
inputs: Seq[Attribute],
-binding: Boolean): Option[Expr] = {
-val jsonExpr = exprToProtoInternal(expr.json, inputs, binding)
-val pathExpr = exprToProtoInternal(expr.path, inputs, binding)
-val optExpr = scalarFunctionExprToProtoWithReturnType(
-"get_json_object",
-expr.dataType,
-false,
-jsonExpr,
-pathExpr)
-optExprWithInfo(optExpr, expr, expr.json, expr.path)
-}
+binding: Boolean): Option[Expr] =
+JsonRoute.choose("get_json_object") match {
+case JsonRoute.Native =>
+val jsonExpr = exprToProtoInternal(expr.json, inputs, binding)
+val pathExpr = exprToProtoInternal(expr.path, inputs, binding)
+val optExpr = scalarFunctionExprToProtoWithReturnType(
+"get_json_object",
+expr.dataType,
+false,
+jsonExpr,
+pathExpr)
+optExprWithInfo(optExpr, expr, expr.json, expr.path)
+case JsonRoute.JvmCodegen => CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
+case JsonRoute.Fallback(reason) =>
+withInfo(expr, reason)
+None
+}
}

trait CommonStringExprs {
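
Note on `JsonRoute`: its definition is not part of the hunks shown here. The sketch below is one plausible shape for the three-way decision, assuming (from the `json.md` text in this PR) that the `java` engine requires `spark.comet.exec.scalaUDF.codegen.enabled=true` and otherwise falls back to Spark. The object name `JsonRouteSketch`, the explicit `engine` / `codegenEnabled` parameters, and the reason strings are hypothetical; the PR's `JsonRoute.choose` takes only the function name.

```scala
// Illustrative only: not the PR's JsonRoute. Inputs are passed explicitly here
// instead of being read from the Comet configuration.
object JsonRouteSketch {
  sealed trait Route
  case object Native extends Route
  case object JvmCodegen extends Route
  final case class Fallback(reason: String) extends Route

  def choose(fn: String, engine: String, codegenEnabled: Boolean): Route =
    engine match {
      // Default engine: evaluate natively.
      case "rust" => Native
      // Experimental engine: only usable when the codegen dispatcher is enabled.
      case "java" if codegenEnabled => JvmCodegen
      case "java" =>
        Fallback(s"$fn: engine=java requires spark.comet.exec.scalaUDF.codegen.enabled=true")
      // Defensive branch; a validated config value should never reach it.
      case other => Fallback(s"$fn: unknown JSON engine '$other'")
    }
}
```

Modelling the fallback as a case class carrying a reason matches how the diff uses it: `getSupportLevel` wraps the reason in `Unsupported`, and `convert` records it via `withInfo` before returning `None`.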