File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -132,8 +132,10 @@ def register_producer(
132132 else :
133133 # Multi-output producer, recurse
134134 for sub_topic in topic :
135- self ._multi_output_topics [sub_topic ] = topic
135+ # if sub_topic is already registered as loader,
136+ # we can not handle it as _multi_output_topics
136137 if sub_topic not in registered :
138+ self ._multi_output_topics [sub_topic ] = topic
137139 self .register_producer (iterator , sub_topic )
138140 return
139141 assert isinstance (topic , str )
@@ -264,7 +266,10 @@ def _fetch_new(self, topic):
264266 # This is what our caller wants
265267 desired_sub_msg = sub_msg
266268 log .debug (f"Got submessage { sub_msg } for sub topic { sub_msg_topic } " )
267- self ._ack_msg_produced (sub_msg , sub_msg_topic )
269+ # sub_msg_topic not in self._multi_output_topics means
270+ # it already has a loader (as producer)
271+ if sub_msg_topic in self ._multi_output_topics :
272+ self ._ack_msg_produced (sub_msg , sub_msg_topic )
268273 return desired_sub_msg
269274
270275 def _ack_msg_produced (self , msg , topic ):
You can’t perform that action at this time.
0 commit comments