@@ -314,6 +314,44 @@ def test_consume_empty_does_not_create_span(self) -> None:
314314 span_list = self .memory_exporter .get_finished_spans ()
315315 self .assertEqual (len (span_list ), 0 )
316316
317+ def test_poll_empty_cleans_up_previous_span_and_token (self ) -> None :
318+ instrumentation = ConfluentKafkaInstrumentor ()
319+ consumer = MockConsumer (
320+ [MockedMessage ("topic-1" , 0 , 0 , [])],
321+ {
322+ "bootstrap.servers" : "localhost:29092" ,
323+ "group.id" : "mygroup" ,
324+ "auto.offset.reset" : "earliest" ,
325+ },
326+ )
327+ consumer = instrumentation .instrument_consumer (consumer )
328+ consumer .poll () # non-empty: sets _current_consume_span and _current_context_token
329+ self .assertIsNotNone (consumer ._current_consume_span )
330+ self .assertIsNotNone (consumer ._current_context_token )
331+
332+ consumer .poll () # empty: should clean up both
333+ self .assertIsNone (consumer ._current_consume_span )
334+ self .assertIsNone (consumer ._current_context_token )
335+
336+ def test_consume_empty_cleans_up_previous_span_and_token (self ) -> None :
337+ instrumentation = ConfluentKafkaInstrumentor ()
338+ consumer = MockConsumer (
339+ [MockedMessage ("topic-1" , 0 , 0 , [])],
340+ {
341+ "bootstrap.servers" : "localhost:29092" ,
342+ "group.id" : "mygroup" ,
343+ "auto.offset.reset" : "earliest" ,
344+ },
345+ )
346+ consumer = instrumentation .instrument_consumer (consumer )
347+ consumer .consume (1 ) # non-empty: sets _current_consume_span and _current_context_token
348+ self .assertIsNotNone (consumer ._current_consume_span )
349+ self .assertIsNotNone (consumer ._current_context_token )
350+
351+ consumer .consume (1 ) # empty: should clean up both
352+ self .assertIsNone (consumer ._current_consume_span )
353+ self .assertIsNone (consumer ._current_context_token )
354+
317355 def test_close (self ) -> None :
318356 instrumentation = ConfluentKafkaInstrumentor ()
319357 mocked_messages = [
0 commit comments