From c066e86da8d16576cad58a6fa7dc57fee2f40adc Mon Sep 17 00:00:00 2001 From: Austin Weisgrau Date: Fri, 12 Jul 2024 14:15:53 -0700 Subject: [PATCH] Refactor dbt utility for modular, separable logging --- parsons/utilities/dbt.py | 283 ------------------------------ parsons/utilities/dbt/__init__.py | 60 +++++++ parsons/utilities/dbt/dbt.py | 138 +++++++++++++++ parsons/utilities/dbt/logging.py | 173 ++++++++++++++++++ parsons/utilities/dbt/models.py | 59 +++++++ requirements.txt | 1 + setup.py | 8 +- 7 files changed, 435 insertions(+), 287 deletions(-) delete mode 100644 parsons/utilities/dbt.py create mode 100644 parsons/utilities/dbt/__init__.py create mode 100644 parsons/utilities/dbt/dbt.py create mode 100644 parsons/utilities/dbt/logging.py create mode 100644 parsons/utilities/dbt/models.py diff --git a/parsons/utilities/dbt.py b/parsons/utilities/dbt.py deleted file mode 100644 index 963eccdecb..0000000000 --- a/parsons/utilities/dbt.py +++ /dev/null @@ -1,283 +0,0 @@ -"""Utility for running and logging output from dbt commands - -Enable this utility by installing parsons with a dbt extra: -`pip install parsons[dbt-redshift]` -or `pip install parsons[dbt-postgres]` -or `pip install parsons[dbt-snowflake]` -or `pip install parsons[dbt-bigquery]` - -To run dbt commands, you will need to have a dbt project directory -somewhere on the local filesystem. - -If slack-related arguments or environment variables are not provided, -no log message will be sent to slack. - -The dbt command will inherit environment variables from the python -process shell, so if your dbt profiles.yml file uses environment -variables, ensure those are set in python or the parent shell before -running this dbt utility. 
- -Example usage: -``` -from parsons.utilities.dbt import dbtRunner - -dbt_runner = dbtRunner( - commands=['run', 'test'], - dbt_project_directory='/home/ubuntu/code/dbt_project/', -) -dbt_runner.run() -``` - -""" - -import datetime -import json -import logging -import os -import pathlib -import shutil -import subprocess -import time -from typing import List, Literal, Optional - -from parsons.notifications.slack import Slack - -logger = logging.getLogger(__name__) - - -class dbtLogger: - """Module for aggregating logs between dbt commands and sending to slack.""" - - _command_times: dict[str, dict[Literal["start", "end"], float]] = {} - - def __init__( - self, - slack_channel: Optional[str] = None, - slack_webhook: Optional[str] = None, - slack_api_key: Optional[str] = None, - ): - self.start = time.time() - self.log_messages = [] - self.error_messages = [] - self.warn_messages = [] - self.done_messages = [] - self.slack_channel = slack_channel - self.slack_webhook = slack_webhook - self.slack_api_key = slack_api_key - - def record_start(self, command: str) -> None: - """Record start time for command""" - self._command_times[command] = {"start": time.time()} - - def record_end(self, command: str) -> None: - """Record end time for command""" - self._command_times[command]["end"] = time.time() - - def seconds_to_time_string(self, seconds: int): - time_str = "" - command_time = time.gmtime(seconds) - if command_time.tm_yday - 1: - time_str += f"{command_time.tm_yday - 1} days, " - if command_time.tm_hour: - time_str += f"{command_time.tm_hour} hours, " - if command_time.tm_min: - time_str += f"{command_time.tm_min} minutes, " - if command_time.tm_sec: - time_str += f"{command_time.tm_sec} seconds" - - return time_str - - def record_result( - self, - command: str, - error_messages: list[str], - warn_messages: list[str], - skip_messages: list[str], - done_message: str, - ): - command_seconds = int( - self._command_times[command]["end"] - 
self._command_times[command]["start"] - ) - - log_message = "" - if error_messages: - log_message += ":red_circle:" - status = "Error" - elif warn_messages: - log_message += ":large_orange_circle:" - status = "Warning" - else: - log_message += ":large_green_circle:" - status = "Success" - - time_str = self.seconds_to_time_string(command_seconds) - log_message += f"Invoke dbt with `dbt {command}` ({status} in {time_str})" - if done_message: - log_message += f"\n*Summary*: `{done_message}`" - - if error_messages: - log_message += "\nError messages:\n```{}```".format("\n\n".join(error_messages)) - - if warn_messages: - log_message += "\nWarning messages:\n```{}```".format("\n\n".join(warn_messages)) - - if skip_messages: - skips = [ - msg.split(" ")[5].split(".")[1] - for msg in skip_messages - if msg.split(" ")[4] == "relation" - ] - log_message += "\nSkipped:\n```{}```".format(", ".join(skips)) - - self.log_messages.append(log_message) - - def send_to_slack(self) -> None: - """Log final result to logger and send to slack.""" - end_time = time.time() - duration_seconds = int(end_time - self.start) - duration_time_str = self.seconds_to_time_string(duration_seconds) - - full_log_message = "" - if any(":red_circle:" in log_message for log_message in self.log_messages): - status = "failed" - full_log_message += ":red_circle:" - else: - status = "succeeded" - full_log_message += ":large_green_circle:" - - now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M") - full_log_message += f"*dbt run {status} - {now}*" - full_log_message += f"\n*Duration:* {duration_time_str}\n\n" - full_log_message += "\n".join(self.log_messages) - - if self.slack_webhook: - Slack.message(self.slack_channel, full_log_message, self.slack_webhook) - elif self.slack_api_key: - Slack(self.slack_api_key).message(self.slack_channel, full_log_message) - - def log_results(self, command_str: str, stdout: str, stderr: str) -> None: - """Parsed logs from dbt command and log to logger and slack.""" - - 
message = "" - parsed_rows = [] - - for output in (stdout, stderr): - for row in output.split("\n"): - if not row: - continue - try: - parsed_row = json.loads(row) - parsed_rows.append(parsed_row) - except json.JSONDecodeError: - message += row + "\n" - - log_messages = [] - error_messages = [] - warn_messages = [] - skip_messages = [] - - for row in parsed_rows: - if not row["info"]["msg"]: - continue - - log_message = row["info"]["msg"] - log_messages.append(log_message) - - if row["info"]["level"] == "error": - logger.error(log_message) - error_messages.append(log_message) - # Capture model/test warnings but exclude verbose top-level warnings - elif row["info"]["level"] == "warn" and "[WARNING]" not in row["info"]["msg"]: - logger.warning(log_message) - warn_messages.append(log_message) - elif "SKIP " in row["info"]["msg"]: - logger.warning(log_message) - skip_messages.append(log_message) - else: - logger.info(log_message) - - done_messages = [i for i in log_messages if "Done. PASS" in i] - if done_messages: - done_message = done_messages[0] - else: - done_message = "" - - self.record_result(command_str, error_messages, warn_messages, skip_messages, done_message) - - -class dbtRunner: - def __init__( - self, - commands: List[str], - dbt_project_directory: pathlib.Path, - raise_errors: bool = False, - slack_channel: Optional[str] = None, - slack_webhook: Optional[str] = None, - slack_api_key: Optional[str] = None, - ) -> None: - """Initialize dbtRunner client with commands, credentials, and options. - - `Args:` - commands: List[str] - A list of strings, each string a dbt command with - options separated by spaces. - e.g. ["seed", "build -s models/staging", "test"] - dbt_project_directory: pathlib.Path - The path to find the dbt project, as a working - directory for dbt commands to run - raise_errors: bool - Default value: False - A flag indicating whether errors encountered by - the dbt command should be raised as exceptions. 
- slack_channel: Optional[str] - If set, will be used to send log results. Can be set - with environment variable `SLACK_CHANNEL` - slack_webhook: Optional[str] - If set, will be used to send log results. Only one - of slack_webhook or slack_api_key is necessary. - Can be set with environment variable `SLACK_WEBHOOK` - slack_api_key: Optional[str] - If set, will be used to send log results. Only one - of slack_webhook or slack_api_key is necessary. - Can be set with environment variable `SLACK_API_KEY` - """ - if isinstance(commands, str): - commands = [commands] - self.commands = commands - self.dbt_project_directory = dbt_project_directory - self.raise_errors = raise_errors - self.dbt_logger = dbtLogger( - slack_channel=slack_channel or os.environ.get("SLACK_CHANNEL"), - slack_webhook=slack_webhook or os.environ.get("SLACK_WEBHOOK"), - slack_api_key=slack_api_key or os.environ.get("SLACK_API_KEY"), - ) - - def run(self) -> None: - for command in self.commands: - self.dbt_command(command) - self.dbt_logger.send_to_slack() - - def dbt_command(self, command: str) -> None: - """Runs dbt command and logs results after process is completed. - - If raise_error is set, this method will raise an error if the dbt - command hits any errors. 
- """ - - self.dbt_logger.record_start(command) - dbt_executable_path = shutil.which("dbt") - commands = [dbt_executable_path, "--log-format", "json"] + command.split(" ") - - process = subprocess.run( - commands, - env=os.environ.copy(), - cwd=self.dbt_project_directory, - text=True, - capture_output=True, - ) - self.dbt_logger.record_end(command) - - self.dbt_logger.log_results(command, process.stdout, process.stderr) - - if self.raise_errors: - process.check_returncode() diff --git a/parsons/utilities/dbt/__init__.py b/parsons/utilities/dbt/__init__.py new file mode 100644 index 0000000000..5bb34ca2c2 --- /dev/null +++ b/parsons/utilities/dbt/__init__.py @@ -0,0 +1,60 @@ +"""Utility for running and logging output from dbt commands + +Enable this utility by installing parsons with a dbt extra: +`pip install parsons[dbt-redshift]` +or `pip install parsons[dbt-postgres]` +or `pip install parsons[dbt-snowflake]` +or `pip install parsons[dbt-bigquery]` + +To run dbt commands, you will need to have a dbt project directory +somewhere on the local filesystem. + +The dbt command will inherit environment variables from the python +process shell, so if your dbt profiles.yml file uses environment +variables, ensure those are set in python or the parent shell before +running this dbt utility. + +Logging is handled separately from the dbt run itself. The +dbtRunner.run method returns a list of dbtCommandResult objects which +can be passed to a child class of dbtLogger for logging to stdout, +slack, etc. + +Parsons provides a few example dbtLogger child classes, but for +best results, design your own! 
+ +Example usage: +``` +from parsons.utilities.dbt import ( + run_dbt_commands, + dbtLoggerSlack, + dbtLoggerPython +) + +run_dbt_commands( + commands=['run', 'test'], + dbt_project_directory='/home/ubuntu/code/dbt_project/', + loggers=[ + dbtLoggerPython, + dbtLoggerSlack(slack_webhook=os.environ['SLACK_WEBHOOK']) + ] +) + +``` + +""" + +from parsons.utilities.dbt.dbt import run_dbt_commands +from parsons.utilities.dbt.logging import ( + dbtLoggerMarkdown, + dbtLoggerSlack, + dbtLoggerStdout, + dbtLoggerPython, +) + +__all__ = [ + "run_dbt_commands", + "dbtLoggerMarkdown", + "dbtLoggerSlack", + "dbtLoggerStdout", + "dbtLoggerPython", +] diff --git a/parsons/utilities/dbt/dbt.py b/parsons/utilities/dbt/dbt.py new file mode 100644 index 0000000000..7dcfd224ae --- /dev/null +++ b/parsons/utilities/dbt/dbt.py @@ -0,0 +1,138 @@ +"""Core methods for running dbt commands.""" + +import json +import logging +import os +import pathlib +import shutil +import subprocess +from typing import List, Optional, Type, Union + +from parsons.utilities.dbt.logging import dbtLogger +from parsons.utilities.dbt.models import dbtCommandResult + +logger = logging.getLogger(__name__) + + +class dbtRunner: + def __init__( + self, + commands: Union[str, List[str]], + dbt_project_directory: pathlib.Path, + ) -> None: + """Initialize dbtRunner with commands and a working directory. + + `Args:` + commands: Union[str, List[str]] + A single dbt command string or a list of dbt command + strings. + e.g. 
["seed", "build -s models/staging", "test"] + dbt_project_directory: pathlib.Path + The path to find the dbt project, as a working + directory for dbt commands to run + """ + if isinstance(commands, str): + commands = [commands] + self.commands = commands + self.dbt_project_directory = dbt_project_directory + + def run(self) -> list[dbtCommandResult]: + """Executes dbt commands one by one, returns all results.""" + results = [] + + for command in self.commands: + result = self.execute_dbt_command(command) + results.append(result) + + return results + + def execute_dbt_command(self, command: str) -> dbtCommandResult: + """Runs dbt command and returns results after process is completed. + + Raises a RuntimeError if the dbt executable cannot be found or if + dbt exits with return code 2. + """ + if command.startswith("dbt "): + command = command[4:] + dbt_executable_path = shutil.which("dbt") + if not dbt_executable_path: + raise RuntimeError("dbt executable not found.") + + commands = [dbt_executable_path, "--log-format", "json"] + command.split(" ") + + completed_process = subprocess.run( + commands, + env=os.environ.copy(), + cwd=self.dbt_project_directory, + text=True, + capture_output=True, + ) + + logger.debug(completed_process.stdout) + logger.debug(completed_process.stderr) + + if completed_process.returncode == 2: + raise RuntimeError(completed_process.stderr) + + with open(os.path.join(self.dbt_project_directory, "target", "run_results.json")) as file: + raw_result = json.loads(file.read()) + + result = dbtCommandResult(command=command, **raw_result) + + return result + + +def run_dbt_commands( + commands: Union[str, List[str]], + dbt_project_directory: pathlib.Path, + loggers: Optional[list[Union[dbtLogger, Type[dbtLogger]]]] = None, +) -> list[dbtCommandResult]: + """Executes dbt commands within a directory, optionally logs results. 
+ + Parameters: + ----------- + commands : Union[str, List[str]] + A single dbt command as a string or a list of dbt commands to + be executed. + + dbt_project_directory : pathlib.Path + The path to the dbt project directory where the commands will + be executed. + + loggers : Optional[list[Union[dbtLogger, Type[dbtLogger]]]], default=None + A list of logger instances or logger classes. If classes are + provided, they will be instantiated with no arguments. Each + logger should have a `send` method that takes the dbt command + results as an argument. + + Returns: + -------- + list[dbtCommandResult] + A list of result objects from the executed dbt commands. + + Example: + -------- + >>> from pathlib import Path + >>> from parsons.utilities.dbt import ( + ... run_dbt_commands, + ... dbtLoggerSlack, + ... dbtLoggerPython + ... ) + >>> results = run_dbt_commands( + ... commands=["dbt run", "dbt test"], + ... dbt_project_directory=Path("/path/to/dbt/project"), + ... loggers=[dbtLoggerPython, dbtLoggerSlack(slack_webhook="https://hooks.slack.com/...")] + ... 
) + """ + dbt_runner = dbtRunner(commands, dbt_project_directory) + + dbt_command_results = dbt_runner.run() + + if loggers: + for logger in loggers: + if not isinstance(logger, dbtLogger): + # Instantiate logger if not already instantiated + logger = logger() + logger.send(dbt_command_results) + + return dbt_command_results diff --git a/parsons/utilities/dbt/logging.py b/parsons/utilities/dbt/logging.py new file mode 100644 index 0000000000..70485684db --- /dev/null +++ b/parsons/utilities/dbt/logging.py @@ -0,0 +1,173 @@ +"""Logging classes for use with Parsons dbt utility.""" + +import datetime +import logging +import time +from abc import ABC, abstractmethod +from typing import Optional + +from rich.console import Console +from rich.logging import RichHandler +from rich.markdown import Markdown + +from parsons.utilities.dbt.models import dbtCommandResult + +logger = logging.getLogger(__name__) + + +def human_readable_duration(seconds: int | float) -> str: + time_struct = time.gmtime(seconds) + + time_str = "" + + if time_struct.tm_yday - 1: + time_str += f"{time_struct.tm_yday - 1} days, " + if time_struct.tm_hour: + time_str += f"{time_struct.tm_hour} hours, " + if time_struct.tm_min: + time_str += f"{time_struct.tm_min} minutes, " + if time_struct.tm_sec: + time_str += f"{time_struct.tm_sec} seconds" + + return time_str + + +class dbtLogger(ABC): + """Abstract base class for aggregating logs from dbt commands.""" + + commands: list[dbtCommandResult] + + @abstractmethod + def format_command_result(self, command: dbtCommandResult) -> str: + pass + + @abstractmethod + def format_result(self) -> str: + pass + + @abstractmethod + def send(self, dbt_command_results: list[dbtCommandResult]) -> None: + """The send method is called to execute logging. + + dbt_command_results are passed to this method directly (rather + than on initialization) so that the logger class can be + initialized before the dbt commands have been run. 
This is + mostly necessary for loggers that need to be initialized with + credentials or options before being provided to the + run_dbt_commands function. + """ + self.commands = dbt_command_results + log_text = self.format_result() # noqa + ... + + +class dbtLoggerMarkdown(dbtLogger): + def format_command_result( + self, + command: dbtCommandResult, + ) -> str: + log_message = "" + + # Header + if command.errors: + log_message += "\U0001F534" # Red circle + status = "Error" + elif command.warnings: + log_message += "\U0001F7E0" # Orange circle + status = "Warning" + else: + log_message += "\U0001F7E2" # Green circle + status = "Success" + + time_str = human_readable_duration(command.elapsed_time) + log_message += f"Invoke dbt with `dbt {command.command}` ({status} in {time_str})" + + log_summary_str = ", ".join([f"{node}: {count}" for node, count in command.summary.items()]) + if not log_summary_str: + log_summary_str = "No models ran." + log_message += "\n*Summary*: `{}`".format(log_summary_str) + + # Errors + if command.errors: + log_message += "\nError messages:\n```{}```".format( + "\n\n".join([i.node + ": " + i.message for i in command.errors]) + ) + + # Warnings + if command.warnings: + log_message += "\nWarn messages:\n```{}```".format( + "\n\n".join([i.node + ": " + i.message for i in command.warnings]) + ) + + # Skips + if command.skips: + skips = set([i.node for i in command.skips]) + log_message += "\nSkipped:\n```{}```".format(", ".join(skips)) + + return log_message + + def format_result(self) -> str: + """Format result string from all commands.""" + full_log_message = "" + + # Header + if any([command.errors for command in self.commands]): + status = "failed" + full_log_message += "\U0001F534" + else: + status = "succeeded" + full_log_message += "\U0001F7E2" + + now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M") + full_log_message += f"*dbt run {status} - {now}*" + + total_duration = sum([command.elapsed_time for command in self.commands]) + 
duration_time_str = human_readable_duration(total_duration) + full_log_message += f"\n*Duration:* {duration_time_str}\n\n" + + # Formatted results from each command + log_messages = [self.format_command_result(command) for command in self.commands] + full_log_message += "\n".join(log_messages) + + return full_log_message + + +class dbtLoggerStdout(dbtLoggerMarkdown): + def send(self, dbt_command_results: list[dbtCommandResult]) -> None: + self.commands = dbt_command_results + log_text = self.format_result() + + md = Markdown(log_text) + console = Console() + console.print(md) + + +class dbtLoggerPython(dbtLoggerMarkdown): + def send(self, dbt_command_results: list[dbtCommandResult]) -> None: + self.commands = dbt_command_results + log_text = self.format_result() + + if "RichHandler" not in [handler.__class__.__name__ for handler in logger.handlers]: + logger.addHandler(RichHandler(level=logging.INFO)) + + logger.info(log_text) + + +class dbtLoggerSlack(dbtLoggerMarkdown): + + def __init__( + self, + slack_webhook: str, + slack_channel: Optional[str] = None, + ) -> None: + self.slack_webhook = slack_webhook + self.slack_channel = slack_channel + + def send(self, dbt_command_results: list[dbtCommandResult]) -> None: + self.commands = dbt_command_results + log_text = self.format_result() + + # Importing here to avoid needing to make slackclient a dependency for all dbt users + from parsons.notifications.slack import Slack + + Slack.message(channel=self.slack_channel, text=log_text, webhook=self.slack_webhook) diff --git a/parsons/utilities/dbt/models.py b/parsons/utilities/dbt/models.py new file mode 100644 index 0000000000..08812e2b19 --- /dev/null +++ b/parsons/utilities/dbt/models.py @@ -0,0 +1,59 @@ +"""Pydantic data models for use with dbt utilities.""" + +from pydantic.v1 import BaseModel, Field, validator +import collections +from typing import Literal + + +class dbtResult(BaseModel): + """For each dbt SQL operation, one dbtResult object is generated.""" + + 
status: Literal["success", "pass", "warn", "error", "fail", "skipped"] + execution_time: float + node: str = Field(alias="unique_id") + message: str | None = None + bytes_processed: int | None = Field(default=None, alias="adapter_response") + + @validator("bytes_processed", pre=True) + def unnest_bytes_processed(cls, value: dict) -> int | None: + """Fetch bytes_processed from adapter_response""" + return value.get("bytes_processed") + + +class dbtCommandResult(BaseModel): + """Results from the execution of a dbt command. + + These results are fetched from the dbt-generated run_results.json + file. + """ + + command: str + elapsed_time: float + results: list[dbtResult] + + def filter_results(self, **kwargs) -> list[dbtResult]: + """Subset of results based on filter""" + filtered_results = [ + result + for result in self.results + if all([getattr(result, key) == value for key, value in kwargs.items()]) + ] + return filtered_results + + @property + def warnings(self) -> list[dbtResult]: + return self.filter_results(status="warn") + + @property + def errors(self) -> list[dbtResult]: + return self.filter_results(status="error") + + @property + def skips(self) -> list[dbtResult]: + return self.filter_results(status="skipped") + + @property + def summary(self) -> collections.Counter: + """Counts of pass, warn, fail, error & skip.""" + result = collections.Counter([i.status for i in self.results]) + return result diff --git a/requirements.txt b/requirements.txt index 219c3017c8..0c68fd20cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,6 +26,7 @@ paramiko==3.4.0 petl==1.7.15 psycopg2-binary==2.9.9 PyGitHub==1.51 +pydantic>=1.10.17 python-dateutil==2.8.2 requests==2.31.0 requests_oauthlib==1.3.0 diff --git a/setup.py b/setup.py index e11b6bd1c5..4c1e1cb901 100644 --- a/setup.py +++ b/setup.py @@ -22,10 +22,10 @@ def main(): "braintree": ["braintree"], "catalist": ["paramiko"], "civis": ["civis"], - "dbt-redshift": ["dbt-redshift", "slackclient<2"], - 
"dbt-bigquery": ["dbt-bigquery", "slackclient<2"], - "dbt-postgres": ["dbt-postgres", "slackclient<2"], - "dbt-snowflake": ["dbt-snowflake", "slackclient<2"], + "dbt-redshift": ["dbt-redshift", "pydantic>=1.10.17"], + "dbt-bigquery": ["dbt-bigquery", "pydantic>=1.10.17"], + "dbt-postgres": ["dbt-postgres", "pydantic>=1.10.17"], + "dbt-snowflake": ["dbt-snowflake", "pydantic>=1.10.17"], "facebook": ["joblib", "facebook-business"], "geocode": ["censusgeocode", "urllib3==1.26.19"], "github": ["PyGitHub"],