Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions python/lib/sift_py/asset/_internal/shared.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional, cast
from typing import List, Optional, Tuple, Union, cast

from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
Expand All @@ -7,16 +7,16 @@

def list_assets_impl(
_asset_service_stub: AssetServiceStub,
names: Optional[List[str]] = None,
ids: Optional[List[str]] = None,
names: Optional[Union[Tuple[str], List[str]]] = None,
ids: Optional[Union[Tuple[str], List[str]]] = None,
) -> List[Asset]:
"""
Lists assets in an organization.

Args:
_asset_service_stub: The asset service stub to use.
names: Optional list of names to filter by.
ids: Optional list of IDs to filter by.
names: Optional collection of names to filter by.
ids: Optional collection of IDs to filter by.

Returns:
A list of assets matching the criteria.
Expand Down
52 changes: 46 additions & 6 deletions python/lib/sift_py/rule/service.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from __future__ import annotations

from dataclasses import dataclass

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache

from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from sift.annotations.v1.annotations_pb2 import AnnotationType
from sift.assets.v1.assets_pb2 import Asset
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
from sift.channels.v3.channels_pb2 import Channel as ChannelPb
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
from sift.common.type.v1.user_pb2 import User as UserPb
from sift.rules.v1.rules_pb2 import (
ANNOTATION,
AnnotationActionConfiguration,
Expand Down Expand Up @@ -53,18 +61,25 @@
class RuleService:
"""
A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.

Args:
channel: The configured Sift channel.
enable_caching: Enable caching on various API calls to speed up rule creation. Use this for short lived
instantiations of the RuleService where assets, channels, users are unlikely to change.
"""

_asset_service_stub: AssetServiceStub
_channel_service_stub: ChannelServiceStub
_rule_service_stub: RuleServiceStub
_user_service_stub: UserServiceStub
_enable_caching: bool

def __init__(self, channel: SiftChannel):
def __init__(self, channel: SiftChannel, enable_caching=False):
self._asset_service_stub = AssetServiceStub(channel)
self._channel_service_stub = ChannelServiceStub(channel)
self._rule_service_stub = RuleServiceStub(channel)
self._user_service_stub = UserServiceStub(channel)
self._enable_caching = enable_caching

def load_rules_from_yaml(
self,
Expand Down Expand Up @@ -401,8 +416,7 @@ def _update_req_from_rule_config(
assignee = config.action.assignee
user_id = None
if assignee:
users = get_active_users(
user_service=self._user_service_stub,
users = self._get_active_users(
filter=f"name=='{assignee}'",
)
if not users:
Expand Down Expand Up @@ -453,8 +467,7 @@ def _update_req_from_rule_config(

# Validate channels are present within each asset
for asset in assets:
found_channels = get_channels(
channel_service=self._channel_service_stub,
found_channels = self._get_channels(
filter=f"asset_id == '{asset.asset_id}' && {name_in}",
)
found_channels_names = [channel.name for channel in found_channels]
Expand Down Expand Up @@ -598,8 +611,35 @@ def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
return None

def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]:
if self._enable_caching:
return self._get_assets_cached(tuple(sorted(names)), tuple(sorted(ids)))
else:
return list_assets_impl(self._asset_service_stub, names, ids)

def _get_channels(self, filter: str) -> List[ChannelPb]:
if self._enable_caching:
return self._get_channels_cached(filter)
else:
return get_channels(channel_service=self._channel_service_stub, filter=filter)

def _get_active_users(self, filter: str) -> List[UserPb]:
if self._enable_caching:
return self._get_active_users_cached(filter)
else:
return get_active_users(user_service=self._user_service_stub, filter=filter)

@cache
def _get_assets_cached(self, names: Tuple[str], ids: Tuple[str]) -> List[Asset]:
return list_assets_impl(self._asset_service_stub, names, ids)

@cache
def _get_channels_cached(self, filter: str) -> List[ChannelPb]:
return get_channels(channel_service=self._channel_service_stub, filter=filter)

@cache
def _get_active_users_cached(self, filter: str) -> List[UserPb]:
return get_active_users(user_service=self._user_service_stub, filter=filter)


@dataclass
class RuleChannelReference:
Expand Down
9 changes: 7 additions & 2 deletions python/lib/sift_py/rule_evaluation/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@
class RuleEvaluationService:
"""
A service for evaluating rules. Provides methods to evaluate rules and perform dry-run evaluations.

Args:
enable_caching: Enable caching during rule evaluation. This is enabled by default.
This service is typically short lived in a workflows so assets, channels, and
users are unlikely to change during its lifetime to invalidate caches.
"""

_channel: SiftChannel
_rule_evaluation_stub: RuleEvaluationServiceStub
_rule_service: RuleService

def __init__(self, channel: SiftChannel):
def __init__(self, channel: SiftChannel, enable_caching: bool = True):
self._channel = channel
self._rule_evaluation_stub = RuleEvaluationServiceStub(channel)
self._rule_service = RuleService(channel)
self._rule_service = RuleService(channel, enable_caching=enable_caching)

def evaluate_against_run(
self,
Expand Down
Loading