Merge branch 'main' into identify_slow_tests

nvoxland committed Aug 14, 2023
2 parents 2282b53 + b67d527 commit d8b8686
Showing 5 changed files with 241 additions and 9 deletions.
46 changes: 38 additions & 8 deletions deeplake/core/dataset/dataset.py
@@ -24,6 +24,7 @@
    integrity_check,
    save_commit_info,
    rebuild_version_info,
    _squash_main,
)
from deeplake.util.invalid_view_op import invalid_view_op
from deeplake.util.spinner import spinner
@@ -1722,7 +1723,7 @@ def delete_branch(self, name: str) -> None:
        Raises:
            CommitError: If ``branch`` could not be found.
            ReadOnlyModeError: If branch deletion is attempted in read-only mode.
-            Exception: If you have have the given branch currently checked out.
+            Exception: If you have the given branch currently checked out.
        Examples:
@@ -1767,6 +1768,38 @@ def _delete_branch(self, name: str) -> None:
        self._set_read_only(read_only, err=True)
        self.storage.autoflush = self._initial_autoflush.pop()

    @invalid_view_op
    def _squash_main(self) -> None:
        """
        DEPRECATED: This method will be removed in a future release.

        Squashes all commits in the current branch into one commit.

        NOTE: This cannot be run if there are any branches besides ``main``.

        Raises:
            ReadOnlyModeError: If squashing is attempted in read-only mode.
            VersionControlError: If the branch cannot be squashed.
        """
        if self._is_filtered_view:
            raise Exception(
                "Cannot perform version control operations on a filtered dataset view."
            )

        read_only = self._read_only
        if read_only:
            raise ReadOnlyModeError()

        try_flushing(self)

        self._initial_autoflush.append(self.storage.autoflush)
        self.storage.autoflush = False
        try:
            self._unlock()
            _squash_main(self)
        finally:
            self._set_read_only(read_only, err=True)
            self.libdeeplake_dataset = None
            self.storage.autoflush = self._initial_autoflush.pop()

    def log(self):
        """Displays the details of all the past commits."""

@@ -4463,8 +4496,6 @@ def pop(self, index: Optional[int] = None):
        Raises:
            IndexError: If the index is out of range.
        """
-        self._initial_autoflush.append(self.storage.autoflush)
-        self.storage.autoflush = False
        max_len = max((t.num_samples for t in self.tensors.values()), default=0)
        if max_len == 0:
            raise IndexError("Can't pop from empty dataset.")
@@ -4479,11 +4510,10 @@
f"Index {index} is out of range. The longest tensor has {max_len} samples."
)

-        for tensor in self.tensors.values():
-            if tensor.num_samples > index:
-                tensor.pop(index)
-
-        self.storage.autoflush = self._initial_autoflush.pop()
+        with self:
+            for tensor in self.tensors.values():
+                if tensor.num_samples > index:
+                    tensor.pop(index)

    @property
    def is_view(self) -> bool:
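For context, a minimal usage sketch of the two dataset.py changes above (the dataset path is hypothetical; _squash_main is private API and, per its docstring, requires that main be the only branch and that no views exist):

import deeplake

ds = deeplake.load("./example_ds")  # hypothetical local dataset path

# pop() now runs inside `with self:`, so the per-tensor pops are batched
# by the dataset context manager instead of toggling autoflush by hand.
ds.pop(0)

# Collapse the entire history of main into a single commit.
ds._squash_main()
ds.log()  # history now shows one "Squashed commits" entry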
2 changes: 2 additions & 0 deletions deeplake/core/vectorstore/test_deeplake_vectorstore.py
@@ -556,6 +556,8 @@ def test_search_managed(hub_cloud_dev_token):
exec_option="tensor_db",
)

assert "vectordb/" in vector_store.dataset.base_storage.path

assert len(data_ce["score"]) == len(data_db["score"])
assert all(
[
2 changes: 1 addition & 1 deletion deeplake/core/vectorstore/vector_search/utils.py
@@ -26,7 +26,7 @@ def parse_exec_option(dataset, exec_option, indra_installed):

    if exec_option is None or exec_option == "auto":
        if isinstance(dataset, DeepLakeCloudDataset):
-            if "vector_db/" in dataset.base_storage.path:
+            if "vectordb/" in dataset.base_storage.path:
                return "tensor_db"
            elif indra_installed:
                return "compute_engine"
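The one-line fix above changes the managed-storage marker from "vector_db/" to "vectordb/", matching the path asserted in the vector-store test earlier in this commit. A simplified, self-contained sketch of the resolution order parse_exec_option implements (the final "python" fallback is an assumption beyond the hunk shown here):

def resolve_exec_option(path: str, is_cloud_dataset: bool, indra_installed: bool) -> str:
    # Managed tensor_db datasets are identified by a "vectordb/" segment
    # in their storage path.
    if is_cloud_dataset:
        if "vectordb/" in path:
            return "tensor_db"  # Deep Lake managed database
        if indra_installed:
            return "compute_engine"  # local indra query engine available
    return "python"  # assumed default fallback

resolve_exec_option("hub://org/vectordb/my_store", True, False)  # -> "tensor_db"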
89 changes: 89 additions & 0 deletions deeplake/core/version_control/test_version_control.py
@@ -1,5 +1,9 @@
import glob
import json
from collections import OrderedDict

from deeplake.constants import FIRST_COMMIT_ID

import deeplake
import pytest
import numpy as np
@@ -2551,3 +2555,88 @@ def test_branch_delete(local_ds_generator):

    local_ds.delete_branch("alt1_sub1")
    assert len(local_ds.branches) == 2


def test_squash_main_has_branch(local_ds_generator):
    local_ds = local_ds_generator()
    local_ds.create_tensor("test")
    with local_ds:
        local_ds.test.append("main 1")
        local_ds.commit("first main commit")
        local_ds.checkout("alt", create=True)

    with pytest.raises(VersionControlError) as e:
        local_ds._squash_main()
    assert "Cannot squash commits if there are multiple branches" in str(e.value)


def test_squash_main_has_view(local_ds_generator):
    local_ds = local_ds_generator()
    local_ds.create_tensor("test")
    with local_ds:
        local_ds.test.append("main 1")
        local_ds.commit("first main commit")
        query = local_ds.filter("test == 'a'")
        query.save_view("test_view")

    with pytest.raises(VersionControlError) as e:
        local_ds._squash_main()
    assert "Cannot squash commits if there are views present" in str(e.value)


def test_squash_main(local_ds_generator):
    local_ds = local_ds_generator()
    local_ds.create_tensor("test")

    with local_ds:
        # Add commits to main
        local_ds.test.append("main 1")
        local_ds.test.append("main 2")
        local_ds.commit("first main commit")
        local_ds.test.append("main 3")
        local_ds.commit("second main commit")
        local_ds.test.append("main 4")
        local_ds.commit("third main commit")
        local_ds.test.append("main uncommitted")

    assert len(local_ds.branches) == 1
    assert len(glob.glob(local_ds.path + "/versions/*")) > 0
    assert len(local_ds.test) == 5
    assert [i.data()["value"] for i in local_ds.test] == [
        "main 1",
        "main 2",
        "main 3",
        "main 4",
        "main uncommitted",
    ]
    assert [i["message"] for i in local_ds.commits] == [
        "third main commit",
        "second main commit",
        "first main commit",
    ]

    local_ds._squash_main()

    assert len(local_ds.branches) == 1
    assert len(glob.glob(local_ds.path + "/versions/*")) == 1
    assert [commit["message"] for commit in local_ds.commits] == ["Squashed commits"]
    assert local_ds.pending_commit_id != FIRST_COMMIT_ID

    with open(local_ds.path + "/version_control_info.json", "r") as f:
        data = json.load(f)
        assert len(data["commits"]) == 1
        assert data["commits"][FIRST_COMMIT_ID]["commit_message"] is None
        assert data["commits"][FIRST_COMMIT_ID]["commit_time"] is None
        assert data["commits"][FIRST_COMMIT_ID]["commit_user_name"] is None
        assert len(data["commits"][FIRST_COMMIT_ID]["children"]) == 0
        assert data["commits"][FIRST_COMMIT_ID]["parent"] is None

    assert [i.data()["value"] for i in local_ds.test] == [
        "main 1",
        "main 2",
        "main 3",
        "main 4",
        "main uncommitted",
    ]
    assert [i["message"] for i in local_ds.commits] == ["Squashed commits"]
    assert local_ds.pending_commit_id != FIRST_COMMIT_ID
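These tests can be selected directly with standard pytest filtering, e.g. pytest deeplake/core/version_control/test_version_control.py -k squash_main (the -k expression is illustrative). The final block of assertions re-checks the dataset contents after the raw version_control_info.json has been inspected.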
111 changes: 111 additions & 0 deletions deeplake/util/version_control.py
@@ -4,6 +4,9 @@
import pickle
from typing import Any, Dict, Optional, List
import warnings

from deeplake.core.meta.encode.chunk_id import ChunkIdEncoder

from deeplake.client.log import logger
from deeplake.constants import FIRST_COMMIT_ID
from deeplake.core import lock
@@ -40,6 +43,7 @@
    get_version_control_info_key_old,
    get_version_control_info_lock_key,
    get_commit_info_key,
    get_pad_encoder_key,
)
from deeplake.constants import COMMIT_INFO_FILENAME
from deeplake.util.path import relpath
@@ -288,6 +292,113 @@ def checkout(
        ) from e


def _squash_main(
    dataset,
) -> None:
    """
    Combines all commits in the main branch into a single commit.
    """
    storage = dataset.storage
    storage.check_readonly()

    version_state = dataset.version_state

    if len(dataset.branches) > 1:
        raise VersionControlError(
            "Cannot squash commits if there are multiple branches"
        )
    if len(dataset.get_views()) > 0:
        raise VersionControlError("Cannot squash commits if there are views present")

    try:
        base_storage = get_base_storage(storage)
        versioncontrol_lock = PersistentLock(
            base_storage, get_version_control_info_lock_key()
        )
        versioncontrol_lock.acquire()  # Blocking

        dataset_lock = lock.lock_dataset(dataset, dataset.branches[0])

        for tensor in dataset._tensors(
            include_hidden=True, include_disabled=True
        ).values():
            chunk_engine = tensor.chunk_engine
            for chunk_id in [row[0] for row in chunk_engine.chunk_id_encoder._encoded]:
                chunk = chunk_engine.get_chunk_from_chunk_id(chunk_id)
                if chunk.key.startswith("versions"):
                    base_storage[
                        "/".join(
                            [
                                tensor.key,
                                "chunks",
                                ChunkIdEncoder.name_from_id(chunk_id),
                            ]
                        )
                    ] = chunk.tobytes()

            for key_fn in [
                get_tensor_info_key,
                get_tensor_meta_key,
                get_creds_encoder_key,
                get_chunk_id_encoder_key,
                get_pad_encoder_key,
                get_sequence_encoder_key,
                get_tensor_tile_encoder_key,
            ]:
                try:
                    data_bytes = storage.get_bytes(
                        key_fn(chunk_engine.key, dataset.pending_commit_id)
                    )
                except KeyError:
                    continue

                base_storage[key_fn(chunk_engine.key, FIRST_COMMIT_ID)] = data_bytes

        commits_to_delete = [
            commit_id
            for commit_id in version_state["commit_node_map"].keys()
            if commit_id != FIRST_COMMIT_ID
        ]

        dataset.version_state["commit_node_map"] = {
            FIRST_COMMIT_ID: dataset.version_state["commit_node_map"][FIRST_COMMIT_ID],
        }
        dataset.version_state["commit_node_map"][FIRST_COMMIT_ID].children = []
        dataset.version_state["commit_node_map"][FIRST_COMMIT_ID].commit_message = None
        dataset.version_state["commit_node_map"][FIRST_COMMIT_ID].commit_time = None
        dataset.version_state["commit_node_map"][
            FIRST_COMMIT_ID
        ].commit_user_name = None

        dataset.version_state["branch_commit_map"]["main"] = FIRST_COMMIT_ID
        dataset.version_state["commit_id"] = FIRST_COMMIT_ID
        dataset.version_state["commit_node"] = dataset.version_state["commit_node_map"][
            FIRST_COMMIT_ID
        ]

        base_storage[get_version_control_info_key()] = json.dumps(
            _version_info_to_json(
                {
                    "commit_node_map": version_state["commit_node_map"],
                    "branch_commit_map": version_state["branch_commit_map"],
                }
            )
        ).encode("utf-8")

        for commit_to_delete in commits_to_delete:
            delete_version_from_storage(dataset.storage, commit_to_delete)

        dataset._reload_version_state()

        dataset.commit("Squashed commits")

    finally:
        versioncontrol_lock.release()
        dataset_lock and dataset_lock.release()

def delete_branch(
    dataset,
    branch_name: str,
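Read in isolation, the version-state reset at the core of _squash_main collapses the commit map to its root node and reports which commit ids to delete. A minimal sketch, assuming plain dicts stand in for the real commit-node objects (the diff mutates attributes such as .children on those objects):

from deeplake.constants import FIRST_COMMIT_ID

def squash_commit_map(version_state: dict) -> list:
    """Collapse commit_node_map to the root commit; return ids to delete."""
    # Everything except the root commit is scheduled for deletion.
    to_delete = [
        commit_id
        for commit_id in version_state["commit_node_map"]
        if commit_id != FIRST_COMMIT_ID
    ]
    root = version_state["commit_node_map"][FIRST_COMMIT_ID]
    root["children"] = []  # real code: CommitNode.children = []
    root["commit_message"] = None
    root["commit_time"] = None
    root["commit_user_name"] = None
    version_state["commit_node_map"] = {FIRST_COMMIT_ID: root}
    version_state["branch_commit_map"] = {"main": FIRST_COMMIT_ID}
    version_state["commit_id"] = FIRST_COMMIT_ID
    version_state["commit_node"] = root
    return to_delete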
