Skip to content

Commit

Permalink
add async deployment function as service code (#1)
Browse files Browse the repository at this point in the history
* add async deployment function as service code

* nit: initialize nats connection globally

* Create AsyncFunctionDeployment class, refactor code, add example

* format code using black

* Update README.md

* run isort

* fix circular import

* resolve PR comments

* pr comments resolve

* resolve comments, rename class. Update examples

* fix tests

* resolve PR comments, handle cases, improve docs

* nit: minor fix.

* fix: remove timeout logic from nats, expose timeout to user

* nit

---------

Co-authored-by: debajyoti-truefoundry <[email protected]>
  • Loading branch information
nikp1172 and debajyoti-truefoundry authored Aug 28, 2023
1 parent 099d44f commit 9033638
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
venv/
venv3/
.venv/
*.pyc
.pytest_cache/
Expand Down
2 changes: 2 additions & 0 deletions async_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from async_service.app import ProcessorApp
from async_service.function_service import FunctionAsyncExecutor
from async_service.processor import Processor
from async_service.types import (
AWSAccessKeyAuth,
Expand Down Expand Up @@ -31,5 +32,6 @@
"OutputConfig",
"Input",
"Output",
"FunctionAsyncExecutor",
"AWSAccessKeyAuth",
]
95 changes: 95 additions & 0 deletions async_service/function_service/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Deploying Functions as Async Service

## Install
```console
pip install "async_service[nats] @ git+https://github.com/truefoundry/async_service.git@main"
```

## Quick start

### Define the Sample Functions
### `sample_functions.py`
```python
from typing import Annotated
from pydantic import Field


def func1(first_name: str, last_name: str) -> str:
return first_name + last_name


def func2(
a: Annotated[int, Field(description="Enter first Number")] = 5,
b: Annotated[int, Field(description="Enter Second Number")] = 10,
) -> int:
return a + b
```

Note: Currently only the following types are supported as arguments - `int`, `str`, `float`, `list`, `dict`, `None`. In the future we plan to decouple the serialization/deserialization from queue publish/subscribe to support arbitrary types

### Create the FastAPI applications
### `main.py`
```python
from async_service import (
WorkerConfig,
FunctionAsyncExecutor,
NATSInputConfig,
NATSOutputConfig,
)
from sample_functions import func1, func2

# Define the function names and corresponding functions
functions = {"func_1": func1, "func_2": func2}

# Configure the deployment
async_func_deployment = FunctionAsyncExecutor(
functions=functions,
worker_config=WorkerConfig(
input_config=NATSInputConfig(
nats_url="<paste nats url here>",
root_subject="<paste root subject for work queue>",
consumer_name="<name of consumer>",
visibility_timeout=2,
),
output_config = NATSOutputConfig(
nats_url="<paste nats url here>",
root_subject="paste subject root for result queue",
)
),
)

# Build and configure the applications
server_app = async_func_deployment.build_async_server_app()
worker_app = async_func_deployment.build_worker_app()

# These two apps can now be run on different ports or different machines.
```

### Run the applications locally ot deploy them
```
uvicorn --host 0.0.0.0 --port 8000 main:server_app
uvicorn --host 0.0.0.0 --port 8001 main:wroker_app
```

### Send Request to the server_app

* Finally you can send request to your server_app at the deployed endpoint. (OpenAPI Docs are Auto-Generated)
* You can fetch the results from server_app at `/result/{request_id}` if the Output Config Queue supports fetching results per request_id. It is NOT supported for SQS but supported for NATS

To trigger the func_1 you can use the curl request
```
curl -X 'POST' \
'http://0.0.0.0:8000/func_1' \
-H 'Content-Type: application/json' \
-d '{
"first_name": "string",
"last_name": "string"
}'
```

This request returns in a format: `{"request_id" : "<some req id>"}`.
You can now send a request to check the result for this request_id [Only applicable if Output is NATS]

```
curl 'http://0.0.0.0:8000/result/<paste your request id here>'
```
5 changes: 5 additions & 0 deletions async_service/function_service/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from async_service.function_service.async_function_deployment import (
FunctionAsyncExecutor,
)

__all__ = ["FunctionAsyncExecutor"]
168 changes: 168 additions & 0 deletions async_service/function_service/async_function_deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import json
import os
from typing import Any, Callable, Dict

from fastapi import FastAPI, HTTPException

from async_service.function_service.utils import (
INTERNAL_FUNCTION_NAME,
AsyncOutputResponse,
async_wrapper_func,
get_functions_dict_with_input_signatures,
validate_function_name,
)
from async_service.processor import Processor
from async_service.types import (
InputMessage,
OutputMessage,
OutputMessageFetchTimeoutError,
WorkerConfig,
)

FUNCTION_SCHEMA_ENDPOINT = "/function-schemas"
RESULT_ENDPOINT = "/result/{request_id}"


class FunctionAsyncExecutor:
"""
A class for deploying and managing asynchronous functions with input and output configurations.
Args:
worker_config (WorkerConfig): Configuration for the worker behavior, including input and output config.
functions (Dict[str, Callable]): A dictionary of function names and corresponding callable functions.
init_function (Callable, optional): An initialization function called once before processing starts.
Methods:
build_async_server_app: Build and return an asynchronous server application for processing input.
build_worker_app: Build and return an asynchronous worker application for executing functions.
Usage Example:
from async_service import (
WorkerConfig,
FunctionAsyncExecutor,
SQSInputConfig,
SQSOutputConfig,
)
from your_package import func1, func2
# Define the function names and corresponding functions
functions = {"func_1": func1, "func_2": func2}
# Configure the deployment
async_func_deployment = FunctionAsyncExecutor(
functions=functions,
worker_config=WorkerConfig(
input_config=SQSInputConfig(
queue_url="<Paste SQS URL Here>",
visibility_timeout=10
),
output_config=SQSOutputConfig(
queue_url="<Paste SQS URL Here>",
)
)
)
# Build and configure the applications
server_app = async_func_deployment.build_async_server_app()
worker_app = async_func_deployment.build_worker_app()
# These two apps can now be run on different ports or different machines.
"""

def __init__(
self,
worker_config: WorkerConfig,
functions: Dict[str, Callable],
init_function: Callable = None,
) -> None:
self.functions = {}
if (
INTERNAL_FUNCTION_NAME in functions
or FUNCTION_SCHEMA_ENDPOINT.lstrip("/") in functions
or RESULT_ENDPOINT.split("/")[1] in functions
):
raise ValueError(
f"Function names {INTERNAL_FUNCTION_NAME}, {FUNCTION_SCHEMA_ENDPOINT.lstrip('/')} and RESULT_ENDPOINT.split('/')[1] are reserved for internal use."
)
for name in functions:
if not validate_function_name(name):
raise ValueError(
f"Function name {name} is not valid. Function names length must be less than 30 and not contain . or /"
)
self.functions[name] = functions[name]
self.worker_config = worker_config
self.init_function = init_function

def build_worker_app(self) -> FastAPI:
app = FastAPI(root_path=os.getenv("TFY_SERVICE_ROOT_PATH"), docs_url="/")

functions = self.functions
init_function = self.init_function

class FunctionProcessor(Processor):
def init(self):
if init_function:
init_function()

def process(self, input_message: InputMessage) -> int:
body = input_message.body
func_name = body.pop(INTERNAL_FUNCTION_NAME, None)
if func_name is None:
raise ValueError(
f"Input message does not contain {INTERNAL_FUNCTION_NAME} key."
)

func = functions[func_name]
return func(**body)

app = FunctionProcessor().build_app(worker_config=self.worker_config)
return app

def build_async_server_app(self) -> FastAPI:
app = FastAPI(root_path=os.getenv("TFY_SERVICE_ROOT_PATH"), docs_url="/")

app.add_api_route(
FUNCTION_SCHEMA_ENDPOINT,
lambda: get_functions_dict_with_input_signatures(self.functions),
methods=["GET"],
response_model=Dict[str, Dict[str, Any]],
)

input_publisher = self.worker_config.input_config.to_input()
output_subscriber = self.worker_config.output_config.to_output()

async def get_output(request_id: str, timeout: float = 2):
if timeout > 10:
raise HTTPException(
status_code=400, detail="Timeout must be less than 2 seconds"
)
try:
data = await output_subscriber.get_output_message(request_id, timeout)
return OutputMessage(**json.loads(data.decode("utf-8")))
except OutputMessageFetchTimeoutError as ex:
raise HTTPException(status_code=404, detail=str(ex))
except NotImplementedError as ex:
raise HTTPException(status_code=501, detail=str(ex))

app.add_api_route(
RESULT_ENDPOINT,
get_output,
methods=["GET"],
response_model=OutputMessage,
)

# check if all names are unique
func_names_list = [name.lower() for name in list(self.functions.keys())]
if len(func_names_list) != len(set(func_names_list)):
raise ValueError(
"Keys of functions dictionary (converted to lower case) must be unique."
)

for name, func in self.functions.items():
app.add_api_route(
f"/{name.lower()}",
async_wrapper_func(func, name, input_publisher),
methods=["POST"],
response_model=AsyncOutputResponse,
)
return app
99 changes: 99 additions & 0 deletions async_service/function_service/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import inspect
import json
import re
import uuid
from typing import Any, Callable, Dict

import pydantic
from pydantic import BaseModel

from async_service.types import Input, InputMessage

INTERNAL_FUNCTION_NAME = "internal_func_name"


class AsyncOutputResponse(BaseModel):
request_id: str

class Config:
allow_extra = True


def create_pydantic_model_from_function_signature(func, model_name: str):
# https://github.com/pydantic/pydantic/issues/1391
(
args,
_,
varkw,
defaults,
kwonlyargs,
kwonlydefaults,
annotations,
) = inspect.getfullargspec(func)
defaults = defaults or []
args = args or []
if len(args) > 0 and args[0] == "self":
del args[0]

non_default_args = len(args) - len(defaults)
defaults = [
...,
] * non_default_args + list(defaults)

keyword_only_params = {
param: kwonlydefaults.get(param, Any) for param in kwonlyargs
}
params = {}
for param, default in zip(args, defaults):
params[param] = (annotations.get(param, Any), default)

class Config:
extra = "allow"

# Allow extra params if there is a **kwargs parameter in the function signature
config = Config if varkw else None

return pydantic.create_model(
model_name,
**params,
**keyword_only_params,
__base__=pydantic.BaseModel,
__config__=config,
)


def get_functions_dict_with_input_signatures(functions_dict: Dict[str, Callable]):
return {
name: create_pydantic_model_from_function_signature(func, name).schema()
for name, func in functions_dict.items()
}


async def send_request_to_queue(
request_id: str, input: BaseModel, input_publisher: Input
):
my_dict = input.dict()
my_dict[INTERNAL_FUNCTION_NAME] = input.__class__.__name__
input_message = InputMessage(request_id=request_id, body=my_dict)

await input_publisher.publish_input_message(
request_id=request_id,
serialized_output_message=json.dumps(input_message.dict()).encode("utf-8"),
)


def async_wrapper_func(func, name: str, output_publisher: Input):
async def wrapper(input: create_pydantic_model_from_function_signature(func, name)):
request_id = str(uuid.uuid4())
await send_request_to_queue(request_id, input, output_publisher)
return AsyncOutputResponse(request_id=request_id)

return wrapper


def validate_function_name(input_string):
pattern = r"^[^./\s]{1,30}$"
if re.match(pattern, input_string):
return True
else:
return False
Loading

0 comments on commit 9033638

Please sign in to comment.