From ac50592eda127930c213642e18465bdf4054b00b Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 22 Nov 2024 20:21:52 +0800 Subject: [PATCH 1/4] Add generate_series() udtf --- Cargo.toml | 2 + datafusion-cli/Cargo.lock | 24 ++ datafusion-cli/Cargo.toml | 1 + datafusion-cli/src/functions.rs | 2 +- datafusion/catalog/src/table.rs | 41 ++- datafusion/core/Cargo.toml | 1 + datafusion/core/src/datasource/function.rs | 63 ---- datafusion/core/src/datasource/mod.rs | 1 - datafusion/core/src/execution/context/mod.rs | 9 +- .../core/src/execution/session_state.rs | 17 +- .../src/execution/session_state_defaults.rs | 8 +- datafusion/core/src/lib.rs | 5 + .../user_defined_table_functions.rs | 2 +- datafusion/functions-table/Cargo.toml | 61 ++++ .../functions-table/src/generate_series.rs | 137 +++++++++ datafusion/functions-table/src/lib.rs | 29 ++ datafusion/physical-plan/src/memory.rs | 286 +++++++++++++++++- .../test_files/table_functions.slt | 112 +++++++ 18 files changed, 726 insertions(+), 75 deletions(-) delete mode 100644 datafusion/core/src/datasource/function.rs create mode 100644 datafusion/functions-table/Cargo.toml create mode 100644 datafusion/functions-table/src/generate_series.rs create mode 100644 datafusion/functions-table/src/lib.rs create mode 100644 datafusion/sqllogictest/test_files/table_functions.slt diff --git a/Cargo.toml b/Cargo.toml index e947afff8f4f..aab371592dad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ members = [ "datafusion-examples/examples/ffi/ffi_module_loader", "test-utils", "benchmarks", + "datafusion/functions-table", ] resolver = "2" @@ -108,6 +109,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" } datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" } datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" } datafusion-functions-nested = { path = 
"datafusion/functions-nested", version = "43.0.0" } +datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" } datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" } datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "43.0.0", default-features = false } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 8afb096df55f..e896dac82a55 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1210,6 +1210,7 @@ dependencies = [ "datafusion-functions", "datafusion-functions-aggregate", "datafusion-functions-nested", + "datafusion-functions-table", "datafusion-functions-window", "datafusion-optimizer", "datafusion-physical-expr", @@ -1271,6 +1272,7 @@ dependencies = [ "clap", "ctor", "datafusion", + "datafusion-catalog", "dirs", "env_logger", "futures", @@ -1448,6 +1450,28 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-functions-table" +version = "43.0.0" +dependencies = [ + "ahash", + "arrow", + "arrow-schema", + "async-trait", + "datafusion-catalog", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "half", + "indexmap", + "log", + "paste", +] + [[package]] name = "datafusion-functions-window" version = "43.0.0" diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 784d47220c7c..743ec1b4a749 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -49,6 +49,7 @@ datafusion = { path = "../datafusion/core", version = "43.0.0", features = [ "unicode_expressions", "compression", ] } +datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" } dirs = "5.0.1" env_logger = "0.11" futures = "0.3" diff --git 
a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index c622463de033..d7ca48d638b7 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -24,13 +24,13 @@ use async_trait::async_trait; use datafusion::catalog::Session; use datafusion::common::{plan_err, Column}; -use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::logical_expr::Expr; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::scalar::ScalarValue; +use datafusion_catalog::TableFunctionImpl; use parquet::basic::ConvertedType; use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::reader::FileReader; diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index d771930de25d..b6752191d9a7 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -25,9 +25,11 @@ use arrow_schema::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{not_impl_err, Constraints, Statistics}; +use datafusion_expr::Expr; + use datafusion_expr::dml::InsertOp; use datafusion_expr::{ - CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType, + CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType, }; use datafusion_physical_plan::ExecutionPlan; @@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send { cmd: &CreateExternalTable, ) -> Result>; } + +/// A trait for table function implementations +pub trait TableFunctionImpl: Debug + Sync + Send { + /// Create a table provider + fn call(&self, args: &[Expr]) -> Result>; +} + +/// A table that uses a function to generate data +#[derive(Debug)] +pub struct TableFunction { + /// Name of the table function + name: String, + /// Function implementation + fun: Arc, +} + +impl TableFunction { + /// Create a new table function 
+ pub fn new(name: String, fun: Arc) -> Self { + Self { name, fun } + } + + /// Get the name of the table function + pub fn name(&self) -> &str { + &self.name + } + + /// Get the implementation of the table function + pub fn function(&self) -> &Arc { + &self.fun + } + + /// Get the function implementation and generate a table + pub fn create_table_provider(&self, args: &[Expr]) -> Result> { + self.fun.call(args) + } +} diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index d2365280937f..45a5a84b798d 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -104,6 +104,7 @@ datafusion-expr = { workspace = true } datafusion-functions = { workspace = true } datafusion-functions-aggregate = { workspace = true } datafusion-functions-nested = { workspace = true, optional = true } +datafusion-functions-table = { workspace = true } datafusion-functions-window = { workspace = true } datafusion-optimizer = { workspace = true } datafusion-physical-expr = { workspace = true } diff --git a/datafusion/core/src/datasource/function.rs b/datafusion/core/src/datasource/function.rs deleted file mode 100644 index 37ce59f8207b..000000000000 --- a/datafusion/core/src/datasource/function.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -//! A table that uses a function to generate data - -use super::TableProvider; - -use datafusion_common::Result; -use datafusion_expr::Expr; - -use std::fmt::Debug; -use std::sync::Arc; - -/// A trait for table function implementations -pub trait TableFunctionImpl: Debug + Sync + Send { - /// Create a table provider - fn call(&self, args: &[Expr]) -> Result>; -} - -/// A table that uses a function to generate data -#[derive(Debug)] -pub struct TableFunction { - /// Name of the table function - name: String, - /// Function implementation - fun: Arc, -} - -impl TableFunction { - /// Create a new table function - pub fn new(name: String, fun: Arc) -> Self { - Self { name, fun } - } - - /// Get the name of the table function - pub fn name(&self) -> &str { - &self.name - } - - /// Get the implementation of the table function - pub fn function(&self) -> &Arc { - &self.fun - } - - /// Get the function implementation and generate a table - pub fn create_table_provider(&self, args: &[Expr]) -> Result> { - self.fun.call(args) - } -} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index ad369b75e130..7d3fe9ddd751 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -25,7 +25,6 @@ pub mod default_table_source; pub mod dynamic_file; pub mod empty; pub mod file_format; -pub mod function; pub mod listing; pub mod listing_table_factory; pub mod memory; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 5f01d41c31e7..1469b671d6c7 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -30,9 +30,8 @@ use crate::{ catalog_common::memory::MemorySchemaProvider, catalog_common::MemoryCatalogProvider, dataframe::DataFrame, - datasource::{ - function::{TableFunction, 
TableFunctionImpl}, - listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }, datasource::{provider_as_source, MemTable, ViewTable}, error::{DataFusionError, Result}, @@ -74,7 +73,9 @@ use crate::datasource::dynamic_file::DynamicListTableFactory; use crate::execution::session_state::SessionStateBuilder; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory}; +use datafusion_catalog::{ + DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory, +}; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index e99cf8222381..8f17ffb575aa 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -24,7 +24,6 @@ use crate::catalog_common::information_schema::{ use crate::catalog_common::MemoryCatalogProviderList; use crate::datasource::cte_worktable::CteWorkTable; use crate::datasource::file_format::{format_as_file_type, FileFormatFactory}; -use crate::datasource::function::{TableFunction, TableFunctionImpl}; use crate::datasource::provider_as_source; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::execution::SessionStateDefaults; @@ -33,7 +32,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use arrow_schema::{DataType, SchemaRef}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_catalog::Session; +use datafusion_catalog::{Session, TableFunction, TableFunctionImpl}; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::{ConfigExtension, ConfigOptions, 
TableOptions}; use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; @@ -1074,6 +1073,7 @@ impl SessionStateBuilder { .with_scalar_functions(SessionStateDefaults::default_scalar_functions()) .with_aggregate_functions(SessionStateDefaults::default_aggregate_functions()) .with_window_functions(SessionStateDefaults::default_window_functions()) + .with_table_function_list(SessionStateDefaults::default_table_functions()) } /// Set the session id. @@ -1188,6 +1188,19 @@ impl SessionStateBuilder { self } + /// Set the list of [`TableFunction`]s + pub fn with_table_function_list( + mut self, + table_functions: Vec>, + ) -> Self { + let functions = table_functions + .into_iter() + .map(|f| (f.name().to_string(), f)) + .collect(); + self.table_functions = Some(functions); + self + } + /// Set the map of [`ScalarUDF`]s pub fn with_scalar_functions( mut self, diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs index 7ba332c520c1..106082bc7b3b 100644 --- a/datafusion/core/src/execution/session_state_defaults.rs +++ b/datafusion/core/src/execution/session_state_defaults.rs @@ -29,7 +29,8 @@ use crate::datasource::provider::DefaultTableFactory; use crate::execution::context::SessionState; #[cfg(feature = "nested_expressions")] use crate::functions_nested; -use crate::{functions, functions_aggregate, functions_window}; +use crate::{functions, functions_aggregate, functions_table, functions_window}; +use datafusion_catalog::TableFunction; use datafusion_execution::config::SessionConfig; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; @@ -119,6 +120,11 @@ impl SessionStateDefaults { functions_window::all_default_window_functions() } + /// returns the list of default [`TableFunction`]s + pub fn default_table_functions() -> Vec> { + functions_table::all_default_table_functions() + } + /// returns the list of default 
[`FileFormatFactory']'s pub fn default_file_formats() -> Vec> { let file_formats: Vec> = vec![ diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index d049e774d7c6..a1b18b8bfe8c 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -773,6 +773,11 @@ pub mod functions_window { pub use datafusion_functions_window::*; } +/// re-export of [`datafusion_functions_table`] crate +pub mod functions_table { + pub use datafusion_functions_table::*; +} + /// re-export of variable provider for `@name` and `@@name` style runtime values. pub mod variable { pub use datafusion_expr::var_provider::{VarProvider, VarType}; diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index 0cc156866d4d..39f10ef11ab0 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -21,7 +21,6 @@ use arrow::csv::ReaderBuilder; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::execution::TaskContext; @@ -29,6 +28,7 @@ use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_catalog::Session; +use datafusion_catalog::TableFunctionImpl; use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue}; use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType}; use std::fs::File; diff --git a/datafusion/functions-table/Cargo.toml b/datafusion/functions-table/Cargo.toml new file mode 100644 index 000000000000..cb81f27b55f2 --- /dev/null +++ b/datafusion/functions-table/Cargo.toml @@ -0,0 +1,61 @@ +# Licensed to the Apache Software 
Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-functions-table" +description = "Table functions for the DataFusion query engine" +keywords = ["datafusion", "logical", "plan", "expressions"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_functions_table" +path = "src/lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ahash = { workspace = true } +arrow = { workspace = true } +arrow-schema = { workspace = true } +async-trait = { workspace = true } +datafusion-catalog = { workspace = true } +datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } +datafusion-physical-plan = { workspace = true } +half = { workspace = true } +indexmap = { 
workspace = true } +log = { workspace = true } +paste = "1.0.14" + +[dev-dependencies] +arrow = { workspace = true, features = ["test_utils"] } +criterion = "0.5" +rand = { workspace = true } diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs new file mode 100644 index 000000000000..d8f4933b0838 --- /dev/null +++ b/datafusion/functions-table/src/generate_series.rs @@ -0,0 +1,137 @@ +use arrow::array::Int64Array; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use datafusion_catalog::Session; +use datafusion_catalog::TableFunctionImpl; +use datafusion_catalog::TableProvider; +use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue}; +use datafusion_expr::{Expr, TableType}; +use datafusion_physical_plan::memory::{StreamingBatchGenerator, StreamingMemoryExec}; +use datafusion_physical_plan::ExecutionPlan; +use std::fmt; +use std::sync::Arc; + +/// Table that generates a series of integers from `start`(inclusive) to `end`(inclusive) +#[derive(Debug, Clone)] +struct GenerateSeriesTable { + schema: SchemaRef, + start: i64, + end: i64, +} + +#[derive(Debug, Clone)] +struct GenerateSeriesState { + schema: SchemaRef, + _start: i64, + end: i64, + batch_size: usize, + + /// Tracks current position when generating table + current: i64, +} + +/// Detail to display for 'Explain' plan +impl fmt::Display for GenerateSeriesState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "generate_series: start={}, end={}, batch_size={}", + self._start, self.end, self.batch_size + ) + } +} + +impl StreamingBatchGenerator for GenerateSeriesState { + fn generate_next_batch(&mut self) -> Result> { + // Check if we've reached the end + if self.current > self.end { + return Ok(None); + } + + // Construct batch + let batch_end = (self.current + self.batch_size as i64 - 1).min(self.end); + let array = 
Int64Array::from_iter_values(self.current..=batch_end); + let batch = RecordBatch::try_new(self.schema.clone(), vec![Arc::new(array)])?; + + // Update current position for next batch + self.current = batch_end + 1; + + Ok(Some(batch)) + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +#[async_trait] +impl TableProvider for GenerateSeriesTable { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let batch_size = state.config_options().execution.batch_size; + + Ok(Arc::new(StreamingMemoryExec::try_new( + self.schema.clone(), + vec![Box::new(GenerateSeriesState { + schema: self.schema.clone(), + _start: self.start, + end: self.end, + current: self.start, + batch_size, + })], + )?)) + } +} + +#[derive(Debug)] +pub struct GenerateSeriesFunc {} + +impl TableFunctionImpl for GenerateSeriesFunc { + fn call(&self, exprs: &[Expr]) -> Result> { + if exprs.len() != 2 { + return plan_err!("generate_series expects 2 arguments"); + } + + let start = match &exprs[0] { + Expr::Literal(ScalarValue::Int64(Some(n))) => *n, + _ => return plan_err!("First argument must be an integer literal"), + }; + + let end = match &exprs[1] { + Expr::Literal(ScalarValue::Int64(Some(n))) => *n, + _ => return plan_err!("Second argument must be an integer literal"), + }; + + if end < start { + return not_impl_err!( + "End value must be greater than or equal to start value" + ); + } + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + + Ok(Arc::new(GenerateSeriesTable { schema, start, end })) + } +} diff --git a/datafusion/functions-table/src/lib.rs b/datafusion/functions-table/src/lib.rs new file mode 100644 index 000000000000..33cf21ca9a6e --- /dev/null +++ 
b/datafusion/functions-table/src/lib.rs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod generate_series; + +use datafusion_catalog::TableFunction; +use std::sync::Arc; + +/// Returns all default table functions +pub fn all_default_table_functions() -> Vec> { + vec![Arc::new(TableFunction::new( + "generate_series".to_string(), + Arc::new(generate_series::GenerateSeriesFunc {}), + ))] +} diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 272dcdc95bc0..69a219b6a8c1 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -365,8 +365,165 @@ impl RecordBatchStream for MemoryStream { } } +pub trait StreamingBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { + /// Generate the next batch, return `None` when no more batches are available + fn generate_next_batch(&mut self) -> Result>; + + /// Creates a boxed clone of this generator. + /// + /// This method is required because `Clone` cannot be directly implemented for + /// trait objects. It provides a way to clone trait objects of + /// StreamingBatchGenerator while maintaining proper type erasure. 
+ fn clone_box(&self) -> Box; +} + +/// Execution plan for streaming in-memory batches of data +/// +/// This plan generates output batches lazily, it doesn't have to buffer all batches +/// in memory up front (compared to `MemoryExec`), thus consuming constant memory. +pub struct StreamingMemoryExec { + /// Schema representing the data + schema: SchemaRef, + /// Functions to generate batches for each partition + batch_generators: Vec>, + /// Cached plan properties (equivalence, partitioning, execution mode) + cache: PlanProperties, +} + +impl StreamingMemoryExec { + /// Create a new streaming memory execution plan + pub fn try_new( + schema: SchemaRef, + generators: Vec>, + ) -> Result { + let cache = PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&schema)), + Partitioning::RoundRobinBatch(generators.len()), + ExecutionMode::Bounded, + ); + Ok(Self { + schema, + batch_generators: generators, + cache, + }) + } +} + +impl fmt::Debug for StreamingMemoryExec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("StreamingMemoryExec") + .field("schema", &self.schema) + .field("batch_generators", &self.batch_generators) + .finish() + } +} + +impl DisplayAs for StreamingMemoryExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "StreamingMemoryExec: partitions={}, batch_generators=[{}]", + self.batch_generators.len(), + self.batch_generators + .iter() + .map(|g| g.to_string()) + .collect::>() + .join(", ") + ) + } + } + } +} + +impl ExecutionPlan for StreamingMemoryExec { + fn name(&self) -> &'static str { + "StreamingMemoryExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> 
{ + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in StreamingMemoryExec") + } + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + if partition >= self.batch_generators.len() { + return internal_err!( + "Invalid partition {} for StreamingMemoryExec with {} partitions", + partition, + self.batch_generators.len() + ); + } + + Ok(Box::pin(StreamingMemoryStream { + schema: Arc::clone(&self.schema), + generator: self.batch_generators[partition].clone_box(), + })) + } + + fn statistics(&self) -> Result { + Ok(Statistics::new_unknown(&self.schema)) + } +} + +/// Stream that generates record batches on demand +pub struct StreamingMemoryStream { + schema: SchemaRef, + generator: Box, +} + +impl Stream for StreamingMemoryStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + let batch = self.generator.generate_next_batch(); + + match batch { + Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))), + Ok(None) => Poll::Ready(None), + Err(e) => Poll::Ready(Some(Err(e))), + } + } +} + +impl RecordBatchStream for StreamingMemoryStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + #[cfg(test)] -mod tests { +mod memory_exec_tests { use std::sync::Arc; use crate::memory::MemoryExec; @@ -416,3 +573,130 @@ mod tests { Ok(()) } } + +#[cfg(test)] +mod streaming_memory_tests { + use super::*; + use arrow::array::Int64Array; + use arrow::datatypes::{DataType, Field, Schema}; + use futures::StreamExt; + + #[derive(Debug, Clone)] + struct TestGenerator { + counter: i64, + max_batches: i64, + batch_size: usize, + schema: SchemaRef, + } + + impl fmt::Display for TestGenerator { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "TestGenerator: counter={}, max_batches={}, batch_size={}", + self.counter, self.max_batches, self.batch_size + ) + } + } + + impl StreamingBatchGenerator for TestGenerator { + 
fn generate_next_batch(&mut self) -> Result> { + if self.counter >= self.max_batches { + return Ok(None); + } + + let array = Int64Array::from_iter_values( + (self.counter * self.batch_size as i64) + ..(self.counter * self.batch_size as i64 + self.batch_size as i64), + ); + self.counter += 1; + Ok(Some(RecordBatch::try_new( + self.schema.clone(), + vec![Arc::new(array)], + )?)) + } + + fn clone_box(&self) -> Box { + Box::new(TestGenerator { + counter: self.counter, + max_batches: self.max_batches, + batch_size: self.batch_size, + schema: self.schema.clone(), + }) + } + } + + #[tokio::test] + async fn test_streaming_memory_exec() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let generator = TestGenerator { + counter: 0, + max_batches: 3, + batch_size: 2, + schema: schema.clone(), + }; + + let exec = StreamingMemoryExec::try_new(schema, vec![Box::new(generator)])?; + + // Test schema + assert_eq!(exec.schema().fields().len(), 1); + assert_eq!(exec.schema().field(0).name(), "a"); + + // Test execution + let stream = exec.execute(0, Arc::new(TaskContext::default()))?; + let batches: Vec<_> = stream.collect::>().await; + + assert_eq!(batches.len(), 3); + + // Verify batch contents + let batch0 = batches[0].as_ref().unwrap(); + let array0 = batch0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(array0.values(), &[0, 1]); + + let batch1 = batches[1].as_ref().unwrap(); + let array1 = batch1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(array1.values(), &[2, 3]); + + let batch2 = batches[2].as_ref().unwrap(); + let array2 = batch2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(array2.values(), &[4, 5]); + + Ok(()) + } + + #[tokio::test] + async fn test_streaming_memory_exec_invalid_partition() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let generator = TestGenerator { + counter: 0, + 
max_batches: 1, + batch_size: 1, + schema: schema.clone(), + }; + + let exec = StreamingMemoryExec::try_new(schema, vec![Box::new(generator)])?; + + // Test invalid partition + let result = exec.execute(1, Arc::new(TaskContext::default())); + + // partition is 0-indexed, so there only should be partition 0 + assert!(matches!( + result, + Err(e) if e.to_string().contains("Invalid partition 1 for StreamingMemoryExec with 1 partitions") + )); + + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt new file mode 100644 index 000000000000..70569ea78e26 --- /dev/null +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Test generate_series table function + +query I rowsort +SELECT * FROM generate_series(1, 5) +---- +1 +2 +3 +4 +5 + +query I rowsort +SELECT * FROM generate_series(1, 1) +---- +1 + +query I rowsort +SELECT * FROM generate_series(3, 6) +---- +3 +4 +5 +6 + +query I rowsort +SELECT SUM(v1) FROM generate_series(1, 5) t1(v1) +---- +15 + +# Test generate_series with WHERE clause +query I rowsort +SELECT * FROM generate_series(1, 10) t1(v1) WHERE v1 % 2 = 0 +---- +10 +2 +4 +6 +8 + +# Test generate_series with ORDER BY +query I +SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC +---- +5 +4 +3 +2 +1 + +# Test generate_series with LIMIT +query I rowsort +SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5 +---- +1 +2 +3 +4 +5 + +# Test generate_series in subquery +query I rowsort +SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1)) +---- +11 +12 +13 + +# Test generate_series with JOIN +query II rowsort +SELECT a.v1, b.v1 +FROM generate_series(1, 3) a(v1) +JOIN generate_series(2, 4) b(v1) +ON a.v1 = b.v1 - 1 +---- +1 2 +2 3 +3 4 + +query TT +EXPLAIN SELECT * FROM generate_series(1, 5) +---- +logical_plan TableScan: tmp_table projection=[value] +physical_plan StreamingMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=5, batch_size=8192] + +query error DataFusion error: This feature is not implemented: End value must be greater than or equal to start value +SELECT * FROM generate_series(5, 1) + +statement error DataFusion error: Error during planning: First argument must be an integer literal +SELECT * FROM generate_series(NULL, 5) + +statement error DataFusion error: Error during planning: Second argument must be an integer literal +SELECT * FROM generate_series(1, NULL) + +statement error DataFusion error: Error during planning: generate_series expects 2 arguments +SELECT * FROM generate_series(1, 5, NULL) From 92e94e632ab5bcc434702d2e5cf0fd53796693b6 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: 
Sun, 24 Nov 2024 00:17:12 +0800 Subject: [PATCH 2/4] license --- .../functions-table/src/generate_series.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index d8f4933b0838..f985a1a7a9e3 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use arrow::array::Int64Array; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; From 95f95f226ee56eeaf2a85d13903b343e5bd4550c Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 24 Nov 2024 00:26:35 +0800 Subject: [PATCH 3/4] fix examples --- datafusion-examples/Cargo.toml | 1 + datafusion-examples/examples/simple_udtf.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index e2432abdc138..0305d9bd037c 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -60,6 +60,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion-catalog = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } datafusion-functions-window-common = { workspace = true } diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index 6faa397ef60f..f32560ede69d 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -21,13 +21,13 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; use datafusion::catalog::Session; -use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::execution::context::ExecutionProps; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; +use datafusion_catalog::TableFunctionImpl; use datafusion_common::{plan_err, ScalarValue}; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{Expr, TableType}; From 
5df8964951136f4cc247360edd34f9b32d6a83d5 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 24 Nov 2024 19:15:19 +0800 Subject: [PATCH 4/4] clippy --- datafusion/physical-plan/src/memory.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 69a219b6a8c1..c5649be18baf 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -611,7 +611,7 @@ mod streaming_memory_tests { ); self.counter += 1; Ok(Some(RecordBatch::try_new( - self.schema.clone(), + Arc::clone(&self.schema), vec![Arc::new(array)], )?)) } @@ -621,7 +621,7 @@ mod streaming_memory_tests { counter: self.counter, max_batches: self.max_batches, batch_size: self.batch_size, - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), }) } } @@ -633,7 +633,7 @@ mod streaming_memory_tests { counter: 0, max_batches: 3, batch_size: 2, - schema: schema.clone(), + schema: Arc::clone(&schema), }; let exec = StreamingMemoryExec::try_new(schema, vec![Box::new(generator)])?; @@ -683,7 +683,7 @@ mod streaming_memory_tests { counter: 0, max_batches: 1, batch_size: 1, - schema: schema.clone(), + schema: Arc::clone(&schema), }; let exec = StreamingMemoryExec::try_new(schema, vec![Box::new(generator)])?;