From 5df70704d229e1ed66be60c9a60ad47dac59ec7e Mon Sep 17 00:00:00 2001 From: Dan LaManna Date: Thu, 3 Oct 2024 18:29:06 -0400 Subject: [PATCH] Stop elasticsearch indexing from polluting the cache --- isic/core/search.py | 16 +++++++++------- isic/core/tasks.py | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/isic/core/search.py b/isic/core/search.py index a6e9fe36..194263e3 100644 --- a/isic/core/search.py +++ b/isic/core/search.py @@ -107,7 +107,7 @@ def add_to_search_index(image: Image) -> None: def bulk_add_to_search_index(qs: QuerySet[Image], chunk_size: int = 2_000) -> None: - from opensearchpy.helpers import parallel_bulk + from opensearchpy.helpers import bulk # The opensearch logger is very noisy when updating records, # set it to warning during this operation. @@ -119,18 +119,20 @@ def bulk_add_to_search_index(qs: QuerySet[Image], chunk_size: int = 2_000) -> No # Use a generator for lazy evaluation image_documents = (image.to_elasticsearch_document() for image in qs.iterator()) - for success, info in parallel_bulk( + # note we can't use parallel_bulk because the cachalot_disabled context manager + # is thread local. + success, info = bulk( client=get_elasticsearch_client(), index=settings.ISIC_ELASTICSEARCH_INDEX, actions=image_documents, # The default chunk_size is 2000, but that may be too many models to fit into memory. # Note the default chunk_size matches QuerySet.iterator chunk_size=chunk_size, - # the thread count should be limited to avoid exhausting the connection pool - thread_count=2, - ): - if not success: - logger.error("Failed to insert document into elasticsearch: %s", info) + max_retries=3, + ) + + if not success: + logger.error("Failed to insert document into elasticsearch: %s", info) def _prettify_facets(facets: dict[str, Any]) -> dict[str, Any]: diff --git a/isic/core/tasks.py b/isic/core/tasks.py index e51b2384..7a2e797c 100644 --- a/isic/core/tasks.py +++ b/isic/core/tasks.py @@ -56,7 +56,7 @@ def share_collection_with_users_task(collection_pk: int, grantor_pk: int, user_p autoretry_for=(ConnectionError, TimeoutError), retry_backoff=True, retry_backoff_max=600, - retry_kwargs={"max_retries": 15}, + retry_kwargs={"max_retries": 3}, ) def sync_elasticsearch_index_task(): bulk_add_to_search_index(Image.objects.with_elasticsearch_properties())