Skip to content

Commit

Permalink
Merge pull request #227 from sennetconsortium/tjmadonna/reindex-all-fix
Browse files Browse the repository at this point in the history
Tjmadonna/reindex all fix
  • Loading branch information
maxsibilla authored Aug 1, 2024
2 parents 1640cc5 + 79ca788 commit 9279719
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions src/sennet_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def translate_all(self):
start = time.time()
delete_failure_results = {}

# Make calls to entity-api to get a list of uuids for entity types
source_uuids_list = self._call_entity_api(
session=session,
endpoint_base="source",
Expand All @@ -204,32 +205,30 @@ def translate_all(self):
endpoint_suffix="entities",
url_property="uuid"
)
sample_uuids_list = self._call_entity_api(
session=session,
endpoint_base="sample",
endpoint_suffix="entities",
url_property="uuid"
)
dataset_uuids_list = self._call_entity_api(
session=session,
endpoint_base="dataset",
endpoint_suffix="entities",
url_property="uuid"
)

# Merge into a big list that with no duplicates
all_entities_uuids = set(
source_uuids_list
+ sample_uuids_list
+ dataset_uuids_list
+ upload_uuids_list
+ collection_uuids_list
)

# Only need this comparison for the live /reindex-all PUT call
if not self.skip_comparison:
# Make calls to entity-api to get a list of uuids for rest of entity types
sample_uuids_list = self._call_entity_api(
session=session,
endpoint_base="sample",
endpoint_suffix="entities",
url_property="uuid"
)
dataset_uuids_list = self._call_entity_api(
session=session,
endpoint_base="dataset",
endpoint_suffix="entities",
url_property="uuid"
)

# Merge into a big list that with no duplicates
all_entities_uuids = set(
source_uuids_list
+ sample_uuids_list
+ dataset_uuids_list
+ upload_uuids_list
+ collection_uuids_list
)

es_uuids = set()
index_names = get_all_reindex_enabled_indice_names(self.INDICES)

Expand All @@ -250,6 +249,9 @@ def translate_all(self):
failures = self._bulk_update(update, index.public, index.url)
delete_failure_results[index.public] = failures

# No need to update the entities that were just deleted
all_entities_uuids = all_entities_uuids - uuids_to_delete

failure_results = {}
for index in self.index_config:
failure_results[index.private] = []
Expand All @@ -261,7 +263,8 @@ def translate_all(self):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for index in self.index_config:
futures.extend([executor.submit(self._upsert_index, uuids, index, session) for uuids in batched_uuids])
for uuids in batched_uuids:
futures.append(executor.submit(self._upsert_index, uuids, index, session))

for f in concurrent.futures.as_completed(futures):
failures = f.result()
Expand Down

0 comments on commit 9279719

Please sign in to comment.