
Commit

Merge branch 'master' into dremio-space-source-filters
acrylJonny authored Nov 29, 2024
2 parents 2914f66 + c42f779 commit 3847574
Showing 23 changed files with 401 additions and 211 deletions.
10 changes: 9 additions & 1 deletion datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -309,13 +309,21 @@
"displayName": "Dremio",
"description": "Import Spaces, Sources, Tables and statistics from Dremio.",
"docsUrl": "https://datahubproject.io/docs/metadata-ingestion/",
"recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #For cloud instance\n #is_dremio_cloud: True\n #dremio_cloud_project_id: <project_id>\n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n stateful_ingestion:\n enabled: true"
"recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #For cloud instance\n #is_dremio_cloud: True\n #dremio_cloud_project_id: <project_id>\n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n ingest_owner: true\n\n stateful_ingestion:\n enabled: true"
},
{
"urn": "urn:li:dataPlatform:cassandra",
"name": "cassandra",
"displayName": "CassandraDB",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
"recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]"
},
{
"urn": "urn:li:dataPlatform:iceberg",
"name": "iceberg",
"displayName": "Iceberg",
"description": "Ingest databases and tables from any Iceberg catalog implementation",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg",
"recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n"
}
]
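For readability, the updated Dremio recipe string above decodes to roughly the following YAML (abridged: the cloud and basic-auth variants are omitted, and hostname, port, and credentials are placeholders). The only functional addition in this commit is `ingest_owner: true`:

```yaml
source:
  type: dremio
  config:
    # Coordinates
    hostname: null
    port: null
    # true if https, otherwise false
    tls: true

    # Credentials with personal access token
    authentication_method: PAT
    password: pass

    ingest_owner: true

    stateful_ingestion:
      enabled: true
```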
4 changes: 2 additions & 2 deletions docs/managed-datahub/release-notes/v_0_3_7.md
@@ -7,7 +7,7 @@ Release Availability Date

Recommended CLI/SDK
---
- `v0.14.1.11` with release notes at https://github.com/datahub/datahub/releases/tag/v0.14.1.11
- `v0.14.1.12` with release notes at https://github.com/datahub/datahub/releases/tag/v0.14.1.12

If you are using an older CLI/SDK version, please upgrade it. This applies to all CLI/SDK usage, whether through your terminal, GitHub Actions, Airflow, the Python SDK, or the Java SDK. We strongly recommend upgrading, as we keep pushing fixes in the CLI, and it helps us support you better.

@@ -116,7 +116,7 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies
- Improved UX for setting up and managing SSO

- Ingestion changes
- In addition to the improvements listed here: https://github.com/acryldata/datahub/releases/tag/v0.14.1.11
- In addition to the improvements listed here: https://github.com/acryldata/datahub/releases/tag/v0.14.1.12
- PowerBI: Support for PowerBI Apps and cross-workspace lineage
- Fivetran: Major improvements to configurability and improved reliability with large Fivetran setups
- Snowflake & BigQuery: Improved handling of temporary tables and swap statements when generating lineage
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/dremio/dremio_recipe.yml
@@ -20,6 +20,8 @@ source:

include_query_lineage: True

ingest_owner: true

#Optional
source_mappings:
- platform: s3
6 changes: 4 additions & 2 deletions metadata-ingestion/docs/sources/iceberg/iceberg.md
@@ -18,6 +18,8 @@ This ingestion source maps the following Source System Concepts to DataHub Conce

## Troubleshooting

### [Common Issue]
### Exceptions while increasing `processing_threads`

[Provide description of common issues with this integration and steps to resolve]
Each processing thread will open several files/sockets to download manifest files from blob storage. If you see
exceptions when increasing the `processing_threads` configuration parameter, try increasing the limit of open
files (e.g. using `ulimit` on Linux).
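As a concrete illustration of the `ulimit` suggestion (the limit value below is arbitrary):

```bash
# Check the current limit on open file descriptors
ulimit -n

# Raise it for the current shell session before running ingestion
ulimit -n 8192
```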
7 changes: 4 additions & 3 deletions metadata-ingestion/setup.py
@@ -14,8 +14,8 @@
)

base_requirements = {
# Typing extension should be >=3.10.0.2 ideally but we can't restrict due to a Airflow 2.1 dependency conflict.
"typing_extensions>=3.7.4.3",
# Our min version of typing_extensions is somewhat constrained by Airflow.
"typing_extensions>=3.10.0.2",
# Actual dependencies.
"typing-inspect",
# pydantic 1.8.2 is incompatible with mypy 0.910.
@@ -249,7 +249,8 @@

iceberg_common = {
# Iceberg Python SDK
"pyiceberg>=0.4,<0.7",
# Kept at 0.4.0 due to higher versions requiring pydantic>2; bump this dependency as soon as we are fine with that
"pyiceberg>=0.4.0",
}

mssql_common = {
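To sanity-check a local environment against the new minimums, something like the following can be used (an illustrative sketch, not part of the commit):

```python
import importlib.metadata

# New floors introduced above: typing_extensions>=3.10.0.2 and pyiceberg>=0.4.0
for dist in ("typing_extensions", "pyiceberg"):
    try:
        print(dist, importlib.metadata.version(dist))
    except importlib.metadata.PackageNotFoundError:
        print(dist, "is not installed")
```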
68 changes: 7 additions & 61 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -53,19 +53,7 @@
make_assertion_from_test,
make_assertion_result_from_test,
)
from datahub.ingestion.source.sql.sql_types import (
ATHENA_SQL_TYPES_MAP,
BIGQUERY_TYPES_MAP,
POSTGRES_TYPES_MAP,
SNOWFLAKE_TYPES_MAP,
SPARK_SQL_TYPES_MAP,
TRINO_SQL_TYPES_MAP,
VERTICA_SQL_TYPES_MAP,
resolve_athena_modified_type,
resolve_postgres_modified_type,
resolve_trino_modified_type,
resolve_vertica_modified_type,
)
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
@@ -89,17 +89,11 @@
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BooleanTypeClass,
DateTypeClass,
MySqlDDL,
NullTypeClass,
NumberTypeClass,
RecordType,
SchemaField,
SchemaFieldDataType,
SchemaMetadata,
StringTypeClass,
TimeTypeClass,
)
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
@@ -804,28 +786,6 @@ def make_mapping_upstream_lineage(
)


# See https://github.com/fishtown-analytics/dbt/blob/master/core/dbt/adapters/sql/impl.py
_field_type_mapping = {
"boolean": BooleanTypeClass,
"date": DateTypeClass,
"time": TimeTypeClass,
"numeric": NumberTypeClass,
"text": StringTypeClass,
"timestamp with time zone": DateTypeClass,
"timestamp without time zone": DateTypeClass,
"integer": NumberTypeClass,
"float8": NumberTypeClass,
"struct": RecordType,
**POSTGRES_TYPES_MAP,
**SNOWFLAKE_TYPES_MAP,
**BIGQUERY_TYPES_MAP,
**SPARK_SQL_TYPES_MAP,
**TRINO_SQL_TYPES_MAP,
**ATHENA_SQL_TYPES_MAP,
**VERTICA_SQL_TYPES_MAP,
}


def get_column_type(
report: DBTSourceReport,
dataset_name: str,
@@ -835,24 +795,10 @@ def get_column_type(
"""
Maps known DBT types to datahub types
"""
TypeClass: Any = _field_type_mapping.get(column_type) if column_type else None

if TypeClass is None and column_type:
# resolve a modified type
if dbt_adapter == "trino":
TypeClass = resolve_trino_modified_type(column_type)
elif dbt_adapter == "athena":
TypeClass = resolve_athena_modified_type(column_type)
elif dbt_adapter == "postgres" or dbt_adapter == "redshift":
# Redshift uses a variant of Postgres, so we can use the same logic.
TypeClass = resolve_postgres_modified_type(column_type)
elif dbt_adapter == "vertica":
TypeClass = resolve_vertica_modified_type(column_type)
elif dbt_adapter == "snowflake":
# Snowflake types are uppercase, so we check that.
TypeClass = _field_type_mapping.get(column_type.upper())

# if still not found, report the warning

TypeClass = resolve_sql_type(column_type, dbt_adapter)

# if still not found, report a warning
if TypeClass is None:
if column_type:
report.info(
Expand All @@ -861,9 +807,9 @@ def get_column_type(
context=f"{dataset_name} - {column_type}",
log=False,
)
TypeClass = NullTypeClass
TypeClass = NullTypeClass()

return SchemaFieldDataType(type=TypeClass())
return SchemaFieldDataType(type=TypeClass)


@platform_name("dbt")
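The change above replaces dbt's local `_field_type_mapping` and per-adapter resolution with the shared `resolve_sql_type` helper. A minimal usage sketch, based only on the signature visible in this diff (the platform argument is passed positionally, and the return value is a type-class instance or `None`):

```python
from datahub.ingestion.source.sql.sql_types import resolve_sql_type

# Resolve a raw column type for a given adapter/platform name; returns an
# instantiated DataHub type class (e.g. NumberTypeClass()) or None if unknown.
resolved = resolve_sql_type("timestamp with time zone", "postgres")
print(type(resolved).__name__ if resolved is not None else "unrecognized type")
```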
@@ -142,6 +142,7 @@ def __init__(
platform: str,
ui_url: str,
env: str,
ingest_owner: bool,
domain: Optional[str] = None,
platform_instance: Optional[str] = None,
):
Expand All @@ -150,6 +151,7 @@ def __init__(
self.env = env
self.domain = domain
self.ui_url = ui_url
self.ingest_owner = ingest_owner

def get_container_key(
self, name: Optional[str], path: Optional[List[str]]
@@ -429,21 +431,23 @@ def _create_external_url(self, dataset: DremioDataset) -> str:
return f'{self.ui_url}/{container_type}/{dataset_url_path}"{dataset.resource_name}"'

def _create_ownership(self, dataset: DremioDataset) -> Optional[OwnershipClass]:
if not dataset.owner:
return None
owner = (
make_user_urn(dataset.owner)
if dataset.owner_type == "USER"
else make_group_urn(dataset.owner)
)
return OwnershipClass(
owners=[
OwnerClass(
owner=owner,
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
]
)
if self.ingest_owner and dataset.owner:
owner_urn = (
make_user_urn(dataset.owner)
if dataset.owner_type == "USER"
else make_group_urn(dataset.owner)
)
ownership: OwnershipClass = OwnershipClass(
owners=[
OwnerClass(
owner=owner_urn,
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
]
)
return ownership

return None

def _create_glossary_terms(self, entity: DremioDataset) -> GlossaryTermsClass:
return GlossaryTermsClass(
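For reference, the URN helpers used in `_create_ownership` produce owner URNs of the following shape (the owner names here are illustrative):

```python
from datahub.emitter.mce_builder import make_group_urn, make_user_urn

print(make_user_urn("jdoe"))       # urn:li:corpuser:jdoe
print(make_group_urn("analysts"))  # urn:li:corpGroup:analysts
```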
@@ -174,3 +174,8 @@ def is_profiling_enabled(self) -> bool:
default=False,
description="Whether to include query-based lineage information.",
)

ingest_owner: bool = Field(
default=True,
description="Ingest Owner from source. This will override Owner info entered from UI",
)
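Because the flag defaults to `true` and source ownership overrides owners entered in the UI, a recipe that prefers UI-curated owners would disable it. A sketch (connection details are placeholders):

```yaml
source:
  type: dremio
  config:
    hostname: dremio.example.com
    port: 9047
    tls: true
    authentication_method: PAT
    password: <personal-access-token>
    # Keep owner information curated in the DataHub UI instead of overriding it from Dremio
    ingest_owner: false
```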
@@ -97,6 +97,7 @@ class DremioSource(StatefulIngestionSourceBase):
- Ownership and Glossary Terms:
- Metadata related to ownership of datasets, extracted from Dremio’s ownership model.
- Glossary terms and business metadata associated with datasets, providing additional context to the data.
- Note: Ownership information is only available in the Cloud and Enterprise editions; it is not available in the Community edition.
- Optional SQL Profiling (if enabled):
- Table, row, and column statistics can be profiled and ingested via optional SQL queries.
@@ -123,6 +124,7 @@ def __init__(self, config: DremioSourceConfig, ctx: PipelineContext):
self.dremio_aspects = DremioAspects(
platform=self.get_platform(),
domain=self.config.domain,
ingest_owner=self.config.ingest_owner,
platform_instance=self.config.platform_instance,
env=self.config.env,
ui_url=dremio_api.ui_url,
17 changes: 12 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -9,6 +9,7 @@
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
)
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
from pyiceberg.table import Table
@@ -104,7 +105,7 @@
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default.")
@capability(
SourceCapability.OWNERSHIP,
"Optionally enabled via configuration by specifying which Iceberg table property holds user or group ownership.",
"Automatically ingests ownership information from table properties based on `user_ownership_property` and `group_ownership_property`",
)
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class IcebergSource(StatefulIngestionSourceBase):
@@ -192,9 +193,7 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
table = thread_local.local_catalog.load_table(dataset_path)
time_taken = timer.elapsed_seconds()
self.report.report_table_load_time(time_taken)
LOGGER.debug(
f"Loaded table: {table.identifier}, time taken: {time_taken}"
)
LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}")
yield from self._create_iceberg_workunit(dataset_name, table)
except NoSuchPropertyException as e:
self.report.report_warning(
@@ -206,12 +205,20 @@
)
except NoSuchIcebergTableError as e:
self.report.report_warning(
"no-iceberg-table",
"not-an-iceberg-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
)
except NoSuchTableError as e:
self.report.report_warning(
"no-such-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchTableError while processing table {dataset_path}, skipping it.",
)
except Exception as e:
self.report.report_failure("general", f"Failed to create workunit: {e}")
LOGGER.exception(
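Tying the reworded ownership capability back to configuration: owners are read from the table properties named by `user_ownership_property` and `group_ownership_property`. A sketch of a recipe that sets them explicitly (catalog details and property names are illustrative):

```yaml
source:
  type: iceberg
  config:
    env: dev
    processing_threads: 1
    catalog:
      demo:
        type: rest
        uri: <catalog-uri>
    # Table properties that hold user / group owners; the values here are illustrative
    user_ownership_property: owner
    group_ownership_property: owner_group
```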
29 changes: 21 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
@@ -148,7 +148,7 @@ def get_kafka_consumer(
) -> confluent_kafka.Consumer:
consumer = confluent_kafka.Consumer(
{
"group.id": "test",
"group.id": "datahub-kafka-ingestion",
"bootstrap.servers": connection.bootstrap,
**connection.consumer_config,
}
@@ -164,6 +164,25 @@ def get_kafka_consumer(
return consumer


def get_kafka_admin_client(
connection: KafkaConsumerConnectionConfig,
) -> AdminClient:
client = AdminClient(
{
"group.id": "datahub-kafka-ingestion",
"bootstrap.servers": connection.bootstrap,
**connection.consumer_config,
}
)
if CallableConsumerConfig.is_callable_config(connection.consumer_config):
# As per documentation, we need to explicitly call the poll method to make sure OAuth callback gets executed
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration
logger.debug("Initiating polling for kafka admin client")
client.poll(timeout=30)
logger.debug("Initiated polling for kafka admin client")
return client


@dataclass
class KafkaSourceReport(StaleEntityRemovalSourceReport):
topics_scanned: int = 0
@@ -278,13 +297,7 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
def init_kafka_admin_client(self) -> None:
try:
# TODO: Do we require separate config than existing consumer_config ?
self.admin_client = AdminClient(
{
"group.id": "test",
"bootstrap.servers": self.source_config.connection.bootstrap,
**self.source_config.connection.consumer_config,
}
)
self.admin_client = get_kafka_admin_client(self.source_config.connection)
except Exception as e:
logger.debug(e, exc_info=e)
self.report.report_warning(
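A minimal usage sketch of the extracted `get_kafka_admin_client` helper. The import path for `KafkaConsumerConnectionConfig` and the bootstrap address are assumptions; `list_topics` is standard confluent-kafka `AdminClient` API:

```python
from datahub.configuration.kafka import KafkaConsumerConnectionConfig  # assumed import path
from datahub.ingestion.source.kafka.kafka import get_kafka_admin_client

connection = KafkaConsumerConnectionConfig(bootstrap="localhost:9092")  # placeholder broker
admin_client = get_kafka_admin_client(connection)

# AdminClient.list_topics() returns cluster metadata; topic names are the keys of .topics
metadata = admin_client.list_topics(timeout=10)
print(sorted(metadata.topics))
```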
@@ -15,6 +15,7 @@
TimeType,
)

# TODO: Replace with standardized types in sql_types.py
FIELD_TYPE_MAPPING: Dict[
str,
Type[
@@ -222,6 +222,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
```
"""

# TODO: Replace with standardized types in sql_types.py
REDSHIFT_FIELD_TYPE_MAPPINGS: Dict[
str,
Type[
@@ -103,6 +103,7 @@
logger = logging.getLogger(__name__)

# https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html
# TODO: Move to the standardized types in sql_types.py
SNOWFLAKE_FIELD_TYPE_MAPPINGS = {
"DATE": DateType,
"BIGINT": NumberType,