diff --git a/core/dbt/artifacts/resources/v1/components.py b/core/dbt/artifacts/resources/v1/components.py index 8eb43f35d8e..05c150782d2 100644 --- a/core/dbt/artifacts/resources/v1/components.py +++ b/core/dbt/artifacts/resources/v1/components.py @@ -3,6 +3,8 @@ from datetime import timedelta from typing import Any, Dict, List, Optional, Union +from dbt_config.catalog_config import ExternalCatalog + from dbt.artifacts.resources.base import Docs, FileHash, GraphResource from dbt.artifacts.resources.types import NodeType, TimePeriod from dbt.artifacts.resources.v1.config import NodeConfig @@ -164,6 +166,7 @@ class DeferRelation(HasRelationMetadata): meta: Dict[str, Any] tags: List[str] config: Optional[NodeConfig] + external_catalog: Optional[ExternalCatalog] = None @property def identifier(self): diff --git a/core/dbt/config/external_config.py b/core/dbt/config/external_config.py new file mode 100644 index 00000000000..7533a8becbf --- /dev/null +++ b/core/dbt/config/external_config.py @@ -0,0 +1,24 @@ +from typing import Dict, Optional + +from dbt.clients.yaml_helper import load_yaml_text +from dbt.constants import EXTERNAL_CATALOG_FILE_NAME +from dbt_common.clients.system import load_file_contents, path_exists + + +def _load_yaml(path): + contents = load_file_contents(path) + return load_yaml_text(contents) + + +def _load_yml_dict(file_path): + if path_exists(file_path): + ret = _load_yaml(file_path) or {} + return ret + return None + + +def load_external_catalog_config(project) -> Optional[Dict]: + unparsed_config = _load_yml_dict(f"{project.project_root}/{EXTERNAL_CATALOG_FILE_NAME}") + if unparsed_config is not None: + return unparsed_config + return None diff --git a/core/dbt/config/renderer.py b/core/dbt/config/renderer.py index 4f605979e62..d4b5ad75c3c 100644 --- a/core/dbt/config/renderer.py +++ b/core/dbt/config/renderer.py @@ -229,3 +229,9 @@ class PackageRenderer(SecretRenderer): @property def name(self): return "Packages config" + + +class 
CatalogRenderer(SecretRenderer): + @property + def name(self): + return "Catalog config" diff --git a/core/dbt/config/runtime.py b/core/dbt/config/runtime.py index e1c24cf5f0c..0727053def0 100644 --- a/core/dbt/config/runtime.py +++ b/core/dbt/config/runtime.py @@ -15,6 +15,8 @@ Type, ) +from dbt_config.catalog_config import ExternalCatalogConfig + from dbt import tracking from dbt.adapters.contracts.connection import ( AdapterRequiredConfig, @@ -39,6 +41,7 @@ from dbt_common.events.functions import warn_or_error from dbt_common.helper_types import DictDefaultEmptyStr, FQNPath, PathSet +from .external_config import load_external_catalog_config from .profile import Profile from .project import Project from .renderer import DbtProjectYamlRenderer, ProfileRenderer @@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig): profile_name: str cli_vars: Dict[str, Any] dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None + catalogs: Optional[ExternalCatalogConfig] = None def __post_init__(self): self.validate() @@ -125,12 +129,15 @@ def from_parts( profile: Profile, args: Any, dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None, + catalogs: Optional[ExternalCatalogConfig] = None, ) -> "RuntimeConfig": """Instantiate a RuntimeConfig from its components. :param profile: A parsed dbt Profile. :param project: A parsed dbt Project. :param args: The parsed command-line arguments. + :param dependencies: A mapping of project names to RuntimeConfigs. + :param catalogs: A parsed dbt ExternalCatalogConfig. :returns RuntimeConfig: The new configuration. 
""" quoting: Dict[str, Any] = ( @@ -194,6 +201,7 @@ def from_parts( dependencies=dependencies, dbt_cloud=project.dbt_cloud, flags=project.flags, + catalogs=catalogs, ) # Called by 'load_projects' in this class @@ -253,7 +261,9 @@ def validate(self): # Called by RuntimeConfig.from_args @classmethod - def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profile]: + def collect_parts( + cls: Type["RuntimeConfig"], args: Any + ) -> Tuple[Project, Profile, Optional[ExternalCatalogConfig]]: # profile_name from the project project_root = args.project_dir if args.project_dir else os.getcwd() cli_vars: Dict[str, Any] = getattr(args, "vars", {}) @@ -264,7 +274,9 @@ def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profi ) flags = get_flags() project = load_project(project_root, bool(flags.VERSION_CHECK), profile, cli_vars) - return project, profile + catalog_yml = load_external_catalog_config(project) + catalogs = ExternalCatalogConfig.model_validate(catalog_yml) if catalog_yml else None + return project, profile, catalogs # Called in task/base.py, in BaseTask.from_args @classmethod @@ -278,12 +290,13 @@ def from_args(cls, args: Any) -> "RuntimeConfig": :raises DbtProfileError: If the profile is invalid or missing. :raises DbtValidationError: If the cli variables are invalid. 
""" - project, profile = cls.collect_parts(args) + project, profile, catalogs = cls.collect_parts(args) return cls.from_parts( project=project, profile=profile, args=args, + catalogs=catalogs, ) def get_metadata(self) -> ManifestMetadata: diff --git a/core/dbt/constants.py b/core/dbt/constants.py index 0ff538910d5..a74110116af 100644 --- a/core/dbt/constants.py +++ b/core/dbt/constants.py @@ -15,6 +15,7 @@ PACKAGES_FILE_NAME = "packages.yml" DEPENDENCIES_FILE_NAME = "dependencies.yml" PACKAGE_LOCK_FILE_NAME = "package-lock.yml" +EXTERNAL_CATALOG_FILE_NAME = "catalog.yml" MANIFEST_FILE_NAME = "manifest.json" SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json" LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine" diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index dfc8c9bb40b..b02a8752285 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -19,6 +19,7 @@ from typing_extensions import Protocol from dbt import selected_resources +from dbt.adapters.base.catalog import ExternalCatalogIntegrations from dbt.adapters.base.column import Column from dbt.adapters.base.relation import EventTimeFilter from dbt.adapters.contracts.connection import AdapterResponse @@ -890,6 +891,9 @@ def __init__( self.context_config: Optional[ContextConfig] = context_config self.provider: Provider = provider self.adapter = get_adapter(self.config) + self.catalog_integrations = ExternalCatalogIntegrations.from_json_strings( + self.manifest.catalogs.values(), self.adapter.ExternalCatalogIntegration + ) # The macro namespace is used in creating the DatabaseWrapper self.db_wrapper = self.provider.DatabaseWrapper(self.adapter, self.namespace) @@ -1287,6 +1291,7 @@ def api(self) -> Dict[str, Any]: return { "Relation": self.db_wrapper.Relation, "Column": self.adapter.Column, + "catalogs": self.catalog_integrations, } @contextproperty() diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 
b556b479fb4..d7c7ecbf59f 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -20,6 +20,7 @@ Union, ) +from dbt_config.catalog_config import ExternalCatalog from typing_extensions import Protocol import dbt_common.exceptions @@ -882,6 +883,7 @@ class Manifest(MacroMethods, dbtClassMixin): unit_tests: MutableMapping[str, UnitTestDefinition] = field(default_factory=dict) saved_queries: MutableMapping[str, SavedQuery] = field(default_factory=dict) fixtures: MutableMapping[str, UnitTestFileFixture] = field(default_factory=dict) + catalogs: MutableMapping[str, str] = field(default_factory=dict) _doc_lookup: Optional[DocLookup] = field( default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} @@ -1379,6 +1381,26 @@ def resolve_source( current_project: str, node_package: str, ) -> MaybeParsedSource: + if target_source_name in self.catalogs: + catalog = ExternalCatalog.model_validate_json(self.catalogs[target_source_name]) + identifier = f"{target_source_name}.{target_table_name}" + catalog_database = catalog.configuration.internal_namespace.database + catalog_schema = catalog.configuration.internal_namespace.schema_ + return SourceDefinition( + database=catalog_database, + schema=catalog_schema, + fqn=[catalog_database, catalog_schema, catalog.name, target_table_name], + name=target_table_name, + source_description=f"External Catalog source for {target_source_name}.{target_table_name}", + source_name=target_source_name, + unique_id=identifier, + identifier=identifier, + package_name="dbt", + path="/root/catalogs.yml", + loader=catalog.type.value, + resource_type=NodeType.Source, + original_file_path="/root/catalogs.yml", + ) search_name = f"{target_source_name}.{target_table_name}" candidates = _packages_to_search(current_project, node_package) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7ffd00febc5..8dc743b965d 100644 --- a/core/dbt/parser/manifest.py +++ 
b/core/dbt/parser/manifest.py @@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, Union import msgpack +from dbt_config.catalog_config import ExternalCatalogConfig import dbt.deprecations import dbt.exceptions @@ -29,6 +30,7 @@ from dbt.clients.jinja import MacroStack, get_rendered from dbt.clients.jinja_static import statically_extract_macro_calls from dbt.config import Project, RuntimeConfig +from dbt.config.external_config import load_external_catalog_config from dbt.constants import ( MANIFEST_FILE_NAME, PARTIAL_PARSE_FILE_NAME, @@ -444,6 +446,13 @@ def load(self) -> Manifest: self.manifest.sources = patcher.sources self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch + # Get catalog.yml and update the manifest + raw_catalog = load_external_catalog_config(self.root_project) + if raw_catalog: + catalog_config = ExternalCatalogConfig.model_validate(raw_catalog) + self.manifest.catalogs = { + c.name: c.model_dump_json(by_alias=True) for c in catalog_config.catalogs + } # We need to rebuild disabled in order to include disabled sources self.manifest.rebuild_disabled_lookup() @@ -466,6 +475,7 @@ def load(self) -> Manifest: self.process_docs(self.root_project) self.process_metrics(self.root_project) self.process_saved_queries(self.root_project) + self.process_catalog(self.root_project) self.process_model_inferred_primary_keys() self.check_valid_group_config() self.check_valid_access_property() @@ -1140,6 +1150,11 @@ def process_metrics(self, config: RuntimeConfig): continue _process_metrics_for_node(self.manifest, current_project, exposure) + def process_catalog(self, config: RuntimeConfig): + if config.catalogs: + for catalog in config.catalogs.catalogs: + self.manifest.catalogs[catalog.name] = catalog.model_dump_json(by_alias=True) + def process_saved_queries(self, config: RuntimeConfig): """Processes SavedQuery nodes to populate their `depends_on`.""" # Note: This will also capture various nodes 
which have been re-parsed diff --git a/dev-requirements.txt b/dev-requirements.txt index 20605e632b8..55962bb5136 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,7 @@ git+https://github.com/dbt-labs/dbt-adapters.git@main git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter -git+https://github.com/dbt-labs/dbt-common.git@main +git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig +git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig#egg=dbt-config&subdirectory=config git+https://github.com/dbt-labs/dbt-postgres.git@main # black must match what's in .pre-commit-config.yaml to be sure local env matches CI black==24.3.0 diff --git a/tests/functional/configs/fixtures.py b/tests/functional/configs/fixtures.py index 63490289528..63f58b89aef 100644 --- a/tests/functional/configs/fixtures.py +++ b/tests/functional/configs/fixtures.py @@ -32,7 +32,7 @@ models__untagged_sql = """ {{ - config(materialized='table') + config(materialized='table') }} select id, value from {{ source('raw', 'seed') }} diff --git a/tests/functional/test_external_catalog.py b/tests/functional/test_external_catalog.py new file mode 100644 index 00000000000..6c97b64387e --- /dev/null +++ b/tests/functional/test_external_catalog.py @@ -0,0 +1,48 @@ +import pytest +import yaml +from dbt_config.catalog_config import ExternalCatalog + +from dbt.tests.util import run_dbt, write_file + + +@pytest.fixture(scope="class", autouse=True) +def dbt_catalog_config(project_root): + config = { + "catalogs": [ + { + "name": "my_external_catalog", + "type": "iceberg", + "configuration": { + "table_format": "iceberg", + "catalog_namespace": "dbt", + "internal_namespace": { + "database": "my_db", + "schema": "my_schema", + }, + "external_location": "s3://my-bucket/my-path", + }, + "management": { + "enabled": True, + "create_if_not_exists": False, + "alter_if_different": False, + "read_only": True, + "refresh": "on-start", 
+ }, + } + ], + } + write_file(yaml.safe_dump(config), project_root, "catalog.yml") + + +class TestCatalogConfig: + @pytest.fixture(scope="class") + def models(self): + return { + "model.sql": "select 1 as id from {{ source('my_external_catalog', 'my_table') }}", + } + + def test_supplying_external_catalog(self, project): + manifest = run_dbt(["parse"]) + assert manifest.catalogs != {} + assert manifest.nodes["model.test.model"].sources == [["my_external_catalog", "my_table"]] + ExternalCatalog.model_validate_json(manifest.catalogs["my_external_catalog"]) diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py index ec9cb57595d..9c106338e9d 100644 --- a/tests/unit/utils/__init__.py +++ b/tests/unit/utils/__init__.py @@ -6,12 +6,15 @@ import os import string +from typing import Dict from unittest import TestCase, mock import agate import pytest +from dbt_config.catalog_config import ExternalCatalogConfig from dbt.config.project import PartialProject +from dbt.config.renderer import CatalogRenderer from dbt.contracts.graph.manifest import Manifest from dbt_common.dataclass_schema import ValidationError @@ -57,6 +60,14 @@ def profile_from_dict(profile, profile_name, cli_vars="{}"): ) +def catalog_from_dict(catalog, cli_vars=None): + if cli_vars is None: + cli_vars = {} + renderer = CatalogRenderer(cli_vars) + rendered = renderer.render_value(catalog) + return ExternalCatalogConfig.model_validate(rendered) + + def project_from_dict(project, profile, packages=None, selectors=None, cli_vars="{}"): from dbt.config.renderer import DbtProjectYamlRenderer from dbt.config.utils import parse_cli_vars @@ -77,7 +88,9 @@ def project_from_dict(project, profile, packages=None, selectors=None, cli_vars= return partial.render(renderer) -def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, cli_vars={}): +def config_from_parts_or_dicts( + project, profile, packages=None, selectors=None, cli_vars={}, catalogs=None +): from copy import deepcopy 
from dbt.config import Profile, Project, RuntimeConfig @@ -103,10 +116,13 @@ def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, cli_vars, ) + if isinstance(catalogs, Dict): + catalogs = catalog_from_dict(catalogs, cli_vars) + args = Obj() args.vars = cli_vars args.profile_dir = "/dev/null" - return RuntimeConfig.from_parts(project=project, profile=profile, args=args) + return RuntimeConfig.from_parts(project=project, profile=profile, args=args, catalogs=catalogs) def inject_plugin(plugin):