Skip to content

Commit 7a94ee2

Browse files
committed
[DOP-25464] Make consumer message parsing more robust
1 parent b170fc9 commit 7a94ee2

21 files changed

Lines changed: 284 additions & 91 deletions

.env.local

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ export DATA_RENTGEN__KAFKA__SECURITY__USER=data_rentgen
88
export DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
99
export DATA_RENTGEN__KAFKA__COMPRESSION=zstd
1010
#export DATA_RENTGEN__CONSUMER__MAX_RECORDS=100
11-
# Handling events with a lot of column lineage takes so much time
12-
# that Kafka coordinator considers the worker dead. Limit by total message size.
13-
# This value depends on the number of OL version, complexity of Spark jobs, number of Kafka partitions and number of workers.
14-
export DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=200Kb
11+
#export DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=5MB
1512

1613
export DATA_RENTGEN__SERVER__DEBUG=True
1714

data_rentgen/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
# _raw_version could contain pre-release version, like 0.0.1dev123
55
# value is updated automatically by `poetry version ...` and poetry-bumpversion plugin
6-
_raw_version = "0.2.2"
6+
_raw_version = "0.3.0"
77

88
# version always contain only release number like 0.0.1
99
__version__ = ".".join(_raw_version.split(".")[:3])

data_rentgen/consumer/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from faststream._compat import ExceptionGroup
1212
from faststream.asgi import AsgiFastStream, AsgiResponse, get
1313
from faststream.kafka import KafkaBroker
14+
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
1415
from sqlalchemy.ext.asyncio import AsyncSession
1516

1617
import data_rentgen
@@ -38,15 +39,21 @@ def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
3839
)
3940

4041
# register subscribers using settings
41-
consumer_settings = settings.consumer.model_dump(exclude={"topics_list", "topics_pattern"})
42-
broker.subscriber(
42+
consumer_settings = settings.consumer.model_dump(exclude={"topics_list", "topics_pattern", "malformed_topic"})
43+
44+
subscriber = broker.subscriber(
4345
*settings.consumer.topics_list,
4446
pattern=settings.consumer.topics_pattern,
4547
**consumer_settings,
4648
batch=True,
47-
)(runs_events_subscriber)
49+
)
50+
publisher = broker.publisher(settings.producer.malformed_topic)
51+
52+
# perform registration
53+
subscriber(runs_events_subscriber)
4854

4955
dependency_provider.override(AsyncSession, create_session_factory(settings.database))
56+
dependency_provider.override(AsyncAPIDefaultPublisher, lambda: publisher)
5057
return broker
5158

5259

data_rentgen/consumer/extractors/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from data_rentgen.consumer.extractors.batch import BatchExtractionResult, extract_batch
4+
from data_rentgen.consumer.extractors.batch_extraction_result import BatchExtractionResult
5+
from data_rentgen.consumer.extractors.batch_extractor import BatchExtractor
56
from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
67
from data_rentgen.consumer.extractors.dataset import (
78
connect_dataset_with_symlinks,
@@ -17,8 +18,8 @@
1718

1819
__all__ = [
1920
"BatchExtractionResult",
21+
"BatchExtractor",
2022
"connect_dataset_with_symlinks",
21-
"extract_batch",
2223
"extract_column_lineage",
2324
"extract_dataset",
2425
"extract_dataset_and_symlinks",

data_rentgen/consumer/extractors/batch.py renamed to data_rentgen/consumer/extractors/batch_extraction_result.py

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,6 @@
44

55
from typing import TypeVar
66

7-
from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
8-
from data_rentgen.consumer.extractors.input import extract_input
9-
from data_rentgen.consumer.extractors.operation import extract_operation
10-
from data_rentgen.consumer.extractors.output import extract_output
11-
from data_rentgen.consumer.extractors.run import extract_run
12-
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
13-
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
147
from data_rentgen.dto import (
158
ColumnLineageDTO,
169
DatasetDTO,
@@ -252,44 +245,3 @@ def schemas(self) -> list[SchemaDTO]:
252245

253246
def users(self) -> list[UserDTO]:
254247
return list(map(self.get_user, self._users))
255-
256-
257-
def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
258-
result = BatchExtractionResult()
259-
dataset_cache: dict[tuple[str, str], DatasetDTO] = {}
260-
261-
for event in events:
262-
if event.job.facets.jobType and event.job.facets.jobType.jobType == OpenLineageJobType.JOB:
263-
operation = extract_operation(event)
264-
result.add_operation(operation)
265-
266-
for input_dataset in event.inputs:
267-
input_dto, symlink_dtos = extract_input(operation, input_dataset)
268-
269-
result.add_input(input_dto)
270-
dataset_dto_cache_key = (input_dataset.namespace, input_dataset.name)
271-
dataset_cache[dataset_dto_cache_key] = result.get_dataset(input_dto.dataset.unique_key)
272-
273-
for symlink_dto in symlink_dtos:
274-
result.add_dataset_symlink(symlink_dto)
275-
276-
for output_dataset in event.outputs:
277-
output_dto, symlink_dtos = extract_output(operation, output_dataset)
278-
279-
result.add_output(output_dto)
280-
dataset_dto_cache_key = (output_dataset.namespace, output_dataset.name)
281-
dataset_cache[dataset_dto_cache_key] = result.get_dataset(output_dto.dataset.unique_key)
282-
283-
for symlink_dto in symlink_dtos:
284-
result.add_dataset_symlink(symlink_dto)
285-
286-
for dataset in event.inputs + event.outputs:
287-
column_lineage = extract_column_lineage(operation, dataset, dataset_cache)
288-
for item in column_lineage:
289-
result.add_column_lineage(item)
290-
291-
else:
292-
run = extract_run(event)
293-
result.add_run(run)
294-
295-
return result
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from __future__ import annotations
4+
5+
from data_rentgen.consumer.extractors.batch_extraction_result import BatchExtractionResult
6+
from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
7+
from data_rentgen.consumer.extractors.input import extract_input
8+
from data_rentgen.consumer.extractors.operation import extract_operation
9+
from data_rentgen.consumer.extractors.output import extract_output
10+
from data_rentgen.consumer.extractors.run import extract_run
11+
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
12+
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
13+
from data_rentgen.dto import (
14+
DatasetDTO,
15+
)
16+
17+
18+
class BatchExtractor:
    """Accumulate OpenLineage run events into one ``BatchExtractionResult``.

    The extractor is stateful: every call to :meth:`add_events` adds to the same
    ``result`` object and to the same ``dataset_cache``, which maps
    ``(namespace, name)`` of a raw dataset to the ``DatasetDTO`` stored in ``result``.
    """

    def __init__(self) -> None:
        self.dataset_cache: dict[tuple[str, str], DatasetDTO] = {}
        self.result = BatchExtractionResult()

    def add_events(self, events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
        """Extract each event into ``self.result`` and return that accumulated result.

        Events whose job type facet is present and equals ``OpenLineageJobType.JOB``
        are handled as operations; every other event is handled as a run.
        """
        for event in events:
            job_type_facet = event.job.facets.jobType
            is_operation = job_type_facet and job_type_facet.jobType == OpenLineageJobType.JOB
            handler = self.extract_operation if is_operation else self.extract_run
            handler(event)

        return self.result

    def extract_run(self, event: OpenLineageRunEvent) -> None:
        """Convert the event to a run DTO and add it to the result."""
        self.result.add_run(extract_run(event))

    def extract_operation(self, event: OpenLineageRunEvent) -> None:
        """Add the operation with its inputs, outputs, symlinks and column lineage to the result."""
        operation = extract_operation(event)
        self.result.add_operation(operation)

        for raw_input in event.inputs:
            input_dto, input_symlinks = extract_input(operation, raw_input)
            self.result.add_input(input_dto)
            self._remember_dataset(raw_input, input_dto, input_symlinks)

        for raw_output in event.outputs:
            output_dto, output_symlinks = extract_output(operation, raw_output)
            self.result.add_output(output_dto)
            self._remember_dataset(raw_output, output_dto, output_symlinks)

        # Column lineage is extracted only after all inputs and outputs of the event
        # were registered, because the extraction receives the dataset cache filled above.
        for raw_dataset in event.inputs + event.outputs:
            for lineage_item in extract_column_lineage(operation, raw_dataset, self.dataset_cache):
                self.result.add_column_lineage(lineage_item)

    def _remember_dataset(self, raw_dataset, interaction_dto, symlink_dtos) -> None:
        """Cache the dataset stored in the result for ``raw_dataset`` and add its symlinks."""
        cache_key = (raw_dataset.namespace, raw_dataset.name)
        self.dataset_cache[cache_key] = self.result.get_dataset(interaction_dto.dataset.unique_key)

        for symlink_dto in symlink_dtos:
            self.result.add_dataset_symlink(symlink_dto)

data_rentgen/consumer/settings/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from data_rentgen.consumer.settings.consumer import ConsumerSettings
88
from data_rentgen.consumer.settings.kafka import KafkaSettings
9+
from data_rentgen.consumer.settings.producer import ProducerSettings
910
from data_rentgen.db.settings import DatabaseSettings
1011
from data_rentgen.logging.settings import LoggingSettings
1112

@@ -50,5 +51,9 @@ class ConsumerApplicationSettings(BaseSettings):
5051
default_factory=ConsumerSettings,
5152
description=":ref:`Consumer settings <configuration-consumer-specific>`",
5253
)
54+
producer: ProducerSettings = Field(
55+
default_factory=ProducerSettings,
56+
description=":ref:`Producer settings <configuration-producer-specific>`",
57+
)
5358

5459
model_config = SettingsConfigDict(env_prefix="DATA_RENTGEN__", env_nested_delimiter="__", extra="forbid")

data_rentgen/consumer/settings/consumer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class ConsumerSettings(BaseModel):
1919
.. code-block:: bash
2020
2121
DATA_RENTGEN__CONSUMER__TOPICS_LIST=["input.runs"]
22+
DATA_RENTGEN__CONSUMER__MALFORMED_TOPIC="input.runs:malformed"
2223
DATA_RENTGEN__CONSUMER__GROUP_ID=data-rentgen
2324
DATA_RENTGEN__CONSUMER__FETCH_MAX_WAIT_MS=5000
2425
DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=5MiB
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from pydantic import BaseModel, Field
5+
6+
7+
class ProducerSettings(BaseModel):
    """Data.Rentgen producer-specific settings.

    Currently the only option is the name of the topic which receives
    messages the consumer failed to parse.

    Examples
    --------

    .. code-block:: bash

        DATA_RENTGEN__PRODUCER__MALFORMED_TOPIC="input.runs__malformed"
    """

    malformed_topic: str = Field(
        default="input.runs__malformed",
        description="Topic to publish malformed messages to.",
    )

data_rentgen/consumer/subscribers.py

Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@
33

44
from __future__ import annotations
55

6-
from faststream import Depends, Logger
6+
import asyncio
7+
from typing import cast
8+
9+
from aiokafka import ConsumerRecord
10+
from faststream import Depends, Logger, NoCast
11+
from faststream.kafka import KafkaMessage
12+
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
13+
from pydantic import TypeAdapter
714
from sqlalchemy.ext.asyncio import AsyncSession
815

9-
from data_rentgen.consumer.extractors import BatchExtractionResult, extract_batch
16+
from data_rentgen.consumer.extractors import BatchExtractionResult, BatchExtractor
1017
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
1118
from data_rentgen.dependencies import Stub
1219
from data_rentgen.services.uow import UnitOfWork
@@ -15,33 +22,75 @@
1522
"runs_events_subscriber",
1623
]
1724

18-
19-
def get_unit_of_work(session: AsyncSession = Depends(Stub(AsyncSession))) -> UnitOfWork:
20-
return UnitOfWork(session)
25+
OpenLineageRunEventAdapter = TypeAdapter(OpenLineageRunEvent)
2126

2227

2328
async def runs_events_subscriber(
24-
events: list[OpenLineageRunEvent],
29+
_events: NoCast[list[OpenLineageRunEvent]],
30+
batch: KafkaMessage,
2531
logger: Logger,
26-
unit_of_work: UnitOfWork = Depends(get_unit_of_work),
32+
publisher: AsyncAPIDefaultPublisher = Depends(Stub(AsyncAPIDefaultPublisher)),
33+
session: AsyncSession = Depends(Stub(AsyncSession)),
2734
):
28-
logger.info("Got %d events", len(events))
29-
extracted = extract_batch(events)
30-
logger.info("Extracted: %r", extracted)
35+
logger.info("Extracting events")
36+
parsed, malformed = await extract_events(batch, logger)
3137

3238
logger.info("Saving to database")
33-
await save_to_db(extracted, unit_of_work, logger)
39+
await save_to_db(parsed, session, logger)
3440
logger.info("Saved successfully")
3541

42+
if malformed:
43+
logger.warning("Malformed messages: %d", len(malformed))
44+
await report_malformed(batch, malformed, publisher)
45+
46+
47+
async def extract_events(
    raw_data: KafkaMessage,
    logger: Logger,
    await_every: int = 50,
) -> tuple[BatchExtractionResult, list[ConsumerRecord]]:
    """Parse every record of a consumed batch and extract entities from the valid ones.

    Each record value is validated as an ``OpenLineageRunEvent`` and passed to a
    ``BatchExtractor``. A record which has no value, or which fails with
    ``ValueError``/``TypeError`` during validation or extraction, is logged and
    collected separately instead of failing the whole batch.

    Parameters
    ----------
    raw_data
        Batch message; its ``raw_message`` is treated as a tuple of ``ConsumerRecord``.
    logger
        Logger used for the batch summary and for per-record parse failures.
    await_every
        Yield control to the event loop after every ``await_every`` records.
        ``0`` disables yielding.

    Returns
    -------
    tuple
        The accumulated extraction result and the list of records that could not be parsed.
    """
    # raw_message is not typed as a sequence of records: https://github.com/airtai/faststream/issues/2102
    messages = cast(tuple[ConsumerRecord, ...], raw_data.raw_message)
    total_bytes = sum(len(message.value or b"") for message in messages)
    logger.info("Got %d messages (%dKiB)", len(messages), total_bytes // 1024)

    extractor = BatchExtractor()
    malformed: list[ConsumerRecord] = []

    for i, message in enumerate(messages):
        try:
            if message.value is None:
                msg = "Message value cannot be empty"
                raise ValueError(msg)  # noqa: TRY301

            event = OpenLineageRunEventAdapter.validate_json(message.value)
            extractor.add_events([event])
        except (ValueError, TypeError):
            # Only the record coordinates are logged, without a traceback.
            logger.error(  # noqa: TRY400
                "Failed to parse message: ConsumerRecord(topic=%r, partition=%d, offset=%d)",
                message.topic,
                message.partition,
                message.offset,
            )
            malformed.append(message)

        if await_every and i >= await_every and i % await_every == 0:
            # OpenLineage models are heavy, parsing is a CPU-bound task which may take some time.
            # Blocking the event loop is not a good idea, so yield control periodically.
            await asyncio.sleep(0)

    return extractor.result, malformed
82+
3683

3784
async def save_to_db(
3885
data: BatchExtractionResult,
39-
unit_of_work: UnitOfWork,
86+
session: AsyncSession,
4087
logger: Logger,
4188
) -> None:
4289
# To avoid deadlocks when parallel consumer instances insert/update the same row,
4390
# commit changes for each row instead of committing the whole batch. Yes, this could be slow.
4491

92+
unit_of_work = UnitOfWork(session)
93+
4594
logger.debug("Creating locations")
4695
for location_dto in data.locations():
4796
async with unit_of_work:
@@ -108,3 +157,25 @@ async def save_to_db(
108157

109158
logger.debug("Creating column lineage")
110159
await unit_of_work.column_lineage.create_bulk(column_lineage)
160+
161+
162+
async def report_malformed(
    batch: KafkaMessage,
    messages: list[ConsumerRecord],
    publisher: AsyncAPIDefaultPublisher,
) -> None:
    """Publish every malformed record through ``publisher``, one publish call per record.

    The record value, key, partition and timestamp are forwarded unchanged.
    Header values are decoded to strings, and the batch ``message_id`` and
    ``correlation_id`` are attached as ``reply_to`` and ``correlation_id``.

    Parameters
    ----------
    batch
        Batch message the records were received in.
    messages
        Records which could not be parsed.
    publisher
        Publisher used to send the records.
    """
    for message in messages:
        # These records are already known to be malformed, so their headers may
        # contain invalid UTF-8 too. Undecodable bytes are replaced instead of raising,
        # otherwise one bad header would abort reporting of all remaining records.
        headers = {
            key: value.decode("utf-8", errors="replace")
            for key, value in message.headers or ()
        }

        await publisher.publish(
            message.value,
            key=message.key,
            partition=message.partition,
            timestamp_ms=message.timestamp,
            # An empty mapping is passed as None, same as a record without headers.
            headers=headers or None,
            reply_to=batch.message_id,
            correlation_id=batch.correlation_id,
        )

0 commit comments

Comments
 (0)