-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathpermit.py
More file actions
320 lines (260 loc) · 11 KB
/
permit.py
File metadata and controls
320 lines (260 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import json
from contextlib import contextmanager
from typing import Generator, Optional
from loguru import logger
from permit_datafilter.boolean_expression.schemas import ResidualPolicyResponse
from pydantic import NonNegativeFloat
from typing_extensions import Self
from .api.api_client import PermitApiClient
from .api.elements import ElementsApi
from .config import PermitConfig
from .enforcement.enforcer import (
Action,
AuthorizedUsersResult,
CheckQuery,
Enforcer,
Resource,
User,
)
from .logger import configure_logger
from .pdp_api.pdp_api_client import PermitPdpApiClient
from .utils.context import Context
class Permit:
    """Main entry point of the Permit SDK.

    A ``Permit`` instance owns a single configuration object and builds, from
    that configuration, an enforcer (used by the permission-check methods) and
    the REST / Elements / PDP API clients exposed through the ``api``,
    ``elements`` and ``pdp_api`` properties.
    """

    def __init__(self, config: Optional[PermitConfig] = None, **options):
        """Initialize the SDK.

        Args:
            config: A ready-made configuration object. When given, ``options``
                are not used.
            **options: Keyword arguments forwarded to ``PermitConfig`` when no
                ``config`` object is passed.
        """
        self._config: PermitConfig = (
            config if config is not None else PermitConfig(**options)
        )

        configure_logger(self._config)
        self._enforcer = Enforcer(self._config)
        self._api = PermitApiClient(self._config)
        self._elements = ElementsApi(self._config)
        self._pdp_api = PermitPdpApiClient(self._config)

        # loguru formats messages with str.format-style braces, so the
        # placeholder is a plain "{}" (a leading "$" would be printed as-is).
        logger.debug(
            "Permit SDK initialized with config:\n{}",
            json.dumps(self._config.dict(exclude={"api_context"})),
        )

    @staticmethod
    def _context_or_empty(context: Optional[Context]) -> Context:
        """Return ``context`` unchanged, or a new empty dict when it is None.

        A fresh dict is created on every call so that no state can leak
        between unrelated permission queries through a shared default object.
        """
        return {} if context is None else context

    @property
    def config(self) -> PermitConfig:
        """
        Access the SDK configuration using this property.
        Once the SDK is initialized, the configuration is read-only:
        this property returns a copy, so changing the returned object
        does not affect the configuration held by this instance.

        Usage example:

            permit = Permit(config)
            pdp_url = permit.config.pdp
        """
        return self._config.copy()

    @contextmanager
    def wait_for_sync(self, timeout: float = 10.0) -> Generator[Self, None, None]:
        """
        Context manager that returns a client that is configured
        to wait for facts to be synced before proceeding.

        If ``proxy_facts_via_pdp`` is disabled in the configuration, a warning
        is logged and this very instance is yielded unchanged.

        Args:
            timeout: The amount of time in seconds to wait for facts to be available in the PDP
                cache before returning the response.

        Yields:
            Permit: A Permit instance that is configured to wait for facts to be synced.

        See Also:
            https://docs.permit.io/how-to/manage-data/local-facts-uploader
        """
        if not self._config.proxy_facts_via_pdp:
            logger.warning(
                "Tried to wait for synced facts but proxy_facts_via_pdp is disabled, ignoring..."
            )
            yield self
            return

        contextualized_config = self.config  # this copies the config
        contextualized_config.facts_sync_timeout = timeout
        # a new client is built from the modified copy; this instance is left untouched
        yield self.__class__(contextualized_config)

    @property
    def api(self) -> PermitApiClient:
        """
        Access the Permit REST API using this property.

        Usage example:

            permit = Permit(token="<YOUR_API_KEY>")
            await permit.api.roles.create(...)
        """
        return self._api

    @property
    def elements(self) -> ElementsApi:
        """
        Access the Permit Elements API using this property.

        Usage example:

            permit = Permit(token="<YOUR_API_KEY>")
            await permit.elements.loginAs(user, tenant)
        """
        return self._elements

    @property
    def pdp_api(self) -> PermitPdpApiClient:
        """
        Access the Permit PDP API using this property.

        Usage example:

            permit = Permit(token="<YOUR_API_KEY>")
            await permit.pdp_api.role_assignments.list()
        """
        return self._pdp_api

    async def authorized_users(
        self,
        action: Action,
        resource: Resource,
        context: Optional[Context] = None,
    ) -> AuthorizedUsersResult:
        """
        Queries to get all the users that are authorized to perform an action on a resource within the specified context.

        Args:
            action: The action to be performed on the resource.
            resource: The resource object representing the resource.
            context: The context object representing the context in which the action is performed.
                Defaults to None, which is treated as an empty context.

        Returns:
            AuthorizedUsersResult: Contains all the authorized users and the role assignments that granted the permission.

        Raises:
            PermitConnectionError: If an error occurs while sending the authorization request to the PDP.

        Examples:

            # all the users that can close any issue?
            await permit.authorized_users('close', 'issue')

            # all the users that can close an issue whose id is 1234?
            await permit.authorized_users('close', 'issue:1234')

            # all the users that can close (any) issues belonging to the 't1' tenant?
            # (in a multi tenant application)
            await permit.authorized_users('close', {'type': 'issue', 'tenant': 't1'})
        """
        return await self._enforcer.authorized_users(
            action, resource, self._context_or_empty(context)
        )

    async def bulk_check(
        self,
        checks: list[CheckQuery],
        context: Optional[Context] = None,
    ) -> list[bool]:
        """
        Runs a list of permission checks (each with its own user, action and resource) within the specified context.

        Args:
            checks: A list of check queries, each query contains user, action, and resource.
            context: The context object representing the context in which the action is performed.
                Defaults to None, which is treated as an empty context.

        Returns:
            list[bool]: A list of booleans indicating whether each check query is authorized.

        Raises:
            PermitConnectionError: If an error occurs while sending the authorization request to the PDP.

        Examples:

            # Bulk query of multiple check conventions
            await permit.bulk_check([
                {
                    "user": user,
                    "action": "close",
                    "resource": {"type": "issue", "key": "1234"},
                },
                {
                    "user": {"key": "user"},
                    "action": "close",
                    "resource": "issue:1235",
                },
                {
                    "user": "user_a",
                    "action": "close",
                    "resource": "issue",
                },
            ])
        """
        return await self._enforcer.bulk_check(checks, self._context_or_empty(context))

    async def check(
        self,
        user: User,
        action: Action,
        resource: Resource,
        context: Optional[Context] = None,
    ) -> bool:
        """
        Checks if a user is authorized to perform an action on a resource within the specified context.

        Args:
            user: The user object representing the user.
            action: The action to be performed on the resource.
            resource: The resource object representing the resource.
            context: The context object representing the context in which the action is performed.
                Defaults to None, which is treated as an empty context.

        Returns:
            bool: True if the user is authorized, False otherwise.

        Raises:
            PermitConnectionError: If an error occurs while sending the authorization request to the PDP.

        Examples:

            # can the user close any issue?
            await permit.check(user, 'close', 'issue')

            # can the user close an issue whose id is 1234?
            await permit.check(user, 'close', 'issue:1234')

            # can the user close (any) issues belonging to the 't1' tenant?
            # (in a multi tenant application)
            await permit.check(user, 'close', {'type': 'issue', 'tenant': 't1'})
        """
        return await self._enforcer.check(
            user, action, resource, self._context_or_empty(context)
        )

    async def filter_resources(
        self,
        user: User,
        action: Action,
        resource_type: str,
        context: Optional[Context] = None,
    ) -> ResidualPolicyResponse:
        """
        Returns a filter that can be applied to the user database that filters all the resources a user can access given a user, action and resource type.

        The filter is a residual policy compiled from OPA Rego AST and transformed to be expressed as a boolean expression
        (combination of logical and comparison operators, where the operands can be variable references or literal values).

        An example for a residual policy:

            {
                "type": "conditional",
                "condition": {
                    "expression": {
                        "operator": "eq",
                        "operands": [
                            {
                                "variable": "input.resource.tenant"
                            },
                            {
                                "value": "082f6978-6424-4e05-a706-1ab6f26c3768"
                            }
                        ]
                    }
                }
            }

        The user can then map this residual policy into an SQL expression using various plugins.

        Args:
            user: The user object representing the user.
            action: The action to be performed on the resource.
            resource_type: The resource type.
            context: The context object representing the context in which the action is performed.
                Defaults to None, which is treated as an empty context.

        Returns:
            ResidualPolicyResponse: a residual policy that can be transformed into an SQL expression.

        Raises:
            PermitConnectionError: If an error occurs while sending the authorization request to the PDP.

        Examples:

            from sqlalchemy.orm import declarative_base, relationship
            from permit import Permit
            from permit_datafilter.plugins.sqlalchemy import QueryBuilder

            # assuming we have the following SQL tables:
            Base = declarative_base()

            class Tenant(Base):
                __tablename__ = "tenant"

                id = Column(String, primary_key=True)
                key = Column(String(255))

            class Task(Base):
                __tablename__ = "task"

                id = Column(String, primary_key=True)
                created_at = Column(DateTime, default=datetime.utcnow)
                updated_at = Column(DateTime)
                description = Column(String(255))
                tenant_id = Column(String, ForeignKey("tenant.id"))
                tenant = relationship("Tenant", backref="tasks")

            # this is how we can filter all the task records in the database
            # that are readable by the user according to the authz policy
            # (i.e: that user have the `task:read` permission on them)
            permit = Permit(...)
            authz_filter = await permit.filter_resources("john@doe.com", "read", "task")
            query = (
                QueryBuilder()
                .select(Task)
                .filter_by(authz_filter)
                .map_references({
                    # if mapping a reference to a field on a related table
                    "input.resource.tenant": Tenant.key,
                })
                # you must specify how to perform a join against that table
                .join(Tenant, Task.tenant_id == Tenant.id)
                .build()
            )
        """
        return await self._enforcer.filter_resources(
            user, action, resource_type, self._context_or_empty(context)
        )