-
Notifications
You must be signed in to change notification settings - Fork 902
Expand file tree
/
Copy path__init__.py
More file actions
70 lines (55 loc) · 2.18 KB
/
__init__.py
File metadata and controls
70 lines (55 loc) · 2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging
import os
from typing import Optional
import httpx
from httpx import Limits
from e2b.api.metadata import default_headers
from e2b.exceptions import AuthenticationException
from e2b.volume.client.client import AuthenticatedClient as AsyncVolumeApiClient
from e2b.connection_config import _resolve_proxy
from e2b.volume.connection_config import VolumeConnectionConfig
logger = logging.getLogger(__name__)


def _env_int(name: str, default: str) -> int:
    """Return environment variable *name* parsed as an int.

    *default* is the string used when the variable is not set.
    """
    return int(os.getenv(name, default))


# Connection-pool limits handed to the HTTP transport; each value can be
# overridden through the corresponding E2B_* environment variable.
limits = Limits(
    max_keepalive_connections=_env_int("E2B_MAX_KEEPALIVE_CONNECTIONS", "20"),
    max_connections=_env_int("E2B_MAX_CONNECTIONS", "2000"),
    keepalive_expiry=_env_int("E2B_KEEPALIVE_EXPIRY", "300"),
)
def get_api_client(config: VolumeConnectionConfig, **kwargs) -> AsyncVolumeApiClient:
    """Create an authenticated async client for the volume API.

    Args:
        config: Connection settings; supplies the access token, API URL,
            optional extra headers and the proxy.
        **kwargs: Extra keyword arguments passed straight through to the
            ``AsyncVolumeApiClient`` constructor.

    Returns:
        A client that sends ``Authorization: Bearer <token>`` and uses the
        transport returned by ``get_transport``.

    Raises:
        AuthenticationException: If ``config.access_token`` is ``None``.
    """
    access_token = config.access_token
    if access_token is None:
        raise AuthenticationException(
            "Access token is required for volume operations. "
            "Set `E2B_ACCESS_TOKEN` or pass `token` in options.",
        )

    # Start from the SDK defaults; headers from the config win on key clashes.
    request_headers = dict(default_headers)
    request_headers.update(config.headers or {})

    httpx_options = {
        "proxy": config.proxy,
        "transport": get_transport(config),
    }

    return AsyncVolumeApiClient(
        base_url=config.api_url,
        token=access_token,
        auth_header_name="Authorization",
        prefix="Bearer",
        headers=request_headers,
        httpx_args=httpx_options,
        **kwargs,
    )
class AsyncTransportWithLogger(httpx.AsyncHTTPTransport):
    """Async httpx transport that logs each request and its response status.

    Only the scheme, host and path of the URL are written to the log; the
    query string is left out of the logged value.
    """

    # Shared instance, assigned by ``get_transport`` on first use.
    singleton: Optional["AsyncTransportWithLogger"] = None

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Send *request* through the parent transport, logging before and after.

        Args:
            request: The outgoing httpx request.

        Returns:
            The response produced by ``httpx.AsyncHTTPTransport``.
        """
        url = f"{request.url.scheme}://{request.url.host}{request.url.path}"
        # Lazy %-style arguments: the record keeps a constant message template
        # and the final string is only built if a handler actually emits it.
        logger.info("Request: %s %s", request.method, url)
        response = await super().handle_async_request(request)
        logger.info("Response: %s %s", response.status_code, url)
        return response

    @property
    def pool(self):
        """Expose the transport's private ``_pool`` attribute."""
        return self._pool
def get_transport(config: VolumeConnectionConfig) -> AsyncTransportWithLogger:
    """Return the shared logging transport, creating it on first use.

    The instance is cached on ``AsyncTransportWithLogger.singleton``. Once it
    exists it is returned as-is, so only the *config* of the first call
    determines the proxy the transport is built with.

    Args:
        config: Connection settings; ``config.proxy`` is resolved through
            ``_resolve_proxy`` when the transport is first created.

    Returns:
        The cached ``AsyncTransportWithLogger`` instance.
    """
    shared = AsyncTransportWithLogger.singleton
    if shared is None:
        shared = AsyncTransportWithLogger(
            limits=limits,
            proxy=_resolve_proxy(config.proxy),
        )
        AsyncTransportWithLogger.singleton = shared
    return shared