From 51be9a55007c2dfdef01ed864fb4119561332f87 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 25 May 2026 11:11:05 -0600
Subject: [PATCH 1/4] docs: adopt Arrow-native nomenclature across user and
 contributor guides

Comet's documentation conflated several distinct ideas under the word
'native': implementation language (Rust vs JVM), pipeline membership
(handled by Comet vs falls back to Spark), and data format (Arrow
columnar vs Spark rows). The same problem appears in the shuffle
naming, where both implementations are columnar and both use Arrow IPC
but only one operator name says 'Columnar'.

This change updates the documentation to use the vocabulary spelled
out in #4419:

- 'Arrow-native' for the data format property that unifies the
  pipeline.
- 'Comet pipeline' for membership, replacing 'native Comet path' / 'on
  the native Comet path' / 'accelerated by Comet'.
- 'Rust-implemented' / 'native Rust' / 'Rust code' for the
  implementation language axis.
- The compound forms 'native shuffle', 'native scan' (paired with
  CometBatchScan), and 'Arrow-native' stay. The bare, vague uses
  'native execution', 'runs natively', and 'the native path' are
  replaced.
- The 'three kinds of nodes' framing in understanding-comet-plans.md
  becomes four: Arrow-native Rust operators, Arrow-native JVM
  expressions, Arrow-native JVM plumbing, and Spark fallback.

This is the documentation-only phase. Operator renames
(CometExchange -> CometNativeShuffleExchange and friends) are tracked
separately and will land with deprecation aliases and plan-stability
golden updates.

Part of #4419
---
docs/source/about/gluten_comparison.md | 31 +--
.../adding_a_new_expression.md | 6 +-
.../adding_a_new_operator.md | 49 ++---
docs/source/contributor-guide/development.md | 4 +-
docs/source/contributor-guide/ffi.md | 2 +-
docs/source/contributor-guide/jvm_shuffle.md | 44 ++---
.../contributor-guide/native_shuffle.md | 18 +-
.../contributor-guide/plugin_overview.md | 20 +-
docs/source/contributor-guide/profiling.md | 2 +-
docs/source/contributor-guide/roadmap.md | 5 +-
.../spark_configs_support.md | 9 +-
.../contributor-guide/sql-file-tests.md | 12 +-
.../sql_error_propagation.md | 4 +-
docs/source/index.md | 14 +-
.../latest/compatibility/expressions/cast.md | 14 +-
.../user-guide/latest/compatibility/scans.md | 4 +-
docs/source/user-guide/latest/datasources.md | 19 +-
docs/source/user-guide/latest/iceberg.md | 10 +-
docs/source/user-guide/latest/installation.md | 2 +-
.../user-guide/latest/scala_java_udfs.md | 8 +-
docs/source/user-guide/latest/tuning.md | 4 +-
.../latest/understanding-comet-plans.md | 186 ++++++++++--------
22 files changed, 256 insertions(+), 211 deletions(-)
diff --git a/docs/source/about/gluten_comparison.md b/docs/source/about/gluten_comparison.md
index 3e59feffb7..a3ffb60bed 100644
--- a/docs/source/about/gluten_comparison.md
+++ b/docs/source/about/gluten_comparison.md
@@ -28,8 +28,9 @@ This document is based on Comet 0.16.0 and Gluten 1.6.0.
## Architecture
-Comet and Gluten have very similar architectures. Both are Spark plugins that translate Spark physical plans to
-a serialized representation and pass the serialized plan to native code for execution.
+Comet and Gluten have similar high-level architectures. Both are Spark plugins that translate Spark physical plans
+into a serialized representation and run them through an Arrow-columnar execution engine instead of Spark's
+row-based engine.
Gluten serializes the plans using the Substrait format and has an extensible architecture that supports execution
against multiple engines. Velox and Clickhouse are currently supported, but Velox is more widely used.
@@ -37,14 +38,19 @@ against multiple engines. Velox and Clickhouse are currently supported, but Velo
Comet serializes the plans in a proprietary Protocol Buffer format. Execution is delegated to Apache DataFusion. Comet
does not plan to support multiple engines, but rather focus on a tight integration between Spark and DataFusion.
+A second architectural difference is how each project handles JVM-side code. Gluten offloads supported operators to
+its native backend and falls back to row-based Spark execution otherwise. Comet additionally keeps a JVM execution
+path that operates directly on Arrow batches, which lets some constructs (today, Scala/Java scalar UDFs via Spark's
+whole-stage codegen) stay in the Arrow pipeline rather than triggering a fallback.
+
## Underlying Execution Engine: DataFusion vs Velox
-One of the main differences between Comet and Gluten is the choice of native execution engine.
+One of the main differences between Comet and Gluten is the choice of Arrow-columnar execution engine.
-Gluten uses Velox, which is an open-source C++ vectorized query engine created by Meta.
+Gluten uses Velox, which is an open-source C++ columnar query engine created by Meta.
-Comet uses Apache DataFusion, which is an open-source vectorized query engine implemented in Rust and is governed by the
-Apache Software Foundation.
+Comet uses Apache DataFusion, which is an open-source columnar query engine implemented in Rust and is governed by
+the Apache Software Foundation.
Velox and DataFusion are both mature query engines that are growing in popularity.
@@ -53,8 +59,8 @@ the choice of implementation language (Rust vs C++) and this may be the main fac
choosing a solution. For users wishing to implement UDFs in Rust, Comet would likely be a better choice. For users
wishing to implement UDFs in C++, Gluten would likely be a better choice.
-If users are just interested in speeding up their existing Spark jobs and do not need to implement UDFs in native
-code, then we suggest benchmarking with both solutions and choosing the fastest one for your use case.
+If users are just interested in speeding up their existing Spark jobs and do not need to implement UDFs in Rust
+or C++, then we suggest benchmarking with both solutions and choosing the fastest one for your use case.

@@ -75,9 +81,10 @@ Gluten supports Spark 3.3, 3.4, 3.5, 4.0, and 4.1.
Spark 4.0 enables ANSI SQL semantics by default, which changes how arithmetic overflow, invalid casts, division by
zero, and similar error conditions are handled. This is one area where the two projects currently differ.
-Comet implements ANSI semantics for the expressions it supports natively, including arithmetic overflow checks,
-ANSI cast behavior, and `try_*` variants. Queries running with `spark.sql.ansi.enabled=true` continue to be accelerated.
-See the [Comet Compatibility Guide] for details on which expressions have full ANSI coverage.
+Comet implements ANSI semantics for the expressions it supports in the Comet pipeline, including arithmetic
+overflow checks, ANSI cast behavior, and `try_*` variants. Queries running with `spark.sql.ansi.enabled=true`
+continue to be accelerated. See the [Comet Compatibility Guide] for details on which expressions have full
+ANSI coverage.
The Gluten Velox backend documents that ANSI mode is not supported and that any query executed with ANSI enabled
will fall back to vanilla Spark. See the [Gluten Velox limitations] page for the current status.
@@ -85,7 +92,7 @@ will fall back to vanilla Spark. See the [Gluten Velox limitations] page for the
[Gluten Velox limitations]: https://apache.github.io/gluten/velox-backend-limitations.html
For users adopting Spark 4.0 without disabling ANSI mode, this difference can have a significant impact on the
-fraction of a workload that runs natively.
+fraction of a workload Comet can accelerate.
## Table Format Support
diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md
index 0eb5fa55db..e8f206c1af 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -95,7 +95,7 @@ org.apache.comet.CometNativeException: Error from DataFusion:
Function 'levenshtein' expects 2 arguments but received 3.
```
-The classic case is `levenshtein`. Spark accepts an optional 3rd `threshold` argument, DataFusion's built-in is 2-arg only, so the 3-arg form fails native execution unless the serde sets the return type explicitly. Other names that exist in both engines with potentially different signatures include `concat`, `coalesce`, `sha2`, and `regexp_replace`. If you are adding a function whose name is shared with `datafusion-functions`, check the upstream signature before deciding how to serialize.
+The classic case is `levenshtein`. Spark accepts an optional 3rd `threshold` argument, but DataFusion's built-in is 2-arg only, so the 3-arg form fails at execution time unless the serde sets the return type explicitly. Other names that exist in both engines with potentially different signatures include `concat`, `coalesce`, `sha2`, and `regexp_replace`. If you are adding a function whose name is shared with `datafusion-functions`, check the upstream signature before deciding how to serialize.
To avoid the registry lookup, write a custom `CometExpressionSerde` and use `scalarFunctionExprToProtoWithReturnType`, passing the Spark expression's declared `dataType`:
@@ -143,7 +143,7 @@ The `SupportLevel` sealed trait has three possible values:
- **`Compatible(notes: Option[String] = None)`** - Comet supports this expression with full compatibility with Spark, or may have known differences in specific edge cases unlikely to affect most users. This is the default if you don't override `getSupportLevel`.
- **`Incompatible(notes: Option[String] = None)`** - Comet supports this expression but results can differ from Spark. The expression will only be used if `spark.comet.expr.allowIncompatible=true` or the expression-specific config `spark.comet.expr..allowIncompatible=true` is set.
-- **`Unsupported(notes: Option[String] = None)`** - Comet does not support this expression under the current conditions. Spark will fall back to its native execution.
+- **`Unsupported(notes: Option[String] = None)`** - Comet does not support this expression under the current conditions. The query falls back to Spark for this expression.
All three accept an optional `notes` parameter to provide additional context that is logged for debugging.
@@ -307,7 +307,7 @@ override def getUnsupportedReasons(): Seq[String] = Seq(
#### Adding Spark-side Tests for the New Expression
-It is important to verify that the new expression is correctly recognized by the native execution engine and matches the expected Spark behavior. The preferred way to add test coverage is to write a Comet SQL Test. This approach is simpler than writing Comet Scala Tests and makes it easy to cover many input combinations and edge cases.
+It is important to verify that the new expression is correctly recognized by Comet and matches the expected Spark behavior. The preferred way to add test coverage is to write a Comet SQL Test. This approach is simpler than writing Comet Scala Tests and makes it easy to cover many input combinations and edge cases.
##### Writing a Comet SQL Test
diff --git a/docs/source/contributor-guide/adding_a_new_operator.md b/docs/source/contributor-guide/adding_a_new_operator.md
index de2a73da88..02504cdacf 100644
--- a/docs/source/contributor-guide/adding_a_new_operator.md
+++ b/docs/source/contributor-guide/adding_a_new_operator.md
@@ -24,36 +24,37 @@ This guide explains how to add support for a new Spark physical operator in Apac
## Overview
`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to
-implementing Comet operators depending on where they execute and how they integrate with the native execution engine.
+implementing Comet operators depending on where they execute and how they integrate with the Rust-side DataFusion
+engine.
### Types of Comet Operators
`CometExecRule` maintains two distinct maps of operators:
-#### 1. Native Operators (`nativeExecs` map)
+#### 1. Rust-Implemented Operators (`nativeExecs` map)
-These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. Native
-operators are registered in the `nativeExecs` map in `CometExecRule.scala`.
+These operators run entirely as Rust code and are the primary way to accelerate Spark workloads. They are
+registered in the `nativeExecs` map in `CometExecRule.scala`.
-Key characteristics of native operators:
+Key characteristics:
-- They are converted to their corresponding native protobuf representation
-- They execute as DataFusion operators in the native engine
+- They are converted to their corresponding protobuf representation
+- They execute as DataFusion operators on the Rust side
- The `CometOperatorSerde` implementation handles enable/disable checks, support validation, and protobuf serialization
Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`, `SortMergeJoinExec`, `ExpandExec`, `WindowExec`
#### 2. Sink Operators (`sinks` map)
-Sink operators serve as entry points (data sources) for native execution blocks. They are registered in the `sinks`
+Sink operators serve as entry points (data sources) for fused Rust-side blocks. They are registered in the `sinks`
map in `CometExecRule.scala`.
Key characteristics of sinks:
-- They become `ScanExec` operators in the native plan (see `operator2Proto` in `CometExecRule.scala`)
-- They can be leaf nodes that feed data into native execution blocks
+- They become `ScanExec` operators in the serialized DataFusion plan (see `operator2Proto` in `CometExecRule.scala`)
+- They can be leaf nodes that feed data into the Rust-side block
- They are wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` during plan transformation
-- Examples include operators that bring data from various sources into native execution
+- Examples include operators that bring data from various sources into the Rust-side block
Examples: `UnionExec`, `CoalesceExec`, `CollectLimitExec`, `TakeOrderedAndProjectExec`
@@ -65,7 +66,7 @@ Special sinks (not in the `sinks` map but also treated as sinks):
#### 3. Comet JVM Operators
-These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen
+These operators run in the JVM but are part of the Comet pipeline. For JVM operators, all checks happen
in `CometExecRule` rather than using `CometOperatorSerde`, because they don't need protobuf serialization.
Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
@@ -74,26 +75,26 @@ Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
When adding a new operator, choose based on these criteria:
-**Use Native Operators when:**
+**Use Rust-implemented operators when:**
- The operator transforms data (e.g., project, filter, sort, aggregate, join)
- The operator has a direct DataFusion equivalent or custom implementation
-- The operator consumes native child operators and produces native output
+- The operator consumes Comet child operators and produces Arrow output
- The operator is in the middle of an execution pipeline
**Use Sink Operators when:**
-- The operator serves as a data source for native execution (becomes a `ScanExec`)
-- The operator brings data from non-native sources (e.g., `UnionExec` combining multiple inputs)
+- The operator serves as a data source for the Rust-side block (becomes a `ScanExec`)
+- The operator brings data from non-Comet sources (e.g., `UnionExec` combining multiple inputs)
- The operator is typically a leaf or near-leaf node in the execution tree
-- The operator needs special handling to interface with the native engine
+- The operator needs special handling to interface with the Rust-side engine
**Implementation Note for Sinks:**
Sink operators are handled specially in `CometExecRule.operator2Proto`. Instead of converting to their own operator
-type, they are converted to `ScanExec` in the native plan. This allows them to serve as entry points for native
-execution blocks. The original Spark operator is wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` which
-manages the boundary between JVM and native execution.
+type, they are converted to `ScanExec` in the serialized DataFusion plan. This allows them to serve as entry points
+for the fused Rust-side block. The original Spark operator is wrapped with `CometScanWrapper` or
+`CometSinkPlaceHolder`, which manages the JVM/Rust boundary.
## Implementing a Native Operator
@@ -393,7 +394,7 @@ test("your operator") {
The `checkSparkAnswerAndOperator` helper verifies:
-1. Results match Spark's native execution
+1. Results match Spark's reference execution
2. Your operator is actually being used (not falling back)
#### Rust Unit Tests
@@ -418,7 +419,7 @@ Add your operator to the supported operators list in `docs/source/user-guide/lat
## Implementing a Sink Operator
-Sink operators are converted to `ScanExec` in the native plan and serve as entry points for native execution. The implementation is simpler than native operators because sink operators extend the `CometSink` base class which provides the conversion logic.
+Sink operators are converted to `ScanExec` in the serialized DataFusion plan and serve as entry points for the fused Rust-side block. The implementation is simpler than for Rust-implemented operators because sink operators extend the `CometSink` base class, which provides the conversion logic.
### Step 1: Create a CometOperatorSerde Implementation
@@ -516,7 +517,7 @@ val COMET_EXEC_YOUR_SINK_ENABLED: ConfigEntry[Boolean] =
### Step 4: Add Tests
-Test that your sink operator correctly feeds data into native execution:
+Test that your sink operator correctly feeds data into the Rust-side block:
```scala
test("your sink operator") {
@@ -524,7 +525,7 @@ test("your sink operator") {
sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")
- // Test query that uses your sink operator followed by native operators
+ // Test query that uses your sink operator followed by Rust-implemented operators
checkSparkAnswerAndOperator(
"SELECT col1 + 1 FROM (/* query that produces YourSinkExec */)"
)
diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md
index 3785358cf3..fe77c00ca6 100644
--- a/docs/source/contributor-guide/development.md
+++ b/docs/source/contributor-guide/development.md
@@ -30,8 +30,8 @@ under the License.
## Threading Architecture
-Comet's native execution runs on a shared tokio multi-threaded runtime. Understanding this
-architecture is important because it affects how you write native operators and JVM callbacks.
+Comet's Rust side runs DataFusion plans on a shared tokio multi-threaded runtime. Understanding
+this architecture is important because it affects how you write Rust operators and JVM callbacks.
### How execution works
diff --git a/docs/source/contributor-guide/ffi.md b/docs/source/contributor-guide/ffi.md
index 24e6843ba1..d7a97beb61 100644
--- a/docs/source/contributor-guide/ffi.md
+++ b/docs/source/contributor-guide/ffi.md
@@ -193,7 +193,7 @@ heap memory configured.
### Architecture
-When JVM needs results from native execution:
+When the JVM needs results from the Rust-side DataFusion plan:
```
┌─────────────────┐
diff --git a/docs/source/contributor-guide/jvm_shuffle.md b/docs/source/contributor-guide/jvm_shuffle.md
index 2145c82eba..c20d6121e0 100644
--- a/docs/source/contributor-guide/jvm_shuffle.md
+++ b/docs/source/contributor-guide/jvm_shuffle.md
@@ -27,12 +27,12 @@ alternative, see [Native Shuffle](native_shuffle.md).
Comet provides two shuffle implementations:
-- **CometNativeShuffle** (`CometExchange`): Fully native shuffle using Rust. Takes columnar input directly
- from Comet native operators and performs partitioning in native code.
+- **CometNativeShuffle** (`CometExchange`): Fully Rust-implemented shuffle. Takes Arrow input directly
+ from upstream Comet operators and performs partitioning in Rust.
- **CometColumnarShuffle** (`CometColumnarExchange`): JVM-based shuffle that operates on rows internally,
- buffers `UnsafeRow`s in memory pages, and uses native code (via JNI) to encode them to Arrow IPC format.
- Uses Spark's partitioner for partition assignment. Can accept either row-based or columnar input
- (columnar input is converted to rows via `ColumnarToRowExec`).
+ buffers `UnsafeRow`s in memory pages, and calls into Rust (via JNI) to encode them to Arrow IPC format.
+ Uses Spark's partitioner for partition assignment. Can accept either row-based or Arrow input
+ (Arrow input is converted to rows via `ColumnarToRowExec`).
The JVM shuffle is selected via `CometShuffleDependency.shuffleType`.
@@ -42,13 +42,13 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE
1. **Shuffle mode is explicitly set to "jvm"**: When `spark.comet.exec.shuffle.mode` is set to `jvm`.
-2. **Child plan is not a Comet native operator**: When the child plan is a Spark row-based operator
- (not a `CometPlan`), JVM shuffle is the only option since native shuffle requires columnar input
- from Comet operators.
+2. **Child plan is not a Comet operator**: When the child plan is a Spark row-based operator
+ (not a `CometPlan`), JVM shuffle is the only option since native shuffle requires Arrow input
+ from upstream Comet operators.
3. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used
- as partition keys in native shuffle and will fall back to JVM columnar shuffle. Note that complex types are
+ as partition keys in native shuffle and will fall back to JVM shuffle. Note that complex types are
fully supported as data columns in both implementations.
## Input Handling
@@ -58,9 +58,9 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE
When the child plan is a Spark row-based operator, `CometColumnarExchange` calls `child.execute()` which
returns an `RDD[InternalRow]`. The rows flow directly to the JVM shuffle writers.
-### Comet Columnar Input
+### Comet Arrow Input
-When the child plan is a Comet native operator (e.g., `CometHashAggregate`) but JVM shuffle is selected
+When the child plan is a Comet operator (e.g., `CometHashAggregate`) but JVM shuffle is selected
(due to shuffle mode setting or unsupported partitioning), `CometColumnarExchange` still calls
`child.execute()`. Comet operators implement `doExecute()` by wrapping themselves with `ColumnarToRowExec`:
@@ -73,13 +73,13 @@ override def doExecute(): RDD[InternalRow] =
This means the data path becomes:
```
-Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → Arrow
```
-This is less efficient than native shuffle which avoids the columnar-to-row conversion:
+This is less efficient than native shuffle, which avoids the Arrow-to-row conversion:
```
-Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → Native Shuffle → Arrow IPC → Arrow
```
### Why Use Spark's Partitioner?
@@ -87,7 +87,7 @@ Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
JVM shuffle uses row-based input so it can leverage Spark's existing partitioner infrastructure
(`partitioner.getPartition(key)`). This allows Comet to support all of Spark's partitioning schemes
without reimplementing them in Rust. Native shuffle, by contrast, serializes the partitioning scheme
-to protobuf and implements the partitioning logic in native code.
+to protobuf and implements the partitioning logic in Rust.
## Architecture
@@ -147,11 +147,11 @@ Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:
| Class | Location | Description |
| ----------------------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
-| `CometBypassMergeSortShuffleWriter` | `.../shuffle/CometBypassMergeSortShuffleWriter.java` | Hash-based writer. Creates one `CometDiskBlockWriter` per partition. Supports async writes. |
-| `CometUnsafeShuffleWriter` | `.../shuffle/CometUnsafeShuffleWriter.java` | Sort-based writer. Uses `CometShuffleExternalSorter` to buffer and sort records, then merges spill files. |
-| `CometDiskBlockWriter` | `.../shuffle/CometDiskBlockWriter.java` | Buffers rows in memory pages for a single partition. Spills to disk via native encoding. Used by bypass writer. |
-| `CometShuffleExternalSorter` | `.../shuffle/sort/CometShuffleExternalSorter.java` | Buffers records across all partitions, sorts by partition ID, spills sorted data. Used by unsafe writer. |
-| `SpillWriter` | `.../shuffle/SpillWriter.java` | Base class for spill logic. Manages memory pages and calls `Native.writeSortedFileNative()` for Arrow IPC encoding. |
+| `CometBypassMergeSortShuffleWriter` | `.../shuffle/CometBypassMergeSortShuffleWriter.java` | Hash-based writer. Creates one `CometDiskBlockWriter` per partition. Supports async writes. |
+| `CometUnsafeShuffleWriter` | `.../shuffle/CometUnsafeShuffleWriter.java` | Sort-based writer. Uses `CometShuffleExternalSorter` to buffer and sort records, then merges spill files. |
+| `CometDiskBlockWriter` | `.../shuffle/CometDiskBlockWriter.java` | Buffers rows in memory pages for a single partition. Spills to disk via the Rust IPC encoder. Used by bypass writer. |
+| `CometShuffleExternalSorter` | `.../shuffle/sort/CometShuffleExternalSorter.java` | Buffers records across all partitions, sorts by partition ID, spills sorted data. Used by unsafe writer. |
+| `SpillWriter` | `.../shuffle/SpillWriter.java` | Base class for spill logic. Manages memory pages and calls `Native.writeSortedFileNative()` for Arrow IPC encoding in Rust. |
### Reader
@@ -168,7 +168,7 @@ Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:
2. Writer receives `Iterator[Product2[K, V]]` where V is `UnsafeRow`
3. Rows are serialized and buffered in off-heap memory pages
4. When memory threshold or batch size is reached, `SpillWriter.doSpilling()` is called
-5. Native code (`Native.writeSortedFileNative()`) converts rows to Arrow arrays and writes IPC format
+5. The Rust side (`Native.writeSortedFileNative()`) converts rows to Arrow arrays and writes IPC format
6. For bypass writer: partition files are concatenated into final output
7. For sort writer: spill files are merged
@@ -176,7 +176,7 @@ Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:
1. `CometBlockStoreShuffleReader.read()` creates `ShuffleBlockFetcherIterator`
2. For each block, `NativeBatchDecoderIterator` reads the IPC stream
-3. Native code (`Native.decodeShuffleBlock()`) decompresses and decodes to Arrow arrays
+3. The Rust side (`Native.decodeShuffleBlock()`) decompresses and decodes to Arrow arrays
4. Arrow FFI imports arrays as `ColumnarBatch`
## Memory Management
diff --git a/docs/source/contributor-guide/native_shuffle.md b/docs/source/contributor-guide/native_shuffle.md
index 18e80a90c8..f5cb1bfe2d 100644
--- a/docs/source/contributor-guide/native_shuffle.md
+++ b/docs/source/contributor-guide/native_shuffle.md
@@ -25,18 +25,18 @@ see [JVM Shuffle](jvm_shuffle.md).
## Overview
-Native shuffle takes columnar input directly from Comet native operators and performs partitioning,
-encoding, and writing in native Rust code. This avoids the columnar-to-row-to-columnar conversion
-overhead that JVM shuffle incurs.
+Native shuffle takes Arrow input directly from upstream Comet operators and performs partitioning,
+encoding, and writing in Rust. This avoids the Arrow-to-row-to-Arrow conversion overhead that
+JVM shuffle incurs.
```
-Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → Native Shuffle → Arrow IPC → Arrow
```
Compare this to JVM shuffle's data path:
```
-Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
+Comet operator (Arrow) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → Arrow
```
## When Native Shuffle is Used
@@ -45,8 +45,8 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
1. **Shuffle mode allows native**: `spark.comet.exec.shuffle.mode` is `native` or `auto`.
-2. **Child plan is a Comet native operator**: The child must be a `CometPlan` that produces
- columnar output. Row-based Spark operators require JVM shuffle.
+2. **Child plan is a Comet operator**: The child must be a `CometPlan` that produces Arrow
+ output. Row-based Spark operators require JVM shuffle.
3. **Supported partitioning type**: Native shuffle supports:
- `HashPartitioning`
@@ -70,7 +70,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometNativeShuffleWriter │
│ - Constructs protobuf operator plan │
-│ - Invokes native execution via CometExec.getCometIterator() │
+│ - Invokes the Rust shuffle writer via CometExec.getCometIterator() │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼ (JNI)
@@ -106,7 +106,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
| Class | Location | Description |
| ------------------------------ | ------------------------------------------------ | --------------------------------------------------------------------------------------------- |
| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
-| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds protobuf plan and invokes native execution. |
+| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds the protobuf plan and invokes the Rust shuffle writer. |
| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, and range partition bounds. |
| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |
diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 444c9ccb60..9073c00bd0 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -21,9 +21,11 @@ under the License.
## Overview
-The Comet plugin enhances Spark SQL by introducing optimized query execution and shuffle mechanisms leveraging
-native code. It integrates with Spark's plugin framework and extension API to replace or extend Spark's
-default behavior.
+The Comet plugin keeps Spark queries inside an Arrow-native pipeline: supported operators, expressions,
+shuffle, and broadcast all consume and produce Apache Arrow columnar batches rather than Spark's row
+format. Within that pipeline, individual operators and expressions may be implemented as Rust code
+running in DataFusion or as JVM code that operates directly on Arrow batches. Comet integrates with
+Spark's plugin framework and extension API to install the rules and shuffle machinery that drive this.
---
@@ -78,13 +80,13 @@ If an operator, expression, or data type is not supported by Comet then the reas
underlying Spark node and the plan will not be converted.
Comet does not support partially replacing subsets of the plan within a query stage because this would involve adding
-transitions to convert between row-based and columnar data between Spark operators and Comet operators and the overhead
-of this could outweigh the benefits of running parts of the query stage natively in Comet.
+transitions between Spark operators and Comet operators to convert row-based data to and from Arrow, and the overhead
+of this could outweigh the benefits of running parts of the query stage inside the Comet pipeline.
## Query Execution
-Once the plan has been transformed, any consecutive native Comet operators are combined into a `CometNativeExec` which contains
-a protocol buffer serialized version of the plan (the serialization code can be found in `QueryPlanSerde`).
+Once the plan has been transformed, any consecutive Rust-implemented Comet operators are combined into a `CometNativeExec`
+which contains a protocol buffer serialized version of the plan (the serialization code can be found in `QueryPlanSerde`).
Spark serializes the physical plan and sends it to the executors when executing tasks. The executors deserialize the
plan and invoke it.
@@ -97,8 +99,8 @@ Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized phy
override the DataFusion versions to ensure compatibility with Apache Spark.
The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
-`CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan,
-a Spark exchange, or another native plan.
+`CometBatchIterator.next()` to fetch the next input batch. The input could be a `CometNativeScan` Parquet scan,
+a Spark exchange, or another DataFusion plan.
`CometNativeExec` creates a `CometExecIterator` and applies this iterator to the input RDD
partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes
diff --git a/docs/source/contributor-guide/profiling.md b/docs/source/contributor-guide/profiling.md
index 67729a235e..709a1a2a9c 100644
--- a/docs/source/contributor-guide/profiling.md
+++ b/docs/source/contributor-guide/profiling.md
@@ -293,4 +293,4 @@ cargo flamegraph --root --bench
In async-profiler flame graphs, native Rust frames appear below JNI entry points like
`Java_org_apache_comet_Native_*`. Look for these transition points to understand how
-time is split between Spark's JVM code and Comet's native execution.
+time is split between JVM code and Comet's Rust-side execution.
diff --git a/docs/source/contributor-guide/roadmap.md b/docs/source/contributor-guide/roadmap.md
index bae4ddba58..7421a2248a 100644
--- a/docs/source/contributor-guide/roadmap.md
+++ b/docs/source/contributor-guide/roadmap.md
@@ -82,8 +82,9 @@ performance for memory-intensive workloads.
## Java/Scala Columnar and Arrow UDF Support
Spark users frequently define custom UDFs in Java or Scala. Comet currently falls back to Spark when a query
-contains a JVM UDF. Adding support for calling Java/Scala UDFs that operate on columnar Arrow data directly
-from native execution will reduce fallbacks and allow more queries to run end-to-end in Comet.
+contains a JVM UDF other than the Scala UDFs covered by the codegen path. Adding support for Java/Scala UDFs
+that operate on Arrow batches directly will let more UDFs stay in the Comet pipeline and allow more queries
+to run end-to-end in Comet.
## Memory Management Improvements
diff --git a/docs/source/contributor-guide/spark_configs_support.md b/docs/source/contributor-guide/spark_configs_support.md
index fe1da3cd23..ac0200ddfc 100644
--- a/docs/source/contributor-guide/spark_configs_support.md
+++ b/docs/source/contributor-guide/spark_configs_support.md
@@ -27,11 +27,12 @@ verification has been performed, and any known gaps.
The status column uses these values:
-- **Supported** -- Comet runs the affected expressions natively under every value of
+- **Supported** -- Comet handles the affected expressions in the Comet pipeline under every value of
the config, and produces results matching Spark.
-- **Partial** -- Comet runs natively for some values of the config but falls back to
- Spark for others, or runs natively but with documented incompatibilities.
-- **Falls back** -- Comet does not run the affected expressions natively under this
+- **Partial** -- Comet handles the affected expressions in the Comet pipeline for some values of the
+ config but falls back to Spark for others, or handles them in the pipeline with documented
+ incompatibilities.
+- **Falls back** -- Comet does not handle the affected expressions in the Comet pipeline under this
config and always defers to Spark.
- **Unaudited** -- the config's interaction with Comet has not yet been verified.
diff --git a/docs/source/contributor-guide/sql-file-tests.md b/docs/source/contributor-guide/sql-file-tests.md
index 9b63dcf8ed..f0d5373237 100644
--- a/docs/source/contributor-guide/sql-file-tests.md
+++ b/docs/source/contributor-guide/sql-file-tests.md
@@ -145,7 +145,7 @@ mode controls how results are validated.
#### `query` (default mode)
-Checks that the query runs natively on Comet (not falling back to Spark) and that results
+Checks that the query runs in the Comet pipeline (not falling back to Spark) and that results
match Spark exactly.
```sql
@@ -155,9 +155,9 @@ SELECT abs(v) FROM test_abs
#### `query spark_answer_only`
-Only checks that Comet results match Spark. Does not assert that the query runs natively.
-Use this for expressions that Comet may not fully support yet but should still produce
-correct results.
+Only checks that Comet results match Spark. Does not assert that the query stays in the
+Comet pipeline. Use this for expressions that Comet may not fully support yet but should
+still produce correct results.
```sql
query spark_answer_only
@@ -232,8 +232,8 @@ SELECT array(1, 2, 3)[10]
`NULL`, boundary values, and negative numbers.
5. Add `query` blocks for each expression or behavior to test. Use the default `query` mode
- when you expect Comet to run the expression natively. Use `query spark_answer_only` when
- native execution is not yet expected.
+ when you expect Comet to handle the expression in the Comet pipeline. Use
+ `query spark_answer_only` when Comet support is not yet expected.
6. Run the tests to verify:
diff --git a/docs/source/contributor-guide/sql_error_propagation.md b/docs/source/contributor-guide/sql_error_propagation.md
index a27408510d..48620a84a9 100644
--- a/docs/source/contributor-guide/sql_error_propagation.md
+++ b/docs/source/contributor-guide/sql_error_propagation.md
@@ -352,8 +352,8 @@ public final class CometQueryExecutionException extends CometNativeException {
## Step 7: Scala Converts JSON Back to a Real Spark Exception
-`CometExecIterator.scala` is the Scala code that drives the native execution. Every time it
-calls into the native engine for the next batch of data, it catches
+`CometExecIterator.scala` is the Scala code that drives the Rust-side DataFusion plan. Every
+time it calls into the native engine for the next batch of data, it catches
`CometQueryExecutionException` and converts it:
```scala
diff --git a/docs/source/index.md b/docs/source/index.md
index d1ea261824..b1f92c73c6 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -27,12 +27,14 @@ under the License.
Fork
-Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the powerful
-[Apache DataFusion] query engine. Comet is designed to significantly enhance the
-performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
-Spark ecosystem without requiring any code changes.
-
-Comet also accelerates Apache Iceberg, when performing Parquet scans from Spark.
+Apache DataFusion Comet is a high-performance accelerator for Apache Spark. Comet keeps Spark
+queries **Arrow-native end-to-end**: operators, expressions, shuffle, and broadcast all stay in
+Apache Arrow columnar format, avoiding the per-row overhead of Spark's row-based engine. Within
+the Arrow-native pipeline, operators and expressions execute as Rust code (via the
+[Apache DataFusion] query engine) or as JVM code that operates directly on Arrow batches.
+
+Comet integrates with the Spark ecosystem without requiring any code changes, and also accelerates
+Apache Iceberg when performing Parquet scans from Spark.
[Apache DataFusion]: https://datafusion.apache.org
diff --git a/docs/source/user-guide/latest/compatibility/expressions/cast.md b/docs/source/user-guide/latest/compatibility/expressions/cast.md
index f7182e5713..eff2c613fa 100644
--- a/docs/source/user-guide/latest/compatibility/expressions/cast.md
+++ b/docs/source/user-guide/latest/compatibility/expressions/cast.md
@@ -24,9 +24,9 @@ Cast operations in Comet fall into three levels of support:
- **C (Compatible)**: The results match Apache Spark
- **I (Incompatible)**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
- `spark.comet.expression.Cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
- recommended for production use.
-- **U (Unsupported)**: Comet does not provide a native version of this cast expression and the query stage will fall back to
+ `spark.comet.expression.Cast.allowIncompatible=true` will allow all incompatible casts to run inside the Comet pipeline,
+ but this is not recommended for production use.
+- **U (Unsupported)**: Comet does not provide an implementation of this cast expression and the query stage will fall back to
Spark.
- **N/A**: Spark does not support this cast.
@@ -83,9 +83,9 @@ is the same regardless of the session timezone setting.
In Legacy mode, `CAST(date AS INT)`, `CAST(date AS LONG)`, and casts to all other numeric
types (Boolean, Byte, Short, Float, Double, Decimal) always return `NULL`. Comet handles
-this by short-circuiting to a null literal during query planning, so no native execution
-is needed. In ANSI and Try modes, Spark rejects these casts at analysis time (before
-execution reaches Comet).
+this by short-circuiting to a null literal during query planning, so the cast is removed
+before reaching the runtime. In ANSI and Try modes, Spark rejects these casts at analysis
+time (before execution reaches Comet).
## String to Timestamp
@@ -139,7 +139,7 @@ The result is always a wall-clock timestamp with no timezone conversion or DST a
Casting a `DecimalType` with a negative scale to `StringType` is marked as incompatible when
`spark.sql.legacy.allowNegativeScaleOfDecimal` is `false` (the default). When that config is
disabled, Spark cannot create negative-scale decimals, so Comet falls back to avoid running
-native execution on unexpected inputs.
+its cast implementation on unexpected inputs.
When `spark.sql.legacy.allowNegativeScaleOfDecimal=true`, the cast is compatible. Comet matches
Spark's behavior of using Java `BigDecimal.toString()` semantics, which produces scientific
diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md
index 37524a829e..f5b921f91c 100644
--- a/docs/source/user-guide/latest/compatibility/scans.md
+++ b/docs/source/user-guide/latest/compatibility/scans.md
@@ -63,8 +63,8 @@ The following limitation may produce incorrect results without falling back to S
The following limitation raises an error at scan time rather than falling back to Spark:
- Invalid UTF-8 bytes in `STRING` columns. Spark permits arbitrary byte sequences in a `STRING`
- column (for example from `CAST(X'C1' AS STRING)`), but Comet's native execution path is built on
- Arrow, whose string type is strictly UTF-8. Reading a Parquet file whose `STRING` column contains
+ column (for example from `CAST(X'C1' AS STRING)`), but Comet's pipeline is built on Arrow,
+ whose string type is strictly UTF-8. Reading a Parquet file whose `STRING` column contains
non-UTF-8 bytes fails with `Parquet error: encountered non UTF-8 data`. Disable Comet for the
query, or cast the column to `BINARY` before persisting, if you need to preserve non-UTF-8 bytes.
See [#4121](https://github.com/apache/datafusion-comet/issues/4121).
diff --git a/docs/source/user-guide/latest/datasources.md b/docs/source/user-guide/latest/datasources.md
index f1d20fa3a4..a93cc413e4 100644
--- a/docs/source/user-guide/latest/datasources.md
+++ b/docs/source/user-guide/latest/datasources.md
@@ -23,9 +23,10 @@
### Parquet
-Parquet scans are performed natively by Comet if all data types in the schema are supported. When the scan
-falls back to Spark, enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into
-Arrow format, allowing native execution to happen after that, but the process may not be efficient.
+Parquet files are read directly into Arrow by Comet's Rust scan when all data types in the schema are supported.
+When the scan falls back to Spark, enabling `spark.comet.convert.parquet.enabled` will immediately convert the
+data into Arrow format so the rest of the plan can stay in the Comet pipeline, though the conversion itself
+may not be efficient.
### Apache Iceberg
@@ -35,17 +36,17 @@ Comet accelerates Iceberg scans of Parquet files. See the [Iceberg Guide] for mo
### CSV
-Comet provides experimental native CSV scan support. When `spark.comet.scan.csv.v2.enabled` is enabled, CSV files
-are read natively for improved performance. This feature is experimental and performance benefits are
-workload-dependent.
+Comet provides an experimental Rust-implemented CSV scan. When `spark.comet.scan.csv.v2.enabled` is enabled, CSV
+files are read directly into Arrow for improved performance. This feature is experimental and performance benefits
+are workload-dependent.
Alternatively, when `spark.comet.convert.csv.enabled` is enabled, data from Spark's CSV reader is immediately
-converted into Arrow format, allowing native execution to happen after that.
+converted into Arrow format so the rest of the plan can stay in the Comet pipeline.
### JSON
-Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately
-converted into Arrow format, allowing native execution to happen after that.
+Comet does not provide a Rust-implemented JSON scan, but when `spark.comet.convert.json.enabled` is enabled,
+data is immediately converted into Arrow format so the rest of the plan can stay in the Comet pipeline.
## Data Catalogs
diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md
index f22180ec77..82941a1804 100644
--- a/docs/source/user-guide/latest/iceberg.md
+++ b/docs/source/user-guide/latest/iceberg.md
@@ -22,7 +22,7 @@
## Native Reader
Comet's native Iceberg reader relies on reflection to extract `FileScanTask`s from Iceberg, which are
-then serialized to Comet's native execution engine (see
+then serialized to DataFusion for execution (see
[PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).
The example below uses Spark's package downloader to retrieve Comet $COMET_VERSION and Iceberg
@@ -157,12 +157,12 @@ Iceberg ships several `ScalaUDF`s that surface in user queries and maintenance a
(`INT_ORDERED_BYTES`, `LONG_ORDERED_BYTES`, ..., `INTERLEAVE_BYTES`) over the sort key columns
during compaction.
-By default these UDFs cause the enclosing operator to fall back to Spark, which forces a
-columnar-to-row roundtrip and demotes the surrounding shuffle from `CometExchange` to
+By default these UDFs cause the enclosing operator to fall back to Spark, which forces an
+Arrow-to-row roundtrip and demotes the surrounding shuffle from `CometExchange` to
`CometColumnarExchange`. Enabling the experimental
[Scala UDF and Java UDF Support](scala_java_udfs.md) feature
-(`spark.comet.exec.scalaUDF.codegen.enabled=true`) routes these UDFs through native execution so
-the project, exchange, and sort operators around them stay on the Comet path end-to-end.
+(`spark.comet.exec.scalaUDF.codegen.enabled=true`) keeps these UDFs in the Comet pipeline, so
+the project, exchange, and sort operators around them stay in the pipeline end-to-end.
### Task input metrics
diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md
index 4b7717b688..106a232227 100644
--- a/docs/source/user-guide/latest/installation.md
+++ b/docs/source/user-guide/latest/installation.md
@@ -175,4 +175,4 @@ Some cluster managers may require additional configuration, see , values: Array>` rather than `VectorUDT`).
+- `CalendarIntervalType`, `NullType`, and `UserDefinedType` arguments and return types. UDT-typed columns fall back to Spark; to keep the work in the Comet pipeline, store and read the underlying representation directly (e.g. write MLlib `Vector` outputs as `Struct, values: Array>` rather than `VectorUDT`).
- Trees whose total nested-field count (output plus all input columns the UDF tree references) exceeds `spark.sql.codegen.maxFields` (default 100). Comet refuses these at plan time and the operator falls back to Spark.
When a UDF is rejected, the reason surfaces through Comet's standard fallback diagnostics; the query still runs on Spark.
diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md
index a394b4a4cb..6d9527c94e 100644
--- a/docs/source/user-guide/latest/tuning.md
+++ b/docs/source/user-guide/latest/tuning.md
@@ -61,8 +61,8 @@ The valid pool types are:
- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `greedy_unified`
-Both pool types are shared across all native execution contexts within the same Spark task. When
-Comet executes a shuffle, it runs two native execution contexts concurrently (e.g. one for
+Both pool types are shared across all DataFusion execution contexts within the same Spark task. When
+Comet executes a shuffle, it runs two DataFusion execution contexts concurrently (e.g. one for
pre-shuffle operators and one for the shuffle writer). The shared pool ensures that the combined
memory usage stays within the per-task limit.
diff --git a/docs/source/user-guide/latest/understanding-comet-plans.md b/docs/source/user-guide/latest/understanding-comet-plans.md
index 7fb93c8f53..892cafc8fe 100644
--- a/docs/source/user-guide/latest/understanding-comet-plans.md
+++ b/docs/source/user-guide/latest/understanding-comet-plans.md
@@ -27,44 +27,60 @@ inspect that behavior.
When Comet is enabled, the `CometSparkSessionExtensions` rules walk the
physical plan bottom-up and replace Spark operators with Comet equivalents
-where possible. Consecutive native operators are combined into a single block
-that is serialized as protobuf and executed by DataFusion on the executor.
-Operators that Comet does not support remain as their original Spark form.
-
-As a result, a plan can mix three kinds of nodes:
-
-- **`Comet*` nodes** that run natively in Rust (for example `CometProject`,
- `CometHashAggregate`).
-- **`Comet*` nodes that run on the JVM** but are still part of the Comet
- pipeline (for example `CometBroadcastExchange`, `CometColumnarExchange`).
-- **Standard Spark nodes** (for example `Project`, `HashAggregate`) where
- Comet either does not support the operator or has fallen back due to an
- unsupported expression, data type, or configuration.
-
-Wherever data crosses between columnar and row-based execution, Comet inserts
-a transition node such as `CometColumnarToRow` or `CometSparkRowToColumnar`.
+where possible. Consecutive Rust-implemented operators are combined into a
+single block that is serialized as protobuf and executed by DataFusion on
+the executor. Operators that Comet does not support remain as their
+original Spark form.
+
+The unifying property of the Comet pipeline is that data stays in Apache
+Arrow columnar format throughout: operators, expressions, shuffle, and
+broadcast all consume and produce Arrow batches. Within that pipeline an
+individual operator or expression may be implemented in Rust (via
+DataFusion) or in JVM code that operates directly on Arrow batches. The
+implementation language is an internal detail; from the query's
+perspective the work is Arrow-native regardless.
+
+As a result, a plan can mix four kinds of nodes:
+
+- **Arrow-native Rust operators** that execute as Rust code in DataFusion
+ (for example `CometProject`, `CometHashAggregate`, `CometSort`).
+- **Arrow-native JVM expressions** that operate directly on Arrow batches
+ from JVM code (today, the Scala UDF codegen path; future Arrow UDFs and
+ hybrid JVM/native expressions slot in here as well).
+- **Arrow-native JVM plumbing operators** that coordinate Arrow batches
+ without per-row compute (for example `CometUnion`, `CometCoalesce`,
+ `CometBroadcastExchange`).
+- **Spark fallback nodes** (for example `Project`, `HashAggregate`,
+ plain `Exchange`) where Comet either does not support the operator or
+ has fallen back due to an unsupported expression, data type, or
+ configuration. These run as row-based Spark execution.
+
+Wherever data crosses between Arrow columnar and Spark row-based execution,
+Comet inserts a transition node such as `CometColumnarToRow` or
+`CometSparkRowToColumnar`.
## Reading a Plan
You can print a plan with `df.explain("formatted")` or `EXPLAIN FORMATTED <query>`, and
the same plan is shown in the Spark SQL UI. When reading a plan, look for:
-- **Node prefix.** `Comet*` nodes are accelerated by Comet. Anything without
- the prefix is unmodified Spark.
+- **Node prefix.** `Comet*` nodes are part of the Comet pipeline. Anything
+ without the prefix is unmodified Spark.
- **Transitions.** `CometColumnarToRow`, `CometNativeColumnarToRow`, and
- `CometSparkRowToColumnar` mark boundaries between columnar Comet execution
- and row-based Spark execution. Frequent transitions usually indicate
- fallback inside the plan.
-- **Exchange type.** `CometExchange` is the native shuffle path,
- `CometColumnarExchange` is the JVM columnar shuffle path, and a plain
- `Exchange` means Spark shuffle. See [Shuffle Operators](#shuffle-operators)
- below.
+ `CometSparkRowToColumnar` mark boundaries between the Arrow-native
+ pipeline and row-based Spark execution. Frequent transitions usually
+ indicate fallback inside the plan.
+- **Exchange type.** `CometExchange` is the native shuffle path (partition,
+ encode, and compress run in Rust), `CometColumnarExchange` is the JVM
+ shuffle path, and a plain `Exchange` means Spark shuffle. See
+ [Shuffle Operators](#shuffle-operators) below.
## Fallback
-A "fallback" happens when Comet cannot translate part of a plan into native
-execution. Fallback can be partial (a subtree falls back while the rest stays
-native) or full (no Comet nodes appear).
+A "fallback" happens when Comet cannot translate part of a plan into the
+Comet pipeline and the work runs as row-based Spark execution instead.
+Fallback can be partial (a subtree falls back while the rest stays in the
+Comet pipeline) or full (no Comet nodes appear).
Common reasons:
@@ -88,16 +104,16 @@ They serve different purposes and produce output in different places.
| Config | Output destination | What you see |
| ---------------------------------------- | ---------------------------------- | --------------------------------------------------------------------------------------------- |
-| `spark.comet.explainFallback.enabled` | Driver log (only when fallback) | A WARN with the list of reasons each query stage could not run natively. |
+| `spark.comet.explainFallback.enabled` | Driver log (only when fallback) | A WARN with the list of reasons each query stage could not stay in the Comet pipeline. |
| `spark.comet.logFallbackReasons.enabled` | Driver log | One WARN per fallback reason as it is encountered, without surrounding plan context. |
| `spark.comet.explain.format` | Spark SQL UI (Spark 4.0 and newer) | Annotated plan or fallback-reason list, depending on `verbose` (default) or `fallback` value. |
-| `spark.comet.explain.native.enabled` | Executor logs, per task | The DataFusion native plan with metrics, useful for inspecting native execution. |
+| `spark.comet.explain.native.enabled` | Executor logs, per task | The DataFusion plan with metrics, useful for inspecting the Rust-side execution. |
### `spark.comet.explainFallback.enabled`
-Logs a single WARN listing the reasons each query stage could not be executed
-natively. Nothing is logged when the entire stage runs in Comet. Useful as a
-low-noise check that fallback is or is not happening.
+Logs a single WARN listing the reasons each query stage fell back to Spark.
+Nothing is logged when the entire stage stays in the Comet pipeline. Useful
+as a low-noise check that fallback is or is not happening.
### `spark.comet.logFallbackReasons.enabled`
@@ -131,12 +147,12 @@ the config has no effect there.
### `spark.comet.explain.native.enabled`
-When enabled, each executor task logs the DataFusion native plan it executes,
-along with metrics. This is verbose because there is one plan per task, but it
-is the only way to see the native plan as DataFusion sees it (including how
+When enabled, each executor task logs the DataFusion plan it executes,
+along with metrics. This is verbose because there is one plan per task, but
+it is the only way to see the plan as DataFusion sees it (including how
operators were arranged after Comet's serialization). See the
-[Metrics Guide](metrics.md) for details on the native metrics that appear in
-this output.
+[Metrics Guide](metrics.md) for details on the metrics that appear in this
+output.
## Comet Operator Reference
@@ -145,17 +161,17 @@ by role. Names match what is shown in the plan output.
### Scans
-| Node | Description |
-| ------------------------ | --------------------------------------------------------------------------------------------- |
-| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, that produces Arrow batches consumed by Comet. |
-| `CometNativeScan` | Fully native Parquet scan that runs entirely in DataFusion. |
-| `CometIcebergNativeScan` | Fully native Iceberg Parquet scan. |
-| `CometCsvNativeScan` | Fully native CSV scan (experimental). |
+| Node | Description |
+| ------------------------ | -------------------------------------------------------------------------------------------------- |
+| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, with a JVM reader producing Arrow batches. |
+| `CometNativeScan` | Fully Rust-implemented Parquet scan that runs entirely in DataFusion. |
+| `CometIcebergNativeScan` | Fully Rust-implemented Iceberg Parquet scan. |
+| `CometCsvNativeScan` | Fully Rust-implemented CSV scan (experimental). |
-### Native Execution Operators
+### Arrow-Native Rust Operators
-These run natively in DataFusion. When several appear consecutively in a plan,
-they execute as a single fused native block.
+These execute as Rust code in DataFusion. When several appear consecutively
+in a plan, they run as a single fused block.
| Node | Spark equivalent |
| ---------------------------- | ----------------------------------------------- |
@@ -173,36 +189,49 @@ they execute as a single fused native block.
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |
-### JVM-Side Operators
+### Arrow-Native JVM Plumbing
-These keep their data on the JVM but participate in the Comet pipeline.
+These coordinate Arrow batches from JVM code without performing per-row
+compute. They are part of the Comet pipeline and consume and produce Arrow
+batches, but the work happens in Scala/Java.
-| Node | Notes |
-| ------------------------ | ------------------------------------------------------------------------------------- |
-| `CometUnion` | JVM-side union of Comet inputs. The native side reads each branch as a separate scan. |
-| `CometCoalesce` | JVM-side partition coalesce. |
-| `CometCollectLimit` | JVM-side collect limit, equivalent to `CollectLimitExec`. |
-| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode. |
-| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |
+| Node | Notes |
+| ------------------------ | ---------------------------------------------------------------------------------- |
+| `CometUnion` | JVM-side union of Comet inputs. The Rust side reads each branch as a separate scan.|
+| `CometCoalesce` | JVM-side partition coalesce. |
+| `CometCollectLimit` | JVM-side collect limit, equivalent to `CollectLimitExec`. |
+| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode.|
+| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |
+
+### Arrow-Native JVM Expressions
+
+Some expressions execute as JVM code that operates directly on Arrow batches
+rather than as Rust code in DataFusion. Today this category covers the Scala
+UDF codegen path (see the [Scala UDF and Java UDF Support](scala_java_udfs.md)
+guide); future Arrow-format UDFs and hybrid JVM/native expressions slot in
+here as well. These expressions do not introduce a new plan-node prefix:
+they appear inside the operators that contain them (for example a
+`CometProject` whose project list includes a codegen-dispatched
+`ScalaUDF`).
### Shuffle Operators
Comet has two shuffle implementations and the plan tells you which one is in
-use:
+use. Both serialize batches via Arrow IPC; the difference is implementation
+language, not data format.
- **`CometExchange`** is the **native shuffle** path. The child must already
- be a Comet operator producing columnar Arrow batches; the node calls
+ be a Comet operator producing Arrow batches; the node calls
`executeColumnar()` on its child and the partition, encode, and compress
- steps run in native code. Hash and range partitioning **keys** must be
- primitive types because native hashing and ordering do not support complex
- types, but the data columns themselves can include `StructType`,
- `ArrayType`, and `MapType` since batches are serialized via the Arrow IPC
- writer.
-- **`CometColumnarExchange`** is the **JVM columnar shuffle** path. It accepts
- either Spark row-based input or Comet columnar input, which makes it the
- fallback when the child is not a Comet operator or when a hash/range key
- type is not supported by native shuffle (for example, collated strings). It
- is still preferred over Spark's native shuffle when Comet shuffle is
+ steps run as Rust code. Hash and range partitioning **keys** must be
+ primitive types because the Rust-side hashing and ordering do not support
+ complex types, but the data columns themselves can include `StructType`,
+ `ArrayType`, and `MapType`.
+- **`CometColumnarExchange`** is the **JVM shuffle** path. It accepts either
+ Spark row-based input or Comet Arrow input, which makes it the fallback
+ when the child is not a Comet operator or when a hash/range key type is
+ not supported by native shuffle (for example, collated strings). It is
+ still preferred over Spark's built-in shuffle when Comet shuffle is
enabled.
Both paths support the same set of partitioning schemes
@@ -215,16 +244,17 @@ shuffle and choose between the implementations.
### Columnar/Row Transitions
-Comet inserts these nodes wherever data has to cross the columnar/row boundary.
-Multiple implementations exist because the optimal strategy depends on what
-produced the columnar data.
-
-| Node | Direction | Notes |
-| ------------------------------ | ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `CometColumnarToRow` | columnar → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
-| `CometNativeColumnarToRow` | columnar → row | Native row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization step. |
-| `CometSparkColumnarToColumnar` | columnar → columnar | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
-| `CometSparkRowToColumnar` | row → columnar | Converts a Spark row input into Comet's Arrow batches. |
+Comet inserts these nodes wherever data has to cross between the Arrow-native
+pipeline and Spark's row-based execution, or between a non-Comet Spark
+columnar batch and Arrow. Multiple implementations exist because the optimal
+strategy depends on what produced the incoming data.
+
+| Node | Direction | Notes |
+| ------------------------------ | ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `CometColumnarToRow` | Arrow → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
+| `CometNativeColumnarToRow` | Arrow → row | Rust-side row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization. |
+| `CometSparkColumnarToColumnar` | Spark cols → Arrow | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
+| `CometSparkRowToColumnar` | row → Arrow | Converts a Spark row input into Comet's Arrow batches. |
The two `CometSpark*` names come from a single `CometSparkToColumnarExec`
operator that picks the node name based on whether its child supports
From 364a5f525ba91852cacac6a7cdf78e7752299b07 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 25 May 2026 11:24:37 -0600
Subject: [PATCH 2/4] docs: run prettier
---
docs/source/contributor-guide/jvm_shuffle.md | 4 +-
.../user-guide/latest/scala_java_udfs.md | 4 +-
.../latest/understanding-comet-plans.md | 38 +++++++++----------
3 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/docs/source/contributor-guide/jvm_shuffle.md b/docs/source/contributor-guide/jvm_shuffle.md
index c20d6121e0..5a794beea1 100644
--- a/docs/source/contributor-guide/jvm_shuffle.md
+++ b/docs/source/contributor-guide/jvm_shuffle.md
@@ -145,8 +145,8 @@ Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:
### Writers
-| Class | Location | Description |
-| ----------------------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
+| Class | Location | Description |
+| ----------------------------------- | ---------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- |
| `CometBypassMergeSortShuffleWriter` | `.../shuffle/CometBypassMergeSortShuffleWriter.java` | Hash-based writer. Creates one `CometDiskBlockWriter` per partition. Supports async writes. |
| `CometUnsafeShuffleWriter` | `.../shuffle/CometUnsafeShuffleWriter.java` | Sort-based writer. Uses `CometShuffleExternalSorter` to buffer and sort records, then merges spill files. |
| `CometDiskBlockWriter` | `.../shuffle/CometDiskBlockWriter.java` | Buffers rows in memory pages for a single partition. Spills to disk via the Rust IPC encoder. Used by bypass writer. |
diff --git a/docs/source/user-guide/latest/scala_java_udfs.md b/docs/source/user-guide/latest/scala_java_udfs.md
index 6895c1977d..6ee2fd9c4b 100644
--- a/docs/source/user-guide/latest/scala_java_udfs.md
+++ b/docs/source/user-guide/latest/scala_java_udfs.md
@@ -27,8 +27,8 @@ This feature is experimental and disabled by default.
## Configuration
-| Key | Default | Description |
-| ------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------ |
+| Key | Default | Description |
+| ------------------------------------------- | ------- | -------------------------------------------------------------------------------------------------------------------------- |
| `spark.comet.exec.scalaUDF.codegen.enabled` | `false` | When `true`, eligible `ScalaUDF`s run inside the Comet pipeline. When `false`, the enclosing operator falls back to Spark. |
## Supported
diff --git a/docs/source/user-guide/latest/understanding-comet-plans.md b/docs/source/user-guide/latest/understanding-comet-plans.md
index 892cafc8fe..6a190e0b6d 100644
--- a/docs/source/user-guide/latest/understanding-comet-plans.md
+++ b/docs/source/user-guide/latest/understanding-comet-plans.md
@@ -161,12 +161,12 @@ by role. Names match what is shown in the plan output.
### Scans
-| Node | Description |
-| ------------------------ | -------------------------------------------------------------------------------------------------- |
-| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, with a JVM reader producing Arrow batches. |
-| `CometNativeScan` | Fully Rust-implemented Parquet scan that runs entirely in DataFusion. |
-| `CometIcebergNativeScan` | Fully Rust-implemented Iceberg Parquet scan. |
-| `CometCsvNativeScan` | Fully Rust-implemented CSV scan (experimental). |
+| Node | Description |
+| ------------------------ | ----------------------------------------------------------------------------------------- |
+| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, with a JVM reader producing Arrow batches. |
+| `CometNativeScan` | Fully Rust-implemented Parquet scan that runs entirely in DataFusion. |
+| `CometIcebergNativeScan` | Fully Rust-implemented Iceberg Parquet scan. |
+| `CometCsvNativeScan` | Fully Rust-implemented CSV scan (experimental). |
### Arrow-Native Rust Operators
@@ -195,13 +195,13 @@ These coordinate Arrow batches from JVM code without performing per-row
compute. They are part of the Comet pipeline and consume and produce Arrow
batches, but the work happens in Scala/Java.
-| Node | Notes |
-| ------------------------ | ---------------------------------------------------------------------------------- |
-| `CometUnion` | JVM-side union of Comet inputs. The Rust side reads each branch as a separate scan.|
-| `CometCoalesce` | JVM-side partition coalesce. |
-| `CometCollectLimit` | JVM-side collect limit, equivalent to `CollectLimitExec`. |
-| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode.|
-| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |
+| Node | Notes |
+| ------------------------ | ----------------------------------------------------------------------------------- |
+| `CometUnion` | JVM-side union of Comet inputs. The Rust side reads each branch as a separate scan. |
+| `CometCoalesce` | JVM-side partition coalesce. |
+| `CometCollectLimit` | JVM-side collect limit, equivalent to `CollectLimitExec`. |
+| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode. |
+| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |
### Arrow-Native JVM Expressions
@@ -249,12 +249,12 @@ pipeline and Spark's row-based execution, or between a non-Comet Spark
columnar batch and Arrow. Multiple implementations exist because the optimal
strategy depends on what produced the incoming data.
-| Node | Direction | Notes |
-| ------------------------------ | ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `CometColumnarToRow` | Arrow → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
-| `CometNativeColumnarToRow` | Arrow → row | Rust-side row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization. |
-| `CometSparkColumnarToColumnar` | Spark cols → Arrow | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
-| `CometSparkRowToColumnar` | row → Arrow | Converts a Spark row input into Comet's Arrow batches. |
+| Node | Direction | Notes |
+| ------------------------------ | ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `CometColumnarToRow` | Arrow → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
+| `CometNativeColumnarToRow` | Arrow → row | Rust-side row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization. |
+| `CometSparkColumnarToColumnar` | Spark cols → Arrow | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
+| `CometSparkRowToColumnar` | row → Arrow | Converts a Spark row input into Comet's Arrow batches. |
The two `CometSpark*` names come from a single `CometSparkToColumnarExec`
operator that picks the node name based on whether its child supports
From b2ad0b17930204db11de511f924cc6a327042cfb Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 25 May 2026 11:26:10 -0600
Subject: [PATCH 3/4] docs: fix Comet/Gluten swap in ANSI mode workload
coverage note
---
docs/source/about/gluten_comparison.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/source/about/gluten_comparison.md b/docs/source/about/gluten_comparison.md
index a3ffb60bed..1609f1859e 100644
--- a/docs/source/about/gluten_comparison.md
+++ b/docs/source/about/gluten_comparison.md
@@ -92,7 +92,7 @@ will fall back to vanilla Spark. See the [Gluten Velox limitations] page for the
[Gluten Velox limitations]: https://apache.github.io/gluten/velox-backend-limitations.html
For users adopting Spark 4.0 without disabling ANSI mode, this difference can have a significant impact on the
-fraction of a workload Comet can accelerate.
+fraction of a workload Gluten can accelerate.
## Table Format Support
From f84408cf363db459fc98c3ccfff45873af997fcb Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 25 May 2026 11:32:13 -0600
Subject: [PATCH 4/4] docs: revert overzealous Rust-side rewrites and add
Arrow-native framing to README
Several spots in the previous nomenclature pass replaced 'native execution' /
'native side' / 'native block' with 'Rust-side ...' even where the original
compound was already clear from surrounding context (e.g. the very next
sentence said 'calls into the native engine', or the section was already
titled 'Native -> JVM Data Flow'). Revert those to the original wording.
The genuinely improved replacements (Comet pipeline framing, the
four-category plan-node taxonomy, the gluten_comparison value prop, the
shuffle prose) stay.
Also pull the Arrow-native framing into the top-level README so the value
prop matches docs/source/index.md.
---
README.md | 21 ++++++----
.../adding_a_new_operator.md | 41 +++++++++----------
docs/source/contributor-guide/ffi.md | 2 +-
docs/source/contributor-guide/jvm_shuffle.md | 4 +-
docs/source/contributor-guide/profiling.md | 2 +-
.../sql_error_propagation.md | 4 +-
docs/source/user-guide/latest/installation.md | 2 +-
.../latest/understanding-comet-plans.md | 32 +++++++--------
8 files changed, 57 insertions(+), 51 deletions(-)
diff --git a/README.md b/README.md
index cba865d96a..4827879671 100644
--- a/README.md
+++ b/README.md
@@ -35,10 +35,12 @@ under the License.
-Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the powerful
-[Apache DataFusion] query engine. Comet is designed to significantly enhance the
-performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
-Spark ecosystem without requiring any code changes.
+Apache DataFusion Comet is a high-performance accelerator for Apache Spark. Comet keeps Spark queries
+**Arrow-native end-to-end**: operators, expressions, shuffle, and broadcast all stay in Apache Arrow
+columnar format, avoiding the per-row overhead of Spark's row-based engine. Within the Arrow-native
+pipeline, operators and expressions execute as Rust code (via the [Apache DataFusion] query engine)
+or as JVM code that operates directly on Arrow batches. Comet integrates with the Spark ecosystem
+without requiring any code changes.
**Comet provides a ~2x speedup for TPC-DS @ SF 1000 (1TB), resulting in ~50% cost savings.**
@@ -58,17 +60,22 @@ See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contribut
## What Comet Accelerates
-Comet replaces Spark operators and expressions with native Rust implementations that run on Apache DataFusion.
-It uses Apache Arrow for zero-copy data transfer between the JVM and native code.
+Comet replaces Spark operators and expressions with implementations that consume and produce Apache Arrow
+batches. Most run as native Rust code on top of Apache DataFusion; some run as JVM code over Arrow batches.
+Either way, the work stays in the Comet pipeline without falling back to Spark's row-based engine.
- **Parquet scans**: native Parquet reader integrated with Spark's query planner
- **Apache Iceberg**: accelerated Parquet scans when reading Iceberg tables from Spark
(see the [Iceberg guide](https://datafusion.apache.org/comet/user-guide/iceberg.html))
-- **Shuffle**: native columnar shuffle with support for hash and range partitioning
+- **Shuffle**: Arrow-IPC columnar shuffle with support for hash and range partitioning, with a native Rust
+ implementation and a JVM fallback for unsupported partition key types
- **Expressions**: hundreds of supported Spark expressions across math, string, datetime, array,
map, JSON, hash, and predicate categories
- **Aggregations**: hash aggregate with support for `FILTER (WHERE ...)` clauses
- **Joins**: hash join, sort-merge join, and broadcast join
+- **Scala/Java UDFs**: experimental support for keeping Scala/Java scalar UDFs in the Comet pipeline
+ via Spark's whole-stage codegen (see the
+ [Scala UDF guide](https://datafusion.apache.org/comet/user-guide/scala_java_udfs.html))
For the authoritative lists, see the [supported expressions](https://datafusion.apache.org/comet/user-guide/expressions.html)
and [supported operators](https://datafusion.apache.org/comet/user-guide/operators.html) pages.
diff --git a/docs/source/contributor-guide/adding_a_new_operator.md b/docs/source/contributor-guide/adding_a_new_operator.md
index 02504cdacf..4c7d7b47ac 100644
--- a/docs/source/contributor-guide/adding_a_new_operator.md
+++ b/docs/source/contributor-guide/adding_a_new_operator.md
@@ -24,37 +24,36 @@ This guide explains how to add support for a new Spark physical operator in Apac
## Overview
`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to
-implementing Comet operators depending on where they execute and how they integrate with the Rust-side DataFusion
-engine.
+implementing Comet operators depending on where they execute and how they integrate with the native DataFusion engine.
### Types of Comet Operators
`CometExecRule` maintains two distinct maps of operators:
-#### 1. Rust-Implemented Operators (`nativeExecs` map)
+#### 1. Native Operators (`nativeExecs` map)
-These operators run entirely as Rust code and are the primary way to accelerate Spark workloads. They are
+These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. They are
registered in the `nativeExecs` map in `CometExecRule.scala`.
Key characteristics:
-- They are converted to their corresponding protobuf representation
-- They execute as DataFusion operators on the Rust side
+- They are converted to their corresponding native protobuf representation
+- They execute as DataFusion operators in the native engine
- The `CometOperatorSerde` implementation handles enable/disable checks, support validation, and protobuf serialization
Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`, `SortMergeJoinExec`, `ExpandExec`, `WindowExec`
#### 2. Sink Operators (`sinks` map)
-Sink operators serve as entry points (data sources) for fused Rust-side blocks. They are registered in the `sinks`
+Sink operators serve as entry points (data sources) for native execution blocks. They are registered in the `sinks`
map in `CometExecRule.scala`.
Key characteristics of sinks:
-- They become `ScanExec` operators in the serialized DataFusion plan (see `operator2Proto` in `CometExecRule.scala`)
-- They can be leaf nodes that feed data into the Rust-side block
+- They become `ScanExec` operators in the native plan (see `operator2Proto` in `CometExecRule.scala`)
+- They can be leaf nodes that feed data into native execution blocks
- They are wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` during plan transformation
-- Examples include operators that bring data from various sources into the Rust-side block
+- Examples include operators that bring data from various sources into native execution blocks
Examples: `UnionExec`, `CoalesceExec`, `CollectLimitExec`, `TakeOrderedAndProjectExec`
@@ -75,26 +74,26 @@ Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
When adding a new operator, choose based on these criteria:
-**Use Rust-implemented operators when:**
+**Use Native Operators when:**
- The operator transforms data (e.g., project, filter, sort, aggregate, join)
- The operator has a direct DataFusion equivalent or custom implementation
-- The operator consumes Comet child operators and produces Arrow output
+- The operator consumes native child operators and produces native output
- The operator is in the middle of an execution pipeline
**Use Sink Operators when:**
-- The operator serves as a data source for the Rust-side block (becomes a `ScanExec`)
-- The operator brings data from non-Comet sources (e.g., `UnionExec` combining multiple inputs)
+- The operator serves as a data source for a native execution block (becomes a `ScanExec`)
+- The operator brings data from non-native sources (e.g., `UnionExec` combining multiple inputs)
- The operator is typically a leaf or near-leaf node in the execution tree
-- The operator needs special handling to interface with the Rust-side engine
+- The operator needs special handling to interface with the native engine
**Implementation Note for Sinks:**
Sink operators are handled specially in `CometExecRule.operator2Proto`. Instead of converting to their own operator
-type, they are converted to `ScanExec` in the serialized DataFusion plan. This allows them to serve as entry points
-for the fused Rust-side block. The original Spark operator is wrapped with `CometScanWrapper` or
-`CometSinkPlaceHolder` which manages the JVM/Rust boundary.
+type, they are converted to `ScanExec` in the native plan. This allows them to serve as entry points for native
+execution blocks. The original Spark operator is wrapped with `CometScanWrapper` or `CometSinkPlaceHolder`, which
+manages the boundary between the JVM and native execution.
## Implementing a Native Operator
@@ -419,7 +418,7 @@ Add your operator to the supported operators list in `docs/source/user-guide/lat
## Implementing a Sink Operator
-Sink operators are converted to `ScanExec` in the serialized DataFusion plan and serve as entry points for the fused Rust-side block. The implementation is simpler than Rust-implemented operators because sink operators extend the `CometSink` base class which provides the conversion logic.
+Sink operators are converted to `ScanExec` in the native plan and serve as entry points for native execution blocks. The implementation is simpler than for native operators because sink operators extend the `CometSink` base class, which provides the conversion logic.
### Step 1: Create a CometOperatorSerde Implementation
@@ -517,7 +516,7 @@ val COMET_EXEC_YOUR_SINK_ENABLED: ConfigEntry[Boolean] =
### Step 4: Add Tests
-Test that your sink operator correctly feeds data into the Rust-side block:
+Test that your sink operator correctly feeds data into a native execution block:
```scala
test("your sink operator") {
@@ -525,7 +524,7 @@ test("your sink operator") {
sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")
- // Test query that uses your sink operator followed by Rust-implemented operators
+ // Test query that uses your sink operator followed by native operators
checkSparkAnswerAndOperator(
"SELECT col1 + 1 FROM (/* query that produces YourSinkExec */)"
)
diff --git a/docs/source/contributor-guide/ffi.md b/docs/source/contributor-guide/ffi.md
index d7a97beb61..24e6843ba1 100644
--- a/docs/source/contributor-guide/ffi.md
+++ b/docs/source/contributor-guide/ffi.md
@@ -193,7 +193,7 @@ heap memory configured.
### Architecture
-When the JVM needs results from the Rust-side DataFusion plan:
+When the JVM needs results from native execution:
```
┌─────────────────┐
diff --git a/docs/source/contributor-guide/jvm_shuffle.md b/docs/source/contributor-guide/jvm_shuffle.md
index 5a794beea1..b7c014259d 100644
--- a/docs/source/contributor-guide/jvm_shuffle.md
+++ b/docs/source/contributor-guide/jvm_shuffle.md
@@ -168,7 +168,7 @@ Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:
2. Writer receives `Iterator[Product2[K, V]]` where V is `UnsafeRow`
3. Rows are serialized and buffered in off-heap memory pages
4. When memory threshold or batch size is reached, `SpillWriter.doSpilling()` is called
-5. The Rust side (`Native.writeSortedFileNative()`) converts rows to Arrow arrays and writes IPC format
+5. Native code (`Native.writeSortedFileNative()`) converts rows to Arrow arrays and writes IPC format
6. For bypass writer: partition files are concatenated into final output
7. For sort writer: spill files are merged
@@ -176,7 +176,7 @@ Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:
1. `CometBlockStoreShuffleReader.read()` creates `ShuffleBlockFetcherIterator`
2. For each block, `NativeBatchDecoderIterator` reads the IPC stream
-3. The Rust side (`Native.decodeShuffleBlock()`) decompresses and decodes to Arrow arrays
+3. Native code (`Native.decodeShuffleBlock()`) decompresses and decodes to Arrow arrays
4. Arrow FFI imports arrays as `ColumnarBatch`
## Memory Management
diff --git a/docs/source/contributor-guide/profiling.md b/docs/source/contributor-guide/profiling.md
index 709a1a2a9c..67729a235e 100644
--- a/docs/source/contributor-guide/profiling.md
+++ b/docs/source/contributor-guide/profiling.md
@@ -293,4 +293,4 @@ cargo flamegraph --root --bench
In async-profiler flame graphs, native Rust frames appear below JNI entry points like
`Java_org_apache_comet_Native_*`. Look for these transition points to understand how
-time is split between JVM code and Comet's Rust-side execution.
+time is split between Spark's JVM code and Comet's native execution.
diff --git a/docs/source/contributor-guide/sql_error_propagation.md b/docs/source/contributor-guide/sql_error_propagation.md
index 48620a84a9..a27408510d 100644
--- a/docs/source/contributor-guide/sql_error_propagation.md
+++ b/docs/source/contributor-guide/sql_error_propagation.md
@@ -352,8 +352,8 @@ public final class CometQueryExecutionException extends CometNativeException {
## Step 7: Scala Converts JSON Back to a Real Spark Exception
-`CometExecIterator.scala` is the Scala code that drives the Rust-side DataFusion plan. Every
-time it calls into the native engine for the next batch of data, it catches
+`CometExecIterator.scala` is the Scala code that drives native execution. Every time it
+calls into the native engine for the next batch of data, it catches
`CometQueryExecutionException` and converts it:
```scala
diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md
index 106a232227..4b7717b688 100644
--- a/docs/source/user-guide/latest/installation.md
+++ b/docs/source/user-guide/latest/installation.md
@@ -175,4 +175,4 @@ Some cluster managers may require additional configuration, see