Skip to content

Commit b1258a6

Browse files
authored
Merge pull request #194 from remittor-pr/fastpysgi-db
[python] FastPySGI: Cleanup code and add support async-db test
2 parents 166967a + 44cc6e7 commit b1258a6

4 files changed

Lines changed: 143 additions & 69 deletions

File tree

frameworks/fastpysgi-asgi/app.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
DB_PATH = "/data/benchmark.db"
1919
DB_AVAILABLE = os.path.exists(DB_PATH)
20-
2120
DB_QUERY = (
2221
"SELECT id, name, category, price, quantity, active, tags, rating_score, rating_count"
2322
" FROM items"
@@ -72,7 +71,7 @@ def _get_db() -> sqlite3.Connection:
7271
_local.conn = conn
7372
return conn
7473

75-
# -- Posgres DB ------------------------------------------------------------
74+
# -- Postgres DB ------------------------------------------------------------
7675

7776
PG_POOL_MIN_SIZE = 2
7877
PG_POOL_MAX_SIZE = 3
@@ -193,7 +192,7 @@ async def json_endpoint(scope, receive, send):
193192

194193
async def compression_endpoint(scope, receive, send):
195194
global LARGE_JSON_BUF
196-
if LARGE_JSON_BUF is None:
195+
if not LARGE_JSON_BUF:
197196
return text_resp("No dataset", 500)
198197
compressed = zlib.compress(LARGE_JSON_BUF, level = 1, wbits = 31)
199198
return json_resp(compressed, gzip = True)
@@ -245,7 +244,7 @@ async def async_db_endpoint(scope, receive, send):
245244
'price' : row['price'],
246245
'quantity': row['quantity'],
247246
'active' : row['active'],
248-
'tags' : row['tags'],
247+
'tags' : json.loads(row['tags']) if isinstance(row['tags'], str) else row['tags'],
249248
'rating': {
250249
'score': row['rating_score'],
251250
'count': row['rating_count'],
@@ -284,19 +283,23 @@ async def handle_405(scope, receive, send):
284283

285284
# -- ASGI app -----------------------------------------------------------
286285

286+
async def asgi_lifespan(receive, send):
287+
while True:
288+
message = await receive()
289+
if message['type'] == 'lifespan.startup':
290+
#await db_setup()
291+
await send({'type': 'lifespan.startup.complete'})
292+
elif message['type'] == 'lifespan.shutdown':
293+
await db_close()
294+
await send({'type': 'lifespan.shutdown.complete'})
295+
return
296+
287297
async def app(scope, receive, send):
288298
global ROUTES
289-
if scope['type'] == 'lifespan':
290-
while True:
291-
message = await receive()
292-
if message['type'] == 'lifespan.startup':
293-
await db_setup()
294-
await send({'type': 'lifespan.startup.complete'})
295-
elif message['type'] == 'lifespan.shutdown':
296-
await db_close()
297-
await send({'type': 'lifespan.shutdown.complete'})
298-
return
299-
return
299+
req_type = scope['type']
300+
if req_type == 'lifespan':
301+
return await asgi_lifespan(receive, send)
302+
assert req_type == 'http'
300303
req_method = scope.get('method', '')
301304
if req_method not in [ 'GET', 'POST' ]:
302305
await send( { 'type': 'http.response.start', 'status': 405, 'headers': DEF_TEXT_HEADERS } )

frameworks/fastpysgi/app.py

Lines changed: 120 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,67 +2,117 @@
22
import sys
33
import json
44
import threading
5+
import multiprocessing
56
import zlib
67
import sqlite3
78
from urllib.parse import parse_qs
89

910
import orjson
11+
import psycopg_pool
12+
import psycopg.rows
1013

11-
# -- Dataset ----------------------------------------------------------
14+
# -- Dataset and constants --------------------------------------------------------
1215

13-
dataset_items = None
14-
dataset_path = os.environ.get("DATASET_PATH", "/data/dataset.json")
16+
CPU_COUNT = int(multiprocessing.cpu_count())
17+
18+
DB_PATH = "/data/benchmark.db"
19+
DB_AVAILABLE = os.path.exists(DB_PATH)
20+
DB_QUERY = (
21+
"SELECT id, name, category, price, quantity, active, tags, rating_score, rating_count"
22+
" FROM items"
23+
" WHERE price BETWEEN ? AND ? LIMIT 50"
24+
)
25+
26+
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://bench:bench@localhost:5432/benchmark")
27+
DATABASE_POOL = None
28+
DATABASE_QUERY = (
29+
"SELECT id, name, category, price, quantity, active, tags, rating_score, rating_count"
30+
" FROM items"
31+
" WHERE price BETWEEN %s AND %s LIMIT 50"
32+
)
33+
if DATABASE_URL.startswith("postgres://"):
34+
DATABASE_URL = "postgresql://" + DATABASE_URL[len("postgres://"):]
35+
36+
DATASET_LARGE_PATH = "/data/dataset-large.json"
37+
DATASET_PATH = os.environ.get("DATASET_PATH", "/data/dataset.json")
38+
DATASET_ITEMS = None
1539
try:
16-
with open(dataset_path) as file:
17-
dataset_items = json.load(file)
40+
with open(DATASET_PATH) as file:
41+
DATASET_ITEMS = json.load(file)
1842
except Exception:
1943
pass
2044

2145
# Large dataset for compression (pre-serialised)
22-
large_json_buf: bytes | None = None
46+
LARGE_JSON_BUF: bytes | None = None
2347
try:
24-
with open("/data/dataset-large.json") as file:
48+
with open(DATASET_LARGE_PATH) as file:
2549
raw = json.load(file)
2650
items = [ ]
2751
for d in raw:
2852
item = dict(d)
2953
item["total"] = round(d["price"] * d["quantity"] * 100) / 100
3054
items.append(item)
31-
large_json_buf = orjson.dumps( { "items": items, "count": len(items) } )
55+
LARGE_JSON_BUF = orjson.dumps( { "items": items, "count": len(items) } )
3256
except Exception:
3357
pass
3458

3559
# -- SQLite (thread-local, sync — runs in threadpool via run_in_executor) --
3660

37-
db_available = os.path.exists("/data/benchmark.db")
38-
DB_QUERY = (
39-
"SELECT id, name, category, price, quantity, active, tags, rating_score, rating_count"
40-
" FROM items"
41-
" WHERE price BETWEEN ? AND ? LIMIT 50"
42-
)
4361
_local = threading.local()
4462

4563
def _get_db() -> sqlite3.Connection:
64+
global _local
4665
conn = getattr(_local, "conn", None)
4766
if conn is None:
48-
conn = sqlite3.connect("/data/benchmark.db", uri = True, check_same_thread = False)
67+
conn = sqlite3.connect(DB_PATH, uri = True, check_same_thread = False)
4968
conn.execute("PRAGMA mmap_size=268435456")
5069
conn.row_factory = sqlite3.Row
5170
_local.conn = conn
5271
return conn
5372

73+
# -- Postgres DB ------------------------------------------------------------
74+
75+
PG_POOL_MIN_SIZE = 2
76+
PG_POOL_MAX_SIZE = 3
77+
78+
def db_close():
79+
global DATABASE_POOL
80+
if DATABASE_POOL:
81+
try:
82+
DATABASE_POOL.close()
83+
except Exception:
84+
pass
85+
DATABASE_POOL = None
86+
87+
def db_setup():
88+
global DATABASE_POOL, DATABASE_URL, CPU_COUNT
89+
db_close()
90+
max_pool_size = 0
91+
try:
92+
DATABASE_POOL = psycopg_pool.ConnectionPool(
93+
conninfo = DATABASE_URL,
94+
min_size = PG_POOL_MIN_SIZE,
95+
max_size = max(max_pool_size, PG_POOL_MAX_SIZE),
96+
kwargs = { 'row_factory': psycopg.rows.dict_row },
97+
)
98+
#DATABASE_POOL.wait()
99+
except Exception:
100+
DATABASE_POOL = None
101+
54102
# -- Helpers ----------------------------------------------------------
55103

104+
DEF_TEXT_HEADERS = [ ( 'Content-Type', 'text/plain; charset=utf-8' ) ]
105+
56106
def text_resp(body: str | bytes, status: int = 200):
57-
headers = [ ( 'Content-Type', 'text/plain; charset=utf-8' ) ]
58107
if isinstance(body, str):
59108
body = body.encode('utf-8')
60-
return status, headers, body
109+
return status, DEF_TEXT_HEADERS, body
61110

62111
def json_resp(body, status: int = 200, gzip: bool = False):
63-
headers = [ ( 'Content-Type', 'application/json' ) ]
64112
if gzip:
65-
headers.append( ( 'Content-Encoding', 'gzip' ) )
113+
headers = [ ('Content-Type', 'application/json'), ('Content-Encoding', 'gzip') ]
114+
else:
115+
headers = [ ('Content-Type', 'application/json') ]
66116
if isinstance(body, dict):
67117
body = orjson.dumps(body)
68118
if isinstance(body, str):
@@ -106,25 +156,28 @@ def baseline2(env):
106156
return text_resp(str(total))
107157

108158
def json_endpoint(env):
109-
if dataset_items is None:
159+
global DATASET_ITEMS
160+
if not DATASET_ITEMS:
110161
return text_resp("No dataset", 500)
111162
items = [ ]
112-
for d in dataset_items:
163+
for d in DATASET_ITEMS:
113164
item = dict(d)
114165
item["total"] = round(d["price"] * d["quantity"] * 100) / 100
115166
items.append(item)
116167
return json_resp( { "items": items, "count": len(items) } )
117168

118169
def compression_endpoint(env):
119-
if large_json_buf is None:
170+
global LARGE_JSON_BUF
171+
if not LARGE_JSON_BUF:
120172
return text_resp("No dataset", 500)
121-
compressed = zlib.compress(large_json_buf, level = 1, wbits = 31)
173+
compressed = zlib.compress(LARGE_JSON_BUF, level = 1, wbits = 31)
122174
return json_resp(compressed, gzip = True)
123175

124176
def db_endpoint(env):
125-
query_params = parse_qs(env.get('QUERY_STRING', ''))
126-
if not db_available:
177+
global DB_AVAILABLE, DB_QUERY
178+
if not DB_AVAILABLE:
127179
return json_resp( { "items": [ ], "count": 0 } )
180+
query_params = parse_qs(env.get('QUERY_STRING', ''))
128181
min_val = float(query_params.get("min", [10])[0])
129182
max_val = float(query_params.get("max", [50])[0])
130183
conn = _get_db()
@@ -145,6 +198,39 @@ def db_endpoint(env):
145198
)
146199
return json_resp( { "items": items, "count": len(items) } )
147200

201+
def async_db_endpoint(env):
202+
global DATABASE_POOL, DATABASE_QUERY
203+
if not DATABASE_POOL:
204+
db_setup()
205+
if not DATABASE_POOL:
206+
return json_resp( { "items": [ ], "count": 0 } )
207+
query_params = parse_qs(env.get('QUERY_STRING', ''))
208+
min_val = float(query_params.get("min", [10])[0])
209+
max_val = float(query_params.get("max", [50])[0])
210+
try:
211+
with DATABASE_POOL.connection() as conn:
212+
rows = conn.execute(DATABASE_QUERY, (min_val, max_val)).fetchall()
213+
items = [
214+
{
215+
'id' : row['id'],
216+
'name' : row['name'],
217+
'category': row['category'],
218+
'price' : row['price'],
219+
'quantity': row['quantity'],
220+
'active' : row['active'],
221+
'tags' : json.loads(row['tags']) if isinstance(row['tags'], str) else row['tags'],
222+
'rating': {
223+
'score': row['rating_score'],
224+
'count': row['rating_count'],
225+
}
226+
}
227+
for row in rows
228+
]
229+
return json_resp( { "items": items, "count": len(items) } )
230+
except Exception:
231+
return json_resp( { "items": [ ], "count": 0 } )
232+
233+
148234
READ_BUF_SIZE = 256*1024
149235

150236
def upload_endpoint(env):
@@ -162,13 +248,14 @@ def upload_endpoint(env):
162248
break
163249
return text_resp(str(size))
164250

165-
routes = {
251+
ROUTES = {
166252
'/pipeline': pipeline,
167253
'/baseline11': baseline11,
168254
'/baseline2': baseline2,
169255
'/json': json_endpoint,
170256
'/compression': compression_endpoint,
171257
'/db': db_endpoint,
258+
'/async-db': async_db_endpoint,
172259
'/upload': upload_endpoint,
173260
}
174261

@@ -180,52 +267,33 @@ def handle_405(env):
180267

181268
# -- WSGI app -----------------------------------------------------------
182269

183-
http_status = {
270+
HTTP_STATUS = {
184271
200: '200 OK',
185272
404: '404 Not Found',
186273
405: '405 Method Not Allowed',
187274
500: '500 Internal Server Error',
188275
}
189276

190277
def app(env, start_response):
191-
global routes
278+
global ROUTES, HTTP_STATUS
192279
req_method = env.get('REQUEST_METHOD', '')
193280
if req_method not in [ 'GET', 'POST' ]:
194281
status, headers, body = handle_405(env)
195282
else:
196283
path = env["PATH_INFO"]
197-
app_handler = routes.get(path, handle_404)
284+
app_handler = ROUTES.get(path, handle_404)
198285
status, headers, body = app_handler(env)
199-
start_response(http_status.get(status, str(status)), headers)
286+
start_response(HTTP_STATUS.get(status, str(status)), headers)
200287
return [ body ]
201288

202289
# -----------------------------------------------------------------------
203290

204291
if __name__ == "__main__":
205-
import multiprocessing
206292
import fastpysgi
207293

208-
workers = int(multiprocessing.cpu_count())
209294
host = '0.0.0.0'
210295
port = 8080
211296

212-
def run_app():
213-
fastpysgi.server.read_buffer_size = READ_BUF_SIZE
214-
fastpysgi.server.backlog = 4096
215-
fastpysgi.run(app, host, port, loglevel = 0)
216-
sys.exit(0)
217-
218-
processes = [ ]
219-
# fork limiting the cpu count - 1
220-
for i in range(1, workers):
221-
try:
222-
pid = os.fork()
223-
if pid == 0:
224-
run_app()
225-
else:
226-
processes.append(pid)
227-
except OSError as e:
228-
print("Failed to fork:", e)
229-
230-
# run app on the main process too :)
231-
run_app()
297+
fastpysgi.server.read_buffer_size = READ_BUF_SIZE
298+
fastpysgi.server.backlog = 4096
299+
fastpysgi.run(app, host, port, workers = CPU_COUNT, loglevel = 0)

frameworks/fastpysgi/meta.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"display_name": "FastPySGI",
2+
"display_name": "FastPySGI-WSGI",
33
"language": "Python",
44
"type": "engine",
55
"engine": "libuv",
@@ -14,6 +14,7 @@
1414
"json",
1515
"upload",
1616
"compression",
17-
"mixed"
17+
"mixed",
18+
"async-db"
1819
]
1920
}
frameworks/fastpysgi/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
fastpysgi==0.4
22
orjson==3.10.15
3+
psycopg[binary]==3.2.4
4+
psycopg_pool==3.2.6

0 commit comments

Comments
 (0)