Skip to content

Commit ee0c272

Browse files
authored
Merge pull request #18 from lucas-rampimdesouza_data/feature/exponential-backoff
feat: exponential backoff with jitter for retry and rate-limit queue
2 parents 37d2ff0 + 9b61ac6 commit ee0c272

File tree

4 files changed

+125
-195
lines changed

4 files changed

+125
-195
lines changed

backend/app/api/genie_clone_routes.py

Lines changed: 82 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
from app.api.config_store import get_effective_setting
2525
from app.api.auth_helpers import extract_bearer_token
2626
from app.services.embedding_service import embedding_service
27-
from app.services.genie_service import genie_service, GenieRateLimitError
27+
from app.services.genie_service import genie_service, GenieRateLimitError, GenieConfigError
28+
from app.utils import exponential_backoff
2829
from app.services.intent_splitter import split_by_intent
2930
from app.services.question_normalizer import normalize_question
3031
from app.services.cache_validator import validate_cache_entry
@@ -43,6 +44,7 @@
4344
# In-memory store for synthetic (cache / queued) messages & attachments
4445
# ---------------------------------------------------------------------------
4546
_synthetic_messages: dict[str, dict] = {}
47+
_message_locks: dict[str, asyncio.Lock] = {}
4648

4749
CONV_PREFIX = "ccache_"
4850
MSG_PREFIX = "mcache_"
@@ -53,6 +55,16 @@
5355
# Helpers
5456
# ---------------------------------------------------------------------------
5557

58+
def _get_message_lock(msg_id: str) -> asyncio.Lock:
    """Return the per-message asyncio lock, creating it on first use.

    Safe under asyncio's single-threaded model: there is no await point
    between the lookup and the insert, so no interleaving can occur.
    """
    lock = _message_locks.get(msg_id)
    if lock is None:
        lock = asyncio.Lock()
        _message_locks[msg_id] = lock
    return lock
62+
63+
64+
def _release_message_lock(msg_id: str) -> None:
    """Discard the lock registered for *msg_id*; a missing entry is a no-op."""
    try:
        del _message_locks[msg_id]
    except KeyError:
        pass
66+
67+
5668
def _extract_token(request: Request) -> str:
5769
"""Extract auth token from request headers."""
5870
return extract_bearer_token(request)
@@ -192,7 +204,7 @@ async def _process_genie_background(
192204

193205
last_error = None
194206
attempt = 0
195-
max_rate_limit_waits = 12 # Max 12 × 5s = 60s waiting for rate limit
207+
max_rate_limit_waits = 12 # worst case 12 × 10s = 120s
196208

197209
while attempt <= max_retries:
198210
# Wait for rate limit slot (does NOT consume an attempt)
@@ -202,16 +214,20 @@ async def _process_genie_background(
202214
if rate_limit_waits > max_rate_limit_waits:
203215
logger.warning("Background rate limit wait exhausted for msg_id=%s", msg_id)
204216
break
205-
logger.info("Background rate limited, waiting 5s (wait %d/%d)", rate_limit_waits, max_rate_limit_waits)
206-
await asyncio.sleep(5)
217+
wait = exponential_backoff(rate_limit_waits - 1, base=5.0, cap=10.0)
218+
logger.info("Background rate limited, waiting %.1fs (wait %d/%d)", wait, rate_limit_waits, max_rate_limit_waits)
219+
await asyncio.sleep(wait)
207220

208-
if msg_id in _synthetic_messages:
209-
_synthetic_messages[msg_id].setdefault("_proxy", {})["stage"] = "processing_genie"
221+
async with _get_message_lock(msg_id):
222+
if msg_id in _synthetic_messages:
223+
_synthetic_messages[msg_id].setdefault("_proxy", {})["stage"] = "processing_genie"
210224

211225
try:
212226
if conversation_id and not conversation_id.startswith(CONV_PREFIX):
213227
try:
214228
result = await genie_service.send_message(space_id, conversation_id, query_text, rs)
229+
except GenieConfigError:
230+
raise
215231
except Exception:
216232
logger.warning("send_message failed, falling back to start_conversation")
217233
result = await genie_service.start_conversation(space_id, query_text, rs)
@@ -251,11 +267,12 @@ async def _process_genie_background(
251267
"sql_query": sql_query,
252268
"result": None,
253269
}
254-
_synthetic_messages[msg_id] = completed
255-
_synthetic_messages[att_id] = {"sql_query": sql_query, "token": token, "space_id": space_id}
256-
for _att in completed.get("attachments", []):
257-
if isinstance(_att, dict) and _att.get("query") and _att.get("attachment_id"):
258-
_synthetic_messages[_att["attachment_id"]] = {"sql_query": sql_query, "token": token, "space_id": space_id}
270+
async with _get_message_lock(msg_id):
271+
_synthetic_messages[msg_id] = completed
272+
_synthetic_messages[att_id] = {"sql_query": sql_query, "token": token, "space_id": space_id}
273+
for _att in completed.get("attachments", []):
274+
if isinstance(_att, dict) and _att.get("query") and _att.get("attachment_id"):
275+
_synthetic_messages[_att["attachment_id"]] = {"sql_query": sql_query, "token": token, "space_id": space_id}
259276

260277
# Now execute SQL (poll arriving here sees stage=processing_genie, not received)
261278
actual_result = None
@@ -268,12 +285,14 @@ async def _process_genie_background(
268285
logger.warning("execute_sql after cache miss failed: %s", e)
269286

270287
# Update _proxy to final state
271-
_synthetic_messages[msg_id]["_proxy"] = {
272-
"stage": "completed",
273-
"from_cache": False,
274-
"sql_query": sql_query,
275-
"result": actual_result,
276-
}
288+
async with _get_message_lock(msg_id):
289+
_synthetic_messages[msg_id]["_proxy"] = {
290+
"stage": "completed",
291+
"from_cache": False,
292+
"sql_query": sql_query,
293+
"result": actual_result,
294+
}
295+
_release_message_lock(msg_id)
277296

278297
# Save query log
279298
try:
@@ -288,45 +307,62 @@ async def _process_genie_background(
288307
)
289308
except Exception as e:
290309
logger.warning("Failed to save cache miss query log: %s", e)
291-
292310
return
293311

294312
# Non-COMPLETED terminal status
295-
_synthetic_messages[msg_id] = {
296-
"conversation_id": CONV_PREFIX + msg_id[len(MSG_PREFIX):],
297-
"message_id": msg_id,
298-
"status": result.get("status", "FAILED"),
299-
"attachments": [],
300-
"error": result.get("error"),
301-
"_proxy": {"stage": "failed", "from_cache": False, "sql_query": None, "result": None},
302-
}
313+
async with _get_message_lock(msg_id):
314+
_synthetic_messages[msg_id] = {
315+
"conversation_id": CONV_PREFIX + msg_id[len(MSG_PREFIX):],
316+
"message_id": msg_id,
317+
"status": result.get("status", "FAILED"),
318+
"attachments": [],
319+
"error": result.get("error"),
320+
"_proxy": {"stage": "failed", "from_cache": False, "sql_query": None, "result": None},
321+
}
322+
_release_message_lock(msg_id)
303323
return
304324

305325
except GenieRateLimitError as e:
306-
logger.info("Genie 429 in background, waiting %ss (attempt %d/%d)", e.retry_after, attempt + 1, max_retries + 1)
326+
attempt += 1
327+
logger.info("Genie 429 in background, waiting %ss (attempt %d/%d)", e.retry_after, attempt, max_retries + 1)
307328
await asyncio.sleep(e.retry_after)
308329
last_error = str(e)
309330
continue
331+
except GenieConfigError as e:
332+
logger.error("Non-retryable Genie error %d for msg_id=%s: %s", e.status_code, msg_id, e.detail)
333+
async with _get_message_lock(msg_id):
334+
_synthetic_messages[msg_id] = {
335+
"conversation_id": CONV_PREFIX + msg_id[len(MSG_PREFIX):],
336+
"message_id": msg_id,
337+
"status": "FAILED",
338+
"attachments": [],
339+
"error": {"error": e.detail, "type": "CONFIG_ERROR"},
340+
"_proxy": {"stage": "failed", "from_cache": False, "sql_query": None, "result": None},
341+
}
342+
_release_message_lock(msg_id)
343+
return
310344
except Exception as e:
311345
last_error = str(e)
312346
attempt += 1
313347
if attempt <= max_retries:
314-
wait = [5, 15, 30][min(attempt - 1, 2)]
315-
logger.warning("Background Genie attempt %d failed: %s, retrying in %ds", attempt, e, wait)
348+
wait = exponential_backoff(attempt - 1, base=2.0, cap=30.0)
349+
logger.warning("Background Genie attempt %d failed: %s, retrying in %.1fs", attempt, e, wait)
316350
await asyncio.sleep(wait)
317351
continue
318352
break
319353

320354
# Fallback: all retries exhausted — ALWAYS set FAILED so client stops polling
321355
logger.error("Background processing failed for msg_id=%s: %s", msg_id, last_error)
322-
_synthetic_messages[msg_id] = {
323-
"conversation_id": CONV_PREFIX + msg_id[len(MSG_PREFIX):],
324-
"message_id": msg_id,
325-
"status": "FAILED",
326-
"attachments": [],
327-
"error": {"error": last_error or "All retries exhausted", "type": "INTERNAL_ERROR"},
328-
"_proxy": {"stage": "failed", "from_cache": False, "sql_query": None, "result": None},
329-
}
356+
async with _get_message_lock(msg_id):
357+
_synthetic_messages[msg_id] = {
358+
"conversation_id": CONV_PREFIX + msg_id[len(MSG_PREFIX):],
359+
"message_id": msg_id,
360+
"status": "FAILED",
361+
"attachments": [],
362+
"error": {"error": last_error or "All retries exhausted", "type": "INTERNAL_ERROR"},
363+
"_proxy": {"stage": "failed", "from_cache": False, "sql_query": None, "result": None},
364+
}
365+
_release_message_lock(msg_id)
330366

331367

332368
# ---------------------------------------------------------------------------
@@ -426,8 +462,10 @@ async def _handle_query(
426462
"sql_query": sql_query,
427463
"result": actual_result,
428464
}
429-
_synthetic_messages[msg_id] = response
430-
_synthetic_messages[att_id] = {"sql_query": sql_query, "token": token, "space_id": space_id}
465+
async with _get_message_lock(msg_id):
466+
_synthetic_messages[msg_id] = response
467+
_synthetic_messages[att_id] = {"sql_query": sql_query, "token": token, "space_id": space_id}
468+
_release_message_lock(msg_id)
431469

432470
# Save query log
433471
try:
@@ -449,7 +487,8 @@ async def _handle_query(
449487
conv_id, msg_id, att_id = _make_synthetic_ids()
450488
response = _format_executing_response(conv_id, msg_id)
451489
response["_proxy"] = {"stage": "cache_miss", "from_cache": False, "sql_query": None, "result": None}
452-
_synthetic_messages[msg_id] = response
490+
async with _get_message_lock(msg_id):
491+
_synthetic_messages[msg_id] = response
453492

454493
task = asyncio.create_task(_process_genie_background(
455494
space_id=space_id,
@@ -477,6 +516,9 @@ def _on_task_done(t):
477516
"error": {"error": f"Background task crashed: {exc}", "type": "INTERNAL_ERROR"},
478517
"_proxy": {"stage": "failed", "from_cache": False, "sql_query": None, "result": None},
479518
}
519+
_release_message_lock(msg_id)
520+
elif t.cancelled():
521+
_release_message_lock(msg_id)
480522

481523
task.add_done_callback(_on_task_done)
482524

backend/app/utils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import random
2+
3+
4+
def exponential_backoff(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Return a retry delay for the given 0-based *attempt*.

    The raw delay grows as ``base * 2**attempt``, gets up to +50%
    proportional jitter added to de-synchronise competing retriers,
    and is hard-capped at *cap* seconds.

    Args:
        attempt: Zero-based retry counter.
        base: Delay in seconds for attempt 0.
        cap: Upper bound, in seconds, on the returned delay.

    Returns:
        A delay in seconds, never exceeding *cap*.
    """
    # Clamp the exponent so a runaway attempt counter cannot build an
    # astronomically large integer before the cap is applied; 2**62
    # already dwarfs any sane cap value. Capping the raw delay *before*
    # jittering yields the same result as capping after (once raw >= cap
    # the minimum is cap either way) while keeping arithmetic bounded.
    raw = min(base * (2 ** min(attempt, 62)), cap)
    # Proportional ("equal") jitter: uniform extra delay up to +50% of raw.
    jittered = raw + random.uniform(0.0, raw * 0.5)
    return min(jittered, cap)

0 commit comments

Comments
 (0)