Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/workos/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pydantic import TypeAdapter

from workos.types.authorization.access_evaluation import AccessEvaluation
from workos.types.authorization.environment_role import (
EnvironmentRole,
EnvironmentRoleList,
Expand Down Expand Up @@ -161,6 +162,18 @@ def add_environment_role_permission(
permission_slug: str,
) -> SyncOrAsync[EnvironmentRole]: ...

# Access Evaluation

def check(
self,
organization_membership_id: str,
*,
permission_slug: str,
resource_id: Optional[str] = None,
resource_external_id: Optional[str] = None,
resource_type_slug: Optional[str] = None,
) -> SyncOrAsync[AccessEvaluation]: ...


class Authorization(AuthorizationModule):
_http_client: SyncHTTPClient
Expand Down Expand Up @@ -437,6 +450,42 @@ def add_environment_role_permission(

return EnvironmentRole.model_validate(response)

# Access Evaluation

def check(
self,
organization_membership_id: str,
*,
permission_slug: str,
resource_id: Optional[str] = None,
resource_external_id: Optional[str] = None,
resource_type_slug: Optional[str] = None,
) -> AccessEvaluation:
if resource_id is not None and resource_external_id is not None:
raise ValueError(
"resource_id and resource_external_id are mutually exclusive"
)
if resource_external_id is not None and resource_type_slug is None:
raise ValueError(
"resource_type_slug is required when resource_external_id is provided"
)

json: Dict[str, Any] = {"permission_slug": permission_slug}
if resource_id is not None:
json["resource_id"] = resource_id
if resource_external_id is not None:
json["resource_external_id"] = resource_external_id
if resource_type_slug is not None:
json["resource_type_slug"] = resource_type_slug

response = self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/check",
method=REQUEST_METHOD_POST,
json=json,
)

return AccessEvaluation.model_validate(response)


class AsyncAuthorization(AuthorizationModule):
_http_client: AsyncHTTPClient
Expand Down Expand Up @@ -712,3 +761,39 @@ async def add_environment_role_permission(
)

return EnvironmentRole.model_validate(response)

# Access Evaluation

async def check(
self,
organization_membership_id: str,
*,
permission_slug: str,
resource_id: Optional[str] = None,
resource_external_id: Optional[str] = None,
resource_type_slug: Optional[str] = None,
) -> AccessEvaluation:
if resource_id is not None and resource_external_id is not None:
raise ValueError(
"resource_id and resource_external_id are mutually exclusive"
)
if resource_external_id is not None and resource_type_slug is None:
raise ValueError(
"resource_type_slug is required when resource_external_id is provided"
)

json: Dict[str, Any] = {"permission_slug": permission_slug}
if resource_id is not None:
json["resource_id"] = resource_id
if resource_external_id is not None:
json["resource_external_id"] = resource_external_id
if resource_type_slug is not None:
json["resource_type_slug"] = resource_type_slug

response = await self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/check",
method=REQUEST_METHOD_POST,
json=json,
)

return AccessEvaluation.model_validate(response)
143 changes: 143 additions & 0 deletions tests/test_authorization_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from typing import Union

import pytest
from tests.utils.syncify import syncify
from workos.authorization import AsyncAuthorization, Authorization


@pytest.mark.sync_and_async(Authorization, AsyncAuthorization)
class TestAuthorizationCheck:
@pytest.fixture(autouse=True)
def setup(self, module_instance: Union[Authorization, AsyncAuthorization]):
self.http_client = module_instance._http_client
self.authorization = module_instance

@pytest.fixture
def mock_check_authorized(self):
return {"authorized": True}

@pytest.fixture
def mock_check_unauthorized(self):
return {"authorized": False}

def test_check_authorized(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

result = syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource_id="res_01ABC",
)
)

assert result.authorized is True
assert request_kwargs["method"] == "post"
assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01ABC/check"
)

def test_check_unauthorized(
self, mock_check_unauthorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_unauthorized, 200
)

result = syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:write",
resource_id="res_01ABC",
)
)

assert result.authorized is False
assert request_kwargs["method"] == "post"

def test_check_with_resource_id(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource_id="res_01XYZ",
)
)

assert request_kwargs["json"] == {
"permission_slug": "documents:read",
"resource_id": "res_01XYZ",
}

def test_check_with_resource_external_id(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource_external_id="ext_doc_123",
resource_type_slug="document",
)
)

assert request_kwargs["json"] == {
"permission_slug": "documents:read",
"resource_external_id": "ext_doc_123",
"resource_type_slug": "document",
}

def test_check_url_construction(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

syncify(
self.authorization.check(
"om_01MEMBERSHIP",
permission_slug="admin:access",
)
)

assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01MEMBERSHIP/check"
)
assert request_kwargs["json"] == {"permission_slug": "admin:access"}

def test_check_raises_when_both_resource_identifiers_provided(self):
with pytest.raises(ValueError, match="mutually exclusive"):
syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource_id="res_01ABC",
resource_external_id="ext_doc_123",
resource_type_slug="document",
)
)

def test_check_raises_when_external_id_without_type_slug(self):
with pytest.raises(ValueError, match="resource_type_slug is required"):
syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource_external_id="ext_doc_123",
)
)