Skip to content

Commit

Permalink
feat!: add app.state
Browse files Browse the repository at this point in the history
Adds 2 new system tasks to load and store a snapshot of state.
Currently this is just `last_block_processed` and `last_block_seen`.

This paves the way for the `Parameter` feature, but does not include
that just yet.

BREAKING CHANGE: state snapshotting migrates from runner to worker
  • Loading branch information
fubuloubu committed May 30, 2024
1 parent 466f18a commit 532306b
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 50 deletions.
48 changes: 38 additions & 10 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Annotated

from ape import chain
Expand All @@ -11,31 +12,47 @@
# Do this first to initialize your app
app = SilverbackApp()

# Cannot call `app.state` outside of an app function handler
# app.state.something # NOTE: raises AttributeError

# NOTE: Don't do any networking until after initializing app
USDC = tokens["USDC"]
YFI = tokens["YFI"]


@app.on_startup()
def app_startup(startup_state: StateSnapshot):
# NOTE: This is called just as the app is put into "run" state,
# and handled by the first available worker
# raise Exception # NOTE: Any exception raised on startup aborts immediately
# This is called just as the app is put into "run" state,
# and handled by the first available worker

# Any exception raised on startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# This is a great place to set `app.state` values
app.state.logs_processed = 0
# NOTE: Can put anything here, any python object works

return {"block_number": startup_state.last_block_seen}


# Can handle some resource initialization for each worker, like LLMs or database connections
class MyDB:
def execute(self, query: str):
pass
pass # Handle query somehow...


@app.on_worker_startup()
def worker_startup(state: TaskiqState): # NOTE: You need the type hint here
def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint to load worker state
# NOTE: Worker state is per-worker, not shared with other workers
# NOTE: Can put anything here, any python object works
state.db = MyDB()
state.block_count = 0
# raise Exception # NOTE: Any exception raised on worker startup aborts immediately
worker_state.db = MyDB()
worker_state.block_count = 0

# Any exception raised on worker startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# Cannot call `app.state` because it is not set up yet on worker startup functions
# app.state.something # NOTE: raises AttributeError


# This is how we trigger off of new blocks
Expand All @@ -57,6 +74,9 @@ def exec_event1(log):
# NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself
raise ValueError("I don't like the number 3.")

# You can update state whenever you want
app.state.logs_processed += 1

return {"amount": log.amount}


Expand All @@ -67,18 +87,26 @@ async def exec_event2(log: ContractLog):
# If you ever want the app to immediately shutdown under some scenario, raise this exception
raise CircuitBreaker("Oopsie!")

# All `app.state` values are updated across all workers at the same time
app.state.logs_processed += 1
# Do any other long running tasks...
await asyncio.sleep(5)
return log.amount


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown():
# raise Exception # NOTE: Any exception raised on shutdown is ignored
# NOTE: Any exception raised on worker shutdown is ignored:
# raise Exception
return {"some_metric": 123}


# Just in case you need to release some resources or something inside each worker
@app.on_worker_shutdown()
def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here
# This is a good time to release resources
state.db = None
# raise Exception # NOTE: Any exception raised on worker shutdown is ignored

# NOTE: Any exception raised on worker shutdown is ignored:
# raise Exception
71 changes: 71 additions & 0 deletions silverback/application.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
from collections import defaultdict
from datetime import timedelta
from typing import Any, Callable

Expand All @@ -13,6 +14,7 @@

from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError
from .settings import Settings
from .state import AppDatastore, StateSnapshot
from .types import SilverbackID, TaskType


Expand All @@ -33,6 +35,24 @@ class TaskData(BaseModel):
# NOTE: Any other items here must have a default value


class SharedState(defaultdict):
def __init__(self):
# Any unknown key returns None
super().__init__(lambda: None)

def __getattr__(self, attr):
try:
return super().__getattr__(attr)
except AttributeError:
return super().__getitem__(attr)

def __setattr__(self, attr, val):
try:
super().__setattr__(attr, val)
except AttributeError:
super().__setitem__(attr, val)


class SilverbackApp(ManagerAccessMixin):
"""
The application singleton. Must be initialized prior to use.
Expand Down Expand Up @@ -113,6 +133,16 @@ def __init__(self, settings: Settings | None = None):
TaskType.SYSTEM_USER_ALL_TASKDATA, self.__get_user_all_taskdata_handler
)

# TODO: Make backup optional and settings-driven
# TODO: Allow configuring backup class
self.datastore = AppDatastore()
self._load_snapshot = self.__register_system_task(
TaskType.SYSTEM_LOAD_SNAPSHOT, self.__load_snapshot_handler
)
self._save_snapshot = self.__register_system_task(
TaskType.SYSTEM_SAVE_SNAPSHOT, self.__save_snapshot_handler
)

def __register_system_task(
self, task_type: TaskType, task_handler: Callable
) -> AsyncTaskiqDecoratedTask:
Expand Down Expand Up @@ -142,6 +172,47 @@ def __get_user_taskdata_handler(self, task_type: TaskType) -> list[TaskData]:
def __get_user_all_taskdata_handler(self) -> list[TaskData]:
return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l]

async def __load_snapshot_handler(self) -> StateSnapshot:
# NOTE: This is not networked in any way, nor thread-safe nor multi-process safe,
# but will be accessible across multiple workers in a single container
# NOTE: *DO NOT USE* in Runner, as it will not be updated by the app
self.state = SharedState()
# NOTE: attribute does not exist before this task is executed,
# ensuring no one uses it during worker startup

if not (startup_state := await self.datastore.init(app_id=self.identifier)):
logger.warning("No state snapshot detected, using empty snapshot")
# TODO: Refactor to `None` by removing
self.state["system:last_block_seen"] = -1
self.state["system:last_block_processed"] = -1
startup_state = StateSnapshot(
# TODO: Migrate these to parameters (remove explicitly from state)
last_block_seen=-1,
last_block_processed=-1,
) # Use empty snapshot

return startup_state

async def __save_snapshot_handler(
self,
last_block_seen: int | None = None,
last_block_processed: int | None = None,
):
# Task that backups state before/after every non-system runtime task and at shutdown
if last_block_seen is not None:
self.state["system:last_block_seen"] = last_block_seen

if last_block_processed is not None:
self.state["system:last_block_processed"] = last_block_processed

snapshot = StateSnapshot(
# TODO: Migrate these to parameters (remove explicitly from state)
last_block_processed=self.state["system:last_block_seen"] or -1,
last_block_seen=self.state["system:last_block_processed"] or -1,
)

return await self.datastore.save(snapshot)

def broker_task_decorator(
self,
task_type: TaskType,
Expand Down
67 changes: 36 additions & 31 deletions silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .application import SilverbackApp, SystemConfig, TaskData
from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure
from .recorder import BaseRecorder, TaskResult
from .state import AppDatastore, StateSnapshot
from .state import StateSnapshot
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .types import TaskType
from .utils import (
Expand All @@ -37,8 +37,6 @@ def __init__(
):
self.app = app
self.recorder = recorder
self.state = None
self.datastore = AppDatastore()

self.max_exceptions = max_exceptions
self.exceptions = 0
Expand Down Expand Up @@ -76,26 +74,12 @@ async def _checkpoint(
last_block_processed: int | None = None,
):
"""Set latest checkpoint block number"""
assert self.state, f"{self.__class__.__name__}.run() not triggered."
if not self.save_snapshot_supported:
return # Can't support this feature

logger.debug(
(
f"Checkpoint block [seen={self.state.last_block_seen}, "
f"procssed={self.state.last_block_processed}]"
)
)

if last_block_seen:
self.state.last_block_seen = last_block_seen
if last_block_processed:
self.state.last_block_processed = last_block_processed

if self.recorder:
try:
await self.datastore.set_state(self.state)

except Exception as err:
logger.error(f"Error setting state: {err}")
task = await self.app._save_snapshot.kiq(last_block_seen, last_block_processed)
if (result := await task.wait_result()).is_err:
logger.error(f"Error saving snapshot: {result.error}")

@abstractmethod
async def _block_task(self, task_data: TaskData):
Expand All @@ -106,7 +90,7 @@ async def _block_task(self, task_data: TaskData):
@abstractmethod
async def _event_task(self, task_data: TaskData):
"""
handle an event handler task for the given contract event
Handle an event handler task for the given contract event
"""

async def run(self):
Expand Down Expand Up @@ -148,20 +132,41 @@ async def run(self):
f", available task types:\n- {system_tasks_str}"
)

# NOTE: Bypass snapshotting if unsupported
self.save_snapshot_supported = TaskType.SYSTEM_SAVE_SNAPSHOT in system_tasks

# Load the snapshot (if available)
# NOTE: Add some additional handling to see if this feature is available in bot
if TaskType.SYSTEM_LOAD_SNAPSHOT not in system_tasks:
logger.warning(
"Silverback no longer supports runner-based snapshotting, "
"please upgrade your bot SDK version to latest."
)
startup_state = StateSnapshot(
last_block_seen=-1,
last_block_processed=-1,
) # Use empty snapshot

elif (
result := await run_taskiq_task_wait_result(
self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT)
)
).is_err:
raise StartupFailure(result.error)

else:
startup_state = result.return_value
logger.debug(f"Startup state: {startup_state}")
# NOTE: State snapshot is immediately out of date after init

# NOTE: Do this for other system tasks because they may not be in older SDK versions
# `if TaskType.<SYSTEM_TASK_NAME> not in system_tasks: raise StartupFailure(...)`
# or handle accordingly by having default logic if it is not available

# Initialize recorder (if available) and fetch state if app has been run previously
# Initialize recorder (if available)
if self.recorder:
await self.recorder.init(app_id=self.app.identifier)

if startup_state := (await self.datastore.init(app_id=self.app.identifier)):
self.state = startup_state

else: # use empty state
self.state = StateSnapshot(last_block_seen=-1, last_block_processed=-1)

# Execute Silverback startup task before we init the rest
startup_taskdata_result = await run_taskiq_task_wait_result(
self._create_system_task_kicker(TaskType.SYSTEM_USER_TASKDATA), TaskType.STARTUP
Expand All @@ -176,7 +181,7 @@ async def run(self):
)

startup_task_results = await run_taskiq_task_group_wait_results(
(task_handler for task_handler in startup_task_handlers), self.state
(task_handler for task_handler in startup_task_handlers), startup_state
)

if any(result.is_err for result in startup_task_results):
Expand Down
9 changes: 0 additions & 9 deletions silverback/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ async def init(self, app_id: SilverbackID) -> StateSnapshot | None:
Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network
)
data_folder.mkdir(parents=True, exist_ok=True)

self.state_backup_file = data_folder / "state.json"

return (
Expand All @@ -53,12 +52,4 @@ async def init(self, app_id: SilverbackID) -> StateSnapshot | None:
)

async def save(self, snapshot: StateSnapshot):
if self.state_backup_file.exists():
old_snapshot = AppState.parse_file(self.snapshot_backup_file)
if old_snapshot.last_block_seen > snapshot.last_block_seen:
snapshot.last_block_seen = old_snapshot.last_block_seen
if old_snapshot.last_block_processed > snapshot.last_block_processed:
snapshot.last_block_processed = old_snapshot.last_block_processed

snapshot.last_updated = utc_now()
self.state_backup_file.write_text(snapshot.model_dump_json())
2 changes: 2 additions & 0 deletions silverback/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class TaskType(str, Enum):
SYSTEM_CONFIG = "system:config"
SYSTEM_USER_TASKDATA = "system:user-taskdata"
SYSTEM_USER_ALL_TASKDATA = "system:user-all-taskdata"
SYSTEM_LOAD_SNAPSHOT = "system:load-snapshot"
SYSTEM_SAVE_SNAPSHOT = "system:save-snapshot"

# User-accessible Tasks
STARTUP = "user:startup"
Expand Down

0 comments on commit 532306b

Please sign in to comment.