Add script to trim auxiliary tables (#314)
jsangmeister authored Feb 23, 2024
1 parent 6f0c166 commit 3f13318
Showing 14 changed files with 133 additions and 18 deletions.
7 changes: 7 additions & 0 deletions cli/README.md
@@ -5,3 +5,10 @@ started. Most of them are useful for developing and testing with the datastore.
`export_to_os3.py` can be used to export an existing OS4 meeting into the OS3 format as an SQL
script. The script `healthcheck.py` can be used to programmatically call the health route of the
current module.

## Trim collectionfield tables

The script `trim_collectionfield_tables.py` can be used to remove outdated entries from the two
collectionfield helper tables to improve performance. It is best run at regular intervals, e.g. via
a cronjob. It can safely be executed in production without shutting down any services, as long as
the configured time span is long enough, i.e. longer than any running backend process (such as an
import) may take.
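
For example, a minimal scheduling wrapper might look like the following sketch. The wrapper and the
retention value of seven days are purely illustrative and not part of the repository; it assumes the
datastore's database environment is configured for the calling process.

```python
# Hypothetical wrapper around the trim script, e.g. for a cron-driven job (illustration only).
from cli.trim_collectionfield_tables import main

if __name__ == "__main__":
    # args[1] is the retention period in days and may be fractional; keep one week here.
    main(["trim_collectionfield_tables.py", "7"])
```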
Empty file added cli/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
47 changes: 47 additions & 0 deletions cli/trim_collectionfield_tables.py
@@ -0,0 +1,47 @@
import sys
from datetime import datetime, timedelta
from textwrap import dedent

from datastore.shared.di import injector
from datastore.shared.postgresql_backend import ConnectionHandler
from datastore.writer.app import register_services


def main(args: list[str] = []):
"""
Usage: python trim_collectionfield_tables.py [days=1]
Trims all collectionfield tables by deleting all entries which are older than the given amount
of days (which may be a floating point number).
"""
register_services()
connection: ConnectionHandler = injector.get(ConnectionHandler)

delta = float(args[1]) if len(args) > 1 else 1
threshold = datetime.now() - timedelta(days=delta)
with connection.get_connection_context():
        # delete collectionfields whose position is older than the threshold
connection.execute(
dedent(
"""\
DELETE FROM collectionfields cf
USING positions p
WHERE cf.position = p.position AND p.timestamp < %s
"""
),
[threshold],
)
        # delete events_to_collectionfields entries whose event position is older than the threshold
connection.execute(
dedent(
"""\
DELETE FROM events_to_collectionfields ecf
USING events e, positions p
WHERE ecf.event_id = e.id AND e.position = p.position AND p.timestamp < %s
"""
),
[threshold],
)


if __name__ == "__main__":
sys.exit(main(sys.argv))
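
Before scheduling the script in production, it can be helpful to see how many rows would actually be
removed. The following dry-run helper is a hypothetical sketch (not part of this commit) that reuses
the same services as the script above and only counts the `collectionfields` rows older than the
threshold; the function name is made up for illustration.

```python
# Hypothetical dry-run counterpart to trim_collectionfield_tables.py (illustration only).
from datetime import datetime, timedelta

from datastore.shared.di import injector
from datastore.shared.postgresql_backend import ConnectionHandler
from datastore.writer.app import register_services


def count_outdated_collectionfields(days: float = 1) -> int:
    """Count collectionfields whose position is older than the given number of days."""
    register_services()
    connection: ConnectionHandler = injector.get(ConnectionHandler)
    threshold = datetime.now() - timedelta(days=days)
    with connection.get_connection_context():
        result = connection.query_list_of_single_values(
            "SELECT count(*) FROM collectionfields cf "
            "JOIN positions p ON cf.position = p.position "
            "WHERE p.timestamp < %s",
            [threshold],
        )
    return result[0]
```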
4 changes: 2 additions & 2 deletions datastore/shared/postgresql_backend/schema.sql
@@ -3,7 +3,7 @@
-- * fqids can be max 48 chars long. The longest collection is currently `motion_change_recommendation` with 28 chars. The
-- maximum is 32 chars. So 15 chars are left for ids, which means there can be (10^16)-1 ids. That are about 4.5x10^6 more ids
-- in 15 characters in comparison to (2^31)-1 for the sql INTEGER type. This should be enough.
-- * In contrast, collectionfields cna be very long in fact of structured keys. I choose 255 to be save. Maybe this can be
-- * In contrast, collectionfields can be very long because of structured keys. I choose 255 to be safe. Maybe this can be
-- reduced in the future to save space...


@@ -43,7 +43,7 @@ CREATE TABLE IF NOT EXISTS events (
);
CREATE INDEX IF NOT EXISTS event_position_idx ON events (position);
CREATE INDEX IF NOT EXISTS event_fqid_idx ON events (fqid);
-- TODO: create index for data->>meeting_id for collectionfieldlocks
CREATE INDEX IF NOT EXISTS event_data_meeting_id_idx ON events ((data->>'meeting_id')) WHERE data->>'meeting_id' IS NOT NULL;

-- For the `reserve_ids` feature
CREATE TABLE IF NOT EXISTS id_sequences (
@@ -92,17 +92,14 @@ def get_locked_fqfields(self, fqfields: Dict[str, int]) -> List[str]:
]
query = dedent(
f"""\
with all_together as (
select e.fqid, cf.collectionfield from events e
inner join events_to_collectionfields ecf on e.id=ecf.event_id
inner join collectionfields cf on ecf.collectionfield_id=cf.id
where {event_filter})
select fqid || %s || split_part(collectionfield, %s, 2) from all_together where {collectionfield_filter}"""
select e.fqid || %s || split_part(cf.collectionfield, %s, 2) from events e
inner join events_to_collectionfields ecf on e.id=ecf.event_id
inner join collectionfields cf on ecf.collectionfield_id=cf.id
where ({event_filter}) and ({collectionfield_filter})"""
)
query_arguments = (
event_query_arguments + [KEYSEPARATOR] * 2 + collectionfield_query_arguments
[KEYSEPARATOR] * 2 + event_query_arguments + collectionfield_query_arguments
)

return self.connection.query_list_of_single_values(query, query_arguments)

def get_locked_collectionfields(
4 changes: 2 additions & 2 deletions scripts/setup.cfg
@@ -11,7 +11,7 @@ combine_as_imports = true
force_grid_wrap = 0
use_parentheses = true
line_length = 88
known_first_party = datastore
known_first_party = datastore,cli
known_third_party = pytest

[mypy]
@@ -33,7 +33,7 @@ omit=
exclude_lines =
pragma: no cover
raise NotImplementedError
^\s*...$
if __name__ == .__main__.:

[coverage:html]
directory = tests/htmlcov
Empty file added tests/cli/__init__.py
Empty file.
Empty file added tests/cli/system/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions tests/cli/system/conftest.py
@@ -0,0 +1,25 @@
import pytest

from datastore.shared.postgresql_backend import setup_di as postgresql_setup_di
from datastore.shared.services import setup_di as util_setup_di
from datastore.writer import setup_di as writer_setup_di
from datastore.writer.redis_backend import setup_di as redis_setup_di
from tests import ( # noqa
db_connection,
db_cur,
reset_db_data,
reset_db_schema,
reset_di,
setup_db_connection,
)


# Application


@pytest.fixture(autouse=True)
def setup_di(reset_di): # noqa
util_setup_di()
postgresql_setup_di()
redis_setup_di()
writer_setup_di()
31 changes: 31 additions & 0 deletions tests/cli/system/test_trim_collectionfield_tables.py
@@ -0,0 +1,31 @@
from datetime import datetime, timedelta

from cli.trim_collectionfield_tables import main as trim_collectionfield_tables
from datastore.shared.di import injector


def test_trim_collectionfield_tables(db_cur):
in_time = datetime.now() - timedelta(hours=12)
out_time = datetime.now() - timedelta(hours=36)
db_cur.execute(
"INSERT INTO positions (timestamp, user_id, migration_index) VALUES (%s, -1, -1), (%s, -1, -1)",
[out_time, in_time],
)
db_cur.execute(
"INSERT INTO events (position, fqid, type, weight) VALUES (1, 'a/1', 'create', 1), (2, 'a/2', 'create', 1)",
[],
)
db_cur.execute(
"INSERT INTO collectionfields (collectionfield, position) VALUES ('a/f', 1), ('a/g', 2)",
[],
)
db_cur.execute(
"INSERT INTO events_to_collectionfields VALUES (1, 1), (1, 2), (2, 2)", []
)
db_cur.connection.commit()
injector.provider_map.clear() # de-register services for testing purposes
trim_collectionfield_tables()
db_cur.execute("SELECT * FROM collectionfields")
assert db_cur.fetchall() == [(2, "a/g", 2)]
db_cur.execute("SELECT * FROM events_to_collectionfields")
assert db_cur.fetchall() == [(2, 2)]
6 changes: 5 additions & 1 deletion tests/writer/integration/write/test_occ_locking.py
@@ -7,6 +7,7 @@
from datastore.shared.di import injector
from datastore.shared.postgresql_backend import ConnectionHandler, SqlQueryHelper
from datastore.shared.services import EnvironmentService, ReadDatabase
from datastore.shared.services.shutdown_service import ShutdownService
from datastore.shared.util import FilterOperator, ModelLocked
from datastore.writer.core import (
Database,
@@ -21,7 +22,9 @@

class FakeConnectionHandler:
def query_list_of_single_values(self, query, arguments):
if query.startswith("with all_together as"):
if query.startswith(
"select e.fqid || %s || split_part(cf.collectionfield, %s, 2)"
):
return [self.fqfield()]
elif query.startswith("select fqid from events"):
return [self.fqid()]
@@ -47,6 +50,7 @@ def setup_di(reset_di): # noqa
injector.register_as_singleton(ReadDatabase, MagicMock)
injector.register_as_singleton(Messaging, MagicMock)
injector.register(EnvironmentService, EnvironmentService)
injector.register(ShutdownService, ShutdownService)
core_setup_di()


@@ -8,6 +8,7 @@
SqlReadDatabaseBackendService,
)
from datastore.shared.services import ReadDatabase
from datastore.shared.services.shutdown_service import ShutdownService
from datastore.shared.util import KEYSEPARATOR, ModelLocked
from datastore.writer.core import OccLocker
from datastore.writer.postgresql_backend import SqlOccLockerBackendService
@@ -20,6 +21,7 @@ def provide_di(reset_di): # noqa
injector.register(SqlQueryHelper, SqlQueryHelper)
injector.register(ReadDatabase, SqlReadDatabaseBackendService)
injector.register(OccLocker, SqlOccLockerBackendService)
injector.register(ShutdownService, ShutdownService)
yield


@@ -95,7 +97,9 @@ def test_raise_model_locked_multiple_reduced_to_one(
def test_raise_model_locked_multiple_different(
occ_locker, connection, mock_write_request
):
connection.query_list_of_single_values = lambda query, args: [args[0]]
connection.query_list_of_single_values = lambda query, args: [
args[2] if len(args) > 2 else args[0]
]
mock_write_request.locked_fqids = {"a/1": 2}
mock_write_request.locked_fqfields = {"a/1/f": 2}
mock_write_request.locked_collectionfields = {"a/f": 2}
@@ -129,12 +133,12 @@ def test_query_arguments_fqfield(occ_locker, connection):
assert (
args
== [
KEYSEPARATOR,
KEYSEPARATOR,
"a/1",
2,
"b/3",
42,
KEYSEPARATOR,
KEYSEPARATOR,
"a/1",
["a/f"],
"b/3",
Expand All @@ -143,12 +147,12 @@ def test_query_arguments_fqfield(occ_locker, connection):
) or (
args
== [
KEYSEPARATOR,
KEYSEPARATOR,
"b/3",
42,
"a/1",
2,
KEYSEPARATOR,
KEYSEPARATOR,
"b/3",
["b/e"],
"a/1",
