Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create task #207

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions deployment/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ custom:
tesResources_backend_parameters:
- VmSize
- ParamToRecogniseDataComingFromConfig
taskmaster:
imageName: docker.io/elixircloud/tesk-core-taskmaster
imageVersion: v0.10.2
filerImageName: docker.io/elixircloud/tesk-core-filer
filerImageVersion: v0.10.2
ftp:
# Name of the secret with FTP account credentials
secretName: account-secret
# Whether the FTP account is enabled (based on non-emptiness of secretName)
enabled: true
# Whether verbose (debug) mode of taskmaster is on (passes an additional flag to taskmaster and sets the image pull policy to Always)
debug: false
# Environment variables that will be passed to taskmaster
environment:
key: value
# Service Account name for taskmaster
serviceAccountName: taskmaster

# Logging configuration
# Cf. https://foca.readthedocs.io/en/latest/modules/foca.models.html#foca.models.config.LogConfig
Expand Down
16 changes: 13 additions & 3 deletions tesk/api/ga4gh/tes/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
import logging

# from connexion import request # type: ignore
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
from typing import Any

from foca.utils.logging import log_traffic # type: ignore

from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo
from tesk.api.ga4gh.tes.task.create_task import CreateTesTask
from tesk.exceptions import InternalServerError

# Get logger instance
logger = logging.getLogger(__name__)
Expand All @@ -26,14 +31,19 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore

# POST /tasks
@log_traffic
def CreateTask(*args, **kwargs) -> dict: # type: ignore
def CreateTask(**kwargs) -> dict: # type: ignore
"""Create task.

Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
pass
try:
request_body: Any = kwargs.get("body")
tes_task = TesTask(**request_body)
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
response = CreateTesTask(tes_task).response()
return response
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
raise InternalServerError from e


# GET /tasks/service-info
Expand Down
2 changes: 1 addition & 1 deletion tesk/api/ga4gh/tes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TesResources(BaseModel):
example={"VmSize": "Standard_D64_v3"},
)
backend_parameters_strict: Optional[bool] = Field(
False,
default=False,
description="If set to true, backends should fail the task if any "
"backend_parameters\nkey/values are unsupported, otherwise, backends should "
"attempt to run the task",
Expand Down
1 change: 1 addition & 0 deletions tesk/api/ga4gh/tes/task/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Task API controller logic."""
75 changes: 75 additions & 0 deletions tesk/api/ga4gh/tes/task/create_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""TESK API module for creating a task."""

import logging

from tesk.api.ga4gh.tes.models import TesCreateTaskResponse, TesResources, TesTask
from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest
from tesk.exceptions import KubernetesError

logger = logging.getLogger(__name__)


class CreateTesTask(TesTaskRequest):
"""Create TES task."""
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self,
task: TesTask,
):
"""Initialize the CreateTask class.

Args:
task: TES task to create.
"""
super().__init__()
self.task = task

def handle_request(self) -> TesCreateTaskResponse:
"""Create TES task."""
attempts_no = 0
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
while (
attempts_no < self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO
):
try:
attempts_no += 1
jemaltahir marked this conversation as resolved.
Show resolved Hide resolved
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
resources = self.task.resources
minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()

if not self.task.resources:
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb))
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb:
self.task.resources.ram_gb = minimum_ram_gb
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved

taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job(
self.task,
)
taskmaster_config_map = (
self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map(
self.task,
taskmaster_job,
)
)

_ = self.kubernetes_client_wrapper.create_config_map(
taskmaster_config_map
)
created_job = self.kubernetes_client_wrapper.create_job(taskmaster_job)

assert created_job.metadata is not None
assert created_job.metadata.name is not None
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved

return TesCreateTaskResponse(id=created_job.metadata.name)

except KubernetesError as e:
if (
not e.is_object_name_duplicated()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this much harder to read/parse than:

if (
    e.status != HTTPStatus.CONFLICT
    or ...
)

Why the function and the use of not?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if your client wrapper already checked for CONFLICT/409 type errors and returned these as KubernetesConflict exceptions, you could design this more elegantly by letting the while loop conditional take care of managing the retries and then handle the "too many retries" situation below the loop with an explicit InternalServerError message. Then you could simplify this check to:

except KubernetesConflict:
    pass

All the other errors you can just let bubble up. I'm pretty sure FOCA will handle these gracefully (as mentioned below).

I think this would be considerably cleaner - and more informative.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, did you mean something like this?

    def handle_request(self) -> TesCreateTaskResponse:
        """Create TES task."""
        attempts_no: int = 0
        total_attempts_no: int = (
            self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO
        )

        while attempts_no < total_attempts_no:
            try:
                logger.debug(
                    f"Creating K8s job, attempt no: {attempts_no}/{total_attempts_no}."
                )
                attempts_no += 1
               
                # more logic

                return TesCreateTaskResponse(id=created_job.metadata.name)

            except KubernetesError as e:
                if e.status == HTTPStatus.CONFLICT:
                    logger.debug(
                        "Conflict while creating Kubernetes job, retrying...",
                    )
                    pass

        raise KubernetesError(
            "Failed to create Kubernetes job after multiple attempts."
        )

or attempts_no
>= self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO
):
raise e

except Exception as exc:
logging.error("ERROR: In createTask", exc_info=True)
raise exc
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved

return TesCreateTaskResponse(id="") # To silence mypy, should never be reached
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
38 changes: 38 additions & 0 deletions tesk/api/ga4gh/tes/task/task_request.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a base class for a TESK request (any request, including GET requests) or for a task request (a POST request to tasks/ in order to request that a task be created)? If the former, make sure to rename the module and class; if the latter, make sure to adapt the module-level docstring.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the former. Can you please suggest a name? I can't think of a reason or a name to do so. I named it task request so that this base class can hold all the attributes that might be common among endpoints that deal with a "task request" and create a common interface for interacting with the API programmatically.

If we extend more endpoints sooner or later (say, create serviceInfo), then I would even propose to create a base class for this class, named request or something, just so that, programmatically, business logic is forced to exit via the same method, say response.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that second part is where I would see this going then. Maybe even ending up in FOCA.

I mean, if we already have 21 changed files (not including tests) for the addition of a single controller, we might as well go all the way, right? 😛

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JaeAeich: Why is it that with your PRs I often see the entire file changed, even though it's just an iteration over the last time I've viewed? I don't have this with other people, so I guess it's something with your editor or Git flow.

Please have a look at this, because it's really quite annoying. Instead of just focusing on what changed since the last time, I have to look through the entire file again - which is not only not fun, but also holds up the reviews big time, especially when they tend to end up being huge.

And even apart from that, it's also really not good practice in terms of provenance. If this were previously existing code (e.g., maybe some of the old TES code from TES-core still remains), you'd end up being listed as git blame for every line, taking credit and blame for other people's work.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree, but I can assure you I am not doing it on purpose 😭. The thing is, the TESK code is complex, and if I try to break up the code base, it makes no sense and it is very hard to connect the dots. Also, I don't think I have any issues in my Git flow; my configs seem to be sound. I am trying not to step on and cover up someone's code, but if you notice, most of the files are completely new and not a modification of previous ones (well, there weren't previous files to speak of, as I am mostly working on the new api module and service isn't touched).

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Base class for tesk request."""

import json
import logging
from abc import ABC, abstractmethod

from pydantic import BaseModel

from tesk.k8s.constants import tesk_k8s_constants
from tesk.k8s.converter.converter import TesKubernetesConverter
from tesk.k8s.wrapper import KubernetesClientWrapper

logger = logging.getLogger(__name__)


class TesTaskRequest(ABC):
"""Base class for TESK requests encapsulating common methods and members."""
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self):
"""Initialise base class for tesk request."""
self.kubernetes_client_wrapper = KubernetesClientWrapper()
self.tes_kubernetes_converter = TesKubernetesConverter()
self.tesk_k8s_constants = tesk_k8s_constants

@abstractmethod
def handle_request(self) -> BaseModel:
"""Business logic for the request."""
pass

def response(self) -> dict:
"""Get response for the request."""
response: BaseModel = self.handle_request()
try:
res: dict = json.loads(json.dumps(response))
return res
except (TypeError, ValueError) as e:
logger.info(e)
Comment on lines +40 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the significance of this? Why not going straight for the Pydantic way of marshalling the model?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried, but for some reason I couldn't, and I don't know why! Only this weird way worked.

return response.dict()
73 changes: 72 additions & 1 deletion tesk/custom_config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,83 @@
"""Custom configuration model for the FOCA app."""

from pydantic import BaseModel
from typing import Dict, Optional

from pydantic import BaseModel, Field

from tesk.api.ga4gh.tes.models import Service
from tesk.constants import tesk_constants


class FtpConfig(BaseModel):
    """FTP account settings used by TESK.

    Attributes:
        secretName: Name of the secret holding the FTP account credentials;
            unset (``None``) by default.
        enabled: Flag marking the FTP account as enabled; ``False`` by
            default.
    """

    # Both fields carry defaults, so an empty mapping is a valid FtpConfig.
    secretName: Optional[str] = Field(
        default=None,
        description="Name of the secret with FTP account credentials",
    )
    enabled: bool = Field(
        description="If FTP account enabled (based on non-emptiness of secretName)",
        default=False,
    )


class ExecutorSecret(BaseModel):
    """Settings for a secret that is mounted into executors.

    Attributes:
        name: Name shared by the secret and the volume it is mounted as;
            unset (``None``) by default.
        mountPath: Path at which the secret is mounted in executors; unset
            (``None``) by default.
        enabled: Whether the secret is enabled; ``False`` by default.
    """

    name: Optional[str] = Field(
        description=(
            "Name of a secret that will be mounted as volume to each executor. "
            "The same name will be used for the secret and the volume"
        ),
        default=None,
    )
    mountPath: Optional[str] = Field(
        description="The path where the secret will be mounted to executors",
        alias="mountPath",
        default=None,
    )
    enabled: bool = Field(
        description="Indicates whether the secret is enabled",
        default=False,
    )


class Taskmaster(BaseModel):
    """Taskmaster environment properties model for the TESK.

    Attributes:
        imageName: Taskmaster image name; defaults to
            ``tesk_constants.TASKMASTER_IMAGE_NAME``.
        imageVersion: Taskmaster image version; defaults to
            ``tesk_constants.TASKMASTER_IMAGE_VERSION``.
        filerImageName: Filer image name; defaults to
            ``tesk_constants.FILER_IMAGE_NAME``.
        filerImageVersion: Filer image version; defaults to
            ``tesk_constants.FILER_IMAGE_VERSION``.
        ftp: FTP account settings; ``None`` when not configured.
        debug: Whether verbose (debug) mode of taskmaster is on.
        environment: Environment variables passed to taskmaster; ``None``
            when not configured.
        serviceAccountName: Service account name for taskmaster; defaults to
            ``"default"``.
        executorSecret: Executor secret configuration; ``None`` when not
            configured.
    """

    imageName: str = Field(
        default=tesk_constants.TASKMASTER_IMAGE_NAME,
        description="Taskmaster image name",
    )
    imageVersion: str = Field(
        default=tesk_constants.TASKMASTER_IMAGE_VERSION,
        description="Taskmaster image version",
    )
    filerImageName: str = Field(
        default=tesk_constants.FILER_IMAGE_NAME, description="Filer image name"
    )
    filerImageVersion: str = Field(
        default=tesk_constants.FILER_IMAGE_VERSION, description="Filer image version"
    )
    # The default is None, so the annotation must admit None; consumers have to
    # check for a missing FTP section before reading its attributes.
    ftp: Optional[FtpConfig] = Field(
        default=None, description="Test FTP account settings"
    )
    debug: bool = Field(
        default=False,
        description="If verbose (debug) mode of taskmaster is on (passes additional "
        "flag to taskmaster and sets image pull policy to Always)",
    )
    environment: Optional[Dict[str, str]] = Field(
        default=None,
        description="Environment variables, that will be passed to taskmaster",
    )
    serviceAccountName: str = Field(
        default="default", description="Service Account name for taskmaster"
    )
    executorSecret: Optional[ExecutorSecret] = Field(
        default=None, description="Executor secret configuration"
    )


class CustomConfig(BaseModel):
    """Model of the custom configuration consumed by the FOCA app.

    Attributes:
        service_info: Service info settings.
        taskmaster: Taskmaster settings.
    """

    # Custom configuration fields; neither one declares a default.
    service_info: Service
    taskmaster: Taskmaster
6 changes: 6 additions & 0 deletions tesk/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""App exceptions."""

from http import HTTPStatus

from connexion.exceptions import (
BadRequestProblem,
ExtraParameterProblem,
Expand All @@ -22,6 +24,10 @@ class ConfigNotFoundError(FileNotFoundError):
class KubernetesError(ApiException):
"""Kubernetes error."""
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved

def is_object_name_duplicated(self) -> bool:
    """Tell whether this error carries a 409 Conflict status.

    Returns:
        True if ``self.status`` equals ``HTTPStatus.CONFLICT``, else False.
    """
    conflict = HTTPStatus.CONFLICT
    return self.status == conflict


# exceptions raised in app context
exceptions = {
Expand Down
1 change: 1 addition & 0 deletions tesk/k8s/converter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Module for converting Kubernetes objects to Task objects."""
Loading