Skip to content

Latest commit

 

History

History
1012 lines (769 loc) · 46.8 KB

File metadata and controls

1012 lines (769 loc) · 46.8 KB

Change Log

All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

[v0.16.2] - May 21, 2026

Bugfixes

  • Resolve nested calculated channel references when fetching calculated channels so expressions that depend on other calculated channels evaluate correctly. (#580)
  • Infer the parquet time column for integer timestamp columns in data imports so files with integer epoch time columns no longer fail detection. (#582)

[v0.16.1] - May 19, 2026

Bugfixes

  • Replace the POSIX-only fcntl.flock call in the test-results log with a cross-platform filelock.FileLock so importing sift_client no longer fails on Windows. (#574)

[v0.16.0] - May 14, 2026

What's New

Description, Metadata, and Associated Channels on Test Measurements

TestMeasurementCreate and TestMeasurementUpdate now accept description, metadata, and channel_names to attach contextual information and link measurements to the channels they were derived from. channel_names accepts either channel name strings or Channel objects. Descriptions longer than 2000 characters are truncated with a warning to match the server-side limit.

Metadata on Test Steps

TestStepCreate and TestStepUpdate now accept a metadata dict so test steps can carry structured key/value context the same way reports and measurements do. The ReportContext.new_step / create_step and NewStep.substep helpers in the test-results context manager forward a new metadata kwarg through to the created step.

Bugfixes

  • Pass the configured log file through to test result update calls so update events are written to the .jsonl log alongside create events. (#560)

Full Changelog

[v0.15.0] - May 7, 2026

What's New

v0.15.0 updates the streaming ingestion client to match the new sift-stream-bindings 0.3.0 API. The RecoveryStrategyConfig class and recovery_strategy parameter have been replaced with an explicit StreamingMode enum and discrete per-mode configuration kwargs. The non-blocking send method has been renamed for consistency with the Rust library, and a new try_send method is available for single-flow non-blocking sends. This release contains breaking changes to the ingestion API — see below for details and a migration prompt.

Breaking Changes

1. RecoveryStrategyConfig Removed — Replaced by StreamingMode + Per-Mode Kwargs

RecoveryStrategyConfig and the recovery_strategy parameter on IngestionConfigStreamingClient and IngestionAPIAsync have been removed. Transport mode is now selected via a StreamingMode enum, with separate retry_policy and disk_backup_policy kwargs for per-mode configuration.

The default mode is StreamingMode.LIVE_WITH_BACKUPS, which matches the previous default behavior of RecoveryStrategyConfig.retry_with_backups().

Removed from the public API:

  • RecoveryStrategyConfig class (and its retry_only() / retry_with_backups() factory methods)
  • recovery_strategy parameter on IngestionConfigStreamingClient and IngestionAPIAsync

Added:

  • StreamingMode enum — LIVE_ONLY, LIVE_WITH_BACKUPS, FILE_BACKUP
  • streaming_mode parameter (default: StreamingMode.LIVE_WITH_BACKUPS)
  • retry_policy parameter — applies to LIVE_WITH_BACKUPS mode
  • disk_backup_policy parameter — applies to LIVE_WITH_BACKUPS and FILE_BACKUP modes
  • checkpoint_interval_seconds parameter — applies to LIVE_WITH_BACKUPS mode

Before:

from sift_client.resources.ingestion import (
    IngestionConfigStreamingClient,
    RecoveryStrategyConfig,
)

# Default: live streaming with backups
client = await IngestionConfigStreamingClient.create(
    ingestion_config=my_config,
    recovery_strategy=RecoveryStrategyConfig.retry_with_backups(),
)

# Retry only (no disk backups)
client = await IngestionConfigStreamingClient.create(
    ingestion_config=my_config,
    recovery_strategy=RecoveryStrategyConfig.retry_only(),
)

After:

from sift_client.resources.ingestion import (
    IngestionConfigStreamingClient,
    StreamingMode,
)

# Default: live streaming with backups (no change needed if you were using the default)
client = await IngestionConfigStreamingClient.create(
    ingestion_config=my_config,
    streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
)

# Live only — no disk backups, lowest overhead
client = await IngestionConfigStreamingClient.create(
    ingestion_config=my_config,
    streaming_mode=StreamingMode.LIVE_ONLY,
)

# File backup only
client = await IngestionConfigStreamingClient.create(
    ingestion_config=my_config,
    streaming_mode=StreamingMode.FILE_BACKUP,
)

To pass a custom retry or disk backup policy:

from sift_stream_bindings import DiskBackupPolicyPy, RetryPolicyPy

client = await IngestionConfigStreamingClient.create(
    ingestion_config=my_config,
    streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
    retry_policy=RetryPolicyPy.default(),
    disk_backup_policy=DiskBackupPolicyPy.default(),
    checkpoint_interval_seconds=30,
)
2. send_requests_nonblocking Renamed to try_send_requests

IngestionConfigStreamingClient.send_requests_nonblocking has been renamed to try_send_requests to align with the Rust sift-stream naming convention where try_ methods return immediately without awaiting channel capacity.

Before:

client.send_requests_nonblocking(requests)

After:

client.try_send_requests(requests)

New Features

try_send — Non-Blocking Single-Flow Send

A new try_send(flow) method is available on IngestionConfigStreamingClient for non-blocking single-flow sends. It accepts either a Flow or a raw FlowPy object.

client.try_send(flow)

Use try_send in real-time loops where blocking on channel capacity is unacceptable. For most use cases, the async send(flow) method (which applies backpressure) is preferred.

sift-stream-bindings 0.3.0

The sift-stream-bindings dependency has been bumped to 0.3.0, which reflects the sift-stream 0.9.0 breaking API changes (stepped builder, send rename, removed types).

AI-Assisted Migration Prompt (v0.14.x → v0.15.0)

Copy and paste the following prompt to an AI coding agent to automate the upgrade:

You are upgrading a Python project from sift_client v0.14.x to v0.15.0. The streaming ingestion
API has breaking changes. Apply ALL of the following changes precisely. Do not make any other
modifications.

---

## 1. Update `pyproject.toml`

Find every occurrence of `sift-stream-bindings==0.2.2` and replace it with
`sift-stream-bindings==0.3.0`. This may appear under multiple dependency groups (e.g. `all`,
`dev-all`, `sift-stream`, `sift-stream-bindings`).

---

## 2. Remove all imports of `RecoveryStrategyConfig`

Delete any line that imports `RecoveryStrategyConfig`, for example:

    from sift_client.resources.ingestion import RecoveryStrategyConfig
    from sift_client.resources.ingestion import IngestionConfigStreamingClient, RecoveryStrategyConfig

Remove only `RecoveryStrategyConfig` from those imports; keep any other names on the same line.

---

## 3. Add `StreamingMode` to imports where needed

Wherever `IngestionConfigStreamingClient` or `IngestionAPIAsync` is imported and a streaming
mode needs to be specified, add `StreamingMode` to the import:

    from sift_client.resources.ingestion import IngestionConfigStreamingClient, StreamingMode

---

## 4. Replace all `recovery_strategy` call sites

Search for every call to `IngestionConfigStreamingClient.create(...)` and
`IngestionAPIAsync.create(...)` that contains a `recovery_strategy` keyword argument.

### Case A — `RecoveryStrategyConfig.retry_with_backups()` (or no recovery_strategy at all)

This was (and remains) the default. Replace the kwarg:

    # BEFORE
    recovery_strategy=RecoveryStrategyConfig.retry_with_backups()

    # AFTER
    streaming_mode=StreamingMode.LIVE_WITH_BACKUPS

If the call had no `recovery_strategy` argument, no change is needed — `LIVE_WITH_BACKUPS`
is the default.

If the old call passed explicit `retry_policy` or `disk_backup_policy` arguments inside
`RecoveryStrategyConfig.retry_with_backups(...)`, move them to top-level kwargs:

    # BEFORE
    recovery_strategy=RecoveryStrategyConfig.retry_with_backups(
        retry_policy=my_retry_policy,
        disk_backup_policy=my_disk_policy,
    )

    # AFTER
    streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
    retry_policy=my_retry_policy,
    disk_backup_policy=my_disk_policy,

### Case B — `RecoveryStrategyConfig.retry_only()`

Replace with `streaming_mode=StreamingMode.LIVE_ONLY`. If a `retry_policy` was passed,
keep it as a top-level kwarg (it is ignored for `LIVE_ONLY` in this version, but preserving
it avoids a TypeError):

    # BEFORE
    recovery_strategy=RecoveryStrategyConfig.retry_only(retry_policy=my_policy)

    # AFTER
    streaming_mode=StreamingMode.LIVE_ONLY

### Case C — Raw `RecoveryStrategyPy` object passed directly

If any call passes a raw `RecoveryStrategyPy` instance as `recovery_strategy`, determine
which mode it was configured for and replace accordingly:
- `RecoveryStrategyPy.retry_only(...)` → `streaming_mode=StreamingMode.LIVE_ONLY`
- `RecoveryStrategyPy.retry_with_backups(...)` → `streaming_mode=StreamingMode.LIVE_WITH_BACKUPS`
  with `retry_policy` and `disk_backup_policy` promoted to top-level kwargs.

---

## 5. Rename `send_requests_nonblocking` → `try_send_requests`

Find every call to `.send_requests_nonblocking(...)` on any ingestion client instance and
rename it to `.try_send_requests(...)`. The signature is unchanged.

    # BEFORE
    client.send_requests_nonblocking(requests)

    # AFTER
    client.try_send_requests(requests)

---

## 6. Verify

After applying the above changes:
1. Run `grep -r "RecoveryStrategyConfig" .` — expect zero results.
2. Run `grep -r "send_requests_nonblocking" .` — expect zero results.
3. Run `grep -r "recovery_strategy" .` — expect zero results.
4. Run your test suite to confirm no remaining references.

[v0.14.1] - April 30, 2026

Bugfixes

  • Lazy-import h5py and nptdms so the base install doesn't require them. (#547)
  • Expose page_size on resource list methods so callers can shrink pages when responses hit gRPC message-size limits. (#548)

[v0.14.0] - April 28, 2026

What's New

Data Import API in SiftClient

The sift_client module now exposes a data import API supporting CSV, Parquet, TDMS, and HDF5. With this addition, all features previously available only in sift_py are now available in sift_client, which remains the recommended interface for new development. sift_py (deprecated since v0.10.0) continues to work and ship in this release.

Migrating from sift_py: the per-format upload services (CsvUploadService, ParquetUploadService, Hdf5UploadService, TdmsUploadService) collapse into a single client.data_import.import_from_path method. sift_py only auto-detected for CSV via simple_upload; other formats required more setup. sift_client unifies all four with auto-detection built into import_from_path itself: the config argument is optional, so the common call takes just a file path and target asset. Call client.data_import.detect_config(...) first if you want to inspect or patch the config before importing. import_from_path returns a Job you can optionally wait on.

# sift_py (deprecated)
from sift_py.data_import.csv import CsvUploadService
from sift_py.rest import SiftRestConfig

rest_conf: SiftRestConfig = {"uri": sift_uri, "apikey": apikey}
csv_service = CsvUploadService(rest_conf)
import_service = csv_service.simple_upload(asset_name="my_asset", path="data.csv")
import_service.wait_until_complete()

# sift_client
from sift_client import SiftClient

client = SiftClient(api_key=apikey, grpc_url=grpc_url, rest_url=rest_url)
job = client.data_import.import_from_path("data.csv", asset="my_asset")
job.wait_until_complete()

Format-by-format support:

  • CSV: auto-detected from .csv. Supports an optional JSON metadata row (row 1 or row 2) for specifying channel names, units, data types, and the time column format.
  • Parquet: requires an explicit data_type (PARQUET_FLATDATASET or PARQUET_SINGLE_CHANNEL_PER_ROW) since .parquet alone doesn't disambiguate the layout. Detection only reads the file footer, so it stays fast on large files.
  • HDF5: new in this release. Auto-detected from .h5 / .hdf5. Detection works out-of-the-box for files with a compound-dataset layout (first field = time) or a shared root-level time dataset; other layouts may need a hand-built config.
  • TDMS: new in this release. Auto-detected from .tdms and recognizes the common timing conventions (group-level xchannel, first-channel TimeStamp, or per-channel waveform properties). TdmsImportConfig controls handling of untimed channels (fallback_method), complex values (complex_component), scaled vs. raw data, and waveform start-time overrides.

Parquet as an Export Output Format

client.data_export.export(...) now accepts ExportOutputFormat.PARQUET alongside the existing CSV and Sun/WinPlot options. Unlike the sift_py DataService + DataFrame.to_parquet() pattern (async-only, buffers everything in memory, name-strings only), the new export API runs as a server-side job, works sync or async, accepts Asset/Channel objects or IDs, and scales to large exports.

# sift_py (deprecated): no dedicated export API, so query in-memory and write yourself
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.data.service import DataService
from sift_py.grpc.transport import use_sift_async_channel

async with use_sift_async_channel({"uri": sift_uri, "apikey": apikey}) as channel:
    result = await DataService(channel).execute(DataQuery(
        asset_name="my_asset",
        start_time=start,
        end_time=stop,
        channels=[ChannelQuery(channel_name="my_channel")],
    ))
    pd.DataFrame(result.all_channels()[0].columns()).to_parquet("out.parquet")

# sift_client
from sift_client import SiftClient
from sift_client.sift_types.export import ExportOutputFormat

client = SiftClient(api_key=apikey, grpc_url=grpc_url, rest_url=rest_url)
job = client.data_export.export(
    assets=["my_asset"],          # accepts Asset objects or IDs
    channels=["my_channel"],      # accepts Channel objects or IDs
    start_time=start,
    stop_time=stop,
    output_format=ExportOutputFormat.PARQUET,
)
files = job.wait_and_download(output_dir="./exports")

wait_and_download polls the job to completion, downloads the resulting archive, and returns the list of downloaded file paths. Both arguments shown are optional: if output_dir is omitted, a temporary directory is created and used. By default the downloaded zip is extracted (and the archive deleted) so you get the Parquet files directly; pass extract=False to keep the zip instead, which is useful if you want to hand the whole bundle off to another system without unpacking it client-side.

Test Result Logging

Test result create and update events can now be optionally written to a .jsonl log file during a test run, then replayed against the Sift API later via the new import-test-result-log CLI command (installed with the package).

Progress Indicators

Adds progress indicators for job polling and file downloads for better visibility during long-running operations.

Stop Bundling google.api, protoc_gen_openapiv2, and buf.validate

Previously the package shipped its own bundled copies of the generated Python bindings for google.api, protoc_gen_openapiv2, and buf.validate. With this release:

  • google.api and protoc_gen_openapiv2 are now pulled in via the googleapis-common-protos and protoc-gen-openapiv2 runtime dependencies, so pip installs the upstream-maintained versions.
  • buf.validate has been removed entirely. The protovalidate field annotations were stripped from the affected .proto files, so the generated _pb2.py files no longer reference buf.validate.

Bugfixes

  • Add py.typed to the generated proto directory so type checkers pick up protobuf types correctly.

Full Changelog

[v0.13.0] - March 24, 2026

What's New

SiftClient Export API

The sift_client module now exposes a public export interface and wrapper, making it easier to integrate with downstream tools and workflows.

Module Migration to sift_client

The grpc, rest, and time modules have been migrated from sift_py to sift_client. All imports from sift_py for these modules have been removed. This continues the transition toward sift_client as the primary interface ahead of the v1.0.0 release.

Arbitrary File Attachment Uploads

File attachment uploads now support arbitrary file types (e.g., Parquet, pcapng) by falling back to application/octet-stream for unknown MIME types.

Improved SiftChannelConfig URL Handling

SiftChannelConfig now handles URLs more robustly, fixing gRPC connection errors caused by missing URL schemes.

Streaming Ingestion Performance

The streaming ingestion module has been refactored to use sift_stream_bindings types for improved performance.

Full Changelog

[v0.12.0] - February 26, 2026

What's New

Channel Archiving/Unarchiving Support in SiftClient

SiftClient has been updated with the ability to archive and unarchive channels, allowing users to cleanup old channel definitions that are no longer desired.

Full Changelog

[v0.11.0] - February 24, 2026

What's New

Better Rule and Report Support in SiftClient

SiftClient has been updated to provide better support for RuleVersions and better ergonomics for Reports

  • Additional methods have been added to the SiftClient Rules resource to allow easy access to RuleVersions
  • Reports can now be generated from RuleVersions
  • The Jobs and Reports resources now have a wait_until_complete sync/async method, which provides a convenient way to wait until a job or report is complete, with configurable polling and timeout settings.
  • When creating reports, SiftClient now returns a Job object, to signify that the report is still in progress. Waiting on the job with wait_until_complete will return the completed report once finished. The report_id can also still be accessed directly from the job object if needed.

Full Changelog

[v0.10.1] - February 11, 2026

What's New

[v0.10.0] - January 30, 2026

DEPRECATION NOTICE

The sift_py module is deprecated as of v0.10.0 and will be removed in v1.0.0. Please use sift_client for all new development. Several minor releases will follow before the major release to add features and give users time to migrate.

What's New

[v0.9.6] - December 22, 2025

[v0.9.5] - December 12, 2025

[v0.9.4] - December 10, 2025

[v0.9.3] - December 4, 2025

[v0.9.2] - December 3, 2025

[v0.9.1] - August 18, 2025

[v0.9.0] - September 15, 2025

What's New

Parquet Upload Service

Adds support for uploading Parquet files to Sift for ingestion through the addition of the ParquetUploadService. See examples/data_import/parquet for an example of how to upload Parquet files.

TDMS File Metadata Support

Adds a flag to automatically import TDMS file metdadata into Sift Runs.

Lazy Flow Creation

Adds the ability to create flows lazily which reduces initialization time of the IngestService.

Full Changelog

[v0.8.4] - August 18, 2025

[v0.8.3] - August 11, 2025

[v0.8.2] - August 1, 2025

[v0.8.1] - July 31, 2025

[v0.8.0] - July 29, 2025

What's New

HDF5 Upload Service

Adds support for uploading HDF5 files to Sift for ingestion through the addition of the Hdf5UploadService and Hdf5Config. See examples/data_import/hdf5 for an example of how to upload HDF5 files.

Full Changelog

[v0.7.0] - June 24, 2025

What's New

AssetService and Metadata Support

Support for attaching metadata to a run or asset has been introduced through the addition of the AssetService and updates to the attach_run method of the IngestionService. See examples/assets for an example of updating metadata for an asset.

Full Changelog

[v0.6.2] - June 9, 2025

[v0.6.1] - June 2, 2025

[v0.6.0] - May 23, 2025

What's New

Rule Evaluation Service

The RuleEvaluationService has been introduced to provide functionality for evaluating rules and reports against runs or assets. This includes support for dry-run evaluations and previews, as well as evaluating rules defined in YAML configurations.

External Rules

The RuleService has been updated to support External Rules. External Rules are ideal for automated workflows such as CI/CD pipelines, where external Rules are managed and evaluated programmatically.

Read more about External Rules here here.

Full Changelog

[v0.5.3] - May 20, 2025

[v0.5.2] - April 23, 2025

[v0.5.1] - April 18, 2025

[v0.5.0] - March 31, 2025

What's New

Rosbag2 Uploads

A new RosbagsUploadService has been added to sift_py.data_import.rosbags, which provides the functionality for uploading rosbags, including video frames from a rosbag. See examples/data_import/rosbags for examples on how to use the new service.

Full Changelog

[v0.4.2] - March 4, 2025

[v0.4.1] - February 27, 2025

[v0.4.0] - February 21, 2025

What's New

Calculated Channels Service

A new CalculatedChannelService, in sift_py.calculated_channels.service, provides functionality for creating, updating, searching, and listing Calculated Channels. It also provides a YAML interface for defining Calculated Channels using a YAML file. Examples can be found in examples/calculated_channels.

Deprecation Notices

Channel Component field is being deprecated and will be removed in version 1.0.0. Please see https://docs.siftstack.com/docs/glossary#component

for more information.

Full Changelog

[v0.3.3] - January 24, 2025

[v0.3.2] - January 17, 2025

[v0.3.1] - January 8, 2025

[v0.3.0] - January 7, 2025

What's New

Report Template Service

There is now a ReportTemplateService that is available from sift_py.report_templates.service. This service allows for creating, updating, and retrieving report templates with a client key, where client keys are unique user defined strings associated with a given report template. Report templates may either by defined via a python config (using the ReportTemplateConfig class) or a YAML file, which follows the same schema. Please see examples/report_templates for some examples of how to manage report templates via python and YAML using the ReportTemplateService.

Rule Service

There is now a RuleService that is available from sift_py.rule.service. This service allows for creating, updating, and retrieving rules. Rules created through this service should now include a rule client key, a unique user defined string associated with a given rule. As before, rules may be defined in python (via the RuleConfigClass) or via YAML (following the RuleYamlSpec class). Please see examples/ingestion_with_yaml_config for an example of how to create rules from YAML using the RuleService.

Chapter 10 Support

There is now a base class, BaseCh10File, and an upload service, Ch10UploadService that allows for easily parsing and uploading IRIG Chapter 10 files. Please see examples/data_import/ch10 for an example of how to use these classes.

Other Improvements

Bugfixes

Deprecation Notices

The overwrite_rules option of IngestionService is going to be deprecated in the next release, and will emit a warning if set.

Full Changelog

[v0.2.2] - October 9, 2024

Summary of changes:

  • Loosen various dependency requirements

[v0.2.1] - September 26, 2024

Summary of changes:

[v0.2.0] - September 3, 2024

Summary of changes:

The following section will cover some of the more notable features in depth. Please refer to the documentation for even more detail.

Table of Contents

Keepalive

Long-lived connections are generally are at risk of being closed due to idle timeouts if they are idle for a particular duration. In other words, if there is no data being exchanged on the connection for a duration specified by a load balancer's idle timeout, the connection will be closed. In order to ensure that this does not happen you can opt into enabling the HTTP/2 PING-based keepalive mechanism either on your own gRPC channel or the one that Sift provides. To configure keep-alive using the Sift-provided gRPC channel:

from sift_py.grpc.transport import use_sift_channel, SiftChannelConfig

sift_channel_config = SiftChannelConfig(
    uri=uri,
    apikey=apikey,
    enable_keepalive=True,
)

with use_sift_channel(sift_channel_config) as channel:
    ...

This uses default values set in the sift_py.grpc.keepalive module. If you'd like to configure your own keepalive parameters you could also do the following:

from sift_py.grpc.transport import use_sift_channel, SiftChannelConfig
from sift_py.grpc.keepalive import KeepaliveConfig

sift_channel_config = SiftChannelConfig(
    uri=uri,
    apikey=apikey,
    enable_keepalive=KeepaliveConfig(
        keepalive_time_ms=keepalive_time_ms,
        keepalive_timeout_ms=keepalive_timeout_ms,
        keepalive_permit_without_calls=keepalive_permit_without_calls,
        max_pings_without_data=max_pings_without_data,
    ),
)

with use_sift_channel(sift_channel_config) as channel:
    ...

File attachments

There is now a sift_py.file_attachment module that contains utilities to programmatically upload and download file attachments. Files currently can be attached to various entities such as runs, annotations, and annotation logs. Various video and image formats are currently supported. Below is an example demonstrating how to upload a file attachment and programmatically downloading it.

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.file_attachment.service import FileAttachmentService
from sift_py.file_attachment.entity import Entity, EntityType
from sift_py.file_attachment.metadata import VideoMetadata
from sift_py.rest import SiftRestConfig

from sift.remote_files.v1.remote_files_pb2 import GetRemoteFileRequest
from sift.remote_files.v1.remote_files_pb2_grpc import RemoteFileServiceStub

# Get API credentials setup
...

with use_sift_channel(sift_channel_config) as channel:
    file_attachment_service = FileAttachmentService(channel, rest_config)

    run = entity=Entity(
        entity_id=run_id, # some arbitrary run ID that refers to an existing run
        entity_type=EntityType.RUN,
    )

    # uploading the file attachment and attaching it to a run of `run_id`
    remote_file = file_attachment_service.upload_attachment(
        path="path/to/foo.mp4",
        entity=run,
        # Metatadata.. optional but recommended for optimal viewing in the application
        metadata=VideoMetadata(height=2160, width=3840, duration_seconds=5.5),
        description="thrusters getting too hot" ,
    )

    # retrieving all of the file attachments for our run
    all_file_attachments = file_attachment_service.retrieve_attachments(run)

    # downloading our file_attachment and saving it to our current working dir
    file_attachment_service.download_attachment(remote_file)

    # downloading our file_attachment and saving it somewhere else with a different name
    file_attachment_service.download_attachment(remote_file, "somewhere/else/foo.mp4")

    # deleting out file attachment from Sift
    file_attachment_service.delete_file_attachments(remote_file_1, remote_file_2, remote_file_etc)

Timer based flushing for buffered ingestion

Previously the buffered ingestion API flushes a buffer whenever any one of the following conditions are met:

  • The caller manually calls flush
  • The buffer gets filled
  • The with-block associated with the buffered ingestion service as a context-manager goes out of scope.
  • An exception is raised inside of the aforementioned with-block.

Note that the last two points only apply if the buffered ingestion service is used as a context manager like so:

with ingestion_service.buffered_ingestion() as buffered_ingestion:
    ...

Now there is support to periodically flush the buffer regardless of whether or not the buffer is filled. To configure a timer to flush periodically:

with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
    ...

This will configure the buffered ingestion service to flush its buffer every 3.2 seconds. If the buffer happens to be filled before the timer elapses, then the timer will reset.

Custom error handling for buffered ingestion

Previously when using the buffered ingestion service as a context manager, any exception that gets raised within the with-block will result in the service attempting to flush one more time. The caller can now customize error-handling behavior by passing in a function handler that takes in three arguments: the error that's raised, the buffer containing the remaining requests that weren't ingested, and a function that when called will attempt to flush the buffer.

# Custom code to run when error
def on_error_calback(err, buffer, flush):
    # Maybe try to save contents of buffer to disk
    ...
    # Try once more to flush the buffer
    flush()

with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion:
    ...

[v0.1.1] - July 17, 2024

Summary of changes:

  • Extend support for Python 3.8

[v0.1.0] - July 12, 2024

Summary of changes:

In addition to the changes above, documentation is now also available online:

[v0.1.0-rc.3] - July 3, 2024

Summary of changes:

The following are some gRPC error codes that can happen due to external factors that Sift doesn't directly control:

  • UNKNOWN
  • UNAVAILABLE
  • ABORTED
  • DEADLINE_EXCEEDED

They are the source of common disruptions, particularly during ingestion, and so this mechanism will automatically retry failed RPCs over an existing connection or will establish a new one if necessary.

[v0.1.0-rc.2] - July 1, 2024

Summary of changes:

For in-depth documentation please see the documentation section of the README for instructions on how to build the documentation locally.

Table of Contents

Combining Request Generation and Ingestion into a Single Step

Previously to ingest a single request you needed to do the following:

request = ingestion_service.try_create_ingestion_request(
    flow_name="logs",
    timestamp=timestamp,
    channel_values=[
        {
            "channel_name": "log",
            "value": string_value("some log"),
        },
    ],
)
ingestion_service.ingest(request)

Now you can combine both steps using either of the following APIs:

ingestion_service.try_ingest_flows({
    "flow_name": "log",
    "timestamp": timestamp,
    "channel_values": [
        {
            "channel_name": "log",
            "value": string_value("some string")
        },
    ],
})

You can also send multiple flows:

# Send data for both logs and readings flows
ingestion_service.try_ingest_flows(
    {
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
        ],
    },
    {
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "logs",
                "value": string_value("INFO: some message")
            },
        ],
    },
)

Request Buffering

Requests are now automatically buffered using the buffered ingestion API. Using this may significantly improve performance as it allows serialization and ingestion to occur in batches:

# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

Once the with-block ends the remaining requests will be automatically flushed and ingested, but flushing may also be done manually:

with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()

Contrast this with regular ingestion:

ingestion_service.try_ingest_flows(*lots_of_flows)
ingestion_service.try_ingest_flows(*lots_more_flows)

Creating New Flows on the Fly

If there is a flow you need to create on the fly which wasn't declared in your initial telemetry config, you may use either of the following APIs:

  • try_create_flow
  • create_flow
new_flow_config = FlowConfig(
    name="my_new_flow", channels=[ChannelConfig("new_channel", ChannelDataType.DOUBLE)]
)
ingestion_service.try_create_flow(new_flow_config)

Multi-config Ingestion

There is now an ergonomic utility class, IngestionServicesManager, that allows users to manage telemetry for multiple configs:

manager = IngestionServicesManager.from_telementry_configs(grpc_channel, {
    "config_a": config_a,
    "config_b": config_b,
})

with manager.ingestion_service("config_a") as config_a:
    config_a.try_ingest_flow(...)

with manager.ingestion_service("config_b") as config_b:
    config_b.try_ingest_flow(...)