Feat/parser in ecs #3

Open. Wants to merge 45 commits into base: develop.

Commits (45)
adad0a6  lambda functions updated with sentry  (ranjan-stha, Apr 27, 2022)
423a13a  terraform configs updated with sentry_url passed as env variable; upd…  (ranjan-stha, Apr 27, 2022)
8235f16  values updated for dev, staging, prod environments in their variable …  (ranjan-stha, Apr 27, 2022)
150791b  added the missing package dependencies in terraform config;  (ranjan-stha, Apr 27, 2022)
1e6b69e  updated the parser tool url;  (ranjan-stha, Apr 28, 2022)
f168556  enabled stacktrace in sentry; handles image links(and discards them, …  (ranjan-stha, Apr 28, 2022)
74e353a  enabled err stack;  (ranjan-stha, Apr 28, 2022)
9e09b2a  updated the hashicorp aws version;  (ranjan-stha, Apr 28, 2022)
5eb3862  terraform configs updated with sentry_url passed as env variable; upd…  (ranjan-stha, Apr 27, 2022)
4f98fa5  enabled err stack;  (ranjan-stha, Apr 28, 2022)
54e86e7  updated the hashicorp aws version;  (ranjan-stha, Apr 28, 2022)
4274ac8  if content type is not determined with head requests, downloads the f…  (ranjan-stha, May 5, 2022)
2ddbcdd  user agent added while sending get or head request;  (ranjan-stha, May 6, 2022)
4623d60  doc conversion is done by uploading in s3 and triggering lambda with …  (ranjan-stha, Jul 27, 2022)
db7b492  deploy deepex parser in the ecs;  (ranjan-stha, Aug 1, 2022)
d73c26b  deepex parser dockerfile;  (ranjan-stha, Aug 4, 2022)
2902c1f  reduce the number of privisioned lambda;  (ranjan-stha, Aug 4, 2022)
8399454  try..except block used;  (ranjan-stha, Aug 5, 2022)
1914f13  param values added for prod env  (ranjan-stha, Aug 5, 2022)
4acc48a  config values for dev env;  (ranjan-stha, Sep 1, 2022)
708b73a  code refactored in parser for text extraction;  (ranjan-stha, Sep 1, 2022)
dc0fd27  env added;  (ranjan-stha, Sep 1, 2022)
72da790  updated lambda concurrency for staging env;  (ranjan-stha, Sep 5, 2022)
4b823a1  sentry url fetched from secrets manager;  (ranjan-stha, Sep 5, 2022)
67e3324  sentry secret name assign;  (ranjan-stha, Sep 5, 2022)
4dd5d45  updates to use cpu model for prediction;  (ranjan-stha, Oct 10, 2022)
f89506a  fixes on env vars usages; refactored code;  (ranjan-stha, Oct 11, 2022)
a3b9a04  removed unused dependency;  (ranjan-stha, Oct 11, 2022)
5024243  updated env vars  (ranjan-stha, Oct 11, 2022)
b6daeb3  moved env vars to a separate file;  (ranjan-stha, Oct 11, 2022)
da2e6da  prod vars updated;  (ranjan-stha, Oct 13, 2022)
b27963f  parsing lib updated; method updated;  (ranjan-stha, Dec 13, 2022)
46532b2  tf vars updated on ecr url;  (ranjan-stha, Dec 13, 2022)
80fe746  utf-8 added to content type while saving file in s3;  (ranjan-stha, Dec 13, 2022)
1b1f94b  added vpc endpoint for transferring data to s3;  (ranjan-stha, Dec 23, 2022)
86f2f8e  added variables for setting the lambda concurrency  (ranjan-stha, Dec 23, 2022)
828bdb1  updated poetry lock file; added sentry pkg;  (ranjan-stha, Jan 4, 2023)
b56e61b  updated the methods for parser library;  (ranjan-stha, Jan 4, 2023)
35af4fa  ecs vcpu and memory reduced;  (ranjan-stha, Jan 5, 2023)
f07893a  ecs vcpu and memory updated in prod;  (ranjan-stha, Jan 5, 2023)
7e8ae4e  refactor and removed few non-required functions;  (ranjan-stha, Jan 20, 2023)
6852eca  dockerfile updated;  (ranjan-stha, Jan 20, 2023)
f612614  updated the poetry requirements;  (ranjan-stha, Jan 20, 2023)
503dc59  updated the deep parser import;  (ranjan-stha, Jan 20, 2023)
f13dbf4  updated the mock data related to preds, thresholds and selected tags;  (ranjan-stha, Nov 1, 2023)
1 change: 1 addition & 0 deletions .github/workflows/terraform-plan.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- initial_setup
- feat/sentry-integration

jobs:
terraform:
22 changes: 21 additions & 1 deletion dev.tfvars
@@ -5,6 +5,7 @@ environment = "dev"
# api gateway
api_gateway_name = "rapi"
vpce_id = "vpce-02c7bb08b571074e1"
vpc_id = "vpc-0e65245d5e4c2deaf"

# models
model_endpoint_name = "test-all-models-rsh"
@@ -16,4 +17,23 @@ model_info_fn_name = "model_info"
docs_extract_fn_image_name = "extractor-tool"

# docs convert lambda
docs_convert_lambda_fn_name = "libreoffice-dev-libreoffice"
docs_convert_lambda_fn_name = "libreoffice-dev-libreoffice"

# docs convert bucket name
docs_convert_bucket_name = "deep-large-docs-conversion"

# sentry url
sentry_url = "https://[email protected]/1223576"

# VPC
az_count = 1
cidr_block = "172.16.0.0/16"

# ECS role
ecs_task_execution_role = "ECSTaskExecutionRole"

# ECS
fargate_cpu = "2048"
fargate_memory = "4096"
app_count = 1
app_image = "961104659532.dkr.ecr.us-east-1.amazonaws.com/deepex-parser"
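The new dev.tfvars keys imply matching declarations on the Terraform side. A hedged sketch of what those variables.tf entries might look like (names taken from the tfvars above; the types and comments are assumptions, not taken from this PR):

```hcl
# Hypothetical variables.tf counterparts for the new VPC/ECS keys in dev.tfvars
variable "vpc_id"     { type = string }
variable "az_count"   { type = number }
variable "cidr_block" { type = string }

variable "ecs_task_execution_role" { type = string }

variable "fargate_cpu"    { type = string } # "2048" = 2 vCPU in Fargate CPU units
variable "fargate_memory" { type = string } # "4096" = 4 GB
variable "app_count"      { type = number } # desired ECS task count
variable "app_image"      { type = string } # ECR image for the deepex parser
```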
64 changes: 28 additions & 36 deletions lambda_fns/entry_predict/app.py
@@ -1,31 +1,36 @@
import os
import boto3
from botocore.exceptions import ClientError
import json
from enum import Enum
import sentry_sdk
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from postprocess_raw_preds import get_predictions_all, get_clean_thresholds, get_clean_ratios
from enum import Enum
from botocore.exceptions import ClientError
from concurrent.futures import ThreadPoolExecutor, as_completed
from postprocess_cpu_model_outputs import output_all_preds

from config import (
DEFAULT_AWS_REGION,
AWS_REGION,
PREDICTION_QUEUE_NAME,
MODEL_ENDPOINT_NAME,
GEOLOCATION_FN_NAME,
RELIABILITY_FN_NAME,
MODEL_INFO_FN_NAME,
SENTRY_URL,
ENVIRONMENT
)

logging.getLogger().setLevel(logging.INFO)

DEFAULT_AWS_REGION = "us-east-1"

AWS_REGION = os.environ.get("AWS_REGION", DEFAULT_AWS_REGION)
PREDICTION_QUEUE_NAME = os.environ.get("PREDICTION_QUEUE")
MODEL_ENDPOINT_NAME = os.environ.get("MODEL_ENDPOINT_NAME")
GEOLOCATION_FN_NAME = os.environ.get("GEOLOCATION_FN_NAME")
RELIABILITY_FN_NAME = os.environ.get("RELIABILITY_FN_NAME")
MODEL_INFO_FN_NAME = os.environ.get("MODEL_INFO_FN_NAME")

sqs_client = boto3.client('sqs', region_name=AWS_REGION)

sagemaker_rt = boto3.client("runtime.sagemaker", region_name="us-east-1") # todo: update the region later.
geolocation_client = boto3.client("lambda", region_name="us-east-1")
reliability_client = boto3.client("lambda", region_name="us-east-1")
sagemaker_rt = boto3.client("runtime.sagemaker", region_name=DEFAULT_AWS_REGION)
geolocation_client = boto3.client("lambda", region_name=DEFAULT_AWS_REGION)
reliability_client = boto3.client("lambda", region_name=DEFAULT_AWS_REGION)
model_info_client = boto3.client("lambda", region_name=AWS_REGION)

sentry_sdk.init(SENTRY_URL, environment=ENVIRONMENT, attach_stacktrace=True, traces_sample_rate=1.0)


class PredictionStatus(Enum):
FAILED = 0
@@ -144,27 +149,14 @@ def get_model_info():


def get_predictions(entry):
data = {
"columns": ["excerpt", "return_type"],
"index": [0],
"data": [[entry, "all_models"]]
}
try:
response = sagemaker_rt.invoke_endpoint(
EndpointName=MODEL_ENDPOINT_NAME,
ContentType="application/json; format=pandas-split",
Body=json.dumps(data)
)
pred_response = json.loads(response["Body"].read().decode("ascii"))
pred_tags = get_clean_ratios(pred_response['raw_predictions'])
pred_thresholds = get_clean_thresholds(pred_response['thresholds'])
tags_selected = get_predictions_all(pred_response['raw_predictions'])
pred_tags, pred_thresholds, tags_selected, error_status = output_all_preds(
entry,
MODEL_ENDPOINT_NAME
)
if not error_status:
prediction_status = PredictionStatus.SUCCESS.value
except ClientError as error:
logging.error(f"Error occurred while getting predictions: {error}")
pred_response = None
else:
prediction_status = PredictionStatus.FAILED.value

return pred_tags, pred_thresholds, tags_selected, str(prediction_status)


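After this refactor, get_predictions no longer builds the SageMaker payload itself: it delegates to output_all_preds and only maps the returned error flag onto PredictionStatus. A minimal, self-contained sketch of that control flow, using a hypothetical stub in place of the SageMaker-backed call:

```python
from enum import Enum


class PredictionStatus(Enum):
    FAILED = 0
    SUCCESS = 1


def fake_output_all_preds(entry, endpoint_name):
    # Hypothetical stand-in for the real SageMaker-backed output_all_preds;
    # returns (pred_tags, pred_thresholds, tags_selected, error_status).
    if not entry:
        return [], {}, {}, True  # error_status=True signals failure
    return [{"primary_tags": {}}], {"sectors": {}}, {"sectors": [[]]}, False


def get_predictions(entry, predict=fake_output_all_preds):
    # Delegate the endpoint call, then translate the error flag into a status code.
    pred_tags, pred_thresholds, tags_selected, error_status = predict(
        entry, "model-endpoint"
    )
    status = PredictionStatus.FAILED if error_status else PredictionStatus.SUCCESS
    return pred_tags, pred_thresholds, tags_selected, str(status.value)
```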
13 changes: 13 additions & 0 deletions lambda_fns/entry_predict/config.py
@@ -0,0 +1,13 @@
import os

DEFAULT_AWS_REGION = "us-east-1"

AWS_REGION = os.environ.get("AWS_REGION", DEFAULT_AWS_REGION)
PREDICTION_QUEUE_NAME = os.environ.get("PREDICTION_QUEUE")
MODEL_ENDPOINT_NAME = os.environ.get("MODEL_ENDPOINT_NAME")
GEOLOCATION_FN_NAME = os.environ.get("GEOLOCATION_FN_NAME")
RELIABILITY_FN_NAME = os.environ.get("RELIABILITY_FN_NAME")
MODEL_INFO_FN_NAME = os.environ.get("MODEL_INFO_FN_NAME")

SENTRY_URL = os.environ.get("SENTRY_URL")
ENVIRONMENT = os.environ.get("ENVIRONMENT")
243 changes: 243 additions & 0 deletions lambda_fns/entry_predict/postprocess_cpu_model_outputs.py
@@ -0,0 +1,243 @@
import json
import boto3
import logging

from botocore.exceptions import ClientError
from ast import literal_eval
from typing import List, Dict
from collections import defaultdict


logging.getLogger().setLevel(logging.INFO)

pillars_1d_tags = [
"Covid-19",
"Casualties",
"Context",
"Displacement",
"Humanitarian Access",
"Shock/Event",
"Information And Communication",
]

pillars_2d_tags = [
"At Risk",
"Priority Interventions",
"Capacities & Response",
"Humanitarian Conditions",
"Impact",
"Priority Needs",
]

secondary_tags = [
"age",
"gender",
"affected_groups",
"specific_needs_groups",
"severity",
]


def get_preds_entry(
preds_column: Dict[str, float],
return_at_least_one=True,
ratio_nb=1,
return_only_one=False,
):
if return_only_one:
preds_entry = [
sub_tag
for sub_tag, ratio in preds_column.items()
if ratio == max(list(preds_column.values()))
]
else:
preds_entry = [
sub_tag for sub_tag, ratio in preds_column.items() if ratio > ratio_nb
]
if return_at_least_one and len(preds_entry) == 0:
preds_entry = [
sub_tag
for sub_tag, ratio in preds_column.items()
if ratio == max(list(preds_column.values()))
]
return preds_entry
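get_preds_entry above thresholds each tag's ratio against ratio_nb and falls back to the argmax tag when nothing clears the bar (or when exactly one tag is requested). A standalone, lightly restructured copy for experimenting with the selection rules:

```python
from typing import Dict, List


def get_preds_entry(
    preds_column: Dict[str, float],
    return_at_least_one: bool = True,
    ratio_nb: float = 1,
    return_only_one: bool = False,
) -> List[str]:
    # Single-tag mode: keep only the tag(s) with the maximum ratio.
    if return_only_one:
        best = max(preds_column.values())
        return [tag for tag, ratio in preds_column.items() if ratio == best]
    # Otherwise keep every tag whose ratio clears the threshold.
    preds_entry = [tag for tag, ratio in preds_column.items() if ratio > ratio_nb]
    # Optional fallback: if nothing cleared the threshold, keep the argmax tag.
    if return_at_least_one and not preds_entry:
        best = max(preds_column.values())
        preds_entry = [tag for tag, ratio in preds_column.items() if ratio == best]
    return preds_entry
```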


def get_predictions_all(
ratios_entries: List[Dict[str, float]],
pillars_2d=pillars_2d_tags,
pillars_1d=pillars_1d_tags,
ratio_nb: int = 1,
):

predictions = defaultdict(list)
for ratio_proba_threshold_one_entry in ratios_entries:
returns_sectors = ratio_proba_threshold_one_entry["primary_tags"]["sectors"]

subpillars_2d_tags = ratio_proba_threshold_one_entry["primary_tags"][
"subpillars_2d"
]
subpillars_1d_tags = ratio_proba_threshold_one_entry["primary_tags"][
"subpillars_1d"
]

ratios_sectors_subpillars_2d = list(returns_sectors.values()) + list(
subpillars_2d_tags.values()
)

if any([item >= ratio_nb for item in ratios_sectors_subpillars_2d]):
preds_2d = get_preds_entry(subpillars_2d_tags, True, ratio_nb)
preds_sectors = get_preds_entry(returns_sectors, True, ratio_nb)

else:
preds_2d = []
preds_sectors = []

predictions["sectors"].append(preds_sectors)
predictions["subpillars_2d"].append(preds_2d)

preds_1d = get_preds_entry(subpillars_1d_tags, False, ratio_nb)
predictions["subpillars_1d"].append(preds_1d)

returns_sec_tags = ratio_proba_threshold_one_entry["secondary_tags"]

for secondary_tag in [
"age",
"gender",
"affected_groups",
"specific_needs_groups",
]:
preds_one_sec_tag = get_preds_entry(
returns_sec_tags[secondary_tag], False, ratio_nb
)

predictions[secondary_tag].append(preds_one_sec_tag)

severity_tags = returns_sec_tags["severity"]
if any(["Humanitarian Conditions" in item for item in preds_2d]):
preds_severity = get_preds_entry(severity_tags, True, ratio_nb, True)
else:
preds_severity = []
predictions["severity"].append(preds_severity)

return predictions


def flatten(t: List[List]) -> List:
return [item for sublist in t for item in sublist]


def convert_current_dict_to_previous_one(
ratios_one_entry: Dict[str, float]
) -> Dict[str, Dict[str, float]]:

# prim tags
primary_tags_results = {"sectors": {}, "subpillars_2d": {}, "subpillars_1d": {}}

# sec tags
secondary_tags_results = {
"age": {},
"gender": {},
"affected_groups": {},
"specific_needs_groups": {},
"severity": {},
}

for tag, number in ratios_one_entry.items():
tag_levels = tag.split("->")
if "subpillars" == tag_levels[0]:
assert tag_levels[1] in pillars_1d_tags or tag_levels[1] in pillars_2d_tags

if tag_levels[1] in pillars_1d_tags:
subpillar_name = "subpillars_1d"
else:
subpillar_name = "subpillars_2d"

primary_tags_results[subpillar_name]["->".join(tag_levels[1:])] = number

elif "secondary_tags" == tag_levels[0]:
assert tag_levels[1] in secondary_tags

secondary_tags_results[tag_levels[1]][tag_levels[2]] = number

else:
if "sectors" == tag_levels[1]:
primary_tags_results["sectors"][tag_levels[2]] = number

outputs = {
"primary_tags": primary_tags_results,
"secondary_tags": secondary_tags_results,
}

return outputs
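convert_current_dict_to_previous_one rewrites the model's flat "level1->level2->level3" ratio keys into the nested primary/secondary structure the rest of the pipeline expects. A trimmed, runnable sketch of the same mapping (the pillar and secondary tag lists are abbreviated here for brevity):

```python
from typing import Dict

pillars_1d = ["Context", "Displacement"]            # abbreviated list
pillars_2d = ["Impact", "Humanitarian Conditions"]  # abbreviated list
secondary = ["age", "gender", "severity"]           # abbreviated list


def to_previous_format(ratios_one_entry: Dict[str, float]) -> Dict[str, Dict[str, float]]:
    primary_tags = {"sectors": {}, "subpillars_2d": {}, "subpillars_1d": {}}
    secondary_tags = {name: {} for name in secondary}

    for tag, number in ratios_one_entry.items():
        levels = tag.split("->")
        if levels[0] == "subpillars":
            # Route the subpillar to 1d or 2d based on its pillar name.
            key = "subpillars_1d" if levels[1] in pillars_1d else "subpillars_2d"
            primary_tags[key]["->".join(levels[1:])] = number
        elif levels[0] == "secondary_tags":
            secondary_tags[levels[1]][levels[2]] = number
        elif levels[1] == "sectors":
            primary_tags["sectors"][levels[2]] = number

    return {"primary_tags": primary_tags, "secondary_tags": secondary_tags}
```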


def get_endpoint_probas(df, endpoint_name):
client = boto3.session.Session().client(
"sagemaker-runtime", region_name="us-east-1"
)

all_outputs = []
try:
response = client.invoke_endpoint(
EndpointName=endpoint_name,
Body=json.dumps(df),
ContentType="application/json; format=pandas-split",
)
except ClientError as error:
logging.error(f"Error occurred while getting predictions: {error}")
return []
all_outputs = response["Body"].read().decode("ascii")

return all_outputs

def output_all_preds(entry, endpoint_name):
data = {
"columns": ["excerpt", "return_type"],
"index": [0],
"data": [[entry, "default_analyis"]]
}

data["columns"].append("analyis_framework_id")
data["columns"].append("return_prediction_labels")
data["columns"].append("interpretability")
data["columns"].append("ratio_interpreted_labels")
data["columns"].append("attribution_type")
data["columns"].append("output_backbone_embeddings")
data["columns"].append("pooling_type")
data["columns"].append("finetuned_task")
data["columns"].append("embeddings_return_type")

data["data"][0].append("all")
data["data"][0].append(True)
data["data"][0].append(False)
data["data"][0].append(0.5)
data["data"][0].append("Layer DeepLift")
data["data"][0].append(False)
data["data"][0].append("['cls', 'mean_pooling']")
data["data"][0].append("['first_level_tags', 'secondary_tags', 'subpillars']")
data["data"][0].append("array")

endpoint_outputs = get_endpoint_probas(
df=data,
endpoint_name=endpoint_name
)
if endpoint_outputs:
eval_batch = literal_eval(endpoint_outputs)
output_ratios = eval_batch["raw_predictions"]

thresholds = eval_batch["thresholds"]

clean_thresholds = convert_current_dict_to_previous_one(thresholds)

clean_outputs = [
convert_current_dict_to_previous_one(one_entry_preds)
for one_entry_preds in output_ratios
]

final_predictions = get_predictions_all(clean_outputs)

return clean_outputs, clean_thresholds, dict(final_predictions), False

return [], {}, {}, True
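output_all_preds assembles its pandas-split request body through a long chain of appends; the same payload can be written as a single literal, which makes the column/value pairing easier to audit. A sketch (field names and values, including the "analyis" spellings, are copied verbatim from the code above since the endpoint presumably expects them; build_predict_payload itself is a hypothetical helper name):

```python
import json


def build_predict_payload(entry: str) -> str:
    # One pandas-split payload, equivalent to the append chain in output_all_preds.
    payload = {
        "columns": [
            "excerpt", "return_type", "analyis_framework_id",
            "return_prediction_labels", "interpretability",
            "ratio_interpreted_labels", "attribution_type",
            "output_backbone_embeddings", "pooling_type",
            "finetuned_task", "embeddings_return_type",
        ],
        "index": [0],
        "data": [[
            entry, "default_analyis", "all", True, False, 0.5,
            "Layer DeepLift", False, "['cls', 'mean_pooling']",
            "['first_level_tags', 'secondary_tags', 'subpillars']", "array",
        ]],
    }
    return json.dumps(payload)
```

Writing it as a literal also makes a length mismatch between columns and the data row impossible to miss.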

3 changes: 2 additions & 1 deletion lambda_fns/entry_predict/requirements.txt
@@ -1 +1,2 @@
requests==2.26.0
requests==2.26.0
sentry-sdk==1.5.8