Skip to content

Commit 5a0f4dd

Browse files
committed
Retry transient failures in Xylem Data Lake adapter
Vendor-side instability this past week killed standard DAG runs in two places that previously had no retry:
- 5xx from /api/v1/sqllab/execute/ during chunk queries (fatal at raise_for_status)
- 401 from /api/v1/me/ during initial Superset login (fatal in _create_superset_session)

Adds retry-with-backoff (10s/20s/40s, i.e. up to 3 retries after the initial attempt) at both points, plus the same retry path for ConnectionError and Timeout exceptions on the query POST. Re-auths triggered mid-query also flow through the new auth-retry wrapper.
1 parent 03cbae2 commit 5a0f4dd

2 files changed

Lines changed: 187 additions & 3 deletions

File tree

amiadapters/adapters/xylem_datalake.py

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ class DatalakeWaterRegister:
9191
DELAY_SECONDS = 2
9292
MAX_AUTH_ATTEMPTS = 3
9393
AUTH_BACKOFF_BASE = 10
94+
MAX_SERVER_ATTEMPTS = 3
95+
SERVER_BACKOFF_BASE = 10
9496
CSRF_REFRESH_INTERVAL = 10
9597

9698

@@ -204,6 +206,39 @@ def _create_superset_session(
204206
return session
205207

206208

209+
def _create_superset_session_with_retry(
    datalake_url: str,
    superset_url: str,
    client_id: str,
    username: str,
    password: str,
) -> requests.Session:
    """Call _create_superset_session, retrying with exponential backoff.

    Vendor-side flakes have produced 401 from /api/v1/me/ even after a
    successful Keycloak handshake; waiting briefly and trying again gets
    through most of these.

    One initial attempt is made, followed by up to MAX_AUTH_ATTEMPTS
    retries. Before retry k (1-based) the function sleeps
    AUTH_BACKOFF_BASE * 2 ** (k - 1) seconds.

    Returns:
        The session returned by the first successful
        _create_superset_session call.

    Raises:
        RuntimeError or requests.exceptions.RequestException: the error
        from the final attempt, once every attempt has failed. Any other
        exception type propagates immediately without a retry.
    """
    total_attempts = MAX_AUTH_ATTEMPTS + 1
    failure = None

    for attempt_number in range(1, total_attempts + 1):
        # Only retries are preceded by a pause; the first attempt runs at once.
        if attempt_number > 1:
            delay = AUTH_BACKOFF_BASE * (2 ** (attempt_number - 2))
            logger.info(
                f"Backing off {delay}s before auth retry "
                f"(attempt {attempt_number}/{total_attempts})"
            )
            time.sleep(delay)

        try:
            session = _create_superset_session(
                datalake_url, superset_url, client_id, username, password
            )
        except (RuntimeError, requests.exceptions.RequestException) as exc:
            logger.warning(
                f"Auth attempt {attempt_number}/{total_attempts} failed: {exc}"
            )
            # Remember the most recent failure so it can be surfaced if
            # every attempt is exhausted.
            failure = exc
        else:
            return session

    raise failure
240+
241+
207242
def _refresh_csrf(session: requests.Session, superset_url: str) -> None:
208243
"""Refresh the CSRF token on an existing session."""
209244
csrf_resp = session.get(
@@ -291,7 +326,7 @@ def _extract(
291326
extract_range_start: datetime,
292327
extract_range_end: datetime,
293328
) -> ExtractOutput:
294-
self._session = _create_superset_session(
329+
self._session = _create_superset_session_with_retry(
295330
self.datalake_url,
296331
self.superset_url,
297332
self.client_id,
@@ -365,6 +400,7 @@ def _query_chunked(
365400
def _query(self, sql: str) -> List[dict]:
366401
"""Execute a SQL query via the Superset SQL Lab API with session management."""
367402
auth_failures = 0
403+
server_failures = 0
368404

369405
while True:
370406
# Refresh CSRF periodically
@@ -393,6 +429,36 @@ def _query(self, sql: str) -> List[dict]:
393429
self._session = self._reauth(auth_failures - 1)
394430
self._requests_since_csrf = 0
395431
continue
432+
except (
433+
requests.exceptions.ConnectionError,
434+
requests.exceptions.Timeout,
435+
) as e:
436+
server_failures += 1
437+
logger.warning(f"Connection error ({type(e).__name__}): {e}")
438+
if server_failures > MAX_SERVER_ATTEMPTS:
439+
raise
440+
delay = SERVER_BACKOFF_BASE * (2 ** (server_failures - 1))
441+
logger.info(
442+
f"Backing off {delay}s before retry "
443+
f"(attempt {server_failures}/{MAX_SERVER_ATTEMPTS})"
444+
)
445+
time.sleep(delay)
446+
continue
447+
448+
if 500 <= resp.status_code < 600:
449+
server_failures += 1
450+
logger.warning(
451+
f"Server error HTTP {resp.status_code}: {resp.text[:300]}"
452+
)
453+
if server_failures > MAX_SERVER_ATTEMPTS:
454+
resp.raise_for_status()
455+
delay = SERVER_BACKOFF_BASE * (2 ** (server_failures - 1))
456+
logger.info(
457+
f"Backing off {delay}s before retry "
458+
f"(attempt {server_failures}/{MAX_SERVER_ATTEMPTS})"
459+
)
460+
time.sleep(delay)
461+
continue
396462

397463
if _session_expired(resp):
398464
auth_failures += 1
@@ -413,6 +479,7 @@ def _query(self, sql: str) -> List[dict]:
413479
resp.raise_for_status()
414480
result = resp.json()
415481
auth_failures = 0
482+
server_failures = 0
416483

417484
if result.get("status") == "error":
418485
msg = result.get("error") or result.get("message") or "unknown"
@@ -433,7 +500,7 @@ def _reauth(self, consecutive_failures: int) -> requests.Session:
433500
time.sleep(delay)
434501
else:
435502
logger.info("Re-authenticating")
436-
return _create_superset_session(
503+
return _create_superset_session_with_retry(
437504
self.datalake_url,
438505
self.superset_url,
439506
self.client_id,

test/amiadapters/test_xylem_datalake.py

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
import json
2+
from unittest.mock import MagicMock, patch
3+
4+
import requests
25

36
from amiadapters.outputs.base import ExtractOutput
47
from amiadapters.models import DataclassJSONEncoder, GeneralMeter, GeneralMeterRead
5-
from amiadapters.adapters.xylem_datalake import XylemDatalakeAdapter
8+
from amiadapters.adapters.xylem_datalake import (
9+
XylemDatalakeAdapter,
10+
_create_superset_session_with_retry,
11+
)
612
from test.base_test_case import BaseTestCase
713

814

@@ -235,3 +241,114 @@ def test_parse_flowtime_with_space(self):
235241

236242
result = self.adapter._parse_flowtime("2026-04-14 08:00:00")
237243
self.assertEqual(result, datetime(2026, 4, 14, 8, 0, 0))
244+
245+
246+
def _query_response(
    status_code, json_data=None, text="", content_type="application/json"
):
    """Build a MagicMock standing in for a ``requests.Response``.

    Args:
        status_code: HTTP status to expose on the mock.
        json_data: Value returned by ``.json()``; an empty dict when None.
        text: Value of the ``.text`` attribute.
        content_type: Value placed under the ``Content-Type`` header.

    Returns:
        A mock whose ``raise_for_status()`` raises ``HTTPError`` for
        400-599 status codes and returns None otherwise.
    """
    payload = {} if json_data is None else json_data

    mock_resp = MagicMock(spec=requests.Response)
    mock_resp.status_code = status_code
    mock_resp.headers = {"Content-Type": content_type}
    mock_resp.text = text
    mock_resp.json.return_value = payload

    is_error_status = 400 <= status_code < 600
    if not is_error_status:
        mock_resp.raise_for_status.return_value = None
        return mock_resp

    # Mirror requests' behaviour: 4xx/5xx responses raise on raise_for_status().
    mock_resp.raise_for_status.side_effect = requests.exceptions.HTTPError(
        f"{status_code} Error", response=mock_resp
    )
    return mock_resp
261+
262+
263+
class TestXylemDatalakeQueryRetry(BaseTestCase):
    """Tests for retry behavior in _query and the auth retry wrapper."""

    def setUp(self):
        # Build a real adapter, then swap in a mock HTTP session so that
        # _query never touches the network.
        adapter_kwargs = dict(
            org_id="test_org",
            org_timezone="America/Los_Angeles",
            pipeline_configuration=None,
            configured_task_output_controller=self.TEST_TASK_OUTPUT_CONTROLLER_CONFIGURATION,
            configured_metrics=self.TEST_METRICS_CONFIGURATION,
            agency_code="hlsbo",
            database_id=1,
            client_id="test-client-id",
            username="test",
            password="test",
            configured_sinks=[],
        )
        self.adapter = XylemDatalakeAdapter(**adapter_kwargs)
        self.adapter._session = MagicMock()
        self.adapter._requests_since_csrf = 0

    @patch("amiadapters.adapters.xylem_datalake.time.sleep")
    def test_query_retries_on_5xx_then_succeeds(self, _mock_sleep):
        # Two server errors followed by a good response: the query should
        # succeed on the third POST.
        ok_response = _query_response(
            200, json_data={"status": "success", "data": [{"x": 1}]}
        )
        post_mock = self.adapter._session.post
        post_mock.side_effect = [
            _query_response(500, text="boom"),
            _query_response(502, text="bad gateway"),
            ok_response,
        ]

        rows = self.adapter._query("SELECT 1")

        self.assertEqual(rows, [{"x": 1}])
        self.assertEqual(post_mock.call_count, 3)

    @patch("amiadapters.adapters.xylem_datalake.time.sleep")
    def test_query_raises_after_max_5xx_attempts(self, _mock_sleep):
        # Every POST returns a 500, so the retry budget is eventually spent.
        always_failing = _query_response(500, text="boom")
        self.adapter._session.post.return_value = always_failing

        with self.assertRaises(requests.exceptions.HTTPError):
            self.adapter._query("SELECT 1")

    @patch("amiadapters.adapters.xylem_datalake.time.sleep")
    def test_query_retries_on_connection_error_then_succeeds(self, _mock_sleep):
        # Transport-level exceptions take the same retry path as 5xx responses.
        ok_response = _query_response(
            200, json_data={"status": "success", "data": [{"x": 1}]}
        )
        post_mock = self.adapter._session.post
        post_mock.side_effect = [
            requests.exceptions.ConnectionError("dns failure"),
            requests.exceptions.Timeout("read timeout"),
            ok_response,
        ]

        rows = self.adapter._query("SELECT 1")

        self.assertEqual(rows, [{"x": 1}])
        self.assertEqual(post_mock.call_count, 3)
316+
317+
318+
class TestCreateSupersetSessionWithRetry(BaseTestCase):
    """Tests for the auth retry wrapper.

    ``time.sleep`` and ``_create_superset_session`` are patched in every
    test, so no real waiting or network I/O takes place. The literal
    delays and attempt counts asserted below mirror the module constants
    AUTH_BACKOFF_BASE = 10 and MAX_AUTH_ATTEMPTS = 3.
    """

    # Positional arguments passed to the wrapper in every test.
    _ARGS = ("https://dl", "https://sup", "client", "user", "pw")

    @staticmethod
    def _sleep_delays(mock_sleep):
        """Return the first positional argument of each recorded sleep call."""
        return [recorded[0][0] for recorded in mock_sleep.call_args_list]

    @patch("amiadapters.adapters.xylem_datalake.time.sleep")
    @patch("amiadapters.adapters.xylem_datalake._create_superset_session")
    def test_returns_session_on_first_success(self, mock_create, mock_sleep):
        sentinel = MagicMock(name="session")
        mock_create.return_value = sentinel

        result = _create_superset_session_with_retry(*self._ARGS)

        self.assertIs(result, sentinel)
        self.assertEqual(mock_create.call_count, 1)
        # Arguments must be forwarded unchanged, and a first-try success
        # must not incur any backoff.
        mock_create.assert_called_once_with(*self._ARGS)
        mock_sleep.assert_not_called()

    @patch("amiadapters.adapters.xylem_datalake.time.sleep")
    @patch("amiadapters.adapters.xylem_datalake._create_superset_session")
    def test_retries_then_succeeds(self, mock_create, mock_sleep):
        sentinel = MagicMock(name="session")
        mock_create.side_effect = [
            RuntimeError("Superset login returned unexpected shape (status 401)"),
            RuntimeError("Superset login returned unexpected shape (status 401)"),
            sentinel,
        ]

        result = _create_superset_session_with_retry(*self._ARGS)

        self.assertIs(result, sentinel)
        self.assertEqual(mock_create.call_count, 3)
        # Two failures -> two backoffs, doubling from the base delay.
        self.assertEqual(self._sleep_delays(mock_sleep), [10, 20])

    @patch("amiadapters.adapters.xylem_datalake.time.sleep")
    @patch("amiadapters.adapters.xylem_datalake._create_superset_session")
    def test_raises_after_max_attempts(self, mock_create, mock_sleep):
        mock_create.side_effect = RuntimeError("persistent failure")

        with self.assertRaises(RuntimeError) as ctx:
            _create_superset_session_with_retry(*self._ARGS)

        self.assertEqual(str(ctx.exception), "persistent failure")
        # One initial attempt plus three retries, with the full
        # 10s/20s/40s schedule slept between them.
        self.assertEqual(mock_create.call_count, 4)
        self.assertEqual(self._sleep_delays(mock_sleep), [10, 20, 40])

0 commit comments

Comments
 (0)