diff --git a/deeplake/core/dataset/dataset.py b/deeplake/core/dataset/dataset.py
index f5bab4617a..75e755a518 100644
--- a/deeplake/core/dataset/dataset.py
+++ b/deeplake/core/dataset/dataset.py
@@ -24,6 +24,7 @@
     integrity_check,
     save_commit_info,
     rebuild_version_info,
+    _squash_main,
 )
 from deeplake.util.invalid_view_op import invalid_view_op
 from deeplake.util.spinner import spinner
@@ -1722,7 +1723,7 @@ def delete_branch(self, name: str) -> None:
         Raises:
             CommitError: If ``branch`` could not be found.
             ReadOnlyModeError: If branch deletion is attempted in read-only mode.
-            Exception: If you have have the given branch currently checked out.
+            Exception: If you have the given branch currently checked out.
 
         Examples:
 
@@ -1767,6 +1768,38 @@ def _delete_branch(self, name: str) -> None:
             self._set_read_only(read_only, err=True)
             self.storage.autoflush = self._initial_autoflush.pop()
 
+    @invalid_view_op
+    def _squash_main(self) -> None:
+        """
+        DEPRECATED: This method is deprecated and will be removed in a future release.
+
+        Squashes all commits in the current branch into one commit.
+        NOTE: This cannot be run if there are any branches besides ``main``.
+
+        Raises:
+            ReadOnlyModeError: If squashing is attempted in read-only mode.
+            VersionControlError: If the branch cannot be squashed.
+        """
+        if self._is_filtered_view:
+            raise Exception(
+                "Cannot perform version control operations on a filtered dataset view."
+            )
+        read_only = self._read_only
+        if read_only:
+            raise ReadOnlyModeError()
+
+        try_flushing(self)
+
+        self._initial_autoflush.append(self.storage.autoflush)
+        self.storage.autoflush = False
+        try:
+            self._unlock()
+            _squash_main(self)
+        finally:
+            self._set_read_only(read_only, err=True)
+            self.libdeeplake_dataset = None
+            self.storage.autoflush = self._initial_autoflush.pop()
+
     def log(self):
         """Displays the details of all the past commits."""
 
@@ -4463,8 +4496,6 @@ def pop(self, index: Optional[int] = None):
         Raises:
             IndexError: If the index is out of range.
         """
-        self._initial_autoflush.append(self.storage.autoflush)
-        self.storage.autoflush = False
         max_len = max((t.num_samples for t in self.tensors.values()), default=0)
         if max_len == 0:
             raise IndexError("Can't pop from empty dataset.")
@@ -4479,11 +4510,10 @@
                 f"Index {index} is out of range. The longest tensor has {max_len} samples."
             )
 
-        for tensor in self.tensors.values():
-            if tensor.num_samples > index:
-                tensor.pop(index)
-
-        self.storage.autoflush = self._initial_autoflush.pop()
+        with self:
+            for tensor in self.tensors.values():
+                if tensor.num_samples > index:
+                    tensor.pop(index)
 
     @property
     def is_view(self) -> bool:
diff --git a/deeplake/core/vectorstore/test_deeplake_vectorstore.py b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
index 5cc28e4a7b..277f957d40 100644
--- a/deeplake/core/vectorstore/test_deeplake_vectorstore.py
+++ b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
@@ -556,6 +556,8 @@ def test_search_managed(hub_cloud_dev_token):
         exec_option="tensor_db",
     )
 
+    assert "vectordb/" in vector_store.dataset.base_storage.path
+
     assert len(data_ce["score"]) == len(data_db["score"])
     assert all(
         [
diff --git a/deeplake/core/vectorstore/vector_search/utils.py b/deeplake/core/vectorstore/vector_search/utils.py
index 5ff3928bc1..f513332a6e 100644
--- a/deeplake/core/vectorstore/vector_search/utils.py
+++ b/deeplake/core/vectorstore/vector_search/utils.py
@@ -26,7 +26,7 @@ def parse_exec_option(dataset, exec_option, indra_installed):
 
     if exec_option is None or exec_option == "auto":
         if isinstance(dataset, DeepLakeCloudDataset):
-            if "vector_db/" in dataset.base_storage.path:
+            if "vectordb/" in dataset.base_storage.path:
                 return "tensor_db"
             elif indra_installed:
                 return "compute_engine"
diff --git a/deeplake/core/version_control/test_version_control.py b/deeplake/core/version_control/test_version_control.py
index 1e8539acc4..6008f92cf2 100644
--- a/deeplake/core/version_control/test_version_control.py
+++ b/deeplake/core/version_control/test_version_control.py
@@ -1,5 +1,9 @@
 import glob
+import json
 from collections import OrderedDict
+
+from deeplake.constants import FIRST_COMMIT_ID
+
 import deeplake
 import pytest
 import numpy as np
@@ -2551,3 +2555,88 @@ def test_branch_delete(local_ds_generator):
 
     local_ds.delete_branch("alt1_sub1")
     assert len(local_ds.branches) == 2
+
+
+def test_squash_main_has_branch(local_ds_generator):
+    local_ds = local_ds_generator()
+    local_ds.create_tensor("test")
+    with local_ds:
+        local_ds.test.append("main 1")
+        local_ds.commit("first main commit")
+        local_ds.checkout("alt", create=True)
+
+    with pytest.raises(VersionControlError) as e:
+        local_ds._squash_main()
+    assert "Cannot squash commits if there are multiple branches" in str(e.value)
+
+
+def test_squash_main_has_view(local_ds_generator):
+    local_ds = local_ds_generator()
+    local_ds.create_tensor("test")
+    with local_ds:
+        local_ds.test.append("main 1")
+        local_ds.commit("first main commit")
+        query = local_ds.filter("test == 'a'")
+        query.save_view("test_view")
+
+    with pytest.raises(VersionControlError) as e:
+        local_ds._squash_main()
+    assert "Cannot squash commits if there are views present" in str(e.value)
+
+
+def test_squash_main(local_ds_generator):
+    local_ds = local_ds_generator()
+    local_ds.create_tensor("test")
+
+    with local_ds:
+        # Add commits to main
+        local_ds.test.append("main 1")
+        local_ds.test.append("main 2")
+        local_ds.commit("first main commit")
+        local_ds.test.append("main 3")
+        local_ds.commit("second main commit")
+        local_ds.test.append("main 4")
+        local_ds.commit("third main commit")
+        local_ds.test.append("main uncommitted")
+
+    assert len(local_ds.branches) == 1
+    assert len(glob.glob(local_ds.path + "/versions/*")) > 0
+    assert len(local_ds.test) == 5
+    assert [i.data()["value"] for i in local_ds.test] == [
+        "main 1",
+        "main 2",
+        "main 3",
+        "main 4",
+        "main uncommitted",
+    ]
assert [i["message"] for i in local_ds.commits] == [ + "third main commit", + "second main commit", + "first main commit", + ] + + local_ds._squash_main() + + assert len(local_ds.branches) == 1 + assert len(glob.glob(local_ds.path + "/versions/*")) == 1 + assert [commit["message"] for commit in local_ds.commits] == ["Squashed commits"] + assert local_ds.pending_commit_id != FIRST_COMMIT_ID + + with open(local_ds.path + "/version_control_info.json", "r") as f: + data = json.load(f) + assert len(data["commits"]) == 1 + assert data["commits"][FIRST_COMMIT_ID]["commit_message"] == None + assert data["commits"][FIRST_COMMIT_ID]["commit_time"] == None + assert data["commits"][FIRST_COMMIT_ID]["commit_user_name"] == None + assert len(data["commits"][FIRST_COMMIT_ID]["children"]) == 0 + assert data["commits"][FIRST_COMMIT_ID]["parent"] == None + + assert [i.data()["value"] for i in local_ds.test] == [ + "main 1", + "main 2", + "main 3", + "main 4", + "main uncommitted", + ] + assert [i["message"] for i in local_ds.commits] == ["Squashed commits"] + assert local_ds.pending_commit_id != FIRST_COMMIT_ID diff --git a/deeplake/util/version_control.py b/deeplake/util/version_control.py index 0fb8336c59..086b893034 100644 --- a/deeplake/util/version_control.py +++ b/deeplake/util/version_control.py @@ -4,6 +4,9 @@ import pickle from typing import Any, Dict, Optional, List import warnings + +from deeplake.core.meta.encode.chunk_id import ChunkIdEncoder + from deeplake.client.log import logger from deeplake.constants import FIRST_COMMIT_ID from deeplake.core import lock @@ -40,6 +43,7 @@ get_version_control_info_key_old, get_version_control_info_lock_key, get_commit_info_key, + get_pad_encoder_key, ) from deeplake.constants import COMMIT_INFO_FILENAME from deeplake.util.path import relpath @@ -288,6 +292,113 @@ def checkout( ) from e +def _squash_main( + dataset, +) -> None: + """ + Combines all commits in the main branch into a single commit. 
+ """ + storage = dataset.storage + storage.check_readonly() + + version_state = dataset.version_state + + if len(dataset.branches) > 1: + raise VersionControlError( + f"Cannot squash commits if there are multiple branches" + ) + if len(dataset.get_views()) > 0: + raise VersionControlError(f"Cannot squash commits if there are views present") + + try: + base_storage = get_base_storage(storage) + versioncontrol_lock = PersistentLock( + base_storage, get_version_control_info_lock_key() + ) + versioncontrol_lock.acquire() # Blocking + + dataset_lock = lock.lock_dataset(dataset, dataset.branches[0]) + + for tensor in dataset._tensors( + include_hidden=True, include_disabled=True + ).values(): + chunk_engine = tensor.chunk_engine + for chunk_id in [row[0] for row in chunk_engine.chunk_id_encoder._encoded]: + chunk = chunk_engine.get_chunk_from_chunk_id(chunk_id) + if chunk.key.startswith("versions"): + base_storage[ + "/".join( + [ + tensor.key, + "chunks", + ChunkIdEncoder.name_from_id(chunk_id), + ] + ) + ] = chunk.tobytes() + + for key_fn in [ + get_tensor_info_key, + get_tensor_meta_key, + get_creds_encoder_key, + get_chunk_id_encoder_key, + get_pad_encoder_key, + get_sequence_encoder_key, + get_tensor_tile_encoder_key, + ]: + try: + data_bytes = storage.get_bytes( + key_fn(chunk_engine.key, dataset.pending_commit_id) + ) + except KeyError: + continue + + base_storage[key_fn(chunk_engine.key, FIRST_COMMIT_ID)] = data_bytes + + commits_to_delete = [ + commit_id + for commit_id in version_state["commit_node_map"].keys() + if commit_id != FIRST_COMMIT_ID + ] + + dataset.version_state["commit_node_map"] = { + FIRST_COMMIT_ID: dataset.version_state["commit_node_map"][FIRST_COMMIT_ID], + } + dataset.version_state["commit_node_map"][FIRST_COMMIT_ID].children = [] + dataset.version_state["commit_node_map"][FIRST_COMMIT_ID].commit_message = None + dataset.version_state["commit_node_map"][FIRST_COMMIT_ID].commit_time = None + dataset.version_state["commit_node_map"][ + FIRST_COMMIT_ID + ].commit_user_name = None + + dataset.version_state["branch_commit_map"]["main"] = FIRST_COMMIT_ID + dataset.version_state["commit_id"] = FIRST_COMMIT_ID + dataset.version_state["commit_node"] = dataset.version_state["commit_node_map"][ + FIRST_COMMIT_ID + ] + + base_storage[get_version_control_info_key()] = json.dumps( + _version_info_to_json( + { + "commit_node_map": version_state["commit_node_map"], + "branch_commit_map": version_state["branch_commit_map"], + } + ) + ).encode("utf-8") + + for commit_to_delete in commits_to_delete: + delete_version_from_storage(dataset.storage, commit_to_delete) + + dataset._reload_version_state() + + dataset.commit("Squashed commits") + + finally: + versioncontrol_lock.release() + dataset_lock and dataset_lock.release() + # + # dataset._send_branch_deletion_event(branch_name) + + def delete_branch( dataset, branch_name: str,