Skip to content

Commit a0ec457

Browse files
Python(feat): add integration and unit tests to sift_client (#329)
1 parent 3f64532 commit a0ec457

47 files changed

Lines changed: 5131 additions & 1483 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.githooks/pre-push

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ GITHOOKS_DIR="$REPO_ROOT/.githooks"
88
python_changed_files=($(git diff --name-only --diff-filter=ACM | grep '^python/lib/sift_client/' || true))
99

1010
if [[ -n "$python_changed_files" ]]; then
11-
echo "Python files changed, running Python stub checks..."
11+
echo "Python files changed, running Python formatting and linting..."
12+
bash "$GITHOOKS_DIR/pre-push-python/fmt-lint.sh"
13+
14+
echo "Running Python stub checks..."
1215
bash "$GITHOOKS_DIR/pre-push-python/stubs.sh"
1316
fi
1417

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
5+
# Store the root directory of the repository
6+
REPO_ROOT="$(git rev-parse --show-toplevel)"
7+
PYTHON_DIR="$REPO_ROOT/python"
8+
9+
echo "Running Python formatting and linting with --fix..."
10+
11+
# Change to Python directory
12+
cd "$PYTHON_DIR"
13+
14+
# Run ruff format (formatter)
15+
echo "Running ruff format..."
16+
bash ./scripts/dev fmt
17+
18+
# Run ruff check with --fix (linter)
19+
echo "Running ruff check --fix..."
20+
bash ./scripts/dev lint-fix
21+
22+
# Check if any files were modified by formatting/linting
23+
cd "$REPO_ROOT"
24+
changed_files=$(git status --porcelain python/lib/sift_client/ | grep -E '\.py$' || true)
25+
26+
if [ -n "$changed_files" ]; then
27+
echo ""
28+
echo "ERROR: Formatting/linting made changes to the following files:"
29+
echo "$changed_files"
30+
echo ""
31+
echo "Please commit these changes before pushing."
32+
exit 1
33+
fi
34+
35+
echo "Python formatting and linting completed successfully."

.github/workflows/python_ci.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ jobs:
2525
python-version: "3.8"
2626

2727
- name: Pip install
28+
id: install
2829
run: |
2930
python -m pip install --upgrade pip
3031
pip install '.[development,openssl,tdms,rosbags,hdf5,sift-stream]'
32+
3133
- name: Lint
3234
run: |
3335
ruff check
@@ -44,9 +46,17 @@ jobs:
4446
run: |
4547
pyright lib
4648
47-
- name: Pytest
49+
- name: Pytest Unit Tests
50+
run: |
51+
pytest -m "not integration"
52+
53+
- name: Pytest Integration Tests
54+
env:
55+
SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }}
56+
SIFT_REST_URI: ${{ vars.SIFT_REST_URI }}
57+
SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }}
4858
run: |
49-
pytest
59+
pytest -m "integration"
5060
5161
- name: Sync Stubs Mypy
5262
working-directory: python/lib

python/lib/sift_client/_internal/low_level_wrappers/base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async def _handle_pagination(
2222
page_size: The number of results to return per page.
2323
page_token: The token to use for the next page.
2424
order_by: How to order the retrieved results.
25-
max_results: Maximum number of results to return. NOTE: Will be in increments of page_size or default page size defined by the call if no page_size is provided.
25+
max_results: Maximum number of results to return.
2626
2727
Returns:
2828
A list of all matching results.
@@ -31,6 +31,8 @@ async def _handle_pagination(
3131
kwargs = {}
3232

3333
results: list[Any] = []
34+
if max_results == 0:
35+
return results
3436
if page_token is None:
3537
page_token = ""
3638
while True:
@@ -45,4 +47,6 @@ async def _handle_pagination(
4547
results.extend(response)
4648
if page_token == "":
4749
break
50+
if max_results and len(results) > max_results:
51+
results = results[:max_results]
4852
return results

python/lib/sift_client/_internal/low_level_wrappers/ingestion.py

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,14 @@
1818
from queue import Queue
1919
from typing import TYPE_CHECKING, Any, cast
2020

21-
import sift_stream_bindings
2221
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
2322
GetIngestionConfigRequest,
2423
ListIngestionConfigFlowsResponse,
2524
ListIngestionConfigsRequest,
2625
ListIngestionConfigsResponse,
2726
)
28-
from sift.ingestion_configs.v2.ingestion_configs_pb2_grpc import IngestionConfigServiceStub
29-
from sift_stream_bindings import (
30-
IngestionConfigFormPy,
31-
IngestWithConfigDataStreamRequestPy,
27+
from sift.ingestion_configs.v2.ingestion_configs_pb2_grpc import (
28+
IngestionConfigServiceStub,
3229
)
3330

3431
from sift_client._internal.low_level_wrappers.base import (
@@ -44,6 +41,12 @@
4441
if TYPE_CHECKING:
4542
from datetime import datetime
4643

44+
from sift_stream_bindings import (
45+
IngestionConfigFormPy,
46+
IngestWithConfigDataStreamRequestPy,
47+
SiftStreamBuilderPy,
48+
)
49+
4750

4851
class IngestionThread(threading.Thread):
4952
"""Manages ingestion for a single ingestion config."""
@@ -54,7 +57,7 @@ class IngestionThread(threading.Thread):
5457

5558
def __init__(
5659
self,
57-
sift_stream_builder: sift_stream_bindings.SiftStreamBuilderPy,
60+
sift_stream_builder: SiftStreamBuilderPy,
5861
data_queue: Queue,
5962
ingestion_config: IngestionConfigFormPy,
6063
no_data_timeout: int = 1,
@@ -154,7 +157,7 @@ class IngestionLowLevelClient(LowLevelClientBase, WithGrpcClient):
154157

155158
CacheEntry = namedtuple("CacheEntry", ["data_queue", "ingestion_config", "thread"])
156159

157-
sift_stream_builder: sift_stream_bindings.SiftStreamBuilderPy
160+
sift_stream_builder: SiftStreamBuilderPy
158161
stream_cache: dict[str, CacheEntry]
159162

160163
def __init__(self, grpc_client: GrpcClient):
@@ -163,21 +166,25 @@ def __init__(self, grpc_client: GrpcClient):
163166
Args:
164167
grpc_client: The gRPC client to use for making API calls.
165168
"""
169+
from sift_stream_bindings import (
170+
RecoveryStrategyPy,
171+
RetryPolicyPy,
172+
SiftStreamBuilderPy,
173+
)
174+
166175
super().__init__(grpc_client=grpc_client)
167176
# Rust GRPC client expects URI to have http(s):// prefix.
168177
uri = grpc_client._config.uri
169178
if not uri.startswith("http"):
170179
uri = f"https://{uri}" if grpc_client._config.use_ssl else f"http://{uri}"
171-
self.sift_stream_builder = sift_stream_bindings.SiftStreamBuilderPy(
180+
self.sift_stream_builder = SiftStreamBuilderPy(
172181
uri=uri,
173182
apikey=grpc_client._config.api_key,
174183
)
175184
self.sift_stream_builder.enable_tls = grpc_client._config.use_ssl
176185
# FD-177: Expose configuration for recovery strategy.
177-
self.sift_stream_builder.recovery_strategy = (
178-
sift_stream_bindings.RecoveryStrategyPy.retry_only(
179-
sift_stream_bindings.RetryPolicyPy.default()
180-
)
186+
self.sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only(
187+
RetryPolicyPy.default()
181188
)
182189
self.stream_cache = {}
183190

@@ -229,7 +236,9 @@ async def get_ingestion_config_id_from_client_key(self, client_key: str) -> str
229236
return ingestion_configs[0].id_
230237

231238
def _new_ingestion_thread(
232-
self, ingestion_config_id: str, ingestion_config: IngestionConfigFormPy | None = None
239+
self,
240+
ingestion_config_id: str,
241+
ingestion_config: IngestionConfigFormPy | None = None,
233242
):
234243
"""Start a new ingestion thread.
235244
This allows ingestion to happen in the background regardless of if the user is using the sync or async client
@@ -290,7 +299,6 @@ async def create_ingestion_config(
290299
asset_name: str,
291300
flows: list[Flow],
292301
client_key: str | None = None,
293-
organization_id: str | None = None,
294302
) -> str:
295303
"""Create an ingestion config.
296304
@@ -303,6 +311,8 @@ async def create_ingestion_config(
303311
Returns:
304312
The id of the new or found ingestion config.
305313
"""
314+
from sift_stream_bindings import IngestionConfigFormPy
315+
306316
ingestion_config_id = None
307317
if client_key:
308318
logger.debug(f"Getting ingestion config id for client key {client_key}")
@@ -381,6 +391,8 @@ def ingest_flow(
381391
channel_values: The channel values to ingest.
382392
organization_id: The organization id to use for ingestion. Only relevant if the user is part of several organizations.
383393
"""
394+
from sift_stream_bindings import IngestWithConfigDataStreamRequestPy
395+
384396
if not flow.ingestion_config_id:
385397
raise ValueError(
386398
"Flow has no ingestion config id -- have you created an ingestion config for this flow?"

python/lib/sift_client/_internal/low_level_wrappers/rules.py

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ async def create_rule(
135135
)
136136
conditions_request = [
137137
UpdateConditionRequest(
138-
expression=expression_proto, actions=[create.action._to_update_request()]
138+
expression=expression_proto,
139+
actions=[create.action._to_update_request()],
139140
)
140141
]
141142
update_request = UpdateRuleRequest(
@@ -183,9 +184,7 @@ def _update_rule_request_from_update(
183184
"asset_tag_ids",
184185
]
185186
# Need to manually copy fields that will be reset even if not provided in update dict.
186-
copy_unset_fields = [
187-
"description",
188-
]
187+
copy_unset_fields = ["description", "name"]
189188

190189
# Populate the trivial fields first.
191190
update_dict.update(
@@ -214,15 +213,17 @@ def _update_rule_request_from_update(
214213
"Expression and channel_references must both be provided or both be None"
215214
)
216215
expression_proto = RuleConditionExpression(
217-
calculated_channel=CalculatedChannelConfig(
218-
expression=expression,
219-
channel_references={
220-
c.channel_reference: ChannelReferenceProto(name=c.channel_identifier)
221-
for c in channel_references
222-
},
216+
calculated_channel=(
217+
CalculatedChannelConfig(
218+
expression=expression,
219+
channel_references={
220+
c.channel_reference: ChannelReferenceProto(name=c.channel_identifier)
221+
for c in channel_references
222+
},
223+
)
224+
if expression
225+
else None
223226
)
224-
if expression
225-
else None
226227
)
227228
conditions_request = [
228229
UpdateConditionRequest(
@@ -238,10 +239,10 @@ def _update_rule_request_from_update(
238239

239240
# This always needs to be set, so handle the defaults.
240241
update_dict["asset_configuration"] = RuleAssetConfiguration( # type: ignore
241-
asset_ids=update.asset_ids if "asset_ids" in model_dump else rule.asset_ids or [],
242-
tag_ids=update.asset_tag_ids
243-
if "asset_tag_ids" in model_dump
244-
else rule.asset_tag_ids or [],
242+
asset_ids=(update.asset_ids if "asset_ids" in model_dump else rule.asset_ids or []),
243+
tag_ids=(
244+
update.asset_tag_ids if "asset_tag_ids" in model_dump else rule.asset_tag_ids or []
245+
),
245246
)
246247

247248
update_request = UpdateRuleRequest(
@@ -254,7 +255,7 @@ def _update_rule_request_from_update(
254255
async def update_rule(
255256
self, rule: Rule, update: RuleUpdate, version_notes: str | None = None
256257
) -> Rule:
257-
"""Update a rule.
258+
"""Update a rule. Also handles archive/unarchive to behave similar to other low-level clients.
258259
259260
Args:
260261
rule: The rule to update.
@@ -264,14 +265,26 @@ async def update_rule(
264265
Returns:
265266
The updated Rule.
266267
"""
268+
269+
should_update_archive = "is_archived" in update.model_fields_set
270+
267271
update.resource_id = rule.id_
272+
if not should_update_archive or (
273+
should_update_archive and len(update.model_fields_set) > 1
274+
):
275+
update_request = self._update_rule_request_from_update(rule, update, version_notes)
276+
277+
response = await self._grpc_client.get_stub(RuleServiceStub).UpdateRule(update_request)
278+
_ = cast("UpdateRuleResponse", response)
268279

269-
update_request = self._update_rule_request_from_update(rule, update, version_notes)
280+
if should_update_archive:
281+
if update.is_archived:
282+
await self.archive_rule(rule_id=rule.id_)
283+
else:
284+
await self.unarchive_rule(rule_id=rule.id_)
270285

271-
response = await self._grpc_client.get_stub(RuleServiceStub).UpdateRule(update_request)
272-
updated_grpc_rule = cast("UpdateRuleResponse", response)
273286
# Get the updated rule
274-
return await self.get_rule(rule_id=updated_grpc_rule.rule_id)
287+
return await self.get_rule(rule_id=rule.id_)
275288

276289
async def batch_update_rules(self, rules: list[RuleUpdate]) -> BatchUpdateRulesResponse:
277290
"""Batch update rules.

python/lib/sift_client/_tests/integrated/__init__.py renamed to python/lib/sift_client/_tests/_internal/low_level_wrappers/__init__.py

File renamed without changes.

0 commit comments

Comments
 (0)