diff --git a/README.md b/README.md
index cba865d96a..4827879671 100644
--- a/README.md
+++ b/README.md
@@ -35,10 +35,12 @@ under the License.
-Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the powerful
-[Apache DataFusion] query engine. Comet is designed to significantly enhance the
-performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
-Spark ecosystem without requiring any code changes.
+Apache DataFusion Comet is a high-performance accelerator for Apache Spark. Comet keeps Spark queries
+**Arrow-native end-to-end**: operators, expressions, shuffle, and broadcast all stay in Apache Arrow
+columnar format, avoiding the per-row overhead of Spark's row-based engine. Within the Arrow-native
+pipeline, operators and expressions execute as Rust code (via the [Apache DataFusion] query engine)
+or as JVM code that operates directly on Arrow batches. Comet integrates with the Spark ecosystem
+without requiring any code changes.
**Comet provides a ~2x speedup for TPC-DS @ SF 1000 (1TB), resulting in ~50% cost savings.**
@@ -58,17 +60,22 @@ See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contribut
## What Comet Accelerates
-Comet replaces Spark operators and expressions with native Rust implementations that run on Apache DataFusion.
-It uses Apache Arrow for zero-copy data transfer between the JVM and native code.
+Comet replaces Spark operators and expressions with implementations that consume and produce Apache Arrow
+batches. Most run as native Rust code on top of Apache DataFusion; some run as JVM code over Arrow batches.
+Either way, the work stays in the Comet pipeline without falling back to Spark's row-based engine.
- **Parquet scans**: native Parquet reader integrated with Spark's query planner
- **Apache Iceberg**: accelerated Parquet scans when reading Iceberg tables from Spark
(see the [Iceberg guide](https://datafusion.apache.org/comet/user-guide/iceberg.html))
-- **Shuffle**: native columnar shuffle with support for hash and range partitioning
+- **Shuffle**: Arrow-IPC columnar shuffle with support for hash and range partitioning, in a native Rust
+ implementation paired with a JVM fallback for unsupported partition key types
- **Expressions**: hundreds of supported Spark expressions across math, string, datetime, array,
map, JSON, hash, and predicate categories
- **Aggregations**: hash aggregate with support for `FILTER (WHERE ...)` clauses
- **Joins**: hash join, sort-merge join, and broadcast join
+- **Scala/Java UDFs**: experimental support for keeping Scala/Java scalar UDFs in the Comet pipeline
+ via Spark's whole-stage codegen (see the
+ [Scala UDF guide](https://datafusion.apache.org/comet/user-guide/scala_java_udfs.html))
For the authoritative lists, see the [supported expressions](https://datafusion.apache.org/comet/user-guide/expressions.html)
and [supported operators](https://datafusion.apache.org/comet/user-guide/operators.html) pages.
diff --git a/docs/source/about/gluten_comparison.md b/docs/source/about/gluten_comparison.md
index 3e59feffb7..1609f1859e 100644
--- a/docs/source/about/gluten_comparison.md
+++ b/docs/source/about/gluten_comparison.md
@@ -28,8 +28,9 @@ This document is based on Comet 0.16.0 and Gluten 1.6.0.
## Architecture
-Comet and Gluten have very similar architectures. Both are Spark plugins that translate Spark physical plans to
-a serialized representation and pass the serialized plan to native code for execution.
+Comet and Gluten have similar high-level architectures. Both are Spark plugins that translate Spark physical plans
+into a serialized representation and run them through an Arrow-columnar execution engine instead of Spark's
+row-based engine.
Gluten serializes the plans using the Substrait format and has an extensible architecture that supports execution
against multiple engines. Velox and Clickhouse are currently supported, but Velox is more widely used.
@@ -37,14 +38,19 @@ against multiple engines. Velox and Clickhouse are currently supported, but Velo
Comet serializes the plans in a proprietary Protocol Buffer format. Execution is delegated to Apache DataFusion. Comet
does not plan to support multiple engines, but rather focus on a tight integration between Spark and DataFusion.
+A second architectural difference is how each project handles JVM-side code. Gluten offloads supported operators to
+its native backend and falls back to row-based Spark execution otherwise. Comet additionally keeps a JVM execution
+path that operates directly on Arrow batches, which lets some constructs (today, Scala/Java scalar UDFs via Spark's
+whole-stage codegen) stay in the Arrow pipeline rather than triggering a fallback.
+
## Underlying Execution Engine: DataFusion vs Velox
-One of the main differences between Comet and Gluten is the choice of native execution engine.
+One of the main differences between Comet and Gluten is the choice of Arrow-columnar execution engine.
-Gluten uses Velox, which is an open-source C++ vectorized query engine created by Meta.
+Gluten uses Velox, which is an open-source C++ columnar query engine created by Meta.
-Comet uses Apache DataFusion, which is an open-source vectorized query engine implemented in Rust and is governed by the
-Apache Software Foundation.
+Comet uses Apache DataFusion, an open-source columnar query engine implemented in Rust and governed by
+the Apache Software Foundation.
Velox and DataFusion are both mature query engines that are growing in popularity.
@@ -53,8 +59,8 @@ the choice of implementation language (Rust vs C++) and this may be the main fac
choosing a solution. For users wishing to implement UDFs in Rust, Comet would likely be a better choice. For users
wishing to implement UDFs in C++, Gluten would likely be a better choice.
-If users are just interested in speeding up their existing Spark jobs and do not need to implement UDFs in native
-code, then we suggest benchmarking with both solutions and choosing the fastest one for your use case.
+If you only want to speed up existing Spark jobs and do not need to implement UDFs in Rust
+or C++, we suggest benchmarking both solutions and choosing the faster one for your use case.

@@ -75,9 +81,10 @@ Gluten supports Spark 3.3, 3.4, 3.5, 4.0, and 4.1.
Spark 4.0 enables ANSI SQL semantics by default, which changes how arithmetic overflow, invalid casts, division by
zero, and similar error conditions are handled. This is one area where the two projects currently differ.
-Comet implements ANSI semantics for the expressions it supports natively, including arithmetic overflow checks,
-ANSI cast behavior, and `try_*` variants. Queries running with `spark.sql.ansi.enabled=true` continue to be accelerated.
-See the [Comet Compatibility Guide] for details on which expressions have full ANSI coverage.
+Comet implements ANSI semantics for the expressions it supports in the Comet pipeline, including arithmetic
+overflow checks, ANSI cast behavior, and `try_*` variants. Queries running with `spark.sql.ansi.enabled=true`
+continue to be accelerated. See the [Comet Compatibility Guide] for details on which expressions have full
+ANSI coverage.
The Gluten Velox backend documents that ANSI mode is not supported and that any query executed with ANSI enabled
will fall back to vanilla Spark. See the [Gluten Velox limitations] page for the current status.
@@ -85,7 +92,7 @@ will fall back to vanilla Spark. See the [Gluten Velox limitations] page for the
[Gluten Velox limitations]: https://apache.github.io/gluten/velox-backend-limitations.html
For users adopting Spark 4.0 without disabling ANSI mode, this difference can have a significant impact on the
-fraction of a workload that runs natively.
+fraction of a workload Gluten can accelerate.
## Table Format Support
diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md
index 0eb5fa55db..e8f206c1af 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -95,7 +95,7 @@ org.apache.comet.CometNativeException: Error from DataFusion:
Function 'levenshtein' expects 2 arguments but received 3.
```
-The classic case is `levenshtein`. Spark accepts an optional 3rd `threshold` argument, DataFusion's built-in is 2-arg only, so the 3-arg form fails native execution unless the serde sets the return type explicitly. Other names that exist in both engines with potentially different signatures include `concat`, `coalesce`, `sha2`, and `regexp_replace`. If you are adding a function whose name is shared with `datafusion-functions`, check the upstream signature before deciding how to serialize.
+The classic case is `levenshtein`: Spark accepts an optional third `threshold` argument, but DataFusion's built-in takes only two arguments, so the 3-arg form fails at execution time unless the serde sets the return type explicitly. Other names that exist in both engines with potentially different signatures include `concat`, `coalesce`, `sha2`, and `regexp_replace`. If you are adding a function whose name is shared with `datafusion-functions`, check the upstream signature before deciding how to serialize.
To avoid the registry lookup, write a custom `CometExpressionSerde` and use `scalarFunctionExprToProtoWithReturnType`, passing the Spark expression's declared `dataType`:
@@ -143,7 +143,7 @@ The `SupportLevel` sealed trait has three possible values:
- **`Compatible(notes: Option[String] = None)`** - Comet supports this expression with full compatibility with Spark, or may have known differences in specific edge cases unlikely to affect most users. This is the default if you don't override `getSupportLevel`.
- **`Incompatible(notes: Option[String] = None)`** - Comet supports this expression but results can differ from Spark. The expression will only be used if `spark.comet.expr.allowIncompatible=true` or the expression-specific config `spark.comet.expr..allowIncompatible=true` is set.
-- **`Unsupported(notes: Option[String] = None)`** - Comet does not support this expression under the current conditions. Spark will fall back to its native execution.
+- **`Unsupported(notes: Option[String] = None)`** - Comet does not support this expression under the current conditions. The query falls back to Spark for this expression.
All three accept an optional `notes` parameter to provide additional context that is logged for debugging.
@@ -307,7 +307,7 @@ override def getUnsupportedReasons(): Seq[String] = Seq(
#### Adding Spark-side Tests for the New Expression
-It is important to verify that the new expression is correctly recognized by the native execution engine and matches the expected Spark behavior. The preferred way to add test coverage is to write a Comet SQL Test. This approach is simpler than writing Comet Scala Tests and makes it easy to cover many input combinations and edge cases.
+It is important to verify that the new expression is correctly recognized by Comet and matches the expected Spark behavior. The preferred way to add test coverage is to write a Comet SQL Test. This approach is simpler than writing Comet Scala Tests and makes it easy to cover many input combinations and edge cases.
##### Writing a Comet SQL Test
diff --git a/docs/source/contributor-guide/adding_a_new_operator.md b/docs/source/contributor-guide/adding_a_new_operator.md
index de2a73da88..4c7d7b47ac 100644
--- a/docs/source/contributor-guide/adding_a_new_operator.md
+++ b/docs/source/contributor-guide/adding_a_new_operator.md
@@ -24,7 +24,7 @@ This guide explains how to add support for a new Spark physical operator in Apac
## Overview
`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to
-implementing Comet operators depending on where they execute and how they integrate with the native execution engine.
+implementing Comet operators depending on where they execute and how they integrate with the native DataFusion engine.
### Types of Comet Operators
@@ -32,10 +32,10 @@ implementing Comet operators depending on where they execute and how they integr
#### 1. Native Operators (`nativeExecs` map)
-These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. Native
-operators are registered in the `nativeExecs` map in `CometExecRule.scala`.
+These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. They are
+registered in the `nativeExecs` map in `CometExecRule.scala`.
-Key characteristics of native operators:
+Key characteristics:
- They are converted to their corresponding native protobuf representation
- They execute as DataFusion operators in the native engine
@@ -53,7 +53,7 @@ Key characteristics of sinks:
- They become `ScanExec` operators in the native plan (see `operator2Proto` in `CometExecRule.scala`)
- They can be leaf nodes that feed data into native execution blocks
- They are wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` during plan transformation
-- Examples include operators that bring data from various sources into native execution
+- Examples include operators that bring data from various sources into native execution blocks
Examples: `UnionExec`, `CoalesceExec`, `CollectLimitExec`, `TakeOrderedAndProjectExec`
@@ -65,7 +65,7 @@ Special sinks (not in the `sinks` map but also treated as sinks):
#### 3. Comet JVM Operators
-These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen
+These operators run in the JVM but are part of the Comet pipeline. For JVM operators, all checks happen
in `CometExecRule` rather than using `CometOperatorSerde`, because they don't need protobuf serialization.
Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
@@ -83,7 +83,7 @@ When adding a new operator, choose based on these criteria:
**Use Sink Operators when:**
-- The operator serves as a data source for native execution (becomes a `ScanExec`)
+- The operator serves as a data source for a native execution block (becomes a `ScanExec`)
- The operator brings data from non-native sources (e.g., `UnionExec` combining multiple inputs)
- The operator is typically a leaf or near-leaf node in the execution tree
- The operator needs special handling to interface with the native engine
@@ -393,7 +393,7 @@ test("your operator") {
The `checkSparkAnswerAndOperator` helper verifies:
-1. Results match Spark's native execution
+1. Results match Spark's reference execution
2. Your operator is actually being used (not falling back)
#### Rust Unit Tests
@@ -418,7 +418,7 @@ Add your operator to the supported operators list in `docs/source/user-guide/lat
## Implementing a Sink Operator
-Sink operators are converted to `ScanExec` in the native plan and serve as entry points for native execution. The implementation is simpler than native operators because sink operators extend the `CometSink` base class which provides the conversion logic.
+Sink operators are converted to `ScanExec` in the native plan and serve as entry points for native execution blocks. The implementation is simpler than native operators because sink operators extend the `CometSink` base class which provides the conversion logic.
### Step 1: Create a CometOperatorSerde Implementation
@@ -516,7 +516,7 @@ val COMET_EXEC_YOUR_SINK_ENABLED: ConfigEntry[Boolean] =
### Step 4: Add Tests
-Test that your sink operator correctly feeds data into native execution:
+Test that your sink operator correctly feeds data into a native execution block:
```scala
test("your sink operator") {
diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md
index 3785358cf3..fe77c00ca6 100644
--- a/docs/source/contributor-guide/development.md
+++ b/docs/source/contributor-guide/development.md
@@ -30,8 +30,8 @@ under the License.
## Threading Architecture
-Comet's native execution runs on a shared tokio multi-threaded runtime. Understanding this
-architecture is important because it affects how you write native operators and JVM callbacks.
+Comet's Rust side runs DataFusion plans on a shared tokio multi-threaded runtime. Understanding
+this architecture is important because it affects how you write Rust operators and JVM callbacks.
### How execution works
diff --git a/docs/source/contributor-guide/jvm_shuffle.md b/docs/source/contributor-guide/jvm_shuffle.md
index 2145c82eba..b7c014259d 100644
--- a/docs/source/contributor-guide/jvm_shuffle.md
+++ b/docs/source/contributor-guide/jvm_shuffle.md
@@ -27,12 +27,12 @@ alternative, see [Native Shuffle](native_shuffle.md).
Comet provides two shuffle implementations:
-- **CometNativeShuffle** (`CometExchange`): Fully native shuffle using Rust. Takes columnar input directly
- from Comet native operators and performs partitioning in native code.
+- **CometNativeShuffle** (`CometExchange`): Fully Rust-implemented shuffle. Takes Arrow input directly
+ from upstream Comet operators and performs partitioning in Rust.
- **CometColumnarShuffle** (`CometColumnarExchange`): JVM-based shuffle that operates on rows internally,
- buffers `UnsafeRow`s in memory pages, and uses native code (via JNI) to encode them to Arrow IPC format.
- Uses Spark's partitioner for partition assignment. Can accept either row-based or columnar input
- (columnar input is converted to rows via `ColumnarToRowExec`).
+ buffers `UnsafeRow`s in memory pages, and calls into Rust (via JNI) to encode them to Arrow IPC format.
+ Uses Spark's partitioner for partition assignment. Can accept either row-based or Arrow input
+ (Arrow input is converted to rows via `ColumnarToRowExec`).
The JVM shuffle is selected via `CometShuffleDependency.shuffleType`.
@@ -42,13 +42,13 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE
1. **Shuffle mode is explicitly set to "jvm"**: When `spark.comet.exec.shuffle.mode` is set to `jvm`.
-2. **Child plan is not a Comet native operator**: When the child plan is a Spark row-based operator
- (not a `CometPlan`), JVM shuffle is the only option since native shuffle requires columnar input
- from Comet operators.
+2. **Child plan is not a Comet operator**: When the child plan is a Spark row-based operator
+ (not a `CometPlan`), JVM shuffle is the only option since native shuffle requires Arrow input
+ from upstream Comet operators.
3. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used
- as partition keys in native shuffle and will fall back to JVM columnar shuffle. Note that complex types are
+ as partition keys in native shuffle and will fall back to JVM shuffle. Note that complex types are
fully supported as data columns in both implementations.
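
The three conditions above can be read as a checklist. The following is a minimal Python sketch of that checklist, not Comet's actual selection code: the `ShuffleInput` type, its field names, and the `jvm_shuffle_reasons` helper are hypothetical and exist only for illustration. It models just the three conditions listed here and ignores any other checks the planner may perform.

```python
from dataclasses import dataclass
from typing import List

# Complex types named in the list above; they cannot be native partition keys.
COMPLEX_TYPES = {"struct", "array", "map"}


@dataclass
class ShuffleInput:
    """Hypothetical summary of the inputs to the decision (illustration only)."""
    shuffle_mode: str               # value of spark.comet.exec.shuffle.mode
    child_is_comet_plan: bool       # False for a Spark row-based child operator
    partitioning: str               # e.g. "HashPartitioning" or "RangePartitioning"
    partition_key_types: List[str]  # simplified type names of the partition keys


def jvm_shuffle_reasons(s: ShuffleInput) -> List[str]:
    """Return which of the three documented conditions apply to this input."""
    reasons = []
    if s.shuffle_mode == "jvm":
        reasons.append("shuffle mode is explicitly set to jvm")
    if not s.child_is_comet_plan:
        reasons.append("child plan is not a Comet operator")
    keyed = s.partitioning in ("HashPartitioning", "RangePartitioning")
    if keyed and any(t in COMPLEX_TYPES for t in s.partition_key_types):
        reasons.append("unsupported partition key type")
    return reasons
```

An empty list means none of the three documented conditions applies; a non-empty list names each condition that would select JVM shuffle for that input.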
## Input Handling
@@ -58,9 +58,9 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE
When the child plan is a Spark row-based operator, `CometColumnarExchange` calls `child.execute()` which
returns an `RDD[InternalRow]`. The rows flow directly to the JVM shuffle writers.
-### Comet Columnar Input
+### Comet Arrow Input
-When the child plan is a Comet native operator (e.g., `CometHashAggregate`) but JVM shuffle is selected
+When the child plan is a Comet operator (e.g., `CometHashAggregate`) but JVM shuffle is selected
(due to shuffle mode setting or unsupported partitioning), `CometColumnarExchange` still calls
`child.execute()`. Comet operators implement `doExecute()` by wrapping themselves with `ColumnarToRowExec`:
@@ -73,13 +73,13 @@ override def doExecute(): RDD[InternalRow] =
This means the data path becomes:
```
-Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → Arrow
```
-This is less efficient than native shuffle which avoids the columnar-to-row conversion:
+This is less efficient than native shuffle which avoids the Arrow-to-row conversion:
```
-Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → Native Shuffle → Arrow IPC → Arrow
```
### Why Use Spark's Partitioner?
@@ -87,7 +87,7 @@ Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
JVM shuffle uses row-based input so it can leverage Spark's existing partitioner infrastructure
(`partitioner.getPartition(key)`). This allows Comet to support all of Spark's partitioning schemes
without reimplementing them in Rust. Native shuffle, by contrast, serializes the partitioning scheme
-to protobuf and implements the partitioning logic in native code.
+to protobuf and implements the partitioning logic in Rust.
## Architecture
@@ -145,13 +145,13 @@ Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:
### Writers
-| Class | Location | Description |
-| ----------------------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
-| `CometBypassMergeSortShuffleWriter` | `.../shuffle/CometBypassMergeSortShuffleWriter.java` | Hash-based writer. Creates one `CometDiskBlockWriter` per partition. Supports async writes. |
-| `CometUnsafeShuffleWriter` | `.../shuffle/CometUnsafeShuffleWriter.java` | Sort-based writer. Uses `CometShuffleExternalSorter` to buffer and sort records, then merges spill files. |
-| `CometDiskBlockWriter` | `.../shuffle/CometDiskBlockWriter.java` | Buffers rows in memory pages for a single partition. Spills to disk via native encoding. Used by bypass writer. |
-| `CometShuffleExternalSorter` | `.../shuffle/sort/CometShuffleExternalSorter.java` | Buffers records across all partitions, sorts by partition ID, spills sorted data. Used by unsafe writer. |
-| `SpillWriter` | `.../shuffle/SpillWriter.java` | Base class for spill logic. Manages memory pages and calls `Native.writeSortedFileNative()` for Arrow IPC encoding. |
+| Class | Location | Description |
+| ----------------------------------- | ---------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- |
+| `CometBypassMergeSortShuffleWriter` | `.../shuffle/CometBypassMergeSortShuffleWriter.java` | Hash-based writer. Creates one `CometDiskBlockWriter` per partition. Supports async writes. |
+| `CometUnsafeShuffleWriter` | `.../shuffle/CometUnsafeShuffleWriter.java` | Sort-based writer. Uses `CometShuffleExternalSorter` to buffer and sort records, then merges spill files. |
+| `CometDiskBlockWriter` | `.../shuffle/CometDiskBlockWriter.java` | Buffers rows in memory pages for a single partition. Spills to disk via the Rust IPC encoder. Used by bypass writer. |
+| `CometShuffleExternalSorter` | `.../shuffle/sort/CometShuffleExternalSorter.java` | Buffers records across all partitions, sorts by partition ID, spills sorted data. Used by unsafe writer. |
+| `SpillWriter` | `.../shuffle/SpillWriter.java` | Base class for spill logic. Manages memory pages and calls `Native.writeSortedFileNative()` for Arrow IPC encoding in Rust. |
### Reader
diff --git a/docs/source/contributor-guide/native_shuffle.md b/docs/source/contributor-guide/native_shuffle.md
index 18e80a90c8..f5cb1bfe2d 100644
--- a/docs/source/contributor-guide/native_shuffle.md
+++ b/docs/source/contributor-guide/native_shuffle.md
@@ -25,18 +25,18 @@ see [JVM Shuffle](jvm_shuffle.md).
## Overview
-Native shuffle takes columnar input directly from Comet native operators and performs partitioning,
-encoding, and writing in native Rust code. This avoids the columnar-to-row-to-columnar conversion
-overhead that JVM shuffle incurs.
+Native shuffle takes Arrow input directly from upstream Comet operators and performs partitioning,
+encoding, and writing as Rust code. This avoids the Arrow-to-row-to-Arrow conversion overhead that
+JVM shuffle incurs.
```
-Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → Native Shuffle → Arrow IPC → Arrow
```
Compare this to JVM shuffle's data path:
```
-Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → Arrow
```
## When Native Shuffle is Used
@@ -45,8 +45,8 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
1. **Shuffle mode allows native**: `spark.comet.exec.shuffle.mode` is `native` or `auto`.
-2. **Child plan is a Comet native operator**: The child must be a `CometPlan` that produces
- columnar output. Row-based Spark operators require JVM shuffle.
+2. **Child plan is a Comet operator**: The child must be a `CometPlan` that produces Arrow
+ output. Row-based Spark operators require JVM shuffle.
3. **Supported partitioning type**: Native shuffle supports:
- `HashPartitioning`
@@ -70,7 +70,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometNativeShuffleWriter │
│ - Constructs protobuf operator plan │
-│ - Invokes native execution via CometExec.getCometIterator() │
+│ - Invokes the Rust shuffle writer via CometExec.getCometIterator() │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼ (JNI)
@@ -106,7 +106,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
| Class | Location | Description |
| ------------------------------ | ------------------------------------------------ | --------------------------------------------------------------------------------------------- |
| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
-| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds protobuf plan and invokes native execution. |
+| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds the protobuf plan and invokes the Rust shuffle writer. |
| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, and range partition bounds. |
| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |
diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 444c9ccb60..9073c00bd0 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -21,9 +21,11 @@ under the License.
## Overview
-The Comet plugin enhances Spark SQL by introducing optimized query execution and shuffle mechanisms leveraging
-native code. It integrates with Spark's plugin framework and extension API to replace or extend Spark's
-default behavior.
+The Comet plugin keeps Spark queries inside an Arrow-native pipeline: supported operators, expressions,
+shuffle, and broadcast all consume and produce Apache Arrow columnar batches rather than Spark's row
+format. Within that pipeline, individual operators and expressions may be implemented as Rust code
+running in DataFusion or as JVM code that operates directly on Arrow batches. Comet integrates with
+Spark's plugin framework and extension API to install the rules and shuffle machinery that drive this.
---
@@ -78,13 +80,13 @@ If an operator, expression, or data type is not supported by Comet then the reas
underlying Spark node and the plan will not be converted.
Comet does not support partially replacing subsets of the plan within a query stage because this would involve adding
-transitions to convert between row-based and columnar data between Spark operators and Comet operators and the overhead
-of this could outweigh the benefits of running parts of the query stage natively in Comet.
+transitions that convert between row-based and Arrow data at each boundary between Spark and Comet operators, and the
+overhead of these conversions could outweigh the benefits of running parts of the query stage inside the Comet pipeline.
## Query Execution
-Once the plan has been transformed, any consecutive native Comet operators are combined into a `CometNativeExec` which contains
-a protocol buffer serialized version of the plan (the serialization code can be found in `QueryPlanSerde`).
+Once the plan has been transformed, any consecutive Rust-implemented Comet operators are combined into a `CometNativeExec`
+which contains a protocol buffer serialized version of the plan (the serialization code can be found in `QueryPlanSerde`).
Spark serializes the physical plan and sends it to the executors when executing tasks. The executors deserialize the
plan and invoke it.
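
As a rough illustration of how consecutive operators are combined, the Python sketch below groups runs of adjacent Rust-implemented operators in a simplified linear pipeline. It is a toy model under stated assumptions: real Spark plans are trees rather than lists, the operator labels are placeholders, and `group_native_runs` is a hypothetical helper, not part of Comet.

```python
from itertools import groupby
from typing import List, Tuple


def group_native_runs(ops: List[Tuple[str, bool]]) -> List[List[str]]:
    """Group a linear pipeline of (name, is_native) operators into blocks.

    Each run of consecutive native operators becomes one block, standing in
    for a single combined native plan. Non-native operators stay on their own.
    """
    blocks: List[List[str]] = []
    for is_native, run in groupby(ops, key=lambda op: op[1]):
        names = [name for name, _ in run]
        if is_native:
            blocks.append(names)                      # one block per consecutive run
        else:
            blocks.extend([name] for name in names)   # each stays separate
    return blocks


# Placeholder operator labels, chosen only for this example.
pipeline = [
    ("scan", True),
    ("filter", True),
    ("exchange", False),
    ("aggregate", True),
]
blocks = group_native_runs(pipeline)
```

In this model the first two operators form one block, the non-native operator sits alone, and the final operator starts a new block because the run of native operators was interrupted.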
@@ -97,8 +99,8 @@ Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized phy
override the DataFusion versions to ensure compatibility with Apache Spark.
The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
-`CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan,
-a Spark exchange, or another native plan.
+`CometBatchIterator.next()` to fetch the next input batch. The input could be a `CometNativeScan` Parquet scan,
+a Spark exchange, or another DataFusion plan.
`CometNativeExec` creates a `CometExecIterator` and applies this iterator to the input RDD
partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes
diff --git a/docs/source/contributor-guide/roadmap.md b/docs/source/contributor-guide/roadmap.md
index bae4ddba58..7421a2248a 100644
--- a/docs/source/contributor-guide/roadmap.md
+++ b/docs/source/contributor-guide/roadmap.md
@@ -82,8 +82,9 @@ performance for memory-intensive workloads.
## Java/Scala Columnar and Arrow UDF Support
Spark users frequently define custom UDFs in Java or Scala. Comet currently falls back to Spark when a query
-contains a JVM UDF. Adding support for calling Java/Scala UDFs that operate on columnar Arrow data directly
-from native execution will reduce fallbacks and allow more queries to run end-to-end in Comet.
+contains a JVM UDF other than the Scala UDFs covered by the codegen path. Adding support for Java/Scala UDFs
+that operate on Arrow batches directly will let more UDFs stay in the Comet pipeline and allow more queries
+to run end-to-end in Comet.
## Memory Management Improvements
diff --git a/docs/source/contributor-guide/spark_configs_support.md b/docs/source/contributor-guide/spark_configs_support.md
index fe1da3cd23..ac0200ddfc 100644
--- a/docs/source/contributor-guide/spark_configs_support.md
+++ b/docs/source/contributor-guide/spark_configs_support.md
@@ -27,11 +27,12 @@ verification has been performed, and any known gaps.
The status column uses these values:
-- **Supported** -- Comet runs the affected expressions natively under every value of
+- **Supported** -- Comet handles the affected expressions in the Comet pipeline under every value of
the config, and produces results matching Spark.
-- **Partial** -- Comet runs natively for some values of the config but falls back to
- Spark for others, or runs natively but with documented incompatibilities.
-- **Falls back** -- Comet does not run the affected expressions natively under this
+- **Partial** -- Comet handles the affected expressions in the Comet pipeline for some values of the
+ config but falls back to Spark for others, or handles them in the pipeline with documented
+ incompatibilities.
+- **Falls back** -- Comet does not handle the affected expressions in the Comet pipeline under this
config and always defers to Spark.
- **Unaudited** -- the config's interaction with Comet has not yet been verified.
diff --git a/docs/source/contributor-guide/sql-file-tests.md b/docs/source/contributor-guide/sql-file-tests.md
index 9b63dcf8ed..f0d5373237 100644
--- a/docs/source/contributor-guide/sql-file-tests.md
+++ b/docs/source/contributor-guide/sql-file-tests.md
@@ -145,7 +145,7 @@ mode controls how results are validated.
#### `query` (default mode)
-Checks that the query runs natively on Comet (not falling back to Spark) and that results
+Checks that the query runs in the Comet pipeline (not falling back to Spark) and that results
match Spark exactly.
```sql
@@ -155,9 +155,9 @@ SELECT abs(v) FROM test_abs
#### `query spark_answer_only`
-Only checks that Comet results match Spark. Does not assert that the query runs natively.
-Use this for expressions that Comet may not fully support yet but should still produce
-correct results.
+Only checks that Comet results match Spark. Does not assert that the query stays in the
+Comet pipeline. Use this for expressions that Comet may not fully support yet but should
+still produce correct results.
```sql
query spark_answer_only
@@ -232,8 +232,8 @@ SELECT array(1, 2, 3)[10]
`NULL`, boundary values, and negative numbers.
5. Add `query` blocks for each expression or behavior to test. Use the default `query` mode
- when you expect Comet to run the expression natively. Use `query spark_answer_only` when
- native execution is not yet expected.
+ when you expect Comet to handle the expression in the Comet pipeline. Use
+ `query spark_answer_only` when Comet support is not yet expected.
6. Run the tests to verify:
diff --git a/docs/source/index.md b/docs/source/index.md
index d1ea261824..b1f92c73c6 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -27,12 +27,14 @@ under the License.
-Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the powerful
-[Apache DataFusion] query engine. Comet is designed to significantly enhance the
-performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
-Spark ecosystem without requiring any code changes.
-
-Comet also accelerates Apache Iceberg, when performing Parquet scans from Spark.
+Apache DataFusion Comet is a high-performance accelerator for Apache Spark. Comet keeps Spark
+queries **Arrow-native end-to-end**: operators, expressions, shuffle, and broadcast all stay in
+Apache Arrow columnar format, avoiding the per-row overhead of Spark's row-based engine. Within
+the Arrow-native pipeline, operators and expressions execute as Rust code (via the
+[Apache DataFusion] query engine) or as JVM code that operates directly on Arrow batches.
+
+Comet integrates with the Spark ecosystem without requiring any code changes, and also accelerates
+Apache Iceberg when performing Parquet scans from Spark.
[Apache DataFusion]: https://datafusion.apache.org
diff --git a/docs/source/user-guide/latest/compatibility/expressions/cast.md b/docs/source/user-guide/latest/compatibility/expressions/cast.md
index f7182e5713..eff2c613fa 100644
--- a/docs/source/user-guide/latest/compatibility/expressions/cast.md
+++ b/docs/source/user-guide/latest/compatibility/expressions/cast.md
@@ -24,9 +24,9 @@ Cast operations in Comet fall into three levels of support:
- **C (Compatible)**: The results match Apache Spark
- **I (Incompatible)**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
- `spark.comet.expression.Cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
- recommended for production use.
-- **U (Unsupported)**: Comet does not provide a native version of this cast expression and the query stage will fall back to
+ `spark.comet.expression.Cast.allowIncompatible=true` will allow all incompatible casts to run inside the Comet pipeline,
+ but this is not recommended for production use.
+- **U (Unsupported)**: Comet does not provide an implementation of this cast expression and the query stage will fall back to
Spark.
- **N/A**: Spark does not support this cast.
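The support levels above amount to a small decision rule. The sketch below is illustrative only: `CastSupport` and `runs_in_comet` are hypothetical names invented here, not Comet APIs, and the `allow_incompatible` flag stands in for `spark.comet.expression.Cast.allowIncompatible`.

```python
from enum import Enum


class CastSupport(Enum):
    """Hypothetical mirror of the support levels listed above."""
    COMPATIBLE = "C"
    INCOMPATIBLE = "I"
    UNSUPPORTED = "U"
    NOT_APPLICABLE = "N/A"


def runs_in_comet(level: CastSupport, allow_incompatible: bool = False) -> bool:
    """Return True if a cast at this level stays in Comet, False if it falls back."""
    if level is CastSupport.COMPATIBLE:
        return True
    if level is CastSupport.INCOMPATIBLE:
        # Falls back by default; the allowIncompatible setting opts in.
        return allow_incompatible
    if level is CastSupport.UNSUPPORTED:
        # No Comet implementation exists, so the setting makes no difference.
        return False
    # N/A: Spark itself does not support the cast, so the question does not arise.
    raise ValueError("Spark does not support this cast")
```

Note that the opt-in only affects incompatible casts: an unsupported cast falls back regardless of the setting.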
@@ -83,9 +83,9 @@ is the same regardless of the session timezone setting.
In Legacy mode, `CAST(date AS INT)`, `CAST(date AS LONG)`, and casts to all other numeric
types (Boolean, Byte, Short, Float, Double, Decimal) always return `NULL`. Comet handles
-this by short-circuiting to a null literal during query planning, so no native execution
-is needed. In ANSI and Try modes, Spark rejects these casts at analysis time (before
-execution reaches Comet).
+this by short-circuiting to a null literal during query planning, so the cast is removed
+before reaching the runtime. In ANSI and Try modes, Spark rejects these casts at analysis
+time (before execution reaches Comet).
## String to Timestamp
@@ -139,7 +139,7 @@ The result is always a wall-clock timestamp with no timezone conversion or DST a
Casting a `DecimalType` with a negative scale to `StringType` is marked as incompatible when
`spark.sql.legacy.allowNegativeScaleOfDecimal` is `false` (the default). When that config is
disabled, Spark cannot create negative-scale decimals, so Comet falls back to avoid running
-native execution on unexpected inputs.
+its cast implementation on unexpected inputs.
When `spark.sql.legacy.allowNegativeScaleOfDecimal=true`, the cast is compatible. Comet matches
Spark's behavior of using Java `BigDecimal.toString()` semantics, which produces scientific
diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md
index 37524a829e..f5b921f91c 100644
--- a/docs/source/user-guide/latest/compatibility/scans.md
+++ b/docs/source/user-guide/latest/compatibility/scans.md
@@ -63,8 +63,8 @@ The following limitation may produce incorrect results without falling back to S
The following limitation raises an error at scan time rather than falling back to Spark:
- Invalid UTF-8 bytes in `STRING` columns. Spark permits arbitrary byte sequences in a `STRING`
- column (for example from `CAST(X'C1' AS STRING)`), but Comet's native execution path is built on
- Arrow, whose string type is strictly UTF-8. Reading a Parquet file whose `STRING` column contains
+ column (for example from `CAST(X'C1' AS STRING)`), but Comet's pipeline is built on Arrow,
+ whose string type is strictly UTF-8. Reading a Parquet file whose `STRING` column contains
non-UTF-8 bytes fails with `Parquet error: encountered non UTF-8 data`. Disable Comet for the
query, or cast the column to `BINARY` before persisting, if you need to preserve non-UTF-8 bytes.
See [#4121](https://github.com/apache/datafusion-comet/issues/4121).
diff --git a/docs/source/user-guide/latest/datasources.md b/docs/source/user-guide/latest/datasources.md
index f1d20fa3a4..a93cc413e4 100644
--- a/docs/source/user-guide/latest/datasources.md
+++ b/docs/source/user-guide/latest/datasources.md
@@ -23,9 +23,10 @@
### Parquet
-Parquet scans are performed natively by Comet if all data types in the schema are supported. When the scan
-falls back to Spark, enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into
-Arrow format, allowing native execution to happen after that, but the process may not be efficient.
+Parquet scans are read directly into Arrow by Comet's Rust scan when all data types in the schema are supported.
+When the scan falls back to Spark, enabling `spark.comet.convert.parquet.enabled` will immediately convert the
+data into Arrow format so the rest of the plan can stay in the Comet pipeline, though the conversion itself
+may not be efficient.
### Apache Iceberg
@@ -35,17 +36,17 @@ Comet accelerates Iceberg scans of Parquet files. See the [Iceberg Guide] for mo
### CSV
-Comet provides experimental native CSV scan support. When `spark.comet.scan.csv.v2.enabled` is enabled, CSV files
-are read natively for improved performance. This feature is experimental and performance benefits are
-workload-dependent.
+Comet provides an experimental Rust-implemented CSV scan. When `spark.comet.scan.csv.v2.enabled` is enabled, CSV
+files are read directly into Arrow for improved performance. This feature is experimental and performance benefits
+are workload-dependent.
Alternatively, when `spark.comet.convert.csv.enabled` is enabled, data from Spark's CSV reader is immediately
-converted into Arrow format, allowing native execution to happen after that.
+converted into Arrow format so the rest of the plan can stay in the Comet pipeline.
### JSON
-Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately
-converted into Arrow format, allowing native execution to happen after that.
+Comet does not provide a Rust-implemented JSON scan, but when `spark.comet.convert.json.enabled` is enabled,
+data is immediately converted into Arrow format so the rest of the plan can stay in the Comet pipeline.
## Data Catalogs
diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md
index f22180ec77..82941a1804 100644
--- a/docs/source/user-guide/latest/iceberg.md
+++ b/docs/source/user-guide/latest/iceberg.md
@@ -22,7 +22,7 @@
## Native Reader
Comet's native Iceberg reader relies on reflection to extract `FileScanTask`s from Iceberg, which are
-then serialized to Comet's native execution engine (see
+then serialized to DataFusion for execution (see
[PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).
The example below uses Spark's package downloader to retrieve Comet $COMET_VERSION and Iceberg
@@ -157,12 +157,12 @@ Iceberg ships several `ScalaUDF`s that surface in user queries and maintenance a
(`INT_ORDERED_BYTES`, `LONG_ORDERED_BYTES`, ..., `INTERLEAVE_BYTES`) over the sort key columns
during compaction.
-By default these UDFs cause the enclosing operator to fall back to Spark, which forces a
-columnar-to-row roundtrip and demotes the surrounding shuffle from `CometExchange` to
+By default these UDFs cause the enclosing operator to fall back to Spark, which forces an
+Arrow-to-row roundtrip and demotes the surrounding shuffle from `CometExchange` to
`CometColumnarExchange`. Enabling the experimental
[Scala UDF and Java UDF Support](scala_java_udfs.md) feature
-(`spark.comet.exec.scalaUDF.codegen.enabled=true`) routes these UDFs through native execution so
-the project, exchange, and sort operators around them stay on the Comet path end-to-end.
+(`spark.comet.exec.scalaUDF.codegen.enabled=true`) keeps these UDFs in the Comet pipeline so
+the project, exchange, and sort operators around them stay accelerated end-to-end.
### Task input metrics
diff --git a/docs/source/user-guide/latest/scala_java_udfs.md b/docs/source/user-guide/latest/scala_java_udfs.md
index e8163e494c..6ee2fd9c4b 100644
--- a/docs/source/user-guide/latest/scala_java_udfs.md
+++ b/docs/source/user-guide/latest/scala_java_udfs.md
@@ -19,7 +19,7 @@
# Scala UDF and Java UDF Support
-Comet executes Spark's Scala and Java [scalar user-defined functions (UDFs)](https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html) on the native Comet path. The presence of a UDF does not force the enclosing operator off the native path; surrounding native operators stay native.
+Comet executes Spark's Scala and Java [scalar user-defined functions (UDFs)](https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html) inside the Comet pipeline. The UDF body is JVM bytecode produced by Spark's whole-stage codegen, but it runs over Arrow batches alongside the surrounding Arrow-native operators rather than triggering a fallback to row-based Spark execution. The presence of a UDF does not force the enclosing operator out of the pipeline; surrounding Comet operators continue to run as usual.
This page covers Spark's `ScalaUDF` (Scala `udf(...)`, `spark.udf.register(...)` over Scala or Java functional interfaces, and SQL `CREATE FUNCTION ... AS 'com.example.MyUDF'`). Other UDF kinds (Python / Pandas, Hive, aggregate) are out of scope and continue to fall back to Spark.
@@ -27,16 +27,16 @@ This feature is experimental and disabled by default.
## Configuration
-| Key | Default | Description |
-| ------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------ |
-| `spark.comet.exec.scalaUDF.codegen.enabled` | `false` | When `true`, eligible `ScalaUDF`s run on the Comet path. When `false`, the enclosing operator falls back to Spark. |
+| Key | Default | Description |
+| ------------------------------------------- | ------- | -------------------------------------------------------------------------------------------------------------------------- |
+| `spark.comet.exec.scalaUDF.codegen.enabled` | `false` | When `true`, eligible `ScalaUDF`s run inside the Comet pipeline. When `false`, the enclosing operator falls back to Spark. |
## Supported
- User functions registered via `udf(...)`, `spark.udf.register(...)` (Scala or Java functional interfaces), or SQL `CREATE FUNCTION ... AS 'com.example.MyUDF'`.
- Scalar input/output types: `Boolean`, `Byte`, `Short`, `Int`, `Long`, `Float`, `Double`, `Decimal`, `String`, `Binary`, `Date`, `Timestamp`, `TimestampNTZ`.
- Complex input/output types with arbitrary nesting: `ArrayType`, `StructType`, `MapType`.
-- Composition with other Catalyst expressions inside the argument tree (e.g. `myUdf(upper(s))` runs as one native unit).
+- Composition with other Catalyst expressions inside the argument tree (e.g. `myUdf(upper(s))` runs as a single compiled kernel without leaving the Comet pipeline).
- Higher-order functions (`transform`, `filter`, `exists`, `aggregate`, `zip_with`, `map_filter`, `map_zip_with`, etc.) inside the argument tree.
## Not supported
@@ -45,7 +45,7 @@ This feature is experimental and disabled by default.
- Table UDFs and generators.
- Python `@udf` and Pandas `@pandas_udf`.
- Hive `GenericUDF` and `SimpleUDF`.
-- `CalendarIntervalType`, `NullType`, and `UserDefinedType` arguments and return types. UDT-typed columns fall back to Spark; for native execution, store and read the underlying representation directly (e.g. write MLlib `Vector` outputs as `Struct<type: Byte, size: Int, indices: Array<Int>, values: Array<Double>>` rather than `VectorUDT`).
+- `CalendarIntervalType`, `NullType`, and `UserDefinedType` arguments and return types. UDT-typed columns fall back to Spark; to keep the work in the Comet pipeline, store and read the underlying representation directly (e.g. write MLlib `Vector` outputs as `Struct<type: Byte, size: Int, indices: Array<Int>, values: Array<Double>>` rather than `VectorUDT`).
- Trees whose total nested-field count (output plus all input columns the UDF tree references) exceeds `spark.sql.codegen.maxFields` (default 100). Comet refuses these at plan time and the operator falls back to Spark.
When a UDF is rejected, the reason surfaces through Comet's standard fallback diagnostics; the query still runs on Spark.
diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md
index a394b4a4cb..6d9527c94e 100644
--- a/docs/source/user-guide/latest/tuning.md
+++ b/docs/source/user-guide/latest/tuning.md
@@ -61,8 +61,8 @@ The valid pool types are:
- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `greedy_unified`
-Both pool types are shared across all native execution contexts within the same Spark task. When
-Comet executes a shuffle, it runs two native execution contexts concurrently (e.g. one for
+Both pool types are shared across all DataFusion execution contexts within the same Spark task. When
+Comet executes a shuffle, it runs two DataFusion execution contexts concurrently (e.g. one for
pre-shuffle operators and one for the shuffle writer). The shared pool ensures that the combined
memory usage stays within the per-task limit.
diff --git a/docs/source/user-guide/latest/understanding-comet-plans.md b/docs/source/user-guide/latest/understanding-comet-plans.md
index 7fb93c8f53..bfa1e7f0d7 100644
--- a/docs/source/user-guide/latest/understanding-comet-plans.md
+++ b/docs/source/user-guide/latest/understanding-comet-plans.md
@@ -27,44 +27,60 @@ inspect that behavior.
When Comet is enabled, the `CometSparkSessionExtensions` rules walk the
physical plan bottom-up and replace Spark operators with Comet equivalents
-where possible. Consecutive native operators are combined into a single block
-that is serialized as protobuf and executed by DataFusion on the executor.
-Operators that Comet does not support remain as their original Spark form.
-
-As a result, a plan can mix three kinds of nodes:
-
-- **`Comet*` nodes** that run natively in Rust (for example `CometProject`,
- `CometHashAggregate`).
-- **`Comet*` nodes that run on the JVM** but are still part of the Comet
- pipeline (for example `CometBroadcastExchange`, `CometColumnarExchange`).
-- **Standard Spark nodes** (for example `Project`, `HashAggregate`) where
- Comet either does not support the operator or has fallen back due to an
- unsupported expression, data type, or configuration.
-
-Wherever data crosses between columnar and row-based execution, Comet inserts
-a transition node such as `CometColumnarToRow` or `CometSparkRowToColumnar`.
+where possible. Consecutive Rust-implemented operators are combined into a
+single block that is serialized as protobuf and executed by DataFusion on
+the executor. Operators that Comet does not support remain as their
+original Spark form.
+
+The unifying property of the Comet pipeline is that data stays in Apache
+Arrow columnar format throughout: operators, expressions, shuffle, and
+broadcast all consume and produce Arrow batches. Within that pipeline an
+individual operator or expression may be implemented in Rust (via
+DataFusion) or in JVM code that operates directly on Arrow batches. The
+implementation language is an internal detail; from the query's
+perspective the work is Arrow-native regardless.
+
+As a result, a plan can mix four kinds of nodes:
+
+- **Arrow-native Rust operators** that execute as Rust code in DataFusion
+ (for example `CometProject`, `CometHashAggregate`, `CometSort`).
+- **Arrow-native JVM expressions** that operate directly on Arrow batches
+ from JVM code (today, the Scala UDF codegen path; future Arrow UDFs and
+ hybrid JVM/native expressions slot in here as well).
+- **Arrow-native JVM plumbing operators** that coordinate Arrow batches
+ without per-row compute (for example `CometUnion`, `CometCoalesce`,
+ `CometBroadcastExchange`).
+- **Spark fallback nodes** (for example `Project`, `HashAggregate`,
+ plain `Exchange`) where Comet either does not support the operator or
+ has fallen back due to an unsupported expression, data type, or
+ configuration. These run as row-based Spark execution.
+
+Wherever data crosses between Arrow columnar and Spark row-based execution,
+Comet inserts a transition node such as `CometColumnarToRow` or
+`CometSparkRowToColumnar`.
## Reading a Plan
You can print a plan with `df.explain("formatted")` or `EXPLAIN FORMATTED <query>`, and
the same plan is shown in the Spark SQL UI. When reading a plan, look for:
-- **Node prefix.** `Comet*` nodes are accelerated by Comet. Anything without
- the prefix is unmodified Spark.
+- **Node prefix.** `Comet*` nodes are part of the Comet pipeline. Anything
+ without the prefix is unmodified Spark.
- **Transitions.** `CometColumnarToRow`, `CometNativeColumnarToRow`, and
- `CometSparkRowToColumnar` mark boundaries between columnar Comet execution
- and row-based Spark execution. Frequent transitions usually indicate
- fallback inside the plan.
-- **Exchange type.** `CometExchange` is the native shuffle path,
- `CometColumnarExchange` is the JVM columnar shuffle path, and a plain
- `Exchange` means Spark shuffle. See [Shuffle Operators](#shuffle-operators)
- below.
+ `CometSparkRowToColumnar` mark boundaries between the Arrow-native
+ pipeline and row-based Spark execution. Frequent transitions usually
+ indicate fallback inside the plan.
+- **Exchange type.** `CometExchange` is the native shuffle path (partition,
+ encode, and compress run in Rust), `CometColumnarExchange` is the JVM
+ shuffle path, and a plain `Exchange` means Spark shuffle. See
+ [Shuffle Operators](#shuffle-operators) below.
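The checklist above can be applied mechanically to plan text. The following is a minimal sketch, not part of Comet: `summarize_plan` is a hypothetical helper, it assumes each plan line holds one node name after any tree-drawing characters, and the sample plan is hand-written for illustration rather than captured from a real query.

```python
import re
from collections import Counter

# Transition and shuffle node names taken from the list above.
TRANSITIONS = {
    "CometColumnarToRow",
    "CometNativeColumnarToRow",
    "CometSparkRowToColumnar",
}
SHUFFLES = {
    "CometExchange": "native shuffle",
    "CometColumnarExchange": "JVM shuffle",
    "Exchange": "Spark shuffle",
}


def summarize_plan(plan_text: str) -> dict:
    """Count Comet nodes, transitions, Spark nodes, and shuffle kinds."""
    nodes = Counter()
    shuffles = Counter()
    for line in plan_text.splitlines():
        # Assumption: the first identifier on a line is the node name.
        match = re.search(r"[A-Za-z][A-Za-z0-9]*", line)
        if match is None:
            continue
        name = match.group(0)
        if name in TRANSITIONS:
            nodes["transition"] += 1
        elif name.startswith("Comet"):
            nodes["comet"] += 1
        else:
            nodes["spark"] += 1
        if name in SHUFFLES:
            shuffles[SHUFFLES[name]] += 1
    return {"nodes": dict(nodes), "shuffles": dict(shuffles)}


# Hand-written example plan (illustrative, not real Comet output).
SAMPLE_PLAN = """\
HashAggregate (4)
+- Exchange (3)
   +- CometColumnarToRow (2)
      +- CometNativeScan (1)
"""

summary = summarize_plan(SAMPLE_PLAN)
```

For the sample, the summary reports two Spark nodes, one transition, one Comet node, and one Spark shuffle, which is the pattern to expect when an aggregate above a Comet scan has fallen back.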
## Fallback
-A "fallback" happens when Comet cannot translate part of a plan into native
-execution. Fallback can be partial (a subtree falls back while the rest stays
-native) or full (no Comet nodes appear).
+A "fallback" happens when Comet cannot translate part of a plan into the
+Comet pipeline and the work runs as row-based Spark execution instead.
+Fallback can be partial (a subtree falls back while the rest stays in the
+Comet pipeline) or full (no Comet nodes appear).
Common reasons:
@@ -88,16 +104,16 @@ They serve different purposes and produce output in different places.
| Config | Output destination | What you see |
| ---------------------------------------- | ---------------------------------- | --------------------------------------------------------------------------------------------- |
-| `spark.comet.explainFallback.enabled` | Driver log (only when fallback) | A WARN with the list of reasons each query stage could not run natively. |
+| `spark.comet.explainFallback.enabled` | Driver log (only when fallback) | A WARN with the list of reasons each query stage could not stay in the Comet pipeline. |
| `spark.comet.logFallbackReasons.enabled` | Driver log | One WARN per fallback reason as it is encountered, without surrounding plan context. |
| `spark.comet.explain.format` | Spark SQL UI (Spark 4.0 and newer) | Annotated plan or fallback-reason list, depending on `verbose` (default) or `fallback` value. |
-| `spark.comet.explain.native.enabled` | Executor logs, per task | The DataFusion native plan with metrics, useful for inspecting native execution. |
+| `spark.comet.explain.native.enabled` | Executor logs, per task | The DataFusion plan with metrics, useful for inspecting the Rust-side execution. |
### `spark.comet.explainFallback.enabled`
-Logs a single WARN listing the reasons each query stage could not be executed
-natively. Nothing is logged when the entire stage runs in Comet. Useful as a
-low-noise check that fallback is or is not happening.
+Logs a single WARN listing the reasons each query stage fell back to Spark.
+Nothing is logged when the entire stage stays in the Comet pipeline. Useful
+as a low-noise check that fallback is or is not happening.
### `spark.comet.logFallbackReasons.enabled`
@@ -131,12 +147,12 @@ the config has no effect there.
### `spark.comet.explain.native.enabled`
-When enabled, each executor task logs the DataFusion native plan it executes,
-along with metrics. This is verbose because there is one plan per task, but it
-is the only way to see the native plan as DataFusion sees it (including how
+When enabled, each executor task logs the DataFusion plan it executes,
+along with metrics. This is verbose because there is one plan per task, but
+it is the only way to see the plan as DataFusion sees it (including how
operators were arranged after Comet's serialization). See the
-[Metrics Guide](metrics.md) for details on the native metrics that appear in
-this output.
+[Metrics Guide](metrics.md) for details on the metrics that appear in this
+output.
## Comet Operator Reference
@@ -145,17 +161,17 @@ by role. Names match what is shown in the plan output.
### Scans
-| Node | Description |
-| ------------------------ | --------------------------------------------------------------------------------------------- |
-| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, that produces Arrow batches consumed by Comet. |
-| `CometNativeScan` | Fully native Parquet scan that runs entirely in DataFusion. |
-| `CometIcebergNativeScan` | Fully native Iceberg Parquet scan. |
-| `CometCsvNativeScan` | Fully native CSV scan (experimental). |
+| Node | Description |
+| ------------------------ | ----------------------------------------------------------------------------------------- |
+| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, with a JVM reader producing Arrow batches. |
+| `CometNativeScan` | Fully Rust-implemented Parquet scan that runs entirely in DataFusion. |
+| `CometIcebergNativeScan` | Fully Rust-implemented Iceberg Parquet scan. |
+| `CometCsvNativeScan` | Fully Rust-implemented CSV scan (experimental). |
-### Native Execution Operators
+### Arrow-Native Rust Operators
-These run natively in DataFusion. When several appear consecutively in a plan,
-they execute as a single fused native block.
+These execute as Rust code in DataFusion. When several appear consecutively
+in a plan, they run as a single fused block.
| Node | Spark equivalent |
| ---------------------------- | ----------------------------------------------- |
@@ -173,9 +189,11 @@ they execute as a single fused native block.
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |
-### JVM-Side Operators
+### Arrow-Native JVM Plumbing
-These keep their data on the JVM but participate in the Comet pipeline.
+These coordinate Arrow batches from JVM code without performing per-row
+compute. They are part of the Comet pipeline and consume and produce Arrow
+batches, but the work happens in Scala/Java.
| Node | Notes |
| ------------------------ | ------------------------------------------------------------------------------------- |
@@ -185,24 +203,35 @@ These keep their data on the JVM but participate in the Comet pipeline.
| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode. |
| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |
+### Arrow-Native JVM Expressions
+
+Some expressions execute as JVM code that operates directly on Arrow batches
+rather than as Rust code in DataFusion. Today this category covers the Scala
+UDF codegen path (see the [Scala UDF and Java UDF Support](scala_java_udfs.md)
+guide); future Arrow-format UDFs and hybrid JVM/native expressions slot in
+here as well. These expressions do not introduce a new plan-node prefix:
+they appear inside the operators that contain them (for example a
+`CometProject` whose project list includes a codegen-dispatched
+`ScalaUDF`).
+
### Shuffle Operators
Comet has two shuffle implementations and the plan tells you which one is in
-use:
+use. Both serialize batches via Arrow IPC; the difference is implementation
+language, not data format.
- **`CometExchange`** is the **native shuffle** path. The child must already
- be a Comet operator producing columnar Arrow batches; the node calls
+ be a Comet operator producing Arrow batches; the node calls
`executeColumnar()` on its child and the partition, encode, and compress
steps run in native code. Hash and range partitioning **keys** must be
primitive types because native hashing and ordering do not support complex
types, but the data columns themselves can include `StructType`,
- `ArrayType`, and `MapType` since batches are serialized via the Arrow IPC
- writer.
-- **`CometColumnarExchange`** is the **JVM columnar shuffle** path. It accepts
- either Spark row-based input or Comet columnar input, which makes it the
- fallback when the child is not a Comet operator or when a hash/range key
- type is not supported by native shuffle (for example, collated strings). It
- is still preferred over Spark's native shuffle when Comet shuffle is
+ `ArrayType`, and `MapType`.
+- **`CometColumnarExchange`** is the **JVM shuffle** path. It accepts either
+ Spark row-based input or Comet Arrow input, which makes it the fallback
+ when the child is not a Comet operator or when a hash/range key type is
+ not supported by native shuffle (for example, collated strings). It is
+ still preferred over Spark's built-in shuffle when Comet shuffle is
enabled.
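As a rough mental model, the choice between the exchange nodes described above can be sketched as follows. This is a simplification under stated assumptions: `choose_exchange` and its three boolean parameters are hypothetical, and the real planner considers more conditions (such as the partitioning scheme and shuffle configuration) than are modeled here.

```python
def choose_exchange(
    comet_shuffle_enabled: bool,
    child_is_comet: bool,
    keys_supported_natively: bool,
) -> str:
    """Return the exchange node name this simplified model would expect."""
    if not comet_shuffle_enabled:
        # Without Comet shuffle, the plan keeps Spark's built-in shuffle.
        return "Exchange"
    if child_is_comet and keys_supported_natively:
        # Child already produces Arrow batches and the keys can be
        # hashed or ordered in native code.
        return "CometExchange"
    # Row-based child or unsupported key type: use the JVM shuffle path.
    return "CometColumnarExchange"
```

For example, a Comet child with a collated-string hash key would map to `CometColumnarExchange` in this model, because the key type is not supported by native shuffle.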
Both paths support the same set of partitioning schemes
@@ -215,16 +244,17 @@ shuffle and choose between the implementations.
### Columnar/Row Transitions
-Comet inserts these nodes wherever data has to cross the columnar/row boundary.
-Multiple implementations exist because the optimal strategy depends on what
-produced the columnar data.
-
-| Node | Direction | Notes |
-| ------------------------------ | ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `CometColumnarToRow` | columnar → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
-| `CometNativeColumnarToRow` | columnar → row | Native row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization step. |
-| `CometSparkColumnarToColumnar` | columnar → columnar | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
-| `CometSparkRowToColumnar` | row → columnar | Converts a Spark row input into Comet's Arrow batches. |
+Comet inserts these nodes wherever data has to cross between the Arrow-native
+pipeline and Spark's row-based execution, or between a non-Comet Spark
+columnar batch and Arrow. Multiple implementations exist because the optimal
+strategy depends on what produced the incoming data.
+
+| Node | Direction | Notes |
+| ------------------------------ | ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| `CometColumnarToRow` | Arrow → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
+| `CometNativeColumnarToRow` | Arrow → row | Native row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization. |
+| `CometSparkColumnarToColumnar` | Spark cols → Arrow | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
+| `CometSparkRowToColumnar` | row → Arrow | Converts a Spark row input into Comet's Arrow batches. |
The two `CometSpark*` names come from a single `CometSparkToColumnarExec`
operator that picks the node name based on whether its child supports