2323from metadata .ingestion .api .models import Either , Entity , StackTraceError
2424from metadata .ingestion .api .status import Status
2525from metadata .ingestion .ometa .ometa_api import OpenMetadata
26- from metadata .utils .logger import ingestion_logger
26+ from metadata .utils .logger import StatusWarningHandler , ingestion_logger
2727from metadata .utils .operation_metrics import OperationMetricsState
2828from metadata .utils .progress_tracker import ProgressTrackerState
2929
@@ -45,6 +45,24 @@ class Step(ABC, Closeable):
4545
4646 def __init__ (self ):
4747 self .status = Status ()
48+ self ._warning_handler = StatusWarningHandler (self .status )
49+
50+ def _activate_handler (self ) -> None :
51+ """Attach the warning handler to the ingestion logger.
52+
53+ Called at the start of each run() so that warnings emitted
54+ during step execution are captured in the step's Status.
55+ Must be paired with _deactivate_handler in a finally block.
56+ """
57+ ingestion_logger ().addHandler (self ._warning_handler )
58+
59+ def _deactivate_handler (self ) -> None :
60+ """Remove the warning handler from the ingestion logger.
61+
62+ Called in the finally block of run() to ensure the handler
63+ does not leak across step boundaries.
64+ """
65+ ingestion_logger ().removeHandler (self ._warning_handler )
4866
4967 @classmethod
5068 @abstractmethod
@@ -128,6 +146,7 @@ def run(self, record: Entity) -> Optional[Entity]:
128146 """
129147 Run the step and handle the status and exceptions
130148 """
149+ self ._activate_handler ()
131150 try :
132151 result : Either = self ._run (record )
133152 if result :
@@ -162,6 +181,8 @@ def run(self, record: Entity) -> Optional[Entity]:
162181 name = "Unhandled" , error = error , stackTrace = traceback .format_exc ()
163182 )
164183 )
184+ finally :
185+ self ._deactivate_handler ()
165186
166187 return None
167188
@@ -186,6 +207,7 @@ def run(self, record: Entity) -> None:
186207 """
187208 Run the step and handle the status and exceptions.
188209 """
210+ self ._activate_handler ()
189211 try :
190212 for result in self ._run (record ):
191213 if result .left is not None :
@@ -217,6 +239,8 @@ def run(self, record: Entity) -> None:
217239 name = "Unhandled" , error = error , stackTrace = traceback .format_exc ()
218240 )
219241 )
242+ finally :
243+ self ._deactivate_handler ()
220244
221245
222246class IterStep (Step , ABC ):
@@ -233,6 +257,7 @@ def run(self) -> Iterable[Optional[Entity]]:
233257 Note that we are overwriting the default run implementation
234258 in order to create a generator with `yield`.
235259 """
260+ self ._activate_handler ()
236261 try :
237262 for result in self ._iter ():
238263 if result .left is not None :
@@ -266,6 +291,8 @@ def run(self) -> Iterable[Optional[Entity]]:
266291 name = "Unhandled" , error = error , stackTrace = traceback .format_exc ()
267292 )
268293 )
294+ finally :
295+ self ._deactivate_handler ()
269296
270297
271298class BulkStep (Step , ABC ):
0 commit comments