Skip to content

Commit 30f14e8

Browse files
gjtorikianclaude
andcommitted
feat(python): add non-spec helpers H01-H19
New hand-maintained modules: - actions.py: AuthKit Actions request verification and response signing (H03) - pkce.py: PKCE code verifier/challenge utilities (H08) - public_client.py: Factory for PKCE-only public client usage (H19) Session enhancements: - seal_session_from_auth_response for auth response sealing (H07) @oagen-ignore blocks in generated files: - user_management: load_sealed_session, authenticate_with_session_cookie (H05), get_authorization_url_with_pkce (H10), authenticate_with_code_pkce (H11) - sso: get_authorization_url_with_pkce (H15), get_profile_and_token_pkce (H16) Emitter: wire actions and pkce onto WorkOS/AsyncWorkOS client Tests for all helpers including webhook verification (H01/H02), session management (H04/H06), and all new inline helpers. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d5e0dc0 commit 30f14e8

File tree

13 files changed

+1467
-0
lines changed

13 files changed

+1467
-0
lines changed

src/workos/_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
from .webhooks._resource import Webhooks, AsyncWebhooks
4848
from .widgets._resource import Widgets, AsyncWidgets
4949
from .audit_logs._resource import AuditLogs, AsyncAuditLogs
50+
from .actions import Actions, AsyncActions
5051
from .passwordless import AsyncPasswordless, Passwordless
52+
from .pkce import PKCE
5153
from .vault import AsyncVault, Vault
5254

5355
try:
@@ -446,6 +448,14 @@ def passwordless(self) -> Passwordless:
446448
def vault(self) -> Vault:
447449
return Vault(self)
448450

451+
@functools.cached_property
452+
def actions(self) -> Actions:
453+
return Actions()
454+
455+
@functools.cached_property
456+
def pkce(self) -> PKCE:
457+
return PKCE()
458+
449459
@overload
450460
def request(
451461
self,
@@ -697,6 +707,14 @@ def passwordless(self) -> AsyncPasswordless:
697707
def vault(self) -> AsyncVault:
698708
return AsyncVault(self)
699709

710+
@functools.cached_property
711+
def actions(self) -> AsyncActions:
712+
return AsyncActions()
713+
714+
@functools.cached_property
715+
def pkce(self) -> PKCE:
716+
return PKCE()
717+
700718
@overload
701719
async def request(
702720
self,

src/workos/actions.py

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
# @oagen-ignore-file
2+
# This file is hand-maintained. AuthKit Actions helpers for request
3+
# verification and response signing. These are client-side cryptographic
4+
# helpers that will always be hand-maintained.
5+
6+
from __future__ import annotations
7+
8+
import hashlib
9+
import hmac
10+
import json
11+
import time
12+
from typing import Any, Dict, Literal, Optional, Union
13+
14+
DEFAULT_TOLERANCE = 30 # seconds (stricter than webhooks' 180s)
15+
16+
ActionType = Literal["authentication", "user_registration"]
17+
18+
_ACTION_TYPE_TO_RESPONSE_OBJECT = {
19+
"authentication": "authentication_action_response",
20+
"user_registration": "user_registration_action_response",
21+
}
22+
23+
24+
def _verify_signature(
25+
*,
26+
payload: Union[bytes, str],
27+
sig_header: str,
28+
secret: str,
29+
tolerance: int = DEFAULT_TOLERANCE,
30+
) -> None:
31+
"""Verify an HMAC-SHA256 signature header. Raises ValueError on failure."""
32+
try:
33+
issued_part, sig_part = sig_header.split(", ")
34+
except (ValueError, AttributeError) as exc:
35+
raise ValueError(
36+
"Unable to extract timestamp and signature hash from header",
37+
sig_header,
38+
) from exc
39+
40+
issued_timestamp = issued_part[2:]
41+
signature_hash = sig_part[3:]
42+
43+
current_time = time.time()
44+
timestamp_in_seconds = int(issued_timestamp) / 1000
45+
seconds_since_issued = current_time - timestamp_in_seconds
46+
47+
if seconds_since_issued > tolerance:
48+
raise ValueError("Timestamp outside the tolerance zone")
49+
50+
body_str = payload.decode("utf-8") if isinstance(payload, bytes) else payload
51+
unhashed_string = f"{issued_timestamp}.{body_str}"
52+
expected_signature = hmac.new(
53+
secret.encode("utf-8"),
54+
unhashed_string.encode("utf-8"),
55+
digestmod=hashlib.sha256,
56+
).hexdigest()
57+
58+
if not hmac.compare_digest(signature_hash, expected_signature):
59+
raise ValueError(
60+
"Signature hash does not match the expected signature hash for payload"
61+
)
62+
63+
64+
def _compute_signature(payload_str: str, secret: str) -> str:
65+
"""Compute HMAC-SHA256 hex digest for a signed payload string."""
66+
return hmac.new(
67+
secret.encode("utf-8"),
68+
payload_str.encode("utf-8"),
69+
digestmod=hashlib.sha256,
70+
).hexdigest()
71+
72+
73+
class Actions:
74+
"""AuthKit Actions request verification and response signing."""
75+
76+
def verify_header(
77+
self,
78+
*,
79+
payload: Union[bytes, str],
80+
sig_header: str,
81+
secret: str,
82+
tolerance: int = DEFAULT_TOLERANCE,
83+
) -> None:
84+
"""Verify the signature of an Actions request."""
85+
_verify_signature(
86+
payload=payload, sig_header=sig_header, secret=secret, tolerance=tolerance,
87+
)
88+
89+
def construct_action(
90+
self,
91+
*,
92+
payload: Union[bytes, str],
93+
sig_header: str,
94+
secret: str,
95+
tolerance: int = DEFAULT_TOLERANCE,
96+
) -> Dict[str, Any]:
97+
"""Verify and deserialize an Actions request payload."""
98+
self.verify_header(
99+
payload=payload, sig_header=sig_header, secret=secret, tolerance=tolerance,
100+
)
101+
body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
102+
return json.loads(body)
103+
104+
def sign_response(
105+
self,
106+
*,
107+
action_type: ActionType,
108+
verdict: Literal["Allow", "Deny"],
109+
error_message: Optional[str] = None,
110+
secret: str,
111+
) -> Dict[str, Any]:
112+
"""Build and sign an Actions response."""
113+
timestamp = int(time.time() * 1000)
114+
response_payload: Dict[str, Any] = {
115+
"timestamp": timestamp,
116+
"verdict": verdict,
117+
}
118+
if error_message is not None:
119+
response_payload["error_message"] = error_message
120+
121+
payload_json = json.dumps(response_payload, separators=(",", ":"))
122+
signed_payload = f"{timestamp}.{payload_json}"
123+
signature = _compute_signature(signed_payload, secret)
124+
object_type = _ACTION_TYPE_TO_RESPONSE_OBJECT[action_type]
125+
126+
return {
127+
"object": object_type,
128+
"payload": response_payload,
129+
"signature": signature,
130+
}
131+
132+
133+
class AsyncActions:
134+
"""Async variant of AuthKit Actions helpers."""
135+
136+
def verify_header(
137+
self,
138+
*,
139+
payload: Union[bytes, str],
140+
sig_header: str,
141+
secret: str,
142+
tolerance: int = DEFAULT_TOLERANCE,
143+
) -> None:
144+
"""Verify the signature of an Actions request."""
145+
_verify_signature(
146+
payload=payload, sig_header=sig_header, secret=secret, tolerance=tolerance,
147+
)
148+
149+
def construct_action(
150+
self,
151+
*,
152+
payload: Union[bytes, str],
153+
sig_header: str,
154+
secret: str,
155+
tolerance: int = DEFAULT_TOLERANCE,
156+
) -> Dict[str, Any]:
157+
"""Verify and deserialize an Actions request payload."""
158+
self.verify_header(
159+
payload=payload, sig_header=sig_header, secret=secret, tolerance=tolerance,
160+
)
161+
body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
162+
return json.loads(body)
163+
164+
def sign_response(
165+
self,
166+
*,
167+
action_type: ActionType,
168+
verdict: Literal["Allow", "Deny"],
169+
error_message: Optional[str] = None,
170+
secret: str,
171+
) -> Dict[str, Any]:
172+
"""Build and sign an Actions response."""
173+
timestamp = int(time.time() * 1000)
174+
response_payload: Dict[str, Any] = {
175+
"timestamp": timestamp,
176+
"verdict": verdict,
177+
}
178+
if error_message is not None:
179+
response_payload["error_message"] = error_message
180+
181+
payload_json = json.dumps(response_payload, separators=(",", ":"))
182+
signed_payload = f"{timestamp}.{payload_json}"
183+
signature = _compute_signature(signed_payload, secret)
184+
object_type = _ACTION_TYPE_TO_RESPONSE_OBJECT[action_type]
185+
186+
return {
187+
"object": object_type,
188+
"payload": response_payload,
189+
"signature": signature,
190+
}

src/workos/pkce.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# @oagen-ignore-file
2+
# This file is hand-maintained. PKCE (Proof Key for Code Exchange) utilities
3+
# for OAuth 2.0 public client flows. These are client-side cryptographic
4+
# helpers that will always be hand-maintained.
5+
6+
from __future__ import annotations
7+
8+
import base64
9+
import hashlib
10+
import os
11+
from dataclasses import dataclass
12+
from typing import Literal
13+
14+
15+
@dataclass(slots=True)
16+
class PKCEPair:
17+
"""A PKCE code verifier and challenge pair."""
18+
19+
code_verifier: str
20+
code_challenge: str
21+
code_challenge_method: Literal["S256"]
22+
23+
24+
def _base64url_encode(data: bytes) -> str:
25+
"""Base64url-encode bytes without padding, per RFC 7636."""
26+
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
27+
28+
29+
class PKCE:
30+
"""PKCE (RFC 7636) code verifier and challenge utilities.
31+
32+
All operations are synchronous and stateless -- no client or async
33+
variant is needed.
34+
"""
35+
36+
def generate_code_verifier(self, length: int = 43) -> str:
37+
"""Generate a cryptographically random code verifier.
38+
39+
Args:
40+
length: Length of the verifier string (43-128 per RFC 7636).
41+
42+
Returns:
43+
A base64url-encoded random string of the requested length.
44+
45+
Raises:
46+
ValueError: If length is outside the 43-128 range.
47+
"""
48+
if length < 43 or length > 128:
49+
raise ValueError(
50+
f"Code verifier length must be between 43 and 128, got {length}"
51+
)
52+
num_bytes = (length * 3 + 3) // 4
53+
raw = os.urandom(num_bytes)
54+
return _base64url_encode(raw)[:length]
55+
56+
def generate_code_challenge(self, verifier: str) -> str:
57+
"""Compute the S256 code challenge for a given verifier.
58+
59+
Args:
60+
verifier: The code verifier string.
61+
62+
Returns:
63+
The base64url-encoded SHA-256 hash of the verifier.
64+
"""
65+
digest = hashlib.sha256(verifier.encode("ascii")).digest()
66+
return _base64url_encode(digest)
67+
68+
def generate(self) -> PKCEPair:
69+
"""Generate a complete PKCE pair (verifier + challenge).
70+
71+
Returns:
72+
A PKCEPair with code_verifier, code_challenge, and
73+
code_challenge_method set to "S256".
74+
"""
75+
verifier = self.generate_code_verifier()
76+
challenge = self.generate_code_challenge(verifier)
77+
return PKCEPair(
78+
code_verifier=verifier,
79+
code_challenge=challenge,
80+
code_challenge_method="S256",
81+
)

src/workos/public_client.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# @oagen-ignore-file
2+
# This file is hand-maintained. Public client factory for PKCE-only /
3+
# public-client usage (browser, mobile, CLI, desktop applications).
4+
5+
from __future__ import annotations
6+
7+
from typing import Optional
8+
9+
10+
def create_public_client(
11+
*,
12+
client_id: str,
13+
base_url: Optional[str] = None,
14+
request_timeout: Optional[int] = None,
15+
) -> "WorkOS":
16+
"""Create a WorkOS client configured for public/PKCE-only usage.
17+
18+
For browser, mobile, CLI, and desktop applications that cannot securely
19+
store an API key. Methods that require an API key will not include
20+
authorization headers.
21+
22+
Args:
23+
client_id: The WorkOS client ID.
24+
base_url: Override the base URL. Defaults to ``https://api.workos.com``.
25+
request_timeout: HTTP request timeout in seconds.
26+
27+
Returns:
28+
A WorkOS client instance with only ``client_id`` configured.
29+
"""
30+
from ._client import WorkOS
31+
32+
return WorkOS(
33+
client_id=client_id,
34+
base_url=base_url,
35+
request_timeout=request_timeout,
36+
)

src/workos/session.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,37 @@ def unseal_data(sealed_data: str, key: str) -> Dict[str, Any]:
105105
return cast(Dict[str, Any], json.loads(decrypted_str))
106106

107107

108+
def seal_session_from_auth_response(
109+
*,
110+
access_token: str,
111+
refresh_token: str,
112+
user: Optional[Dict[str, Any]] = None,
113+
impersonator: Optional[Dict[str, Any]] = None,
114+
cookie_password: str,
115+
) -> str:
116+
"""Seal session data from an authentication response into a cookie-safe string.
117+
118+
Args:
119+
access_token: The access token from the auth response.
120+
refresh_token: The refresh token from the auth response.
121+
user: The user dict from the auth response.
122+
impersonator: The impersonator dict, if present.
123+
cookie_password: The Fernet key used to seal the session.
124+
125+
Returns:
126+
A sealed session string suitable for storing in a cookie.
127+
"""
128+
session_data: Dict[str, Any] = {
129+
"access_token": access_token,
130+
"refresh_token": refresh_token,
131+
}
132+
if user is not None:
133+
session_data["user"] = user
134+
if impersonator is not None:
135+
session_data["impersonator"] = impersonator
136+
return seal_data(session_data, cookie_password)
137+
138+
108139
# ---------------------------------------------------------------------------
109140
# Session (sync)
110141
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)