From 099113ac91b75bd5336001c23ec52f471492de80 Mon Sep 17 00:00:00 2001 From: Austin Weisgrau Date: Fri, 20 Sep 2024 18:31:00 -0700 Subject: [PATCH] Refactored dbt utility for modularity --- parsons/utilities/dbt.py | 327 ------------------------------ parsons/utilities/dbt/__init__.py | 60 ++++++ parsons/utilities/dbt/dbt.py | 127 ++++++++++++ parsons/utilities/dbt/logging.py | 284 ++++++++++++++++++++++++++ parsons/utilities/dbt/models.py | 76 +++++++ requirements.txt | 3 +- setup.py | 8 +- 7 files changed, 552 insertions(+), 333 deletions(-) delete mode 100644 parsons/utilities/dbt.py create mode 100644 parsons/utilities/dbt/__init__.py create mode 100644 parsons/utilities/dbt/dbt.py create mode 100644 parsons/utilities/dbt/logging.py create mode 100644 parsons/utilities/dbt/models.py diff --git a/parsons/utilities/dbt.py b/parsons/utilities/dbt.py deleted file mode 100644 index 9861564344..0000000000 --- a/parsons/utilities/dbt.py +++ /dev/null @@ -1,327 +0,0 @@ -"""Utility for running and logging output from dbt commands - -Enable this utility by installing parsons with a dbt extra: -`pip install parsons[dbt-redshift]` -or `pip install parsons[dbt-postgres]` -or `pip install parsons[dbt-snowflake]` -or `pip install parsons[dbt-bigquery]` - -To run dbt commands, you will need to have a dbt project directory -somewhere on the local filesystem. - -If slack-related arguments or environment variables are not provided, -no log message will be sent to slack. - -Example usage: -``` -from parsons.utilities.dbt import dbtRunner - -dbt_runner = dbtRunner( - commands=['run', 'test'], - dbt_project_directory='/home/ubuntu/code/dbt_project/', - dbt_schema='dbt_dev' -) -dbt_runner.run() -``` -""" - -import datetime -import json -import logging -import os -import pathlib -import shutil -import subprocess -import time -from typing import List, Literal, Optional - -from parsons.notifications.slack import Slack -from parsons.utilities import check_env - -logger = logging.getLogger(__name__) - - -class dbtLogger: - """Module for aggregating logs between dbt commands and sending to slack.""" - - _command_times: dict[str, dict[Literal["start", "end"], float]] = {} - - def __init__( - self, - slack_channel: Optional[str] = None, - slack_webhook: Optional[str] = None, - slack_api_key: Optional[str] = None, - ): - self.start = time.time() - self.log_messages = [] - self.error_messages = [] - self.warn_messages = [] - self.done_messages = [] - self.slack_channel = slack_channel - self.slack_webhook = slack_webhook - self.slack_api_key = slack_api_key - - def record_start(self, command: str) -> None: - """Record start time for command""" - self._command_times[command] = {"start": time.time()} - - def record_end(self, command: str) -> None: - """Record end time for command""" - self._command_times[command]["end"] = time.time() - - def seconds_to_time_string(self, seconds: int): - time_str = "" - command_time = time.gmtime(seconds) - if command_time.tm_yday - 1: - time_str += f"{command_time.tm_yday - 1} days, " - if command_time.tm_hour: - time_str += f"{command_time.tm_hour} hours, " - if command_time.tm_min: - time_str += f"{command_time.tm_min} minutes, " - if command_time.tm_sec: - time_str += f"{command_time.tm_sec} seconds" - - return time_str - - def record_result( - self, - command: str, - error_messages: list[str], - warn_messages: list[str], - skip_messages: list[str], - done_message: str, - ): - command_seconds = int( - self._command_times[command]["end"] - self._command_times[command]["start"] - ) - - 
log_message = "" - if error_messages: - log_message += ":red_circle:" - status = "Error" - elif warn_messages: - log_message += ":large_orange_circle:" - status = "Warning" - else: - log_message += ":large_green_circle:" - status = "Success" - - time_str = self.seconds_to_time_string(command_seconds) - log_message += f"Invoke dbt with `dbt {command}` ({status} in {time_str})" - if done_message: - log_message += f"\n*Summary*: `{done_message}`" - - if error_messages: - log_message += "\nError messages:\n```{}```".format("\n\n".join(error_messages)) - - if warn_messages: - log_message += "\nWarning messages:\n```{}```".format("\n\n".join(warn_messages)) - - if skip_messages: - skips = [ - msg.split(" ")[5].split(".")[1] - for msg in skip_messages - if msg.split(" ")[4] == "relation" - ] - log_message += "\nSkipped:\n```{}```".format(", ".join(skips)) - - self.log_messages.append(log_message) - - def send_to_slack(self) -> None: - """Log final result to logger and send to slack.""" - end_time = time.time() - duration_seconds = int(end_time - self.start) - duration_time_str = self.seconds_to_time_string(duration_seconds) - - full_log_message = "" - if any(":red_circle:" in log_message for log_message in self.log_messages): - status = "failed" - full_log_message += ":red_circle:" - else: - status = "succeeded" - full_log_message += ":large_green_circle:" - - now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M") - full_log_message += f"*dbt run {status} - {now}*" - full_log_message += f"\n*Duration:* {duration_time_str}\n\n" - full_log_message += "\n".join(self.log_messages) - - if self.slack_webhook: - Slack.message(self.slack_channel, full_log_message, self.slack_webhook) - elif self.slack_api_key: - Slack(self.slack_api_key).message(self.slack_channel, full_log_message) - - def log_results(self, command_str: str, stdout: str, stderr: str) -> None: - """Parsed logs from dbt command and log to logger and slack.""" - - message = "" - parsed_rows = [] - - for output in (stdout, stderr): - for row in output.split("\n"): - if not row: - continue - try: - parsed_row = json.loads(row) - parsed_rows.append(parsed_row) - except json.JSONDecodeError: - message += row + "\n" - - log_messages = [] - error_messages = [] - warn_messages = [] - skip_messages = [] - - for row in parsed_rows: - if not row["info"]["msg"]: - continue - - log_message = row["info"]["msg"] - log_messages.append(log_message) - - if row["info"]["level"] == "error": - logger.error(log_message) - error_messages.append(log_message) - # Capture model/test warnings but exclude verbose top-level warnings - elif row["info"]["level"] == "warn" and "[WARNING]" not in row["info"]["msg"]: - logger.warning(log_message) - warn_messages.append(log_message) - elif "SKIP " in row["info"]["msg"]: - logger.warning(log_message) - skip_messages.append(log_message) - else: - logger.info(log_message) - - done_messages = [i for i in log_messages if "Done. 
PASS" in i] - if done_messages: - done_message = done_messages[0] - else: - done_message = "" - - self.record_result(command_str, error_messages, warn_messages, skip_messages, done_message) - - -class dbtRunner: - def __init__( - self, - commands: List[str], - dbt_project_directory: pathlib.Path, - dbt_schema: Optional[str] = None, - username: Optional[str] = None, - password: Optional[str] = None, - host: Optional[str] = None, - port: Optional[int] = None, - db: Optional[str] = None, - raise_errors: bool = False, - slack_channel: Optional[str] = None, - slack_webhook: Optional[str] = None, - slack_api_key: Optional[str] = None, - ) -> None: - """Initialize dbtRunner client with commands, credentials, and options. - - `Args:` - commands: List[str] - A list of strings, each string a dbt command with - options separated by spaces. - e.g. ["seed", "build -s models/staging", "test"] - dbt_project_directory: pathlib.Path - The path to find the dbt project, as a working - directory for dbt commands to run - dbt_schema: Optional[str] - Populates an environment variable DBT_SCHEMA - which can be used in your dbt profile. - Not required if the `DBT_SCHEMA` environment variable set. - username: Optional[str] - Populates an environment variable REDSHIFT_USERNAME - which can be used in your dbt profile - Not requried if the `REDSHIFT_USERNAME` - environment variable set. - password: Optional[str] - Populates an environment variable REDSHIFT_PASSWORD - which can be used in your dbt profile - Not requried if the `REDSHIFT_PASSWORD` - environment variable set. - host: Optional[str] - Populates an environment variable REDSHIFT_HOST - which can be used in your dbt profile - Not requried if the `REDSHIFT_HOST` - environment variable set. - port: Optional[str] - Populates an environment variable REDSHIFT_PORT - which can be used in your dbt profile - Not requried if the `REDSHIFT_PORT` - environment variable set. - db: Optional[str] - Populates an environment variable REDSHIFT_DB - which can be used in your dbt profile - Not requried if the `REDSHIFT_DB` - environment variable set. - raise_errors: bool - Default value: False - A flag indicating whether errors encountered by - the dbt command should be raised as exceptions. - slack_channel: Optional[str] - If set, will be used to send log results. Can be set - with environment variable `SLACK_CHANNEL` - slack_webhook: Optional[str] - If set, will be used to send log results. Only one - of slack_webhook or slack_api_key is necessary. - Can be set with environment variable `SLACK_WEBHOOK` - slack_api_key: Optional[str] - If set, will be used to send log results. Only one - of slack_webhook or slack_api_key is necessary. 
- Can be set with environment variable `SLACK_API_KEY` - """ - self.commands = commands - self.dbt_schema = check_env.check("DBT_SCHEMA", dbt_schema) - self.username = check_env.check("REDSHIFT_USERNAME", username) - self.password = check_env.check("REDSHIFT_PASSWORD", password) - self.host = check_env.check("REDSHIFT_HOST", host) - self.port = check_env.check("REDSHIFT_PORT", port) - self.db = check_env.check("REDSHIFT_DB", db) - self.dbt_project_directory = dbt_project_directory - self.raise_errors = raise_errors - self.dbt_logger = dbtLogger( - slack_channel=slack_channel or os.environ.get("SLACK_CHANNEL"), - slack_webhook=slack_webhook or os.environ.get("SLACK_WEBHOOK"), - slack_api_key=slack_api_key or os.environ.get("SLACK_API_KEY"), - ) - - def run(self) -> None: - for command in self.commands: - self.dbt_command(command) - self.dbt_logger.send_to_slack() - - def dbt_command(self, command: str) -> None: - """Runs dbt command and logs results after process is completed. - - If raise_error is set, this method will raise an error if the dbt - command hits any errors. - """ - - self.dbt_logger.record_start(command) - dbt_executable_path = shutil.which("dbt") - commands = [dbt_executable_path, "--log-format", "json"] + command.split(" ") - - shell_environment = { - "REDSHIFT_USERNAME": self.username, - "REDSHIFT_PASSWORD": self.password, - "REDSHIFT_HOST": self.host, - "REDSHIFT_PORT": self.port, - "REDSHIFT_DB": self.db, - "DBT_SCHEMA": self.dbt_schema, - } - - process = subprocess.run( - commands, - env=shell_environment, - cwd=self.dbt_project_directory, - text=True, - capture_output=True, - ) - self.dbt_logger.record_end(command) - - self.dbt_logger.log_results(command, process.stdout, process.stderr) - - if self.raise_errors: - process.check_returncode() diff --git a/parsons/utilities/dbt/__init__.py b/parsons/utilities/dbt/__init__.py new file mode 100644 index 0000000000..5bb34ca2c2 --- /dev/null +++ b/parsons/utilities/dbt/__init__.py @@ -0,0 +1,60 @@ +"""Utility for running and logging output from dbt commands + +Enable this utility by installing parsons with a dbt extra: +`pip install parsons[dbt-redshift]` +or `pip install parsons[dbt-postgres]` +or `pip install parsons[dbt-snowflake]` +or `pip install parsons[dbt-bigquery]` + +To run dbt commands, you will need to have a dbt project directory +somewhere on the local filesystem. + +The dbt command will inherit environment variables from the python +process shell, so if your dbt profiles.yml file uses environment +variables, ensure those are set in python or the parent shell before +running this dbt utility. + +Logging is handled separately from the dbt run itself. The +dbtRunner.run method returns a dbtCommandResult object which can be +passed to a child class of dbtLogger for logging to stdout, slack, +etc. + +Parsons provides a few example dbtLogger child classes, but for +best results, design your own! 
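The new package docstring notes that the dbt invocation inherits environment variables from the calling Python process, so a profiles.yml that resolves settings with env_var() can be satisfied by exporting those values before calling the utility. A minimal sketch, assuming hypothetical variable names (substitute whatever your profile actually references):

```
import os
from pathlib import Path

from parsons.utilities.dbt import run_dbt_commands

# Hypothetical variable names -- use whatever env_var() calls your profiles.yml makes.
os.environ["DBT_SCHEMA"] = "dbt_dev"
os.environ["REDSHIFT_HOST"] = "redshift.example.internal"

# The dbt commands run in this same process, so they see the variables set above.
run_dbt_commands(
    commands=["run", "test"],
    dbt_project_directory=Path("/home/ubuntu/code/dbt_project/"),
)
```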
+ +Example usage: +``` +from parsons.utilities.dbt import ( + run_dbt_commands, + dbtLoggerSlack, + dbtLoggerPython +) + +run_dbt_commands( + commands=['run', 'test'], + dbt_project_directory='/home/ubuntu/code/dbt_project/', + loggers=[ + dbtLoggerPython, + dbtLoggerSlack(slack_webhook=os.environ['SLACK_WEBHOOK']) + ] +) + +``` + +""" + +from parsons.utilities.dbt.dbt import run_dbt_commands +from parsons.utilities.dbt.logging import ( + dbtLoggerMarkdown, + dbtLoggerSlack, + dbtLoggerStdout, + dbtLoggerPython, +) + +__all__ = [ + "run_dbt_commands", + "dbtLoggerMarkdown", + "dbtLoggerSlack", + "dbtLoggerStdout", + "dbtLoggerPython", +] diff --git a/parsons/utilities/dbt/dbt.py b/parsons/utilities/dbt/dbt.py new file mode 100644 index 0000000000..8507b6850d --- /dev/null +++ b/parsons/utilities/dbt/dbt.py @@ -0,0 +1,127 @@ +"""Core methods for running dbt commands.""" + +import logging +import pathlib +from typing import List, Optional, Type, Union + +from parsons.utilities.dbt.logging import dbtLogger +from parsons.utilities.dbt.models import Manifest + +from dbt.cli.main import dbtRunner, dbtRunnerResult + + +logger = logging.getLogger(__name__) + + +class dbtRunnerParsons: + def __init__( + self, + commands: Union[str, List[str]], + dbt_project_directory: pathlib.Path, + ) -> None: + """Initialize dbtRunner with commands and a working directory. + + `Args:` + commands: Union[str, List[str]] + A single dbt command string or a list of dbt command + strings. + e.g. ["seed", "build -s models/staging", "test"] + dbt_project_directory: pathlib.Path + The path to find the dbt project, as a working + directory for dbt commands to run + """ + if isinstance(commands, str): + commands = [commands] + self.commands = commands + self.dbt_project_directory = dbt_project_directory + + def run(self) -> list[Manifest]: + """Executes dbt commands one by one, returns all results.""" + results = [] + + for command in self.commands: + result = self.execute_dbt_command(command) + results.append(result) + + return results + + def execute_dbt_command(self, command: str) -> Manifest: + """Runs dbt command and logs results after process is completed. + + If raise_error is set, this method will raise an error if the dbt + command hits any errors. + """ + if command.startswith("dbt "): + command = command[4:] + + # initialize + dbt = dbtRunner() + + # create CLI args as a list of strings + cli_args = command.split(" ") + cli_args.extend(["--project-dir", str(self.dbt_project_directory)]) + + # run the command + result: dbtRunnerResult = dbt.invoke(cli_args) + manifest = Manifest(command=command, dbt_manifest=result.result) + + if result.exception: + raise result.exception + + return manifest + + +def run_dbt_commands( + commands: Union[str, List[str]], + dbt_project_directory: pathlib.Path, + loggers: Optional[list[Union[dbtLogger, Type[dbtLogger]]]] = None, +) -> list[Manifest]: + """Executes dbt commands within a directory, optionally logs results. + + Parameters: + ----------- + commands : Union[str, List[str]] + A single dbt command as a string or a list of dbt commands to + be executed. + + dbt_project_directory : pathlib.Path + The path to the dbt project directory where the commands will + be executed. + + loggers : Optional[list[Union[dbtLogger, Type[dbtLogger]]]], default=None + A list of logger instances or logger classes. If classes are + provided, they will be instantiated. Each logger should have + a `send` method that takes the dbt command results as an + argument. 
+ + Returns: + -------- + list[Manifest] + A list of result objects from the executed dbt commands. + + Example: + -------- + >>> from pathlib import Path + >>> from parsons.utilities.dbt import ( + ... run_dbt_commands, + ... dbtLoggerSlack, + ... dbtLoggerPython + ... ) + >>> results = run_dbt_commands( + ... commands=["dbt run", "dbt test"], + ... dbt_project_directory=Path("/path/to/dbt/project"), + ... loggers=[dbtLoggerPython, dbtLoggerSlack] + ... ) + """ + dbt_runner = dbtRunnerParsons(commands, dbt_project_directory) + + dbt_command_results = dbt_runner.run() + + if loggers: + for logger in loggers: + if not isinstance(logger, dbtLogger): + # Instantiate logger if not already instantiated + logger = logger() + logger.send(dbt_command_results) + + return dbt_command_results diff --git a/parsons/utilities/dbt/logging.py b/parsons/utilities/dbt/logging.py new file mode 100644 index 0000000000..a46e395dd5 --- /dev/null +++ b/parsons/utilities/dbt/logging.py @@ -0,0 +1,284 @@ +"""Logging classes for use with Parsons dbt utility.""" + +import datetime +import logging +import uuid +import time +from abc import ABC, abstractmethod +from typing import Optional + +from dbt.contracts.graph.manifest import Manifest +from rich.console import Console +from rich.logging import RichHandler +from rich.markdown import Markdown + +from parsons import Table +from parsons.databases.database_connector import DatabaseConnector + +logger = logging.getLogger(__name__) + + +def human_readable_duration(seconds: int | float) -> str: + time_struct = time.gmtime(seconds) + + time_str = "" + + if time_struct.tm_yday - 1: + time_str += f"{time_struct.tm_yday - 1} days, " + if time_struct.tm_hour: + time_str += f"{time_struct.tm_hour} hours, " + if time_struct.tm_min: + time_str += f"{time_struct.tm_min} minutes, " + if time_struct.tm_sec: + time_str += f"{time_struct.tm_sec} seconds" + + return time_str + + +class dbtLogger(ABC): + """Abstract base class for aggregating logs from dbt commands.""" + + commands: list[Manifest] + + @abstractmethod + def format_command_result(self, manifest: Manifest) -> str: + pass + + @abstractmethod + def format_result(self) -> str: + pass + + @abstractmethod + def send(self, manifests: list[Manifest]) -> None: + """The send method is called to execute logging. + + manifests are passed to this method directly (rather + than on initialization) so that the logger class can be + initialized before the dbt commands have been run. This is + mostly necessary for loggers that need to be initialized with + credentials or options before being provided to the + run_dbt_commands method. + """ + self.commands = manifests + log_text = self.format_result() # noqa + ... + + +class dbtLoggerMarkdown(dbtLogger): + def format_command_result( + self, + manifest: Manifest, + ) -> str: + log_message = "" + + # Header + if manifest.errors: + log_message += "\U0001f534" # Red box + status = "Error" + elif manifest.warnings: + log_message += "\U0001f7e0" # Orange circle + status = "Warning" + else: + log_message += "\U0001f7e2" # Green circle + status = "Success" + + time_str = human_readable_duration(manifest.elapsed_time) + log_message += f"Invoke dbt with `dbt {manifest.command}` ({status} in {time_str})" + + log_summary_str = ", ".join( + [f"{node}: {count}" for node, count in manifest.summary.items()] + ) + if not log_summary_str: + log_summary_str = "No models ran." 
+ log_message += "\n*Summary*: `{}`".format(log_summary_str) + + log_message += "\n*GB Processed*: {:.2f}".format(manifest.total_gb_processed) + log_message += "\n*Slot hours*: {:.2f}".format(manifest.total_slot_hours) + + # Errors + if manifest.errors or manifest.fails: + log_message += "\nError messages:\n```{}```".format( + "\n\n".join( + [i.node.name + ": " + i.message for i in [*manifest.errors, *manifest.fails]] + ) + ) + + # Warnings + if manifest.warnings: + log_message += "\nWarn messages:\n```{}```".format( + "\n\n".join([i.node.name + ": " + (i.message or "") for i in manifest.warnings]) + ) + + # Skips + if manifest.skips: + skips = set([i.node.name for i in manifest.skips]) + log_message += "\nSkipped:\n```{}```".format(", ".join(skips)) + + return log_message + + def format_result(self) -> str: + """Format result string from all commands.""" + full_log_message = "" + + # Header + if any([command.errors for command in self.commands]): + status = "failed" + full_log_message += "\U0001f534" + else: + status = "succeeded" + full_log_message += "\U0001f7e2" + + now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M") + full_log_message += f"*dbt run {status} - {now}*" + + total_duration = sum([command.elapsed_time for command in self.commands]) + duration_time_str = human_readable_duration(total_duration) + full_log_message += f"\n*Duration:* {duration_time_str}\n\n" + + # Formatted results from each command + log_messages = [self.format_command_result(command) for command in self.commands] + full_log_message += "\n".join(log_messages) + + return full_log_message + + +class dbtLoggerStdout(dbtLoggerMarkdown): + def send(self, manifests: list[Manifest]) -> None: + self.commands = manifests + log_text = self.format_result() + + md = Markdown(log_text) + console = Console() + console.print(md) + + +class dbtLoggerPython(dbtLoggerMarkdown): + def send(self, manifests: list[Manifest]) -> None: + self.commands = manifests + log_text = self.format_result() + + if "RichHandler" not in [handler.__class__.__name__ for handler in logger.handlers]: + logger.addHandler(RichHandler(level=logging.INFO)) + + logger.info(log_text) + + +class dbtLoggerSlack(dbtLoggerMarkdown): + def __init__( + self, + slack_webhook: str, + slack_channel: Optional[str] = None, + ) -> None: + self.slack_webhook = slack_webhook + self.slack_channel = slack_channel + + def send(self, manifests: list[Manifest]) -> None: + self.commands = manifests + log_text = self.format_result() + + # Importing here to avoid needing to make slackclient a dependency for all dbt users + from parsons.notifications.slack import Slack + + Slack.message(channel=self.slack_channel, text=log_text, webhook=self.slack_webhook) + + +class dbtLoggerDatabase(dbtLogger, ABC): + """Log dbt artifacts by loading to a database. + + This class is an abstract base class for logging dbt artifacts to + a database. + """ + + def __init__( + self, + database_connector: DatabaseConnector, + destination_table_runs: str, + destination_table_nodes: str, + extra_run_table_fields: dict, + **copy_kwargs, + ) -> None: + """Initialize the logger. + + Args: + database_connector: A DatabaseConnector object. + destination_table_runs: The name of the table to log run information. + destination_table_nodes: The name of the table to log node information. + extra_run_table_fields: A dictionary of additional fields to include in the run table. + **copy_kwargs: Additional keyword arguments to pass to the `copy` method. 
+ """ + self.db_connector = database_connector + self.destination_table_runs = destination_table_runs + self.destination_table_nodes = destination_table_nodes + self.extra_run_table_fields = extra_run_table_fields + self.copy_kwargs = copy_kwargs + + def format_command_result(self, manifest: Manifest) -> tuple[Table, Table]: + """Loads all artifact results into a Parsons Table.""" + dbt_run_id = str(uuid.uuid4()) + run_metadata = { + key: getattr(manifest, key) for key in ("command", "generated_at", "elapsed_time") + } + run_metadata.update(**self.extra_run_table_fields) + run_metadata["run_id"] = dbt_run_id + run_tbl = Table([run_metadata]) + + node_rows = [] + for result in manifest.results: + node_row = {"dbt_run_id": dbt_run_id} + node_row.update( + { + key: value + for key, value in result.__dict__.items() + if key in ("execution_time", "message") + } + ) + node_row["status"] = str(result.status) + node_info = { + key: value + for key, value in result.node.__dict__.items() + if key in ("database", "schema", "name", "path") + } + node_info["node"] = result.node.unique_id + node_row.update(node_info) + + adapter_response_data = { + key: value + for key, value in result.adapter_response.items() + if key in ("bytes_processed", "bytes_billed", "job_id", "slot_ms") + } + node_row.update(adapter_response_data) + + node_rows.append(node_row) + + nodes_tbl = Table(node_rows) + return run_tbl, nodes_tbl + + def format_result(self) -> tuple[Table, Table]: + """Returns a table for the dbt runs and a table for the node runs.""" + run_rows = [] + node_rows = [] + for command in self.commands: + run_tbl, nodes_tbl = self.format_command_result(command) + run_rows.extend(run_tbl.to_dicts()) + node_rows.extend(nodes_tbl.to_dicts()) + + all_runs_tbl = Table(run_rows) + all_nodes_tbl = Table(node_rows) + return all_runs_tbl, all_nodes_tbl + + def send(self, manifests: list[Manifest]) -> None: + self.commands = manifests + runs_tbl, nodes_tbl = self.format_result() + + self.db_connector.copy( + runs_tbl, + self.destination_table_runs, + if_exists="append", + **self.copy_kwargs, + ) + self.db_connector.copy( + nodes_tbl, + self.destination_table_nodes, + if_exists="append", + **self.copy_kwargs, + ) diff --git a/parsons/utilities/dbt/models.py b/parsons/utilities/dbt/models.py new file mode 100644 index 0000000000..863e78ac1f --- /dev/null +++ b/parsons/utilities/dbt/models.py @@ -0,0 +1,76 @@ +"""Pydantic data models for use with dbt utilities.""" + +import collections +from dbt.contracts.graph.manifest import Manifest as dbtManifest +from dbt.contracts.results import NodeResult + + +class Manifest: + def __init__(self, command: str, dbt_manifest: dbtManifest) -> None: + self.command = command + self.dbt_manifest = dbt_manifest + + def __getattr__(self, key): + if key in dir(self): + result = getattr(self, key) + elif ( + getattr(self.dbt_manifest, "metadata", {}) + and key in self.dbt_manifest.metadata.__dict__ + ): + result = getattr(self.dbt_manifest.metadata, key) + else: + result = getattr(self.dbt_manifest, key) + return result + + def filter_results(self, **kwargs) -> list[NodeResult]: + """Subset of results based on filter""" + filtered_results = [ + result + for result in self.dbt_manifest + if all([str(getattr(result, key)) == value for key, value in kwargs.items()]) + ] + return filtered_results + + @property + def warnings(self) -> list[NodeResult]: + return self.filter_results(status="warn") + + @property + def errors(self) -> list[NodeResult]: + return self.filter_results(status="error") + + 
@property + def fails(self) -> list[NodeResult]: + return self.filter_results(status="fail") + + @property + def skips(self) -> list[NodeResult]: + """Returns skipped model builds but not skipped tests.""" + return [ + node + for node in self.filter_results(status="skipped") + if node.node.name.split(".")[0] == "model" + ] + + @property + def summary(self) -> collections.Counter: + """Counts of pass, warn, fail, error & skip.""" + result = collections.Counter([str(i.status) for i in self.dbt_manifest]) + return result + + @property + def total_gb_processed(self) -> float: + """Total GB processed by full dbt command run.""" + result = ( + sum([node.adapter_response.get("bytes_processed", 0) for node in self.dbt_manifest]) + / 1000000000 + ) + return result + + @property + def total_slot_hours(self) -> float: + """Total slot hours used by full dbt command run.""" + result = ( + sum([node.adapter_response.get("slot_ms", 0) for node in self.dbt_manifest]) / 3600000 + ) + return result diff --git a/requirements.txt b/requirements.txt index a8af914744..067a2aaad9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,8 +6,7 @@ braintree==4.17.1 bs4==0.0.1 censusgeocode==0.4.3.post1 civis==1.16.1 -curlify==2.2.1 -dbt_redshift==1.4.0 +dbt_core>=1.5.0 docutils<0.18,>=0.14 defusedxml>=0.7.1, <=0.8.0 facebook-business==13.0.0 diff --git a/setup.py b/setup.py index 706b360a7f..2a1ef9250a 100644 --- a/setup.py +++ b/setup.py @@ -22,10 +22,10 @@ def main(): "braintree": ["braintree"], "catalist": ["paramiko"], "civis": ["civis"], - "dbt-redshift": ["dbt-redshift", "slackclient<2"], - "dbt-bigquery": ["dbt-bigquery", "slackclient<2"], - "dbt-postgres": ["dbt-postgres", "slackclient<2"], - "dbt-snowflake": ["dbt-snowflake", "slackclient<2"], + "dbt-redshift": ["dbt-redshift>=1.5.0"], + "dbt-bigquery": ["dbt-bigquery>=1.5.0"], + "dbt-postgres": ["dbt-postgres>=1.5.0"], + "dbt-snowflake": ["dbt-snowflake>=1.5.0"], "facebook": ["joblib", "facebook-business"], "geocode": ["censusgeocode", "urllib3==1.26.19"], "github": ["PyGitHub"],
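The package docstring encourages designing your own dbtLogger child class for best results. As a hedged illustration of that contract, the hypothetical logger below implements the three abstract methods and appends a one-line summary per command to a local file; the class name, file path, and summary format are invented for the example:

```
from pathlib import Path

from parsons.utilities.dbt import run_dbt_commands
from parsons.utilities.dbt.logging import dbtLogger
from parsons.utilities.dbt.models import Manifest


class dbtLoggerFile(dbtLogger):
    """Hypothetical logger: append a one-line summary per dbt command to a local file."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)

    def format_command_result(self, manifest: Manifest) -> str:
        # manifest.summary is a Counter of node statuses (pass, warn, fail, error, skip)
        counts = ", ".join(f"{status}: {count}" for status, count in manifest.summary.items())
        return f"dbt {manifest.command}: {counts or 'no results'}"

    def format_result(self) -> str:
        return "\n".join(self.format_command_result(command) for command in self.commands)

    def send(self, manifests: list[Manifest]) -> None:
        # Store results on the instance, then write the formatted summary.
        self.commands = manifests
        with self.path.open("a") as log_file:
            log_file.write(self.format_result() + "\n")


run_dbt_commands(
    commands=["run", "test"],
    dbt_project_directory=Path("/home/ubuntu/code/dbt_project/"),
    loggers=[dbtLoggerFile("/tmp/dbt_run_log.txt")],
)
```

Because send() receives the manifests and stores them on the instance before formatting, a logger like this can be configured up front and passed to run_dbt_commands alongside the built-in loggers.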