Skip to content

Commit

Permalink
fix: make vectorizer worker robust
Browse files Browse the repository at this point in the history
These fixes were mostly done in the context of usability with docker
compose. Includes the following changes:

- Handle being killed with SIGINT or SIGTERM
- Handle being unable to connect to the target DB
- Handle arbitrary failures in the core of vectorizer worker
  • Loading branch information
JamesGuthrie committed Nov 28, 2024
1 parent 13b4fd5 commit cd80284
Showing 1 changed file with 54 additions and 26 deletions.
80 changes: 54 additions & 26 deletions projects/pgai/pgai/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import logging
import os
import random
import signal
import sys
import time
from collections.abc import Sequence
from typing import Any

import click
import psycopg
Expand Down Expand Up @@ -162,6 +164,12 @@ def get_log_level(level: str) -> int:
return logging.getLevelName("INFO") # type: ignore


def shutdown_handler(signum: int, _frame: Any):
signame = signal.Signals(signum).name
log.info(f"received {signame}, exiting")
exit(0)


@click.command(name="worker")
@click.version_option(version=__version__)
@click.option(
Expand Down Expand Up @@ -218,42 +226,62 @@ def vectorizer_worker(
poll_interval: int,
once: bool,
) -> None:
# gracefully handle being asked to shut down
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

structlog.configure(
wrapper_class=structlog.make_filtering_bound_logger(get_log_level(log_level))
)
log.debug("starting vectorizer worker")
poll_interval_str = datetime.timedelta(seconds=poll_interval)

with (
psycopg.Connection.connect(db_url) as con,
con.cursor(row_factory=namedtuple_row) as cur,
):
pgai_version = get_pgai_version(cur)
if pgai_version is None:
log.critical("the pgai extension is not installed")
sys.exit(1)
dynamic_mode = len(vectorizer_ids) == 0
valid_vectorizer_ids = []
if not dynamic_mode:
valid_vectorizer_ids = get_vectorizer_ids(db_url, vectorizer_ids)
if len(valid_vectorizer_ids) != len(vectorizer_ids):
log.critical(
f"invalid vectorizers, wanted: {list(vectorizer_ids)}, got: {valid_vectorizer_ids}" # noqa: E501 (line too long)
)
sys.exit(1)

can_connect = False
pgai_version = None

while True:
if dynamic_mode:
valid_vectorizer_ids = get_vectorizer_ids(db_url, vectorizer_ids)
if len(valid_vectorizer_ids) == 0:
log.warning("no vectorizers found")

for vectorizer_id in valid_vectorizer_ids:
vectorizer = get_vectorizer(db_url, vectorizer_id)
if vectorizer is None:
continue
log.info("running vectorizer", vectorizer_id=vectorizer_id)
run_vectorizer(db_url, vectorizer, concurrency)
try:
if not can_connect or pgai_version is None:
with (
psycopg.Connection.connect(db_url) as con,
con.cursor(row_factory=namedtuple_row) as cur,
):
pgai_version = get_pgai_version(cur)
can_connect = True
if pgai_version is None:
log.warn("the pgai extension is not installed")

if can_connect and pgai_version is not None:
if not dynamic_mode and len(valid_vectorizer_ids) != len(
vectorizer_ids
):
valid_vectorizer_ids = get_vectorizer_ids(db_url, vectorizer_ids)
if len(valid_vectorizer_ids) != len(vectorizer_ids):
log.critical(
f"invalid vectorizers, wanted: {list(vectorizer_ids)}, got: {valid_vectorizer_ids}" # noqa: E501 (line too long)
)
sys.exit(1)
else:
valid_vectorizer_ids = get_vectorizer_ids(db_url, vectorizer_ids)
if len(valid_vectorizer_ids) == 0:
log.warning("no vectorizers found")

for vectorizer_id in valid_vectorizer_ids:
vectorizer = get_vectorizer(db_url, vectorizer_id)
if vectorizer is None:
continue
log.info("running vectorizer", vectorizer_id=vectorizer_id)
run_vectorizer(db_url, vectorizer, concurrency)
except psycopg.OperationalError as e:
if "connection failed" in str(e):
log.error(f"unable to connect to database: {str(e)}")
except Exception as e:
# catch any exceptions, log them, and keep on going
log.error(f"unexpected error: {str(e)}")

if once:
return
log.info(f"sleeping for {poll_interval_str} before polling for new work")
Expand Down

0 comments on commit cd80284

Please sign in to comment.