From 50b130809f3e58773a410133ee9acbdacd6f1a29 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Fri, 12 Dec 2025 09:40:49 -0800 Subject: [PATCH 1/3] python(feat): Reduce API calls when creating rules with caching. This is optional and only enabled by default for rule evaluation. --- python/lib/sift_py/asset/_internal/shared.py | 10 ++-- python/lib/sift_py/rule/service.py | 47 ++++++++++++++++--- python/lib/sift_py/rule_evaluation/service.py | 6 ++- 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/python/lib/sift_py/asset/_internal/shared.py b/python/lib/sift_py/asset/_internal/shared.py index 7e503616a..c11f4ebc3 100644 --- a/python/lib/sift_py/asset/_internal/shared.py +++ b/python/lib/sift_py/asset/_internal/shared.py @@ -1,4 +1,4 @@ -from typing import List, Optional, cast +from typing import List, Optional, Tuple, Union, cast from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse from sift.assets.v1.assets_pb2_grpc import AssetServiceStub @@ -7,16 +7,16 @@ def list_assets_impl( _asset_service_stub: AssetServiceStub, - names: Optional[List[str]] = None, - ids: Optional[List[str]] = None, + names: Optional[Union[Tuple[str], List[str]]] = None, + ids: Optional[Union[Tuple[str], List[str]]] = None, ) -> List[Asset]: """ Lists assets in an organization. Args: _asset_service_stub: The asset service stub to use. - names: Optional list of names to filter by. - ids: Optional list of IDs to filter by. + names: Optional collection of names to filter by. + ids: Optional collection of IDs to filter by. Returns: A list of assets matching the criteria. diff --git a/python/lib/sift_py/rule/service.py b/python/lib/sift_py/rule/service.py index db2951725..8b9eefaae 100644 --- a/python/lib/sift_py/rule/service.py +++ b/python/lib/sift_py/rule/service.py @@ -1,13 +1,16 @@ from __future__ import annotations from dataclasses import dataclass +from functools import cache from pathlib import Path -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, List, Optional, Tuple, Union, cast from sift.annotations.v1.annotations_pb2 import AnnotationType from sift.assets.v1.assets_pb2 import Asset from sift.assets.v1.assets_pb2_grpc import AssetServiceStub +from sift.channels.v3.channels_pb2 import Channel as ChannelPb from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub +from sift.common.type.v1.user_pb2 import User as UserPb from sift.rules.v1.rules_pb2 import ( ANNOTATION, AnnotationActionConfiguration, @@ -53,18 +56,25 @@ class RuleService: """ A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API. + + Args: + channel: The configured Sift channel. + enable_caching: Enable caching on various API calls to speed up rule creation. Use this for short lived + instantiations of the RuleService where assets, channels, users are unlikely to change. """ _asset_service_stub: AssetServiceStub _channel_service_stub: ChannelServiceStub _rule_service_stub: RuleServiceStub _user_service_stub: UserServiceStub + _enable_caching: bool - def __init__(self, channel: SiftChannel): + def __init__(self, channel: SiftChannel, enable_caching=False): self._asset_service_stub = AssetServiceStub(channel) self._channel_service_stub = ChannelServiceStub(channel) self._rule_service_stub = RuleServiceStub(channel) self._user_service_stub = UserServiceStub(channel) + self._enable_caching = enable_caching def load_rules_from_yaml( self, @@ -401,8 +411,7 @@ def _update_req_from_rule_config( assignee = config.action.assignee user_id = None if assignee: - users = get_active_users( - user_service=self._user_service_stub, + users = self._get_active_users( filter=f"name=='{assignee}'", ) if not users: @@ -453,8 +462,7 @@ def _update_req_from_rule_config( # Validate channels are present within each asset for asset in assets: - found_channels = get_channels( - channel_service=self._channel_service_stub, + found_channels = self._get_channels( filter=f"asset_id == '{asset.asset_id}' && {name_in}", ) found_channels_names = [channel.name for channel in found_channels] @@ -598,8 +606,35 @@ def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]: return None def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]: + if self._enable_caching: + return self._get_assets_cached(tuple(sorted(names)), tuple(sorted(ids))) + else: + return list_assets_impl(self._asset_service_stub, names, ids) + + def _get_channels(self, filter: str) -> List[ChannelPb]: + if self._enable_caching: + return self._get_channels_cached(filter) + else: + return get_channels(channel_service=self._channel_service_stub, filter=filter) + + def _get_active_users(self, filter: str) -> List[UserPb]: + if self._enable_caching: + return self._get_active_users_cached(filter) + else: + return get_active_users(user_service=self._user_service_stub, filter=filter) + + @cache + def _get_assets_cached(self, names: Tuple[str], ids: Tuple[str]) -> List[Asset]: return list_assets_impl(self._asset_service_stub, names, ids) + @cache + def _get_channels_cached(self, filter: str) -> List[ChannelPb]: + return get_channels(channel_service=self._channel_service_stub, filter=filter) + + @cache + def _get_active_users_cached(self, filter: str) -> List[UserPb]: + return get_active_users(user_service=self._user_service_stub, filter=filter) + @dataclass class RuleChannelReference: diff --git a/python/lib/sift_py/rule_evaluation/service.py b/python/lib/sift_py/rule_evaluation/service.py index d53431687..bc6a0acd5 100644 --- a/python/lib/sift_py/rule_evaluation/service.py +++ b/python/lib/sift_py/rule_evaluation/service.py @@ -43,7 +43,11 @@ class RuleEvaluationService: def __init__(self, channel: SiftChannel): self._channel = channel self._rule_evaluation_stub = RuleEvaluationServiceStub(channel) - self._rule_service = RuleService(channel) + + # Enable caching during rule evaluation. This service is typically + # short lived in a workflow so assets, channels, and users are unlikely + # to change during its lifetime to invalidate caches. + self._rule_service = RuleService(channel, enable_caching=True) def evaluate_against_run( self, From 6b3bb9af3028dfb7ddfa4ab9266997d8a4d8993b Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Fri, 12 Dec 2025 09:49:47 -0800 Subject: [PATCH 2/3] Fallback to lru_cache in python 3.8 --- python/lib/sift_py/rule/service.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/lib/sift_py/rule/service.py b/python/lib/sift_py/rule/service.py index 8b9eefaae..6bcfbb2c9 100644 --- a/python/lib/sift_py/rule/service.py +++ b/python/lib/sift_py/rule/service.py @@ -1,7 +1,12 @@ from __future__ import annotations from dataclasses import dataclass -from functools import cache + +try: + from functools import cache +except ImportError: + from functools import lru_cache as cache + from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union, cast From 884144a5ebd74de55d023c8a37bd42e03a11148d Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Fri, 12 Dec 2025 10:17:52 -0800 Subject: [PATCH 3/3] Expose enable_caching in RuleEvaluationService --- python/lib/sift_py/rule_evaluation/service.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/lib/sift_py/rule_evaluation/service.py b/python/lib/sift_py/rule_evaluation/service.py index bc6a0acd5..217ee4117 100644 --- a/python/lib/sift_py/rule_evaluation/service.py +++ b/python/lib/sift_py/rule_evaluation/service.py @@ -34,20 +34,21 @@ class RuleEvaluationService: """ A service for evaluating rules. Provides methods to evaluate rules and perform dry-run evaluations. + + Args: + enable_caching: Enable caching during rule evaluation. This is enabled by default. + This service is typically short lived in a workflows so assets, channels, and + users are unlikely to change during its lifetime to invalidate caches. """ _channel: SiftChannel _rule_evaluation_stub: RuleEvaluationServiceStub _rule_service: RuleService - def __init__(self, channel: SiftChannel): + def __init__(self, channel: SiftChannel, enable_caching: bool = True): self._channel = channel self._rule_evaluation_stub = RuleEvaluationServiceStub(channel) - - # Enable caching during rule evaluation. This service is typically - # short lived in a workflow so assets, channels, and users are unlikely - # to change during its lifetime to invalidate caches. - self._rule_service = RuleService(channel, enable_caching=True) + self._rule_service = RuleService(channel, enable_caching=enable_caching) def evaluate_against_run( self,