|
| 1 | +(ns aleph-bench.core |
| 2 | + (:require [aleph.http :as http] |
| 3 | + [aleph.netty :as netty] |
| 4 | + [clojure.java.io :as io] |
| 5 | + [clojure.string :as str] |
| 6 | + [jj.sql.async-boa :as async-boa] |
| 7 | + [jj.sql.boa :as boa] |
| 8 | + [jj.sql.boa.query.next-jdbc :refer [->NextJdbcAdapter]] |
| 9 | + [jj.sql.boa.query.vertx-pg :as vertx-adapter] |
| 10 | + [jj.tassu :refer [GET POST route]] |
| 11 | + [jsonista.core :as json] |
| 12 | + [manifold.deferred :as d] |
| 13 | + [manifold.stream :as s] |
| 14 | + [next.jdbc :as jdbc]) |
| 15 | + (:import (io.netty.buffer ByteBuf PooledByteBufAllocator) |
| 16 | + (io.netty.channel ChannelOption) |
| 17 | + (io.netty.handler.codec.http HttpContentCompressor) |
| 18 | + (io.vertx.core Vertx) |
| 19 | + (io.vertx.pgclient PgBuilder PgConnectOptions) |
| 20 | + (io.vertx.sqlclient PoolOptions) |
| 21 | + (java.io ByteArrayOutputStream) |
| 22 | + (java.net URI)) |
| 23 | + (:gen-class)) |
| 24 | + |
| 25 | +(def ^:private ^:const ct-json "application/json") |
| 26 | +(def ^:private ^:const ct-text "text/plain") |
| 27 | +(def ^:private ^:const ct-octet "application/octet-stream") |
| 28 | +(def ^:private ^:const hdr-ct "Content-Type") |
| 29 | +(def ^:private ^:const hdr-server "Server") |
| 30 | +(def ^:private ^:const server-name "aleph") |
| 31 | +(def ^:private ^:const dot ".") |
| 32 | +(def ^:private ^:const not-found-body "Not found") |
| 33 | +(def ^:private ^:const empty-db-body "{\"items\":[],\"count\":0}") |
| 34 | +(def ^:private ^:const dataset-path "/data/dataset.json") |
| 35 | +(def ^:private ^:const dataset-large-path "/data/dataset-large.json") |
| 36 | +(def ^:private ^:const db-path "/data/benchmark.db") |
| 37 | +(def ^:private ^:const static-dir "/data/static") |
| 38 | +(def ^:private ^:const param-min "min") |
| 39 | +(def ^:private ^:const param-max "max") |
| 40 | +(def ^:private ^:const param-limit "limit") |
| 41 | +(def ^:private ^:const param-m "m") |
| 42 | +(def ^:private ^:const pg-prefix "postgres://") |
| 43 | +(def ^:private ^:const pg-replace "postgresql://") |
| 44 | + |
| 45 | +(def ^:private json-headers {hdr-ct ct-json hdr-server server-name}) |
| 46 | +(def ^:private text-headers {hdr-ct ct-text hdr-server server-name}) |
| 47 | +(def ^:private empty-db-response {:status 200 :headers json-headers :body empty-db-body}) |
| 48 | + |
| 49 | +(defn- load-json [path] |
| 50 | + (when (.exists (io/file path)) |
| 51 | + (json/read-value (slurp path) json/keyword-keys-object-mapper))) |
| 52 | + |
| 53 | +(defn- process-item [item ^long m] |
| 54 | + (assoc item :total (* (:price item) (:quantity item) m))) |
| 55 | + |
| 56 | +(defn- parse-qs [^String qs] |
| 57 | + (when qs |
| 58 | + (loop [i 0 m (transient {})] |
| 59 | + (if (>= i (.length qs)) |
| 60 | + (persistent! m) |
| 61 | + (let [amp (.indexOf qs (int \&) i) |
| 62 | + end (if (neg? amp) (.length qs) amp) |
| 63 | + eq (.indexOf qs (int \=) i)] |
| 64 | + (if (and (>= eq 0) (< eq end)) |
| 65 | + (recur (inc end) (assoc! m (subs qs i eq) (subs qs (inc eq) end))) |
| 66 | + (recur (inc end) m))))))) |
| 67 | + |
| 68 | +(defn- sum-params [^String qs] |
| 69 | + (if (nil? qs) 0 |
| 70 | + (loop [i 0 sum 0] |
| 71 | + (if (>= i (.length qs)) |
| 72 | + sum |
| 73 | + (let [amp (.indexOf qs (int \&) i) |
| 74 | + end (if (neg? amp) (.length qs) amp) |
| 75 | + eq (.indexOf qs (int \=) i)] |
| 76 | + (if (and (>= eq 0) (< eq end)) |
| 77 | + (recur (inc end) (+ sum (try (Long/parseLong (subs qs (inc eq) end)) (catch Exception _ 0)))) |
| 78 | + (recur (inc end) sum))))))) |
| 79 | + |
| 80 | +(defn- json-response [data] |
| 81 | + {:status 200 :headers json-headers :body (json/write-value-as-string data)}) |
| 82 | + |
| 83 | +(defn- text-response [s] |
| 84 | + {:status 200 :headers text-headers :body (str s)}) |
| 85 | + |
| 86 | +(defn- parse-long-param [params k default] |
| 87 | + (try (Long/parseLong (get params k)) (catch Exception _ default))) |
| 88 | + |
| 89 | +(defn- parse-double-param [params k default] |
| 90 | + (try (Double/parseDouble (get params k)) (catch Exception _ default))) |
| 91 | + |
| 92 | +(defn- read-body-bytes [body] |
| 93 | + (if (nil? body) |
| 94 | + (d/success-deferred (byte-array 0)) |
| 95 | + (d/chain |
| 96 | + (s/reduce |
| 97 | + (fn [^ByteArrayOutputStream baos ^ByteBuf buf] |
| 98 | + (try |
| 99 | + (let [n (.readableBytes buf) |
| 100 | + arr (byte-array n)] |
| 101 | + (.readBytes buf arr) |
| 102 | + (.write baos arr 0 n) |
| 103 | + baos) |
| 104 | + (finally (.release buf)))) |
| 105 | + (ByteArrayOutputStream.) |
| 106 | + body) |
| 107 | + (fn [^ByteArrayOutputStream baos] (.toByteArray baos))))) |
| 108 | + |
| 109 | +(def ^:private ^:const extension-map |
| 110 | + {".css" "text/css" |
| 111 | + ".js" "application/javascript" |
| 112 | + ".html" "text/html" |
| 113 | + ".woff2" "font/woff2" |
| 114 | + ".svg" "image/svg+xml" |
| 115 | + ".webp" "image/webp" |
| 116 | + ".json" ct-json}) |
| 117 | + |
| 118 | +(defn- get-content-type [^String name] |
| 119 | + (let [dot-index (.lastIndexOf name ^String dot) |
| 120 | + ext (if (>= dot-index 0) (subs name dot-index) "")] |
| 121 | + (get extension-map ext ct-octet))) |
| 122 | + |
| 123 | +(defn- transform-row [row parse-tags parse-active] |
| 124 | + {:id (:id row) :name (:name row) :category (:category row) |
| 125 | + :price (:price row) :quantity (:quantity row) :active (parse-active (:active row)) |
| 126 | + :tags (parse-tags (:tags row)) |
| 127 | + :rating {:score (:rating_score row) :count (:rating_count row)}}) |
| 128 | + |
| 129 | +(defn -main [& _] |
| 130 | + (netty/leak-detector-level! :disabled) |
| 131 | + (let [dataset (load-json (or (System/getenv "DATASET_PATH") dataset-path)) |
| 132 | + json-body (let [items (mapv #(process-item % 1) dataset)] |
| 133 | + (json/write-value-as-string {:items items :count (clojure.core/count items)})) |
| 134 | + large-dataset (load-json dataset-large-path) |
| 135 | + compression-body (when large-dataset |
| 136 | + (let [items (mapv #(process-item % 1) large-dataset)] |
| 137 | + (json/write-value-as-string {:items items :count (clojure.core/count items)}))) |
| 138 | + static-cache (let [dir (io/file static-dir)] |
| 139 | + (when (.isDirectory dir) |
| 140 | + (into {} |
| 141 | + (map (fn [^java.io.File f] |
| 142 | + [(.getName f) |
| 143 | + {:ct (get-content-type (.getName f)) |
| 144 | + :body (java.nio.file.Files/readAllBytes (.toPath f))}])) |
| 145 | + (.listFiles dir)))) |
| 146 | + adapter (->NextJdbcAdapter) |
| 147 | + sqlite-tag-parser #(json/read-value % json/keyword-keys-object-mapper) |
| 148 | + sqlite-active #(== 1 (long %)) |
| 149 | + pg-tag-parser #(json/read-value (str %)) |
| 150 | + db-query-fn (when (.exists (io/file db-path)) |
| 151 | + (boa/build-query adapter "sql/db-query")) |
| 152 | + tl-ds (ThreadLocal.) |
| 153 | + get-sqlite-ds (fn [] |
| 154 | + (or (.get tl-ds) |
| 155 | + (let [ds (jdbc/get-datasource {:dbtype "sqlite" :dbname db-path :read-only true})] |
| 156 | + (.set tl-ds ds) |
| 157 | + ds))) |
| 158 | + pg-pool (when-let [url (System/getenv "DATABASE_URL")] |
| 159 | + (try |
| 160 | + (let [uri (URI. (str/replace url pg-prefix pg-replace)) |
| 161 | + host (.getHost uri) |
| 162 | + port (if (pos? (.getPort uri)) (.getPort uri) 5432) |
| 163 | + db (subs (.getPath uri) 1) |
| 164 | + [user pass] (str/split (.getUserInfo uri) #":" 2) |
| 165 | + max-conn (try (Integer/parseInt (System/getenv "DATABASE_MAX_CONN")) |
| 166 | + (catch Exception _ 256)) |
| 167 | + connect-opts (-> (PgConnectOptions.) |
| 168 | + (.setHost host) |
| 169 | + (.setPort port) |
| 170 | + (.setDatabase db) |
| 171 | + (.setUser user) |
| 172 | + (.setPassword (or pass ""))) |
| 173 | + pool-opts (-> (PoolOptions.) (.setMaxSize max-conn)) |
| 174 | + vertx (Vertx/vertx)] |
| 175 | + (-> (PgBuilder/pool) |
| 176 | + (.with pool-opts) |
| 177 | + (.connectingTo connect-opts) |
| 178 | + (.using vertx) |
| 179 | + (.build))) |
| 180 | + (catch Throwable t |
| 181 | + (println "PG init failed:" (.getMessage t)) |
| 182 | + nil))) |
| 183 | + pg-query (when pg-pool |
| 184 | + (async-boa/build-async-query (vertx-adapter/->VertxPgAdapter) "sql/pg-query")) |
| 185 | + |
| 186 | + handler |
| 187 | + (route |
| 188 | + {"/baseline11" [(GET (fn [req] (text-response (sum-params (:query-string req))))) |
| 189 | + (POST (fn [req] |
| 190 | + (let [s (sum-params (:query-string req))] |
| 191 | + (d/chain (read-body-bytes (:body req)) |
| 192 | + (fn [^bytes bs] |
| 193 | + (let [n (try (Long/parseLong (str/trim (String. bs))) (catch Exception _ 0))] |
| 194 | + (text-response (+ s n))))))))] |
| 195 | + "/json/:count" [(GET (fn [req] |
| 196 | + (let [count (try (Long/parseLong (get-in req [:params :count])) (catch Exception _ 50)) |
| 197 | + count (min count (long (clojure.core/count dataset))) |
| 198 | + params (parse-qs (:query-string req)) |
| 199 | + m (parse-long-param params param-m 1) |
| 200 | + items (mapv #(process-item % m) (subvec dataset 0 count))] |
| 201 | + {:status 200 :headers json-headers :body (json/write-value-as-string {:items items :count (clojure.core/count items)})})))] |
| 202 | + "/json" [(GET (fn [_] {:status 200 :headers json-headers :body json-body}))] |
| 203 | + "/compression" [(GET (fn [_] {:status 200 :headers json-headers :body compression-body}))] |
| 204 | + "/upload" [(POST (fn [req] |
| 205 | + (d/chain (read-body-bytes (:body req)) |
| 206 | + (fn [^bytes bs] (text-response (alength bs))))))] |
| 207 | + "/db" [(GET (fn [req] |
| 208 | + (if db-query-fn |
| 209 | + (let [params (parse-qs (:query-string req)) |
| 210 | + min-p (parse-double-param params param-min 10.0) |
| 211 | + max-p (parse-double-param params param-max 50.0) |
| 212 | + limit (parse-long-param params param-limit 50) |
| 213 | + items (try (mapv #(transform-row % sqlite-tag-parser sqlite-active) (db-query-fn (get-sqlite-ds) {:min min-p :max max-p :limit limit})) |
| 214 | + (catch Exception _ []))] |
| 215 | + (json-response {:items items :count (clojure.core/count items)})) |
| 216 | + empty-db-response)))] |
| 217 | + "/async-db" [(GET (fn [req] |
| 218 | + (if pg-query |
| 219 | + (let [params (parse-qs (:query-string req)) |
| 220 | + min-p (parse-double-param params param-min 10.0) |
| 221 | + max-p (parse-double-param params param-max 50.0) |
| 222 | + limit (parse-long-param params param-limit 50) |
| 223 | + dfd (d/deferred)] |
| 224 | + (pg-query pg-pool {:min min-p :max max-p :limit limit} |
| 225 | + (fn [rows] |
| 226 | + (let [items (mapv #(transform-row % pg-tag-parser identity) rows)] |
| 227 | + (d/success! dfd (json-response {:items items :count (clojure.core/count items)})))) |
| 228 | + (fn [_] (d/success! dfd empty-db-response))) |
| 229 | + dfd) |
| 230 | + empty-db-response)))] |
| 231 | + "/static/:filename" [(GET (fn [req] |
| 232 | + (let [name (get-in req [:params :filename])] |
| 233 | + (if-let [entry (and static-cache (get static-cache name))] |
| 234 | + {:status 200 :headers {hdr-ct (:ct entry) hdr-server server-name} :body (:body entry)} |
| 235 | + {:status 404 :body not-found-body}))))] |
| 236 | + "/" [(GET (fn [_] (text-response server-name)))]})] |
| 237 | + |
| 238 | + (http/start-server handler {:port 8080 |
| 239 | + :raw-stream? true |
| 240 | + :executor :none |
| 241 | + :bootstrap-transform (fn [bootstrap] |
| 242 | + (.option bootstrap ChannelOption/ALLOCATOR PooledByteBufAllocator/DEFAULT) |
| 243 | + (.childOption bootstrap ChannelOption/ALLOCATOR PooledByteBufAllocator/DEFAULT)) |
| 244 | + :pipeline-transform (fn [pipeline] |
| 245 | + (.remove pipeline "continue-handler") |
| 246 | + (.addBefore pipeline "request-handler" "compressor" (HttpContentCompressor.)))}) |
| 247 | + (println "Server running on port 8080") |
| 248 | + @(promise))) |
0 commit comments