Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add app.state #80

Merged
merged 17 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ RUN pip wheel . --wheel-dir=/wheels
# Install from wheels
FROM ghcr.io/apeworx/ape:${BASE_APE_IMAGE_TAG:-latest}
USER root
COPY --from=builder /wheels /wheels
COPY --from=builder /wheels/*.whl /wheels
RUN pip install --upgrade pip \
&& pip install silverback \
&& pip install \
--no-cache-dir --no-index --find-links=/wheels \
'taskiq-sqs>=0.0.11' \
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved
--no-cache-dir --find-links=/wheels
silverback

USER harambe

ENTRYPOINT ["silverback"]
54 changes: 27 additions & 27 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,29 @@
from ape_tokens import tokens # type: ignore[import]
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import CircuitBreaker, SilverbackApp, StateSnapshot
from silverback import CircuitBreaker, SilverbackBot, StateSnapshot

# Do this first to initialize your app
app = SilverbackApp()
# Do this first to initialize your bot
bot = SilverbackBot()

# Cannot call `app.state` outside of an app function handler
# app.state.something # NOTE: raises AttributeError
# Cannot call `bot.state` outside of an bot function handler
# bot.state.something # NOTE: raises AttributeError

# NOTE: Don't do any networking until after initializing app
# NOTE: Don't do any networking until after initializing bot
USDC = tokens["USDC"]
YFI = tokens["YFI"]


@app.on_startup()
def app_startup(startup_state: StateSnapshot):
# This is called just as the app is put into "run" state,
@bot.on_startup()
def bot_startup(startup_state: StateSnapshot):
# This is called just as the bot is put into "run" state,
# and handled by the first available worker

# Any exception raised on startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# This is a great place to set `app.state` values
app.state.logs_processed = 0
# This is a great place to set `bot.state` values
bot.state.logs_processed = 0
# NOTE: Can put anything here, any python object works

return {"block_number": startup_state.last_block_seen}
Expand All @@ -41,7 +41,7 @@ def execute(self, query: str):
pass # Handle query somehow...


@app.on_worker_startup()
@bot.on_worker_startup()
# NOTE: This event is triggered internally, do not use unless you know what you're doing
def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint to load worker state
# NOTE: Worker state is per-worker, not shared with other workers
Expand All @@ -51,12 +51,12 @@ def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint t
# Any exception raised on worker startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# Cannot call `app.state` because it is not set up yet on worker startup functions
# app.state.something # NOTE: raises AttributeError
# Cannot call `bot.state` because it is not set up yet on worker startup functions
# bot.state.something # NOTE: raises AttributeError


# This is how we trigger off of new blocks
@app.on_(chain.blocks)
@bot.on_(chain.blocks)
# NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI`
# NOTE: If you need something from worker state, you have to use taskiq context
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
Expand All @@ -66,48 +66,48 @@ def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):

# This is how we trigger off of events
# Set new_block_timeout to adjust the expected block time.
@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25)
@bot.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25)
# NOTE: Typing isn't required, it will still be an Ape `ContractLog` type
def exec_event1(log):
if log.log_index % 7 == 3:
# If you raise any exception, Silverback will track the failure and keep running
# NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself
# NOTE: By default, if you have 3 tasks fail in a row, the bot will shutdown itself
raise ValueError("I don't like the number 3.")

# You can update state whenever you want
app.state.logs_processed += 1
bot.state.logs_processed += 1

return {"amount": log.amount}


@app.on_(YFI.Approval)
@bot.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
# All `app.state` values are updated across all workers at the same time
app.state.logs_processed += 1
# All `bot.state` values are updated across all workers at the same time
bot.state.logs_processed += 1
# Do any other long running tasks...
await asyncio.sleep(5)
return log.amount


@app.on_(chain.blocks)
@bot.on_(chain.blocks)
# NOTE: You can have multiple handlers for any trigger we support
def check_logs(log):
if app.state.logs_processed > 20:
# If you ever want the app to immediately shutdown under some scenario, raise this exception
if bot.state.logs_processed > 20:
# If you ever want the bot to immediately shutdown under some scenario, raise this exception
raise CircuitBreaker("Oopsie!")


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown():
@bot.on_shutdown()
def bot_shutdown():
# NOTE: Any exception raised on worker shutdown is ignored:
# raise Exception
return {"some_metric": 123}


# Just in case you need to release some resources or something inside each worker
@app.on_worker_shutdown()
@bot.on_worker_shutdown()
def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here
# This is a good time to release resources
state.db = None
Expand Down
4 changes: 2 additions & 2 deletions silverback/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .application import SilverbackApp
from .main import SilverbackBot
from .exceptions import CircuitBreaker, SilverbackException
from .state import StateSnapshot

__all__ = [
"StateSnapshot",
"CircuitBreaker",
"SilverbackApp",
"SilverbackBot",
"SilverbackException",
]
8 changes: 4 additions & 4 deletions silverback/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
@click.group(cls=SectionedHelpGroup)
def cli():
"""
Silverback: Build Python apps that react to on-chain events
Silverback: Build Python bots that react to on-chain events

To learn more about our cloud offering, please check out https://silverback.apeworx.io
"""
Expand Down Expand Up @@ -110,7 +110,7 @@ def _network_callback(ctx, param, val):
@click.option("-x", "--max-exceptions", type=int, default=3)
@click.argument("bot", required=False, callback=bot_path_callback)
def run(cli_ctx, account, runner_class, recorder_class, max_exceptions, bot):
"""Run Silverback application"""
"""Run Silverback bot"""

if not runner_class:
# NOTE: Automatically select runner class
Expand All @@ -120,7 +120,7 @@ def run(cli_ctx, account, runner_class, recorder_class, max_exceptions, bot):
runner_class = PollingRunner
else:
raise click.BadOptionUsage(
option_name="network", message="Network choice cannot support running app"
option_name="network", message="Network choice cannot support running bot"
)

runner = runner_class(
Expand Down Expand Up @@ -213,7 +213,7 @@ def login(auth: FiefAuth):

@cli.group(cls=SectionedHelpGroup, section="Cloud Commands (https://silverback.apeworx.io)")
def cluster():
"""Manage a Silverback hosted application cluster
"""Manage a Silverback hosted bot cluster

For clusters on the Silverback Platform, please provide a name for the cluster to access under
your platform account via `-c WORKSPACE/NAME`"""
Expand Down
4 changes: 2 additions & 2 deletions silverback/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ def __init__(self):

class Halt(SilverbackException):
def __init__(self):
super().__init__("App halted, must restart manually")
super().__init__("Bot halted, must restart manually")


class CircuitBreaker(Halt):
"""Custom exception (created by user) that will trigger an application shutdown."""
"""Custom exception (created by user) that will trigger an bot shutdown."""

def __init__(self, message: str):
super(SilverbackException, self).__init__(message)
54 changes: 27 additions & 27 deletions silverback/application.py → silverback/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
class SystemConfig(BaseModel):
# NOTE: Do not change this datatype unless major breaking

# NOTE: Useful for determining if Runner can handle this app
# NOTE: Useful for determining if Runner can handle this bot
sdk_version: str
# NOTE: Useful for specifying what task types can be specified by app
# NOTE: Useful for specifying what task types can be specified by bot
task_types: list[str]


Expand All @@ -37,7 +37,7 @@ class TaskData(BaseModel):

class SharedState(defaultdict):
"""
Class containing the application shared state that all workers can read from and write to.
Class containing the bot shared state that all workers can read from and write to.

```{warning}
This is not networked in any way, nor is it multi-process safe, but will be
Expand All @@ -46,19 +46,19 @@ class SharedState(defaultdict):

Usage example::

@app.on_(...)
@bot.on_(...)
def do_something_with_state(value):
# Read from state using `getattr`
... = app.state.something
... = bot.state.something

# Set state using `setattr`
app.state.something = ...
bot.state.something = ...

# Read from state using `getitem`
... = app.state["something"]
... = bot.state["something"]

# Set state using setitem
app.state["something"] = ...
bot.state["something"] = ...
"""

# TODO: This class does not have thread-safe access control, but should remain safe due to
Expand All @@ -82,22 +82,22 @@ def __setattr__(self, attr, val):
super().__setitem__(attr, val)


class SilverbackApp(ManagerAccessMixin):
class SilverbackBot(ManagerAccessMixin):
"""
The application singleton. Must be initialized prior to use.
The bot singleton. Must be initialized prior to use.

Usage example::

from silverback import SilverbackApp
from silverback import SilverbackBot

app = SilverbackApp()
bot = SilverbackBot()

... # Connection has been initialized, can call broker methods e.g. `app.on_(...)`
... # Connection has been initialized, can call broker methods e.g. `bot.on_(...)`
"""

def __init__(self, settings: Settings | None = None):
"""
Create app
Create bot

Args:
settings (~:class:`silverback.settings.Settings` | None): Settings override.
Expand All @@ -111,7 +111,7 @@ def __init__(self, settings: Settings | None = None):
provider = provider_context.__enter__()

self.identifier = SilverbackID(
name=settings.APP_NAME,
name=settings.BOT_NAME,
network=provider.network.name,
ecosystem=provider.network.ecosystem.name,
)
Expand All @@ -123,7 +123,7 @@ def __init__(self, settings: Settings | None = None):
settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds())

settings_str = "\n ".join(f'{key}="{val}"' for key, val in settings.dict().items() if val)
logger.info(f"Loading Silverback App with settings:\n {settings_str}")
logger.info(f"Loading Silverback Bot with settings:\n {settings_str}")

self.broker = settings.get_broker()
self.tasks: dict[TaskType, list[TaskData]] = {
Expand All @@ -146,7 +146,7 @@ def __init__(self, settings: Settings | None = None):

network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}"
logger.success(
f'Loaded Silverback App:\n NETWORK="{network_choice}"'
f'Loaded Silverback Bot:\n NETWORK="{network_choice}"'
f"{signer_str}{new_block_timeout_str}"
)

Expand Down Expand Up @@ -198,7 +198,7 @@ def __get_user_all_taskdata_handler(self) -> list[TaskData]:
return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l]

async def __load_snapshot_handler(self, startup_state: StateSnapshot):
# NOTE: *DO NOT USE* in Runner, as it will not be updated by the app
# NOTE: *DO NOT USE* in Runner, as it will not be updated by the bot
self.state = SharedState()
# NOTE: attribute does not exist before this task is executed,
# ensuring no one uses it during worker startup
Expand Down Expand Up @@ -295,11 +295,11 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
def on_startup(self) -> Callable:
"""
Code that will be exected by one worker after worker startup, but before the
application is put into the "run" state by the Runner.
bot is put into the "run" state by the Runner.

Usage example::

@app.on_startup()
@bot.on_startup()
def do_something_on_startup(startup_state: StateSnapshot):
... # Reprocess missed events or blocks
"""
Expand All @@ -308,13 +308,13 @@ def do_something_on_startup(startup_state: StateSnapshot):
def on_shutdown(self) -> Callable:
"""
Code that will be exected by one worker before worker shutdown, after the
Runner has decided to put the application into the "shutdown" state.
Runner has decided to put the bot into the "shutdown" state.

Usage example::

@app.on_shutdown()
@bot.on_shutdown()
def do_something_on_shutdown():
... # Record final state of app
... # Record final state of bot
"""
return self.broker_task_decorator(TaskType.SHUTDOWN)

Expand All @@ -330,7 +330,7 @@ def on_worker_startup(self) -> Callable:

Usage example::

@app.on_worker_startup()
@bot.on_worker_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
Expand All @@ -348,7 +348,7 @@ def on_worker_shutdown(self) -> Callable:

Usage example::

@app.on_worker_shutdown()
@bot.on_worker_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
Expand All @@ -367,13 +367,13 @@ def on_(
Args:
container: (BlockContainer | ContractEvent): The event source to watch.
new_block_timeout: (int | None): Override for block timeout that is acceptable.
Defaults to whatever the app's settings are for default polling timeout are.
Defaults to whatever the bot's settings are for default polling timeout are.
start_block (int | None): block number to start processing events from.
Defaults to whatever the latest block is.

Raises:
:class:`~silverback.exceptions.InvalidContainerTypeError`:
If the type of `container` is not configurable for the app.
If the type of `container` is not configurable for the bot.
"""
if isinstance(container, BlockContainer):
if new_block_timeout is not None:
Expand Down
Loading
Loading