33import random
44import threading
55import time
6- from datetime import datetime , timedelta
76from queue import Queue
87from typing import List , Optional
98
@@ -35,9 +34,9 @@ class IngestionThread(threading.Thread):
3534 Manages ingestion for a single ingestion config.
3635 """
3736
38- OUTER_LOOP_PERIOD = 0.1 # Time of intervals loop will sleep while waiting for data.
37+ IDLE_LOOP_PERIOD = 0.1 # Time of intervals loop will sleep while waiting for data.
3938 SIFT_STREAM_FINISH_TIMEOUT = 0.06 # Measured ~0.05s to finish stream.
40- CLEANUP_TIMEOUT = OUTER_LOOP_PERIOD + SIFT_STREAM_FINISH_TIMEOUT
39+ CLEANUP_TIMEOUT = IDLE_LOOP_PERIOD + SIFT_STREAM_FINISH_TIMEOUT
4140
4241 def __init__ (
4342 self ,
@@ -58,7 +57,7 @@ def __init__(
5857 self .data_queue = data_queue
5958 self ._stop = threading .Event ()
6059 self .sift_stream_builder = sift_stream_builder
61- self .metric_interval = timedelta ( seconds = metric_interval )
60+ self .metric_interval = metric_interval
6261
6362 def stop (self ):
6463 self ._stop .set ()
@@ -69,7 +68,7 @@ def stop(self):
6968 async def main (self ):
7069 logger .debug ("Ingestion thread started" )
7170 sift_stream = await self .sift_stream_builder .build ()
72- time_since_last_metric = datetime . now () - timedelta ( seconds = 1 )
71+ time_since_last_metric = time . time () - 1
7372 count = 0
7473 try :
7574 while True :
@@ -87,11 +86,11 @@ async def main(self):
8786 continue
8887 sift_stream = await sift_stream .send_requests (item )
8988 count += 1
90- if datetime . now () - time_since_last_metric > self .metric_interval :
89+ if time . time () - time_since_last_metric > self .metric_interval :
9190 logger .debug (
9291 f"Ingestion thread sent { count } requests, remaining: { self .data_queue .qsize ()} "
9392 )
94- time_since_last_metric = datetime . now ()
93+ time_since_last_metric = time . time ()
9594
9695 if self ._stop .is_set ():
9796 logger .debug (
@@ -100,7 +99,7 @@ async def main(self):
10099 await sift_stream .finish ()
101100 return
102101 else :
103- time .sleep (self .OUTER_LOOP_PERIOD )
102+ time .sleep (self .IDLE_LOOP_PERIOD )
104103
105104 except asyncio .CancelledError :
106105 # It's possible the thread was joined while sleeping waiting for data. Only note error if we have data left.
@@ -136,6 +135,7 @@ def get_builder(channel: SiftChannel, ingestion_config: TelemetryConfig) -> Sift
136135 if not uri or not apikey :
137136 raise ValueError (f"Channel config is missing uri or apikey: { channel .config } " )
138137
138+ # SiftStreamBuilder needs URI to start with http or https
139139 if not uri .startswith ("http" ):
140140 if "localhost" in uri :
141141 uri = f"http://{ uri } "
@@ -152,42 +152,37 @@ def get_builder(channel: SiftChannel, ingestion_config: TelemetryConfig) -> Sift
152152
153153
154154async def stream_requests_async (
155- data_queue : Queue , * requests : IngestWithConfigDataStreamRequest , run_id : str = ""
155+ data_queue : Queue , * requests : IngestWithConfigDataStreamRequest
156156):
157157 """
158158 Non-blocking: Convert requests for rust bindings and put them into a queue.
159159
160160 Args:
161161 data_queue: The queue to put IngestWithConfigDataStreamRequestPy requests into for ingestion.
162162 requests: List of IngestWithConfigDataStreamRequest protobuf objects
163- run_id: Optional run ID to associate with the requests
164163 """
165164
166165 # Put each request individually into the queue, filtering out None values
167166 processed_requests = []
168167 for request in requests :
169168 if not isinstance (request , IngestWithConfigDataStreamRequest ):
170- raise ValueError (f"Received unexpected request: { request } of type { type (request )} " )
171- processed_request = ingest_request_to_ingest_request_py (request , run_id )
172- if processed_request is not None :
173- processed_requests .append (processed_request )
169+ raise ValueError (f"Received unexpected request: { request } of type { type (request )} " )
170+ processed_requests .append (ingest_request_to_ingest_request_py (request ))
174171 data_queue .put (processed_requests )
175172
176173
177174def stream_requests (
178175 data_queue : Queue ,
179176 * requests : IngestWithConfigDataStreamRequest ,
180- run_id : str = "" ,
181177) -> None :
182178 """
183179 Blocking: Convert requests for rust bindings and put them into a queue.
184180
185181 Args:
186182 data_queue: The queue to put IngestWithConfigDataStreamRequestPy requests into for ingestion.
187183 requests: List of IngestWithConfigDataStreamRequest protobuf objects
188- run_id: Optional run ID to associate with the requests
189184 """
190- asyncio .run (stream_requests_async (data_queue , * requests , run_id = run_id ))
185+ asyncio .run (stream_requests_async (data_queue , * requests ))
191186
192187
193188def telemetry_config_to_ingestion_config_py (
@@ -327,7 +322,6 @@ def get_run_form(
327322
328323def ingest_request_to_ingest_request_py (
329324 request ,
330- run_id : str = "" ,
331325) -> IngestWithConfigDataStreamRequestPy :
332326 """
333327 Convert an IngestWithConfigDataStreamRequest to IngestWithConfigDataStreamRequestPy.
@@ -355,7 +349,7 @@ def ingest_request_to_ingest_request_py(
355349 flow = request .flow ,
356350 timestamp = timestamp_py ,
357351 channel_values = channel_values_py ,
358- run_id = run_id or "" ,
352+ run_id = request . run_id or "" ,
359353 end_stream_on_validation_error = request .end_stream_on_validation_error ,
360354 organization_id = request .organization_id ,
361355 )
@@ -388,7 +382,6 @@ def convert_channel_value_to_channel_value_py(channel_value) -> IngestWithConfig
388382 elif channel_value .HasField ("uint64" ):
389383 return IngestWithConfigDataChannelValuePy .uint64 (channel_value .uint64 )
390384 elif channel_value .HasField ("enum" ):
391- # For enum values, we need to create a ChannelEnumTypePy
392385 return IngestWithConfigDataChannelValuePy .enum_value (channel_value .enum )
393386 elif channel_value .HasField ("bit_field" ):
394387 return IngestWithConfigDataChannelValuePy .bitfield (channel_value .bit_field )
0 commit comments