docs(ingestion): Emitter api examples + Documentation (#3599)
rslanka authored Nov 19, 2021
1 parent a36fefa commit f1045f8
Showing 6 changed files with 223 additions and 2 deletions.
27 changes: 25 additions & 2 deletions metadata-ingestion/README.md
@@ -184,8 +184,31 @@ Check out the [transformers guide](./transformers.md) for more info!

In some cases, you might want to construct the MetadataChangeEvents yourself but still use this framework to emit that metadata to DataHub. In this case, take a look at the emitter interfaces, which can easily be imported and called from your own code.

- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`).
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`).
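
A minimal REST emitter sketch (the server URL, dataset URN, and description below are illustrative placeholders):

```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

# Create an emitter pointed at your DataHub GMS instance.
emitter = DatahubRestEmitter("http://localhost:8080")

# Wrap a single aspect in a MetadataChangeProposalWrapper and emit it.
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=builder.make_dataset_urn("hive", "fct_users_created", "PROD"),
        aspectName="datasetProperties",
        aspect=DatasetPropertiesClass(description="Table of created users."),
    )
)
```
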
### Sample code
#### Lineage
The following samples cover emitting dataset-to-dataset, dataset-to-job-to-dataset, dataset-to-chart, chart-to-dashboard, and job-to-dataflow lineage.
- [lineage_emitter_mcpw_rest.py](./examples/library/lineage_emitter_mcpw_rest.py) - emits simple BigQuery table-to-table (dataset-to-dataset) lineage via REST as a MetadataChangeProposalWrapper.
- [lineage_dataset_job_dataset.py](./examples/library/lineage_dataset_job_dataset.py) - emits MySQL-to-Airflow-to-Kafka (dataset-to-job-to-dataset) lineage via REST as a MetadataChangeProposalWrapper.
- [lineage_dataset_chart.py](./examples/library/lineage_dataset_chart.py) - emits dataset-to-chart lineage via REST as a MetadataChangeProposalWrapper.
- [lineage_chart_dashboard.py](./examples/library/lineage_chart_dashboard.py) - emits chart-to-dashboard lineage via REST as a MetadataChangeProposalWrapper.
- [lineage_job_dataflow.py](./examples/library/lineage_job_dataflow.py) - emits job-to-dataflow lineage via REST as a MetadataChangeProposalWrapper.
- [lineage_emitter_rest.py](./examples/library/lineage_emitter_rest.py) - emits simple dataset-to-dataset lineage via REST as a MetadataChangeEvent.
- [lineage_emitter_kafka.py](./examples/library/lineage_emitter_kafka.py) - emits simple dataset-to-dataset lineage via Kafka as a MetadataChangeEvent.
- [DataHub Snowflake Lineage](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py#L249) - emits DataHub's Snowflake lineage as MetadataChangeProposalWrapper objects.
- [DataHub BigQuery Lineage](https://github.com/linkedin/datahub/blob/a1bf95307b040074c8d65ebb86b5eb177fdcd591/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py#L229) - emits DataHub's BigQuery lineage as MetadataChangeProposalWrapper objects.
- [DataHub dbt Lineage](https://github.com/linkedin/datahub/blob/a9754ebe83b6b73bc2bfbf49d9ebf5dbd2ca5a8f/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L625,L630) - emits DataHub's dbt lineage as MetadataChangeEvent objects.

NOTE:
- Emitting aspects as MetadataChangeProposalWrapper is recommended over emitting them via MetadataChangeEvent.
- Emitting any aspect for an entity completely overwrites that aspect's previous value. In particular, emitting a lineage aspect for a dataset will overwrite any lineage edges that already exist; a read-merge-write sketch that preserves existing edges is shown below.
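
The sketch below reads the current `upstreamLineage` aspect, appends a new edge, and re-emits the merged aspect. It assumes the installed client exposes `DataHubGraph.get_aspect`; the graph-client API has varied across versions, so treat those calls (and all URNs and the server URL) as illustrative assumptions:

```python
from typing import List, Optional

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

dataset_urn = builder.make_dataset_urn("bigquery", "downstream", "PROD")

# Read the aspect that is currently stored, if any
# (assumes a get_aspect helper; check your client version).
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
existing: Optional[UpstreamLineageClass] = graph.get_aspect(
    entity_urn=dataset_urn, aspect_type=UpstreamLineageClass
)
upstreams: List[UpstreamClass] = existing.upstreams if existing else []

# Append the new edge instead of replacing the whole list.
upstreams.append(
    UpstreamClass(
        dataset=builder.make_dataset_urn("bigquery", "upstream_table_3", "PROD"),
        type=DatasetLineageTypeClass.TRANSFORMED,
    )
)

# Emit the merged aspect; the upsert still replaces the stored value,
# but the replacement now includes the edges that were already there.
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=dataset_urn,
        aspectName="upstreamLineage",
        aspect=UpstreamLineageClass(upstreams=upstreams),
    )
)
```
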
#### Programmatic Pipeline
In some cases, you might want to configure and run a pipeline entirely from within your custom Python script. Here is an example of how to do it.
- [programmatic_pipeline.py](./examples/library/programatic_pipeline.py) - a basic MySQL-to-REST programmatic pipeline; a sketch of the same pattern is shown below.
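
A minimal sketch of such a pipeline, mirroring the structure of the linked example (all connection settings are placeholders):

```python
from datahub.ingestion.run.pipeline import Pipeline

# The dict below uses the same recipe structure as a YAML ingestion config.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "mysql",
            "config": {
                "username": "user",
                "password": "pass",
                "database": "db_name",
                "host_port": "localhost:3306",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)

pipeline.run()
pipeline.raise_from_status()
```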


## Lineage with Airflow

39 changes: 39 additions & 0 deletions metadata-ingestion/examples/library/lineage_chart_dashboard.py
@@ -0,0 +1,39 @@
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dashboard import DashboardInfoClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass

# Construct the DashboardInfo aspect with the charts -> dashboard lineage.
charts_in_dashboard: List[str] = [
    builder.make_chart_urn(platform="looker", name="chart_1"),
    builder.make_chart_urn(platform="looker", name="chart_2"),
]

last_modified = ChangeAuditStampsClass()

dashboard_info = DashboardInfoClass(
    title="My Dashboard 1",
    description="Sample dashboard",
    lastModified=last_modified,
    charts=charts_in_dashboard,
)

# Construct a MetadataChangeProposalWrapper object with the DashboardInfo aspect.
# NOTE: This will overwrite all of the existing dashboardInfo aspect information associated with this dashboard.
dashboard_info_mcp = MetadataChangeProposalWrapper(
    entityType="dashboard",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dashboard_urn(platform="looker", name="my_dashboard_1"),
    aspectName="dashboardInfo",
    aspect=dashboard_info,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(dashboard_info_mcp)
38 changes: 38 additions & 0 deletions metadata-ingestion/examples/library/lineage_dataset_chart.py
@@ -0,0 +1,38 @@
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.chart import ChartInfoClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass

# Construct the ChartInfo aspect with the input_datasets lineage.
input_datasets: List[str] = [
    builder.make_dataset_urn(platform="hdfs", name="dataset1", env="PROD"),
    builder.make_dataset_urn(platform="hdfs", name="dataset2", env="PROD"),
]

last_modified = ChangeAuditStampsClass()

chart_info = ChartInfoClass(
    title="Baz Chart 1",
    description="Sample Baz chart",
    lastModified=last_modified,
    inputs=input_datasets,
)

# Construct a MetadataChangeProposalWrapper object with the ChartInfo aspect.
# NOTE: This will overwrite all of the existing chartInfo aspect information associated with this chart.
chart_info_mcp = MetadataChangeProposalWrapper(
    entityType="chart",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_chart_urn(platform="looker", name="my_chart_1"),
    aspectName="chartInfo",
    aspect=chart_info,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(chart_info_mcp)
49 changes: 49 additions & 0 deletions metadata-ingestion/examples/library/lineage_dataset_job_dataset.py
@@ -0,0 +1,49 @@
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInputOutputClass
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct the DataJobInputOutput aspect.
input_datasets: List[str] = [
    builder.make_dataset_urn(platform="mysql", name="librarydb.member", env="PROD"),
    builder.make_dataset_urn(platform="mysql", name="librarydb.checkout", env="PROD"),
]

output_datasets: List[str] = [
    builder.make_dataset_urn(
        platform="kafka", name="debezium.topics.librarydb.member_checkout", env="PROD"
    )
]

input_data_jobs: List[str] = [
    builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job0", cluster="PROD"
    )
]

datajob_input_output = DataJobInputOutputClass(
    inputDatasets=input_datasets,
    outputDatasets=output_datasets,
    inputDatajobs=input_data_jobs,
)

# Construct a MetadataChangeProposalWrapper object.
# NOTE: This will overwrite all of the existing lineage information associated with this job.
datajob_input_output_mcp = MetadataChangeProposalWrapper(
    entityType="datajob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="PROD"
    ),
    aspectName="dataJobInputOutput",
    aspect=datajob_input_output,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(datajob_input_output_mcp)
42 changes: 42 additions & 0 deletions metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py
@@ -0,0 +1,42 @@
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineage,
)
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct upstream tables.
upstream_tables: List[UpstreamClass] = []
upstream_table_1 = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables.append(upstream_table_1)
upstream_table_2 = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables.append(upstream_table_2)

# Construct a lineage object.
upstream_lineage = UpstreamLineage(upstreams=upstream_tables)

# Construct a MetadataChangeProposalWrapper object.
lineage_mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "downstream"),
    aspectName="upstreamLineage",
    aspect=upstream_lineage,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(lineage_mcp)
30 changes: 30 additions & 0 deletions metadata-ingestion/examples/library/lineage_job_dataflow.py
@@ -0,0 +1,30 @@
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInfoClass
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct the DataJobInfo aspect with the job -> flow lineage.
dataflow_urn = builder.make_data_flow_urn(
    orchestrator="airflow", flow_id="flow1", cluster="prod"
)

datajob_info = DataJobInfoClass(name="My Job 1", type="AIRFLOW", flowUrn=dataflow_urn)

# Construct a MetadataChangeProposalWrapper object with the DataJobInfo aspect.
# NOTE: This will overwrite all of the existing dataJobInfo aspect information associated with this job.
datajob_info_mcp = MetadataChangeProposalWrapper(
    entityType="dataJob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="prod"
    ),
    aspectName="dataJobInfo",
    aspect=datajob_info,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(datajob_info_mcp)
