Skip to content

Commit afdf9d0

Browse files
authored
Merge pull request #2 from fern-demo/devin/1767822715-area-based-pool
Add Area-based Pool logic for regional URL cycling
2 parents 25ae08a + e204918 commit afdf9d0

5 files changed

Lines changed: 435 additions & 7 deletions

File tree

.fernignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
11
# Specify files that shouldn't be modified by Fern
2+
src/agoraio/pool_client.py
3+
src/agoraio/__init__.py
4+
src/agoraio/core/domain.py

src/agoraio/__init__.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,19 @@
66
from importlib import import_module
77

88
if typing.TYPE_CHECKING:
9-
from . import agents, phone_numbers, telephony
10-
from .client import Agora, AsyncAgora
9+
from . import agents, core, phone_numbers, telephony
10+
from .core.domain import Area, Pool, create_pool
11+
from .pool_client import Agora, AsyncAgora
1112
from .version import __version__
1213
_dynamic_imports: typing.Dict[str, str] = {
13-
"Agora": ".client",
14-
"AsyncAgora": ".client",
14+
"Agora": ".pool_client",
15+
"Area": ".core.domain",
16+
"AsyncAgora": ".pool_client",
17+
"Pool": ".core.domain",
1518
"__version__": ".version",
1619
"agents": ".agents",
20+
"core": ".core",
21+
"create_pool": ".core.domain",
1722
"phone_numbers": ".phone_numbers",
1823
"telephony": ".telephony",
1924
}
@@ -40,4 +45,4 @@ def __dir__():
4045
return sorted(lazy_attrs)
4146

4247

43-
__all__ = ["Agora", "AsyncAgora", "__version__", "agents", "phone_numbers", "telephony"]
48+
__all__ = ["Agora", "Area", "AsyncAgora", "Pool", "__version__", "agents", "core", "create_pool", "phone_numbers", "telephony"]

src/agoraio/core/__init__.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,15 @@
66
from importlib import import_module
77

88
if typing.TYPE_CHECKING:
9+
from .domain import Area, Pool, create_pool
910
from .file import File, with_content_type
10-
_dynamic_imports: typing.Dict[str, str] = {"File": ".file", "with_content_type": ".file"}
11+
_dynamic_imports: typing.Dict[str, str] = {
12+
"Area": ".domain",
13+
"Pool": ".domain",
14+
"create_pool": ".domain",
15+
"File": ".file",
16+
"with_content_type": ".file",
17+
}
1118

1219

1320
def __getattr__(attr_name: str) -> typing.Any:
@@ -31,4 +38,4 @@ def __dir__():
3138
return sorted(lazy_attrs)
3239

3340

34-
__all__ = ["File", "with_content_type"]
41+
__all__ = ["Area", "Pool", "create_pool", "File", "with_content_type"]

src/agoraio/core/domain.py

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
# This file was auto-generated by Fern from our API Definition.
2+
3+
import asyncio
4+
import socket
5+
import threading
6+
import time
7+
from enum import IntEnum
8+
from typing import List, Optional
9+
10+
11+
class Area(IntEnum):
12+
"""Area represents the global regions where the Open API gateway endpoint is located"""
13+
14+
UNKNOWN = 0
15+
US = 1 # US represents the western and eastern regions of the United States
16+
EU = 2 # EU represents the western and central regions of Europe
17+
AP = 3 # AP represents the southeastern and northeastern regions of Asia-Pacific
18+
CN = 4 # CN represents the eastern and northern regions of Chinese mainland
19+
20+
21+
CHINESE_MAINLAND_MAJOR_DOMAIN = "sd-rtn.com"
22+
OVERSEA_MAJOR_DOMAIN = "agora.io"
23+
24+
GLOBAL_DOMAIN_PREFIX = "api"
25+
26+
US_WEST_REGION_DOMAIN_PREFIX = "api-us-west-1"
27+
US_EAST_REGION_DOMAIN_PREFIX = "api-us-east-1"
28+
29+
AP_SOUTHEAST_REGION_DOMAIN_PREFIX = "api-ap-southeast-1"
30+
AP_NORTHEAST_REGION_DOMAIN_PREFIX = "api-ap-northeast-1"
31+
32+
EU_WEST_REGION_DOMAIN_PREFIX = "api-eu-west-1"
33+
EU_CENTRAL_REGION_DOMAIN_PREFIX = "api-eu-central-1"
34+
35+
CN_EAST_REGION_DOMAIN_PREFIX = "api-cn-east-1"
36+
CN_NORTH_REGION_DOMAIN_PREFIX = "api-cn-north-1"
37+
38+
API_PATH_SUFFIX = "/api/conversational-ai-agent"
39+
40+
41+
class Domain:
42+
"""Domain contains the regional prefixes and domain suffixes for an area"""
43+
44+
def __init__(self, region_domain_prefixes: List[str], major_domain_suffixes: List[str]):
45+
self.region_domain_prefixes = region_domain_prefixes
46+
self.major_domain_suffixes = major_domain_suffixes
47+
48+
49+
REGION_DOMAIN = {
50+
Area.UNKNOWN: Domain([], []),
51+
Area.US: Domain(
52+
[US_WEST_REGION_DOMAIN_PREFIX, US_EAST_REGION_DOMAIN_PREFIX],
53+
[OVERSEA_MAJOR_DOMAIN, CHINESE_MAINLAND_MAJOR_DOMAIN],
54+
),
55+
Area.EU: Domain(
56+
[EU_WEST_REGION_DOMAIN_PREFIX, EU_CENTRAL_REGION_DOMAIN_PREFIX],
57+
[OVERSEA_MAJOR_DOMAIN, CHINESE_MAINLAND_MAJOR_DOMAIN],
58+
),
59+
Area.AP: Domain(
60+
[AP_SOUTHEAST_REGION_DOMAIN_PREFIX, AP_NORTHEAST_REGION_DOMAIN_PREFIX],
61+
[OVERSEA_MAJOR_DOMAIN, CHINESE_MAINLAND_MAJOR_DOMAIN],
62+
),
63+
Area.CN: Domain(
64+
[CN_EAST_REGION_DOMAIN_PREFIX, CN_NORTH_REGION_DOMAIN_PREFIX],
65+
[CHINESE_MAINLAND_MAJOR_DOMAIN, OVERSEA_MAJOR_DOMAIN],
66+
),
67+
}
68+
69+
70+
class Resolver:
71+
"""Interface for resolving the best domain"""
72+
73+
def resolve(self, domains: List[str], region_prefix: str) -> str:
74+
raise NotImplementedError
75+
76+
77+
class ResolverImpl(Resolver):
78+
"""Default DNS-based resolver implementation"""
79+
80+
def resolve(self, domains: List[str], region_prefix: str) -> str:
81+
result: Optional[str] = None
82+
result_lock = threading.Lock()
83+
84+
def lookup_domain(domain: str) -> None:
85+
nonlocal result
86+
try:
87+
url = f"{region_prefix}.{domain}"
88+
socket.gethostbyname(url)
89+
with result_lock:
90+
if result is None:
91+
result = domain
92+
except socket.gaierror:
93+
pass
94+
95+
threads = []
96+
for domain in domains:
97+
thread = threading.Thread(target=lookup_domain, args=(domain,))
98+
thread.start()
99+
threads.append(thread)
100+
101+
for thread in threads:
102+
thread.join(timeout=5.0)
103+
104+
if result is not None:
105+
return result
106+
107+
raise Exception("query all dns failed")
108+
109+
110+
class AsyncResolverImpl(Resolver):
111+
"""Async DNS-based resolver implementation"""
112+
113+
async def resolve_async(self, domains: List[str], region_prefix: str) -> str:
114+
async def lookup_domain(domain: str) -> str:
115+
url = f"{region_prefix}.{domain}"
116+
loop = asyncio.get_event_loop()
117+
await loop.getaddrinfo(url, None)
118+
return domain
119+
120+
tasks = [lookup_domain(domain) for domain in domains]
121+
122+
for coro in asyncio.as_completed(tasks):
123+
try:
124+
result = await coro
125+
return result
126+
except (socket.gaierror, OSError):
127+
continue
128+
129+
raise Exception("query all dns failed")
130+
131+
132+
UPDATE_DURATION_SECONDS = 30
133+
134+
135+
class Pool:
136+
"""Pool manages a pool of regional URLs with automatic cycling and domain selection"""
137+
138+
def __init__(self, domain_area: Area):
139+
domain_config = REGION_DOMAIN.get(domain_area)
140+
if domain_config is None or len(domain_config.region_domain_prefixes) == 0:
141+
raise ValueError("invalid domain area")
142+
143+
self._domain_area = domain_area
144+
self._domain_suffixes = list(domain_config.major_domain_suffixes)
145+
self._region_prefixes = list(domain_config.region_domain_prefixes)
146+
self._current_region_prefixes = list(self._region_prefixes)
147+
self._current_domain = self._domain_suffixes[0]
148+
self._resolver = ResolverImpl()
149+
self._async_resolver = AsyncResolverImpl()
150+
self._last_update: float = 0
151+
self._lock = threading.Lock()
152+
153+
def _domain_need_update(self) -> bool:
154+
return time.time() - self._last_update > UPDATE_DURATION_SECONDS
155+
156+
def select_best_domain(self) -> None:
157+
"""SelectBestDomain uses DNS resolution to select the best available domain (sync)"""
158+
if not self._domain_need_update():
159+
return
160+
161+
with self._lock:
162+
if self._domain_need_update():
163+
domain = self._resolver.resolve(self._domain_suffixes, self._current_region_prefixes[0])
164+
self._select_domain(domain)
165+
166+
async def select_best_domain_async(self) -> None:
167+
"""SelectBestDomain uses DNS resolution to select the best available domain (async)"""
168+
if not self._domain_need_update():
169+
return
170+
171+
with self._lock:
172+
if self._domain_need_update():
173+
domain = await self._async_resolver.resolve_async(
174+
self._domain_suffixes, self._current_region_prefixes[0]
175+
)
176+
self._select_domain(domain)
177+
178+
def next_region(self) -> None:
179+
"""NextRegion cycles to the next region prefix in the pool"""
180+
with self._lock:
181+
self._current_region_prefixes = self._current_region_prefixes[1:]
182+
if len(self._current_region_prefixes) == 0:
183+
self._current_region_prefixes = list(self._region_prefixes)
184+
185+
def _select_domain(self, domain: str) -> None:
186+
if domain in self._domain_suffixes:
187+
self._current_domain = domain
188+
self._last_update = time.time()
189+
190+
def get_current_url(self) -> str:
191+
"""GetCurrentURL returns the current URL based on the selected region and domain"""
192+
with self._lock:
193+
current_region = self._current_region_prefixes[0]
194+
current_domain = self._current_domain
195+
return f"https://{current_region}.{current_domain}{API_PATH_SUFFIX}"
196+
197+
def get_area(self) -> Area:
198+
"""Get the current area"""
199+
return self._domain_area
200+
201+
202+
def create_pool(area: Area) -> Pool:
203+
"""Creates a new Pool for the specified area"""
204+
return Pool(area)

0 commit comments

Comments
 (0)