Component Interactions, Remote CLI improvements. #336

Merged
merged 8 commits on Nov 8, 2024
96 changes: 17 additions & 79 deletions QUICK_START.md
@@ -17,6 +17,10 @@ Make sure you first have the necessary dependencies according to how you intend

## Client side CLI

Some client-side commands can be executed in the `AWS Remote CLI`, provided it is deployed.

**Note:** Remote CLI features, including all remote CLI commands and timer functionalities, are experimental. Please use them with caution and at your own risk.

### Setup a new workflow

To set up a new workflow, in your command line, navigate to the directory where you want to create the new workflow and run:
@@ -106,6 +110,8 @@ poetry run caribou remove <workflow_id>

Where `<workflow_id>` is the id of the workflow you want to remove.

**Note:** This command may be executed remotely and asynchronously with the `-r` or `--remote` flag.
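
For instance, the following illustrative invocation queues the removal on the deployed `AWS Remote CLI` rather than running it locally (the workflow id is a placeholder):

```bash
# Runs asynchronously in the remote framework Lambda; the command returns immediately.
poetry run caribou remove <workflow_id> --remote
```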

## Local Framework Component Execution

The Caribou framework components can be run locally for fast prototyping and development.
@@ -119,7 +125,9 @@ poetry run caribou log_sync
```

This might take a while, depending on the number of workflows and the amount of logs that need to be synced.
Also, there is an inherent buffer of five minutes, meaning that logs are only synced if they are at least five minutes old.
Also, there is an inherent buffer of fifteen minutes, meaning that logs are only synced if they are at least fifteen minutes old.

**Note:** This command may be executed remotely and asynchronously with the `-r` or `--remote` flag.

### Data Collecting

@@ -134,7 +142,7 @@ Before we can generate a new deployment, we need to collect data from the provid
2. Collect data from the other collectors:

```bash
poetry run caribou data_collect all --workflow_id <workflow_id>
poetry run caribou data_collect all
```

Or collect data for a specific collector:
@@ -151,11 +159,13 @@ Before we can generate a new deployment, we need to collect data from the provid
- `workflow`
- `all`

The `all` and `workflow` collectors need a workflow id to be passed as an argument with `--workflow_id` or `-w`.
The `workflow` collector needs a workflow id to be passed as an argument with `--workflow_id` or `-w`.

The workflow collector is invoked by the manager and collects data for the workflows that are currently being solved.
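
For example (illustrative invocations; the workflow id is a placeholder):

```bash
# Collect data for one specific workflow.
poetry run caribou data_collect workflow --workflow_id <workflow_id>

# Run the full collection remotely and asynchronously.
poetry run caribou data_collect all --remote
```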

**Note:** These commands may be executed remotely and asynchronously with the `-r` or `--remote` flag.
**Note:** The `all` collector does not collect `workflow` information, since `manage_deployments` performs that collection automatically; the `all` collector runs the `provider`, `carbon`, and `performance` collectors, in that order.
**Note:** For the data collectors to work locally, you must set the following environment variables:

```bash
export ELECTRICITY_MAPS_AUTH_TOKEN=<your_token>
@@ -177,6 +187,8 @@ poetry run caribou manage_deployments
Refer to section 5.2 of the paper to learn about how we make this calculation.
If you execute this command and nothing happens, the minimal threshold for the number of invocations, which is set to 10, may not have been reached yet.

**Note:** This command may be executed remotely and asynchronously with the `-r` or `--remote` flag.

### Run Deployment Migrator

Make sure that you have the crane dependency installed.
@@ -190,6 +202,8 @@ poetry run caribou run_deployment_migrator

This will check if a new deployment is required for any workflow, and, if so, migrate the functions according to this new deployment.

**Note:** This command may be executed remotely and asynchronously with the `-r` or `--remote` flag.

## Deployment to AWS (AWS Remote CLI)
To deploy the framework to AWS after completing the local setup process, use the following command while inside the main `caribou` directory.
Ensure that you can see both the `caribou` and `caribou-go` folders in this directory.
@@ -218,82 +232,6 @@ export ELECTRICITY_MAPS_AUTH_TOKEN=<your_token>
export GOOGLE_API_KEY=<your_key>
```

### How to Invoke the AWS Remote CLI
After deploying the `AWS Remote CLI`, you can run Caribou components by invoking the deployed lambda function using the returned Lambda ARN with the following event parameters.

- List workflows:
```json
{
"action": "list"
}
```

- Invoke workflow:

Where `argument` is the payload of the application.

```json
{
"action": "run",
"workflow_id": "workflow_name-version_number",
"argument": {}
}
```

- Remove Workflow:
```json
{
"action": "remove",
"workflow_id": "workflow_name-version_number"
}
```

- Perform Log Sync:
```json
{
"action": "log_sync"
}
```

- Perform Data collect:

`collector` can be one of the following options: `provider`, `carbon`, `performance`, `workflow`, or `all`.

`workflow_id` is only required for the `workflow` or `all` collector options.

```json
{
"action": "data_collect",
"collector": "all",
"workflow_id": "workflow_name-version_number"
}
```

- Manage Deployments:

`deployment_metrics_calculator_type` can be either `simple` (for the Python solver) or `go` (to use the Go solver) for deployment metrics determination.

```json
{
"action": "manage_deployments",
"deployment_metrics_calculator_type": "simple"
}
```

- Deployment Migration:
```json
{
"action": "run_deployment_migrator"
}
```

- Inquire Caribou Version:
```json
{
"action": "version"
}
```

## Setup Automatic Components (For AWS Remote CLI)
After deploying the AWS Remote CLI via `deploy_remote_cli`, the user can set up automatic timers for all relevant Caribou components.
This includes automating data collection (provider, performance, carbon, etc.), log synchronization, deployment management (solving for new deployments when needed), and deployment migration.
7 changes: 5 additions & 2 deletions caribou/common/constants.py
@@ -137,10 +137,11 @@
FORGETTING_TIME_DAYS = 30 # 30 days
FORGETTING_NUMBER = 5000 # 5000 invocations
KEEP_ALIVE_DATA_COUNT = 10 # Keep sample if it is part of any of the 10 samples for any execution or transmission
MIN_TIME_BETWEEN_SYNC = 15 # In minutes

## Grace period for the log-syncer
## Used as lambda insights can be delayed
BUFFER_LAMBDA_INSIGHTS_GRACE_PERIOD = 5
BUFFER_LAMBDA_INSIGHTS_GRACE_PERIOD = 15 # In minutes

## Successor task types
REDIRECT_ONLY_TASK_TYPE = "REDIRECT_ONLY"
@@ -161,4 +162,6 @@
GO_PATH = pathlib.Path(__file__).parents[2].resolve() / "caribou-go"

# AWS Lambda Timeout
AWS_TIMEOUT_SECONDS = 850 # Lambda functions must terminate in 900 seconds, we leave 50 seconds as buffer time
AWS_TIMEOUT_SECONDS = (
800 # Lambda functions must terminate within 900 seconds; we leave some time as a buffer for other operations
)
6 changes: 6 additions & 0 deletions caribou/common/models/endpoints.py
@@ -1,6 +1,7 @@
import os

from caribou.common.constants import GLOBAL_SYSTEM_REGION, INTEGRATION_TEST_SYSTEM_REGION
from caribou.common.models.remote_client.aws_remote_client import AWSRemoteClient
from caribou.common.models.remote_client.remote_client import RemoteClient
from caribou.common.models.remote_client.remote_client_factory import RemoteClientFactory
from caribou.common.provider import Provider
@@ -40,6 +41,8 @@ def __init__(self) -> None:
self._data_store_region = global_system_region
self._data_store_client = RemoteClientFactory.get_remote_client(provider, self._data_store_region)

self._framework_cli_remote_client = RemoteClientFactory.get_framework_cli_remote_client(global_system_region)

def get_deployment_resources_client(self) -> RemoteClient:
return self._deployment_resources_client

@@ -54,3 +57,6 @@ def get_data_collector_client(self) -> RemoteClient:

def get_datastore_client(self) -> RemoteClient:
return self._data_store_client

def get_framework_cli_remote_client(self) -> AWSRemoteClient:
return self._framework_cli_remote_client
52 changes: 46 additions & 6 deletions caribou/common/models/remote_client/aws_remote_client.py
@@ -15,10 +15,12 @@
CARIBOU_WORKFLOW_IMAGES_TABLE,
DEPLOYMENT_RESOURCES_BUCKET,
GLOBAL_SYSTEM_REGION,
REMOTE_CARIBOU_CLI_FUNCTION_NAME,
SYNC_MESSAGES_TABLE,
SYNC_PREDECESSOR_COUNTER_TABLE,
)
from caribou.common.models.remote_client.remote_client import RemoteClient
from caribou.common.utils import compress_json_str, decompress_json_str
from caribou.deployment.common.deploy.models.resource import Resource

logger = logging.getLogger(__name__)
@@ -612,18 +614,28 @@ def send_message_to_messaging_service(self, identifier: str, message: str) -> No
client = self._client("sns")
client.publish(TopicArn=identifier, Message=message)

def set_value_in_table(self, table_name: str, key: str, value: str) -> None:
def set_value_in_table(self, table_name: str, key: str, value: str, convert_to_bytes: bool = False) -> None:
client = self._client("dynamodb")
client.put_item(TableName=table_name, Item={"key": {"S": key}, "value": {"S": value}})

def update_value_in_table(self, table_name: str, key: str, value: str) -> None:
if convert_to_bytes:
client.put_item(TableName=table_name, Item={"key": {"S": key}, "value": {"B": compress_json_str(value)}})
else:
client.put_item(TableName=table_name, Item={"key": {"S": key}, "value": {"S": value}})

def update_value_in_table(self, table_name: str, key: str, value: str, convert_to_bytes: bool = False) -> None:
client = self._client("dynamodb")
expression_attribute_values: dict[str, Any]
if convert_to_bytes:
expression_attribute_values = {":value": {"B": compress_json_str(value)}}
else:
expression_attribute_values = {":value": {"S": value}}

client.update_item(
TableName=table_name,
Key={"key": {"S": key}},
UpdateExpression="SET #v = :value",
ExpressionAttributeNames={"#v": "value"},
ExpressionAttributeValues={":value": {"S": value}},
ExpressionAttributeValues=expression_attribute_values,
)

def set_value_in_table_column(
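
The `convert_to_bytes` / `convert_from_bytes` paths above depend on `compress_json_str` and `decompress_json_str` from `caribou.common.utils`. As a rough sketch of what such helpers could look like — this assumes a zlib-based implementation and is not necessarily what the repository ships:

```python
import zlib


def compress_json_str(json_str: str) -> bytes:
    # Assumed helper: compress a UTF-8 encoded JSON string so large values
    # can be stored as a DynamoDB binary ("B") attribute.
    return zlib.compress(json_str.encode("utf-8"))


def decompress_json_str(compressed: bytes) -> str:
    # Assumed helper: invert compress_json_str back to the original string.
    return zlib.decompress(compressed).decode("utf-8")
```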
@@ -646,7 +658,9 @@ def set_value_in_table_column(
UpdateExpression=update_expression,
)

def get_value_from_table(self, table_name: str, key: str, consistent_read: bool = True) -> tuple[str, float]:
def get_value_from_table(
self, table_name: str, key: str, consistent_read: bool = True, convert_from_bytes: bool = False
) -> tuple[str, float]:
client = self._client("dynamodb")
response = client.get_item(
TableName=table_name,
Expand All @@ -663,6 +677,9 @@ def get_value_from_table(self, table_name: str, key: str, consistent_read: bool

item = response.get("Item")
if item is not None and "value" in item:
if convert_from_bytes:
return decompress_json_str(item["value"]["B"]), consumed_read_capacity

return item["value"]["S"], consumed_read_capacity

return "", consumed_read_capacity
@@ -671,14 +688,18 @@ def remove_value_from_table(self, table_name: str, key: str) -> None:
client = self._client("dynamodb")
client.delete_item(TableName=table_name, Key={"key": {"S": key}})

def get_all_values_from_table(self, table_name: str) -> dict[str, Any]:
def get_all_values_from_table(self, table_name: str, convert_from_bytes: bool = False) -> dict[str, Any]:
client = self._client("dynamodb")
response = client.scan(TableName=table_name)
if "Items" not in response:
return {}
items = response.get("Items")
if items is not None:
if convert_from_bytes:
return {item["key"]["S"]: decompress_json_str(item["value"]["B"]) for item in items}

return {item["key"]["S"]: item["value"]["S"] for item in items}

return {}

def get_key_present_in_table(self, table_name: str, key: str, consistent_read: bool = True) -> bool:
@@ -1117,3 +1138,22 @@ def create_timer_rule(
Rule=rule_name,
Targets=[{"Id": f"{lambda_function_name}-target", "Arn": lambda_arn, "Input": event_payload}],
)

def invoke_remote_framework_internal_action(self, action_type: str, action_events: dict[str, Any]) -> None:
payload = {
"action": "internal_action",
"type": action_type,
"event": action_events,
}

self.invoke_remote_framework_with_payload(payload, invocation_type="Event")

def invoke_remote_framework_with_payload(self, payload: dict[str, Any], invocation_type: str = "Event") -> None:
# Get the boto3 lambda client
lambda_client = self._client("lambda")
remote_framework_cli_name = REMOTE_CARIBOU_CLI_FUNCTION_NAME

# Invoke the lambda function with the payload
lambda_client.invoke(
FunctionName=remote_framework_cli_name, InvocationType=invocation_type, Payload=json.dumps(payload)
)
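
As an illustration of how the new invocation methods might be used — assuming the endpoints module exposes an `Endpoints` class (its name is not shown in this diff), and with a made-up action type and event payload:

```python
from caribou.common.models.endpoints import Endpoints

# Assumed entry point: the Endpoints class name is inferred, not shown in the diff.
client = Endpoints().get_framework_cli_remote_client()

# Fire-and-forget an internal action: the payload is wrapped as
# {"action": "internal_action", "type": ..., "event": ...} and sent to the
# remote framework CLI Lambda with InvocationType="Event" (asynchronous).
client.invoke_remote_framework_internal_action(
    action_type="sync_workflow_logs",  # hypothetical action type
    action_events={"workflow_id": "example_workflow-0.0.1"},  # hypothetical event
)
```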
@@ -208,14 +208,16 @@ def upload_predecessor_data_at_sync_node(

return 0.0

def set_value_in_table(self, table_name: str, key: str, value: str) -> None:
def set_value_in_table(self, table_name: str, key: str, value: str, convert_to_bytes: bool = False) -> None:
conn = self._db_connection()
cursor = conn.cursor()
cursor.execute(f"INSERT INTO {table_name} (key, value) VALUES (?, ?)", (key, value))
conn.commit()
conn.close()

def get_value_from_table(self, table_name: str, key: str, consistent_read: bool = True) -> tuple[str, float]:
def get_value_from_table(
self, table_name: str, key: str, consistent_read: bool = True, convert_from_bytes: bool = False
) -> tuple[str, float]:
conn = self._db_connection()
cursor = conn.cursor()
cursor.execute(f"SELECT value FROM {table_name} WHERE key=?", (key,))
@@ -265,7 +267,7 @@ def set_predecessor_reached(
conn.close()
return [bool(res) for res in result], 0.0, 0.0

def get_all_values_from_table(self, table_name: str) -> dict:
def get_all_values_from_table(self, table_name: str, convert_from_bytes: bool = False) -> dict:
conn = self._db_connection()
cursor = conn.cursor()
cursor.execute(f"SELECT key, value FROM {table_name}")
@@ -457,7 +459,7 @@ def remove_resource(self, key: str) -> None:
conn.commit()
conn.close()

def update_value_in_table(self, table_name: str, key: str, value: str) -> None:
def update_value_in_table(self, table_name: str, key: str, value: str, convert_to_bytes: bool = False) -> None:
conn = self._db_connection()
cursor = conn.cursor()
cursor.execute(f"UPDATE {table_name} SET value=? WHERE key=?", (value, key))
8 changes: 4 additions & 4 deletions caribou/common/models/remote_client/mock_remote_client.py
@@ -40,10 +40,10 @@ def get_predecessor_data(self, current_instance_name, workflow_instance_id, cons
def upload_predecessor_data_at_sync_node(self, function_name, workflow_instance_id, message):
pass

def set_value_in_table(self, table_name, key, value):
def set_value_in_table(self, table_name, key, value, convert_to_bytes: bool = False):
pass

def get_value_from_table(self, table_name, key, consistent_read: bool = True):
def get_value_from_table(self, table_name, key, consistent_read: bool = True, convert_from_bytes: bool = False):
pass

def upload_resource(self, key, resource):
@@ -74,7 +74,7 @@ def set_predecessor_reached(
) -> list[bool]:
pass

def get_all_values_from_table(self, table_name: str) -> dict:
def get_all_values_from_table(self, table_name: str, convert_from_bytes: bool = False) -> dict:
pass

def set_value_in_table_column(
@@ -112,7 +112,7 @@ def get_topic_identifier(self, topic_name: str) -> str:
def remove_resource(self, key: str) -> None:
pass

def update_value_in_table(self, table_name: str, key: str, value: str) -> None:
def update_value_in_table(self, table_name: str, key: str, value: str, convert_to_bytes: bool = False) -> None:
pass

def get_logs_between(self, function_instance: str, start: datetime, end: datetime) -> list[str]: