88import anyio
99from fast_depends import dependency_provider
1010from faststream import ContextRepo , FastStream
11- from faststream ._compat import ExceptionGroup
11+ from faststream ._internal . _compat import ExceptionGroup
1212from faststream .asgi import AsgiFastStream , AsgiResponse , get
1313from faststream .kafka import KafkaBroker
14- from faststream .kafka .publisher .asyncapi import AsyncAPIDefaultPublisher
15- from sqlalchemy .ext .asyncio import AsyncSession
14+ from faststream .specification .asyncapi import AsyncAPI
1615
1716import data_rentgen
1817from data_rentgen .consumer .settings import ConsumerApplicationSettings
19- from data_rentgen .consumer .subscribers import runs_events_subscriber
20- from data_rentgen .db .factory import create_session_factory
18+ from data_rentgen .consumer .subscribers import (
19+ get_default_publisher ,
20+ get_session ,
21+ runs_events_subscriber ,
22+ )
23+ from data_rentgen .db .factory import session_generator
2124from data_rentgen .logging .setup_logging import setup_logging
2225
2326logger = logging .getLogger (__name__ )
@@ -53,11 +56,14 @@ def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
5356 )
5457 publisher = broker .publisher (settings .producer .malformed_topic )
5558
59+ async def get_publisher ():
60+ return publisher
61+
5662 # perform registration
5763 subscriber (runs_events_subscriber )
5864
59- dependency_provider .override (AsyncSession , create_session_factory (settings .database ))
60- dependency_provider .override (AsyncAPIDefaultPublisher , lambda : publisher )
65+ dependency_provider .override (get_session , session_generator (settings .database ))
66+ dependency_provider .override (get_default_publisher , get_publisher )
6167 return broker
6268
6369
@@ -78,11 +84,13 @@ async def security_lifespan(context: ContextRepo):
7884 raise exception from None
7985
8086 return FastStream (
81- broker = broker_factory (settings ),
87+ broker_factory (settings ),
8288 lifespan = security_lifespan ,
83- title = "Data.Rentgen" ,
84- description = "Data.Rentgen is a nextgen DataLineage service" ,
85- version = data_rentgen .__version__ ,
89+ specification = AsyncAPI (
90+ title = "Data.Rentgen" ,
91+ description = "Data.Rentgen is a nextgen DataLineage service" ,
92+ version = data_rentgen .__version__ ,
93+ ),
8694 logger = logger ,
8795 ).as_asgi (asgi_routes = [("/monitoring/ping" , liveness )])
8896
0 commit comments