Merge remote-tracking branch 'apache/main' into alamb/improve_boolean_handling
alamb committed Sep 27, 2024
2 parents e11f89c + 9b4f90a commit 72bea9d
Showing 78 changed files with 1,359 additions and 389 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/src/functions.rs
@@ -214,6 +214,7 @@ pub fn display_all_functions() -> Result<()> {
}

/// PARQUET_META table function
#[derive(Debug)]
struct ParquetMetadataTable {
schema: SchemaRef,
batch: RecordBatch,
1 change: 1 addition & 0 deletions datafusion-examples/examples/simple_udtf.rs
@@ -73,6 +73,7 @@ async fn main() -> Result<()> {
/// Usage: `read_csv(filename, [limit])`
///
/// [`read_csv`]: https://duckdb.org/docs/data/csv/overview.html
#[derive(Debug)]
struct LocalCsvTable {
schema: SchemaRef,
limit: Option<usize>,
8 changes: 5 additions & 3 deletions datafusion/catalog/src/table.rs
@@ -17,6 +17,7 @@

use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

use crate::session::Session;
@@ -31,7 +32,7 @@ use datafusion_physical_plan::ExecutionPlan;

/// Source table
#[async_trait]
pub trait TableProvider: Sync + Send {
pub trait TableProvider: Debug + Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
@@ -193,6 +194,7 @@ pub trait TableProvider: Sync + Send {
/// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
/// # use datafusion_physical_plan::ExecutionPlan;
/// // Define a struct that implements the TableProvider trait
/// #[derive(Debug)]
/// struct TestDataSource {}
///
/// #[async_trait]
@@ -212,7 +214,7 @@ pub trait TableProvider: Sync + Send {
/// // This example only supports a between expr with a single column named "c1".
/// Expr::Between(between_expr) => {
/// between_expr.expr
/// .try_into_col()
/// .try_as_col()
/// .map(|column| {
/// if column.name == "c1" {
/// TableProviderFilterPushDown::Exact
@@ -283,7 +285,7 @@ pub trait TableProvider: Sync + Send {
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
#[async_trait]
pub trait TableProviderFactory: Sync + Send {
pub trait TableProviderFactory: Debug + Sync + Send {
/// Create a TableProvider with the given url
async fn create(
&self,
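With `Debug` now a supertrait of `TableProvider` (and `TableProviderFactory`), every downstream implementation must be `Debug` as well, which is why so many structs in this commit gain `#[derive(Debug)]`. A minimal sketch of an affected implementor; the `MyTable` type is illustrative and the crate paths assume the split datafusion-catalog crate:

use std::any::Any;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider};
use datafusion_common::Result;
use datafusion_expr::{Expr, TableType};
use datafusion_physical_plan::ExecutionPlan;

// Without this derive (or a manual Debug impl), the TableProvider impl
// below no longer compiles against the new trait definition.
#[derive(Debug)]
struct MyTable {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for MyTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!("scan is out of scope for this sketch")
    }
}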
22 changes: 18 additions & 4 deletions datafusion/core/src/catalog_common/information_schema.rs
@@ -19,15 +19,15 @@
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema

use async_trait::async_trait;
use datafusion_common::DataFusionError;
use std::{any::Any, sync::Arc};

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use std::fmt::{Debug, Formatter};
use std::{any::Any, sync::Arc};

use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider};
use crate::datasource::streaming::StreamingTable;
@@ -75,6 +75,15 @@ struct InformationSchemaConfig {
catalog_list: Arc<dyn CatalogProviderList>,
}

impl Debug for InformationSchemaConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InformationSchemaConfig")
// TODO it would be great to print the catalog list here
// but that would require CatalogProviderList to implement Debug
.finish_non_exhaustive()
}
}

impl InformationSchemaConfig {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(
@@ -246,6 +255,7 @@ impl SchemaProvider for InformationSchemaProvider {
}
}

#[derive(Debug)]
struct InformationSchemaTables {
schema: SchemaRef,
config: InformationSchemaConfig,
@@ -337,6 +347,7 @@ impl InformationSchemaTablesBuilder {
}
}

#[derive(Debug)]
struct InformationSchemaViews {
schema: SchemaRef,
config: InformationSchemaConfig,
@@ -424,6 +435,7 @@ impl InformationSchemaViewBuilder {
}
}

#[derive(Debug)]
struct InformationSchemaColumns {
schema: SchemaRef,
config: InformationSchemaConfig,
@@ -640,6 +652,7 @@ impl InformationSchemaColumnsBuilder {
}
}

#[derive(Debug)]
struct InformationSchemata {
schema: SchemaRef,
config: InformationSchemaConfig,
@@ -741,6 +754,7 @@ impl PartitionStream for InformationSchemata {
}
}

#[derive(Debug)]
struct InformationSchemaDfSettings {
schema: SchemaRef,
config: InformationSchemaConfig,
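Note the manual `Debug` impl for `InformationSchemaConfig` above: a derive is not possible because the `Arc<dyn CatalogProviderList>` field has no `Debug` bound (hence the TODO), and `finish_non_exhaustive()` prints the elided fields as `..`. A self-contained illustration of the pattern, with hypothetical types:

use std::fmt::{self, Debug, Formatter};

// Hypothetical stand-in for a field whose type does not implement Debug.
struct Opaque;

struct Config {
    name: String,
    opaque: Opaque,
}

impl Debug for Config {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("name", &self.name)
            // `opaque` cannot be printed, so elide it with `..`.
            .finish_non_exhaustive()
    }
}

fn main() {
    let c = Config { name: "info".into(), opaque: Opaque };
    assert_eq!(format!("{c:?}"), r#"Config { name: "info", .. }"#);
}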
38 changes: 38 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -1873,6 +1873,7 @@ impl DataFrame {
}
}

#[derive(Debug)]
struct DataFrameTableProvider {
plan: LogicalPlan,
}
@@ -2028,6 +2029,43 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_coalesce_schema() -> Result<()> {
let ctx = SessionContext::new();

let query = r#"SELECT COALESCE(null, 5)"#;

let result = ctx.sql(query).await?;
assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
Ok(())
}

#[tokio::test]
async fn test_coalesce_from_values_schema() -> Result<()> {
let ctx = SessionContext::new();

let query = r#"SELECT COALESCE(column1, column2) FROM VALUES (null, 1.2)"#;

let result = ctx.sql(query).await?;
assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
Ok(())
}

#[tokio::test]
async fn test_coalesce_from_values_schema_multiple_rows() -> Result<()> {
let ctx = SessionContext::new();

let query = r#"SELECT COALESCE(column1, column2)
FROM VALUES
(null, 1.2),
(1.1, null),
(2, 5);"#;

let result = ctx.sql(query).await?;
assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
Ok(())
}

#[tokio::test]
async fn test_array_agg_schema() -> Result<()> {
let ctx = SessionContext::new();
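The new `COALESCE` tests above feed arguments of differing types (`null`, `1.2`, integer literals) and assert that the schema reported by the logical plan matches the schema the physical plan actually produces. A hypothetical sketch of what a helper like `assert_logical_expr_schema_eq_physical_expr_schema` (defined elsewhere in this test module) boils down to; names and exact imports here are assumptions:

use arrow_schema::Schema;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result;

// Sketch only: derive the Arrow schema from the logical plan, execute the
// physical plan, and require every produced batch to carry the same schema.
async fn assert_schemas_agree(df: DataFrame) -> Result<()> {
    let logical_schema: Schema = df.logical_plan().schema().as_ref().into();
    let batches = df.collect().await?;
    for batch in &batches {
        assert_eq!(&logical_schema, batch.schema().as_ref());
    }
    Ok(())
}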
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/cte_worktable.rs
@@ -36,6 +36,7 @@ use crate::datasource::{TableProvider, TableType};
/// The temporary working table where the previous iteration of a recursive query is stored
/// Naming is based on PostgreSQL's implementation.
/// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4
#[derive(Debug)]
pub struct CteWorkTable {
/// The name of the CTE work table
// WIP, see https://github.com/apache/datafusion/issues/462
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/empty.rs
@@ -32,6 +32,7 @@ use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};

/// An empty plan that is useful for testing and generating plans
/// without mapping them to actual data.
#[derive(Debug)]
pub struct EmptyTable {
schema: SchemaRef,
partitions: usize,
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -646,6 +646,7 @@ impl ListingOptions {
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
/// File fields only
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/stream.rs
@@ -293,6 +293,7 @@ impl StreamConfig {
///
/// [Hadoop]: https://hadoop.apache.org/
/// [`ListingTable`]: crate::datasource::listing::ListingTable
#[derive(Debug)]
pub struct StreamTable(Arc<StreamConfig>);

impl StreamTable {
@@ -370,6 +371,7 @@ impl TableProvider for StreamTable {
}
}

#[derive(Debug)]
struct StreamRead(Arc<StreamConfig>);

impl PartitionStream for StreamRead {
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/streaming.rs
@@ -32,6 +32,7 @@ use datafusion_expr::{Expr, TableType};
use log::debug;

/// A [`TableProvider`] that streams a set of [`PartitionStream`]
#[derive(Debug)]
pub struct StreamingTable {
schema: SchemaRef,
partitions: Vec<Arc<dyn PartitionStream>>,
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/view.rs
@@ -36,6 +36,7 @@ use datafusion_optimizer::Analyzer;
use crate::datasource::{TableProvider, TableType};

/// An implementation of `TableProvider` that uses another logical plan.
#[derive(Debug)]
pub struct ViewTable {
/// LogicalPlan of the view
logical_plan: LogicalPlan,
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -34,7 +34,7 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
#[derive(Default)]
#[derive(Default, Debug)]
pub struct CoalesceBatches {}

impl CoalesceBatches {
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -176,7 +176,7 @@ use itertools::izip;
///
/// This rule only chooses the exact match and satisfies the Distribution(a, b, c)
/// by a HashPartition(a, b, c).
#[derive(Default)]
#[derive(Default, Debug)]
pub struct EnforceDistribution {}

impl EnforceDistribution {
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -72,7 +72,7 @@ use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
/// ones it can prove unnecessary.
#[derive(Default)]
#[derive(Default, Debug)]
pub struct EnforceSorting {}

impl EnforceSorting {
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
@@ -46,7 +46,7 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
/// available statistical information, if there is any.
#[derive(Default)]
#[derive(Default, Debug)]
pub struct JoinSelection {}

impl JoinSelection {
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/optimizer.rs
@@ -35,7 +35,7 @@ use crate::physical_optimizer::topk_aggregation::TopKAggregation;
use crate::physical_optimizer::topk_aggregation::TopKAggregation;

/// A rule-based physical optimizer.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct PhysicalOptimizer {
/// All rules to apply
pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
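Why every rule in this commit gains `Debug`: `PhysicalOptimizer` above now derives it, and a derived impl requires each field to be `Debug`, including the boxed `Arc<dyn PhysicalOptimizerRule + Send + Sync>` rule objects, which pushes the bound down onto every rule implementation. A self-contained sketch of that mechanism with made-up names:

use std::fmt::Debug;
use std::sync::Arc;

// A Debug supertrait makes `dyn Rule` itself implement Debug.
trait Rule: Debug {}

#[derive(Debug)]
struct Optimizer {
    // Deriving Debug here only compiles because Arc<dyn Rule>: Debug,
    // which is what forces every rule to be Debug.
    rules: Vec<Arc<dyn Rule>>,
}

#[derive(Default, Debug)]
struct MyRule;
impl Rule for MyRule {}

fn main() {
    let opt = Optimizer { rules: vec![Arc::new(MyRule)] };
    println!("{opt:?}"); // Optimizer { rules: [MyRule] }
}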
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -60,7 +60,7 @@ use itertools::Itertools;

/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
/// remove or swap with its child.
#[derive(Default)]
#[derive(Default, Debug)]
pub struct ProjectionPushdown {}

impl ProjectionPushdown {
@@ -1831,6 +1831,7 @@ mod tests {

#[test]
fn test_streaming_table_after_projection() -> Result<()> {
#[derive(Debug)]
struct DummyStreamPartition {
schema: SchemaRef,
}
9 changes: 5 additions & 4 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -33,6 +33,7 @@ use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supp
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::izip;

@@ -41,7 +42,7 @@ use itertools::izip;
/// are not satisfied by their children.
/// 2. Plans that use pipeline-breaking operators on infinite input(s),
/// it is impossible to execute such queries (they will never generate output nor finish)
#[derive(Default)]
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

impl SanityCheckPlan {
@@ -130,9 +131,9 @@ pub fn check_plan_sanity(
if !child_eq_props.ordering_satisfy_requirement(sort_req) {
let plan_str = get_plan_string(&plan);
return plan_err!(
"Plan: {:?} does not satisfy order requirements: {:?}. Child-{} order: {:?}",
"Plan: {:?} does not satisfy order requirements: {}. Child-{} order: {}",
plan_str,
sort_req,
format_physical_sort_requirement_list(sort_req),
idx,
child_eq_props.oeq_class
);
@@ -145,7 +146,7 @@
{
let plan_str = get_plan_string(&plan);
return plan_err!(
"Plan: {:?} does not satisfy distribution requirements: {:?}. Child-{} output partitioning: {:?}",
"Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
plan_str,
dist_req,
idx,
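Besides deriving `Debug` on the rule, the error messages here switch from `{:?}` to `{}` backed by `format_physical_sort_requirement_list`, so requirements print as readable text rather than struct dumps. A generic illustration of the `Debug` vs `Display` difference; the `SortReq` type is invented for this sketch:

use std::fmt;

#[derive(Debug)]
struct SortReq {
    expr: String,
    asc: bool,
}

impl fmt::Display for SortReq {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} {}", self.expr, if self.asc { "ASC" } else { "DESC" })
    }
}

fn main() {
    let r = SortReq { expr: "c1".into(), asc: true };
    println!("{r:?}"); // Debug:   SortReq { expr: "c1", asc: true }
    println!("{r}"); // Display: c1 ASC
}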
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -48,7 +48,7 @@ use datafusion_physical_plan::{
/// This rule analyzes aggregate expressions of type `Beneficial` to see whether
/// their input ordering requirements are satisfied. If this is the case, the
/// aggregators are modified to run in a more efficient mode.
#[derive(Default)]
#[derive(Default, Debug)]
pub struct OptimizeAggregateOrder {}

impl OptimizeAggregateOrder {
1 change: 1 addition & 0 deletions datafusion/core/src/test/mod.rs
@@ -297,6 +297,7 @@ pub fn csv_exec_sorted(
}

// construct a stream partition for test purposes
#[derive(Debug)]
pub(crate) struct TestStreamPartition {
pub schema: SchemaRef,
}
2 changes: 2 additions & 0 deletions datafusion/core/src/test_util/mod.rs
@@ -174,6 +174,7 @@ pub fn populate_csv_partitions(
}

/// TableFactory for tests
#[derive(Default, Debug)]
pub struct TestTableFactory {}

#[async_trait]
@@ -191,6 +192,7 @@ impl TableProviderFactory for TestTableFactory {
}

/// TableProvider for testing purposes
#[derive(Debug)]
pub struct TestTableProvider {
/// URL of table files or folder
pub url: String,
2 changes: 2 additions & 0 deletions datafusion/core/tests/custom_sources_cases/mod.rs
@@ -71,7 +71,9 @@ macro_rules! TEST_CUSTOM_RECORD_BATCH {

//--- Custom source dataframe tests ---//

#[derive(Debug)]
struct CustomTableProvider;

#[derive(Debug, Clone)]
struct CustomExecutionPlan {
projection: Option<Vec<usize>>,
2 changes: 1 addition & 1 deletion datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -142,7 +142,7 @@ impl ExecutionPlan for CustomPlan {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
struct CustomProvider {
zero_batch: RecordBatch,
one_batch: RecordBatch,