|
147 | 147 | "source": [ |
148 | 148 | "async def advanced_flowbuilder_example():\n", |
149 | 149 | " \"\"\"Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.\"\"\"\n", |
150 | | - " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy, ChannelIndexPy\n", |
| 150 | + " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", |
151 | 151 | "\n", |
152 | 152 | " connection_config = SiftConnectionConfig(\n", |
153 | 153 | " api_key=\"my_api_key\",\n", |
|
188 | 188 | " # Get the mapping from channel names to ChannelIndexPy\n", |
189 | 189 | " # This allows us to avoid hash lookups by using indices directly\n", |
190 | 190 | " channel_index_map = descriptor.mapping()\n", |
191 | | - " \n", |
| 191 | + "\n", |
192 | 192 | " # Pre-compute channel indices and value conversion methods\n", |
193 | 193 | " # This creates a list of (ChannelIndexPy, conversion_method) tuples\n", |
194 | 194 | " # that can be reused for each flow, avoiding hash operations\n", |
|
204 | 204 | " for i in range(10):\n", |
205 | 205 | " # Create a FlowBuilderPy from the descriptor\n", |
206 | 206 | " flow_builder = FlowBuilderPy(descriptor)\n", |
207 | | - " \n", |
| 207 | + "\n", |
208 | 208 | " # Attach the run ID directly to the flow builder\n", |
209 | 209 | " flow_builder.attach_run_id(run_id)\n", |
210 | | - " \n", |
| 210 | + "\n", |
211 | 211 | " # Set channel values using set() with pre-computed indices\n", |
212 | 212 | " # This avoids hash lookups and provides better performance\n", |
213 | 213 | " motor_temp_value = 50.0 + random.random() * 5.0\n", |
214 | 214 | " tank_pressure_value = 2000.0 + random.random() * 100.0\n", |
215 | | - " \n", |
| 215 | + "\n", |
216 | 216 | " # If the raw data class used provides in-order iteration over the raw data, you can also iterate\n", |
217 | 217 | " # over the values and encoding information directly. Since the value indices are used, the\n", |
218 | 218 | " # additional per-channel hash lookup is not needed, further improving performance.\n", |
|
227 | 227 | " values = [motor_temp_value, tank_pressure_value]\n", |
228 | 228 | " for (channel_index, conversion_method), value in zip(channel_indices_and_methods, values):\n", |
229 | 229 | " flow_builder.set(channel_index, conversion_method(value))\n", |
230 | | - " \n", |
| 230 | + "\n", |
231 | 231 | " # Build the request with current timestamp\n", |
232 | 232 | " request = flow_builder.request(TimeValuePy.now())\n", |
233 | | - " \n", |
| 233 | + "\n", |
234 | 234 | " # Send the request (non-blocking version)\n", |
235 | 235 | " ingest_client.send_requests_nonblocking([request])\n", |
236 | 236 | "\n", |
|
267 | 267 | "async def high_performance_batch_example():\n", |
268 | 268 | " \"\"\"Example showing high-performance batch sending with FlowBuilderPy using channel indices.\"\"\"\n", |
269 | 269 | " from datetime import timedelta\n", |
| 270 | + "\n", |
270 | 271 | " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", |
271 | 272 | "\n", |
272 | 273 | " connection_config = SiftConnectionConfig(\n", |
|
320 | 321 | "\n", |
321 | 322 | " start_time = datetime.now(timezone.utc)\n", |
322 | 323 | " requests = []\n", |
323 | | - " \n", |
| 324 | + "\n", |
324 | 325 | " for i in range(num_flows):\n", |
325 | 326 | " # Calculate timestamp for each sample (spaced 0.1 seconds apart)\n", |
326 | 327 | " timestamp_secs = int((start_time + timedelta(seconds=i / sample_rate_hz)).timestamp())\n", |
327 | 328 | " timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n", |
328 | | - " \n", |
| 329 | + "\n", |
329 | 330 | " # Create FlowBuilderPy and build request using pre-computed indices\n", |
330 | 331 | " flow_builder = FlowBuilderPy(descriptor)\n", |
331 | 332 | " flow_builder.attach_run_id(run_id)\n", |
332 | | - " \n", |
| 333 | + "\n", |
333 | 334 | " # Generate values\n", |
334 | 335 | " motor_temp_value = 50.0 + random.random() * 5.0\n", |
335 | 336 | " tank_pressure_value = 2000.0 + random.random() * 100.0\n", |
336 | | - " \n", |
| 337 | + "\n", |
337 | 338 | " # Use indices directly - no hash operations!\n", |
338 | 339 | " values = [motor_temp_value, tank_pressure_value]\n", |
339 | 340 | " for (channel_index, conversion_method), value in zip(channel_indices_and_methods, values):\n", |
|
377 | 378 | "metadata": {}, |
378 | 379 | "outputs": [], |
379 | 380 | "source": [ |
380 | | - "import asyncio\n", |
381 | | - "import random\n", |
382 | | - "import time\n", |
383 | 381 | "from dataclasses import dataclass\n", |
384 | | - "from datetime import datetime, timezone\n", |
385 | 382 | "\n", |
386 | 383 | "from sift_stream_bindings import FlowBuilderPy, FlowDescriptorPy, TimeValuePy\n", |
387 | 384 | "\n", |
|
428 | 425 | " # Queues for the pipeline\n", |
429 | 426 | " queue1: asyncio.Queue[RawDataMessage] = asyncio.Queue()\n", |
430 | 427 | " queue2: asyncio.Queue[tuple[RawDataMessage, FlowDescriptorPy]] = asyncio.Queue()\n", |
431 | | - " \n", |
| 428 | + "\n", |
432 | 429 | " # Cache for flow descriptors (flow_name -> FlowDescriptorPy)\n", |
433 | 430 | " descriptor_cache: dict[str, FlowDescriptorPy] = {}\n", |
434 | | - " \n", |
| 431 | + "\n", |
435 | 432 | " # Cache for flow configs (flow_name -> FlowConfig)\n", |
436 | 433 | " # In a real scenario, you'd derive this from your raw data schema\n", |
437 | 434 | " flow_config_cache: dict[str, FlowConfig] = {\n", |
|
461 | 458 | " for i in range(20):\n", |
462 | 459 | " # Simulate different flows arriving\n", |
463 | 460 | " flow_name = \"onboard_sensors\" if i % 2 == 0 else \"navigation\"\n", |
464 | | - " \n", |
| 461 | + "\n", |
465 | 462 | " if flow_name == \"onboard_sensors\":\n", |
466 | 463 | " raw_data = RawDataMessage(\n", |
467 | 464 | " flow_name=flow_name,\n", |
|
480 | 477 | " \"gps_lon\": -122.4194 + random.random() * 0.01,\n", |
481 | 478 | " },\n", |
482 | 479 | " )\n", |
483 | | - " \n", |
| 480 | + "\n", |
484 | 481 | " queue1.put_nowait(raw_data)\n", |
485 | 482 | " await asyncio.sleep(0.1)\n", |
486 | 483 | "\n", |
|
497 | 494 | " continue\n", |
498 | 495 | "\n", |
499 | 496 | " flow_name = raw_data.flow_name\n", |
500 | | - " \n", |
| 497 | + "\n", |
501 | 498 | " # Check if descriptor is cached\n", |
502 | 499 | " if flow_name not in descriptor_cache:\n", |
503 | | - " \n", |
504 | | - " # For this example, the flow configs are pre-defined above. \n", |
505 | | - " # \n", |
| 500 | + "\n", |
| 501 | + " # For this example, the flow configs are pre-defined above.\n", |
| 502 | + " #\n", |
506 | 503 | " # Though in practice, these would often be dynamically generated based on\n", |
507 | 504 | " # the raw data schema.\n", |
508 | 505 | " if flow_name not in flow_config_cache:\n", |
509 | 506 | " raise ValueError(f\"Flow config not found for {flow_name}\")\n", |
510 | | - " \n", |
| 507 | + "\n", |
511 | 508 | " flow_config = flow_config_cache[flow_name]\n", |
512 | | - " \n", |
| 509 | + "\n", |
513 | 510 | " # Convert to Rust FlowConfigPy format\n", |
514 | | - " from sift_stream_bindings import FlowConfigPy, ChannelConfigPy, ChannelDataTypePy\n", |
515 | | - " \n", |
| 511 | + " from sift_stream_bindings import (\n", |
| 512 | + " ChannelConfigPy,\n", |
| 513 | + " ChannelDataTypePy,\n", |
| 514 | + " FlowConfigPy,\n", |
| 515 | + " )\n", |
| 516 | + "\n", |
516 | 517 | " channel_configs_py = [\n", |
517 | 518 | " ChannelConfigPy(\n", |
518 | 519 | " name=ch.name,\n", |
|
524 | 525 | " )\n", |
525 | 526 | " for ch in flow_config.channels\n", |
526 | 527 | " ]\n", |
527 | | - " \n", |
| 528 | + "\n", |
528 | 529 | " flow_config_py = FlowConfigPy(\n", |
529 | 530 | " name=flow_config.name,\n", |
530 | 531 | " channels=channel_configs_py,\n", |
531 | 532 | " )\n", |
532 | | - " \n", |
| 533 | + "\n", |
533 | 534 | " # Register the new flow\n", |
534 | 535 | " await ingest_client.add_new_flows([flow_config_py])\n", |
535 | | - " \n", |
| 536 | + "\n", |
536 | 537 | " # Get the descriptor and cache it\n", |
537 | 538 | " descriptor = ingest_client.get_flow_descriptor(flow_name)\n", |
538 | 539 | " descriptor_cache[flow_name] = descriptor\n", |
539 | 540 | " print(f\"Registered new flow: {flow_name}\")\n", |
540 | | - " \n", |
| 541 | + "\n", |
541 | 542 | " # Push to Queue 2 with the descriptor\n", |
542 | 543 | " await queue2.put((raw_data, descriptor_cache[flow_name]))\n", |
543 | 544 | "\n", |
|
556 | 557 | " # Create FlowBuilderPy and set values\n", |
557 | 558 | " flow_builder = FlowBuilderPy(descriptor)\n", |
558 | 559 | " flow_builder.attach_run_id(run_id)\n", |
559 | | - " \n", |
| 560 | + "\n", |
560 | 561 | " # Set all channel values from raw data.\n", |
561 | 562 | " for channel_name, value in raw_data.channel_values.items():\n", |
562 | 563 | " flow_builder.set_with_key(channel_name, value)\n", |
563 | | - " \n", |
| 564 | + "\n", |
564 | 565 | " # Convert timestamp to TimeValuePy\n", |
565 | 566 | " timestamp_secs = int(raw_data.timestamp.timestamp())\n", |
566 | 567 | " timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n", |
567 | | - " \n", |
| 568 | + "\n", |
568 | 569 | " # Build request and send\n", |
569 | 570 | " request = flow_builder.request(timestamp)\n", |
570 | 571 | " await ingest_client.send_requests([request])\n", |
|
0 commit comments