Dataset Upload with Force Replace #9072

Open · wants to merge 43 commits into base: dev from upload-dataset-with-force-replace

Commits (43)
d785b0e
[syft/store] fix `KeyValueStorePartition._remove_keys` bug not removi…
khoaguin Jul 22, 2024
569cd93
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 22, 2024
d2c6e7f
[syft/chore] add `message=...` for some `SyftError`
khoaguin Jul 22, 2024
a8d0618
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 22, 2024
e830aad
[tests/unit] add a test for request service to check if its `stash.up…
khoaguin Jul 23, 2024
61d5bf1
[syft/dataset] implement upload dataset with force_replace
khoaguin Jul 23, 2024
bfe674a
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 23, 2024
6f10586
[syft/dataset] update `DatasetUpdate`
khoaguin Jul 24, 2024
e40ce31
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 25, 2024
1e9861c
[syft/dataset] merge with `dev` and move logic to include deleted dat…
khoaguin Jul 25, 2024
50b2e0d
[syft/datasite_client] refactoring `upload_dataset`: separating logic…
khoaguin Jul 25, 2024
1d907ee
[syft/datasite_client] separating logic to upload a new dataset and r…
khoaguin Jul 25, 2024
d7912e1
[syft/dataset_service] done `DatasetService.replace` endpoint (withou…
khoaguin Jul 25, 2024
8c1b034
[tests/unit] add a test for uploading with force replace a small dataset
khoaguin Jul 26, 2024
fa89985
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 26, 2024
4ee5e39
[syft/dataset] add blob storage entry id attributes for Asset and Cre…
khoaguin Jul 30, 2024
689ee61
[tests/unit] implement unit test for uploading a big dataset with for…
khoaguin Jul 30, 2024
9b63d2a
[tests/unit] add a check to test access permissions for blob entries …
khoaguin Jul 30, 2024
df04ad2
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 30, 2024
85a9915
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 31, 2024
4d6b30a
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Jul 31, 2024
04bb965
[syft/dataset] add migrations for `Asset` and `CreateAsset`
khoaguin Jul 31, 2024
68b8b1c
[tests/unit] get the ds_client directly from conftest instead of an r…
khoaguin Jul 31, 2024
87caff9
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Aug 2, 2024
859a166
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Aug 5, 2024
c62e115
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Aug 6, 2024
6c0487c
[syft/dataset] Define `DatasetV1` class that contains a list of `Asse…
khoaguin Aug 7, 2024
59994b1
[syft/dataset] migrating functions to transform list of assets from V…
khoaguin Aug 7, 2024
5b7a12b
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Aug 7, 2024
b8788ad
fix lint
khoaguin Aug 7, 2024
adcff8b
[syft/dataset] migrating functions to transform list of assets from …
khoaguin Aug 8, 2024
c1b9c79
[syft/dataset] handling exception when user does not have permission …
khoaguin Aug 8, 2024
21b5e6b
[tests/unit] modify `test_datasite_client_cannot_upload_dataset_with_…
khoaguin Aug 8, 2024
01ae483
protocol: fix handling of syft object subclass in annotation repr
shubham3121 Aug 8, 2024
97dc55a
Merge branch 'dev' into upload-dataset-with-force-replace
shubham3121 Aug 12, 2024
8d627ac
[syft/dataset] remove mock and data blob entries on the Asset level a…
khoaguin Aug 13, 2024
a925f63
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Aug 13, 2024
435754c
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Aug 16, 2024
adca0ee
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Aug 23, 2024
a69b3de
Merging `datasite_client.py` with dev
khoaguin Sep 10, 2024
fff17e9
Merge branch 'dev' into upload-dataset-with-force-replace
khoaguin Sep 10, 2024
f053035
[dataset_service] stop unwrapping `replace` if success
khoaguin Sep 10, 2024
37a052c
[protocol_version] add `CreateAsset` to protocol_version.json
khoaguin Sep 10, 2024
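In short, `upload_dataset` gains a `force_replace` flag: if a dataset with the same name already exists on the server, the upload now fails with a `SyftError` unless `force_replace=True` is passed, in which case the new `DatasetService.replace` endpoint overwrites the stored dataset. A minimal usage sketch (not part of the PR; it assumes a local dev server via `sy.orchestra.launch` and the standard dev admin credentials):

import numpy as np
import syft as sy

# throwaway local dev server (see the orchestra.py tweak below)
server = sy.orchestra.launch(name="test-datasite", reset=True)
client = server.login(email="info@openmined.org", password="changethis")

dataset = sy.Dataset(
    name="my-dataset",
    asset_list=[
        sy.Asset(name="data", data=np.array([1, 2, 3]), mock=np.array([0, 0, 0]))
    ],
)

client.upload_dataset(dataset)                      # first upload succeeds
client.upload_dataset(dataset)                      # same name again -> SyftError
client.upload_dataset(dataset, force_replace=True)  # overwrites the existing dataset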
87 changes: 71 additions & 16 deletions packages/syft/src/syft/client/datasite_client.py
@@ -24,6 +24,7 @@
from ..service.dataset.dataset import Contributor
from ..service.dataset.dataset import CreateAsset
from ..service.dataset.dataset import CreateDataset
from ..service.dataset.dataset import Dataset
from ..service.dataset.dataset import _check_asset_must_contain_mock
from ..service.migration.object_migration_state import MigrationData
from ..service.request.request import Request
@@ -36,6 +37,7 @@
from ..service.user.user import UserView
from ..types.blob_storage import BlobFile
from ..types.errors import SyftException
from ..types.twin_object import TwinObject
from ..types.uid import UID
from ..util.misc_objs import HTMLObject
from ..util.util import get_mb_size
@@ -98,27 +100,24 @@ class DatasiteClient(SyftClient):
def __repr__(self) -> str:
return f"<DatasiteClient: {self.name}>"

def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess:
# relative
from ..types.twin_object import TwinObject

def upload_dataset(
self, dataset: CreateDataset, force_replace: bool = False
) -> SyftSuccess | SyftError:
if self.users is None:
raise SyftException(public_message=f"can't get user service for {self}")

user = self.users.get_current_user()

if user.role not in [ServiceRole.DATA_OWNER, ServiceRole.ADMIN]:
return SyftError(message="You don't have permission to upload datasets.")

dataset = add_default_uploader(user, dataset)

for i in range(len(dataset.asset_list)):
asset = dataset.asset_list[i]
dataset.asset_list[i] = add_default_uploader(user, asset)

# dataset._check_asset_must_contain_mock()
dataset_size: float = 0.0

# TODO: Refactor so that object can also be passed to generate warnings

self.api.connection = cast(ServerConnection, self.api.connection)

metadata = self.api.connection.get_server_metadata(self.api.signing_key)
@@ -134,10 +133,27 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess:
)
prompt_warning_message(message=message, confirm=True)

with tqdm(
total=len(dataset.asset_list), colour="green", desc="Uploading"
) as pbar:
for asset in dataset.asset_list:
# check if a dataset with the same name already exists
search_res = self.api.services.dataset.search(dataset.name)
dataset_exists: bool = len(search_res) > 0

if not dataset_exists:
return self._upload_new_dataset(dataset)

existed_dataset: Dataset = search_res[0]
if not force_replace:
return SyftError(
message=f"Dataset with name the '{dataset.name}' already exists. "
"Please use `upload_dataset(dataset, force_replace=True)` to overwrite."
)
return self._replace_dataset(existed_dataset, dataset)

def _upload_assets(self, assets: list[CreateAsset]) -> float | SyftError:
total_assets_size: float = 0.0

with tqdm(total=len(assets), colour="green", desc="Uploading") as pbar:
for asset in assets:
# create and save a twin object representing the asset to the blob store
try:
contains_empty: bool = asset.contains_empty()
twin = TwinObject(
@@ -163,16 +179,55 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess:

asset.action_id = twin.id
asset.server_uid = self.id
dataset_size += get_mb_size(asset.data)

total_assets_size += get_mb_size(asset.data)

# Update the progress bar and set the dynamic description
pbar.set_description(f"Uploading: {asset.name}")
pbar.update(1)

dataset.mb_size = dataset_size
return total_assets_size

def _upload_new_dataset(self, dataset: CreateDataset) -> SyftSuccess | SyftError:
# upload the assets
total_assets_size: float | SyftError = self._upload_assets(dataset.asset_list)
if isinstance(total_assets_size, SyftError):
return total_assets_size

# check if the types of the assets are valid
dataset.mb_size = total_assets_size
_check_asset_must_contain_mock(dataset.asset_list)
dataset.check()
return self.api.services.dataset.add(dataset=dataset)
valid = dataset.check()
if isinstance(valid, SyftError):
return valid

# add the dataset object to the dataset store
try:
return self.api.services.dataset.add(dataset=dataset)
except Exception as e:
return SyftError(message=f"Failed to upload dataset. {e}")

def _replace_dataset(
self, existed_dataset: Dataset, dataset: CreateDataset
) -> SyftSuccess | SyftError:
# TODO: is there a way to check whether the assets already exist and have not changed,
# since re-uploading the assets will give them different UIDs?
total_assets_size: float | SyftError = self._upload_assets(dataset.asset_list)
if isinstance(total_assets_size, SyftError):
return total_assets_size

# check if the types of the assets are valid
dataset.mb_size = total_assets_size
valid = dataset.check()
_check_asset_must_contain_mock(dataset.asset_list)
if isinstance(valid, SyftError):
return valid
try:
return self.api.services.dataset.replace(
existed_dataset_uid=existed_dataset.id, dataset=dataset
)
except Exception as e:
return SyftError(message=f"Failed to replace dataset. {e}")

def forgot_password(self, email: str) -> SyftSuccess | SyftError:
return self.connection.forgot_password(email=email)
2 changes: 1 addition & 1 deletion packages/syft/src/syft/orchestra.py
@@ -382,7 +382,7 @@ def launch(
display(
SyftInfo(
message=f"You have launched a development server at http://{host}:{server_handle.port}."
+ "It is intended only for local use."
+ " It is intended only for local use."
)
)
return server_handle
17 changes: 16 additions & 1 deletion packages/syft/src/syft/protocol/data_protocol.py
@@ -77,12 +77,27 @@ def handle_union_type_klass_name(type_klass_name: str) -> str:
return type_klass_name


def get_klass_or_canonical_name(arg: Any) -> str:
"""Get the class name or canonical name of the object.

If the object is a subclass of SyftBaseObject, then use canonical name
to identify the object.

"""

return (
arg.__canonical_name__ # If SyftBaseObject subclass, ignore class name
if hasattr(arg, "__canonical_name__")
else getattr(arg, "__name__", str(arg))
)


def handle_annotation_repr_(annotation: type) -> str:
"""Handle typing representation."""
origin = typing.get_origin(annotation)
args = typing.get_args(annotation)
if origin and args:
args_repr = ", ".join(getattr(arg, "__name__", str(arg)) for arg in args)
args_repr = ", ".join(get_klass_or_canonical_name(arg) for arg in args)
origin_repr = getattr(origin, "__name__", str(origin))

# Handle typing.Union and types.UnionType
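The motivation for `get_klass_or_canonical_name`: when a field is annotated with a `SyftBaseObject` subclass, the annotation repr should use the shared `__canonical_name__` rather than the subclass's own `__name__`, so the protocol hashes stay stable. A self-contained sketch of the difference (toy classes; the helper body is copied from the diff above rather than imported from syft):

import typing
from typing import Any

def get_klass_or_canonical_name(arg: Any) -> str:
    return (
        arg.__canonical_name__
        if hasattr(arg, "__canonical_name__")
        else getattr(arg, "__name__", str(arg))
    )

class Base:  # stands in for a SyftBaseObject subclass
    __canonical_name__ = "Base"

class Sub(Base):  # inherits __canonical_name__
    pass

args = typing.get_args(list[Sub])
print(", ".join(get_klass_or_canonical_name(a) for a in args))  # prints "Base"
print(getattr(Sub, "__name__", str(Sub)))                       # old repr: "Sub"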
11 changes: 11 additions & 0 deletions packages/syft/src/syft/protocol/protocol_version.json
@@ -1,5 +1,16 @@
{
"1": {
"release_name": "0.9.1.json"
},
"dev": {
"object_versions": {
"CreateAsset": {
"2": {
"version": 2,
"hash": "1637c1e35c8cb65c9d667ad91c824b2cc5cf5b281e93770915a68c4926e6a567",
"action": "add"
}
}
}
}
}
54 changes: 46 additions & 8 deletions packages/syft/src/syft/service/dataset/dataset.py
@@ -98,7 +98,6 @@ class Asset(SyftObject):
shape: tuple | None = None
created_at: DateTime = DateTime.now()
uploader: Contributor | None = None

# _kwarg_name and _dataset_name are set by the UserCode.assets
_kwarg_name: str | None = None
_dataset_name: str | None = None
@@ -314,7 +313,7 @@ def check_mock(data: Any, mock: Any) -> bool:


@serializable()
class CreateAsset(SyftObject):
class CreateAssetV1(SyftObject):
# version
__canonical_name__ = "CreateAsset"
__version__ = SYFT_OBJECT_VERSION_1
@@ -336,6 +335,30 @@ class CreateAsset(SyftObject):
__repr_attrs__ = ["name"]
model_config = ConfigDict(validate_assignment=True, extra="forbid")


@serializable()
class CreateAsset(SyftObject):
# version
__canonical_name__ = "CreateAsset"
__version__ = SYFT_OBJECT_VERSION_2

id: UID | None = None # type:ignore[assignment]
name: str
description: MarkdownDescription | None = None
contributors: set[Contributor] = set()
data_subjects: list[DataSubjectCreate] = []
server_uid: UID | None = None
action_id: UID | None = None
data: Any | None = None
mock: Any | None = None
shape: tuple | None = None
mock_is_real: bool = False
created_at: DateTime | None = None
uploader: Contributor | None = None

__repr_attrs__ = ["name"]
model_config = ConfigDict(validate_assignment=True, extra="forbid")

def __init__(self, description: str | None = None, **data: Any) -> None:
if isinstance(description, str):
description = MarkdownDescription(text=description)
@@ -514,9 +537,7 @@ def _repr_html_(self) -> Any:
"""
else:
description_info_message = ""
if self.to_be_deleted:
return "This dataset has been marked for deletion. The underlying data may be not available."
return f"""
repr_html = f"""
<div class='syft-dataset'>
<h1>{self.name}</h1>
<h2><strong><span class='pr-8'>Summary</span></strong></h2>
@@ -529,9 +550,19 @@ def _repr_html_(self) -> Any:
</span></strong><a href='{self.url}'>{self.url}</a></p>
<p class='paragraph-sm'><strong><span class='pr-8'>Contributors:</span></strong>
To see full details call <strong>dataset.contributors</strong>.</p>
<h2><strong><span class='pr-8'>Assets</span></strong></h2>
{self.assets._repr_html_()}
"""
"""
if self.to_be_deleted:
repr_html += (
"<h2><strong><span class='pr-8'>"
"This dataset has been marked for deletion. The underlying data may be not available"
"</span></strong></h2>"
)
else:
repr_html += f"""
<h2><strong><span class='pr-8'>Assets</span></strong></h2>
{self.assets._repr_html_()}
"""
return repr_html

def action_ids(self) -> list[UID]:
return [asset.action_id for asset in self.asset_list if asset.action_id]
@@ -874,3 +905,10 @@ class DatasetUpdate(PartialSyftObject):

name: str
to_be_deleted: bool
asset_list: list[Asset]
contributors: set[Contributor]
citation: str
url: str
description: MarkdownDescription
uploader: Contributor
summary: str
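Bumping `CreateAsset` to version 2 (above) implies upgrade/downgrade transforms between `CreateAssetV1` and `CreateAsset`; the commit log mentions migrations for `Asset` and `CreateAsset`, but they are collapsed out of this diff. A hedged sketch of what such a pair typically looks like with syft's migration helpers (the PR's actual transforms may differ):

# sketch only: the real migration functions are not visible in this diff
from syft.types.syft_migration import migrate
from syft.types.transforms import make_set_default

@migrate(CreateAssetV1, CreateAsset)
def upgrade_create_asset() -> list:
    # V2 relaxes created_at to `DateTime | None`; default missing values to None
    return [make_set_default("created_at", None)]

@migrate(CreateAsset, CreateAssetV1)
def downgrade_create_asset() -> list:
    # the fields visible in this diff overlap, so no transform is needed here
    return []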
33 changes: 27 additions & 6 deletions packages/syft/src/syft/service/dataset/dataset_service.py
@@ -115,15 +115,16 @@ def get_all(
context: AuthedServiceContext,
page_size: int | None = 0,
page_index: int | None = 0,
include_deleted: bool = False,
) -> DatasetPageView | DictTuple[str, Dataset]:
"""Get a Dataset"""
datasets = self.stash.get_all(context.credentials).unwrap()
datasets = self.stash.get_all(
context.credentials, include_deleted=include_deleted
).unwrap()

for dataset in datasets:
if context.server is not None:
dataset.server_uid = context.server.id
if dataset.to_be_deleted:
datasets.remove(dataset)

return _paginate_dataset_collection(
datasets=datasets, page_size=page_size, page_index=page_index
@@ -141,9 +142,7 @@ def search(
results = self.get_all(context)

filtered_results = [
dataset
for dataset_name, dataset in results.items()
if name in dataset_name and not dataset.to_be_deleted
dataset for dataset_name, dataset in results.items() if name in dataset_name
]

return _paginate_dataset_collection(
@@ -242,6 +241,28 @@ def delete(
return_msg.append(f"Dataset with id '{uid}' successfully deleted.")
return SyftSuccess(message="\n".join(return_msg))

@service_method(
path="dataset.replace",
name="replace",
roles=DATA_OWNER_ROLE_LEVEL,
unwrap_on_success=False,
)
def replace(
self,
context: AuthedServiceContext,
existed_dataset_uid: UID,
dataset: CreateDataset,
) -> SyftSuccess:
dataset = dataset.to(Dataset, context=context)
dataset.id = existed_dataset_uid
self.stash.update(
credentials=context.credentials, dataset_update=dataset
).unwrap()
# TODO: should we delete the existing dataset's assets after a force replace?
return SyftSuccess(
message=f"Dataset with id '{existed_dataset_uid}' successfully replaced."
)


TYPE_TO_SERVICE[Dataset] = DatasetService
SERVICE_TO_TYPES[DatasetService].update({Dataset})
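Client-side, the two service changes above surface as follows; a hedged sketch (`client` as in the earlier example, while `existing` and `new_dataset` are hypothetical placeholders — `upload_dataset(..., force_replace=True)` normally drives `replace` for you):

# soft-deleted datasets are now listed on request instead of always filtered out
datasets = client.api.services.dataset.get_all(include_deleted=True)
for name, ds in datasets.items():
    print(name, ds.to_be_deleted)

# the replace endpoint can also be called directly with the existing dataset's UID
client.api.services.dataset.replace(
    existed_dataset_uid=existing.id, dataset=new_dataset
)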