OpenLineage configuration and facets

OpenLineage is a lineage metadata extraction library that you can install in a target application such as Apache Airflow or Apache Spark. Once you have installed OpenLineage, you can configure the target application to integrate with Atlan. This will allow Atlan to receive OpenLineage events and catalog your assets from supported sources. You will neither have to clone a GitHub repository nor make any code changes to your DAGs.

To install OpenLineage, refer to the documentation for supported sources:

💪 Did you know? To add lineage support to sources other than the ones listed above, you can use OpenLineage's extensible specification. Refer to our developer documentation to learn more.

Example

Apache Airflow

Once you have configured a supported Apache Airflow distribution, you can run a sample DAG to confirm that your assets are being crawled in Atlan. Although Atlan strongly recommends running the preflight check DAG to test your Apache Airflow connection, you can also use the example DAG below to verify your setup.

For example:

import json
from pendulum import datetime

from airflow.decorators import (
    dag,
    task,
)
@dag(
    dag_id="example_dag_basic",
    schedule="@once",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["example"],
)
def example_dag_basic():

    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


example_dag_basic()

Apache Spark

Once you have configured Apache Spark, you can run a sample Spark job to confirm that your assets are being crawled in Atlan.

For example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session and configure the spark properties
spark = (SparkSession.builder.master('local')
         .appName('data_pipeline_sample')
         .getOrCreate())

snowflake_options = {
    "sfURL": ".snowflakecomputing.com",
    "sfUser": "",
    "sfPassword": "",
    "sfDatabase": "",
    "sfWarehouse": "",
    "sfSchema": "",
    "sfRole": "",
}

instacart_df = spark.read \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "table1") \
    .load()

filtered_df = instacart_df.filter(col('"order_id"') == '123456')

filtered_df.write \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "table2") \
    .mode("append") \
    .save()

spark.stop()

Supported facets

An OpenLineage event will contain the following object model: dataset, job, and run entities. In addition, OpenLineage supports facets to provide contextual metadata for events.

Atlan currently only processes the following facets for OpenLineage events:

Apache Airflow

OpenLineage facet Description Where in Atlan
job.facets.jobType Apache Airflow asset type (task or DAG) asset profile, preview, and sidebar
run.facets.airflow DAG details, including runs, tasks, owner, and task group asset profile, overview sidebar, and pipeline graph
run.facets.airflow_version Apache Airflow version and DAG metadata API only
run.facets.parentRun parent DAG for tasks API only
run.facets.processing_engine Apache Airflow and OpenLineage versions API only
outputs.facets.columnLineage fetches column lineage lineage graph

Apache Spark

OpenLineage facet Description Where in Atlan
eventType job run status overview sidebar
eventTime job start and end time asset profile
job.namespace connection name asset profile and overview sidebar
job.name Spark job name asset name
run.runId Spark job name run ID API only
run.facets.spark_version Spark version overview sidebar
run.facets.spark_properties OpenLineage package version API only
run.facets.processing_engine Spark cluster details API only
inputs.facets.name links input facets related assets and pipeline graph
outputs.facets.name links output facets related assets and pipeline graph
inputs.facets.namespace input type related assets and pipeline graph
outputs.facets.namespace output type related assets and pipeline graph
inputs.facets.symlinks retrieves logical entity API only
outputs.facets.symlinks retrieves logical entity API only
outputs.facets.columnLineage fetches column lineage lineage graph

Related articles

Was this article helpful?
0 out of 0 found this helpful