Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into quote-col-name
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Mar 18, 2023
2 parents 12b1048 + 3ccf1ae commit ad38bfa
Show file tree
Hide file tree
Showing 23 changed files with 1,807 additions and 449 deletions.
337 changes: 232 additions & 105 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ pyarrow = ["pyo3", "arrow/pyarrow"]
[dependencies]
apache-avro = { version = "0.14", default-features = false, features = ["snappy"], optional = true }
arrow = { workspace = true, default-features = false }
arrow-array = { version = "35.0.0", default-features = false, features = ["chrono-tz"] }
chrono = { version = "0.4", default-features = false }
cranelift-module = { version = "0.92.0", optional = true }
num_cpus = "1.13.0"
object_store = { version = "0.5.4", default-features = false, optional = true }
parquet = { workspace = true, default-features = false, optional = true }
pyo3 = { version = "0.18.0", optional = true }
sqlparser = "0.32"

[dev-dependencies]
rand = "0.8.4"
941 changes: 932 additions & 9 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

88 changes: 71 additions & 17 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,38 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> {
}
}

/// Represents a path to a table that may require further resolution
/// [`TableReference`]s represent a multi part identifier (path) to a
/// table that may require further resolution.
///
/// # Creating [`TableReference`]
///
/// When converting strings to [`TableReference`]s, the string is
/// parsed as though it were a SQL identifier, normalizing (converting to
/// lowercase) any unquoted identifiers.
///
/// See [`TableReference::bare`] to create references without applying
/// normalization semantics
///
/// # Examples
/// ```
/// # use datafusion_common::TableReference;
/// // Get a table reference to 'mytable'
/// let table_reference = TableReference::from("mytable");
/// assert_eq!(table_reference, TableReference::bare("mytable"));
///
/// // Get a table reference to 'mytable' (note the capitalization)
/// let table_reference = TableReference::from("MyTable");
/// assert_eq!(table_reference, TableReference::bare("mytable"));
///
/// // Get a table reference to 'MyTable' (note the capitalization) using double quotes
/// // (programmatically it is better to use `TableReference::bare` for this)
/// let table_reference = TableReference::from(r#""MyTable""#);
/// assert_eq!(table_reference, TableReference::bare("MyTable"));
///
/// // Get a table reference to 'myschema.mytable' (note the capitalization)
/// let table_reference = TableReference::from("MySchema.MyTable");
/// assert_eq!(table_reference, TableReference::partial("myschema", "mytable"));
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TableReference<'a> {
/// An unqualified table reference, e.g. "table"
Expand All @@ -61,6 +92,16 @@ pub enum TableReference<'a> {
},
}

/// This is a [`TableReference`] with a `'static` lifetime (i.e., it
/// owns the underlying string)
///
/// To convert a [`TableReference`] to an [`OwnedTableReference`], use
///
/// ```
/// # use datafusion_common::{OwnedTableReference, TableReference};
/// let table_reference = TableReference::from("mytable");
/// let owned_reference = table_reference.to_owned_reference();
/// ```
pub type OwnedTableReference = TableReference<'static>;

impl std::fmt::Display for TableReference<'_> {
Expand All @@ -85,14 +126,20 @@ impl<'a> TableReference<'a> {
None
}

/// Convenience method for creating a `Bare` variant of `TableReference`
/// Convenience method for creating a [`TableReference::Bare`]
///
/// As described on [`TableReference`] this does *NO* parsing at
/// all, so "Foo.Bar" stays as a reference to the table named
/// "Foo.Bar" (rather than "foo"."bar")
pub fn bare(table: impl Into<Cow<'a, str>>) -> TableReference<'a> {
TableReference::Bare {
table: table.into(),
}
}

/// Convenience method for creating a `Partial` variant of `TableReference`
/// Convenience method for creating a [`TableReference::Partial`].
///
/// As described on [`TableReference`] this does *NO* parsing at all.
pub fn partial(
schema: impl Into<Cow<'a, str>>,
table: impl Into<Cow<'a, str>>,
Expand All @@ -103,7 +150,9 @@ impl<'a> TableReference<'a> {
}
}

/// Convenience method for creating a `Full` variant of `TableReference`
/// Convenience method for creating a [`TableReference::Full`]
///
/// As described on [`TableReference`] this does *NO* parsing at all.
pub fn full(
catalog: impl Into<Cow<'a, str>>,
schema: impl Into<Cow<'a, str>>,
Expand Down Expand Up @@ -141,12 +190,12 @@ impl<'a> TableReference<'a> {
}
}

/// Compare with another `TableReference` as if both are resolved.
/// Compare with another [`TableReference`] as if both are resolved.
/// This allows comparing across variants, where if a field is not present
/// in both variants being compared then it is ignored in the comparison.
///
/// e.g. this allows a `TableReference::Bare` to be considered equal to a
/// fully qualified `TableReference::Full` if the table names match.
/// e.g. this allows a [`TableReference::Bare`] to be considered equal to a
/// fully qualified [`TableReference::Full`] if the table names match.
pub fn resolved_eq(&self, other: &Self) -> bool {
match self {
TableReference::Bare { table } => table == other.table(),
Expand Down Expand Up @@ -194,7 +243,8 @@ impl<'a> TableReference<'a> {
}
}

/// Converts directly into an [`OwnedTableReference`]
/// Converts directly into an [`OwnedTableReference`] by cloning
/// the underlying data.
pub fn to_owned_reference(&self) -> OwnedTableReference {
match self {
Self::Full {
Expand All @@ -217,6 +267,16 @@ impl<'a> TableReference<'a> {
}

/// Forms a string where the identifiers are quoted
///
/// # Example
/// ```
/// # use datafusion_common::TableReference;
/// let table_reference = TableReference::partial("myschema", "mytable");
/// assert_eq!(table_reference.to_quoted_string(), r#""myschema"."mytable""#);
///
/// let table_reference = TableReference::partial("MySchema", "MyTable");
/// assert_eq!(table_reference.to_quoted_string(), r#""MySchema"."MyTable""#);
/// ```
pub fn to_quoted_string(&self) -> String {
match self {
TableReference::Bare { table } => quote_identifier(table),
Expand All @@ -236,14 +296,8 @@ impl<'a> TableReference<'a> {
}
}

/// Forms a [`TableReference`] by attempting to parse `s` as a multipart identifier,
/// failing that then taking the entire unnormalized input as the identifier itself.
///
/// Will normalize (convert to lowercase) any unquoted identifiers.
///
/// e.g. `Foo` will be parsed as `foo`, and `"Foo"".bar"` will be parsed as
/// `Foo".bar` (note the preserved case and requiring two double quotes to represent
/// a single double quote in the identifier)
/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
/// identifier. See docs on [`TableReference`] for more details.
pub fn parse_str(s: &'a str) -> Self {
let mut parts = parse_identifiers_normalized(s);

Expand All @@ -265,7 +319,7 @@ impl<'a> TableReference<'a> {
}
}

/// Parse a `String` into a OwnedTableReference
/// Parse a `String` into a OwnedTableReference as a multipart SQL identifier.
impl From<String> for OwnedTableReference {
fn from(s: String) -> Self {
TableReference::parse_str(&s).to_owned_reference()
Expand Down
185 changes: 92 additions & 93 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,91 +344,70 @@ impl DataFrame {
// Collect a RecordBatch for each describe statistic
let describe_record_batch = vec![
// count aggregation
self.clone()
.aggregate(
vec![],
original_schema_fields
.clone()
.map(|f| count(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
)?
.collect()
.await?,
self.clone().aggregate(
vec![],
original_schema_fields
.clone()
.map(|f| count(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
// null_count aggregation
self.clone()
.aggregate(
vec![],
original_schema_fields
.clone()
.map(|f| count(is_null(col(f.name()))).alias(f.name()))
.collect::<Vec<_>>(),
)?
.collect()
.await?,
self.clone().aggregate(
vec![],
original_schema_fields
.clone()
.map(|f| count(is_null(col(f.name()))).alias(f.name()))
.collect::<Vec<_>>(),
),
// mean aggregation
self.clone()
.aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| f.data_type().is_numeric())
.map(|f| avg(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
)?
.collect()
.await?,
self.clone().aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| f.data_type().is_numeric())
.map(|f| avg(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
// std aggregation
self.clone()
.aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| f.data_type().is_numeric())
.map(|f| stddev(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
)?
.collect()
.await?,
self.clone().aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| f.data_type().is_numeric())
.map(|f| stddev(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
// min aggregation
self.clone()
.aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.map(|f| min(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
)?
.collect()
.await?,
self.clone().aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.map(|f| min(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
// max aggregation
self.clone()
.aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.map(|f| max(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
)?
.collect()
.await?,
self.clone().aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.map(|f| max(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
// median aggregation
self.clone()
.aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| f.data_type().is_numeric())
.map(|f| median(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
)?
.collect()
.await?,
self.clone().aggregate(
vec![],
original_schema_fields
.clone()
.filter(|f| f.data_type().is_numeric())
.map(|f| median(col(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
];

// first column with function names
Expand All @@ -437,24 +416,44 @@ impl DataFrame {
))];
for field in original_schema_fields {
let mut array_datas = vec![];
for record_batch in describe_record_batch.iter() {
// safe unwrap since aggregate record batches should have at least 1 record
let column = record_batch.get(0).unwrap().column_by_name(field.name());
match column {
Some(c) => {
if field.data_type().is_numeric() {
array_datas.push(cast(c, &DataType::Float64)?);
} else {
array_datas.push(cast(c, &DataType::Utf8)?);
for result in describe_record_batch.iter() {
let array_ref = match result {
Ok(df) => {
let batchs = df.clone().collect().await;
match batchs {
Ok(batchs)
if batchs.len() == 1
&& batchs[0]
.column_by_name(field.name())
.is_some() =>
{
let column =
batchs[0].column_by_name(field.name()).unwrap();
if field.data_type().is_numeric() {
cast(column, &DataType::Float64)?
} else {
cast(column, &DataType::Utf8)?
}
}
_ => Arc::new(StringArray::from_slice(["null"])),
}
}
// If None, it means the column cannot be min/max aggregated
None => {
array_datas.push(Arc::new(StringArray::from_slice(["null"])));
// Handle the planning error raised when only boolean/binary columns are present; other errors panic below
Err(err)
if err.to_string().contains(
"Error during planning: \
Aggregate requires at least one grouping \
or aggregate expression",
) =>
{
Arc::new(StringArray::from_slice(["null"]))
}
}
Err(other_err) => {
panic!("{other_err}")
}
};
array_datas.push(array_ref);
}

array_ref_vec.push(concat(
array_datas
.iter()
Expand Down
Loading

0 comments on commit ad38bfa

Please sign in to comment.