-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathpolicies.py
More file actions
217 lines (181 loc) · 7.62 KB
/
policies.py
File metadata and controls
217 lines (181 loc) · 7.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, cast
from sift.policies.v1.policies_pb2 import (
ArchivePolicyRequest,
ArchivePolicyResponse,
CreatePolicyResponse,
GetPolicyRequest,
GetPolicyResponse,
ListPoliciesRequest,
ListPoliciesResponse,
UpdatePolicyRequest,
UpdatePolicyResponse,
)
from sift.policies.v1.policies_pb2_grpc import PolicyServiceStub
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.sift_types.policies import Policy, PolicyCreate, PolicyUpdate
from sift_client.transport import WithGrpcClient
if TYPE_CHECKING:
from sift_client.transport.grpc_transport import GrpcClient
# Module-level logger (no handlers or levels are configured here)
logger = logging.getLogger(__name__)
class PoliciesLowLevelClient(LowLevelClientBase, WithGrpcClient):
    """Low-level client for the PolicyService.

    A thin async wrapper around the autogenerated PolicyService gRPC bindings.
    Each method builds the request message, awaits the matching RPC on a
    ``PolicyServiceStub`` and converts the returned proto into a ``Policy``.
    """

    def __init__(self, grpc_client: GrpcClient):
        """Initialize the PoliciesLowLevelClient.

        Args:
            grpc_client: The gRPC client used for every API call.
        """
        super().__init__(grpc_client)

    async def create_policy(
        self,
        name: str,
        cedar_policy: str,
        description: str | None = None,
        version_notes: str | None = None,
    ) -> Policy:
        """Create a new policy.

        Args:
            name: The name of the policy.
            cedar_policy: The Cedar policy string.
            description: Optional description.
            version_notes: Optional version notes.

        Returns:
            The created Policy.
        """
        # Build the request first so the stub is only looked up afterwards.
        request = PolicyCreate(
            name=name,
            cedar_policy=cedar_policy,
            description=description,
            version_notes=version_notes,
        ).to_proto()
        stub = self._grpc_client.get_stub(PolicyServiceStub)
        reply = cast("CreatePolicyResponse", await stub.CreatePolicy(request))
        return Policy._from_proto(reply.policy)

    async def get_policy(self, policy_id: str) -> Policy:
        """Get a policy by ID.

        Args:
            policy_id: The policy ID.

        Returns:
            The Policy.
        """
        request = GetPolicyRequest(policy_id=policy_id)
        stub = self._grpc_client.get_stub(PolicyServiceStub)
        reply = cast("GetPolicyResponse", await stub.GetPolicy(request))
        return Policy._from_proto(reply.policy)

    async def list_policies(
        self,
        *,
        page_size: int | None = None,
        page_token: str | None = None,
        query_filter: str | None = None,
        order_by: str | None = None,
        include_archived: bool = False,
    ) -> tuple[list[Policy], str]:
        """List one page of policies with optional filtering.

        Args:
            page_size: The maximum number of policies to return.
            page_token: A page token for pagination.
            query_filter: A CEL filter string.
            order_by: How to order the retrieved policies.
            include_archived: Whether to include archived policies.

        Returns:
            A tuple of (policies, next_page_token).
        """
        optional_fields: dict[str, Any] = {
            "page_size": page_size,
            "page_token": page_token,
            "filter": query_filter,
            "order_by": order_by,
        }
        # ``include_archived`` is always sent; the other fields only when given.
        request_kwargs: dict[str, Any] = {"include_archived": include_archived}
        request_kwargs.update(
            {field: value for field, value in optional_fields.items() if value is not None}
        )

        request = ListPoliciesRequest(**request_kwargs)
        stub = self._grpc_client.get_stub(PolicyServiceStub)
        page = cast("ListPoliciesResponse", await stub.ListPolicies(request))
        return [Policy._from_proto(item) for item in page.policies], page.next_page_token

    async def list_all_policies(
        self,
        *,
        query_filter: str | None = None,
        order_by: str | None = None,
        include_archived: bool = False,
        max_results: int | None = None,
    ) -> list[Policy]:
        """List all policies with optional filtering.

        Delegates paging to ``_handle_pagination`` with ``list_policies`` as
        the page fetcher.

        Args:
            query_filter: A CEL filter string.
            order_by: How to order the retrieved policies.
            include_archived: Whether to include archived policies.
            max_results: Maximum number of results to return.

        Returns:
            A list of all matching policies.
        """
        fixed_kwargs = {"include_archived": include_archived, "query_filter": query_filter}
        return await self._handle_pagination(
            self.list_policies,
            kwargs=fixed_kwargs,
            order_by=order_by,
            max_results=max_results,
        )

    async def update_policy(
        self,
        policy: str | Policy,
        update: PolicyUpdate | dict,
        version_notes: str | None = None,
    ) -> Policy:
        """Update a policy.

        The current policy is fetched first so that fields missing from the
        update mask, plus the read-only fields, can be copied onto the proto
        that is sent.

        Args:
            policy: The Policy or policy ID to update.
            update: Updates to apply to the policy.
            version_notes: Optional version notes for the update.

        Returns:
            The updated Policy.
        """
        if isinstance(policy, Policy):
            policy_id = policy._id_or_error
        else:
            policy_id = policy

        if isinstance(update, dict):
            update = PolicyUpdate.model_validate(update)
        update.resource_id = policy_id

        # Fetch the stored policy; its values fill in everything not updated.
        existing = await self.get_policy(policy_id)
        proto, mask = update.to_proto_with_mask()
        masked_paths = mask.paths  # type: ignore[attr-defined]

        # Carry over current values for fields the caller is not changing.
        if "name" not in masked_paths:
            proto.name = existing.name
        if "description" not in masked_paths and existing.description:
            proto.description = existing.description
        if "configuration.cedar_policy" not in masked_paths:
            proto.configuration.cedar_policy = existing.cedar_policy

        # Read-only fields are copied as well (required by backend validation).
        proto.organization_id = existing.organization_id
        proto.created_by_user_id = existing.created_by_user_id
        proto.modified_by_user_id = existing.modified_by_user_id
        proto.policy_version_id = existing.policy_version_id
        existing_proto = existing.proto
        if existing_proto is not None:
            proto.created_date.CopyFrom(existing_proto.created_date)  # type: ignore[attr-defined]
            proto.modified_date.CopyFrom(existing_proto.modified_date)  # type: ignore[attr-defined]

        request = UpdatePolicyRequest(policy=proto, update_mask=mask)
        if version_notes is not None:
            request.version_notes = version_notes

        stub = self._grpc_client.get_stub(PolicyServiceStub)
        reply = cast("UpdatePolicyResponse", await stub.UpdatePolicy(request))
        return Policy._from_proto(reply.policy)

    async def archive_policy(self, policy_id: str) -> Policy:
        """Archive a policy.

        Args:
            policy_id: The policy ID to archive.

        Returns:
            The archived Policy.
        """
        request = ArchivePolicyRequest(policy_id=policy_id)
        stub = self._grpc_client.get_stub(PolicyServiceStub)
        reply = cast("ArchivePolicyResponse", await stub.ArchivePolicy(request))
        return Policy._from_proto(reply.policy)