Skip to content

Commit b344d14

Browse files
FGA pt 2
1 parent 526eb7d commit b344d14

2 files changed

Lines changed: 228 additions & 0 deletions

File tree

src/workos/authorization.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from pydantic import TypeAdapter
44

5+
from workos.types.authorization.access_evaluation import AccessEvaluation
56
from workos.types.authorization.environment_role import (
67
EnvironmentRole,
78
EnvironmentRoleList,
@@ -161,6 +162,18 @@ def add_environment_role_permission(
161162
permission_slug: str,
162163
) -> SyncOrAsync[EnvironmentRole]: ...
163164

165+
# Access Evaluation
166+
167+
def check(
168+
self,
169+
organization_membership_id: str,
170+
*,
171+
permission_slug: str,
172+
resource_id: Optional[str] = None,
173+
resource_external_id: Optional[str] = None,
174+
resource_type_slug: Optional[str] = None,
175+
) -> SyncOrAsync[AccessEvaluation]: ...
176+
164177

165178
class Authorization(AuthorizationModule):
166179
_http_client: SyncHTTPClient
@@ -437,6 +450,42 @@ def add_environment_role_permission(
437450

438451
return EnvironmentRole.model_validate(response)
439452

453+
# Access Evaluation
454+
455+
def check(
456+
self,
457+
organization_membership_id: str,
458+
*,
459+
permission_slug: str,
460+
resource_id: Optional[str] = None,
461+
resource_external_id: Optional[str] = None,
462+
resource_type_slug: Optional[str] = None,
463+
) -> AccessEvaluation:
464+
if resource_id is not None and resource_external_id is not None:
465+
raise ValueError(
466+
"resource_id and resource_external_id are mutually exclusive"
467+
)
468+
if resource_external_id is not None and resource_type_slug is None:
469+
raise ValueError(
470+
"resource_type_slug is required when resource_external_id is provided"
471+
)
472+
473+
json: Dict[str, Any] = {"permission_slug": permission_slug}
474+
if resource_id is not None:
475+
json["resource_id"] = resource_id
476+
if resource_external_id is not None:
477+
json["resource_external_id"] = resource_external_id
478+
if resource_type_slug is not None:
479+
json["resource_type_slug"] = resource_type_slug
480+
481+
response = self._http_client.request(
482+
f"authorization/organization_memberships/{organization_membership_id}/check",
483+
method=REQUEST_METHOD_POST,
484+
json=json,
485+
)
486+
487+
return AccessEvaluation.model_validate(response)
488+
440489

441490
class AsyncAuthorization(AuthorizationModule):
442491
_http_client: AsyncHTTPClient
@@ -712,3 +761,39 @@ async def add_environment_role_permission(
712761
)
713762

714763
return EnvironmentRole.model_validate(response)
764+
765+
# Access Evaluation
766+
767+
async def check(
768+
self,
769+
organization_membership_id: str,
770+
*,
771+
permission_slug: str,
772+
resource_id: Optional[str] = None,
773+
resource_external_id: Optional[str] = None,
774+
resource_type_slug: Optional[str] = None,
775+
) -> AccessEvaluation:
776+
if resource_id is not None and resource_external_id is not None:
777+
raise ValueError(
778+
"resource_id and resource_external_id are mutually exclusive"
779+
)
780+
if resource_external_id is not None and resource_type_slug is None:
781+
raise ValueError(
782+
"resource_type_slug is required when resource_external_id is provided"
783+
)
784+
785+
json: Dict[str, Any] = {"permission_slug": permission_slug}
786+
if resource_id is not None:
787+
json["resource_id"] = resource_id
788+
if resource_external_id is not None:
789+
json["resource_external_id"] = resource_external_id
790+
if resource_type_slug is not None:
791+
json["resource_type_slug"] = resource_type_slug
792+
793+
response = await self._http_client.request(
794+
f"authorization/organization_memberships/{organization_membership_id}/check",
795+
method=REQUEST_METHOD_POST,
796+
json=json,
797+
)
798+
799+
return AccessEvaluation.model_validate(response)

tests/test_authorization_check.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
from typing import Union
2+
3+
import pytest
4+
from tests.utils.syncify import syncify
5+
from workos.authorization import AsyncAuthorization, Authorization
6+
7+
8+
@pytest.mark.sync_and_async(Authorization, AsyncAuthorization)
9+
class TestAuthorizationCheck:
10+
@pytest.fixture(autouse=True)
11+
def setup(self, module_instance: Union[Authorization, AsyncAuthorization]):
12+
self.http_client = module_instance._http_client
13+
self.authorization = module_instance
14+
15+
@pytest.fixture
16+
def mock_check_authorized(self):
17+
return {"authorized": True}
18+
19+
@pytest.fixture
20+
def mock_check_unauthorized(self):
21+
return {"authorized": False}
22+
23+
def test_check_authorized(
24+
self, mock_check_authorized, capture_and_mock_http_client_request
25+
):
26+
request_kwargs = capture_and_mock_http_client_request(
27+
self.http_client, mock_check_authorized, 200
28+
)
29+
30+
result = syncify(
31+
self.authorization.check(
32+
"om_01ABC",
33+
permission_slug="documents:read",
34+
resource_id="res_01ABC",
35+
)
36+
)
37+
38+
assert result.authorized is True
39+
assert request_kwargs["method"] == "post"
40+
assert request_kwargs["url"].endswith(
41+
"/authorization/organization_memberships/om_01ABC/check"
42+
)
43+
44+
def test_check_unauthorized(
45+
self, mock_check_unauthorized, capture_and_mock_http_client_request
46+
):
47+
request_kwargs = capture_and_mock_http_client_request(
48+
self.http_client, mock_check_unauthorized, 200
49+
)
50+
51+
result = syncify(
52+
self.authorization.check(
53+
"om_01ABC",
54+
permission_slug="documents:write",
55+
resource_id="res_01ABC",
56+
)
57+
)
58+
59+
assert result.authorized is False
60+
assert request_kwargs["method"] == "post"
61+
62+
def test_check_with_resource_id(
63+
self, mock_check_authorized, capture_and_mock_http_client_request
64+
):
65+
request_kwargs = capture_and_mock_http_client_request(
66+
self.http_client, mock_check_authorized, 200
67+
)
68+
69+
syncify(
70+
self.authorization.check(
71+
"om_01ABC",
72+
permission_slug="documents:read",
73+
resource_id="res_01XYZ",
74+
)
75+
)
76+
77+
assert request_kwargs["json"] == {
78+
"permission_slug": "documents:read",
79+
"resource_id": "res_01XYZ",
80+
}
81+
82+
def test_check_with_resource_external_id(
83+
self, mock_check_authorized, capture_and_mock_http_client_request
84+
):
85+
request_kwargs = capture_and_mock_http_client_request(
86+
self.http_client, mock_check_authorized, 200
87+
)
88+
89+
syncify(
90+
self.authorization.check(
91+
"om_01ABC",
92+
permission_slug="documents:read",
93+
resource_external_id="ext_doc_123",
94+
resource_type_slug="document",
95+
)
96+
)
97+
98+
assert request_kwargs["json"] == {
99+
"permission_slug": "documents:read",
100+
"resource_external_id": "ext_doc_123",
101+
"resource_type_slug": "document",
102+
}
103+
104+
def test_check_url_construction(
105+
self, mock_check_authorized, capture_and_mock_http_client_request
106+
):
107+
request_kwargs = capture_and_mock_http_client_request(
108+
self.http_client, mock_check_authorized, 200
109+
)
110+
111+
syncify(
112+
self.authorization.check(
113+
"om_01MEMBERSHIP",
114+
permission_slug="admin:access",
115+
)
116+
)
117+
118+
assert request_kwargs["url"].endswith(
119+
"/authorization/organization_memberships/om_01MEMBERSHIP/check"
120+
)
121+
assert request_kwargs["json"] == {"permission_slug": "admin:access"}
122+
123+
def test_check_raises_when_both_resource_identifiers_provided(self):
124+
with pytest.raises(ValueError, match="mutually exclusive"):
125+
syncify(
126+
self.authorization.check(
127+
"om_01ABC",
128+
permission_slug="documents:read",
129+
resource_id="res_01ABC",
130+
resource_external_id="ext_doc_123",
131+
resource_type_slug="document",
132+
)
133+
)
134+
135+
def test_check_raises_when_external_id_without_type_slug(self):
136+
with pytest.raises(ValueError, match="resource_type_slug is required"):
137+
syncify(
138+
self.authorization.check(
139+
"om_01ABC",
140+
permission_slug="documents:read",
141+
resource_external_id="ext_doc_123",
142+
)
143+
)

0 commit comments

Comments
 (0)