
Commit

Merge pull request #970 from ImageMarkup/add-low-priority-queue
Track the last log files that were enqueued for processing
danlamanna authored Sep 30, 2024
2 parents cfab4c3 + f144060 commit a851243
Showing 5 changed files with 102 additions and 9 deletions.
Procfile (3 changes: 2 additions & 1 deletion)
@@ -7,5 +7,6 @@ release: ./manage.py migrate --check || ./manage.py migrate
 # https://devcenter.heroku.com/articles/http-routing#http-validation-and-restrictions
 # long request lines are useful for long DSL search queries
 web: gunicorn --timeout 120 --limit-request-line 8192 --bind 0.0.0.0:$PORT isic.wsgi
-worker: REMAP_SIGTERM=SIGQUIT celery --app isic.celery worker --loglevel INFO --without-heartbeat --concurrency 2
+worker: REMAP_SIGTERM=SIGQUIT celery --app isic.celery worker --loglevel INFO --without-heartbeat --concurrency 2 -X low-priority
+low_priority_worker: REMAP_SIGTERM=SIGQUIT celery --app isic.celery worker --loglevel INFO --without-heartbeat --concurrency 2 -Q low-priority
 beat: REMAP_SIGTERM=SIGQUIT celery --app isic.celery beat --loglevel INFO
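Note on the queue flags (context, not part of the diff itself): Celery's `-Q` option makes a worker consume only the listed queues, while `-X` excludes queues from the worker's default set, so the main `worker` now ignores `low-priority` and the new `low_priority_worker` drains it exclusively. A minimal sketch of how a task lands on that queue, mirroring the `queue=` option this PR adds in `tasks.py` (the task name and body here are hypothetical):

```python
from celery import shared_task


@shared_task(queue="low-priority")
def rebuild_stats_task():
    # queue="low-priority" sets the task's default routing option, so a
    # plain rebuild_stats_task.delay() publishes to the "low-priority"
    # queue, and only workers started with `-Q low-priority` consume it.
    ...
```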
isic/stats/migrations/0002_lastenqueueds3log.py (39 changes: 39 additions & 0 deletions)
@@ -0,0 +1,39 @@
# Generated by Django 5.1.1 on 2024-09-30 15:43

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):
    dependencies = [
        ("stats", "0001_initial"),
    ]

    operations = [
        migrations.CreateModel(
            name="LastEnqueuedS3Log",
            fields=[
                (
                    "id",
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name="ID",
                    ),
                ),
                (
                    "name",
                    models.CharField(
                        max_length=36,
                        unique=True,
                        validators=[
                            django.core.validators.RegexValidator(
                                "^\\d{4}-(\\d{2}-){5}[A-F0-9]{16}$"
                            )
                        ],
                    ),
                ),
            ],
        ),
    ]
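For reference, the generated validator mirrors the S3 server-access-log key layout documented at the AWS link cited in `models.py` below, `YYYY-mm-DD-HH-MM-SS-UniqueString`, with the unique suffix constrained here to 16 uppercase hex characters. A quick sanity check against a hypothetical key:

```python
import re

# "2024-09-30-15-43-21-0123456789ABCDEF" is a hypothetical well-formed key:
# a timestamp (six hyphen-separated fields) followed by 16 uppercase hex chars.
assert re.fullmatch(r"\d{4}-(\d{2}-){5}[A-F0-9]{16}", "2024-09-30-15-43-21-0123456789ABCDEF")
```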
isic/stats/models.py (21 changes: 21 additions & 0 deletions)
@@ -1,3 +1,4 @@
+from django.core.validators import RegexValidator
 from django.db import models
 from django.db.models.constraints import CheckConstraint, UniqueConstraint
 from django.db.models.expressions import F
@@ -42,3 +43,23 @@ class Meta:

     def __str__(self):
         return f"{self.ip_address} - {self.download_time}"
+
+
+class LastEnqueuedS3Log(models.Model):
+    """
+    The last S3 log file that was enqueued to be processed.
+
+    This table is intended to only have one row.
+    """
+
+    name = models.CharField(
+        max_length=36,
+        validators=[
+            # https://docs.aws.amazon.com/AmazonS3/latest/userguide/ServerLogs.html#server-log-keyname-format
+            RegexValidator(r"^\d{4}-(\d{2}-){5}[A-F0-9]{16}$")
+        ],
+        unique=True,
+    )
+
+    def __str__(self) -> str:
+        return self.name
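A note on the docstring's one-row intent: the model itself enforces no singleton constraint; `collect_image_download_records_task` below maintains it by calling `update_or_create` with only `defaults`. With an empty lookup, Django fetches the table's single row, updating it if present and creating the first row otherwise. A minimal sketch of the pattern (the key value is hypothetical):

```python
from isic.stats.models import LastEnqueuedS3Log

# An empty lookup matches the table's lone row (or creates the first one),
# so repeated calls overwrite `name` in place rather than adding rows.
LastEnqueuedS3Log.objects.update_or_create(
    defaults={"name": "2024-09-30-15-43-21-0123456789ABCDEF"}  # hypothetical key
)
```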
isic/stats/tasks.py (34 changes: 26 additions & 8 deletions)
Expand Up @@ -16,12 +16,13 @@
 from celery.utils.log import get_task_logger
 from django.conf import settings
 from django.db import transaction
+from django.db.models import Max
 from django.db.utils import IntegrityError
 from django.utils import timezone
 import pycountry
 
 from isic.core.models.image import Image
-from isic.stats.models import GaMetrics, ImageDownload
+from isic.stats.models import GaMetrics, ImageDownload, LastEnqueuedS3Log
 
 logger = get_task_logger(__name__)
 
@@ -88,6 +89,7 @@ def _country_from_iso_code(iso_code: str) -> dict:
 @shared_task(
     soft_time_limit=60,
     time_limit=120,
+    queue="low-priority",
 )
 def collect_google_analytics_metrics_task():
     if not settings.ISIC_GOOGLE_API_JSON_KEY:
@@ -119,8 +121,12 @@
 )
 
 
-def _cdn_log_objects(s3) -> Iterable[dict]:
-    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=settings.CDN_LOG_BUCKET)
+def _cdn_log_objects(s3, after: str | None) -> Iterable[dict]:
+    kwargs = {}
+    if after:
+        kwargs["StartAfter"] = after
+
+    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=settings.CDN_LOG_BUCKET, **kwargs)
     for page in pages:
         yield from page.get("Contents", [])
 
@@ -155,18 +161,30 @@ def _cdn_access_log_records(log_file_bytes: BytesIO) -> Iterable[dict]:
         }
 
 
-@shared_task(soft_time_limit=60, time_limit=120)
+@shared_task(queue="low-priority")
 def collect_image_download_records_task():
+    """
+    Collect CDN logs to record image downloads.
+
+    This task is idempotent and can be run multiple times without issue. It tracks the
+    last log file that was enqueued. Theoretically it can fail to process a log file and
+    would have to be remedied by truncating the LastEnqueuedS3Log table and re-running the task.
+    """
     s3 = _s3_client()
+    # returns None in the case of an empty table
+    after = LastEnqueuedS3Log.objects.aggregate(last_log=Max("name"))["last_log"]
+
-    # gather all request log entries and group them by path
-    for s3_log_object in _cdn_log_objects(s3):
+    # break this out into subtasks as a single log file can consume a large amount of ram.
+    # gather all request log entries and enqueue them to be processed
+    for s3_log_object in _cdn_log_objects(s3, after):
         process_s3_log_file_task.delay_on_commit(s3_log_object["Key"])
+        LastEnqueuedS3Log.objects.update_or_create(defaults={"name": s3_log_object["Key"]})
 
 
-@shared_task(soft_time_limit=600, time_limit=630, max_retries=5, retry_backoff=True)
+@shared_task(
+    soft_time_limit=600, time_limit=630, max_retries=5, retry_backoff=True, queue="low-priority"
+)
 def process_s3_log_file_task(s3_log_object_key: str):
     logger.info("Processing s3 log file %s", s3_log_object_key)
     s3 = _s3_client()
 
     try:
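The checkpointing above works because S3 returns keys in lexicographic order and access-log key names begin with a timestamp: `Max("name")` recovers the newest key ever enqueued, and passing it as `StartAfter` resumes the listing strictly after that key. A standalone sketch of the listing half of that pattern (the bucket name and checkpoint value are hypothetical):

```python
import boto3

s3 = boto3.client("s3")
checkpoint = "2024-09-30-15-43-21-0123456789ABCDEF"  # e.g. the stored Max("name")

# StartAfter begins the listing strictly after the given key; since log key
# names sort chronologically, only files newer than the checkpoint come back.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="example-cdn-log-bucket", StartAfter=checkpoint):
    for obj in page.get("Contents", []):
        print(obj["Key"])
```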
isic/stats/tests/test_tasks.py (14 changes: 14 additions & 0 deletions)
@@ -142,3 +142,17 @@ def _delete_object(*args, **kwargs):

     assert ImageDownload.objects.count() == 1
     assert image.downloads.count() == 1
+
+    # assert that re-running the task only looks for new logs after "foo"
+    import isic.stats.tasks
+
+    cdn_log_objects = mocker.spy(isic.stats.tasks, "_cdn_log_objects")
+
+    with django_capture_on_commit_callbacks(execute=True):
+        collect_image_download_records_task()
+
+    assert ImageDownload.objects.count() == 1
+    assert image.downloads.count() == 1
+
+    assert cdn_log_objects.call_count == 1
+    assert cdn_log_objects.call_args[0][1] == "foo"
