Critique request: Implementing Litestar 2.0 with other concurrent processes sharing access to a data repository object #2713
Hello everyone. The app is a backend running three concurrent processes, all accessing the same data repository, which is an instance of a class. These are the processes:
I have followed the documentation about the Application objects, from which I got these key takeaways for my use case:
Am I correct in my conclusions for my design? Is there something else I should consider besides this approach? I have completed code that illustrates the above, mimicking the functionality of the app. It is available in this GitHub repository, as it may be quite large for this post.
Here are some thoughts and questions:
I don't see anything running a dedicated process, just asyncio tasks. Are those what you are talking about here?

Code like this:

```python
asyncio.create_task(client_a.loop())
```

is dangerous; if you don't save a reference to your tasks, they might be garbage collected at some point. I recommend using a `TaskGroup` to handle those instead.

Your setup code can also be simplified somewhat:

```python
@asynccontextmanager
async def manage_client_lifespan(app: Litestar) -> AsyncGenerator[None, None]:
    repo_a = getattr(app.state, "repo_a", None)
    if repo_a is None:
        repo_a = app.state.repo_a = Repo("A")
    repo_b = getattr(app.state, "repo_b", None)
    if repo_b is None:
        repo_b = app.state.repo_b = Repo("B")

    client_a = app.state.client_a = ClientA(repo_a)
    client_b = app.state.client_b = ClientB(repo_b)

    # the TaskGroup keeps a reference to the tasks, will await them when it's
    # exited, and cancels them on errors
    async with asyncio.TaskGroup() as tg:
        tg.create_task(client_a.loop())
        tg.create_task(client_b.loop())
        try:
            yield
        finally:
            await client_a.dispose()
            await client_b.dispose()
```

I'm assuming the actual clients have more logic than the ones shown in the example repo, but generally, when you have an infinite loop running within an asyncio task, you want to be able to shut it down gracefully, so it's good to add an exit condition:

```python
import asyncio

from repo import Repo


class ClientA:
    """A process that executes every 2 seconds"""

    def __init__(self, repo: Repo):
        self.shared_repo = repo
        print(f"Client A initialized with repository created by: {self.shared_repo.creator} with current data: {self.shared_repo.current_values()}")
        self._is_active = True

    async def loop(self):
        """This is the continuous task"""
        while self._is_active:
            got = self.shared_repo.increment_counter_a()
            print(f"A says: {got}")
            # Clears itself after a while
            if got[0] > 8:
                got = self.shared_repo.clear_counter_a()
                print(f"A cleared itself: {got}")
            await asyncio.sleep(2)

    async def dispose(self):
        """To be called when shutting down the app"""
        self._is_active = False
        print("Client A disposed")
```

Now, calling `dispose` lets the loop finish its current iteration and then exit. The alternative to this would be to cancel the tasks, something like this:
```python
@asynccontextmanager
async def manage_client_lifespan(app: Litestar) -> AsyncGenerator[None, None]:
    repo_a = getattr(app.state, "repo_a", None)
    if repo_a is None:
        repo_a = app.state.repo_a = Repo("A")
    repo_b = getattr(app.state, "repo_b", None)
    if repo_b is None:
        repo_b = app.state.repo_b = Repo("B")

    client_a = app.state.client_a = ClientA(repo_a)
    client_b = app.state.client_b = ClientB(repo_b)

    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(client_a.loop())
        task_b = tg.create_task(client_b.loop())
        try:
            yield
        finally:
            task_a.cancel()
            task_b.cancel()
            await client_a.dispose()
            await client_b.dispose()
```

This, though, is more error-prone, and I generally recommend the former solution unless you have a good reason not to.

Finally, your state management / setup can be simplified:
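One reason the cancellation approach is considered more error-prone is that `CancelledError` is raised inside the task at its current `await` point, so any cleanup must be handled explicitly and the exception re-raised. A minimal sketch (the `worker` function is hypothetical) showing where cancellation lands:

```python
import asyncio


async def worker(results: list[int]) -> None:
    try:
        while True:
            results.append(1)
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Cleanup runs here; re-raise so the caller sees the cancellation.
        results.append(-1)
        raise


async def main() -> list[int]:
    results: list[int] = []
    task = asyncio.create_task(worker(results))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return results


out = asyncio.run(main())
print(out[-1])  # -1: the cleanup branch ran
```

Forgetting to re-raise, or swallowing `CancelledError` somewhere in the loop, would keep the task alive and hang shutdown, which is exactly the kind of mistake the cooperative-flag approach avoids.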
```python
class MyState(State):
    repo_a: Repo
    repo_b: Repo
    client_a: ClientA | None
    client_b: ClientB | None


def get_repo_a(state: MyState) -> Repo:
    return state.repo_a


def get_repo_b(state: MyState) -> Repo:
    return state.repo_b


@asynccontextmanager
async def manage_client_lifespan(app: Litestar) -> AsyncGenerator[None, None]:
    client_a = app.state.client_a = ClientA(app.state.repo_a)
    client_b = app.state.client_b = ClientB(app.state.repo_b)

    # the TaskGroup keeps a reference to the tasks, will await them when it's
    # exited, and cancels them on errors
    async with asyncio.TaskGroup() as tg:
        tg.create_task(client_a.loop())
        tg.create_task(client_b.loop())
        try:
            yield
        finally:
            await client_a.dispose()
            await client_b.dispose()


@put("/clear_a", sync_to_thread=False)
def clear_a(repo_a: Repo) -> tuple:
    """Handler function that clears the counter for A."""
    got = repo_a.clear_counter_a()
    logger.info("Repo value in handler from `State`: %s after clearing A", got)
    return got


@put("/clear_b", sync_to_thread=False)
def clear_b(repo_b: Repo) -> tuple:
    """Handler function that clears the counter for B."""
    got = repo_b.clear_counter_b()
    logger.info("Repo value in handler from `State`: %s after clearing B", got)
    return got


app = Litestar(
    lifespan=[manage_client_lifespan],
    route_handlers=[current_state, clear_a, clear_b],
    state=State({"repo_a": Repo("A"), "repo_b": Repo("B")}),
    dependencies={
        "repo_a": get_repo_a,
        "repo_b": get_repo_b,
    },
)
```

As a last step, you could simplify the repository routes, using a path parameter to provide the repository name:
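The `Repo` class itself is not shown in the thread; for completeness, here is a hypothetical minimal version consistent with the methods the clients and handlers call above (`increment_counter_a`, `clear_counter_a`, `clear_counter_b`, `current_values` and the `creator` attribute):

```python
class Repo:
    """A toy in-memory repository shared by the clients and the handlers."""

    def __init__(self, creator: str) -> None:
        self.creator = creator
        self._counter_a = 0
        self._counter_b = 0

    def increment_counter_a(self) -> tuple:
        self._counter_a += 1
        return (self._counter_a, self._counter_b)

    def clear_counter_a(self) -> tuple:
        self._counter_a = 0
        return (self._counter_a, self._counter_b)

    def clear_counter_b(self) -> tuple:
        self._counter_b = 0
        return (self._counter_a, self._counter_b)

    def current_values(self) -> tuple:
        return (self._counter_a, self._counter_b)


repo = Repo("A")
repo.increment_counter_a()
print(repo.current_values())  # (1, 0)
```

Since everything runs on a single event loop and the repository methods never `await`, each method call is effectively atomic here; if the real repository did blocking or async work internally, it would need its own synchronization.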
```python
@put("/clear_{repo_name:str}", sync_to_thread=False)
def clear_counter(repo_a: Repo, repo_b: Repo, repo_name: Literal["a", "b"]) -> tuple:
    """Handler function that clears the counter for the given repository."""
    repo = repo_a if repo_name == "a" else repo_b
    return repo.clear_counter()
```

Adding the `Literal["a", "b"]` annotation ensures that only those two values are accepted for `repo_name`.

I hope this helps :)
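The effect of the `Literal["a", "b"]` annotation can be illustrated in plain Python with `typing.get_args`, which is roughly the check a validation layer performs before the handler runs (the `validate_repo_name` helper is made up for illustration):

```python
from typing import Literal, get_args

RepoName = Literal["a", "b"]


def validate_repo_name(value: str) -> str:
    """Reject any value not listed in the Literal annotation."""
    if value not in get_args(RepoName):
        raise ValueError(f"{value!r} is not one of {get_args(RepoName)}")
    return value


print(validate_repo_name("a"))  # a
# validate_repo_name("c") would raise ValueError
```

In the route handler, a value outside the `Literal` set would be rejected by the framework before the handler body executes, so the `repo_name == "a"` branch never sees an unexpected name.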