|
| 1 | +/* HttpArena minimal bench server on top of libreactorng. |
| 2 | + * |
| 3 | + * Uses libreactor's built-in HTTP parser (session->request.{method,target,body}) |
| 4 | + * so this file is just dispatch + integer arithmetic. |
| 5 | + * |
| 6 | + * Multi-process: one libreactor per logical CPU in the container's affinity |
| 7 | + * mask, each listening on its own SO_REUSEPORT socket so the kernel balances |
| 8 | + * accepted connections across workers. |
| 9 | + */ |
| 10 | +#define _GNU_SOURCE |
| 11 | +#include <stdio.h> |
| 12 | +#include <stdlib.h> |
| 13 | +#include <string.h> |
| 14 | +#include <stdint.h> |
| 15 | +#include <signal.h> |
| 16 | +#include <unistd.h> |
| 17 | +#include <sched.h> |
| 18 | +#include <sys/socket.h> |
| 19 | +#include <netinet/in.h> |
| 20 | +#include <netinet/tcp.h> |
| 21 | + |
| 22 | +#include <reactor.h> |
| 23 | + |
| 24 | +/* Parse a leading signed integer. Skips whitespace, stops at the first |
| 25 | + * non-digit. Matches the contract of nginx/h2o reference implementations. */ |
| 26 | +static int64_t parse_int(const char *p, const char *end) |
| 27 | +{ |
| 28 | + while (p < end && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')) p++; |
| 29 | + int neg = 0; |
| 30 | + if (p < end && *p == '-') { neg = 1; p++; } |
| 31 | + int64_t n = 0; |
| 32 | + while (p < end && *p >= '0' && *p <= '9') { |
| 33 | + n = n * 10 + (*p - '0'); |
| 34 | + p++; |
| 35 | + } |
| 36 | + return neg ? -n : n; |
| 37 | +} |
| 38 | + |
| 39 | +/* Sum integer values across "k1=v1&k2=v2..." — ignores keys, non-integer |
| 40 | + * values silently skip. */ |
| 41 | +static int64_t sum_query(const char *p, size_t len) |
| 42 | +{ |
| 43 | + const char *end = p + len; |
| 44 | + int64_t sum = 0; |
| 45 | + while (p < end) { |
| 46 | + const char *eq = memchr(p, '=', end - p); |
| 47 | + if (!eq) break; |
| 48 | + const char *v = eq + 1; |
| 49 | + const char *amp = memchr(v, '&', end - v); |
| 50 | + const char *ve = amp ? amp : end; |
| 51 | + sum += parse_int(v, ve); |
| 52 | + p = amp ? amp + 1 : end; |
| 53 | + } |
| 54 | + return sum; |
| 55 | +} |
| 56 | + |
| 57 | +static void on_request(reactor_event_t *event) |
| 58 | +{ |
| 59 | + server_session_t *s = (server_session_t *) event->data; |
| 60 | + string_t method = s->request.method; |
| 61 | + string_t target = s->request.target; |
| 62 | + |
| 63 | + /* Split target at the first '?' to get path + query string. */ |
| 64 | + const char *t = (const char *) data_base(target); |
| 65 | + size_t t_len = data_size(target); |
| 66 | + const char *q = memchr(t, '?', t_len); |
| 67 | + size_t path_len = q ? (size_t)(q - t) : t_len; |
| 68 | + const char *qs = q ? q + 1 : NULL; |
| 69 | + size_t qs_len = q ? (t_len - path_len - 1) : 0; |
| 70 | + |
| 71 | + /* Connection: close is NOT honored — libreactor keeps the session open |
| 72 | + * after the response, and the only teardown primitive it exposes |
| 73 | + * (server_disconnect → stream_close) is abortive: it closes before the |
| 74 | + * queued response bytes reach the socket. That causes the TCP |
| 75 | + * fragmentation validation checks to time out reading for an EOF that |
| 76 | + * never comes. Fixing it cleanly would need a write-completion hook in |
| 77 | + * libreactor's stream, which isn't in the public API. Known limitation. */ |
| 78 | + |
| 79 | + if (path_len == 9 && memcmp(t, "/pipeline", 9) == 0) { |
| 80 | + server_plain(s, data_string("ok"), NULL, 0); |
| 81 | + return; |
| 82 | + } |
| 83 | + |
| 84 | + if (path_len == 11 && memcmp(t, "/baseline11", 11) == 0) { |
| 85 | + int64_t sum = qs ? sum_query(qs, qs_len) : 0; |
| 86 | + if (string_equal(method, string("POST"))) { |
| 87 | + const char *b = (const char *) data_base(s->request.body); |
| 88 | + size_t b_len = data_size(s->request.body); |
| 89 | + if (b_len > 0) sum += parse_int(b, b + b_len); |
| 90 | + } |
| 91 | + /* Stack buffer is safe: http_write_response copies via stream_allocate |
| 92 | + * before this handler returns to the event loop. */ |
| 93 | + char buf[32]; |
| 94 | + int n = snprintf(buf, sizeof(buf), "%lld", (long long) sum); |
| 95 | + server_plain(s, data(buf, n), NULL, 0); |
| 96 | + return; |
| 97 | + } |
| 98 | + |
| 99 | + if (path_len == 10 && memcmp(t, "/baseline2", 10) == 0) { |
| 100 | + int64_t sum = qs ? sum_query(qs, qs_len) : 0; |
| 101 | + char buf[32]; |
| 102 | + int n = snprintf(buf, sizeof(buf), "%lld", (long long) sum); |
| 103 | + server_plain(s, data(buf, n), NULL, 0); |
| 104 | + return; |
| 105 | + } |
| 106 | + |
| 107 | + server_respond(s, string("404 Not Found"), string("text/plain"), |
| 108 | + data_string("Not Found"), NULL, 0); |
| 109 | +} |
| 110 | + |
| 111 | +static int make_reuseport_socket(int port) |
| 112 | +{ |
| 113 | + int s = socket(AF_INET, SOCK_STREAM, 0); |
| 114 | + if (s < 0) { perror("socket"); exit(1); } |
| 115 | + int on = 1; |
| 116 | + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); |
| 117 | + setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)); |
| 118 | + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); |
| 119 | + struct sockaddr_in addr = { |
| 120 | + .sin_family = AF_INET, |
| 121 | + .sin_port = htons(port), |
| 122 | + .sin_addr.s_addr = htonl(INADDR_ANY), |
| 123 | + }; |
| 124 | + if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) { |
| 125 | + perror("bind"); exit(1); |
| 126 | + } |
| 127 | + if (listen(s, 4096) < 0) { perror("listen"); exit(1); } |
| 128 | + return s; |
| 129 | +} |
| 130 | + |
| 131 | +int main(void) |
| 132 | +{ |
| 133 | + signal(SIGPIPE, SIG_IGN); |
| 134 | + |
| 135 | + /* Respect Docker --cpuset-cpus via the affinity mask. sysconf() would |
| 136 | + * report the host CPU count which isn't what we want inside a pinned |
| 137 | + * container. */ |
| 138 | + cpu_set_t cs; |
| 139 | + int workers = 1; |
| 140 | + if (sched_getaffinity(0, sizeof(cs), &cs) == 0) workers = CPU_COUNT(&cs); |
| 141 | + if (workers < 1) workers = 1; |
| 142 | + |
| 143 | + for (int i = 1; i < workers; i++) { |
| 144 | + pid_t pid = fork(); |
| 145 | + if (pid < 0) { perror("fork"); break; } |
| 146 | + if (pid == 0) break; |
| 147 | + } |
| 148 | + |
| 149 | + int fd = make_reuseport_socket(8080); |
| 150 | + |
| 151 | + server_t server; |
| 152 | + reactor_construct(); |
| 153 | + server_construct(&server, on_request, NULL); |
| 154 | + server_open_socket(&server, fd); |
| 155 | + reactor_loop(); |
| 156 | + server_destruct(&server); |
| 157 | + reactor_destruct(); |
| 158 | + return 0; |
| 159 | +} |
0 commit comments