forked from workos/workos-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_client.py
More file actions
132 lines (114 loc) · 4.32 KB
/
async_client.py
File metadata and controls
132 lines (114 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from typing import Optional
from workos.__about__ import __version__
from workos._base_client import BaseClient
from workos.audit_logs import AuditLogsModule
from workos.directory_sync import AsyncDirectorySync
from workos.events import AsyncEvents
from workos.fga import FGAModule
from workos.mfa import MFAModule
from workos.organizations import AsyncOrganizations
from workos.organization_domains import AsyncOrganizationDomains
from workos.passwordless import PasswordlessModule
from workos.portal import PortalModule
from workos.sso import AsyncSSO
from workos.user_management import AsyncUserManagement
from workos.utils.http_client import AsyncHTTPClient
from workos.webhooks import WebhooksModule
from workos.widgets import WidgetsModule
from workos.vault import VaultModule
class AsyncClient(BaseClient):
"""Client for a convenient way to access the WorkOS feature set."""
_http_client: AsyncHTTPClient
def __init__(
self,
*,
api_key: Optional[str] = None,
client_id: Optional[str] = None,
base_url: Optional[str] = None,
request_timeout: Optional[int] = None,
jwt_leeway: float = 0,
):
super().__init__(
api_key=api_key,
client_id=client_id,
base_url=base_url,
request_timeout=request_timeout,
jwt_leeway=jwt_leeway,
)
self._http_client = AsyncHTTPClient(
api_key=self._api_key,
base_url=self.base_url,
client_id=self._client_id,
version=__version__,
timeout=self.request_timeout,
)
@property
def sso(self) -> AsyncSSO:
if not getattr(self, "_sso", None):
self._sso = AsyncSSO(
http_client=self._http_client, client_configuration=self
)
return self._sso
@property
def audit_logs(self) -> AuditLogsModule:
raise NotImplementedError(
"Audit logs APIs are not yet supported in the async client."
)
@property
def directory_sync(self) -> AsyncDirectorySync:
if not getattr(self, "_directory_sync", None):
self._directory_sync = AsyncDirectorySync(self._http_client)
return self._directory_sync
@property
def events(self) -> AsyncEvents:
if not getattr(self, "_events", None):
self._events = AsyncEvents(self._http_client)
return self._events
@property
def fga(self) -> FGAModule:
raise NotImplementedError("FGA APIs are not yet supported in the async client.")
@property
def organizations(self) -> AsyncOrganizations:
if not getattr(self, "_organizations", None):
self._organizations = AsyncOrganizations(self._http_client)
return self._organizations
@property
def organization_domains(self) -> AsyncOrganizationDomains:
if not getattr(self, "_organization_domains", None):
self._organization_domains = AsyncOrganizationDomains(
http_client=self._http_client, client_configuration=self
)
return self._organization_domains
@property
def passwordless(self) -> PasswordlessModule:
raise NotImplementedError(
"Passwordless APIs are not yet supported in the async client."
)
@property
def portal(self) -> PortalModule:
raise NotImplementedError(
"Portal APIs are not yet supported in the async client."
)
@property
def webhooks(self) -> WebhooksModule:
raise NotImplementedError("Webhooks are not yet supported in the async client.")
@property
def mfa(self) -> MFAModule:
raise NotImplementedError("MFA APIs are not yet supported in the async client.")
@property
def user_management(self) -> AsyncUserManagement:
if not getattr(self, "_user_management", None):
self._user_management = AsyncUserManagement(
http_client=self._http_client, client_configuration=self
)
return self._user_management
@property
def widgets(self) -> WidgetsModule:
raise NotImplementedError(
"Widgets APIs are not yet supported in the async client."
)
@property
def vault(self) -> VaultModule:
raise NotImplementedError(
"Vault APIs are not yet supported in the async client."
)