Skip to content

regression: ProjectionPushdown failed with project index 2 out of bounds, max field 2 #21459

@haohuaijin

Description

@haohuaijin

Describe the bug

ProjectionExec: expr=[timestamp@0 as timestamp, tokens@1 as tokens]
  FilterExec: timestamp@0 >= 1775570460000000, projection=[timestamp@0, tokens@1, service_name@2]
    DataSourceExec: partitions=1, partition_sizes=[1]

Applying ProjectionPushdown to the plan above fails with the following error:

thread 'main' (13911523) panicked at examples/query.rs:92:10:
called Result::unwrap() on an Err value: ArrowError(SchemaError("project index 2 out of bounds, max field 2"), Some(""))

To Reproduce

use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::pretty_format_batches;
use datafusion::common::config::ConfigOptions;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::expressions::{BinaryExpr, Literal, col};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::filter::FilterExecBuilder;
use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use futures::TryStreamExt;

/// Minimal reproduction for the `ProjectionPushdown` regression reported against v53.
///
/// Builds the plan
/// `ProjectionExec(timestamp, tokens) -> FilterExec(projection=[0, 1, 2]) -> DataSourceExec`.
/// It then does the following, in order:
/// 1. Executes the un-optimized plan to print the baseline result.
/// 2. Runs `ProjectionPushdown`.
/// 3. Executes the rewritten plan so both results can be compared.
///
/// # Panics
///
/// Panics if any step fails. Per the issue report, on v53 the
/// `ProjectionPushdown::optimize` call returns
/// `SchemaError("project index 2 out of bounds, max field 2")`.
#[tokio::main]
async fn main() {
    // Schema: [timestamp, tokens, service_name]
    let schema = Arc::new(Schema::new(vec![
        Field::new("timestamp", DataType::Int64, false),
        Field::new("tokens", DataType::Int64, false),
        Field::new("service_name", DataType::Utf8, false),
    ]));

    // A single row; its timestamp satisfies the filter predicate below.
    let timestamps = vec![1775570460000000i64];
    let tokens = vec![100i64];
    let services = vec!["service-a"];

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int64Array::from(timestamps)),
            Arc::new(Int64Array::from(tokens)),
            Arc::new(StringArray::from(services)),
        ],
    )
    .expect("batch columns must match the schema");

    // In-memory source with no projection (`None`), so it exposes all three
    // columns: [timestamp, tokens, service_name].
    let memory_exec = datafusion::catalog::memory::MemorySourceConfig::try_new_exec(
        &[vec![batch]],
        Arc::clone(&schema),
        None,
    )
    .expect("create memory source");

    // Filter predicate: timestamp >= 1775570460000000
    let predicate = Arc::new(BinaryExpr::new(
        col("timestamp", &memory_exec.schema()).expect("timestamp column in source schema"),
        Operator::GtEq,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1775570460000000)))),
    ));

    // FilterExec with an explicit identity projection
    // [timestamp@0, tokens@1, service_name@2]. This embedded projection is
    // what the issue reports ProjectionPushdown failing on.
    let filter_exec = Arc::new(
        FilterExecBuilder::new(predicate, memory_exec)
            .apply_projection(Some(vec![0, 1, 2]))
            .expect("apply filter projection")
            .build()
            .expect("build FilterExec"),
    );

    // ProjectionExec: expr=[timestamp@0 as timestamp, tokens@1 as tokens]
    let proj_exprs = vec![
        ProjectionExpr {
            expr: col("timestamp", &filter_exec.schema()).expect("timestamp column in filter schema"),
            alias: "timestamp".to_string(),
        },
        ProjectionExpr {
            expr: col("tokens", &filter_exec.schema()).expect("tokens column in filter schema"),
            alias: "tokens".to_string(),
        },
    ];
    let projection_exec = Arc::new(
        ProjectionExec::try_new(proj_exprs, filter_exec).expect("build ProjectionExec"),
    );

    // Print the plan before optimization
    let display = displayable(projection_exec.as_ref() as &dyn ExecutionPlan);
    println!("before pushdown plan:\n{}", display.indent(true));

    // Execute the un-optimized plan *before* optimizing. If the optimizer
    // fails, the baseline result has still been printed for comparison.
    let task_ctx = Arc::new(TaskContext::default());
    let stream = projection_exec
        .execute(0, Arc::clone(&task_ctx))
        .expect("execute un-optimized plan");
    let batches: Vec<RecordBatch> = stream
        .try_collect()
        .await
        .expect("collect un-optimized plan output");
    println!(
        "before pushdown result:\n{}",
        pretty_format_batches(&batches).expect("format un-optimized output")
    );

    // Apply projection pushdown optimization
    let config = ConfigOptions::default();
    let optimized = ProjectionPushdown::new()
        .optimize(
            Arc::clone(&projection_exec) as Arc<dyn ExecutionPlan>,
            &config,
        )
        .expect("ProjectionPushdown failed to optimize the plan");

    // Print the plan after optimization
    let display = displayable(optimized.as_ref());
    println!("after pushdown plan:\n{}", display.indent(true));

    // Execute and print results after pushdown
    let stream = optimized
        .execute(0, Arc::clone(&task_ctx))
        .expect("execute optimized plan");
    let batches: Vec<RecordBatch> = stream
        .try_collect()
        .await
        .expect("collect optimized plan output");
    println!(
        "after pushdown result:\n{}",
        pretty_format_batches(&batches).expect("format optimized output")
    );
}

Expected behavior

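It should work: the optimization should succeed, and the optimized plan should return the same result as the original plan.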

Additional context

It works in DataFusion v52, but reports an error in v53.

Metadata

Metadata

Assignees

Labels

bug — Something isn't working

Type

No fields configured for Bug.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions