Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduling capability #173

Merged
merged 16 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Changelog

## [Unreleased] 2024-07-23
## [Unreleased] 2024-07-29
sfczekalski marked this conversation as resolved.
Show resolved Hide resolved

- Brought back the Vertex AI Pipelines scheduling capability
- Migrated to kfp 2
- Removed `image_pull_policy` parameter from configuration, as it only applies to Kubernetes backend and not Vertex AI,
and it's only available in `kfp-kubernetes` extension package
Expand Down
17 changes: 17 additions & 0 deletions docs/source/02_installation/02_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,23 @@ run_config:
# client_id: iam-client-id

dynamic_config_providers: []

# Schedules configuration
schedules:
default_schedule:
cron_expression: "0 * * * *"
timezone: Etc/UTC
# Optional. Timestamp after which the first run can be scheduled. If unspecified, it defaults to the schedule creation timestamp.
start_time: null
# Optional. Timestamp after which no more runs will be scheduled. If unspecified, then runs will be scheduled indefinitely.
end_time: null
# Optional. Whether new scheduled runs can be queued when max_concurrent_runs limit is reached.
allow_queueing: false
# Optional. Maximum run count of the schedule. If specified, the schedule will be completed when either started_run_count >= max_run_count or when end_time is reached. Must be positive and <= 2^63-1.
max_run_count: null
# Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
max_concurrent_run_count: 1

```

## Dynamic configuration support
Expand Down
82 changes: 77 additions & 5 deletions kedro_vertexai/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from click import ClickException, Context, confirm

from .client import VertexAIPipelinesClient
from .config import PluginConfig, RunConfig
from .config import PluginConfig, RunConfig, ScheduleConfig
from .constants import VERTEXAI_RUN_ID_TAG
from .context_helper import ContextHelper
from .utils import (
Expand Down Expand Up @@ -206,6 +206,43 @@ def compile(ctx, image, pipeline, output) -> None:
help="Cron expression for recurring run",
required=False,
)
@click.option(
"-t",
"--timezone",
type=str,
help="Time zone of the crone expression.",
required=False,
)
@click.option(
"--start-time",
type=str,
help="Timestamp after which the first run can be scheduled.",
required=False,
)
@click.option(
"--end-time",
type=str,
help="Timestamp after which no more runs will be scheduled. ",
required=False,
)
@click.option(
"--allow-queueing",
type=bool,
help="Whether new scheduled runs can be queued when max_concurrent_runs limit is reached.",
required=False,
)
@click.option(
"--max-run-count",
type=int,
help="Maximum run count of the schedule.",
required=False,
)
@click.option(
"--max-concurrent-run-count",
type=int,
help="Maximum number of runs that can be started concurrently.",
required=False,
)
@click.option(
"--param",
"params",
Expand All @@ -218,12 +255,47 @@ def schedule(
ctx,
pipeline: str,
cron_expression: str,
params: list,
timezone: str,
start_time: str = None,
end_time: str = None,
allow_queueing: bool = None,
max_run_count: int = None,
max_concurrent_run_count: int = None,
params: list = [],
):
"""Schedules recurring execution of latest version of the pipeline"""
logger.warning(
"Scheduler functionality was temporarily disabled, "
"follow https://github.com/getindata/kedro-vertexai/issues/4 for updates"
context_helper = ctx.obj["context_helper"]
client: VertexAIPipelinesClient = context_helper.vertexai_client
config: RunConfig = context_helper.config.run_config

schedule_config: ScheduleConfig = config.schedules.get(
pipeline, config.schedules["default_schedule"]
)

schedule_config.cron_expression = (
cron_expression if cron_expression else schedule_config.cron_expression
)
schedule_config.timezone = timezone if timezone else schedule_config.timezone
schedule_config.start_time = (
start_time if start_time else schedule_config.start_time
)
schedule_config.end_time = end_time if end_time else schedule_config.end_time
schedule_config.allow_queueing = (
allow_queueing if allow_queueing else schedule_config.allow_queueing
)
schedule_config.max_run_count = (
max_run_count if max_run_count else schedule_config.max_run_count
)
schedule_config.max_concurrent_run_count = (
max_concurrent_run_count
if max_concurrent_run_count
else schedule_config.max_concurrent_run_count
)

client.schedule(
pipeline=pipeline,
schedule_config=schedule_config,
parameter_values=format_params(params),
)


Expand Down
94 changes: 54 additions & 40 deletions kedro_vertexai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@
"""

import datetime as dt
import json
import logging
import os
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional

from google.cloud import aiplatform as aip
from google.cloud.aiplatform import PipelineJob
from google.cloud.scheduler_v1.services.cloud_scheduler import (
CloudSchedulerClient,
)
from kfp import compiler
from kfp.compiler import Compiler
from tabulate import tabulate

from .config import PluginConfig
from .config import PluginConfig, ScheduleConfig
from .generator import PipelineGenerator


Expand All @@ -30,7 +27,6 @@ class VertexAIPipelinesClient:
def __init__(self, config: PluginConfig, project_name, context):

aip.init(project=config.project_id, location=config.region)
self.cloud_scheduler_client = CloudSchedulerClient()
self.location = f"projects/{config.project_id}/locations/{config.region}"
self.run_config = config.run_config
self.run_name = self._generate_run_name(config)
Expand Down Expand Up @@ -106,61 +102,79 @@ def compile(
"""
token = os.getenv("MLFLOW_TRACKING_TOKEN", "")
pipeline_func = self.generator.generate_pipeline(pipeline, image, token)
compiler.Compiler().compile(
Compiler().compile(
pipeline_func=pipeline_func,
package_path=output,
)
self.log.info("Generated pipeline definition was saved to %s", str(output))

def _cleanup_old_schedule(self, pipeline_name):
"""
Removes old jobs scheduled for given pipeline name
def _cleanup_old_schedule(self, display_name: str):
"""Cleanup old schedules with a given display name.

Args:
display_name (str): Display name of the schedule.
"""
for job in self.cloud_scheduler_client.list_jobs(parent=self.location):
if "jobs/pipeline_pipeline" not in job.name:
continue

job_pipeline_name = json.loads(job.http_target.body)["pipelineSpec"][
"pipelineInfo"
]["name"]
if job_pipeline_name == pipeline_name:
self.log.info(
"Found existing schedule for the pipeline at %s, deleting...",
job.schedule,
)
self.cloud_scheduler_client.delete_job(name=job.name)
existing_schedules = aip.PipelineJobSchedule.list(
filter=f'display_name="{display_name}"'
)
self.log.info(
f"Found {len(existing_schedules)} existing schedules with display name {display_name}"
)

for schedule in existing_schedules:
schedule.delete()

self.log.info(
f"Cleaned up existing old schedules with display name {display_name}"
)

def schedule(
self,
pipeline,
cron_expression,
parameter_values=None,
image_pull_policy="IfNotPresent",
pipeline: str,
schedule_config: ScheduleConfig,
parameter_values: Optional[Dict[str, Any]] = None,
):
"""
Schedule pipeline to Vertex AI with given cron expression
:param pipeline:
:param cron_expression:
:param parameter_values:
:param image_pull_policy:
:param pipeline: Name of the Kedro pipeline to schedule.
:param schedule_config: Schedule config.
:param parameter_values: Kubeflow pipeline parameter values.
:return:
"""
self._cleanup_old_schedule(self.generator.get_pipeline_name())
self._cleanup_old_schedule(display_name=self.run_config.scheduled_run_name)

with NamedTemporaryFile(
mode="rt", prefix="kedro-vertexai", suffix=".json"
mode="rt", prefix="kedro-vertexai", suffix=".yaml"
) as spec_output:
self.compile(
pipeline,
self.run_config.image,
output=spec_output.name,
)
self.api_client.create_schedule_from_job_spec(
job_spec_path=spec_output.name,
time_zone="Etc/UTC",
schedule=cron_expression,

job = aip.PipelineJob(
display_name=self.run_name,
template_path=spec_output.name,
job_id=self.run_name,
pipeline_root=f"gs://{self.run_config.root}",
enable_caching=False,
parameter_values=parameter_values or {},
enable_caching=False,
)

cron_with_timezone = (
f"TZ={schedule_config.timezone} {schedule_config.cron_expression}"
)

job.create_schedule(
cron=cron_with_timezone,
display_name=self.run_config.scheduled_run_name,
start_time=schedule_config.start_time,
end_time=schedule_config.end_time,
allow_queueing=schedule_config.allow_queueing,
max_run_count=schedule_config.max_run_count,
max_concurrent_run_count=schedule_config.max_concurrent_run_count,
service_account=self.run_config.service_account,
network=self.run_config.network.vpc,
)

self.log.info("Pipeline scheduled to %s", cron_expression)
self.log.info("Pipeline scheduled to %s", cron_with_timezone)
30 changes: 30 additions & 0 deletions kedro_vertexai/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@
# mlflow:
# request_header_provider_params:
# key: value

# Schedules configuration
schedules:
default_schedule:
cron_expression: "0 * * * *"
timezone: Etc/UTC
start_time: null
end_time: null
allow_queueing: false
max_run_count: null
max_concurrent_run_count: 1
# training_pipeline:
# cron_expression: "0 0 * * *"
# timezone: America/New_York
# start_time: null
# end_time: null
# allow_queueing: false
# max_run_count: null
# max_concurrent_run_count: 1
"""


Expand Down Expand Up @@ -193,6 +212,16 @@ class MLFlowVertexAIConfig(BaseModel):
request_header_provider_params: Optional[Dict[str, str]]


class ScheduleConfig(BaseModel):
    """Settings for one Vertex AI PipelineJobSchedule entry.

    Field semantics mirror the parameters passed to
    ``google.cloud.aiplatform.PipelineJob.create_schedule``.
    """

    # Cron expression describing when runs are triggered; hourly by default.
    cron_expression: Optional[str] = "0 * * * *"
    # Time zone applied to the cron expression (client prepends "TZ=<timezone> " to it).
    timezone: Optional[str] = "Etc/UTC"
    # Timestamp after which the first run can be scheduled;
    # None defers to the schedule creation timestamp.
    start_time: Optional[str] = None
    # Timestamp after which no more runs will be scheduled;
    # None means runs are scheduled indefinitely.
    end_time: Optional[str] = None
    # Whether new runs may be queued once max_concurrent_run_count is reached.
    allow_queueing: Optional[bool] = False
    # Maximum total run count for the schedule; None means unlimited.
    max_run_count: Optional[int] = None
    # Maximum number of runs that can be started concurrently.
    max_concurrent_run_count: Optional[int] = 1


class RunConfig(BaseModel):
image: str
root: Optional[str]
Expand All @@ -209,6 +238,7 @@ class RunConfig(BaseModel):
node_selectors: Optional[Dict[str, Dict[str, str]]] = {}
dynamic_config_providers: Optional[List[DynamicConfigProviderConfig]] = []
mlflow: Optional[MLFlowVertexAIConfig] = None
schedules: Optional[Dict[str, ScheduleConfig]] = None

def resources_for(self, node: str, tags: Optional[set] = None):
default_config = self.resources["__default__"].dict()
Expand Down
Loading
Loading