Commit

create instance indexes and aliases
philipkcl committed Nov 20, 2024
1 parent 41e2d74 commit 13c0d24
Showing 1 changed file with 57 additions and 25 deletions.
82 changes: 57 additions & 25 deletions portality/scripts/anon_import.py
@@ -19,13 +19,15 @@
 import itertools
 import json
 import shutil
+from dataclasses import dataclass
+from time import sleep
 
 import elasticsearch
 import esprit
 
 from doajtest.helpers import patch_config
 from portality import dao
-from portality.core import app, es_connection, initialise_index
+from portality.core import app, es_connection
 from portality.lib import dates, es_data_mapping
 from portality.store import StoreFactory
 from portality.util import ipt_prefix
@@ -46,6 +48,13 @@ def es_bulk(connection, data, type=""):
         return Resp(status_code=500, text=str(e))
 
 
+@dataclass
+class IndexDetail:
+    index_type: str
+    instance_name: str
+    alias_name: str
+
+
 def do_import(config):
     # filter for the types we are going to work with
     import_types = {}
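
The IndexDetail dataclass introduced above carries the three names the script needs for each type. A minimal sketch of how an instance ties them together (the values are hypothetical, not taken from the repository): a date-stamped physical index sits behind the stable alias that application code queries.

from dataclasses import dataclass

@dataclass
class IndexDetail:
    index_type: str
    instance_name: str
    alias_name: str

# Hypothetical values: the dated index is the physical write target,
# the alias is the stable name the application reads from.
detail = IndexDetail(
    index_type="journal",
    instance_name="doaj-journal-20241120",
    alias_name="doaj-journal",
)
print(detail.instance_name, "->", detail.alias_name)
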
@@ -59,44 +68,54 @@ def do_import(config):
print(("{x} from {y}".format(x=count, y=import_type)))
print("\n")

toberemoved_index_prefixes = [
es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type)
for import_type in import_types.keys()
]
toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()]
toberemoved_indexes = list(itertools.chain.from_iterable(
dao.find_indexes_by_prefix(p) for p in toberemoved_index_prefixes
))
toberemoved_index_aliases = list(dao.find_index_aliases(toberemoved_index_prefixes))

print("==Removing the following indexes==")
print(' {}'.format(', '.join(toberemoved_indexes)))
print("==Removing the following aliases==")
print(' {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
if toberemoved_indexes:
print("==Removing the following indexes==")
print(' {}'.format(', '.join(toberemoved_indexes)))
print()
if toberemoved_index_aliases:
print("==Removing the following aliases==")
print(' {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
print()

if config.get("confirm", True):
text = input("Continue? [y/N] ")
if text.lower() != "y":
exit()

# remove all the types that we are going to import
for index in toberemoved_indexes:
try:
if es_connection.indices.exists(index):
print("Deleting index: {}".format(index))
es_connection.indices.delete(index)
except elasticsearch.exceptions.NotFoundError:
pass
if es_connection.indices.exists(index):
print("Deleting index: {}".format(index))
es_connection.indices.delete(index, ignore=[404])

for index, alias in toberemoved_index_aliases:
try:
if es_connection.indices.exists_alias(index, alias):
print("Deleting alias: {} -> {}".format(index, alias))
es_connection.indices.delete_alias(index, alias)
except elasticsearch.exceptions.NotFoundError:
pass
if es_connection.indices.exists_alias(alias, index=index):
print("Deleting alias: {} -> {}".format(index, alias))
es_connection.indices.delete_alias(index, alias, ignore=[404])

index_details = {}
for import_type in import_types.keys():
alias_name = ipt_prefix(import_type)
index_details[import_type] = IndexDetail(
index_type=import_type,
instance_name=alias_name + '-{}'.format(dates.today(dates.FMT_DATE_SHORT)),
alias_name=alias_name
)

# re-initialise the index (sorting out mappings, etc)
print("==Initialising Index for Mappings==")
initialise_index(app, es_connection)
mappings = es_data_mapping.get_mappings(app)
for index_detail in index_details.values():
print("Initialising index: {}".format(index_detail.instance_name))
es_connection.indices.create(index=index_detail.instance_name,
body=mappings[index_detail.index_type],
request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))

mainStore = StoreFactory.get("anon_data")
tempStore = StoreFactory.tmp()
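
The hunk above drops the try/except blocks in favour of elasticsearch-py's per-request `ignore` parameter, which treats the listed HTTP status codes as non-errors, and creates one date-stamped instance index per type with its mapping applied up front. A minimal standalone sketch of both calls, assuming a 7.x client and illustrative names:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # hypothetical connection

# Delete-if-present: a 404 response is swallowed instead of
# raising NotFoundError.
es.indices.delete(index="doaj-journal-20240101", ignore=[404])

# Create the dated instance index with an explicit mapping body.
es.indices.create(
    index="doaj-journal-20241120",
    body={"mappings": {"properties": {"title": {"type": "text"}}}},
)
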
@@ -128,8 +147,9 @@ def do_import(config):
                     shutil.copyfileobj(f_in, f_out)
                 tempStore.delete_file(container, filename + ".gz")
 
-            print(("Importing from {x}".format(x=filename)))
-            imported_count = esprit.tasks.bulk_load(es_connection, ipt_prefix(import_type), uncompressed_file,
+            instance_index_name = index_details[import_type].instance_name
+            print("Importing from {x} to index[{index}]".format(x=filename, index=instance_index_name))
+            imported_count = esprit.tasks.bulk_load(es_connection, instance_index_name, uncompressed_file,
                                                     limit=limit,
                                                     max_content_length=config.get("max_content_length", 100000000))
             tempStore.delete_file(container, filename)
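
esprit.tasks.bulk_load is DOAJ's own loader, so its internals are not shown in this diff. As a rough equivalent, and assuming the uncompressed dump holds one JSON document per line, the standard elasticsearch.helpers.bulk helper can stream the file into the dated instance index:

import json

from elasticsearch import Elasticsearch, helpers

def bulk_load_sketch(es, index_name, path, limit=None):
    """Stream newline-delimited JSON documents into index_name."""
    def actions():
        with open(path) as f:
            for i, line in enumerate(f):
                if limit is not None and i >= limit:
                    break
                yield {"_index": index_name, "_source": json.loads(line)}

    success_count, errors = helpers.bulk(es, actions(), raise_on_error=False)
    return success_count
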
@@ -144,6 +164,18 @@ def do_import(config):
     # once we've finished importing, clean up by deleting the entire temporary container
     tempStore.delete_container(container)
 
+    # create aliases for the indexes
+    print("\n==Creating Aliases==")
+    for index_detail in index_details.values():
+        for retry in range(5):
+            if not es_connection.indices.exists(index_detail.alias_name):
+                break
+            print(f"Old alias exists, waiting for it to be removed, alias[{index_detail.alias_name}] retry[{retry}]")
+            sleep(5)
+
+        print("Creating alias: {:<30} -> {}".format(index_detail.instance_name, index_detail.alias_name))
+        es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name)
+
 
 if __name__ == '__main__':
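
The retry loop above waits for any leftover concrete index that still occupies the alias name, since put_alias cannot point an alias at a name a real index is using. As a possible later refinement (not part of this commit), once only aliases remain the cutover could be done atomically with update_aliases; names here are illustrative:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # hypothetical connection

# Remove the old mapping and add the new one in one atomic request,
# so readers never observe the alias missing or doubled.
es.indices.update_aliases(body={
    "actions": [
        {"remove": {"index": "doaj-journal-20240101", "alias": "doaj-journal"}},
        {"add": {"index": "doaj-journal-20241120", "alias": "doaj-journal"}},
    ]
})
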
