From 06602d1c2c382937be84d53277de8bbc0bd2bd12 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Thu, 5 Sep 2024 16:09:05 +0530 Subject: [PATCH 01/19] feat: get taskmaster values from config --- deployment/config.yaml | 17 ++++++ tesk/custom_config.py | 73 ++++++++++++++++++++++- tesk/utils.py | 127 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 216 insertions(+), 1 deletion(-) diff --git a/deployment/config.yaml b/deployment/config.yaml index 0006f11..aea3f70 100644 --- a/deployment/config.yaml +++ b/deployment/config.yaml @@ -72,6 +72,23 @@ custom: tesResources_backend_parameters: - VmSize - ParamToRecogniseDataComingFromConfig + taskmaster: + imageName: docker.io/elixircloud/tesk-core-taskmaster + imageVersion: v0.10.2 + filerImageName: docker.io/elixircloud/tesk-core-filer + filerImageVersion: v0.10.2 + ftp: + # Name of the secret with FTP account credentials + secretName: account-secret + # If FTP account enabled (based on non-emptiness of secretName) + enabled: true + # If verbose (debug) mode of taskmaster is on (passes additional flag to taskmaster and sets image pull policy to Always) + debug: false + # Environment variables, that will be passed to taskmaster + environment: + key: value + # Service Account name for taskmaster + serviceAccountName: taskmaster # Logging configuration # Cf. https://foca.readthedocs.io/en/latest/modules/foca.models.html#foca.models.config.LogConfig diff --git a/tesk/custom_config.py b/tesk/custom_config.py index ae1cbaf..7acab14 100644 --- a/tesk/custom_config.py +++ b/tesk/custom_config.py @@ -1,8 +1,78 @@ """Custom configuration model for the FOCA app.""" -from pydantic import BaseModel +from typing import Dict, Optional + +from pydantic import BaseModel, Field from tesk.api.ga4gh.tes.models import Service +from tesk.constants import tesk_constants + + +class FtpConfig(BaseModel): + """Ftp configuration model for the TESK.""" + + secretName: Optional[str] = Field( + default=None, description="Name of the secret with FTP account credentials" + ) + enabled: bool = Field( + default=False, + description="If FTP account enabled (based on non-emptiness of secretName)", + ) + + +class ExecutorSecret(BaseModel): + """Executor secret configuration.""" + + name: Optional[str] = Field( + default=None, + description=( + "Name of a secret that will be mounted as volume to each executor. The same" + " name will be used for the secret and the volume" + ), + ) + mountPath: Optional[str] = Field( + default=None, + alias="mountPath", + description="The path where the secret will be mounted to executors", + ) + enabled: bool = Field( + default=False, description="Indicates whether the secret is enabled" + ) + + +class Taskmaster(BaseModel): + """Taskmaster environment properties model for the TESK.""" + + imageName: str = Field( + default=tesk_constants.TASKMASTER_IMAGE_NAME, + description="Taskmaster image name", + ) + imageVersion: str = Field( + default=tesk_constants.TASKMASTER_IMAGE_VERSION, + description="Taskmaster image version", + ) + filerImageName: str = Field( + default=tesk_constants.FILER_IMAGE_NAME, description="Filer image name" + ) + filerImageVersion: str = Field( + default=tesk_constants.FILER_IMAGE_VERSION, description="Filer image version" + ) + ftp: FtpConfig = Field(default=None, description="Test FTP account settings") + debug: bool = Field( + default=False, + description="If verbose (debug) mode of taskmaster is on (passes additional " + "flag to taskmaster and sets image pull policy to Always)", + ) + environment: Optional[Dict[str, str]] = Field( + default=None, + description="Environment variables, that will be passed to taskmaster", + ) + serviceAccountName: str = Field( + default="default", description="Service Account name for taskmaster" + ) + executorSecret: Optional[ExecutorSecret] = Field( + default=None, description="Executor secret configuration" + ) class CustomConfig(BaseModel): @@ -10,3 +80,4 @@ class CustomConfig(BaseModel): # Define custom configuration fields here service_info: Service + taskmaster: Taskmaster diff --git a/tesk/utils.py b/tesk/utils.py index 214ffd6..cdbbb7f 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -3,6 +3,28 @@ import os from pathlib import Path +from foca import Foca +from kubernetes.client.models import ( + V1Container, + V1EnvVar, + V1EnvVarSource, + V1Job, + V1JobSpec, + V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, + V1SecretKeySelector, + V1VolumeMount, +) + +from tesk.constants import TeskConstants +from tesk.custom_config import ( + CustomConfig, + TaskmasterEnvProperties, +) +from tesk.exceptions import ConfigNotFoundError +from tesk.k8s.constants import TeskK8sConstants + def get_config_path() -> Path: """Get the configuration path. @@ -15,3 +37,108 @@ def get_config_path() -> Path: return Path(config_path_env).resolve() else: return (Path(__file__).parents[1] / "deployment" / "config.yaml").resolve() + + +def get_custom_config() -> CustomConfig: + """Get the custom configuration. + + Returns: + The custom configuration. + """ + conf = Foca(config_file=get_config_path()).conf + try: + return CustomConfig(**conf.custom) + except AttributeError: + raise ConfigNotFoundError( + "Custom configuration not found in config file." + ) from None + + +def get_taskmaster_template() -> V1Job: + """Get the taskmaster template from the custom configuration. + + Returns: + The taskmaster template. + """ + job = V1Job( + api_version="batch/v1", + kind="Job", + metadata=V1ObjectMeta( + name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, + labels={"app": TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM}, + ), + spec=V1JobSpec( + template=V1PodTemplateSpec( + metadata=V1ObjectMeta( + name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM + ), + spec=V1PodSpec( + service_account_name="default", + containers=[ + V1Container( + name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, + image=f"{TeskConstants.TASKMASTER_IMAGE_NAME}:{TeskConstants.TASKMASTER_IMAGE_VERSION}", + args=[ + "-f", + f"/jsoninput/{TeskK8sConstants.job_constants.TASKMASTER_INPUT}.gz", + ], + env=[ + V1EnvVar( + name=TeskK8sConstants.ftp_constants.FTP_SECRET_USERNAME_ENV, + value_from=V1EnvVarSource( + secret_key_ref=V1SecretKeySelector( + name="ftp-secret", + key="username", + optional=True, + ) + ), + ), + V1EnvVar( + name=TeskK8sConstants.ftp_constants.FTP_SECRET_PASSWORD_ENV, + value_from=V1EnvVarSource( + secret_key_ref=V1SecretKeySelector( + name="ftp-secret", + key="password", + optional=True, + ) + ), + ), + ], + volume_mounts=[ + V1VolumeMount( + name="podinfo", + mount_path="/podinfo", + read_only=True, + ), + V1VolumeMount( + name="jsoninput", + mount_path="/jsoninput", + read_only=True, + ), + ], + ) + ], + volumes=[], + restart_policy=TeskK8sConstants.k8s_constants.JOB_RESTART_POLICY, + ), + ) + ), + ) + return job + + +def get_taskmaster_env_property() -> TaskmasterEnvProperties: + """Get the taskmaster env property from the custom configuration. + + Returns: + The taskmaster env property. + """ + custom_conf = get_custom_config() + try: + return custom_conf.taskmaster_env_properties + except AttributeError: + raise ConfigNotFoundError( + "Custom configuration doesn't seem to have taskmaster_env_properties in " + "config file." + f"Custom config:\n{custom_conf}" + ) from None From b5a917d259659b10ae044684ba7e1002fd2ee2b3 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Thu, 5 Sep 2024 23:41:21 +0530 Subject: [PATCH 02/19] feat: create task --- tesk/api/ga4gh/tes/controllers.py | 16 +- tesk/api/ga4gh/tes/models.py | 2 +- tesk/api/ga4gh/tes/task/__init__.py | 1 + tesk/api/ga4gh/tes/task/create_task.py | 75 +++++ tesk/api/ga4gh/tes/task/task_request.py | 38 +++ tesk/exceptions.py | 6 + tesk/k8s/converter/__init__.py | 1 + tesk/k8s/converter/converter.py | 312 ++++++++++++++++++ tesk/k8s/converter/data/__init__.py | 1 + tesk/k8s/converter/data/job.py | 80 +++++ tesk/k8s/converter/data/task.py | 98 ++++++ .../k8s/converter/executor_command_wrapper.py | 54 +++ tesk/k8s/converter/template.py | 192 +++++++++++ tesk/utils.py | 67 +++- 14 files changed, 924 insertions(+), 19 deletions(-) create mode 100644 tesk/api/ga4gh/tes/task/__init__.py create mode 100644 tesk/api/ga4gh/tes/task/create_task.py create mode 100644 tesk/api/ga4gh/tes/task/task_request.py create mode 100644 tesk/k8s/converter/__init__.py create mode 100644 tesk/k8s/converter/converter.py create mode 100644 tesk/k8s/converter/data/__init__.py create mode 100644 tesk/k8s/converter/data/job.py create mode 100644 tesk/k8s/converter/data/task.py create mode 100644 tesk/k8s/converter/executor_command_wrapper.py create mode 100644 tesk/k8s/converter/template.py diff --git a/tesk/api/ga4gh/tes/controllers.py b/tesk/api/ga4gh/tes/controllers.py index 466053d..653d3b3 100644 --- a/tesk/api/ga4gh/tes/controllers.py +++ b/tesk/api/ga4gh/tes/controllers.py @@ -3,9 +3,14 @@ import logging # from connexion import request # type: ignore +from typing import Any + from foca.utils.logging import log_traffic # type: ignore +from tesk.api.ga4gh.tes.models import TesTask from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo +from tesk.api.ga4gh.tes.task.create_task import CreateTesTask +from tesk.exceptions import InternalServerError # Get logger instance logger = logging.getLogger(__name__) @@ -26,14 +31,19 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore # POST /tasks @log_traffic -def CreateTask(*args, **kwargs) -> dict: # type: ignore +def CreateTask(**kwargs) -> dict: # type: ignore """Create task. Args: - *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. """ - pass + try: + request_body: Any = kwargs.get("body") + tes_task = TesTask(**request_body) + response = CreateTesTask(tes_task).response() + return response + except Exception as e: + raise InternalServerError from e # GET /tasks/service-info diff --git a/tesk/api/ga4gh/tes/models.py b/tesk/api/ga4gh/tes/models.py index c442e1e..54bb6ba 100644 --- a/tesk/api/ga4gh/tes/models.py +++ b/tesk/api/ga4gh/tes/models.py @@ -275,7 +275,7 @@ class TesResources(BaseModel): example={"VmSize": "Standard_D64_v3"}, ) backend_parameters_strict: Optional[bool] = Field( - False, + default=False, description="If set to true, backends should fail the task if any " "backend_parameters\nkey/values are unsupported, otherwise, backends should " "attempt to run the task", diff --git a/tesk/api/ga4gh/tes/task/__init__.py b/tesk/api/ga4gh/tes/task/__init__.py new file mode 100644 index 0000000..b8e71d8 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/__init__.py @@ -0,0 +1 @@ +"""Task API controller logic.""" diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py new file mode 100644 index 0000000..17568d4 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -0,0 +1,75 @@ +"""TESK API module for creating a task.""" + +import logging + +from tesk.api.ga4gh.tes.models import TesCreateTaskResponse, TesResources, TesTask +from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest +from tesk.exceptions import KubernetesError + +logger = logging.getLogger(__name__) + + +class CreateTesTask(TesTaskRequest): + """Create TES task.""" + + def __init__( + self, + task: TesTask, + ): + """Initialize the CreateTask class. + + Args: + task: TES task to create. + """ + super().__init__() + self.task = task + + def handle_request(self) -> TesCreateTaskResponse: + """Create TES task.""" + attempts_no = 0 + while ( + attempts_no < self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO + ): + try: + attempts_no += 1 + resources = self.task.resources + minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb() + + if not self.task.resources: + self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb)) + if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb: + self.task.resources.ram_gb = minimum_ram_gb + + taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job( + self.task, + ) + taskmaster_config_map = ( + self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map( + self.task, + taskmaster_job, + ) + ) + + _ = self.kubernetes_client_wrapper.create_config_map( + taskmaster_config_map + ) + created_job = self.kubernetes_client_wrapper.create_job(taskmaster_job) + + assert created_job.metadata is not None + assert created_job.metadata.name is not None + + return TesCreateTaskResponse(id=created_job.metadata.name) + + except KubernetesError as e: + if ( + not e.is_object_name_duplicated() + or attempts_no + >= self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO + ): + raise e + + except Exception as exc: + logging.error("ERROR: In createTask", exc_info=True) + raise exc + + return TesCreateTaskResponse(id="") # To silence mypy, should never be reached diff --git a/tesk/api/ga4gh/tes/task/task_request.py b/tesk/api/ga4gh/tes/task/task_request.py new file mode 100644 index 0000000..7417a03 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/task_request.py @@ -0,0 +1,38 @@ +"""Base class for tesk request.""" + +import json +import logging +from abc import ABC, abstractmethod + +from pydantic import BaseModel + +from tesk.k8s.constants import tesk_k8s_constants +from tesk.k8s.converter.converter import TesKubernetesConverter +from tesk.k8s.wrapper import KubernetesClientWrapper + +logger = logging.getLogger(__name__) + + +class TesTaskRequest(ABC): + """Base class for tesk request ecapsulating common methods and members.""" + + def __init__(self): + """Initialise base class for tesk request.""" + self.kubernetes_client_wrapper = KubernetesClientWrapper() + self.tes_kubernetes_converter = TesKubernetesConverter() + self.tesk_k8s_constants = tesk_k8s_constants + + @abstractmethod + def handle_request(self) -> BaseModel: + """Business logic for the request.""" + pass + + def response(self) -> dict: + """Get response for the request.""" + response: BaseModel = self.handle_request() + try: + res: dict = json.loads(json.dumps(response)) + return res + except (TypeError, ValueError) as e: + logger.info(e) + return response.dict() diff --git a/tesk/exceptions.py b/tesk/exceptions.py index d0aa727..fbe6f69 100644 --- a/tesk/exceptions.py +++ b/tesk/exceptions.py @@ -1,5 +1,7 @@ """App exceptions.""" +from http import HTTPStatus + from connexion.exceptions import ( BadRequestProblem, ExtraParameterProblem, @@ -22,6 +24,10 @@ class ConfigNotFoundError(FileNotFoundError): class KubernetesError(ApiException): """Kubernetes error.""" + def is_object_name_duplicated(self) -> bool: + """Check if object name is duplicated.""" + return self.status == HTTPStatus.CONFLICT + # exceptions raised in app context exceptions = { diff --git a/tesk/k8s/converter/__init__.py b/tesk/k8s/converter/__init__.py new file mode 100644 index 0000000..33ac4cc --- /dev/null +++ b/tesk/k8s/converter/__init__.py @@ -0,0 +1 @@ +"""Module for converting Kubernetes objects to Task objects.""" diff --git a/tesk/k8s/converter/converter.py b/tesk/k8s/converter/converter.py new file mode 100644 index 0000000..d3ca901 --- /dev/null +++ b/tesk/k8s/converter/converter.py @@ -0,0 +1,312 @@ +"""Module for converting TES tasks to Kubernetes jobs.""" + +import base64 +import gzip +import json +import logging +from decimal import Decimal +from enum import Enum +from io import BytesIO +from typing import Any, Optional + +from kubernetes.client import ( + V1ConfigMap, + V1ConfigMapVolumeSource, + V1Container, + V1EnvVar, + V1JobSpec, + V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, + V1ResourceRequirements, + V1Volume, +) +from kubernetes.client.models import V1Job +from kubernetes.utils.quantity import parse_quantity # type: ignore + +from tesk.api.ga4gh.tes.models import ( + TesExecutor, + TesResources, + TesTask, +) +from tesk.custom_config import Taskmaster +from tesk.k8s.constants import tesk_k8s_constants +from tesk.k8s.converter.data.job import Job +from tesk.k8s.converter.data.task import Task +from tesk.k8s.converter.executor_command_wrapper import ExecutorCommandWrapper +from tesk.k8s.converter.template import KubernetesTemplateSupplier +from tesk.k8s.wrapper import KubernetesClientWrapper +from tesk.utils import ( + get_taskmaster_env_property, + pydantic_model_list_dict, +) + +logger = logging.getLogger(__name__) + + +class TesKubernetesConverter: + """Convert TES requests to Kubernetes resources.""" + + def __init__(self): + """Initialize the converter.""" + self.taskmaster_env_properties: Taskmaster = get_taskmaster_env_property() + self.template_supplier = KubernetesTemplateSupplier() + self.tesk_k8s_constants = tesk_k8s_constants + self.kubernetes_client_wrapper = KubernetesClientWrapper() + + def from_tes_task_to_k8s_job(self, task: TesTask): + """Convert TES task to Kubernetes job.""" + taskmaster_job: V1Job = ( + self.template_supplier.get_taskmaster_template_with_value_from_config() + ) + + if taskmaster_job.metadata is None: + taskmaster_job.metadata = V1ObjectMeta() + + if taskmaster_job.metadata.annotations is None: + taskmaster_job.metadata.annotations = {} + + if taskmaster_job.metadata.labels is None: + taskmaster_job.metadata.labels = {} + + if task.name: + taskmaster_job.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY + ] = task.name + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = user[ + # "username" + # ] + + # if task.tags and "GROUP_NAME" in task.tags: + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = task[ + # "tags" + # ]["GROUP_NAME"] + # elif user["is_member"]: + # taskmaster_job.metadata.labels[self.constants.label_groupname_key] = user[ + # "any_group" + # ] + + json_input = json.dumps( + task.dict(), + indent=2, + default=lambda enum: str(enum.name) if isinstance(enum, Enum) else None, + ) + + try: + taskmaster_job.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_JSON_INPUT_KEY + ] = json_input + except Exception as ex: + logger.info( + f"Serializing task {taskmaster_job.metadata.name} to JSON failed", ex + ) + + volume = V1Volume( + name="jsoninput", + config_map=V1ConfigMapVolumeSource(name=taskmaster_job.metadata.name), + ) + + if taskmaster_job.spec is None: + taskmaster_job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if taskmaster_job.spec.template.spec is None: + taskmaster_job.spec.template.spec = V1PodSpec(containers=[]) + if taskmaster_job.spec.template.spec.volumes is None: + taskmaster_job.spec.template.spec.volumes = [] + + taskmaster_job.spec.template.spec.volumes.append(volume) + return taskmaster_job + + def from_tes_task_to_k8s_config_map( + self, + task: TesTask, + taskmaster_job: V1Job, + ) -> V1ConfigMap: + """Create a Kubernetes ConfigMap from a TES task.""" + assert taskmaster_job.metadata is not None, ( + "Taskmaster job metadata should have already been set while create" + " taskmaster!" + ) + + taskmaster_config_map = V1ConfigMap( + metadata=V1ObjectMeta(name=taskmaster_job.metadata.name) + ) + + assert ( + taskmaster_config_map.metadata is not None + ), "Taskmaster metadata is should have already been set!" + + if taskmaster_config_map.metadata.labels is None: + taskmaster_config_map.metadata.labels = {} + + if taskmaster_config_map.metadata.annotations is None: + taskmaster_config_map.metadata.annotations = {} + + # FIXME: What if the task name is None? + task_name = task.name or "task-name-not-set" + + taskmaster_config_map.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY + ] = task_name + + # taskmaster_config_map.metadata.labels[self.constants.label_userid_key] + # = user["username"] + + if task.tags and "GROUP_NAME" in task.tags: + taskmaster_config_map.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_GROUPNAME_KEY + ] = task.tags["GROUP_NAME"] + # elif user["is_member"]: + # taskmaster_config_map.metadata.labels[self.constants.label_groupname_key] + # = user["any_group"] + + assert taskmaster_config_map.metadata.name is not None + assert task.resources is not None + + executors_as_jobs = [ + self.from_tes_executor_to_k8s_job( + generated_task_id=taskmaster_config_map.metadata.name, + tes_task_name=task_name, + executor=executor, + executor_index=idx, + resources=task.resources, + ) + for idx, executor in enumerate(task.executors) + ] + + taskmaster_input: dict[str, Any] = { + "inputs": pydantic_model_list_dict(task.inputs) if task.inputs else [], + "outputs": pydantic_model_list_dict(task.outputs) if task.outputs else [], + "volumes": task.volumes or [], + "resources": { + "disk_gb": float(task.resources.disk_gb) + if task.resources.disk_gb + else 10.0 + }, + } + taskmaster_input[ + self.tesk_k8s_constants.job_constants.TASKMASTER_INPUT_EXEC_KEY + ] = [exec_job.to_dict() for exec_job in executors_as_jobs] + + taskmaster_input_as_json = json.loads( + json.dumps( + taskmaster_input, + default=lambda obj: float(obj) + if isinstance(obj, Decimal) + else TypeError, + ) + ) + + try: + with BytesIO() as obj: + with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file: + json_data = json.dumps(taskmaster_input_as_json) + gzip_file.write(json_data.encode("utf-8")) + taskmaster_config_map.binary_data = { + f"{self.tesk_k8s_constants.job_constants.TASKMASTER_INPUT}.gz": base64.b64encode( # noqa: E501 + obj.getvalue() + ).decode("utf-8") + } + except Exception as e: + logger.info( + ( + f"Compression of task {taskmaster_config_map.metadata.name}" + f" JSON configmap failed" + ), + e, + ) + + return taskmaster_config_map + + def from_tes_executor_to_k8s_job( # noqa: PLR0913, PLR0912 + self, + generated_task_id: str, + tes_task_name: Optional[str], + executor: TesExecutor, + executor_index: int, + resources: TesResources, + ) -> V1Job: + """Create a Kubernetes job from a TES executor.""" + # Get new template executor Job object + executor_job: V1Job = ( + self.template_supplier.get_executor_template_with_value_from_config() + ) + + # Set executors name based on taskmaster's job name + Job(executor_job).change_job_name( + Task(taskmaster_name=generated_task_id).get_executor_name(executor_index) + ) + + if executor_job.metadata is None: + executor_job.metadata = V1ObjectMeta() + + # Put arbitrary labels and annotations + executor_job.metadata.labels = executor_job.metadata.labels or {} + executor_job.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_TESTASK_ID_KEY + ] = generated_task_id + executor_job.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_EXECNO_KEY + ] = str(executor_index) + # job.metadata.labels[self.constants.label_userid_key] = user.username + + if executor_job.metadata is None: + executor_job.metadata = V1ObjectMeta() + if executor_job.metadata.annotations is None: + executor_job.metadata.annotations = {} + + if tes_task_name: + executor_job.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY + ] = tes_task_name + + if executor_job.spec is None: + executor_job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if executor_job.spec.template.spec is None: + executor_job.spec.template.spec = V1PodSpec(containers=[]) + + container: V1Container = executor_job.spec.template.spec.containers[0] + + # TODO: Not sure what to do with this + # Convert potential TRS URI into docker image + container.image = executor.image + + if not container.command: + container.command = [] + + for command in ExecutorCommandWrapper( + executor + ).get_commands_with_stream_redirects(): + container.command.append(command) + + if executor.env: + container.env = [ + V1EnvVar(name=key, value=value) for key, value in executor.env.items() + ] + else: + container.env = [] + + container.working_dir = executor.workdir + container.resources = V1ResourceRequirements(requests={}) + + assert container.resources.requests is not None + + if resources.cpu_cores: + container.resources.requests["cpu"] = parse_quantity( + str(resources.cpu_cores) + ) + + if resources.ram_gb: + container.resources.requests["memory"] = parse_quantity( + f"{resources.ram_gb:.6f}Gi" + ) + + # # FIXME: Workaround + # # Check if volumes is None and set it to an empty list if it is + if ( + executor_job.spec + and executor_job.spec.template.spec + and executor_job.spec.template.spec.volumes is None + ): + executor_job.spec.template.spec.volumes = [] + + return executor_job diff --git a/tesk/k8s/converter/data/__init__.py b/tesk/k8s/converter/data/__init__.py new file mode 100644 index 0000000..080fc38 --- /dev/null +++ b/tesk/k8s/converter/data/__init__.py @@ -0,0 +1 @@ +"""Data structure module to handle k8s resources of TES.""" diff --git a/tesk/k8s/converter/data/job.py b/tesk/k8s/converter/data/job.py new file mode 100644 index 0000000..73251dd --- /dev/null +++ b/tesk/k8s/converter/data/job.py @@ -0,0 +1,80 @@ +"""A container for a single Kubernetes job object. + +Can be both a taskmaster and an executor, it list of worker pods (Kubernetes +Pod objects). +""" + +from enum import Enum +from typing import List, Optional + +from kubernetes.client import V1Job, V1ObjectMeta, V1Pod + + +class Job: + """Class to list worker pods (Kubernetes Pod objects).""" + + def __init__(self, job: V1Job): + """Initializes the Job with a Kubernetes job object.""" + self.job: V1Job = job + self.pods: List[V1Pod] = [] + + def get_job(self) -> V1Job: + """Returns the Kubernetes job object.""" + return self.job + + def add_pod(self, pod: V1Pod): + """Adds a single pod to the list.""" + self.pods.append(pod) + + def has_pods(self) -> bool: + """Checks if the job has any pods.""" + return bool(self.pods) + + def get_first_pod(self) -> Optional[V1Pod]: + """Returns arbitrarily chosen pod from the list. + + Currently the first one added or None if the job has no pods. + """ + if not self.has_pods(): + return None + return self.pods[0] + + def get_pods(self) -> List[V1Pod]: + """Returns the list of job pods. + + Returns in the order of addition to the list or an empty list if no pods. + """ + return self.pods + + def change_job_name(self, new_name: str): + """Changes the job name. + + Also the names in its metadata and container specs. + """ + if self.job.metadata is None: + self.job.metadata = V1ObjectMeta(name=new_name) + else: + self.job.metadata.name = new_name + + if ( + self.job is not None + and self.job.spec is not None + and self.job.spec.template is not None + and self.job.spec.template.metadata is not None + ): + self.job.spec.template.metadata.name = new_name + + if self.job.spec.template.spec and self.job.spec.template.spec.containers: + self.job.spec.template.spec.containers[0].name = new_name + + def get_job_name(self) -> Optional[str]: + """Returns the job name.""" + return self.job.metadata.name if self.job.metadata else None + + +class JobStatus(Enum): + """State of job.""" + + ACTIVE = "Active" + SUCCEEDED = "Succeeded" + FAILED = "Failed" diff --git a/tesk/k8s/converter/data/task.py b/tesk/k8s/converter/data/task.py new file mode 100644 index 0000000..3f0e0e2 --- /dev/null +++ b/tesk/k8s/converter/data/task.py @@ -0,0 +1,98 @@ +"""A composite that represents Kubernetes object's graph of a single TES task. + +- Taskmaster job with its pods. +- Executor jobs with its pods. +""" + +import re +from typing import Dict, List, Optional + +from kubernetes.client.models import V1Job, V1ObjectMeta + +from tesk.k8s.constants import tesk_k8s_constants +from tesk.k8s.converter.data.job import Job + + +class Task: + """Task is a composite. + + It represents Kubernetes object's graph of a single TES task. + """ + + def __init__( + self, taskmaster: Optional[Job] = None, taskmaster_name: Optional[str] = None + ): + """Initialize the Task.""" + if taskmaster: + self.taskmaster = taskmaster + elif taskmaster_name: + job = V1Job(metadata=V1ObjectMeta(name=taskmaster_name)) + self.taskmaster = Job(job) + else: + raise ValueError("Either taskmaster or taskmaster_name must be provided.") + + self.executors_by_name: Dict[str, Job] = {} + self.output_filer: Optional[Job] = None + self.tesk_k8s_constants = tesk_k8s_constants + self.MAX_INT = 2**31 - 1 + + def add_executor(self, executor: Job) -> None: + """Add executor to the task.""" + metadata = executor.get_job().metadata + assert metadata is not None + + name = metadata.name + assert name is not None + + self.executors_by_name.setdefault(name, executor) + + def set_output_filer(self, filer: Job): + """Set output filer for the task.""" + self.output_filer = filer + + def get_taskmaster(self) -> Job: + """Get taskmaster job.""" + return self.taskmaster + + def get_executors(self) -> List[Job]: + """Get executors.""" + return sorted(self.executors_by_name.values(), key=self.extract_executor_number) + + def get_last_executor(self) -> Optional[Job]: + """Get last executor.""" + if not self.executors_by_name: + return None + executors = self.get_executors() + return executors[-1] if executors else None + + def get_output_filer(self) -> Optional[Job]: + """Get output filer.""" + return self.output_filer + + def extract_executor_number(self, executor: Job) -> int: + """Extract executor number from the executor's name.""" + taskmaster_name = self.taskmaster.get_job_name() + assert taskmaster_name is not None + + prefix = ( + taskmaster_name + self.tesk_k8s_constants.job_constants.JOB_NAME_EXEC_PREFIX + ) + exec_name = executor.get_job_name() + + if not exec_name: + return self.MAX_INT + + match = re.match(f"{re.escape(prefix)}(\d+)", exec_name) + if match: + return int(match.group(1)) + + return self.MAX_INT + + def get_executor_name(self, executor_index: int) -> str: + """Get executor name based on the taskmaster's job name and executor index.""" + taskmaster_name = self.taskmaster.get_job_name() + return ( + f"{taskmaster_name}" + f"{self.tesk_k8s_constants.job_constants.JOB_NAME_EXEC_PREFIX}" + f"{str(executor_index).zfill(self.tesk_k8s_constants.job_constants.JOB_NAME_EXEC_NO_LENGTH)}" + ) diff --git a/tesk/k8s/converter/executor_command_wrapper.py b/tesk/k8s/converter/executor_command_wrapper.py new file mode 100644 index 0000000..ebda592 --- /dev/null +++ b/tesk/k8s/converter/executor_command_wrapper.py @@ -0,0 +1,54 @@ +"""Wraps list of executor's command. + +Such that: +- If any of executor's stdin/stdout/stderr params is set, the command runs in shell +- Each part of the original command (single command/argument) that contained shell + special chars is surrounded by single quotes, plus single quote inside such string + are replaced with '"'"' sequence +- `stdin`, `stdout`, `stderr` streams are redirected to paths according to executors + params +""" + +from typing import List + +from tesk.api.ga4gh.tes.models import TesExecutor + + +class ExecutorCommandWrapper: + """Wraps executor's command.""" + + def __init__(self, executor: TesExecutor): + """Initialize the wrapper.""" + self.executor = executor + + def get_commands_with_stream_redirects(self) -> List[str]: + """Get command with stream redirects.""" + result = [] + + if ( + not self.executor.stdin + and not self.executor.stdout + and not self.executor.stderr + ): + return self.executor.command + + result.append("/bin/sh") + result.append("-c") + + command_parts = [" ".join(self.executor.command)] + + if self.executor.stdin: + command_parts.append("<") + command_parts.append(self.executor.stdin) + + if self.executor.stdout: + command_parts.append(">") + command_parts.append(self.executor.stdout) + + if self.executor.stderr: + command_parts.append("2>") + command_parts.append(self.executor.stderr) + + result.append(" ".join(command_parts)) + + return result diff --git a/tesk/k8s/converter/template.py b/tesk/k8s/converter/template.py new file mode 100644 index 0000000..feb5d09 --- /dev/null +++ b/tesk/k8s/converter/template.py @@ -0,0 +1,192 @@ +"""Create template for kubernetes objects.""" + +import logging +import uuid +from typing import Iterable + +from kubernetes.client import ( + V1Container, + V1EnvVar, + V1JobSpec, + V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, + V1ResourceRequirements, + V1SecretVolumeSource, + V1Volume, + V1VolumeMount, +) +from kubernetes.client.models import V1Job + +from tesk.constants import tesk_constants +from tesk.custom_config import Taskmaster +from tesk.k8s.constants import tesk_k8s_constants +from tesk.utils import get_taskmaster_env_property, get_taskmaster_template + +logger = logging.getLogger(__name__) + + +class KubernetesTemplateSupplier: + """Templates for tasmaster's and executor's job object..""" + + def __init__( + self, + ): + """Initialize the converter.""" + self.taskmaster_template: V1Job = get_taskmaster_template() + self.taskmaster: Taskmaster = get_taskmaster_env_property() + self.tesk_k8s_constants = tesk_k8s_constants + self.tesk_constants = tesk_constants + + def get_taskmaster_name(self) -> str: + """Generate a unique name for the taskmaster job.""" + name: str = self.tesk_k8s_constants.job_constants.JOB_NAME_TASKM_PREFIX + str( + uuid.uuid4() + ) + return name + + def get_taskmaster_template_with_value_from_config(self) -> V1Job: + """Create a template for the taskmaster job.""" + job: V1Job = self.taskmaster_template + + if job.spec is None: + job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if job.spec.template.spec is None: + job.spec.template.spec = V1PodSpec(containers=[]) + + job.spec.template.spec.service_account_name = self.taskmaster.serviceAccountName + + container = job.spec.template.spec.containers[0] + container.image = ( + f"{self.taskmaster.imageName}:" f"{self.taskmaster.imageVersion}" + ) + + assert isinstance(container.args, Iterable) + + container.args.extend( + [ + "-n", + self.tesk_constants.TESK_NAMESPACE, + "-fn", + self.taskmaster.filerImageName, + "-fv", + self.taskmaster.filerImageVersion, + ] + ) + + if self.taskmaster.debug: + container.args.append("-d") + container.image_pull_policy = "Always" + + if job.metadata is None: + job.metadata = V1ObjectMeta(labels={}) + + if job.metadata.labels is None: + job.metadata.labels = {} + + job.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_KEY + ] = self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM + + # Assign a unique name to the taskmaster job + taskmaster_name = self.get_taskmaster_name() + job.metadata.name = taskmaster_name + container.name = taskmaster_name + + assert isinstance(container.env, Iterable) + + if container.env is None: + container.env = V1EnvVar() + + if self.taskmaster and self.taskmaster.environment: + container.env.extend( + [ + V1EnvVar(name=key.upper().replace(".", "_"), value=value) + for key, value in self.taskmaster.environment.items() + ] + ) + + # Set backoff env variables for `filer` and `executor` + backoff_limits = { + self.tesk_k8s_constants.job_constants.FILER_BACKOFF_LIMIT: self.tesk_constants.FILER_BACKOFF_LIMIT, # noqa: E501 + self.tesk_k8s_constants.job_constants.EXECUTOR_BACKOFF_LIMIT: self.tesk_constants.EXECUTOR_BACKOFF_LIMIT, # noqa: E501 + } + + container.env.extend( + [V1EnvVar(name=key, value=value) for key, value in backoff_limits.items()] + ) + + ftp_secrets = [ + self.tesk_k8s_constants.ftp_constants.FTP_SECRET_USERNAME_ENV, + self.tesk_k8s_constants.ftp_constants.FTP_SECRET_PASSWORD_ENV, + ] + container.env = [ + env + for env in container.env + if env.name not in ftp_secrets or self.taskmaster.ftp.enabled + ] + + if self.taskmaster.ftp.enabled: + for env in container.env: + if env.name in ftp_secrets: + assert env.value_from is not None + assert env.value_from.secret_key_ref is not None + env.value_from.secret_key_ref.name = self.taskmaster.ftp.secretName + + return job + + def get_executor_template_with_value_from_config(self) -> V1Job: + """Create a template for the executor job.""" + container = V1Container( + name=self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_EXEC, + resources=V1ResourceRequirements(), + ) + + if self.taskmaster.executorSecret is not None: + container.volume_mounts = [ + V1VolumeMount( + read_only=True, + name=str(self.taskmaster.executorSecret.name), + mount_path=str(self.taskmaster.executorSecret.mountPath), + ) + ] + + pod_spec = V1PodSpec( + containers=[container], + restart_policy=self.tesk_k8s_constants.k8s_constants.JOB_RESTART_POLICY, + ) + + job = V1Job( + api_version=self.tesk_k8s_constants.k8s_constants.K8S_BATCH_API_VERSION, + kind=self.tesk_k8s_constants.k8s_constants.K8S_BATCH_API_JOB_TYPE, + metadata=V1ObjectMeta( + labels={ + self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_KEY: ( + self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_EXEC + ) + } + ), + spec=V1JobSpec( + template=V1PodTemplateSpec(metadata=V1ObjectMeta(), spec=pod_spec) + ), + ) + + if self.taskmaster.executorSecret is not None: + if job.spec is None: + job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if job.spec.template.spec is None: + job.spec.template.spec = V1PodSpec(containers=[]) + + job.spec.template.spec.volumes = [ + V1Volume( + name=str(self.taskmaster.executorSecret.name), + secret=V1SecretVolumeSource( + secret_name=self.taskmaster.executorSecret.name + ), + ) + ] + + assert job.spec is not None + assert job.spec.template.spec is not None + + return job diff --git a/tesk/utils.py b/tesk/utils.py index cdbbb7f..66affb6 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -1,29 +1,36 @@ """Utility functions for the TESK package.""" +import json import os from pathlib import Path +from typing import List, Sequence from foca import Foca from kubernetes.client.models import ( V1Container, + V1DownwardAPIVolumeFile, + V1DownwardAPIVolumeSource, V1EnvVar, V1EnvVarSource, V1Job, V1JobSpec, + V1ObjectFieldSelector, V1ObjectMeta, V1PodSpec, V1PodTemplateSpec, V1SecretKeySelector, + V1Volume, V1VolumeMount, ) +from pydantic import BaseModel -from tesk.constants import TeskConstants +from tesk.constants import tesk_constants from tesk.custom_config import ( CustomConfig, - TaskmasterEnvProperties, + Taskmaster, ) from tesk.exceptions import ConfigNotFoundError -from tesk.k8s.constants import TeskK8sConstants +from tesk.k8s.constants import tesk_k8s_constants def get_config_path() -> Path: @@ -64,27 +71,29 @@ def get_taskmaster_template() -> V1Job: api_version="batch/v1", kind="Job", metadata=V1ObjectMeta( - name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, - labels={"app": TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM}, + name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, + labels={ + "app": tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM + }, ), spec=V1JobSpec( template=V1PodTemplateSpec( metadata=V1ObjectMeta( - name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM + name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM ), spec=V1PodSpec( service_account_name="default", containers=[ V1Container( - name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, - image=f"{TeskConstants.TASKMASTER_IMAGE_NAME}:{TeskConstants.TASKMASTER_IMAGE_VERSION}", + name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, + image=f"{tesk_constants.TASKMASTER_IMAGE_NAME}:{tesk_constants.TASKMASTER_IMAGE_VERSION}", args=[ "-f", - f"/jsoninput/{TeskK8sConstants.job_constants.TASKMASTER_INPUT}.gz", + f"/jsoninput/{tesk_k8s_constants.job_constants.TASKMASTER_INPUT}.gz", ], env=[ V1EnvVar( - name=TeskK8sConstants.ftp_constants.FTP_SECRET_USERNAME_ENV, + name=tesk_k8s_constants.ftp_constants.FTP_SECRET_USERNAME_ENV, value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name="ftp-secret", @@ -94,7 +103,7 @@ def get_taskmaster_template() -> V1Job: ), ), V1EnvVar( - name=TeskK8sConstants.ftp_constants.FTP_SECRET_PASSWORD_ENV, + name=tesk_k8s_constants.ftp_constants.FTP_SECRET_PASSWORD_ENV, value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name="ftp-secret", @@ -118,8 +127,28 @@ def get_taskmaster_template() -> V1Job: ], ) ], - volumes=[], - restart_policy=TeskK8sConstants.k8s_constants.JOB_RESTART_POLICY, + volumes=[ + V1Volume( + name="podinfo", + downward_api=V1DownwardAPIVolumeSource( + items=[ + V1DownwardAPIVolumeFile( + path="labels", + field_ref=V1ObjectFieldSelector( + field_path="metadata.labels" + ), + ), + V1DownwardAPIVolumeFile( + path="annotations", + field_ref=V1ObjectFieldSelector( + field_path="metadata.annotations" + ), + ), + ] + ), + ), + ], + restart_policy=tesk_k8s_constants.k8s_constants.JOB_RESTART_POLICY, ), ) ), @@ -127,7 +156,7 @@ def get_taskmaster_template() -> V1Job: return job -def get_taskmaster_env_property() -> TaskmasterEnvProperties: +def get_taskmaster_env_property() -> Taskmaster: """Get the taskmaster env property from the custom configuration. Returns: @@ -135,10 +164,18 @@ def get_taskmaster_env_property() -> TaskmasterEnvProperties: """ custom_conf = get_custom_config() try: - return custom_conf.taskmaster_env_properties + return custom_conf.taskmaster except AttributeError: raise ConfigNotFoundError( "Custom configuration doesn't seem to have taskmaster_env_properties in " "config file." f"Custom config:\n{custom_conf}" ) from None + + +def pydantic_model_list_dict(model_list: Sequence[BaseModel]) -> List[str]: + """Convert a list of pydantic models to a list of dictionaries.""" + json_list = [] + for item in model_list: + json_list.append(json.loads(item.json())) + return json_list From 3fa2eca05dc4fb6fe49df8369572199658e1fb2a Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Thu, 5 Sep 2024 23:48:42 +0530 Subject: [PATCH 03/19] fix: add podinfo volumes --- tesk/utils.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/tesk/utils.py b/tesk/utils.py index cdbbb7f..9e68aec 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -6,14 +6,18 @@ from foca import Foca from kubernetes.client.models import ( V1Container, + V1DownwardAPIVolumeFile, + V1DownwardAPIVolumeSource, V1EnvVar, V1EnvVarSource, V1Job, V1JobSpec, + V1ObjectFieldSelector, V1ObjectMeta, V1PodSpec, V1PodTemplateSpec, V1SecretKeySelector, + V1Volume, V1VolumeMount, ) @@ -118,7 +122,27 @@ def get_taskmaster_template() -> V1Job: ], ) ], - volumes=[], + volumes=[ + V1Volume( + name="podinfo", + downward_api=V1DownwardAPIVolumeSource( + items=[ + V1DownwardAPIVolumeFile( + path="labels", + field_ref=V1ObjectFieldSelector( + field_path="metadata.labels" + ), + ), + V1DownwardAPIVolumeFile( + path="annotations", + field_ref=V1ObjectFieldSelector( + field_path="metadata.annotations" + ), + ), + ] + ), + ), + ], restart_policy=TeskK8sConstants.k8s_constants.JOB_RESTART_POLICY, ), ) From ffecf225d28579d2d8eefefbb7312fe41ede27e6 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 6 Sep 2024 17:40:33 +0530 Subject: [PATCH 04/19] minoe --- tesk/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tesk/utils.py b/tesk/utils.py index 9e68aec..4e788da 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -24,7 +24,7 @@ from tesk.constants import TeskConstants from tesk.custom_config import ( CustomConfig, - TaskmasterEnvProperties, + Taskmaster, ) from tesk.exceptions import ConfigNotFoundError from tesk.k8s.constants import TeskK8sConstants @@ -151,7 +151,7 @@ def get_taskmaster_template() -> V1Job: return job -def get_taskmaster_env_property() -> TaskmasterEnvProperties: +def get_taskmaster_env_property() -> Taskmaster: """Get the taskmaster env property from the custom configuration. Returns: @@ -159,7 +159,7 @@ def get_taskmaster_env_property() -> TaskmasterEnvProperties: """ custom_conf = get_custom_config() try: - return custom_conf.taskmaster_env_properties + return custom_conf.taskmaster except AttributeError: raise ConfigNotFoundError( "Custom configuration doesn't seem to have taskmaster_env_properties in " From 0acd4b0df9b9d03ab6b3ee183a62bf6a21e117f1 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Mon, 9 Sep 2024 17:27:02 +0530 Subject: [PATCH 05/19] minor --- tesk/utils.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/tesk/utils.py b/tesk/utils.py index 4e788da..55cd8e4 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -21,13 +21,13 @@ V1VolumeMount, ) -from tesk.constants import TeskConstants +from tesk.constants import tesk_constants from tesk.custom_config import ( CustomConfig, Taskmaster, ) from tesk.exceptions import ConfigNotFoundError -from tesk.k8s.constants import TeskK8sConstants +from tesk.k8s.constants import tesk_k8s_constants def get_config_path() -> Path: @@ -68,27 +68,26 @@ def get_taskmaster_template() -> V1Job: api_version="batch/v1", kind="Job", metadata=V1ObjectMeta( - name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, - labels={"app": TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM}, + name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, ), spec=V1JobSpec( template=V1PodTemplateSpec( metadata=V1ObjectMeta( - name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM + name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM ), spec=V1PodSpec( - service_account_name="default", + service_account_name=tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME, containers=[ V1Container( - name=TeskK8sConstants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, - image=f"{TeskConstants.TASKMASTER_IMAGE_NAME}:{TeskConstants.TASKMASTER_IMAGE_VERSION}", + name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, + image=f"{tesk_constants.TASKMASTER_IMAGE_NAME}:{tesk_constants.TASKMASTER_IMAGE_VERSION}", args=[ "-f", - f"/jsoninput/{TeskK8sConstants.job_constants.TASKMASTER_INPUT}.gz", + f"/jsoninput/{tesk_k8s_constants.job_constants.TASKMASTER_INPUT}.gz", ], env=[ V1EnvVar( - name=TeskK8sConstants.ftp_constants.FTP_SECRET_USERNAME_ENV, + name=tesk_k8s_constants.ftp_constants.FTP_SECRET_USERNAME_ENV, value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name="ftp-secret", @@ -98,7 +97,7 @@ def get_taskmaster_template() -> V1Job: ), ), V1EnvVar( - name=TeskK8sConstants.ftp_constants.FTP_SECRET_PASSWORD_ENV, + name=tesk_k8s_constants.ftp_constants.FTP_SECRET_PASSWORD_ENV, value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name="ftp-secret", @@ -133,17 +132,11 @@ def get_taskmaster_template() -> V1Job: field_path="metadata.labels" ), ), - V1DownwardAPIVolumeFile( - path="annotations", - field_ref=V1ObjectFieldSelector( - field_path="metadata.annotations" - ), - ), ] ), ), ], - restart_policy=TeskK8sConstants.k8s_constants.JOB_RESTART_POLICY, + restart_policy=tesk_k8s_constants.k8s_constants.JOB_RESTART_POLICY, ), ) ), From 2948508b6d9702a192422a25838ea6a5922bd67f Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Wed, 11 Sep 2024 00:00:04 +0530 Subject: [PATCH 06/19] review changes --- tesk/custom_config.py | 112 ++++++++++++++++++++---------------------- tesk/exceptions.py | 4 ++ tesk/utils.py | 54 ++++++++++---------- 3 files changed, 85 insertions(+), 85 deletions(-) diff --git a/tesk/custom_config.py b/tesk/custom_config.py index 7acab14..7680b21 100644 --- a/tesk/custom_config.py +++ b/tesk/custom_config.py @@ -2,82 +2,74 @@ from typing import Dict, Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel from tesk.api.ga4gh.tes.models import Service from tesk.constants import tesk_constants class FtpConfig(BaseModel): - """Ftp configuration model for the TESK.""" + """Ftp configuration model for the TESK. - secretName: Optional[str] = Field( - default=None, description="Name of the secret with FTP account credentials" - ) - enabled: bool = Field( - default=False, - description="If FTP account enabled (based on non-emptiness of secretName)", - ) + Args: + secretName: Name of the secret with FTP account credentials. + enabled: If FTP account enabled (based on non-emptiness of secretName). + """ + + secretName: Optional[str] = None + enabled: bool = False class ExecutorSecret(BaseModel): - """Executor secret configuration.""" - - name: Optional[str] = Field( - default=None, - description=( - "Name of a secret that will be mounted as volume to each executor. The same" - " name will be used for the secret and the volume" - ), - ) - mountPath: Optional[str] = Field( - default=None, - alias="mountPath", - description="The path where the secret will be mounted to executors", - ) - enabled: bool = Field( - default=False, description="Indicates whether the secret is enabled" - ) + """Executor secret configuration. + + Args: + name: Name of a secret that will be mounted as volume to each executor. The same + name will be used for the secret and the volume. + mountPath: The path where the secret will be mounted to executors. + enabled: Indicates whether the secret is enabled. + """ + + name: Optional[str] = None + mountPath: Optional[str] = None + enabled: bool = False class Taskmaster(BaseModel): - """Taskmaster environment properties model for the TESK.""" - - imageName: str = Field( - default=tesk_constants.TASKMASTER_IMAGE_NAME, - description="Taskmaster image name", - ) - imageVersion: str = Field( - default=tesk_constants.TASKMASTER_IMAGE_VERSION, - description="Taskmaster image version", - ) - filerImageName: str = Field( - default=tesk_constants.FILER_IMAGE_NAME, description="Filer image name" - ) - filerImageVersion: str = Field( - default=tesk_constants.FILER_IMAGE_VERSION, description="Filer image version" - ) - ftp: FtpConfig = Field(default=None, description="Test FTP account settings") - debug: bool = Field( - default=False, - description="If verbose (debug) mode of taskmaster is on (passes additional " - "flag to taskmaster and sets image pull policy to Always)", - ) - environment: Optional[Dict[str, str]] = Field( - default=None, - description="Environment variables, that will be passed to taskmaster", - ) - serviceAccountName: str = Field( - default="default", description="Service Account name for taskmaster" - ) - executorSecret: Optional[ExecutorSecret] = Field( - default=None, description="Executor secret configuration" - ) + """Taskmaster's configuration model for the TESK. + + Args: + imageName: Taskmaster image name. + imageVersion: Taskmaster image version. + filerImageName: Filer image name. + filerImageVersion: Filer image version. + ftp: FTP account settings. + debug: If verbose (debug) mode of taskmaster is on (passes additional flag to + taskmaster and sets image pull policy to Always). + environment: Environment variables, that will be passed to taskmaster. + serviceAccountName: Service Account name for taskmaster. + executorSecret: Executor secret configuration + """ + + imageName: str = tesk_constants.TASKMASTER_IMAGE_NAME + imageVersion: str = tesk_constants.TASKMASTER_IMAGE_VERSION + filerImageName: str = tesk_constants.FILER_IMAGE_NAME + filerImageVersion: str = tesk_constants.FILER_IMAGE_VERSION + ftp: FtpConfig = FtpConfig() + debug: bool = False + environment: Optional[Dict[str, str]] = None + serviceAccountName: str = tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME + executorSecret: Optional[ExecutorSecret] = None class CustomConfig(BaseModel): - """Custom configuration model for the FOCA app.""" + """Custom configuration model for the FOCA app. + + Args: + service_info: Service information. + taskmaster: Taskmaster environment. + """ # Define custom configuration fields here service_info: Service - taskmaster: Taskmaster + taskmaster: Taskmaster = Taskmaster() diff --git a/tesk/exceptions.py b/tesk/exceptions.py index d0aa727..d270678 100644 --- a/tesk/exceptions.py +++ b/tesk/exceptions.py @@ -19,6 +19,10 @@ class ConfigNotFoundError(FileNotFoundError): """Configuration file not found error.""" +class ConfigInvalidError(ValueError): + """Configuration file is invalid.""" + + class KubernetesError(ApiException): """Kubernetes error.""" diff --git a/tesk/utils.py b/tesk/utils.py index 55cd8e4..c040643 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -21,12 +21,11 @@ V1VolumeMount, ) -from tesk.constants import tesk_constants from tesk.custom_config import ( CustomConfig, Taskmaster, ) -from tesk.exceptions import ConfigNotFoundError +from tesk.exceptions import ConfigInvalidError from tesk.k8s.constants import tesk_k8s_constants @@ -34,7 +33,7 @@ def get_config_path() -> Path: """Get the configuration path. Returns: - The path of the config file. + The path of the config file. """ # Determine the configuration path if config_path_env := os.getenv("TESK_FOCA_CONFIG_PATH"): @@ -47,23 +46,45 @@ def get_custom_config() -> CustomConfig: """Get the custom configuration. Returns: - The custom configuration. + The custom configuration. """ conf = Foca(config_file=get_config_path()).conf try: return CustomConfig(**conf.custom) except AttributeError: - raise ConfigNotFoundError( + raise ConfigInvalidError( "Custom configuration not found in config file." ) from None +def get_taskmaster_config() -> Taskmaster: + """Get the taskmaster env property from the custom configuration. + + Returns: + The taskmaster env property. + """ + custom_conf = get_custom_config() + try: + return custom_conf.taskmaster + except AttributeError: + raise ConfigInvalidError( + "Custom configuration doesn't seem to have taskmaster_env_properties in " + "config file." + f"Custom config:\n{custom_conf}" + ) from None + + def get_taskmaster_template() -> V1Job: """Get the taskmaster template from the custom configuration. + This will be used to create the taskmaster job, API will inject values + into the template, depending upon the type of job and request. + Returns: - The taskmaster template. + The taskmaster template. """ + taskmaster_conf: Taskmaster = get_taskmaster_config() + job = V1Job( api_version="batch/v1", kind="Job", @@ -76,11 +97,11 @@ def get_taskmaster_template() -> V1Job: name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM ), spec=V1PodSpec( - service_account_name=tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME, + service_account_name=taskmaster_conf.serviceAccountName, containers=[ V1Container( name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, - image=f"{tesk_constants.TASKMASTER_IMAGE_NAME}:{tesk_constants.TASKMASTER_IMAGE_VERSION}", + image=f"{taskmaster_conf.imageName}:{taskmaster_conf.imageVersion}", args=[ "-f", f"/jsoninput/{tesk_k8s_constants.job_constants.TASKMASTER_INPUT}.gz", @@ -142,20 +163,3 @@ def get_taskmaster_template() -> V1Job: ), ) return job - - -def get_taskmaster_env_property() -> Taskmaster: - """Get the taskmaster env property from the custom configuration. - - Returns: - The taskmaster env property. - """ - custom_conf = get_custom_config() - try: - return custom_conf.taskmaster - except AttributeError: - raise ConfigNotFoundError( - "Custom configuration doesn't seem to have taskmaster_env_properties in " - "config file." - f"Custom config:\n{custom_conf}" - ) from None From 7f21d1f9d57e19d6a4a1ed5d748b71ea0548f228 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Wed, 25 Sep 2024 22:58:02 +0530 Subject: [PATCH 07/19] remove getting env from os --- deployment/config.yaml | 2 ++ tesk/constants.py | 53 ++++++------------------------------------ tesk/custom_config.py | 2 ++ 3 files changed, 11 insertions(+), 46 deletions(-) diff --git a/deployment/config.yaml b/deployment/config.yaml index aea3f70..09027d9 100644 --- a/deployment/config.yaml +++ b/deployment/config.yaml @@ -89,6 +89,8 @@ custom: key: value # Service Account name for taskmaster serviceAccountName: taskmaster + filerBackoffLimit: 2 + executorBackoffLimit: 2 # Logging configuration # Cf. https://foca.readthedocs.io/en/latest/modules/foca.models.html#foca.models.config.LogConfig diff --git a/tesk/constants.py b/tesk/constants.py index 5508eb2..a301504 100644 --- a/tesk/constants.py +++ b/tesk/constants.py @@ -19,55 +19,16 @@ class TeskConstants(BaseModel): TASKMASTER_ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT: Backoff limit for taskmaster env FILER_BACKOFF_LIMIT: Backoff limit got filer job EXECUTOR_BACKOFF_LIMIT: Backoff limit for executor job - - Note: - Below are the mentioned environment variable with which these constants can be - configured, otherwise mentioned default will be assigned. - - variable: - ENV_VARIABLE = default - - FILER_IMAGE_NAME: - TESK_API_TASKMASTER_FILER_IMAGE_NAME = docker.io/elixircloud/tesk-core-filer - FILER_IMAGE_VERSION: - TESK_API_TASKMASTER_FILER_IMAGE_VERSION = latest - TASKMASTER_IMAGE_NAME: - TESK_API_TASKMASTER_IMAGE_NAME = docker.io/elixircloud/tesk-core-taskmaster - TASKMASTER_IMAGE_VERSION: - TESK_API_TASKMASTER_IMAGE_VERSION = latest - TESK_NAMESPACE: - TESK_API_K8S_NAMESPACE = tesk - TASKMASTER_SERVICE_ACCOUNT_NAME: - TESK_API_TASKMASTER_SERVICE_ACCOUNT_NAME = taskmaster - TASKMASTER_ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT: - ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT = 6 - FILER_BACKOFF_LIMIT: - FILER_BACKOFF_LIMIT = 2 - EXECUTOR_BACKOFF_LIMIT: - EXECUTOR_BACKOFF_LIMIT = 2 """ - FILER_IMAGE_NAME: str = os.getenv( - "TESK_API_TASKMASTER_FILER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-filer" - ) - FILER_IMAGE_VERSION: str = os.getenv( - "TESK_API_TASKMASTER_FILER_IMAGE_VERSION", "latest" - ) - TASKMASTER_IMAGE_NAME: str = os.getenv( - "TESK_API_TASKMASTER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-taskmaster" - ) - TASKMASTER_IMAGE_VERSION: str = os.getenv( - "TESK_API_TASKMASTER_IMAGE_VERSION", "latest" - ) TESK_NAMESPACE: str = os.getenv("TESK_API_K8S_NAMESPACE", "tesk") - TASKMASTER_SERVICE_ACCOUNT_NAME: str = os.getenv( - "TESK_API_TASKMASTER_SERVICE_ACCOUNT_NAME", "taskmaster" - ) - TASKMASTER_ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT: str = os.getenv( - "ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT", "6" - ) - FILER_BACKOFF_LIMIT: str = os.getenv("FILER_BACKOFF_LIMIT", "2") - EXECUTOR_BACKOFF_LIMIT: str = os.getenv("EXECUTOR_BACKOFF_LIMIT", "2") + FILER_IMAGE_NAME: str = "docker.io/elixircloud/tesk-core-filer" + FILER_IMAGE_VERSION: str = "latest" + TASKMASTER_IMAGE_NAME: str = "docker.io/elixircloud/tesk-core-taskmaster" + TASKMASTER_IMAGE_VERSION: str = "latest" + TASKMASTER_SERVICE_ACCOUNT_NAME: str = "taskmaster" + FILER_BACKOFF_LIMIT: str = "2" + EXECUTOR_BACKOFF_LIMIT: str = "2" class Config: """Configuration for class.""" diff --git a/tesk/custom_config.py b/tesk/custom_config.py index 7680b21..5adaccf 100644 --- a/tesk/custom_config.py +++ b/tesk/custom_config.py @@ -60,6 +60,8 @@ class Taskmaster(BaseModel): environment: Optional[Dict[str, str]] = None serviceAccountName: str = tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME executorSecret: Optional[ExecutorSecret] = None + filerBackoffLimit: int = tesk_constants.FILER_BACKOFF_LIMIT + executorBackoffLimit: int = tesk_constants.EXECUTOR_BACKOFF_LIMIT class CustomConfig(BaseModel): From 71d9be1230e8ed40d7957a0d87d0423cde7095dd Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Wed, 25 Sep 2024 23:00:50 +0530 Subject: [PATCH 08/19] mypy --- tesk/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tesk/constants.py b/tesk/constants.py index a301504..ff856bc 100644 --- a/tesk/constants.py +++ b/tesk/constants.py @@ -27,8 +27,8 @@ class TeskConstants(BaseModel): TASKMASTER_IMAGE_NAME: str = "docker.io/elixircloud/tesk-core-taskmaster" TASKMASTER_IMAGE_VERSION: str = "latest" TASKMASTER_SERVICE_ACCOUNT_NAME: str = "taskmaster" - FILER_BACKOFF_LIMIT: str = "2" - EXECUTOR_BACKOFF_LIMIT: str = "2" + FILER_BACKOFF_LIMIT: int = 2 + EXECUTOR_BACKOFF_LIMIT: int = 2 class Config: """Configuration for class.""" From 41d8d19bb9d7ec190cadba958f864ff0e3fe2636 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 4 Oct 2024 17:39:22 +0530 Subject: [PATCH 09/19] remove inline typing and use constants --- tesk/utils.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tesk/utils.py b/tesk/utils.py index c040643..960549f 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -85,9 +85,9 @@ def get_taskmaster_template() -> V1Job: """ taskmaster_conf: Taskmaster = get_taskmaster_config() - job = V1Job( - api_version="batch/v1", - kind="Job", + return V1Job( + api_version=tesk_k8s_constants.k8s_constants.K8S_BATCH_API_VERSION, + kind=tesk_k8s_constants.k8s_constants.K8S_BATCH_API_JOB_TYPE, metadata=V1ObjectMeta( name=tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM, ), @@ -162,4 +162,3 @@ def get_taskmaster_template() -> V1Job: ) ), ) - return job From e4993dfc08200716867cf9f33047b80542f02165 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 4 Oct 2024 18:12:01 +0530 Subject: [PATCH 10/19] change int to str to make it json marshalable --- tesk/constants.py | 4 ++-- tesk/custom_config.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tesk/constants.py b/tesk/constants.py index ff856bc..a301504 100644 --- a/tesk/constants.py +++ b/tesk/constants.py @@ -27,8 +27,8 @@ class TeskConstants(BaseModel): TASKMASTER_IMAGE_NAME: str = "docker.io/elixircloud/tesk-core-taskmaster" TASKMASTER_IMAGE_VERSION: str = "latest" TASKMASTER_SERVICE_ACCOUNT_NAME: str = "taskmaster" - FILER_BACKOFF_LIMIT: int = 2 - EXECUTOR_BACKOFF_LIMIT: int = 2 + FILER_BACKOFF_LIMIT: str = "2" + EXECUTOR_BACKOFF_LIMIT: str = "2" class Config: """Configuration for class.""" diff --git a/tesk/custom_config.py b/tesk/custom_config.py index 5adaccf..4b584b1 100644 --- a/tesk/custom_config.py +++ b/tesk/custom_config.py @@ -60,8 +60,8 @@ class Taskmaster(BaseModel): environment: Optional[Dict[str, str]] = None serviceAccountName: str = tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME executorSecret: Optional[ExecutorSecret] = None - filerBackoffLimit: int = tesk_constants.FILER_BACKOFF_LIMIT - executorBackoffLimit: int = tesk_constants.EXECUTOR_BACKOFF_LIMIT + filerBackoffLimit: str = tesk_constants.FILER_BACKOFF_LIMIT + executorBackoffLimit: str = tesk_constants.EXECUTOR_BACKOFF_LIMIT class CustomConfig(BaseModel): From 2ac6959d39ce239aee0274656079b7e579d23606 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 4 Oct 2024 18:12:55 +0530 Subject: [PATCH 11/19] change int to str to make it json marshalable --- tesk/constants.py | 4 ++-- tesk/custom_config.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tesk/constants.py b/tesk/constants.py index ff856bc..a301504 100644 --- a/tesk/constants.py +++ b/tesk/constants.py @@ -27,8 +27,8 @@ class TeskConstants(BaseModel): TASKMASTER_IMAGE_NAME: str = "docker.io/elixircloud/tesk-core-taskmaster" TASKMASTER_IMAGE_VERSION: str = "latest" TASKMASTER_SERVICE_ACCOUNT_NAME: str = "taskmaster" - FILER_BACKOFF_LIMIT: int = 2 - EXECUTOR_BACKOFF_LIMIT: int = 2 + FILER_BACKOFF_LIMIT: str = "2" + EXECUTOR_BACKOFF_LIMIT: str = "2" class Config: """Configuration for class.""" diff --git a/tesk/custom_config.py b/tesk/custom_config.py index 5adaccf..4b584b1 100644 --- a/tesk/custom_config.py +++ b/tesk/custom_config.py @@ -60,8 +60,8 @@ class Taskmaster(BaseModel): environment: Optional[Dict[str, str]] = None serviceAccountName: str = tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME executorSecret: Optional[ExecutorSecret] = None - filerBackoffLimit: int = tesk_constants.FILER_BACKOFF_LIMIT - executorBackoffLimit: int = tesk_constants.EXECUTOR_BACKOFF_LIMIT + filerBackoffLimit: str = tesk_constants.FILER_BACKOFF_LIMIT + executorBackoffLimit: str = tesk_constants.EXECUTOR_BACKOFF_LIMIT class CustomConfig(BaseModel): From a4edd0ebf83ea4d5448c51600cb61feb8fc6d528 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Mon, 7 Oct 2024 15:10:35 +0530 Subject: [PATCH 12/19] review --- tesk/api/ga4gh/tes/controllers.py | 15 ++++++++------- tesk/api/ga4gh/tes/task/create_task.py | 24 +++++++++++++++--------- tesk/exceptions.py | 6 ------ 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/tesk/api/ga4gh/tes/controllers.py b/tesk/api/ga4gh/tes/controllers.py index 653d3b3..791f620 100644 --- a/tesk/api/ga4gh/tes/controllers.py +++ b/tesk/api/ga4gh/tes/controllers.py @@ -1,16 +1,15 @@ """Controllers for GA4GH TES API endpoints.""" import logging - -# from connexion import request # type: ignore from typing import Any from foca.utils.logging import log_traffic # type: ignore +from pydantic import ValidationError from tesk.api.ga4gh.tes.models import TesTask from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo from tesk.api.ga4gh.tes.task.create_task import CreateTesTask -from tesk.exceptions import InternalServerError +from tesk.exceptions import BadRequest, InternalServerError # Get logger instance logger = logging.getLogger(__name__) @@ -31,7 +30,7 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore # POST /tasks @log_traffic -def CreateTask(**kwargs) -> dict: # type: ignore +def CreateTask(**kwargs) -> dict: """Create task. Args: @@ -39,9 +38,11 @@ def CreateTask(**kwargs) -> dict: # type: ignore """ try: request_body: Any = kwargs.get("body") - tes_task = TesTask(**request_body) - response = CreateTesTask(tes_task).response() - return response + try: + tes_task = TesTask(**request_body) + except ValidationError as e: + raise BadRequest(str(e)) from e + return CreateTesTask(tes_task).response() except Exception as e: raise InternalServerError from e diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py index 17568d4..8b56d7f 100644 --- a/tesk/api/ga4gh/tes/task/create_task.py +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -1,6 +1,7 @@ """TESK API module for creating a task.""" import logging +from http import HTTPStatus from tesk.api.ga4gh.tes.models import TesCreateTaskResponse, TesResources, TesTask from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest @@ -26,18 +27,27 @@ def __init__( def handle_request(self) -> TesCreateTaskResponse: """Create TES task.""" - attempts_no = 0 + attempts_no: int = 0 + total_attempts_no: int = ( + self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO + ) + while ( attempts_no < self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO ): try: + logger.debug( + f"Creating K8s job, attempt no: {attempts_no}/{total_attempts_no}." + ) attempts_no += 1 - resources = self.task.resources minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb() - if not self.task.resources: + if self.task.resources is None: self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb)) - if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb: + elif ( + self.task.resources.ram_gb is None + or self.task.resources.ram_gb < minimum_ram_gb + ): self.task.resources.ram_gb = minimum_ram_gb taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job( @@ -62,14 +72,10 @@ def handle_request(self) -> TesCreateTaskResponse: except KubernetesError as e: if ( - not e.is_object_name_duplicated() + e.status != HTTPStatus.CONFLICT or attempts_no >= self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO ): raise e - except Exception as exc: - logging.error("ERROR: In createTask", exc_info=True) - raise exc - return TesCreateTaskResponse(id="") # To silence mypy, should never be reached diff --git a/tesk/exceptions.py b/tesk/exceptions.py index 35471f1..d270678 100644 --- a/tesk/exceptions.py +++ b/tesk/exceptions.py @@ -1,7 +1,5 @@ """App exceptions.""" -from http import HTTPStatus - from connexion.exceptions import ( BadRequestProblem, ExtraParameterProblem, @@ -28,10 +26,6 @@ class ConfigInvalidError(ValueError): class KubernetesError(ApiException): """Kubernetes error.""" - def is_object_name_duplicated(self) -> bool: - """Check if object name is duplicated.""" - return self.status == HTTPStatus.CONFLICT - # exceptions raised in app context exceptions = { From 0d7981ac93fb77898ec0d745bb5d26770b8e05b6 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 11 Oct 2024 10:25:44 +0530 Subject: [PATCH 13/19] reviiews --- deployment/config.yaml | 4 +- tesk/api/ga4gh/tes/controllers.py | 15 +++--- tesk/api/ga4gh/tes/task/create_task.py | 23 ++++---- tesk/api/ga4gh/tes/task/task_request.py | 9 +++- tesk/exceptions.py | 5 ++ tesk/k8s/converter/converter.py | 52 ++++++++++++++++--- tesk/k8s/converter/data/task.py | 26 +++++++--- .../k8s/converter/executor_command_wrapper.py | 34 ++++++------ tesk/k8s/converter/template.py | 30 +++++++++-- tesk/k8s/wrapper.py | 18 ++++++- tesk/utils.py | 5 +- 11 files changed, 159 insertions(+), 62 deletions(-) diff --git a/deployment/config.yaml b/deployment/config.yaml index 09027d9..3567a52 100644 --- a/deployment/config.yaml +++ b/deployment/config.yaml @@ -74,9 +74,9 @@ custom: - ParamToRecogniseDataComingFromConfig taskmaster: imageName: docker.io/elixircloud/tesk-core-taskmaster - imageVersion: v0.10.2 + imageVersion: v0.10.4 filerImageName: docker.io/elixircloud/tesk-core-filer - filerImageVersion: v0.10.2 + filerImageVersion: v0.10.4 ftp: # Name of the secret with FTP account credentials secretName: account-secret diff --git a/tesk/api/ga4gh/tes/controllers.py b/tesk/api/ga4gh/tes/controllers.py index 791f620..0bbcbc6 100644 --- a/tesk/api/ga4gh/tes/controllers.py +++ b/tesk/api/ga4gh/tes/controllers.py @@ -9,7 +9,7 @@ from tesk.api.ga4gh.tes.models import TesTask from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo from tesk.api.ga4gh.tes.task.create_task import CreateTesTask -from tesk.exceptions import BadRequest, InternalServerError +from tesk.exceptions import BadRequest # Get logger instance logger = logging.getLogger(__name__) @@ -36,15 +36,12 @@ def CreateTask(**kwargs) -> dict: Args: **kwargs: Arbitrary keyword arguments. """ + request_body: Any = kwargs.get("body") try: - request_body: Any = kwargs.get("body") - try: - tes_task = TesTask(**request_body) - except ValidationError as e: - raise BadRequest(str(e)) from e - return CreateTesTask(tes_task).response() - except Exception as e: - raise InternalServerError from e + tes_task = TesTask(**request_body) + except ValidationError as e: + raise BadRequest(str(e)) from e + return CreateTesTask(tes_task).response() # GET /tasks/service-info diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py index 8b56d7f..80dd3cf 100644 --- a/tesk/api/ga4gh/tes/task/create_task.py +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -32,16 +32,16 @@ def handle_request(self) -> TesCreateTaskResponse: self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO ) - while ( - attempts_no < self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO - ): + while attempts_no < total_attempts_no: try: logger.debug( f"Creating K8s job, attempt no: {attempts_no}/{total_attempts_no}." ) attempts_no += 1 + minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb() + # Setting task resources based on the minimum RAM if self.task.resources is None: self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb)) elif ( @@ -50,6 +50,7 @@ def handle_request(self) -> TesCreateTaskResponse: ): self.task.resources.ram_gb = minimum_ram_gb + # Create the K8s job and config map taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job( self.task, ) @@ -59,7 +60,6 @@ def handle_request(self) -> TesCreateTaskResponse: taskmaster_job, ) ) - _ = self.kubernetes_client_wrapper.create_config_map( taskmaster_config_map ) @@ -71,11 +71,12 @@ def handle_request(self) -> TesCreateTaskResponse: return TesCreateTaskResponse(id=created_job.metadata.name) except KubernetesError as e: - if ( - e.status != HTTPStatus.CONFLICT - or attempts_no - >= self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO - ): - raise e + if e.status == HTTPStatus.CONFLICT: + logger.debug( + "Conflict while creating Kubernetes job, retrying...", + ) + pass - return TesCreateTaskResponse(id="") # To silence mypy, should never be reached + raise KubernetesError( + "Failed to create Kubernetes job after multiple attempts." + ) diff --git a/tesk/api/ga4gh/tes/task/task_request.py b/tesk/api/ga4gh/tes/task/task_request.py index 7417a03..bbda96a 100644 --- a/tesk/api/ga4gh/tes/task/task_request.py +++ b/tesk/api/ga4gh/tes/task/task_request.py @@ -14,7 +14,14 @@ class TesTaskRequest(ABC): - """Base class for tesk request ecapsulating common methods and members.""" + """Base class for tesk request ecapsulating common methods and members. + + Attributes: + kubernetes_client_wrapper: kubernetes client wrapper + tes_kubernetes_converter: TES Kubernetes converter, used to convert TES requests + to Kubernetes resource + tesk_k8s_constants: TESK Kubernetes constants + """ def __init__(self): """Initialise base class for tesk request.""" diff --git a/tesk/exceptions.py b/tesk/exceptions.py index d270678..a1e56f3 100644 --- a/tesk/exceptions.py +++ b/tesk/exceptions.py @@ -74,6 +74,11 @@ class KubernetesError(ApiException): "detail": "An unexpected error occurred.", "status": 500, }, + KubernetesError: { + "title": "Kubernetes error", + "detail": "An error occurred while interacting with Kubernetes.", + "status": 500, + }, } diff --git a/tesk/k8s/converter/converter.py b/tesk/k8s/converter/converter.py index d3ca901..ba2d3fe 100644 --- a/tesk/k8s/converter/converter.py +++ b/tesk/k8s/converter/converter.py @@ -45,17 +45,38 @@ class TesKubernetesConverter: - """Convert TES requests to Kubernetes resources.""" + """Convert TES requests to Kubernetes resources. + + Attributes: + taskmaster_env_properties: taskmaster environment properties + template_supplier: Kubernetes template supplier + tesk_k8s_constants: TESK Kubernetes constants + kubernetes_client_wrapper: Kubernetes client wrapper + """ def __init__(self): - """Initialize the converter.""" + """Initialize the converter. + + Args: + taskmaster_env_properties: taskmaster environment properties + template_supplier: Kubernetes template supplier + tesk_k8s_constants: TESK Kubernetes constants + kubernetes_client_wrapper: Kubernetes client wrapper + """ self.taskmaster_env_properties: Taskmaster = get_taskmaster_env_property() self.template_supplier = KubernetesTemplateSupplier() self.tesk_k8s_constants = tesk_k8s_constants self.kubernetes_client_wrapper = KubernetesClientWrapper() - def from_tes_task_to_k8s_job(self, task: TesTask): - """Convert TES task to Kubernetes job.""" + def from_tes_task_to_k8s_job(self, task: TesTask) -> V1Job: + """Convert TES task to Kubernetes job. + + Args: + task: TES task + + Returns: + V1Job: Kubernetes job, taskmaster job + """ taskmaster_job: V1Job = ( self.template_supplier.get_taskmaster_template_with_value_from_config() ) @@ -121,7 +142,15 @@ def from_tes_task_to_k8s_config_map( task: TesTask, taskmaster_job: V1Job, ) -> V1ConfigMap: - """Create a Kubernetes ConfigMap from a TES task.""" + """Create a Kubernetes ConfigMap from a TES task. + + Args: + task: TES task + taskmaster_job: Kubernetes job + + Returns: + V1ConfigMap: Kubernetes ConfigMap + """ assert taskmaster_job.metadata is not None, ( "Taskmaster job metadata should have already been set while create" " taskmaster!" @@ -225,7 +254,18 @@ def from_tes_executor_to_k8s_job( # noqa: PLR0913, PLR0912 executor_index: int, resources: TesResources, ) -> V1Job: - """Create a Kubernetes job from a TES executor.""" + """Create a Kubernetes job from a TES executor. + + Args: + generated_task_id: generated task ID + tes_task_name: TES task name + executor: TES executor + executor_index: executor index + resources: TES resources + + Returns: + V1Job: Kubernetes job, executor job + """ # Get new template executor Job object executor_job: V1Job = ( self.template_supplier.get_executor_template_with_value_from_config() diff --git a/tesk/k8s/converter/data/task.py b/tesk/k8s/converter/data/task.py index 3f0e0e2..8a91be7 100644 --- a/tesk/k8s/converter/data/task.py +++ b/tesk/k8s/converter/data/task.py @@ -17,12 +17,23 @@ class Task: """Task is a composite. It represents Kubernetes object's graph of a single TES task. + + Attributes: + taskmaster: taskmaster job with its pods + executors_by_name: executors jobs with its pods + output_filer: output filer job with its pods + tesk_k8s_constants: TESK Kubernetes constants """ def __init__( self, taskmaster: Optional[Job] = None, taskmaster_name: Optional[str] = None ): - """Initialize the Task.""" + """Initialize the Task. + + Args: + taskmaster: taskmaster job with its pods + taskmaster_name: name of the taskmaster job + """ if taskmaster: self.taskmaster = taskmaster elif taskmaster_name: @@ -34,7 +45,6 @@ def __init__( self.executors_by_name: Dict[str, Job] = {} self.output_filer: Optional[Job] = None self.tesk_k8s_constants = tesk_k8s_constants - self.MAX_INT = 2**31 - 1 def add_executor(self, executor: Job) -> None: """Add executor to the task.""" @@ -77,16 +87,16 @@ def extract_executor_number(self, executor: Job) -> int: prefix = ( taskmaster_name + self.tesk_k8s_constants.job_constants.JOB_NAME_EXEC_PREFIX ) - exec_name = executor.get_job_name() - if not exec_name: - return self.MAX_INT + inf = 2**31 - 1 - match = re.match(f"{re.escape(prefix)}(\d+)", exec_name) - if match: + if not (exec_name := executor.get_job_name()): + return inf + + if match := re.match(f"{re.escape(prefix)}(\d+)", exec_name): return int(match.group(1)) - return self.MAX_INT + return inf def get_executor_name(self, executor_index: int) -> str: """Get executor name based on the taskmaster's job name and executor index.""" diff --git a/tesk/k8s/converter/executor_command_wrapper.py b/tesk/k8s/converter/executor_command_wrapper.py index ebda592..2c8cd13 100644 --- a/tesk/k8s/converter/executor_command_wrapper.py +++ b/tesk/k8s/converter/executor_command_wrapper.py @@ -15,16 +15,26 @@ class ExecutorCommandWrapper: - """Wraps executor's command.""" + """Wraps executor's command. + + Attributes: + executor: executor to wrap + """ def __init__(self, executor: TesExecutor): - """Initialize the wrapper.""" + """Initialize the wrapper. + + Args: + executor: executor to wrap + """ self.executor = executor def get_commands_with_stream_redirects(self) -> List[str]: - """Get command with stream redirects.""" - result = [] - + """Get command with stream redirects. + + Returns: + List[str]: command to run by exector with stream redirects + """ if ( not self.executor.stdin and not self.executor.stdout @@ -32,22 +42,16 @@ def get_commands_with_stream_redirects(self) -> List[str]: ): return self.executor.command - result.append("/bin/sh") - result.append("-c") + result = ["/bin/sh", "-c"] command_parts = [" ".join(self.executor.command)] if self.executor.stdin: - command_parts.append("<") - command_parts.append(self.executor.stdin) - + command_parts.extend(["<", self.executor.stdin]) if self.executor.stdout: - command_parts.append(">") - command_parts.append(self.executor.stdout) - + command_parts.extend([">", self.executor.stdout]) if self.executor.stderr: - command_parts.append("2>") - command_parts.append(self.executor.stderr) + command_parts.extend(["2>", self.executor.stderr]) result.append(" ".join(command_parts)) diff --git a/tesk/k8s/converter/template.py b/tesk/k8s/converter/template.py index feb5d09..a049184 100644 --- a/tesk/k8s/converter/template.py +++ b/tesk/k8s/converter/template.py @@ -27,12 +27,26 @@ class KubernetesTemplateSupplier: - """Templates for tasmaster's and executor's job object..""" + """Templates for tasmaster's and executor's job object. + + Attributes: + taskmaster_template: template for the taskmaster job + taskmaster: taskmaster environment properties + tesk_k8s_constants: TESK Kubernetes constants + tesk_constants: TESK constants + """ def __init__( self, ): - """Initialize the converter.""" + """Initialize the converter. + + Args: + taskmaster_template: template for the taskmaster job + taskmaster: taskmaster environment properties + tesk_k8s_constants: TESK Kubernetes constants + tesk_constants: TESK constants + """ self.taskmaster_template: V1Job = get_taskmaster_template() self.taskmaster: Taskmaster = get_taskmaster_env_property() self.tesk_k8s_constants = tesk_k8s_constants @@ -46,7 +60,11 @@ def get_taskmaster_name(self) -> str: return name def get_taskmaster_template_with_value_from_config(self) -> V1Job: - """Create a template for the taskmaster job.""" + """Create a template for the taskmaster job. + + Returns: + V1Job: template for the taskmaster job with values from config + """ job: V1Job = self.taskmaster_template if job.spec is None: @@ -136,7 +154,11 @@ def get_taskmaster_template_with_value_from_config(self) -> V1Job: return job def get_executor_template_with_value_from_config(self) -> V1Job: - """Create a template for the executor job.""" + """Create a template for the executor job. + + Returns: + V1Job: template for the executor job with values from config + """ container = V1Container( name=self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_EXEC, resources=V1ResourceRequirements(), diff --git a/tesk/k8s/wrapper.py b/tesk/k8s/wrapper.py index 80a7112..5f7fb2b 100644 --- a/tesk/k8s/wrapper.py +++ b/tesk/k8s/wrapper.py @@ -22,10 +22,24 @@ class KubernetesClientWrapper: - """Kubernetes client wrapper class.""" + """Kubernetes client wrapper class. + + Attributes: + batch_api: Kubernetes batch API client + core_api: Kubernetes core API client + namespace: Kubernetes namespace + tesk_k8s_constant: TESK Kubernetes constants + """ def __init__(self): - """Initialize the Kubernetes client wrapper.""" + """Initialize the Kubernetes client wrapper. + + Args: + batch_api: Kubernetes batch API client + core_api: Kubernetes core API client + namespace: Kubernetes namespace + tesk_k8s_constant: TESK Kubernetes constants + """ config.load_kube_config() self.batch_api = client.BatchV1Api() self.core_api = client.CoreV1Api() diff --git a/tesk/utils.py b/tesk/utils.py index 36eb043..8d0091c 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -186,7 +186,4 @@ def get_taskmaster_env_property() -> Taskmaster: def pydantic_model_list_dict(model_list: Sequence[BaseModel]) -> List[str]: """Convert a list of pydantic models to a list of dictionaries.""" - json_list = [] - for item in model_list: - json_list.append(json.loads(item.json())) - return json_list + return [json.loads(item.json()) for item in model_list] From c45724bf2632649d1975606998c3cb9ef279386c Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 11 Oct 2024 15:34:57 +0530 Subject: [PATCH 14/19] reviiews --- tesk/api/ga4gh/tes/task/create_task.py | 14 ++++++++++++-- tesk/api/ga4gh/tes/task/task_request.py | 2 +- tesk/k8s/converter/converter.py | 16 ++++++++-------- tesk/k8s/converter/data/task.py | 4 ++-- tesk/k8s/converter/executor_command_wrapper.py | 6 +++--- tesk/k8s/converter/template.py | 8 ++++---- tesk/k8s/wrapper.py | 4 ++-- 7 files changed, 32 insertions(+), 22 deletions(-) diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py index 80dd3cf..0443052 100644 --- a/tesk/api/ga4gh/tes/task/create_task.py +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -11,7 +11,12 @@ class CreateTesTask(TesTaskRequest): - """Create TES task.""" + """Create TES task. + + Arguments: + TesTaskRequest: Base class for TES task request. + task: TES task to create. + """ def __init__( self, @@ -26,7 +31,12 @@ def __init__( self.task = task def handle_request(self) -> TesCreateTaskResponse: - """Create TES task.""" + """Create TES task. + + Returns: + TesCreateTaskResponse: TES task response after creating corresponding K8s + job. + """ attempts_no: int = 0 total_attempts_no: int = ( self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO diff --git a/tesk/api/ga4gh/tes/task/task_request.py b/tesk/api/ga4gh/tes/task/task_request.py index bbda96a..0c6657b 100644 --- a/tesk/api/ga4gh/tes/task/task_request.py +++ b/tesk/api/ga4gh/tes/task/task_request.py @@ -15,7 +15,7 @@ class TesTaskRequest(ABC): """Base class for tesk request ecapsulating common methods and members. - + Attributes: kubernetes_client_wrapper: kubernetes client wrapper tes_kubernetes_converter: TES Kubernetes converter, used to convert TES requests diff --git a/tesk/k8s/converter/converter.py b/tesk/k8s/converter/converter.py index ba2d3fe..6ab704c 100644 --- a/tesk/k8s/converter/converter.py +++ b/tesk/k8s/converter/converter.py @@ -46,7 +46,7 @@ class TesKubernetesConverter: """Convert TES requests to Kubernetes resources. - + Attributes: taskmaster_env_properties: taskmaster environment properties template_supplier: Kubernetes template supplier @@ -56,7 +56,7 @@ class TesKubernetesConverter: def __init__(self): """Initialize the converter. - + Args: taskmaster_env_properties: taskmaster environment properties template_supplier: Kubernetes template supplier @@ -70,10 +70,10 @@ def __init__(self): def from_tes_task_to_k8s_job(self, task: TesTask) -> V1Job: """Convert TES task to Kubernetes job. - + Args: task: TES task - + Returns: V1Job: Kubernetes job, taskmaster job """ @@ -143,11 +143,11 @@ def from_tes_task_to_k8s_config_map( taskmaster_job: V1Job, ) -> V1ConfigMap: """Create a Kubernetes ConfigMap from a TES task. - + Args: task: TES task taskmaster_job: Kubernetes job - + Returns: V1ConfigMap: Kubernetes ConfigMap """ @@ -255,14 +255,14 @@ def from_tes_executor_to_k8s_job( # noqa: PLR0913, PLR0912 resources: TesResources, ) -> V1Job: """Create a Kubernetes job from a TES executor. - + Args: generated_task_id: generated task ID tes_task_name: TES task name executor: TES executor executor_index: executor index resources: TES resources - + Returns: V1Job: Kubernetes job, executor job """ diff --git a/tesk/k8s/converter/data/task.py b/tesk/k8s/converter/data/task.py index 8a91be7..6200293 100644 --- a/tesk/k8s/converter/data/task.py +++ b/tesk/k8s/converter/data/task.py @@ -17,7 +17,7 @@ class Task: """Task is a composite. It represents Kubernetes object's graph of a single TES task. - + Attributes: taskmaster: taskmaster job with its pods executors_by_name: executors jobs with its pods @@ -29,7 +29,7 @@ def __init__( self, taskmaster: Optional[Job] = None, taskmaster_name: Optional[str] = None ): """Initialize the Task. - + Args: taskmaster: taskmaster job with its pods taskmaster_name: name of the taskmaster job diff --git a/tesk/k8s/converter/executor_command_wrapper.py b/tesk/k8s/converter/executor_command_wrapper.py index 2c8cd13..ae9a724 100644 --- a/tesk/k8s/converter/executor_command_wrapper.py +++ b/tesk/k8s/converter/executor_command_wrapper.py @@ -16,14 +16,14 @@ class ExecutorCommandWrapper: """Wraps executor's command. - + Attributes: executor: executor to wrap """ def __init__(self, executor: TesExecutor): """Initialize the wrapper. - + Args: executor: executor to wrap """ @@ -31,7 +31,7 @@ def __init__(self, executor: TesExecutor): def get_commands_with_stream_redirects(self) -> List[str]: """Get command with stream redirects. - + Returns: List[str]: command to run by exector with stream redirects """ diff --git a/tesk/k8s/converter/template.py b/tesk/k8s/converter/template.py index a049184..1e4a70e 100644 --- a/tesk/k8s/converter/template.py +++ b/tesk/k8s/converter/template.py @@ -28,7 +28,7 @@ class KubernetesTemplateSupplier: """Templates for tasmaster's and executor's job object. - + Attributes: taskmaster_template: template for the taskmaster job taskmaster: taskmaster environment properties @@ -40,7 +40,7 @@ def __init__( self, ): """Initialize the converter. - + Args: taskmaster_template: template for the taskmaster job taskmaster: taskmaster environment properties @@ -61,7 +61,7 @@ def get_taskmaster_name(self) -> str: def get_taskmaster_template_with_value_from_config(self) -> V1Job: """Create a template for the taskmaster job. - + Returns: V1Job: template for the taskmaster job with values from config """ @@ -155,7 +155,7 @@ def get_taskmaster_template_with_value_from_config(self) -> V1Job: def get_executor_template_with_value_from_config(self) -> V1Job: """Create a template for the executor job. - + Returns: V1Job: template for the executor job with values from config """ diff --git a/tesk/k8s/wrapper.py b/tesk/k8s/wrapper.py index 5f7fb2b..8ce5ef5 100644 --- a/tesk/k8s/wrapper.py +++ b/tesk/k8s/wrapper.py @@ -23,7 +23,7 @@ class KubernetesClientWrapper: """Kubernetes client wrapper class. - + Attributes: batch_api: Kubernetes batch API client core_api: Kubernetes core API client @@ -33,7 +33,7 @@ class KubernetesClientWrapper: def __init__(self): """Initialize the Kubernetes client wrapper. - + Args: batch_api: Kubernetes batch API client core_api: Kubernetes core API client From b182a87dad271445088db4ccb794136af27e5ba9 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 11 Oct 2024 15:36:25 +0530 Subject: [PATCH 15/19] docs till here --- docs/source/pages/tesk/tesk.api.ga4gh.tes.rst | 1 + .../pages/tesk/tesk.api.ga4gh.tes.task.rst | 29 ++++++++++++ .../pages/tesk/tesk.k8s.converter.data.rst | 29 ++++++++++++ docs/source/pages/tesk/tesk.k8s.converter.rst | 45 +++++++++++++++++++ docs/source/pages/tesk/tesk.k8s.rst | 8 ++++ 5 files changed, 112 insertions(+) create mode 100644 docs/source/pages/tesk/tesk.api.ga4gh.tes.task.rst create mode 100644 docs/source/pages/tesk/tesk.k8s.converter.data.rst create mode 100644 docs/source/pages/tesk/tesk.k8s.converter.rst diff --git a/docs/source/pages/tesk/tesk.api.ga4gh.tes.rst b/docs/source/pages/tesk/tesk.api.ga4gh.tes.rst index 9081843..c319b4d 100644 --- a/docs/source/pages/tesk/tesk.api.ga4gh.tes.rst +++ b/docs/source/pages/tesk/tesk.api.ga4gh.tes.rst @@ -8,6 +8,7 @@ Subpackages :maxdepth: 4 tesk.api.ga4gh.tes.service_info + tesk.api.ga4gh.tes.task Submodules ---------- diff --git a/docs/source/pages/tesk/tesk.api.ga4gh.tes.task.rst b/docs/source/pages/tesk/tesk.api.ga4gh.tes.task.rst new file mode 100644 index 0000000..fc81148 --- /dev/null +++ b/docs/source/pages/tesk/tesk.api.ga4gh.tes.task.rst @@ -0,0 +1,29 @@ +tesk.api.ga4gh.tes.task package +=============================== + +Submodules +---------- + +tesk.api.ga4gh.tes.task.create\_task module +------------------------------------------- + +.. automodule:: tesk.api.ga4gh.tes.task.create_task + :members: + :undoc-members: + :show-inheritance: + +tesk.api.ga4gh.tes.task.task\_request module +-------------------------------------------- + +.. automodule:: tesk.api.ga4gh.tes.task.task_request + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: tesk.api.ga4gh.tes.task + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.k8s.converter.data.rst b/docs/source/pages/tesk/tesk.k8s.converter.data.rst new file mode 100644 index 0000000..20cf5c5 --- /dev/null +++ b/docs/source/pages/tesk/tesk.k8s.converter.data.rst @@ -0,0 +1,29 @@ +tesk.k8s.converter.data package +=============================== + +Submodules +---------- + +tesk.k8s.converter.data.job module +---------------------------------- + +.. automodule:: tesk.k8s.converter.data.job + :members: + :undoc-members: + :show-inheritance: + +tesk.k8s.converter.data.task module +----------------------------------- + +.. automodule:: tesk.k8s.converter.data.task + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: tesk.k8s.converter.data + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.k8s.converter.rst b/docs/source/pages/tesk/tesk.k8s.converter.rst new file mode 100644 index 0000000..77bc5cb --- /dev/null +++ b/docs/source/pages/tesk/tesk.k8s.converter.rst @@ -0,0 +1,45 @@ +tesk.k8s.converter package +========================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + tesk.k8s.converter.data + +Submodules +---------- + +tesk.k8s.converter.converter module +----------------------------------- + +.. automodule:: tesk.k8s.converter.converter + :members: + :undoc-members: + :show-inheritance: + +tesk.k8s.converter.executor\_command\_wrapper module +---------------------------------------------------- + +.. automodule:: tesk.k8s.converter.executor_command_wrapper + :members: + :undoc-members: + :show-inheritance: + +tesk.k8s.converter.template module +---------------------------------- + +.. automodule:: tesk.k8s.converter.template + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: tesk.k8s.converter + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.k8s.rst b/docs/source/pages/tesk/tesk.k8s.rst index 6756e05..503d4a0 100644 --- a/docs/source/pages/tesk/tesk.k8s.rst +++ b/docs/source/pages/tesk/tesk.k8s.rst @@ -1,6 +1,14 @@ tesk.k8s package ================ +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + tesk.k8s.converter + Submodules ---------- From b4570a5fcd8028c115af94289e7c42cfcb6516aa Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 11 Oct 2024 16:11:16 +0530 Subject: [PATCH 16/19] format --- tesk/api/ga4gh/tes/task/create_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py index 0443052..d5884e0 100644 --- a/tesk/api/ga4gh/tes/task/create_task.py +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -34,7 +34,7 @@ def handle_request(self) -> TesCreateTaskResponse: """Create TES task. Returns: - TesCreateTaskResponse: TES task response after creating corresponding K8s + TesCreateTaskResponse: TES task response after creating corresponding K8s job. """ attempts_no: int = 0 From 925d63a5b19deb80c86c455893635ea5a095af88 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Fri, 11 Oct 2024 16:20:39 +0530 Subject: [PATCH 17/19] error --- tesk/api/ga4gh/tes/task/create_task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py index d5884e0..ad69507 100644 --- a/tesk/api/ga4gh/tes/task/create_task.py +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -88,5 +88,6 @@ def handle_request(self) -> TesCreateTaskResponse: pass raise KubernetesError( - "Failed to create Kubernetes job after multiple attempts." + status=HTTPStatus.INTERNAL_SERVER_ERROR, + reason=f"Failed to create Kubernetes job after {attempts_no} attempts.", ) From c6e3964a346946e4ce7b3a40175335244fef2088 Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Mon, 14 Oct 2024 10:48:59 +0530 Subject: [PATCH 18/19] review --- pyproject.toml | 1 + tesk/api/ga4gh/tes/controllers.py | 4 ++-- tesk/utils.py | 25 ------------------------- 3 files changed, 3 insertions(+), 27 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ae2a83c..94af9d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,6 +4,7 @@ requires = ["poetry-core"] [tool.bandit] skips = [ + "B101", # Use of assert detected. "B321", # FTP-related functions are being called. "B402", # A FTP-related module is being imported. "B108" # Insecure usage of temp file/directory, false positive. diff --git a/tesk/api/ga4gh/tes/controllers.py b/tesk/api/ga4gh/tes/controllers.py index 0bbcbc6..b7c4aed 100644 --- a/tesk/api/ga4gh/tes/controllers.py +++ b/tesk/api/ga4gh/tes/controllers.py @@ -1,7 +1,7 @@ """Controllers for GA4GH TES API endpoints.""" import logging -from typing import Any +from typing import Mapping from foca.utils.logging import log_traffic # type: ignore from pydantic import ValidationError @@ -36,7 +36,7 @@ def CreateTask(**kwargs) -> dict: Args: **kwargs: Arbitrary keyword arguments. """ - request_body: Any = kwargs.get("body") + request_body: Mapping = kwargs.get("body", {}) try: tes_task = TesTask(**request_body) except ValidationError as e: diff --git a/tesk/utils.py b/tesk/utils.py index 3ef9e13..8d0091c 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -31,31 +31,6 @@ from tesk.exceptions import ConfigInvalidError, ConfigNotFoundError from tesk.k8s.constants import tesk_k8s_constants -from foca import Foca -from kubernetes.client.models import ( - V1Container, - V1DownwardAPIVolumeFile, - V1DownwardAPIVolumeSource, - V1EnvVar, - V1EnvVarSource, - V1Job, - V1JobSpec, - V1ObjectFieldSelector, - V1ObjectMeta, - V1PodSpec, - V1PodTemplateSpec, - V1SecretKeySelector, - V1Volume, - V1VolumeMount, -) - -from tesk.custom_config import ( - CustomConfig, - Taskmaster, -) -from tesk.exceptions import ConfigInvalidError -from tesk.k8s.constants import tesk_k8s_constants - def get_config_path() -> Path: """Get the configuration path. From 5607e62fef0f238a7ef47455b4bdd79efb61b2db Mon Sep 17 00:00:00 2001 From: Javed Habib Date: Mon, 14 Oct 2024 10:53:10 +0530 Subject: [PATCH 19/19] remove todo and fixme --- tesk/k8s/converter/converter.py | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/tesk/k8s/converter/converter.py b/tesk/k8s/converter/converter.py index 6ab704c..9c58a34 100644 --- a/tesk/k8s/converter/converter.py +++ b/tesk/k8s/converter/converter.py @@ -94,18 +94,6 @@ def from_tes_task_to_k8s_job(self, task: TesTask) -> V1Job: taskmaster_job.metadata.annotations[ self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY ] = task.name - # taskmaster_job.metadata.labels[self.constants.label_userid_key] = user[ - # "username" - # ] - - # if task.tags and "GROUP_NAME" in task.tags: - # taskmaster_job.metadata.labels[self.constants.label_userid_key] = task[ - # "tags" - # ]["GROUP_NAME"] - # elif user["is_member"]: - # taskmaster_job.metadata.labels[self.constants.label_groupname_key] = user[ - # "any_group" - # ] json_input = json.dumps( task.dict(), @@ -170,23 +158,16 @@ def from_tes_task_to_k8s_config_map( if taskmaster_config_map.metadata.annotations is None: taskmaster_config_map.metadata.annotations = {} - # FIXME: What if the task name is None? - task_name = task.name or "task-name-not-set" + task_name = task.name or "String" taskmaster_config_map.metadata.annotations[ self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY ] = task_name - # taskmaster_config_map.metadata.labels[self.constants.label_userid_key] - # = user["username"] - if task.tags and "GROUP_NAME" in task.tags: taskmaster_config_map.metadata.labels[ self.tesk_k8s_constants.label_constants.LABEL_GROUPNAME_KEY ] = task.tags["GROUP_NAME"] - # elif user["is_member"]: - # taskmaster_config_map.metadata.labels[self.constants.label_groupname_key] - # = user["any_group"] assert taskmaster_config_map.metadata.name is not None assert task.resources is not None