diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs
index 2a10d2825b..06fbf24654 100644
--- a/crates/aws/src/credentials.rs
+++ b/crates/aws/src/credentials.rs
@@ -4,7 +4,7 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
 
 use aws_config::default_provider::credentials::DefaultCredentialsChain;
 use aws_config::meta::credentials::CredentialsProviderChain;
diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs
index 54cebcfbd2..a421400791 100644
--- a/crates/core/src/delta_datafusion/expr.rs
+++ b/crates/core/src/delta_datafusion/expr.rs
@@ -28,7 +28,8 @@ use arrow_schema::{DataType, Field};
 use chrono::{DateTime, NaiveDate};
 use datafusion::execution::context::SessionState;
 use datafusion::execution::session_state::SessionStateBuilder;
-use datafusion::functions_array::make_array::MakeArray;
+use datafusion::functions_nested::make_array::MakeArray;
+use datafusion::functions_nested::planner::{FieldAccessPlanner, NestedFunctionPlanner};
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion_expr::expr::InList;
@@ -104,6 +105,7 @@ impl ScalarUDFImpl for MakeParquetArray {
             data_type = arg.data_type();
         }
 
+        #[allow(deprecated)]
         match self.actual.invoke(args)? {
             ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
                 let field = Arc::new(Field::new("element", data_type, true));
@@ -126,7 +128,7 @@ impl ScalarUDFImpl for MakeParquetArray {
     }
 
     fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
-        self.actual.invoke_no_args(number_rows)
+        self.actual.invoke_batch(&[], number_rows)
     }
 
     fn aliases(&self) -> &[String] {
@@ -142,9 +144,7 @@ impl ScalarUDFImpl for MakeParquetArray {
     }
 }
 
-use datafusion::functions_array::planner::{FieldAccessPlanner, NestedFunctionPlanner};
-
-/// This exists becxause the NestedFunctionPlanner _not_ the UserDefinedFunctionPlanner handles the
+/// This exists because the NestedFunctionPlanner, _not_ the UserDefinedFunctionPlanner, handles the
 /// insertion of "make_array" which is used to turn [100] into List
 ///
 /// **screaming intensifies**
@@ -505,7 +505,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
             ScalarValue::Date32(e) => match e {
                 Some(e) => write!(
                     f,
-                    "{}",
+                    "'{}'::date",
                     NaiveDate::from_num_days_from_ce_opt(EPOCH_DAYS_FROM_CE + (*e)).ok_or(Error)?
                 )?,
                 None => write!(f, "NULL")?,
@@ -567,8 +567,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
 #[cfg(test)]
 mod test {
     use arrow_schema::DataType as ArrowDataType;
-    use datafusion::functions_array::expr_fn::cardinality;
     use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
+    use datafusion::functions_nested::expr_fn::cardinality;
     use datafusion::prelude::SessionContext;
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
     use datafusion_expr::expr::ScalarFunction;
@@ -875,6 +875,18 @@ mod test {
                 )
             )),
         },
+        ParseTest {
+            expr: col("_date").eq(lit(ScalarValue::Date32(Some(18262)))),
+            expected: "_date = '2020-01-01'::date".to_string(),
+            override_expected_expr: Some(col("_date").eq(
+                Expr::Cast(
+                    Cast {
+                        expr: Box::from(lit("2020-01-01")),
+                        data_type: arrow_schema::DataType::Date32
+                    }
+                )
+            )),
+        },
     ];
 
     let session: SessionContext = DeltaSessionContext::default().into();
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index ae763f7b72..984754d510 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -252,9 +252,9 @@ impl StatsScalar {
         macro_rules! get_stat {
             ($val: expr) => {
                 if use_min {
-                    *$val.min()
+                    *$val.min_opt().unwrap()
                 } else {
-                    *$val.max()
+                    *$val.max_opt().unwrap()
                 }
             };
         }
@@ -304,10 +304,11 @@
             (Statistics::Double(v), _) => Ok(Self::Float64(get_stat!(v))),
             (Statistics::ByteArray(v), logical_type) => {
                 let bytes = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();
                 match logical_type {
                     None => Ok(Self::Bytes(bytes.to_vec())),
                     Some(LogicalType::String) => {
@@ -326,10 +327,11 @@
             }
             (Statistics::FixedLenByteArray(v), Some(LogicalType::Decimal { scale, precision })) => {
                 let val = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();
 
                 let val = if val.len() <= 16 {
                     i128::from_be_bytes(sign_extend_be(val)) as f64
@@ -356,10 +358,11 @@
             }
             (Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => {
                 let val = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();
 
                 if val.len() != 16 {
                     return Err(DeltaWriterError::StatsParsingFailed {
@@ -432,8 +435,8 @@ struct AggregatedStats {
 impl From<(&Statistics, &Option<LogicalType>)> for AggregatedStats {
     fn from(value: (&Statistics, &Option<LogicalType>)) -> Self {
         let (stats, logical_type) = value;
-        let null_count = stats.null_count();
-        if stats.has_min_max_set() {
+        let null_count = stats.null_count_opt().unwrap_or_default();
+        if stats.min_bytes_opt().is_some() && stats.max_bytes_opt().is_some() {
             let min = StatsScalar::try_from_stats(stats, logical_type, true).ok();
             let max = StatsScalar::try_from_stats(stats, logical_type, false).ok();
             Self {
diff --git a/crates/test/src/datafusion.rs b/crates/test/src/datafusion.rs
index f6357ab3b7..5ca73a742e 100644
--- a/crates/test/src/datafusion.rs
+++ b/crates/test/src/datafusion.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 
 pub fn context_with_delta_table_factory() -> SessionContext {
     let cfg = RuntimeConfig::new();
-    let env = RuntimeEnv::new(cfg).unwrap();
+    let env = RuntimeEnv::try_new(cfg).unwrap();
     let ses = SessionConfig::new();
     let mut state = SessionStateBuilder::new()
         .with_config(ses)
diff --git a/python/src/features.rs b/python/src/features.rs
index 155f7aa365..400cb91b33 100644
--- a/python/src/features.rs
+++ b/python/src/features.rs
@@ -2,8 +2,8 @@ use deltalake::kernel::TableFeatures as KernelTableFeatures;
 use pyo3::pyclass;
 
 /// High level table features
-#[pyclass]
-#[derive(Clone)]
+#[pyclass(eq, eq_int)]
+#[derive(Clone, PartialEq)]
 pub enum TableFeatures {
     /// Mapping of one column to another
     ColumnMapping,
diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs
index 453d05e480..116b1b0cf1 100644
--- a/python/src/filesystem.rs
+++ b/python/src/filesystem.rs
@@ -503,10 +503,12 @@ impl ObjectInputFile {
         Err(PyNotImplementedError::new_err("'truncate' not implemented"))
     }
 
+    #[pyo3(signature = (_size=None))]
     fn readline(&self, _size: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err("'readline' not implemented"))
     }
 
+    #[pyo3(signature = (_hint=None))]
     fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err(
             "'readlines' not implemented",
@@ -666,10 +668,12 @@ impl ObjectOutputStream {
         Err(PyNotImplementedError::new_err("'truncate' not implemented"))
     }
 
+    #[pyo3(signature = (_size=None))]
     fn readline(&self, _size: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err("'readline' not implemented"))
     }
 
+    #[pyo3(signature = (_hint=None))]
     fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err(
             "'readlines' not implemented",
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 5a9a3ce237..304faa00c7 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -287,6 +287,7 @@ impl RawDeltaTable {
         })
     }
 
+    #[pyo3(signature = (partition_filters=None))]
     pub fn files(
         &self,
         py: Python,
@@ -316,6 +317,7 @@ impl RawDeltaTable {
         })
     }
 
+    #[pyo3(signature = (partition_filters=None))]
     pub fn file_uris(
         &self,
         partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -828,6 +830,7 @@ impl RawDeltaTable {
     }
 
     /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
+    #[pyo3(signature = (limit=None))]
     pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
         let history = rt()
             .block_on(self._table.history(limit))
             .map_err(PythonError::from)?;
@@ -845,6 +848,7 @@
             .map_err(PythonError::from)?)
     }
 
+    #[pyo3(signature = (schema, partition_filters=None))]
     pub fn dataset_partitions<'py>(
         &mut self,
         py: Python<'py>,
@@ -876,6 +880,7 @@
             .collect()
     }
 
+    #[pyo3(signature = (partitions_filters=None))]
     fn get_active_partitions<'py>(
         &self,
         partitions_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -969,6 +974,7 @@
     }
 
     #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (add_actions, mode, partition_by, schema, partitions_filters=None, commit_properties=None, post_commithook_properties=None))]
     fn create_write_transaction(
         &mut self,
         py: Python,
@@ -1431,7 +1437,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyAny>> {
         Boolean(val) => val.to_object(py),
         Timestamp(_) => {
             // We need to manually append 'Z' add to end so that pyarrow can cast the
-            // the scalar value to pa.timestamp("us","UTC")
+            // scalar value to pa.timestamp("us","UTC")
             let value = value.serialize();
             format!("{}Z", value).to_object(py)
         }
@@ -1453,7 +1459,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyAny>> {
-        Array(val) => todo!("how should this be converted!"),
+        Array(_val) => todo!("how should this be converted!"),
     };
 
     Ok(val.into_bound(py))
@@ -1747,6 +1753,7 @@ pub struct PyCommitProperties {
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, data, mode, table=None, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
 fn write_to_deltalake(
     py: Python,
     table_uri: String,
@@ -1828,6 +1835,7 @@
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, partition_by, mode, raise_if_key_not_exists, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn create_deltalake(
     py: Python,
     table_uri: String,
@@ -1884,6 +1892,7 @@
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, add_actions, _mode, partition_by, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn write_new_deltalake(
     py: Python,
     table_uri: String,
@@ -1938,6 +1947,7 @@
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (uri, partition_schema=None, partition_strategy=None, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn convert_to_deltalake(
     py: Python,
     uri: String,
@@ -1992,6 +2002,7 @@
 }
 
 #[pyfunction]
+#[pyo3(signature = (table=None, configuration=None))]
 fn get_num_idx_cols_and_stats_columns(
     table: Option<&RawDeltaTable>,
     configuration: Option<HashMap<String, Option<String>>>,
diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py
index 2306e1668a..8645be538b 100644
--- a/python/tests/test_merge.py
+++ b/python/tests/test_merge.py
@@ -805,7 +805,10 @@ def test_merge_date_partitioned_2344(tmp_path: pathlib.Path):
     assert last_action["operation"] == "MERGE"
     assert result == data
 
-    assert last_action["operationParameters"].get("predicate") == "2022-02-01 = date"
+    assert (
+        last_action["operationParameters"].get("predicate")
+        == "'2022-02-01'::date = date"
+    )
 
 
 @pytest.mark.parametrize(
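
A note on the two patterns that recur throughout this patch, for reviewers.

First, pyo3 0.22 deprecates the implicit `None` default for trailing `Option<_>` arguments, which is why every affected `#[pyfunction]`/`#[pymethods]` entry point above gains an explicit `#[pyo3(signature = ...)]` attribute. A minimal sketch of the pattern, using a hypothetical function that is not part of this patch:

    use pyo3::prelude::*;

    // Without the signature attribute, pyo3 0.22 warns that `limit` will stop
    // defaulting to `None`; spelling the default out keeps the Python-side
    // behavior (`tail(values)` == `tail(values, None)`) explicit.
    #[pyfunction]
    #[pyo3(signature = (values, limit=None))]
    fn tail(values: Vec<i64>, limit: Option<usize>) -> Vec<i64> {
        let mut values = values;
        let n = limit.unwrap_or(values.len()).min(values.len());
        values.split_off(values.len() - n)
    }

Second, the writer-statistics changes track parquet's move from panicking accessors (`min()`, `max_bytes()`, `has_min_max_set()`) to `Option`-returning `*_opt` variants, so each call site now chooses between `.unwrap()` and `.unwrap_or_default()` rather than panicking inside the parquet crate when a column chunk carries no statistics. Roughly, as a hypothetical helper assuming the parquet crate is in scope:

    use parquet::file::statistics::Statistics;

    // Returns the raw min bytes, or an empty slice when the chunk has no
    // min/max statistics; the `Default` for `&[u8]` is the empty slice.
    fn min_or_empty(stats: &Statistics) -> &[u8] {
        stats.min_bytes_opt().unwrap_or_default()
    }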