From 59ba6e218ecca36f1ad51673c13b4ad9cc9e2223 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Tue, 21 Jul 2020 19:07:06 +0200 Subject: [PATCH 01/18] Objects were not retrieved in disk order anymore + bulk store The main reason why the functions like `get_objects_stream*` do not return objects in the same order is for efficiency, and this is achieved by reorganising objects per pack, and reading the pack file in sequential order. During the code changes, this was not happening anymore. Ensuring now that packed objects are returned in disk order (test also added) Moreover, adding a lot of objects one to one is much less efficient than doing a bulk insert with lower-level non-ORM functionality of SQLAlchemy. For this reason, I am now committing in bulk all data to the DB at the end of each pack file. Some comments on the performance in issue #92. --- disk_objectstore/container.py | 66 +++++++++++++++++++++++++------ tests/test_container.py | 74 +++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 13 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 481711f..06fef27 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -467,12 +467,12 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too for chunk in chunk_iterator(hashkeys_set, size=self._IN_SQL_MAX_LENGTH): query = session.query(Obj).filter( Obj.hashkey.in_(chunk) - ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, - Obj.size).order_by(Obj.offset) + ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size) for res in query: packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) for pack_int_id, pack_metadata in packs.items(): + pack_metadata.sort(key=lambda metadata: metadata.offset) hashkeys_in_packs.update(obj.hashkey for obj in pack_metadata) pack_path = self._get_pack_path_from_pack_id(str(pack_int_id)) try: @@ -568,8 +568,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too for chunk in chunk_iterator(loose_not_found, size=self._IN_SQL_MAX_LENGTH): query = session.query(Obj).filter( Obj.hashkey.in_(chunk) - ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, - Obj.size).order_by(Obj.offset) + ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size) for res in query: packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) @@ -578,6 +577,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too really_not_found = loose_not_found.copy() for pack_int_id, pack_metadata in packs.items(): + pack_metadata.sort(key=lambda metadata: metadata.offset) # I remove those that I found really_not_found.difference_update(obj.hashkey for obj in pack_metadata) @@ -1100,6 +1100,12 @@ def pack_all_loose(self, compress=False, validate_objects=True): with self.lock_pack(str(pack_int_id)) as pack_handle: # Inner loop: continue until when there is a file, or # if we need to change pack (in this case `break` is called) + + # We will store here the dictionaries with the data to be pushed in the DB + # By collecting the data and committing as a bulk operation at the end, + # we highly improve performance + obj_dicts = [] + while loose_objects: # Check in which pack I need to write to the next object pack_int_id = self._get_pack_id_to_write_to() @@ -1116,15 +1122,16 @@ def pack_all_loose(self, compress=False, 
validate_objects=True): # Get next hash key to process loose_hashkey = loose_objects.pop() - obj = Obj(hashkey=loose_hashkey) - obj.pack_id = pack_int_id - obj.compressed = compress - obj.offset = pack_handle.tell() + obj_dict = {} + obj_dict['hashkey'] = loose_hashkey + obj_dict['pack_id'] = pack_int_id + obj_dict['compressed'] = compress + obj_dict['offset'] = pack_handle.tell() try: with open(self._get_loose_path_from_hashkey(loose_hashkey), 'rb') as loose_handle: # The second parameter is `None` since we are not computing the hash # We can instead pass the hash algorithm and assert that it is correct - obj.size, new_hashkey = self._write_data_to_packfile( + obj_dict['size'], new_hashkey = self._write_data_to_packfile( pack_handle=pack_handle, read_handle=loose_handle, compress=compress, @@ -1140,8 +1147,21 @@ def pack_all_loose(self, compress=False, validate_objects=True): loose_hashkey, new_hashkey ) ) - obj.length = pack_handle.tell() - obj.offset - session.add(obj) + obj_dict['length'] = pack_handle.tell() - obj_dict['offset'] + + # Appending for later bulk commit - see comments in add_streamed_objects_to_pack + obj_dicts.append(obj_dict) + + # It's now time to write to the DB, in a single bulk operation (per pack) + if obj_dicts: + # Here I shouldn't need to do `OR IGNORE` as in `add_streamed_objects_to_pack` + # Because I'm already checking the hash keys and avoiding to add twice the same + session.execute(Obj.__table__.insert(), obj_dicts) + # Clean up the list - this will be cleaned up also later, + # but it's better to make sure that we do it here, to avoid trying to rewrite + # the same objects again + obj_dicts = [] + # I don't commit here; I commit after making sure the file is flushed and closed # flush and sync to disk before closing safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) @@ -1223,6 +1243,12 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b with self.lock_pack(str(pack_int_id)) as pack_handle: # Inner loop: continue until when there is a file, or # if we need to change pack (in this case `break` is called) + + # We will store here the dictionaries with the data to be pushed in the DB + # By collecting the data and committing as a bulk operation at the end, + # we highly improve performance + obj_dicts = [] + while working_stream_list: # Check in which pack I need to write to the next object pack_int_id = self._get_pack_id_to_write_to() @@ -1294,8 +1320,12 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # Either no_holes is False: then I don't know if the object exists, so I just try to insert # it and in case do nothing; the space on disk might remain allocated (but unreferenced). # Or `no_holes` is True and I don't have the object: this will insert the entry - insert_command = Obj.__table__.insert().prefix_with('OR IGNORE').values(obj_dict) - session.execute(insert_command) + obj_dicts.append(obj_dict) + + # In the future, if there are memory issues with millions of objects, + # We can flush here to DB if there are too many objects in the cache. + # Also, there are other optimisations that can be done, like deleting + # the pack_metadata when not needed anymore etc. # I also add the hash key in the known_packed_hashkeys (if no_holes, when this is defined) if no_holes: @@ -1328,6 +1358,16 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # I have written some bytes and then I have seeked back. 
I truncate then the file at the current # position. pack_handle.truncate() + + # It's now time to write to the DB, in a single bulk operation (per pack) + if obj_dicts: + session.execute(Obj.__table__.insert().prefix_with('OR IGNORE'), obj_dicts) + # Clean up the list - this will be cleaned up also later, + # but it's better to make sure that we do it here, to avoid trying to rewrite + # the same objects again + obj_dicts = [] + # I don't commit here; I commit after making sure the file is flushed and closed + # flush and sync to disk before closing safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) diff --git a/tests/test_container.py b/tests/test_container.py index b6fb07c..5fb231d 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -2701,3 +2701,77 @@ def new_write_data_to_packfile(self, call_counter, *args, **kwargs): assert new_sizes['total_size_packfiles_on_disk'] == 2 * sizes['total_size_packfiles_on_disk'] # (but shouldn't have increased the size of packed objects on disk) assert new_sizes['total_size_packed_on_disk'] == sizes['total_size_packed_on_disk'] + + +def test_packs_read_in_order(temp_dir): + """Test that when reading the objects from packs, they are read grouped by packs, and in offset order. + + This is very important for performance. + + .. note:: IMPORTANT: This is not running with concurrent packing, so only the first internal loop of + get_objects_stream_and_meta is triggered and the order is the one described above. + The application should NOT make any assumption on this because, during concurrent packing of loose objects, + the recently packed/clean_stored objects might be returned later. + + .. note:: We are not checking the order in which packs are considered + """ + num_objects = 10000 # Number of objects + obj_size = 999 + # This will generate N objects of size obj_size each + # They are different because the at least the first characters until the first dash are different + + temp_container = Container(temp_dir) + # A apck should accomodate ~100 objects, and there should be > 90 packs + temp_container.init_container(clear=True, pack_size_target=100000) + + data = [('{}-'.format(i).encode('ascii') * obj_size)[:obj_size] for i in range(num_objects)] + hashkeys = temp_container.add_objects_to_pack(data, compress=False) + + # Check that I indeed created num_objects (different) objects + assert len(set(hashkeys)) == num_objects + + # Shuffle the array. 
When retrieving data, I should still fetch them per pack, and then in offset order + # (so the pack file is read sequentially rather than randomly) + random.shuffle(hashkeys) + + last_offset = None + last_pack = None + seen_packs = set() + + with temp_container.get_objects_stream_and_meta(hashkeys, skip_if_missing=False) as triplets: + for _, _, meta in triplets: + assert meta['type'] == ObjectType.PACKED + if last_pack is None: + last_pack = meta['pack_id'] + seen_packs.add(meta['pack_id']) + last_offset = 0 + elif meta['pack_id'] != last_pack: + assert meta['pack_id'] not in seen_packs, ( + 'Objects were already retrieved from pack {}, the last pack was {} ' + 'and we are trying to retrieve again from pack {}'.format( + meta['pack_id'], last_pack, meta['pack_id'] + ) + ) + last_pack = meta['pack_id'] + seen_packs.add(meta['pack_id']) + last_offset = 0 + # We are still in the same pack + assert last_offset <= meta['pack_offset'], ( + 'in pack {} we are reading offset {}, but before we were reading a later offset {}'.format( + meta['pack_id'], meta['pack_offset'], last_offset + ) + ) + last_offset = meta['pack_offset'] + + # I want to make sure to have generated enough packs, meaning this function is actually testing the behavior + # This should generated 90 packs + # NOTE: if you use compress = True, you get many less packs since the data is very compressible! (only 2) + # So we only test with compress=False + largest_pack = max(seen_packs) + assert largest_pack > 80 + + # Check that all packs were scanned through + assert sorted(seen_packs) == list(range(largest_pack + 1)) + + # Important before exiting from the tests + temp_container.close() From 3cebca7653f310be62a5ff306be964429e4bdb11 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Tue, 21 Jul 2020 20:28:43 +0200 Subject: [PATCH 02/18] Adding callbacks to export and add_objects_to_pack Callback tests still missing --- disk_objectstore/container.py | 135 +++++++++++++++++++++++++++++----- disk_objectstore/utils.py | 101 +++++++++++++++++++++++++ 2 files changed, 217 insertions(+), 19 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 06fef27..bbf51a4 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -18,8 +18,8 @@ from .models import Base, Obj from .utils import ( - ObjectWriter, PackedObjectReader, StreamDecompresser, chunk_iterator, is_known_hash, nullcontext, - safe_flush_to_disk, get_hash, compute_hash_and_size + ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, chunk_iterator, is_known_hash, + nullcontext, safe_flush_to_disk, get_hash, compute_hash_and_size ) from .exceptions import NotExistent, NotInitialised, InconsistentContent @@ -441,7 +441,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too (i.e., neither packed nor loose). If False, return ``None`` instead of the stream. :param with_streams: if True, yield triplets (hashkey, stream, meta). - :param with_streams: if False, yield pairs (hashkey, meta) and avoid to open any file. + If False, yield pairs (hashkey, meta) and avoid to open any file. """ # pylint: disable=too-many-nested-blocks @@ -1181,8 +1181,46 @@ def pack_all_loose(self, compress=False, validate_objects=True): # on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them, # and deletion is deferred to a manual clean-up operation. 
- def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-branches, too-many-statements - self, stream_list, compress=False, open_streams=False, no_holes=False, no_holes_read_twice=True): + def add_streamed_object_to_pack( # pylint: disable=too-many-arguments + self, + stream, + compress=False, + open_streams=False, + no_holes=False, + no_holes_read_twice=True, + callback=None, + callback_size_hint=0 + ): + """Add a single object in streamed form to a pack. + + For the description of the parameters, see the docstring of ``add_streamed_objects_to_pack``. + + The only difference is that here the callback will provide feedback on the progress of this specific object. + :param callback_size_hint: the expected size of the stream - if not provided, it is used send back the total + length in the callbacks + :return: a single objec hash key + """ + streams = [CallbackStreamWrapper(stream, callback=callback, total_length=callback_size_hint)] + + # I specifically set the callback to None + retval = self.add_streamed_objects_to_pack( + streams, + compress=compress, + open_streams=open_streams, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + callback=None + ) + + # Close the callback so the bar doesn't remaing open + streams[0].close_callback() + + return retval[0] + + + def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-arguments + self, stream_list, compress=False, open_streams=False, no_holes=False, no_holes_read_twice=True, + callback=None): """Add objects directly to a pack, reading from a list of streams. This is a maintenance operation, available mostly for efficiency reasons @@ -1217,6 +1255,17 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b session = self._get_cached_session() if no_holes: + if callback: + total = session.query(Obj).count() + if total: + # If we have a callback, compute the total count of objects in this pack + callback(action='init', value={'total': total, 'description': 'List existing'}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + update_every = max(int(total / 400), 1) + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + since_last_update = 0 + known_packed_hashkeys = set() # I need to get the full list of PKs to know if the object exists # As this is expensive, I will do it only if it is needed, i.e. 
when no_holes is True @@ -1226,19 +1275,42 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b Obj.id ).limit(yield_per_size).with_entities(Obj.id, Obj.hashkey).all() + if not results_chunk: + # No more packed objects + break + for _, hashkey in results_chunk: known_packed_hashkeys.add(hashkey) - if results_chunk: - last_pk = results_chunk[-1][0] - else: - # No more packed objects - break + last_pk = results_chunk[-1][0] + if callback: + since_last_update += len(results_chunk) + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + + if callback and total: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) + + if callback: + total = len(working_stream_list) + # If we have a callback, compute the total count of objects in this pack + callback(action='init', value={'total': total, 'description': 'Bulk storing'}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + update_every = max(int(total / 400), 1) + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + since_last_update = 0 # Outer loop: this is used to continue when a new pack file needs to be created while working_stream_list: # Store the last pack integer ID, needed to know later if I need to open a new pack last_pack_int_id = pack_int_id + # Avoid concurrent writes on the pack file with self.lock_pack(str(pack_int_id)) as pack_handle: # Inner loop: continue until when there is a file, or @@ -1269,6 +1341,12 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b else: stream_context_manager = nullcontext(next_stream) + if callback: + since_last_update += 1 + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + # Get the position before writing the object - I need it if `no_holes` is True and the object # is already there position_before = pack_handle.tell() @@ -1378,9 +1456,20 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # Note: because of the logic above, in theory this should not raise an IntegrityError! session.commit() + if callback: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) + return hashkeys - def add_objects_to_pack(self, content_list, compress=False, no_holes=False, no_holes_read_twice=True): + # yapf: disable + def add_objects_to_pack( + self, content_list, compress=False, no_holes=False, no_holes_read_twice=True, callback=None + ): + # yapf: enable """Add objects directly to a pack, reading from a list of content byte arrays. 
This is a maintenance operation, available mostly for efficiency reasons @@ -1402,7 +1491,11 @@ def add_objects_to_pack(self, content_list, compress=False, no_holes=False, no_h """ stream_list = [io.BytesIO(content) for content in content_list] return self.add_streamed_objects_to_pack( - stream_list=stream_list, compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice + stream_list=stream_list, + compress=compress, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + callback=callback ) def clean_storage(self): # pylint: disable=too-many-branches @@ -1497,7 +1590,7 @@ def clean_storage(self): # pylint: disable=too-many-branches # I just ignore, I will remove it in a future call of this method. pass - def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600): + def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600, callback=None): """Export the specified hashkeys to a new container (must be already initialised). :param hashkeys: an iterable of hash keys. @@ -1530,10 +1623,14 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # but I avoid to write a huge object to disk when it's not needed because already available # on the destination new_obj_hashkeys.append( - other_container.add_streamed_objects_to_pack([stream], - compress=compress, - no_holes=True, - no_holes_read_twice=True)[0] + other_container.add_streamed_object_to_pack( + stream, + compress=compress, + no_holes=True, + no_holes_read_twice=True, + callback=callback, + callback_size_hint=meta['size'] + ) ) elif cache_size + meta['size'] > target_memory_bytes: # I were to read the content, I would be filling too much memory - I flush the cache first, @@ -1548,7 +1645,7 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # I put all of them in bulk # I accept the performance hit of reading twice (especially since it's already on memory) temp_new_hashkeys = other_container.add_objects_to_pack( - data, compress=compress, no_holes=True, no_holes_read_twice=True + data, compress=compress, no_holes=True, no_holes_read_twice=True, callback=callback ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1582,7 +1679,7 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= temp_old_hashkeys, data = zip(*content_cache.items()) # I put all of them in bulk temp_new_hashkeys = other_container.add_objects_to_pack( - data, compress=compress, no_holes=True, no_holes_read_twice=True + data, compress=compress, no_holes=True, no_holes_read_twice=True, callback=callback ) # I update the list of known old (this container) and new (other_container) hash keys diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index 428ab81..36cb42a 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -413,6 +413,107 @@ def read(self, size=-1): return stream +class CallbackStreamWrapper: + """A class to just wrap a read stream, but perform a callback every few bytes. + + Should be used only for streams open in read mode. 
+ """ + + @property + def mode(self): + return self._stream.mode + + @property + def seekable(self): + """Return whether object supports random access.""" + return self._stream.seekable + + def seek(self, target, whence=0): + """Change stream position.""" + if target > self.tell(): + if self._callback: + self._since_last_update += target - self.tell() + if self._since_last_update >= self._update_every: + self._callback(action='update', value=self._since_last_update) + self._since_last_update = 0 + else: + self.close_callback() + if self._callback: + # If we have a callback, compute the total count of objects in this pack + self._callback( + action='init', + value={ + 'total': self._total_length, + 'description': '{} [rewind]'.format(self._description) + } + ) + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + self._since_last_update = target + self._callback(action='update', value=self._since_last_update) + + return self._stream.seek(target, whence) + + def tell(self): + """Return current stream position.""" + return self._stream.tell() + + def __init__(self, stream, callback, total_length=0, description='Streamed object'): + """ + Initialises the reader to a given stream. + + :param stream: an open stream + :param callback: a callback to call to update the status (or None if not needed) + :param total_length: the expected length + """ + self._stream = stream + self._callback = callback + self._total_length = total_length + self._description = description + + if self._callback: + # If we have a callback, compute the total count of objects in this pack + self._callback(action='init', value={'total': total_length, 'description': description}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + self._update_every = max(int(total_length / 400), 1) if total_length else 1 + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + self._since_last_update = 0 + + def read(self, size=-1): + """ + Read and return up to n bytes. + + If the argument is omitted, None, or negative, reads and + returns all data until EOF (that corresponds to the length specified + in the __init__ method). + + Returns an empty bytes object on EOF. + """ + data = self._stream.read(size) + + if self._callback: + self._since_last_update += len(data) + if self._since_last_update >= self._update_every: + self._callback(action='update', value=self._since_last_update) + self._since_last_update = 0 + + return data + + def close_callback(self): + """ + Call the wrap up closing calls for the callback. + + .. note:: it DOES NOT close the stream. + """ + if self._callback: + # Final call to complete the bar + if self._since_last_update: + self._callback(action='update', value=self._since_last_update) + # Perform any wrap-up, if needed + self._callback(action='close', value=None) + + class StreamDecompresser: """A class that gets a stream of compressed zlib bytes, and returns the corresponding uncompressed bytes when being read via the .read() method. 
From acc2186dd9e9d4ae2113739d2fbcd279a34b2052 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Tue, 21 Jul 2020 20:42:50 +0200 Subject: [PATCH 03/18] Changing the logic for the callback on the export It now loops over the objects to export --- disk_objectstore/container.py | 37 +++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index bbf51a4..300d4c1 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1590,7 +1590,14 @@ def clean_storage(self): # pylint: disable=too-many-branches # I just ignore, I will remove it in a future call of this method. pass - def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600, callback=None): + def export( # pylint: disable=too-many-locals + self, + hashkeys, + other_container, + compress=False, + target_memory_bytes=104857600, + callback=None + ): """Export the specified hashkeys to a new container (must be already initialised). :param hashkeys: an iterable of hash keys. @@ -1611,6 +1618,17 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # We then flush in 'bulk' to the `other_container`, thus speeding up the process content_cache = {} cache_size = 0 + + if callback: + # If we have a callback, compute the total count of objects in this pack + total = len(hashkeys) + callback(action='init', value={'total': total, 'description': 'Objects'}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + update_every = max(int(total / 1000), 1) + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + since_last_update = 0 + with self.get_objects_stream_and_meta(hashkeys) as triplets: for old_obj_hashkey, stream, meta in triplets: if meta['size'] > target_memory_bytes: @@ -1628,8 +1646,6 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= compress=compress, no_holes=True, no_holes_read_twice=True, - callback=callback, - callback_size_hint=meta['size'] ) ) elif cache_size + meta['size'] > target_memory_bytes: @@ -1645,7 +1661,7 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # I put all of them in bulk # I accept the performance hit of reading twice (especially since it's already on memory) temp_new_hashkeys = other_container.add_objects_to_pack( - data, compress=compress, no_holes=True, no_holes_read_twice=True, callback=callback + data, compress=compress, no_holes=True, no_holes_read_twice=True ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1669,6 +1685,19 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # I update the cache size cache_size += meta['size'] + if callback: + since_last_update += 1 + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + + if callback: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) + # The for loop is finished. I can also go out of the `with` context manager because whatever is in the # cache is in memory. Most probably I still have content in the cache, just flush it, # with the same logic as above. 
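With this change, the progress reported during an export is expressed in terms of the objects being exported, e.g. (using a callback such as the `TqdmCallback` sketched above, with hypothetical container instances):

    # A single bar labelled 'Objects', advancing as the exported hash keys are processed
    src_container.export(hashkeys, dest_container, callback=TqdmCallback())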
From 53ad44c348280578c929e56114597ffecc0eed5f Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 29 Jul 2020 17:57:41 +0200 Subject: [PATCH 04/18] Implementing basic repacking logic Addresses #12 This is already quite safe to be called, but I didn't cover all corner cases also related to power loss etc. --- disk_objectstore/__init__.py | 4 +- disk_objectstore/container.py | 152 ++++++++++++++++++++++++++++++++-- tests/test_container.py | 81 +++++++++++++++++- 3 files changed, 226 insertions(+), 11 deletions(-) diff --git a/disk_objectstore/__init__.py b/disk_objectstore/__init__.py index d819f8f..eaadf61 100644 --- a/disk_objectstore/__init__.py +++ b/disk_objectstore/__init__.py @@ -2,8 +2,8 @@ It does not require a server running. """ -from .container import Container, ObjectType +from .container import Container, ObjectType, CompressMode -__all__ = ('Container', 'ObjectType') +__all__ = ('Container', 'ObjectType', 'CompressMode') __version__ = '0.4.0' diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 300d4c1..036542b 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -33,6 +33,17 @@ class ObjectType(Enum): MISSING = 'missing' +class CompressMode(Enum): + """Various possible behaviors when compressing. + + For now used only in the `repack` function, should probably be applied to all functions + that have a `compress` kwarg. + """ + NO = 'no' # pylint: disable=invalid-name + YES = 'yes' + KEEP = 'keep' # Keep the current compression when repacking. + + class Container: # pylint: disable=too-many-public-methods """A class representing a container of objects (which is stored on a disk folder)""" @@ -44,6 +55,10 @@ class Container: # pylint: disable=too-many-public-methods # when packing. _CHUNKSIZE = 65536 + # The pack ID that is used for repacking as a temporary location. + # NOTE: It MUST be an integer and it MUST be < 0 to avoid collisions with 'actual' packs + _REPACK_PACK_ID = -1 + # When performing an `in_` query in SQLite, this is converted to something like # 'SELECT * FROM db_object WHERE db_object.hashkey IN (?, ?)' with parameters = ('hash1', 'hash2') # Now, the maximum number of parameters is limited in SQLite, see variable SQLITE_MAX_VARIABLE_NUMBER @@ -199,13 +214,15 @@ def _get_loose_path_from_hashkey(self, hashkey): # if loose_prefix_len is zero, there is no subfolder return os.path.join(self._get_loose_folder(), hashkey) - def _get_pack_path_from_pack_id(self, pack_id): + def _get_pack_path_from_pack_id(self, pack_id, allow_repack_pack=False): """Return the path of the pack file on disk for the given pack ID. :param pack_id: the pack ID. + :param pack_id: Whether to allow the repack pack id """ pack_id = str(pack_id) - assert self._is_valid_pack_id(pack_id), 'Invalid pack ID {}'.format(pack_id) + assert self._is_valid_pack_id(pack_id, + allow_repack_pack=allow_repack_pack), 'Invalid pack ID {}'.format(pack_id) return os.path.join(self._get_pack_folder(), pack_id) def _get_pack_index_path(self): @@ -835,15 +852,20 @@ def count_objects(self): return retval - @staticmethod - def _is_valid_pack_id(pack_id): - """Return True if the name is a valid pack ID.""" + @classmethod + def _is_valid_pack_id(cls, pack_id, allow_repack_pack=False): + """Return True if the name is a valid pack ID. + + If allow_repack_pack is True, also the pack id used for repacking is considered as valid. 
+ """ if not pack_id: # Must be a non-empty string return False if pack_id != '0' and pack_id[0] == '0': # The ID must be a valid integer: either zero, or it should not start by zero return False + if allow_repack_pack and pack_id == str(cls._REPACK_PACK_ID): + return True if not all(char in '0123456789' for char in pack_id): return False return True @@ -904,7 +926,7 @@ def get_total_size(self): return retval @contextmanager - def lock_pack(self, pack_id): + def lock_pack(self, pack_id, allow_repack_pack=False): """Lock the given pack id. Use as a context manager. Raise if the pack is already locked. If you enter the context manager, @@ -912,14 +934,16 @@ def lock_pack(self, pack_id): Important to use for avoiding concurrent access/append to the same file. :param pack_id: a string with a valid pack name. + :param allow_pack_repack: if True, allow to open the pack file used for repacking """ - assert self._is_valid_pack_id(pack_id) + assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack) # Open file in exclusive mode lock_file = os.path.join(self._get_pack_folder(), '{}.lock'.format(pack_id)) + pack_file = self._get_pack_path_from_pack_id(pack_id, allow_repack_pack=allow_repack_pack) try: with open(lock_file, 'x'): - with open(self._get_pack_path_from_pack_id(pack_id), 'ab') as pack_handle: + with open(pack_file, 'ab') as pack_handle: yield pack_handle finally: # Release resource (I check if it exists in case there was an exception) @@ -1944,3 +1968,115 @@ def delete_objects(self, hashkeys): # an error while deleting the packed version of an object (even if the loose version of the same object # was deleted) should be considered as if the object has *not* been deleted return list(deleted_loose.union(deleted_packed)) + + def repack(self, compress_mode=CompressMode.KEEP): + """Perform a repack of the packed objects.""" + for pack_id in self._list_packs(): + self.repack_pack(pack_id, compress_mode=compress_mode) + + def repack_pack(self, pack_id, compress_mode=CompressMode.KEEP): + """Perform a repack of a given pack object. + + This is a maintenance operation.""" + if compress_mode != CompressMode.KEEP: + raise NotImplementedError('Only keep method currently implemented') + + assert pack_id != self._REPACK_PACK_ID, ( + "The specified pack_id '{}' is invalid, it is the one used for repacking".format(pack_id) + ) + + # Check that it does not exist + assert not os.path.exists( + self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True) + ), ("The repack pack '{}' already exists, probably a previous repacking aborted?".format(self._REPACK_PACK_ID)) + + session = self._get_cached_session() + one_object_in_pack = session.query(Obj.id).filter(Obj.pack_id == pack_id).limit(1).all() + if not one_object_in_pack: + # No objects. Clean up the pack file, if it exists. + if os.path.exists(self._get_pack_path_from_pack_id(pack_id)): + os.remove(self._get_pack_path_from_pack_id(pack_id)) + return + + obj_dicts = [] + # At least one object. Let's repack. We have checked before that the + # REPACK_PACK_ID did not exist. 
+ with self.lock_pack(str(self._REPACK_PACK_ID), allow_repack_pack=True) as write_pack_handle: + with open(self._get_pack_path_from_pack_id(pack_id), 'rb') as read_pack: + query = session.query(Obj.id, Obj.hashkey, Obj.size, Obj.offset, Obj.length, + Obj.compressed).filter(Obj.pack_id == pack_id).order_by(Obj.offset) + for rowid, hashkey, size, offset, length, compressed in query: + # Since I am assuming above that the method is `KEEP`, I will just transfer + # the bytes. Otherwise I have to properly take into account compression in the + # source and in the destination. + read_handle = PackedObjectReader(read_pack, offset, length) + + obj_dict = {} + obj_dict['id'] = rowid + obj_dict['hashkey'] = hashkey + obj_dict['pack_id'] = self._REPACK_PACK_ID + obj_dict['compressed'] = compressed + obj_dict['size'] = size + obj_dict['offset'] = write_pack_handle.tell() + + # Transfer data in chunks. + # No need to rehash - it's the same container so the same hash. + # Not checking the compression on source or destination - we are assuming + # for now that the mode is KEEP. + while True: + chunk = read_handle.read(self._CHUNKSIZE) + if chunk == b'': + # Returns an empty bytes object on EOF. + break + write_pack_handle.write(chunk) + obj_dict['length'] = write_pack_handle.tell() - obj_dict['offset'] + + # Appending for later bulk commit + # I will assume that all objects of a single pack fit in memory + obj_dicts.append(obj_dict) + safe_flush_to_disk( + write_pack_handle, self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True) + ) + + # We are done with data transfer. + # At this stage we just have a new pack -1 (_REPACK_PACK_ID) but it is never referenced. + # Let us store the information in the DB. + # We had already checked earlier that this at least one exists. + session.bulk_update_mappings(Obj, obj_dicts) + # I also commit. + session.commit() + # Clean up the cache + obj_dicts = [] + + # Now we can safely delete the old object. I just check that there is no object still + # refencing the old pack, to be sure. + one_object_in_pack = session.query(Obj.id).filter(Obj.pack_id == pack_id).limit(1).all() + assert not one_object_in_pack, ( + "I moved the objects of pack '{pack_id}' to pack '{repack_id}' " + "but there are still references to pack '{pack_id}'!".format( + pack_id=pack_id, repack_id=self._REPACK_PACK_ID + ) + ) + os.remove(self._get_pack_path_from_pack_id(pack_id)) + + # I need now to move the file back. I need to be careful, to avoid conditions in which + # I remain with inconsistent data. + # Since hard links seem to be supported on all three platforms, I do a hard link + # of -1 back to the correct pack ID. + os.link( + self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True), + self._get_pack_path_from_pack_id(pack_id) + ) + + # Before deleting the source (pack -1) I need now to update again all + # entries to point to the correct pack id + session.query(Obj).filter(Obj.pack_id == self._REPACK_PACK_ID).update({Obj.pack_id: pack_id}) + session.commit() + + # Technically, to be crash safe, before deleting I should also fsync the folder + # I am not doing this for now + # I now can unlink/delete the original source + os.unlink(self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True)) + + # We are now done. The temporary pack is gone, and the old `pack_id` + # has now been replaced with an udpated, repacked pack. 
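For reference, a minimal usage sketch of the repacking API added in this patch (the container path is hypothetical; only `CompressMode.KEEP` is implemented for now):

    from disk_objectstore import Container, CompressMode

    container = Container('/path/to/container')  # hypothetical, already initialised
    # Rewrite every pack file, reclaiming the space of deleted objects while
    # keeping the current per-object compression
    container.repack(compress_mode=CompressMode.KEEP)
    # A single pack can also be repacked via container.repack_pack(pack_id)
    container.close()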
diff --git a/tests/test_container.py b/tests/test_container.py index 5fb231d..3eb0920 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -14,7 +14,7 @@ import psutil import pytest -from disk_objectstore import Container, ObjectType +from disk_objectstore import Container, ObjectType, CompressMode from disk_objectstore import utils, models import disk_objectstore.exceptions as exc @@ -2775,3 +2775,82 @@ def test_packs_read_in_order(temp_dir): # Important before exiting from the tests temp_container.close() + + +def test_repack(temp_dir): + """Test the repacking functionality.""" + temp_container = Container(temp_dir) + temp_container.init_container(clear=True, pack_size_target=39) + + # data of 10 bytes each. Will fill two packs. + data = [ + b'-123456789', b'a123456789', b'b123456789', b'c123456789', b'd123456789', b'e123456789', b'f123456789', + b'g123456789', b'h123456789' + ] + + hashkeys = [] + # Add them one by one, so I am sure in wich pack they go + for datum in data: + hashkeys.append(temp_container.add_objects_to_pack([datum])[0]) + + assert temp_container.get_object_meta(hashkeys[0])['pack_id'] == 0 + assert temp_container.get_object_meta(hashkeys[1])['pack_id'] == 0 + assert temp_container.get_object_meta(hashkeys[2])['pack_id'] == 0 + assert temp_container.get_object_meta(hashkeys[3])['pack_id'] == 0 + assert temp_container.get_object_meta(hashkeys[4])['pack_id'] == 1 + assert temp_container.get_object_meta(hashkeys[5])['pack_id'] == 1 + assert temp_container.get_object_meta(hashkeys[6])['pack_id'] == 1 + assert temp_container.get_object_meta(hashkeys[7])['pack_id'] == 1 + assert temp_container.get_object_meta(hashkeys[8])['pack_id'] == 2 + + # I check which packs exist + assert sorted(temp_container._list_packs()) == ['0', '1', '2'] # pylint: disable=protected-access + + counts = temp_container.count_objects() + assert counts['packed'] == len(data) + size = temp_container.get_total_size() + assert size['total_size_packed'] == 10 * len(data) + assert size['total_size_packfiles_on_disk'] == 10 * len(data) + + # I delete an object in the middle, an object at the end of a pack, and an object at the beginning. 
+ # I also delete the only object + to_delete = [hashkeys[1], hashkeys[3], hashkeys[4], hashkeys[8]] + temp_container.delete_objects(to_delete) + + # I check that all packs are still there + assert sorted(temp_container._list_packs()) == ['0', '1', '2'] # pylint: disable=protected-access + + counts = temp_container.count_objects() + assert counts['packed'] == len(data) - len(to_delete) + size = temp_container.get_total_size() + # I deleted 4 objects + assert size['total_size_packed'] == 10 * (len(data) - len(to_delete)) + # Still full size on disk + assert size['total_size_packfiles_on_disk'] == 10 * len(data) + + # I now repack + temp_container.repack(compress_mode=CompressMode.KEEP) + + # I check that all packs are still there, but pack 2 was deleted + assert sorted(temp_container._list_packs()) == ['0', '1'] # pylint: disable=protected-access + + counts = temp_container.count_objects() + assert counts['packed'] == len(data) - len(to_delete) + size = temp_container.get_total_size() + assert size['total_size_packed'] == 10 * (len(data) - len(to_delete)) + # This time also the size on disk should be adapted (it's the main goal of repacking) + assert size['total_size_packfiles_on_disk'] == 10 * (len(data) - len(to_delete)) + + # Important before exiting from the tests + temp_container.close() + + +def test_not_implemented_repacks(temp_container): + """Check the error for not implemented repack methods.""" + # We need to have at least one pack + temp_container.add_objects_to_pack([b'23r2']) + for compress_mode in CompressMode: + if compress_mode == CompressMode.KEEP: + continue + with pytest.raises(NotImplementedError): + temp_container.repack(compress_mode=compress_mode) From 585351e10284a14d44d2c3d446dd40d1467c62ba Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 29 Jul 2020 18:37:32 +0200 Subject: [PATCH 05/18] We also want to test that after repacking content is correct --- tests/test_container.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_container.py b/tests/test_container.py index 3eb0920..249baf8 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -2841,6 +2841,11 @@ def test_repack(temp_dir): # This time also the size on disk should be adapted (it's the main goal of repacking) assert size['total_size_packfiles_on_disk'] == 10 * (len(data) - len(to_delete)) + # Check that the content is still correct + # Should not raise + errors = temp_container.validate() + assert not any(errors.values()) + # Important before exiting from the tests temp_container.close() From 38471b667a529e391f845fde73b2c94936f72f61 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Sat, 8 Aug 2020 18:27:00 +0200 Subject: [PATCH 06/18] Reverting the logic of the export function Now we define an `import_objects` function instead, that (when importing from a container using the same hashing algorithm) can be much more efficient by deciding beforehand which objects to import, and then importing only those. This uses the efficient method of #93 to iterate only once through two sorted unique lists to get their intersection or left/right difference. The performance still needs to be benchmarked. The `export` function might be deprecated if this is more efficient. 
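The single pass over two sorted, unique iterators (implemented below as `detect_where_sorted` in utils.py) can be sketched, for the left-difference case needed by `import_objects`, roughly as follows:

    def sorted_left_only(left, right):
        # Yield the items of `left` that are not in `right`.
        # Both iterables must yield sorted, unique values.
        right = iter(right)
        current = next(right, None)
        for item in left:
            # Advance the right iterator until it catches up with `item`
            while current is not None and current < item:
                current = next(right, None)
            if current is None or item < current:
                yield item  # present only on the left

    # e.g. list(sorted_left_only([1, 3, 5], [2, 3, 4])) == [1, 5]

The actual helper is more general: it classifies every element as left-only, right-only or present in both, so the same single pass also provides the intersection.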
--- disk_objectstore/container.py | 123 ++++++++++++++++++++++++----- disk_objectstore/utils.py | 137 ++++++++++++++++++++++++++++++++ tests/test_utils.py | 144 ++++++++++++++++++++++++++++++++++ 3 files changed, 386 insertions(+), 18 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 036542b..b6ee19c 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -18,8 +18,9 @@ from .models import Base, Obj from .utils import ( - ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, chunk_iterator, is_known_hash, - nullcontext, safe_flush_to_disk, get_hash, compute_hash_and_size + ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, Location, chunk_iterator, + is_known_hash, nullcontext, safe_flush_to_disk, get_hash, compute_hash_and_size, merge_sorted, detect_where_sorted, + yield_first_element ) from .exceptions import NotExistent, NotInitialised, InconsistentContent @@ -1614,26 +1615,26 @@ def clean_storage(self): # pylint: disable=too-many-branches # I just ignore, I will remove it in a future call of this method. pass - def export( # pylint: disable=too-many-locals + def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-many-branches self, hashkeys, - other_container, + source_container, compress=False, target_memory_bytes=104857600, callback=None ): - """Export the specified hashkeys to a new container (must be already initialised). + """Imports the objects with the specified hashkeys into the container. :param hashkeys: an iterable of hash keys. - :param new_container: another Container class into which you want to export the specified hash keys of this - container. + :param source_container: another Container class containing the objects with the given hash keys. :param compress: specifies if content should be stored in compressed form. - :param target_memory_bytes: how much data to store in RAM before dumping to the new container. Larger values - allow to read and write in bulk that is more efficient, but of course require more memory. + :param target_memory_bytes: how much data to store in RAM before actually storing in the container. + Larger values allow to read and write in bulk that is more efficient, but of course require more memory. Note that actual memory usage will be larger (SQLite DB, storage of the hashkeys are not included - this only counts the RAM needed for the object content). Default: 100MB. - :return: a mapping from the old hash keys (in this container) to the new hash keys (in `other_container`). + :return: a mapping from the old hash keys (in the ``source_container``) to the new hash keys + (in this container). """ old_obj_hashkeys = [] new_obj_hashkeys = [] @@ -1643,6 +1644,68 @@ def export( # pylint: disable=too-many-locals content_cache = {} cache_size = 0 + if source_container.hash_type == self.hash_type: + # In this case, I can use some optimisation, because I can just work on the intersection + # of the hash keys, since I can know in advnace which objects are already present. + sorted_hashkeys = sorted(set(hashkeys)) + + if callback: + # If we have a callback, compute the total count of objects in this pack + total = len(sorted_hashkeys) + callback(action='init', value={'total': total, 'description': 'New objects'}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. 
+ update_every = max(int(total / 1000), 1) + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + since_last_update = 0 + + sorted_loose = sorted(self._list_loose()) + # This is a very efficient way to get a sorted iterator without preloading everything in memory + # NOTE: this might be slow in the combination of these two cases: + # 1. the pack index (SQLite DB) of this repository is not VACUUMed + # AND + # 2. the pack index (SQLite DB) is not in the OS disk cache + # In this case, also the index on the hash key is scattered on disk and reading will be very slow, + # see issue #94. + # NOTE: I need to wrap in the `yield_first_element` iterator since it returns a list of lists + sorted_packed = yield_first_element( + self._get_cached_session().execute('SELECT hashkey FROM db_object ORDER BY hashkey') + ) + sorted_existing = merge_sorted(sorted_loose, sorted_packed) + + # Hashkeys will be replaced with only those that are not yet in this repository (i.e., LEFTONLY) + hashkeys = [] + for item, where in detect_where_sorted(sorted_hashkeys, sorted_existing): + if callback and where in [Location.BOTH, Location.LEFTONLY]: + # It is in the sorted hash keys. Since this is the one for which I know the length efficiently, + # I use it for the progress bar. This will be relatively accurate for large lists of hash keys, + # but will not show a continuous bar if the list of hash keys to import is much shorter than + # the list of hash keys in this (destination) container. This is probably OK, though. + since_last_update += 1 + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + + if where == Location.LEFTONLY: + hashkeys.append(item) + + if callback: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) + + # I just insert the new objects without first checking that I am not leaving holes in the pack files, + # as I already checked here. + no_holes = False + no_holes_read_twice = False + else: + # hash types are different: I have to add all objects that were provided as I have no way to check + # if they already exist + no_holes = True + no_holes_read_twice = True + if callback: # If we have a callback, compute the total count of objects in this pack total = len(hashkeys) @@ -1653,7 +1716,7 @@ def export( # pylint: disable=too-many-locals # A new callback will be performed when this value is > update_every. 
since_last_update = 0 - with self.get_objects_stream_and_meta(hashkeys) as triplets: + with source_container.get_objects_stream_and_meta(hashkeys) as triplets: for old_obj_hashkey, stream, meta in triplets: if meta['size'] > target_memory_bytes: # If the object itself is too big, just write it directly @@ -1665,11 +1728,11 @@ def export( # pylint: disable=too-many-locals # but I avoid to write a huge object to disk when it's not needed because already available # on the destination new_obj_hashkeys.append( - other_container.add_streamed_object_to_pack( + self.add_streamed_object_to_pack( stream, compress=compress, - no_holes=True, - no_holes_read_twice=True, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, ) ) elif cache_size + meta['size'] > target_memory_bytes: @@ -1684,8 +1747,8 @@ def export( # pylint: disable=too-many-locals # I put all of them in bulk # I accept the performance hit of reading twice (especially since it's already on memory) - temp_new_hashkeys = other_container.add_objects_to_pack( - data, compress=compress, no_holes=True, no_holes_read_twice=True + temp_new_hashkeys = self.add_objects_to_pack( + data, compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1731,8 +1794,8 @@ def export( # pylint: disable=too-many-locals # I create a list of hash keys and the corresponding content temp_old_hashkeys, data = zip(*content_cache.items()) # I put all of them in bulk - temp_new_hashkeys = other_container.add_objects_to_pack( - data, compress=compress, no_holes=True, no_holes_read_twice=True, callback=callback + temp_new_hashkeys = self.add_objects_to_pack( + data, compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, callback=callback ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1744,6 +1807,30 @@ def export( # pylint: disable=too-many-locals return old_new_obj_hashkey_mapping + def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600, callback=None): + """Export the specified hashkeys to a new container (must be already initialised). + + .. note:: This is a wrapper of the ``import_objects`` function of the ``other_container``. + + :param hashkeys: an iterable of hash keys. + :param other_container: another Container class into which you want to export the specified hash keys of this + container. + :param compress: specifies if content should be stored in compressed form. + :param target_memory_bytes: how much data to store in RAM before dumping to the new container. Larger values + allow to read and write in bulk that is more efficient, but of course require more memory. + Note that actual memory usage will be larger (SQLite DB, storage of the hashkeys are not included - this + only counts the RAM needed for the object content). Default: 100MB. + + :return: a mapping from the old hash keys (in this container) to the new hash keys (in `other_container`). + """ + return other_container.import_objects( + hashkeys=hashkeys, + source_container=self, + compress=compress, + target_memory_bytes=target_memory_bytes, + callback=callback + ) + # Let us also compute the hash def _validate_hashkeys_pack(self, pack_id, callback=None): # pylint: disable=too-many-locals """Validate all hashkeys and returns a dictionary of problematic entries. 
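The helpers added below in utils.py behave as in this small usage sketch:

    from disk_objectstore.utils import Location, detect_where_sorted, merge_sorted

    list(detect_where_sorted([1, 3, 5], [2, 3, 4]))
    # -> [(1, Location.LEFTONLY), (2, Location.RIGHTONLY), (3, Location.BOTH),
    #     (4, Location.RIGHTONLY), (5, Location.LEFTONLY)]

    list(merge_sorted([1, 3, 5], [2, 3, 4]))
    # -> [1, 2, 3, 4, 5]

Both inputs must already be sorted and free of duplicates; otherwise a ValueError is raised, but only lazily, once the offending element is reached.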
diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index 36cb42a..b54f66d 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -3,12 +3,15 @@ Some might be useful also for end users, like the wrappers to get streams, like the ``LazyOpener``. """ +# pylint: disable= too-many-lines import hashlib import itertools import os import uuid import zlib +from enum import Enum + try: import fcntl except ImportError: @@ -25,6 +28,13 @@ _MACOS_ALWAYS_USE_FULLSYNC = False +class Location(Enum): + """Enum that describes if an element is only on the left or right iterator, or on both.""" + LEFTONLY = -1 + BOTH = 0 + RIGHTONLY = 1 + + class LazyOpener: """A class to return a stream to a given file, that however is opened lazily. @@ -887,3 +897,130 @@ def compute_hash_and_size(stream, hash_type): size += len(next_chunk) return hasher.hexdigest(), size + + +def detect_where_sorted(left_iterator, right_iterator): # pylint: disable=too-many-branches, too-many-statements + """Generator that loops in alternation (but only once each) the two iterators and yields an element, specifying if + it's only on the left, only on the right, or in both. + + .. note:: IMPORTANT! The two iterators MUST return unique and sorted results. + + This function will check and raise a ValueError if it detects non-unique or non-sorted elements. + HOWEVER, this exception is raised only at the first occurrence of the issue, that can be very late in the execution, + so if you process results in a streamed way, please ensure that you pass sorted iterators. + """ + left_exhausted = False + right_exhausted = False + + # Convert first in iterators (in case they are, e.g., lists) + left_iterator = iter(left_iterator) + right_iterator = iter(right_iterator) + + try: + last_left = next(left_iterator) + except StopIteration: + left_exhausted = True + + try: + last_right = next(right_iterator) + except StopIteration: + right_exhausted = True + + if left_exhausted and right_exhausted: + # Nothing to be done, both iterators are empty + return + + now_left = True + if left_exhausted or (not right_exhausted and last_left > last_right): + now_left = False # I want the 'current' (now) to be behind or at the same position of the other at any time + + while not (left_exhausted and right_exhausted): + advance_both = False + if now_left: + if right_exhausted: + yield last_left, Location.LEFTONLY + else: + if last_left == last_right: + # They are equal: add to intersection and continue + yield last_left, Location.BOTH + # I need to consume and advance on both iterators at the next iteration + advance_both = True + elif last_left < last_right: + # the new entry (last_left) is still smaller: it's on the left only + yield last_left, Location.LEFTONLY + else: + # the new entry (last_left) is now larger: then, last_right is only on the right + # and I switch to now_right + yield last_right, Location.RIGHTONLY + now_left = False + else: + if left_exhausted: + yield last_right, Location.RIGHTONLY + else: + if last_left == last_right: + # They are equal: add to intersection and continue + yield last_right, Location.BOTH + # I need to consume and advance on both iterators at the next iteration + advance_both = True + elif last_left > last_right: + # the new entry (last_right) is still smaller: it's on the right only + yield last_right, Location.RIGHTONLY + else: + # the new entry (last_right) is now larger: then, last_left is only on the left + # and I switch to now_left + yield last_left, Location.LEFTONLY + now_left = True + + 
# When we are here: if now_left, then last_left has been inserted in one of the lists; + # if not now_left, then last_right has been insterted in one of the lists. + # If advance both, they both can be discarded. So if I exhausted an iterator, I am not losing + # any entry. + + # I will need to cache the old value, see comments below in the `except StopIteration` block + new_now_left = now_left + if now_left or advance_both: + try: + new = next(left_iterator) + if new <= last_left: + raise ValueError( + "The left iterator does not return sorted unique entries, I got '{}' after '{}'".format( + new, last_left + ) + ) + last_left = new + except StopIteration: + left_exhausted = True + # I need to store in a different variable, otherwise in this case + # I would also enter the next iteration even if advance_both is False! + new_now_left = False + + if not now_left or advance_both: + try: + new = next(right_iterator) + if new <= last_right: + raise ValueError( + "The right iterator does not return sorted unique entries, I got '{}' after '{}'".format( + new, last_right + ) + ) + last_right = new + except StopIteration: + right_exhausted = True + # For consistency, also here I set new_now_left + new_now_left = True + + # Set the new now_left value + now_left = new_now_left + + +def yield_first_element(iterator): + """Given an iterator that returns a tuple, return an iterator that yields only the first element of the tuple.""" + for elem in iterator: + yield elem[0] + + +def merge_sorted(iterator1, iterator2): + """Given two sorted iterators, return another sorted iterator being the union of the two.""" + for item, _ in detect_where_sorted(iterator1, iterator2): + # Whereever it is (only left, only right, on both) I return the object. + yield item diff --git a/tests/test_utils.py b/tests/test_utils.py index c3ea935..7878ea4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1211,3 +1211,147 @@ def test_compute_hash_and_size(hash_type): hashkey, size = utils.compute_hash_and_size(stream, hash_type=hash_type) assert hashkey == expected_hash assert size == expected_size + + +LEFT_RIGHT_PAIRS = [ + # Both empty + [[], []], + # Left empty + [ + [], + [1, 2, 3], + ], + # Right empty + [ + [1, 2, 4], + [], + ], + # Some lists with some overlap, right exhausted first + [ + [1, 3, 5, 7, 9, 10, 11, 20], + [0, 2, 4, 5, 8, 10, 12], + ], + # Some lists with some overlap, left exhausted first + [ + [0, 2, 4, 5, 8, 10, 12], + [1, 3, 5, 7, 9, 10, 11, 20], + ], + # Start with both, continue left + [ + [0, 1, 2], + [0, 3, 5, 7], + ], + # Start with both, continue right + [ + [0, 3, 5, 7], + [0, 1, 2], + ], + # End with both, coming from left + [ + [0, 3, 5, 6, 8], + [1, 2, 8], + ], + # End with both, coming from right + [ + [1, 2, 8], + [0, 3, 5, 6, 8], + ], +] + + +@pytest.mark.parametrize('left,right', LEFT_RIGHT_PAIRS) +def test_detect_where(left, right): + """Test the detect_where_sorted function.""" + # Compute the expected result + merged = sorted(set(left + right)) + idx = -1 # Needed when detect_where_sorted is an empty iterator + for idx, (item, where) in enumerate(utils.detect_where_sorted(left, right)): + assert item == merged[idx] + if merged[idx] in left: + if merged[idx] in right: + expected_where = utils.Location.BOTH + else: + expected_where = utils.Location.LEFTONLY + else: + expected_where = utils.Location.RIGHTONLY + assert where == expected_where + assert idx + 1 == len(merged) + + +LEFT_RIGHT_PAIRS_UNSORTED = [ + # Unsorted at end, left + [ + [1, 4, 5, 1], + [1, 2, 3], + ], + # 
Unsorted at end, right + [ + [1, 4, 5], + [1, 2, 3, 1], + ], + # Unsorted at beginning, left + [ + [1, 0, 4, 5], + [1, 2, 3], + ], + # Unsorted at beginning, right + [ + [1, 4, 5], + [1, 0, 2, 3], + ], + # not unique at end, left + [ + [1, 4, 5, 5], + [1, 2, 3], + ], + # Not unique at end, right + [ + [1, 4, 5], + [1, 2, 3, 3], + ], + # Not unique at beginning, left + [ + [1, 1, 4, 5], + [1, 2, 3], + ], + # Not unique at beginning, right + [ + [1, 4, 5], + [1, 1, 2, 3], + ] +] + + +@pytest.mark.parametrize('left,right', LEFT_RIGHT_PAIRS_UNSORTED) +def test_detect_where_unsorted(left, right): + """Test the detect_where_sorted function when the lists are not sorted.""" + with pytest.raises(ValueError) as excinfo: + list(utils.detect_where_sorted(left, right)) + assert 'does not return sorted', str(excinfo.value) + + +def test_yield_first(): + """Test the yield_first_element function.""" + + first = [1, 3, 5, 7, 9] + second = [0, 2, 4, 6, 8] + + # [(1, 0), (3, 2), ...] + inner_iter = zip(first, second) + + result = list(utils.yield_first_element(inner_iter)) + + assert result == first + + +def test_merge_sorted(): + """Test the merge_sorted function.""" + # I also put some repetitions + first = sorted([1, 3, 5, 7, 9, 10, 11, 20]) + second = sorted([0, 2, 4, 5, 8, 10, 12]) + + result1 = list(utils.merge_sorted(first, second)) + result2 = list(utils.merge_sorted(second, first)) + + assert result1 == sorted(set(first + second)) + assert result2 == sorted(set(first + second)) From 1f71b4de7af445c79082a570fec4f88e47d929f4 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Sun, 9 Aug 2020 00:41:04 +0200 Subject: [PATCH 07/18] Adding VACUUMing operations I both add a private _vacuum() operation, and I call it: - optionally (False by default) in clean_storage() - at the end of a full repack of all packs --- disk_objectstore/container.py | 41 ++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index b6ee19c..6109352 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1523,15 +1523,34 @@ def add_objects_to_pack( callback=callback ) - def clean_storage(self): # pylint: disable=too-many-branches + def _vacuum(self): + """Perform a `VACUUM` operation on the SQLite operation. + + This is critical for two aspects: + + 1. reclaiming unused space after many deletions + 2. reordering data on disk to make data access *much* more efficient + + (See also description in issue #94). + """ + engine = self._get_cached_session().get_bind() + engine.execute('VACUUM') + + def clean_storage(self, vacuum=False): # pylint: disable=too-many-branches """Perform some maintenance clean-up of the container. .. note:: this is a maintenance operation, must be performed when nobody is using the container! In particular: + - if `vacuum` is True, it first VACUUMs the DB, reclaiming unused space and + making access much faster - it removes duplicates if any, with some validation - it cleans up loose objects that are already in packs """ + # I start by VACUUMing the DB - this is something useful to do + if vacuum: + self._vacuum() + all_duplicates = os.listdir(self._get_duplicates_folder()) duplicates_mapping = defaultdict(list) @@ -2057,14 +2076,30 @@ def delete_objects(self, hashkeys): return list(deleted_loose.union(deleted_packed)) def repack(self, compress_mode=CompressMode.KEEP): - """Perform a repack of the packed objects.""" + """Perform a repack of all packed objects. 
+ + At the end, it also VACUUMs the DB to reclaim unused space and make + access more efficient. + + This is a maintenance operation. + + :param compress_mode: see docstring of ``repack_pack``. + """ for pack_id in self._list_packs(): self.repack_pack(pack_id, compress_mode=compress_mode) + self._vacuum() def repack_pack(self, pack_id, compress_mode=CompressMode.KEEP): """Perform a repack of a given pack object. - This is a maintenance operation.""" + This is a maintenance operation. + + :param compress_mode: must be a valid CompressMode enum type. + Currently, the only implemented mode is KEEP, meaning that it + preserves the same compression (this means that repacking is *much* faster + as it can simply transfer the bytes without decompressing everything first, + and recompressing it back again). + """ if compress_mode != CompressMode.KEEP: raise NotImplementedError('Only keep method currently implemented') From f04f63ee6d9c22a2674fdf9939c38550cdf08d9b Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Sun, 9 Aug 2020 17:21:06 +0200 Subject: [PATCH 08/18] Adding a new threshold to iterate on all objects when asking for too many hash keys Above a certain threshold, it's just faster to iterate on all items rather than making an SQL `IN` query on chunks of `_IN_SQL_MAX_LENGTH = 950` (e.g. if we have millions of objects). For now, this is set as `_MAX_CHUNK_ITERATE_LENGTH = 9500`, but this needs to be benchmarked. Also, this is still missing tests and a real benchmark. --- disk_objectstore/container.py | 99 ++++++++++++++++++++++++++--------- disk_objectstore/utils.py | 33 ++++++++---- 2 files changed, 99 insertions(+), 33 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 6109352..8c774c1 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -72,6 +72,12 @@ class Container: # pylint: disable=too-many-public-methods # See also e.g. this comment https://bugzilla.redhat.com/show_bug.cgi?id=1798134 _IN_SQL_MAX_LENGTH = 950 + # If the length of required elements is larger than this, instead of iterating an IN statement over chunks of size + # _IN_SQL_MAX_LENGTH, it just quickly lists all elements (ordered by hashkey, requires a VACUUMed DB for + # performance) and returns only the intersection. + # TODO: benchmark this value and set an an appropriate value. + _MAX_CHUNK_ITERATE_LENGTH = 9500 + def __init__(self, folder): """Create the class that represents the container. 
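The "intersection" mentioned in the comment above is computed with the `detect_where_sorted` helper added to `disk_objectstore/utils.py` earlier in this series. As an illustrative sketch (not part of the patches), the helper walks two sorted, unique iterators exactly once and yields every element of their union together with a `Location` flag:

    from disk_objectstore.utils import Location, detect_where_sorted, merge_sorted

    left = [1, 3, 5]   # must be sorted and unique
    right = [3, 4]     # must be sorted and unique
    assert list(detect_where_sorted(left, right)) == [
        (1, Location.LEFTONLY),
        (3, Location.BOTH),
        (4, Location.RIGHTONLY),
        (5, Location.LEFTONLY),
    ]
    # merge_sorted simply drops the Location flag, yielding the sorted union
    assert list(merge_sorted(left, right)) == [1, 3, 4, 5]
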
@@ -480,14 +486,26 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too # to order in python instead session = self._get_cached_session() - # Operate in chunks, due to the SQLite limits - # (see comment above the definition of self._IN_SQL_MAX_LENGTH) - for chunk in chunk_iterator(hashkeys_set, size=self._IN_SQL_MAX_LENGTH): - query = session.query(Obj).filter( - Obj.hashkey.in_(chunk) - ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size) - for res in query: - packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) + if len(hashkeys_set) <= self._MAX_CHUNK_ITERATE_LENGTH: + # Operate in chunks, due to the SQLite limits + # (see comment above the definition of self._IN_SQL_MAX_LENGTH) + for chunk in chunk_iterator(hashkeys_set, size=self._IN_SQL_MAX_LENGTH): + query = session.query(Obj).filter( + Obj.hashkey.in_(chunk) + ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size) + for res in query: + packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) + else: + sorted_hashkeys = sorted(hashkeys_set) + pack_iterator = session.execute( + 'SELECT pack_id, hashkey, offset, length, compressed, size FROM db_object ORDER BY hashkey' + ) + # The left_key returns the second element of the tuple, i.e. the hashkey (that is the value to compare + # with the right iterator) + for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[1]): + if where == Location.BOTH: + # If it's in both, it returns the left one, i.e. the full data from the DB + packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) for pack_int_id, pack_metadata in packs.items(): pack_metadata.sort(key=lambda metadata: metadata.offset) @@ -583,12 +601,24 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too packs = defaultdict(list) session = self._get_cached_session() - for chunk in chunk_iterator(loose_not_found, size=self._IN_SQL_MAX_LENGTH): - query = session.query(Obj).filter( - Obj.hashkey.in_(chunk) - ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size) - for res in query: - packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) + if len(loose_not_found) <= self._MAX_CHUNK_ITERATE_LENGTH: + for chunk in chunk_iterator(loose_not_found, size=self._IN_SQL_MAX_LENGTH): + query = session.query(Obj).filter( + Obj.hashkey.in_(chunk) + ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size) + for res in query: + packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) + else: + sorted_hashkeys = sorted(loose_not_found) + pack_iterator = session.execute( + 'SELECT pack_id, hashkey, offset, length, compressed, size FROM db_object ORDER BY hashkey' + ) + # The left_key returns the second element of the tuple, i.e. the hashkey (that is the value to compare + # with the right iterator) + for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[1]): + if where == Location.BOTH: + # If it's in both, it returns the left one, i.e. the full data from the DB + packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) # I will construct here the really missing objects. # I make a copy of the set. 
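When the left iterator yields full DB rows rather than bare hash keys, the `left_key` argument (added to `detect_where_sorted` further below in this patch) tells the helper which field to compare. An illustrative sketch, with in-memory rows standing in for the `SELECT ... ORDER BY hashkey` result used above (not part of the patches):

    from disk_objectstore.utils import Location, detect_where_sorted

    # Rows shaped like the raw query result: (pack_id, hashkey), sorted by hashkey
    rows = [(0, 'aaa'), (1, 'bbb'), (0, 'ccc')]
    requested = ['bbb', 'ddd']  # sorted, unique hash keys we are looking for
    found = [
        row for row, where in detect_where_sorted(rows, requested, left_key=lambda row: row[1])
        if where == Location.BOTH
    ]
    # For matches, the full left row is returned, so all DB columns stay available
    assert found == [(1, 'bbb')]
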
@@ -1101,10 +1131,20 @@ def pack_all_loose(self, compress=False, validate_objects=True): existing_packed_hashkeys = [] - for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH): - # I check the hash keys that are already in the pack - for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all(): - existing_packed_hashkeys.append(res[0]) + if len(loose_objects) <= self._MAX_CHUNK_ITERATE_LENGTH: + for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH): + # I check the hash keys that are already in the pack + for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all(): + existing_packed_hashkeys.append(res[0]) + else: + sorted_hashkeys = sorted(loose_objects) + pack_iterator = session.execute('SELECT hashkey FROM db_object ORDER BY hashkey') + + # The query returns a tuple of length 1, so I still need a left_key + for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[0]): + if where == Location.BOTH: + existing_packed_hashkeys.append(res[0]) + # I remove them from the loose_objects list loose_objects.difference_update(existing_packed_hashkeys) # Now, I should be left only with objects with hash keys that are not yet known. @@ -1618,10 +1658,19 @@ def clean_storage(self, vacuum=False): # pylint: disable=too-many-branches session = self._get_cached_session() # I search now for all loose hash keys that exist also in the packs existing_packed_hashkeys = [] - for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH): - # I check the hash keys that are already in the pack - for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all(): - existing_packed_hashkeys.append(res[0]) + if len(loose_objects) <= self._MAX_CHUNK_ITERATE_LENGTH: + for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH): + # I check the hash keys that are already in the pack + for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all(): + existing_packed_hashkeys.append(res[0]) + else: + sorted_hashkeys = sorted(loose_objects) + pack_iterator = session.execute('SELECT hashkey FROM db_object ORDER BY hashkey') + + # The query returns a tuple of length 1, so I still need a left_key + for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[0]): + if where == Location.BOTH: + existing_packed_hashkeys.append(res[0]) # I now clean up loose objects that are already in the packs. # Here, we assume that if it's already packed, it's safe to assume it's uncorrupted. @@ -1671,7 +1720,7 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m if callback: # If we have a callback, compute the total count of objects in this pack total = len(sorted_hashkeys) - callback(action='init', value={'total': total, 'description': 'New objects'}) + callback(action='init', value={'total': total, 'description': 'Listing objects'}) # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. update_every = max(int(total / 1000), 1) # Counter of how many objects have been since since the last update. 
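These chunked-versus-full-scan fallbacks are hit by the regular maintenance operations; an illustrative sketch of the typical sequence (assuming an initialised `Container` instance called `container`, not part of the patches):

    # Typical maintenance sequence served by the code paths above
    container.pack_all_loose(compress=False)  # move loose objects into pack files
    container.clean_storage(vacuum=True)      # drop redundant loose files and VACUUM the SQLite index
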
@@ -1728,7 +1777,7 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m if callback: # If we have a callback, compute the total count of objects in this pack total = len(hashkeys) - callback(action='init', value={'total': total, 'description': 'Objects'}) + callback(action='init', value={'total': total, 'description': 'Copy objects'}) # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. update_every = max(int(total / 1000), 1) # Counter of how many objects have been since since the last update. @@ -1813,6 +1862,8 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m # I create a list of hash keys and the corresponding content temp_old_hashkeys, data = zip(*content_cache.items()) # I put all of them in bulk + + # TODO: make a callback wrapper that renames the bar to, e.g., 'final flush' temp_new_hashkeys = self.add_objects_to_pack( data, compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, callback=callback ) diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index b54f66d..7a94148 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -899,19 +899,34 @@ def compute_hash_and_size(stream, hash_type): return hasher.hexdigest(), size -def detect_where_sorted(left_iterator, right_iterator): # pylint: disable=too-many-branches, too-many-statements +def detect_where_sorted(left_iterator, right_iterator, left_key=None): # pylint: disable=too-many-branches, too-many-statements """Generator that loops in alternation (but only once each) the two iterators and yields an element, specifying if it's only on the left, only on the right, or in both. .. note:: IMPORTANT! The two iterators MUST return unique and sorted results. + .. note:: if it's on both, the one on the left is returned. + This function will check and raise a ValueError if it detects non-unique or non-sorted elements. HOWEVER, this exception is raised only at the first occurrence of the issue, that can be very late in the execution, so if you process results in a streamed way, please ensure that you pass sorted iterators. + + :param left_iterator: a left iterator + :param right_iterator: a right iterator + :param left_key: if specified, it's a lambda that determines how to process each element + of the left iterator when comparing with the right iterator. For instance, the left + iterator might be a tuple, whose first element is a hash key, while the right iterator + just a list of hash keys. In this case, left_key could be defined as a lambda returning + the first element of the tuple. + Note that when the element is in both iterators, the left one is returned (i.e. the + full tuple, in this example). 
""" left_exhausted = False right_exhausted = False + if left_key is None: + left_key = lambda x: x + # Convert first in iterators (in case they are, e.g., lists) left_iterator = iter(left_iterator) right_iterator = iter(right_iterator) @@ -931,7 +946,7 @@ def detect_where_sorted(left_iterator, right_iterator): # pylint: disable=too-m return now_left = True - if left_exhausted or (not right_exhausted and last_left > last_right): + if left_exhausted or (not right_exhausted and left_key(last_left) > last_right): now_left = False # I want the 'current' (now) to be behind or at the same position of the other at any time while not (left_exhausted and right_exhausted): @@ -940,12 +955,12 @@ def detect_where_sorted(left_iterator, right_iterator): # pylint: disable=too-m if right_exhausted: yield last_left, Location.LEFTONLY else: - if last_left == last_right: + if left_key(last_left) == last_right: # They are equal: add to intersection and continue yield last_left, Location.BOTH # I need to consume and advance on both iterators at the next iteration advance_both = True - elif last_left < last_right: + elif left_key(last_left) < last_right: # the new entry (last_left) is still smaller: it's on the left only yield last_left, Location.LEFTONLY else: @@ -957,12 +972,12 @@ def detect_where_sorted(left_iterator, right_iterator): # pylint: disable=too-m if left_exhausted: yield last_right, Location.RIGHTONLY else: - if last_left == last_right: + if left_key(last_left) == last_right: # They are equal: add to intersection and continue - yield last_right, Location.BOTH + yield last_left, Location.BOTH # I need to consume and advance on both iterators at the next iteration advance_both = True - elif last_left > last_right: + elif left_key(last_left) > last_right: # the new entry (last_right) is still smaller: it's on the right only yield last_right, Location.RIGHTONLY else: @@ -981,10 +996,10 @@ def detect_where_sorted(left_iterator, right_iterator): # pylint: disable=too-m if now_left or advance_both: try: new = next(left_iterator) - if new <= last_left: + if left_key(new) <= left_key(last_left): raise ValueError( "The left iterator does not return sorted unique entries, I got '{}' after '{}'".format( - new, last_left + left_key(new), left_key(last_left) ) ) last_left = new From 690ac34e011f9c7fda0b36e0be7d330f3eecd348 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 26 Aug 2020 12:37:57 +0200 Subject: [PATCH 09/18] Adding missing optional dev dependencies They were listed in dev-requirements.txt with the correct version (and used by the tests) but they were missing in `setup.py`. They were `pytest-benchmark` and `coverage`. 
--- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index ffba7ec..94c13fa 100644 --- a/setup.py +++ b/setup.py @@ -33,8 +33,8 @@ ], extras_require={ 'dev': [ - 'profilehooks', 'psutil', 'click', 'pre-commit', 'yapf', 'prospector', 'pylint', 'pytest', 'pytest-cov', - 'memory-profiler', 'pywin32; platform_system == "Windows"' + 'click', 'coverage', 'memory-profiler', 'pre-commit', 'profilehooks', 'prospector', 'psutil', 'pylint', + 'pytest', 'pytest-cov', 'pytest-benchmark', 'pywin32; platform_system == "Windows"', 'yapf' ], }, packages=find_packages(), From 471ac9bee661f5c2eaa578989fe062ff8346b096 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 26 Aug 2020 15:31:42 +0200 Subject: [PATCH 10/18] Adding parameters to decide whether to do fsync and to commit in some writing methods These changes are motivated by benchmarks on a large repository ("SDB") with 6.8M nodes. --- disk_objectstore/container.py | 107 ++++++++++++++++++++++++++-------- disk_objectstore/utils.py | 22 +++++++ 2 files changed, 104 insertions(+), 25 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 8c774c1..62fec78 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -6,6 +6,7 @@ import json import os import shutil +import warnings import zlib from collections import defaultdict, namedtuple @@ -19,8 +20,8 @@ from .models import Base, Obj from .utils import ( ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, Location, chunk_iterator, - is_known_hash, nullcontext, safe_flush_to_disk, get_hash, compute_hash_and_size, merge_sorted, detect_where_sorted, - yield_first_element + is_known_hash, nullcontext, rename_callback, safe_flush_to_disk, get_hash, compute_hash_and_size, merge_sorted, + detect_where_sorted, yield_first_element ) from .exceptions import NotExistent, NotInitialised, InconsistentContent @@ -75,7 +76,8 @@ class Container: # pylint: disable=too-many-public-methods # If the length of required elements is larger than this, instead of iterating an IN statement over chunks of size # _IN_SQL_MAX_LENGTH, it just quickly lists all elements (ordered by hashkey, requires a VACUUMed DB for # performance) and returns only the intersection. - # TODO: benchmark this value and set an an appropriate value. + # This length might need some benchmarking, but seems OK on very large DBs of 6M nodes + # (after VACUUMing, as mentioned above). _MAX_CHUNK_ITERATE_LENGTH = 9500 def __init__(self, folder): @@ -1111,12 +1113,18 @@ def _write_data_to_packfile(self, pack_handle, read_handle, compress, hash_type= return (count_read_bytes, hasher.hexdigest() if hash_type else None) - def pack_all_loose(self, compress=False, validate_objects=True): + def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches + self, compress=False, validate_objects=True, do_fsync=True + ): """Pack all loose objects. This is a maintenance operation, needs to be done only by one process. :param compress: if True, compress objects before storing them. :param validate_objects: if True, recompute the hash while packing, and raises if there is a problem. + :param do_fsync: if True, calls a flush to disk of the pack files before closing it. + Needed to guarantee that data will be there even in the case of a power loss. + Set to False if you don't need such a guarantee (anyway the loose version will be kept, + so often this guarantee is not strictly needed). 
""" hash_type = self.hash_type if validate_objects else None @@ -1229,7 +1237,8 @@ def pack_all_loose(self, compress=False, validate_objects=True): # I don't commit here; I commit after making sure the file is flushed and closed # flush and sync to disk before closing - safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) + if do_fsync: + safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) # OK, if we are here, file was flushed, synced to disk and closed. # Let's commit then the information to the DB, so it's officially a @@ -1254,16 +1263,18 @@ def add_streamed_object_to_pack( # pylint: disable=too-many-arguments no_holes=False, no_holes_read_twice=True, callback=None, - callback_size_hint=0 + callback_size_hint=0, + do_fsync=True, + do_commit=True ): """Add a single object in streamed form to a pack. For the description of the parameters, see the docstring of ``add_streamed_objects_to_pack``. The only difference is that here the callback will provide feedback on the progress of this specific object. - :param callback_size_hint: the expected size of the stream - if not provided, it is used send back the total + :param callback_size_hint: the expected size of the stream - if provided, it is used send back the total length in the callbacks - :return: a single objec hash key + :return: a single object hash key """ streams = [CallbackStreamWrapper(stream, callback=callback, total_length=callback_size_hint)] @@ -1274,7 +1285,9 @@ def add_streamed_object_to_pack( # pylint: disable=too-many-arguments open_streams=open_streams, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, - callback=None + callback=None, + do_fsync=do_fsync, + do_commit=do_commit, ) # Close the callback so the bar doesn't remaing open @@ -1285,7 +1298,7 @@ def add_streamed_object_to_pack( # pylint: disable=too-many-arguments def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-arguments self, stream_list, compress=False, open_streams=False, no_holes=False, no_holes_read_twice=True, - callback=None): + callback=None, do_fsync=True, do_commit=True): """Add objects directly to a pack, reading from a list of streams. This is a maintenance operation, available mostly for efficiency reasons @@ -1308,6 +1321,15 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b This of course gives a performance hit as data has to be read twice, and rehashed twice; but avoids risking to damage the hard drive if e.g. re-importing the exact same data). This variable is ignored if `no_holes` is False. + :param do_fsync: if True (default), call an fsync for every pack file, to ensure flushing to + disk. Important to guarantee that data is not lost even in the case of a power loss. + For performance (especially if you don't need such a guarantee, e.g. if you are creating + from scratch a new repository with copy of objects), set it to False. + :param do_commit: if True (default), commit data to the DB after every pack is written. + In this way, even if there is an issue, partial objects end up in the repository. + Set to False for efficiency if you need to call this function multiple times. In this case, + however, remember to call a `commit()` call on the `session` manually at the end of the + operations! (See e.g. the `import_files()` method). 
:return: a list of object hash keys """ yield_per_size = 1000 @@ -1511,15 +1533,18 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b obj_dicts = [] # I don't commit here; I commit after making sure the file is flushed and closed - # flush and sync to disk before closing - safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) + if do_fsync: + safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) # OK, if we are here, file was flushed, synced to disk and closed. # Let's commit then the information to the DB, so it's officially a # packed object. Note: committing as soon as we are done with one pack, # so if there's a problem with one pack we don't start operating on the next one # Note: because of the logic above, in theory this should not raise an IntegrityError! - session.commit() + # For efficiency, you might want to set do_commit = False in the call, and then + # call a `session.commit()` in the caller, as it is done for instance in `import_files()`. + if do_commit: + session.commit() if callback: # Final call to complete the bar @@ -1530,11 +1555,10 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b return hashkeys - # yapf: disable - def add_objects_to_pack( - self, content_list, compress=False, no_holes=False, no_holes_read_twice=True, callback=None + def add_objects_to_pack( # pylint: disable=too-many-arguments + self, content_list, compress=False, no_holes=False, no_holes_read_twice=True, + callback=None, do_fsync=True, do_commit=True ): - # yapf: enable """Add objects directly to a pack, reading from a list of content byte arrays. This is a maintenance operation, available mostly for efficiency reasons @@ -1552,6 +1576,12 @@ def add_objects_to_pack( to write on disk and then overwrite with another object). See comments in the docstring of ``add_streamed_objects_to_pack``. This variable is ignored if `no_holes` is False. + :param callback: a callback to monitor the progress, see docstring of `_validate_hashkeys_pack()` + :param do_fsync: if True (default), call an fsync for every pack file, to ensure flushing to + disk. See docstring of `add_streamed_objects_to_pack()` for further comments on the use of this flag. + :param do_commit: if True (default), commit data to the DB after every pack is written. + See docstring of `add_streamed_objects_to_pack()` for further comments on the use of this flag. + :return: a list of object hash keys """ stream_list = [io.BytesIO(content) for content in content_list] @@ -1560,7 +1590,9 @@ def add_objects_to_pack( compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, - callback=callback + callback=callback, + do_fsync=do_fsync, + do_commit=do_commit ) def _vacuum(self): @@ -1683,13 +1715,14 @@ def clean_storage(self, vacuum=False): # pylint: disable=too-many-branches # I just ignore, I will remove it in a future call of this method. pass - def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-many-branches + def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-many-branches,too-many-arguments self, hashkeys, source_container, compress=False, target_memory_bytes=104857600, - callback=None + callback=None, + do_fsync=True ): """Imports the objects with the specified hashkeys into the container. 
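As an illustrative sketch of how the new flags are meant to be used from the outside (assuming a `container` instance, not part of the patches), a bulk load into a fresh container can skip the per-pack fsync:

    # Faster bulk insertion: skip the per-pack fsync. Only acceptable if losing
    # the data on a power failure is tolerable (e.g. the import can be re-run).
    hashkeys = container.add_objects_to_pack(
        [b'first object', b'second object'],
        compress=False,
        do_fsync=False,
        do_commit=True,  # still commit the SQLite index after each pack file is written
    )
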
@@ -1700,6 +1733,10 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m Larger values allow to read and write in bulk that is more efficient, but of course require more memory. Note that actual memory usage will be larger (SQLite DB, storage of the hashkeys are not included - this only counts the RAM needed for the object content). Default: 100MB. + :param callback: a callback to monitor the importing process. See docstring of `_validate_hashkeys_pack()`. + :param do_fsync: whether to do a fsync on every pack object when it's written. True by default; set it + to False for efficiency if this guarantee is not needed, e.g. if you are creating a new + Container from scratch as a part of a larger import/export operation. :return: a mapping from the old hash keys (in the ``source_container``) to the new hash keys (in this container). @@ -1801,6 +1838,8 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, + do_fsync=do_fsync, + do_commit=False # I will do a final commit ) ) elif cache_size + meta['size'] > target_memory_bytes: @@ -1814,9 +1853,15 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m temp_old_hashkeys, data = zip(*content_cache.items()) # I put all of them in bulk - # I accept the performance hit of reading twice (especially since it's already on memory) + # I accept the performance hit of reading twice if the hash type is different + # (especially since it's already on memory) temp_new_hashkeys = self.add_objects_to_pack( - data, compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice + data, + compress=compress, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + do_fsync=do_fsync, + do_commit=False ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1863,9 +1908,15 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m temp_old_hashkeys, data = zip(*content_cache.items()) # I put all of them in bulk - # TODO: make a callback wrapper that renames the bar to, e.g., 'final flush' temp_new_hashkeys = self.add_objects_to_pack( - data, compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, callback=callback + data, + compress=compress, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + callback=rename_callback(callback, new_description='Final flush'), + do_fsync=do_fsync, + # I will commit at the end + do_commit=False ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1875,12 +1926,17 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m # Create a mapping from the old to the new hash keys: old_new_obj_hashkey_mapping[old_hashkey] = new_hashkey old_new_obj_hashkey_mapping = dict(zip(old_obj_hashkeys, new_obj_hashkeys)) + # Since I called the `add_objects_to_pack` without committing (gives a boost for performance), + # I need now to commit to save what I've been doing. + self._get_cached_session().commit() + return old_new_obj_hashkey_mapping def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600, callback=None): """Export the specified hashkeys to a new container (must be already initialised). - .. note:: This is a wrapper of the ``import_objects`` function of the ``other_container``. 
+ ..deprecated:: 0.6 + Deprecated: use the ``import_objects`` method of ``other_container`` instead. :param hashkeys: an iterable of hash keys. :param other_container: another Container class into which you want to export the specified hash keys of this @@ -1893,6 +1949,7 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= :return: a mapping from the old hash keys (in this container) to the new hash keys (in `other_container`). """ + warnings.warn('function is deprecated, use `import_objects` instead', DeprecationWarning) return other_container.import_objects( hashkeys=hashkeys, source_container=self, diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index 7a94148..74b4a8d 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -524,6 +524,28 @@ def close_callback(self): self._callback(action='close', value=None) +def rename_callback(callback, new_description): + """Given a callback, return a new one where the description will be changed to `new_name`. + + Works even if `callback` is None (in this case, it returns None). + :param callback: a callback function. + :param new_description: a string with a modified description for the callback. + This will be replaced during the `init` call to the callback. + """ + if callback is None: + return None + + def wrapper_callback(action, value): + """A wrapper callback with changed description.""" + if action == 'init': + new_value = value.copy() + new_value['description'] = new_description + return callback(action, new_value) + return callback(action, value) + + return wrapper_callback + + class StreamDecompresser: """A class that gets a stream of compressed zlib bytes, and returns the corresponding uncompressed bytes when being read via the .read() method. From 8df5e2b4190ae643c870c58ab5ddfd0a7aa9ba00 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 26 Aug 2020 20:42:40 +0200 Subject: [PATCH 11/18] Adding callback tests - Added a callback_instance fixture to facilitate tests - Added complete coverage of the new callback-related functions in the utils module - Started to implement coverage of some callback-related function in the container file. 
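The tests below exercise the `(action, value)` callback protocol used throughout the container: `init` carries a dict with `total` and `description`, `update` carries an increment, and `close` carries no value. A minimal progress-printing callback compatible with that protocol could look as follows (illustrative sketch, not part of the test suite):

    def make_progress_callback():
        """Build a simple callback following the (action, value) protocol."""
        state = {'description': '', 'total': 0, 'current': 0}

        def callback(action, value):
            if action == 'init':
                state.update(description=value['description'], total=value['total'], current=0)
            elif action == 'update':
                state['current'] += value
                print('{}: {}/{}'.format(state['description'], state['current'], state['total']))
            elif action == 'close':
                print('{}: done'.format(state['description']))

        return callback

    # e.g.: container.validate(callback=make_progress_callback())
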
--- tests/conftest.py | 33 ++++++++++ tests/test_container.py | 63 ++++++++++--------- tests/test_utils.py | 131 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 198 insertions(+), 29 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f96bb6e..a031314 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,6 +25,39 @@ def pytest_generate_tests(metafunc): metafunc.parametrize('concurrency_repetition_index', range(metafunc.config.option.concurrency_repetitions)) +@pytest.fixture(scope='function') +def callback_instance(): + """Return the CallbackClass for the tests.""" + + class CallbackClass: + """Class that manages the callback and checks that it is correctly called.""" + + def __init__(self): + """Initialise the class.""" + self.current_action = None + self.performed_actions = [] + + def callback(self, action, value): + """Check how the callback is called.""" + + if action == 'init': + assert self.current_action is None, "Starting a new action '{}' without closing the old one {}".format( + action, self.current_action + ) + self.current_action = {'start_value': value, 'value': 0} + elif action == 'update': + # Track the current position + self.current_action['value'] += value + elif action == 'close': + # Add to list of performed actions + self.performed_actions.append(self.current_action) + self.current_action = None + else: + raise AssertionError("Unknown action '{}'".format(action)) + + yield CallbackClass() + + @pytest.fixture(scope='function') def temp_container(temp_dir): # pylint: disable=redefined-outer-name """Return an object-store container in a given temporary directory. diff --git a/tests/test_container.py b/tests/test_container.py index 249baf8..47123f3 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -2045,37 +2045,10 @@ def test_validate_corrupt_packed_size(temp_container): # pylint: disable=invali assert not any(errors.values()) -def test_validate_callback(temp_container): +def test_validate_callback(temp_container, callback_instance): """Test the correctness of the callbacks. 
Stores the calls to check at the end that everything was called correctly.""" - - class CallbackClass: - """Class that manages the callback.""" - - def __init__(self): - """Initialise the class.""" - self.current_action = None - self.performed_actions = [] - - def callback(self, action, value): - """Check how the callback is called.""" - - if action == 'init': - assert self.current_action is None, "Starting a new action '{}' without closing the old one {}".format( - action, self.current_action - ) - self.current_action = {'start_value': value, 'value': 0} - elif action == 'update': - # Track the current position - self.current_action['value'] += value - elif action == 'close': - # Add to list of performed actions - self.performed_actions.append(self.current_action) - self.current_action = None - else: - raise AssertionError("Unknown action '{}'".format(action)) - # Add packed objects (2001, 10 chars each), *not* a multiple of 400 (that is the internal value # of how many events should be triggered as a maximum) len_packed = 2001 @@ -2088,7 +2061,6 @@ def callback(self, action, value): for content in data: temp_container.add_object(content) - callback_instance = CallbackClass() temp_container.validate(callback=callback_instance.callback) assert callback_instance.current_action is None, ( @@ -2118,6 +2090,39 @@ def callback(self, action, value): } +@pytest.mark.parametrize('use_size_hint', [True, False]) +def test_add_streamed_object_to_pack_callback( # pylint: disable=invalid-name + temp_container, use_size_hint, callback_instance + ): + """Test the correctness of the callback of add_streamed_object_to_pack.""" + # Add packed objects (2001, 10 chars each), *not* a multiple of 400 (that is the internal value + # of how many events should be triggered as a maximum) + length = 1000000 + content = b'0' * length + stream = io.BytesIO(content) + + if use_size_hint: + hashkey = temp_container.add_streamed_object_to_pack( + stream, callback_size_hint=length, callback=callback_instance.callback + ) + else: + hashkey = temp_container.add_streamed_object_to_pack(stream, callback=callback_instance.callback) + + assert temp_container.get_object_content(hashkey) == content + + assert callback_instance.current_action is None, ( + "The 'validate' call did not perform a final callback with a 'close' event" + ) + + assert callback_instance.performed_actions == [{ + 'start_value': { + 'total': length if use_size_hint else 0, + 'description': 'Streamed object' + }, + 'value': length + }] + + @pytest.mark.parametrize('ask_deleting_unknown', [True, False]) @pytest.mark.parametrize('compress', [True, False]) def test_delete(temp_container, compress, ask_deleting_unknown): # pylint: disable=too-many-statements diff --git a/tests/test_utils.py b/tests/test_utils.py index 7878ea4..472b622 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1355,3 +1355,134 @@ def test_merge_sorted(): assert result1 == sorted(set(first + second)) assert result2 == sorted(set(first + second)) + + +def test_callback_stream_wrapper_none(): # pylint: disable=invalid-name + """Test the callback stream wrapper with no actual callback.""" + with tempfile.TemporaryFile(mode='rb+') as fhandle: + fhandle.write(b'abc') + fhandle.seek(0) + + wrapped = utils.CallbackStreamWrapper(fhandle, callback=None) + + assert wrapped.mode == 'rb+' + assert wrapped.seekable + # Seek forward; read from byte 1 + wrapped.seek(1) + assert wrapped.tell() == 1 + assert wrapped.read() == b'bc' + assert wrapped.tell() == 3 + # Seek backwards; read from 
byte 0 + wrapped.seek(0) + assert wrapped.read() == b'abc' + + wrapped.close_callback() + + +@pytest.mark.parametrize('with_total_length', [True, False]) +def test_callback_stream_wrapper(callback_instance, with_total_length): + """Test the callback stream wrapper.""" + description = 'SOME CALLBACK DESCRIPTION' + # Long string so we trigger the update_every logic + content = b'abc' * 4000 + + with tempfile.TemporaryFile(mode='rb+') as fhandle: + fhandle.write(content) + fhandle.seek(0) + + if with_total_length: + wrapped = utils.CallbackStreamWrapper( + fhandle, callback=callback_instance.callback, total_length=len(content), description=description + ) + else: + wrapped = utils.CallbackStreamWrapper(fhandle, callback=callback_instance.callback, description=description) + + assert wrapped.mode == 'rb+' + assert wrapped.seekable + # Seek forward; read from byte 10 + wrapped.seek(10) + assert wrapped.tell() == 10 + assert wrapped.read() == content[10:] + assert wrapped.tell() == len(content) + # Seek backwards; read from byte 0, all + wrapped.seek(0) + assert wrapped.read() == content + + # Seek backwards; read from byte 0, only 2 bytes + wrapped.seek(0) + assert wrapped.read(2) == content[0:2] + # Close the callback. It should be long enough so that + # the close_callback has to "flush" the internal buffer + # (when we provide the total_length) + wrapped.close_callback() + + assert callback_instance.performed_actions == [{ + 'start_value': { + 'total': len(content) if with_total_length else 0, + 'description': description + }, + 'value': len(content) + }, { + 'start_value': { + 'total': len(content) if with_total_length else 0, + 'description': '{} [rewind]'.format(description) + }, + 'value': len(content) + }, { + 'start_value': { + 'total': len(content) if with_total_length else 0, + 'description': '{} [rewind]'.format(description) + }, + 'value': 2 + }] + + +def test_rename_callback(callback_instance): + """Check the rename_callback function.""" + old_description = 'original description' + new_description = 'SOME NEW DESC' + content = b'some content' + + assert utils.rename_callback(None, new_description=new_description) is None + + # First call with the original one + wrapped = utils.CallbackStreamWrapper( + io.BytesIO(content), + callback=callback_instance.callback, + total_length=len(content), + description=old_description + ) + # Call read so the callback is called + wrapped.read() + # We need to close the callback before reusing it + wrapped.close_callback() + + # Now call with the modified one + wrapped = utils.CallbackStreamWrapper( + io.BytesIO(content), + callback=utils.rename_callback(callback_instance.callback, new_description=new_description), + total_length=len(content), + description=old_description + ) + # Call read so the callback is called + wrapped.read() + # Close the callback to flush out + wrapped.close_callback() + + assert callback_instance.performed_actions == [ + { + 'start_value': { + 'total': len(content), + 'description': old_description + }, + 'value': len(content) + }, + { + 'start_value': { + 'total': len(content), + # Here there should be the new description + 'description': new_description + }, + 'value': len(content) + } + ] From 2d8775707fedb4174e52a5b2300ce76078a8a63a Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 26 Aug 2020 23:21:40 +0200 Subject: [PATCH 12/18] Adding sha1 as a second possible algorithm and getting to full coverage The addition of sha1 was required to get full coverage, as there is a part of the code (in import_object) that 
is different if the hash algorithms of the two containers are the same or are different (in order to optimize speed if the algorithm is the same). --- disk_objectstore/container.py | 6 +- disk_objectstore/utils.py | 4 +- tests/test_container.py | 209 ++++++++++++++++++++++++++++++---- tests/test_utils.py | 7 +- 4 files changed, 197 insertions(+), 29 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 62fec78..6d168da 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1049,11 +1049,9 @@ def list_all_objects(self): loose_objects.difference_update((hashkey,)) yield hashkey - if results_chunk: - last_pk = results_chunk[-1][0] - else: - # No more packed objects + if not results_chunk: break + last_pk = results_chunk[-1][0] # What is left are the loose objects that are not in the packs for hashkey in loose_objects: diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index 74b4a8d..f9d1846 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -723,9 +723,7 @@ def is_known_hash(hash_type): def get_hash(hash_type): """Return a hash class with an update method and a hexdigest method.""" - known_hashes = { - 'sha256': hashlib.sha256, - } + known_hashes = {'sha1': hashlib.sha1, 'sha256': hashlib.sha256} try: return known_hashes[hash_type] diff --git a/tests/test_container.py b/tests/test_container.py index 47123f3..e0f52a3 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -628,7 +628,7 @@ def test_initialisation(temp_dir): assert 'already some file or folder' in str(excinfo.value) -@pytest.mark.parametrize('hash_type', ['sha256']) +@pytest.mark.parametrize('hash_type', ['sha256', 'sha1']) @pytest.mark.parametrize('compress', [True, False]) def test_check_hash_computation(temp_container, hash_type, compress): """Check that the hashes are correctly computed, when storing loose, @@ -636,6 +636,8 @@ def test_check_hash_computation(temp_container, hash_type, compress): Check both compressed and uncompressed packed objects. """ + # Re-init the container with the correct hash type + temp_container.init_container(hash_type=hash_type, clear=True) content1 = b'1' content2 = b'222' content3 = b'n2fwd' @@ -1504,8 +1506,9 @@ def test_simulate_concurrent_packing(temp_container, compress): # pylint: disab assert not os.path.exists(fname) +@pytest.mark.parametrize('do_vacuum', [True, False]) @pytest.mark.parametrize('compress', [True, False]) -def test_simulate_concurrent_packing_multiple(temp_container, compress): # pylint: disable=invalid-name +def test_simulate_concurrent_packing_multiple(temp_container, compress, do_vacuum): # pylint: disable=invalid-name """Simulate race conditions while reading and packing.""" content1 = b'abc' content2 = b'def' @@ -1527,13 +1530,13 @@ def test_simulate_concurrent_packing_multiple(temp_container, compress): # pyli assert stream.read(1) == b'a' temp_container.pack_all_loose(compress=compress) # Remove loose files - temp_container.clean_storage() + temp_container.clean_storage(vacuum=do_vacuum) assert stream.read() == b'bc' elif obj_hashkey == hashkey2: assert stream.read(1) == b'd' temp_container.pack_all_loose(compress=compress) # Remove loose files - temp_container.clean_storage() + temp_container.clean_storage(vacuum=do_vacuum) assert stream.read() == b'ef' else: # Should not happen! 
@@ -1555,10 +1558,45 @@ def test_simulate_concurrent_packing_multiple(temp_container, compress): # pyli assert data == temp_container.get_objects_content([hashkey1, hashkey2]) # After a second cleaning of the storage, the loose file *must* have been removed - temp_container.clean_storage() + temp_container.clean_storage(vacuum=do_vacuum) assert not os.path.exists(fname) +def test_simulate_concurrent_packing_multiple_many(temp_container): # pylint: disable=invalid-name + """Simulate race conditions while reading and packing, with more than objects _MAX_CHUNK_ITERATE_LENGTH changing.""" + expected = {} + + # I put at least one object already packed + preliminary_content = b'AAA' + preliminary_hashkey = temp_container.add_object(preliminary_content) + temp_container.pack_all_loose() + temp_container.clean_storage() + expected[preliminary_hashkey] = preliminary_content + + for idx in range(temp_container._MAX_CHUNK_ITERATE_LENGTH + 10): # pylint: disable=protected-access + content = '{}'.format(idx).encode('ascii') + expected[temp_container.add_object(content)] = content + + retrieved = {} + first = True + with temp_container.get_objects_stream_and_meta(expected.keys()) as triplets: + for obj_hashkey, stream, meta in triplets: + retrieved[obj_hashkey] = stream.read() + if first: + # I should have found only one packed object (preliminary_content). + assert obj_hashkey == preliminary_hashkey + assert meta['type'] == ObjectType.PACKED + # I will not look for the loose until I exhaust the packed objects. + # In the meantime, therefore, I pack all the rest, and I clean the storage: + # this will trigger the fallback logic to check again if there are + # objects that have been packed in the meantime. + temp_container.pack_all_loose() + temp_container.clean_storage() + first = False + + assert expected == retrieved + + @pytest.mark.parametrize('compress', [True, False]) def test_simulate_concurrent_packing_multiple_meta_only(temp_container, compress): # pylint: disable=invalid-name """Simulate race conditions while reading and packing (as earlier function, but no streams).""" @@ -1841,8 +1879,10 @@ def test_list_all_objects_extraneous(temp_dir, loose_prefix_len): # pylint: dis @pytest.mark.parametrize('target_memory_bytes', [1, 9, 100 * 1024 * 1024]) @pytest.mark.parametrize('compress_dest', [True, False]) @pytest.mark.parametrize('compress_source', [True, False]) -def test_export_to_pack(temp_container, compress_source, compress_dest, target_memory_bytes): - """Test the functionality to export to a new container.""" +# Test both the same hash and another one +@pytest.mark.parametrize('other_container_hash_type', ['sha256', 'sha1']) +def test_import_to_pack(temp_container, compress_source, compress_dest, target_memory_bytes, other_container_hash_type): + """Test the functionality to import from another container.""" obj1 = b'111111' obj2 = b'222222' obj3 = b'333332' @@ -1850,7 +1890,7 @@ def test_export_to_pack(temp_container, compress_source, compress_dest, target_m with tempfile.TemporaryDirectory() as tmpdir: other_container = Container(tmpdir) # Use the same hash type - other_container.init_container(clear=True, hash_type=temp_container.hash_type) + other_container.init_container(clear=True, hash_type=other_container_hash_type) hashkey1 = temp_container.add_object(obj1) hashkey2, hashkey3 = temp_container.add_objects_to_pack([obj2, obj3], compress=compress_source) @@ -1862,10 +1902,10 @@ def test_export_to_pack(temp_container, compress_source, compress_dest, target_m assert 
other_container.count_objects()['packed'] == 0 # Put only two objects - old_new_mapping = temp_container.export([hashkey1, hashkey2], - other_container, - compress=compress_dest, - target_memory_bytes=target_memory_bytes) + old_new_mapping = other_container.import_objects([hashkey1, hashkey2], + temp_container, + compress=compress_dest, + target_memory_bytes=target_memory_bytes) # Two objects should appear assert other_container.count_objects()['loose'] == 0 assert other_container.count_objects()['packed'] == 2 @@ -1877,10 +1917,10 @@ def test_export_to_pack(temp_container, compress_source, compress_dest, target_m # Add two more, one of which is already in the destination old_new_mapping.update( - temp_container.export([hashkey2, hashkey3], - other_container, - compress=compress_dest, - target_memory_bytes=target_memory_bytes) + other_container.import_objects([hashkey2, hashkey3], + temp_container, + compress=compress_dest, + target_memory_bytes=target_memory_bytes) ) # All three objects should be there, no duplicates assert other_container.count_objects()['loose'] == 0 @@ -1893,13 +1933,35 @@ def test_export_to_pack(temp_container, compress_source, compress_dest, target_m assert other_container.get_object_content(old_new_mapping[hashkey3]) == obj3 old_hashkeys, new_hashkeys = zip(*old_new_mapping.items()) - # Since we are using the same hash algorithm, the hashes should be the same! - assert old_hashkeys == new_hashkeys + if other_container_hash_type == temp_container.hash_type: + # Since we are using the same hash algorithm, the hashes ashould be the same! + assert old_hashkeys == new_hashkeys # close before exiting the context manager, so files are closed. other_container.close() +def test_export_deprecated(temp_container): + """Test that the export_function exists but is deprecated.""" + obj1 = b'111111' + + with tempfile.TemporaryDirectory() as tmpdir: + other_container = Container(tmpdir) + # Use the same hash type + other_container.init_container(clear=True, hash_type=temp_container.hash_type) + + hashkey1 = temp_container.add_object(obj1) + + # Put only two objects + with pytest.warns(DeprecationWarning): + temp_container.export([hashkey1], other_container) + + assert other_container.get_object_content(hashkey1) == obj1 + + # Close before going out, or the test will fail on Windows not being able to delete the folder + other_container.close() + + @pytest.mark.parametrize('compress', [True, False]) def test_validate(temp_container, compress): """Test the validation function.""" @@ -2095,8 +2157,6 @@ def test_add_streamed_object_to_pack_callback( # pylint: disable=invalid-name temp_container, use_size_hint, callback_instance ): """Test the correctness of the callback of add_streamed_object_to_pack.""" - # Add packed objects (2001, 10 chars each), *not* a multiple of 400 (that is the internal value - # of how many events should be triggered as a maximum) length = 1000000 content = b'0' * length stream = io.BytesIO(content) @@ -2123,6 +2183,95 @@ def test_add_streamed_object_to_pack_callback( # pylint: disable=invalid-name }] +@pytest.mark.parametrize('no_holes,no_holes_read_twice', [[True, True], [True, False], [False, False]]) +def test_add_streamed_objects_to_pack_callback( # pylint: disable=invalid-name + temp_container, callback_instance, no_holes, no_holes_read_twice + ): + """Test the correctness of the callback of add_streamed_objects_to_pack.""" + # Add packed objects (2001, 10 chars each) + len_packed = 2001 + stream_list = [io.BytesIO('p{:09d}'.format(i).encode('ascii')) for 
i in range(len_packed)] + + temp_container.add_streamed_objects_to_pack( + stream_list, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, callback=callback_instance.callback + ) + + # Add another 4001 packed objects with 2001 already-existing objects + len_packed2 = 4001 + stream_list = [io.BytesIO('2p{:09d}'.format(i).encode('ascii')) for i in range(len_packed2)] + + temp_container.add_streamed_objects_to_pack( + stream_list, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, callback=callback_instance.callback + ) + + assert callback_instance.current_action is None, ( + "The 'add_streamed_objects_to_pack' call did not perform a final callback with a 'close' event" + ) + + expected_actions = [] + # First call + expected_actions.append({'start_value': {'total': len_packed, 'description': 'Bulk storing'}, 'value': len_packed}) + # Second call + if no_holes: + # If no_holes is True, i.e. we do not want holes, we compute an initial list of the existing ones + expected_actions.append({ + 'start_value': { + 'total': len_packed, + 'description': 'List existing' + }, + 'value': len_packed + }) + expected_actions.append({ + 'start_value': { + 'total': len_packed2, + 'description': 'Bulk storing' + }, + 'value': len_packed2 + }) + + assert callback_instance.performed_actions == expected_actions + + +# Check both with the same hash type and with a different one +@pytest.mark.parametrize('other_container_hash_type', ['sha256', 'sha1']) +def test_import_objects_callback(temp_container, callback_instance, other_container_hash_type): + """Test the correctness of the callback of import_objects.""" + # Add packed objects (2001, 10 chars each) + len_packed = 2001 + stream_list = [io.BytesIO('p{:09d}'.format(i).encode('ascii')) for i in range(len_packed)] + hashkeys = temp_container.add_streamed_objects_to_pack(stream_list) + + with tempfile.TemporaryDirectory() as tmpdir: + other_container = Container(tmpdir) + # Use the same hash type + other_container.init_container(clear=True, hash_type=other_container_hash_type) + + # Import objects + other_container.import_objects(hashkeys, temp_container, callback=callback_instance.callback) + + assert other_container.count_objects()['loose'] == temp_container.count_objects()['loose'] + assert other_container.count_objects()['packed'] == temp_container.count_objects()['packed'] + + # close before exiting the context manager, so files are closed. 
+ other_container.close() + + expected_actions = [] + if other_container_hash_type == temp_container.hash_type: + expected_actions.append({ + 'start_value': { + 'description': 'Listing objects', + 'total': len_packed + }, + 'value': len_packed + }) + expected_actions.append({'start_value': {'description': 'Copy objects', 'total': len_packed}, 'value': len_packed}) + # len_packed is small (and the objects are small) + # so they all end up in the final flush + expected_actions.append({'start_value': {'description': 'Final flush', 'total': len_packed}, 'value': len_packed}) + + assert callback_instance.performed_actions == expected_actions + + @pytest.mark.parametrize('ask_deleting_unknown', [True, False]) @pytest.mark.parametrize('compress', [True, False]) def test_delete(temp_container, compress, ask_deleting_unknown): # pylint: disable=too-many-statements @@ -2864,3 +3013,23 @@ def test_not_implemented_repacks(temp_container): continue with pytest.raises(NotImplementedError): temp_container.repack(compress_mode=compress_mode) + + +def test_pack_all_loose_many(temp_container): + """Check the pack_all_loose when there are many objects to pack, more than _MAX_CHUNK_ITERATE_LENGTH.""" + expected = {} + for idx in range(temp_container._MAX_CHUNK_ITERATE_LENGTH + 10): # pylint: disable=protected-access + content = '{}'.format(idx).encode('utf8') + expected[temp_container.add_object(content)] = content + + # Pack all loose objects + temp_container.pack_all_loose() + + retrieved = temp_container.get_objects_content(expected.keys()) + assert retrieved == expected + + # Pack again, nothing should happen, but it should trigger the logic in pack_all_loose at the beginning, + # with `if where == Location.BOTH` + temp_container.pack_all_loose() + retrieved = temp_container.get_objects_content(expected.keys()) + assert retrieved == expected diff --git a/tests/test_utils.py b/tests/test_utils.py index 472b622..980fdd2 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1140,7 +1140,8 @@ def test_zero_stream_multi_read(): # Set as the second parameter the hash of the 'content' string written below # inside the test function @pytest.mark.parametrize( - 'hash_type,expected_hash', [['sha256', '9975d00a6e715d830aeaa035347b3e601a0c0bb457a7f87816300e7c01c0c39b']] + 'hash_type,expected_hash', [['sha256', '9975d00a6e715d830aeaa035347b3e601a0c0bb457a7f87816300e7c01c0c39b'], + ['sha1', '2a0439b5b34b74808b6cc7a2bf04dd02604c20b0']] ) def test_hash_writer_wrapper(temp_dir, hash_type, expected_hash): """Test some functionality of the HashWriterWrapper class.""" @@ -1193,11 +1194,13 @@ def test_is_known_hash(): """Check the functionality of the is_known_hash function.""" # At least sha256 should be supported assert utils.is_known_hash('sha256') + # sha1 should also be supported + assert utils.is_known_hash('sha1') # A weird string should not be a valid known hash assert not utils.is_known_hash('SOME_UNKNOWN_HASH_TYPE') -@pytest.mark.parametrize('hash_type', ['sha256']) +@pytest.mark.parametrize('hash_type', ['sha256', 'sha1']) def test_compute_hash_and_size(hash_type): """Check the funtion to compute the hash and size.""" From 1d7c389c353185c1923c9addb1b107c283d5f561 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Fri, 2 Oct 2020 06:26:39 +0200 Subject: [PATCH 13/18] =?UTF-8?q?=E2=9C=A8=20Add=20the=20concept=20of=20a?= =?UTF-8?q?=20(unique)=20container=20ID=20(#97)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allows for the association of a container with an 
existing DB, or to uniquely refer to it. 🐛 This also fixes a bug whereby config values were cached but the cache was not
cleared when re-initialising the container. To reduce the risk of such problems, the whole configuration dictionary is now
cached as a single object, rather than each config value individually. --- disk_objectstore/container.py | 59 ++++++++++++++++++++++++-----------
tests/test_container.py | 11 +++++++ 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/disk_objectstore/container.py
index 6d168da..dd9e158 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -6,6 +6,7 @@ import json
import os import shutil +import uuid import warnings import zlib @@ -87,11 +88,11 @@ def __init__(self, folder): """ self._folder = os.path.realpath(folder)
self._session = None # Will be populated by the _get_session function + # These act as caches and will be populated by the corresponding properties
- self._loose_prefix_len = None - self._pack_size_target = None + # IMPORTANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`!
self._current_pack_id = None - self._hash_type = None + self._config = None def get_folder(self): """Return the path to the folder that will host the object-store container."""
@@ -315,12 +316,24 @@ def init_container( # pylint: disable=bad-continuation if os.path.exists(self._folder): shutil.rmtree(self._folder)
+ # Reinitialise the configuration cache, since this will change + # (at least the container_id, possibly the rest), and the other caches
+ self._config = None + self._current_pack_id = None + if self.is_initialised: raise FileExistsError( 'The container already exists, so you cannot initialise it - '
'use the clear option if you want to overwrite with a clean one' ) + # If we are here, either the folder is empty, or just cleared.
+ # It could also be that one of the folders does not exist. This is considered an invalid situation.
+ # But this will be caught later, where I check that the folder is empty before overwriting the + # configuration file.
+ # In this case, I have to generate a new UUID to be used as the container_id + container_id = uuid.uuid4().hex + try: os.makedirs(self._folder)
except FileExistsError: @@ -339,7 +352,8 @@ def init_container( # pylint: disable=bad-continuation 'container_version': 1, # For future compatibility, this is the version of the format
'loose_prefix_len': loose_prefix_len, 'pack_size_target': pack_size_target, - 'hash_type': hash_type + 'hash_type': hash_type, + 'container_id': container_id }, fhandle )
@@ -358,39 +372,46 @@ def _get_repository_config(self): """Return the repository config.""" if not self.is_initialised:
raise NotInitialised('The container is not initialised yet - use .init_container() first') - with open(self._get_config_file()) as fhandle: - config = json.load(fhandle) - return config
+ if self._config is None: + with open(self._get_config_file()) as fhandle: + self._config = json.load(fhandle) + return self._config @property def loose_prefix_len(self):
"""Return the length of the prefix of loose objects, when sharding. - This is read from the repository configuration. + This is read from the (cached) repository configuration. """
- if self._loose_prefix_len is None: - self._loose_prefix_len = self._get_repository_config()['loose_prefix_len'] - return self._loose_prefix_len
+ return self._get_repository_config()['loose_prefix_len'] @property def pack_size_target(self): """Return the length of the pack name, when sharding. 
- This is read from the repository configuration. + This is read from the (cached) repository configuration. """ - if self._pack_size_target is None:
- self._pack_size_target = self._get_repository_config()['pack_size_target'] - return self._pack_size_target + return self._get_repository_config()['pack_size_target']
@property def hash_type(self): """Return the length of the prefix of loose objects, when sharding. - This is read from the repository configuration.
+ This is read from the (cached) repository configuration. + """ + return self._get_repository_config()['hash_type'] + + @property + def container_id(self):
+ """Return the repository unique ID. + + This is read from the (cached) repository configuration, and is a UUID uniquely identifying
+ this specific container. This is generated at container initialisation (when calling `init_container`) and will + never change for this container.
+ + Clones of the container should have a different ID even if they have the same content. """ - if self._hash_type is None:
- self._hash_type = self._get_repository_config()['hash_type'] - return self._hash_type + return self._get_repository_config()['container_id']
def get_object_content(self, hashkey): """Get the content of an object with a given hash key. diff --git a/tests/test_container.py index e0f52a3..1676058 100644
--- a/tests/test_container.py +++ b/tests/test_container.py @@ -3033,3 +3033,14 @@ def test_pack_all_loose_many(temp_container): temp_container.pack_all_loose()
retrieved = temp_container.get_objects_content(expected.keys()) assert retrieved == expected + + +def test_container_id(temp_container):
+ """Check the creation of unique container IDs.""" + old_container_id = temp_container.container_id + assert old_container_id is not None + assert isinstance(old_container_id, str)
+ + # Re-initialize: it should get a new container_id + temp_container.init_container(clear=True) + assert old_container_id != temp_container.container_id
From 1b84d6b00ad68bf8e58861c712c3cb9b6394abfd Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Fri, 2 Oct 2020 18:45:30 +0100
Subject: [PATCH 14/18] =?UTF-8?q?=F0=9F=90=9B=20Fix=20performance=20regres?= =?UTF-8?q?sion=20(#102)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit
`Container.is_initialised` is a costly operation, loading the config JSON every time. Since 1d7c389, it is invoked on every call to `loose_prefix_len`, leading to a large performance degradation. This PR makes sure the `is_initialised` check is performed only if the config has not already been loaded into memory. 
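For context on the change below: every config-backed property (e.g. `loose_prefix_len`) goes through `_get_repository_config()`, so the initialisation check should only run on a cache miss. A rough way to observe the difference is to time repeated property accesses; the following is a minimal, hypothetical harness (not part of the repository's test suite):

import tempfile
import timeit

from disk_objectstore import Container

with tempfile.TemporaryDirectory() as tmpdir:
    container = Container(tmpdir)
    container.init_container(clear=True)
    # With this fix, repeated accesses hit the in-memory config cache;
    # before it, every access re-ran the costly `is_initialised` check.
    print(timeit.timeit(lambda: container.loose_prefix_len, number=100000))
    container.close()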
--- disk_objectstore/container.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index dd9e158..61b5f83 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -370,9 +370,9 @@ def init_container( # pylint: disable=bad-continuation def _get_repository_config(self): """Return the repository config.""" - if not self.is_initialised: - raise NotInitialised('The container is not initialised yet - use .init_container() first') if self._config is None: + if not self.is_initialised: + raise NotInitialised('The container is not initialised yet - use .init_container() first') with open(self._get_config_file()) as fhandle: self._config = json.load(fhandle) return self._config From d786296bc67219512f4058265ffbd8c9e6f06b0a Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Sun, 4 Oct 2020 12:31:10 +0200 Subject: [PATCH 15/18] =?UTF-8?q?=E2=9C=A8=20Generalize=20compression=20al?= =?UTF-8?q?gorithm=20(#99)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The container configuration now accepts a variable for the compression algorithm to use. Currently, the supported values are zlib, with levels from 1 to 9, but this can be expanded in the future. --- disk_objectstore/container.py | 47 +++++++++++++----- disk_objectstore/utils.py | 91 +++++++++++++++++++++++++++++++++-- tests/test_container.py | 71 +++++++++++++++++++-------- tests/test_utils.py | 90 ++++++++++++++++++++++++++++------ 4 files changed, 245 insertions(+), 54 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 61b5f83..c3be5d0 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -8,7 +8,6 @@ import shutil import uuid import warnings -import zlib from collections import defaultdict, namedtuple from contextlib import contextmanager @@ -20,9 +19,9 @@ from .models import Base, Obj from .utils import ( - ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, Location, chunk_iterator, - is_known_hash, nullcontext, rename_callback, safe_flush_to_disk, get_hash, compute_hash_and_size, merge_sorted, - detect_where_sorted, yield_first_element + ObjectWriter, PackedObjectReader, CallbackStreamWrapper, Location, chunk_iterator, is_known_hash, nullcontext, + rename_callback, safe_flush_to_disk, get_hash, get_compressobj_instance, get_stream_decompresser, + compute_hash_and_size, merge_sorted, detect_where_sorted, yield_first_element ) from .exceptions import NotExistent, NotInitialised, InconsistentContent @@ -51,9 +50,6 @@ class Container: # pylint: disable=too-many-public-methods """A class representing a container of objects (which is stored on a disk folder)""" _PACK_INDEX_SUFFIX = '.idx' - # Default compression level (when compression is requested) - # This is the lowest one, to get some reasonable compression without too much CPU time required - _COMPRESSLEVEL = 1 # Size in bytes of each of the chunks used when (internally) reading or writing in chunks, e.g. # when packing. _CHUNKSIZE = 65536 @@ -289,7 +285,8 @@ def is_initialised(self): return True def init_container( # pylint: disable=bad-continuation - self, clear=False, pack_size_target=4 * 1024 * 1024 * 1024, loose_prefix_len=2, hash_type='sha256' + self, clear=False, pack_size_target=4 * 1024 * 1024 * 1024, loose_prefix_len=2, hash_type='sha256', + compression_algorithm='zlib+1' ): """Initialise the container folder, if not already done. 
@@ -304,6 +301,7 @@ def init_container( # pylint: disable=bad-continuation The longer the length, the more folders will be used to store loose objects.
Suggested values: 0 (for not using subfolders) or 2. :param hash_type: a string defining the hash type to use.
+ :param compression_algorithm: a string defining the compression algorithm to use for compressed objects. """ if loose_prefix_len < 0:
raise ValueError('The loose prefix length can only be zero or a positive integer') @@ -345,6 +343,11 @@ def init_container( # pylint: disable=bad-continuation
'There is already some file or folder in the Container folder, I cannot initialise it!' )
+ # Validate the compression algorithm: check that I'm able to load the classes to compress and decompress + # with the given algorithm string
+ get_compressobj_instance(compression_algorithm) + get_stream_decompresser(compression_algorithm) + # Create config file
with open(self._get_config_file(), 'w') as fhandle: json.dump( @@ -353,7 +356,8 @@ def init_container( # pylint: disable=bad-continuation
'loose_prefix_len': loose_prefix_len, 'pack_size_target': pack_size_target, 'hash_type': hash_type, - 'container_id': container_id
+ 'container_id': container_id, + 'compression_algorithm': compression_algorithm }, fhandle ) @@ -413,6 +417,23 @@ def container_id(self): """
return self._get_repository_config()['container_id'] + @property + def compression_algorithm(self): + """Return the compression algorithm defined for this container.
+ + This is read from the repository configuration.""" + return self._get_repository_config()['compression_algorithm'] + + def _get_compressobj_instance(self):
+ """Return a new `compressobj` instance for the compression algorithm defined for this container.""" + return get_compressobj_instance(self.compression_algorithm)
+ + def _get_stream_decompresser(self): + """Return the correct StreamDecompresser class for the compression algorithm + defined for this container. + """
+ return get_stream_decompresser(self.compression_algorithm) + def get_object_content(self, hashkey): """Get the content of an object with a given hash key. 
@@ -553,7 +574,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too fhandle=last_open_file, offset=metadata.offset, length=metadata.length ) if metadata.compressed: - obj_reader = StreamDecompresser(obj_reader) + obj_reader = self._get_stream_decompresser()(obj_reader) yield metadata.hashkey, obj_reader, meta else: yield metadata.hashkey, meta @@ -671,7 +692,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too fhandle=last_open_file, offset=metadata.offset, length=metadata.length ) if metadata.compressed: - obj_reader = StreamDecompresser(obj_reader) + obj_reader = self._get_stream_decompresser()(obj_reader) yield metadata.hashkey, obj_reader, meta else: yield metadata.hashkey, meta @@ -1107,7 +1128,7 @@ def _write_data_to_packfile(self, pack_handle, read_handle, compress, hash_type= hasher = get_hash(hash_type=hash_type)() if compress: - compressobj = zlib.compressobj(level=self._COMPRESSLEVEL) + compressobj = self._get_compressobj_instance() count_read_bytes = 0 while True: @@ -2057,7 +2078,7 @@ def callback(self, action, value): for hashkey, size, offset, length, compressed in query: obj_reader = PackedObjectReader(fhandle=pack_handle, offset=offset, length=length) if compressed: - obj_reader = StreamDecompresser(obj_reader) + obj_reader = self._get_stream_decompresser()(obj_reader) computed_hash, computed_size = compute_hash_and_size(obj_reader, self.hash_type) diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index f9d1846..9e1bff6 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -4,6 +4,7 @@ like the ``LazyOpener``. """ # pylint: disable= too-many-lines +import abc import hashlib import itertools import os @@ -546,13 +547,32 @@ def wrapper_callback(action, value): return wrapper_callback -class StreamDecompresser: - """A class that gets a stream of compressed zlib bytes, and returns the corresponding +class ZlibLikeBaseStreamDecompresser(abc.ABC): + """A class that gets a stream of compressed bytes, and returns the corresponding uncompressed bytes when being read via the .read() method. + + This is the base class. Define the `decompressobj_class` and `decompress_error` properties to implement concrete + decompresser classes using specific algorithms that follow the zlib API. """ _CHUNKSIZE = 524288 + @property + @abc.abstractmethod + def decompressobj_class(self): + """Return here the `decompressobj` class of the given compression type. + + Needs to be implemented by subclasses. + """ + + @property + @abc.abstractmethod + def decompress_error(self): + """Return here the Exception (or tuple of exceptions) that need to be caught if there is a compression error. + + Needs to be implemented by subclasses. + """ + def __init__(self, compressed_stream): """Create the class from a given compressed bytestream. @@ -560,7 +580,7 @@ def __init__(self, compressed_stream): returning a valid compressed stream. 
""" self._compressed_stream = compressed_stream - self._decompressor = zlib.decompressobj() + self._decompressor = self.decompressobj_class() self._internal_buffer = b'' self._pos = 0 @@ -602,7 +622,7 @@ def read(self, size=-1): # .unconsumed_tail and reused a the next loop try: decompressed_chunk = self._decompressor.decompress(compressed_chunk, size) - except zlib.error as exc: + except self.decompress_error as exc: raise ValueError('Error while uncompressing data: {}'.format(exc)) self._internal_buffer += decompressed_chunk @@ -653,7 +673,7 @@ def seek(self, target, whence=0): if target == 0: # Going back to zero it's efficient. I need to reset all internal variables, as in the init. self._compressed_stream.seek(0) - self._decompressor = zlib.decompressobj() + self._decompressor = self.decompressobj_class() self._internal_buffer = b'' self._pos = 0 return 0 @@ -673,6 +693,67 @@ def seek(self, target, whence=0): return self._pos +class ZlibStreamDecompresser(ZlibLikeBaseStreamDecompresser): + """A class that gets a stream of compressed bytes using ZLIB, and returns the corresponding + uncompressed bytes when being read via the .read() method.""" + + @property + def decompressobj_class(self): + """Return the `decompressobj` class of zlib.""" + return zlib.decompressobj + + @property + def decompress_error(self): + """Return the zlib error raised when there is an error.""" + return zlib.error + + +def _get_compression_algorithm_info(algorithm): + """Return a compresser and a decompresser for the given algorithm.""" + known_algorithms = { + 'zlib': { + 'compressobj': zlib.compressobj, + 'variant_name': 'level', + 'variant_mapper': {str(i): i for i in range(1, 10)}, # from 1 to 9 + 'decompresser': ZlibStreamDecompresser + } + } + + algorithm_name, _, variant = algorithm.partition('+') + try: + algorithm_info = known_algorithms[algorithm_name] + except KeyError: + raise ValueError("Unknown or unsupported compression algorithm '{}'".format(algorithm_name)) + try: + kwargs = {algorithm_info['variant_name']: algorithm_info['variant_mapper'][variant]} + compresser = algorithm_info['compressobj'](**kwargs) + except KeyError: + raise ValueError("Invalid variant '{}' for compression algorithm '{}'".format(variant, algorithm_name)) + + decompresser = algorithm_info['decompresser'] + + return compresser, decompresser + + +def get_compressobj_instance(algorithm): + """Return a compressobj class with a given algorithm. + + :param algorithm: A string defining the algorithm and its variant. + The algorithm is split by a + sign from the variant. + E.g. 'zlib+1' means using a level 1, while 'zlib+9' indicates a zlib compression with level 9 + (slower but compressing more). + """ + return _get_compression_algorithm_info(algorithm)[0] + + +def get_stream_decompresser(algorithm): + """Return a StreamDecompresser class with a given algorithm. + + :param algorithm: a compression algorithm (see `get_compressionobj_instance` for a description). 
+ """ + return _get_compression_algorithm_info(algorithm)[1] + + class ZeroStream: """A class to return an (unseekable) stream returning only zeros, with length length.""" diff --git a/tests/test_container.py b/tests/test_container.py index 1676058..6060e64 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -8,7 +8,6 @@ import shutil import stat import tempfile -import zlib import pathlib import psutil @@ -18,6 +17,8 @@ from disk_objectstore import utils, models import disk_objectstore.exceptions as exc +COMPRESSION_ALGORITHMS_TO_TEST = ['zlib+1', 'zlib+9'] + class UnopenableBytesIO(io.BytesIO): """An extension of BytesIO that cannot be used as a context manager.""" @@ -951,9 +952,13 @@ def test_locked_object_while_packing( # pylint: disable=invalid-name assert temp_container.get_object_content(hashkey2) == content2 +# Note that until when we want to support py3.5, we cannot specify the +# level as a keyword argument, as this was added only in python 3.6 +@pytest.mark.parametrize('compression_algorithm', COMPRESSION_ALGORITHMS_TO_TEST) @pytest.mark.parametrize('compress_packs', [True, False]) -def test_sizes(temp_container, generate_random_data, compress_packs): +def test_sizes(temp_container, generate_random_data, compress_packs, compression_algorithm): """Check that the information on size is reliable.""" + temp_container.init_container(clear=True, compression_algorithm=compression_algorithm) size_info = temp_container.get_total_size() assert size_info['total_size_packed'] == 0 assert size_info['total_size_packed_on_disk'] == 0 @@ -988,15 +993,12 @@ def test_sizes(temp_container, generate_random_data, compress_packs): ) == total_object_size if compress_packs: - # Compress data manually to get compressed size - # In the current version, compression level is hardcoded. - # If this becomes a parameter, we need to change this test - # Note that until when we want to support py3.5, we cannot specify the - # level as a keyword argument, as this was added only in python 3.6 - compressed_data = { - key: zlib.compress(val, temp_container._COMPRESSLEVEL) # pylint: disable=protected-access - for key, val in data.items() - } + compressed_data = {} + for key, val in data.items(): + compresser = utils.get_compressobj_instance(compression_algorithm) + compressed = compresser.compress(val) + compressed += compresser.flush() + compressed_data[key] = compressed total_compressed_size = sum(len(value) for value in compressed_data.values()) size_info = temp_container.get_total_size() @@ -1190,10 +1192,16 @@ def test_get_objects_meta_doesnt_open(temp_container, generate_random_data): # assert len(current_process.open_files()) == start_open_files +# Note that until when we want to support py3.5, we cannot specify the +# level as a keyword argument, as this was added only in python 3.6 +@pytest.mark.parametrize('compression_algorithm', COMPRESSION_ALGORITHMS_TO_TEST) @pytest.mark.parametrize('compress', [True, False]) @pytest.mark.parametrize('skip_if_missing', [True, False]) -def test_stream_meta(temp_container, compress, skip_if_missing): +def test_stream_meta( # pylint: disable=too-many-locals + temp_container, compress, skip_if_missing, compression_algorithm + ): """Validate the meta dictionary returned by the get_objects_stream_and_meta and get_objects_meta.""" + temp_container.init_container(clear=True, compression_algorithm=compression_algorithm) # This is the list of all known meta keys. 
# I do also an explicit check that all and only these are present # This is implicit since I will later also compare the exact dictionaries and not only their keys, @@ -1206,10 +1214,10 @@ def test_stream_meta(temp_container, compress, skip_if_missing): hashkey_packed = temp_container.add_objects_to_pack([content_packed], compress=compress)[0] hashkey_loose = temp_container.add_object(content_loose) hashkey_missing = 'unknown' - # Assuming only zlib compression for now. Needs to be adapted when changing the possible compression libraries - object_pack_length = len(content_packed) if not compress else len( - zlib.compress(content_packed, temp_container._COMPRESSLEVEL) # pylint: disable=protected-access - ) + compresser = utils.get_compressobj_instance(compression_algorithm) + content_packed_compressed = compresser.compress(content_packed) + content_packed_compressed += compresser.flush() + object_pack_length = len(content_packed) if not compress else len(content_packed_compressed) expected_skip_missing_true = { hashkey_packed: { @@ -1280,11 +1288,15 @@ def test_stream_meta(temp_container, compress, skip_if_missing): assert check_dict == {k: v['meta'] for k, v in expected_skip_missing_false.items()} +# Note that until when we want to support py3.5, we cannot specify the +# level as a keyword argument, as this was added only in python 3.6 +@pytest.mark.parametrize('compression_algorithm', COMPRESSION_ALGORITHMS_TO_TEST) @pytest.mark.parametrize('compress', [True, False]) -def test_stream_meta_single(temp_container, compress): +def test_stream_meta_single(temp_container, compress, compression_algorithm): """Validate the meta dictionary returned by the single-object methods. (i.e., get_object_stream_and_meta and get_object_meta).""" + temp_container.init_container(clear=True, compression_algorithm=compression_algorithm) # This is the list of all known meta keys. # I do also an explicit check that all and only these are present # This is implicit since I will later also compare the exact dictionaries and not only their keys, @@ -1297,10 +1309,10 @@ def test_stream_meta_single(temp_container, compress): hashkey_packed = temp_container.add_objects_to_pack([content_packed], compress=compress)[0] hashkey_loose = temp_container.add_object(content_loose) hashkey_missing = 'unknown' - # Assuming only zlib compression for now. 
Needs to be adapted when changing the possible compression libraries - object_pack_length = len(content_packed) if not compress else len( - zlib.compress(content_packed, temp_container._COMPRESSLEVEL) # pylint: disable=protected-access - ) + compresser = utils.get_compressobj_instance(compression_algorithm) + content_packed_compressed = compresser.compress(content_packed) + content_packed_compressed += compresser.flush() + object_pack_length = len(content_packed) if not compress else len(content_packed_compressed) expected_skip_missing_true = { hashkey_packed: { @@ -3044,3 +3056,20 @@ def test_container_id(temp_container): # Re-initialize: it should get a new container_id temp_container.init_container(clear=True) assert old_container_id != temp_container.container_id + + +@pytest.mark.parametrize( + 'compression_algorithm', + [ + 'gzip', # unknown + 'zlib', # no variant + 'zlib+a', + 'zlib+-1', + 'zlib+10' # Invalid variant + 'unknown-method' + ] +) +def test_unknown_compressers(temp_container, compression_algorithm): + """Check that unknown or invalid compressers give a ValueError.""" + with pytest.raises(ValueError): + temp_container.init_container(clear=True, compression_algorithm=compression_algorithm) diff --git a/tests/test_utils.py b/tests/test_utils.py index 980fdd2..a82c970 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -5,7 +5,6 @@ import io import os import tempfile -import zlib import psutil import pytest @@ -23,6 +22,9 @@ # reopened as locked, to be closed before the test finishes LOCKED_FILES_FD = [] +# NOTE: test_compressers must be adapted by hand when adding new algorithms +COMPRESSION_ALGORITHMS_TO_TEST = ['zlib+1', 'zlib+9'] + def test_lazy_opener_read(): """Test the LazyOpener.""" @@ -980,8 +982,11 @@ def test_packed_object_reader_mode(): assert reader.mode == handle.mode -def test_stream_decompresser(): +@pytest.mark.parametrize('compression_algorithm', COMPRESSION_ALGORITHMS_TO_TEST) +def test_stream_decompresser(compression_algorithm): """Test the stream decompresser.""" + StreamDecompresser = utils.get_stream_decompresser(compression_algorithm) # pylint: disable=invalid-name + # A short binary string (1025 bytes, an odd number to avoid possible alignments with the chunk size) original_data_short = b'0123456789abcdef' * 64 + b'E' # A longish binary string (2097153 bytes ~ 2MB @@ -993,27 +998,42 @@ def test_stream_decompresser(): original_data = [original_data_short, original_data_long, original_data_long_random] - compressed_streams = [io.BytesIO(zlib.compress(data)) for data in original_data] + compressed_streams = [] + for data in original_data: + compresser = utils.get_compressobj_instance(compression_algorithm) + compressed = compresser.compress(data) + compressed += compresser.flush() + compressed_streams.append(io.BytesIO(compressed)) for original, compressed_stream in zip(original_data, compressed_streams): - decompresser = utils.StreamDecompresser(compressed_stream) + decompresser = StreamDecompresser(compressed_stream) # Read in one chunk assert original == decompresser.read(), 'Uncompressed data is wrong (single read)' # Redo the same, but do a read of zero bytes first, checking that # it returns a zero-length bytes, and that it does not move the offset - compressed_streams = [io.BytesIO(zlib.compress(data)) for data in original_data] + compressed_streams = [] + for data in original_data: + compresser = utils.get_compressobj_instance(compression_algorithm) + compressed = compresser.compress(data) + compressed += compresser.flush() + 
compressed_streams.append(io.BytesIO(compressed)) for original, compressed_stream in zip(original_data, compressed_streams): - decompresser = utils.StreamDecompresser(compressed_stream) + decompresser = StreamDecompresser(compressed_stream) # Read in one chunk tmp = decompresser.read(size=0) assert not tmp assert original == decompresser.read(), 'Uncompressed data is wrong (single read)' - compressed_streams = [io.BytesIO(zlib.compress(data)) for data in original_data] + compressed_streams = [] + for data in original_data: + compresser = utils.get_compressobj_instance(compression_algorithm) + compressed = compresser.compress(data) + compressed += compresser.flush() + compressed_streams.append(io.BytesIO(compressed)) chunk_size = 1024 for original, compressed_stream in zip(original_data, compressed_streams): data_chunks = [] - decompresser = utils.StreamDecompresser(compressed_stream) + decompresser = StreamDecompresser(compressed_stream) # Read in multiple chunk while True: chunk = decompresser.read(size=chunk_size) @@ -1025,13 +1045,18 @@ def test_stream_decompresser(): assert original == data, 'Uncompressed data is wrong (chunked read)' -def test_stream_decompresser_seek(): +@pytest.mark.parametrize('compression_algorithm', COMPRESSION_ALGORITHMS_TO_TEST) +def test_stream_decompresser_seek(compression_algorithm): """Test the seek (and tell) functionality of the StreamDecompresser.""" + StreamDecompresser = utils.get_stream_decompresser(compression_algorithm) # pylint: disable=invalid-name original_data = b'0123456789abcdefABCDEF' length = len(original_data) - compressed_stream = io.BytesIO(zlib.compress(original_data)) - decompresser = utils.StreamDecompresser(compressed_stream) + compresser = utils.get_compressobj_instance(compression_algorithm) + compressed = compresser.compress(original_data) + compressed += compresser.flush() + compressed_stream = io.BytesIO(compressed) + decompresser = StreamDecompresser(compressed_stream) # Check the functionality is disabled assert decompresser.seekable @@ -1090,22 +1115,26 @@ def test_stream_decompresser_seek(): assert decompresser.tell() == length -def test_decompresser_corrupt(): +@pytest.mark.parametrize('compression_algorithm', COMPRESSION_ALGORITHMS_TO_TEST) +def test_decompresser_corrupt(compression_algorithm): """Test that the stream decompresser raises on a corrupt input.""" + StreamDecompresser = utils.get_stream_decompresser(compression_algorithm) # pylint: disable=invalid-name # Check that we get an error for an invalid stream of bytes - decompresser = utils.StreamDecompresser(io.BytesIO(b'1234543')) + decompresser = StreamDecompresser(io.BytesIO(b'1234543')) with pytest.raises(ValueError) as excinfo: print(decompresser.read()) assert 'Error while uncompressing data' in str(excinfo.value) # Check that we get an error for a truncated stream of bytes original_data = b'someDATAotherTHINGS' - compressed_data = zlib.compress(original_data) + compresser = utils.get_compressobj_instance(compression_algorithm) + compressed_data = compresser.compress(original_data) + compressed_data += compresser.flush() # I remove the last byte, so it's corrupted corrupted_stream = io.BytesIO(compressed_data[:-1]) - decompresser = utils.StreamDecompresser(corrupted_stream) + decompresser = StreamDecompresser(corrupted_stream) with pytest.raises(ValueError) as excinfo: print(decompresser.read()) assert 'problem in the incoming buffer' in str(excinfo.value) @@ -1489,3 +1518,34 @@ def test_rename_callback(callback_instance): 'value': len(content) } ] + + 
+@pytest.mark.parametrize( + 'compression_algorithm,compressed_expected', [['zlib+1', b'x\x013426153\xb7\xb040Df\x01\xf9\x00G\xb2\x05R'], + ['zlib+9', b'x\xda3426153\xb7\xb040Df\x01I\x00G\xb2\x05R']] +) +def test_compressers(compression_algorithm, compressed_expected): + """Check that the data is compressed as expected.""" + uncompressed = b'12345678901234567890123890' + compresser = utils.get_compressobj_instance(compression_algorithm) + compressed = compresser.compress(uncompressed) + compressed += compresser.flush() + + assert compressed == compressed_expected + + +def test_unknown_compressers(): + """Check that unknown or invalid compressers give a ValueError.""" + invalid_methods = [ + 'gzip', # unknown + 'zlib', # no variant + 'zlib+a', + 'zlib+-1', + 'zlib+10' # Invalid variant + 'unknown-method' + ] + for invalid in invalid_methods: + with pytest.raises(ValueError): + utils.get_compressobj_instance(invalid) + with pytest.raises(ValueError): + utils.get_stream_decompresser(invalid) From 3120efe5ece7a0962252dbd013dffdf600ec77b6 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Mon, 12 Oct 2020 08:38:57 +0200 Subject: [PATCH 16/18] Release 0.5 - Various general (but very important) speed improvements - Adding callbacks to a number of functions (e.g. export, add_objects_to_pack, ...) to allow showing progress bars or similar indicators - Implement repacking (at least when not changing hashing or compression) - Remove `export`, implement `import_objects` function instead on the other side (more efficient) - Add support for VACUUMing operations (very important for efficiency) - Added support for multiple hashing algorithms - Added concept of (unique) container_id - Generalized the compression algorithm, multiple algorithms supported now --- disk_objectstore/__init__.py | 2 +- disk_objectstore/container.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/disk_objectstore/__init__.py b/disk_objectstore/__init__.py index eaadf61..5328de3 100644 --- a/disk_objectstore/__init__.py +++ b/disk_objectstore/__init__.py @@ -6,4 +6,4 @@ __all__ = ('Container', 'ObjectType', 'CompressMode') -__version__ = '0.4.0' +__version__ = '0.5.0' diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index c3be5d0..e6a7df5 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1975,8 +1975,8 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600, callback=None): """Export the specified hashkeys to a new container (must be already initialised). - ..deprecated:: 0.6 - Deprecated: use the ``import_objects`` method of ``other_container`` instead. + ..deprecated:: 0.5 + Deprecated: use the ``import_objects`` method of ``other_container`` instead. Will be removed in 0.6. :param hashkeys: an iterable of hash keys. :param other_container: another Container class into which you want to export the specified hash keys of this From c3eabd03f377a09ed4e4f56d2cf86fa5cc7e1182 Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 4 Nov 2020 17:37:09 +0100 Subject: [PATCH 17/18] Added Changelog --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 810cc7f..74b5229 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +# v0.5.0 (November 2020) +- Various general (but very important) speed improvements +- Add callbacks to a number of functions (e.g. 
export, add_objects_to_pack, ... to allow showing progress bars or similar indicators +- Implement repacking (at least when not changing hashing or compression) +- Remove `export` function, implement `import_objects` function instead, to be called on the other side (it's more efficient) +- Add support for VACUUMing operations on the SQLite database (very important for efficiency) +- Add support for multiple hashing algorithms +- Add concept of (unique) `container_id` +- Generalize the compression algorithm implementation, and multiple algorithms are supported now + # v0.4.0 (20 July 2020) - Major robustness improvements and new functionality (possibility to pack while using the repository, tested on all platforms) - Not deleting loose files when packing; now there is a `clean_storage()` function to do it afterwards, as a maintenance operation From 02cadddfef6988c7051251f0dd8c245594f7577a Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Tue, 10 Nov 2020 21:31:37 +0100 Subject: [PATCH 18/18] Adding links to PRs for change log of v0.5 I am not adding it for earlier versions as they are still pre-production versions, v0.4.0 was really the first production-ready version --- CHANGELOG.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74b5229..b52844f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,12 @@ # v0.5.0 (November 2020) -- Various general (but very important) speed improvements -- Add callbacks to a number of functions (e.g. export, add_objects_to_pack, ... to allow showing progress bars or similar indicators -- Implement repacking (at least when not changing hashing or compression) -- Remove `export` function, implement `import_objects` function instead, to be called on the other side (it's more efficient) -- Add support for VACUUMing operations on the SQLite database (very important for efficiency) -- Add support for multiple hashing algorithms -- Add concept of (unique) `container_id` -- Generalize the compression algorithm implementation, and multiple algorithms are supported now +- Various general (but very important) speed improvements [[#96]](https://github.com/aiidateam/disk-objectstore/pull/96) [[#102]](https://github.com/aiidateam/disk-objectstore/pull/102) +- Add callbacks to a number of functions (e.g. export, add_objects_to_pack, ... to allow showing progress bars or similar indicators [[#96]](https://github.com/aiidateam/disk-objectstore/pull/96) +- Implement repacking (at least when not changing hashing or compression) [[#96]](https://github.com/aiidateam/disk-objectstore/pull/96) +- Remove `export` function, implement `import_objects` function instead, to be called on the other side (it's more efficient) [[#96]](https://github.com/aiidateam/disk-objectstore/pull/96) +- Add support for VACUUMing operations on the SQLite database (very important for efficiency) [[#96]](https://github.com/aiidateam/disk-objectstore/pull/96) +- Add support for multiple hashing algorithms [[#96]](https://github.com/aiidateam/disk-objectstore/pull/96) +- Add concept of (unique) `container_id` [[#97]](https://github.com/aiidateam/disk-objectstore/pull/97) +- Generalize the compression algorithm implementation, and multiple algorithms are supported now [[#99]](https://github.com/aiidateam/disk-objectstore/pull/99) # v0.4.0 (20 July 2020) - Major robustness improvements and new functionality (possibility to pack while using the repository, tested on all platforms)
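As a closing illustration of the features introduced in this series (the unique container ID and the configurable compression algorithm), a minimal usage sketch follows; it is illustrative only and not part of the patches above:

import tempfile

from disk_objectstore import Container

with tempfile.TemporaryDirectory() as tmpdir:
    container = Container(tmpdir)
    # 'zlib+9' selects zlib at level 9; the default is 'zlib+1'
    container.init_container(clear=True, compression_algorithm='zlib+9')

    print(container.container_id)           # hex UUID, regenerated on every re-initialisation
    print(container.compression_algorithm)  # 'zlib+9'

    hashkey = container.add_object(b'some content')
    container.pack_all_loose(compress=True)  # packed with the configured algorithm
    assert container.get_object_content(hashkey) == b'some content'
    container.close()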