From 327210530796caba950ce5d3a4851d8911a0e5b1 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 22 May 2026 07:46:49 -0400 Subject: [PATCH 1/5] add NullType to toArrowType --- .../main/scala/org/apache/spark/sql/comet/util/Utils.scala | 1 + .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 783367c054..4605e641f1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -148,6 +148,7 @@ object Utils extends CometTypeShim with Logging { } case TimestampNTZType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) + case NullType => ArrowType.Null.INSTANCE case dt if isTimeType(dt) => new ArrowType.Time(TimeUnit.NANOSECOND, 64) case _ => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 16601d056b..8bf00de20c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3925,6 +3925,13 @@ class CometExecSuite extends CometTestBase { } } + test("CometLocalTableScanExec handles NullType column") { + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val df = spark.sql("SELECT * FROM VALUES ('a', null), ('b', null) AS t(x, y)") + checkSparkAnswer(df) + } + } + test("Native_datafusion reports correct files and bytes scanned") { val inputFiles = 2 From cf8cd338c260a84692c37a8e1938a2f2c8183c56 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Fri, 22 May 2026 08:02:09 -0400 Subject: [PATCH 2/5] add NullType to shuffles --- native/shuffle/src/spark_unsafe/row.rs | 15 ++++++++++-- .../shuffle/CometShuffleExchangeExec.scala | 6 ++--- 
.../exec/CometColumnarShuffleSuite.scala | 23 ++++--------------- .../comet/exec/CometNativeShuffleSuite.scala | 6 +++++ 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/row.rs b/native/shuffle/src/spark_unsafe/row.rs index ec0903bc56..6ffe9d0b6e 100644 --- a/native/shuffle/src/spark_unsafe/row.rs +++ b/native/shuffle/src/spark_unsafe/row.rs @@ -28,8 +28,8 @@ use arrow::array::{ builder::{ ArrayBuilder, BinaryBuilder, BinaryDictionaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, - Int64Builder, Int8Builder, ListBuilder, MapBuilder, StringBuilder, StringDictionaryBuilder, - StructBuilder, TimestampMicrosecondBuilder, + Int64Builder, Int8Builder, ListBuilder, MapBuilder, NullBuilder, StringBuilder, + StringDictionaryBuilder, StructBuilder, TimestampMicrosecondBuilder, }, types::Int32Type, Array, ArrayRef, RecordBatch, RecordBatchOptions, @@ -267,6 +267,10 @@ pub(super) fn append_field( append_field_to_builder!(Date32Builder, |builder: &mut Date32Builder| builder .append_value(row.get_date(idx))); } + DataType::Null => { + let field_builder = get_field_builder!(struct_builder, NullBuilder, idx); + field_builder.append_null(); + } DataType::Timestamp(TimeUnit::Microsecond, _) => { append_field_to_builder!( TimestampMicrosecondBuilder, @@ -1148,6 +1152,12 @@ fn append_columns( .append_value(row.get_date(idx)) ); } + DataType::Null => { + let null_builder = downcast_builder_ref!(NullBuilder, builder); + for _ in row_start..row_end { + null_builder.append_null(); + } + } DataType::Timestamp(TimeUnit::Microsecond, _) => { append_column_to_builder!( TimestampMicrosecondBuilder, @@ -1252,6 +1262,7 @@ fn make_builders( } } DataType::Date32 => Box::new(Date32Builder::with_capacity(row_num)), + DataType::Null => Box::new(NullBuilder::new()), DataType::Timestamp(TimeUnit::Microsecond, _) => { 
Box::new(TimestampMicrosecondBuilder::with_capacity(row_num).with_data_type(dt.clone())) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 493c20f8b7..16e7a8b774 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, NullType, ShortType, StringType, StructType, TimestampNTZType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.MutablePair import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} @@ -364,7 +364,7 @@ object CometShuffleExchangeExec def supportedSerializableDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | - _: TimestampNTZType | _: DecimalType | _: DateType => + _: TimestampNTZType | _: DecimalType | _: DateType | 
_: NullType => true case StructType(fields) => fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType)) @@ -487,7 +487,7 @@ object CometShuffleExchangeExec def supportedSerializableDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | - _: TimestampNTZType | _: DecimalType | _: DateType => + _: TimestampNTZType | _: DecimalType | _: DateType | _: NullType => true case StructType(fields) => fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType)) && diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 86c6a6aa4b..70d427972a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -22,14 +22,13 @@ package org.apache.comet.exec import java.nio.file.{Files, Paths} import scala.reflect.runtime.universe._ -import scala.util.Random import org.scalactic.source.Position import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.{Partitioner, SparkConf} -import org.apache.spark.sql.{CometTestBase, DataFrame, RandomDataGenerator, Row} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec @@ -94,22 +93,10 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar """.stripMargin)) } - test("Fallback to Spark for unsupported input besides ordering") { - val dataGenerator = RandomDataGenerator - 
.forType( - dataType = NullType, - nullable = true, - new Random(System.nanoTime()), - validJulianDatetime = false) - .get - - val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", NullType, nullable = true) - val rdd = - spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator()))) - val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - checkSparkAnswer(df) + test("columnar shuffle with NullType passthrough column") { + val df = sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)") + val shuffled = df.repartition(2, $"x") + checkShuffleAnswer(shuffled, 1) } test("columnar shuffle on nested struct including nulls") { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index e0ef1df1f4..60637102f0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -218,6 +218,12 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + test("native shuffle with NullType passthrough column") { + val df = spark.sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)") + val shuffled = df.repartition(2, $"x") + checkShuffleAnswer(shuffled, 1) + } + test("fix: Comet native shuffle with binary data") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") From 0ed96bad7b681e8f4a7addc34fb1aa0ab7cc6efb Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 27 May 2026 17:32:05 -0400 Subject: [PATCH 3/5] Remove changes unrelated to NullType and LocalTableScanExec. 
--- .../scala/org/apache/comet/exec/CometExecSuite.scala | 8 ++++++++ .../org/apache/comet/exec/CometNativeShuffleSuite.scala | 9 ++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 8bf00de20c..507ef3583e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3932,6 +3932,14 @@ class CometExecSuite extends CometTestBase { } } + test("CometLocalTableScanExec handles NullType nested in struct/array/map") { + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + checkSparkAnswer( + spark.sql("SELECT named_struct('a', 1, 'b', null) AS s, array(null, null) AS a, " + + "map('k', null) AS m")) + } + } + test("Native_datafusion reports correct files and bytes scanned") { val inputFiles = 2 diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 60637102f0..86c99dde79 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -219,9 +219,12 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } test("native shuffle with NullType passthrough column") { - val df = spark.sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)") - val shuffled = df.repartition(2, $"x") - checkShuffleAnswer(shuffled, 1) + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val df = + spark.sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)") + val shuffled = df.repartition(2, $"x") + checkShuffleAnswer(shuffled, 1) + } } test("fix: Comet native shuffle with binary data") { From 
68381a01b102236077a64e1f9465facf3ad0d930 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 27 May 2026 17:40:50 -0400 Subject: [PATCH 4/5] Tighten tests. --- .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 507ef3583e..a1460427c0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3928,15 +3928,15 @@ class CometExecSuite extends CometTestBase { test("CometLocalTableScanExec handles NullType column") { withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { val df = spark.sql("SELECT * FROM VALUES ('a', null), ('b', null) AS t(x, y)") - checkSparkAnswer(df) + checkSparkAnswerAndOperator(df) } } test("CometLocalTableScanExec handles NullType nested in struct/array/map") { withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { - checkSparkAnswer( + checkSparkAnswerAndOperator( spark.sql("SELECT named_struct('a', 1, 'b', null) AS s, array(null, null) AS a, " + - "map('k', null) AS m")) + "map('k', null) AS m FROM VALUES (1), (2) AS t(id)")) } } From cd84c7ab39295b302ce32767271b90deda6d9019 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 27 May 2026 19:22:56 -0400 Subject: [PATCH 5/5] Add missed commit: NullType in list append_to_builder and Map[_, NullType] shuffle tests. 
--- native/shuffle/src/spark_unsafe/list.rs | 8 +++++++- .../apache/comet/exec/CometColumnarShuffleSuite.scala | 6 ++++++ .../org/apache/comet/exec/CometNativeShuffleSuite.scala | 9 +++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index 3fea3fadeb..14f9feb843 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -24,7 +24,7 @@ use arrow::array::{ builder::{ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, - ListBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, + ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, }, MapBuilder, }; @@ -393,6 +393,12 @@ pub fn append_to_builder( let builder = downcast_builder_ref!(Date32Builder, builder); array.append_dates_to_builder::(builder); } + DataType::Null => { + let builder = downcast_builder_ref!(NullBuilder, builder); + for _ in 0..array.get_num_elements() { + builder.append_null(); + } + } DataType::Binary => { add_values!( BinaryBuilder, diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 70d427972a..b0be2b90ac 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -99,6 +99,12 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar checkShuffleAnswer(shuffled, 1) } + test("columnar shuffle with Map[_, NullType] column") { + val df = sql("SELECT id, map(id, null) AS m FROM VALUES (1), (2), (3) AS t(id)") + val shuffled = df.repartition(2, $"id") + checkShuffleAnswer(shuffled, 1) + } + test("columnar shuffle on nested struct including nulls") { 
Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 86c99dde79..7c70c373fe 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -227,6 +227,15 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + test("native shuffle with Map[_, NullType] column") { + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val df = spark.sql("SELECT id, map(id, null) AS m FROM VALUES (1), (2), (3) AS t(id)") + val shuffled = df.repartition(2, $"id") + println(shuffled.queryExecution.executedPlan) + checkShuffleAnswer(shuffled, 1) + } + } + test("fix: Comet native shuffle with binary data") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl")