fix(profile):bigquery - Check for every table if it is partitioned to not hit table quota (#4074)
treff7es authored Feb 7, 2022
1 parent 782e66f commit 622d7bf
Showing 1 changed file with 14 additions and 43 deletions.
metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py (57 changes: 14 additions & 43 deletions)
@@ -97,6 +97,7 @@
 -- Filter out special partitions (https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables)
 and p.partition_id not in ('__NULL__', '__UNPARTITIONED__')
 and STORAGE_TIER='ACTIVE'
+and p.table_name= '{table}'
 group by
 c.table_catalog,
 c.table_schema,
@@ -108,8 +109,6 @@
 c.table_schema,
 c.table_name,
 c.column_name
-limit {limit}
-offset {offset}
 """.strip()
 
 SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$"
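
As an illustration of how this template change plays out, here is a minimal sketch of formatting the revised query once per table. The fragment below is a hypothetical stand-in that reproduces only the filter lines visible in this diff, and the table name is made up; the real BQ_GET_LATEST_PARTITION_TEMPLATE also selects the partitioning column from INFORMATION_SCHEMA.

# Hypothetical, trimmed stand-in for BQ_GET_LATEST_PARTITION_TEMPLATE;
# only the filter fragment shown in this diff is reproduced here.
TEMPLATE_FRAGMENT = """
and p.partition_id not in ('__NULL__', '__UNPARTITIONED__')
and STORAGE_TIER='ACTIVE'
and p.table_name= '{table}'
""".strip()

# Before this change the template was paginated with {limit}/{offset} and
# scanned a whole schema at once; now it is formatted once per table, so
# each INFORMATION_SCHEMA query touches a single table and stays clear of
# the BigQuery quota this commit is working around.
print(TEMPLATE_FRAGMENT.format(table="events_20220207"))
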
@@ -289,7 +288,6 @@ class BigQueryDatasetKey(ProjectIdKey):

 class BigQuerySource(SQLAlchemySource):
     config: BigQueryConfig
-    partiton_columns: Dict[str, Dict[str, BigQueryPartitionColumn]] = dict()
     maximum_shard_ids: Dict[str, str] = dict()
     lineage_metadata: Optional[Dict[str, Set[str]]] = None

@@ -486,50 +484,23 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[str]]:
                 lineage_map[destination_table_str].add(ref_table_str)
         return lineage_map
 
-    def get_latest_partitions_for_schema(self, schema: str) -> None:
-        query_limit: int = 500
-        offset: int = 0
+    def get_latest_partition(
+        self, schema: str, table: str
+    ) -> Optional[BigQueryPartitionColumn]:
         url = self.config.get_sql_alchemy_url()
         engine = create_engine(url, **self.config.options)
         with engine.connect() as con:
             inspector = inspect(con)
-            partitions = {}
-
-            def get_partition_columns(
-                project_id: str, schema: str, limit: int, offset: int
-            ) -> int:
-                sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
-                    project_id=project_id,
-                    schema=schema,
-                    limit=limit,
-                    offset=offset,
-                )
-                result = con.execute(sql)
-                row_count: int = 0
-                for row in result:
-                    partition = BigQueryPartitionColumn(**row)
-                    partitions[partition.table_name] = partition
-                    row_count = row_count + 1
-                return row_count
-
-            res_size = get_partition_columns(
-                self.get_db_name(inspector), schema, query_limit, offset
+            sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
+                project_id=self.get_db_name(inspector), schema=schema, table=table
             )
-            while res_size == query_limit:
-                offset = offset + query_limit
-                res_size = get_partition_columns(
-                    self.get_db_name(inspector), schema, query_limit, offset
-                )
-
-            self.partiton_columns[schema] = partitions
-
-    def get_latest_partition(
-        self, schema: str, table: str
-    ) -> Optional[BigQueryPartitionColumn]:
-        if schema not in self.partiton_columns:
-            self.get_latest_partitions_for_schema(schema)
-
-        return self.partiton_columns[schema].get(table)
+            result = con.execute(sql)
+            # Bigquery only supports one partition column
+            # https://stackoverflow.com/questions/62886213/adding-multiple-partitioned-columns-to-bigquery-table-from-sql-query
+            row = result.fetchone()
+            if row:
+                return BigQueryPartitionColumn(**row)
+            return None
 
     def get_shard_from_table(self, table: str) -> Tuple[str, Optional[str]]:
         match = re.search(SHARDED_TABLE_REGEX, table, re.IGNORECASE)
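
A hedged usage sketch of the reworked per-table lookup: source is assumed to be an already-configured BigQuerySource, the dataset and table names are hypothetical, and the attribute names on the result are assumed to mirror the columns grouped in the template above.

# `source` is assumed to be a configured BigQuerySource instance.
partition = source.get_latest_partition(schema="my_dataset", table="my_table")
if partition is not None:
    # BigQuery allows only one partitioning column per table, so a single
    # BigQueryPartitionColumn row fully describes the latest partition.
    print(partition.table_name, partition.column_name)
else:
    # A table that is not partitioned (or has no ACTIVE partitions) now
    # simply yields None from one cheap single-table query, instead of
    # triggering a schema-wide paginated scan.
    print("my_table is not partitioned")
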
@@ -627,7 +598,7 @@ def is_dataset_eligable_profiling(

         (project_id, schema, table) = dataset_name.split(".")
         if not self.is_latest_shard(project_id=project_id, table=table, schema=schema):
-            logger.warning(
+            logger.debug(
                 f"{dataset_name} is sharded but not the latest shard, skipping..."
             )
             return False
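
For context on the message above: "sharded but not the latest shard" refers to date-sharded tables matched by SHARDED_TABLE_REGEX (defined near the top of this file), of which only the shard with the highest shard id is profiled. A small self-contained illustration with hypothetical table names:

import re

SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$"

# events_20220206 and events_20220207 are shards of one logical table;
# only the lexicographically largest shard id (the newest day) should be
# profiled, mirroring what maximum_shard_ids tracks per base table name.
latest = {}
for table in ["events_20220206", "events_20220207", "users"]:
    match = re.search(SHARDED_TABLE_REGEX, table, re.IGNORECASE)
    if match:
        base, shard_id = match.group(1), match.group(2)
        latest[base] = max(latest.get(base, ""), shard_id)
    else:
        print(f"{table}: not sharded, always eligible for profiling")
print(latest)  # {'events': '20220207'} -> only events_20220207 is profiled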