Skip to content

Commit

Permalink
Rename BuiltInWindow to UDFWindow
Browse files Browse the repository at this point in the history
  • Loading branch information
irenjj committed Nov 23, 2024
1 parent 9fb7aee commit 28c79ad
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 134 deletions.

This file was deleted.

8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
// under the License.

mod aggregate;
mod built_in;
mod built_in_window_function_expr;
mod sliding_aggregate;
mod udf;
mod udf_window_function_expr;
mod window_expr;

pub use aggregate::PlainAggregateWindowExpr;
pub use built_in::BuiltInWindowExpr;
pub use built_in_window_function_expr::BuiltInWindowFunctionExpr;
pub use sliding_aggregate::SlidingAggregateWindowExpr;
pub use udf::UDFWindowExpr;
pub use udf_window_function_expr::UDFWindowFunctionExpr;
pub use window_expr::PartitionBatches;
pub use window_expr::PartitionKey;
pub use window_expr::PartitionWindowAggStates;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

//! Physical exec for built-in window function expressions.
//! Physical exec for udf window function expressions.

use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

use super::{BuiltInWindowFunctionExpr, WindowExpr};
use super::{UDFWindowFunctionExpr, WindowExpr};
use crate::window::window_expr::{get_orderby_values, WindowFn};
use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
Expand All @@ -35,19 +35,19 @@ use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
use datafusion_expr::WindowFrame;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
/// A window expr that takes the form of a [`UDFWindowFunctionExpr`].
#[derive(Debug)]
pub struct BuiltInWindowExpr {
expr: Arc<dyn BuiltInWindowFunctionExpr>,
pub struct UDFWindowExpr {
expr: Arc<dyn UDFWindowFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: LexOrdering,
window_frame: Arc<WindowFrame>,
}

impl BuiltInWindowExpr {
/// create a new built-in window function expression
impl UDFWindowExpr {
/// create a new udf window function expression
pub fn new(
expr: Arc<dyn BuiltInWindowFunctionExpr>,
expr: Arc<dyn UDFWindowFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
Expand All @@ -60,8 +60,8 @@ impl BuiltInWindowExpr {
}
}

/// Get BuiltInWindowFunction expr of BuiltInWindowExpr
pub fn get_built_in_func_expr(&self) -> &Arc<dyn BuiltInWindowFunctionExpr> {
/// Get UDFWindowFunction expr of UDFWindowExpr
pub fn get_udf_func_expr(&self) -> &Arc<dyn UDFWindowFunctionExpr> {
&self.expr
}

Expand All @@ -79,7 +79,7 @@ impl BuiltInWindowExpr {
eq_properties
.add_new_orderings([LexOrdering::new(vec![fn_res_ordering])]);
} else {
// If we have a PARTITION BY, built-in functions can not introduce
// If we have a PARTITION BY, udf functions can not introduce
// a global ordering unless the existing ordering is compatible
// with PARTITION BY expressions. To elaborate, when PARTITION BY
// expressions and existing ordering expressions are equal (w.r.t.
Expand All @@ -96,7 +96,7 @@ impl BuiltInWindowExpr {
}
}

impl WindowExpr for BuiltInWindowExpr {
impl WindowExpr for UDFWindowExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -264,7 +264,7 @@ impl WindowExpr for BuiltInWindowExpr {

fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>> {
self.expr.reverse_expr().map(|reverse_expr| {
Arc::new(BuiltInWindowExpr::new(
Arc::new(UDFWindowExpr::new(
reverse_expr,
&self.partition_by.clone(),
reverse_order_bys(self.order_by.as_ref()).as_ref(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ mod tests {

use crate::common::collect;
use crate::memory::MemoryExec;
use datafusion_physical_expr::window::BuiltInWindowExpr;
use datafusion_physical_expr::window::UDFWindowExpr;
use futures::future::Shared;
use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
use itertools::Itertools;
Expand Down Expand Up @@ -1562,7 +1562,7 @@ mod tests {

let window_exprs = vec![
// LAST_VALUE(a)
Arc::new(BuiltInWindowExpr::new(
Arc::new(UDFWindowExpr::new(
last_value_func,
&[],
&LexOrdering::default(),
Expand All @@ -1573,7 +1573,7 @@ mod tests {
)),
)) as _,
// NTH_VALUE(a, -1)
Arc::new(BuiltInWindowExpr::new(
Arc::new(UDFWindowExpr::new(
nth_value_func1,
&[],
&LexOrdering::default(),
Expand All @@ -1584,7 +1584,7 @@ mod tests {
)),
)) as _,
// NTH_VALUE(a, -2)
Arc::new(BuiltInWindowExpr::new(
Arc::new(UDFWindowExpr::new(
nth_value_func2,
&[],
&LexOrdering::default(),
Expand Down
22 changes: 11 additions & 11 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
reverse_order_bys,
window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
window::{UDFWindowFunctionExpr, SlidingAggregateWindowExpr},
ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
};
use itertools::Itertools;
Expand All @@ -50,7 +50,7 @@ use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
UDFWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
pub use window_agg_exec::WindowAggExec;
Expand Down Expand Up @@ -117,7 +117,7 @@ pub fn create_window_expr(
aggregate,
)
}
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new(
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(UDFWindowExpr::new(
create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
partition_by,
order_by,
Expand Down Expand Up @@ -153,14 +153,14 @@ fn window_expr_from_aggregate_expr(
}
}

/// Creates a `BuiltInWindowFunctionExpr` suitable for a user defined window function
/// Creates a `UDFWindowFunctionExpr` suitable for a user defined window function
pub fn create_udwf_window_expr(
fun: &Arc<WindowUDF>,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
name: String,
ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
) -> Result<Arc<dyn UDFWindowFunctionExpr>> {
// need to get the types into an owned vec for some reason
let input_types: Vec<_> = args
.iter()
Expand Down Expand Up @@ -192,7 +192,7 @@ pub fn create_udwf_window_expr(
Ok(udwf_expr)
}

/// Implements [`BuiltInWindowFunctionExpr`] for [`WindowUDF`]
/// Implements [`UDFWindowFunctionExpr`] for [`WindowUDF`]
#[derive(Clone, Debug)]
pub struct WindowUDFExpr {
fun: Arc<WindowUDF>,
Expand All @@ -215,7 +215,7 @@ impl WindowUDFExpr {
}
}

impl BuiltInWindowFunctionExpr for WindowUDFExpr {
impl UDFWindowFunctionExpr for WindowUDFExpr {
fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand Down Expand Up @@ -244,7 +244,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
&self.name
}

fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
fn reverse_expr(&self) -> Option<Arc<dyn UDFWindowFunctionExpr>> {
match self.fun.reverse_expr() {
ReversedUDWF::Identical => Some(Arc::new(self.clone())),
ReversedUDWF::NotSupported => None,
Expand Down Expand Up @@ -345,10 +345,10 @@ pub(crate) fn window_equivalence_properties(
.extend(input.equivalence_properties().clone());

for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
if let Some(udf_window_expr) =
expr.as_any().downcast_ref::<UDFWindowExpr>()
{
builtin_window_expr.add_equal_orderings(&mut window_eq_properties);
udf_window_expr.add_equal_orderings(&mut window_eq_properties);
}
}
window_eq_properties
Expand Down
8 changes: 4 additions & 4 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr};
use datafusion::physical_expr::window::{UDFWindowExpr, SlidingAggregateWindowExpr};
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Expand Down Expand Up @@ -120,9 +120,9 @@ pub fn serialize_physical_window_expr(
window_frame,
codec,
)?
} else if let Some(built_in_window_expr) = expr.downcast_ref::<BuiltInWindowExpr>() {
if let Some(expr) = built_in_window_expr
.get_built_in_func_expr()
} else if let Some(udf_window_expr) = expr.downcast_ref::<UDFWindowExpr>() {
if let Some(expr) = udf_window_expr
.get_udf_func_expr()
.as_any()
.downcast_ref::<WindowUDFExpr>()
{
Expand Down
8 changes: 4 additions & 4 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datafusion::functions_window::nth_value::nth_value_udwf;
use datafusion::functions_window::row_number::row_number_udwf;
use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
use datafusion::physical_expr::expressions::Literal;
use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr};
use datafusion::physical_expr::window::{UDFWindowExpr, SlidingAggregateWindowExpr};
use datafusion::physical_expr::{
LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr,
};
Expand Down Expand Up @@ -279,7 +279,7 @@ fn roundtrip_udwf() -> Result<()> {
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));

let udwf_expr = Arc::new(BuiltInWindowExpr::new(
let udwf_expr = Arc::new(UDFWindowExpr::new(
create_udwf_window_expr(
&row_number_udwf(),
&[],
Expand Down Expand Up @@ -326,7 +326,7 @@ fn roundtrip_window() -> Result<()> {
"NTH_VALUE(a, 2) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(),
false,
)?;
let udwf_expr = Arc::new(BuiltInWindowExpr::new(
let udwf_expr = Arc::new(UDFWindowExpr::new(
nth_value_window,
&[col("b", &schema)?],
&LexOrdering {
Expand Down Expand Up @@ -1125,7 +1125,7 @@ fn roundtrip_udwf_extension_codec() -> Result<()> {
WindowFrameBound::CurrentRow,
);

let udwf_expr = Arc::new(BuiltInWindowExpr::new(
let udwf_expr = Arc::new(UDFWindowExpr::new(
udwf,
&[col("b", &schema)?],
&LexOrdering {
Expand Down

0 comments on commit 28c79ad

Please sign in to comment.