-
Notifications
You must be signed in to change notification settings - Fork 870
Expand file tree
/
Copy path__init__.py
More file actions
164 lines (138 loc) · 5.33 KB
/
__init__.py
File metadata and controls
164 lines (138 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import json
import logging
import os
from dataclasses import dataclass
from types import TracebackType
from typing import Optional, Union
from httpx import AsyncBaseTransport, BaseTransport, Limits
from e2b.api.client.client import AuthenticatedClient
from e2b.api.client.types import Response
from e2b.api.metadata import default_headers
from e2b.connection_config import ConnectionConfig
from e2b.exceptions import (
AuthenticationException,
RateLimitException,
SandboxException,
)
logger = logging.getLogger(__name__)
limits = Limits(
max_keepalive_connections=int(os.getenv("E2B_MAX_KEEPALIVE_CONNECTIONS", "20")),
max_connections=int(os.getenv("E2B_MAX_CONNECTIONS", "2000")),
keepalive_expiry=int(os.getenv("E2B_KEEPALIVE_EXPIRY", "300")),
)
@dataclass
class SandboxCreateResponse:
sandbox_id: str
sandbox_domain: Optional[str]
envd_version: str
envd_access_token: Optional[str]
traffic_access_token: Optional[str]
def handle_api_exception(
e: Response,
default_exception_class: type[Exception] = SandboxException,
stack_trace: Optional[TracebackType] = None,
):
try:
body = json.loads(e.content) if e.content else {}
except json.JSONDecodeError:
body = {}
if e.status_code == 401:
message = f"{e.status_code}: Unauthorized, please check your credentials."
if body.get("message"):
message += f" - {body['message']}"
return AuthenticationException(message)
if e.status_code == 429:
message = f"{e.status_code}: Rate limit exceeded, please try again later."
if body.get("message"):
message += f" - {body['message']}"
return RateLimitException(message)
if "message" in body:
return default_exception_class(
f"{e.status_code}: {body['message']}"
).with_traceback(stack_trace)
return default_exception_class(f"{e.status_code}: {e.content}").with_traceback(
stack_trace
)
class ApiClient(AuthenticatedClient):
"""
The client for interacting with the E2B API.
"""
def __init__(
self,
config: ConnectionConfig,
require_api_key: bool = True,
require_access_token: bool = False,
transport: Optional[Union[BaseTransport, AsyncBaseTransport]] = None,
*args,
**kwargs,
):
if require_api_key and require_access_token:
raise AuthenticationException(
"Only one of api_key or access_token can be required, not both",
)
if not require_api_key and not require_access_token:
raise AuthenticationException(
"Either api_key or access_token is required",
)
token = None
if require_api_key:
if config.api_key is None:
raise AuthenticationException(
"API key is required, please visit the Team tab at https://e2b.dev/dashboard to get your API key. "
"You can either set the environment variable `E2B_API_KEY` "
'or you can pass it directly to the method like api_key="e2b_..."',
)
token = config.api_key
if require_access_token:
if config.access_token is None:
raise AuthenticationException(
"Access token is required, please visit the Personal tab at https://e2b.dev/dashboard to get your access token. "
"You can set the environment variable `E2B_ACCESS_TOKEN` or pass the `access_token` in options.",
)
token = config.access_token
auth_header_name = "X-API-KEY" if require_api_key else "Authorization"
prefix = "" if require_api_key else "Bearer"
headers = {
**default_headers,
**(config.headers or {}),
}
# Prevent passing these parameters twice
more_headers: Optional[dict] = kwargs.pop("headers", None)
if more_headers:
headers.update(more_headers)
kwargs.pop("token", None)
kwargs.pop("auth_header_name", None)
kwargs.pop("prefix", None)
super().__init__(
base_url=config.api_url,
httpx_args={
"event_hooks": {
"request": [self._log_request],
"response": [self._log_response],
},
"proxy": config.proxy,
"transport": transport,
},
headers=headers,
token=token or "",
auth_header_name=auth_header_name,
prefix=prefix,
*args,
**kwargs,
)
def _log_request(self, request):
logger.info(f"Request {request.method} {request.url}")
def _log_response(self, response: Response):
if response.status_code >= 400:
logger.error(f"Response {response.status_code}")
else:
logger.info(f"Response {response.status_code}")
# We need to override the logging hooks for the async usage
class AsyncApiClient(ApiClient):
async def _log_request(self, request):
logger.info(f"Request {request.method} {request.url}")
async def _log_response(self, response: Response):
if response.status_code >= 400:
logger.error(f"Response {response.status_code}")
else:
logger.info(f"Response {response.status_code}")