From 28c79ad2c84aec3098c23829b16b89df4d508574 Mon Sep 17 00:00:00 2001 From: irenjj Date: Sat, 23 Nov 2024 16:36:33 +0800 Subject: [PATCH] Rename `BuiltInWindow` to `UDFWindow` --- .../window/built_in_window_function_expr.rs | 94 ------------------- datafusion/physical-expr/src/window/mod.rs | 8 +- .../src/window/{built_in.rs => udf.rs} | 26 ++--- .../src/windows/bounded_window_agg_exec.rs | 8 +- datafusion/physical-plan/src/windows/mod.rs | 22 ++--- .../proto/src/physical_plan/to_proto.rs | 8 +- .../tests/cases/roundtrip_physical_plan.rs | 8 +- 7 files changed, 40 insertions(+), 134 deletions(-) delete mode 100644 datafusion/physical-expr/src/window/built_in_window_function_expr.rs rename datafusion/physical-expr/src/window/{built_in.rs => udf.rs} (93%) diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs deleted file mode 100644 index 7aa4f6536a6e..000000000000 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::{PhysicalExpr, PhysicalSortExpr}; - -use arrow::array::ArrayRef; -use arrow::datatypes::Field; -use arrow::record_batch::RecordBatch; -use arrow_schema::SchemaRef; -use datafusion_common::Result; -use datafusion_expr::PartitionEvaluator; - -use std::any::Any; -use std::sync::Arc; - -/// Evaluates a window function by instantiating a -/// `[PartitionEvaluator]` for calculating the function's output in -/// that partition. -/// -/// Note that unlike aggregation based window functions, some window -/// functions such as `rank` ignore the values in the window frame, -/// but others such as `first_value`, `last_value`, and -/// `nth_value` need the value. -#[allow(rustdoc::private_intra_doc_links)] -pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { - /// Returns the aggregate expression as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// The field of the final result of evaluating this window function. - fn field(&self) -> Result; - - /// Expressions that are passed to the [`PartitionEvaluator`]. - fn expressions(&self) -> Vec>; - - /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "BuiltInWindowFunctionExpr: default name" - } - - /// Evaluate window function's arguments against the input window - /// batch and return an [`ArrayRef`]. - /// - /// Typically, the resulting vector is a single element vector. - fn evaluate_args(&self, batch: &RecordBatch) -> Result> { - self.expressions() - .iter() - .map(|e| { - e.evaluate(batch) - .and_then(|v| v.into_array(batch.num_rows())) - }) - .collect() - } - - /// Create a [`PartitionEvaluator`] for evaluating the function on - /// a particular partition. - fn create_evaluator(&self) -> Result>; - - /// Construct a new [`BuiltInWindowFunctionExpr`] that produces - /// the same result as this function on a window with reverse - /// order. The return value of this function is used by the - /// DataFusion optimizer to avoid re-sorting the data when - /// possible. - /// - /// Returns `None` (the default) if no reverse is known (or possible). - /// - /// For example, the reverse of `lead(10)` is `lag(10)`. - fn reverse_expr(&self) -> Option> { - None - } - - /// Returns the ordering introduced by the window function, if applicable. - /// Most window functions don't introduce an ordering, hence the default - /// value is `None`. Note that this information is used to update ordering - /// equivalences. - fn get_result_ordering(&self, _schema: &SchemaRef) -> Option { - None - } -} diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs index e7a318b860fd..6fbce2320118 100644 --- a/datafusion/physical-expr/src/window/mod.rs +++ b/datafusion/physical-expr/src/window/mod.rs @@ -16,15 +16,15 @@ // under the License. mod aggregate; -mod built_in; -mod built_in_window_function_expr; mod sliding_aggregate; +mod udf; +mod udf_window_function_expr; mod window_expr; pub use aggregate::PlainAggregateWindowExpr; -pub use built_in::BuiltInWindowExpr; -pub use built_in_window_function_expr::BuiltInWindowFunctionExpr; pub use sliding_aggregate::SlidingAggregateWindowExpr; +pub use udf::UDFWindowExpr; +pub use udf_window_function_expr::UDFWindowFunctionExpr; pub use window_expr::PartitionBatches; pub use window_expr::PartitionKey; pub use window_expr::PartitionWindowAggStates; diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/udf.rs similarity index 93% rename from datafusion/physical-expr/src/window/built_in.rs rename to datafusion/physical-expr/src/window/udf.rs index 0f6c3f921892..c2f162c06f39 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/udf.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! Physical exec for built-in window function expressions. +//! Physical exec for udf window function expressions. use std::any::Any; use std::ops::Range; use std::sync::Arc; -use super::{BuiltInWindowFunctionExpr, WindowExpr}; +use super::{UDFWindowFunctionExpr, WindowExpr}; use crate::window::window_expr::{get_orderby_values, WindowFn}; use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState}; use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr}; @@ -35,19 +35,19 @@ use datafusion_expr::window_state::{WindowAggState, WindowFrameContext}; use datafusion_expr::WindowFrame; use datafusion_physical_expr_common::sort_expr::LexOrdering; -/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`]. +/// A window expr that takes the form of a [`UDFWindowFunctionExpr`]. #[derive(Debug)] -pub struct BuiltInWindowExpr { - expr: Arc, +pub struct UDFWindowExpr { + expr: Arc, partition_by: Vec>, order_by: LexOrdering, window_frame: Arc, } -impl BuiltInWindowExpr { - /// create a new built-in window function expression +impl UDFWindowExpr { + /// create a new udf window function expression pub fn new( - expr: Arc, + expr: Arc, partition_by: &[Arc], order_by: &LexOrdering, window_frame: Arc, @@ -60,8 +60,8 @@ impl BuiltInWindowExpr { } } - /// Get BuiltInWindowFunction expr of BuiltInWindowExpr - pub fn get_built_in_func_expr(&self) -> &Arc { + /// Get UDFWindowFunction expr of UDFWindowExpr + pub fn get_udf_func_expr(&self) -> &Arc { &self.expr } @@ -79,7 +79,7 @@ impl BuiltInWindowExpr { eq_properties .add_new_orderings([LexOrdering::new(vec![fn_res_ordering])]); } else { - // If we have a PARTITION BY, built-in functions can not introduce + // If we have a PARTITION BY, udf functions can not introduce // a global ordering unless the existing ordering is compatible // with PARTITION BY expressions. To elaborate, when PARTITION BY // expressions and existing ordering expressions are equal (w.r.t. @@ -96,7 +96,7 @@ impl BuiltInWindowExpr { } } -impl WindowExpr for BuiltInWindowExpr { +impl WindowExpr for UDFWindowExpr { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self @@ -264,7 +264,7 @@ impl WindowExpr for BuiltInWindowExpr { fn get_reverse_expr(&self) -> Option> { self.expr.reverse_expr().map(|reverse_expr| { - Arc::new(BuiltInWindowExpr::new( + Arc::new(UDFWindowExpr::new( reverse_expr, &self.partition_by.clone(), reverse_order_bys(self.order_by.as_ref()).as_ref(), diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 50666f6cc368..bbb62057b6a9 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1189,7 +1189,7 @@ mod tests { use crate::common::collect; use crate::memory::MemoryExec; - use datafusion_physical_expr::window::BuiltInWindowExpr; + use datafusion_physical_expr::window::UDFWindowExpr; use futures::future::Shared; use futures::{pin_mut, ready, FutureExt, Stream, StreamExt}; use itertools::Itertools; @@ -1562,7 +1562,7 @@ mod tests { let window_exprs = vec![ // LAST_VALUE(a) - Arc::new(BuiltInWindowExpr::new( + Arc::new(UDFWindowExpr::new( last_value_func, &[], &LexOrdering::default(), @@ -1573,7 +1573,7 @@ mod tests { )), )) as _, // NTH_VALUE(a, -1) - Arc::new(BuiltInWindowExpr::new( + Arc::new(UDFWindowExpr::new( nth_value_func1, &[], &LexOrdering::default(), @@ -1584,7 +1584,7 @@ mod tests { )), )) as _, // NTH_VALUE(a, -2) - Arc::new(BuiltInWindowExpr::new( + Arc::new(UDFWindowExpr::new( nth_value_func2, &[], &LexOrdering::default(), diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 32173c3ef17d..4821ebc270a3 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -35,7 +35,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, - window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr}, + window::{UDFWindowFunctionExpr, SlidingAggregateWindowExpr}, ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; use itertools::Itertools; @@ -50,7 +50,7 @@ use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::{ - BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, + UDFWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; use datafusion_physical_expr_common::sort_expr::LexRequirement; pub use window_agg_exec::WindowAggExec; @@ -117,7 +117,7 @@ pub fn create_window_expr( aggregate, ) } - WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new( + WindowFunctionDefinition::WindowUDF(fun) => Arc::new(UDFWindowExpr::new( create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?, partition_by, order_by, @@ -153,14 +153,14 @@ fn window_expr_from_aggregate_expr( } } -/// Creates a `BuiltInWindowFunctionExpr` suitable for a user defined window function +/// Creates a `UDFWindowFunctionExpr` suitable for a user defined window function pub fn create_udwf_window_expr( fun: &Arc, args: &[Arc], input_schema: &Schema, name: String, ignore_nulls: bool, -) -> Result> { +) -> Result> { // need to get the types into an owned vec for some reason let input_types: Vec<_> = args .iter() @@ -192,7 +192,7 @@ pub fn create_udwf_window_expr( Ok(udwf_expr) } -/// Implements [`BuiltInWindowFunctionExpr`] for [`WindowUDF`] +/// Implements [`UDFWindowFunctionExpr`] for [`WindowUDF`] #[derive(Clone, Debug)] pub struct WindowUDFExpr { fun: Arc, @@ -215,7 +215,7 @@ impl WindowUDFExpr { } } -impl BuiltInWindowFunctionExpr for WindowUDFExpr { +impl UDFWindowFunctionExpr for WindowUDFExpr { fn as_any(&self) -> &dyn std::any::Any { self } @@ -244,7 +244,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr { &self.name } - fn reverse_expr(&self) -> Option> { + fn reverse_expr(&self) -> Option> { match self.fun.reverse_expr() { ReversedUDWF::Identical => Some(Arc::new(self.clone())), ReversedUDWF::NotSupported => None, @@ -345,10 +345,10 @@ pub(crate) fn window_equivalence_properties( .extend(input.equivalence_properties().clone()); for expr in window_expr { - if let Some(builtin_window_expr) = - expr.as_any().downcast_ref::() + if let Some(udf_window_expr) = + expr.as_any().downcast_ref::() { - builtin_window_expr.add_equal_orderings(&mut window_eq_properties); + udf_window_expr.add_equal_orderings(&mut window_eq_properties); } } window_eq_properties diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7d9a524af828..951727c7d8a3 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -19,7 +19,7 @@ use std::sync::Arc; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetSink; -use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr}; +use datafusion::physical_expr::window::{UDFWindowExpr, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, @@ -120,9 +120,9 @@ pub fn serialize_physical_window_expr( window_frame, codec, )? - } else if let Some(built_in_window_expr) = expr.downcast_ref::() { - if let Some(expr) = built_in_window_expr - .get_built_in_func_expr() + } else if let Some(udf_window_expr) = expr.downcast_ref::() { + if let Some(expr) = udf_window_expr + .get_udf_func_expr() .as_any() .downcast_ref::() { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index efa462aa7a85..297d14d5bd34 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -54,7 +54,7 @@ use datafusion::functions_window::nth_value::nth_value_udwf; use datafusion::functions_window::row_number::row_number_udwf; use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; use datafusion::physical_expr::expressions::Literal; -use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr}; +use datafusion::physical_expr::window::{UDFWindowExpr, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{ LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr, }; @@ -279,7 +279,7 @@ fn roundtrip_udwf() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let udwf_expr = Arc::new(BuiltInWindowExpr::new( + let udwf_expr = Arc::new(UDFWindowExpr::new( create_udwf_window_expr( &row_number_udwf(), &[], @@ -326,7 +326,7 @@ fn roundtrip_window() -> Result<()> { "NTH_VALUE(a, 2) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(), false, )?; - let udwf_expr = Arc::new(BuiltInWindowExpr::new( + let udwf_expr = Arc::new(UDFWindowExpr::new( nth_value_window, &[col("b", &schema)?], &LexOrdering { @@ -1125,7 +1125,7 @@ fn roundtrip_udwf_extension_codec() -> Result<()> { WindowFrameBound::CurrentRow, ); - let udwf_expr = Arc::new(BuiltInWindowExpr::new( + let udwf_expr = Arc::new(UDFWindowExpr::new( udwf, &[col("b", &schema)?], &LexOrdering {