Skip to content

Commit c60af72

Browse files
committed
Reuse a persistent subprocess per python transform instead of spawning one per example
Previously each example snippet spawned a fresh subprocess, paying the full Python startup + import cost (e.g. geopandas pulls in pandas/numpy/shapely/...) on every invocation. On Windows Docker Desktop with a bind-mounted workspace, this cost is multiplied by the WSL2 filesystem overhead, turning a 3-second run into several minutes. Now a single subprocess is kept alive per (python_bin, transform_content) pair and fed examples via a JSON-lines stdin/stdout protocol. Imports are cached in sys.modules after the first example; each subsequent example pays near-zero import cost. Per-example globals are isolated via a fresh namespace dict on each exec(), so state cannot leak between examples. The process is respawned automatically if it dies, with one retry.
1 parent 66debb9 commit c60af72

1 file changed

Lines changed: 154 additions & 56 deletions

File tree

ogc/bblocks/transformers/python.py

Lines changed: 154 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
#!/usr/bin/env python3
22
from __future__ import annotations
33

4+
import atexit
5+
import base64
46
import json
57
import logging
68
import subprocess
79
import sys
810
import tempfile
911
from pathlib import Path
12+
from threading import Lock
1013

1114
from ogc.bblocks.log import log_indent
1215
from ogc.bblocks.models import TransformMetadata, TransformResult, Transformer
@@ -15,32 +18,134 @@
1518

1619
transform_type = 'python'
1720

21+
_process_cache: dict[tuple, _PersistentProcess] = {}
22+
_cache_lock = Lock()
1823

19-
def _strip_harness_frames(stderr: str, harness_path: str) -> str:
20-
lines = stderr.splitlines(keepends=True)
21-
result = []
22-
skip_next = False
23-
for line in lines:
24-
if skip_next:
25-
skip_next = False
26-
continue
27-
if f'File "{harness_path}"' in line:
28-
skip_next = True
29-
continue
30-
result.append(line)
31-
return ''.join(result)
32-
33-
34-
def _decode_output(raw: bytes) -> tuple[str | bytes | None, bool]:
35-
if not raw:
36-
return None, False
24+
25+
def _build_persistent_harness(transform_content: str) -> str:
26+
return f"""\
27+
import sys as _sys, json as _json, types as _types, base64 as _b64, io as _io, traceback as _tb
28+
29+
_TRANSFORM_CODE = compile({repr(transform_content)}, '<transform>', 'exec')
30+
31+
_real_stdout = _sys.stdout.buffer
32+
33+
for _line in _sys.stdin:
34+
_line = _line.strip()
35+
if not _line:
36+
continue
37+
_req = _json.loads(_line)
38+
_d = _req['metadata']
39+
if isinstance(_d.get('context'), dict):
40+
_d['context'] = _types.SimpleNamespace(**_d['context'])
41+
transform_metadata = _types.SimpleNamespace(**_d)
42+
_raw = _b64.b64decode(_req['input'])
3743
try:
38-
text = raw.decode('utf-8')
39-
if '\x00' in text:
40-
return raw, True
41-
return text, False
44+
input_data = _raw.decode('utf-8')
4245
except UnicodeDecodeError:
43-
return raw, True
46+
input_data = _raw
47+
48+
_capture = _io.StringIO()
49+
_prev_stdout, _prev_stderr = _sys.stdout, _sys.stderr
50+
_sys.stdout = _capture
51+
_sys.stderr = _capture
52+
53+
_ns = {{'transform_metadata': transform_metadata, 'input_data': input_data, 'output_data': None}}
54+
try:
55+
exec(_TRANSFORM_CODE, _ns)
56+
output_data = _ns.get('output_data')
57+
if output_data is not None:
58+
if isinstance(output_data, bytes):
59+
_out_b64 = _b64.b64encode(output_data).decode()
60+
_binary = True
61+
else:
62+
_out_b64 = _b64.b64encode(output_data.encode('utf-8')).decode()
63+
_binary = False
64+
else:
65+
_out_b64 = None
66+
_binary = False
67+
_resp = {{'output': _out_b64, 'success': True, 'stderr': _capture.getvalue() or None, 'binary': _binary}}
68+
except Exception:
69+
_stderr_text = _capture.getvalue() + _tb.format_exc()
70+
_resp = {{'output': None, 'success': False, 'stderr': _stderr_text, 'binary': False}}
71+
finally:
72+
_sys.stdout, _sys.stderr = _prev_stdout, _prev_stderr
73+
74+
_real_stdout.write((_json.dumps(_resp) + '\\n').encode('utf-8'))
75+
_real_stdout.flush()
76+
"""
77+
78+
79+
class _PersistentProcess:
80+
81+
def __init__(self, python_bin: Path, transform_content: str):
82+
self._python_bin = python_bin
83+
self._transform_content = transform_content
84+
self._proc: subprocess.Popen | None = None
85+
self._harness_path: Path | None = None
86+
self._lock = Lock()
87+
self._start()
88+
89+
def _start(self):
90+
if self._harness_path is None:
91+
harness = _build_persistent_harness(self._transform_content)
92+
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
93+
f.write(harness)
94+
self._harness_path = Path(f.name)
95+
self._proc = subprocess.Popen(
96+
[str(self._python_bin), str(self._harness_path)],
97+
stdin=subprocess.PIPE,
98+
stdout=subprocess.PIPE,
99+
)
100+
101+
def _send_raw(self, req_line: bytes) -> dict | None:
102+
try:
103+
self._proc.stdin.write(req_line)
104+
self._proc.stdin.flush()
105+
resp_line = self._proc.stdout.readline()
106+
if resp_line:
107+
return json.loads(resp_line)
108+
except (BrokenPipeError, OSError):
109+
pass
110+
return None
111+
112+
def send(self, metadata_dict: dict, input_data: bytes | str) -> dict:
113+
if isinstance(input_data, str):
114+
input_data = input_data.encode('utf-8')
115+
req_line = (json.dumps({
116+
'metadata': metadata_dict,
117+
'input': base64.b64encode(input_data).decode(),
118+
}) + '\n').encode('utf-8')
119+
120+
with self._lock:
121+
resp = self._send_raw(req_line)
122+
if resp is None:
123+
logger.warning("Transform process died, respawning")
124+
self._start()
125+
resp = self._send_raw(req_line)
126+
return resp or {'output': None, 'success': False, 'stderr': 'Transform process died', 'binary': False}
127+
128+
def close(self):
129+
with self._lock:
130+
if self._proc and self._proc.poll() is None:
131+
try:
132+
self._proc.stdin.close()
133+
self._proc.wait(timeout=5)
134+
except Exception:
135+
self._proc.kill()
136+
if self._harness_path:
137+
self._harness_path.unlink(missing_ok=True)
138+
self._harness_path = None
139+
140+
141+
def _close_all_processes():
142+
with _cache_lock:
143+
for proc in _process_cache.values():
144+
proc.close()
145+
_process_cache.clear()
146+
147+
148+
atexit.register(_close_all_processes)
44149

45150

46151
class PythonTransformer(Transformer):
@@ -57,6 +162,17 @@ def transform(self, metadata: TransformMetadata) -> TransformResult:
57162
else:
58163
python_bin = Path(sys.executable)
59164

165+
transform_content = metadata.transform_content
166+
if isinstance(transform_content, bytes):
167+
transform_content = transform_content.decode('utf-8')
168+
169+
cache_key = (str(python_bin), transform_content)
170+
with _cache_lock:
171+
proc = _process_cache.get(cache_key)
172+
if proc is None:
173+
proc = _PersistentProcess(python_bin, transform_content)
174+
_process_cache[cache_key] = proc
175+
60176
transform_metadata_dict = {
61177
'source_mime_type': metadata.source_mime_type,
62178
'target_mime_type': metadata.target_mime_type,
@@ -65,44 +181,26 @@ def transform(self, metadata: TransformMetadata) -> TransformResult:
65181
'context': metadata.ctx.to_dict() if metadata.ctx else None,
66182
}
67183

68-
harness = f"""\
69-
import sys as _sys, json as _json, types as _types
70-
_d = _json.loads(_sys.argv[1])
71-
if isinstance(_d.get('context'), dict):
72-
_d['context'] = _types.SimpleNamespace(**_d['context'])
73-
transform_metadata = _types.SimpleNamespace(**_d)
74-
input_data = _sys.stdin.read()
75-
output_data = None
76-
_real_stdout = _sys.stdout
77-
_sys.stdout = _sys.stderr
78-
exec(compile({repr(metadata.transform_content)}, '<transform>', 'exec'), globals())
79-
_sys.stdout = _real_stdout
80-
if output_data is not None:
81-
_sys.stdout.buffer.write(output_data if isinstance(output_data, bytes) else output_data.encode('utf-8'))
82-
"""
83-
84-
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
85-
f.write(harness)
86-
harness_path = f.name
184+
resp = proc.send(transform_metadata_dict, metadata.input_data)
87185

88-
try:
89-
result = subprocess.run(
90-
[str(python_bin), harness_path, json.dumps(transform_metadata_dict)],
91-
input=metadata.input_data.encode('utf-8') if isinstance(metadata.input_data, str) else metadata.input_data,
92-
capture_output=True,
93-
)
94-
finally:
95-
Path(harness_path).unlink(missing_ok=True)
96-
97-
stderr = _strip_harness_frames(result.stderr.decode('utf-8', errors='replace'), harness_path) or None
186+
stderr = resp.get('stderr') or None
98187
if stderr:
99188
label = metadata.id or 'python'
100189
with log_indent():
101190
for line in stderr.splitlines():
102191
if line.strip():
103192
logger.info("[%s] %s", label, line)
104-
if result.returncode != 0:
105-
return TransformResult(output=None, success=False, stderr=stderr)
106193

107-
output, binary = _decode_output(result.stdout)
108-
return TransformResult(output=output, success=True, stderr=stderr, binary=binary)
194+
output_b64 = resp.get('output')
195+
if output_b64 is not None:
196+
output_bytes = base64.b64decode(output_b64)
197+
output: str | bytes | None = output_bytes if resp.get('binary') else output_bytes.decode('utf-8')
198+
else:
199+
output = None
200+
201+
return TransformResult(
202+
output=output,
203+
success=resp.get('success', False),
204+
stderr=stderr,
205+
binary=resp.get('binary', False),
206+
)

0 commit comments

Comments
 (0)