Skip to content

Commit

Permalink
Copied get_expired_items to new Events async service
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianMwangi21 committed Nov 25, 2024
1 parent 6e60d55 commit c131916
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
6 changes: 3 additions & 3 deletions server/planning/commands/delete_spiked_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from superdesk.celery_task_utils import get_lock_id
from superdesk.lock import lock, unlock, remove_locks
from planning.common import WORKFLOW_STATE
from planning.events import EventsAsyncService
from .async_cli import planning_cli


Expand Down Expand Up @@ -74,19 +75,18 @@ async def delete_spiked_items_handler():
remove_locks()


# TODO: Update use of events_service to new async methods
async def delete_spiked_events(expiry_datetime):
log_msg = log_msg_context.get()
logger.info(f"{log_msg} Starting to delete spiked events")
events_service = get_resource_service("events")
events_service = EventsAsyncService()

events_deleted = set()
series_to_delete = dict()

# Obtain the full list of Events that we're to process first
# As subsequent queries will change the list of returned items
events = dict()
for items in events_service.get_expired_items(expiry_datetime, spiked_events_only=True):
async for items in events_service.get_expired_items(expiry_datetime, spiked_events_only=True):
events.update({item[ID_FIELD]: item for item in items})

for event_id, event in events.items():
Expand Down
44 changes: 44 additions & 0 deletions server/planning/events/service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,50 @@
from eve.utils import date_to_str

from planning.types import EventResourceModel
from planning.common import get_max_recurrent_events, WORKFLOW_STATE
from planning.core.service import PlanningAsyncResourceService


class EventsAsyncService(PlanningAsyncResourceService[EventResourceModel]):
resource_name = "events"

async def get_expired_items(self, expiry_datetime, spiked_events_only=False):
"""Get the expired items
Where end date is in the past
"""
query = {
"query": {"bool": {"must_not": [{"term": {"expired": True}}]}},
"filter": {"range": {"dates.end": {"lte": date_to_str(expiry_datetime)}}},
"sort": [{"dates.start": "asc"}],
"size": get_max_recurrent_events(),
}

if spiked_events_only:
query["query"] = {"bool": {"must": [{"term": {"state": WORKFLOW_STATE.SPIKED}}]}}

total_received = 0
total_events = -1

while True:
query["from"] = total_received

results = self.search(query)

# If the total_events has not been set, then this is the first query
# In which case we need to store the total hits from the search
if total_events < 0:
total_events = results.count()

# If the search doesn't contain any results, return here
if total_events < 1:
break

# If the last query doesn't contain any results, return here
if not len(results.docs):
break

total_received += len(results.docs)

# Yield the results for iteration by the callee
yield list(results.docs)

0 comments on commit c131916

Please sign in to comment.