Feature/4016 anon import aliases #2433

Merged: 7 commits, Nov 28, 2024
40 changes: 31 additions & 9 deletions portality/dao.py
@@ -1,16 +1,18 @@
import time
from __future__ import annotations

import json
import re
import os
import sys
import uuid
import json
import elasticsearch
import time
import urllib.parse

import uuid
from collections import UserDict
from copy import deepcopy
from datetime import timedelta
from typing import List
from typing import List, Iterable, Tuple

import elasticsearch

from portality.core import app, es_connection as ES
from portality.lib import dates
@@ -724,15 +726,16 @@ def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, o
return filenames

@classmethod
def bulk_load_from_file(cls, source_file, limit=None, max_content_length=100000000):
def bulk_load_from_file(cls, source_file, index=None, limit=None, max_content_length=100000000):
""" ported from esprit.tasks - bulk load to index from file """
index = index or cls.index_name()

source_size = os.path.getsize(source_file)
with open(source_file, "r") as f:
if limit is None and source_size < max_content_length:
# if we aren't selecting a portion of the file, and the file is below the max content length, then
# we can just serve it directly
ES.bulk(body=f.read(), index=cls.index_name(), doc_type=cls.doc_type(), request_timeout=120)
ES.bulk(body=f.read(), index=index, doc_type=cls.doc_type(), request_timeout=120)
return -1
else:
count = 0
@@ -755,7 +758,7 @@ def bulk_load_from_file(cls, source_file, limit=None, max_content_length=1000000
else:
count += records

ES.bulk(body=chunk, index=cls.index_name(), doc_type=cls.doc_type(), request_timeout=120)
ES.bulk(body=chunk, index=index, doc_type=cls.doc_type(), request_timeout=120)
if finished:
break
if limit is not None:
@@ -1065,6 +1068,25 @@ def refresh():
return ES.indices.refresh()


def find_indexes_by_prefix(index_prefix) -> list[str]:
data = ES.indices.get(f'{index_prefix}*')
Contributor:

It might be good to enforce the separator here (if not index_prefix.endswith('-') ...) because this is where a missing separator could match a whole lot more indexes than we intend. E.g. if you supply doaj as the index prefix you'll delete indexes like doajstatic-article etc. That won't happen unless the comment in settings.py to include the separator is ignored.

Perhaps we should just ensure the setting always ends with - instead. Either way, not a blocker for release.
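
A minimal sketch of the guard being proposed here, assuming we prefer to fail fast rather than silently widen the match (this is not what was merged; the author instead added a regex check in anon_import.py, shown further down):

```python
def find_indexes_by_prefix(index_prefix) -> list[str]:
    # hypothetical guard: refuse prefixes without the trailing separator, so a bare
    # 'doaj' prefix cannot sweep up unrelated indexes such as 'doajstatic-article'
    if not index_prefix.endswith('-'):
        raise ValueError(f"index prefix {index_prefix!r} should end with '-'")
    data = ES.indices.get(f'{index_prefix}*')
    return list(data.keys())
```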

Contributor Author:

I wouldn't recommend it. It would break the first-time import, i.e. when the index is still named doaj-account rather than doaj-account-20111212, so accepting the bare prefix helps migrate environments that are still using the old index name doaj-account.

Moreover, the prefix search is per index_type and the script shows which indexes will be removed, so it should be safe enough.

Contributor Author:

Added a condition check to avoid matching future indexes like doaj-account_abcde.
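
For reference, the condition check mentioned above is the one in find_toberemoved_indexes (in anon_import.py, further down this PR). A quick illustration of what it keeps and skips, with hypothetical index names:

```python
import re

def would_be_removed(prefix: str, index: str) -> bool:
    # same condition as find_toberemoved_indexes in anon_import.py
    return index == prefix or bool(re.match(rf"{prefix}-\d+", index))

assert would_be_removed("doaj-account", "doaj-account")            # legacy, un-dated index
assert would_be_removed("doaj-account", "doaj-account-20241128")   # dated instance index
assert not would_be_removed("doaj-account", "doaj-account_abcde")  # unrelated suffix is skipped
assert not would_be_removed("doaj", "doajstatic-article")          # the doajstatic case above is also skipped
```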

return list(data.keys())


def find_index_aliases(alias_prefixes=None) -> Iterable[Tuple[str, str]]:
def _yield_index_alias():
data = ES.indices.get_alias()
for index, d in data.items():
for alias in d['aliases'].keys():
yield index, alias

index_aliases = _yield_index_alias()
if alias_prefixes:
index_aliases = ((index, alias) for index, alias in index_aliases
if any(alias.startswith(p) for p in alias_prefixes))
return index_aliases


class BlockTimeOutException(Exception):
pass

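Taken together, the two helpers added to dao.py can be exercised roughly like this (a sketch; the index and alias names are hypothetical and depend on what exists in the cluster):

```python
import portality.dao

# concrete indexes whose names start with the prefix,
# e.g. ['doaj-account', 'doaj-account-20241128']
indexes = portality.dao.find_indexes_by_prefix('doaj-account')

# (index, alias) pairs, optionally narrowed to aliases starting with the given prefixes,
# e.g. [('doaj-account-20241128', 'doaj-account')]
pairs = list(portality.dao.find_index_aliases(alias_prefixes=['doaj-account']))
```
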
4 changes: 2 additions & 2 deletions portality/lib/dates.py
@@ -92,8 +92,8 @@ def now_str_with_microseconds() -> str:
return format(now(), format=FMT_DATETIME_MS_STD)


def today() -> str:
return format(now(), format=FMT_DATE_STD)
def today(str_format=FMT_DATE_STD) -> str:
return format(now(), format=str_format)


def random_date(fro: datetime = None, to: datetime = None) -> str:
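The new str_format parameter is what lets anon_import.py build the dated instance index name; roughly as below, assuming FMT_DATE_SHORT renders as YYYYMMDD:

```python
from portality.lib import dates

alias_name = 'doaj-account'  # hypothetical alias / index prefix
instance_name = alias_name + '-{}'.format(dates.today(dates.FMT_DATE_SHORT))
# e.g. 'doaj-account-20241128'
```
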
103 changes: 87 additions & 16 deletions portality/scripts/anon_import.py
@@ -13,16 +13,40 @@
DOAJENV=test python portality/scripts/anon_import.py data_import_settings/test_server.json
"""

import json, gzip, shutil, elasticsearch
from portality.core import app, es_connection, initialise_index
from portality.store import StoreFactory
from portality.dao import DomainObject
from portality import models
from __future__ import annotations

import gzip
import itertools
import json
import re
import shutil
from dataclasses import dataclass
from time import sleep

import portality.dao
from doajtest.helpers import patch_config
from portality import models
from portality.core import app, es_connection
from portality.dao import DomainObject
from portality.lib import dates, es_data_mapping
from portality.store import StoreFactory
from portality.util import ipt_prefix


def do_import(config):
@dataclass
class IndexDetail:
index_type: str
instance_name: str
alias_name: str


def find_toberemoved_indexes(prefix):
for index in portality.dao.find_indexes_by_prefix(prefix):
if index == prefix or re.match(rf"{prefix}-\d+", index):
yield index


def do_import(config):
# filter for the types we are going to work with
import_types = {}
for t, s in config.get("types", {}).items():
@@ -35,22 +59,58 @@ def do_import(config):
print(("{x} from {y}".format(x=count, y=import_type)))
print("\n")

toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()]
# materialise the list so it can be truth-tested, printed and iterated again below
toberemoved_indexes = list(itertools.chain.from_iterable(
    find_toberemoved_indexes(p) for p in toberemoved_index_prefixes
))
toberemoved_index_aliases = list(portality.dao.find_index_aliases(toberemoved_index_prefixes))

if toberemoved_indexes:
print("==Removing the following indexes==")
print(' {}'.format(', '.join(toberemoved_indexes)))
print()
if toberemoved_index_aliases:
print("==Removing the following aliases==")
print(' {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
print()

if config.get("confirm", True):
text = input("Continue? [y/N] ")
if text.lower() != "y":
exit()

# remove all the types that we are going to import
for import_type in list(import_types.keys()):
try:
if es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type):
es_connection.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type)
except elasticsearch.exceptions.NotFoundError:
pass
for index in toberemoved_indexes:
if es_connection.indices.exists(index):
print("Deleting index: {}".format(index))
es_connection.indices.delete(index, ignore=[404])

for index, alias in toberemoved_index_aliases:
if es_connection.indices.exists_alias(alias, index=index):
print("Deleting alias: {} -> {}".format(index, alias))
es_connection.indices.delete_alias(index, alias, ignore=[404])

index_details = {}
for import_type in import_types.keys():
alias_name = ipt_prefix(import_type)
index_details[import_type] = IndexDetail(
index_type=import_type,
instance_name=alias_name + '-{}'.format(dates.today(dates.FMT_DATE_SHORT)),
alias_name=alias_name
)

# re-initialise the index (sorting out mappings, etc)
print("==Initialising Index for Mappings==")
initialise_index(app, es_connection)
print("==Initialising Index Mappings and alias ==")
mappings = es_data_mapping.get_mappings(app)
for index_detail in index_details.values():
print("Initialising index: {}".format(index_detail.instance_name))
es_connection.indices.create(index=index_detail.instance_name,
body=mappings[index_detail.index_type],
request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))

print("Creating alias: {:<25} -> {}".format(index_detail.instance_name, index_detail.alias_name))
blocking_if_indices_exist(index_detail.alias_name)
es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name)

mainStore = StoreFactory.get("anon_data")
tempStore = StoreFactory.tmp()
@@ -85,10 +145,12 @@ def do_import(config):
shutil.copyfileobj(f_in, f_out)
tempStore.delete_file(container, filename + ".gz")

print(("Importing from {x}".format(x=filename)))
instance_index_name = index_details[import_type].instance_name
print("Importing from {x} to index[{index}]".format(x=filename, index=instance_index_name))

imported_count = dao.bulk_load_from_file(uncompressed_file,
limit=limit, max_content_length=config.get("max_content_length", 100000000))
index=instance_index_name, limit=limit,
max_content_length=config.get("max_content_length", 100000000))
tempStore.delete_file(container, filename)

if limit is not None and imported_count != -1:
@@ -105,9 +167,18 @@ def do_import(config):
tempStore.delete_container(container)


def blocking_if_indices_exist(index_name):
for retry in range(5):
if not es_connection.indices.exists(index_name):
break
print(f"Old alias exists, waiting for it to be removed, alias[{index_name}] retry[{retry}]...")
sleep(5)


if __name__ == '__main__':

import argparse

parser = argparse.ArgumentParser()

parser.add_argument("config", help="Config file for import run, e.g dev_basics.json")
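After an import run, the alias wiring can be spot-checked with standard client calls (a sketch; 'doaj-account' stands in for whichever types were imported):

```python
from portality.core import es_connection

# each alias should now point at exactly one dated instance index,
# e.g. {'doaj-account-20241128': {'aliases': {'doaj-account': {}}}}
print(es_connection.indices.get_alias(name='doaj-account'))
```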