diff --git a/deeplake/util/version_control.py b/deeplake/util/version_control.py index b6b383431a..2ff1671556 100644 --- a/deeplake/util/version_control.py +++ b/deeplake/util/version_control.py @@ -1,5 +1,4 @@ import random -import shutil import time import hashlib import pickle @@ -7,6 +6,7 @@ import warnings from deeplake.client.log import logger from deeplake.constants import FIRST_COMMIT_ID +from deeplake.core import lock from deeplake.core.fast_forwarding import ffw_dataset_meta from deeplake.core.meta.dataset_meta import DatasetMeta from deeplake.core.storage.deeplake_memory_object import DeepLakeMemoryObject @@ -17,7 +17,7 @@ from deeplake.core.version_control.commit_node import CommitNode # type: ignore from deeplake.core.version_control.commit_chunk_map import CommitChunkMap # type: ignore from deeplake.core.storage import LRUCache -from deeplake.core.lock import Lock +from deeplake.core.lock import Lock, PersistentLock from deeplake.util.exceptions import ( CheckoutError, CommitError, @@ -291,7 +291,10 @@ def delete_branch( dataset, branch_name: str, ) -> None: - """Deletes the branch reference and cleans up any unneeded data.""" + """ + Deletes the branch reference and cleans up any unneeded data. + Branches can only be deleted if there are no sub-branches or if the branch has been merged into another branch ever. + """ storage = dataset.storage storage.check_readonly() @@ -306,80 +309,124 @@ def delete_branch( if branch_name == "main": raise VersionControlError("Cannot delete the main branch") + if branch_name not in version_state["branch_commit_map"].keys(): + raise VersionControlError(f"Branch {branch_name} does not exist") + storage = get_base_storage(storage) - lock = Lock(storage, get_version_control_info_lock_key(), duration=10) - lock.acquire() # Blocking + versioncontrol_lock = PersistentLock(storage, get_version_control_info_lock_key()) + versioncontrol_lock.acquire() # Blocking + + dataset_lock = lock.lock_dataset( + dataset, version=dataset.version_state["branch_commit_map"][branch_name] + ) + try: - key = get_version_control_info_key() + all_branch_commits = _find_branch_commits(branch_name, version_state) - try: - stored_version_info = _version_info_from_json( - json.loads(storage[key].decode("utf-8")) - ) - except KeyError: - try: - stored_version_info = pickle.loads( - storage[get_version_control_info_key_old()] - ) # backward compatiblity - except KeyError: - raise VersionControlError(f"Cannot read version control info.") - - if branch_name not in stored_version_info["branch_commit_map"].keys(): - raise VersionControlError(f"Branch {branch_name} does not exist.") - - # remove branch reference - stored_version_info["branch_commit_map"].pop(branch_name) - - # garbage collect unreferenced commits - referenced_commits = set() - for existing_branch, branch_head_commit in stored_version_info[ - "branch_commit_map" - ].items(): - referenced_commits.add(branch_head_commit) - - parent_commit = stored_version_info["commit_node_map"][ - branch_head_commit - ].parent - while parent_commit is not None: - referenced_commits.add(parent_commit.commit_id) - parent_commit = parent_commit.parent - - for existing_commit in list(stored_version_info["commit_node_map"].keys()): - if existing_commit not in referenced_commits: - print(f"deleting commit {existing_commit}") - stored_version_info["commit_node_map"].pop(existing_commit) - - delete_version_from_storage(storage, existing_commit) - else: - version_to_keep = stored_version_info["commit_node_map"][ - existing_commit - ] - # clear out invalid children - version_to_keep.children = [ - child - for child in version_to_keep.children - if child.commit_id in referenced_commits - ] + # Check that nothing points to any of the commits to delete + for commit_id, commit_node in version_state["commit_node_map"].items(): + if commit_id in all_branch_commits: + continue - # set branch to a valid branch name - if ( - version_to_keep.branch - not in stored_version_info["branch_commit_map"].keys() - ): - version_to_keep.branch = version_to_keep.children[0].branch + if commit_node.parent in all_branch_commits: + raise VersionControlError( + f"Cannot delete branch {branch_name} because it has been previously merged" + ) - storage[key] = json.dumps(_version_info_to_json(stored_version_info)).encode( - "utf-8" - ) - storage.flush() - finally: - lock.release() + for tensor in dataset.tensors: + chunk_map_key = get_tensor_commit_chunk_map_key(tensor, commit_id) - dataset._reload_version_state() + try: + found_map = dataset.storage.get_deeplake_object( + chunk_map_key, CommitChunkMap + ) + if ( + len( + [ + 1 + for val in found_map.chunks.values() + if "commit_id" in val.keys() + and val["commit_id"] in all_branch_commits + ] + ) + > 0 + ): + raise VersionControlError( + f"Cannot delete branch {branch_name} because it has been previously merged into {commit_node.branch}" + ) + except KeyError: + pass # no chunk map for this commit + except FileNotFoundError: + pass # no chunk map for this commit + + _delete_branch_and_commits(branch_name, all_branch_commits, dataset, storage) + + finally: + versioncontrol_lock.release() + dataset_lock and dataset_lock.release() dataset._send_branch_deletion_event(branch_name) +def _delete_branch_and_commits( + branch_name: str, all_branch_commits: list[str], dataset, storage +) -> None: + """ + Physically deletes the given branch and list of commits from the version_control_info.json and versions directories. + Any validation on the information should have been performed before this method is called + """ + version_state = dataset.version_state + + version_state["branch_commit_map"].pop(branch_name) + for commit_id, commit_node in list(version_state["commit_node_map"].items()): + if commit_id in all_branch_commits: + version_state["commit_node_map"].pop(commit_id) + continue + + commit_node.children = [ + child + for child in commit_node.children + if child.commit_id not in all_branch_commits + ] + for commit_id in all_branch_commits: + delete_version_from_storage(dataset.storage, commit_id) + + storage[get_version_control_info_key()] = json.dumps( + _version_info_to_json( + { + "commit_node_map": version_state["commit_node_map"], + "branch_commit_map": version_state["branch_commit_map"], + } + ) + ).encode("utf-8") + + +def _find_branch_commits(branch_name: str, version_state: dict) -> list[str]: + """ + Returns a list of all commits used by the given branch + """ + all_branch_commits = [] + branch_commit = version_state["branch_commit_map"][branch_name] + branch_commit_node = version_state["commit_node_map"][branch_commit] + while branch_commit_node.branch == branch_name: + all_branch_commits.append(branch_commit_node.commit_id) + if ( + len( + [ + child + for child in branch_commit_node.children + if child.commit_id not in all_branch_commits + ] + ) + > 0 + ): + raise VersionControlError( + f"Cannot delete branch {branch_name} because it has sub-branches" + ) + branch_commit_node = branch_commit_node.parent + return all_branch_commits + + def copy_metas( src_commit_id: str, dest_commit_id: str,