diff --git a/.gitbook.yaml b/.gitbook.yaml
index 828031f184..984ca276a6 100644
--- a/.gitbook.yaml
+++ b/.gitbook.yaml
@@ -51,7 +51,8 @@ redirects:
how-to/build-pipelines/schedule-a-pipeline: how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md
how-to/build-pipelines/delete-a-pipeline: how-to/pipeline-development/build-pipelines/delete-a-pipeline.md
how-to/build-pipelines/compose-pipelines: how-to/pipeline-development/build-pipelines/compose-pipelines.md
- how-to/build-pipelines/dynamically-assign-artifact-names: how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md
+ how-to/build-pipelines/dynamically-assign-artifact-names: how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md
+ how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names: how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md
how-to/build-pipelines/retry-steps: how-to/pipeline-development/build-pipelines/retry-steps.md
how-to/build-pipelines/run-pipelines-asynchronously: how-to/pipeline-development/build-pipelines/run-pipelines-asynchronously.md
how-to/build-pipelines/control-execution-order-of-steps: how-to/pipeline-development/build-pipelines/control-execution-order-of-steps.md
diff --git a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md
new file mode 100644
index 0000000000..45f0ff9942
--- /dev/null
+++ b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md
@@ -0,0 +1,152 @@
+---
+description: Understand how you can name your ZenML artifacts.
+---
+
+# How artifact naming works in ZenML
+
+In ZenML pipelines, you often need to reuse the same step multiple times with different inputs, resulting in multiple artifacts. However, the default naming convention for artifacts can make it challenging to track and differentiate between these outputs, especially when they need to be used in subsequent pipelines. Below you can find a detailed exploration of how you might name your output artifacts dynamically or statically, depending on your needs.
+
+ZenML uses type annotations in function definitions to determine artifact names. Output artifacts with the same name are saved with incremented version numbers.
+
+ZenML provides flexible options for naming output artifacts, supporting both static and dynamic naming strategies:
+- Names can be generated dynamically at runtime
+- Support for string templates (both standard and custom placeholders)
+- Compatible with both single- and multiple-output scenarios
+- Annotations define the naming strategy without modifying the core step logic
+
+## Naming Strategies
+
+### Static Naming
+Static names are defined directly as string literals.
+
+```python
+@step
+def static_single() -> Annotated[str, "static_output_name"]:
+ return "null"
+```
+
+### Dynamic Naming
+Dynamic names can be generated using:
+
+#### String Templates Using Standard Placeholders
+Use the following placeholders that ZenML will replace automatically:
+
+* `{date}` will resolve to the current date, e.g. `2024_11_18`
+* `{time}` will resolve to the current time, e.g. `11_07_09_326492`
+
+```python
+@step
+def dynamic_single_string() -> Annotated[str, "name_{date}_{time}"]:
+ return "null"
+```
+
+#### String Templates Using Custom Placeholders
+Use any custom placeholders; ZenML will replace them for you, as long as their values are provided to the step via the `substitutions` parameter:
+
+```python
+@step(substitutions={"custom_placeholder": "some_substitute"})
+def dynamic_single_string() -> Annotated[str, "name_{custom_placeholder}_{time}"]:
+ return "null"
+```
+
+Another option is to use `with_options` to dynamically redefine the placeholder, like this:
+
+```python
+@step
+def extract_data(source: str) -> Annotated[str, "{stage}_dataset"]:
+ ...
+ return "my data"
+
+@pipeline
+def extraction_pipeline():
+ extract_data.with_options(substitutions={"stage": "train"})(source="s3://train")
+ extract_data.with_options(substitutions={"stage": "test"})(source="s3://test")
+```
+
+{% hint style="info" %}
+The substitutions for custom placeholders like `stage` can be set in several places, as sketched after this hint:
+- `@pipeline` decorator, so they are effective for all steps in this pipeline
+- `pipeline.with_options` function, so they are effective for all steps in this pipeline run
+- `@step` decorator, so they are effective for this step (this overrides the pipeline settings)
+- `step.with_options` function, so they are effective for this step run (this overrides the pipeline settings)
+
+The standard substitutions, always available and consistent across all steps of the pipeline, are:
+- `{date}`: current date, e.g. `2024_11_27`
+- `{time}`: current time in UTC format, e.g. `11_07_09_326492`
+{% endhint %}
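+
+For instance, here is a minimal sketch of how these levels interact (the step and pipeline names are purely illustrative); the step-level substitution overrides the pipeline-level one:
+
+```python
+from typing_extensions import Annotated
+
+from zenml import pipeline, step
+
+
+@step(substitutions={"stage": "train"})  # overrides the pipeline-level value
+def train_step() -> Annotated[str, "{stage}_dataset"]:
+    return "train data"
+
+
+@step
+def generic_step() -> Annotated[str, "{stage}_dataset"]:
+    return "generic data"
+
+
+@pipeline(substitutions={"stage": "default"})
+def substitution_levels_pipeline():
+    train_step()    # resolves to an artifact named "train_dataset"
+    generic_step()  # resolves to an artifact named "default_dataset"
+```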
+
+### Multiple Output Handling
+
+If you plan to return multiple artifacts from your ZenML step, you can flexibly combine all of the naming options outlined above, like this:
+
+```python
+@step
+def mixed_tuple() -> Tuple[
+ Annotated[str, "static_output_name"],
+ Annotated[str, "name_{date}_{time}"],
+]:
+ return "static_namer", "str_namer"
+```
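+
+Placeholders also work when the output name is provided via `ArtifactConfig` rather than a plain string; a brief sketch:
+
+```python
+from typing_extensions import Annotated
+
+from zenml import ArtifactConfig, step
+
+
+@step(substitutions={"custom_placeholder": "some_substitute"})
+def annotated_with_artifact_config() -> Annotated[
+    str, ArtifactConfig(name="name_{date}_{time}_{custom_placeholder}")
+]:
+    return "null"
+```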
+
+## Naming in cached runs
+
+If your ZenML step runs with caching enabled and a cached result is used, the names of the output artifacts (both static and dynamic) will remain the same as in the original run.
+
+```python
+from typing_extensions import Annotated
+from typing import Tuple
+
+from zenml import step, pipeline
+from zenml.models import PipelineRunResponse
+
+
+@step(substitutions={"custom_placeholder": "resolution"})
+def demo() -> Tuple[
+ Annotated[int, "name_{date}_{time}"],
+ Annotated[int, "name_{custom_placeholder}"],
+]:
+ return 42, 43
+
+
+@pipeline
+def my_pipeline():
+ demo()
+
+
+if __name__ == "__main__":
+ run_without_cache: PipelineRunResponse = my_pipeline.with_options(
+ enable_cache=False
+ )()
+ run_with_cache: PipelineRunResponse = my_pipeline.with_options(enable_cache=True)()
+
+ assert set(run_without_cache.steps["demo"].outputs.keys()) == set(
+ run_with_cache.steps["demo"].outputs.keys()
+ )
+ print(list(run_without_cache.steps["demo"].outputs.keys()))
+```
+
+These two runs will produce output similar to the following:
+```
+Initiating a new run for the pipeline: my_pipeline.
+Caching is disabled by default for my_pipeline.
+Using user: default
+Using stack: default
+ orchestrator: default
+ artifact_store: default
+You can visualize your pipeline runs in the ZenML Dashboard. In order to try it locally, please run zenml login --local.
+Step demo has started.
+Step demo has finished in 0.038s.
+Pipeline run has finished in 0.064s.
+Initiating a new run for the pipeline: my_pipeline.
+Using user: default
+Using stack: default
+ orchestrator: default
+ artifact_store: default
+You can visualize your pipeline runs in the ZenML Dashboard. In order to try it locally, please run zenml login --local.
+Using cached version of step demo.
+All steps of the pipeline run were cached.
+['name_2024_11_21_14_27_33_750134', 'name_resolution']
+```
+
+
+
diff --git a/docs/book/how-to/model-management-metrics/model-control-plane/model-versions.md b/docs/book/how-to/model-management-metrics/model-control-plane/model-versions.md
index 9f24f7db15..d331a91ff7 100644
--- a/docs/book/how-to/model-management-metrics/model-control-plane/model-versions.md
+++ b/docs/book/how-to/model-management-metrics/model-control-plane/model-versions.md
@@ -37,13 +37,13 @@ Please note in the above example if the model version exists, it is automaticall
## Use name templates for your model versions
-If you want to continuously run the same project, but keep track of your model versions using semantical naming, you can rely on templated naming in the `version` argument to the `Model` object. Instead of static model version name from the previous section, templated names will be unique with every new run, but also will be semantically searchable and readable by your team.
+If you want to run the same project continuously while keeping track of your model versions with semantic naming, you can rely on templated naming in the `version` and/or `name` arguments of the `Model` object. Unlike the static model version name from the previous section, templated names will be unique with every new run while remaining semantically searchable and readable for your team.
```python
from zenml import Model, step, pipeline
model= Model(
- name="my_model",
+ name="{team}_my_model",
version="experiment_with_phi_3_{date}_{time}"
)
@@ -53,16 +53,24 @@ def llm_trainer(...) -> ...:
...
# This configures it for all steps within the pipeline
-@pipeline(model=model)
+@pipeline(model=model, substitutions={"team": "Team_A"})
def training_pipeline( ... ):
# training happens here
```
-Here we are specifically setting the model configuration for a particular step or for the pipeline as a whole. Once you run this pipeline it will produce a model version with a name evaluated at a runtime, like `experiment_with_phi_3_2024_08_30_12_42_53`. Subsequent runs will also have unique but readable names.
+Here we are specifically setting the model configuration for a particular step or for the pipeline as a whole. Once you run this pipeline it will produce a model version with a name evaluated at runtime, like `experiment_with_phi_3_2024_08_30_12_42_53`. The model and model version names stay consistent across all steps of a run, since substitutions like `date` and `time` are evaluated once for the whole pipeline run. We also used a custom substitution via the `{team}` placeholder and set it to `Team_A` in the `@pipeline` decorator.
-We currently support following placeholders to be used in model version name templates:
-- `{date}`: current date
-- `{time}`: current time in UTC format
+{% hint style="info" %}
+The substitutions for custom placeholders like `team` can be set in several places, as shown after this hint:
+- `@pipeline` decorator, so they are effective for all steps in this pipeline
+- `pipeline.with_options` function, so they are effective for all steps in this pipeline run
+- `@step` decorator, so they are effective for this step (this overrides the pipeline settings)
+- `step.with_options` function, so they are effective for this step run (this overrides the pipeline settings)
+
+The standard substitutions, always available and consistent across all steps of the pipeline, are:
+- `{date}`: current date, e.g. `2024_11_27`
+- `{time}`: current time in UTC format, e.g. `11_07_09_326492`
+{% endhint %}
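+
+For example, a minimal sketch of overriding the `{team}` placeholder for a single run of the `training_pipeline` defined above (the value `Team_B` is purely illustrative):
+
+```python
+# Override the pipeline-level substitution for this run only
+training_pipeline.with_options(substitutions={"team": "Team_B"})()
+```
+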
## Fetching model versions by stage
diff --git a/docs/book/how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md b/docs/book/how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md
deleted file mode 100644
index 179913d9ae..0000000000
--- a/docs/book/how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md
+++ /dev/null
@@ -1,143 +0,0 @@
----
-description: How to dynamically assign artifact names in your pipelines.
----
-
-# Dynamically assign artifact names
-
-In ZenML pipelines, you often need to reuse the same step multiple times with
-different inputs, resulting in multiple artifacts. However, the default naming
-convention for artifacts can make it challenging to track and differentiate
-between these outputs, especially when they need to be used in subsequent
-pipelines. Below you can find a detailed exploration of how you might go about dynamically generating steps and artifacts to improve pipeline flexibility and maintainability.
-
-By default, ZenML uses type annotations in function definitions to determine artifact names. While this works well for steps used once in a pipeline, it becomes problematic when:
-
-1. The same step is called multiple times with different inputs.
-2. The resulting artifacts need to be used in different pipelines later.
-3. Output artifacts are saved with the same name and incremented version numbers.
-
-For example, when using a preprocessor step that needs to transform train, validation, and test data separately, you might end up with three versions of an artifact called `transformed_data`, making it difficult to track which is which.
-
-ZenML offers two possible ways to address this problem:
-
-1. Using factory functions to create dynamic steps with custom artifact names.
-2. Using metadata to identify artifacts in a single step.
-
-## 1. Using factory functions for dynamic artifact names
-
-This approach allows you to create steps with custom artifact names dynamically:
-
-```python
-from typing import Any, Dict
-from typing_extensions import Annotated
-from zenml import step, pipeline, get_step_context, ArtifactConfig
-
-def create_step(prefix: str):
- def _entrypoint(data: Any) -> Annotated[Dict[str, Any], ArtifactConfig(name=f"{prefix}_artifact")]:
- context = get_step_context()
- return {"processed_data": data, "step_name": context.step_name}
-
- step_name = f"dynamic_step_{prefix}"
- _entrypoint.__name__ = step_name
- s = step(_entrypoint)
- globals()[step_name] = s
- return s
-
-# Create the dynamic steps
-train_step = create_step(prefix="train")
-validation_step = create_step(prefix="validation")
-test_step = create_step(prefix="test")
-
-# Resolve the steps
-train_step.resolve()
-validation_step.resolve()
-test_step.resolve()
-
-@pipeline
-def dynamic_artifact_pipeline(train_data, val_data, test_data):
- train_result = train_step(train_data)
- validation_result = validation_step(val_data)
- test_result = test_step(test_data)
-
-
-dynamic_artifact_pipeline(train_data=1, val_data=2, test_data=3)
-```
-
-This method generates unique artifact names for each step, making it easier to
-track and retrieve specific artifacts later in your workflow.
-
-![Dynamic artifact pipeline DAG in the
-dashboard](../../../.gitbook/assets/dynamic_artifact_pipeline.png)
-
-One caveat applies to this first method which is that either of the following
-two things must be true:
-
-- The factory must be in the same file as where the steps are defined -> This is
- so the logic with `globals()` works
-- The user must have use the same variable name for the step as the `__name__`
- of the entrypoint function
-
-As you can see, this is not always possible or desirable and you should use
-the second method if you can.
-
-## 2. Using Metadata for Custom Artifact Identification
-
-If you prefer using a single step and differentiating artifacts through metadata, try this approach:
-
-```python
-from typing import Any, Dict
-from typing_extensions import Annotated
-from zenml import step, get_step_context, pipeline
-
-@step
-def generic_step(data: Any, prefix: str) -> Annotated[Dict[str, Any], "dataset"]:
- result = {"processed_data": data}
-
- # Add custom metadata
- step_context = get_step_context()
- step_context.add_output_metadata(
- output_name="dataset",
- metadata={"custom_prefix": prefix}
- )
-
- return result
-
-@pipeline
-def metadata_artifact_pipeline(train_data, val_data, test_data):
- generic_step(train_data, prefix="train")
- generic_step(val_data, prefix="validation")
- generic_step(test_data, prefix="test")
-
-metadata_artifact_pipeline(train_data=1, val_data=2, test_data=3)
-```
-
-We can see the metadata in the dashboard:
-
-![Metadata visible in the dashboard](../../../.gitbook/assets/metadata_artifact_pipeline.png)
-
-This method uses a single `generic_step` but adds custom metadata to each artifact. You can later use this metadata to identify and differentiate between artifacts:
-
-```python
-from zenml.client import Client
-
-client = Client()
-artifacts = client.list_artifact_versions("generic_artifact")
-for artifact in artifacts:
- prefix = artifact.run_metadata.get("custom_prefix")
- if prefix == "train":
- train_data = artifact.load()
- elif prefix == "validation":
- val_data = artifact.load()
- elif prefix == "test":
- test_data = artifact.load()
-```
-
-Both solutions provide ways to custom-identify your artifacts without modifying
-ZenML's core functionality. The factory function approach offers more control
-over the artifact name itself, while the metadata approach maintains consistent
-artifact names but adds custom metadata for identification.
-
-
-
-
-
diff --git a/docs/book/how-to/pipeline-development/build-pipelines/name-your-pipeline-runs.md b/docs/book/how-to/pipeline-development/build-pipelines/name-your-pipeline-runs.md
index 0259a78eab..c3e4f5a568 100644
--- a/docs/book/how-to/pipeline-development/build-pipelines/name-your-pipeline-runs.md
+++ b/docs/book/how-to/pipeline-development/build-pipelines/name-your-pipeline-runs.md
@@ -15,14 +15,21 @@ training_pipeline = training_pipeline.with_options(
training_pipeline()
```
-Pipeline run names must be unique, so if you plan to run your pipelines multiple times or run them on a schedule, make sure to either compute the run name dynamically or include one of the following placeholders that ZenML will replace:
+Pipeline run names must be unique, so if you plan to run your pipelines multiple times or run them on a schedule, make sure to either compute the run name dynamically or include one of the placeholders that ZenML will replace.
-* `{date}` will resolve to the current date, e.g. `2023_02_19`
-* `{time}` will resolve to the current time, e.g. `11_07_09_326492`
+{% hint style="info" %}
+The substitutions for custom placeholders like `experiment_name` can be set in:
+- `@pipeline` decorator, so they are effective for all steps in this pipeline
+- `pipeline.with_options` function, so they are effective for all steps in this pipeline run
+
+The standard substitutions, always available and consistent across all steps of the pipeline, are:
+- `{date}`: current date, e.g. `2024_11_27`
+- `{time}`: current time in UTC format, e.g. `11_07_09_326492`
+{% endhint %}
```python
training_pipeline = training_pipeline.with_options(
- run_name="custom_pipeline_run_name_{date}_{time}"
+ run_name="custom_pipeline_run_name_{experiment_name}_{date}_{time}"
)
training_pipeline()
```
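+
+The `{experiment_name}` placeholder above is a custom one, so a substitution must be provided for it to resolve. A minimal sketch of providing it via `with_options` (the value `quickstart` is purely illustrative):
+
+```python
+training_pipeline = training_pipeline.with_options(
+    run_name="custom_pipeline_run_name_{experiment_name}_{date}_{time}",
+    substitutions={"experiment_name": "quickstart"},
+)
+training_pipeline()
+```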
diff --git a/docs/book/toc.md b/docs/book/toc.md
index 0ab38b1b9e..ddd58e52aa 100644
--- a/docs/book/toc.md
+++ b/docs/book/toc.md
@@ -94,7 +94,6 @@
* [Schedule a pipeline](how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md)
* [Deleting a pipeline](how-to/pipeline-development/build-pipelines/delete-a-pipeline.md)
* [Compose pipelines](how-to/pipeline-development/build-pipelines/compose-pipelines.md)
- * [Dynamically assign artifact names](how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md)
* [Automatically retry steps](how-to/pipeline-development/build-pipelines/retry-steps.md)
* [Run pipelines asynchronously](how-to/pipeline-development/build-pipelines/run-pipelines-asynchronously.md)
* [Control execution order of steps](how-to/pipeline-development/build-pipelines/control-execution-order-of-steps.md)
@@ -123,6 +122,7 @@
* [How ZenML stores data](how-to/data-artifact-management/handle-data-artifacts/artifact-versioning.md)
* [Return multiple outputs from a step](how-to/data-artifact-management/handle-data-artifacts/return-multiple-outputs-from-a-step.md)
* [Delete an artifact](how-to/data-artifact-management/handle-data-artifacts/delete-an-artifact.md)
+ * [Artifacts naming](how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md)
* [Organize data with tags](how-to/data-artifact-management/handle-data-artifacts/tagging.md)
* [Get arbitrary artifacts in a step](how-to/data-artifact-management/handle-data-artifacts/get-arbitrary-artifacts-in-a-step.md)
* [Handle custom data types](how-to/data-artifact-management/handle-data-artifacts/handle-custom-data-types.md)
diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py
index f420817141..c534fbb447 100644
--- a/src/zenml/artifacts/artifact_config.py
+++ b/src/zenml/artifacts/artifact_config.py
@@ -21,6 +21,7 @@
from zenml.logger import get_logger
from zenml.metadata.metadata_types import MetadataType
from zenml.utils.pydantic_utils import before_validator_handler
+from zenml.utils.string_utils import format_name_template
logger = get_logger(__name__)
@@ -45,7 +46,13 @@ def my_step() -> Annotated[
```
Attributes:
- name: The name of the artifact.
+ name: The name of the artifact:
+ - static string e.g. "name"
+ - dynamic string e.g. "name_{date}_{time}_{custom_placeholder}"
+ If you use any placeholders besides `date` and `time`,
+ you need to provide the values for them in the `substitutions`
+ argument of the step decorator or the `substitutions` argument
+ of `with_options` of the step.
version: The version of the artifact.
tags: The tags of the artifact.
run_metadata: Metadata to add to the artifact.
@@ -111,3 +118,16 @@ def _remove_old_attributes(cls, data: Dict[str, Any]) -> Dict[str, Any]:
data.setdefault("artifact_type", ArtifactType.SERVICE)
return data
+
+ def _evaluated_name(self, substitutions: Dict[str, str]) -> Optional[str]:
+ """Evaluated name of the artifact.
+
+ Args:
+ substitutions: Extra placeholders to use in the name template.
+
+ Returns:
+ The evaluated name of the artifact.
+ """
+ if self.name:
+ return format_name_template(self.name, substitutions=substitutions)
+ return self.name
diff --git a/src/zenml/artifacts/utils.py b/src/zenml/artifacts/utils.py
index 9b8ebaff93..22067abe33 100644
--- a/src/zenml/artifacts/utils.py
+++ b/src/zenml/artifacts/utils.py
@@ -689,7 +689,11 @@ def _link_artifact_version_to_the_step_and_model(
client.zen_store.update_run_step(
step_run_id=step_run.id,
step_run_update=StepRunUpdate(
- outputs={artifact_version.artifact.name: artifact_version.id}
+ outputs={
+ artifact_version.artifact.name: [
+ artifact_version.id,
+ ]
+ }
),
)
error_message = "model"
diff --git a/src/zenml/config/compiler.py b/src/zenml/config/compiler.py
index 5d1cc00690..a2526ced46 100644
--- a/src/zenml/config/compiler.py
+++ b/src/zenml/config/compiler.py
@@ -99,7 +99,10 @@ def compile(
self._apply_stack_default_settings(pipeline=pipeline, stack=stack)
if run_configuration.run_name:
- self._verify_run_name(run_configuration.run_name)
+ self._verify_run_name(
+ run_configuration.run_name,
+ pipeline.configuration.substitutions,
+ )
pipeline_settings = self._filter_and_validate_settings(
settings=pipeline.configuration.settings,
@@ -305,16 +308,22 @@ def _get_default_settings(
return default_settings
@staticmethod
- def _verify_run_name(run_name: str) -> None:
+ def _verify_run_name(
+ run_name: str,
+ substitutions: Dict[str, str],
+ ) -> None:
"""Verifies that the run name contains only valid placeholders.
Args:
run_name: The run name to verify.
+ substitutions: The substitutions to be used in the run name.
Raises:
ValueError: If the run name contains invalid placeholders.
"""
- valid_placeholder_names = {"date", "time"}
+ valid_placeholder_names = {"date", "time"}.union(
+ set(substitutions.keys())
+ )
placeholders = {
v[1] for v in string.Formatter().parse(run_name) if v[1]
}
diff --git a/src/zenml/config/pipeline_configurations.py b/src/zenml/config/pipeline_configurations.py
index a79570d245..e3004f821b 100644
--- a/src/zenml/config/pipeline_configurations.py
+++ b/src/zenml/config/pipeline_configurations.py
@@ -13,6 +13,7 @@
# permissions and limitations under the License.
"""Pipeline configuration classes."""
+from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from pydantic import SerializeAsAny, field_validator
@@ -46,6 +47,25 @@ class PipelineConfigurationUpdate(StrictBaseModel):
model: Optional[Model] = None
parameters: Optional[Dict[str, Any]] = None
retry: Optional[StepRetryConfig] = None
+ substitutions: Dict[str, str] = {}
+
+ def _get_full_substitutions(
+ self, start_time: Optional[datetime]
+ ) -> Dict[str, str]:
+ """Returns the full substitutions dict.
+
+ Args:
+ start_time: Start time of the pipeline run.
+
+ Returns:
+ The full substitutions dict including date and time.
+ """
+ if start_time is None:
+ start_time = datetime.utcnow()
+ ret = self.substitutions.copy()
+ ret.setdefault("date", start_time.strftime("%Y_%m_%d"))
+ ret.setdefault("time", start_time.strftime("%H_%M_%S_%f"))
+ return ret
class PipelineConfiguration(PipelineConfigurationUpdate):
diff --git a/src/zenml/config/pipeline_run_configuration.py b/src/zenml/config/pipeline_run_configuration.py
index 1788f860cb..f3f7db16e7 100644
--- a/src/zenml/config/pipeline_run_configuration.py
+++ b/src/zenml/config/pipeline_run_configuration.py
@@ -52,3 +52,4 @@ class PipelineRunConfiguration(
retry: Optional[StepRetryConfig] = None
failure_hook_source: Optional[SourceWithValidator] = None
success_hook_source: Optional[SourceWithValidator] = None
+ substitutions: Dict[str, str] = {}
diff --git a/src/zenml/config/step_configurations.py b/src/zenml/config/step_configurations.py
index b7c4764ba5..8916d8e068 100644
--- a/src/zenml/config/step_configurations.py
+++ b/src/zenml/config/step_configurations.py
@@ -13,6 +13,7 @@
# permissions and limitations under the License.
"""Pipeline configuration classes."""
+from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
@@ -49,6 +50,7 @@
if TYPE_CHECKING:
from zenml.config import DockerSettings, ResourceSettings
+ from zenml.config.pipeline_configurations import PipelineConfiguration
logger = get_logger(__name__)
@@ -152,6 +154,7 @@ class StepConfigurationUpdate(StrictBaseModel):
success_hook_source: Optional[SourceWithValidator] = None
model: Optional[Model] = None
retry: Optional[StepRetryConfig] = None
+ substitutions: Dict[str, str] = {}
outputs: Mapping[str, PartialArtifactConfiguration] = {}
@@ -237,6 +240,24 @@ def docker_settings(self) -> "DockerSettings":
model_or_dict = model_or_dict.model_dump()
return DockerSettings.model_validate(model_or_dict)
+ def _get_full_substitutions(
+ self,
+ pipeline_config: "PipelineConfiguration",
+ start_time: Optional[datetime],
+ ) -> Dict[str, str]:
+ """Get the full set of substitutions for this step configuration.
+
+ Args:
+ pipeline_config: The pipeline configuration.
+ start_time: The start time of the pipeline run.
+
+ Returns:
+ The full set of substitutions for this step configuration.
+ """
+ ret = pipeline_config._get_full_substitutions(start_time)
+ ret.update(self.substitutions)
+ return ret
+
class InputSpec(StrictBaseModel):
"""Step input specification."""
diff --git a/src/zenml/model/model.py b/src/zenml/model/model.py
index 7c8fee6cf9..786287c910 100644
--- a/src/zenml/model/model.py
+++ b/src/zenml/model/model.py
@@ -57,7 +57,9 @@ class Model(BaseModel):
ethics: The ethical implications of the model.
tags: Tags associated with the model.
version: The version name, version number or stage is optional and points model context
- to a specific version/stage. If skipped new version will be created.
+ to a specific version/stage. If skipped new version will be created. `version`
+ also supports placeholders: standard `{date}` and `{time}` and any custom placeholders
+ that are passed as substitutions in the pipeline or step decorators.
save_models_to_registry: Whether to save all ModelArtifacts to Model Registry,
if available in active stack.
"""
@@ -534,6 +536,8 @@ def _get_or_create_model(self) -> "ModelResponse":
from zenml.models import ModelRequest
zenml_client = Client()
+ # backup logic, if the Model class is used directly from the code
+ self.name = format_name_template(self.name, substitutions={})
if self.model_version_id:
mv = zenml_client.get_model_version(
model_version_name_or_number_or_id=self.model_version_id,
@@ -663,7 +667,7 @@ def _get_or_create_model_version(
# backup logic, if the Model class is used directly from the code
if isinstance(self.version, str):
- self.version = format_name_template(self.version)
+ self.version = format_name_template(self.version, substitutions={})
try:
if self.version or self.model_version_id:
diff --git a/src/zenml/models/v2/core/pipeline_run.py b/src/zenml/models/v2/core/pipeline_run.py
index 8468c105be..bd93be98b0 100644
--- a/src/zenml/models/v2/core/pipeline_run.py
+++ b/src/zenml/models/v2/core/pipeline_run.py
@@ -237,6 +237,10 @@ class PipelineRunResponseMetadata(WorkspaceScopedResponseMetadata):
default=False,
description="Whether a template can be created from this run.",
)
+ steps_substitutions: Dict[str, Dict[str, str]] = Field(
+ title="Substitutions used in the step runs of this pipeline run.",
+ default_factory=dict,
+ )
class PipelineRunResponseResources(WorkspaceScopedResponseResources):
diff --git a/src/zenml/models/v2/core/step_run.py b/src/zenml/models/v2/core/step_run.py
index 7052a1b42d..2916d9236c 100644
--- a/src/zenml/models/v2/core/step_run.py
+++ b/src/zenml/models/v2/core/step_run.py
@@ -142,7 +142,7 @@ class StepRunRequest(WorkspaceScopedRequest):
class StepRunUpdate(BaseModel):
"""Update model for step runs."""
- outputs: Dict[str, UUID] = Field(
+ outputs: Dict[str, List[UUID]] = Field(
title="The IDs of the output artifact versions of the step run.",
default={},
)
diff --git a/src/zenml/orchestrators/publish_utils.py b/src/zenml/orchestrators/publish_utils.py
index 1b8f168517..7e4cf89c6e 100644
--- a/src/zenml/orchestrators/publish_utils.py
+++ b/src/zenml/orchestrators/publish_utils.py
@@ -32,7 +32,7 @@
def publish_successful_step_run(
- step_run_id: "UUID", output_artifact_ids: Dict[str, "UUID"]
+ step_run_id: "UUID", output_artifact_ids: Dict[str, List["UUID"]]
) -> "StepRunResponse":
"""Publishes a successful step run.
diff --git a/src/zenml/orchestrators/step_launcher.py b/src/zenml/orchestrators/step_launcher.py
index e9c8cfc188..a2f42eca06 100644
--- a/src/zenml/orchestrators/step_launcher.py
+++ b/src/zenml/orchestrators/step_launcher.py
@@ -309,8 +309,12 @@ def _create_or_reuse_run(self) -> Tuple[PipelineRunResponse, bool]:
The created or existing pipeline run,
and a boolean indicating whether the run was created or reused.
"""
+ start_time = datetime.utcnow()
run_name = orchestrator_utils.get_run_name(
- run_name_template=self._deployment.run_name_template
+ run_name_template=self._deployment.run_name_template,
+ substitutions=self._deployment.pipeline_configuration._get_full_substitutions(
+ start_time
+ ),
)
logger.debug("Creating pipeline run %s", run_name)
@@ -329,7 +333,7 @@ def _create_or_reuse_run(self) -> Tuple[PipelineRunResponse, bool]:
),
status=ExecutionStatus.RUNNING,
orchestrator_environment=get_run_environment_dict(),
- start_time=datetime.utcnow(),
+ start_time=start_time,
tags=self._deployment.pipeline_configuration.tags,
)
return client.zen_store.get_or_create_run(pipeline_run)
diff --git a/src/zenml/orchestrators/step_run_utils.py b/src/zenml/orchestrators/step_run_utils.py
index 6264e52508..d226a09f27 100644
--- a/src/zenml/orchestrators/step_run_utils.py
+++ b/src/zenml/orchestrators/step_run_utils.py
@@ -354,13 +354,16 @@ def create_cached_step_runs(
def get_or_create_model_version_for_pipeline_run(
- model: "Model", pipeline_run: PipelineRunResponse
+ model: "Model",
+ pipeline_run: PipelineRunResponse,
+ substitutions: Dict[str, str],
) -> Tuple[ModelVersionResponse, bool]:
"""Get or create a model version as part of a pipeline run.
Args:
model: The model to get or create.
pipeline_run: The pipeline run for which the model should be created.
+ substitutions: Substitutions to apply to the model version name.
Returns:
The model version and a boolean indicating whether it was newly created
@@ -374,12 +377,14 @@ def get_or_create_model_version_for_pipeline_run(
return model._get_model_version(), False
elif model.version:
if isinstance(model.version, str):
- start_time = pipeline_run.start_time or datetime.utcnow()
model.version = string_utils.format_name_template(
model.version,
- date=start_time.strftime("%Y_%m_%d"),
- time=start_time.strftime("%H_%M_%S_%f"),
+ substitutions=substitutions,
)
+ model.name = string_utils.format_name_template(
+ model.name,
+ substitutions=substitutions,
+ )
return (
model._get_or_create_model_version(),
@@ -460,7 +465,9 @@ def prepare_pipeline_run_model_version(
model_version = pipeline_run.model_version
elif config_model := pipeline_run.config.model:
model_version, _ = get_or_create_model_version_for_pipeline_run(
- model=config_model, pipeline_run=pipeline_run
+ model=config_model,
+ pipeline_run=pipeline_run,
+ substitutions=pipeline_run.config.substitutions,
)
pipeline_run = Client().zen_store.update_run(
run_id=pipeline_run.id,
@@ -492,7 +499,9 @@ def prepare_step_run_model_version(
model_version = step_run.model_version
elif config_model := step_run.config.model:
model_version, created = get_or_create_model_version_for_pipeline_run(
- model=config_model, pipeline_run=pipeline_run
+ model=config_model,
+ pipeline_run=pipeline_run,
+ substitutions=step_run.config.substitutions,
)
step_run = Client().zen_store.update_run_step(
step_run_id=step_run.id,
diff --git a/src/zenml/orchestrators/step_runner.py b/src/zenml/orchestrators/step_runner.py
index b9f2537aa3..5e40e0238c 100644
--- a/src/zenml/orchestrators/step_runner.py
+++ b/src/zenml/orchestrators/step_runner.py
@@ -152,6 +152,15 @@ def run(
func=step_instance.entrypoint
)
+ self._evaluate_artifact_names_in_collections(
+ step_run,
+ output_annotations,
+ [
+ output_artifact_uris,
+ output_materializers,
+ ],
+ )
+
self._stack.prepare_step_run(info=step_run_info)
# Initialize the step context singleton
@@ -257,7 +266,9 @@ def run(
# Update the status and output artifacts of the step run.
output_artifact_ids = {
- output_name: artifact.id
+ output_name: [
+ artifact.id,
+ ]
for output_name, artifact in output_artifacts.items()
}
publish_successful_step_run(
@@ -265,6 +276,33 @@ def run(
output_artifact_ids=output_artifact_ids,
)
+ def _evaluate_artifact_names_in_collections(
+ self,
+ step_run: "StepRunResponse",
+ output_annotations: Dict[str, OutputSignature],
+ collections: List[Dict[str, Any]],
+ ) -> None:
+ """Evaluates the artifact names in the collections.
+
+ Args:
+ step_run: The step run.
+ output_annotations: The output annotations of the step function
+ (also evaluated).
+ collections: The collections to evaluate.
+ """
+ collections.append(output_annotations)
+ for k, v in list(output_annotations.items()):
+ _evaluated_name = None
+ if v.artifact_config:
+ _evaluated_name = v.artifact_config._evaluated_name(
+ step_run.config.substitutions
+ )
+ if _evaluated_name is None:
+ _evaluated_name = k
+
+ for d in collections:
+ d[_evaluated_name] = d.pop(k)
+
def _load_step(self) -> "BaseStep":
"""Load the step instance.
diff --git a/src/zenml/orchestrators/utils.py b/src/zenml/orchestrators/utils.py
index 70145b2afc..2d68bc4f07 100644
--- a/src/zenml/orchestrators/utils.py
+++ b/src/zenml/orchestrators/utils.py
@@ -196,11 +196,12 @@ def get_config_environment_vars(
return environment_vars
-def get_run_name(run_name_template: str) -> str:
+def get_run_name(run_name_template: str, substitutions: Dict[str, str]) -> str:
"""Fill out the run name template to get a complete run name.
Args:
run_name_template: The run name template to fill out.
+ substitutions: The substitutions to use in the template.
Raises:
ValueError: If the run name is empty.
@@ -208,7 +209,9 @@ def get_run_name(run_name_template: str) -> str:
Returns:
The run name derived from the template.
"""
- run_name = format_name_template(run_name_template)
+ run_name = format_name_template(
+ run_name_template, substitutions=substitutions
+ )
if run_name == "":
raise ValueError("Empty run names are not allowed.")
diff --git a/src/zenml/pipelines/pipeline_decorator.py b/src/zenml/pipelines/pipeline_decorator.py
index ed609ea4ce..e9d8f14c47 100644
--- a/src/zenml/pipelines/pipeline_decorator.py
+++ b/src/zenml/pipelines/pipeline_decorator.py
@@ -55,6 +55,7 @@ def pipeline(
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model: Optional["Model"] = None,
+ substitutions: Optional[Dict[str, str]] = None,
) -> Callable[["F"], "Pipeline"]: ...
@@ -71,6 +72,7 @@ def pipeline(
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model: Optional["Model"] = None,
+ substitutions: Optional[Dict[str, str]] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
"""Decorator to create a pipeline.
@@ -91,6 +93,7 @@ def pipeline(
function with no arguments, or a source path to such a function
(e.g. `module.my_function`).
model: configuration of the model in the Model Control Plane.
+ substitutions: Extra placeholders to use in the name templates.
Returns:
A pipeline instance.
@@ -111,6 +114,7 @@ def inner_decorator(func: "F") -> "Pipeline":
on_success=on_success,
model=model,
entrypoint=func,
+ substitutions=substitutions,
)
p.__doc__ = func.__doc__
diff --git a/src/zenml/pipelines/pipeline_definition.py b/src/zenml/pipelines/pipeline_definition.py
index 6be8884078..fa29e8c364 100644
--- a/src/zenml/pipelines/pipeline_definition.py
+++ b/src/zenml/pipelines/pipeline_definition.py
@@ -135,6 +135,7 @@ def __init__(
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model: Optional["Model"] = None,
+ substitutions: Optional[Dict[str, str]] = None,
) -> None:
"""Initializes a pipeline.
@@ -157,6 +158,7 @@ def __init__(
be a function with no arguments, or a source path to such a
function (e.g. `module.my_function`).
model: configuration of the model in the Model Control Plane.
+ substitutions: Extra placeholders to use in the name templates.
"""
self._invocations: Dict[str, StepInvocation] = {}
self._run_args: Dict[str, Any] = {}
@@ -177,6 +179,7 @@ def __init__(
on_failure=on_failure,
on_success=on_success,
model=model,
+ substitutions=substitutions,
)
self.entrypoint = entrypoint
self._parameters: Dict[str, Any] = {}
@@ -297,6 +300,7 @@ def configure(
model: Optional["Model"] = None,
parameters: Optional[Dict[str, Any]] = None,
merge: bool = True,
+ substitutions: Optional[Dict[str, str]] = None,
) -> Self:
"""Configures the pipeline.
@@ -333,6 +337,7 @@ def configure(
method for an example.
model: configuration of the model version in the Model Control Plane.
parameters: input parameters for the pipeline.
+ substitutions: Extra placeholders to use in the name templates.
Returns:
The pipeline instance that this method was called on.
@@ -365,6 +370,7 @@ def configure(
"success_hook_source": success_hook_source,
"model": model,
"parameters": parameters,
+ "substitutions": substitutions,
}
)
if not self.__suppress_warnings_flag__:
@@ -656,7 +662,8 @@ def _create_deployment(
schedule_name = schedule.name
else:
schedule_name = format_name_template(
- deployment.run_name_template
+ deployment.run_name_template,
+ substitutions=deployment.pipeline_configuration.substitutions,
)
components = Client().active_stack_model.components
orchestrator = components[StackComponentType.ORCHESTRATOR][0]
diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py
index 9fb307dc01..61cd3e466c 100644
--- a/src/zenml/pipelines/run_utils.py
+++ b/src/zenml/pipelines/run_utils.py
@@ -66,16 +66,21 @@ def create_placeholder_run(
if deployment.schedule:
return None
-
+ start_time = datetime.utcnow()
run_request = PipelineRunRequest(
- name=get_run_name(run_name_template=deployment.run_name_template),
+ name=get_run_name(
+ run_name_template=deployment.run_name_template,
+ substitutions=deployment.pipeline_configuration._get_full_substitutions(
+ start_time
+ ),
+ ),
# We set the start time on the placeholder run already to
# make it consistent with the {time} placeholder in the
# run name. This means the placeholder run will usually
# have longer durations than scheduled runs, as for them
# the start_time is only set once the first step starts
# running.
- start_time=datetime.utcnow(),
+ start_time=start_time,
orchestrator_run_id=None,
user=deployment.user.id,
workspace=deployment.workspace.id,
diff --git a/src/zenml/steps/base_step.py b/src/zenml/steps/base_step.py
index b8ba79315e..55eaf54739 100644
--- a/src/zenml/steps/base_step.py
+++ b/src/zenml/steps/base_step.py
@@ -116,6 +116,7 @@ def __init__(
on_success: Optional["HookSpecification"] = None,
model: Optional["Model"] = None,
retry: Optional[StepRetryConfig] = None,
+ substitutions: Optional[Dict[str, str]] = None,
) -> None:
"""Initializes a step.
@@ -144,11 +145,13 @@ def __init__(
function (e.g. `module.my_function`).
model: configuration of the model version in the Model Control Plane.
retry: Configuration for retrying the step in case of failure.
+ substitutions: Extra placeholders to use in the name template.
"""
from zenml.config.step_configurations import PartialStepConfiguration
self.entrypoint_definition = validate_entrypoint_function(
- self.entrypoint, reserved_arguments=["after", "id"]
+ self.entrypoint,
+ reserved_arguments=["after", "id"],
)
name = name or self.__class__.__name__
@@ -203,6 +206,7 @@ def __init__(
on_success=on_success,
model=model,
retry=retry,
+ substitutions=substitutions,
)
notebook_utils.try_to_save_notebook_cell_code(self.source_object)
@@ -595,6 +599,7 @@ def configure(
model: Optional["Model"] = None,
merge: bool = True,
retry: Optional[StepRetryConfig] = None,
+ substitutions: Optional[Dict[str, str]] = None,
) -> T:
"""Configures the step.
@@ -637,6 +642,7 @@ def configure(
overwrite all existing ones. See the general description of this
method for an example.
retry: Configuration for retrying the step in case of failure.
+ substitutions: Extra placeholders to use in the name template.
Returns:
The step instance that this method was called on.
@@ -701,6 +707,7 @@ def _convert_to_tuple(value: Any) -> Tuple[Source, ...]:
"success_hook_source": success_hook_source,
"model": model,
"retry": retry,
+ "substitutions": substitutions,
}
)
config = StepConfigurationUpdate(**values)
@@ -725,6 +732,7 @@ def with_options(
on_success: Optional["HookSpecification"] = None,
model: Optional["Model"] = None,
merge: bool = True,
+ substitutions: Optional[Dict[str, str]] = None,
) -> "BaseStep":
"""Copies the step and applies the given configurations.
@@ -756,6 +764,7 @@ def with_options(
configurations. If `False` the given configurations will
overwrite all existing ones. See the general description of this
method for an example.
+ substitutions: Extra placeholders for the step name.
Returns:
The copied step instance.
@@ -776,6 +785,7 @@ def with_options(
on_success=on_success,
model=model,
merge=merge,
+ substitutions=substitutions,
)
return step_copy
diff --git a/src/zenml/steps/entrypoint_function_utils.py b/src/zenml/steps/entrypoint_function_utils.py
index 5034bd8979..6897b2dfcf 100644
--- a/src/zenml/steps/entrypoint_function_utils.py
+++ b/src/zenml/steps/entrypoint_function_utils.py
@@ -211,7 +211,8 @@ def _validate_input_value(
def validate_entrypoint_function(
- func: Callable[..., Any], reserved_arguments: Sequence[str] = ()
+ func: Callable[..., Any],
+ reserved_arguments: Sequence[str] = (),
) -> EntrypointFunctionDefinition:
"""Validates a step entrypoint function.
@@ -258,7 +259,8 @@ def validate_entrypoint_function(
inputs[key] = parameter
outputs = parse_return_type_annotations(
- func=func, enforce_type_annotations=ENFORCE_TYPE_ANNOTATIONS
+ func=func,
+ enforce_type_annotations=ENFORCE_TYPE_ANNOTATIONS,
)
return EntrypointFunctionDefinition(
diff --git a/src/zenml/steps/step_decorator.py b/src/zenml/steps/step_decorator.py
index 7ab22f99fb..bd546c9e91 100644
--- a/src/zenml/steps/step_decorator.py
+++ b/src/zenml/steps/step_decorator.py
@@ -73,6 +73,7 @@ def step(
on_success: Optional["HookSpecification"] = None,
model: Optional["Model"] = None,
retry: Optional["StepRetryConfig"] = None,
+ substitutions: Optional[Dict[str, str]] = None,
) -> Callable[["F"], "BaseStep"]: ...
@@ -93,6 +94,7 @@ def step(
on_success: Optional["HookSpecification"] = None,
model: Optional["Model"] = None,
retry: Optional["StepRetryConfig"] = None,
+ substitutions: Optional[Dict[str, str]] = None,
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
"""Decorator to create a ZenML step.
@@ -124,6 +126,7 @@ def step(
(e.g. `module.my_function`).
model: configuration of the model in the Model Control Plane.
retry: configuration of step retry in case of step failure.
+ substitutions: Extra placeholders for the step name.
Returns:
The step instance.
@@ -157,6 +160,7 @@ def inner_decorator(func: "F") -> "BaseStep":
on_success=on_success,
model=model,
retry=retry,
+ substitutions=substitutions,
)
return step_instance
diff --git a/src/zenml/steps/utils.py b/src/zenml/steps/utils.py
index e237d12f9f..cd4b720c06 100644
--- a/src/zenml/steps/utils.py
+++ b/src/zenml/steps/utils.py
@@ -18,7 +18,15 @@
import contextlib
import inspect
import textwrap
-from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ Dict,
+ Optional,
+ Tuple,
+ Union,
+)
from uuid import UUID
from pydantic import BaseModel
@@ -94,7 +102,8 @@ def get_args(obj: Any) -> Tuple[Any, ...]:
def parse_return_type_annotations(
- func: Callable[..., Any], enforce_type_annotations: bool = False
+ func: Callable[..., Any],
+ enforce_type_annotations: bool = False,
) -> Dict[str, OutputSignature]:
"""Parse the return type annotation of a step function.
@@ -228,9 +237,12 @@ def get_artifact_config_from_annotation_metadata(
error_message = (
"Artifact annotation should only contain two elements: the artifact "
- "type, and either an output name or an `ArtifactConfig`, e.g.: "
- "`Annotated[int, 'output_name']` or "
- "`Annotated[int, ArtifactConfig(name='output_name'), ...]`."
+        "type, and one of the following: a static or dynamic name, or "
+        "an `ArtifactConfig`, e.g.: "
+        "`Annotated[int, 'output_name']`, "
+        "`Annotated[int, 'output_{placeholder}']`, "
+        "`Annotated[int, ArtifactConfig(name='output_{placeholder}')]` or "
+        "`Annotated[int, ArtifactConfig(name='output_name')]`."
)
if len(metadata) > 2:
diff --git a/src/zenml/utils/string_utils.py b/src/zenml/utils/string_utils.py
index 1b5091aff4..77d434d503 100644
--- a/src/zenml/utils/string_utils.py
+++ b/src/zenml/utils/string_utils.py
@@ -14,10 +14,10 @@
"""Utils for strings."""
import base64
-import datetime
import functools
import random
import string
+from datetime import datetime
from typing import Any, Callable, Dict, TypeVar, cast
from pydantic import BaseModel
@@ -147,7 +147,7 @@ def validate_name(model: BaseModel) -> None:
def format_name_template(
name_template: str,
- **kwargs: str,
+ substitutions: Dict[str, str],
) -> str:
"""Formats a name template with the given arguments.
@@ -157,20 +157,38 @@ def format_name_template(
Args:
name_template: The name template to format.
- **kwargs: The arguments to replace in the template.
+ substitutions: A dictionary of substitutions to use in the template.
Returns:
The formatted name template.
+
+ Raises:
+        KeyError: If a placeholder in the template is missing from the substitutions.
"""
- kwargs["date"] = kwargs.get(
- "date",
- datetime.datetime.now(datetime.timezone.utc).strftime("%Y_%m_%d"),
- )
- kwargs["time"] = kwargs.get(
- "time",
- datetime.datetime.now(datetime.timezone.utc).strftime("%H_%M_%S_%f"),
- )
- return name_template.format(**kwargs)
+ if ("date" not in substitutions and "{date}" in name_template) or (
+ "time" not in substitutions and "{time}" in name_template
+ ):
+ from zenml import get_step_context
+
+ try:
+ pr = get_step_context().pipeline_run
+ start_time = pr.start_time
+ substitutions.update(pr.config.substitutions)
+ except RuntimeError:
+ start_time = None
+
+ if start_time is None:
+ start_time = datetime.utcnow()
+ substitutions.setdefault("date", start_time.strftime("%Y_%m_%d"))
+ substitutions.setdefault("time", start_time.strftime("%H_%M_%S_%f"))
+
+ try:
+ return name_template.format(**substitutions)
+ except KeyError as e:
+ raise KeyError(
+ f"Could not format the name template `{name_template}`. "
+ f"Missing key: {e}"
+ )
def substitute_string(value: V, substitution_func: Callable[[str], str]) -> V:
diff --git a/src/zenml/zen_server/template_execution/utils.py b/src/zenml/zen_server/template_execution/utils.py
index 0bc5620cf4..c1a1b35cca 100644
--- a/src/zenml/zen_server/template_execution/utils.py
+++ b/src/zenml/zen_server/template_execution/utils.py
@@ -363,6 +363,7 @@ def deployment_request_from_template(
"external_input_artifacts",
"model_artifacts_or_metadata",
"client_lazy_loaders",
+ "substitutions",
"outputs",
}
),
diff --git a/src/zenml/zen_stores/schemas/artifact_schemas.py b/src/zenml/zen_stores/schemas/artifact_schemas.py
index 8b08e51b56..eda2f17927 100644
--- a/src/zenml/zen_stores/schemas/artifact_schemas.py
+++ b/src/zenml/zen_stores/schemas/artifact_schemas.py
@@ -352,8 +352,9 @@ def to_model(
producer_step_run_id = step_run.original_step_run_id
# Create the body of the model
+ artifact = self.artifact.to_model()
body = ArtifactVersionResponseBody(
- artifact=self.artifact.to_model(),
+ artifact=artifact,
version=self.version or str(self.version_number),
user=self.user.to_model() if self.user else None,
uri=self.uri,
diff --git a/src/zenml/zen_stores/schemas/pipeline_run_schemas.py b/src/zenml/zen_stores/schemas/pipeline_run_schemas.py
index 6028451acf..009afaff24 100644
--- a/src/zenml/zen_stores/schemas/pipeline_run_schemas.py
+++ b/src/zenml/zen_stores/schemas/pipeline_run_schemas.py
@@ -284,6 +284,10 @@ def to_model(
deployment = self.deployment.to_model()
config = deployment.pipeline_configuration
+ new_substitutions = config._get_full_substitutions(self.start_time)
+ config = config.model_copy(
+ update={"substitutions": new_substitutions}
+ )
client_environment = deployment.client_environment
stack = deployment.stack
@@ -323,9 +327,11 @@ def to_model(
build=build,
schedule=schedule,
code_reference=code_reference,
- trigger_execution=self.trigger_execution.to_model()
- if self.trigger_execution
- else None,
+ trigger_execution=(
+ self.trigger_execution.to_model()
+ if self.trigger_execution
+ else None
+ ),
created=self.created,
updated=self.updated,
deployment_id=self.deployment_id,
@@ -344,6 +350,10 @@ def to_model(
steps = {step.name: step.to_model() for step in self.step_runs}
+ steps_substitutions = {
+ step_name: step.config.substitutions
+ for step_name, step in steps.items()
+ }
metadata = PipelineRunResponseMetadata(
workspace=self.workspace.to_model(),
run_metadata=run_metadata,
@@ -361,6 +371,7 @@ def to_model(
if self.deployment
else None,
is_templatable=is_templatable,
+ steps_substitutions=steps_substitutions,
)
resources = None
diff --git a/src/zenml/zen_stores/schemas/step_run_schemas.py b/src/zenml/zen_stores/schemas/step_run_schemas.py
index 8500db9715..860ff79442 100644
--- a/src/zenml/zen_stores/schemas/step_run_schemas.py
+++ b/src/zenml/zen_stores/schemas/step_run_schemas.py
@@ -23,6 +23,7 @@
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlmodel import Field, Relationship, SQLModel
+from zenml.config.pipeline_configurations import PipelineConfiguration
from zenml.config.step_configurations import Step
from zenml.constants import MEDIUMTEXT_MAX_LENGTH
from zenml.enums import (
@@ -163,6 +164,9 @@ class StepRunSchema(NamedSchema, table=True):
"primaryjoin": "StepRunParentsSchema.child_id == StepRunSchema.id",
},
)
+ pipeline_run: "PipelineRunSchema" = Relationship(
+ back_populates="step_runs"
+ )
model_version: "ModelVersionSchema" = Relationship(
back_populates="step_runs",
)
@@ -248,6 +252,21 @@ def to_model(
full_step_config = Step.model_validate(
step_configuration[self.name]
)
+ new_substitutions = (
+ full_step_config.config._get_full_substitutions(
+ PipelineConfiguration.model_validate_json(
+ self.deployment.pipeline_configuration
+ ),
+ self.pipeline_run.start_time,
+ )
+ )
+ full_step_config = full_step_config.model_copy(
+ update={
+ "config": full_step_config.config.model_copy(
+ update={"substitutions": new_substitutions}
+ )
+ }
+ )
elif not self.step_configuration:
raise ValueError(
f"Unable to load the configuration for step `{self.name}` from the"
diff --git a/src/zenml/zen_stores/sql_zen_store.py b/src/zenml/zen_stores/sql_zen_store.py
index 81646f9d84..98ea30fb8e 100644
--- a/src/zenml/zen_stores/sql_zen_store.py
+++ b/src/zenml/zen_stores/sql_zen_store.py
@@ -2726,7 +2726,9 @@ def delete_artifact(self, artifact_id: UUID) -> None:
# -------------------- Artifact Versions --------------------
def _get_or_create_artifact_for_name(
- self, name: str, has_custom_name: bool
+ self,
+ name: str,
+ has_custom_name: bool,
) -> ArtifactSchema:
"""Get or create an artifact with a specific name.
@@ -2747,7 +2749,8 @@ def _get_or_create_artifact_for_name(
try:
with session.begin_nested():
artifact_request = ArtifactRequest(
- name=name, has_custom_name=has_custom_name
+ name=name,
+ has_custom_name=has_custom_name,
)
artifact = ArtifactSchema.from_request(
artifact_request
@@ -8182,12 +8185,12 @@ def create_run_step(self, step_run: StepRunRequest) -> StepRunResponse:
)
# Save output artifact IDs into the database.
- for output_name, artifact_version_ids in step_run.outputs.items():
+ for name, artifact_version_ids in step_run.outputs.items():
for artifact_version_id in artifact_version_ids:
self._set_run_step_output_artifact(
step_run_id=step_schema.id,
artifact_version_id=artifact_version_id,
- name=output_name,
+ name=name,
session=session,
)
@@ -8291,13 +8294,14 @@ def update_run_step(
session.add(existing_step_run)
# Update the artifacts.
- for name, artifact_version_id in step_run_update.outputs.items():
- self._set_run_step_output_artifact(
- step_run_id=step_run_id,
- artifact_version_id=artifact_version_id,
- name=name,
- session=session,
- )
+ for name, artifact_version_ids in step_run_update.outputs.items():
+ for artifact_version_id in artifact_version_ids:
+ self._set_run_step_output_artifact(
+ step_run_id=step_run_id,
+ artifact_version_id=artifact_version_id,
+ name=name,
+ session=session,
+ )
# Update loaded artifacts.
for (
diff --git a/tests/integration/functional/pipelines/test_pipeline_naming.py b/tests/integration/functional/pipelines/test_pipeline_naming.py
new file mode 100644
index 0000000000..a8b2a8c3ed
--- /dev/null
+++ b/tests/integration/functional/pipelines/test_pipeline_naming.py
@@ -0,0 +1,58 @@
+# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+from typing_extensions import Annotated
+
+from zenml import Model, pipeline, step
+from zenml.client import Client
+from zenml.models.v2.core.pipeline_run import PipelineRunResponse
+
+
+@step
+def my_step() -> Annotated[str, "artifact_{date}_{time}_{placeholder}"]:
+ return "42"
+
+
+@pipeline(
+ enable_cache=False,
+ model=Model(
+ name="model_{date}_{time}_{placeholder}",
+ version="model_version_{date}_{time}_{placeholder}",
+ ),
+ substitutions={"placeholder": "42"},
+)
+def my_pipeline():
+ my_step()
+
+
+def test_that_naming_is_consistent_across_the_board(clean_client: Client):
+ """Test that naming is consistent across the entities for standard placeholders.
+
+ Putting simply: `time` and `date` placeholders should be consistent across
+ all entities in the pipeline run.
+ """
+ p: PipelineRunResponse = my_pipeline.with_options(
+ run_name="run_{date}_{time}_{placeholder}"
+ )()
+ assert p.name.startswith("run_")
+ tail = p.name.split("run_")[1]
+ mv = clean_client.get_model_version(
+ model_name_or_id=f"model_{tail}",
+ model_version_name_or_number_or_id=f"model_version_{tail}",
+ )
+ assert mv.name == f"model_version_{tail}"
+ assert mv.model.name == f"model_{tail}"
+ assert (
+ clean_client.get_artifact_version(f"artifact_{tail}").name
+ == f"artifact_{tail}"
+ )
diff --git a/tests/integration/functional/steps/test_step_naming.py b/tests/integration/functional/steps/test_step_naming.py
new file mode 100644
index 0000000000..0446d4a1c2
--- /dev/null
+++ b/tests/integration/functional/steps/test_step_naming.py
@@ -0,0 +1,364 @@
+# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
+from typing import Callable, Tuple
+
+import pytest
+from typing_extensions import Annotated
+
+from zenml import ArtifactConfig, pipeline, step
+from zenml.client import Client
+from zenml.models.v2.core.pipeline_run import PipelineRunResponse
+
+str_namer_standard = "dummy_dynamic_dt_{date}_{time}"
+str_namer_custom = "dummy_dynamic_custom_{funny_name}"
+str_namer_custom_2 = "dummy_dynamic_custom_{funnier_name}"
+static_namer = "dummy_static"
+
+
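+# Each step below returns a sentinel string identifying the naming strategy it
+# uses; this helper checks that the resolved artifact name matches the pattern
+# expected for that strategy.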
+def _validate_name_by_value(name: str, value: str) -> bool:
+ if value == "str_namer_standard":
+ return name.startswith("dummy_dynamic_dt_")
+ if value == "str_namer_custom":
+ return name.startswith("dummy_dynamic_custom_")
+ if value == "static_namer":
+ return name == "dummy_static"
+ if value == "unannotated":
+ return name.startswith("output")
+ return False
+
+
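+# Steps covering single and tuple outputs with standard, custom and static
+# naming, used by the parametrized scenarios below.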
+@step
+def dynamic_single_string_standard() -> Annotated[str, str_namer_standard]:
+ return "str_namer_standard"
+
+
+@step(substitutions={"funny_name": "name_placeholder"})
+def dynamic_single_string_custom() -> Annotated[str, str_namer_custom]:
+ return "str_namer_custom"
+
+
+@step(substitutions={"funnier_name": "name_placeholder"})
+def dynamic_single_string_custom_2() -> Annotated[str, str_namer_custom_2]:
+ return "str_namer_custom"
+
+
+@step
+def dynamic_single_string_custom_no_default() -> (
+ Annotated[str, str_namer_custom]
+):
+ return "str_namer_custom"
+
+
+@step(substitutions={"funny_name": "name_placeholder"})
+def dynamic_tuple() -> (
+ Tuple[
+ Annotated[str, str_namer_standard],
+ Annotated[str, str_namer_custom],
+ ]
+):
+ return "str_namer_standard", "str_namer_custom"
+
+
+@step(substitutions={"funny_name": "name_placeholder"})
+def mixed_tuple() -> (
+ Tuple[
+ Annotated[str, str_namer_standard],
+ Annotated[str, static_namer],
+ Annotated[str, str_namer_custom],
+ ]
+):
+ return "str_namer_standard", "static_namer", "str_namer_custom"
+
+
+@step
+def static_single() -> Annotated[str, static_namer]:
+ return "static_namer"
+
+
+@step(substitutions={"funny_name": "name_placeholder"})
+def mixed_tuple_artifact_config() -> (
+ Tuple[
+ Annotated[str, ArtifactConfig(name=static_namer)],
+ Annotated[str, ArtifactConfig(name=str_namer_standard)],
+ Annotated[str, ArtifactConfig(name=str_namer_custom)],
+ ]
+):
+ return "static_namer", "str_namer_standard", "str_namer_custom"
+
+
+@step
+def dynamic_single_string_standard_controlled_return(
+ s: str,
+) -> Annotated[str, str_namer_standard]:
+ return s
+
+
+@step(substitutions={"funny_name": "name_placeholder"})
+def mixed_with_unannotated_returns() -> (
+ Tuple[
+ Annotated[str, str_namer_standard],
+ str,
+ Annotated[str, str_namer_custom],
+ str,
+ ]
+):
+ return (
+ "str_namer_standard",
+ "unannotated",
+ "str_namer_custom",
+ "unannotated",
+ )
+
+
+@pytest.mark.parametrize(
+ "step",
+ [
+ dynamic_single_string_standard,
+ dynamic_single_string_custom,
+ dynamic_tuple,
+ mixed_tuple,
+ static_single,
+ mixed_tuple_artifact_config,
+ mixed_with_unannotated_returns,
+ ],
+ ids=[
+ "dynamic_single_string_standard",
+ "dynamic_single_string_custom",
+ "dynamic_tuple",
+ "mixed_tuple",
+ "static_single",
+ "mixed_tuple_artifact_config",
+ "mixed_with_unannotated_returns",
+ ],
+)
+def test_various_naming_scenarios(step: Callable, clean_client: Client):
+ """Test that dynamic naming works in both normal and cached runs.
+
+    In a cached run, the names of the dynamic artifacts should remain the same
+    as in the original (uncached) run.
+ """
+
+ @pipeline
+ def _inner():
+ step()
+
+ p1: PipelineRunResponse = _inner.with_options(enable_cache=False)()
+ for step_response in p1.steps.values():
+ for k in step_response.outputs.keys():
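+            # Unannotated outputs fall back to the default `output` name, so they
+            # are fetched via the fully qualified `pipeline::step::output` name.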
+ if k.startswith("output"):
+ value = clean_client.get_artifact_version(
+ f"{p1.pipeline.name}::{step_response.name}::{k}"
+ ).load()
+ else:
+ value = clean_client.get_artifact_version(k).load()
+ assert _validate_name_by_value(k, value)
+
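+    # The cached run must expose the same resolved artifact names as the
+    # original run, even though the steps are not re-executed.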
+ p2: PipelineRunResponse = _inner.with_options(enable_cache=True)()
+ for step_response in p2.steps.values():
+ assert set(step_response.outputs.keys()) == set(
+ p1.steps[step_response.name].outputs.keys()
+ )
+ for k in step_response.outputs.keys():
+ if k.startswith("output"):
+ value = clean_client.get_artifact_version(
+ f"{p2.pipeline.name}::{step_response.name}::{k}"
+ ).load()
+ else:
+ value = clean_client.get_artifact_version(k).load()
+ assert _validate_name_by_value(k, value)
+
+
+def test_sequential_executions_have_different_names(clean_client: "Client"):
+    """Test that dynamic naming produces different names across sequential uncached runs."""
+
+ @pipeline(enable_cache=False)
+ def _inner(name_placeholder: str):
+ dynamic_single_string_custom.with_options(
+ substitutions={"funny_name": name_placeholder}
+ )()
+
+ p1: PipelineRunResponse = _inner("funny_name_42")
+ p2: PipelineRunResponse = _inner("this_is_not_funny")
+
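+    # Different substitution values must produce differently named output
+    # artifacts across the two runs.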
+ assert set(p1.steps["dynamic_single_string_custom"].outputs.keys()) != set(
+ p2.steps["dynamic_single_string_custom"].outputs.keys()
+ )
+
+
+def test_sequential_executions_have_different_names_in_pipeline_context(
+ clean_client: "Client",
+):
+    """Test that substitutions set in the pipeline context resolve with the
+    expected precedence."""
+
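+    # Substitutions can be set on the step decorator, via step `with_options`,
+    # on the pipeline decorator, or via pipeline `with_options`; the pipelines
+    # below exercise the expected precedence between these sources.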
+ @pipeline(enable_cache=False)
+ def _inner():
+ dynamic_single_string_custom_no_default()
+
+ @pipeline(enable_cache=False, substitutions={"funny_name": "p_d"})
+ def _inner_2():
+ dynamic_single_string_custom_no_default()
+
+ @pipeline(enable_cache=False, substitutions={"funny_name": "p_d"})
+ def _inner_3():
+ dynamic_single_string_custom_no_default.with_options(
+ substitutions={"funny_name": "s_wo"}
+ )()
+
+ @pipeline(enable_cache=False, substitutions={"funny_name": "p_d"})
+ def _inner_4():
+ dynamic_single_string_custom()
+
+ p1: PipelineRunResponse = _inner.with_options(
+ substitutions={"funny_name": "p_wo"}
+ )()
+ p2: PipelineRunResponse = _inner_2()
+ p3: PipelineRunResponse = _inner_3()
+ p4: PipelineRunResponse = _inner_4()
+ p5: PipelineRunResponse = _inner_2.with_options(
+ substitutions={"funny_name": "p_wo"}
+ )()
+
+ assert (
+ set(p1.steps["dynamic_single_string_custom_no_default"].outputs.keys())
+ != set(
+ p2.steps["dynamic_single_string_custom_no_default"].outputs.keys()
+ )
+ != set(
+ p3.steps["dynamic_single_string_custom_no_default"].outputs.keys()
+ )
+ != set(p4.steps["dynamic_single_string_custom"].outputs.keys())
+ )
+ # only pipeline `with_options` -> pipeline with_options
+ assert (
+ list(
+ p1.steps["dynamic_single_string_custom_no_default"].outputs.keys()
+ )[0]
+ == "dummy_dynamic_custom_p_wo"
+ )
+ # only pipeline deco -> pipeline deco
+ assert (
+ list(
+ p2.steps["dynamic_single_string_custom_no_default"].outputs.keys()
+ )[0]
+ == "dummy_dynamic_custom_p_d"
+ )
+ # pipeline deco + step with_options -> step with_options
+ assert (
+ list(
+ p3.steps["dynamic_single_string_custom_no_default"].outputs.keys()
+ )[0]
+ == "dummy_dynamic_custom_s_wo"
+ )
+ # pipeline deco + step deco -> step deco
+ assert (
+ list(p4.steps["dynamic_single_string_custom"].outputs.keys())[0]
+ == "dummy_dynamic_custom_name_placeholder"
+ )
+ # pipeline deco + with_options -> pipeline with_options
+ assert (
+ list(
+ p5.steps["dynamic_single_string_custom_no_default"].outputs.keys()
+ )[0]
+ == "dummy_dynamic_custom_p_wo"
+ )
+
+
+def test_execution_fails_on_custom_but_not_provided_name(
+ clean_client: "Client",
+):
+    """Test that dynamic naming fails if a custom placeholder is not provided."""
+
+ @pipeline(enable_cache=False)
+ def _inner():
+ dynamic_single_string_custom_no_default.with_options(
+ substitutions={"not_a_funny_name": "it's gonna fail"}
+ )()
+
+ with pytest.raises(
+ KeyError,
+ match="Could not format the name template `dummy_dynamic_custom_{funny_name}`. Missing key: 'funny_name'",
+ ):
+ _inner()
+
+
+def test_stored_info_not_affected_by_dynamic_naming(clean_client: "Client"):
+ """Test that dynamic naming does not affect stored info."""
+
+ @pipeline(enable_cache=False)
+ def _inner(ret: str):
+ dynamic_single_string_standard_controlled_return(ret)
+
+ p1: PipelineRunResponse = _inner("output_1")
+ p2: PipelineRunResponse = _inner("output_2")
+
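+    # The dynamically resolved artifact names differ between the two runs, but
+    # each artifact version still stores the value returned by the step.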
+ a1 = clean_client.get_artifact_version(
+ list(
+ p1.steps[
+ "dynamic_single_string_standard_controlled_return"
+ ].outputs.keys()
+ )[0]
+ ).load()
+ a2 = clean_client.get_artifact_version(
+ list(
+ p2.steps[
+ "dynamic_single_string_standard_controlled_return"
+ ].outputs.keys()
+ )[0]
+ ).load()
+ assert a1 == "output_1" != a2
+ assert a2 == "output_2" != a1
+
+
+def test_that_overrides_work_as_expected(clean_client: "Client"):
+    """Test that pipeline- and step-level substitution overrides work as expected."""
+
+ @pipeline(
+ enable_cache=False, substitutions={"date": "not_a_date_actually"}
+ )
+ def _inner(pass_to_step: str = ""):
+ if pass_to_step:
+ dynamic_single_string_custom_no_default.with_options(
+ substitutions={
+ "funny_name": pass_to_step,
+ "date": pass_to_step,
+ }
+ )()
+ else:
+ dynamic_single_string_custom_no_default()
+
+ p1: PipelineRunResponse = _inner.with_options(
+ substitutions={"funny_name": "pipeline_level"}
+ )()
+ p2: PipelineRunResponse = _inner("step_level")
+ p1_subs = p1.config.substitutions
+ p2_subs = p2.config.substitutions
+
+ assert p1_subs["date"] == "not_a_date_actually"
+ assert p2_subs["date"] == "not_a_date_actually"
+ assert p1_subs["time"] != p2_subs["time"]
+ assert p1_subs["funny_name"] == "pipeline_level"
+ assert "funny_name" not in p2_subs
+
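+    # Step-level `with_options` substitutions override pipeline-level ones;
+    # otherwise the step inherits the values set on the pipeline.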
+ p1_step_subs = p1.steps[
+ "dynamic_single_string_custom_no_default"
+ ].config.substitutions
+ p2_step_subs = p2.steps[
+ "dynamic_single_string_custom_no_default"
+ ].config.substitutions
+ assert p1_step_subs["time"] != p2_step_subs["time"]
+ assert p1_step_subs["date"] != p2_step_subs["date"]
+ assert p1_step_subs["date"] == "not_a_date_actually"
+ assert p2_step_subs["date"] == "step_level"
+ assert p1_step_subs["funny_name"] == "pipeline_level"
+ assert p2_step_subs["funny_name"] == "step_level"
diff --git a/tests/unit/orchestrators/test_publish_utils.py b/tests/unit/orchestrators/test_publish_utils.py
index 12acba6366..ecab4252ac 100644
--- a/tests/unit/orchestrators/test_publish_utils.py
+++ b/tests/unit/orchestrators/test_publish_utils.py
@@ -27,7 +27,11 @@ def test_publishing_a_successful_step_run(mocker):
)
step_run_id = uuid4()
- output_artifact_ids = {"output_name": uuid4()}
+ output_artifact_ids = {
+ "output_name": [
+ uuid4(),
+ ]
+ }
publish_utils.publish_successful_step_run(
step_run_id=step_run_id, output_artifact_ids=output_artifact_ids
diff --git a/tests/unit/steps/test_utils.py b/tests/unit/steps/test_utils.py
index 2d6b9ea741..9f834bf190 100644
--- a/tests/unit/steps/test_utils.py
+++ b/tests/unit/steps/test_utils.py
@@ -283,7 +283,7 @@ def func_with_multiple_annotated_outputs_and_deployment_artifact_config() -> (
],
)
def test_step_output_annotation_parsing(func, expected_output):
- assert parse_return_type_annotations(func) == expected_output
+ assert parse_return_type_annotations(func, {}) == expected_output
def func_with_multiple_annotations() -> Annotated[int, "a", "b"]:
@@ -329,4 +329,4 @@ def func_with_duplicate_output_name() -> (
)
def test_invalid_step_output_annotations(func, exception):
with pytest.raises(exception):
- parse_return_type_annotations(func)
+ parse_return_type_annotations(func, {})