All notable changes to this project will be documented in this file.
This project adheres to Semantic Versioning.
Update ingestion to use compiled Rust binary under the hood for performance improvements.
Add plumbing to allow specifying bytes type data for ingestion.
- Catch PermissionError when removing temp files
- Add support for start and end times to rule evaluation
Adds support for uploading HDF5 files to Sift for ingestion through the addition of the Hdf5UploadService and Hdf5Config. See examples/data_import/hdf5 for an example of how to upload HDF5 files.
- Add HDF5 upload service
- Fixes bug when updating rules where the rule_id is not always passed to Sift
Support for attaching metadata to a run or asset has been introduced through the addition of the AssetService and updates to the attach_run method of the IngestionService. See examples/assets for an example of updating metadata for an asset.
- Make ingestion client key optional
- Add support for metadata definition through the python client
- Add public method for creating new run
The RuleEvaluationService has been introduced to provide functionality for evaluating rules and reports against runs or assets. This includes support for dry-run evaluations and previews, as well as evaluating rules defined in YAML configurations.
The RuleService has been updated to support External Rules. External Rules are ideal for automated workflows such as CI/CD pipelines, where external Rules are managed and evaluated programmatically.
Read more about External Rules here.
- Add support for rule evaluation service and external rules
- Allow creating multiple new flows at the same time
- Contextual channels support
- Updates for user defined functions and external rules
- Fix bugs within the IngestService
A new RosbagsUploadService has been added to sift_py.data_import.rosbags, which provides the functionality for uploading
rosbags, including video frames from a rosbag. See examples/data_import/rosbags for examples on how to use the new service.
A new CalculatedChannelService, in sift_py.calculated_channels.service, provides functionality for creating, updating,
searching, and listing Calculated Channels. It also provides a YAML interface for defining Calculated Channels using a
YAML file. Examples can be found in examples/calculated_channels.
Channel Component field is being deprecated and will be removed in version 1.0.0. Please see https://docs.siftstack.com/docs/glossary#component
for more information.
- Added automatic retry for REST API calls
- Deprecated component field for Channels
- Fix for from_annotation_type in RuleConfig
- Added support for Calculated Channels
- Remove previously deprecated overwrite_rules argument from ingestion service
- Fix ReportTemplate updating
- Remove breakpoint from internal function
- Moves npTDMS dependency to optional
Specify sift-stack-py[tdms] in your project dependencies if you need to use the TDMS upload service.
There is now a ReportTemplateService that is available from sift_py.report_templates.service. This service allows for creating,
updating, and retrieving report templates with a client key, where client keys are unique user defined strings associated with a given report
template. Report templates may be defined either via a Python config (using the ReportTemplateConfig class) or a YAML file, which
follows the same schema.
Please see examples/report_templates for some examples of how to manage report templates via python and YAML using the ReportTemplateService.
There is now a RuleService that is available from sift_py.rule.service. This service allows for creating, updating, and retrieving rules.
Rules created through this service should now include a rule client key, a unique user defined string associated with a given rule. As before,
rules may be defined in Python (via the RuleConfig class) or via YAML (following the RuleYamlSpec class).
Please see examples/ingestion_with_yaml_config for an example of how to create rules from YAML using the RuleService.
There is now a base class, BaseCh10File, and an upload service, Ch10UploadService, that allow for easily parsing and uploading IRIG Chapter 10 files.
Please see examples/data_import/ch10 for an example of how to use these classes.
- Enables keepalive by default and configures user-agent
- Ping Example added
- Adds optional openSSL certificate fetching
The overwrite_rules option of IngestionService is going to be deprecated in the next release, and will emit a warning if set.
- Updates most examples to use RuleService to create rules
- Adds ReportTemplateService and example usage
- Enables keepalive by default and configures user-agent
- Fix status bug with CSV uploads
- Support Windows mimetypes for CSV uploads
- Allow uploading compressed chapter 10 files
- Fix data download bug for channels with '.' delimited name.
- Improvements to CSV importing
- Ping Example added
- Adds optional openSSL certificate fetching
- Loosen various dependency requirements
Summary of changes:
- Loosen various dependency requirements
Summary of changes:
- add missing double channel computation
- update protos
- update protos
- csv example ingestion
- update example
- Add multiple CSV ingestion example
- Add metadata arg to use_sift_channel
Summary of changes:
- Add pydantic models for the sift grafana plugin queries
- Lowered minimum version requirements for various dependencies and dependency cleanup
- Added py.typed to modules that utilize type annotations
- Added services derived from protobuf for the following APIs
- Remote files
- Rule versions
- Saved searches
- Added helpful validations when processing Sift API URLs
- Added support for gRPC keep-alive
- Added service to upload and download file attachments
- Added timer-based flushing for buffered ingestion and custom error-handling
- Added functionality to generate a channel value from a channel config
The following section will cover some of the more notable features in depth. Please refer to the documentation for even more detail.
- Keepalive
- File attachments
- Timer based flushing for buffered ingestion
- Custom error handling for buffered ingestion
Long-lived connections are at risk of being closed by idle timeouts. In other words, if no data is exchanged on the connection for the duration specified by a load balancer's idle timeout, the connection will be closed. To prevent this, you can opt into the HTTP/2 PING-based keepalive mechanism either on your own gRPC channel or on the one that Sift provides. To configure keepalive using the Sift-provided gRPC channel:

    from sift_py.grpc.transport import use_sift_channel, SiftChannelConfig

    sift_channel_config = SiftChannelConfig(
        uri=uri,
        apikey=apikey,
        enable_keepalive=True,
    )

    with use_sift_channel(sift_channel_config) as channel:
        ...

This uses default values set in the sift_py.grpc.keepalive module. If you'd like to configure your own keepalive parameters you could
also do the following:

    from sift_py.grpc.transport import use_sift_channel, SiftChannelConfig
    from sift_py.grpc.keepalive import KeepaliveConfig

    sift_channel_config = SiftChannelConfig(
        uri=uri,
        apikey=apikey,
        enable_keepalive=KeepaliveConfig(
            keepalive_time_ms=keepalive_time_ms,
            keepalive_timeout_ms=keepalive_timeout_ms,
            keepalive_permit_without_calls=keepalive_permit_without_calls,
            max_pings_without_data=max_pings_without_data,
        ),
    )

    with use_sift_channel(sift_channel_config) as channel:
        ...

There is now a sift_py.file_attachment module that contains utilities to programmatically upload and download file attachments.
Files can currently be attached to various entities such as runs, annotations, and annotation logs. Various video and image formats
are currently supported. Below is an example demonstrating how to upload a file attachment and then download it programmatically.

    from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
    from sift_py.file_attachment.service import FileAttachmentService
    from sift_py.file_attachment.entity import Entity, EntityType
    from sift_py.file_attachment.metadata import VideoMetadata
    from sift_py.rest import SiftRestConfig
    from sift.remote_files.v1.remote_files_pb2 import GetRemoteFileRequest
    from sift.remote_files.v1.remote_files_pb2_grpc import RemoteFileServiceStub

    # Get API credentials setup
    ...

    with use_sift_channel(sift_channel_config) as channel:
        file_attachment_service = FileAttachmentService(channel, rest_config)

        run = Entity(
            entity_id=run_id,  # some arbitrary run ID that refers to an existing run
            entity_type=EntityType.RUN,
        )

        # uploading the file attachment and attaching it to a run of `run_id`
        remote_file = file_attachment_service.upload_attachment(
            path="path/to/foo.mp4",
            entity=run,
            # Metadata: optional but recommended for optimal viewing in the application
            metadata=VideoMetadata(height=2160, width=3840, duration_seconds=5.5),
            description="thrusters getting too hot",
        )

        # retrieving all of the file attachments for our run
        all_file_attachments = file_attachment_service.retrieve_attachments(run)

        # downloading our file_attachment and saving it to our current working dir
        file_attachment_service.download_attachment(remote_file)

        # downloading our file_attachment and saving it somewhere else with a different name
        file_attachment_service.download_attachment(remote_file, "somewhere/else/foo.mp4")

        # deleting our file attachment from Sift
        file_attachment_service.delete_file_attachments(remote_file_1, remote_file_2, remote_file_etc)

Previously, the buffered ingestion API flushed its buffer whenever any one of the following conditions was met:
- The caller manually calls flush.
- The buffer gets filled.
- The with-block associated with the buffered ingestion service as a context manager goes out of scope.
- An exception is raised inside of the aforementioned with-block.
Note that the last two points only apply if the buffered ingestion service is used as a context manager like so:

    with ingestion_service.buffered_ingestion() as buffered_ingestion:
        ...

Now there is support to periodically flush the buffer regardless of whether or not the buffer is filled. To configure a timer to flush periodically:

    with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
        ...

This will configure the buffered ingestion service to flush its buffer every 3.2 seconds. If the buffer happens to be filled before the timer elapses,
then the timer will reset.
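
The interaction between the size-based flush and the timer can be illustrated with a small, self-contained sketch. This is not the sift_py implementation: the TimedBuffer class, its sink callback, and its close method are hypothetical names chosen for illustration, and only the flush_interval_sec parameter name is borrowed from the text above. The sketch assumes a flush restarts the timer no matter what triggered it.

```python
import threading
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class TimedBuffer(Generic[T]):
    """Hypothetical buffer that flushes when full or when a timer elapses."""

    def __init__(
        self,
        capacity: int,
        sink: Callable[[List[T]], None],
        flush_interval_sec: Optional[float] = None,
    ) -> None:
        self._capacity = capacity
        self._sink = sink  # receives each flushed batch
        self._interval = flush_interval_sec
        self._items: List[T] = []
        self._lock = threading.RLock()
        self._timer: Optional[threading.Timer] = None
        self._restart_timer()

    def _restart_timer(self) -> None:
        # Cancel any pending timer, then start a fresh one if an interval is set.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self._interval is not None:
            self._timer = threading.Timer(self._interval, self.flush)
            self._timer.daemon = True
            self._timer.start()

    def add(self, item: T) -> None:
        with self._lock:
            self._items.append(item)
            if len(self._items) >= self._capacity:
                self.flush()  # size-based flush; this also resets the timer

    def flush(self) -> None:
        with self._lock:
            if self._items:
                batch, self._items = self._items, []
                self._sink(batch)
            self._restart_timer()

    def close(self) -> None:
        # Stop the timer and flush whatever is left.
        with self._lock:
            self._interval = None
            self.flush()
```

With a capacity of 3 and a short interval, three quick add calls produce one full batch immediately, while a single later add is delivered once the interval elapses.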
Previously, when using the buffered ingestion service as a context manager, any exception raised within the with-block would result
in the service attempting to flush one more time. The caller can now customize error-handling behavior by passing in a function handler that takes
in three arguments: the error that's raised, the buffer containing the remaining requests that weren't ingested, and a function that when called will
attempt to flush the buffer.
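
To make the three handler arguments concrete, here is a minimal sketch of how a context manager could dispatch to such a handler. It is an illustration only, not the sift_py implementation: the buffered function and its sink list are hypothetical stand-ins, and re-raising the error after the handler runs is an assumption of this sketch.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List, Optional

# Handler signature described above: (error, remaining buffer, flush function).
OnError = Callable[[Exception, List[str], Callable[[], None]], None]


@contextmanager
def buffered(sink: List[str], on_error: Optional[OnError] = None) -> Iterator[List[str]]:
    """Hypothetical stand-in for a buffered ingestion context manager."""
    buffer: List[str] = []

    def flush() -> None:
        # "Ingest" everything still buffered, then empty the buffer.
        sink.extend(buffer)
        buffer.clear()

    try:
        yield buffer
    except Exception as err:
        if on_error is not None:
            on_error(err, buffer, flush)  # the caller decides what happens next
        else:
            flush()  # default behavior: one last flush attempt
        raise  # assumption: the original error still propagates
    else:
        flush()  # normal exit: flush the remaining requests
```

A handler passed as on_error could, for example, copy the buffer contents somewhere safe before calling flush.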

    # Custom code to run when an error is raised
    def on_error_callback(err, buffer, flush):
        # Maybe try to save contents of buffer to disk
        ...
        # Try once more to flush the buffer
        flush()

    with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
        ...

Summary of changes:
- Extend support for Python 3.8
Summary of changes:
- Promote to v0.1.0 from release candidate state.
- Add module to download telemetry
In addition to the changes above, documentation is now also available online:
Summary of changes:
The following are some gRPC error codes that can happen due to external factors that Sift doesn't directly control:
- UNKNOWN
- UNAVAILABLE
- ABORTED
- DEADLINE_EXCEEDED
They are the source of common disruptions, particularly during ingestion, and so this mechanism will automatically retry failed RPCs over an existing connection or will establish a new one if necessary.
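
As a rough illustration of retrying on these codes, the sketch below wraps a call in a retry loop with exponential backoff. It does not show how sift_py implements its retries: RpcError, call_with_retry, and the backoff parameters are hypothetical names and values, and only the four status names come from the list above.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Status names taken from the list above.
RETRYABLE = {"UNKNOWN", "UNAVAILABLE", "ABORTED", "DEADLINE_EXCEEDED"}


class RpcError(Exception):
    """Hypothetical stand-in for a gRPC error carrying a status code name."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


def call_with_retry(
    rpc: Callable[[], T],
    max_attempts: int = 5,
    initial_backoff_sec: float = 0.05,
    multiplier: float = 2.0,
) -> T:
    """Call `rpc`, retrying on retryable codes with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    backoff = initial_backoff_sec
    for attempt in range(1, max_attempts + 1):
        try:
            return rpc()
        except RpcError as err:
            # Give up on non-retryable codes or once attempts are exhausted.
            if err.code not in RETRYABLE or attempt == max_attempts:
                raise
            time.sleep(backoff)
            backoff *= multiplier
    raise AssertionError("unreachable")
```

Errors outside the retryable set, such as an invalid argument, are raised immediately, since repeating the same request would not help.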
Summary of changes:
- Introduced automated ingestion request buffering to improve performance
- Added support for multi-config ingestion and creating new flows at run-time
- Added methods that combine request creation and ingestion into a single step.
- Updates to documentation.
For in-depth documentation please see the documentation section of the README for instructions on how to build the documentation locally.
- Combining Request Generation and Ingestion into a Single Step
- Request Buffering
- Creating New Flows on the Fly
- Multi-config Ingestion
Previously to ingest a single request you needed to do the following:

    request = ingestion_service.try_create_ingestion_request(
        flow_name="logs",
        timestamp=timestamp,
        channel_values=[
            {
                "channel_name": "log",
                "value": string_value("some log"),
            },
        ],
    )
    ingestion_service.ingest(request)

Now you can combine both steps using either of the following APIs:

    ingestion_service.try_ingest_flows({
        "flow_name": "log",
        "timestamp": timestamp,
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("some string")
            },
        ],
    })

You can also send multiple flows:

    # Send data for both logs and readings flows
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "logs",
                    "value": string_value("INFO: some message")
                },
            ],
        },
    )

Requests are now automatically buffered using the buffered ingestion API. Using this may significantly improve performance as it allows serialization and ingestion to occur in batches:

    # Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
    with ingestion_service.buffered_ingestion() as buffered_ingestion:
        buffered_ingestion.try_ingest_flows(*lots_of_flows)
        buffered_ingestion.try_ingest_flows(*lots_more_flows)

    # Custom buffer size of 750 requests
    with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
        buffered_ingestion.try_ingest_flows(*lots_of_flows)
        buffered_ingestion.try_ingest_flows(*lots_more_flows)

Once the with-block ends, the remaining requests will be automatically flushed and ingested, but flushing may also be done manually:

    with ingestion_service.buffered_ingestion() as buffered_ingestion:
        buffered_ingestion.try_ingest_flows(*lots_of_flows)
        buffered_ingestion.flush()

Contrast this with regular ingestion:

    ingestion_service.try_ingest_flows(*lots_of_flows)
    ingestion_service.try_ingest_flows(*lots_more_flows)

If there is a flow you need to create on the fly which wasn't declared in your initial telemetry config, you may use either of the following APIs:
- try_create_flow
- create_flow

    new_flow_config = FlowConfig(
        name="my_new_flow", channels=[ChannelConfig("new_channel", ChannelDataType.DOUBLE)]
    )
    ingestion_service.try_create_flow(new_flow_config)

There is now an ergonomic utility class, IngestionServicesManager, that allows users to manage telemetry for multiple configs:

    manager = IngestionServicesManager.from_telementry_configs(grpc_channel, {
        "config_a": config_a,
        "config_b": config_b,
    })

    with manager.ingestion_service("config_a") as config_a:
        config_a.try_ingest_flow(...)

    with manager.ingestion_service("config_b") as config_b:
        config_b.try_ingest_flow(...)