From 60e419e83f2f9f9c0b93585080564abf46edffe5 Mon Sep 17 00:00:00 2001 From: helanto Date: Sun, 25 Aug 2024 17:23:08 +0300 Subject: [PATCH 1/6] chore: set max_retries in CommitProperties --- crates/core/src/operations/transaction/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6a601e0b73..dfce2eb4a8 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -346,6 +346,15 @@ impl CommitProperties { self } + /// Specify maximum number of times to retry the transaction before failing to commit + pub fn with_max_retries( + mut self, + max_retries: usize, + ) -> Self { + self.max_retries = max_retries; + self + } + /// Specify if it should create a checkpoint when the commit interval condition is met pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self { self.create_checkpoint = create_checkpoint; From a5c4937557ea0d11d10b6b25ae78e1cb6ffe2066 Mon Sep 17 00:00:00 2001 From: helanto Date: Wed, 28 Aug 2024 18:40:31 +0300 Subject: [PATCH 2/6] chore: format Rust code --- crates/core/src/operations/transaction/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index dfce2eb4a8..6c4e81dc63 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -347,10 +347,7 @@ impl CommitProperties { } /// Specify maximum number of times to retry the transaction before failing to commit - pub fn with_max_retries( - mut self, - max_retries: usize, - ) -> Self { + pub fn with_max_retries(mut self, max_retries: usize) -> Self { self.max_retries = max_retries; self } From f4123f9d2ed8d9d95de83fe32c197a6f12928470 Mon Sep 17 00:00:00 2001 From: helanto Date: Thu, 29 Aug 2024 14:41:58 +0300 Subject: [PATCH 3/6] feat: Python binding support --- python/deltalake/_internal.pyi | 14 ++++ python/deltalake/table.py | 43 +++++++++-- python/deltalake/writer.py | 7 ++ python/src/lib.rs | 133 +++++++++++++++++++++++---------- python/src/merge.rs | 9 ++- 5 files changed, 157 insertions(+), 49 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index ceac16e7f8..6bf80a8a03 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -59,6 +59,7 @@ class RawDeltaTable: enforce_retention_duration: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> List[str]: ... def compact_optimize( self, @@ -69,6 +70,7 @@ class RawDeltaTable: writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> str: ... def z_order_optimize( self, @@ -81,18 +83,21 @@ class RawDeltaTable: writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> str: ... def add_columns( self, fields: List[Field], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> None: ... def add_constraints( self, constraints: Dict[str, str], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> None: ... def drop_constraints( self, @@ -100,12 +105,14 @@ class RawDeltaTable: raise_if_not_exists: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> None: ... def set_table_properties( self, properties: Dict[str, str], raise_if_not_exists: bool, custom_metadata: Optional[Dict[str, str]], + max_commit_retries: Optional[int], ) -> None: ... def restore( self, @@ -113,6 +120,7 @@ class RawDeltaTable: ignore_missing_files: bool, protocol_downgrade_allowed: bool, custom_metadata: Optional[Dict[str, str]], + max_commit_retries: Optional[int], ) -> str: ... def history(self, limit: Optional[int]) -> List[str]: ... def update_incremental(self) -> None: ... @@ -127,12 +135,14 @@ class RawDeltaTable: writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> str: ... def repair( self, dry_run: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> str: ... def update( self, @@ -142,6 +152,7 @@ class RawDeltaTable: safe_cast: bool, custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> str: ... def create_merge_builder( self, @@ -153,6 +164,7 @@ class RawDeltaTable: custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], safe_cast: bool, + max_commit_retries: Optional[int], ) -> PyMergeBuilder: ... def merge_execute(self, merge_builder: PyMergeBuilder) -> str: ... def get_active_partitions( @@ -167,6 +179,7 @@ class RawDeltaTable: partitions_filters: Optional[FilterType], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> None: ... def cleanup_metadata(self) -> None: ... def check_can_write_timestamp_ntz(self, schema: pyarrow.Schema) -> None: ... @@ -208,6 +221,7 @@ def write_to_deltalake( writer_properties: Optional[WriterProperties], custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[PostCommitHookProperties], + max_commit_retries: Optional[int], ) -> None: ... def convert_to_deltalake( uri: str, diff --git a/python/deltalake/table.py b/python/deltalake/table.py index eefafc8d9e..cce5f5bcbd 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -744,6 +744,7 @@ def vacuum( enforce_retention_duration: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> List[str]: """ Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold. @@ -754,6 +755,7 @@ def vacuum( enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `delta.deletedFileRetentionDuration`. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the list of files no longer referenced by the Delta Table and are older than the retention threshold. """ @@ -767,6 +769,7 @@ def vacuum( enforce_retention_duration, custom_metadata, post_commithook_properties, + max_commit_retries, ) def update( @@ -780,6 +783,7 @@ def update( error_on_type_mismatch: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """`UPDATE` records in the Delta Table that matches an optional predicate. Either updates or new_values needs to be passed for it to execute. @@ -792,6 +796,7 @@ def update( error_on_type_mismatch: specify if update will return error if data types are mismatching :default = True custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from update @@ -871,6 +876,7 @@ def update( safe_cast=not error_on_type_mismatch, custom_metadata=custom_metadata, post_commithook_properties=post_commithook_properties, + max_commit_retries=max_commit_retries, ) return json.loads(metrics) @@ -913,6 +919,7 @@ def merge( large_dtypes: Optional[bool] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> "TableMerger": """Pass the source data which you want to merge on the target delta table, providing a predicate in SQL query like format. You can also specify on what to do when the underlying data types do not @@ -929,6 +936,7 @@ def merge( arrow_schema_conversion_mode: Large converts all types of data schema into Large Arrow types, passthrough keeps string/binary/list types untouched custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: TableMerger: TableMerger Object @@ -978,6 +986,7 @@ def merge( writer_properties=writer_properties, custom_metadata=custom_metadata, post_commithook_properties=post_commithook_properties, + max_commit_retries=max_commit_retries, ) return TableMerger(py_merge_builder, self._table) @@ -988,6 +997,7 @@ def restore( ignore_missing_files: bool = False, protocol_downgrade_allowed: bool = False, custom_metadata: Optional[Dict[str, str]] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Run the Restore command on the Delta Table: restore table to a given version or datetime. @@ -997,6 +1007,7 @@ def restore( ignore_missing_files: whether the operation carry on when some data files missing. protocol_downgrade_allowed: whether the operation when protocol version upgraded. custom_metadata: custom metadata that will be added to the transaction commit. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from restore. @@ -1007,6 +1018,7 @@ def restore( ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, custom_metadata=custom_metadata, + max_commit_retries=max_commit_retries, ) else: metrics = self._table.restore( @@ -1014,6 +1026,7 @@ def restore( ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, custom_metadata=custom_metadata, + max_commit_retries=max_commit_retries, ) return json.loads(metrics) @@ -1242,6 +1255,7 @@ def delete( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """Delete records from a Delta Table that statisfy a predicate. @@ -1255,12 +1269,13 @@ def delete( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from delete. """ metrics = self._table.delete( - predicate, writer_properties, custom_metadata, post_commithook_properties + predicate, writer_properties, custom_metadata, post_commithook_properties, max_commit_retries ) return json.loads(metrics) @@ -1269,6 +1284,7 @@ def repair( dry_run: bool = False, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removes them. This can be useful when there are accidental deletions or corrupted files. @@ -1281,6 +1297,7 @@ def repair( dry_run: when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: The metrics from repair (FSCK) action. @@ -1297,7 +1314,7 @@ def repair( ``` """ metrics = self._table.repair( - dry_run, custom_metadata, post_commithook_properties + dry_run, custom_metadata, post_commithook_properties, max_commit_retries ) return json.loads(metrics) @@ -1689,6 +1706,7 @@ def add_columns( fields: Union[DeltaField, List[DeltaField]], custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """Add new columns and/or update the fields of a stuctcolumn @@ -1696,6 +1714,7 @@ def add_columns( fields: fields to merge into schema custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1715,7 +1734,7 @@ def add_columns( fields = [fields] self.table._table.add_columns( - fields, custom_metadata, post_commithook_properties + fields, custom_metadata, post_commithook_properties, max_commit_retries ) def add_constraint( @@ -1723,6 +1742,7 @@ def add_constraint( constraints: Dict[str, str], custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """ Add constraints to the table. Limited to `single constraint` at once. @@ -1731,6 +1751,7 @@ def add_constraint( constraints: mapping of constraint name to SQL-expression to evaluate on write custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1754,7 +1775,7 @@ def add_constraint( ) self.table._table.add_constraints( - constraints, custom_metadata, post_commithook_properties + constraints, custom_metadata, post_commithook_properties, max_commit_retries ) def drop_constraint( @@ -1763,6 +1784,7 @@ def drop_constraint( raise_if_not_exists: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """ Drop constraints from a table. Limited to `single constraint` at once. @@ -1772,6 +1794,7 @@ def drop_constraint( raise_if_not_exists: set if should raise if not exists. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1793,7 +1816,7 @@ def drop_constraint( ``` """ self.table._table.drop_constraints( - name, raise_if_not_exists, custom_metadata, post_commithook_properties + name, raise_if_not_exists, custom_metadata, post_commithook_properties, max_commit_retries, ) def set_table_properties( @@ -1801,6 +1824,7 @@ def set_table_properties( properties: Dict[str, str], raise_if_not_exists: bool = True, custom_metadata: Optional[Dict[str, str]] = None, + max_commit_retries: Optional[int] = None, ) -> None: """ Set properties from the table. @@ -1809,6 +1833,7 @@ def set_table_properties( properties: properties which set raise_if_not_exists: set if should raise if not exists. custom_metadata: custom metadata that will be added to the transaction commit. + max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1826,7 +1851,7 @@ def set_table_properties( ``` """ self.table._table.set_table_properties( - properties, raise_if_not_exists, custom_metadata + properties, raise_if_not_exists, custom_metadata, max_commit_retries ) @@ -1845,6 +1870,7 @@ def compact( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -1869,6 +1895,7 @@ def compact( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from optimize @@ -1900,6 +1927,7 @@ def compact( writer_properties, custom_metadata, post_commithook_properties, + max_commit_retries, ) self.table.update_incremental() return json.loads(metrics) @@ -1915,6 +1943,7 @@ def z_order( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Reorders the data using a Z-order curve to improve data skipping. @@ -1937,6 +1966,7 @@ def z_order( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: custom metadata that will be added to the transaction commit. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from optimize @@ -1970,6 +2000,7 @@ def z_order( writer_properties, custom_metadata, post_commithook_properties, + max_commit_retries, ) self.table.update_incremental() return json.loads(metrics) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index e08d9cc9b8..c3cdd92473 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -124,6 +124,7 @@ def write_deltalake( engine: Literal["pyarrow"] = ..., custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -153,6 +154,7 @@ def write_deltalake( writer_properties: WriterProperties = ..., custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -184,6 +186,7 @@ def write_deltalake( writer_properties: WriterProperties = ..., custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -221,6 +224,7 @@ def write_deltalake( writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + max_commit_retries: Optional[int] = None, ) -> None: """Write to a Delta Lake table @@ -277,6 +281,7 @@ def write_deltalake( writer_properties: Pass writer properties to the Rust parquet writer. custom_metadata: Custom metadata to add to the commitInfo. post_commithook_properties: properties for the post commit hook. If None, default values are used. + max_commit_retries: maximum number of times to retry the transaction commit. """ table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: @@ -319,6 +324,7 @@ def write_deltalake( writer_properties=writer_properties, custom_metadata=custom_metadata, post_commithook_properties=post_commithook_properties, + max_commit_retries=max_commit_retries, ) if table: table.update_incremental() @@ -552,6 +558,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: partition_filters, custom_metadata, post_commithook_properties=post_commithook_properties, + max_commit_retries=max_commit_retries, ) table.update_incremental() else: diff --git a/python/src/lib.rs b/python/src/lib.rs index aeb1b3c429..3ed38dd8cd 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -321,7 +321,7 @@ impl RawDeltaTable { /// Run the Vacuum command on the Delta Table: list and delete files no longer referenced /// by the Delta table and are older than the retention threshold. - #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn vacuum( &mut self, py: Python, @@ -330,6 +330,7 @@ impl RawDeltaTable { enforce_retention_duration: bool, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult> { let (table, metrics) = py.allow_threads(|| { let mut cmd = VacuumBuilder::new( @@ -342,9 +343,11 @@ impl RawDeltaTable { cmd = cmd.with_retention_period(Duration::hours(retention_period as i64)); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } rt().block_on(cmd.into_future()).map_err(PythonError::from) @@ -354,7 +357,7 @@ impl RawDeltaTable { } /// Run the UPDATE command on the Delta Table - #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None, post_commithook_properties=None))] + #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None, post_commithook_properties=None, max_commit_retries=None))] #[allow(clippy::too_many_arguments)] pub fn update( &mut self, @@ -365,6 +368,7 @@ impl RawDeltaTable { safe_cast: bool, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = UpdateBuilder::new( @@ -387,9 +391,11 @@ impl RawDeltaTable { cmd = cmd.with_predicate(update_predicate); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -408,7 +414,8 @@ impl RawDeltaTable { min_commit_interval = None, writer_properties=None, custom_metadata=None, - post_commithook_properties=None + post_commithook_properties=None, + max_commit_retries=None, ))] #[allow(clippy::too_many_arguments)] pub fn compact_optimize( @@ -421,6 +428,7 @@ impl RawDeltaTable { writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -441,9 +449,11 @@ impl RawDeltaTable { ); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -468,7 +478,8 @@ impl RawDeltaTable { min_commit_interval = None, writer_properties=None, custom_metadata=None, - post_commithook_properties=None))] + post_commithook_properties=None, + max_commit_retries=None))] pub fn z_order_optimize( &mut self, py: Python, @@ -481,6 +492,7 @@ impl RawDeltaTable { writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -503,9 +515,11 @@ impl RawDeltaTable { ); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -520,13 +534,14 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } - #[pyo3(signature = (fields, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (fields, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn add_columns( &mut self, py: Python, fields: Vec, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = AddColumnBuilder::new( @@ -541,9 +556,11 @@ impl RawDeltaTable { cmd = cmd.with_fields(new_fields); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -553,13 +570,14 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (constraints, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (constraints, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn add_constraints( &mut self, py: Python, constraints: HashMap, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = ConstraintBuilder::new( @@ -571,9 +589,11 @@ impl RawDeltaTable { cmd = cmd.with_constraint(col_name.clone(), expression.clone()); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -583,7 +603,7 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn drop_constraints( &mut self, py: Python, @@ -591,6 +611,7 @@ impl RawDeltaTable { raise_if_not_exists: bool, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = DropConstraintBuilder::new( @@ -600,9 +621,11 @@ impl RawDeltaTable { .with_constraint(name) .with_raise_if_not_exists(raise_if_not_exists); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -689,6 +712,7 @@ impl RawDeltaTable { writer_properties = None, post_commithook_properties = None, custom_metadata = None, + max_commit_retries=None, ))] pub fn create_merge_builder( &self, @@ -701,6 +725,7 @@ impl RawDeltaTable { writer_properties: Option, post_commithook_properties: Option, custom_metadata: Option>, + max_commit_retries: Option, ) -> PyResult { py.allow_threads(|| { Ok(PyMergeBuilder::new( @@ -714,6 +739,7 @@ impl RawDeltaTable { writer_properties, post_commithook_properties, custom_metadata, + max_commit_retries, ) .map_err(PythonError::from)?) }) @@ -735,13 +761,14 @@ impl RawDeltaTable { } // Run the restore command on the Delta Table: restore table to a given version or datetime - #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, custom_metadata=None))] + #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, custom_metadata=None, max_commit_retries=None))] pub fn restore( &mut self, target: Option<&Bound<'_, PyAny>>, ignore_missing_files: bool, protocol_downgrade_allowed: bool, custom_metadata: Option>, + max_commit_retries: Option, ) -> PyResult { let mut cmd = RestoreBuilder::new( self._table.log_store(), @@ -763,7 +790,9 @@ impl RawDeltaTable { cmd = cmd.with_ignore_missing_files(ignore_missing_files); cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed); - if let Some(commit_properties) = maybe_create_commit_properties(custom_metadata, None) { + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, max_commit_retries, None) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -926,6 +955,7 @@ impl RawDeltaTable { partitions_filters: Option>, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult<()> { py.allow_threads(|| { let mode = mode.parse().map_err(PythonError::from)?; @@ -1016,6 +1046,10 @@ impl RawDeltaTable { commit_properties = commit_properties.with_metadata(json_metadata); }; + if let Some(max_retries) = max_commit_retries { + commit_properties = commit_properties.with_max_retries(max_retries); + }; + if let Some(post_commit_hook_props) = post_commithook_properties { commit_properties = set_post_commithook_properties(commit_properties, post_commit_hook_props) @@ -1088,7 +1122,7 @@ impl RawDeltaTable { .collect::>()) } /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. - #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None, post_commithook_properties=None))] + #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] pub fn delete( &mut self, py: Python, @@ -1096,6 +1130,7 @@ impl RawDeltaTable { writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = DeleteBuilder::new( @@ -1110,9 +1145,11 @@ impl RawDeltaTable { set_writer_properties(writer_props).map_err(PythonError::from)?, ); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -1122,12 +1159,13 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } - #[pyo3(signature = (properties, raise_if_not_exists, custom_metadata=None))] + #[pyo3(signature = (properties, raise_if_not_exists, custom_metadata=None, max_commit_retries=None))] pub fn set_table_properties( &mut self, properties: HashMap, raise_if_not_exists: bool, custom_metadata: Option>, + max_commit_retries: Option, ) -> PyResult<()> { let mut cmd = SetTablePropertiesBuilder::new( self._table.log_store(), @@ -1136,7 +1174,9 @@ impl RawDeltaTable { .with_properties(properties) .with_raise_if_not_exists(raise_if_not_exists); - if let Some(commit_properties) = maybe_create_commit_properties(custom_metadata, None) { + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, max_commit_retries, None) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -1149,12 +1189,13 @@ impl RawDeltaTable { /// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that /// have been deleted or are malformed - #[pyo3(signature = (dry_run = true, custom_metadata = None, post_commithook_properties=None))] + #[pyo3(signature = (dry_run = true, custom_metadata = None, post_commithook_properties=None, max_commit_retries=None))] pub fn repair( &mut self, dry_run: bool, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult { let mut cmd = FileSystemCheckBuilder::new( self._table.log_store(), @@ -1162,9 +1203,11 @@ impl RawDeltaTable { ) .with_dry_run(dry_run); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } @@ -1303,6 +1346,7 @@ fn convert_partition_filters( fn maybe_create_commit_properties( custom_metadata: Option>, + max_commit_retries: Option, post_commithook_properties: Option, ) -> Option { if custom_metadata.is_none() && post_commithook_properties.is_none() { @@ -1315,6 +1359,10 @@ fn maybe_create_commit_properties( commit_properties = commit_properties.with_metadata(json_metadata); }; + if let Some(max_retries) = max_commit_retries { + commit_properties = commit_properties.with_max_retries(max_retries); + }; + if let Some(post_commit_hook_props) = post_commithook_properties { commit_properties = set_post_commithook_properties(commit_properties, post_commit_hook_props) @@ -1610,6 +1658,7 @@ fn write_to_deltalake( writer_properties: Option, custom_metadata: Option>, post_commithook_properties: Option, + max_commit_retries: Option, ) -> PyResult<()> { py.allow_threads(|| { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); @@ -1667,6 +1716,10 @@ fn write_to_deltalake( commit_properties = commit_properties.with_metadata(json_metadata); }; + if let Some(max_retries) = max_commit_retries { + commit_properties = commit_properties.with_max_retries(max_retries); + }; + if let Some(post_commit_hook_props) = post_commithook_properties { commit_properties = set_post_commithook_properties(commit_properties, post_commit_hook_props) diff --git a/python/src/merge.rs b/python/src/merge.rs index 8fb1f044d8..fd5e95081c 100644 --- a/python/src/merge.rs +++ b/python/src/merge.rs @@ -44,6 +44,7 @@ impl PyMergeBuilder { writer_properties: Option, post_commithook_properties: Option, custom_metadata: Option>, + max_commit_retries: Option, ) -> DeltaResult { let ctx = SessionContext::new(); let schema = source.schema(); @@ -67,9 +68,11 @@ impl PyMergeBuilder { cmd = cmd.with_writer_properties(set_writer_properties(writer_props)?); } - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, post_commithook_properties) - { + if let Some(commit_properties) = maybe_create_commit_properties( + custom_metadata, + max_commit_retries, + post_commithook_properties, + ) { cmd = cmd.with_commit_properties(commit_properties); } Ok(Self { From e09035ee47250b9ad0031f38121d650ebc78349d Mon Sep 17 00:00:00 2001 From: helanto Date: Tue, 3 Sep 2024 09:37:52 +0300 Subject: [PATCH 4/6] feat: Add CommitProperties data class --- python/deltalake/table.py | 168 ++++++++++++++++++++-------------- python/deltalake/writer.py | 34 ++++--- python/tests/test_alter.py | 7 +- python/tests/test_delete.py | 5 +- python/tests/test_merge.py | 4 +- python/tests/test_optimize.py | 9 +- python/tests/test_repair.py | 4 +- python/tests/test_restore.py | 4 +- python/tests/test_update.py | 4 +- python/tests/test_vacuum.py | 4 +- 10 files changed, 147 insertions(+), 96 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index cce5f5bcbd..55998c19a1 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -148,6 +148,25 @@ def __init__( self.cleanup_expired_logs = cleanup_expired_logs +@dataclass(init=True) +class CommitProperties: + """The commit properties. Controls the behaviour of the commit.""" + + def __init__( + self, + custom_metadata: Optional[Dict[str, str]] = None, + max_commit_retries: Optional[int] = None, + ): + """Custom metadata to be stored in the commit. Controls the number of retries for the commit. + + Args: + custom_metadata: custom metadata that will be added to the transaction commit. + max_commit_retries: maximum number of times to retry the transaction commit. + """ + self.custom_metadata = custom_metadata + self.max_commit_retries = max_commit_retries + + @dataclass(init=True) class BloomFilterProperties: """The Bloom Filter Properties instance for the Rust parquet writer.""" @@ -742,9 +761,8 @@ def vacuum( retention_hours: Optional[int] = None, dry_run: bool = True, enforce_retention_duration: bool = True, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> List[str]: """ Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold. @@ -753,9 +771,8 @@ def vacuum( retention_hours: the retention threshold in hours, if none then the value from `delta.deletedFileRetentionDuration` is used or default of 1 week otherwise. dry_run: when activated, list only the files, delete otherwise enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `delta.deletedFileRetentionDuration`. - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Returns: the list of files no longer referenced by the Delta Table and are older than the retention threshold. """ @@ -767,9 +784,9 @@ def vacuum( dry_run, retention_hours, enforce_retention_duration, - custom_metadata, + commit_properties.custom_metadata if commit_properties else None, post_commithook_properties, - max_commit_retries, + commit_properties.max_commit_retries if commit_properties else None, ) def update( @@ -781,9 +798,8 @@ def update( predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, error_on_type_mismatch: bool = True, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """`UPDATE` records in the Delta Table that matches an optional predicate. Either updates or new_values needs to be passed for it to execute. @@ -794,9 +810,8 @@ def update( predicate: a logical expression. writer_properties: Pass writer properties to the Rust parquet writer. error_on_type_mismatch: specify if update will return error if data types are mismatching :default = True - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from update @@ -874,9 +889,13 @@ def update( predicate, writer_properties, safe_cast=not error_on_type_mismatch, - custom_metadata=custom_metadata, + custom_metadata=commit_properties.custom_metadata + if commit_properties + else None, post_commithook_properties=post_commithook_properties, - max_commit_retries=max_commit_retries, + max_commit_retries=commit_properties.max_commit_retries + if commit_properties + else None, ) return json.loads(metrics) @@ -917,9 +936,8 @@ def merge( error_on_type_mismatch: bool = True, writer_properties: Optional[WriterProperties] = None, large_dtypes: Optional[bool] = None, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> "TableMerger": """Pass the source data which you want to merge on the target delta table, providing a predicate in SQL query like format. You can also specify on what to do when the underlying data types do not @@ -934,9 +952,9 @@ def merge( writer_properties: Pass writer properties to the Rust parquet writer large_dtypes: Deprecated, will be removed in 1.0 arrow_schema_conversion_mode: Large converts all types of data schema into Large Arrow types, passthrough keeps string/binary/list types untouched - custom_metadata: custom metadata that will be added to the transaction commit. + custom_metadata: properties for the commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. + Returns: TableMerger: TableMerger Object @@ -984,9 +1002,13 @@ def merge( target_alias=target_alias, safe_cast=not error_on_type_mismatch, writer_properties=writer_properties, - custom_metadata=custom_metadata, + custom_metadata=commit_properties.custom_metadata + if commit_properties + else None, post_commithook_properties=post_commithook_properties, - max_commit_retries=max_commit_retries, + max_commit_retries=commit_properties.max_commit_retries + if commit_properties + else None, ) return TableMerger(py_merge_builder, self._table) @@ -996,8 +1018,7 @@ def restore( *, ignore_missing_files: bool = False, protocol_downgrade_allowed: bool = False, - custom_metadata: Optional[Dict[str, str]] = None, - max_commit_retries: Optional[int] = None, + commit_properties: Optional[CommitProperties] = None, ) -> Dict[str, Any]: """ Run the Restore command on the Delta Table: restore table to a given version or datetime. @@ -1006,8 +1027,7 @@ def restore( target: the expected version will restore, which represented by int, date str or datetime. ignore_missing_files: whether the operation carry on when some data files missing. protocol_downgrade_allowed: whether the operation when protocol version upgraded. - custom_metadata: custom metadata that will be added to the transaction commit. - max_commit_retries: maximum number of times to retry the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. Returns: the metrics from restore. @@ -1017,16 +1037,24 @@ def restore( target.isoformat(), ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, - custom_metadata=custom_metadata, - max_commit_retries=max_commit_retries, + custom_metadata=commit_properties.custom_metadata + if commit_properties + else None, + max_commit_retries=commit_properties.max_commit_retries + if commit_properties + else None, ) else: metrics = self._table.restore( target, ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, - custom_metadata=custom_metadata, - max_commit_retries=max_commit_retries, + custom_metadata=commit_properties.custom_metadata + if commit_properties + else None, + max_commit_retries=commit_properties.max_commit_retries + if commit_properties + else None, ) return json.loads(metrics) @@ -1253,9 +1281,8 @@ def delete( self, predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """Delete records from a Delta Table that statisfy a predicate. @@ -1267,24 +1294,26 @@ def delete( Args: predicate: a SQL where clause. If not passed, will delete all rows. writer_properties: Pass writer properties to the Rust parquet writer. - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from delete. """ metrics = self._table.delete( - predicate, writer_properties, custom_metadata, post_commithook_properties, max_commit_retries + predicate, + writer_properties, + commit_properties.custom_metadata if commit_properties else None, + post_commithook_properties, + commit_properties.max_commit_retries if commit_properties else None, ) return json.loads(metrics) def repair( self, dry_run: bool = False, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removes them. This can be useful when there are accidental deletions or corrupted files. @@ -1295,9 +1324,8 @@ def repair( Args: dry_run: when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False. - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Returns: The metrics from repair (FSCK) action. @@ -1314,7 +1342,10 @@ def repair( ``` """ metrics = self._table.repair( - dry_run, custom_metadata, post_commithook_properties, max_commit_retries + dry_run, + commit_properties.custom_metadata if commit_properties else None, + post_commithook_properties, + commit_properties.max_commit_retries if commit_properties else None, ) return json.loads(metrics) @@ -1704,17 +1735,15 @@ def __init__(self, table: DeltaTable) -> None: def add_columns( self, fields: Union[DeltaField, List[DeltaField]], - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> None: """Add new columns and/or update the fields of a stuctcolumn Args: fields: fields to merge into schema - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1734,24 +1763,25 @@ def add_columns( fields = [fields] self.table._table.add_columns( - fields, custom_metadata, post_commithook_properties, max_commit_retries + fields, + commit_properties.custom_metadata if commit_properties else None, + post_commithook_properties, + commit_properties.max_commit_retries if commit_properties else None, ) def add_constraint( self, constraints: Dict[str, str], - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> None: """ Add constraints to the table. Limited to `single constraint` at once. Args: constraints: mapping of constraint name to SQL-expression to evaluate on write - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1775,16 +1805,18 @@ def add_constraint( ) self.table._table.add_constraints( - constraints, custom_metadata, post_commithook_properties, max_commit_retries + constraints, + commit_properties.custom_metadata if commit_properties else None, + post_commithook_properties, + commit_properties.max_commit_retries if commit_properties else None, ) def drop_constraint( self, name: str, raise_if_not_exists: bool = True, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> None: """ Drop constraints from a table. Limited to `single constraint` at once. @@ -1792,9 +1824,8 @@ def drop_constraint( Args: name: constraint name which to drop. raise_if_not_exists: set if should raise if not exists. - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Example: ```python @@ -1816,15 +1847,18 @@ def drop_constraint( ``` """ self.table._table.drop_constraints( - name, raise_if_not_exists, custom_metadata, post_commithook_properties, max_commit_retries, + name, + raise_if_not_exists, + commit_properties.custom_metadata if commit_properties else None, + post_commithook_properties, + commit_properties.max_commit_retries if commit_properties else None, ) def set_table_properties( self, properties: Dict[str, str], raise_if_not_exists: bool = True, - custom_metadata: Optional[Dict[str, str]] = None, - max_commit_retries: Optional[int] = None, + commit_properties: Optional[CommitProperties] = None, ) -> None: """ Set properties from the table. @@ -1832,8 +1866,7 @@ def set_table_properties( Args: properties: properties which set raise_if_not_exists: set if should raise if not exists. - custom_metadata: custom metadata that will be added to the transaction commit. - max_commit_retries: maximum number of times to retry the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. Example: ```python @@ -1851,7 +1884,10 @@ def set_table_properties( ``` """ self.table._table.set_table_properties( - properties, raise_if_not_exists, custom_metadata, max_commit_retries + properties, + raise_if_not_exists, + commit_properties.custom_metadata if commit_properties else None, + commit_properties.max_commit_retries if commit_properties else None, ) @@ -1868,9 +1904,8 @@ def compact( max_concurrent_tasks: Optional[int] = None, min_commit_interval: Optional[Union[int, timedelta]] = None, writer_properties: Optional[WriterProperties] = None, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -1893,9 +1928,8 @@ def compact( created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you want a commit per partition. writer_properties: Pass writer properties to the Rust parquet writer. - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from optimize @@ -1925,9 +1959,9 @@ def compact( max_concurrent_tasks, min_commit_interval, writer_properties, - custom_metadata, + commit_properties.custom_metadata if commit_properties else None, post_commithook_properties, - max_commit_retries, + commit_properties.max_commit_retries if commit_properties else None, ) self.table.update_incremental() return json.loads(metrics) @@ -1941,9 +1975,8 @@ def z_order( max_spill_size: int = 20 * 1024 * 1024 * 1024, min_commit_interval: Optional[Union[int, timedelta]] = None, writer_properties: Optional[WriterProperties] = None, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> Dict[str, Any]: """ Reorders the data using a Z-order curve to improve data skipping. @@ -1964,9 +1997,8 @@ def z_order( created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you want a commit per partition. writer_properties: Pass writer properties to the Rust parquet writer. - custom_metadata: custom metadata that will be added to the transaction commit. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. Returns: the metrics from optimize @@ -1998,9 +2030,9 @@ def z_order( max_spill_size, min_commit_interval, writer_properties, - custom_metadata, + commit_properties.custom_metadata if commit_properties else None, post_commithook_properties, - max_commit_retries, + commit_properties.max_commit_retries if commit_properties else None, ) self.table.update_incremental() return json.loads(metrics) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index c3cdd92473..3ae5c5084e 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -52,6 +52,7 @@ MAX_SUPPORTED_PYARROW_WRITER_VERSION, NOT_SUPPORTED_PYARROW_WRITER_VERSIONS, SUPPORTED_WRITER_FEATURES, + CommitProperties, DeltaTable, PostCommitHookProperties, WriterProperties, @@ -122,9 +123,8 @@ def write_deltalake( partition_filters: Optional[List[Tuple[str, str, Any]]] = ..., large_dtypes: bool = ..., engine: Literal["pyarrow"] = ..., - custom_metadata: Optional[Dict[str, str]] = ..., + commit_properties: Optional[CommitProperties] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., - max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -152,9 +152,8 @@ def write_deltalake( large_dtypes: bool = ..., engine: Literal["rust"] = ..., writer_properties: WriterProperties = ..., - custom_metadata: Optional[Dict[str, str]] = ..., + commit_properties: Optional[CommitProperties] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., - max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -184,9 +183,8 @@ def write_deltalake( large_dtypes: bool = ..., engine: Literal["rust"] = ..., writer_properties: WriterProperties = ..., - custom_metadata: Optional[Dict[str, str]] = ..., + commit_properties: Optional[CommitProperties] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., - max_commit_retries: Optional[int] = ..., ) -> None: ... @@ -222,9 +220,8 @@ def write_deltalake( large_dtypes: bool = False, engine: Literal["pyarrow", "rust"] = "rust", writer_properties: Optional[WriterProperties] = None, - custom_metadata: Optional[Dict[str, str]] = None, + commit_properties: Optional[CommitProperties] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, - max_commit_retries: Optional[int] = None, ) -> None: """Write to a Delta Lake table @@ -279,9 +276,8 @@ def write_deltalake( large_dtypes: Only used for pyarrow engine engine: writer engine to write the delta table. PyArrow engine is deprecated, and will be removed in v1.0. writer_properties: Pass writer properties to the Rust parquet writer. - custom_metadata: Custom metadata to add to the commitInfo. + commit_properties: properties of the transaction commit. If None, default values are used. post_commithook_properties: properties for the post commit hook. If None, default values are used. - max_commit_retries: maximum number of times to retry the transaction commit. """ table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: @@ -322,9 +318,13 @@ def write_deltalake( configuration=configuration, storage_options=storage_options, writer_properties=writer_properties, - custom_metadata=custom_metadata, + custom_metadata=commit_properties.custom_metadata + if commit_properties + else None, post_commithook_properties=post_commithook_properties, - max_commit_retries=max_commit_retries, + max_commit_retries=commit_properties.max_commit_retries + if commit_properties + else None, ) if table: table.update_incremental() @@ -547,7 +547,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: description, configuration, storage_options, - custom_metadata, + commit_properties.custom_metadata if commit_properties else None, ) else: table._table.create_write_transaction( @@ -556,9 +556,13 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: partition_by or [], schema, partition_filters, - custom_metadata, + custom_metadata=commit_properties.custom_metadata + if commit_properties + else None, post_commithook_properties=post_commithook_properties, - max_commit_retries=max_commit_retries, + max_commit_retries=commit_properties.max_commit_retries + if commit_properties + else None, ) table.update_incremental() else: diff --git a/python/tests/test_alter.py b/python/tests/test_alter.py index b931939348..65ac7e07ac 100644 --- a/python/tests/test_alter.py +++ b/python/tests/test_alter.py @@ -7,6 +7,7 @@ from deltalake import DeltaTable, write_deltalake from deltalake.exceptions import DeltaError, DeltaProtocolError from deltalake.schema import Field, PrimitiveType, StructType +from deltalake.table import CommitProperties def test_add_constraint(tmp_path: pathlib.Path, sample_table: pa.Table): @@ -58,8 +59,9 @@ def test_add_constraint_roundtrip_metadata( dt = DeltaTable(tmp_path) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) dt.alter.add_constraint( - {"check_price2": "price >= 0"}, custom_metadata={"userName": "John Doe"} + {"check_price2": "price >= 0"}, commit_properties=commit_properties ) assert dt.history(1)[0]["userName"] == "John Doe" @@ -112,7 +114,8 @@ def test_drop_constraint_roundtrip_metadata( dt = DeltaTable(tmp_path) dt.alter.add_constraint({"check_price2": "price >= 0"}) - dt.alter.drop_constraint("check_price2", custom_metadata={"userName": "John Doe"}) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) + dt.alter.drop_constraint("check_price2", commit_properties=commit_properties) assert dt.history(1)[0]["userName"] == "John Doe" diff --git a/python/tests/test_delete.py b/python/tests/test_delete.py index 65b5ebdec3..9d93b9f95f 100644 --- a/python/tests/test_delete.py +++ b/python/tests/test_delete.py @@ -4,14 +4,15 @@ import pyarrow.compute as pc import pytest -from deltalake.table import DeltaTable +from deltalake.table import CommitProperties, DeltaTable from deltalake.writer import write_deltalake def test_delete_no_predicates(existing_table: DeltaTable): old_version = existing_table.version() - existing_table.delete(custom_metadata={"userName": "John Doe"}) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) + existing_table.delete(commit_properties=commit_properties) last_action = existing_table.history(1)[0] assert last_action["operation"] == "DELETE" diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index ea13adf85b..54c2726fd3 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -4,6 +4,7 @@ import pytest from deltalake import DeltaTable, write_deltalake +from deltalake.table import CommitProperties def test_merge_when_matched_delete_wo_predicate( @@ -20,12 +21,13 @@ def test_merge_when_matched_delete_wo_predicate( } ) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) dt.merge( source=source_table, predicate="t.id = s.id", source_alias="s", target_alias="t", - custom_metadata={"userName": "John Doe"}, + commit_properties=commit_properties, ).when_matched_delete().execute() nrows = 4 diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index 8cb0902dae..2c9685e116 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -5,6 +5,7 @@ import pytest from deltalake import DeltaTable, write_deltalake +from deltalake.table import CommitProperties @pytest.mark.parametrize("engine", ["pyarrow", "rust"]) @@ -39,7 +40,8 @@ def test_optimize_run_table( old_data = dt.to_pyarrow_table() old_version = dt.version() - dt.optimize.compact(custom_metadata={"userName": "John Doe"}) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) + dt.optimize.compact(commit_properties=commit_properties) new_data = dt.to_pyarrow_table() last_action = dt.history(1)[0] @@ -70,9 +72,8 @@ def test_z_order_optimize( dt = DeltaTable(tmp_path) old_version = dt.version() - dt.optimize.z_order( - ["date32", "timestamp"], custom_metadata={"userName": "John Doe"} - ) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) + dt.optimize.z_order(["date32", "timestamp"], commit_properties=commit_properties) last_action = dt.history(1)[0] assert last_action["operation"] == "OPTIMIZE" assert last_action["userName"] == "John Doe" diff --git a/python/tests/test_repair.py b/python/tests/test_repair.py index 1d4a6adfa8..634bcbd441 100644 --- a/python/tests/test_repair.py +++ b/python/tests/test_repair.py @@ -1,6 +1,7 @@ import os from deltalake import DeltaTable, write_deltalake +from deltalake.table import CommitProperties def test_repair_with_dry_run(tmp_path, sample_data): @@ -23,7 +24,8 @@ def test_repair_wo_dry_run(tmp_path, sample_data): dt = DeltaTable(tmp_path) os.remove(dt.file_uris()[0]) - metrics = dt.repair(dry_run=False, custom_metadata={"userName": "John Doe"}) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) + metrics = dt.repair(dry_run=False, commit_properties=commit_properties) last_action = dt.history(1)[0] assert len(metrics["files_removed"]) == 1 diff --git a/python/tests/test_restore.py b/python/tests/test_restore.py index 099b887726..47fd5c21de 100644 --- a/python/tests/test_restore.py +++ b/python/tests/test_restore.py @@ -5,6 +5,7 @@ import pytest from deltalake import DeltaTable, write_deltalake +from deltalake.table import CommitProperties @pytest.mark.parametrize("use_relative", [True, False]) @@ -24,7 +25,8 @@ def test_restore_with_version( dt = DeltaTable(table_path) old_version = dt.version() - dt.restore(1, custom_metadata={"userName": "John Doe"}) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) + dt.restore(1, commit_properties=commit_properties) last_action = dt.history(1)[0] assert last_action["operation"] == "RESTORE" assert last_action["userName"] == "John Doe" diff --git a/python/tests/test_update.py b/python/tests/test_update.py index 554cc276b5..3ae39dadae 100644 --- a/python/tests/test_update.py +++ b/python/tests/test_update.py @@ -4,6 +4,7 @@ import pytest from deltalake import DeltaTable, write_deltalake +from deltalake.table import CommitProperties @pytest.fixture() @@ -38,10 +39,11 @@ def test_update_with_predicate(tmp_path: pathlib.Path, sample_table: pa.Table): } ) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) dt.update( updates={"deleted": "True"}, predicate="price > 3", - custom_metadata={"userName": "John Doe"}, + commit_properties=commit_properties, ) result = dt.to_pyarrow_table() diff --git a/python/tests/test_vacuum.py b/python/tests/test_vacuum.py index 44c2195e17..6f1b1dd7c5 100644 --- a/python/tests/test_vacuum.py +++ b/python/tests/test_vacuum.py @@ -5,6 +5,7 @@ import pytest from deltalake import DeltaTable, write_deltalake +from deltalake.table import CommitProperties def test_vacuum_dry_run_simple_table(): @@ -72,11 +73,12 @@ def test_vacuum_transaction_log(tmp_path: pathlib.Path, sample_data: pa.Table): dt = DeltaTable(tmp_path) + commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"}) dt.vacuum( retention_hours=0, dry_run=False, enforce_retention_duration=False, - custom_metadata={"userName": "John Doe"}, + commit_properties=commit_properties, ) dt = DeltaTable(tmp_path) From 5c9a39ee06e962e597095893cbaa884a514b7797 Mon Sep 17 00:00:00 2001 From: helanto Date: Tue, 10 Sep 2024 10:42:36 +0300 Subject: [PATCH 5/6] chore: deprecation warning for commit_metadata --- python/deltalake/table.py | 175 ++++++++++++++++++++++++++++++------- python/deltalake/writer.py | 30 +++++-- 2 files changed, 163 insertions(+), 42 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 55998c19a1..72ffd9e3ec 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -761,8 +761,9 @@ def vacuum( retention_hours: Optional[int] = None, dry_run: bool = True, enforce_retention_duration: bool = True, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> List[str]: """ Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold. @@ -771,11 +772,19 @@ def vacuum( retention_hours: the retention threshold in hours, if none then the value from `delta.deletedFileRetentionDuration` is used or default of 1 week otherwise. dry_run: when activated, list only the files, delete otherwise enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `delta.deletedFileRetentionDuration`. - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Returns: the list of files no longer referenced by the Delta Table and are older than the retention threshold. """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if retention_hours: if retention_hours < 0: raise ValueError("The retention periods should be positive.") @@ -784,7 +793,7 @@ def vacuum( dry_run, retention_hours, enforce_retention_duration, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) @@ -798,8 +807,9 @@ def update( predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, error_on_type_mismatch: bool = True, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> Dict[str, Any]: """`UPDATE` records in the Delta Table that matches an optional predicate. Either updates or new_values needs to be passed for it to execute. @@ -810,8 +820,9 @@ def update( predicate: a logical expression. writer_properties: Pass writer properties to the Rust parquet writer. error_on_type_mismatch: specify if update will return error if data types are mismatching :default = True - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Returns: the metrics from update @@ -853,6 +864,13 @@ def update( {'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...} ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if updates is None and new_values is not None: updates = {} for key, value in new_values.items(): @@ -891,7 +909,7 @@ def update( safe_cast=not error_on_type_mismatch, custom_metadata=commit_properties.custom_metadata if commit_properties - else None, + else custom_metadata, post_commithook_properties=post_commithook_properties, max_commit_retries=commit_properties.max_commit_retries if commit_properties @@ -936,8 +954,9 @@ def merge( error_on_type_mismatch: bool = True, writer_properties: Optional[WriterProperties] = None, large_dtypes: Optional[bool] = None, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> "TableMerger": """Pass the source data which you want to merge on the target delta table, providing a predicate in SQL query like format. You can also specify on what to do when the underlying data types do not @@ -952,13 +971,20 @@ def merge( writer_properties: Pass writer properties to the Rust parquet writer large_dtypes: Deprecated, will be removed in 1.0 arrow_schema_conversion_mode: Large converts all types of data schema into Large Arrow types, passthrough keeps string/binary/list types untouched - custom_metadata: properties for the commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. - + commit_properties: properties for the commit. If None, default values are used. Returns: TableMerger: TableMerger Object """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if large_dtypes: warnings.warn( "large_dtypes is deprecated", @@ -1004,7 +1030,7 @@ def merge( writer_properties=writer_properties, custom_metadata=commit_properties.custom_metadata if commit_properties - else None, + else custom_metadata, post_commithook_properties=post_commithook_properties, max_commit_retries=commit_properties.max_commit_retries if commit_properties @@ -1018,6 +1044,7 @@ def restore( *, ignore_missing_files: bool = False, protocol_downgrade_allowed: bool = False, + custom_metadata: Optional[Dict[str, str]] = None, commit_properties: Optional[CommitProperties] = None, ) -> Dict[str, Any]: """ @@ -1027,11 +1054,19 @@ def restore( target: the expected version will restore, which represented by int, date str or datetime. ignore_missing_files: whether the operation carry on when some data files missing. protocol_downgrade_allowed: whether the operation when protocol version upgraded. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. commit_properties: properties of the transaction commit. If None, default values are used. Returns: the metrics from restore. """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if isinstance(target, datetime): metrics = self._table.restore( target.isoformat(), @@ -1039,7 +1074,7 @@ def restore( protocol_downgrade_allowed=protocol_downgrade_allowed, custom_metadata=commit_properties.custom_metadata if commit_properties - else None, + else custom_metadata, max_commit_retries=commit_properties.max_commit_retries if commit_properties else None, @@ -1051,7 +1086,7 @@ def restore( protocol_downgrade_allowed=protocol_downgrade_allowed, custom_metadata=commit_properties.custom_metadata if commit_properties - else None, + else custom_metadata, max_commit_retries=commit_properties.max_commit_retries if commit_properties else None, @@ -1281,8 +1316,9 @@ def delete( self, predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> Dict[str, Any]: """Delete records from a Delta Table that statisfy a predicate. @@ -1294,16 +1330,24 @@ def delete( Args: predicate: a SQL where clause. If not passed, will delete all rows. writer_properties: Pass writer properties to the Rust parquet writer. - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Returns: the metrics from delete. """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + metrics = self._table.delete( predicate, writer_properties, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) @@ -1312,8 +1356,9 @@ def delete( def repair( self, dry_run: bool = False, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> Dict[str, Any]: """Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removes them. This can be useful when there are accidental deletions or corrupted files. @@ -1324,8 +1369,9 @@ def repair( Args: dry_run: when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False. - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Returns: The metrics from repair (FSCK) action. @@ -1341,9 +1387,16 @@ def repair( {'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', 5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']} ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + metrics = self._table.repair( dry_run, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) @@ -1735,15 +1788,17 @@ def __init__(self, table: DeltaTable) -> None: def add_columns( self, fields: Union[DeltaField, List[DeltaField]], - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> None: """Add new columns and/or update the fields of a stuctcolumn Args: fields: fields to merge into schema - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Example: ```python @@ -1759,12 +1814,19 @@ def add_columns( ) ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if isinstance(fields, DeltaField): fields = [fields] self.table._table.add_columns( fields, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) @@ -1772,16 +1834,18 @@ def add_columns( def add_constraint( self, constraints: Dict[str, str], - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> None: """ Add constraints to the table. Limited to `single constraint` at once. Args: constraints: mapping of constraint name to SQL-expression to evaluate on write - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Example: ```python @@ -1798,6 +1862,13 @@ def add_constraint( {'delta.constraints.value_gt_5': 'value > 5'} ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if len(constraints.keys()) > 1: raise ValueError( """add_constraints is limited to a single constraint addition at once for now. @@ -1806,7 +1877,7 @@ def add_constraint( self.table._table.add_constraints( constraints, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) @@ -1815,8 +1886,9 @@ def drop_constraint( self, name: str, raise_if_not_exists: bool = True, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> None: """ Drop constraints from a table. Limited to `single constraint` at once. @@ -1824,8 +1896,9 @@ def drop_constraint( Args: name: constraint name which to drop. raise_if_not_exists: set if should raise if not exists. - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Example: ```python @@ -1846,10 +1919,17 @@ def drop_constraint( {} ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + self.table._table.drop_constraints( name, raise_if_not_exists, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) @@ -1858,6 +1938,7 @@ def set_table_properties( self, properties: Dict[str, str], raise_if_not_exists: bool = True, + custom_metadata: Optional[Dict[str, str]] = None, commit_properties: Optional[CommitProperties] = None, ) -> None: """ @@ -1866,6 +1947,7 @@ def set_table_properties( Args: properties: properties which set raise_if_not_exists: set if should raise if not exists. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. commit_properties: properties of the transaction commit. If None, default values are used. Example: @@ -1883,10 +1965,17 @@ def set_table_properties( dt.alter.set_table_properties({"delta.enableChangeDataFeed": "true"}) ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + self.table._table.set_table_properties( properties, raise_if_not_exists, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, commit_properties.max_commit_retries if commit_properties else None, ) @@ -1904,8 +1993,9 @@ def compact( max_concurrent_tasks: Optional[int] = None, min_commit_interval: Optional[Union[int, timedelta]] = None, writer_properties: Optional[WriterProperties] = None, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -1928,8 +2018,9 @@ def compact( created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you want a commit per partition. writer_properties: Pass writer properties to the Rust parquet writer. - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Returns: the metrics from optimize @@ -1950,6 +2041,13 @@ def compact( {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 1, 'numBatches': 2, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) @@ -1959,7 +2057,7 @@ def compact( max_concurrent_tasks, min_commit_interval, writer_properties, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) @@ -1975,8 +2073,9 @@ def z_order( max_spill_size: int = 20 * 1024 * 1024 * 1024, min_commit_interval: Optional[Union[int, timedelta]] = None, writer_properties: Optional[WriterProperties] = None, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> Dict[str, Any]: """ Reorders the data using a Z-order curve to improve data skipping. @@ -1997,8 +2096,9 @@ def z_order( created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you want a commit per partition. writer_properties: Pass writer properties to the Rust parquet writer. - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. Returns: the metrics from optimize @@ -2019,6 +2119,13 @@ def z_order( {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} ``` """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) @@ -2030,7 +2137,7 @@ def z_order( max_spill_size, min_commit_interval, writer_properties, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata if commit_properties else custom_metadata, post_commithook_properties, commit_properties.max_commit_retries if commit_properties else None, ) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 3ae5c5084e..fdadb59570 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -123,8 +123,9 @@ def write_deltalake( partition_filters: Optional[List[Tuple[str, str, Any]]] = ..., large_dtypes: bool = ..., engine: Literal["pyarrow"] = ..., - commit_properties: Optional[CommitProperties] = ..., + custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + commit_properties: Optional[CommitProperties] = ..., ) -> None: ... @@ -152,8 +153,9 @@ def write_deltalake( large_dtypes: bool = ..., engine: Literal["rust"] = ..., writer_properties: WriterProperties = ..., - commit_properties: Optional[CommitProperties] = ..., + custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + commit_properties: Optional[CommitProperties] = ..., ) -> None: ... @@ -183,8 +185,9 @@ def write_deltalake( large_dtypes: bool = ..., engine: Literal["rust"] = ..., writer_properties: WriterProperties = ..., - commit_properties: Optional[CommitProperties] = ..., + custom_metadata: Optional[Dict[str, str]] = ..., post_commithook_properties: Optional[PostCommitHookProperties] = ..., + commit_properties: Optional[CommitProperties] = ..., ) -> None: ... @@ -220,8 +223,9 @@ def write_deltalake( large_dtypes: bool = False, engine: Literal["pyarrow", "rust"] = "rust", writer_properties: Optional[WriterProperties] = None, - commit_properties: Optional[CommitProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, + commit_properties: Optional[CommitProperties] = None, ) -> None: """Write to a Delta Lake table @@ -276,9 +280,17 @@ def write_deltalake( large_dtypes: Only used for pyarrow engine engine: writer engine to write the delta table. PyArrow engine is deprecated, and will be removed in v1.0. writer_properties: Pass writer properties to the Rust parquet writer. - commit_properties: properties of the transaction commit. If None, default values are used. + custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead. post_commithook_properties: properties for the post commit hook. If None, default values are used. + commit_properties: properties of the transaction commit. If None, default values are used. """ + if custom_metadata: + warnings.warn( + "custom_metadata is deprecated, please use commit_properties instead.", + category=DeprecationWarning, + stacklevel=2, + ) + table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: storage_options = table._storage_options or {} @@ -320,7 +332,7 @@ def write_deltalake( writer_properties=writer_properties, custom_metadata=commit_properties.custom_metadata if commit_properties - else None, + else custom_metadata, post_commithook_properties=post_commithook_properties, max_commit_retries=commit_properties.max_commit_retries if commit_properties @@ -547,7 +559,9 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: description, configuration, storage_options, - commit_properties.custom_metadata if commit_properties else None, + commit_properties.custom_metadata + if commit_properties + else custom_metadata, ) else: table._table.create_write_transaction( @@ -558,7 +572,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: partition_filters, custom_metadata=commit_properties.custom_metadata if commit_properties - else None, + else custom_metadata, post_commithook_properties=post_commithook_properties, max_commit_retries=commit_properties.max_commit_retries if commit_properties From e99e64f571bd948e0c616b1ee4d8f2e277a543e9 Mon Sep 17 00:00:00 2001 From: helanto Date: Wed, 11 Sep 2024 19:59:30 +0300 Subject: [PATCH 6/6] chore: Create pyo3 class for CommitProperties --- python/deltalake/__init__.py | 1 + python/deltalake/_internal.pyi | 49 +++---- python/deltalake/table.py | 102 ++++++++------ python/deltalake/writer.py | 18 +-- python/src/lib.rs | 234 +++++++++++++-------------------- python/src/merge.rs | 15 +-- 6 files changed, 188 insertions(+), 231 deletions(-) diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py index fda126d2e6..981fda53c0 100644 --- a/python/deltalake/__init__.py +++ b/python/deltalake/__init__.py @@ -10,6 +10,7 @@ from .table import ( ColumnProperties as ColumnProperties, ) +from .table import CommitProperties as CommitProperties from .table import DeltaTable as DeltaTable from .table import Metadata as Metadata from .table import PostCommitHookProperties as PostCommitHookProperties diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 6bf80a8a03..b7fc21f484 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -3,7 +3,12 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union import pyarrow import pyarrow.fs as fs -from deltalake.writer import AddAction, PostCommitHookProperties, WriterProperties +from deltalake.writer import ( + AddAction, + CommitProperties, + PostCommitHookProperties, + WriterProperties, +) __version__: str @@ -57,9 +62,8 @@ class RawDeltaTable: dry_run: bool, retention_hours: Optional[int], enforce_retention_duration: bool, - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> List[str]: ... def compact_optimize( self, @@ -68,9 +72,8 @@ class RawDeltaTable: max_concurrent_tasks: Optional[int], min_commit_interval: Optional[int], writer_properties: Optional[WriterProperties], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> str: ... def z_order_optimize( self, @@ -81,46 +84,40 @@ class RawDeltaTable: max_spill_size: Optional[int], min_commit_interval: Optional[int], writer_properties: Optional[WriterProperties], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> str: ... def add_columns( self, fields: List[Field], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> None: ... def add_constraints( self, constraints: Dict[str, str], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> None: ... def drop_constraints( self, name: str, raise_if_not_exists: bool, - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> None: ... def set_table_properties( self, properties: Dict[str, str], raise_if_not_exists: bool, - custom_metadata: Optional[Dict[str, str]], - max_commit_retries: Optional[int], + commit_properties: Optional[CommitProperties], ) -> None: ... def restore( self, target: Optional[Any], ignore_missing_files: bool, protocol_downgrade_allowed: bool, - custom_metadata: Optional[Dict[str, str]], - max_commit_retries: Optional[int], + commit_properties: Optional[CommitProperties], ) -> str: ... def history(self, limit: Optional[int]) -> List[str]: ... def update_incremental(self) -> None: ... @@ -133,16 +130,14 @@ class RawDeltaTable: self, predicate: Optional[str], writer_properties: Optional[WriterProperties], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> str: ... def repair( self, dry_run: bool, - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> str: ... def update( self, @@ -150,9 +145,8 @@ class RawDeltaTable: predicate: Optional[str], writer_properties: Optional[WriterProperties], safe_cast: bool, - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> str: ... def create_merge_builder( self, @@ -161,10 +155,9 @@ class RawDeltaTable: source_alias: Optional[str], target_alias: Optional[str], writer_properties: Optional[WriterProperties], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], safe_cast: bool, - max_commit_retries: Optional[int], ) -> PyMergeBuilder: ... def merge_execute(self, merge_builder: PyMergeBuilder) -> str: ... def get_active_partitions( @@ -177,9 +170,8 @@ class RawDeltaTable: partition_by: List[str], schema: pyarrow.Schema, partitions_filters: Optional[FilterType], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> None: ... def cleanup_metadata(self) -> None: ... def check_can_write_timestamp_ntz(self, schema: pyarrow.Schema) -> None: ... @@ -219,9 +211,8 @@ def write_to_deltalake( configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], writer_properties: Optional[WriterProperties], - custom_metadata: Optional[Dict[str, str]], + commit_properties: Optional[CommitProperties], post_commithook_properties: Optional[PostCommitHookProperties], - max_commit_retries: Optional[int], ) -> None: ... def convert_to_deltalake( uri: str, diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 72ffd9e3ec..11a8baa0dd 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -167,6 +167,17 @@ def __init__( self.max_commit_retries = max_commit_retries +def _commit_properties_from_custom_metadata( + maybe_properties: Optional[CommitProperties], custom_metadata: Dict[str, str] +) -> CommitProperties: + if maybe_properties is not None: + if maybe_properties.custom_metadata is None: + maybe_properties.custom_metadata = custom_metadata + return maybe_properties + return maybe_properties + return CommitProperties(custom_metadata=custom_metadata) + + @dataclass(init=True) class BloomFilterProperties: """The Bloom Filter Properties instance for the Rust parquet writer.""" @@ -784,6 +795,9 @@ def vacuum( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if retention_hours: if retention_hours < 0: @@ -793,9 +807,8 @@ def vacuum( dry_run, retention_hours, enforce_retention_duration, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) def update( @@ -870,6 +883,9 @@ def update( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if updates is None and new_values is not None: updates = {} @@ -907,13 +923,8 @@ def update( predicate, writer_properties, safe_cast=not error_on_type_mismatch, - custom_metadata=commit_properties.custom_metadata - if commit_properties - else custom_metadata, + commit_properties=commit_properties, post_commithook_properties=post_commithook_properties, - max_commit_retries=commit_properties.max_commit_retries - if commit_properties - else None, ) return json.loads(metrics) @@ -984,6 +995,9 @@ def merge( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if large_dtypes: warnings.warn( @@ -1028,13 +1042,8 @@ def merge( target_alias=target_alias, safe_cast=not error_on_type_mismatch, writer_properties=writer_properties, - custom_metadata=commit_properties.custom_metadata - if commit_properties - else custom_metadata, + commit_properties=commit_properties, post_commithook_properties=post_commithook_properties, - max_commit_retries=commit_properties.max_commit_retries - if commit_properties - else None, ) return TableMerger(py_merge_builder, self._table) @@ -1066,30 +1075,23 @@ def restore( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if isinstance(target, datetime): metrics = self._table.restore( target.isoformat(), ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, - custom_metadata=commit_properties.custom_metadata - if commit_properties - else custom_metadata, - max_commit_retries=commit_properties.max_commit_retries - if commit_properties - else None, + commit_properties=commit_properties, ) else: metrics = self._table.restore( target, ignore_missing_files=ignore_missing_files, protocol_downgrade_allowed=protocol_downgrade_allowed, - custom_metadata=commit_properties.custom_metadata - if commit_properties - else custom_metadata, - max_commit_retries=commit_properties.max_commit_retries - if commit_properties - else None, + commit_properties=commit_properties, ) return json.loads(metrics) @@ -1343,13 +1345,15 @@ def delete( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) metrics = self._table.delete( predicate, writer_properties, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) return json.loads(metrics) @@ -1393,12 +1397,14 @@ def repair( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) metrics = self._table.repair( dry_run, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) return json.loads(metrics) @@ -1820,15 +1826,17 @@ def add_columns( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if isinstance(fields, DeltaField): fields = [fields] self.table._table.add_columns( fields, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) def add_constraint( @@ -1868,6 +1876,9 @@ def add_constraint( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if len(constraints.keys()) > 1: raise ValueError( @@ -1877,9 +1888,8 @@ def add_constraint( self.table._table.add_constraints( constraints, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) def drop_constraint( @@ -1925,13 +1935,15 @@ def drop_constraint( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) self.table._table.drop_constraints( name, raise_if_not_exists, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) def set_table_properties( @@ -1971,12 +1983,14 @@ def set_table_properties( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) self.table._table.set_table_properties( properties, raise_if_not_exists, - commit_properties.custom_metadata if commit_properties else custom_metadata, - commit_properties.max_commit_retries if commit_properties else None, + commit_properties, ) @@ -2047,6 +2061,9 @@ def compact( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) @@ -2057,9 +2074,8 @@ def compact( max_concurrent_tasks, min_commit_interval, writer_properties, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) self.table.update_incremental() return json.loads(metrics) @@ -2125,6 +2141,9 @@ def z_order( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) @@ -2137,9 +2156,8 @@ def z_order( max_spill_size, min_commit_interval, writer_properties, - commit_properties.custom_metadata if commit_properties else custom_metadata, + commit_properties, post_commithook_properties, - commit_properties.max_commit_retries if commit_properties else None, ) self.table.update_incremental() return json.loads(metrics) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index fdadb59570..535a6e7a13 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -56,6 +56,7 @@ DeltaTable, PostCommitHookProperties, WriterProperties, + _commit_properties_from_custom_metadata, ) try: @@ -290,6 +291,9 @@ def write_deltalake( category=DeprecationWarning, stacklevel=2, ) + commit_properties = _commit_properties_from_custom_metadata( + commit_properties, custom_metadata + ) table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: @@ -330,13 +334,8 @@ def write_deltalake( configuration=configuration, storage_options=storage_options, writer_properties=writer_properties, - custom_metadata=commit_properties.custom_metadata - if commit_properties - else custom_metadata, + commit_properties=commit_properties, post_commithook_properties=post_commithook_properties, - max_commit_retries=commit_properties.max_commit_retries - if commit_properties - else None, ) if table: table.update_incremental() @@ -570,13 +569,8 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: partition_by or [], schema, partition_filters, - custom_metadata=commit_properties.custom_metadata - if commit_properties - else custom_metadata, + commit_properties=commit_properties, post_commithook_properties=post_commithook_properties, - max_commit_retries=commit_properties.max_commit_retries - if commit_properties - else None, ) table.update_incremental() else: diff --git a/python/src/lib.rs b/python/src/lib.rs index 3ed38dd8cd..fc1c18c880 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -321,16 +321,15 @@ impl RawDeltaTable { /// Run the Vacuum command on the Delta Table: list and delete files no longer referenced /// by the Delta table and are older than the retention threshold. - #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] + #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, commit_properties=None, post_commithook_properties=None))] pub fn vacuum( &mut self, py: Python, dry_run: bool, retention_hours: Option, enforce_retention_duration: bool, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult> { let (table, metrics) = py.allow_threads(|| { let mut cmd = VacuumBuilder::new( @@ -343,11 +342,9 @@ impl RawDeltaTable { cmd = cmd.with_retention_period(Duration::hours(retention_period as i64)); } - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } rt().block_on(cmd.into_future()).map_err(PythonError::from) @@ -357,7 +354,7 @@ impl RawDeltaTable { } /// Run the UPDATE command on the Delta Table - #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None, post_commithook_properties=None, max_commit_retries=None))] + #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, commit_properties = None, post_commithook_properties=None))] #[allow(clippy::too_many_arguments)] pub fn update( &mut self, @@ -366,9 +363,8 @@ impl RawDeltaTable { predicate: Option, writer_properties: Option, safe_cast: bool, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = UpdateBuilder::new( @@ -391,11 +387,9 @@ impl RawDeltaTable { cmd = cmd.with_predicate(update_predicate); } - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -413,9 +407,8 @@ impl RawDeltaTable { max_concurrent_tasks = None, min_commit_interval = None, writer_properties=None, - custom_metadata=None, - post_commithook_properties=None, - max_commit_retries=None, + commit_properties=None, + post_commithook_properties=None ))] #[allow(clippy::too_many_arguments)] pub fn compact_optimize( @@ -426,9 +419,8 @@ impl RawDeltaTable { max_concurrent_tasks: Option, min_commit_interval: Option, writer_properties: Option, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -449,11 +441,9 @@ impl RawDeltaTable { ); } - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -477,9 +467,8 @@ impl RawDeltaTable { max_spill_size = 20 * 1024 * 1024 * 1024, min_commit_interval = None, writer_properties=None, - custom_metadata=None, - post_commithook_properties=None, - max_commit_retries=None))] + commit_properties=None, + post_commithook_properties=None))] pub fn z_order_optimize( &mut self, py: Python, @@ -490,9 +479,8 @@ impl RawDeltaTable { max_spill_size: usize, min_commit_interval: Option, writer_properties: Option, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = OptimizeBuilder::new( @@ -515,11 +503,9 @@ impl RawDeltaTable { ); } - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -534,14 +520,13 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } - #[pyo3(signature = (fields, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] + #[pyo3(signature = (fields, commit_properties=None, post_commithook_properties=None))] pub fn add_columns( &mut self, py: Python, fields: Vec, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = AddColumnBuilder::new( @@ -556,11 +541,9 @@ impl RawDeltaTable { cmd = cmd.with_fields(new_fields); - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -570,14 +553,13 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (constraints, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] + #[pyo3(signature = (constraints, commit_properties=None, post_commithook_properties=None))] pub fn add_constraints( &mut self, py: Python, constraints: HashMap, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = ConstraintBuilder::new( @@ -589,11 +571,9 @@ impl RawDeltaTable { cmd = cmd.with_constraint(col_name.clone(), expression.clone()); } - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -603,15 +583,14 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] + #[pyo3(signature = (name, raise_if_not_exists, commit_properties=None, post_commithook_properties=None))] pub fn drop_constraints( &mut self, py: Python, name: String, raise_if_not_exists: bool, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { let mut cmd = DropConstraintBuilder::new( @@ -621,11 +600,9 @@ impl RawDeltaTable { .with_constraint(name) .with_raise_if_not_exists(raise_if_not_exists); - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -711,8 +688,7 @@ impl RawDeltaTable { safe_cast = false, writer_properties = None, post_commithook_properties = None, - custom_metadata = None, - max_commit_retries=None, + commit_properties = None, ))] pub fn create_merge_builder( &self, @@ -724,8 +700,7 @@ impl RawDeltaTable { safe_cast: bool, writer_properties: Option, post_commithook_properties: Option, - custom_metadata: Option>, - max_commit_retries: Option, + commit_properties: Option, ) -> PyResult { py.allow_threads(|| { Ok(PyMergeBuilder::new( @@ -738,8 +713,7 @@ impl RawDeltaTable { safe_cast, writer_properties, post_commithook_properties, - custom_metadata, - max_commit_retries, + commit_properties, ) .map_err(PythonError::from)?) }) @@ -761,14 +735,13 @@ impl RawDeltaTable { } // Run the restore command on the Delta Table: restore table to a given version or datetime - #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, custom_metadata=None, max_commit_retries=None))] + #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None))] pub fn restore( &mut self, target: Option<&Bound<'_, PyAny>>, ignore_missing_files: bool, protocol_downgrade_allowed: bool, - custom_metadata: Option>, - max_commit_retries: Option, + commit_properties: Option, ) -> PyResult { let mut cmd = RestoreBuilder::new( self._table.log_store(), @@ -790,9 +763,7 @@ impl RawDeltaTable { cmd = cmd.with_ignore_missing_files(ignore_missing_files); cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, max_commit_retries, None) - { + if let Some(commit_properties) = maybe_create_commit_properties(commit_properties, None) { cmd = cmd.with_commit_properties(commit_properties); } @@ -953,9 +924,8 @@ impl RawDeltaTable { partition_by: Vec, schema: PyArrowType, partitions_filters: Option>, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult<()> { py.allow_threads(|| { let mode = mode.parse().map_err(PythonError::from)?; @@ -1039,24 +1009,25 @@ impl RawDeltaTable { predicate: None, }; - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(max_retries) = max_commit_retries { - commit_properties = commit_properties.with_max_retries(max_retries); - }; + let mut properties = CommitProperties::default(); + if let Some(props) = commit_properties { + if let Some(metadata) = props.custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + properties = properties.with_metadata(json_metadata); + }; + + if let Some(max_retries) = props.max_commit_retries { + properties = properties.with_max_retries(max_retries); + }; + } if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) + properties = set_post_commithook_properties(properties, post_commit_hook_props) } rt().block_on( - CommitBuilder::from(commit_properties) + CommitBuilder::from(properties) .with_actions(actions) .build( Some(self._table.snapshot().map_err(PythonError::from)?), @@ -1122,15 +1093,14 @@ impl RawDeltaTable { .collect::>()) } /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. - #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None, post_commithook_properties=None, max_commit_retries=None))] + #[pyo3(signature = (predicate = None, writer_properties=None, commit_properties=None, post_commithook_properties=None))] pub fn delete( &mut self, py: Python, predicate: Option, writer_properties: Option, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { let mut cmd = DeleteBuilder::new( @@ -1145,11 +1115,9 @@ impl RawDeltaTable { set_writer_properties(writer_props).map_err(PythonError::from)?, ); } - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -1159,13 +1127,12 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } - #[pyo3(signature = (properties, raise_if_not_exists, custom_metadata=None, max_commit_retries=None))] + #[pyo3(signature = (properties, raise_if_not_exists, commit_properties=None))] pub fn set_table_properties( &mut self, properties: HashMap, raise_if_not_exists: bool, - custom_metadata: Option>, - max_commit_retries: Option, + commit_properties: Option, ) -> PyResult<()> { let mut cmd = SetTablePropertiesBuilder::new( self._table.log_store(), @@ -1174,9 +1141,7 @@ impl RawDeltaTable { .with_properties(properties) .with_raise_if_not_exists(raise_if_not_exists); - if let Some(commit_properties) = - maybe_create_commit_properties(custom_metadata, max_commit_retries, None) - { + if let Some(commit_properties) = maybe_create_commit_properties(commit_properties, None) { cmd = cmd.with_commit_properties(commit_properties); } @@ -1189,13 +1154,12 @@ impl RawDeltaTable { /// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that /// have been deleted or are malformed - #[pyo3(signature = (dry_run = true, custom_metadata = None, post_commithook_properties=None, max_commit_retries=None))] + #[pyo3(signature = (dry_run = true, commit_properties = None, post_commithook_properties=None))] pub fn repair( &mut self, dry_run: bool, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult { let mut cmd = FileSystemCheckBuilder::new( self._table.log_store(), @@ -1203,11 +1167,9 @@ impl RawDeltaTable { ) .with_dry_run(dry_run); - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -1345,23 +1307,25 @@ fn convert_partition_filters( } fn maybe_create_commit_properties( - custom_metadata: Option>, - max_commit_retries: Option, + maybe_commit_properties: Option, post_commithook_properties: Option, ) -> Option { - if custom_metadata.is_none() && post_commithook_properties.is_none() { + if maybe_commit_properties.is_none() && post_commithook_properties.is_none() { return None; } let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - if let Some(max_retries) = max_commit_retries { - commit_properties = commit_properties.with_max_retries(max_retries); - }; + if let Some(commit_props) = maybe_commit_properties { + if let Some(metadata) = commit_props.custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + commit_properties = commit_properties.with_metadata(json_metadata); + }; + + if let Some(max_retries) = commit_props.max_commit_retries { + commit_properties = commit_properties.with_max_retries(max_retries); + }; + } if let Some(post_commit_hook_props) = post_commithook_properties { commit_properties = @@ -1639,6 +1603,12 @@ pub struct PyPostCommitHookProperties { cleanup_expired_logs: Option, } +#[derive(FromPyObject)] +pub struct PyCommitProperties { + custom_metadata: Option>, + max_commit_retries: Option, +} + #[pyfunction] #[allow(clippy::too_many_arguments)] fn write_to_deltalake( @@ -1656,9 +1626,8 @@ fn write_to_deltalake( configuration: Option>>, storage_options: Option>, writer_properties: Option, - custom_metadata: Option>, + commit_properties: Option, post_commithook_properties: Option, - max_commit_retries: Option, ) -> PyResult<()> { py.allow_threads(|| { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); @@ -1708,24 +1677,11 @@ fn write_to_deltalake( builder = builder.with_configuration(config); }; - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(max_retries) = max_commit_retries { - commit_properties = commit_properties.with_max_retries(max_retries); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { builder = builder.with_commit_properties(commit_properties); - } + }; rt().block_on(builder.into_future()) .map_err(PythonError::from)?; diff --git a/python/src/merge.rs b/python/src/merge.rs index fd5e95081c..e1e427f46d 100644 --- a/python/src/merge.rs +++ b/python/src/merge.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use crate::error::PythonError; use crate::utils::rt; use crate::{ - maybe_create_commit_properties, set_writer_properties, PyPostCommitHookProperties, - PyWriterProperties, + maybe_create_commit_properties, set_writer_properties, PyCommitProperties, + PyPostCommitHookProperties, PyWriterProperties, }; #[pyclass(module = "deltalake._internal")] @@ -43,8 +43,7 @@ impl PyMergeBuilder { safe_cast: bool, writer_properties: Option, post_commithook_properties: Option, - custom_metadata: Option>, - max_commit_retries: Option, + commit_properties: Option, ) -> DeltaResult { let ctx = SessionContext::new(); let schema = source.schema(); @@ -68,11 +67,9 @@ impl PyMergeBuilder { cmd = cmd.with_writer_properties(set_writer_properties(writer_props)?); } - if let Some(commit_properties) = maybe_create_commit_properties( - custom_metadata, - max_commit_retries, - post_commithook_properties, - ) { + if let Some(commit_properties) = + maybe_create_commit_properties(commit_properties, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } Ok(Self {