Skip to content

Commit

Permalink
Improve recursive unnest options API (#12836)
Browse files Browse the repository at this point in the history
* refactor

* refactor unnest options

* more test

* resolve comments

* add back doc

* fix proto

* flaky test

* clippy

* use indexmap

* chore: compile err

* chore: update cargo

* chore: fmt cargotoml

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
duongcongtoai and alamb authored Oct 20, 2024
1 parent 7a34147 commit c7e5d8d
Show file tree
Hide file tree
Showing 17 changed files with 504 additions and 529 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::SchemaReference;
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use unnest::UnnestOptions;
pub use unnest::{RecursionUnnestOption, UnnestOptions};
pub use utils::project_schema;

// These are hidden from docs purely to avoid polluting the public view of what this crate exports.
Expand Down
26 changes: 26 additions & 0 deletions datafusion/common/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! [`UnnestOptions`] for unnesting structured types

use crate::Column;

/// Options for unnesting a column that contains a list type,
/// replicating values in the other, non nested rows.
///
Expand Down Expand Up @@ -60,17 +62,35 @@
/// └─────────┘ └─────┘ └─────────┘ └─────┘
/// c1 c2 c1 c2
/// ```
///
/// `recursions` instruct how a column should be unnested (e.g unnesting a column multiple
/// time, with depth = 1 and depth = 2). Any unnested column not being mentioned inside this
/// options is inferred to be unnested with depth = 1
#[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)]
pub struct UnnestOptions {
/// Should nulls in the input be preserved? Defaults to true
pub preserve_nulls: bool,
/// If specific columns need to be unnested multiple times (e.g at different depth),
/// declare them here. Any unnested columns not being mentioned inside this option
/// will be unnested with depth = 1
pub recursions: Vec<RecursionUnnestOption>,
}

/// Instruction on how to unnest a column (mostly with a list type)
/// such as how to name the output, and how many level it should be unnested
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct RecursionUnnestOption {
pub input_column: Column,
pub output_column: Column,
pub depth: usize,
}

impl Default for UnnestOptions {
fn default() -> Self {
Self {
// default to true to maintain backwards compatible behavior
preserve_nulls: true,
recursions: vec![],
}
}
}
Expand All @@ -87,4 +107,10 @@ impl UnnestOptions {
self.preserve_nulls = preserve_nulls;
self
}

/// Set the recursions for the unnest operation
pub fn with_recursions(mut self, recursion: RecursionUnnestOption) -> Self {
self.recursions.push(recursion);
self
}
}
186 changes: 70 additions & 116 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use crate::{
TableProviderFilterPushDown, TableSource, WriteOp,
};

use super::dml::InsertOp;
use super::plan::ColumnUnnestList;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::file_type::FileType;
Expand All @@ -54,9 +56,6 @@ use datafusion_common::{
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;

use super::dml::InsertOp;
use super::plan::{ColumnUnnestList, ColumnUnnestType};

/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";

Expand Down Expand Up @@ -1186,7 +1185,7 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
vec![(column.into(), ColumnUnnestType::Inferred)],
vec![column.into()],
options,
)
.map(Self::new)
Expand All @@ -1197,26 +1196,6 @@ impl LogicalPlanBuilder {
self,
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
columns
.into_iter()
.map(|c| (c, ColumnUnnestType::Inferred))
.collect(),
options,
)
.map(Self::new)
}

/// Unnest the given columns with the given [`UnnestOptions`]
/// if one column is a list type, it can be recursively and simultaneously
/// unnested into the desired recursion levels
/// e.g select unnest(list_col,depth=1), unnest(list_col,depth=2)
pub fn unnest_columns_recursive_with_options(
self,
columns: Vec<(Column, ColumnUnnestType)>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
.map(Self::new)
Expand Down Expand Up @@ -1594,14 +1573,12 @@ impl TableSource for LogicalTableSource {

/// Create a [`LogicalPlan::Unnest`] plan
pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
let unnestings = columns
.into_iter()
.map(|c| (c, ColumnUnnestType::Inferred))
.collect();
unnest_with_options(input, unnestings, UnnestOptions::default())
unnest_with_options(input, columns, UnnestOptions::default())
}

pub fn get_unnested_list_datatype_recursive(
// Get the data type of a multi-dimensional type after unnesting it
// with a given depth
fn get_unnested_list_datatype_recursive(
data_type: &DataType,
depth: usize,
) -> Result<DataType> {
Expand All @@ -1620,27 +1597,6 @@ pub fn get_unnested_list_datatype_recursive(
internal_err!("trying to unnest on invalid data type {:?}", data_type)
}

/// Infer the unnest type based on the data type:
/// - list type: infer to unnest(list(col, depth=1))
/// - struct type: infer to unnest(struct)
fn infer_unnest_type(
col_name: &String,
data_type: &DataType,
) -> Result<ColumnUnnestType> {
match data_type {
DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
Ok(ColumnUnnestType::List(vec![ColumnUnnestList {
output_column: Column::from_name(col_name),
depth: 1,
}]))
}
DataType::Struct(_) => Ok(ColumnUnnestType::Struct),
_ => {
internal_err!("trying to unnest on invalid data type {:?}", data_type)
}
}
}

pub fn get_struct_unnested_columns(
col_name: &String,
inner_fields: &Fields,
Expand Down Expand Up @@ -1729,20 +1685,15 @@ pub fn get_unnested_columns(
/// ```
pub fn unnest_with_options(
input: LogicalPlan,
columns_to_unnest: Vec<(Column, ColumnUnnestType)>,
columns_to_unnest: Vec<Column>,
options: UnnestOptions,
) -> Result<LogicalPlan> {
let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
let mut struct_columns = vec![];
let indices_to_unnest = columns_to_unnest
.iter()
.map(|col_unnesting| {
Ok((
input.schema().index_of_column(&col_unnesting.0)?,
col_unnesting,
))
})
.collect::<Result<HashMap<usize, &(Column, ColumnUnnestType)>>>()?;
.map(|c| Ok((input.schema().index_of_column(c)?, c)))
.collect::<Result<HashMap<usize, &Column>>>()?;

let input_schema = input.schema();

Expand All @@ -1767,51 +1718,59 @@ pub fn unnest_with_options(
.enumerate()
.map(|(index, (original_qualifier, original_field))| {
match indices_to_unnest.get(&index) {
Some((column_to_unnest, unnest_type)) => {
let mut inferred_unnest_type = unnest_type.clone();
if let ColumnUnnestType::Inferred = unnest_type {
inferred_unnest_type = infer_unnest_type(
Some(column_to_unnest) => {
let recursions_on_column = options
.recursions
.iter()
.filter(|p| -> bool { &p.input_column == *column_to_unnest })
.collect::<Vec<_>>();
let mut transformed_columns = recursions_on_column
.iter()
.map(|r| {
list_columns.push((
index,
ColumnUnnestList {
output_column: r.output_column.clone(),
depth: r.depth,
},
));
Ok(get_unnested_columns(
&r.output_column.name,
original_field.data_type(),
r.depth,
)?
.into_iter()
.next()
.unwrap()) // because unnesting a list column always result into one result
})
.collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
if transformed_columns.is_empty() {
transformed_columns = get_unnested_columns(
&column_to_unnest.name,
original_field.data_type(),
1,
)?;
}
let transformed_columns: Vec<(Column, Arc<Field>)> =
match inferred_unnest_type {
ColumnUnnestType::Struct => {
match original_field.data_type() {
DataType::Struct(_) => {
struct_columns.push(index);
get_unnested_columns(
&column_to_unnest.name,
original_field.data_type(),
1,
)?
}
ColumnUnnestType::List(unnest_lists) => {
list_columns.extend(
unnest_lists
.iter()
.map(|ul| (index, ul.to_owned().clone())),
);
unnest_lists
.iter()
.map(
|ColumnUnnestList {
output_column,
depth,
}| {
get_unnested_columns(
&output_column.name,
original_field.data_type(),
*depth,
)
},
)
.collect::<Result<Vec<Vec<(Column, Arc<Field>)>>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>()
DataType::List(_)
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_) => {
list_columns.push((
index,
ColumnUnnestList {
output_column: Column::from_name(
&column_to_unnest.name,
),
depth: 1,
},
));
}
_ => return internal_err!("Invalid unnest type"),
_ => {}
};
}

// new columns dependent on the same original index
dependency_indices
.extend(std::iter::repeat(index).take(transformed_columns.len()));
Expand Down Expand Up @@ -1860,7 +1819,7 @@ mod tests {
use crate::logical_plan::StringifiedPlan;
use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};

use datafusion_common::SchemaError;
use datafusion_common::{RecursionUnnestOption, SchemaError};

#[test]
fn plan_builder_simple() -> Result<()> {
Expand Down Expand Up @@ -2268,24 +2227,19 @@ mod tests {

// Simultaneously unnesting a list (with different depth) and a struct column
let plan = nested_table_scan("test_table")?
.unnest_columns_recursive_with_options(
vec![
(
"stringss".into(),
ColumnUnnestType::List(vec![
ColumnUnnestList {
output_column: Column::from_name("stringss_depth_1"),
depth: 1,
},
ColumnUnnestList {
output_column: Column::from_name("stringss_depth_2"),
depth: 2,
},
]),
),
("struct_singular".into(), ColumnUnnestType::Inferred),
],
UnnestOptions::default(),
.unnest_columns_with_options(
vec!["stringss".into(), "struct_singular".into()],
UnnestOptions::default()
.with_recursions(RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_1".into(),
depth: 1,
})
.with_recursions(RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_2".into(),
depth: 2,
}),
)?
.build()?;

Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, ColumnUnnestType, CrossJoin,
DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join,
projection_schema, Aggregate, Analyze, ColumnUnnestList, CrossJoin, DescribeTable,
Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Expand Down
Loading

0 comments on commit c7e5d8d

Please sign in to comment.