66session, a restricted set of tools, and a tight iteration budget.
77
88The main agent uses the ``dispatch_subagent`` tool function to fire off work.
9+
10+ Rate-limit handling:
11+ - A global semaphore throttles how many sub-agents can hit the API at once.
12+ - Each API call is wrapped in exponential-backoff retry logic for 429 errors.
13+ - Concurrency is tuneable via env vars SUBAGENT_MAX_WORKERS and
14+ SUBAGENT_MAX_CONCURRENT_API.
915"""
1016
17+ import os
18+ import time
19+ import random
1120import threading
1221from concurrent .futures import ThreadPoolExecutor , Future
1322
@@ -68,6 +77,50 @@ def _get_next_id() -> int:
6877 return current
6978
7079
80+ # ---------------------------------------------------------------------------
81+ # Rate-limit helpers
82+ # ---------------------------------------------------------------------------
83+
84+ # Max sub-agents that can make an API call at the same time.
85+ # Lower this if you're on a free-tier key with tight RPM limits.
86+ _MAX_CONCURRENT_API = int (os .getenv ("SUBAGENT_MAX_CONCURRENT_API" , "2" ))
87+ _api_semaphore = threading .Semaphore (_MAX_CONCURRENT_API )
88+
89+ # Retry settings for 429 / ResourceExhausted errors
90+ _MAX_RETRIES = 5
91+ _RETRY_BASE_DELAY = 2.0 # seconds — first retry waits ~2s
92+ _RETRY_MAX_DELAY = 60.0 # cap so we don't wait forever
93+
94+
95+ def _is_rate_limit_error (exc : Exception ) -> bool :
96+ """Return True if *exc* looks like a Gemini rate-limit / quota error."""
97+ msg = str (exc ).lower ()
98+ return any (kw in msg for kw in ("429" , "resource_exhausted" , "rate limit" , "quota" ))
99+
100+
101+ def _send_with_retry (chat_session , message , agent_id : int ):
102+ """Send a message through the chat session with semaphore + exp backoff."""
103+ for attempt in range (1 , _MAX_RETRIES + 1 ):
104+ with _api_semaphore :
105+ try :
106+ return chat_session .send_message (message )
107+ except Exception as exc :
108+ if _is_rate_limit_error (exc ) and attempt < _MAX_RETRIES :
109+ delay = min (
110+ _RETRY_BASE_DELAY * (2 ** (attempt - 1 )) + random .uniform (0 , 1 ),
111+ _RETRY_MAX_DELAY ,
112+ )
113+ console .print (
114+ f" [dim yellow]⏳ sub-agent #{ agent_id } : rate-limited, "
115+ f"retry { attempt } /{ _MAX_RETRIES } in { delay :.1f} s[/dim yellow]"
116+ )
117+ time .sleep (delay )
118+ else :
119+ raise
120+ # Should not reach here, but just in case:
121+ raise RuntimeError (f"sub-agent #{ agent_id } : exhausted { _MAX_RETRIES } retries" )
122+
123+
71124# ---------------------------------------------------------------------------
72125# SubAgent class
73126# ---------------------------------------------------------------------------
@@ -143,7 +196,7 @@ def run(self) -> str:
143196 prompt = "" .join (parts )
144197
145198 try :
146- response = self .chat_session . send_message ( prompt )
199+ response = _send_with_retry ( self .chat_session , prompt , self . id )
147200 except Exception as e :
148201 return f"[sub-agent { self .id } ] Error on initial message: { e } "
149202
@@ -183,7 +236,9 @@ def run(self) -> str:
183236 )
184237 )
185238
186- response = self .chat_session .send_message (tool_responses )
239+ response = _send_with_retry (
240+ self .chat_session , tool_responses , self .id
241+ )
187242 continue
188243 else :
189244 # Final text response
@@ -203,7 +258,8 @@ def _clip(self, text: str) -> str:
203258# ---------------------------------------------------------------------------
204259
205260# Thread pool for running sub-agents concurrently
206- _executor = ThreadPoolExecutor (max_workers = 4 , thread_name_prefix = "subagent" )
261+ _MAX_WORKERS = int (os .getenv ("SUBAGENT_MAX_WORKERS" , "4" ))
262+ _executor = ThreadPoolExecutor (max_workers = _MAX_WORKERS , thread_name_prefix = "subagent" )
207263
208264
209265def dispatch_subagent (task : str , context : str = "" , tools : str = "" ) -> str :
0 commit comments