Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generate_series() udtf (and introduce 'lazy' MemoryExec) #13540

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ members = [
"datafusion-examples/examples/ffi/ffi_module_loader",
"test-utils",
"benchmarks",
"datafusion/functions-table",
]
resolver = "2"

Expand Down Expand Up @@ -108,6 +109,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "43.0.0" }
datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "43.0.0", default-features = false }
Expand Down
24 changes: 24 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ datafusion = { path = "../datafusion/core", version = "43.0.0", features = [
"unicode_expressions",
"compression",
] }
datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" }
dirs = "5.0.1"
env_logger = "0.11"
futures = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use async_trait::async_trait;

use datafusion::catalog::Session;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use datafusion_catalog::TableFunctionImpl;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions-window-common = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::Session;
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_catalog::TableFunctionImpl;
use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{Expr, TableType};
Expand Down
41 changes: 40 additions & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;

Expand Down Expand Up @@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations
///
/// Implementors must be `Debug + Sync + Send`. A [`TableFunction`] stores an
/// implementation as `Arc<dyn TableFunctionImpl>` and forwards to [`Self::call`].
pub trait TableFunctionImpl: Debug + Sync + Send {
/// Create a table provider
///
/// `args` is the slice of expressions supplied to the function. On success
/// the resulting [`TableProvider`] is returned behind an `Arc`; otherwise the
/// implementation reports an error through the `Result`.
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}

/// A table that uses a function to generate data
///
/// Pairs a function name with the [`TableFunctionImpl`] that backs it.
#[derive(Debug)]
pub struct TableFunction {
/// Name of the table function
name: String,
/// Function implementation, stored as a reference-counted trait object
fun: Arc<dyn TableFunctionImpl>,
}

impl TableFunction {
/// Create a new table function
pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
Self { name, fun }
}

/// Get the name of the table function
pub fn name(&self) -> &str {
&self.name
}

/// Get the implementation of the table function
pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
&self.fun
}

/// Get the function implementation and generate a table
pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
self.fun.call(args)
}
}
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true, optional = true }
datafusion-functions-table = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
63 changes: 0 additions & 63 deletions datafusion/core/src/datasource/function.rs

This file was deleted.

1 change: 0 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub mod default_table_source;
pub mod dynamic_file;
pub mod empty;
pub mod file_format;
pub mod function;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ use crate::{
catalog_common::memory::MemorySchemaProvider,
catalog_common::MemoryCatalogProvider,
dataframe::DataFrame,
datasource::{
function::{TableFunction, TableFunctionImpl},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
},
datasource::{provider_as_source, MemTable, ViewTable},
error::{DataFusionError, Result},
Expand Down Expand Up @@ -74,7 +73,9 @@ use crate::datasource::dynamic_file::DynamicListTableFactory;
use crate::execution::session_state::SessionStateBuilder;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory};
use datafusion_catalog::{
DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory,
};
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
Expand Down
17 changes: 15 additions & 2 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::catalog_common::information_schema::{
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::cte_worktable::CteWorkTable;
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
use crate::datasource::function::{TableFunction, TableFunctionImpl};
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::SessionStateDefaults;
Expand All @@ -33,7 +32,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_catalog::Session;
use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
Expand Down Expand Up @@ -1074,6 +1073,7 @@ impl SessionStateBuilder {
.with_scalar_functions(SessionStateDefaults::default_scalar_functions())
.with_aggregate_functions(SessionStateDefaults::default_aggregate_functions())
.with_window_functions(SessionStateDefaults::default_window_functions())
.with_table_function_list(SessionStateDefaults::default_table_functions())
}

/// Set the session id.
Expand Down Expand Up @@ -1188,6 +1188,19 @@ impl SessionStateBuilder {
self
}

/// Set the [`TableFunction`]s from a list, keyed by each function's name.
///
/// Any table functions previously configured on this builder are replaced.
pub fn with_table_function_list(
    mut self,
    table_functions: Vec<Arc<TableFunction>>,
) -> Self {
    // Lazily pair every function with an owned copy of its name; the pairs are
    // only materialised by the `collect` below.
    let named = table_functions.into_iter().map(|func| {
        let key = func.name().to_owned();
        (key, func)
    });
    self.table_functions = Some(named.collect());
    self
}

/// Set the map of [`ScalarUDF`]s
pub fn with_scalar_functions(
mut self,
Expand Down
8 changes: 7 additions & 1 deletion datafusion/core/src/execution/session_state_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use crate::datasource::provider::DefaultTableFactory;
use crate::execution::context::SessionState;
#[cfg(feature = "nested_expressions")]
use crate::functions_nested;
use crate::{functions, functions_aggregate, functions_window};
use crate::{functions, functions_aggregate, functions_table, functions_window};
use datafusion_catalog::TableFunction;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
Expand Down Expand Up @@ -119,6 +120,11 @@ impl SessionStateDefaults {
functions_window::all_default_window_functions()
}

/// returns the list of default [`TableFunction`]s
///
/// Delegates to `functions_table::all_default_table_functions`.
pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
functions_table::all_default_table_functions()
}

/// returns the list of default [`FileFormatFactory`]s
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,11 @@ pub mod functions_window {
pub use datafusion_functions_window::*;
}

/// re-export of [`datafusion_functions_table`] crate
///
/// Every public item of that crate is reachable through this module via the
/// glob re-export below.
pub mod functions_table {
pub use datafusion_functions_table::*;
}

/// re-export of variable provider for `@name` and `@@name` style runtime values.
pub mod variable {
pub use datafusion_expr::var_provider::{VarProvider, VarType};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use arrow::csv::ReaderBuilder;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_catalog::Session;
use datafusion_catalog::TableFunctionImpl;
use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue};
use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
use std::fs::File;
Expand Down
Loading
Loading