From a6e06f16f157b1eb8c103195018ee91ac9977faa Mon Sep 17 00:00:00 2001 From: Dariusz Paluch Date: Wed, 13 May 2026 13:08:32 +0200 Subject: [PATCH 1/5] PPPSYS-56851 Serialize OAuth token refresh and harden 401 retry path --- piwik_pro_log_analytics/import_logs.py | 29 +++++++++++++++----------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/piwik_pro_log_analytics/import_logs.py b/piwik_pro_log_analytics/import_logs.py index 10a903e..7877693 100755 --- a/piwik_pro_log_analytics/import_logs.py +++ b/piwik_pro_log_analytics/import_logs.py @@ -576,6 +576,8 @@ class Error(Exception): pass piwik_token = None + # Process-wide: serializes OAuth token refresh across threads. + _token_lock = threading.Lock() def _create_parser(self): """ @@ -1341,10 +1343,12 @@ def get_resolver(self): return DynamicResolver() def init_token_auth(self): - self.piwik_token = None - if not config.options.replay_tracking: - self.piwik_token = self._get_token_auth() - logging.debug("Authentication token is: %s", self.piwik_token) + # ``replay_tracking`` is read from global ``config`` (tests call this on a bare instance). + with self._token_lock: + self.piwik_token = None + if not config.options.replay_tracking: + self.piwik_token = self._get_token_auth() + logging.debug("Authentication token is: %s", self.piwik_token) class Statistics: @@ -1805,12 +1809,10 @@ def _call_wrapper(self, func, expected_response, on_failure, *args, **kwargs): errors += 1 if errors == max_attempts: logging.info("Max number of attempts reached, server is unreachable!") - raise PiwikHttpBase.Error(message, code) - else: - logging.info("Retrying request, attempt number %d" % (errors + 1)) - time.sleep(delay_after_failure) + logging.info("Retrying request, attempt number %d" % (errors + 1)) + time.sleep(delay_after_failure) def _parse_http_exception(self, e): code = None @@ -1832,11 +1834,14 @@ def _call_authentication_wrapper(self, func, *args, **kwargs): try: return func(*args, **kwargs) except urllib.error.URLError as e: - if getattr(e, "code", None) == 401: - config.init_token_auth() - return func(*args, **kwargs) - else: + if getattr(e, "code", None) != 401: raise + token_before = config.piwik_token + config.init_token_auth() + # Another thread may have refreshed the token while we waited; retry only if it changed. + if config.piwik_token != token_before: + return func(*args, **kwargs) + raise def auth_call(self, path, args, headers=None, data=None): return self._call_authentication_wrapper(self._call, path, args, headers, data=data) From 91635dd9a67235301f9cb9cbada216ea07a2b47b Mon Sep 17 00:00:00 2001 From: Dariusz Paluch Date: Wed, 13 May 2026 12:50:14 +0200 Subject: [PATCH 2/5] PPPSYS-56851 Add tests for OAuth token refresh and 401 handling --- tests/test_main.py | 146 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 142 insertions(+), 4 deletions(-) diff --git a/tests/test_main.py b/tests/test_main.py index 11ece42..4655aac 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,10 +1,18 @@ # vim: et sw=4 ts=4: import datetime +import io import json +import logging import os import re +import threading +import time +import urllib.error from collections import OrderedDict +from contextlib import contextmanager +from unittest.mock import MagicMock, patch +import pytest from piwik_pro_log_analytics import import_logs @@ -1422,8 +1430,138 @@ def test_bz2_parsing(): def test_static_resolver_with_idsite(): - import_logs.piwik = Piwik() - import_logs.stats = import_logs.Statistics() - import_logs.resolver = import_logs.StaticResolver("194edb22-394a-48e5-aed8-0797ab29d2ae") + with _import_logs_config(_mock_config(replay_tracking=True)): + import_logs.piwik = Piwik() + import_logs.stats = import_logs.Statistics() + import_logs.resolver = import_logs.StaticResolver("194edb22-394a-48e5-aed8-0797ab29d2ae") + + assert "194edb22-394a-48e5-aed8-0797ab29d2ae" in import_logs.stats.piwik_sites + + +# --------------------------------------------------------------------------- +# Helpers for OAuth tests +# --------------------------------------------------------------------------- + + +def _make_http_error(code, headers=None): + """Build a urllib.error.HTTPError with the given status code and optional headers.""" + e = urllib.error.HTTPError(url="http://example.com", code=code, msg="", hdrs=None, fp=io.BytesIO(b"")) + e.code = code + if headers is not None: + e.headers = headers + return e + + +def _make_piwik_http(): + """Return a fresh PiwikHttpUrllib instance.""" + return import_logs.PiwikHttpUrllib() + + +def _mock_config(delay_after_failure=0, max_attempts=3, replay_tracking=False): + """Return a MagicMock that satisfies config.options accesses in _call_wrapper.""" + cfg = MagicMock() + cfg.options.delay_after_failure = delay_after_failure + cfg.options.max_attempts = max_attempts + cfg.options.replay_tracking = replay_tracking + return cfg + + +def _set_config(new_config): + """Set import_logs.config, returning (had_config, old_config) for restore.""" + had = hasattr(import_logs, "config") + old = getattr(import_logs, "config", None) + import_logs.config = new_config + return had, old + + +def _restore_config(had, old): + if had: + import_logs.config = old + elif hasattr(import_logs, "config"): + delattr(import_logs, "config") + + +@contextmanager +def _import_logs_config(cfg): + """Temporarily set ``import_logs.config``.""" + had, old = _set_config(cfg) + try: + yield + finally: + _restore_config(had, old) + + +# --------------------------------------------------------------------------- +# OAuth token refresh +# --------------------------------------------------------------------------- + + +def test_init_token_auth_no_concurrent_fetches(): + """Two threads calling init_token_auth() concurrently must not fetch in parallel — + the lock should serialize them so at most one fetch is in-flight at a time.""" + concurrent_count = [0] + max_concurrent = [0] + + def slow_get_token(self): + concurrent_count[0] += 1 + max_concurrent[0] = max(max_concurrent[0], concurrent_count[0]) + time.sleep(0.05) # simulate network latency + concurrent_count[0] -= 1 + return {"token_type": "Bearer", "access_token": "tok"} + + with _import_logs_config(_mock_config()): + with patch.object(import_logs.Configuration, "_get_token_auth", slow_get_token): + cfg = import_logs.Configuration.__new__(import_logs.Configuration) + cfg.piwik_token = None + t1 = threading.Thread(target=cfg.init_token_auth) + t2 = threading.Thread(target=cfg.init_token_auth) + t1.start() + t2.start() + t1.join() + t2.join() + + assert max_concurrent[0] == 1, "Fetches must not overlap — max concurrent was %d" % max_concurrent[0] + assert cfg.piwik_token is not None + + +def test_call_authentication_wrapper_retries_on_401_when_token_refreshed(): + """On 401, wrapper refreshes token and retries the call.""" + piwik = _make_piwik_http() + call_count = [0] + + def func(): + call_count[0] += 1 + if call_count[0] == 1: + raise _make_http_error(401) + return "ok" + + cfg = _mock_config() + cfg.piwik_token = "old-token" + + def refresh(): + cfg.piwik_token = "new-token" + + cfg.init_token_auth = refresh + + with _import_logs_config(cfg): + result = piwik._call_authentication_wrapper(func) + assert result == "ok" + assert call_count[0] == 2 + + +def test_call_authentication_wrapper_raises_when_token_unchanged_after_refresh(): + """If the token did not change after refresh (e.g. refresh also failed), re-raise 401.""" + piwik = _make_piwik_http() + + def func(): + raise _make_http_error(401) + + cfg = _mock_config() + cfg.piwik_token = "same-token" + cfg.init_token_auth = lambda: None # no-op — token stays the same + + with _import_logs_config(cfg): + with pytest.raises(urllib.error.HTTPError) as exc_info: + piwik._call_authentication_wrapper(func) + assert exc_info.value.code == 401 - assert "194edb22-394a-48e5-aed8-0797ab29d2ae" in import_logs.stats.piwik_sites From 1e5ff91b55ef4fc55c5537ea029fad467e6f31f6 Mon Sep 17 00:00:00 2001 From: Dariusz Paluch Date: Wed, 13 May 2026 12:15:11 +0000 Subject: [PATCH 3/5] PPPSYS-56851 Handle HTTP 429 with Retry-After and shared worker backoff --- piwik_pro_log_analytics/import_logs.py | 32 ++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/piwik_pro_log_analytics/import_logs.py b/piwik_pro_log_analytics/import_logs.py index 7877693..99b8967 100755 --- a/piwik_pro_log_analytics/import_logs.py +++ b/piwik_pro_log_analytics/import_logs.py @@ -33,6 +33,7 @@ import os import os.path import queue +import random import re import socket import ssl @@ -1665,6 +1666,20 @@ class PiwikHttpUrllib(PiwikHttpBase): Make requests to Piwik PRO. """ + _rate_limit_until = 0.0 + _rate_limit_lock = threading.Lock() + + @staticmethod + def _wait_until_rate_limit_allows(): + while True: + wait_until = PiwikHttpUrllib._rate_limit_until + now = time.time() + if wait_until <= now: + break + # Jitter spreads workers out so they don't all fire at once after the window expires. + jitter = random.uniform(0, (wait_until - now) * 0.1) + time.sleep(wait_until - now + jitter) + class RedirectHandlerWithLogging(urllib.request.HTTPRedirectHandler): """ Special implementation of HTTPRedirectHandler that logs redirects in debug mode @@ -1776,6 +1791,7 @@ def _call_wrapper(self, func, expected_response, on_failure, *args, **kwargs): """ errors = 0 while True: + self._wait_until_rate_limit_allows() try: response = func(*args, **kwargs) if expected_response is not None and response != expected_response: @@ -1811,6 +1827,22 @@ def _call_wrapper(self, func, expected_response, on_failure, *args, **kwargs): logging.info("Max number of attempts reached, server is unreachable!") raise PiwikHttpBase.Error(message, code) + if code == 429: + wait = delay_after_failure + retry_after = (getattr(e, "headers", None) or {}).get("Retry-After") + if retry_after: + try: + wait = float(retry_after) + except ValueError: + logging.debug( + "Could not parse Retry-After header value %r, using default delay", + retry_after, + ) + with PiwikHttpUrllib._rate_limit_lock: + PiwikHttpUrllib._rate_limit_until = max(PiwikHttpUrllib._rate_limit_until, time.time() + wait) + logging.info("Rate limited (429), waiting %.1f seconds before retry", wait) + continue + logging.info("Retrying request, attempt number %d" % (errors + 1)) time.sleep(delay_after_failure) From 12f2a488e17d00f994bbe6c203570f901ec32de6 Mon Sep 17 00:00:00 2001 From: Dariusz Paluch Date: Wed, 13 May 2026 12:50:41 +0200 Subject: [PATCH 4/5] PPPSYS-56851 Add tests for HTTP 429 and shared rate-limit window --- tests/test_main.py | 151 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 146 insertions(+), 5 deletions(-) diff --git a/tests/test_main.py b/tests/test_main.py index 4655aac..cabad4a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1439,7 +1439,7 @@ def test_static_resolver_with_idsite(): # --------------------------------------------------------------------------- -# Helpers for OAuth tests +# Helpers for OAuth / rate-limit tests # --------------------------------------------------------------------------- @@ -1453,7 +1453,8 @@ def _make_http_error(code, headers=None): def _make_piwik_http(): - """Return a fresh PiwikHttpUrllib instance.""" + """Return a fresh PiwikHttpUrllib with shared rate-limit state reset.""" + import_logs.PiwikHttpUrllib._rate_limit_until = 0.0 return import_logs.PiwikHttpUrllib() @@ -1482,17 +1483,27 @@ def _restore_config(had, old): @contextmanager -def _import_logs_config(cfg): - """Temporarily set ``import_logs.config``.""" +def _import_logs_config(cfg, reset_rate_limit_after=False): + """Temporarily set ``import_logs.config``; optionally reset shared 429 clock after.""" had, old = _set_config(cfg) try: yield finally: _restore_config(had, old) + if reset_rate_limit_after: + import_logs.PiwikHttpUrllib._rate_limit_until = 0.0 + + +def _rate_limit_test_config(delay_after_failure, max_attempts=3): + """Mock ``config`` for 429 tests; resets shared rate-limit deadline after.""" + return _import_logs_config( + _mock_config(delay_after_failure=delay_after_failure, max_attempts=max_attempts), + reset_rate_limit_after=True, + ) # --------------------------------------------------------------------------- -# OAuth token refresh +# OAuth refresh and HTTP 429 handling # --------------------------------------------------------------------------- @@ -1565,3 +1576,133 @@ def func(): piwik._call_authentication_wrapper(func) assert exc_info.value.code == 401 + +def test_call_wrapper_respects_retry_after_header(): + """429 with numeric Retry-After sets _rate_limit_until to now + header value.""" + piwik = _make_piwik_http() + call_count = [0] + + def func(*a, **kw): + call_count[0] += 1 + if call_count[0] == 1: + raise _make_http_error(429, headers={"Retry-After": "30"}) + return "ok" + + with _rate_limit_test_config(10), patch("piwik_pro_log_analytics.import_logs.time") as mock_time: + mock_time.time.side_effect = [0.0, 0.0, 0.0, 31.0] + mock_time.sleep = MagicMock() + result = piwik._call_wrapper(func, None, None) + + assert result == "ok" + assert call_count[0] == 2 + assert import_logs.PiwikHttpUrllib._rate_limit_until == pytest.approx(30.0, abs=0.1) + mock_time.sleep.assert_called_once() + sleep_arg = mock_time.sleep.call_args[0][0] + assert sleep_arg == pytest.approx(30.0, abs=3.5) # 30 + up to 10% jitter + + +def test_call_wrapper_falls_back_to_default_delay_without_retry_after(): + """429 without Retry-After header uses delay_after_failure as the wait.""" + piwik = _make_piwik_http() + call_count = [0] + + def func(*a, **kw): + call_count[0] += 1 + if call_count[0] == 1: + raise _make_http_error(429) + return "ok" + + with _rate_limit_test_config(7), patch("piwik_pro_log_analytics.import_logs.time") as mock_time: + mock_time.time.side_effect = [0.0, 0.0, 0.0, 8.0] + mock_time.sleep = MagicMock() + result = piwik._call_wrapper(func, None, None) + + assert result == "ok" + assert import_logs.PiwikHttpUrllib._rate_limit_until == pytest.approx(7.0, abs=0.1) + sleep_arg = mock_time.sleep.call_args[0][0] + assert sleep_arg == pytest.approx(7.0, abs=0.8) # 7 + up to 10% jitter + + +def test_call_wrapper_logs_debug_on_malformed_retry_after(caplog): + """Non-numeric Retry-After (e.g. HTTP-date) logs debug and falls back to default delay.""" + piwik = _make_piwik_http() + call_count = [0] + + def func(*a, **kw): + call_count[0] += 1 + if call_count[0] == 1: + raise _make_http_error(429, headers={"Retry-After": "Wed, 21 Oct 2025 07:28:00 GMT"}) + return "ok" + + with ( + _rate_limit_test_config(5), + caplog.at_level(logging.DEBUG), + patch("piwik_pro_log_analytics.import_logs.time") as mock_time, + ): + mock_time.time.side_effect = [0.0, 0.0, 0.0, 6.0] + mock_time.sleep = MagicMock() + result = piwik._call_wrapper(func, None, None) + + assert result == "ok" + assert any("Retry-After" in r.message for r in caplog.records) + assert import_logs.PiwikHttpUrllib._rate_limit_until == pytest.approx(5.0, abs=0.1) + + +def test_call_wrapper_429_counts_against_max_attempts(): + """429 responses exhaust max_attempts and raise PiwikHttpBase.Error.""" + piwik = _make_piwik_http() + + def func(*a, **kw): + raise _make_http_error(429) + + with _rate_limit_test_config(0), patch("piwik_pro_log_analytics.import_logs.time") as mock_time: + mock_time.time.return_value = 0.0 + mock_time.sleep = MagicMock() + with pytest.raises(import_logs.PiwikHttpBase.Error) as exc_info: + piwik._call_wrapper(func, None, None) + assert exc_info.value.code == 429 + + +def test_rate_limit_shared_across_workers(): + """A rate-limit window set by one worker causes other workers to wait.""" + piwik = _make_piwik_http() + import_logs.PiwikHttpUrllib._rate_limit_until = 100.0 + + def func(*a, **kw): + return "ok" + + with _rate_limit_test_config(1), patch("piwik_pro_log_analytics.import_logs.time") as mock_time: + sleep_calls = [] + mock_time.time.side_effect = [0.0, 101.0] + mock_time.sleep = lambda n: sleep_calls.append(n) + result = piwik._call_wrapper(func, None, None) + + assert result == "ok" + assert len(sleep_calls) == 1, "Worker should have slept once for the rate-limit window" + assert sleep_calls[0] == pytest.approx(100.0, abs=11.0) # 100 + up to 10% jitter + + +def test_rate_limit_until_extended_mid_sleep_is_respected(): + """If _rate_limit_until is extended while a worker sleeps, the worker re-checks + and sleeps again for the remainder.""" + piwik = _make_piwik_http() + import_logs.PiwikHttpUrllib._rate_limit_until = 5.0 + + sleep_calls = [] + + def extending_sleep(n): + sleep_calls.append(n) + if len(sleep_calls) == 1: + # Another worker extends the deadline while this worker sleeps + import_logs.PiwikHttpUrllib._rate_limit_until = 10.0 + + def func(*a, **kw): + return "ok" + + with _rate_limit_test_config(1), patch("piwik_pro_log_analytics.import_logs.time") as mock_time: + mock_time.time.side_effect = [0.0, 0.0, 11.0] + mock_time.sleep = extending_sleep + result = piwik._call_wrapper(func, None, None) + + assert result == "ok" + assert len(sleep_calls) == 2, "Worker should sleep twice: once for original window, once after extension" From f311c983faf0e420a468d86d85dd005d8fdf365f Mon Sep 17 00:00:00 2001 From: Dariusz Paluch Date: Wed, 13 May 2026 12:50:51 +0200 Subject: [PATCH 5/5] PPPSYS-56851 Document OAuth and HTTP 429 handling in changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28e99d4..addb1ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased] +- PPPSYS-56851 Serialize concurrent OAuth token initialization and harden 401 refresh retries +- PPPSYS-56851 Handle HTTP 429 using Retry-After and a shared cross-worker rate-limit window - PPPSYS-56851 Require Python 3.10 or newer (3.9 and older are end-of-life); CI tests 3.10 through 3.14 - PPPSYS-56851 Migrated packaging and CI from Poetry to uv (PEP 621, setuptools, committed `uv.lock`); PyPI releases use `uv publish`. - PPPSYS-56851 Added local multi-version testing with tox (`tox.ini`, environments `py310`–`py314` aligned with CI); optional `dev-local` uv dependency group installs tox and tox-uv (see README).