Patch summary (text before the "diff --git" header is commentary, not patch content):
  * do_import() no longer calls initialise_index(); for each imported type it creates an
    index named "<ipt_prefix(type)>-<dates.today(dates.FMT_DATE_SHORT)>" with the mapping
    returned by es_data_mapping.get_mappings(app), and bulk-loads into that index.
  * After the import it points the alias ipt_prefix(type) at the new index, first polling
    indices.exists(alias) up to 5 times with a 5 second sleep between polls.
  * Index/alias deletion passes ignore=[404] instead of catching NotFoundError.
NOTE(review): the line breaks, indentation and blank context lines below were reconstructed
from a whitespace-collapsed copy so that each hunk matches its @@ line counts; whitespace
inside string literals (e.g. ' {}') may also have been collapsed. Verify against the
repository before applying.

diff --git a/portality/scripts/anon_import.py b/portality/scripts/anon_import.py
index d754f8ae7..4924479df 100644
--- a/portality/scripts/anon_import.py
+++ b/portality/scripts/anon_import.py
@@ -19,13 +19,15 @@
 import itertools
 import json
 import shutil
+from dataclasses import dataclass
+from time import sleep
 
-import elasticsearch
 import esprit
 
 from doajtest.helpers import patch_config
 from portality import dao
-from portality.core import app, es_connection, initialise_index
+from portality.core import app, es_connection
+from portality.lib import dates, es_data_mapping
 from portality.store import StoreFactory
 from portality.util import ipt_prefix
 
@@ -46,6 +48,13 @@ def es_bulk(connection, data, type=""):
         return Resp(status_code=500, text=str(e))
 
 
+@dataclass
+class IndexDetail:
+    index_type: str
+    instance_name: str
+    alias_name: str
+
+
 def do_import(config):
     # filter for the types we are going to work with
     import_types = {}
@@ -59,19 +68,21 @@ def do_import(config):
         print(("{x} from {y}".format(x=count, y=import_type)))
     print("\n")
 
-    toberemoved_index_prefixes = [
-        es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type)
-        for import_type in import_types.keys()
-    ]
+    toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()]
     toberemoved_indexes = list(itertools.chain.from_iterable(
         dao.find_indexes_by_prefix(p) for p in toberemoved_index_prefixes
     ))
     toberemoved_index_aliases = list(dao.find_index_aliases(toberemoved_index_prefixes))
 
-    print("==Removing the following indexes==")
-    print(' {}'.format(', '.join(toberemoved_indexes)))
-    print("==Removing the following aliases==")
-    print(' {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
+    if toberemoved_indexes:
+        print("==Removing the following indexes==")
+        print(' {}'.format(', '.join(toberemoved_indexes)))
+        print()
+    if toberemoved_index_aliases:
+        print("==Removing the following aliases==")
+        print(' {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
+        print()
+
     if config.get("confirm", True):
         text = input("Continue? [y/N] ")
         if text.lower() != "y":
@@ -79,24 +90,32 @@ def do_import(config):
 
     # remove all the types that we are going to import
     for index in toberemoved_indexes:
-        try:
-            if es_connection.indices.exists(index):
-                print("Deleting index: {}".format(index))
-                es_connection.indices.delete(index)
-        except elasticsearch.exceptions.NotFoundError:
-            pass
+        if es_connection.indices.exists(index):
+            print("Deleting index: {}".format(index))
+            es_connection.indices.delete(index, ignore=[404])
 
     for index, alias in toberemoved_index_aliases:
-        try:
-            if es_connection.indices.exists_alias(index, alias):
-                print("Deleting alias: {} -> {}".format(index, alias))
-                es_connection.indices.delete_alias(index, alias)
-        except elasticsearch.exceptions.NotFoundError:
-            pass
+        if es_connection.indices.exists_alias(alias, index=index):
+            print("Deleting alias: {} -> {}".format(index, alias))
+            es_connection.indices.delete_alias(index, alias, ignore=[404])
+
+    index_details = {}
+    for import_type in import_types.keys():
+        alias_name = ipt_prefix(import_type)
+        index_details[import_type] = IndexDetail(
+            index_type=import_type,
+            instance_name=alias_name + '-{}'.format(dates.today(dates.FMT_DATE_SHORT)),
+            alias_name=alias_name
+        )
 
     # re-initialise the index (sorting out mappings, etc)
     print("==Initialising Index for Mappings==")
-    initialise_index(app, es_connection)
+    mappings = es_data_mapping.get_mappings(app)
+    for index_detail in index_details.values():
+        print("Initialising index: {}".format(index_detail.instance_name))
+        es_connection.indices.create(index=index_detail.instance_name,
+                                     body=mappings[index_detail.index_type],
+                                     request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))
 
     mainStore = StoreFactory.get("anon_data")
     tempStore = StoreFactory.tmp()
@@ -128,8 +147,9 @@ def do_import(config):
                 shutil.copyfileobj(f_in, f_out)
             tempStore.delete_file(container, filename + ".gz")
 
-            print(("Importing from {x}".format(x=filename)))
-            imported_count = esprit.tasks.bulk_load(es_connection, ipt_prefix(import_type), uncompressed_file,
+            instance_index_name = index_details[import_type].instance_name
+            print("Importing from {x} to index[{index}]".format(x=filename, index=instance_index_name))
+            imported_count = esprit.tasks.bulk_load(es_connection, instance_index_name, uncompressed_file,
                                                     limit=limit, max_content_length=config.get("max_content_length", 100000000))
             tempStore.delete_file(container, filename)
 
@@ -144,6 +164,18 @@ def do_import(config):
     # once we've finished importing, clean up by deleting the entire temporary container
     tempStore.delete_container(container)
 
+    # create aliases for the indexes
+    print("\n==Creating Aliases==")
+    for index_detail in index_details.values():
+        for retry in range(5):
+            if not es_connection.indices.exists(index_detail.alias_name):
+                break
+            print(f"Old alias exists, waiting for it to be removed, alias[{index_detail.alias_name}] retry[{retry}]")
+            sleep(5)
+
+        print("Creating alias: {:<30} -> {}".format(index_detail.instance_name, index_detail.alias_name))
+        es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name)
+
 
 if __name__ == '__main__':
 