@@ -105,13 +105,9 @@ def dr_cb(self, err, msg):
105105
106106 else :
107107 self .dr_cnt += 1
108- # perf: batch OTEL counter updates every 1000 messages to avoid
109- # per-msg SDK overhead (~20-50us/call). Grafana rate calcs are
110- # unaffected since window >>0.5s.
111- if self .dr_cnt % 1000 == 0 :
112- self .incr_counter ("producer.drok" , 1000 )
113- # perf: per-message gauge disabled to reach high throughput.
114- # self.set_gauge("producer.latency", msg.latency(), tags={"partition": "{}".format(msg.partition())})
108+ self .incr_counter ("producer.drok" , 1 )
109+ self .set_gauge ("producer.latency" , msg .latency (),
110+ tags = {"partition" : "{}" .format (msg .partition ())})
115111 if (self .dr_cnt % self .disprate ) == 0 :
116112 self .logger .debug (
117113 "producer: delivered message to {} [{}] at offset {} in {}s" .format (
@@ -132,11 +128,12 @@ def produce_record(self):
132128 self .producer .produce (
133129 self .topic ,
134130 value = record .serialize (),
135- # perf: "time" header dropped — it only fed the per-message
136- # e2e latency gauge, which is now disabled. str(time.time())
137- # was a syscall + float->str on every produced message.
138- headers = {"msgid" : str (record .msgid ), "txcnt" : str (txcnt )},
139- # on_delivery=self.dr_cb,
131+ headers = {
132+ "msgid" : str (record .msgid ),
133+ "txcnt" : str (txcnt ),
134+ "time" : str (time .time ()),
135+ },
136+ on_delivery = self .dr_cb ,
140137 )
141138 break
142139
@@ -145,9 +142,7 @@ def produce_record(self):
145142 continue
146143
147144 self .producer_msgid += 1
148- # perf: batch OTEL counter updates every 1000 messages.
149- if self .producer_msgid % 1000 == 0 :
150- self .incr_counter ("producer.send" , 1000 )
145+ self .incr_counter ("producer.send" , 1 )
151146
152147 def producer_status (self ):
153148 """Print producer status"""
@@ -276,20 +271,16 @@ def consumer_run(self):
276271 self .incr_counter ("consumer.msgerr" , 1 )
277272
278273 self .msg_cnt += 1
279- # perf: batch OTEL counter updates every 1000 messages.
280- if self .msg_cnt % 1000 == 0 :
281- self .incr_counter ("consumer.msg" , 1000 )
282-
283- # perf: per-message end-to-end latency gauge disabled to reach
284- # high throughput (rebuilt a dict + gauge per message).
285- # headers = dict(msg.headers())
286- # txtime = headers.get('time', None)
287- # if txtime is not None:
288- # latency = time.time() - float(txtime)
289- # self.set_gauge("consumer.e2e_latency", latency, tags={"partition": "{}".format(msg.partition())})
290- # else:
291- # latency = None
292- latency = None
274+ self .incr_counter ("consumer.msg" , 1 )
275+
276+ headers = dict (msg .headers ())
277+ txtime = headers .get ('time' , None )
278+ if txtime is not None :
279+ latency = time .time () - float (txtime )
280+ self .set_gauge ("consumer.e2e_latency" , latency ,
281+ tags = {"partition" : "{}" .format (msg .partition ())})
282+ else :
283+ latency = None
293284
294285 if (self .msg_cnt % self .disprate ) == 0 :
295286 # Show a sample message every #disprate messages
@@ -441,20 +432,16 @@ def share_run(self):
441432 self .incr_counter ("consumer.msgerr" , 1 )
442433
443434 self .share_msg_cnt += 1
444- # perf: batch OTEL counter updates every 1000 messages.
445- if self .share_msg_cnt % 1000 == 0 :
446- self .incr_counter ("consumer.msg" , 1000 )
447-
448- # perf: per-message end-to-end latency gauge disabled to reach
449- # high throughput (rebuilt a dict + gauge per message).
450- # headers = dict(msg.headers())
451- # txtime = headers.get('time', None)
452- # if txtime is not None:
453- # latency = time.time() - float(txtime)
454- # self.set_gauge(
455- # "consumer.e2e_latency", latency,
456- # tags={"partition": "{}".format(msg.partition())}
457- # )
435+ self .incr_counter ("consumer.msg" , 1 )
436+
437+ headers = dict (msg .headers ())
438+ txtime = headers .get ('time' , None )
439+ if txtime is not None :
440+ latency = time .time () - float (txtime )
441+ self .set_gauge (
442+ "consumer.e2e_latency" , latency ,
443+ tags = {"partition" : "{}" .format (msg .partition ())}
444+ )
458445
459446 if (self .share_msg_cnt % self .disprate ) == 0 :
460447 self .logger .info (
0 commit comments