Skip to content

Commit

Permalink
Split up large delete_branch() method
Browse files Browse the repository at this point in the history
  • Loading branch information
nvoxland committed Aug 4, 2023
1 parent 7bb350b commit f84ee82
Showing 1 changed file with 114 additions and 67 deletions.
181 changes: 114 additions & 67 deletions deeplake/util/version_control.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import random
import shutil
import time
import hashlib
import pickle
from typing import Any, Dict, Optional, List
import warnings
from deeplake.client.log import logger
from deeplake.constants import FIRST_COMMIT_ID
from deeplake.core import lock
from deeplake.core.fast_forwarding import ffw_dataset_meta
from deeplake.core.meta.dataset_meta import DatasetMeta
from deeplake.core.storage.deeplake_memory_object import DeepLakeMemoryObject
Expand All @@ -17,7 +17,7 @@
from deeplake.core.version_control.commit_node import CommitNode # type: ignore
from deeplake.core.version_control.commit_chunk_map import CommitChunkMap # type: ignore
from deeplake.core.storage import LRUCache
from deeplake.core.lock import Lock
from deeplake.core.lock import Lock, PersistentLock
from deeplake.util.exceptions import (
CheckoutError,
CommitError,
Expand Down Expand Up @@ -291,7 +291,10 @@ def delete_branch(
dataset,
branch_name: str,
) -> None:
"""Deletes the branch reference and cleans up any unneeded data."""
"""
Deletes the branch reference and cleans up any unneeded data.
Branches can only be deleted if they have no sub-branches and have never been merged into another branch.
"""

storage = dataset.storage
storage.check_readonly()
Expand All @@ -306,80 +309,124 @@ def delete_branch(
if branch_name == "main":
raise VersionControlError("Cannot delete the main branch")

if branch_name not in version_state["branch_commit_map"].keys():
raise VersionControlError(f"Branch {branch_name} does not exist")

storage = get_base_storage(storage)
lock = Lock(storage, get_version_control_info_lock_key(), duration=10)
lock.acquire() # Blocking
versioncontrol_lock = PersistentLock(storage, get_version_control_info_lock_key())
versioncontrol_lock.acquire() # Blocking

dataset_lock = lock.lock_dataset(
dataset, version=dataset.version_state["branch_commit_map"][branch_name]
)

try:
key = get_version_control_info_key()
all_branch_commits = _find_branch_commits(branch_name, version_state)

try:
stored_version_info = _version_info_from_json(
json.loads(storage[key].decode("utf-8"))
)
except KeyError:
try:
stored_version_info = pickle.loads(
storage[get_version_control_info_key_old()]
) # backward compatiblity
except KeyError:
raise VersionControlError(f"Cannot read version control info.")

if branch_name not in stored_version_info["branch_commit_map"].keys():
raise VersionControlError(f"Branch {branch_name} does not exist.")

# remove branch reference
stored_version_info["branch_commit_map"].pop(branch_name)

# garbage collect unreferenced commits
referenced_commits = set()
for existing_branch, branch_head_commit in stored_version_info[
"branch_commit_map"
].items():
referenced_commits.add(branch_head_commit)

parent_commit = stored_version_info["commit_node_map"][
branch_head_commit
].parent
while parent_commit is not None:
referenced_commits.add(parent_commit.commit_id)
parent_commit = parent_commit.parent

for existing_commit in list(stored_version_info["commit_node_map"].keys()):
if existing_commit not in referenced_commits:
print(f"deleting commit {existing_commit}")
stored_version_info["commit_node_map"].pop(existing_commit)

delete_version_from_storage(storage, existing_commit)
else:
version_to_keep = stored_version_info["commit_node_map"][
existing_commit
]
# clear out invalid children
version_to_keep.children = [
child
for child in version_to_keep.children
if child.commit_id in referenced_commits
]
# Check that nothing points to any of the commits to delete
for commit_id, commit_node in version_state["commit_node_map"].items():
if commit_id in all_branch_commits:
continue

# set branch to a valid branch name
if (
version_to_keep.branch
not in stored_version_info["branch_commit_map"].keys()
):
version_to_keep.branch = version_to_keep.children[0].branch
if commit_node.parent in all_branch_commits:
raise VersionControlError(
f"Cannot delete branch {branch_name} because it has been previously merged"
)

storage[key] = json.dumps(_version_info_to_json(stored_version_info)).encode(
"utf-8"
)
storage.flush()
finally:
lock.release()
for tensor in dataset.tensors:
chunk_map_key = get_tensor_commit_chunk_map_key(tensor, commit_id)

dataset._reload_version_state()
try:
found_map = dataset.storage.get_deeplake_object(
chunk_map_key, CommitChunkMap
)
if (
len(
[
1
for val in found_map.chunks.values()
if "commit_id" in val.keys()
and val["commit_id"] in all_branch_commits
]
)
> 0
):
raise VersionControlError(
f"Cannot delete branch {branch_name} because it has been previously merged into {commit_node.branch}"
)
except KeyError:
pass # no chunk map for this commit
except FileNotFoundError:
pass # no chunk map for this commit

_delete_branch_and_commits(branch_name, all_branch_commits, dataset, storage)

finally:
versioncontrol_lock.release()
dataset_lock and dataset_lock.release()

dataset._send_branch_deletion_event(branch_name)


def _delete_branch_and_commits(
    branch_name: str, all_branch_commits: List[str], dataset, storage
) -> None:
    """
    Physically deletes the given branch and list of commits from the version_control_info.json and versions directories.
    Any validation on the information should have been performed before this method is called.

    Args:
        branch_name (str): Branch whose entry is removed from ``version_state["branch_commit_map"]``.
        all_branch_commits (List[str]): Ids of the commits to remove from ``version_state["commit_node_map"]``
            and to delete from storage.
        dataset: Dataset whose ``version_state`` is updated in place and whose ``storage`` the
            per-commit data is deleted from.
        storage: Storage the updated version control info is written to.

    Raises:
        KeyError: If ``branch_name`` is not present in the branch commit map.
    """
    version_state = dataset.version_state
    branch_commit_map = version_state["branch_commit_map"]
    commit_node_map = version_state["commit_node_map"]

    # Membership is tested once per commit and once per child, so use a set
    # instead of scanning the list each time.
    commits_to_delete = set(all_branch_commits)

    branch_commit_map.pop(branch_name)

    # Iterate over a snapshot of the keys because entries are popped inside the loop.
    for commit_id in list(commit_node_map):
        if commit_id in commits_to_delete:
            commit_node_map.pop(commit_id)
            continue

        # Surviving commits must no longer point at deleted children.
        commit_node = commit_node_map[commit_id]
        commit_node.children = [
            child
            for child in commit_node.children
            if child.commit_id not in commits_to_delete
        ]

    # NOTE(review): commit data is deleted through dataset.storage while the version
    # control info below is written through the `storage` argument - confirm that
    # this asymmetry is intended.
    for commit_id in all_branch_commits:
        delete_version_from_storage(dataset.storage, commit_id)

    storage[get_version_control_info_key()] = json.dumps(
        _version_info_to_json(
            {
                "commit_node_map": commit_node_map,
                "branch_commit_map": branch_commit_map,
            }
        )
    ).encode("utf-8")


def _find_branch_commits(branch_name: str, version_state: dict) -> List[str]:
    """
    Returns a list of all commits used by the given branch.

    Starts at the commit ``version_state["branch_commit_map"]`` records for the branch and
    follows ``parent`` links for as long as the visited node's ``branch`` equals
    ``branch_name``. The walk also stops when a node has no parent.

    Args:
        branch_name (str): Name of the branch to inspect.
        version_state (dict): Mapping holding ``"branch_commit_map"`` (branch name -> commit id)
            and ``"commit_node_map"`` (commit id -> commit node).

    Returns:
        List[str]: Commit ids of the branch, ordered from the branch head towards its oldest commit.

    Raises:
        VersionControlError: If a visited commit has a child that is not one of the
            commits already collected, i.e. something else branches off of it.
        KeyError: If ``branch_name`` or its head commit is missing from ``version_state``.
    """
    all_branch_commits: List[str] = []
    # Mirror of the list above, kept as a set so the child check stays O(1) per child.
    collected = set()

    head_commit_id = version_state["branch_commit_map"][branch_name]
    node = version_state["commit_node_map"][head_commit_id]

    # `node` becomes None when the branch's commits reach the root of the commit
    # tree; stop there instead of dereferencing None.
    while node is not None and node.branch == branch_name:
        all_branch_commits.append(node.commit_id)
        collected.add(node.commit_id)

        # Walking head -> root, the only child a commit may have is the one we
        # just came from. Any other child is a sub-branch.
        if any(child.commit_id not in collected for child in node.children):
            raise VersionControlError(
                f"Cannot delete branch {branch_name} because it has sub-branches"
            )

        node = node.parent

    return all_branch_commits


def copy_metas(
src_commit_id: str,
dest_commit_id: str,
Expand Down

0 comments on commit f84ee82

Please sign in to comment.