@@ -94,8 +94,8 @@ async def main(self):
9494 logger .debug ("Ingestion thread started" )
9595 self .sift_stream_builder .ingestion_config = self .ingestion_config
9696 sift_stream = await self .sift_stream_builder .build ()
97- time_since_last_metric = time .time () - 1
98- time_since_last_data = time .time ()
97+ time_of_last_metric = time .time ()
98+ time_of_last_data = time .time ()
9999 count = 0
100100 self .initialized = True
101101 try :
@@ -108,20 +108,20 @@ async def main(self):
108108 )
109109 await sift_stream .finish ()
110110 return
111- time_since_last_data = time .time ()
111+ time_of_last_metric = time .time ()
112112 item = self .data_queue .get ()
113113 sift_stream = await sift_stream .send_requests (item )
114114 count += 1
115- if time .time () - time_since_last_metric > self .metric_interval :
115+ time_since_last_metric = time .time () - time_of_last_metric
116+ if time_since_last_metric > self .metric_interval :
116117 logger .debug (
117118 f"Ingestion thread sent { count } requests, remaining: { self .data_queue .qsize ()} "
118119 )
119- time_since_last_metric = time .time ()
120+ time_of_last_metric = time .time ()
120121
121- if (
122- self ._stop_event .is_set ()
123- or time .time () - time_since_last_data > self .no_data_timeout
124- ):
122+ # Queue empty, check if we should stop.
123+ time_since_last_data = time .time () - time_of_last_data
124+ if self ._stop_event .is_set () or time_since_last_data > self .no_data_timeout :
125125 logger .debug (
126126 f"No more requests. Stopping. Sent { count } requests. { self .data_queue .qsize ()} requests remaining."
127127 )
0 commit comments