Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ under the License.

<img src="docs/source/_static/images/DataFusionComet-Logo-Light.png" width="512" alt="logo"/>

Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the powerful
[Apache DataFusion] query engine. Comet is designed to significantly enhance the
performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
Spark ecosystem without requiring any code changes.
Apache DataFusion Comet is a high-performance accelerator for Apache Spark. Comet keeps Spark queries
**Arrow-native end-to-end**: operators, expressions, shuffle, and broadcast all stay in Apache Arrow
columnar format, avoiding the per-row overhead of Spark's row-based engine. Within the Arrow-native
pipeline, operators and expressions execute as Rust code (via the [Apache DataFusion] query engine)
or as JVM code that operates directly on Arrow batches. Comet integrates with the Spark ecosystem
without requiring any code changes.

**Comet provides a ~2x speedup for TPC-DS @ SF 1000 (1TB), resulting in ~50% cost savings.**

Expand All @@ -58,17 +60,22 @@ See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contribut

## What Comet Accelerates

Comet replaces Spark operators and expressions with native Rust implementations that run on Apache DataFusion.
It uses Apache Arrow for zero-copy data transfer between the JVM and native code.
Comet replaces Spark operators and expressions with implementations that consume and produce Apache Arrow
batches. Most run as native Rust code on top of Apache DataFusion; some run as JVM code over Arrow batches.
Either way the work stays in the Comet pipeline without falling back to Spark's row-based engine.

- **Parquet scans**: native Parquet reader integrated with Spark's query planner
- **Apache Iceberg**: accelerated Parquet scans when reading Iceberg tables from Spark
(see the [Iceberg guide](https://datafusion.apache.org/comet/user-guide/iceberg.html))
- **Shuffle**: native columnar shuffle with support for hash and range partitioning
- **Shuffle**: Arrow-IPC columnar shuffle with support for hash and range partitioning, in a native Rust
implementation paired with a JVM fallback for unsupported partition key types
- **Expressions**: hundreds of supported Spark expressions across math, string, datetime, array,
map, JSON, hash, and predicate categories
- **Aggregations**: hash aggregate with support for `FILTER (WHERE ...)` clauses
- **Joins**: hash join, sort-merge join, and broadcast join
- **Scala/Java UDFs**: experimental support for keeping Scala/Java scalar UDFs in the Comet pipeline
via Spark's whole-stage codegen (see the
[Scala UDF guide](https://datafusion.apache.org/comet/user-guide/scala_java_udfs.html))

For the authoritative lists, see the [supported expressions](https://datafusion.apache.org/comet/user-guide/expressions.html)
and [supported operators](https://datafusion.apache.org/comet/user-guide/operators.html) pages.
Expand Down
31 changes: 19 additions & 12 deletions docs/source/about/gluten_comparison.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,29 @@ This document is based on Comet 0.16.0 and Gluten 1.6.0.

## Architecture

Comet and Gluten have very similar architectures. Both are Spark plugins that translate Spark physical plans to
a serialized representation and pass the serialized plan to native code for execution.
Comet and Gluten have similar high-level architectures. Both are Spark plugins that translate Spark physical plans
into a serialized representation and run them through an Arrow-columnar execution engine instead of Spark's
row-based engine.

Gluten serializes the plans using the Substrait format and has an extensible architecture that supports execution
against multiple engines. Velox and Clickhouse are currently supported, but Velox is more widely used.

Comet serializes the plans in a proprietary Protocol Buffer format. Execution is delegated to Apache DataFusion. Comet
does not plan to support multiple engines, but rather focus on a tight integration between Spark and DataFusion.

A second architectural difference is how each project handles JVM-side code. Gluten offloads supported operators to
its native backend and falls back to row-based Spark execution otherwise. Comet additionally keeps a JVM execution
path that operates directly on Arrow batches, which lets some constructs (today, Scala/Java scalar UDFs via Spark's
whole-stage codegen) stay in the Arrow pipeline rather than triggering a fallback.

## Underlying Execution Engine: DataFusion vs Velox

One of the main differences between Comet and Gluten is the choice of native execution engine.
One of the main differences between Comet and Gluten is the choice of Arrow-columnar execution engine.

Gluten uses Velox, which is an open-source C++ vectorized query engine created by Meta.
Gluten uses Velox, which is an open-source C++ columnar query engine created by Meta.

Comet uses Apache DataFusion, which is an open-source vectorized query engine implemented in Rust and is governed by the
Apache Software Foundation.
Comet uses Apache DataFusion, which is an open-source columnar query engine implemented in Rust and is governed by
the Apache Software Foundation.

Velox and DataFusion are both mature query engines that are growing in popularity.

Expand All @@ -53,8 +59,8 @@ the choice of implementation language (Rust vs C++) and this may be the main fac
choosing a solution. For users wishing to implement UDFs in Rust, Comet would likely be a better choice. For users
wishing to implement UDFs in C++, Gluten would likely be a better choice.

If users are just interested in speeding up their existing Spark jobs and do not need to implement UDFs in native
code, then we suggest benchmarking with both solutions and choosing the fastest one for your use case.
If users are just interested in speeding up their existing Spark jobs and do not need to implement UDFs in Rust
or C++, then we suggest benchmarking with both solutions and choosing the fastest one for your use case.

![github-stars-datafusion-velox.png](/_static/images/github-stars-datafusion-velox.png)

Expand All @@ -75,17 +81,18 @@ Gluten supports Spark 3.3, 3.4, 3.5, 4.0, and 4.1.
Spark 4.0 enables ANSI SQL semantics by default, which changes how arithmetic overflow, invalid casts, division by
zero, and similar error conditions are handled. This is one area where the two projects currently differ.

Comet implements ANSI semantics for the expressions it supports natively, including arithmetic overflow checks,
ANSI cast behavior, and `try_*` variants. Queries running with `spark.sql.ansi.enabled=true` continue to be accelerated.
See the [Comet Compatibility Guide] for details on which expressions have full ANSI coverage.
Comet implements ANSI semantics for the expressions it supports in the Comet pipeline, including arithmetic
overflow checks, ANSI cast behavior, and `try_*` variants. Queries running with `spark.sql.ansi.enabled=true`
continue to be accelerated. See the [Comet Compatibility Guide] for details on which expressions have full
ANSI coverage.

The Gluten Velox backend documents that ANSI mode is not supported and that any query executed with ANSI enabled
will fall back to vanilla Spark. See the [Gluten Velox limitations] page for the current status.

[Gluten Velox limitations]: https://apache.github.io/gluten/velox-backend-limitations.html

For users adopting Spark 4.0 without disabling ANSI mode, this difference can have a significant impact on the
fraction of a workload that runs natively.
fraction of a workload Gluten can accelerate.

## Table Format Support

Expand Down
6 changes: 3 additions & 3 deletions docs/source/contributor-guide/adding_a_new_expression.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ org.apache.comet.CometNativeException: Error from DataFusion:
Function 'levenshtein' expects 2 arguments but received 3.
```

The classic case is `levenshtein`. Spark accepts an optional 3rd `threshold` argument, DataFusion's built-in is 2-arg only, so the 3-arg form fails native execution unless the serde sets the return type explicitly. Other names that exist in both engines with potentially different signatures include `concat`, `coalesce`, `sha2`, and `regexp_replace`. If you are adding a function whose name is shared with `datafusion-functions`, check the upstream signature before deciding how to serialize.
The classic case is `levenshtein`. Spark accepts an optional 3rd `threshold` argument, DataFusion's built-in is 2-arg only, so the 3-arg form fails at execution time unless the serde sets the return type explicitly. Other names that exist in both engines with potentially different signatures include `concat`, `coalesce`, `sha2`, and `regexp_replace`. If you are adding a function whose name is shared with `datafusion-functions`, check the upstream signature before deciding how to serialize.

To avoid the registry lookup, write a custom `CometExpressionSerde` and use `scalarFunctionExprToProtoWithReturnType`, passing the Spark expression's declared `dataType`:

Expand Down Expand Up @@ -143,7 +143,7 @@ The `SupportLevel` sealed trait has three possible values:

- **`Compatible(notes: Option[String] = None)`** - Comet supports this expression with full compatibility with Spark, or may have known differences in specific edge cases unlikely to affect most users. This is the default if you don't override `getSupportLevel`.
- **`Incompatible(notes: Option[String] = None)`** - Comet supports this expression but results can differ from Spark. The expression will only be used if `spark.comet.expr.allowIncompatible=true` or the expression-specific config `spark.comet.expr.<exprName>.allowIncompatible=true` is set.
- **`Unsupported(notes: Option[String] = None)`** - Comet does not support this expression under the current conditions. Spark will fall back to its native execution.
- **`Unsupported(notes: Option[String] = None)`** - Comet does not support this expression under the current conditions. The query falls back to Spark for this expression.

All three accept an optional `notes` parameter to provide additional context that is logged for debugging.

Expand Down Expand Up @@ -307,7 +307,7 @@ override def getUnsupportedReasons(): Seq[String] = Seq(

#### Adding Spark-side Tests for the New Expression

It is important to verify that the new expression is correctly recognized by the native execution engine and matches the expected Spark behavior. The preferred way to add test coverage is to write a Comet SQL Test. This approach is simpler than writing Comet Scala Tests and makes it easy to cover many input combinations and edge cases.
It is important to verify that the new expression is correctly recognized by Comet and matches the expected Spark behavior. The preferred way to add test coverage is to write a Comet SQL Test. This approach is simpler than writing Comet Scala Tests and makes it easy to cover many input combinations and edge cases.

##### Writing a Comet SQL Test

Expand Down
20 changes: 10 additions & 10 deletions docs/source/contributor-guide/adding_a_new_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ This guide explains how to add support for a new Spark physical operator in Apac
## Overview

`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to
implementing Comet operators depending on where they execute and how they integrate with the native execution engine.
implementing Comet operators depending on where they execute and how they integrate with the native DataFusion engine.

### Types of Comet Operators

`CometExecRule` maintains two distinct maps of operators:

#### 1. Native Operators (`nativeExecs` map)

These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. Native
operators are registered in the `nativeExecs` map in `CometExecRule.scala`.
These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. They are
registered in the `nativeExecs` map in `CometExecRule.scala`.

Key characteristics of native operators:
Key characteristics:

- They are converted to their corresponding native protobuf representation
- They execute as DataFusion operators in the native engine
Expand All @@ -53,7 +53,7 @@ Key characteristics of sinks:
- They become `ScanExec` operators in the native plan (see `operator2Proto` in `CometExecRule.scala`)
- They can be leaf nodes that feed data into native execution blocks
- They are wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` during plan transformation
- Examples include operators that bring data from various sources into native execution
- Examples include operators that bring data from various sources into native execution blocks

Examples: `UnionExec`, `CoalesceExec`, `CollectLimitExec`, `TakeOrderedAndProjectExec`

Expand All @@ -65,7 +65,7 @@ Special sinks (not in the `sinks` map but also treated as sinks):

#### 3. Comet JVM Operators

These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen
These operators run in the JVM but are part of the Comet pipeline. For JVM operators, all checks happen
in `CometExecRule` rather than using `CometOperatorSerde`, because they don't need protobuf serialization.

Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
Expand All @@ -83,7 +83,7 @@ When adding a new operator, choose based on these criteria:

**Use Sink Operators when:**

- The operator serves as a data source for native execution (becomes a `ScanExec`)
- The operator serves as a data source for a native execution block (becomes a `ScanExec`)
- The operator brings data from non-native sources (e.g., `UnionExec` combining multiple inputs)
- The operator is typically a leaf or near-leaf node in the execution tree
- The operator needs special handling to interface with the native engine
Expand Down Expand Up @@ -393,7 +393,7 @@ test("your operator") {

The `checkSparkAnswerAndOperator` helper verifies:

1. Results match Spark's native execution
1. Results match Spark's reference execution
2. Your operator is actually being used (not falling back)

#### Rust Unit Tests
Expand All @@ -418,7 +418,7 @@ Add your operator to the supported operators list in `docs/source/user-guide/lat

## Implementing a Sink Operator

Sink operators are converted to `ScanExec` in the native plan and serve as entry points for native execution. The implementation is simpler than native operators because sink operators extend the `CometSink` base class which provides the conversion logic.
Sink operators are converted to `ScanExec` in the native plan and serve as entry points for native execution blocks. The implementation is simpler than native operators because sink operators extend the `CometSink` base class which provides the conversion logic.

### Step 1: Create a CometOperatorSerde Implementation

Expand Down Expand Up @@ -516,7 +516,7 @@ val COMET_EXEC_YOUR_SINK_ENABLED: ConfigEntry[Boolean] =

### Step 4: Add Tests

Test that your sink operator correctly feeds data into native execution:
Test that your sink operator correctly feeds data into a native execution block:

```scala
test("your sink operator") {
Expand Down
4 changes: 2 additions & 2 deletions docs/source/contributor-guide/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ under the License.

## Threading Architecture

Comet's native execution runs on a shared tokio multi-threaded runtime. Understanding this
architecture is important because it affects how you write native operators and JVM callbacks.
Comet's Rust side runs DataFusion plans on a shared tokio multi-threaded runtime. Understanding
this architecture is important because it affects how you write Rust operators and JVM callbacks.

### How execution works

Expand Down
Loading