|
| 1 | +import os |
| 2 | +import sys |
| 3 | +import multiprocessing |
| 4 | +import json |
| 5 | +from contextlib import asynccontextmanager |
| 6 | + |
| 7 | +import asyncpg |
| 8 | +import orjson |
| 9 | + |
| 10 | +os.environ["TURBO_DISABLE_RATE_LIMITING"] = "1" |
| 11 | +os.environ["TURBO_DISABLE_CACHE"] = "1" |
| 12 | + |
| 13 | +from turboapi import TurboAPI, Request, Response, Path, Query, HTTPException |
| 14 | +from turboapi.responses import PlainTextResponse, JSONResponse |
| 15 | +from turboapi.middleware.gzip import GZipMiddleware |
| 16 | +from turboapi.staticfiles import StaticFiles |
| 17 | + |
| 18 | +# -- Dataset and constants -------------------------------------------------------- |
| 19 | + |
| 20 | +CPU_COUNT = int(multiprocessing.cpu_count()) |
| 21 | +WRK_COUNT = min(len(os.sched_getaffinity(0)), 128) |
| 22 | +WRK_COUNT = max(WRK_COUNT, 4) |
| 23 | + |
| 24 | +DATASET_LARGE_PATH = "/data/dataset-large.json" |
| 25 | +DATASET_PATH = os.environ.get("DATASET_PATH", "/data/dataset.json") |
| 26 | +DATASET_ITEMS = None |
| 27 | +try: |
| 28 | + with open(DATASET_PATH) as file: |
| 29 | + DATASET_ITEMS = json.load(file) |
| 30 | +except Exception: |
| 31 | + pass |
| 32 | + |
| 33 | +# -- Postgres DB ------------------------------------------------------------ |
| 34 | + |
| 35 | +PG_POOL: asyncpg.Pool | None = None |
| 36 | + |
| 37 | +PG_QUERY = ( |
| 38 | + "SELECT id, name, category, price, quantity, active, tags, rating_score, rating_count " |
| 39 | + "FROM items WHERE price BETWEEN $1 AND $2 LIMIT $3" |
| 40 | +) |
| 41 | + |
| 42 | +class NoResetConnection(asyncpg.Connection): |
| 43 | + __slots__ = () |
| 44 | + def get_reset_query(self): |
| 45 | + return "" |
| 46 | + |
| 47 | +@asynccontextmanager |
| 48 | +async def lifespan(application: TurboAPI): |
| 49 | + global PG_POOL, NoResetConnection |
| 50 | + DATABASE_URL = os.environ.get("DATABASE_URL") |
| 51 | + if DATABASE_URL: |
| 52 | + try: |
| 53 | + if DATABASE_URL.startswith("postgres://"): |
| 54 | + DATABASE_URL = "postgresql://" + DATABASE_URL[len("postgres://"):] |
| 55 | + PG_POOL_MAX_SIZE = 2 |
| 56 | + DATABASE_MAX_CONN = os.environ.get("DATABASE_MAX_CONN", None) |
| 57 | + if DATABASE_MAX_CONN: |
| 58 | + pool_size = int(DATABASE_MAX_CONN) * 0.92 / WRK_COUNT |
| 59 | + PG_POOL_MAX_SIZE = int(pool_size + 0.95) |
| 60 | + PG_POOL = await asyncpg.create_pool( |
| 61 | + dsn = DATABASE_URL, |
| 62 | + min_size = 1, |
| 63 | + max_size = max(PG_POOL_MAX_SIZE, 2), |
| 64 | + connection_class = NoResetConnection |
| 65 | + ) |
| 66 | + except Exception: |
| 67 | + PG_POOL = None |
| 68 | + yield |
| 69 | + if PG_POOL: |
| 70 | + await PG_POOL.close() |
| 71 | + PG_POOL = None |
| 72 | + |
| 73 | + |
| 74 | +app = TurboAPI(lifespan=lifespan) |
| 75 | + |
| 76 | +app.add_middleware(GZipMiddleware, minimum_size=1, compresslevel=5) |
| 77 | + |
| 78 | + |
| 79 | +# -- Routes ------------------------------------------------------------------ |
| 80 | + |
| 81 | +@app.get("/pipeline") |
| 82 | +async def pipeline(): |
| 83 | + return PlainTextResponse(b"ok") |
| 84 | + |
| 85 | + |
| 86 | +@app.api_route("/baseline11", methods=["GET", "POST"]) |
| 87 | +async def baseline11(request: Request): |
| 88 | + total = 0 |
| 89 | + for v in request.query_params.values(): |
| 90 | + try: |
| 91 | + total += int(v) |
| 92 | + except ValueError: |
| 93 | + pass |
| 94 | + if request.method == "POST": |
| 95 | + body = await request.body() |
| 96 | + if body: |
| 97 | + try: |
| 98 | + total += int(body.strip()) |
| 99 | + except ValueError: |
| 100 | + pass |
| 101 | + return PlainTextResponse(str(total)) |
| 102 | + |
| 103 | + |
| 104 | +def json_common(request: Request, count: int, m_val: float): |
| 105 | + global DATASET_ITEMS |
| 106 | + if not DATASET_ITEMS: |
| 107 | + return PlainTextResponse("No dataset", 500) |
| 108 | + try: |
| 109 | + items = [ ] |
| 110 | + for idx, dsitem in enumerate(DATASET_ITEMS): |
| 111 | + if idx >= count: |
| 112 | + break |
| 113 | + item = dict(dsitem) |
| 114 | + item["total"] = dsitem["price"] * dsitem["quantity"] * m_val |
| 115 | + items.append(item) |
| 116 | + return JSONResponse( { "items": items, "count": len(items) } ) |
| 117 | + except Exception: |
| 118 | + return JSONResponse( { "items": [ ], "count": 0 } ) |
| 119 | + |
| 120 | + |
| 121 | +@app.get("/json/{count}") |
| 122 | +async def json_endpoint(request: Request, count: int = Path(...), m: float = Query(...)): |
| 123 | + return json_common(request, count, m) |
| 124 | + |
| 125 | + |
| 126 | +@app.get("/json-comp/{count}") |
| 127 | +async def json_comp_endpoint(request: Request, count: int = Path(...), m: float = Query(...)): |
| 128 | + return json_common(request, count, m) |
| 129 | + |
| 130 | + |
| 131 | +@app.get("/async-db") |
| 132 | +async def async_db_endpoint(request: Request, min_val: float = Query(..., alias="min"), max_val: float = Query(..., alias="max"), limit: int = Query(...)): |
| 133 | + global PG_POOL |
| 134 | + if not PG_POOL: |
| 135 | + return JSONResponse( { "items": [ ], "count": 0 } ) |
| 136 | + try: |
| 137 | + db_conn = await PG_POOL.acquire() |
| 138 | + try: |
| 139 | + rows = await db_conn.fetch(PG_QUERY, min_val, max_val, limit) |
| 140 | + finally: |
| 141 | + await PG_POOL.release(db_conn) |
| 142 | + items = [ |
| 143 | + { |
| 144 | + 'id' : row['id'], |
| 145 | + 'name' : row['name'], |
| 146 | + 'category': row['category'], |
| 147 | + 'price' : row['price'], |
| 148 | + 'quantity': row['quantity'], |
| 149 | + 'active' : row['active'], |
| 150 | + 'tags' : json.loads(row['tags']) if isinstance(row['tags'], str) else row['tags'], |
| 151 | + 'rating': { |
| 152 | + 'score': row['rating_score'], |
| 153 | + 'count': row['rating_count'], |
| 154 | + } |
| 155 | + } |
| 156 | + for row in rows |
| 157 | + ] |
| 158 | + return JSONResponse( { "items": items, "count": len(items) } ) |
| 159 | + except Exception: |
| 160 | + return JSONResponse( { "items": [ ], "count": 0 } ) |
| 161 | + |
| 162 | + |
| 163 | +@app.post("/upload") |
| 164 | +async def upload_endpoint(request: Request): |
| 165 | + size = 0 |
| 166 | + async for chunk in request.stream(): |
| 167 | + size += len(chunk) |
| 168 | + return PlainTextResponse(str(size)) |
| 169 | + |
| 170 | + |
| 171 | +try: |
| 172 | + app.mount("/static", StaticFiles(directory="/data/static/"), name="static") |
| 173 | +except Exception: |
| 174 | + pass |
| 175 | + |
| 176 | + |
| 177 | +if __name__ == "__main__": |
| 178 | + app.run(host="0.0.0.0", port=8080, workers=WRK_COUNT) |
0 commit comments