1+ import multiprocessing
2+ import os
3+ import psycopg
4+ import platform
5+ import random
6+ import asyncio
7+ import blacksheep as bs
8+ import jinja2
9+ from pathlib import Path
10+ from psycopg_pool import AsyncConnectionPool
11+
12+ READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
13+ WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
14+ ADDITIONAL_ROW = [0 , "Additional fortune added at request time." ]
15+ CORE_COUNT = multiprocessing .cpu_count ()
16+
17+ MAX_POOL_SIZE = CORE_COUNT * 2
18+ MIN_POOL_SIZE = max (1 , MAX_POOL_SIZE // 2 )
19+ db_pool = None
20+
21+ async def setup_db (app ):
22+ global db_pool
23+ conninfo = (
24+ f"postgresql://{ os .getenv ('PGUSER' , 'benchmarkdbuser' )} :{ os .getenv ('PGPASS' , 'benchmarkdbpass' )} "
25+ f"@tfb-database:5432/hello_world"
26+ )
27+ db_pool = AsyncConnectionPool (
28+ conninfo = conninfo ,
29+ min_size = MIN_POOL_SIZE ,
30+ max_size = MAX_POOL_SIZE ,
31+ open = False ,
32+ timeout = 5.0 ,
33+ max_lifetime = 1800 ,
34+ )
35+ await db_pool .open ()
36+
37+ async def shutdown_db (app ):
38+ global db_pool
39+ if db_pool is not None :
40+ await db_pool .close ()
41+ db_pool = None
42+
43+ def load_fortunes_template ():
44+ with Path ("templates/fortune.html" ).open ("r" ) as f :
45+ return jinja2 .Template (f .read ())
46+
47+ fortune_template = load_fortunes_template ()
48+
49+ app = bs .Application ()
50+ app .on_start += setup_db
51+ app .on_stop += shutdown_db
52+
53+ def get_num_queries (request ):
54+ try :
55+ value = request .query .get ('queries' )
56+ if value is None :
57+ return 1
58+ query_count = int (value [0 ])
59+ except (KeyError , IndexError , ValueError ):
60+ return 1
61+ return min (max (query_count , 1 ), 500 )
62+
63+ JSON_CONTENT_TYPE = b"application/json"
64+
65+ @bs .get ('/json' )
66+ async def json_test (request ):
67+ return bs .json ({'message' : 'Hello, world!' })
68+
69+ @bs .get ('/db' )
70+ async def single_db_query_test (request ):
71+ row_id = random .randint (1 , 10000 )
72+ async with db_pool .connection () as db_conn :
73+ async with db_conn .cursor () as cursor :
74+ await cursor .execute (READ_ROW_SQL , (row_id ,))
75+ number = await cursor .fetchone ()
76+ return bs .json ({'id' : row_id , 'randomNumber' : number [1 ]})
77+
78+ @bs .get ('/queries' )
79+ async def multiple_db_queries_test (request ):
80+ num_queries = get_num_queries (request )
81+ row_ids = random .sample (range (1 , 10000 ), num_queries )
82+ worlds = []
83+ async with db_pool .connection () as db_conn :
84+ async with db_conn .cursor () as cursor :
85+ for row_id in row_ids :
86+ await cursor .execute (READ_ROW_SQL , (row_id ,))
87+ number = await cursor .fetchone ()
88+ worlds .append ({"id" : row_id , "randomNumber" : number [1 ]})
89+ return bs .json (worlds )
90+
91+ @bs .get ('/fortunes' )
92+ async def fortunes_test (request ):
93+ async with db_pool .connection () as db_conn :
94+ async with db_conn .cursor () as cursor :
95+ await cursor .execute ("SELECT * FROM Fortune" )
96+ fortunes = await cursor .fetchall ()
97+ fortunes .append (ADDITIONAL_ROW )
98+ fortunes .sort (key = lambda row : row [1 ])
99+ data = fortune_template .render (fortunes = fortunes )
100+ return bs .html (data )
101+
102+ @bs .get ('/updates' )
103+ async def db_updates_test (request ):
104+ num_queries = get_num_queries (request )
105+ updates = sorted (zip (
106+ random .sample (range (1 , 10000 ), num_queries ),
107+ random .sample (range (1 , 10000 ), num_queries )
108+ ), key = lambda x : x [1 ])
109+ worlds = [{"id" : row_id , "randomNumber" : number } for row_id , number in updates ]
110+ for _ in range (5 ):
111+ async with db_pool .connection () as db_conn :
112+ try :
113+ await db_conn .execute ("SET TRANSACTION ISOLATION LEVEL READ COMMITTED" )
114+ async with db_conn .cursor () as cursor :
115+ for row_id , number in updates :
116+ await cursor .execute (READ_ROW_SQL , (row_id ,))
117+ await cursor .fetchone ()
118+ for _ in range (5 ):
119+ try :
120+ await cursor .executemany (WRITE_ROW_SQL , [(number , row_id ) for row_id , number in updates ])
121+ return bs .json (worlds )
122+ except psycopg .errors .DeadlockDetected :
123+ await db_conn .rollback ()
124+ continue
125+ # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
126+ except (psycopg .errors .OperationalError , psycopg .errors .PipelineAborted ):
127+ await db_conn .rollback ()
128+ continue
129+ raise Exception ("connect error" )
130+
131+ @bs .get ('/plaintext' )
132+ async def plaintext_test (request ):
133+ return bs .Response (200 , content = bs .Content (b"text/plain" , b'Hello, World!' ))
134+
135+ if platform .python_implementation () == 'PyPy' :
136+ import logging
137+ from socketify import ASGI
138+ workers = int (multiprocessing .cpu_count ())
139+ if os .environ .get ('TRAVIS' ) == 'true' :
140+ workers = 2
141+
142+ def run_app ():
143+ ASGI (app ).listen (8080 , lambda config : logging .info (f"Listening on port http://localhost:{ config .port } now\n " )).run ()
144+
145+ def create_fork ():
146+ n = os .fork ()
147+ if not n > 0 :
148+ run_app ()
149+
150+ for i in range (1 , workers ):
151+ create_fork ()
152+ run_app ()
0 commit comments