Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased]

- PPPSYS-56851 Serialize concurrent OAuth token initialization and harden 401 refresh retries
- PPPSYS-56851 Handle HTTP 429 using Retry-After and a shared cross-worker rate-limit window
- PPPSYS-56851 Require Python 3.10 or newer (3.9 and older are end-of-life); CI tests 3.10 through 3.14
- PPPSYS-56851 Migrate packaging and CI from Poetry to uv (PEP 621, setuptools, committed `uv.lock`); PyPI releases use `uv publish`
- PPPSYS-56851 Add local multi-version testing with tox (`tox.ini`, environments `py310`–`py314` aligned with CI); optional `dev-local` uv dependency group installs tox and tox-uv (see README)
Expand Down
61 changes: 49 additions & 12 deletions piwik_pro_log_analytics/import_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import os
import os.path
import queue
import random
import re
import socket
import ssl
Expand Down Expand Up @@ -576,6 +577,8 @@ class Error(Exception):
pass

piwik_token = None
# Process-wide: serializes OAuth token refresh across threads.
_token_lock = threading.Lock()

def _create_parser(self):
"""
Expand Down Expand Up @@ -1341,10 +1344,12 @@ def get_resolver(self):
return DynamicResolver()

def init_token_auth(self):
# Resets ``self.piwik_token`` to None and, unless ``config.options.replay_tracking``
# is set, refills it from ``self._get_token_auth()``; the token is logged at debug level.
# NOTE(review): this span is a flattened diff hunk. Going by the hunk header
# (-1341,10 +1344,12), the next four statements look like the removed pre-change
# body, and the ``with self._token_lock:`` statements after them the added
# replacement -- confirm against the real file before relying on either copy.
self.piwik_token = None
if not config.options.replay_tracking:
self.piwik_token = self._get_token_auth()
logging.debug("Authentication token is: %s", self.piwik_token)
# ``replay_tracking`` is read from global ``config`` (tests call this on a bare instance).
# The lock makes concurrent callers run the reset-and-fetch sequence one at a time.
with self._token_lock:
self.piwik_token = None
if not config.options.replay_tracking:
self.piwik_token = self._get_token_auth()
# NOTE(review): indentation is lost here, so it is unclear whether this debug
# line runs only when a token was fetched or on every call -- TODO confirm.
logging.debug("Authentication token is: %s", self.piwik_token)


class Statistics:
Expand Down Expand Up @@ -1661,6 +1666,20 @@ class PiwikHttpUrllib(PiwikHttpBase):
Make requests to Piwik PRO.
"""

# Class-level deadline, in ``time.time()`` seconds: ``_wait_until_rate_limit_allows``
# sleeps while this value is still in the future. 0.0 means no wait is pending.
_rate_limit_until = 0.0
# Held while ``_rate_limit_until`` is pushed forward after an HTTP 429 response.
_rate_limit_lock = threading.Lock()

@staticmethod
def _wait_until_rate_limit_allows():
    """Sleep until the class-wide rate-limit deadline has passed.

    The deadline (``PiwikHttpUrllib._rate_limit_until``) is re-read after every
    sleep, so the call only returns once the deadline is no longer in the future.
    Returns immediately when the deadline is already at or before ``time.time()``.
    """
    while True:
        deadline = PiwikHttpUrllib._rate_limit_until
        remaining = deadline - time.time()
        if remaining <= 0:
            return
        # Sleep up to 10% longer than strictly needed, by a random amount, so that
        # workers waiting on the same deadline do not all resume at the same instant.
        extra = random.uniform(0, remaining * 0.1)
        time.sleep(remaining + extra)

class RedirectHandlerWithLogging(urllib.request.HTTPRedirectHandler):
"""
Special implementation of HTTPRedirectHandler that logs redirects in debug mode
Expand Down Expand Up @@ -1772,6 +1791,7 @@ def _call_wrapper(self, func, expected_response, on_failure, *args, **kwargs):
"""
errors = 0
while True:
self._wait_until_rate_limit_allows()
try:
response = func(*args, **kwargs)
if expected_response is not None and response != expected_response:
Expand Down Expand Up @@ -1805,12 +1825,26 @@ def _call_wrapper(self, func, expected_response, on_failure, *args, **kwargs):
errors += 1
if errors == max_attempts:
logging.info("Max number of attempts reached, server is unreachable!")

raise PiwikHttpBase.Error(message, code)
else:
logging.info("Retrying request, attempt number %d" % (errors + 1))

time.sleep(delay_after_failure)
if code == 429:
wait = delay_after_failure
retry_after = (getattr(e, "headers", None) or {}).get("Retry-After")
if retry_after:
try:
wait = float(retry_after)
except ValueError:
logging.debug(
"Could not parse Retry-After header value %r, using default delay",
retry_after,
)
with PiwikHttpUrllib._rate_limit_lock:
PiwikHttpUrllib._rate_limit_until = max(PiwikHttpUrllib._rate_limit_until, time.time() + wait)
logging.info("Rate limited (429), waiting %.1f seconds before retry", wait)
continue

logging.info("Retrying request, attempt number %d" % (errors + 1))
time.sleep(delay_after_failure)

def _parse_http_exception(self, e):
code = None
Expand All @@ -1832,11 +1866,14 @@ def _call_authentication_wrapper(self, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except urllib.error.URLError as e:
if getattr(e, "code", None) == 401:
config.init_token_auth()
return func(*args, **kwargs)
else:
if getattr(e, "code", None) != 401:
raise
token_before = config.piwik_token
config.init_token_auth()
# Another thread may have refreshed the token while we waited; retry only if it changed.
if config.piwik_token != token_before:
return func(*args, **kwargs)
raise

def auth_call(self, path, args, headers=None, data=None):
    """Run ``self._call`` through ``self._call_authentication_wrapper``.

    ``path``, ``args`` and ``headers`` are forwarded positionally and ``data`` by
    keyword; whatever the wrapper returns is handed back unchanged.
    """
    with_auth = self._call_authentication_wrapper
    return with_auth(self._call, path, args, headers, data=data)
Expand Down
Loading
Loading