Skip to content

Commit 0bcc9e1

Browse files
committed
feat(DISET): add throttle duration tracking and diagnostic logging
Track when throttling starts and log state transitions with queue/thread diagnostics. Periodic warnings are emitted every 30s while throttling persists, making it easier to diagnose stuck services from logs without needing to attach a debugger. Adds Service.throttleDiagnostics() to expose queue size, max queue, active threads, and max threads for logging purposes.
1 parent 06d9906 commit 0bcc9e1

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

src/DIRAC/Core/DISET/ServiceReactor.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
#: This sleep is repeated for as long as Service.wantsThrottle is truthy
3636
THROTTLE_SERVICE_SLEEP_SECONDS = 0.25
3737

38+
#: Interval between periodic throttle warning messages (seconds)
39+
THROTTLE_LOG_INTERVAL_SECONDS = 30
40+
3841

3942
class ServiceReactor:
4043
__transportExtraKeywords = {
@@ -200,6 +203,8 @@ def __acceptIncomingConnection(self, svcName=False):
200203
services at the same time
201204
"""
202205
sel = self.__getListeningSelector(svcName)
206+
throttleStartedAt = None
207+
lastThrottleLog = 0
203208
while self.__alive:
204209
clientTransport = None
205210
try:
@@ -226,10 +231,30 @@ def __acceptIncomingConnection(self, svcName=False):
226231
# Handle throttling: reject all connections while overloaded
227232
# to prevent queue growth when threads are stuck
228233
if self.__services[svcName].wantsThrottle:
229-
gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress()))
234+
now = time.time()
235+
if throttleStartedAt is None:
236+
throttleStartedAt = now
237+
diag = self.__services[svcName].throttleDiagnostics()
238+
gLogger.warn(
239+
f"Service {svcName} entering throttle mode",
240+
f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}",
241+
)
242+
lastThrottleLog = now
243+
elif now - lastThrottleLog >= THROTTLE_LOG_INTERVAL_SECONDS:
244+
duration = now - throttleStartedAt
245+
diag = self.__services[svcName].throttleDiagnostics()
246+
gLogger.warn(
247+
f"Service {svcName} still throttling after {duration:.0f}s",
248+
f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}",
249+
)
250+
lastThrottleLog = now
230251
clientTransport.close()
231252
time.sleep(THROTTLE_SERVICE_SLEEP_SECONDS)
232253
continue
254+
if throttleStartedAt is not None:
255+
duration = time.time() - throttleStartedAt
256+
gLogger.info(f"Service {svcName} throttle cleared after {duration:.1f}s")
257+
throttleStartedAt = None
233258
# Handle connection
234259
self.__stats.connectionStablished()
235260
self.__services[svcName].handleConnection(clientTransport)

src/DIRAC/Core/DISET/private/Service.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,15 @@ def wantsThrottle(self):
286286
nQueued = self._threadPool._work_queue.qsize()
287287
return nQueued > self._cfg.getMaxWaitingPetitions()
288288

289+
def throttleDiagnostics(self):
290+
"""Return a dict of diagnostics useful for throttle logging"""
291+
return {
292+
"queue": self._threadPool._work_queue.qsize(),
293+
"maxQueue": self._cfg.getMaxWaitingPetitions(),
294+
"threads": len(self._threadPool._threads),
295+
"maxThreads": self._cfg.getMaxThreads(),
296+
}
297+
289298
# Threaded process function
290299
def _processInThread(self, clientTransport):
291300
"""

0 commit comments

Comments
 (0)