Skip to content

Commit

Permalink
Clean up migration
Browse files Browse the repository at this point in the history
  • Loading branch information
jsangmeister committed Sep 7, 2023
1 parent fc6614f commit ff1539a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 345 deletions.
300 changes: 21 additions & 279 deletions openslides_backend/migrations/migrations/0045_fix_amendment_paragraph.py
Original file line number Diff line number Diff line change
@@ -1,293 +1,35 @@
from collections import defaultdict
from enum import Enum, auto
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, TypedDict
from typing import Any, Dict, List, Optional

from datastore.migrations import BaseModelMigration, MigrationException
from datastore.writer.core import (
BaseRequestEvent,
RequestCreateEvent,
RequestUpdateEvent,
)
from datastore.migrations import BaseModelMigration
from datastore.writer.core import BaseRequestEvent, RequestUpdateEvent

from openslides_backend.shared.patterns import (
Collection,
FullQualifiedId,
fqid_from_collection_and_id,
)


class FieldStrategy(Enum):
"""
Defines various strategies for handling template/structured fields.
Reminder: structured fields are template fields with inserted replacement.
"""

Rename = auto()
"""
Rename all structured fields and remove this template field.
"""

Merge = auto()
"""
Merges all replacements of this template field into one field.
"""

MergeToJSON = auto()
"""
Builds a JSON object from all structured fields of this template field.
"""

MoveToMeetingUser = auto()
"""
Moves this `user` template field to the `meeting_user` collection.
"""

ReplaceWithMeetingUsers = auto()
"""
Replaces this relation field pointing to the `user` collection with a list of `meeting_user` ids.
"""

MoveToMeetingUserAndReplace = auto()
"""
Combination of MoveToMeetingUser and ReplaceWithMeetingUsers. Used for previous self-referencing
`user` fields which are now fields of `meeting_user`.
"""


FieldNameFunc = Callable[[str], str]


class ParametrizedFieldStrategy(TypedDict):
strategy: FieldStrategy
name: str | Dict[str, str]


TEMPLATE_FIELDS: Dict[
Collection, Dict[str, FieldStrategy | ParametrizedFieldStrategy]
] = {
"motion": {
"amendment_paragraph_$": {
"strategy": FieldStrategy.MergeToJSON,
"name": "amendment_paragraphs",
},
},
}


class MeetingUserKey(NamedTuple):
meeting_id: int
user_id: int


class MeetingUsersDict(Dict[MeetingUserKey, Dict[str, Any]]):
last_id: int
ids_by_parent_object: Dict[Collection, Dict[int, List[int]]]

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.last_id = 0
self.ids_by_parent_object = {
"user": defaultdict(list),
"meeting": defaultdict(list),
}

def __missing__(self, key: MeetingUserKey) -> Dict[str, Any]:
self.last_id += 1
self.ids_by_parent_object["user"][key.user_id].append(self.last_id)
self.ids_by_parent_object["meeting"][key.meeting_id].append(self.last_id)
self[key] = {
"id": self.last_id,
"user_id": key.user_id,
"meeting_id": key.meeting_id,
}
return self[key]
from openslides_backend.shared.patterns import fqid_from_collection_and_id


class Migration(BaseModelMigration):
"""
This migration removes all template fields. It iterates over the fields in TEMPLATE_FIELDS,
where a _strategy_ is defined for each field, potentially with a differing new name for the
field. The strategy defines how the field is migrated. It is first _resolved_ into the actual
strategy enum and a function that converts the old field name into the new one. Then, the
strategy is _applied_ to all models in the database which results in a list of updates to the
models. Lastly, the events are generated from these updates and the meeting users which were
created along the way.
Migration 44 had a typo in the field `motion/amendment_paragraph_$`. This migration fixes that.
"""

target_migration_index = 46

def migrate_models(self) -> Optional[List[BaseRequestEvent]]:
self.meeting_users = MeetingUsersDict()
updates: Dict[FullQualifiedId, Dict[str, Any]] = defaultdict(dict)

for collection, fields in TEMPLATE_FIELDS.items():
db_models = self.reader.get_all(collection)
for id, model in db_models.items():
update = updates[fqid_from_collection_and_id(collection, id)]
for old_field, _strategy in fields.items():
if old_field in model:
strategy, new_field_func = self.resolve_strategy(_strategy)
# all user template fields except committee_$_management_level have the
# meeting as replacement collection
replacement_collection = (
"meeting"
if collection == "user"
and old_field != "committee_$_management_level"
else None
)
update.update(
**self.apply_strategy(
model,
strategy,
old_field,
new_field_func,
replacement_collection,
)
)
old_field = "amendment_paragraph_$"
new_field = "amendment_paragraphs"

def migrate_models(self) -> Optional[List[BaseRequestEvent]]:
events: List[BaseRequestEvent] = []
# Create meeting users
events.extend(
RequestCreateEvent(
fqid_from_collection_and_id("meeting_user", model["id"]), model
)
for model in self.meeting_users.values()
)
# Update meetings and users with meeting users
for collection in ("meeting", "user"):
events.extend(
RequestUpdateEvent(
fqid_from_collection_and_id(collection, parent_id),
{"meeting_user_ids": meeting_user_ids},
db_models = self.reader.get_all("motion")
for id, model in db_models.items():
if self.old_field in model:
update: Dict[str, Any] = {self.old_field: None, self.new_field: {}}
for replacement in model.get(self.old_field, []):
structured_field = self.old_field.replace("$", f"${replacement}")
update[structured_field] = None
if structured_value := model.get(structured_field):
update[self.new_field][replacement] = structured_value
events.append(
RequestUpdateEvent(
fqid_from_collection_and_id("motion", id), update
)
)
for parent_id, meeting_user_ids in self.meeting_users.ids_by_parent_object[
collection
].items()
)
# Create all other update events
events.extend(
RequestUpdateEvent(fqid, model) for fqid, model in updates.items() if model
)
return events

def apply_strategy(
self,
model: Dict[str, Any],
strategy: FieldStrategy,
old_field: str,
new_field_func: FieldNameFunc,
replacement_collection: str | None,
) -> Dict[str, Any]:
# always remove the old field
update: Dict[str, Any] = {
old_field: None,
}

def get_meeting_user_ids(
meeting_id: int, user_ids: int | List[int]
) -> int | List[int]:
if isinstance(user_ids, list):
return [
self.meeting_users[MeetingUserKey(meeting_id, user_id)]["id"]
for user_id in user_ids
]
else:
key = MeetingUserKey(meeting_id, user_ids)
return self.meeting_users[key]["id"]

new_field = new_field_func(old_field)
if strategy is FieldStrategy.ReplaceWithMeetingUsers:
# replace user ids with meeting_user ids
update[new_field] = get_meeting_user_ids(
model["meeting_id"], model[old_field]
)
else:
new_value: List[Any] = []
for replacement in model[old_field]:
structured_field = old_field.replace("$", f"${replacement}")
# always remove the old field
update[structured_field] = None

if replacement_collection:
# check if the replacement actually exists, otherwise skip it
fqid = fqid_from_collection_and_id(
replacement_collection, replacement
)
if not self.reader.is_alive(fqid):
continue

if structured_value := model.get(structured_field):
if strategy is FieldStrategy.Rename:
# move value to new field
new_structured_field = new_field_func(structured_field)
update[new_structured_field] = structured_value
elif strategy is FieldStrategy.Merge:
# merge values together into a single list
new_value.extend(structured_value)
elif strategy is FieldStrategy.MergeToJSON:
# merge values together into a single list of key-value pairs
new_value.append((replacement, structured_value))
elif strategy in (
FieldStrategy.MoveToMeetingUser,
FieldStrategy.MoveToMeetingUserAndReplace,
):
# move value to new field in meeting_user
meeting_id = int(replacement)
key = MeetingUserKey(meeting_id, model["id"])
# replace user ids with meeting_user ids, if necessary
self.meeting_users[key][new_field] = (
structured_value
if strategy is FieldStrategy.MoveToMeetingUser
else get_meeting_user_ids(meeting_id, structured_value)
)
else:
raise MigrationException("Invalid strategy")

if new_value:
if strategy is FieldStrategy.MergeToJSON:
# make dict from key-value pairs
update[new_field] = dict(new_value)
else:
update[new_field] = new_value
return update

def resolve_strategy(
self, strategy: FieldStrategy | ParametrizedFieldStrategy
) -> Tuple[FieldStrategy, FieldNameFunc]:
"""
Resolves a (parametrized) strategy to a tuple of strategy and the new field name.
"""
if isinstance(strategy, dict):
return (strategy["strategy"], self.get_name_func_from_parameters(strategy))
else:
return (strategy, self.get_name_func_for_strategy(strategy))

def get_name_func_from_parameters(
self, strategy: ParametrizedFieldStrategy
) -> FieldNameFunc:
# see https://github.com/python/mypy/issues/4297 for an explanation for the redundant variables
if isinstance(strategy["name"], str):
name = strategy["name"]
return lambda _: name
elif isinstance(strategy["name"], dict):
name_map = strategy["name"]
return lambda field: (
name_map[field] if field in name_map else field.replace("$", "")
)
else:
raise MigrationException("Invalid name parameter")

def get_name_func_for_strategy(self, strategy: FieldStrategy) -> FieldNameFunc:
if strategy is FieldStrategy.Rename:
return lambda field: field.replace("$", "")
elif strategy in (
FieldStrategy.Merge,
FieldStrategy.MergeToJSON,
FieldStrategy.MoveToMeetingUser,
FieldStrategy.MoveToMeetingUserAndReplace,
):
return lambda field: field.replace("_$", "")
elif strategy is FieldStrategy.ReplaceWithMeetingUsers:
return lambda field: f"meeting_{field}"
else:
raise MigrationException("Invalid strategy")
Loading

0 comments on commit ff1539a

Please sign in to comment.