"""
!!! warning "Deprecation Warning"
    The `sift_py` module is deprecated as of **v0.10.0**. Please use `sift_client` if you aren't already.
    The `sift_py` module will be removed entirely in **v1.0.0**.

`sift_py` is a Python module built on top of Sift's protocol buffers to ergonomically interface with
Sift's gRPC API, especially with regard to data ingestion and rule evaluation. If you come across unfamiliar
words or concepts, see the official [Sift documentation](https://docs.siftstack.com/glossary).

* [Introduction](#introduction)
    - [Quickstart](#quickstart)
* [Telemetry Config](#telemetry-config)
    - [Telemetry Config from YAML](#telemetry-config-from-yaml)
    - [Telemetry Config YAML Schema](#telemetry-config-yaml-schema)
    - [Named Expression Modules](#named-expression-modules)
* [Updating a Telemetry Config](#updating-a-telemetry-config)
    - [Ingestion Client Key](#ingestion-client-key)
* [Ingestion Service](#ingestion-service)
    - [Sending data to Sift](#sending-data-to-sift)
* [Ingestion Performance](#ingestion-performance)
    - [Buffered Ingestion](#buffered-ingestion)
* [Downloading Telemetry](#downloading-telemetry)
* [File attachments](#file-attachments)
* [More Examples](#more-examples)

## Introduction
The two fundamental components of this module are the following:
- `sift_py.ingestion.config.telemetry.TelemetryConfig` (telemetry config)
- `sift_py.ingestion.service.IngestionService` (ingestion service)
The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components,
flows, and rules:
- `sift_py.ingestion.channel.ChannelConfig`
- `sift_py.ingestion.rule.config.RuleConfig`
- `sift_py.ingestion.flow.FlowConfig`
Once you have instantiated a telemetry config, you can instantiate `sift_py.ingestion.service.IngestionService`,
which is what actually sends data to Sift.
### Quickstart
The following example demonstrates how to create a simple telemetry config for an asset with a single channel
and a single rule, after which we'll send a single data point to Sift for that channel.
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelConfig,
    ChannelDataType,
    double_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
from sift_py.ingestion.rule.config import (
    RuleActionCreateDataReviewAnnotation,
    RuleConfig,
)

# Create a channel config
temperature_channel = ChannelConfig(
    name="temperature",
    component="thruster",
    data_type=ChannelDataType.DOUBLE,
    description="temperature of thruster",
    unit="Kelvin",
)

# Create a rule config referencing the above channel
overheating_rule = RuleConfig(
    name="overheating",
    description="Notify Ripley if thrusters get too hot",
    expression="$1 > 400",
    channel_references=[
        {
            "channel_reference": "$1",
            "channel_config": temperature_channel,
        },
    ],
    action=RuleActionCreateDataReviewAnnotation(
        assignee="ellen.ripley@weylandcorp.com",
        tags=["warning", "thruster"],
    ),
)

# Create the telemetry config using the rules and channels
# described above
telemetry_config = TelemetryConfig(
    asset_name="NostromoLV426",
    rules=[overheating_rule],
    flows=[
        FlowConfig(
            name="temperature_reading",
            channels=[temperature_channel],
        ),
    ],
)

# Create a gRPC transport channel configured specifically for the Sift API.
# SIFT_BASE_URI and SIFT_API_KEY are assumed to hold your Sift API URI and API key.
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
    # Create the ingestion service using the telemetry config we just created
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data to Sift for the 'temperature_reading' flow
    ingestion_service.try_ingest_flows({
        "flow_name": "temperature_reading",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "temperature",
                "component": "thruster",
                "value": double_value(327),
            },
        ],
    })
```
## Telemetry Config
There are currently two methods with which to initialize a telemetry config:
- `sift_py.ingestion.config.telemetry.TelemetryConfig.__init__`
- `sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml`
Both are equally valid, and the choice between them largely depends on your team's preferred
workflow. The following sections cover each initialization method.
### Telemetry Config from YAML
While the telemetry config can be declaratively initialized using its initializer, `sift_py` also exposes an API
to initialize a telemetry config from a YAML file. The following is a simple demonstration.
Say that we had the following project structure:
```
example
├─ telemetry_configs
│ └─ nostromo_lv_426.yml
├─ main.py
├─ telemetry_config.py
└─ requirements.txt
```
If our telemetry config is defined in the YAML file, `nostromo_lv_426.yml`, one of the ways in which
we might read that YAML file in as a `sift_py.ingestion.config.telemetry.TelemetryConfig` is to do the following:
```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(telemetry_config_path)
```
The contents of `nostromo_lv_426.yml` might look something like this:
```yaml
asset_name: NostromoLV426

channels:
  temperature_channel: &temperature_channel
    name: temperature
    component: thruster
    data_type: double
    description: temperature of the thruster
    unit: Kelvin

rules:
  - name: overheating
    description: Notify Ripley if thrusters get too hot
    expression: $1 > 400
    channel_references:
      - $1: *temperature_channel
    type: review
    assignee: ellen.ripley@weylandcorp.com
    tags:
      - warning
      - thruster

flows:
  - name: temperature_reading
    channels:
      - <<: *temperature_channel
```
And with the telemetry config that we just created, we can then proceed to create an ingestion service
and begin data ingestion.
### Telemetry Config YAML Schema
The following is the formal schema for a valid telemetry config in YAML. You can also see the `sift_py.ingestion.config.yaml.spec` module
for the schema in the form of Python classes.
```yaml
schema:
  description: |
    A formal specification to create a telemetry config which is used
    to stream data and evaluate rules using Sift's gRPC API.

  asset_name:
    type: string
    description: The name of the asset to telemeter.

  ingestion_client_key:
    type: string
    description: Optional user-defined string-key that uniquely identifies this telemetry config.

  organization_id:
    type: string
    description: Optional ID of user's organization. Required if user belongs to multiple organizations.

  channels:
    type: array
    description: Sensors that send the data.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the channel.
        description:
          type: string
          description: Description of the channel.
        unit:
          type: string
          description: Unit of measurement.
        component:
          type: string
          description: Name of the component that the channel belongs to.
        data_type:
          type: string
          enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"]
          description: Type of the data associated with the channel.
        enum_types:
          type: array
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the enum type.
              key:
                type: integer
                description: Key of the enum type.
          description: Required if `data_type` is `enum`.
        bit_field_elements:
          type: array
          description: Required if `data_type` is `bit_field`.
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the bit-field element.
              index:
                type: integer
                description: Index of the bit-field element.
              bit_count:
                type: integer
                description: Bit count of the bit-field element.

  rules:
    type: array
    description: Rules that, when they evaluate to true, will perform some sort of action.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the rule.
        description:
          type: string
          description: Description of the rule.
        expression:
          oneOf:
            - type: string
              description: A string expression defining the rule logic.
            - type: object
              description: A reference to a named expression.
              properties:
                name:
                  type: string
                  description: Name of the named expression.
        type:
          type: string
          enum: [phase, review]
          description: Determines the action to perform if a rule evaluates to true.
        assignee:
          type: string
          description: If 'type' is 'review', determines who to notify. Expects an email.
        tags:
          type: array
          items:
            type: string
          description: Tags to associate with the rule.
        channel_references:
          type: array
          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
          items:
            type: object
            description: |
              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
              it would look something like this:

              ------------------------------------
              channel_references:
                - $1: *vehicle_state_channel
                - $2: *voltage_channel
              ------------------------------------
        sub_expressions:
          type: array
          description: A list of sub-expressions which is a mapping of place-holders to sub-expressions.
          items:
            type: object
            description: |
              A sub-expression is made up of two components: a reference and the actual sub-expression. The sub-expression reference is
              a string with a "$" prepended to another string consisting of characters in the following character set: `[a-zA-Z0-9_]`.
              This reference should be mapped to the actual sub-expression. For example, say you have kinematic equations in `kinematics.yml`,
              and the equation you're interested in using looks like the following:

              ------------------------------------
              kinetic_energy_gt:
                0.5 * $mass * $1 * $1 > $threshold
              ------------------------------------

              To properly use `kinetic_energy_gt` in your rule, it would look like the following:

              ------------------------------------
              rules:
                - name: kinetic_energy
                  description: Tracks high energy output while in motion
                  type: review
                  assignee: bob@example.com
                  expression:
                    name: kinetic_energy_gt
                  channel_references:
                    - $1: *velocity_channel
                  sub_expressions:
                    - $mass: 10
                    - $threshold: 470
                  tags:
                    - nostromo
              ------------------------------------

  flows:
    type: array
    description: A list of named groups of channels that send data together.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the flow.
        channels:
          type: array
          items:
            type: object
            description: |
              List of channels included in the flow. Should be a YAML anchor from a previously declared channel
              in the top-level 'channels' property.
```
### Named Expression Modules
Oftentimes you may find yourself needing to reuse complex rule expressions across different telemetry
configs. If so, consider leveraging `named expressions`, which allow you to reference the name
of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.
As an example, say this is our current rule in our YAML telemetry config:
```yaml
rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel
```
Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression
over to its own named expression module YAML file, which we'll call `kinematics.yml`, and then reference it by name in the
telemetry configs:
`kinematics.yml`
```yaml
kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold
rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1
```
`nostromo_lv_426.yml`
```yaml
rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470
```
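The following plain-Python sketch shows how the `sub_expressions` mapping turns `kinetic_energy_gt` into the inline expression used earlier. It approximates the substitution with the standard library's `string.Template`. It does not reflect how `sift_py` resolves named expressions internally, and `apply_sub_expressions` is a hypothetical helper.

```python
from string import Template
from typing import Dict, Union


def apply_sub_expressions(expression: str, sub_expressions: Dict[str, Union[int, float, str]]) -> str:
    # Hypothetical helper. Keys may be written with or without the leading "$"
    # (e.g. "$mass" as in the YAML above, or just "mass").
    mapping = {name.lstrip("$"): str(value) for name, value in sub_expressions.items()}
    # safe_substitute leaves anything it cannot match untouched, so channel
    # references such as "$1" (which start with a digit) pass through as-is.
    return Template(expression).safe_substitute(mapping)


kinetic_energy_gt = "0.5 * $mass * $1 * $1 > $threshold"
print(apply_sub_expressions(kinetic_energy_gt, {"$mass": 10, "$threshold": 470}))
# 0.5 * 10 * $1 * $1 > 470
```

This sketch has one limitation. `Template` only recognizes placeholders that start with a letter or underscore. That is what lets `$1` survive, but it also means a sub-expression reference that starts with a digit would not be substituted.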
For the telemetry configs to load the named expression modules at run-time, all you need to do is provide the path
to each named expression module, wherever it may be. For example, given the following project structure:
```
example
├─ telemetry_configs
│ └─ nostromo_lv_426.yml
├─ main.py
├─ telemetry_config.py
└─ expression_modules
├─ string.yml
└─ kinematics.yml
```
Here is how we might load our telemetry config:
```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")

    # The second argument lists the named expression modules to load alongside the config
    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )
```
## Updating a Telemetry Config
The following section covers the situation where you would like to maintain your config using an `ingestion_client_key`. Note that
this is not required and only necessary if you are updating your telemetry config dynamically.
### Ingestion Client Key
A `sift_py.ingestion.config.telemetry.TelemetryConfig` contains a field called `ingestion_client_key`
which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config
you are free to make the following changes and Sift will be able to pick them up without changing the `ingestion_client_key`:
- Adding new channels
- Removing existing channels (you must also remove the channel from any flow that references it)
- Adding new flows
- Removing existing flows
- Adding new rules
- Updating existing rules
These can even be done on the fly at run-time.
The following changes, however, would require you to also update the `ingestion_client_key`, otherwise an exception will be raised
when a `sift_py.ingestion.service.IngestionService` is initialized.
- Updating an existing channel
- Adding a new channel to an existing flow
## Ingestion Service
As mentioned previously, whereas a telemetry config defines the schema of your telemetry,
`sift_py.ingestion.service.IngestionService` is what's actually responsible for sending your data to Sift.
The two methods most folks will use to send data to Sift are the following:
- `sift_py.ingestion.service.IngestionService.try_ingest_flows`
- `sift_py.ingestion.service.IngestionService.ingest_flows`
See each method's documentation for the differences between them. Both methods generate ingestion
requests, which are then sent to Sift using `sift_py.ingestion.service.IngestionService.ingest`.
The following are some examples illustrating generating data ingestion requests and sending them to Sift.
### Sending Data to Sift
Suppose we have the following telemetry config with four configured instances of `sift_py.ingestion.flow.FlowConfig`.
```python
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
)
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig

def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )
```
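For the `gpio` channel above, it can help to see how the element values combine into the bytes passed to `bit_field_value`. The `pack_bit_field` helper below is a hypothetical sketch built on two assumptions. First, `index` is the position of an element's lowest bit, counted from the least-significant bit. This agrees with the `00001001` value used in the examples below if that value is read as `12v = 1`, `charge = 0`, `led = 1`, `heater = 0`. Second, fields wider than one byte are little-endian. Check Sift's bit-field documentation before relying on either assumption.

```python
from typing import Dict, List, Tuple

# (name, index, bit_count) for each element, mirroring the gpio channel above
GPIO_ELEMENTS: List[Tuple[str, int, int]] = [
    ("12v", 0, 1),
    ("charge", 1, 2),
    ("led", 3, 4),
    ("heater", 7, 1),
]


def pack_bit_field(elements: List[Tuple[str, int, int]], values: Dict[str, int]) -> bytes:
    # Hypothetical helper. ASSUMPTION: `index` is the offset of the element's lowest bit,
    # counted from the least-significant bit, and multi-byte fields are little-endian.
    packed = 0
    total_bits = 0
    for name, index, bit_count in elements:
        value = values.get(name, 0)  # elements without a value default to 0
        if not 0 <= value < (1 << bit_count):
            raise ValueError(f"{name}={value} does not fit in {bit_count} bit(s)")
        packed |= value << index
        total_bits = max(total_bits, index + bit_count)
    # Round up to whole bytes
    return packed.to_bytes((total_bits + 7) // 8, byteorder="little")


gpio_bytes = pack_bit_field(GPIO_ELEMENTS, {"12v": 1, "led": 1})
print(gpio_bytes == bytes([int("00001001", 2)]))
# True
```

You would then pass the result to `bit_field_value(gpio_bytes)`. The range check exists because a value that is too wide for its element would silently spill into the neighboring element's bits.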
The following is an example of ingesting data for each flow using `sift_py.ingestion.service.IngestionService.try_ingest_flows`:
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService

# `nostromos_lv_426` is the telemetry config function defined above
telemetry_config = nostromos_lv_426()

# base_uri and apikey are assumed to hold your Sift API URI and API key
sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                # A single byte whose bits are 00001001
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send data for the logs flow (its only channel is named "log")
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message"),
            },
        ],
    })

    # Send data for both the readings and logs flows in a single call
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes([int("00001001", 2)])),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "log",
                    "value": string_value("INFO: some message"),
                },
            ],
        },
    )
```
Alternatively, you may also use `sift_py.ingestion.service.IngestionService.ingest_flows`, but be sure
to read that method's documentation to understand how to use it correctly. Unlike
`sift_py.ingestion.service.IngestionService.try_ingest_flows`, it does not perform any client-side validation. It also expects
each flow's `channel_values` to be listed in the same order as the channels declared in the flow's `FlowConfig`, with
`empty_value()` standing in for any channel that has no data. Skipping validation can help when performance is critical.
Note, however, that the client-side validations done in `sift_py.ingestion.service.IngestionService.try_ingest_flows`
are minimal and should not incur noticeable overhead.
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    bit_field_value,
    double_value,
    empty_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService

# `nostromos_lv_426` is the telemetry config function defined above
telemetry_config = nostromos_lv_426()

# base_uri and apikey are assumed to hold your Sift API URI and API key
sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow; values follow the flow's channel order:
    # velocity, voltage, vehicle_state, gpio
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send partial data for the readings flow, using empty_value() as a placeholder
    # for the voltage and vehicle_state channels
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send data for the logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both the readings and logs flows in a single call
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([int("00001001", 2)])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )
```
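In the example above, `ingest_flows` matches values to channels by position, not by name. A small helper that builds the ordered list from a name-to-value mapping can keep values from landing in the wrong slot. `order_channel_values` below is a hypothetical sketch and is not part of `sift_py`. It uses `None` as the placeholder so that it runs on its own. In practice you would pass the result of `empty_value()` as the placeholder and wrap each value with helpers such as `double_value`.

```python
from typing import Any, Dict, List, Sequence

# Hypothetical placeholder; with sift_py you would pass the result of `empty_value()` instead
EMPTY: Any = None


def order_channel_values(
    flow_channel_names: Sequence[str],
    values_by_name: Dict[str, Any],
    empty: Any = EMPTY,
) -> List[Any]:
    # Fail loudly on names that are not part of the flow instead of silently dropping them
    unknown = set(values_by_name) - set(flow_channel_names)
    if unknown:
        raise KeyError(f"not channels of this flow: {sorted(unknown)}")
    # One slot per channel, in the order the FlowConfig declares them
    return [values_by_name.get(name, empty) for name in flow_channel_names]


readings = ["velocity", "voltage", "vehicle_state", "gpio"]
print(order_channel_values(readings, {"gpio": "gpio-bits", "velocity": 10.0}))
# [10.0, None, None, 'gpio-bits']
```

This helper keys on channel name alone, so it assumes names are unique within a flow. If two channels in a flow share a name under different components, the key would need to include the component.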
## Ingestion Performance
Depending on your ingestion setup, there are some common Python gotchas related to gRPC that can
hurt performance. The following are some things you may want to avoid when ingesting data into Sift:
1. Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that
    all outgoing requests can be serialized in one fell swoop.

    ```python
    # Avoid this:
    for flow in flows:
        ingestion_service.try_ingest_flows(flow)

    # Do this:
    ingestion_service.try_ingest_flows(*flows)
    ```

2. Avoid sending too much data at once; otherwise you may encounter CPU-bound bottlenecks caused by
    serializing a large number of messages.

    ```python
    # Avoid this:
    ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
    ```

To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.
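You may prefer to manage batching by hand instead of using buffered ingestion. In that case, one way to follow both guidelines above is to split a large list of flows into bounded batches and make one call per batch. The `chunked` helper below is a hypothetical sketch that uses only the standard library, and the batch size is yours to tune.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    # Yield consecutive batches of at most `size` items; the last batch may be shorter
    if size <= 0:
        raise ValueError("size must be a positive integer")
    for start in range(0, len(items), size):
        yield list(items[start : start + size])


print(list(chunked(list(range(7)), 3)))
# [[0, 1, 2], [3, 4, 5], [6]]
```

Applied to the earlier example, this becomes `for batch in chunked(a_very_large_amount_of_flows, 500): ingestion_service.try_ingest_flows(*batch)`, where 500 is an arbitrary illustrative size. On Python 3.12+, `itertools.batched` provides the same grouping, but it yields tuples.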
### Buffered Ingestion
`sift_py` offers an API to automatically buffer requests and send them in batches when the
buffer threshold is met. This ensures the following:
- You are not serializing, streaming, serializing, streaming, and so on, one record at a time.
- You are not spending too much time serializing a large number of requests and, likewise,
  too much time streaming a high volume of messages.
This API is available via the following:
- `sift_py.ingestion.service.IngestionService.buffered_ingestion`
The buffered ingestion mechanism simply handles the buffering logic and streams the data only
after the buffer threshold is met. The following is an example of how it might be used:
```python
# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
```
Once the with-block ends, the remaining requests will be flushed from the buffer automatically,
but you may manually flush as well:
```python
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()
```
Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition
for further details.
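The following conceptual sketch illustrates the idea behind buffered ingestion. It is a size-triggered buffer that flushes when it fills up and once more when the with-block exits. `ToyBuffer` is hypothetical and deliberately simplified. It is not `sift_py`'s implementation, and its default size of 1000 is arbitrary rather than `DEFAULT_BUFFER_SIZE`.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class ToyBuffer(Generic[T]):
    # Conceptual stand-in for a buffered ingestion context; NOT sift_py's implementation
    def __init__(self, send: Callable[[List[T]], None], buffer_size: int = 1000):
        self._send = send
        self._buffer_size = buffer_size
        self._buffer: List[T] = []

    def add(self, *items: T) -> None:
        for item in items:
            self._buffer.append(item)
            # Send a full batch as soon as the threshold is reached
            if len(self._buffer) >= self._buffer_size:
                self.flush()

    def flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._send(batch)

    def __enter__(self) -> "ToyBuffer[T]":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Whatever remains is flushed when the with-block ends; exceptions still propagate
        self.flush()


sent: List[List[int]] = []
with ToyBuffer(sent.append, buffer_size=3) as buffer:
    buffer.add(1, 2, 3, 4)
print(sent)
# [[1, 2, 3], [4]]
```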
## Downloading Telemetry
To download your telemetry locally, use the `sift_py.data` module. The module-level documentation
contains more details, but here is an example script demonstrating how to download data for multiple channels, put it
into a `pandas` data frame, and write the results out to a CSV:
```python
import asyncio
import functools

import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.data.service import DataService
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        # One data frame per requested channel
        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        # Align the frames on "time", pairing each row with the most recent earlier sample
        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")


if __name__ == "__main__":
    asyncio.run(channel_demo())
```
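The `pd.merge_asof` step lines up channels whose timestamps don't match exactly. With its default `direction="backward"`, each row of the left frame is paired with the last row of the right frame whose `time` is less than or equal to its own, and both frames must be sorted on `time`. The stdlib sketch below mimics that lookup for a single timestamp. `asof_lookup` is a hypothetical helper for illustration only, and the sample times and values are made up.

```python
import bisect
from typing import Optional, Sequence


def asof_lookup(times: Sequence[float], values: Sequence[float], t: float) -> Optional[float]:
    # Return the latest value sampled at or before `t`, or None if there is none yet.
    # `times` must be sorted ascending, just as merge_asof requires a sorted key.
    i = bisect.bisect_right(times, t)
    return values[i - 1] if i > 0 else None


voltage_times = [0.0, 1.0, 2.0]
velocity_times = [0.5, 1.5]
velocity_values = [10.0, 12.0]

# For each voltage timestamp, find the velocity sample a backward as-of join would pair with it
print([asof_lookup(velocity_times, velocity_values, t) for t in voltage_times])
# [None, 10.0, 12.0]
```

A voltage row that comes before any velocity sample gets no match. This corresponds to the `NaN` that `merge_asof` leaves in that row.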
## File attachments
See the module-level documentation for `sift_py.file_attachment` to learn how to upload and download
file attachments for various entities such as runs, annotations, and annotation logs. Once file attachments
are created, they become viewable in the Sift application.
## More Examples
For more comprehensive examples demonstrating a little bit of everything, visit the
[examples directory](https://github.com/sift-stack/sift/tree/main/python/examples) in the project repo.
"""
import warnings
warnings.warn(
"The `sift_py` module is deprecated as of **v0.10.0**. Please use `sift_client` if you aren't already. "
"The `sift_py` module will be removed entirely in **v1.0.0**.",
category=FutureWarning,
stacklevel=2,
)