forked from e2b-dev/E2B
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
76 lines (53 loc) · 2.08 KB
/
Copy path__init__.py
File metadata and controls
76 lines (53 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import asyncio
import logging
from typing import Dict, Tuple
import httpx
from e2b.api import AsyncApiClient, limits
from e2b.connection_config import ConnectionConfig
# Module-level logger; the transport classes in this module write their
# per-request "Request"/"Response" log lines through it.
logger = logging.getLogger(__name__)
def get_api_client(config: ConnectionConfig, **kwargs) -> AsyncApiClient:
    """Create an ``AsyncApiClient`` wired to the cached logging transport.

    Args:
        config: Connection settings; ``config.http2`` selects which cached
            transport is used.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``AsyncApiClient``.

    Returns:
        A new ``AsyncApiClient`` using the transport from ``get_transport``.
    """
    shared_transport = get_transport(config, http2=config.http2)
    return AsyncApiClient(config, transport=shared_transport, **kwargs)
class AsyncTransportWithLogger(httpx.AsyncHTTPTransport):
    """HTTP transport that logs every request and response at INFO level.

    Instances are cached in ``_instances`` by the module's ``get_transport``
    factory, keyed by ``(id(event_loop), http2)``.
    """

    # Cache of transports, keyed by (id of the running event loop, http2 flag).
    _instances: Dict[Tuple[int, bool], "AsyncTransportWithLogger"] = {}

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Send ``request`` through the base transport, logging before and after.

        Args:
            request: The outgoing HTTP request.

        Returns:
            The response produced by the base transport.
        """
        # Only scheme, host and path are logged; port and query string are
        # deliberately not part of the logged URL.
        url = f"{request.url.scheme}://{request.url.host}{request.url.path}"
        # Lazy %-style arguments: the final message is only rendered when an
        # INFO handler is actually active, which matters on a per-request path.
        logger.info("Request: %s %s", request.method, url)
        response = await super().handle_async_request(request)
        logger.info("Response: %s %s", response.status_code, url)
        return response

    @property
    def pool(self):
        """Return the ``_pool`` attribute of this transport.

        NOTE(review): ``_pool`` is presumably the connection pool created by
        ``httpx.AsyncHTTPTransport`` -- it is a private httpx attribute, so
        confirm it still exists when upgrading httpx.
        """
        return self._pool
def get_transport(
    config: ConnectionConfig, http2: bool = True
) -> AsyncTransportWithLogger:
    """Return the transport cached for the running event loop, creating it once.

    Transports are stored on ``AsyncTransportWithLogger._instances`` under the
    key ``(id(running_loop), http2)``.

    Args:
        config: Connection settings. ``config.proxy`` is read only when a new
            transport has to be created; an already cached transport is
            returned as-is regardless of the proxy in ``config``.
        http2: Whether the transport should use HTTP/2; part of the cache key.

    Returns:
        The cached or newly created transport.

    Raises:
        RuntimeError: If called without a running event loop (raised by
            ``asyncio.get_running_loop``).
    """
    cache_key = (id(asyncio.get_running_loop()), http2)
    registry = AsyncTransportWithLogger._instances

    cached = registry.get(cache_key)
    if cached is not None:
        return cached

    created = AsyncTransportWithLogger(
        limits=limits,
        proxy=config.proxy,
        http2=http2,
    )
    registry[cache_key] = created
    return created
class AsyncEnvdTransportWithLogger(AsyncTransportWithLogger):
    """Logging transport variant cached separately from its parent class."""

    # Redeclared here so this class gets its own cache dict instead of
    # sharing the one defined on AsyncTransportWithLogger.
    _instances: Dict[Tuple[int, bool], "AsyncEnvdTransportWithLogger"] = {}
def get_envd_transport(
    config: ConnectionConfig, http2: bool = True
) -> AsyncEnvdTransportWithLogger:
    """Return the envd transport cached for the running event loop.

    Transports are stored on ``AsyncEnvdTransportWithLogger._instances`` under
    the key ``(id(running_loop), http2)`` and created on first use.

    Args:
        config: Connection settings. ``config.proxy`` is read only when a new
            transport has to be created; an already cached transport is
            returned as-is regardless of the proxy in ``config``.
        http2: Whether the transport should use HTTP/2; part of the cache key.

    Returns:
        The cached or newly created envd transport.

    Raises:
        RuntimeError: If called without a running event loop (raised by
            ``asyncio.get_running_loop``).
    """
    cache_key = (id(asyncio.get_running_loop()), http2)
    registry = AsyncEnvdTransportWithLogger._instances

    cached = registry.get(cache_key)
    if cached is not None:
        return cached

    created = AsyncEnvdTransportWithLogger(
        limits=limits,
        proxy=config.proxy,
        http2=http2,
    )
    registry[cache_key] = created
    return created