All notable changes to this project will be documented in this file.
This project adheres to Semantic Versioning.
- Resolve nested calculated channel references when fetching calculated channels so expressions that depend on other calculated channels evaluate correctly. (#580)
- Infer the parquet time column for integer timestamp columns in data imports so files with integer epoch time columns no longer fail detection. (#582)
- Replace the POSIX-only `fcntl.flock` call in the test-results log with a cross-platform `filelock.FileLock` so importing `sift_client` no longer fails on Windows. (#574)
TestMeasurementCreate and TestMeasurementUpdate now accept description, metadata, and channel_names to attach contextual information and link measurements to the channels they were derived from. channel_names accepts either channel name strings or Channel objects. Descriptions longer than 2000 characters are truncated with a warning to match the server-side limit.
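The truncation behaviour described above can be pictured with a minimal sketch. The helper name `truncate_description` and the warning text are assumptions made for illustration; only the 2000-character limit comes from this entry.

```python
import warnings

MAX_DESCRIPTION_LENGTH = 2000  # limit stated in the changelog entry above


def truncate_description(description: str, limit: int = MAX_DESCRIPTION_LENGTH) -> str:
    """Hypothetical helper: clip a description to `limit` characters and warn."""
    if len(description) <= limit:
        return description
    warnings.warn(
        f"description is {len(description)} characters; truncating to {limit}",
        stacklevel=2,
    )
    return description[:limit]
```

Descriptions at or under the limit pass through unchanged and emit no warning.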
TestStepCreate and TestStepUpdate now accept a metadata dict so test steps can carry structured key/value context the same way reports and measurements do. The ReportContext.new_step / create_step and NewStep.substep helpers in the test-results context manager forward a new metadata kwarg through to the created step.
- Pass the configured log file through to test result update calls so update events are written to the `.jsonl` log alongside create events. (#560)
- Update protos
- Fix log not being passed with update calls
- Add metadata, description, and associated channel support to measurements
- Add step metadata support
v0.15.0 updates the streaming ingestion client to match the new sift-stream-bindings 0.3.0
API. The RecoveryStrategyConfig class and recovery_strategy parameter have been replaced
with an explicit StreamingMode enum and discrete per-mode configuration kwargs. The
non-blocking send method has been renamed for consistency with the Rust library, and a new
try_send method is available for single-flow non-blocking sends. This release contains
breaking changes to the ingestion API — see below for details and a migration prompt.
RecoveryStrategyConfig and the recovery_strategy parameter on IngestionConfigStreamingClient
and IngestionAPIAsync have been removed. Transport mode is now selected via a StreamingMode
enum, with separate retry_policy and disk_backup_policy kwargs for per-mode configuration.
The default mode is StreamingMode.LIVE_WITH_BACKUPS, which matches the previous default
behavior of RecoveryStrategyConfig.retry_with_backups().
Removed from the public API:
- `RecoveryStrategyConfig` class (and its `retry_only()` / `retry_with_backups()` factory methods)
- `recovery_strategy` parameter on `IngestionConfigStreamingClient` and `IngestionAPIAsync`
Added:
- `StreamingMode` enum: `LIVE_ONLY`, `LIVE_WITH_BACKUPS`, `FILE_BACKUP`
- `streaming_mode` parameter (default: `StreamingMode.LIVE_WITH_BACKUPS`)
- `retry_policy` parameter: applies to `LIVE_WITH_BACKUPS` mode
- `disk_backup_policy` parameter: applies to `LIVE_WITH_BACKUPS` and `FILE_BACKUP` modes
- `checkpoint_interval_seconds` parameter: applies to `LIVE_WITH_BACKUPS` mode
Before:
from sift_client.resources.ingestion import (
IngestionConfigStreamingClient,
RecoveryStrategyConfig,
)
# Default: live streaming with backups
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
recovery_strategy=RecoveryStrategyConfig.retry_with_backups(),
)
# Retry only (no disk backups)
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
recovery_strategy=RecoveryStrategyConfig.retry_only(),
)
After:
from sift_client.resources.ingestion import (
IngestionConfigStreamingClient,
StreamingMode,
)
# Default: live streaming with backups (no change needed if you were using the default)
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
)
# Live only — no disk backups, lowest overhead
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.LIVE_ONLY,
)
# File backup only
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.FILE_BACKUP,
)
To pass a custom retry or disk backup policy:
from sift_stream_bindings import DiskBackupPolicyPy, RetryPolicyPy
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
retry_policy=RetryPolicyPy.default(),
disk_backup_policy=DiskBackupPolicyPy.default(),
checkpoint_interval_seconds=30,
)
IngestionConfigStreamingClient.send_requests_nonblocking has been renamed to try_send_requests
to align with the Rust sift-stream naming convention, where try_ methods return immediately
without awaiting channel capacity.
Before:
client.send_requests_nonblocking(requests)
After:
client.try_send_requests(requests)
A new try_send(flow) method is available on IngestionConfigStreamingClient for non-blocking
single-flow sends. It accepts either a Flow or a raw FlowPy object.
client.try_send(flow)
Use try_send in real-time loops where blocking on channel capacity is unacceptable. For
most use cases, the async send(flow) method (which applies backpressure) is preferred.
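The difference between a backpressured send and a non-blocking try_send can be illustrated with a bounded `asyncio.Queue`. This is a toy stand-in, not the sift_client implementation: the `BoundedSender` class is hypothetical, and how the real `try_send` reports a full channel is not covered by this changelog.

```python
import asyncio


class BoundedSender:
    """Toy stand-in for a streaming client; not the sift_client implementation."""

    def __init__(self, capacity: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)

    async def send(self, item: str) -> None:
        # Waits until the queue has room, which applies backpressure to the caller.
        await self._queue.put(item)

    def try_send(self, item: str) -> bool:
        # Returns immediately; reports failure instead of waiting for capacity.
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
            return False
        return True


async def demo() -> list:
    sender = BoundedSender(capacity=1)
    # The second call finds the queue full and returns False without waiting.
    return [sender.try_send("a"), sender.try_send("b")]


print(asyncio.run(demo()))  # [True, False]
```

In a real-time loop the caller decides what to do when the non-blocking call fails (drop, count, or retry later), whereas `await send(...)` simply pauses the producer until capacity frees up.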
The sift-stream-bindings dependency has been bumped to 0.3.0, which reflects the
sift-stream 0.9.0 breaking API changes (stepped builder, send rename, removed types).
Copy and paste the following prompt to an AI coding agent to automate the upgrade:
You are upgrading a Python project from sift_client v0.14.x to v0.15.0. The streaming ingestion
API has breaking changes. Apply ALL of the following changes precisely. Do not make any other
modifications.
---
## 1. Update `pyproject.toml`
Find every occurrence of `sift-stream-bindings==0.2.2` and replace it with
`sift-stream-bindings==0.3.0`. This may appear under multiple dependency groups (e.g. `all`,
`dev-all`, `sift-stream`, `sift-stream-bindings`).
---
## 2. Remove all imports of `RecoveryStrategyConfig`
Delete any line that imports `RecoveryStrategyConfig`, for example:
from sift_client.resources.ingestion import RecoveryStrategyConfig
from sift_client.resources.ingestion import IngestionConfigStreamingClient, RecoveryStrategyConfig
Remove only `RecoveryStrategyConfig` from those imports; keep any other names on the same line.
---
## 3. Add `StreamingMode` to imports where needed
Wherever `IngestionConfigStreamingClient` or `IngestionAPIAsync` is imported and a streaming
mode needs to be specified, add `StreamingMode` to the import:
from sift_client.resources.ingestion import IngestionConfigStreamingClient, StreamingMode
---
## 4. Replace all `recovery_strategy` call sites
Search for every call to `IngestionConfigStreamingClient.create(...)` and
`IngestionAPIAsync.create(...)` that contains a `recovery_strategy` keyword argument.
### Case A — `RecoveryStrategyConfig.retry_with_backups()` (or no recovery_strategy at all)
This was (and remains) the default. Replace the kwarg:
# BEFORE
recovery_strategy=RecoveryStrategyConfig.retry_with_backups()
# AFTER
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS
If the call had no `recovery_strategy` argument, no change is needed — `LIVE_WITH_BACKUPS`
is the default.
If the old call passed explicit `retry_policy` or `disk_backup_policy` arguments inside
`RecoveryStrategyConfig.retry_with_backups(...)`, move them to top-level kwargs:
# BEFORE
recovery_strategy=RecoveryStrategyConfig.retry_with_backups(
retry_policy=my_retry_policy,
disk_backup_policy=my_disk_policy,
)
# AFTER
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
retry_policy=my_retry_policy,
disk_backup_policy=my_disk_policy,
### Case B — `RecoveryStrategyConfig.retry_only()`
Replace with `streaming_mode=StreamingMode.LIVE_ONLY`. If a `retry_policy` was passed,
keep it as a top-level kwarg (it is ignored for `LIVE_ONLY` in this version, but preserving
it avoids a TypeError):
# BEFORE
recovery_strategy=RecoveryStrategyConfig.retry_only(retry_policy=my_policy)
# AFTER
streaming_mode=StreamingMode.LIVE_ONLY
### Case C — Raw `RecoveryStrategyPy` object passed directly
If any call passes a raw `RecoveryStrategyPy` instance as `recovery_strategy`, determine
which mode it was configured for and replace accordingly:
- `RecoveryStrategyPy.retry_only(...)` → `streaming_mode=StreamingMode.LIVE_ONLY`
- `RecoveryStrategyPy.retry_with_backups(...)` → `streaming_mode=StreamingMode.LIVE_WITH_BACKUPS`
with `retry_policy` and `disk_backup_policy` promoted to top-level kwargs.
---
## 5. Rename `send_requests_nonblocking` → `try_send_requests`
Find every call to `.send_requests_nonblocking(...)` on any ingestion client instance and
rename it to `.try_send_requests(...)`. The signature is unchanged.
# BEFORE
client.send_requests_nonblocking(requests)
# AFTER
client.try_send_requests(requests)
---
## 6. Verify
After applying the above changes:
1. Run `grep -r "RecoveryStrategyConfig" .` — expect zero results.
2. Run `grep -r "send_requests_nonblocking" .` — expect zero results.
3. Run `grep -r "recovery_strategy" .` — expect zero results.
4. Run your test suite to confirm no remaining references.
- Lazy-import `h5py` and `nptdms` so the base install doesn't require them. (#547)
- Expose `page_size` on resource list methods so callers can shrink pages when responses hit gRPC message-size limits. (#548)
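For readers unfamiliar with the lazy-import pattern mentioned above, here is a minimal sketch. The helper `load_optional` and the extra name are assumptions for illustration, and the standard-library `json` module stands in for an optional dependency such as `h5py` or `nptdms` so the example runs anywhere.

```python
import importlib


def load_optional(module_name: str, extra: str):
    """Import `module_name` on first use, with an install hint if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} is required for this feature; install the '{extra}' extra"
        ) from exc


def read_payload(text: str):
    # The import happens here, at call time, not when this file is imported.
    json = load_optional("json", extra="example")  # stdlib stand-in for h5py / nptdms
    return json.loads(text)
```

Because the import is deferred to the function body, importing the package itself never touches the optional dependency.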
The sift_client module now exposes a data import API supporting CSV, Parquet, TDMS, and HDF5. With this addition, all features previously available only in sift_py are now available in sift_client, which remains the recommended interface for new development. sift_py (deprecated since v0.10.0) continues to work and ship in this release.
Migrating from sift_py: the per-format upload services (CsvUploadService, ParquetUploadService, Hdf5UploadService, TdmsUploadService) collapse into a single client.data_import.import_from_path method. sift_py auto-detected the import config only for CSV (via simple_upload); other formats required more setup. sift_client unifies all four formats, with auto-detection built into import_from_path itself: the config argument is optional, so the common call takes just a file path and a target asset. Call client.data_import.detect_config(...) first if you want to inspect or patch the config before importing. import_from_path returns a Job that you can optionally wait on.
# sift_py (deprecated)
from sift_py.data_import.csv import CsvUploadService
from sift_py.rest import SiftRestConfig
rest_conf: SiftRestConfig = {"uri": sift_uri, "apikey": apikey}
csv_service = CsvUploadService(rest_conf)
import_service = csv_service.simple_upload(asset_name="my_asset", path="data.csv")
import_service.wait_until_complete()
# sift_client
from sift_client import SiftClient
client = SiftClient(api_key=apikey, grpc_url=grpc_url, rest_url=rest_url)
job = client.data_import.import_from_path("data.csv", asset="my_asset")
job.wait_until_complete()
Format-by-format support:
- CSV: auto-detected from `.csv`. Supports an optional JSON metadata row (row 1 or row 2) for specifying channel names, units, data types, and the time column format.
- Parquet: requires an explicit `data_type` (`PARQUET_FLATDATASET` or `PARQUET_SINGLE_CHANNEL_PER_ROW`) since `.parquet` alone doesn't disambiguate the layout. Detection only reads the file footer, so it stays fast on large files.
- HDF5: new in this release. Auto-detected from `.h5`/`.hdf5`. Detection works out-of-the-box for files with a compound-dataset layout (first field = time) or a shared root-level `time` dataset; other layouts may need a hand-built `config`.
- TDMS: new in this release. Auto-detected from `.tdms` and recognizes the common timing conventions (group-level `xchannel`, first-channel `TimeStamp`, or per-channel waveform properties). `TdmsImportConfig` controls handling of untimed channels (`fallback_method`), complex values (`complex_component`), scaled vs. raw data, and waveform start-time overrides.
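The extension-based part of the detection described in the list above could look roughly like the following. This is a sketch under stated assumptions: `guess_format` and the string labels are hypothetical, not the library's real function or enum values, and the real detection also inspects file contents.

```python
from pathlib import Path
from typing import Optional

# Extensions named in the list above; the format labels are illustrative strings,
# not the library's real enum values.
EXTENSION_TO_FORMAT = {
    ".csv": "csv",
    ".h5": "hdf5",
    ".hdf5": "hdf5",
    ".tdms": "tdms",
}


def guess_format(path: str) -> Optional[str]:
    """Return a format label from the file extension, or None when it is ambiguous."""
    suffix = Path(path).suffix.lower()
    # .parquet is deliberately absent: its layout must be given explicitly.
    return EXTENSION_TO_FORMAT.get(suffix)
```

A `None` result models the Parquet case, where the caller has to supply the data type rather than rely on the extension.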
client.data_export.export(...) now accepts ExportOutputFormat.PARQUET alongside the existing CSV and Sun/WinPlot options. Unlike the sift_py DataService + DataFrame.to_parquet() pattern (async-only, buffers everything in memory, name-strings only), the new export API runs as a server-side job, works sync or async, accepts Asset/Channel objects or IDs, and scales to large exports.
# sift_py (deprecated): no dedicated export API, so query in-memory and write yourself
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.data.service import DataService
from sift_py.grpc.transport import use_sift_async_channel
async with use_sift_async_channel({"uri": sift_uri, "apikey": apikey}) as channel:
    result = await DataService(channel).execute(DataQuery(
        asset_name="my_asset",
        start_time=start,
        end_time=stop,
        channels=[ChannelQuery(channel_name="my_channel")],
    ))
    pd.DataFrame(result.all_channels()[0].columns()).to_parquet("out.parquet")
# sift_client
from sift_client import SiftClient
from sift_client.sift_types.export import ExportOutputFormat
client = SiftClient(api_key=apikey, grpc_url=grpc_url, rest_url=rest_url)
job = client.data_export.export(
assets=["my_asset"], # accepts Asset objects or IDs
channels=["my_channel"], # accepts Channel objects or IDs
start_time=start,
stop_time=stop,
output_format=ExportOutputFormat.PARQUET,
)
files = job.wait_and_download(output_dir="./exports")
`wait_and_download` polls the job to completion, downloads the resulting archive, and returns the list of downloaded file paths. Both arguments shown are optional: if `output_dir` is omitted, a temporary directory is created and used. By default the downloaded zip is extracted (and the archive deleted) so you get the Parquet files directly; pass `extract=False` to keep the zip instead, which is useful if you want to hand the whole bundle off to another system without unpacking it client-side.
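The extract-then-delete behaviour described above can be sketched with the standard library. `unpack_download` is a hypothetical helper written for illustration; it only mirrors the described defaults and is not the library's code.

```python
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional


def unpack_download(
    archive: Path, output_dir: Optional[Path] = None, extract: bool = True
) -> List[Path]:
    """Hypothetical helper mirroring the behaviour described above."""
    # Fall back to a fresh temporary directory when no output_dir is given.
    target = Path(output_dir) if output_dir else Path(tempfile.mkdtemp())
    target.mkdir(parents=True, exist_ok=True)
    if not extract:
        return [archive]  # keep the zip as-is
    with zipfile.ZipFile(archive) as zf:
        names = [n for n in zf.namelist() if not n.endswith("/")]
        zf.extractall(target)
    archive.unlink()  # the archive is removed once its contents are extracted
    return [target / name for name in names]
```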
Test result create and update events can now be optionally written to a .jsonl log file during a test run, then replayed against the Sift API later via the new import-test-result-log CLI command (installed with the package).
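As a rough picture of the log-then-replay idea, the sketch below appends events to a `.jsonl` file and dispatches them again in order. The record layout (`kind`, `payload`) and both function names are invented for illustration; the actual log schema and CLI internals are not described in this changelog.

```python
import json
from pathlib import Path
from typing import Callable, Dict


def log_event(log_path: Path, kind: str, payload: Dict) -> None:
    """Append one event as a single JSON line."""
    with log_path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps({"kind": kind, "payload": payload}) + "\n")


def replay(log_path: Path, handlers: Dict[str, Callable[[Dict], None]]) -> int:
    """Read events back in order and dispatch each one to its handler."""
    count = 0
    with log_path.open(encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                continue  # tolerate blank lines
            event = json.loads(line)
            handlers[event["kind"]](event["payload"])
            count += 1
    return count
```

One JSON object per line keeps the log append-only, so a test run that stops partway through still leaves the earlier lines replayable.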
Adds progress indicators for job polling and file downloads for better visibility during long-running operations.
Previously the package shipped its own bundled copies of the generated Python bindings for google.api, protoc_gen_openapiv2, and buf.validate. With this release:
- `google.api` and `protoc_gen_openapiv2` are now pulled in via the `googleapis-common-protos` and `protoc-gen-openapiv2` runtime dependencies, so pip installs the upstream-maintained versions.
- `buf.validate` has been removed entirely. The protovalidate field annotations were stripped from the affected `.proto` files, so the generated `_pb2.py` files no longer reference `buf.validate`.
- Add `py.typed` to the generated proto directory so type checkers pick up protobuf types correctly.
- Add data import API to sift_client
- Add HDF5 client-side detect_config
- Add TDMS client-side detect_config
- Add optional logging of test result create and update events
- Add progress indicators for job polling and file downloads
- Refactor files using run_in_executor
- Add Parquet as an export output format
- Add py.typed file to proto dir
- Remove vendored google.api and protoc_gen_openapiv2 modules from buf
- Remove protovalidate from client-side protos
The sift_client module now exposes a public export interface and wrapper, making it easier to integrate with downstream tools and workflows.
The grpc, rest, and time modules have been migrated from sift_py to sift_client. All imports from sift_py for these modules have been removed. This continues the transition toward sift_client as the primary interface ahead of the v1.0.0 release.
File attachment uploads now support arbitrary file types (e.g., Parquet, pcapng) by falling back to application/octet-stream for unknown MIME types.
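A minimal sketch of this kind of fallback, using the standard-library `mimetypes` module. The function name `content_type_for` is an assumption; the library's own lookup may differ.

```python
import mimetypes


def content_type_for(path: str) -> str:
    """Guess a MIME type from the file name, defaulting to a generic binary type."""
    guessed, _encoding = mimetypes.guess_type(path)
    return guessed or "application/octet-stream"
```

Any extension the local MIME database does not recognize resolves to `application/octet-stream` instead of failing the upload.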
SiftChannelConfig now handles URLs more robustly, fixing gRPC connection errors caused by missing URL schemes.
The streaming ingestion module has been refactored to use sift_stream_bindings types for improved performance.
- Add export API for SiftClient
- Port grpc, rest, and time modules into sift_client
- Fall back to application/octet-stream for unknown MIME types
- Parquet file attachment support
- Fix gRPC connection errors with missing scheme
- Migrate to sift_stream_bindings types for improved ingestion performance
- Add Python streaming ingestion tutorial example
- Update pandas and pandas-stubs version bounds
- Pin doc build versions
SiftClient can now archive and unarchive channels, allowing users to clean up old channel definitions that are no longer needed.
SiftClient now provides better support for RuleVersions and better ergonomics for Reports:
- Additional methods have been added to the SiftClient Rules resource to allow easy access to RuleVersions
- Reports can now be generated from RuleVersions
- The Jobs and Reports resources now have a `wait_until_complete` sync/async method, which provides a convenient way to wait until a job or report is complete, with configurable polling and timeout settings.
- When creating reports, SiftClient now returns a Job object, to signify that the report is still in progress. Waiting on the job with `wait_until_complete` will return the completed report once finished. The report_id can also still be accessed directly from the job object if needed.
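The polling-with-timeout behaviour behind a method like `wait_until_complete` can be sketched as follows. This is a generic illustration: the free function and its parameter names (`polling_interval_secs`, `timeout_secs`) are assumptions, not the SiftClient signature.

```python
import time
from typing import Callable, Optional


def wait_until_complete(
    is_done: Callable[[], bool],
    polling_interval_secs: float = 1.0,
    timeout_secs: Optional[float] = None,
) -> None:
    """Poll `is_done` until it returns True or the timeout elapses."""
    deadline = None if timeout_secs is None else time.monotonic() + timeout_secs
    while not is_done():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("job did not complete before the timeout")
        time.sleep(polling_interval_secs)
```

Using a monotonic clock for the deadline keeps the timeout correct even if the wall clock is adjusted while waiting.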
- Add more support for rule versions to SiftClient
- Add better report creation and wait_until_complete to SiftClient
- Add pagination for list ingestion config/flows
- Increase page size, fix list ingestion config API
- Regenerate remote file upload proto to include TestSteps
The sift_py module is deprecated as of v0.10.0 and will be removed in v1.0.0.
Please use sift_client for all new development. Several minor releases will follow
before the major release to add features and give users time to migrate.
- Stabilizes the sift_client module.
- Add FileAttachmentsMixin to TestStep
- Add support for live rule field in sift-client
- Adds batch rule update/create support to sift_client
- Sift client jobs resource
- Set Content-Disposition on data imports
- Show validation errors in External Rules
- Use faster yaml loading for telemetry configs
Adds support for uploading Parquet files to Sift for ingestion through the addition of the ParquetUploadService. See examples/data_import/parquet for an example of how to upload Parquet files.
Adds a flag to automatically import TDMS file metadata into Sift Runs.
Adds the ability to create flows lazily which reduces initialization time of the IngestService.
- Lazy flow creation
- Adds support for bytes
- Add ParquetUploadService
- Import TDMS file metadata to Sift Run
- Catch PermissionError when removing temp files
- Add support for start and end times to rule evaluation
Adds support for uploading HDF5 files to Sift for ingestion through the addition of the Hdf5UploadService and Hdf5Config. See examples/data_import/hdf5 for an example of how to upload HDF5 files.
- Add HDF5 upload service
- Fixes bug when updating rules where the rule_id is not always passed to Sift
Support for attaching metadata to a run or asset has been introduced through the addition of the AssetService and updates to the attach_run method of the IngestionService. See examples/assets for an example of updating metadata for an asset.
- Make ingestion client key optional
- Add support for metadata definition through the python client
- Add public method for creating new run
The RuleEvaluationService has been introduced to provide functionality for evaluating rules and reports against runs or assets. This includes support for dry-run evaluations and previews, as well as evaluating rules defined in YAML configurations.
The RuleService has been updated to support External Rules. External Rules are ideal for automated workflows such as CI/CD pipelines, where external Rules are managed and evaluated programmatically.
Read more about External Rules here.
- Add support for rule evaluation service and external rules
- Allow creating multiple new flows at the same time
- Contextual channels support
- Updates for user defined functions and external rules
- Fix bugs within the IngestService
A new RosbagsUploadService has been added to sift_py.data_import.rosbags, which provides the functionality for uploading
rosbags, including video frames from a rosbag. See examples/data_import/rosbags for examples on how to use the new service.
A new CalculatedChannelService, in sift_py.calculated_channels.service, provides functionality for creating, updating,
searching, and listing Calculated Channels. It also provides a YAML interface for defining Calculated Channels using a
YAML file. Examples can be found in examples/calculated_channels.
Channel Component field is being deprecated and will be removed in version 1.0.0. Please see https://docs.siftstack.com/docs/glossary#component
for more information.
- Added automatic retry for REST API calls
- Deprecated `component` field for Channels
- Fix for `from_annotation_type` in `RuleConfig`
- Added support for Calculated Channels
- Remove previously deprecated `overwrite_rules` argument from ingestion service
- Fix `ReportTemplate` updating
- Remove breakpoint from internal function
- Moves npTDMS dependency to optional
Specify `sift-stack-py[tdms]` in your project dependencies if you need to use the TDMS upload service.
There is now a ReportTemplateService that is available from sift_py.report_templates.service. This service allows for creating,
updating, and retrieving report templates with a client key, where client keys are unique user defined strings associated with a given report
template. Report templates may either be defined via a python config (using the ReportTemplateConfig class) or a YAML file, which
follows the same schema.
Please see examples/report_templates for some examples of how to manage report templates via python and YAML using the ReportTemplateService.
There is now a RuleService that is available from sift_py.rule.service. This service allows for creating, updating, and retrieving rules.
Rules created through this service should now include a rule client key, a unique user defined string associated with a given rule. As before,
rules may be defined in python (via the RuleConfigClass) or via YAML (following the RuleYamlSpec class).
Please see examples/ingestion_with_yaml_config for an example of how to create rules from YAML using the RuleService.
There is now a base class, BaseCh10File, and an upload service, Ch10UploadService that allows for easily parsing and uploading IRIG Chapter 10 files.
Please see examples/data_import/ch10 for an example of how to use these classes.
- Enables keepalive by default and configures user-agent
- Ping Example added
- Adds optional openSSL certificate fetching
The overwrite_rules option of IngestionService is going to be deprecated in the next release, and will emit a warning if set.
- Updates most examples to use RuleService to create rules
- Adds ReportTemplateService and example usage
- Enables keepalive by default and configures user-agent
- Fix status bug with CSV uploads
- Support Windows mimetypes for CSV uploads
- Allow uploading compressed chapter 10 files
- Fix data download bug for channels with '.' delimited name.
- Improvements to CSV importing
- Ping Example added
- Adds optional openSSL certificate fetching
- Loosen various dependency requirements
Summary of changes:
- Loosen various dependency requirements
Summary of changes:
- add missing double channel computation
- update protos
- update protos
- csv example ingestion
- update example
- Add multiple CSV ingestion example
- Add metadata arg to use_sift_channel
Summary of changes:
- Add pydantic models for the sift grafana plugin queries
- Lowered minimum version requirements for various dependencies and dependency cleanup
- Added py.typed to modules that utilize type annotations
- Added services derived from protobuf for the following APIs
- Remote files
- Rule versions
- Saved searches
- Added helpful validations when processing Sift API URLs
- Added support for gRPC keep-alive
- Added service to upload and download file attachments
- Added timer-based flushing for buffered ingestion and custom error-handling
- Added functionality to generate a channel value from a channel config
The following section will cover some of the more notable features in depth. Please refer to the documentation for even more detail.
- Keepalive
- File attachments
- Timer based flushing for buffered ingestion
- Custom error handling for buffered ingestion
Long-lived connections are at risk of being closed by idle timeouts: if no data is exchanged on the connection for the duration specified by a load balancer's idle timeout, the connection is closed. To prevent this, you can opt into the HTTP/2 PING-based keepalive mechanism, either on your own gRPC channel or on the one that Sift provides. To configure keepalive using the Sift-provided gRPC channel:
from sift_py.grpc.transport import use_sift_channel, SiftChannelConfig
sift_channel_config = SiftChannelConfig(
uri=uri,
apikey=apikey,
enable_keepalive=True,
)
with use_sift_channel(sift_channel_config) as channel:
    ...
This uses the default values set in the sift_py.grpc.keepalive module. To configure your own keepalive parameters, you can
also do the following:
from sift_py.grpc.transport import use_sift_channel, SiftChannelConfig
from sift_py.grpc.keepalive import KeepaliveConfig
sift_channel_config = SiftChannelConfig(
uri=uri,
apikey=apikey,
enable_keepalive=KeepaliveConfig(
keepalive_time_ms=keepalive_time_ms,
keepalive_timeout_ms=keepalive_timeout_ms,
keepalive_permit_without_calls=keepalive_permit_without_calls,
max_pings_without_data=max_pings_without_data,
),
)
with use_sift_channel(sift_channel_config) as channel:
    ...
There is now a sift_py.file_attachment module that contains utilities to programmatically upload and download file attachments.
Files can currently be attached to various entities such as runs, annotations, and annotation logs. Various video and image formats
are currently supported. Below is an example demonstrating how to upload a file attachment and download it programmatically.
from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.file_attachment.service import FileAttachmentService
from sift_py.file_attachment.entity import Entity, EntityType
from sift_py.file_attachment.metadata import VideoMetadata
from sift_py.rest import SiftRestConfig
from sift.remote_files.v1.remote_files_pb2 import GetRemoteFileRequest
from sift.remote_files.v1.remote_files_pb2_grpc import RemoteFileServiceStub
# Get API credentials setup
...
with use_sift_channel(sift_channel_config) as channel:
    file_attachment_service = FileAttachmentService(channel, rest_config)
    run = Entity(
        entity_id=run_id,  # some arbitrary run ID that refers to an existing run
        entity_type=EntityType.RUN,
    )
    # uploading the file attachment and attaching it to a run of `run_id`
    remote_file = file_attachment_service.upload_attachment(
        path="path/to/foo.mp4",
        entity=run,
        # Metadata: optional but recommended for optimal viewing in the application
        metadata=VideoMetadata(height=2160, width=3840, duration_seconds=5.5),
        description="thrusters getting too hot",
    )
    # retrieving all of the file attachments for our run
    all_file_attachments = file_attachment_service.retrieve_attachments(run)
    # downloading our file_attachment and saving it to our current working dir
    file_attachment_service.download_attachment(remote_file)
    # downloading our file_attachment and saving it somewhere else with a different name
    file_attachment_service.download_attachment(remote_file, "somewhere/else/foo.mp4")
    # deleting our file attachments from Sift
    file_attachment_service.delete_file_attachments(remote_file_1, remote_file_2, remote_file_etc)
Previously, the buffered ingestion API flushed its buffer whenever any one of the following conditions was met:
- The caller manually calls `flush`
- The buffer gets filled
- The `with`-block associated with the buffered ingestion service as a context-manager goes out of scope.
- An exception is raised inside of the aforementioned `with`-block.
Note that the last two points only apply if the buffered ingestion service is used as a context manager like so:
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    ...
The buffer can now also be flushed periodically, regardless of whether it is full. To configure a timer to flush periodically:
with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
    ...
This configures the buffered ingestion service to flush its buffer every 3.2 seconds. If the buffer fills before the timer elapses,
then the timer will reset.
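The interaction between size-based and timer-based flushing can be sketched with `threading.Timer`. `TimedBuffer` is a toy class written for this illustration under the assumption that any flush restarts the countdown; it is not the sift_py implementation.

```python
import threading
from typing import Callable, List, Optional


class TimedBuffer:
    """Toy buffer that flushes when full or when a timer fires, whichever comes first."""

    def __init__(self, size: int, flush_interval_sec: float, sink: Callable[[List], None]) -> None:
        self._size = size
        self._interval = flush_interval_sec
        self._sink = sink
        self._items: List = []
        self._lock = threading.Lock()
        self._closed = False
        self._timer: Optional[threading.Timer] = None
        self._restart_timer()

    def _restart_timer(self) -> None:
        # Cancel any pending countdown and start a fresh one.
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(self._interval, self.flush)
        self._timer.daemon = True
        self._timer.start()

    def add(self, item) -> None:
        with self._lock:
            self._items.append(item)
            full = len(self._items) >= self._size
        if full:
            self.flush()

    def flush(self) -> None:
        with self._lock:
            batch, self._items = self._items, []
            if not self._closed:
                self._restart_timer()  # a flush for any reason resets the countdown
        if batch:
            self._sink(batch)

    def close(self) -> None:
        with self._lock:
            self._closed = True
            if self._timer is not None:
                self._timer.cancel()
        self.flush()  # hand over whatever is left
```

With `size=2`, adding two items flushes immediately and restarts the timer; a lone third item is delivered either when the timer fires or when `close()` is called.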
Previously when using the buffered ingestion service as a context manager, any exception that gets raised within the with-block will result
in the service attempting to flush one more time. The caller can now customize error-handling behavior by passing in a function handler that takes
in three arguments: the error that's raised, the buffer containing the remaining requests that weren't ingested, and a function that when called will
attempt to flush the buffer.
# Custom code to run when an error is raised
def on_error_callback(err, buffer, flush):
    # Maybe try to save contents of buffer to disk
    ...
    # Try once more to flush the buffer
    flush()
with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
    ...
Summary of changes:
- Extend support for Python 3.8
Summary of changes:
- Promote to `v0.1.0` from release candidate state.
- Add module to download telemetry
In addition to the changes above, documentation is now also available online:
Summary of changes:
The following are some gRPC error codes that can happen due to external factors that Sift doesn't directly control:
- `UNKNOWN`
- `UNAVAILABLE`
- `ABORTED`
- `DEADLINE_EXCEEDED`
They are the source of common disruptions, particularly during ingestion, and so this mechanism will automatically retry failed RPCs over an existing connection or will establish a new one if necessary.
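As an illustration of retrying only on these codes, here is a minimal sketch with exponential backoff. It is not necessarily how the library implements its retries: `RpcError`, `call_with_retry`, and the delay values are hypothetical, and status codes are plain strings so the example runs without grpc installed.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Codes listed above, as plain strings so the sketch runs without grpc installed.
RETRYABLE_CODES = {"UNKNOWN", "UNAVAILABLE", "ABORTED", "DEADLINE_EXCEEDED"}


class RpcError(Exception):
    """Hypothetical stand-in for a gRPC error carrying a status code name."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


def call_with_retry(rpc: Callable[[], T], max_attempts: int = 5, base_delay: float = 0.05) -> T:
    """Call `rpc`, retrying retryable failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return rpc()
        except RpcError as err:
            # Give up on non-retryable codes or once attempts are exhausted.
            if err.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")
```

Errors outside the retryable set, such as an invalid argument, surface immediately because retrying them cannot succeed.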
Summary of changes:
- Introduced automated ingestion request buffering to improve performance
- Added support for multi-config ingestion and creating new flows at run-time
- Added methods that combine request creation and ingestion into a single-step.
- Updates to documentation.
For in-depth documentation please see the documentation section of the README for instructions on how to build the documentation locally.
- Combining Request Generation and Ingestion into a Single Step
- Request Buffering
- Creating New Flows on the Fly
- Multi-config Ingestion
Previously to ingest a single request you needed to do the following:
request = ingestion_service.try_create_ingestion_request(
flow_name="logs",
timestamp=timestamp,
channel_values=[
{
"channel_name": "log",
"value": string_value("some log"),
},
],
)
ingestion_service.ingest(request)
Now you can combine both steps using either of the following APIs:
ingestion_service.try_ingest_flows({
"flow_name": "log",
"timestamp": timestamp,
"channel_values": [
{
"channel_name": "log",
"value": string_value("some string")
},
],
})
You can also send multiple flows:
# Send data for both logs and readings flows
ingestion_service.try_ingest_flows(
{
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [
{
"channel_name": "velocity",
"component": "mainmotor",
"value": double_value(10),
},
{
"channel_name": "voltage",
"value": int32_value(5),
},
{
"channel_name": "vehicle_state",
"value": enum_value(2),
},
],
},
{
"flow_name": "logs",
"timestamp": datetime.now(timezone.utc),
"channel_values": [
{
"channel_name": "logs",
"value": string_value("INFO: some message")
},
],
},
)
Requests are now automatically buffered using the buffered ingestion API. Using this may significantly improve performance as it allows serialization and ingestion to occur in batches:
# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
Once the with-block ends, the remaining requests will be automatically flushed and ingested, but flushing may also be done manually:
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()
Contrast this with regular ingestion:
ingestion_service.try_ingest_flows(*lots_of_flows)
ingestion_service.try_ingest_flows(*lots_more_flows)
If there is a flow you need to create on the fly which wasn't declared in your initial telemetry config, you may use either of the following APIs:
- `try_create_flow`
- `create_flow`
new_flow_config = FlowConfig(
name="my_new_flow", channels=[ChannelConfig("new_channel", ChannelDataType.DOUBLE)]
)
ingestion_service.try_create_flow(new_flow_config)
There is now an ergonomic utility class, IngestionServicesManager, that allows users to manage telemetry for multiple configs:
manager = IngestionServicesManager.from_telementry_configs(grpc_channel, {
"config_a": config_a,
"config_b": config_b,
})
with manager.ingestion_service("config_a") as config_a:
    config_a.try_ingest_flow(...)
with manager.ingestion_service("config_b") as config_b:
    config_b.try_ingest_flow(...)