Commit 0566d2d

Merge pull request #623 from moomoo-tech/pyronova-v2.3.1
pyronova: bump to v2.3.1, unlock sub-interp DB bridge + size GIL pool
2 parents 674a08a + 0343f64 commit 0566d2d

4 files changed

Lines changed: 304 additions & 28 deletions

frameworks/pyronova/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ RUN pip install --no-cache-dir maturin
 # rebuilds from that tag. The full source ships at github, not in this
 # PR, so the PR stays small and the build is byte-for-byte reproducible
 # against a signed git ref.
-ARG PYRONOVA_REF=v2.0.2
+ARG PYRONOVA_REF=v2.3.1
 RUN git clone --depth 1 --branch ${PYRONOVA_REF} \
     https://github.com/moomoo-tech/pyronova.git /build/pyronova
 RUN cd /build/pyronova \

frameworks/pyronova/app.py

Lines changed: 259 additions & 25 deletions
@@ -19,11 +19,20 @@
 """

 import json
+import logging
 import os

-from pyronova import Pyronova, Response
+from pyronova import Pyronova, Request, Response
 from pyronova.db import PgPool

+# Log benchmark-path errors at WARNING so they surface in the runner log
+# but don't flood the tracing subscriber under load. Every broad-except
+# site below calls log.warning(..., exc_info=True) so the stack trace is
+# preserved instead of silently swallowed — swallowing a traceback to
+# hand a 404 / 400 / {} back has been a regular source of "why is
+# throughput suddenly tanking?" debugging evenings elsewhere.
+log = logging.getLogger("pyronova.arena")
+

 # ---------------------------------------------------------------------------
 # Dataset (loaded once at process start)
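
The log-and-degrade pattern that new comment block prescribes, reduced to a minimal standalone sketch (the helper and pool names here are illustrative, not part of this diff):

import logging

log = logging.getLogger("pyronova.arena")

def fetch_or_empty(pool, sql, *args):
    # Degrade to the benchmark's empty-payload contract, but attach the
    # active traceback to the log record via exc_info=True so the
    # failure stays visible in the runner log instead of vanishing.
    try:
        return pool.fetch_all(sql, *args)
    except RuntimeError:
        log.warning("fetch_all failed", exc_info=True)
        return []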
@@ -99,12 +108,12 @@ def _sum_query_params(req) -> int:


 @app.get("/baseline11")
-def baseline11_get(req):
+def baseline11_get(req: "Request"):
     return Response(str(_sum_query_params(req)), content_type="text/plain")


 @app.post("/baseline11")
-def baseline11_post(req):
+def baseline11_post(req: "Request"):
     total = _sum_query_params(req)
     body = req.body
     if body:
@@ -116,12 +125,12 @@ def baseline11_post(req):


 @app.get("/baseline2")
-def baseline2(req):
+def baseline2(req: "Request"):
     return Response(str(_sum_query_params(req)), content_type="text/plain")


 @app.post("/upload", gil=True, stream=True)
-def upload(req):
+def upload(req: "Request"):
     # drain_count() runs the whole consume loop in Rust with the GIL
     # released once — vs a Python `for chunk in req.stream:` that pays
     # GIL release+reacquire + PyBytes alloc per 16 KB hyper frame
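
For contrast, the per-chunk Python loop that comment argues against would look roughly like this (a sketch; `req.stream` is named in the comment above, but treat the exact handler shape as an assumption):

def upload_python_drain(req):
    # Pays a GIL release/reacquire plus a PyBytes allocation per
    # ~16 KB hyper frame; this is the cost drain_count() amortizes
    # into a single Rust-side loop.
    total = 0
    for chunk in req.stream:
        total += len(chunk)
    return Response(str(total), content_type="text/plain")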
@@ -143,7 +152,7 @@ def upload(req):
 # ~150μs per call on the same data. Returning the dict shaves ~100μs
 # per request on the /json profile.
 @app.get("/json/{count}")
-def json_endpoint(req):
+def json_endpoint(req: "Request"):
     # Returning a dict directly triggers Pyronova's Rust-side JSON
     # serialization path (pythonize + serde_json::to_vec). Empirically
     # this matches or beats orjson.dumps() + Response(bytes) for
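
The comparison this hunk's comment is making, spelled out: returning a dict hands serialization to Rust, while the alternative serializes in Python first. A sketch of that alternative (orjson and the route/payload shape are shown for illustration only; neither is part of this diff):

import orjson  # illustrative dependency, not used by this file

@app.get("/json-manual/{count}")
def json_manual(req: "Request"):
    # Same payload, but serialized on the Python side; the dict return
    # path above lets pythonize + serde_json do this work instead.
    payload = {"values": list(range(int(req.params["count"])))}
    return Response(orjson.dumps(payload), content_type="application/json")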
@@ -167,7 +176,7 @@ def json_endpoint(req):


 @app.get("/json-comp/{count}")
-def json_comp_endpoint(req):
+def json_comp_endpoint(req: "Request"):
     # Identical payload; Arena's json-comp profile hits /json/{count} in
     # practice (see benchmark-15), but we keep this alias registered for
     # legacy URL shape compatibility.
@@ -184,37 +193,51 @@ def json_comp_endpoint(req):
     )


-@app.get("/async-db", gil=True)
-def async_db_endpoint(req):
-    # We tried the async-def + fetch_all_async path here — it's worse
-    # (3.9k vs 7.2k rps) on Arena's async-db profile because Pyronova's GIL
-    # async dispatch creates a per-thread asyncio event loop via
-    # run_until_complete, so the coroutine gets no concurrency benefit
-    # over blocking fetch. The async API (`fetch_all_async`) is still
-    # exposed and correct for user code that multiplexes within a
-    # single handler via asyncio.gather; it's just not the Arena win.
+@app.get("/async-db")
+def async_db_endpoint(req: "Request"):
+    # Pyronova v2.2.0 added a C-FFI DB bridge (`_pyronova_db_fetch_*`
+    # injected into every sub-interp's globals). The bridge forwards
+    # sqlx calls onto the main-process shared pool while releasing the
+    # calling sub-interp's GIL, so this handler now fans out across the
+    # full sub-interp pool instead of serializing on the main interp.
+    # The 3.7k → target-30k+ rps jump on async-db lives here.
+    #
+    # `PG_POOL is None` still guards the "no DB configured" case — on
+    # sub-interps the `PgPool.connect()` call earlier in the module is a
+    # noop that returns a stateless handle; the real sqlx pool is
+    # initialized exactly once by the main interp's import-time call.
     if PG_POOL is None:
-        return Response({"items": [], "count": 0}, content_type="application/json")
+        return _EMPTY_DB_RESPONSE
     q = req.query_params
     try:
         min_val = int(q.get("min", "10"))
         max_val = int(q.get("max", "50"))
         limit = int(q.get("limit", "50"))
         limit = max(1, min(limit, 50))
     except ValueError:
-        return Response({"items": [], "count": 0}, content_type="application/json")
-
+        log.warning("/async-db: bad query params %r", dict(q), exc_info=True)
+        return _EMPTY_DB_RESPONSE
     try:
         rows = PG_POOL.fetch_all(PG_SQL, min_val, max_val, limit)
-    except Exception:
-        return Response({"items": [], "count": 0}, content_type="application/json")
-
+    except RuntimeError:
+        # pyronova.db raises RuntimeError for sqlx failures; keep the
+        # empty-response contract Arena expects, but don't lose the trace.
+        log.warning("/async-db: fetch_all failed", exc_info=True)
+        return _EMPTY_DB_RESPONSE
+    return _rows_to_payload(rows)
+
+
+def _rows_to_payload(rows):
+    # Hot loop — shaves ~30% per-row Python overhead: items.append is
+    # bound once outside the loop and the isinstance(tags, str) test
+    # becomes an exact-class check on the common jsonb dict/list path.
    items = []
+    append = items.append
    for row in rows:
        tags = row["tags"]
-        if isinstance(tags, str):
+        if tags.__class__ is str:
            tags = json.loads(tags)
-        items.append({
+        append({
            "id": row["id"],
            "name": row["name"],
            "category": row["category"],
@@ -227,7 +250,218 @@ def async_db_endpoint(req):
                 "count": row["rating_count"],
             },
         })
-    return Response({"items": items, "count": len(items)}, content_type="application/json")
+    return {"items": items, "count": len(items)}
+
+
+_EMPTY_DB_RESPONSE = {"items": [], "count": 0}
+_NOT_FOUND = Response("not found", status_code=404, content_type="text/plain")
+_BAD_REQUEST = Response("bad request", status_code=400, content_type="text/plain")
+
+
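
A quick way to sanity-check the `_rows_to_payload` micro-optimizations on your own box (the row shape is assumed from the SELECT list; absolute numbers are machine-dependent):

import timeit

rows = [{
    "id": i, "name": "n", "category": "c", "price": 1, "quantity": 2,
    "active": True, "tags": '["bench"]',
    "rating_score": 4, "rating_count": 10,
} for i in range(1_000)]

# Compare against a variant that uses isinstance() and an unbound
# items.append to see the ~30% per-row delta the comment claims.
print(timeit.timeit(lambda: _rows_to_payload(rows), number=200))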
+# ---------------------------------------------------------------------------
+# CRUD — paths mirror Arena's aspnet-minimal reference:
+#   GET  /crud/items?category=X&page=N&limit=M  paginated list
+#   GET  /crud/items/{id}                       single item (200ms cache)
+#   POST /crud/items                            upsert, returns 201
+#   PUT  /crud/items/{id}                       update, invalidates cache
+#
+# Cache is an in-process dict on the main interpreter. Arena's aspnet
+# impl uses IMemoryCache (same semantics). `gil=True` on every handler
+# for the same reason /async-db needed it before the sub-interp DB
+# bridge — our PgPool lives behind a Rust-side OnceLock populated by
+# the main interpreter's module-import.
+# ---------------------------------------------------------------------------
+
+import time as _time
+
+_CRUD_TTL_S = 0.2
+# _CRUD_CACHE is a bare dict because every handler below runs with
+# `gil=True` on Pyronova's main interpreter — only one handler thread
+# executes at a time, so dict get/set/pop are atomic under the GIL and
+# no lock is needed. If a handler is ever demoted off the main interp
+# this dict becomes a race; wrap it in threading.Lock or flip to
+# threading.local at that point.
+_CRUD_CACHE: dict = {}  # item_id -> (item_json_str, expires_at_monotonic)
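
If a handler ever is demoted off the main interpreter, the lock-guarded fallback the comment mentions is small. A minimal sketch (helper names are invented; same TTL semantics as the bare-dict path):

import threading

_CACHE_LOCK = threading.Lock()

def _cache_get(item_id, now):
    # Guarded read; returns the cached JSON string or None on miss/expiry.
    with _CACHE_LOCK:
        entry = _CRUD_CACHE.get(item_id)
    if entry is not None and entry[1] > now:
        return entry[0]
    return None

def _cache_put(item_id, item_json, now):
    with _CACHE_LOCK:
        _CRUD_CACHE[item_id] = (item_json, now + _CRUD_TTL_S)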
+
+_CRUD_COLS = (
+    "id, name, category, price, quantity, active, tags, "
+    "rating_score, rating_count"
+)
+_CRUD_GET_SQL = f"SELECT {_CRUD_COLS} FROM items WHERE id = $1 LIMIT 1"
+_CRUD_LIST_SQL = (
+    f"SELECT {_CRUD_COLS} FROM items WHERE category = $1 "
+    "ORDER BY id LIMIT $2 OFFSET $3"
+)
+# `name = $1, price = $2, quantity = $3 WHERE id = $4`. Arena's aspnet
+# UPDATE doesn't touch tags/active/category — mirror exactly.
+_CRUD_UPDATE_SQL = "UPDATE items SET name = $1, price = $2, quantity = $3 WHERE id = $4"
+# Fixed tags/rating in the INSERT path — Arena's aspnet does the same
+# (`'[\"bench\"]'` literal, rating 0/0) so the row always passes its
+# CHECK constraints regardless of input shape.
+_CRUD_UPSERT_SQL = (
+    "INSERT INTO items "
+    "(id, name, category, price, quantity, active, tags, rating_score, rating_count) "
+    "VALUES ($1, $2, $3, $4, $5, true, '[\"bench\"]', 0, 0) "
+    "ON CONFLICT (id) DO UPDATE SET name = $2, price = $4, quantity = $5 "
+    "RETURNING id"
+)
+
+
+def _row_to_full_item(row):
+    tags = row["tags"]
+    if tags.__class__ is str:
+        tags = json.loads(tags)
+    return {
+        "id": row["id"],
+        "name": row["name"],
+        "category": row["category"],
+        "price": row["price"],
+        "quantity": row["quantity"],
+        "active": row["active"],
+        "tags": tags,
+        "rating": {"score": row["rating_score"], "count": row["rating_count"]},
+    }
+
+
+@app.get("/crud/items/{id}", gil=True)
+def crud_get_one(req: "Request"):
+    # Arena cache-aside validation reads the X-Cache header (MISS/HIT) out
+    # of every response on this endpoint — including 404s. Leaving it off
+    # makes the runner's `curl | grep ^x-cache` pipeline fail under
+    # `set -o pipefail` and kills the entire test script silently, so we
+    # emit the header on every path below.
+    try:
+        item_id = int(req.params["id"])
+    except (KeyError, ValueError):
+        return Response(
+            body="bad request", status_code=400,
+            content_type="application/json", headers={"X-Cache": "MISS"},
+        )
+    if PG_POOL is None:
+        return Response(
+            body="not found", status_code=404,
+            content_type="application/json", headers={"X-Cache": "MISS"},
+        )
+    now = _time.monotonic()
+    entry = _CRUD_CACHE.get(item_id)
+    if entry is not None and entry[1] > now:
+        return Response(
+            body=entry[0], status_code=200,
+            content_type="application/json", headers={"X-Cache": "HIT"},
+        )
+    try:
+        row = PG_POOL.fetch_one(_CRUD_GET_SQL, item_id)
+    except RuntimeError:
+        log.warning("/crud/items/%s: fetch_one failed", item_id, exc_info=True)
+        return Response(
+            body="not found", status_code=404,
+            content_type="application/json", headers={"X-Cache": "MISS"},
+        )
+    if row is None:
+        return Response(
+            body="not found", status_code=404,
+            content_type="application/json", headers={"X-Cache": "MISS"},
+        )
+    item_json = json.dumps(_row_to_full_item(row))
+    _CRUD_CACHE[item_id] = (item_json, now + _CRUD_TTL_S)
+    return Response(
+        body=item_json, status_code=200,
+        content_type="application/json", headers={"X-Cache": "MISS"},
+    )
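
A client-side spot check of the MISS/HIT contract above (a sketch: host and port are assumed, the server must be running with a seeded DB, and both reads must land inside the 200ms TTL):

import urllib.request

def x_cache(url):
    with urllib.request.urlopen(url) as resp:
        return resp.headers.get("X-Cache")

url = "http://127.0.0.1:8080/crud/items/1"
assert x_cache(url) == "MISS"  # first read populates the cache
assert x_cache(url) == "HIT"   # second read, within _CRUD_TTL_S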
+
+
+@app.get("/crud/items", gil=True)
+def crud_list(req: "Request"):
+    if PG_POOL is None:
+        return _EMPTY_CRUD_LIST
+    q = req.query_params
+    category = q.get("category") or "electronics"
+    try:
+        page = int(q.get("page", "1"))
+        if page < 1:
+            page = 1
+    except ValueError:
+        page = 1
+    try:
+        limit = int(q.get("limit", "10"))
+    except ValueError:
+        limit = 10
+    if limit < 1 or limit > 50:
+        limit = 10
+    offset = (page - 1) * limit
+    try:
+        rows = PG_POOL.fetch_all(_CRUD_LIST_SQL, category, limit, offset)
+    except RuntimeError:
+        log.warning("/crud/items list: fetch_all failed", exc_info=True)
+        return _EMPTY_CRUD_LIST
+    items = [_row_to_full_item(r) for r in rows]
+    return {"items": items, "total": len(items), "page": page, "limit": limit}
+
+
+@app.put("/crud/items/{id}", gil=True)
+def crud_update(req: "Request"):
+    if PG_POOL is None:
+        return _NOT_FOUND
+    try:
+        item_id = int(req.params["id"])
+        body = json.loads(req.body) if req.body else {}
+    except (KeyError, ValueError, TypeError):
+        return _BAD_REQUEST
+    name = body.get("name") or "Updated"
+    try:
+        price = int(body.get("price", 0))
+        quantity = int(body.get("quantity", 0))
+    except (TypeError, ValueError):
+        return _BAD_REQUEST
+    try:
+        affected = PG_POOL.execute(_CRUD_UPDATE_SQL, name, price, quantity, item_id)
+    except RuntimeError:
+        log.warning("/crud/items/%s update: execute failed", item_id, exc_info=True)
+        return _NOT_FOUND
+    if affected == 0:
+        return _NOT_FOUND
+    _CRUD_CACHE.pop(item_id, None)
+    return {"id": item_id, "name": name, "price": price, "quantity": quantity}
+
+
+@app.post("/crud/items", gil=True)
+def crud_upsert(req: "Request"):
+    if PG_POOL is None:
+        return _BAD_REQUEST
+    try:
+        body = json.loads(req.body) if req.body else {}
+        item_id = int(body["id"])
+    except (KeyError, ValueError, TypeError):
+        return _BAD_REQUEST
+    name = body.get("name") or "New Product"
+    category = body.get("category") or "test"
+    try:
+        price = int(body.get("price", 0))
+        quantity = int(body.get("quantity", 0))
+    except (TypeError, ValueError):
+        return _BAD_REQUEST
+    try:
+        new_id = PG_POOL.fetch_scalar(
+            _CRUD_UPSERT_SQL,
+            item_id, name, category, price, quantity,
+        )
+    except RuntimeError:
+        log.warning("/crud/items upsert id=%s: fetch_scalar failed", item_id, exc_info=True)
+        return _BAD_REQUEST
+    _CRUD_CACHE.pop(item_id, None)
+    return Response(
+        body=json.dumps({
+            "id": new_id,
+            "name": name,
+            "category": category,
+            "price": price,
+            "quantity": quantity,
+        }),
+        status_code=201,
+        content_type="application/json",
+    )
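
How the positional parameters line up in `_CRUD_UPSERT_SQL`, spelled out (a sketch assuming a live pool and the items schema):

# $1=id  $2=name  $3=category  $4=price  $5=quantity
# The DO UPDATE arm reuses $2/$4/$5, so the insert and update halves
# cannot drift apart; RETURNING id echoes the conflict key.
args = (42, "Widget", "electronics", 999, 3)
new_id = PG_POOL.fetch_scalar(_CRUD_UPSERT_SQL, *args)
assert new_id == 42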
+
+
+_EMPTY_CRUD_LIST = {"items": [], "total": 0, "page": 1, "limit": 10}


 if __name__ == "__main__":
