Skip to content

Commit 4b03f85

Browse files
committed
[DOP-33903] Document job dependencies
1 parent 688cba0 commit 4b03f85

9 files changed

Lines changed: 167 additions & 6 deletions

File tree

README.rst

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ Run-level lineage graph
106106
.. image:: docs/entities/run_lineage.png
107107
:alt: Job-level lineage graph
108108

109+
Hierarchy graph
110+
~~~~~~~~~~~~~~~
111+
112+
.. image:: docs/integrations/airflow/job_hierarchy.png
113+
:alt: Job hierarchy
114+
109115
Datasets
110116
~~~~~~~~
111117

@@ -143,13 +149,13 @@ Hive query
143149
:alt: Hive query details
144150

145151
Airflow DagRun
146-
~~~~~~~~~~~~~~~
152+
~~~~~~~~~~~~~~
147153

148154
.. image:: docs/integrations/airflow/dag_run_details.png
149155
:alt: Airflow DagRun details
150156

151157
Airflow TaskInstance
152-
~~~~~~~~~~~~~~~~~~~~~
158+
~~~~~~~~~~~~~~~~~~~~
153159

154160
.. image:: docs/integrations/airflow/task_run_details.png
155161
:alt: Airflow TaskInstance details

docs/comparison.rst

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ DataHub cons
1313
To extract and draw lineage between tables, it is required to *both* connect ingestor to all databases, and to enable integration with ETL (Spark, Airflow, etc).
1414

1515
There is an option ``spark.datahub.metadata.dataset.materialize=true``, but in this case DataHub creates datasets without schema,
16-
so ingestors are still required.
16+
so ingestors are still required for column lineage.
1717

1818
* DataHub Spark agent doesn't properly work if *Platform Instances* are enabled in DataHub.
1919
Platform Instance is an additional hierarchy level for databases,
@@ -23,6 +23,8 @@ DataHub cons
2323

2424
Data.Rentgen has configurable ``granularity`` option while rendering the lineage graph.
2525

26+
* No support for Job → Job hierarchy like Airflow Task → Spark application, or Airflow Task → Airflow Task dependencies.
27+
2628
* High CPU and memory consumption.
2729

2830
DataHub pros
@@ -41,6 +43,7 @@ OpenMetadata cons
4143

4244
* Database ingestors are required to build a lineage graph, just like DataHub.
4345
* OpenLineage → OpenMetadata integration produces no lineage, for some unknown reason.
46+
* No support for Job → Job hierarchy like Airflow Task → Spark application, or Airflow Task → Airflow Task dependencies.
4447
* High CPU and memory consumption.
4548

4649
OpenMetadata pros
@@ -64,7 +67,7 @@ Marquez cons
6467

6568
* Severe performance issues while consuming lineage events.
6669
* No support for dataset symlinks, e.g. HDFS location → Hive table.
67-
* No support for parent runs, e.g. Airflow task → Spark application.
70+
* No support for Job → Job hierarchy like Airflow Task → Spark application, or Airflow Task → Airflow Task dependencies.
6871
* No releases since 2024.
6972

7073
Marquez pros

docs/entities/index.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,20 @@ It contains following fields:
321321

322322
.. image:: parent_relation.png
323323

324+
Dependency relation
325+
~~~~~~~~~~~~~~~~~~~
326+
327+
Relation between job/job or run/run which shows the order of executing ETL jobs.
328+
For example, one Airflow Task can depend on another Airflow Task.
329+
330+
It contains following fields:
331+
332+
- ``from: Job | Run`` - entity which should be waited before current job/run will be started.
333+
- ``to: Job | Run`` - entity which waits.
334+
- ``type: str`` - type of dependency, any arbitrary string provided by integration, usually something like ``DIRECT_DEPENDENCY``, ``INDIRECT_DEPENDENCY``.
335+
336+
.. image:: job_dependencies.png
337+
324338
Input relation
325339
~~~~~~~~~~~~~~
326340

docs/entities/job_dependencies.png

129 KB
Loading

docs/index.rst

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ Run-level lineage graph
8686
.. image:: entities/run_lineage.png
8787
:alt: Job-level lineage graph
8888

89+
Hierarchy graph
90+
~~~~~~~~~~~~~~~
91+
92+
.. image:: integrations/airflow/job_hierarchy.png
93+
:alt: Job hierarchy
94+
8995
Datasets
9096
~~~~~~~~
9197

@@ -123,13 +129,13 @@ Hive query
123129
:alt: Hive query details
124130

125131
Airflow DagRun
126-
~~~~~~~~~~~~~~~
132+
~~~~~~~~~~~~~~
127133

128134
.. image:: integrations/airflow/dag_run_details.png
129135
:alt: Airflow DagRun details
130136

131137
Airflow TaskInstance
132-
~~~~~~~~~~~~~~~~~~~~~
138+
~~~~~~~~~~~~~~~~~~~~
133139

134140
.. image:: integrations/airflow/task_run_details.png
135141
:alt: Airflow TaskInstance details

docs/integrations/airflow/index.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ Job level lineage
197197

198198
.. image:: ./job_lineage.png
199199

200+
Job dependencies
201+
~~~~~~~~~~~~~~~~
202+
203+
.. image:: ./job_hierarchy.png
204+
200205
Extra configuration
201206
-------------------
202207

131 KB
Loading

docs/integrations/dbt/index.rst

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,3 +166,66 @@ It is possible to provide custom tags via model config:
166166
+tags:
167167
- environment:production
168168
- layer:bronze
169+
170+
Binding Airflow Task with Spark application
171+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
172+
173+
If OpenLineage event contains `Parent Run facet <https://openlineage.io/docs/spec/facets/run-facets/parent_run/>`_,
174+
DataRentgen can use this information to bind dbt run to the run it was triggered by, e.g. Airflow task:
175+
176+
.. image:: ../airflow/job_hierarchy.png
177+
178+
To fill up this facet, it is required to:
179+
180+
* Setup OpenLineage integration for dbt
181+
* Setup :ref:`OpenLineage integration for Airflow <overview-setup-airflow>`
182+
* Pass parent Run info from Airflow to dbt by using `Airflow macros <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/macros.html#lineage-job-run-macros>`_:
183+
184+
.. tabs::
185+
186+
.. code-tab:: py BashOperator
187+
188+
from airflow.providers.standard.operators.bash import BashOperator
189+
190+
task = BashOperator(
191+
task_id="dbt_run_task",
192+
cwd="/path/to/project",
193+
bash_command="dbt-ol run",
194+
append_env=True,
195+
env={
196+
# Pass parent Run info from Airflow to Spark
197+
"OPENLINEAGE_PARENT_ID": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
198+
# For apache-airflow-providers-openlineage 2.4.0 or above
199+
"OPENLINEAGE_ROOT_PARENT_ID": "{{ macros.OpenLineageProviderPlugin.lineage_root_parent_id(task_instance) }}",
200+
}
201+
)
202+
203+
.. code-tab:: py SSHOperator
204+
205+
from airflow.providers.ssh.operators.ssh import SSHOperator
206+
207+
task = SSHOperator(
208+
task_id="dbt_run_task",
209+
ssh_conn_id="some_host",
210+
command="cd /path/to/project && dbt-ol run",
211+
environment={
212+
"OPENLINEAGE_PARENT_ID": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
213+
# For apache-airflow-providers-openlineage 2.4.0 or above
214+
"OPENLINEAGE_ROOT_PARENT_ID": "{{ macros.OpenLineageProviderPlugin.lineage_root_parent_id(task_instance) }}",
215+
}
216+
)
217+
218+
.. code-tab:: py KubernetesPodOperator
219+
220+
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
221+
222+
task = SSHOperator(
223+
task_id="dbt_run_task",
224+
cmds=["bash", "-cx"],
225+
arguments=["cd /path/to/project && dbt-ol run"],
226+
env_vars={
227+
"OPENLINEAGE_PARENT_ID": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
228+
# For apache-airflow-providers-openlineage 2.4.0 or above
229+
"OPENLINEAGE_ROOT_PARENT_ID": "{{ macros.OpenLineageProviderPlugin.lineage_root_parent_id(task_instance) }}",
230+
}
231+
)

docs/integrations/spark/index.rst

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,3 +304,67 @@ It is possible to provide custom job tags using OpenLineage configuration:
304304
:caption: etl.py
305305
306306
SparkSession.builder.config("spark.openlineage.job.tags", "environment:production;layer:bronze")
307+
308+
Binding Airflow Task with Spark application
309+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
310+
311+
If OpenLineage event contains `Parent Run facet <https://openlineage.io/docs/spec/facets/run-facets/parent_run/>`_,
312+
DataRentgen can use this information to bind Spark application to the run it was triggered by, e.g. Airflow task:
313+
314+
.. image:: ../airflow/job_hierarchy.png
315+
316+
To fill up this facet, it is required to:
317+
318+
* Setup OpenLineage integration for Spark
319+
* Setup :ref:`OpenLineage integration for Airflow <overview-setup-airflow>`
320+
* `Pass parent Run info from Airflow to Spark <https://openlineage.io/docs/integrations/spark/configuration/airflow#preserving-job-hierarchy>`_:
321+
322+
.. code-block:: python
323+
:caption: dag.py
324+
325+
def my_etl(
326+
parent_job_namespace: str,
327+
parent_job_name: str,
328+
parent_run_id: str,
329+
root_job_namespace: str,
330+
root_job_name: str,
331+
root_run_id: str,
332+
):
333+
spark = (
334+
SparkSession.builder
335+
# install OpenLineage integration (see above)
336+
# Pass parent Run info from Airflow to Spark
337+
.config("spark.openlineage.parentJobNamespace", parent_job_namespace)
338+
.config("spark.openlineage.parentJobName", parent_job_name)
339+
.config("spark.openlineage.parentRunId", parent_run_id)
340+
.config("spark.openlineage.rootJobNamespace", root_job_namespace)
341+
.config("spark.openlineage.rootJobName", root_job_name)
342+
.config("spark.openlineage.rootRunId", root_run_id)
343+
.getOrCreate()
344+
)
345+
346+
with spark:
347+
# actual ETL code
348+
349+
350+
from airflow.providers.standard.operators.python import PythonOperator
351+
352+
task = PythonOperator(
353+
task_id="spark_etl",
354+
python_callable=my_etl,
355+
# Using Jinja templates to pass Airflow macros to Python function
356+
op_kwargs={
357+
"parent_job_namespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
358+
"parent_job_name": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
359+
"parent_run_id": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
360+
# For apache-airflow-providers-openlineage 2.4.0 or above
361+
"root_job_namespace": "{{ macros.OpenLineageProviderPlugin.lineage_root_job_namespace(task_instance) }}",
362+
"root_job_name": "{{ macros.OpenLineageProviderPlugin.lineage_root_job_name(task_instance) }}",
363+
"root_run_id": "{{ macros.OpenLineageProviderPlugin.lineage_root_run_id(task_instance) }}",
364+
},
365+
)
366+
367+
The exact way of substituting Airflow macros to SparkSession config may be different depending on used Airflow operator:
368+
* PythonOperator - via kwargs & `Airflow macros <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/macros.html#lineage-job-run-macros>`_:
369+
* BashOperator, SSHOperator, KubernetesPodOperator - via environment variables & Airflow macros
370+
* SparkSubmitOperator - via `spark_inject_parent_job_info=true in airflow.conf <https://openlineage.io/docs/integrations/spark/configuration/airflow#automatic-injection>`_

0 commit comments

Comments
 (0)