Skip to content

Commit bd161f4

Browse files
committed
[DOP-25498] Allow to parse unknown RunEvent as Run+Operation
1 parent 559d78b commit bd161f4

21 files changed

Lines changed: 1618 additions & 744 deletions

data_rentgen/consumer/extractors/batch_extractor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,19 @@ def add_events(self, events: list[OpenLineageRunEvent]) -> BatchExtractionResult
2828
return self.result
2929

3030
def is_operation(self, event: OpenLineageRunEvent) -> bool:
31+
has_lineage = bool(event.inputs or event.outputs)
32+
3133
job_type_facet = event.job.facets.jobType
3234
if not job_type_facet:
33-
return False
35+
return has_lineage
3436

3537
if job_type_facet.integration == "SPARK":
3638
return job_type_facet.jobType != "APPLICATION"
3739

3840
if job_type_facet.integration == "AIRFLOW":
3941
return False
4042

41-
return False
43+
return has_lineage
4244

4345
def extract_run(self, event: OpenLineageRunEvent) -> None:
4446
run = extract_run(event)

data_rentgen/consumer/extractors/operation.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from data_rentgen.consumer.extractors.run import extract_parent_run
4+
from data_rentgen.consumer.extractors.run import extract_parent_run, extract_run
55
from data_rentgen.consumer.openlineage.run_event import (
66
OpenLineageRunEvent,
77
OpenLineageRunEventType,
@@ -10,25 +10,30 @@
1010

1111

1212
def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
13-
# operation always has parent
14-
run = extract_parent_run(event.run.facets.parent) # type: ignore[arg-type]
13+
if event.run.facets.parent and event.job.facets.jobType and event.job.facets.jobType.integration == "SPARK":
14+
run = extract_parent_run(event.run.facets.parent)
15+
else:
16+
run = extract_run(event)
1517

1618
# in some cases, operation name may contain raw SELECT query with newlines
1719
operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()
1820
# remove parent job name from operation name
19-
if operation_name.startswith(run.job.name):
21+
if operation_name.startswith(run.job.name) and operation_name != run.job.name:
2022
prefix = len(run.job.name) + 1
2123
operation_name = operation_name[prefix:]
2224

2325
type_: OperationTypeDTO = OperationTypeDTO.BATCH
24-
if event.job.facets.jobType and event.job.facets.jobType.processingType:
26+
if event.job.facets.jobType:
2527
type_ = OperationTypeDTO(event.job.facets.jobType.processingType)
2628

2729
operation = OperationDTO(
2830
id=event.run.runId, # type: ignore [arg-type]
2931
run=run,
3032
name=operation_name,
3133
type=type_,
34+
status=OperationStatusDTO(run.status),
35+
started_at=run.started_at,
36+
ended_at=run.ended_at,
3237
)
3338
enrich_operation_status(operation, event)
3439
enrich_operation_description(operation, event)

tests/conftest.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,22 @@
77
"tests.test_database.fixtures.alembic",
88
"tests.test_consumer.fixtures.consumer_app_settings",
99
"tests.test_consumer.fixtures.test_broker",
10-
"tests.test_consumer.test_extractors.fixtures.extracted_dto",
11-
"tests.test_consumer.test_extractors.fixtures.column_lineage_facets",
10+
"tests.test_consumer.test_extractors.fixtures.io_raw",
11+
"tests.test_consumer.test_extractors.fixtures.io_dto",
12+
"tests.test_consumer.test_extractors.fixtures.spark_raw",
13+
"tests.test_consumer.test_extractors.fixtures.spark_dto",
14+
"tests.test_consumer.test_extractors.fixtures.airflow_raw",
15+
"tests.test_consumer.test_extractors.fixtures.airflow_dto",
16+
"tests.test_consumer.test_extractors.fixtures.unknown_raw",
17+
"tests.test_consumer.test_extractors.fixtures.unknown_dto",
18+
"tests.test_consumer.test_extractors.fixtures.column_lineage_raw",
1219
"tests.test_server.fixtures.server_app_settings",
1320
"tests.test_server.fixtures.test_server_app",
1421
"tests.test_server.fixtures.test_client",
1522
"tests.test_server.fixtures.keycloak",
1623
"tests.test_server.fixtures.factories.address",
1724
"tests.test_server.fixtures.factories.dataset",
25+
"tests.test_server.fixtures.factories.job_type",
1826
"tests.test_server.fixtures.factories.job",
1927
"tests.test_server.fixtures.factories.lineage",
2028
"tests.test_server.fixtures.factories.location",

tests/test_consumer/resources/events_spark.jsonl

Lines changed: 5 additions & 5 deletions
Large diffs are not rendered by default.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"eventTime": "2024-07-05T09:06:29.462Z", "producer": "unknown://unknown", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "START", "run": {"runId": "01908224-8410-79a2-8de6-a769ad6944c9"}, "job": {"namespace": "unknown://unknown", "name": "somejob", "facets": {"jobType": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "NONE", "integration": "UNKNOWN", "jobType": "SOMETHING"}}}, "inputs": [], "outputs": []}
2+
{"eventTime": "2024-07-05T09:07:09.849Z", "producer": "unknown://unknown", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "RUNNING", "run": {"runId": "01908224-8410-79a2-8de6-a769ad6944c9"}, "job": {"namespace": "unknown://unknown", "name": "somejob", "facets": {"jobType": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "NONE", "integration": "UNKNOWN", "jobType": "SOMETHING"}}}, "inputs": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "hive://test-hadoop:9083", "uri": "hive://test-hadoop:9083"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp", "description": "Business date"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}}, "inputFacets": {}}], "outputs": [{"namespace": "clickhouse://localhost:8123", "name": "mydb.myschema.mytable", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "clickhouse://localhost:8123", "uri": "clickhouse://localhost:8123"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}, "columnLineage": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", "fields": {"dt": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "dt", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "customer_id": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "customer_id", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "total_spent": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "total_spent", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}}, "dataset": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "customer_id", "transformations": [{"type": "INDIRECT", "subtype": "JOIN", "description": "ON (DISCOUNTS.CUSTOMERS_ID=CUSTOMERS.ID)", "masking": false}]}]}, "lifecycleStateChange": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", "lifecycleStateChange": "OVERWRITE"}}}]}
3+
{"eventTime": "2024-07-05T09:07:15.642Z", "producer": "unknown://unknown", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", "eventType": "COMPLETE", "run": {"runId": "01908224-8410-79a2-8de6-a769ad6944c9"}, "job": {"namespace": "unknown://unknown", "name": "somejob", "facets": {"jobType": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", "processingType": "NONE", "integration": "UNKNOWN", "jobType": "SOMETHING"}}}, "inputs": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "hive://test-hadoop:9083", "uri": "hive://test-hadoop:9083"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp", "description": "Business date"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}}, "inputFacets": {}}], "outputs": [{"namespace": "clickhouse://localhost:8123", "name": "mydb.myschema.mytable", "facets": {"dataSource": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", "name": "clickhouse://localhost:8123", "uri": "clickhouse://localhost:8123"}, "schema": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", "fields": [{"name": "dt", "type": "timestamp"}, {"name": "customer_id", "type": "decimal(20,0)"}, {"name": "total_spent", "type": "float"}]}, "columnLineage": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", "fields": {"dt": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "dt", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "customer_id": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "customer_id", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}, "total_spent": {"inputFields": [{"namespace": "hive://test-hadoop:9083", "name": "mydatabase.source_table", "field": "total_spent", "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}]}]}}}, "lifecycleStateChange": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", "lifecycleStateChange": "OVERWRITE"}}, "outputFacets": {"outputStatistics": {"_producer": "unknown://unknown", "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet", "rowCount": 10000, "size": 5000000}}}]}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from datetime import datetime, timezone
2+
from uuid import UUID
3+
4+
import pytest
5+
6+
from data_rentgen.dto import (
7+
JobDTO,
8+
LocationDTO,
9+
RunDTO,
10+
RunStartReasonDTO,
11+
RunStatusDTO,
12+
UserDTO,
13+
)
14+
from data_rentgen.dto.job_type import JobTypeDTO
15+
16+
17+
@pytest.fixture
18+
def extracted_airflow_location() -> LocationDTO:
19+
return LocationDTO(
20+
type="http",
21+
name="airflow-host:8081",
22+
addresses={"http://airflow-host:8081"},
23+
)
24+
25+
26+
@pytest.fixture
27+
def extracted_airflow_dag_job(
28+
extracted_airflow_location: LocationDTO,
29+
) -> JobDTO:
30+
return JobDTO(
31+
name="mydag",
32+
location=extracted_airflow_location,
33+
type=JobTypeDTO(type="AIRFLOW_DAG"),
34+
)
35+
36+
37+
@pytest.fixture
38+
def extracted_airflow_task_job(
39+
extracted_airflow_location: LocationDTO,
40+
) -> JobDTO:
41+
return JobDTO(
42+
name="mydag.mytask",
43+
location=extracted_airflow_location,
44+
type=JobTypeDTO(type="AIRFLOW_TASK"),
45+
)
46+
47+
48+
@pytest.fixture
49+
def extracted_airflow_dag_run(
50+
extracted_airflow_dag_job: JobDTO,
51+
) -> RunDTO:
52+
return RunDTO(
53+
id=UUID("01908223-0782-79b8-9495-b1c38aaee839"),
54+
job=extracted_airflow_dag_job,
55+
status=RunStatusDTO.SUCCEEDED,
56+
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
57+
start_reason=RunStartReasonDTO.MANUAL,
58+
user=UserDTO(
59+
name="myuser",
60+
id=None,
61+
),
62+
ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc),
63+
external_id="manual__2024-07-05T09:04:13:979349+00:00",
64+
persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00",
65+
)
66+
67+
68+
@pytest.fixture
69+
def extracted_airflow_task_run(
70+
extracted_airflow_task_job: JobDTO,
71+
) -> RunDTO:
72+
return RunDTO(
73+
id=UUID("01908223-0782-7fc0-9d69-b1df9dac2c60"),
74+
job=extracted_airflow_task_job,
75+
status=RunStatusDTO.SUCCEEDED,
76+
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
77+
start_reason=RunStartReasonDTO.MANUAL,
78+
user=UserDTO(
79+
name="myuser",
80+
id=None,
81+
),
82+
ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
83+
external_id="manual__2024-07-05T09:04:13:979349+00:00",
84+
attempt="1",
85+
persistent_log_url=(
86+
"http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask"
87+
),
88+
)

0 commit comments

Comments
 (0)