From 8a240d3e11ecce35b476af38bda0dbf0a9ae0932 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 May 2026 11:21:17 -0600 Subject: [PATCH 1/4] feat: add JVM UDF engine for Spark JSON expressions Add `spark.comet.exec.json.engine` (default `rust`, experimental `java`) that routes the JSON expressions in scope through the JVM UDF framework introduced in #4232, delegating to Spark's own expression classes for byte-exact compatibility at the cost of JNI roundtrips per batch. Expressions in scope when `engine=java`: - `get_json_object` -> `GetJsonObjectUDF` - `from_json` -> `FromJsonUDF` - `to_json` -> `ToJsonUDF` A fresh Spark expression is built per `evaluate` call. Spark's JSON evaluators (`GetJsonObjectEvaluator`, `StructsToJsonEvaluator`, `JsonToStructsEvaluator`) hold mutable per-row state, and the JVM UDF framework shares one UDF instance across native worker threads, so a cached cross-thread expression races on its evaluator state. `from_json` / `to_json` use a serde-side `CometLambdaRegistry` to pass the configured Spark expression (schema, options, timezone) to the UDF. The serde rebinds the child to `BoundReference(0)` so the UDF can call `eval(row)` against a single-column wrapper row. `json_array_length` and `json_object_keys` are out of scope: both are `RuntimeReplaceable` in Spark 4.x and Catalyst's `ReplaceExpressions` rule rewrites them to `StaticInvoke` before Comet sees the plan, so `classOf[LengthOfJsonArray]` / `classOf[JsonObjectKeys]` serde registrations never match. Adding support requires recognizing the rewritten `StaticInvoke` form in Comet's serde dispatch. This PR was scaffolded with the project's brainstorming, writing-plans, and subagent-driven-development skills. 
--- .../scala/org/apache/comet/CometConf.scala | 18 ++ .../org/apache/comet/udf/FromJsonUDF.scala | 207 ++++++++++++++++++ .../apache/comet/udf/GetJsonObjectUDF.scala | 84 +++++++ .../org/apache/comet/udf/ToJsonUDF.scala | 130 +++++++++++ .../user-guide/latest/compatibility/json.md | 43 ++++ docs/source/user-guide/latest/expressions.md | 8 +- .../org/apache/comet/serde/strings.scala | 80 +++++-- .../org/apache/comet/serde/structs.scala | 203 ++++++++++++----- .../org/apache/comet/CometJsonJvmSuite.scala | 67 ++++++ .../apache/comet/udf/FromJsonUDFSuite.scala | 107 +++++++++ .../comet/udf/GetJsonObjectUDFSuite.scala | 98 +++++++++ .../org/apache/comet/udf/ToJsonUDFSuite.scala | 117 ++++++++++ 12 files changed, 1091 insertions(+), 71 deletions(-) create mode 100644 common/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala create mode 100644 common/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala create mode 100644 common/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala create mode 100644 docs/source/user-guide/latest/compatibility/json.md create mode 100644 spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala create mode 100644 spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala create mode 100644 spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala create mode 100644 spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9b376837f7..087cb99e04 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -380,6 +380,24 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val JSON_ENGINE_RUST = "rust" + val JSON_ENGINE_JAVA = "java" + + val COMET_JSON_ENGINE: ConfigEntry[String] = + conf("spark.comet.exec.json.engine") + .category(CATEGORY_EXEC) + .doc( + "Selects the engine 
used to evaluate supported JSON expressions. " + + s"`$JSON_ENGINE_RUST` uses the native DataFusion JSON implementation. " + + s"`$JSON_ENGINE_JAVA` is experimental and routes through a JVM-side UDF " + + "that delegates to Spark's expression classes for byte-exact compatibility, " + + "at the cost of JNI roundtrips per batch. Expressions routed when set to java: " + + "get_json_object, from_json, to_json.") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set(JSON_ENGINE_RUST, JSON_ENGINE_JAVA)) + .createWithDefault(JSON_ENGINE_RUST) + val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.native.shuffle.partitioning.hash.enabled") .category(CATEGORY_SHUFFLE) diff --git a/common/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala b/common/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala new file mode 100644 index 0000000000..7a94030f7b --- /dev/null +++ b/common/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets + +import org.apache.arrow.vector.{BigIntVector, BitVector, FieldVector, Float4Vector, Float8Vector, IntVector, ValueVector, VarCharVector} +import org.apache.arrow.vector.complex.StructVector +import org.apache.arrow.vector.types.FloatingPointPrecision +import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, JsonToStructs, RuntimeReplaceable} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import org.apache.comet.CometArrowAllocator + +/** + * `from_json(json, schema)` implemented via Spark's `JsonToStructs` for byte-exact compatibility. + * + * The registered expression is looked up from `CometLambdaRegistry` using a scalar key argument. + * The output schema is read from the registered expression's `dataType`. The UDF is stateless: + * every call resolves the expression from the registry, so a single UDF instance can be shared + * across native worker threads. + * + * Inputs: + * - inputs(0): VarCharVector json column + * - inputs(1): VarCharVector scalar (length-1) containing the registry key string + * + * Output: StructVector matching the registered schema. The caller owns the returned vector and + * must close it; closing the StructVector recursively closes its child vectors. + * + * Supported field types (matching `CometJsonToStructs.isSupportedType`): Boolean, Integer, Long, + * Float, Double, String, plus nested struct of those. 
+ */
+class FromJsonUDF extends CometUDF {
+
+  /**
+   * Parses every JSON string in `inputs(0)` with a per-call `JsonToStructs` expression and
+   * returns the parsed rows as a `StructVector`.
+   *
+   * @param inputs
+   *   `inputs(0)` is the JSON `VarCharVector`; `inputs(1)` is a `VarCharVector` whose first
+   *   element is the non-null `CometLambdaRegistry` key of the configured expression.
+   * @return
+   *   a `StructVector` with one entry per input row. A null input or a null parse result
+   *   leaves the corresponding struct entry null.
+   * @throws IllegalArgumentException
+   *   if the number of inputs is not 2 or the registry key is missing or null.
+   */
+  override def evaluate(inputs: Array[ValueVector]): ValueVector = {
+    require(inputs.length == 2, s"FromJsonUDF expects 2 inputs (json, key), got ${inputs.length}")
+    val json = inputs(0).asInstanceOf[VarCharVector]
+    val keyVec = inputs(1).asInstanceOf[VarCharVector]
+    require(
+      keyVec.getValueCount >= 1 && !keyVec.isNull(0),
+      "FromJsonUDF requires a non-null scalar registry key")
+
+    val key = new String(keyVec.get(0), StandardCharsets.UTF_8)
+    val configExpr = CometLambdaRegistry.get(key).asInstanceOf[JsonToStructs]
+    val schema = configExpr.dataType.asInstanceOf[StructType]
+    // Build a fresh expression per call: Spark's JsonToStructsEvaluator may hold mutable
+    // per-row state, so a shared cross-thread instance is unsafe (the JVM UDF framework
+    // reuses one UDF instance across native worker threads).
+    val sparkExpr = JsonToStructs(
+      schema,
+      configExpr.options,
+      BoundReference(0, StringType, nullable = true),
+      configExpr.timeZoneId)
+    val evalExpr: Expression = sparkExpr match {
+      case r: RuntimeReplaceable => r.replacement
+      case other => other
+    }
+
+    val n = json.getValueCount
+    val fields = schema.fields
+    val out = StructVector.empty("from_json_result", CometArrowAllocator)
+    // Ownership of `out` passes to the caller only on success. If building the children,
+    // evaluating the expression or writing a value throws, the vector is closed in the
+    // `finally` below so its Arrow buffers are not leaked.
+    var completed = false
+    try {
+      fields.foreach(field => addChild(out, field.name, field.dataType, field.nullable))
+      out.setInitialCapacity(n)
+      out.allocateNew()
+
+      val row = new GenericInternalRow(1)
+      var i = 0
+      while (i < n) {
+        // A null input leaves the struct entry null (the default after allocateNew).
+        if (!json.isNull(i)) {
+          row.update(0, UTF8String.fromBytes(json.get(i)))
+          val result = evalExpr.eval(row)
+          // A null result likewise leaves the struct entry null.
+          if (result != null) {
+            out.setIndexDefined(i)
+            val struct = result.asInstanceOf[InternalRow]
+            var ordinal = 0
+            while (ordinal < fields.length) {
+              writeChild(out.getChildByOrdinal(ordinal), i, fields(ordinal).dataType, struct, ordinal)
+              ordinal += 1
+            }
+          }
+        }
+        i += 1
+      }
+      // Set value count on each child so its validity buffer is sized correctly.
+      var ordinal = 0
+      while (ordinal < fields.length) {
+        out.getChildByOrdinal(ordinal).setValueCount(n)
+        ordinal += 1
+      }
+      out.setValueCount(n)
+      completed = true
+      out
+    } finally {
+      if (!completed) out.close()
+    }
+  }
+
+  /**
+   * Adds a child vector called `name` to `parent`, using the Arrow type that corresponds to
+   * the Spark type `dt`. Nested structs are added recursively.
+   *
+   * @throws UnsupportedOperationException
+   *   if `dt` is not one of the handled types.
+   */
+  private def addChild(
+      parent: StructVector,
+      name: String,
+      dt: DataType,
+      nullable: Boolean): Unit = {
+    dt match {
+      case BooleanType =>
+        parent.addOrGet(
+          name,
+          new FieldType(nullable, ArrowType.Bool.INSTANCE, null),
+          classOf[BitVector])
+      case IntegerType =>
+        parent.addOrGet(
+          name,
+          new FieldType(nullable, new ArrowType.Int(32, true), null),
+          classOf[IntVector])
+      case LongType =>
+        parent.addOrGet(
+          name,
+          new FieldType(nullable, new ArrowType.Int(64, true), null),
+          classOf[BigIntVector])
+      case FloatType =>
+        parent.addOrGet(
+          name,
+          new FieldType(
+            nullable,
+            new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE),
+            null),
+          classOf[Float4Vector])
+      case DoubleType =>
+        parent.addOrGet(
+          name,
+          new FieldType(
+            nullable,
+            new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE),
+            null),
+          classOf[Float8Vector])
+      case StringType =>
+        parent.addOrGet(
+          name,
+          new FieldType(nullable, ArrowType.Utf8.INSTANCE, null),
+          classOf[VarCharVector])
+      case nested: StructType =>
+        parent.addOrGet(
+          name,
+          new FieldType(nullable, ArrowType.Struct.INSTANCE, null),
+          classOf[StructVector])
+        val child = parent.getChild(name).asInstanceOf[StructVector]
+        nested.fields.foreach(ff => addChild(child, ff.name, ff.dataType, ff.nullable))
+      case other =>
+        throw new UnsupportedOperationException(s"FromJsonUDF: unsupported type $other")
+    }
+  }
+
+  /**
+   * Copies field `f` of `struct` into `child` at row `i`. A null field becomes a null slot;
+   * nested structs are written field by field.
+   *
+   * @throws UnsupportedOperationException
+   *   if `dt` is not one of the handled types.
+   */
+  private def writeChild(
+      child: ValueVector,
+      i: Int,
+      dt: DataType,
+      struct: InternalRow,
+      f: Int): Unit = {
+    if (struct.isNullAt(f)) {
+      child.asInstanceOf[FieldVector].setNull(i)
+    } else {
+      dt match {
+        case BooleanType =>
+          child.asInstanceOf[BitVector].setSafe(i, if (struct.getBoolean(f)) 1 else 0)
+        case IntegerType =>
+          child.asInstanceOf[IntVector].setSafe(i, struct.getInt(f))
+        case LongType =>
+          child.asInstanceOf[BigIntVector].setSafe(i, struct.getLong(f))
+        case FloatType =>
+          child.asInstanceOf[Float4Vector].setSafe(i, struct.getFloat(f))
+        case DoubleType =>
+          child.asInstanceOf[Float8Vector].setSafe(i, struct.getDouble(f))
+        case StringType =>
+          child.asInstanceOf[VarCharVector].setSafe(i, struct.getUTF8String(f).getBytes)
+        case nested: StructType =>
+          val sv = child.asInstanceOf[StructVector]
+          sv.setIndexDefined(i)
+          val inner = struct.getStruct(f, nested.fields.length)
+          var ff = 0
+          while (ff < nested.fields.length) {
+            writeChild(sv.getChildByOrdinal(ff), i, nested.fields(ff).dataType, inner, ff)
+            ff += 1
+          }
+        case other =>
+          throw new UnsupportedOperationException(s"FromJsonUDF: unsupported type $other")
+      }
+    }
+  }
+}
diff --git a/common/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala b/common/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala
new file mode 100644
index 0000000000..17500d318b
--- /dev/null
+++ b/common/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets + +import org.apache.arrow.vector.{ValueVector, VarCharVector} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, GetJsonObject, Literal, RuntimeReplaceable} +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import org.apache.comet.CometArrowAllocator + +/** + * `get_json_object(json, path)` implemented via Spark's `GetJsonObject` expression for byte-exact + * compatibility. Path must be a non-null scalar (enforced by the serde when routing here). + * + * Inputs: + * - inputs(0): VarCharVector json column + * - inputs(1): VarCharVector path (scalar, length-1) + * + * Output: VarCharVector, same length as json input. + * + * A fresh Spark expression is built per `evaluate` call (per batch). Spark's `GetJsonObject` + * holds mutable per-row state in its evaluator, so a shared cross-thread instance is unsafe; the + * JVM UDF framework reuses one UDF instance across native worker threads. 
+ */
+class GetJsonObjectUDF extends CometUDF {
+
+  /**
+   * Applies a per-call `GetJsonObject` expression, built from the scalar path, to every row
+   * of the JSON column.
+   *
+   * @param inputs
+   *   `inputs(0)` is the JSON `VarCharVector`; `inputs(1)` is a `VarCharVector` whose first
+   *   element is the non-null path.
+   * @return
+   *   a `VarCharVector` with one entry per input row. A null input or a null evaluation
+   *   result produces a null entry.
+   * @throws IllegalArgumentException
+   *   if the number of inputs is not 2 or the path is missing or null.
+   */
+  override def evaluate(inputs: Array[ValueVector]): ValueVector = {
+    require(inputs.length == 2, s"GetJsonObjectUDF expects 2 inputs, got ${inputs.length}")
+    val jsonVec = inputs(0).asInstanceOf[VarCharVector]
+    val pathVec = inputs(1).asInstanceOf[VarCharVector]
+    require(
+      pathVec.getValueCount >= 1 && !pathVec.isNull(0),
+      "GetJsonObjectUDF requires a non-null scalar path")
+
+    val pathStr = new String(pathVec.get(0), StandardCharsets.UTF_8)
+    val sparkExpr = GetJsonObject(
+      BoundReference(0, StringType, nullable = true),
+      Literal(UTF8String.fromString(pathStr), StringType))
+    val evalExpr: Expression = sparkExpr match {
+      case r: RuntimeReplaceable => r.replacement
+      case other => other
+    }
+
+    val n = jsonVec.getValueCount
+    val out = new VarCharVector("get_json_object_result", CometArrowAllocator)
+    // Ownership of `out` passes to the caller only on success. If allocation, evaluation or
+    // a write throws, the vector is closed in the `finally` below so its Arrow buffers are
+    // not leaked.
+    var completed = false
+    try {
+      out.allocateNew(n)
+
+      val row = new GenericInternalRow(1)
+      var i = 0
+      while (i < n) {
+        if (jsonVec.isNull(i)) {
+          out.setNull(i)
+        } else {
+          row.update(0, UTF8String.fromBytes(jsonVec.get(i)))
+          val result = evalExpr.eval(row)
+          if (result == null) out.setNull(i)
+          else out.setSafe(i, result.asInstanceOf[UTF8String].getBytes)
+        }
+        i += 1
+      }
+      out.setValueCount(n)
+      completed = true
+      out
+    } finally {
+      if (!completed) out.close()
+    }
+  }
+}
diff --git a/common/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala b/common/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala
new file mode 100644
index 0000000000..9c7a9a9bb5
--- /dev/null
+++ b/common/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets + +import org.apache.arrow.vector.{BigIntVector, BitVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TinyIntVector, ValueVector, VarCharVector} +import org.apache.arrow.vector.complex.StructVector +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, RuntimeReplaceable, StructsToJson} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import org.apache.comet.CometArrowAllocator + +/** + * `to_json(struct)` implemented via Spark's `StructsToJson` for byte-exact compatibility. + * + * The registered expression is looked up from `CometLambdaRegistry` using a scalar key argument. + * The schema is read from the registered expression's child datatype. The UDF is stateless: every + * call resolves the expression from the registry, so a single UDF instance can be shared across + * native worker threads. + * + * Inputs: + * - inputs(0): StructVector of arbitrary supported schema + * - inputs(1): VarCharVector scalar (length-1) containing the registry key string + * + * Output: VarCharVector of JSON strings. + * + * Supported field types (matching `CometStructsToJson.isSupportedType`): Boolean, Byte, Short, + * Integer, Long, Float, Double, String, plus nested struct of those. 
+ */
+class ToJsonUDF extends CometUDF {
+
+  /**
+   * Serializes every row of the struct column to a JSON string with a per-call
+   * `StructsToJson` expression.
+   *
+   * @param inputs
+   *   `inputs(0)` is the `StructVector` to serialize; `inputs(1)` is a `VarCharVector` whose
+   *   first element is the non-null `CometLambdaRegistry` key of the configured expression.
+   * @return
+   *   a `VarCharVector` with one entry per input row. A null struct or a null evaluation
+   *   result produces a null entry.
+   * @throws IllegalArgumentException
+   *   if the number of inputs is not 2 or the registry key is missing or null.
+   */
+  override def evaluate(inputs: Array[ValueVector]): ValueVector = {
+    require(inputs.length == 2, s"ToJsonUDF expects 2 inputs (struct, key), got ${inputs.length}")
+    val struct = inputs(0).asInstanceOf[StructVector]
+    val keyVec = inputs(1).asInstanceOf[VarCharVector]
+    require(
+      keyVec.getValueCount >= 1 && !keyVec.isNull(0),
+      "ToJsonUDF requires a non-null scalar registry key")
+
+    val key = new String(keyVec.get(0), StandardCharsets.UTF_8)
+    val configExpr = CometLambdaRegistry.get(key).asInstanceOf[StructsToJson]
+    val schema = configExpr.child.dataType.asInstanceOf[StructType]
+    // Build a fresh expression per call: Spark's StructsToJsonEvaluator holds mutable
+    // per-row state, so a shared cross-thread instance is unsafe (the JVM UDF framework
+    // reuses one UDF instance across native worker threads).
+    val sparkExpr =
+      StructsToJson(
+        configExpr.options,
+        BoundReference(0, schema, nullable = true),
+        configExpr.timeZoneId)
+    val evalExpr: Expression = sparkExpr match {
+      case r: RuntimeReplaceable => r.replacement
+      case other => other
+    }
+
+    val n = struct.getValueCount
+    val out = new VarCharVector("to_json_result", CometArrowAllocator)
+    // Ownership of `out` passes to the caller only on success. If allocation, row
+    // conversion, evaluation or a write throws, the vector is closed in the `finally` below
+    // so its Arrow buffers are not leaked.
+    var completed = false
+    try {
+      out.allocateNew(n)
+
+      val row = new GenericInternalRow(1)
+      var i = 0
+      while (i < n) {
+        if (struct.isNull(i)) {
+          out.setNull(i)
+        } else {
+          val inner = arrowStructRowToSparkRow(struct, i, schema)
+          row.update(0, inner)
+          val result = evalExpr.eval(row)
+          if (result == null) out.setNull(i)
+          else out.setSafe(i, result.asInstanceOf[UTF8String].getBytes)
+        }
+        i += 1
+      }
+      out.setValueCount(n)
+      completed = true
+      out
+    } finally {
+      if (!completed) out.close()
+    }
+  }
+
+  /**
+   * Materializes row `rowIdx` of `struct` as a Spark `InternalRow`, reading each child vector
+   * according to the matching field of `schema`. Null child slots become null values.
+   */
+  private def arrowStructRowToSparkRow(
+      struct: StructVector,
+      rowIdx: Int,
+      schema: StructType): InternalRow = {
+    val fields = schema.fields
+    val values = new Array[Any](fields.length)
+    var f = 0
+    while (f < fields.length) {
+      val child = struct.getChildByOrdinal(f)
+      values(f) = if (child.isNull(rowIdx)) null else readScalar(child, rowIdx, fields(f).dataType)
+      f += 1
+    }
+    new GenericInternalRow(values)
+  }
+
+  /**
+   * Reads the non-null value at index `i` of `v`, interpreting the vector according to the
+   * Spark type `dt`. Nested structs are converted recursively.
+   *
+   * @throws UnsupportedOperationException
+   *   if `dt` is not one of the handled types.
+   */
+  private def readScalar(v: ValueVector, i: Int, dt: DataType): Any = dt match {
+    case BooleanType => v.asInstanceOf[BitVector].get(i) == 1
+    case ByteType => v.asInstanceOf[TinyIntVector].get(i)
+    case ShortType => v.asInstanceOf[SmallIntVector].get(i)
+    case IntegerType => v.asInstanceOf[IntVector].get(i)
+    case LongType => v.asInstanceOf[BigIntVector].get(i)
+    case FloatType => v.asInstanceOf[Float4Vector].get(i)
+    case DoubleType => v.asInstanceOf[Float8Vector].get(i)
+    case StringType => UTF8String.fromBytes(v.asInstanceOf[VarCharVector].get(i))
+    case nested: StructType =>
+      arrowStructRowToSparkRow(v.asInstanceOf[StructVector], i, nested)
+    case other =>
+      throw new UnsupportedOperationException(s"ToJsonUDF: unsupported type $other")
+  }
+}
diff --git a/docs/source/user-guide/latest/compatibility/json.md b/docs/source/user-guide/latest/compatibility/json.md
new file mode 100644
index 0000000000..cbdb589fe2
--- /dev/null
+++ b/docs/source/user-guide/latest/compatibility/json.md
@@ -0,0 +1,43 @@
+
+
+# JSON Compatibility
+
+Comet supports two engines for evaluating JSON expressions, selected by the
+`spark.comet.exec.json.engine` configuration entry:
+
+- `rust` (default): native DataFusion implementation. Fast, but has known
+  compatibility gaps with Spark on certain inputs.
+- `java` (experimental): routes evaluation through a JVM UDF that delegates to
+  Spark's own expression classes for byte-exact compatibility, at the cost of a
+  JNI roundtrip per batch.
+ +## Expression coverage + +| SQL | `rust` engine | `java` engine | +| ----------------- | ---------------------------------------------------------------------------- | ------------- | +| `get_json_object` | Supported, with gaps on single-quoted JSON and unescaped control characters | Compatible | +| `from_json` | Supported with restrictions (PERMISSIVE mode only, simple schema types only) | Compatible | +| `to_json` | Supported for struct inputs only, no options | Compatible | + +## When to use the `java` engine + +- You hit a compatibility gap in the `rust` engine and need exact Spark output. +- You can absorb the JNI overhead. Typically negligible relative to JSON parse + cost, but verify with your own benchmarks. diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 3842148a43..3843c6b7ba 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -88,9 +88,11 @@ of expressions that be disabled. 
## JSON Functions -| Expression | -| ------------- | -| GetJsonObject | +| Expression | SQL | +| ------------- | ----------------- | +| GetJsonObject | `get_json_object` | +| JsonToStructs | `from_json` | +| StructsToJson | `to_json` | ## Date/Time Functions diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 968fe8cd69..159bf0cbdb 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -29,7 +29,7 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType, serializeDataType} object CometStringRepeat extends CometExpressionSerde[StringRepeat] { @@ -429,27 +429,73 @@ object CometGetJsonObject extends CometExpressionSerde[GetJsonObject] { override def getIncompatibleReasons(): Seq[String] = Seq( "Spark allows single-quoted JSON and unescaped control characters which Comet does not" + - " support") - - override def getSupportLevel(expr: GetJsonObject): SupportLevel = - Incompatible( - Some( - "Spark allows single-quoted JSON and unescaped control characters " + - "which Comet does not support")) + " support when using engine=rust") + + override def getSupportLevel(expr: GetJsonObject): SupportLevel = { + if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { + expr.path match { + case _: Literal => Compatible(None) + case _ => Unsupported(Some("Only scalar paths are supported")) + } + } else { + 
Incompatible( + Some( + "Spark allows single-quoted JSON and unescaped control characters " + + "which Comet does not support")) + } + } override def convert( expr: GetJsonObject, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { - val jsonExpr = exprToProtoInternal(expr.json, inputs, binding) - val pathExpr = exprToProtoInternal(expr.path, inputs, binding) - val optExpr = scalarFunctionExprToProtoWithReturnType( - "get_json_object", - expr.dataType, - false, - jsonExpr, - pathExpr) - optExprWithInfo(optExpr, expr, expr.json, expr.path) + if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { + convertViaJvmUdf(expr, inputs, binding) + } else { + val jsonExpr = exprToProtoInternal(expr.json, inputs, binding) + val pathExpr = exprToProtoInternal(expr.path, inputs, binding) + val optExpr = scalarFunctionExprToProtoWithReturnType( + "get_json_object", + expr.dataType, + false, + jsonExpr, + pathExpr) + optExprWithInfo(optExpr, expr, expr.json, expr.path) + } + } + + private def convertViaJvmUdf( + expr: GetJsonObject, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + expr.path match { + case Literal(value, DataTypes.StringType) => + if (value == null) { + withInfo(expr, "Null literal path is handled by Spark fallback") + return None + } + val jsonProto = exprToProtoInternal(expr.json, inputs, binding) + val pathProto = exprToProtoInternal(expr.path, inputs, binding) + if (jsonProto.isEmpty || pathProto.isEmpty) { + return None + } + val returnType = serializeDataType(DataTypes.StringType).getOrElse(return None) + val udfBuilder = ExprOuterClass.JvmScalarUdf + .newBuilder() + .setClassName("org.apache.comet.udf.GetJsonObjectUDF") + .addArgs(jsonProto.get) + .addArgs(pathProto.get) + .setReturnType(returnType) + .setReturnNullable(expr.nullable) + Some( + ExprOuterClass.Expr + .newBuilder() + .setJvmScalarUdf(udfBuilder.build()) + .build()) + case _ => + withInfo(expr, "Only scalar paths are supported") + None + } } } diff --git 
a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index 688d27f91f..25522be4c9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -22,12 +22,15 @@ package org.apache.comet.serde import scala.jdk.CollectionConverters._ import scala.util.Try -import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, StructsToCsv, StructsToJson} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, Literal, StructsToCsv, StructsToJson} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.DataTypeSupport import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, serializeDataType} +import org.apache.comet.udf.CometLambdaRegistry object CometCreateNamedStruct extends CometExpressionSerde[CreateNamedStruct] { override def convert( @@ -109,11 +112,21 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] { "Does not support `+Infinity` and `-Infinity` for numeric types (float, double)." 
+ " (https://github.com/apache/datafusion-comet/issues/3016)") - override def getSupportLevel(expr: StructsToJson): SupportLevel = - Incompatible( - Some( - "Does not support Infinity/-Infinity for numeric types" + - " (https://github.com/apache/datafusion-comet/issues/3016)")) + override def getSupportLevel(expr: StructsToJson): SupportLevel = { + if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { + expr.child.dataType match { + case s: StructType if s.fields.forall(f => isSupportedType(f.dataType)) => + Compatible(None) + case _ => + Unsupported(Some("to_json: only StructType with supported fields is supported")) + } + } else { + Incompatible( + Some( + "Does not support Infinity/-Infinity for numeric types" + + " (https://github.com/apache/datafusion-comet/issues/3016)")) + } + } override def convert( expr: StructsToJson, @@ -121,41 +134,82 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] { binding: Boolean): Option[ExprOuterClass.Expr] = { if (expr.options.nonEmpty) { withInfo(expr, "StructsToJson with options is not supported") - None - } else { - val isSupported = expr.child.dataType match { - case s: StructType => - s.fields.forall(f => isSupportedType(f.dataType)) - case _: MapType | _: ArrayType => - // Spark supports map and array in StructsToJson but this is not yet - // implemented in Comet - false + return None + } + if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { + return convertViaJvmUdf(expr, inputs, binding) + } + val isSupported = expr.child.dataType match { + case s: StructType => + s.fields.forall(f => isSupportedType(f.dataType)) + case _: MapType | _: ArrayType => + // Spark supports map and array in StructsToJson but this is not yet + // implemented in Comet + false + case _ => + false + } + + if (isSupported) { + exprToProtoInternal(expr.child, inputs, binding) match { + case Some(p) => + val toJson = ExprOuterClass.ToJson + .newBuilder() + .setChild(p) + 
.setTimezone(expr.timeZoneId.getOrElse("UTC")) + .setIgnoreNullFields(true) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setToJson(toJson) + .build()) case _ => - false + withInfo(expr, expr.child) + None } + } else { + withInfo(expr, "Unsupported data type", expr.child) + None + } + } - if (isSupported) { - exprToProtoInternal(expr.child, inputs, binding) match { - case Some(p) => - val toJson = ExprOuterClass.ToJson - .newBuilder() - .setChild(p) - .setTimezone(expr.timeZoneId.getOrElse("UTC")) - .setIgnoreNullFields(true) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToJson(toJson) - .build()) - case _ => - withInfo(expr, expr.child) - None - } - } else { - withInfo(expr, "Unsupported data type", expr.child) + private def convertViaJvmUdf( + expr: StructsToJson, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + expr.child.dataType match { + case s: StructType if s.fields.forall(f => isSupportedType(f.dataType)) => + val childProto = exprToProtoInternal(expr.child, inputs, binding) + if (childProto.isEmpty) return None + // TODO follow-up: the registry retains expressions for the JVM lifetime and + // does not propagate across executors. This is acceptable for the experimental + // engine=java mode in local Spark; cluster runs require encoding the + // schema/timezone directly in the JvmScalarUdf args. + // Rebind the child so the UDF can call eval(row) against a single-column + // wrapper InternalRow without hitting an unevaluable AttributeReference. 
+ val rebound = + expr.copy(child = BoundReference(0, expr.child.dataType, expr.child.nullable)) + val key = CometLambdaRegistry.register(rebound) + val keyProto = + exprToProtoInternal(Literal(UTF8String.fromString(key), StringType), inputs, binding) + if (keyProto.isEmpty) return None + val returnType = serializeDataType(DataTypes.StringType).getOrElse(return None) + val udfBuilder = ExprOuterClass.JvmScalarUdf + .newBuilder() + .setClassName("org.apache.comet.udf.ToJsonUDF") + .addArgs(childProto.get) + .addArgs(keyProto.get) + .setReturnType(returnType) + .setReturnNullable(expr.nullable) + Some( + ExprOuterClass.Expr + .newBuilder() + .setJvmScalarUdf(udfBuilder.build()) + .build()) + case _ => + withInfo(expr, "to_json: only StructType with supported fields is supported") None - } } } @@ -187,8 +241,15 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { override def getUnsupportedReasons(): Seq[String] = Seq("Requires an explicit schema") override def getSupportLevel(expr: JsonToStructs): SupportLevel = { - // this feature is partially implemented and not comprehensively tested yet - Incompatible() + if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { + expr.schema match { + case s: StructType if isSupportedSchema(s) => Compatible(None) + case _ => Unsupported(Some("from_json: only StructType schemas are supported")) + } + } else { + // this feature is partially implemented and not comprehensively tested yet + Incompatible() + } } override def convert( @@ -201,19 +262,11 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { return None } - def isSupportedType(dt: DataType): Boolean = { - dt match { - case StructType(fields) => - fields.nonEmpty && fields.forall(f => isSupportedType(f.dataType)) - case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | - DataTypes.DoubleType | DataTypes.BooleanType | DataTypes.StringType => - true - case _ => false - } + if 
(CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { + return convertViaJvmUdf(expr, inputs, binding) } - val schemaType = expr.schema - if (!isSupportedType(schemaType)) { + if (!isSupportedSchema(expr.schema)) { withInfo(expr, "from_json: Unsupported schema type") return None } @@ -235,7 +288,7 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { // Convert child expression and schema to protobuf for { childProto <- exprToProtoInternal(expr.child, inputs, binding) - schemaProto <- serializeDataType(schemaType) + schemaProto <- serializeDataType(expr.schema) } yield { val fromJson = ExprOuterClass.FromJson .newBuilder() @@ -246,6 +299,54 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { ExprOuterClass.Expr.newBuilder().setFromJson(fromJson).build() } } + + private def isSupportedSchema(dt: DataType): Boolean = dt match { + case StructType(fields) => + fields.nonEmpty && fields.forall(f => isSupportedSchema(f.dataType)) + case DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType | + DataTypes.BooleanType | DataTypes.StringType => + true + case _ => false + } + + private def convertViaJvmUdf( + expr: JsonToStructs, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + expr.schema match { + case s: StructType if isSupportedSchema(s) => + val childProto = exprToProtoInternal(expr.child, inputs, binding) + if (childProto.isEmpty) return None + // TODO follow-up: the registry retains expressions for the JVM lifetime and + // does not propagate across executors. This is acceptable for the experimental + // engine=java mode in local Spark; cluster runs require encoding the + // schema/timezone directly in the JvmScalarUdf args. + // Rebind the child so the UDF can call eval(row) against a single-column + // wrapper InternalRow without hitting an unevaluable AttributeReference. 
+ val rebound = + expr.copy(child = BoundReference(0, expr.child.dataType, expr.child.nullable)) + val key = CometLambdaRegistry.register(rebound) + val keyProto = + exprToProtoInternal(Literal(UTF8String.fromString(key), StringType), inputs, binding) + if (keyProto.isEmpty) return None + val returnType = serializeDataType(expr.schema).getOrElse(return None) + val udfBuilder = ExprOuterClass.JvmScalarUdf + .newBuilder() + .setClassName("org.apache.comet.udf.FromJsonUDF") + .addArgs(childProto.get) + .addArgs(keyProto.get) + .setReturnType(returnType) + .setReturnNullable(expr.nullable) + Some( + ExprOuterClass.Expr + .newBuilder() + .setJvmScalarUdf(udfBuilder.build()) + .build()) + case _ => + withInfo(expr, "from_json: only StructType schemas are supported") + None + } + } } object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] { diff --git a/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala new file mode 100644 index 0000000000..5d20b718c8 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet + +import org.apache.spark.SparkConf +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +class CometJsonJvmSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + override protected def sparkConf: SparkConf = + super.sparkConf.set(CometConf.COMET_JSON_ENGINE.key, CometConf.JSON_ENGINE_JAVA) + + private val rows = Seq( + """{"a":1,"b":"x","arr":[1,2,3]}""", + """{"a":2,"b":"y","arr":[]}""", + """{"a":null,"b":"z","arr":[10]}""", + null, + """not json""") + + private def withJsonTable(f: => Unit): Unit = { + withTable("t") { + sql("CREATE TABLE t (j STRING) USING parquet") + val sqlRows = rows + .map(v => if (v == null) "(NULL)" else s"('${v.replace("'", "''")}')") + .mkString(", ") + sql(s"INSERT INTO t VALUES $sqlRows") + f + } + } + + test("get_json_object via JVM engine") { + withJsonTable { + checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.a') FROM t")) + checkSparkAnswerAndOperator(sql("SELECT get_json_object(j, '$.b') FROM t")) + } + } + + test("from_json with explicit schema via JVM engine") { + withJsonTable { + checkSparkAnswerAndOperator(sql("SELECT from_json(j, 'a INT, b STRING') FROM t")) + } + } + + test("to_json round-trip via JVM engine") { + withJsonTable { + checkSparkAnswerAndOperator(sql("SELECT to_json(from_json(j, 'a INT, b STRING')) FROM t")) + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala b/spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala new file mode 100644 index 0000000000..292f6d96e5 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.arrow.vector.{IntVector, VarCharVector} +import org.apache.arrow.vector.complex.StructVector +import org.apache.spark.sql.catalyst.expressions.{BoundReference, JsonToStructs} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} + +import org.apache.comet.CometArrowAllocator + +class FromJsonUDFSuite extends AnyFunSuite { + + private def stringVec(name: String, values: Seq[Option[String]]): VarCharVector = { + val v = new VarCharVector(name, CometArrowAllocator) + v.allocateNew(values.size) + values.zipWithIndex.foreach { + case (Some(s), i) => v.setSafe(i, s.getBytes(StandardCharsets.UTF_8)) + case (None, i) => v.setNull(i) + } + v.setValueCount(values.size) + v + } + + test("parse struct from JSON strings") { + val schema = StructType( + Seq( + StructField("a", IntegerType, nullable = true), + StructField("b", StringType, nullable = true))) + val sparkExpr = JsonToStructs( + schema, + Map.empty[String, String], + BoundReference(0, StringType, nullable = true), + Some("UTC")) + val key = CometLambdaRegistry.register(sparkExpr) + try { + val udf = new FromJsonUDF + val json = stringVec( + "json", + Seq(Some("""{"a":1,"b":"x"}"""), Some("""{"a":null,"b":"y"}"""), None, Some("bad"))) + val keyVec = 
stringVec("key", Seq(Some(key))) + try { + val out = udf.evaluate(Array(json, keyVec)).asInstanceOf[StructVector] + try { + assert(out.getValueCount == 4) + val aVec = out.getChildByOrdinal(0).asInstanceOf[IntVector] + val bVec = out.getChildByOrdinal(1).asInstanceOf[VarCharVector] + // Row 0: a=1, b="x" + assert(!out.isNull(0)) + assert(aVec.get(0) == 1) + assert(new String(bVec.get(0), StandardCharsets.UTF_8) == "x") + // Row 1: a=null, b="y" + assert(!out.isNull(1)) + assert(aVec.isNull(1)) + assert(new String(bVec.get(1), StandardCharsets.UTF_8) == "y") + // Row 2: input was null + assert(out.isNull(2)) + // Row 3: malformed JSON, PERMISSIVE → struct of all nulls + assert(!out.isNull(3)) + assert(aVec.isNull(3)) + assert(bVec.isNull(3)) + } finally out.close() + } finally { json.close(); keyVec.close() } + } finally CometLambdaRegistry.remove(key) + } + + test("empty input vector produces empty output struct") { + val schema = StructType(Seq(StructField("a", IntegerType, nullable = true))) + val sparkExpr = JsonToStructs( + schema, + Map.empty[String, String], + BoundReference(0, StringType, nullable = true), + Some("UTC")) + val key = CometLambdaRegistry.register(sparkExpr) + try { + val udf = new FromJsonUDF + val json = stringVec("json", Seq.empty) + val keyVec = stringVec("key", Seq(Some(key))) + try { + val out = udf.evaluate(Array(json, keyVec)).asInstanceOf[StructVector] + try assert(out.getValueCount == 0) + finally out.close() + } finally { json.close(); keyVec.close() } + } finally CometLambdaRegistry.remove(key) + } +} diff --git a/spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala b/spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala new file mode 100644 index 0000000000..fd1c21dcd1 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.arrow.vector.VarCharVector + +import org.apache.comet.CometArrowAllocator + +class GetJsonObjectUDFSuite extends AnyFunSuite { + + private def stringVec(name: String, values: Seq[Option[String]]): VarCharVector = { + val v = new VarCharVector(name, CometArrowAllocator) + v.allocateNew(values.size) + values.zipWithIndex.foreach { + case (Some(s), i) => v.setSafe(i, s.getBytes(StandardCharsets.UTF_8)) + case (None, i) => v.setNull(i) + } + v.setValueCount(values.size) + v + } + + private def get(out: VarCharVector, i: Int): Option[String] = + if (out.isNull(i)) None else Some(new String(out.get(i), StandardCharsets.UTF_8)) + + test("extract values using a scalar path") { + val udf = new GetJsonObjectUDF + val json = stringVec( + "json", + Seq(Some("""{"a":{"b":42}}"""), Some("""{"a":{"b":7}}"""), None, Some("garbage"))) + val path = stringVec("path", Seq(Some("$.a.b"))) + try { + val out = udf.evaluate(Array(json, path)).asInstanceOf[VarCharVector] + try { + assert(out.getValueCount == 4) + assert(get(out, 0) == Some("42")) + assert(get(out, 1) == Some("7")) + assert(get(out, 2).isEmpty) + assert(get(out, 3).isEmpty) + } finally 
out.close() + } finally { + json.close(); path.close() + } + } + + test("empty input vector produces empty output") { + val udf = new GetJsonObjectUDF + val json = stringVec("json", Seq.empty) + val path = stringVec("path", Seq(Some("$.a"))) + try { + val out = udf.evaluate(Array(json, path)).asInstanceOf[VarCharVector] + try assert(out.getValueCount == 0) + finally out.close() + } finally { + json.close(); path.close() + } + } + + test("different scalar paths across separate evaluations share the same UDF instance") { + val udf = new GetJsonObjectUDF + val json = stringVec("json", Seq(Some("""{"a":1,"b":2}"""))) + val pathA = stringVec("pathA", Seq(Some("$.a"))) + val pathB = stringVec("pathB", Seq(Some("$.b"))) + try { + val outA = udf.evaluate(Array(json, pathA)).asInstanceOf[VarCharVector] + try assert(get(outA, 0) == Some("1")) + finally outA.close() + val json2 = stringVec("json2", Seq(Some("""{"a":1,"b":2}"""))) + try { + val outB = udf.evaluate(Array(json2, pathB)).asInstanceOf[VarCharVector] + try assert(get(outB, 0) == Some("2")) + finally outB.close() + } finally json2.close() + } finally { + json.close(); pathA.close(); pathB.close() + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala b/spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala new file mode 100644 index 0000000000..389e74c5e2 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.arrow.vector.{IntVector, VarCharVector} +import org.apache.arrow.vector.complex.StructVector +import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, StructsToJson} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} + +import org.apache.comet.CometArrowAllocator + +class ToJsonUDFSuite extends AnyFunSuite { + + private def stringVec(name: String, values: Seq[Option[String]]): VarCharVector = { + val v = new VarCharVector(name, CometArrowAllocator) + v.allocateNew(values.size) + values.zipWithIndex.foreach { + case (Some(s), i) => v.setSafe(i, s.getBytes(StandardCharsets.UTF_8)) + case (None, i) => v.setNull(i) + } + v.setValueCount(values.size) + v + } + + test("serialize struct rows to JSON") { + val schema = StructType( + Seq( + StructField("a", IntegerType, nullable = true), + StructField("b", StringType, nullable = true))) + val sparkExpr = StructsToJson( + Map.empty[String, String], + BoundReference(0, schema, nullable = true), + Some("UTC")) + val key = CometLambdaRegistry.register(sparkExpr) + try { + val udf = new ToJsonUDF + + val struct = StructVector.empty("in", CometArrowAllocator) + val aVec = + struct.addOrGet( + "a", + new FieldType(true, new ArrowType.Int(32, true), null), + classOf[IntVector]) + val bVec = + struct.addOrGet( + "b", + new FieldType(true, ArrowType.Utf8.INSTANCE, 
null), + classOf[VarCharVector]) + struct.allocateNew() + + aVec.setSafe(0, 1); bVec.setSafe(0, "x".getBytes(StandardCharsets.UTF_8)) + aVec.setNull(1); bVec.setSafe(1, "y".getBytes(StandardCharsets.UTF_8)) + struct.setIndexDefined(0) + struct.setIndexDefined(1) + aVec.setValueCount(2); bVec.setValueCount(2); struct.setValueCount(2) + + val keyVec = stringVec("key", Seq(Some(key))) + try { + val out = udf.evaluate(Array(struct, keyVec)).asInstanceOf[VarCharVector] + try { + assert(out.getValueCount == 2) + assert(new String(out.get(0), StandardCharsets.UTF_8) == """{"a":1,"b":"x"}""") + // Spark drops null fields by default; the second row's `a` is null, + // so `to_json` produces only `b`. + assert(new String(out.get(1), StandardCharsets.UTF_8) == """{"b":"y"}""") + } finally out.close() + } finally { struct.close(); keyVec.close() } + } finally CometLambdaRegistry.remove(key) + } + + test("empty input vector produces empty output") { + val schema = StructType(Seq(StructField("a", IntegerType, nullable = true))) + val sparkExpr = StructsToJson( + Map.empty[String, String], + BoundReference(0, schema, nullable = true), + Some("UTC")) + val key = CometLambdaRegistry.register(sparkExpr) + try { + val udf = new ToJsonUDF + val struct = StructVector.empty("in", CometArrowAllocator) + struct.addOrGet( + "a", + new FieldType(true, new ArrowType.Int(32, true), null), + classOf[IntVector]) + struct.allocateNew() + struct.setValueCount(0) + val keyVec = stringVec("key", Seq(Some(key))) + try { + val out = udf.evaluate(Array(struct, keyVec)).asInstanceOf[VarCharVector] + try assert(out.getValueCount == 0) + finally out.close() + } finally { struct.close(); keyVec.close() } + } finally CometLambdaRegistry.remove(key) + } +} From abca85b5f13def142716eb57ebb8942c4246d882 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 May 2026 16:39:18 -0600 Subject: [PATCH 2/4] refactor: route JSON expressions through codegen dispatcher instead of hand-written UDFs Replace the three 
hand-written `GetJsonObjectUDF` / `FromJsonUDF` / `ToJsonUDF` JVM UDF implementations and the `CometLambdaRegistry` indirection with the Arrow-direct codegen dispatcher introduced in PR #4417 (`CometScalaUDF.emitJvmCodegenDispatch`). The dispatcher Janino-compiles Spark's own `doGenCode` (or `eval(row)` for CodegenFallback expressions) so the JSON family inherits Spark-identical semantics with no per-expression glue. Changes: - Delete the three hand-written UDF files under `spark/src/main/scala/org/apache/comet/udf/` and their unit-test suites. The codegen dispatcher's per-task `kernelCache` provides the same per-thread isolation that `CometLambdaRegistry` was working around. - Rewrite the JSON serdes (`CometGetJsonObject` in `strings.scala`, `CometStructsToJson` and `CometJsonToStructs` in `structs.scala`) to go through a new `JsonRoute` helper. `engine=rust` keeps the native path; `engine=java` delegates to `CometScalaUDF.emitJvmCodegenDispatch` when `spark.comet.exec.scalaUDF.codegen.enabled=true`. - Generalize the codegen dispatcher to accept `CodegenFallback` expressions. `CodegenFallback.doGenCode` emits `references[N].eval(row)`, the same shape the `HigherOrderFunction` carve-out already relied on; lifting the rejection lets `JsonToStructs` and `StructsToJson` (which are `CodegenFallback` in Spark 4) ride the same path. - Unwrap `RuntimeReplaceable` expressions inside `CometScalaUDF.emitJvmCodegenDispatch` before binding. Spark 4's `StructsToJson` is `RuntimeReplaceable` and its `doGenCode` throws "Cannot generate code for expression"; calling `.replacement` gives the `Invoke(StructsToJsonEvaluator, ...)` form that does codegen. - Update the JSON compatibility doc and the `CometJsonJvmSuite` config to reference the codegen flag. Test plan: - `CometJsonJvmSuite`: 3/3 pass (get_json_object, from_json, to_json round-trip via the codegen dispatcher). - `CometJsonExpressionSuite`: 8/8 pass on the unchanged native path. 
- `CometStringExpressionSuite`: 33/33, `CometCodegenSuite`: 60/60, `CometCodegenSourceSuite`: 50/50, `CometSqlFileTestSuite`: 284/284. - `cargo clippy --all-targets --workspace -- -D warnings`: clean. --- .../user-guide/latest/compatibility/json.md | 8 +- .../codegen/CometBatchKernelCodegen.scala | 26 +- .../apache/comet/serde/CometScalaUDF.scala | 16 +- .../org/apache/comet/serde/strings.scala | 80 ++--- .../org/apache/comet/serde/structs.scala | 306 ++++++++---------- .../org/apache/comet/udf/FromJsonUDF.scala | 207 ------------ .../apache/comet/udf/GetJsonObjectUDF.scala | 84 ----- .../org/apache/comet/udf/ToJsonUDF.scala | 130 -------- .../comet/CometCodegenSourceSuite.scala | 11 +- .../org/apache/comet/CometJsonJvmSuite.scala | 4 +- .../apache/comet/udf/FromJsonUDFSuite.scala | 107 ------ .../comet/udf/GetJsonObjectUDFSuite.scala | 98 ------ .../org/apache/comet/udf/ToJsonUDFSuite.scala | 117 ------- 13 files changed, 204 insertions(+), 990 deletions(-) delete mode 100644 spark/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala delete mode 100644 spark/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala delete mode 100644 spark/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala delete mode 100644 spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala delete mode 100644 spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala delete mode 100644 spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala diff --git a/docs/source/user-guide/latest/compatibility/json.md b/docs/source/user-guide/latest/compatibility/json.md index cbdb589fe2..152f9c4169 100644 --- a/docs/source/user-guide/latest/compatibility/json.md +++ b/docs/source/user-guide/latest/compatibility/json.md @@ -24,9 +24,11 @@ Comet supports two engines for evaluating JSON expressions, selected by the - `rust` (default): native DataFusion implementation. Fast, but has known compatibility gaps with Spark on certain inputs. 
-- `java` (experimental): routes evaluation through a JVM UDF that delegates to - Spark's own expression classes for byte-exact compatibility, at the cost of a - JNI roundtrip per batch. +- `java` (experimental): routes evaluation through Comet's Arrow-direct codegen + dispatcher so Spark's own `doGenCode` for the expression runs inside the Comet + pipeline. Byte-exact compatibility, at the cost of a JNI roundtrip per batch. + Requires `spark.comet.exec.scalaUDF.codegen.enabled=true`; otherwise the + operator falls back to Spark. ## Expression coverage diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala index 042fd9ced3..0b2d430fc9 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala @@ -107,9 +107,8 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { * rather than crashing the Janino compile at execute time. * * Checks every `BoundReference`'s data type and the root `expr.dataType` against - * [[isSupportedDataType]], rejects aggregates / generators / `CodegenFallback` (other than - * HOFs, which are admitted), and gates total nested-field count on - * `spark.sql.codegen.maxFields`. + * [[isSupportedDataType]], rejects aggregates / generators / `Unevaluable`, and gates total + * nested-field count on `spark.sql.codegen.maxFields`. */ def canHandle(boundExpr: Expression): Option[String] = { if (!isSupportedDataType(boundExpr.dataType)) { @@ -127,12 +126,15 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { s"codegen dispatch: too many nested fields ($totalFields > " + s"spark.sql.codegen.maxFields=$maxFields)") } - // HOFs are `CodegenFallback` but admitted: `CodegenFallback.doGenCode` emits one - // `((Expression) references[N]).eval(row)` call site per HOF. 
The kernel dispatches to the - // HOF's interpreted `eval`, which mutates `NamedLambdaVariable.value` per element and reads - // the input array through the kernel's typed Arrow getters. Per-task `boundExpr` isolation - // in `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on the - // lambda variable's `AtomicReference`. See `CometCodegenHOFSuite`. + // `CodegenFallback` expressions are admitted. `CodegenFallback.doGenCode` emits one + // `((Expression) references[N]).eval(row)` call site per expression. The kernel dispatches + // to the expression's interpreted `eval` against `row` aliased to `this`, so the eval reads + // through the kernel's typed Arrow getters. This covers `HigherOrderFunction` (which mutates + // `NamedLambdaVariable.value` per element; see `CometCodegenHOFSuite`) as well as other + // CodegenFallback expressions like `JsonToStructs` / `StructsToJson` whose `eval(row)` + // simply calls `row.get(0, dataType)`. Per-task `boundExpr` isolation in + // `CometScalaUDFCodegen.kernelCache` prevents concurrent partitions from racing on shared + // state inside the expression. 
// // Nondeterministic / stateful expressions are accepted: each cache entry holds one kernel // instance with a single `init(partitionIndex)` call, so `Rand` / `MonotonicallyIncreasingID` @@ -150,10 +152,6 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { boundExpr.find { case _: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction => true case _: org.apache.spark.sql.catalyst.expressions.Generator => true - case _: HigherOrderFunction => false - case _: LambdaFunction => false - case _: NamedLambdaVariable => false - case _: CodegenFallback => true case u: Unevaluable if isCodegenInertUnevaluable(u) => false case _: Unevaluable => true case _ => false @@ -161,7 +159,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { case Some(bad) => return Some( s"codegen dispatch: expression ${bad.getClass.getSimpleName} not supported " + - "(aggregate, generator, codegen-fallback, or unevaluable)") + "(aggregate, generator, or unevaluable)") case None => } val badRef = boundExpr.collectFirst { diff --git a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala index 852e80ae44..9f6a24c5b4 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala @@ -20,7 +20,7 @@ package org.apache.comet.serde import org.apache.spark.SparkEnv -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, Literal, RuntimeReplaceable, ScalaUDF} import org.apache.spark.sql.types.BinaryType import org.apache.comet.CometConf @@ -77,10 +77,20 @@ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] { return None } + // `RuntimeReplaceable` expressions (e.g. 
Spark 4's `StructsToJson`) have a `doGenCode` that + // always throws "Cannot generate code for expression". Catalyst's `ReplaceExpressions` rule + // normally rewrites them to their `replacement` form before codegen runs. Comet's serde + // sometimes works with the pre-rewrite form (via shim reconstruction) for matching purposes; + // unwrap to the replacement here, before binding, so that the kernel compiles. + val target = expr match { + case rr: RuntimeReplaceable => rr.replacement + case other => other + } + // Bind against only the AttributeReferences the tree actually reads, so ordinals align with // the data args we ship. - val attrs = expr.collect { case a: AttributeReference => a }.distinct - val boundExpr = BindReferences.bindReference(expr, AttributeSeq(attrs)) + val attrs = target.collect { case a: AttributeReference => a }.distinct + val boundExpr = BindReferences.bindReference(target, AttributeSeq(attrs)) // Gate at plan time. Surface the reason via withInfo rather than crashing Janino at execute. 
CometBatchKernelCodegen.canHandle(boundExpr) match { diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 599e552f66..04d74c471a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -448,71 +448,37 @@ object CometGetJsonObject extends CometExpressionSerde[GetJsonObject] { " support when using engine=rust") override def getSupportLevel(expr: GetJsonObject): SupportLevel = { - if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { - expr.path match { - case _: Literal => Compatible(None) - case _ => Unsupported(Some("Only scalar paths are supported")) - } - } else { - Incompatible( - Some( - "Spark allows single-quoted JSON and unescaped control characters " + - "which Comet does not support")) + JsonRoute.choose("get_json_object") match { + case JsonRoute.Native => + Incompatible( + Some( + "Spark allows single-quoted JSON and unescaped control characters " + + "which Comet does not support")) + case JsonRoute.JvmCodegen => Compatible(None) + case JsonRoute.Fallback(reason) => Unsupported(Some(reason)) } } override def convert( expr: GetJsonObject, inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { - convertViaJvmUdf(expr, inputs, binding) - } else { - val jsonExpr = exprToProtoInternal(expr.json, inputs, binding) - val pathExpr = exprToProtoInternal(expr.path, inputs, binding) - val optExpr = scalarFunctionExprToProtoWithReturnType( - "get_json_object", - expr.dataType, - false, - jsonExpr, - pathExpr) - optExprWithInfo(optExpr, expr, expr.json, expr.path) - } - } - - private def convertViaJvmUdf( - expr: GetJsonObject, - inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - expr.path match { - case Literal(value, DataTypes.StringType) => - if (value == null) { - withInfo(expr, "Null literal 
path is handled by Spark fallback") - return None - } - val jsonProto = exprToProtoInternal(expr.json, inputs, binding) - val pathProto = exprToProtoInternal(expr.path, inputs, binding) - if (jsonProto.isEmpty || pathProto.isEmpty) { - return None - } - val returnType = serializeDataType(DataTypes.StringType).getOrElse(return None) - val udfBuilder = ExprOuterClass.JvmScalarUdf - .newBuilder() - .setClassName("org.apache.comet.udf.GetJsonObjectUDF") - .addArgs(jsonProto.get) - .addArgs(pathProto.get) - .setReturnType(returnType) - .setReturnNullable(expr.nullable) - Some( - ExprOuterClass.Expr - .newBuilder() - .setJvmScalarUdf(udfBuilder.build()) - .build()) - case _ => - withInfo(expr, "Only scalar paths are supported") + binding: Boolean): Option[Expr] = + JsonRoute.choose("get_json_object") match { + case JsonRoute.Native => + val jsonExpr = exprToProtoInternal(expr.json, inputs, binding) + val pathExpr = exprToProtoInternal(expr.path, inputs, binding) + val optExpr = scalarFunctionExprToProtoWithReturnType( + "get_json_object", + expr.dataType, + false, + jsonExpr, + pathExpr) + optExprWithInfo(optExpr, expr, expr.json, expr.path) + case JsonRoute.JvmCodegen => CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) + case JsonRoute.Fallback(reason) => + withInfo(expr, reason) None } - } } trait CommonStringExprs { diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index eaea21ffea..904873b588 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -22,16 +22,58 @@ package org.apache.comet.serde import scala.jdk.CollectionConverters._ import scala.util.Try -import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, Literal, StructsToCsv, StructsToJson} +import 
org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, StructsToCsv, StructsToJson} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.DataTypeSupport import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, serializeDataType} -import org.apache.comet.udf.CometLambdaRegistry + +/** + * Routing decision for a JSON expression. Each JSON serde delegates to [[JsonRoute.choose]] to + * pick between the native Rust path and the Arrow-direct codegen dispatcher (which + * Janino-compiles Spark's own `doGenCode` for the expression). + */ +private[serde] sealed trait JsonRoute +private[serde] object JsonRoute { + + /** Run the native DataFusion JSON implementation. */ + case object Native extends JsonRoute + + /** + * Route through the codegen dispatcher. Spark's `doGenCode` for the expression compiles into a + * per-batch kernel, so semantics match Spark exactly. + */ + case object JvmCodegen extends JsonRoute + + /** Decline to run natively; the operator falls back to Spark with the given reason. */ + case class Fallback(reason: String) extends JsonRoute + + /** + * Pick a route given the user's config. `engine=rust` (default) runs the native path. + * `engine=java` routes through the codegen dispatcher when + * [[CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED]] is true; otherwise Spark fallback. 
+ */ + def choose(exprName: String): JsonRoute = { + val engine = CometConf.COMET_JSON_ENGINE.get() + val codegenEnabled = CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.get() + engine match { + case CometConf.JSON_ENGINE_RUST => Native + case CometConf.JSON_ENGINE_JAVA => + if (codegenEnabled) { + JvmCodegen + } else { + Fallback( + s"$exprName requires ${CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key}=true when " + + s"${CometConf.COMET_JSON_ENGINE.key}=${CometConf.JSON_ENGINE_JAVA}. " + + "The codegen dispatcher is experimental and disabled by default.") + } + case other => Fallback(s"Unknown ${CometConf.COMET_JSON_ENGINE.key}=$other") + } + } +} object CometCreateNamedStruct extends CometExpressionSerde[CreateNamedStruct] { override def convert( @@ -110,94 +152,60 @@ object CometGetArrayStructFields extends CometExpressionSerde[GetArrayStructFiel object CometStructsToJson extends CometExpressionSerde[StructsToJson] { override def getSupportLevel(expr: StructsToJson): SupportLevel = { - if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { - expr.child.dataType match { - case s: StructType if s.fields.forall(f => isSupportedType(f.dataType)) => - Compatible(None) - case _ => - Unsupported(Some("to_json: only StructType with supported fields is supported")) - } - } else { - if (expr.options.nonEmpty) { - return Unsupported(Some("StructsToJson with options is not supported")) - } - val dataType = expr.child.dataType - if (!isSupportedType(dataType)) { - return Unsupported(Some(s"Struct type: $dataType contains unsupported types")) - } - Compatible() + JsonRoute.choose("to_json") match { + case JsonRoute.JvmCodegen => + expr.child.dataType match { + case s: StructType if s.fields.forall(f => isSupportedType(f.dataType)) => + Compatible(None) + case _ => + Unsupported(Some("to_json: only StructType with supported fields is supported")) + } + case JsonRoute.Native => + if (expr.options.nonEmpty) { + return Unsupported(Some("StructsToJson with options is 
not supported")) + } + val dataType = expr.child.dataType + if (!isSupportedType(dataType)) { + return Unsupported(Some(s"Struct type: $dataType contains unsupported types")) + } + Compatible() + case JsonRoute.Fallback(reason) => Unsupported(Some(reason)) } } override def convert( expr: StructsToJson, inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { - if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { - return convertViaJvmUdf(expr, inputs, binding) - } - if (expr.options.nonEmpty) { - withInfo(expr, "StructsToJson with options is not supported") - return None - } - val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields - exprToProtoInternal(expr.child, inputs, binding) match { - case Some(p) => - val toJson = ExprOuterClass.ToJson - .newBuilder() - .setChild(p) - .setTimezone(expr.timeZoneId.getOrElse("UTC")) - .setIgnoreNullFields(ignoreNullFields) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToJson(toJson) - .build()) - case _ => - withInfo(expr, expr.child) + binding: Boolean): Option[ExprOuterClass.Expr] = + JsonRoute.choose("to_json") match { + case JsonRoute.JvmCodegen => CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) + case JsonRoute.Fallback(reason) => + withInfo(expr, reason) None + case JsonRoute.Native => + if (expr.options.nonEmpty) { + withInfo(expr, "StructsToJson with options is not supported") + return None + } + val ignoreNullFields = SQLConf.get.jsonGeneratorIgnoreNullFields + exprToProtoInternal(expr.child, inputs, binding) match { + case Some(p) => + val toJson = ExprOuterClass.ToJson + .newBuilder() + .setChild(p) + .setTimezone(expr.timeZoneId.getOrElse("UTC")) + .setIgnoreNullFields(ignoreNullFields) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setToJson(toJson) + .build()) + case _ => + withInfo(expr, expr.child) + None + } } - } - - private def convertViaJvmUdf( - expr: StructsToJson, - inputs: Seq[Attribute], - binding: Boolean): 
Option[ExprOuterClass.Expr] = { - expr.child.dataType match { - case s: StructType if s.fields.forall(f => isSupportedType(f.dataType)) => - val childProto = exprToProtoInternal(expr.child, inputs, binding) - if (childProto.isEmpty) return None - // TODO follow-up: the registry retains expressions for the JVM lifetime and - // does not propagate across executors. This is acceptable for the experimental - // engine=java mode in local Spark; cluster runs require encoding the - // schema/timezone directly in the JvmScalarUdf args. - // Rebind the child so the UDF can call eval(row) against a single-column - // wrapper InternalRow without hitting an unevaluable AttributeReference. - val rebound = - expr.copy(child = BoundReference(0, expr.child.dataType, expr.child.nullable)) - val key = CometLambdaRegistry.register(rebound) - val keyProto = - exprToProtoInternal(Literal(UTF8String.fromString(key), StringType), inputs, binding) - if (keyProto.isEmpty) return None - val returnType = serializeDataType(DataTypes.StringType).getOrElse(return None) - val udfBuilder = ExprOuterClass.JvmScalarUdf - .newBuilder() - .setClassName("org.apache.comet.udf.ToJsonUDF") - .addArgs(childProto.get) - .addArgs(keyProto.get) - .setReturnType(returnType) - .setReturnNullable(expr.nullable) - Some( - ExprOuterClass.Expr - .newBuilder() - .setJvmScalarUdf(udfBuilder.build()) - .build()) - case _ => - withInfo(expr, "to_json: only StructType with supported fields is supported") - None - } - } def isSupportedType(dt: DataType): Boolean = { dt match { @@ -227,14 +235,16 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { override def getUnsupportedReasons(): Seq[String] = Seq("Requires an explicit schema") override def getSupportLevel(expr: JsonToStructs): SupportLevel = { - if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { - expr.schema match { - case s: StructType if isSupportedSchema(s) => Compatible(None) - case _ => Unsupported(Some("from_json: 
only StructType schemas are supported")) - } - } else { - // this feature is partially implemented and not comprehensively tested yet - Incompatible() + JsonRoute.choose("from_json") match { + case JsonRoute.JvmCodegen => + expr.schema match { + case s: StructType if isSupportedSchema(s) => Compatible(None) + case _ => Unsupported(Some("from_json: only StructType schemas are supported")) + } + case JsonRoute.Native => + // this feature is partially implemented and not comprehensively tested yet + Incompatible() + case JsonRoute.Fallback(reason) => Unsupported(Some(reason)) } } @@ -248,41 +258,46 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { return None } - if (CometConf.COMET_JSON_ENGINE.get() == CometConf.JSON_ENGINE_JAVA) { - return convertViaJvmUdf(expr, inputs, binding) - } - - if (!isSupportedSchema(expr.schema)) { - withInfo(expr, "from_json: Unsupported schema type") - return None - } - - val options = expr.options - if (options.nonEmpty) { - val mode = options.getOrElse("mode", "PERMISSIVE") - if (mode != "PERMISSIVE") { - withInfo(expr, s"from_json: Only PERMISSIVE mode supported, got: $mode") - return None - } - val knownOptions = Set("mode") - val unknownOpts = options.keySet -- knownOptions - if (unknownOpts.nonEmpty) { - withInfo(expr, s"from_json: Ignoring unsupported options: ${unknownOpts.mkString(", ")}") - } - } - - // Convert child expression and schema to protobuf - for { - childProto <- exprToProtoInternal(expr.child, inputs, binding) - schemaProto <- serializeDataType(expr.schema) - } yield { - val fromJson = ExprOuterClass.FromJson - .newBuilder() - .setChild(childProto) - .setSchema(schemaProto) - .setTimezone(expr.timeZoneId.getOrElse("UTC")) - .build() - ExprOuterClass.Expr.newBuilder().setFromJson(fromJson).build() + JsonRoute.choose("from_json") match { + case JsonRoute.JvmCodegen => CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding) + case JsonRoute.Fallback(reason) => + withInfo(expr, reason) + 
None + case JsonRoute.Native => + if (!isSupportedSchema(expr.schema)) { + withInfo(expr, "from_json: Unsupported schema type") + return None + } + + val options = expr.options + if (options.nonEmpty) { + val mode = options.getOrElse("mode", "PERMISSIVE") + if (mode != "PERMISSIVE") { + withInfo(expr, s"from_json: Only PERMISSIVE mode supported, got: $mode") + return None + } + val knownOptions = Set("mode") + val unknownOpts = options.keySet -- knownOptions + if (unknownOpts.nonEmpty) { + withInfo( + expr, + s"from_json: Ignoring unsupported options: ${unknownOpts.mkString(", ")}") + } + } + + // Convert child expression and schema to protobuf + for { + childProto <- exprToProtoInternal(expr.child, inputs, binding) + schemaProto <- serializeDataType(expr.schema) + } yield { + val fromJson = ExprOuterClass.FromJson + .newBuilder() + .setChild(childProto) + .setSchema(schemaProto) + .setTimezone(expr.timeZoneId.getOrElse("UTC")) + .build() + ExprOuterClass.Expr.newBuilder().setFromJson(fromJson).build() + } } } @@ -294,45 +309,6 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { true case _ => false } - - private def convertViaJvmUdf( - expr: JsonToStructs, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { - expr.schema match { - case s: StructType if isSupportedSchema(s) => - val childProto = exprToProtoInternal(expr.child, inputs, binding) - if (childProto.isEmpty) return None - // TODO follow-up: the registry retains expressions for the JVM lifetime and - // does not propagate across executors. This is acceptable for the experimental - // engine=java mode in local Spark; cluster runs require encoding the - // schema/timezone directly in the JvmScalarUdf args. - // Rebind the child so the UDF can call eval(row) against a single-column - // wrapper InternalRow without hitting an unevaluable AttributeReference. 
- val rebound = - expr.copy(child = BoundReference(0, expr.child.dataType, expr.child.nullable)) - val key = CometLambdaRegistry.register(rebound) - val keyProto = - exprToProtoInternal(Literal(UTF8String.fromString(key), StringType), inputs, binding) - if (keyProto.isEmpty) return None - val returnType = serializeDataType(expr.schema).getOrElse(return None) - val udfBuilder = ExprOuterClass.JvmScalarUdf - .newBuilder() - .setClassName("org.apache.comet.udf.FromJsonUDF") - .addArgs(childProto.get) - .addArgs(keyProto.get) - .setReturnType(returnType) - .setReturnNullable(expr.nullable) - Some( - ExprOuterClass.Expr - .newBuilder() - .setJvmScalarUdf(udfBuilder.build()) - .build()) - case _ => - withInfo(expr, "from_json: only StructType schemas are supported") - None - } - } } object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] { diff --git a/spark/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala b/spark/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala deleted file mode 100644 index 7a94030f7b..0000000000 --- a/spark/src/main/scala/org/apache/comet/udf/FromJsonUDF.scala +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.comet.udf - -import java.nio.charset.StandardCharsets - -import org.apache.arrow.vector.{BigIntVector, BitVector, FieldVector, Float4Vector, Float8Vector, IntVector, ValueVector, VarCharVector} -import org.apache.arrow.vector.complex.StructVector -import org.apache.arrow.vector.types.FloatingPointPrecision -import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, JsonToStructs, RuntimeReplaceable} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -import org.apache.comet.CometArrowAllocator - -/** - * `from_json(json, schema)` implemented via Spark's `JsonToStructs` for byte-exact compatibility. - * - * The registered expression is looked up from `CometLambdaRegistry` using a scalar key argument. - * The output schema is read from the registered expression's `dataType`. The UDF is stateless: - * every call resolves the expression from the registry, so a single UDF instance can be shared - * across native worker threads. - * - * Inputs: - * - inputs(0): VarCharVector json column - * - inputs(1): VarCharVector scalar (length-1) containing the registry key string - * - * Output: StructVector matching the registered schema. The caller owns the returned vector and - * must close it; closing the StructVector recursively closes its child vectors. - * - * Supported field types (matching `CometJsonToStructs.isSupportedType`): Boolean, Integer, Long, - * Float, Double, String, plus nested struct of those. 
- */ -class FromJsonUDF extends CometUDF { - - override def evaluate(inputs: Array[ValueVector]): ValueVector = { - require(inputs.length == 2, s"FromJsonUDF expects 2 inputs (json, key), got ${inputs.length}") - val json = inputs(0).asInstanceOf[VarCharVector] - val keyVec = inputs(1).asInstanceOf[VarCharVector] - require( - keyVec.getValueCount >= 1 && !keyVec.isNull(0), - "FromJsonUDF requires a non-null scalar registry key") - - val key = new String(keyVec.get(0), StandardCharsets.UTF_8) - val configExpr = CometLambdaRegistry.get(key).asInstanceOf[JsonToStructs] - val schema = configExpr.dataType.asInstanceOf[StructType] - // Build a fresh expression per call: Spark's JsonToStructsEvaluator may hold mutable - // per-row state, so a shared cross-thread instance is unsafe (the JVM UDF framework - // reuses one UDF instance across native worker threads). - val sparkExpr = JsonToStructs( - schema, - configExpr.options, - BoundReference(0, StringType, nullable = true), - configExpr.timeZoneId) - val evalExpr: Expression = sparkExpr match { - case r: RuntimeReplaceable => r.replacement - case other => other - } - - val n = json.getValueCount - val out = StructVector.empty("from_json_result", CometArrowAllocator) - schema.fields.foreach(f => addChild(out, f.name, f.dataType, f.nullable)) - out.setInitialCapacity(n) - out.allocateNew() - - val row = new GenericInternalRow(1) - var i = 0 - while (i < n) { - if (json.isNull(i)) { - // entry stays null (default after allocateNew) - } else { - row.update(0, UTF8String.fromBytes(json.get(i))) - val result = evalExpr.eval(row) - if (result == null) { - // null result → null struct entry - } else { - out.setIndexDefined(i) - val struct = result.asInstanceOf[InternalRow] - var f = 0 - while (f < schema.fields.length) { - writeChild(out.getChildByOrdinal(f), i, schema.fields(f).dataType, struct, f) - f += 1 - } - } - } - i += 1 - } - // Set value count on each child so its validity buffer is sized correctly. 
- var f = 0 - while (f < schema.fields.length) { - out.getChildByOrdinal(f).setValueCount(n) - f += 1 - } - out.setValueCount(n) - out - } - - private def addChild( - parent: StructVector, - name: String, - dt: DataType, - nullable: Boolean): Unit = { - dt match { - case BooleanType => - parent.addOrGet( - name, - new FieldType(nullable, ArrowType.Bool.INSTANCE, null), - classOf[BitVector]) - case IntegerType => - parent.addOrGet( - name, - new FieldType(nullable, new ArrowType.Int(32, true), null), - classOf[IntVector]) - case LongType => - parent.addOrGet( - name, - new FieldType(nullable, new ArrowType.Int(64, true), null), - classOf[BigIntVector]) - case FloatType => - parent.addOrGet( - name, - new FieldType( - nullable, - new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), - null), - classOf[Float4Vector]) - case DoubleType => - parent.addOrGet( - name, - new FieldType( - nullable, - new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), - null), - classOf[Float8Vector]) - case StringType => - parent.addOrGet( - name, - new FieldType(nullable, ArrowType.Utf8.INSTANCE, null), - classOf[VarCharVector]) - case nested: StructType => - parent.addOrGet( - name, - new FieldType(nullable, ArrowType.Struct.INSTANCE, null), - classOf[StructVector]) - val child = parent.getChild(name).asInstanceOf[StructVector] - nested.fields.foreach(ff => addChild(child, ff.name, ff.dataType, ff.nullable)) - case other => - throw new UnsupportedOperationException(s"FromJsonUDF: unsupported type $other") - } - } - - private def writeChild( - child: ValueVector, - i: Int, - dt: DataType, - struct: InternalRow, - f: Int): Unit = { - if (struct.isNullAt(f)) { - child.asInstanceOf[FieldVector].setNull(i) - return - } - dt match { - case BooleanType => - child.asInstanceOf[BitVector].setSafe(i, if (struct.getBoolean(f)) 1 else 0) - case IntegerType => - child.asInstanceOf[IntVector].setSafe(i, struct.getInt(f)) - case LongType => - child.asInstanceOf[BigIntVector].setSafe(i, 
struct.getLong(f)) - case FloatType => - child.asInstanceOf[Float4Vector].setSafe(i, struct.getFloat(f)) - case DoubleType => - child.asInstanceOf[Float8Vector].setSafe(i, struct.getDouble(f)) - case StringType => - child.asInstanceOf[VarCharVector].setSafe(i, struct.getUTF8String(f).getBytes) - case nested: StructType => - val sv = child.asInstanceOf[StructVector] - sv.setIndexDefined(i) - val inner = struct.getStruct(f, nested.fields.length) - var ff = 0 - while (ff < nested.fields.length) { - writeChild(sv.getChildByOrdinal(ff), i, nested.fields(ff).dataType, inner, ff) - ff += 1 - } - case other => - throw new UnsupportedOperationException(s"FromJsonUDF: unsupported type $other") - } - } -} diff --git a/spark/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala b/spark/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala deleted file mode 100644 index 17500d318b..0000000000 --- a/spark/src/main/scala/org/apache/comet/udf/GetJsonObjectUDF.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.comet.udf - -import java.nio.charset.StandardCharsets - -import org.apache.arrow.vector.{ValueVector, VarCharVector} -import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, GetJsonObject, Literal, RuntimeReplaceable} -import org.apache.spark.sql.types.StringType -import org.apache.spark.unsafe.types.UTF8String - -import org.apache.comet.CometArrowAllocator - -/** - * `get_json_object(json, path)` implemented via Spark's `GetJsonObject` expression for byte-exact - * compatibility. Path must be a non-null scalar (enforced by the serde when routing here). - * - * Inputs: - * - inputs(0): VarCharVector json column - * - inputs(1): VarCharVector path (scalar, length-1) - * - * Output: VarCharVector, same length as json input. - * - * A fresh Spark expression is built per `evaluate` call (per batch). Spark's `GetJsonObject` - * holds mutable per-row state in its evaluator, so a shared cross-thread instance is unsafe; the - * JVM UDF framework reuses one UDF instance across native worker threads. 
- */ -class GetJsonObjectUDF extends CometUDF { - - override def evaluate(inputs: Array[ValueVector]): ValueVector = { - require(inputs.length == 2, s"GetJsonObjectUDF expects 2 inputs, got ${inputs.length}") - val jsonVec = inputs(0).asInstanceOf[VarCharVector] - val pathVec = inputs(1).asInstanceOf[VarCharVector] - require( - pathVec.getValueCount >= 1 && !pathVec.isNull(0), - "GetJsonObjectUDF requires a non-null scalar path") - - val pathStr = new String(pathVec.get(0), StandardCharsets.UTF_8) - val sparkExpr = GetJsonObject( - BoundReference(0, StringType, nullable = true), - Literal(UTF8String.fromString(pathStr), StringType)) - val evalExpr: Expression = sparkExpr match { - case r: RuntimeReplaceable => r.replacement - case other => other - } - - val n = jsonVec.getValueCount - val out = new VarCharVector("get_json_object_result", CometArrowAllocator) - out.allocateNew(n) - - val row = new GenericInternalRow(1) - var i = 0 - while (i < n) { - if (jsonVec.isNull(i)) { - out.setNull(i) - } else { - row.update(0, UTF8String.fromBytes(jsonVec.get(i))) - val result = evalExpr.eval(row) - if (result == null) out.setNull(i) - else out.setSafe(i, result.asInstanceOf[UTF8String].getBytes) - } - i += 1 - } - out.setValueCount(n) - out - } -} diff --git a/spark/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala b/spark/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala deleted file mode 100644 index 9c7a9a9bb5..0000000000 --- a/spark/src/main/scala/org/apache/comet/udf/ToJsonUDF.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.udf - -import java.nio.charset.StandardCharsets - -import org.apache.arrow.vector.{BigIntVector, BitVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TinyIntVector, ValueVector, VarCharVector} -import org.apache.arrow.vector.complex.StructVector -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, RuntimeReplaceable, StructsToJson} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -import org.apache.comet.CometArrowAllocator - -/** - * `to_json(struct)` implemented via Spark's `StructsToJson` for byte-exact compatibility. - * - * The registered expression is looked up from `CometLambdaRegistry` using a scalar key argument. - * The schema is read from the registered expression's child datatype. The UDF is stateless: every - * call resolves the expression from the registry, so a single UDF instance can be shared across - * native worker threads. - * - * Inputs: - * - inputs(0): StructVector of arbitrary supported schema - * - inputs(1): VarCharVector scalar (length-1) containing the registry key string - * - * Output: VarCharVector of JSON strings. - * - * Supported field types (matching `CometStructsToJson.isSupportedType`): Boolean, Byte, Short, - * Integer, Long, Float, Double, String, plus nested struct of those. 
- */ -class ToJsonUDF extends CometUDF { - - override def evaluate(inputs: Array[ValueVector]): ValueVector = { - require(inputs.length == 2, s"ToJsonUDF expects 2 inputs (struct, key), got ${inputs.length}") - val struct = inputs(0).asInstanceOf[StructVector] - val keyVec = inputs(1).asInstanceOf[VarCharVector] - require( - keyVec.getValueCount >= 1 && !keyVec.isNull(0), - "ToJsonUDF requires a non-null scalar registry key") - - val key = new String(keyVec.get(0), StandardCharsets.UTF_8) - val configExpr = CometLambdaRegistry.get(key).asInstanceOf[StructsToJson] - val schema = configExpr.child.dataType.asInstanceOf[StructType] - // Build a fresh expression per call: Spark's StructsToJsonEvaluator holds mutable - // per-row state, so a shared cross-thread instance is unsafe (the JVM UDF framework - // reuses one UDF instance across native worker threads). - val sparkExpr = - StructsToJson( - configExpr.options, - BoundReference(0, schema, nullable = true), - configExpr.timeZoneId) - val evalExpr: Expression = sparkExpr match { - case r: RuntimeReplaceable => r.replacement - case other => other - } - - val n = struct.getValueCount - val out = new VarCharVector("to_json_result", CometArrowAllocator) - out.allocateNew(n) - - val row = new GenericInternalRow(1) - var i = 0 - while (i < n) { - if (struct.isNull(i)) { - out.setNull(i) - } else { - val inner = arrowStructRowToSparkRow(struct, i, schema) - row.update(0, inner) - val result = evalExpr.eval(row) - if (result == null) out.setNull(i) - else out.setSafe(i, result.asInstanceOf[UTF8String].getBytes) - } - i += 1 - } - out.setValueCount(n) - out - } - - private def arrowStructRowToSparkRow( - struct: StructVector, - rowIdx: Int, - schema: StructType): InternalRow = { - val values = new Array[Any](schema.fields.length) - var f = 0 - while (f < schema.fields.length) { - val child = struct.getChildByOrdinal(f) - if (child.isNull(rowIdx)) { - values(f) = null - } else { - values(f) = readScalar(child, rowIdx, 
schema.fields(f).dataType) - } - f += 1 - } - new GenericInternalRow(values) - } - - private def readScalar(v: ValueVector, i: Int, dt: DataType): Any = dt match { - case BooleanType => v.asInstanceOf[BitVector].get(i) == 1 - case ByteType => v.asInstanceOf[TinyIntVector].get(i) - case ShortType => v.asInstanceOf[SmallIntVector].get(i) - case IntegerType => v.asInstanceOf[IntVector].get(i) - case LongType => v.asInstanceOf[BigIntVector].get(i) - case FloatType => v.asInstanceOf[Float4Vector].get(i) - case DoubleType => v.asInstanceOf[Float8Vector].get(i) - case StringType => UTF8String.fromBytes(v.asInstanceOf[VarCharVector].get(i)) - case nested: StructType => - arrowStructRowToSparkRow(v.asInstanceOf[StructVector], i, nested) - case other => - throw new UnsupportedOperationException(s"ToJsonUDF: unsupported type $other") - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala index e8f59e8b22..b58fd00a04 100644 --- a/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala @@ -166,13 +166,16 @@ class CometCodegenSourceSuite extends AnyFunSuite { s"got:\n$src") } - test("canHandle rejects CodegenFallback expressions") { + test("canHandle accepts CodegenFallback expressions (delegates to eval(row))") { + // CodegenFallback.doGenCode emits ((Expression) references[N]).eval(row) which is the same + // mechanism that backs HigherOrderFunction support: the eval reads through the kernel's typed + // Arrow getters via the row alias. Other CodegenFallback expressions (JsonToStructs, + // StructsToJson, ...) ride the same path. 
val expr = FakeCodegenFallback(BoundReference(0, StringType, nullable = true)) val reason = CometBatchKernelCodegen.canHandle(expr) - assert(reason.isDefined, "expected canHandle to reject CodegenFallback") assert( - reason.get.contains("FakeCodegenFallback"), - s"expected reason to name the rejected expression class; got: ${reason.get}") + reason.isEmpty, + s"expected canHandle to accept CodegenFallback; got rejection: ${reason.getOrElse("")}") } test("canHandle accepts Nondeterministic expressions (per-partition kernel handles state)") { diff --git a/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala index 5d20b718c8..2e1242660d 100644 --- a/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometJsonJvmSuite.scala @@ -26,7 +26,9 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper class CometJsonJvmSuite extends CometTestBase with AdaptiveSparkPlanHelper { override protected def sparkConf: SparkConf = - super.sparkConf.set(CometConf.COMET_JSON_ENGINE.key, CometConf.JSON_ENGINE_JAVA) + super.sparkConf + .set(CometConf.COMET_JSON_ENGINE.key, CometConf.JSON_ENGINE_JAVA) + .set(CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key, "true") private val rows = Seq( """{"a":1,"b":"x","arr":[1,2,3]}""", diff --git a/spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala b/spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala deleted file mode 100644 index 292f6d96e5..0000000000 --- a/spark/src/test/scala/org/apache/comet/udf/FromJsonUDFSuite.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.udf - -import java.nio.charset.StandardCharsets - -import org.scalatest.funsuite.AnyFunSuite - -import org.apache.arrow.vector.{IntVector, VarCharVector} -import org.apache.arrow.vector.complex.StructVector -import org.apache.spark.sql.catalyst.expressions.{BoundReference, JsonToStructs} -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} - -import org.apache.comet.CometArrowAllocator - -class FromJsonUDFSuite extends AnyFunSuite { - - private def stringVec(name: String, values: Seq[Option[String]]): VarCharVector = { - val v = new VarCharVector(name, CometArrowAllocator) - v.allocateNew(values.size) - values.zipWithIndex.foreach { - case (Some(s), i) => v.setSafe(i, s.getBytes(StandardCharsets.UTF_8)) - case (None, i) => v.setNull(i) - } - v.setValueCount(values.size) - v - } - - test("parse struct from JSON strings") { - val schema = StructType( - Seq( - StructField("a", IntegerType, nullable = true), - StructField("b", StringType, nullable = true))) - val sparkExpr = JsonToStructs( - schema, - Map.empty[String, String], - BoundReference(0, StringType, nullable = true), - Some("UTC")) - val key = CometLambdaRegistry.register(sparkExpr) - try { - val udf = new FromJsonUDF - val json = stringVec( - "json", - Seq(Some("""{"a":1,"b":"x"}"""), Some("""{"a":null,"b":"y"}"""), None, Some("bad"))) - val keyVec = 
stringVec("key", Seq(Some(key))) - try { - val out = udf.evaluate(Array(json, keyVec)).asInstanceOf[StructVector] - try { - assert(out.getValueCount == 4) - val aVec = out.getChildByOrdinal(0).asInstanceOf[IntVector] - val bVec = out.getChildByOrdinal(1).asInstanceOf[VarCharVector] - // Row 0: a=1, b="x" - assert(!out.isNull(0)) - assert(aVec.get(0) == 1) - assert(new String(bVec.get(0), StandardCharsets.UTF_8) == "x") - // Row 1: a=null, b="y" - assert(!out.isNull(1)) - assert(aVec.isNull(1)) - assert(new String(bVec.get(1), StandardCharsets.UTF_8) == "y") - // Row 2: input was null - assert(out.isNull(2)) - // Row 3: malformed JSON, PERMISSIVE → struct of all nulls - assert(!out.isNull(3)) - assert(aVec.isNull(3)) - assert(bVec.isNull(3)) - } finally out.close() - } finally { json.close(); keyVec.close() } - } finally CometLambdaRegistry.remove(key) - } - - test("empty input vector produces empty output struct") { - val schema = StructType(Seq(StructField("a", IntegerType, nullable = true))) - val sparkExpr = JsonToStructs( - schema, - Map.empty[String, String], - BoundReference(0, StringType, nullable = true), - Some("UTC")) - val key = CometLambdaRegistry.register(sparkExpr) - try { - val udf = new FromJsonUDF - val json = stringVec("json", Seq.empty) - val keyVec = stringVec("key", Seq(Some(key))) - try { - val out = udf.evaluate(Array(json, keyVec)).asInstanceOf[StructVector] - try assert(out.getValueCount == 0) - finally out.close() - } finally { json.close(); keyVec.close() } - } finally CometLambdaRegistry.remove(key) - } -} diff --git a/spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala b/spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala deleted file mode 100644 index fd1c21dcd1..0000000000 --- a/spark/src/test/scala/org/apache/comet/udf/GetJsonObjectUDFSuite.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.udf - -import java.nio.charset.StandardCharsets - -import org.scalatest.funsuite.AnyFunSuite - -import org.apache.arrow.vector.VarCharVector - -import org.apache.comet.CometArrowAllocator - -class GetJsonObjectUDFSuite extends AnyFunSuite { - - private def stringVec(name: String, values: Seq[Option[String]]): VarCharVector = { - val v = new VarCharVector(name, CometArrowAllocator) - v.allocateNew(values.size) - values.zipWithIndex.foreach { - case (Some(s), i) => v.setSafe(i, s.getBytes(StandardCharsets.UTF_8)) - case (None, i) => v.setNull(i) - } - v.setValueCount(values.size) - v - } - - private def get(out: VarCharVector, i: Int): Option[String] = - if (out.isNull(i)) None else Some(new String(out.get(i), StandardCharsets.UTF_8)) - - test("extract values using a scalar path") { - val udf = new GetJsonObjectUDF - val json = stringVec( - "json", - Seq(Some("""{"a":{"b":42}}"""), Some("""{"a":{"b":7}}"""), None, Some("garbage"))) - val path = stringVec("path", Seq(Some("$.a.b"))) - try { - val out = udf.evaluate(Array(json, path)).asInstanceOf[VarCharVector] - try { - assert(out.getValueCount == 4) - assert(get(out, 0) == Some("42")) - assert(get(out, 1) == Some("7")) - assert(get(out, 2).isEmpty) - assert(get(out, 3).isEmpty) - } finally 
out.close() - } finally { - json.close(); path.close() - } - } - - test("empty input vector produces empty output") { - val udf = new GetJsonObjectUDF - val json = stringVec("json", Seq.empty) - val path = stringVec("path", Seq(Some("$.a"))) - try { - val out = udf.evaluate(Array(json, path)).asInstanceOf[VarCharVector] - try assert(out.getValueCount == 0) - finally out.close() - } finally { - json.close(); path.close() - } - } - - test("different scalar paths across separate evaluations share the same UDF instance") { - val udf = new GetJsonObjectUDF - val json = stringVec("json", Seq(Some("""{"a":1,"b":2}"""))) - val pathA = stringVec("pathA", Seq(Some("$.a"))) - val pathB = stringVec("pathB", Seq(Some("$.b"))) - try { - val outA = udf.evaluate(Array(json, pathA)).asInstanceOf[VarCharVector] - try assert(get(outA, 0) == Some("1")) - finally outA.close() - val json2 = stringVec("json2", Seq(Some("""{"a":1,"b":2}"""))) - try { - val outB = udf.evaluate(Array(json2, pathB)).asInstanceOf[VarCharVector] - try assert(get(outB, 0) == Some("2")) - finally outB.close() - } finally json2.close() - } finally { - json.close(); pathA.close(); pathB.close() - } - } -} diff --git a/spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala b/spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala deleted file mode 100644 index 389e74c5e2..0000000000 --- a/spark/src/test/scala/org/apache/comet/udf/ToJsonUDFSuite.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.udf - -import java.nio.charset.StandardCharsets - -import org.scalatest.funsuite.AnyFunSuite - -import org.apache.arrow.vector.{IntVector, VarCharVector} -import org.apache.arrow.vector.complex.StructVector -import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType} -import org.apache.spark.sql.catalyst.expressions.{BoundReference, StructsToJson} -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} - -import org.apache.comet.CometArrowAllocator - -class ToJsonUDFSuite extends AnyFunSuite { - - private def stringVec(name: String, values: Seq[Option[String]]): VarCharVector = { - val v = new VarCharVector(name, CometArrowAllocator) - v.allocateNew(values.size) - values.zipWithIndex.foreach { - case (Some(s), i) => v.setSafe(i, s.getBytes(StandardCharsets.UTF_8)) - case (None, i) => v.setNull(i) - } - v.setValueCount(values.size) - v - } - - test("serialize struct rows to JSON") { - val schema = StructType( - Seq( - StructField("a", IntegerType, nullable = true), - StructField("b", StringType, nullable = true))) - val sparkExpr = StructsToJson( - Map.empty[String, String], - BoundReference(0, schema, nullable = true), - Some("UTC")) - val key = CometLambdaRegistry.register(sparkExpr) - try { - val udf = new ToJsonUDF - - val struct = StructVector.empty("in", CometArrowAllocator) - val aVec = - struct.addOrGet( - "a", - new FieldType(true, new ArrowType.Int(32, true), null), - classOf[IntVector]) - val bVec = - struct.addOrGet( - "b", - new FieldType(true, ArrowType.Utf8.INSTANCE, 
null), - classOf[VarCharVector]) - struct.allocateNew() - - aVec.setSafe(0, 1); bVec.setSafe(0, "x".getBytes(StandardCharsets.UTF_8)) - aVec.setNull(1); bVec.setSafe(1, "y".getBytes(StandardCharsets.UTF_8)) - struct.setIndexDefined(0) - struct.setIndexDefined(1) - aVec.setValueCount(2); bVec.setValueCount(2); struct.setValueCount(2) - - val keyVec = stringVec("key", Seq(Some(key))) - try { - val out = udf.evaluate(Array(struct, keyVec)).asInstanceOf[VarCharVector] - try { - assert(out.getValueCount == 2) - assert(new String(out.get(0), StandardCharsets.UTF_8) == """{"a":1,"b":"x"}""") - // Spark drops null fields by default; the second row's `a` is null, - // so `to_json` produces only `b`. - assert(new String(out.get(1), StandardCharsets.UTF_8) == """{"b":"y"}""") - } finally out.close() - } finally { struct.close(); keyVec.close() } - } finally CometLambdaRegistry.remove(key) - } - - test("empty input vector produces empty output") { - val schema = StructType(Seq(StructField("a", IntegerType, nullable = true))) - val sparkExpr = StructsToJson( - Map.empty[String, String], - BoundReference(0, schema, nullable = true), - Some("UTC")) - val key = CometLambdaRegistry.register(sparkExpr) - try { - val udf = new ToJsonUDF - val struct = StructVector.empty("in", CometArrowAllocator) - struct.addOrGet( - "a", - new FieldType(true, new ArrowType.Int(32, true), null), - classOf[IntVector]) - struct.allocateNew() - struct.setValueCount(0) - val keyVec = stringVec("key", Seq(Some(key))) - try { - val out = udf.evaluate(Array(struct, keyVec)).asInstanceOf[VarCharVector] - try assert(out.getValueCount == 0) - finally out.close() - } finally { struct.close(); keyVec.close() } - } finally CometLambdaRegistry.remove(key) - } -} From e60655f1e74e3f99b19d8a0b3ec11d47a45b3c29 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 May 2026 21:04:02 -0600 Subject: [PATCH 3/4] ci: register CometJsonJvmSuite and drop unused imports - Add CometJsonJvmSuite to pr_build_linux.yml so 
check-missing-suites passes. - Remove unused HigherOrderFunction, LambdaFunction, NamedLambdaVariable imports from CometBatchKernelCodegen.scala (referenced only in comments). - Remove unused serializeDataType import from strings.scala. --- .github/workflows/pr_build_linux.yml | 1 + .../org/apache/comet/codegen/CometBatchKernelCodegen.scala | 2 +- spark/src/main/scala/org/apache/comet/serde/strings.scala | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index f7d6c1a73d..5b610faf73 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -403,6 +403,7 @@ jobs: org.apache.comet.CometMapExpressionSuite org.apache.comet.CometCsvExpressionSuite org.apache.comet.CometJsonExpressionSuite + org.apache.comet.CometJsonJvmSuite org.apache.comet.SparkErrorConverterSuite org.apache.comet.expressions.conditional.CometIfSuite org.apache.comet.expressions.conditional.CometCoalesceSuite diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala index 0b2d430fc9..e394bbb7d7 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala @@ -23,7 +23,7 @@ import org.apache.arrow.vector._ import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector} import org.apache.arrow.vector.types.pojo.Field import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, HigherOrderFunction, LambdaFunction, Literal, NamedLambdaVariable, Unevaluable} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, Literal, Unevaluable} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.types._ diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 04d74c471a..8f9f6c1dd6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -29,7 +29,7 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType, serializeDataType} +import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometStringRepeat extends CometExpressionSerde[StringRepeat] { From ef4f749325a602e78b9fecb865087905e752f175 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 27 May 2026 06:53:13 -0600 Subject: [PATCH 4/4] ci: register CometJsonJvmSuite in macOS workflow --- .github/workflows/pr_build_macos.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 7af77ca2c9..76bf124dec 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -243,6 +243,7 @@ jobs: org.apache.comet.CometMapExpressionSuite org.apache.comet.CometCsvExpressionSuite org.apache.comet.CometJsonExpressionSuite + org.apache.comet.CometJsonJvmSuite org.apache.comet.SparkErrorConverterSuite org.apache.comet.expressions.conditional.CometIfSuite org.apache.comet.expressions.conditional.CometCoalesceSuite