Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

read/write commands in backend #2652

Open
wants to merge 3 commits into
base: feature/relational-db
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 40 additions & 35 deletions openslides_backend/action/action_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import fastjsonschema

from openslides_backend.database.db_connection_handling import get_new_os_conn

from ..shared.exceptions import (
ActionException,
DatastoreLockedException,
Expand Down Expand Up @@ -103,47 +105,50 @@ def handle_request(
parsing all actions. In the end it sends everything to the event store.
"""
with make_span(self.env, "handle request"):
self.user_id = user_id
self.internal = internal
with get_new_os_conn() as db_connection:
self.db_connection = db_connection
self.user_id = user_id
self.internal = internal

try:
payload_schema(payload)
except fastjsonschema.JsonSchemaException as exception:
raise ActionException(exception.message)
try:
payload_schema(payload)
except fastjsonschema.JsonSchemaException as exception:
raise ActionException(exception.message)

results: ActionsResponseResults = []
if atomic:
results = self.execute_write_requests(self.parse_actions, payload)
else:
results: ActionsResponseResults = []
if atomic:
results = self.execute_write_requests(self.parse_actions, payload)
else:

def transform_to_list(
tuple: tuple[WriteRequest | None, ActionResults | None]
) -> tuple[list[WriteRequest], ActionResults | None]:
return ([tuple[0]] if tuple[0] is not None else [], tuple[1])
def transform_to_list(
tuple: tuple[WriteRequest | None, ActionResults | None]
) -> tuple[list[WriteRequest], ActionResults | None]:
return ([tuple[0]] if tuple[0] is not None else [], tuple[1])

for element in payload:
try:
result = self.execute_write_requests(
lambda e: transform_to_list(self.perform_action(e)), element
)
results.append(result)
except ActionException as exception:
error = cast(ActionError, exception.get_json())
results.append(error)
self.datastore.reset()
for element in payload:
try:
result = self.execute_write_requests(
lambda e: transform_to_list(self.perform_action(e)),
element,
)
results.append(result)
except ActionException as exception:
error = cast(ActionError, exception.get_json())
results.append(error)
self.datastore.reset()

# execute cleanup methods
for on_success in self.on_success:
on_success()
# execute cleanup methods
for on_success in self.on_success:
on_success()

# Return action result
self.logger.info("Request was successful. Send response now.")
return ActionsResponse(
status_code=HTTPStatus.OK.value,
success=True,
message="Actions handled successfully",
results=results,
)
# Return action result
self.logger.info("Request was successful. Send response now.")
return ActionsResponse(
status_code=HTTPStatus.OK.value,
success=True,
message="Actions handled successfully",
results=results,
)

def execute_internal_action(self, action: str, data: dict[str, Any]) -> None:
"""Helper function to execute an internal action with user id -1."""
Expand Down
8 changes: 6 additions & 2 deletions openslides_backend/database/db_connection_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@

import psycopg
import psycopg_pool

from openslides_backend.shared.env import Environment
from openslides_backend.shared.exceptions import DatabaseException

env = Environment(os.environ)
conn_string_without_db = f"host='{env.DATABASE_HOST}' port='{env.DATABASE_PORT}' user='{env.DATABASE_USER}' password='{env.PGPASSWORD}' "

def configure_connection(conn: psycopg.Connection) -> None:
""" callback, will be called after creation of new connection from connection pool"""
conn.isolation_level = psycopg.IsolationLevel.SERIALIZABLE

def create_os_conn_pool(open: bool = True) -> psycopg_pool.ConnectionPool:
"""create the global connection pool on the openslides-db"""
global os_conn_pool
if "os_conn_pool" in globals() and not os_conn_pool.closed:
os_conn_pool.close()
Expand All @@ -31,6 +34,7 @@ def create_os_conn_pool(open: bool = True) -> psycopg_pool.ConnectionPool:
max_idle=float(env.DB_POOL_MAX_IDLE),
reconnect_timeout=float(env.DB_POOL_RECONNECT_TIMEOUT),
num_workers=int(env.DB_POOL_NUM_WORKERS),
configure= configure_connection,
)
return os_conn_pool

Expand All @@ -49,7 +53,7 @@ def get_current_os_conn_pool() -> psycopg_pool.ConnectionPool:
return os_conn_pool


def get_current_os_conn() -> contextlib._GeneratorContextManager[psycopg.Connection]:
def get_new_os_conn() -> contextlib._GeneratorContextManager[psycopg.Connection]:
os_conn_pool = get_current_os_conn_pool()
return os_conn_pool.connection()

Expand Down
Loading
Loading