diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 422232f546..bbd7c61486 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -351,6 +351,7 @@ jobs: org.apache.comet.CometMapExpressionSuite org.apache.comet.CometCsvExpressionSuite org.apache.comet.CometJsonExpressionSuite + org.apache.comet.CometJsonJvmSuite org.apache.comet.SparkErrorConverterSuite org.apache.comet.expressions.conditional.CometIfSuite org.apache.comet.expressions.conditional.CometCoalesceSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index d0a03eeb75..41285dc8fd 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -193,6 +193,7 @@ jobs: org.apache.comet.CometMapExpressionSuite org.apache.comet.CometCsvExpressionSuite org.apache.comet.CometJsonExpressionSuite + org.apache.comet.CometJsonJvmSuite org.apache.comet.SparkErrorConverterSuite org.apache.comet.expressions.conditional.CometIfSuite org.apache.comet.expressions.conditional.CometCoalesceSuite diff --git a/docs/source/user-guide/latest/compatibility/index.md b/docs/source/user-guide/latest/compatibility/index.md index 542d6902ec..46578d1683 100644 --- a/docs/source/user-guide/latest/compatibility/index.md +++ b/docs/source/user-guide/latest/compatibility/index.md @@ -28,4 +28,5 @@ This guide documents areas where Comet's behavior is known to differ from Spark. - **Regular expressions**: differences between the Rust regexp crate and Java's regex engine. - **Operators**: operator-level compatibility notes, including window functions and round-robin partitioning. - **Expressions**: per-expression compatibility notes, including cast. +- **JSON**: choosing between the native and Spark-compatible engines for JSON expressions. - **Spark versions**: version-specific known issues and limitations. diff --git a/docs/source/user-guide/latest/compatibility/json.md b/docs/source/user-guide/latest/compatibility/json.md new file mode 100644 index 0000000000..5856046211 --- /dev/null +++ b/docs/source/user-guide/latest/compatibility/json.md @@ -0,0 +1,53 @@ + + +# JSON Compatibility + +Comet supports two engines for evaluating JSON expressions (`get_json_object`, +`from_json`, `to_json`), selected by the `spark.comet.exec.json.engine` +configuration entry: + +- `java` (default): routes evaluation through Comet's Arrow-direct codegen + dispatcher so Spark's own `doGenCode` for the expression runs inside the Comet + pipeline. Byte-exact compatibility with Spark, at the cost of a JNI roundtrip + per batch. This rides the codegen dispatcher + (`spark.comet.exec.scalaUDF.codegen.enabled`, enabled by default). If the + dispatcher is disabled, the operator falls back to Spark. +- `rust`: native DataFusion implementation. Faster, but has known compatibility + gaps with Spark on certain inputs. An expression or input case with no native + implementation falls back to the `java` engine. + +## Expression coverage + +| SQL | `rust` engine | `java` engine | +| ----------------- | ---------------------------------------------------------------------------- | ------------- | +| `get_json_object` | Supported, with gaps on single-quoted JSON and unescaped control characters | Compatible | +| `from_json` | Supported with restrictions (PERMISSIVE mode only, simple schema types only) | Compatible | +| `to_json` | Supported for struct inputs only, no options | Compatible | + +When the `rust` engine is selected but an expression or input case has no native +implementation (for example `to_json` with map or array inputs, or `from_json` +with an unsupported schema), Comet falls back to the `java` engine for that case. + +## When to use the `rust` engine + +- You want the faster native path and your inputs avoid the known compatibility + gaps above. +- Enable it with `spark.comet.exec.json.engine=rust`. Cases the native path does + not cover still fall back to the `java` engine. diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 0823e79fed..ba9543bb1c 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -93,9 +93,11 @@ of expressions that be disabled. ## JSON Functions -| Expression | -| ------------- | -| GetJsonObject | +| Expression | SQL | +| ------------- | ----------------- | +| GetJsonObject | `get_json_object` | +| JsonToStructs | `from_json` | +| StructsToJson | `to_json` | ## Date/Time Functions diff --git a/docs/source/user-guide/latest/index.rst b/docs/source/user-guide/latest/index.rst index 00f770c27e..621fb9d73e 100644 --- a/docs/source/user-guide/latest/index.rst +++ b/docs/source/user-guide/latest/index.rst @@ -62,6 +62,7 @@ to read more. compatibility/regex compatibility/operators compatibility/expressions/index + compatibility/json compatibility/spark-versions .. toctree:: diff --git a/spark/src/main/scala/org/apache/comet/CometConf.scala b/spark/src/main/scala/org/apache/comet/CometConf.scala index 78ea0f0168..f2aca848f4 100644 --- a/spark/src/main/scala/org/apache/comet/CometConf.scala +++ b/spark/src/main/scala/org/apache/comet/CometConf.scala @@ -362,6 +362,25 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val JSON_ENGINE_RUST = "rust" + val JSON_ENGINE_JAVA = "java" + + val COMET_JSON_ENGINE: ConfigEntry[String] = + conf("spark.comet.exec.json.engine") + .category(CATEGORY_EXEC) + .doc("Selects the engine used to evaluate supported JSON expressions " + + "(get_json_object, from_json, to_json). " + + s"`$JSON_ENGINE_JAVA` (default) routes through the codegen dispatcher, which runs " + + "Spark's own expression code inside the Comet pipeline for byte-exact compatibility, " + + "at the cost of JNI roundtrips per batch. " + + s"`$JSON_ENGINE_RUST` uses the native DataFusion JSON implementation, which is faster " + + "but has known compatibility gaps; an expression or case with no native " + + s"implementation falls back to the `$JSON_ENGINE_JAVA` engine.") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set(JSON_ENGINE_RUST, JSON_ENGINE_JAVA)) + .createWithDefault(JSON_ENGINE_JAVA) + val COMET_SCALA_UDF_CODEGEN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.scalaUDF.codegen.enabled") .category(CATEGORY_EXEC) diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala index 23d170743e..8ebd1a5cc8 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala @@ -23,7 +23,7 @@ import org.apache.arrow.vector._ import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector} import org.apache.arrow.vector.types.pojo.Field import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, HigherOrderFunction, LambdaFunction, Literal, NamedLambdaVariable, Unevaluable} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, Literal, Unevaluable} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -107,9 +107,8 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { * back cleanly rather than crashing the Janino compile at execute time. * * Checks every `BoundReference`'s data type and the root `expr.dataType` against - * [[isSupportedDataType]], rejects aggregates / generators / `CodegenFallback` (other than - * HOFs, which are admitted), and gates total nested-field count on - * `spark.sql.codegen.maxFields`. + * [[isSupportedDataType]], rejects aggregates / generators / `Unevaluable`, and gates total + * nested-field count on `spark.sql.codegen.maxFields`. */ def canHandle(boundExpr: Expression): Option[String] = { if (!isSupportedDataType(boundExpr.dataType)) { @@ -127,12 +126,15 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { s"codegen dispatch: too many nested fields ($totalFields > " + s"spark.sql.codegen.maxFields=$maxFields)") } - // HOFs are `CodegenFallback` but admitted: `CodegenFallback.doGenCode` emits one - // `((Expression) references[N]).eval(row)` call site per HOF. The kernel dispatches to the - // HOF's interpreted `eval`, which mutates `NamedLambdaVariable.value` per element and reads - // the input array through the kernel's typed Arrow getters. Per-task `boundExpr` isolation - // in `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on the - // lambda variable's `AtomicReference`. See `CometCodegenHOFSuite`. + // `CodegenFallback` expressions are admitted. `CodegenFallback.doGenCode` emits one + // `((Expression) references[N]).eval(row)` call site per expression. The kernel dispatches + // to the expression's interpreted `eval` against `row` aliased to `this`, so the eval reads + // through the kernel's typed Arrow getters. This covers `HigherOrderFunction` (which mutates + // `NamedLambdaVariable.value` per element; see `CometCodegenHOFSuite`) as well as other + // CodegenFallback expressions like `JsonToStructs` / `StructsToJson` whose `eval(row)` + // simply calls `row.get(0, dataType)`. Per-task `boundExpr` isolation in + // `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on shared + // state inside the expression. // // Nondeterministic / stateful expressions are accepted: each cache entry holds one kernel // instance with a single `init(partitionIndex)` call, so `Rand` / `MonotonicallyIncreasingID` @@ -150,10 +152,6 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { boundExpr.find { case _: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction => true case _: org.apache.spark.sql.catalyst.expressions.Generator => true - case _: HigherOrderFunction => false - case _: LambdaFunction => false - case _: NamedLambdaVariable => false - case _: CodegenFallback => true case u: Unevaluable if isCodegenInertUnevaluable(u) => false case _: Unevaluable => true case _ => false @@ -161,7 +159,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { case Some(bad) => return Some( s"codegen dispatch: expression ${bad.getClass.getSimpleName} not supported " + - "(aggregate, generator, codegen-fallback, or unevaluable)") + "(aggregate, generator, or unevaluable)") case None => } val badRef = boundExpr.collectFirst { diff --git a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala index a1d5be84ff..3df275f057 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala @@ -20,7 +20,7 @@ package org.apache.comet.serde import org.apache.spark.SparkEnv -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, RuntimeReplaceable, ScalaUDF} import org.apache.spark.sql.types.BinaryType import org.apache.comet.CometConf @@ -78,10 +78,20 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] { return None } + // `RuntimeReplaceable` expressions (e.g. Spark 4's `StructsToJson`) have a `doGenCode` that + // always throws "Cannot generate code for expression". Catalyst's `ReplaceExpressions` rule + // normally rewrites them to their `replacement` form before codegen runs. Comet's serde + // sometimes works with the pre-rewrite form (via shim reconstruction) for matching purposes, + // so unwrap to the replacement here before binding so the kernel compiles. + val target = expr match { + case rr: RuntimeReplaceable => rr.replacement + case other => other + } + // Bind against only the AttributeReferences the tree actually reads, so ordinals align with // the data args we ship. - val attrs = expr.collect { case a: AttributeReference => a }.distinct - val boundExpr = BindReferences.bindReference(expr, AttributeSeq(attrs)) + val attrs = target.collect { case a: AttributeReference => a }.distinct + val boundExpr = BindReferences.bindReference(target, AttributeSeq(attrs)) // Gate at plan time. Surface the reason via withFallbackReason rather than crashing Janino // at execute. diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index f2f10d5f1c..ff289fb2d2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -442,7 +442,13 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { } } -object CometGetJsonObject extends CometExpressionSerde[GetJsonObject] { +/** + * `get_json_object` runs on the native (rust) engine when selected, where it is incompatible with + * Spark for single-quoted JSON and unescaped control characters. Otherwise it rides the codegen + * dispatcher (the default `java` engine) via [[CometCodegenDispatch]], which runs Spark's own + * implementation for byte-exact results. + */ +object CometGetJsonObject extends CometCodegenDispatch[GetJsonObject] { private val incompatReason = "Spark allows single-quoted JSON and unescaped control characters which Comet does not" + @@ -451,22 +457,26 @@ object CometGetJsonObject extends CometExpressionSerde[GetJsonObject] { override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) override def getSupportLevel(expr: GetJsonObject): SupportLevel = - Incompatible(Some(incompatReason)) + if (JsonEngine.nativeSelected) Incompatible(Some(incompatReason)) + else super.getSupportLevel(expr) override def convert( expr: GetJsonObject, inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - val jsonExpr = exprToProtoInternal(expr.json, inputs, binding) - val pathExpr = exprToProtoInternal(expr.path, inputs, binding) - val optExpr = scalarFunctionExprToProtoWithReturnType( - "get_json_object", - expr.dataType, - false, - jsonExpr, - pathExpr) - optExprWithFallbackReason(optExpr, expr, expr.json, expr.path) - } + binding: Boolean): Option[Expr] = + if (JsonEngine.nativeSelected) { + val jsonExpr = exprToProtoInternal(expr.json, inputs, binding) + val pathExpr = exprToProtoInternal(expr.path, inputs, binding) + val optExpr = scalarFunctionExprToProtoWithReturnType( + "get_json_object", + expr.dataType, + false, + jsonExpr, + pathExpr) + optExprWithFallbackReason(optExpr, expr, expr.json, expr.path) + } else { + super.convert(expr, inputs, binding) + } } trait CommonStringExprs { diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index 3c7dc17844..9cf88f23d7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -26,10 +26,21 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withFallbackReason import org.apache.comet.DataTypeSupport import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, serializeDataType} +/** + * Whether the native (rust) JSON engine is selected. When it is not, or when a JSON expression + * has no native implementation for a given case, the serde falls through to the codegen + * dispatcher (the default `java` engine) via [[CometCodegenDispatch]]. + */ +private[serde] object JsonEngine { + def nativeSelected: Boolean = + CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_RUST +} + object CometCreateNamedStruct extends CometExpressionSerde[CreateNamedStruct] { private val duplicateNamesReason = @@ -113,42 +124,46 @@ object CometGetArrayStructFields extends CometExpressionSerde[GetArrayStructFiel } } -object CometStructsToJson extends CometExpressionSerde[StructsToJson] { +/** + * `to_json` runs on the native (rust) engine when selected and the child is a struct of supported + * types with no options. Any other case (unsupported types, options, or the default `java` + * engine) falls through to the codegen dispatcher via [[CometCodegenDispatch]], which runs + * Spark's own implementation. + */ +object CometStructsToJson extends CometCodegenDispatch[StructsToJson] { - override def getSupportLevel(expr: StructsToJson): SupportLevel = { - if (expr.options.nonEmpty) { - return Unsupported(Some("StructsToJson with options is not supported")) - } - val dataType = expr.child.dataType - if (!isSupportedType(dataType)) { - return Unsupported(Some(s"Struct type: $dataType contains unsupported types")) - } - Compatible() - } + private def nativeSupported(expr: StructsToJson): Boolean = + JsonEngine.nativeSelected && expr.options.isEmpty && isSupportedType(expr.child.dataType) + + override def getSupportLevel(expr: StructsToJson): SupportLevel = + if (nativeSupported(expr)) Compatible() else super.getSupportLevel(expr) override def convert( expr: StructsToJson, inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { - val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields - exprToProtoInternal(expr.child, inputs, binding) match { - case Some(p) => - val toJson = ExprOuterClass.ToJson - .newBuilder() - .setChild(p) - .setTimezone(expr.timeZoneId.getOrElse("UTC")) - .setIgnoreNullFields(ignoreNullFields) - .build() - Some( - ExprOuterClass.Expr + binding: Boolean): Option[ExprOuterClass.Expr] = + if (nativeSupported(expr)) { + val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields + exprToProtoInternal(expr.child, inputs, binding) match { + case Some(p) => + val toJson = ExprOuterClass.ToJson .newBuilder() - .setToJson(toJson) - .build()) - case _ => - withFallbackReason(expr, expr.child) - None + .setChild(p) + .setTimezone(expr.timeZoneId.getOrElse("UTC")) + .setIgnoreNullFields(ignoreNullFields) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setToJson(toJson) + .build()) + case _ => + withFallbackReason(expr, expr.child) + None + } + } else { + super.convert(expr, inputs, binding) } - } def isSupportedType(dt: DataType): Boolean = { dt match { @@ -170,17 +185,27 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] { } } -object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { +/** + * `from_json` runs on the native (rust) engine when selected and the target schema is one the + * native path supports, where it is incompatible (partially implemented). Any other case (a + * schema the native path does not support, or the default `java` engine) falls through to the + * codegen dispatcher via [[CometCodegenDispatch]], which runs Spark's own implementation. + */ +object CometJsonToStructs extends CometCodegenDispatch[JsonToStructs] { override def getIncompatibleReasons(): Seq[String] = Seq( - "Partially implemented and not comprehensively tested") + "The native (rust) engine is partially implemented and not comprehensively tested") - override def getUnsupportedReasons(): Seq[String] = Seq("Requires an explicit schema") + private def nativeSupported(expr: JsonToStructs): Boolean = + JsonEngine.nativeSelected && expr.schema != null && isSupportedSchema(expr.schema) - override def getSupportLevel(expr: JsonToStructs): SupportLevel = { - // this feature is partially implemented and not comprehensively tested yet - Incompatible() - } + override def getSupportLevel(expr: JsonToStructs): SupportLevel = + if (nativeSupported(expr)) { + // the native path is partially implemented and not comprehensively tested yet + Incompatible() + } else { + super.getSupportLevel(expr) + } override def convert( expr: JsonToStructs, @@ -192,21 +217,8 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { return None } - def isSupportedType(dt: DataType): Boolean = { - dt match { - case StructType(fields) => - fields.nonEmpty && fields.forall(f => isSupportedType(f.dataType)) - case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | - DataTypes.DoubleType | DataTypes.BooleanType | DataTypes.StringType => - true - case _ => false - } - } - - val schemaType = expr.schema - if (!isSupportedType(schemaType)) { - withFallbackReason(expr, "from_json: Unsupported schema type") - return None + if (!nativeSupported(expr)) { + return super.convert(expr, inputs, binding) } val options = expr.options @@ -228,7 +240,7 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { // Convert child expression and schema to protobuf for { childProto <- exprToProtoInternal(expr.child, inputs, binding) - schemaProto <- serializeDataType(schemaType) + schemaProto <- serializeDataType(expr.schema) } yield { val fromJson = ExprOuterClass.FromJson .newBuilder() @@ -239,6 +251,15 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { ExprOuterClass.Expr.newBuilder().setFromJson(fromJson).build() } } + + private def isSupportedSchema(dt: DataType): Boolean = dt match { + case StructType(fields) => + fields.nonEmpty && fields.forall(f => isSupportedSchema(f.dataType)) + case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType | + DataTypes.BooleanType | DataTypes.StringType => + true + case _ => false + } } object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] { diff --git a/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql b/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql index edbebbdb77..45d552057c 100644 --- a/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql +++ b/spark/src/test/resources/sql-tests/expressions/string/get_json_object.sql @@ -16,6 +16,7 @@ -- under the License. -- Config: spark.comet.expression.GetJsonObject.allowIncompatible=true +-- Config: spark.comet.exec.json.engine=rust -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql b/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql index 3cfd636d25..6896881343 100644 --- a/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql +++ b/spark/src/test/resources/sql-tests/expressions/struct/structs_to_json.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.exec.json.engine=rust -- ConfigMatrix: spark.sql.jsonGenerator.ignoreNullFields=false,true statement @@ -30,8 +31,10 @@ SELECT to_json(named_struct('a', a, 'b', b, 'f', f, 'd', d)) FROM test_to_json query SELECT to_json(named_struct('a', 1, 'b', 'hello')) -query expect_fallback(StructsToJson with options is not supported) +-- to_json with options and with array fields are not supported by the native (rust) path, so they +-- route to the codegen (java) engine, which runs Spark's own implementation inside Comet. +query SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 'dd/MM/yyyy')) FROM test_to_json -query expect_fallback(Struct type: StructType(StructField(a,IntegerType,true),StructField(b,ArrayType(StringType,true),false)) contains unsupported types) +query SELECT to_json(named_struct('a', a, 'b', array(b))) FROM test_to_json diff --git a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala index e8f59e8b22..b58fd00a04 100644 --- a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala @@ -166,13 +166,16 @@ class CometCodegenSourceSuite extends AnyFunSuite { s"got:\n$src") } - test("canHandle rejects CodegenFallback expressions") { + test("canHandle accepts CodegenFallback expressions (delegates to eval(row))") { + // CodegenFallback.doGenCode emits ((Expression) references[N]).eval(row) which is the same + // mechanism that backs HigherOrderFunction support: the eval reads through the kernel's typed + // Arrow getters via the row alias. Other CodegenFallback expressions (JsonToStructs, + // StructsToJson, ...) ride the same path. val expr = FakeCodegenFallback(BoundReference(0, StringType, nullable = true)) val reason = CometBatchKernelCodegen.canHandle(expr) - assert(reason.isDefined, "expected canHandle to reject CodegenFallback") assert( - reason.get.contains("FakeCodegenFallback"), - s"expected reason to name the rejected expression class; got: ${reason.get}") + reason.isEmpty, + s"expected canHandle to accept CodegenFallback; got rejection: ${reason.getOrElse("")}") } test("canHandle accepts Nondeterministic expressions (per-partition kernel handles state)") { diff --git a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala index c3446f6264..b4ba5e6da8 100644 --- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala @@ -39,6 +39,8 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe pos: Position): Unit = { super.test(testName, testTags: _*) { withSQLConf( + // This suite exercises the native (rust) JSON path. The default engine is now `java`. + CometConf.COMET_JSON_ENGINE.key -> CometConf.JSON_ENGINE_RUST, CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true", CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") { testFun @@ -70,16 +72,17 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe } } - test("to_json - fallback reasons") { + test("to_json - options and unsupported native types route to the codegen engine") { withTable("t") { sql("CREATE TABLE t(a INT, b STRING) USING parquet") sql("INSERT INTO t VALUES (1, 'hello')") - checkSparkAnswerAndFallbackReason( - "SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 'dd/MM/yyyy')) FROM t", - "StructsToJson with options is not supported") - checkSparkAnswerAndFallbackReason( - "SELECT to_json(named_struct('b', array(b))) FROM t", - "Struct type: StructType(StructField(b,ArrayType(StringType,true),false)) contains unsupported types") + // The native (rust) path does not support to_json with options or array/map types. With the + // rust engine selected, these cases fall back to the codegen (java) engine, which runs + // Spark's own implementation inside the Comet pipeline rather than falling back to Spark. + checkSparkAnswerAndOperator( + "SELECT to_json(named_struct('a', a, 'b', b), map('timestampFormat', 'dd/MM/yyyy')) FROM t") + checkSparkAnswerAndOperator( + "SELECT to_json(named_struct('b', array(b))) FROM t") } } diff --git a/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala new file mode 100644 index 0000000000..2e1242660d --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.SparkConf +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +class CometJsonJvmSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + override protected def sparkConf: SparkConf = + super.sparkConf + .set(CometConf.COMET_JSON_ENGINE.key, CometConf.JSON_ENGINE_JAVA) + .set(CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key, "true") + + private val rows = Seq( + """{"a":1,"b":"x","arr":[1,2,3]}""", + """{"a":2,"b":"y","arr":[]}""", + """{"a":null,"b":"z","arr":[10]}""", + null, + """not json""") + + private def withJsonTable(f: => Unit): Unit = { + withTable("t") { + sql("CREATE TABLE t (j STRING) USING parquet") + val sqlRows = rows + .map(v => if (v == null) "(NULL)" else s"('${v.replace("'", "''")}')") + .mkString(", ") + sql(s"INSERT INTO t VALUES $sqlRows") + f + } + } + + test("get_json_object via JVM engine") { + withJsonTable { + checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.a') FROM t")) + checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.b') FROM t")) + } + } + + test("from_json with explicit schema via JVM engine") { + withJsonTable { + checkSparkAnswerAndOperator(sql("SELECT from_json(j, 'a INT, b STRING') FROM t")) + } + } + + test("to_json round-trip via JVM engine") { + withJsonTable { + checkSparkAnswerAndOperator(sql("SELECT to_json(from_json(j, 'a INT, b STRING')) FROM t")) + } + } +}