Skip to content

Commit

Permalink
dbtLoggerDatabase class for logging to database
Browse files Browse the repository at this point in the history
  • Loading branch information
austinweisgrau committed Jul 22, 2024
1 parent 4f10c3b commit a8e2c9b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
59 changes: 58 additions & 1 deletion parsons/utilities/dbt/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from abc import ABC, abstractmethod
from typing import Optional

from dbt.contracts.graph.manifest import Manifest
from rich.console import Console
from rich.logging import RichHandler
from rich.markdown import Markdown

from dbt.contracts.graph.manifest import Manifest
from parsons import Table
from parsons.databases.database_connector import DatabaseConnector

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -175,3 +177,58 @@ def send(self, manifests: list[Manifest]) -> None:
from parsons.notifications.slack import Slack

Slack.message(channel=self.slack_channel, text=log_text, webhook=self.slack_webhook)


class dbtLoggerDatabase(dbtLogger, ABC):
"""Log dbt artifacts by loading to a database."""

def __init__(self, database_connector: DatabaseConnector, destination_table: str) -> None:
self.db_connector = database_connector
self.destination_table = destination_table

def format_command_result(self, manifest: Manifest) -> Table:
"""Loads all artifact results into a Parsons Table."""
run_metadata = {
key: getattr(manifest, key)
for key in (
"command",
"generated_at",
)
}
rows = []
for result in manifest.results:
row = run_metadata.copy()
row.update(
{
key: value
for key, value in result.__dict__.items()
if key in ("execution_time", "message")
}
)
row["status"] = str(result.status)
node_info = {
key: value
for key, value in result.node.__dict__.items()
if key in ("database", "schema", "name", "path")
}
node_info["node"] = result.node.unique_id
row.update(node_info)

row["bytes_processed"] = result.adapter_response.get("bytes_processed", 0)

rows.append(row)
tbl = Table(rows)
return tbl

def format_result(self) -> Table:
tbls = [self.format_command_result(command) for command in self.commands]
all_rows_lists = [tbl.to_dicts() for tbl in tbls]
all_rows_flat = [item for sublist in all_rows_lists for item in sublist]
tbl = Table(all_rows_flat)
return tbl

def send(self, manifests: list[Manifest]) -> None:
self.commands = manifests
log_tbl = self.format_result()

self.db_connector.copy(log_tbl, self.destination_table, if_exists="append")
5 changes: 5 additions & 0 deletions parsons/utilities/dbt/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ def __init__(self, command: str, dbt_manifest: dbtManifest) -> None:
def __getattr__(self, key):
if key in self.__dict__:
result = self.__dict__[key]
elif (
getattr(self.dbt_manifest, "metadata", {})
and key in self.dbt_manifest.metadata.__dict__
):
result = getattr(self.dbt_manifest.metadata, key)
else:
result = getattr(self.dbt_manifest, key)
return result
Expand Down

0 comments on commit a8e2c9b

Please sign in to comment.