diff --git a/python/lib/sift_py/asset/_internal/shared.py b/python/lib/sift_py/asset/_internal/shared.py index 7e503616a..c11f4ebc3 100644 --- a/python/lib/sift_py/asset/_internal/shared.py +++ b/python/lib/sift_py/asset/_internal/shared.py @@ -1,4 +1,4 @@ -from typing import List, Optional, cast +from typing import List, Optional, Tuple, Union, cast from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse from sift.assets.v1.assets_pb2_grpc import AssetServiceStub @@ -7,16 +7,16 @@ def list_assets_impl( _asset_service_stub: AssetServiceStub, - names: Optional[List[str]] = None, - ids: Optional[List[str]] = None, + names: Optional[Union[Tuple[str], List[str]]] = None, + ids: Optional[Union[Tuple[str], List[str]]] = None, ) -> List[Asset]: """ Lists assets in an organization. Args: _asset_service_stub: The asset service stub to use. - names: Optional list of names to filter by. - ids: Optional list of IDs to filter by. + names: Optional collection of names to filter by. + ids: Optional collection of IDs to filter by. Returns: A list of assets matching the criteria. diff --git a/python/lib/sift_py/rule/service.py b/python/lib/sift_py/rule/service.py index db2951725..6bcfbb2c9 100644 --- a/python/lib/sift_py/rule/service.py +++ b/python/lib/sift_py/rule/service.py @@ -1,13 +1,21 @@ from __future__ import annotations from dataclasses import dataclass + +try: + from functools import cache +except ImportError: + from functools import lru_cache as cache + from pathlib import Path -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, List, Optional, Tuple, Union, cast from sift.annotations.v1.annotations_pb2 import AnnotationType from sift.assets.v1.assets_pb2 import Asset from sift.assets.v1.assets_pb2_grpc import AssetServiceStub +from sift.channels.v3.channels_pb2 import Channel as ChannelPb from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub +from sift.common.type.v1.user_pb2 import User as UserPb from sift.rules.v1.rules_pb2 import ( ANNOTATION, AnnotationActionConfiguration, @@ -53,18 +61,25 @@ class RuleService: """ A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API. + + Args: + channel: The configured Sift channel. + enable_caching: Enable caching on various API calls to speed up rule creation. Use this for short lived + instantiations of the RuleService where assets, channels, users are unlikely to change. """ _asset_service_stub: AssetServiceStub _channel_service_stub: ChannelServiceStub _rule_service_stub: RuleServiceStub _user_service_stub: UserServiceStub + _enable_caching: bool - def __init__(self, channel: SiftChannel): + def __init__(self, channel: SiftChannel, enable_caching=False): self._asset_service_stub = AssetServiceStub(channel) self._channel_service_stub = ChannelServiceStub(channel) self._rule_service_stub = RuleServiceStub(channel) self._user_service_stub = UserServiceStub(channel) + self._enable_caching = enable_caching def load_rules_from_yaml( self, @@ -401,8 +416,7 @@ def _update_req_from_rule_config( assignee = config.action.assignee user_id = None if assignee: - users = get_active_users( - user_service=self._user_service_stub, + users = self._get_active_users( filter=f"name=='{assignee}'", ) if not users: @@ -453,8 +467,7 @@ def _update_req_from_rule_config( # Validate channels are present within each asset for asset in assets: - found_channels = get_channels( - channel_service=self._channel_service_stub, + found_channels = self._get_channels( filter=f"asset_id == '{asset.asset_id}' && {name_in}", ) found_channels_names = [channel.name for channel in found_channels] @@ -598,8 +611,35 @@ def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]: return None def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]: + if self._enable_caching: + return self._get_assets_cached(tuple(sorted(names)), tuple(sorted(ids))) + else: + return list_assets_impl(self._asset_service_stub, names, ids) + + def _get_channels(self, filter: str) -> List[ChannelPb]: + if self._enable_caching: + return self._get_channels_cached(filter) + else: + return get_channels(channel_service=self._channel_service_stub, filter=filter) + + def _get_active_users(self, filter: str) -> List[UserPb]: + if self._enable_caching: + return self._get_active_users_cached(filter) + else: + return get_active_users(user_service=self._user_service_stub, filter=filter) + + @cache + def _get_assets_cached(self, names: Tuple[str], ids: Tuple[str]) -> List[Asset]: return list_assets_impl(self._asset_service_stub, names, ids) + @cache + def _get_channels_cached(self, filter: str) -> List[ChannelPb]: + return get_channels(channel_service=self._channel_service_stub, filter=filter) + + @cache + def _get_active_users_cached(self, filter: str) -> List[UserPb]: + return get_active_users(user_service=self._user_service_stub, filter=filter) + @dataclass class RuleChannelReference: diff --git a/python/lib/sift_py/rule_evaluation/service.py b/python/lib/sift_py/rule_evaluation/service.py index d53431687..217ee4117 100644 --- a/python/lib/sift_py/rule_evaluation/service.py +++ b/python/lib/sift_py/rule_evaluation/service.py @@ -34,16 +34,21 @@ class RuleEvaluationService: """ A service for evaluating rules. Provides methods to evaluate rules and perform dry-run evaluations. + + Args: + enable_caching: Enable caching during rule evaluation. This is enabled by default. + This service is typically short lived in a workflows so assets, channels, and + users are unlikely to change during its lifetime to invalidate caches. """ _channel: SiftChannel _rule_evaluation_stub: RuleEvaluationServiceStub _rule_service: RuleService - def __init__(self, channel: SiftChannel): + def __init__(self, channel: SiftChannel, enable_caching: bool = True): self._channel = channel self._rule_evaluation_stub = RuleEvaluationServiceStub(channel) - self._rule_service = RuleService(channel) + self._rule_service = RuleService(channel, enable_caching=enable_caching) def evaluate_against_run( self,