Skip to content

Commit ab6697e

Browse files
author
ci bot
committed
Merge branch 'graceful-shutdown' into 'enterprise'
fix(standalone): graceful shutdown of embedded postgres on signal See merge request dkinternal/testgen/dataops-testgen!505
2 parents 07419e2 + cd0c53b commit ab6697e

2 files changed

Lines changed: 67 additions & 15 deletions

File tree

testgen/__main__.py

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ def _capped(level):
9797
from testgen.common.notifications.base import smtp_configured
9898
from testgen.common.standalone_postgres import (
9999
STANDALONE_URI_ENV_VAR,
100+
ensure_standalone_setup,
100101
get_server_uri,
101102
is_standalone_mode,
102103
)
@@ -116,12 +117,41 @@ def _capped(level):
116117
CHILDREN_POLL_INTERVAL = 10
117118

118119

120+
def _subprocess_spawn_kwargs() -> dict:
121+
"""Spawn flags that let the parent send a *catchable* shutdown signal to a child.
122+
123+
On POSIX, ``start_new_session`` puts the child in its own session so the
124+
console SIGINT only hits the parent. On Windows, ``CREATE_NEW_PROCESS_GROUP``
125+
is required so the parent can deliver ``CTRL_BREAK_EVENT`` to the child —
126+
without it, the only options are ``terminate()`` (uncatchable) or relying on
127+
the console-wide Ctrl+C broadcast (racy).
128+
"""
129+
if sys.platform == "win32":
130+
return {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
131+
return {"start_new_session": True}
132+
133+
134+
def _install_shutdown_handler(handler) -> None:
135+
"""Register *handler* for SIGINT, SIGTERM, and (on Windows) SIGBREAK.
136+
137+
Windows children spawned with ``CREATE_NEW_PROCESS_GROUP`` receive
138+
``CTRL_BREAK_EVENT`` (delivered as SIGBREAK) instead of console Ctrl+C —
139+
treat it the same as SIGTERM so atexit hooks run and pgserver's PID list
140+
stays clean.
141+
"""
142+
signal.signal(signal.SIGINT, handler)
143+
signal.signal(signal.SIGTERM, handler)
144+
if sigbreak := getattr(signal, "SIGBREAK", None):
145+
signal.signal(sigbreak, handler)
146+
147+
119148
def _forward_signal_to_child(child: subprocess.Popen, signum: int) -> None:
120-
# On POSIX, forward the signal verbatim. On Windows, subprocess.send_signal
121-
# rejects everything except SIGTERM / CTRL_C_EVENT / CTRL_BREAK_EVENT, so
122-
# fall back to terminate() — equivalent to TerminateProcess().
149+
# POSIX: forward the signal verbatim. Windows: send CTRL_BREAK_EVENT so the
150+
# child's atexit hooks run (it deregisters from pgserver's PID list, lets
151+
# streamlit/uvicorn shut down their event loops, etc.). The child must have
152+
# been spawned with CREATE_NEW_PROCESS_GROUP — see _subprocess_spawn_kwargs.
123153
if sys.platform == "win32":
124-
child.terminate()
154+
child.send_signal(signal.CTRL_BREAK_EVENT)
125155
else:
126156
child.send_signal(signum)
127157

@@ -169,7 +199,14 @@ def format_epilog(self, _ctx: Context, formatter: click.HelpFormatter) -> None:
169199
@click.pass_context
170200
def cli(ctx: Context, verbose: bool):
171201
if is_standalone_mode():
172-
start_standalone_postgres()
202+
# Children spawned by `run-app all` (and the Streamlit subprocess) inherit the
203+
# parent's pgserver URI. They must NOT call get_server() themselves — each call
204+
# registers the calling PID in pgserver's on-disk handle list, and any PID left
205+
# over after a hard kill prevents the parent from actually stopping postgres.
206+
if inherited_uri := os.environ.get(STANDALONE_URI_ENV_VAR):
207+
ensure_standalone_setup(inherited_uri)
208+
else:
209+
start_standalone_postgres()
173210

174211
if verbose:
175212
configure_logging(level=logging.DEBUG)
@@ -924,14 +961,16 @@ def init_ui():
924961
app_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ui/app.py")
925962

926963
# In standalone mode, pass the pgserver URI to the Streamlit subprocess
927-
# so it can connect without acquiring the pgserver file lock.
964+
# so it can connect without acquiring the pgserver file lock. When this
965+
# process is itself a child of `run-app all`, the URI is already in the
966+
# inherited environment — fall back to that so Streamlit still gets it.
928967
child_env = {**os.environ, "TG_JOB_SOURCE": "UI"}
929968
if is_standalone_mode():
930-
server_uri = get_server_uri()
969+
server_uri = get_server_uri() or os.environ.get(STANDALONE_URI_ENV_VAR)
931970
if server_uri:
932-
child_env = {**os.environ, "TG_JOB_SOURCE": "UI", STANDALONE_URI_ENV_VAR: server_uri}
971+
child_env[STANDALONE_URI_ENV_VAR] = server_uri
933972

934-
process= subprocess.Popen(
973+
process = subprocess.Popen(
935974
[
936975
sys.executable,
937976
"-m",
@@ -950,12 +989,12 @@ def init_ui():
950989
f"{'--debug' if settings.IS_DEBUG else ''}",
951990
],
952991
env=child_env,
992+
**_subprocess_spawn_kwargs(),
953993
)
954994
def term_ui(signum, _):
955995
LOG.info(f"Sending termination signal {signum} to Testgen UI")
956996
_forward_signal_to_child(process, signum)
957-
signal.signal(signal.SIGINT, term_ui)
958-
signal.signal(signal.SIGTERM, term_ui)
997+
_install_shutdown_handler(term_ui)
959998
status_code = process.wait()
960999
LOG.log(logging.ERROR if status_code != 0 else logging.INFO, f"Testgen UI exited with status code {status_code}")
9611000

@@ -980,17 +1019,23 @@ def run_app(module):
9801019
run_server()
9811020

9821021
case "all":
1022+
child_env = os.environ.copy()
1023+
if is_standalone_mode() and (server_uri := get_server_uri()):
1024+
child_env[STANDALONE_URI_ENV_VAR] = server_uri
9831025
children = [
984-
subprocess.Popen([sys.executable, "-m", "testgen", "run-app", m], start_new_session=True)
1026+
subprocess.Popen(
1027+
[sys.executable, "-m", "testgen", "run-app", m],
1028+
env=child_env,
1029+
**_subprocess_spawn_kwargs(),
1030+
)
9851031
for m in APP_MODULES
9861032
]
9871033

9881034
def term_children(signum, _):
9891035
for child in children:
9901036
_forward_signal_to_child(child, signum)
9911037

992-
signal.signal(signal.SIGINT, term_children)
993-
signal.signal(signal.SIGTERM, term_children)
1038+
_install_shutdown_handler(term_children)
9941039

9951040
terminating = False
9961041
while children:
@@ -999,7 +1044,10 @@ def term_children(signum, _):
9991044
except subprocess.TimeoutExpired:
10001045
pass
10011046

1002-
for child in children:
1047+
# Iterate a snapshot — `children.remove(child)` inside a live iteration
1048+
# would skip the next element and could leave a sibling un-reaped during
1049+
# the simultaneous-exit case we're explicitly hardening for here.
1050+
for child in list(children):
10031051
if child.poll() is not None:
10041052
children.remove(child)
10051053
if not terminating:

testgen/scheduler/cli_scheduler.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ def sig_handler(signum, _):
195195

196196
signal.signal(signal.SIGINT, sig_handler)
197197
signal.signal(signal.SIGTERM, sig_handler)
198+
# Windows: parent forwards CTRL_BREAK_EVENT (delivered as SIGBREAK) when the
199+
# child is spawned in its own process group, so handle it the same way.
200+
if sigbreak := getattr(signal, "SIGBREAK", None):
201+
signal.signal(sigbreak, sig_handler)
198202

199203
try:
200204
self.start(datetime.now(UTC))

0 commit comments

Comments
 (0)