@@ -254,35 +254,6 @@ def remove_parameter(self, parameter_name: str) -> bool:
254254 )
255255 return False
256256
257- def _add_flow_mutator (
258- self , d : "user_flow_decorator.FlowMutator"
259- ) -> "user_flow_decorator.FlowMutator" :
260- """Register a FlowMutator instance and prepare it for execution.
261-
262- Appends to both the merged list (for the current iteration) and the
263- underlying _self_data (for cache rebuild safety). Calls external_init()
264- so the mutator is ready before its pre_mutate is called by the ongoing
265- iteration loop.
266- """
267- from ..flowspec import FlowStateItems
268-
269- d .statically_defined = self ._statically_defined
270- d .inserted_by = self ._inserted_by
271- d ._flow_cls = self ._flow_cls
272-
273- # Append to the merged list (the one being iterated in the pre_mutate
274- # loop) so the new mutator's pre_mutate is called naturally.
275- merged_mutators = self ._flow_cls ._flow_state [FlowStateItems .FLOW_MUTATORS ]
276- merged_mutators .append (d )
277- # Also append to the underlying _self_data so a cache rebuild
278- # includes this mutator. We use _self_data directly (not the
279- # self_data property) to avoid clearing the merged cache.
280- self ._flow_cls ._flow_state ._self_data [FlowStateItems .FLOW_MUTATORS ].append (d )
281-
282- # external_init must be called before pre_mutate runs
283- d .external_init ()
284- return d
285-
286257 def add_decorator (
287258 self ,
288259 deco_type : Union [
@@ -444,6 +415,84 @@ def _do_add():
444415 else :
445416 raise ValueError ("Invalid duplicates value: %s" % duplicates )
446417
418+ def _add_flow_mutator (flow_mutator ):
419+ flow_mutator .statically_defined = self ._statically_defined
420+ flow_mutator .inserted_by = self ._inserted_by
421+ flow_mutator ._flow_cls = self ._flow_cls
422+
423+ def _do_add ():
424+ # Append to the merged list (the one being iterated in the
425+ # pre_mutate loop) so the new mutator's pre_mutate is called
426+ # naturally by the ongoing iteration.
427+ merged_mutators = self ._flow_cls ._flow_state [
428+ FlowStateItems .FLOW_MUTATORS
429+ ]
430+ merged_mutators .append (flow_mutator )
431+ # Also append to the underlying _self_data so a cache rebuild
432+ # includes this mutator. We use _self_data directly (not the
433+ # self_data property) to avoid clearing the merged cache.
434+ self ._flow_cls ._flow_state ._self_data [
435+ FlowStateItems .FLOW_MUTATORS
436+ ].append (flow_mutator )
437+ # external_init must be called before pre_mutate runs
438+ flow_mutator .external_init ()
439+ debug .userconf_exec (
440+ "Mutable flow adding flow mutator '%s'"
441+ % flow_mutator .decorator_name
442+ )
443+ return flow_mutator
444+
445+ # Check for existing mutator with the same decorator_name
446+ existing = [
447+ m
448+ for m in self ._flow_cls ._flow_state [FlowStateItems .FLOW_MUTATORS ]
449+ if hasattr (m , "decorator_name" )
450+ and m .decorator_name == flow_mutator .decorator_name
451+ ]
452+
453+ if not existing :
454+ return _do_add ()
455+ elif duplicates == MutableFlow .IGNORE :
456+ debug .userconf_exec (
457+ "Mutable flow ignoring flow mutator '%s' "
458+ "(already exists and duplicates are ignored)"
459+ % flow_mutator .decorator_name
460+ )
461+ return None
462+ elif duplicates == MutableFlow .OVERRIDE :
463+ debug .userconf_exec (
464+ "Mutable flow overriding flow mutator '%s' "
465+ "(removing existing mutator and adding new one)"
466+ % flow_mutator .decorator_name
467+ )
468+ # Remove from both the merged list and _self_data
469+ merged = self ._flow_cls ._flow_state [FlowStateItems .FLOW_MUTATORS ]
470+ for m in existing :
471+ if m in merged :
472+ merged .remove (m )
473+ self_list = self ._flow_cls ._flow_state ._self_data [
474+ FlowStateItems .FLOW_MUTATORS
475+ ]
476+ for m in existing :
477+ if m in self_list :
478+ self_list .remove (m )
479+ return _do_add ()
480+ elif duplicates == MutableFlow .ERROR :
481+ if self ._statically_defined :
482+ raise MetaflowException (
483+ "Duplicate FlowMutator '%s' on flow"
484+ % flow_mutator .decorator_name
485+ )
486+ else :
487+ debug .userconf_exec (
488+ "Mutable flow ignoring flow mutator '%s' "
489+ "(already exists and non statically defined)"
490+ % flow_mutator .decorator_name
491+ )
492+ return None
493+ else :
494+ raise ValueError ("Invalid duplicates value: %s" % duplicates )
495+
447496 # Check if this is a FlowMutator class or instance
448497 from .user_flow_decorator import FlowMutator
449498
@@ -458,12 +507,12 @@ def _do_add():
458507 if isinstance (flow_deco , FlowMutator ):
459508 # String resolved to a FlowMutator instance — route it through
460509 # the FlowMutator addition path
461- return self . _add_flow_mutator (flow_deco )
510+ return _add_flow_mutator (flow_deco )
462511 return _add_flow_decorator (flow_deco )
463512
464513 if isinstance (deco_type , type ) and issubclass (deco_type , FlowMutator ):
465514 d = deco_type (* deco_args , ** deco_kwargs )
466- return self . _add_flow_mutator (d )
515+ return _add_flow_mutator (d )
467516
468517 # Validate deco_type
469518 if (
0 commit comments