1414import hashlib
1515import json
1616import logging
17+ import random
1718import re
1819import socket
1920import ssl
3839 POOL_MIN_IDLE ,
3940 RELAY_TIMEOUT ,
4041 SCRIPT_BLACKLIST_TTL ,
42+ SCRIPT_PROBE_INTERVAL_MAX ,
43+ SCRIPT_PROBE_INTERVAL_MIN ,
44+ SCRIPT_PROBE_TIMEOUT ,
4145 SEMAPHORE_MAX ,
4246 STATEFUL_HEADER_NAMES ,
4347 STATIC_EXTS ,
5963 validate_range_response ,
6064)
6165from .relay_response import (
66+ classify_relay_envelope ,
6267 classify_relay_error ,
6368 error_response ,
6469 extract_apps_script_user_html ,
@@ -88,6 +93,32 @@ def _mask_sid(sid: str) -> str:
8893 return f"{ sid [:6 ]} \u2026 { sid [- 4 :]} "
8994
9095
96+ class ScriptDeploymentError (Exception ):
97+ """Internal signal that a permanent Apps Script envelope was detected.
98+
99+ Raised by per-SID parse hooks (``_relay_single_h2_with_sid``,
100+ ``_relay_single_h2``, ``_relay_single``, ``_parse_batch_body``) when
101+ :func:`relay_response.classify_relay_envelope` returns a permanent
102+ category (``"quota" | "auth" | "deploy" | "admin"``). Caught by
103+ ``_relay_fanout`` and ``_relay_with_retry`` so the corresponding
104+ script ID is dropped from rotation via ``_blacklist_sid`` and the
105+ next attempt picks a different deployment. Never propagates to
106+ client code paths — the relay always converts it back into a normal
107+ response (a healthy racer's bytes, a retry result, or an upstream
108+ 502) before returning to callers outside this module.
109+ """
110+
111+ def __init__ (self , sid : str , category : str , raw : str ):
112+ self .sid = sid
113+ self .category = category
114+ self .raw = raw
115+ super ().__init__ (f"ScriptDeploymentError(sid={ _mask_sid (sid )} , "
116+ f"category={ category } , raw={ raw [:120 ]!r} )" )
117+
118+ def __str__ (self ) -> str :
119+ return f"{ self .category } : { self .raw [:120 ]} "
120+
121+
91122class DomainFronter :
92123 _STATIC_EXTS = STATIC_EXTS
93124 _H2_FAILURE_COOLDOWN = 15.0 # reduced: DPI token bucket refills in ~8-10s
@@ -151,6 +182,8 @@ def __init__(self, config: dict):
151182 len (self ._script_ids )))
152183 self ._sid_blacklist : dict [str , float ] = {}
153184 self ._blacklist_ttl = SCRIPT_BLACKLIST_TTL
185+ self ._probe_task : asyncio .Task | None = None
186+ self ._probe_semaphore : asyncio .Semaphore = asyncio .Semaphore (1 )
154187
155188 # Per-host stats (requests, cache hits, bytes, cumulative latency).
156189 self ._per_site : dict [str , HostStat ] = {}
@@ -745,6 +778,119 @@ def _prune_blacklist(self, force: bool = False) -> None:
745778 if force or until <= now :
746779 self ._sid_blacklist .pop (sid , None )
747780
781+ async def _probe_one_sid (self , sid : str ) -> None :
782+ """Re-validate a blacklisted SID with one low-cost relay request.
783+
784+ Healthy → drop from `_sid_blacklist` and log recovery.
785+ Permanent envelope or transport error → leave blacklisted, no TTL change.
786+ Never propagates exceptions to client paths (2.10).
787+ """
788+ payload = {"m" : "GET" , "u" : "http://example.com/" , "k" : self .auth_key }
789+ body_bytes = json .dumps (payload ).encode ()
790+ path = self ._exec_path_for_sid (sid )
791+ async with self ._probe_semaphore :
792+ try :
793+ # Probe also consumes Apps Script quota — record it.
794+ self ._record_execution (sid )
795+ transport = self ._pick_h2 () or self ._h2
796+ if transport is None :
797+ # No H2 transport available — skip this probe cycle. The
798+ # H1 fallback path is intentionally not used here so a
799+ # saturated client semaphore can't starve probes.
800+ return
801+ status , headers , body = await asyncio .wait_for (
802+ transport .request (
803+ method = "POST" , path = path , host = self .http_host ,
804+ headers = self ._apps_script_headers (),
805+ body = body_bytes ,
806+ timeout = SCRIPT_PROBE_TIMEOUT ,
807+ ),
808+ timeout = SCRIPT_PROBE_TIMEOUT ,
809+ )
810+ except Exception as exc :
811+ log .debug ("Probe %s failed (%s) — keeping blacklisted" ,
812+ _mask_sid (sid ), exc )
813+ return
814+
815+ category , _raw = classify_relay_envelope (body )
816+ if category is not None :
817+ log .debug ("Probe %s still %s — keeping blacklisted" ,
818+ _mask_sid (sid ), category )
819+ return
820+
821+ # Healthy — recover.
822+ self ._sid_blacklist .pop (sid , None )
823+ log .info ("Re-validated script %s — recovered" , _mask_sid (sid ))
824+
825+ async def _probe_tick (self ) -> None :
826+ """One pass of the probe loop — re-validate every still-blacklisted SID.
827+
828+ No-op when only one script ID is configured (3.7) or when the
829+ blacklist is empty (3.8).
830+ """
831+ if len (self ._script_ids ) <= 1 :
832+ return
833+ sids = [s for s in list (self ._sid_blacklist )
834+ if self ._is_sid_blacklisted (s )]
835+ if not sids :
836+ return
837+ await asyncio .gather (
838+ * (self ._probe_one_sid (s ) for s in sids ),
839+ return_exceptions = True ,
840+ )
841+
842+ async def _probe_blacklisted_sids (self ) -> None :
843+ """Background loop: every uniform(MIN, MAX) seconds re-probe blacklisted SIDs."""
844+ while True :
845+ try :
846+ interval = random .uniform (
847+ SCRIPT_PROBE_INTERVAL_MIN , SCRIPT_PROBE_INTERVAL_MAX ,
848+ )
849+ await asyncio .sleep (interval )
850+ await self ._probe_tick ()
851+ except asyncio .CancelledError :
852+ break
853+ except Exception as exc :
854+ log .debug ("Probe loop error: %s" , exc )
855+
856+ def _is_failed_relay_result (self , sid : str , result : bytes ) -> bool :
857+ """Return True if a successful racer's bytes are actually an Apps
858+ Script permanent-failure envelope (or a 502 synthesised from one).
859+
860+ The parse-site hooks in ``_relay_single_h2_with_sid`` /
861+ ``_relay_single_h2`` / ``_relay_single`` / ``_parse_batch_body``
862+ normally raise :class:`ScriptDeploymentError` before ``_relay_fanout``
863+ ever sees the body, so this inspection is a defence-in-depth check
864+ against bypass paths (test monkeypatches that replace
865+ ``_relay_single_h2_with_sid`` directly, future refactors that
866+ remove the per-SID hook). When this returns True the caller
867+ blacklists the SID (idempotent if already done by the parse hook)
868+ and keeps waiting on the remaining racers instead of forwarding
869+ the 502 to the client (requirement 2.4).
870+ """
871+ # Case 1: caller bypassed the parse hook and handed back the raw
872+ # Apps Script envelope. classify_relay_envelope decides whether
873+ # the envelope error is permanent (quota/auth/deploy/admin) or
874+ # transient/healthy.
875+ category , _raw = classify_relay_envelope (result )
876+ if category is not None :
877+ if not self ._is_sid_blacklisted (sid ):
878+ self ._blacklist_sid (sid , reason = category )
879+ return True
880+ # Case 2: caller bypassed the parse hook but already invoked
881+ # parse_relay_response, so the original envelope has been
882+ # replaced by a synthesised ``HTTP/1.1 502`` response. Without
883+ # the JSON envelope we cannot distinguish permanent from
884+ # transient classes, but in fan-out we always prefer to keep
885+ # waiting on the remaining racers rather than forward a 502 —
886+ # if every racer ends up here we still surface the last
887+ # exception via ``winner_exc``.
888+ if result .startswith (b"HTTP/1.1 502" ):
889+ if not self ._is_sid_blacklisted (sid ):
890+ self ._blacklist_sid (sid , reason = "parsed_502" )
891+ return True
892+ return False
893+
748894 def _pick_fanout_sids (self , key : str | None ) -> list [str ]:
749895 """Pick up to `parallel_relay` distinct non-blacklisted script IDs.
750896
@@ -1054,6 +1200,8 @@ async def _warm_pool(self):
10541200 self ._stats_task = self ._spawn (self ._stats_logger ())
10551201 if self ._execution_task is None :
10561202 self ._execution_task = self ._spawn (self ._execution_logger ())
1203+ if self ._probe_task is None :
1204+ self ._probe_task = self ._spawn (self ._probe_blacklisted_sids ())
10571205 # Start H2 connection (runs alongside H1 pool)
10581206 if self ._h2 :
10591207 self ._spawn (self ._h2_connect_and_warm ())
@@ -1099,6 +1247,7 @@ async def close(self):
10991247 self ._maintenance_task = None
11001248 self ._stats_task = None
11011249 self ._execution_task = None
1250+ self ._probe_task = None
11021251 self ._keepalive_task = None
11031252
11041253 await self ._flush_pool ()
@@ -2563,9 +2712,26 @@ async def _relay_fanout(self, payload: dict) -> bytes:
25632712 exc = t .exception ()
25642713 if exc is None :
25652714 winner_result = t .result ()
2715+ if self ._is_failed_relay_result (sid , winner_result ):
2716+ # Defence-in-depth: the per-SID parse hook
2717+ # normally raises ScriptDeploymentError before
2718+ # we get here, but a bypass path (test
2719+ # monkeypatch on _relay_single_h2_with_sid,
2720+ # future refactor) could still surface a
2721+ # permanent envelope as a "successful" body.
2722+ # Treat this racer as failed and keep waiting
2723+ # on the remaining racers (requirement 2.4).
2724+ winner_exc = RuntimeError (
2725+ f"fan-out racer { _mask_sid (sid )} "
2726+ f"returned permanent envelope"
2727+ )
2728+ continue
25662729 return winner_result
2567- # This racer failed — blacklist and keep waiting for others
2568- self ._blacklist_sid (sid , reason = type (exc ).__name__ )
2730+ # This racer failed — blacklist (unless the parse
2731+ # hook already did with a sharper category reason)
2732+ # and keep waiting for others (requirement 3.3).
2733+ if not isinstance (exc , ScriptDeploymentError ):
2734+ self ._blacklist_sid (sid , reason = type (exc ).__name__ )
25692735 winner_exc = exc
25702736 # All racers failed
25712737 if winner_exc is not None :
@@ -2608,6 +2774,11 @@ async def _relay_single_h2(self, payload: dict) -> bytes:
26082774 len (body ),
26092775 )
26102776
2777+ category , raw = classify_relay_envelope (body )
2778+ if category is not None :
2779+ self ._blacklist_sid (sid , reason = category )
2780+ raise ScriptDeploymentError (sid , category , raw )
2781+
26112782 return parse_relay_response (body , self ._max_response_body_bytes )
26122783
26132784 async def _relay_single_h2_with_sid (self , payload : dict ,
@@ -2631,6 +2802,11 @@ async def _relay_single_h2_with_sid(self, payload: dict,
26312802 timeout = self ._relay_timeout ,
26322803 )
26332804
2805+ category , raw = classify_relay_envelope (body )
2806+ if category is not None :
2807+ self ._blacklist_sid (sid , reason = category )
2808+ raise ScriptDeploymentError (sid , category , raw )
2809+
26342810 return parse_relay_response (body , self ._max_response_body_bytes )
26352811
26362812 async def _follow_redirects (
@@ -2714,7 +2890,6 @@ async def _relay_single(self, payload: dict) -> bytes:
27142890 )
27152891
27162892 await self ._release (reader , writer , created )
2717- return parse_relay_response (resp_body , self ._max_response_body_bytes )
27182893
27192894 except Exception :
27202895 try :
@@ -2723,6 +2898,13 @@ async def _relay_single(self, payload: dict) -> bytes:
27232898 pass
27242899 raise
27252900
2901+ category , raw = classify_relay_envelope (resp_body )
2902+ if category is not None :
2903+ self ._blacklist_sid (sid , reason = category )
2904+ raise ScriptDeploymentError (sid , category , raw )
2905+
2906+ return parse_relay_response (resp_body , self ._max_response_body_bytes )
2907+
27262908 async def _relay_batch (self , payloads : list [dict ]) -> list [bytes ]:
27272909 """Send multiple requests in one POST using Apps Script fetchAll."""
27282910 batch_payload = {
@@ -2763,7 +2945,7 @@ async def _relay_batch(self, payloads: list[dict]) -> list[bytes]:
27632945 len (body ),
27642946 )
27652947 self ._record_h2_success ()
2766- return self ._parse_batch_body (body , payloads )
2948+ return self ._parse_batch_body (body , payloads , sid )
27672949 except Exception as e :
27682950 if self ._is_h2_transport_error (e ):
27692951 self ._record_h2_failure (e )
@@ -2803,11 +2985,23 @@ async def _relay_batch(self, payloads: list[dict]) -> list[bytes]:
28032985 pass
28042986 raise
28052987
2806- return self ._parse_batch_body (resp_body , payloads )
2988+ return self ._parse_batch_body (resp_body , payloads , sid )
28072989
28082990 def _parse_batch_body (self , resp_body : bytes ,
2809- payloads : list [dict ]) -> list [bytes ]:
2991+ payloads : list [dict ],
2992+ sid : str ) -> list [bytes ]:
28102993 """Parse a batch response body into individual results."""
2994+ # Classify the *outer* batch envelope before per-item parsing.
2995+ # A permanent error here ("e" at the top level) means the
2996+ # deployment itself failed the whole batch — blacklist the SID
2997+ # and raise so _relay_with_retry can pick a different one.
2998+ # Per-item "e" fields below are target-origin errors and stay
2999+ # untouched (requirement 3.6).
3000+ category , raw = classify_relay_envelope (resp_body )
3001+ if category is not None :
3002+ self ._blacklist_sid (sid , reason = category )
3003+ raise ScriptDeploymentError (sid , category , raw )
3004+
28113005 text = resp_body .decode (errors = "replace" ).strip ()
28123006 # Apps Script can wrap JSON inside an HTML shell; reuse the same
28133007 # robust loader used by single-response parsing.
0 commit comments