Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,16 @@ impl DataFrame {
//the functions now supported
let supported_describe_functions =
vec!["count", "null_count", "mean", "std", "min", "max", "median"];
let supports_describe_min_max = |data_type: &DataType| {
!matches!(
data_type,
DataType::Boolean
| DataType::Binary
| DataType::LargeBinary
| DataType::BinaryView
| DataType::FixedSizeBinary(_)
)
};

let original_schema_fields = self.schema().fields().iter();

Expand Down Expand Up @@ -1075,9 +1085,7 @@ impl DataFrame {
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.filter(|f| supports_describe_min_max(f.data_type()))
.map(|f| min(ident(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
Expand All @@ -1086,9 +1094,7 @@ impl DataFrame {
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.filter(|f| supports_describe_min_max(f.data_type()))
.map(|f| max(ident(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
Expand Down
92 changes: 92 additions & 0 deletions datafusion/core/tests/dataframe/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{Result, test_util::parquet_test_data};
use insta::assert_snapshot;
use std::sync::Arc;

use arrow::array::{FixedSizeBinaryArray, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};

#[tokio::test]
async fn describe() -> Result<()> {
Expand Down Expand Up @@ -81,6 +85,94 @@ async fn describe_boolean_binary() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn describe_fixed_size_binary() -> Result<()> {
    // Single FixedSizeBinary(3) column with one null row, so that both the
    // `count` and `null_count` statistics are exercised.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "fsb",
        DataType::FixedSizeBinary(3),
        true,
    )]));
    let values = FixedSizeBinaryArray::from(vec![
        Some(&[1_u8, 2, 3][..]),
        None,
        Some(&[4_u8, 5, 6][..]),
    ]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])?;

    let ctx = SessionContext::new();
    ctx.register_batch("test", batch)?;

    let result = ctx.table("test").await?.describe().await?.collect().await?;

    // FixedSizeBinary supports count/null_count only; the numeric and
    // ordering statistics (mean/std/min/max/median) are all null.
    assert_snapshot!(
        batches_to_string(&result),
        @r"
    +------------+------+
    | describe   | fsb  |
    +------------+------+
    | count      | 2    |
    | null_count | 1    |
    | mean       | null |
    | std        | null |
    | min        | null |
    | max        | null |
    | median     | null |
    +------------+------+
    "
    );
    Ok(())
}

#[tokio::test]
async fn describe_mixed_numeric_and_fixed_size_binary() -> Result<()> {
    // Two columns: a plain Int32 column and a FixedSizeBinary(3) column
    // (with one null), registered as a single batch.
    let schema = Arc::new(Schema::new(vec![
        Field::new("num", DataType::Int32, true),
        Field::new("fsb", DataType::FixedSizeBinary(3), true),
    ]));
    let num_col = Int32Array::from(vec![Some(10), Some(20), Some(30)]);
    let fsb_col = FixedSizeBinaryArray::from(vec![
        Some(&[1_u8, 2, 3][..]),
        None,
        Some(&[4_u8, 5, 6][..]),
    ]);
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(num_col), Arc::new(fsb_col)])?;

    let ctx = SessionContext::new();
    ctx.register_batch("test_mixed", batch)?;

    let result = ctx
        .table("test_mixed")
        .await?
        .describe()
        .await?
        .collect()
        .await?;

    // `num` is numeric so min/max/mean/median/std are computed; `fsb` is
    // FixedSizeBinary so it is filtered out of min/max but still appears in
    // count/null_count. This exercises the filter path (partial column list
    // in the aggregate) rather than the empty-aggregate fallback.
    assert_snapshot!(
        batches_to_string(&result),
        @r"
    +------------+------+------+
    | describe   | num  | fsb  |
    +------------+------+------+
    | count      | 3.0  | 2    |
    | null_count | 0.0  | 1    |
    | mean       | 20.0 | null |
    | std        | 10.0 | null |
    | min        | 10.0 | null |
    | max        | 30.0 | null |
    | median     | 20.0 | null |
    +------------+------+------+
    "
    );
    Ok(())
}

#[tokio::test]
async fn describe_null() -> Result<()> {
let ctx = parquet_context().await;
Expand Down