Skip to content

Commit d037851

Browse files
committed
WebSocket WebAPI
Uses libcurl's websocket capabilities to add support for WebSocket. Depends on lightpanda-io/zig-v8-fork#167 Issue: #1952 This is a WIP because it currently uses the same connection pool used for all HTTP requests. It would be pretty easy for a page to starve the pool and block any progress. We previously stored the *Transfer inside of the easy's private data. We now store the *Connection, and a Connection now has a `transport` field which is a union for `http: *Transfer` or `websocket: *Websocket`.
1 parent 20cadd2 commit d037851

18 files changed

Lines changed: 2039 additions & 105 deletions

build.zig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ fn buildCurl(
462462
.CURL_DISABLE_SMTP = true,
463463
.CURL_DISABLE_TELNET = true,
464464
.CURL_DISABLE_TFTP = true,
465+
.CURL_DISABLE_WEBSOCKETS = false, // Enable WebSocket support
465466

466467
.ssize_t = null,
467468
._FILE_OFFSET_BITS = 64,

build.zig.zon

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
.minimum_zig_version = "0.15.2",
66
.dependencies = .{
77
.v8 = .{
8-
.url = "https://github.com/lightpanda-io/zig-v8-fork/archive/refs/tags/v0.3.7.tar.gz",
9-
.hash = "v8-0.0.0-xddH67uBBAD95hWsPQz3Ni1PlZjdywtPXrGUAp8rSKco",
8+
.url = "https://github.com/lightpanda-io/zig-v8-fork/archive/99c1ddf2d0b15f141e92ea09abdfc8e0e5f441e6.tar.gz",
9+
.hash = "v8-0.0.0-xddH63-BBABP05dni8oMrs9qQwuczHhNhXHbXXlPb95s",
1010
},
1111
// .v8 = .{ .path = "../zig-v8-fork" },
1212
.brotli = .{

src/TestWSServer.zig

Lines changed: 371 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,371 @@
1+
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
2+
//
3+
// Francis Bouvier <francis@lightpanda.io>
4+
// Pierre Tachoire <pierre@lightpanda.io>
5+
//
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU Affero General Public License as
8+
// published by the Free Software Foundation, either version 3 of the
9+
// License, or (at your option) any later version.
10+
//
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU Affero General Public License for more details.
15+
//
16+
// You should have received a copy of the GNU Affero General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
const std = @import("std");
20+
const posix = std.posix;
21+
22+
const TestWSServer = @This();
23+
24+
shutdown: std.atomic.Value(bool),
25+
listener: ?posix.socket_t,
26+
27+
pub fn init() TestWSServer {
28+
return .{
29+
.shutdown = .init(true),
30+
.listener = null,
31+
};
32+
}
33+
34+
pub fn deinit(self: *TestWSServer) void {
35+
if (self.listener) |socket| {
36+
posix.close(socket);
37+
self.listener = null;
38+
}
39+
}
40+
41+
pub fn stop(self: *TestWSServer) void {
42+
self.shutdown.store(true, .release);
43+
if (self.listener) |socket| {
44+
posix.close(socket);
45+
self.listener = null;
46+
}
47+
}
48+
49+
pub fn run(self: *TestWSServer, wg: *std.Thread.WaitGroup) void {
50+
self.runImpl(wg) catch |err| {
51+
std.debug.print("WebSocket echo server error: {}\n", .{err});
52+
};
53+
}
54+
55+
fn runImpl(self: *TestWSServer, wg: *std.Thread.WaitGroup) !void {
56+
const socket = try posix.socket(posix.AF.INET, posix.SOCK.STREAM, 0);
57+
errdefer posix.close(socket);
58+
59+
const addr = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 9584);
60+
61+
try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
62+
try posix.bind(socket, &addr.any, addr.getOsSockLen());
63+
try posix.listen(socket, 8);
64+
65+
self.listener = socket;
66+
self.shutdown.store(false, .release);
67+
wg.finish();
68+
69+
while (!self.shutdown.load(.acquire)) {
70+
var client_addr: posix.sockaddr = undefined;
71+
var addr_len: posix.socklen_t = @sizeOf(posix.sockaddr);
72+
73+
const client = posix.accept(socket, &client_addr, &addr_len, 0) catch |err| {
74+
if (self.shutdown.load(.acquire)) return;
75+
std.debug.print("[WS Server] Accept error: {}\n", .{err});
76+
continue;
77+
};
78+
79+
const thread = std.Thread.spawn(.{}, handleClient, .{client}) catch |err| {
80+
std.debug.print("[WS Server] Thread spawn error: {}\n", .{err});
81+
posix.close(client);
82+
continue;
83+
};
84+
thread.detach();
85+
}
86+
}
87+
88+
fn handleClient(client: posix.socket_t) void {
89+
defer posix.close(client);
90+
91+
var buf: [4096]u8 = undefined;
92+
const n = posix.read(client, &buf) catch return;
93+
94+
const request = buf[0..n];
95+
96+
// Find Sec-WebSocket-Key
97+
const key_header = "Sec-WebSocket-Key: ";
98+
const key_start = std.mem.indexOf(u8, request, key_header) orelse return;
99+
const key_line_start = key_start + key_header.len;
100+
const key_end = std.mem.indexOfScalarPos(u8, request, key_line_start, '\r') orelse return;
101+
const key = request[key_line_start..key_end];
102+
103+
// Compute accept key
104+
var hasher = std.crypto.hash.Sha1.init(.{});
105+
hasher.update(key);
106+
hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
107+
var hash: [20]u8 = undefined;
108+
hasher.final(&hash);
109+
110+
var accept_key: [28]u8 = undefined;
111+
_ = std.base64.standard.Encoder.encode(&accept_key, &hash);
112+
113+
// Send upgrade response
114+
var resp_buf: [256]u8 = undefined;
115+
const resp = std.fmt.bufPrint(&resp_buf, "HTTP/1.1 101 Switching Protocols\r\n" ++
116+
"Upgrade: websocket\r\n" ++
117+
"Connection: Upgrade\r\n" ++
118+
"Sec-WebSocket-Accept: {s}\r\n\r\n", .{accept_key}) catch return;
119+
_ = posix.write(client, resp) catch return;
120+
121+
// Message loop with larger buffer for big messages
122+
var msg_buf: [128 * 1024]u8 = undefined;
123+
var recv_buf = RecvBuffer{ .buf = &msg_buf };
124+
125+
while (true) {
126+
const frame = recv_buf.readFrame(client) orelse break;
127+
128+
// Close frame - echo it back before closing
129+
if (frame.opcode == 8) {
130+
sendFrame(client, 8, "", frame.payload) catch {};
131+
break;
132+
}
133+
134+
// Handle commands or echo
135+
if (frame.opcode == 1) { // Text
136+
handleTextMessage(client, frame.payload) catch break;
137+
} else if (frame.opcode == 2) { // Binary
138+
handleBinaryMessage(client, frame.payload) catch break;
139+
}
140+
}
141+
}
142+
143+
const Frame = struct {
144+
opcode: u8,
145+
payload: []u8,
146+
};
147+
148+
const RecvBuffer = struct {
149+
buf: []u8,
150+
start: usize = 0,
151+
end: usize = 0,
152+
153+
fn available(self: *RecvBuffer) []u8 {
154+
return self.buf[self.start..self.end];
155+
}
156+
157+
fn consume(self: *RecvBuffer, n: usize) void {
158+
self.start += n;
159+
if (self.start >= self.end) {
160+
self.start = 0;
161+
self.end = 0;
162+
}
163+
}
164+
165+
fn ensureBytes(self: *RecvBuffer, client: posix.socket_t, needed: usize) bool {
166+
while (self.end - self.start < needed) {
167+
// Compact buffer if needed
168+
if (self.end >= self.buf.len - 1024) {
169+
const avail = self.end - self.start;
170+
std.mem.copyForwards(u8, self.buf[0..avail], self.buf[self.start..self.end]);
171+
self.start = 0;
172+
self.end = avail;
173+
}
174+
175+
const n = posix.read(client, self.buf[self.end..]) catch return false;
176+
if (n == 0) return false;
177+
self.end += n;
178+
}
179+
return true;
180+
}
181+
182+
fn readFrame(self: *RecvBuffer, client: posix.socket_t) ?Frame {
183+
// Need at least 2 bytes for basic header
184+
if (!self.ensureBytes(client, 2)) return null;
185+
186+
const data = self.available();
187+
const opcode = data[0] & 0x0F;
188+
const masked = (data[1] & 0x80) != 0;
189+
var payload_len: usize = data[1] & 0x7F;
190+
var header_size: usize = 2;
191+
192+
// Extended payload length
193+
if (payload_len == 126) {
194+
if (!self.ensureBytes(client, 4)) return null;
195+
const d = self.available();
196+
payload_len = @as(usize, d[2]) << 8 | d[3];
197+
header_size = 4;
198+
} else if (payload_len == 127) {
199+
if (!self.ensureBytes(client, 10)) return null;
200+
const d = self.available();
201+
payload_len = @as(usize, d[2]) << 56 |
202+
@as(usize, d[3]) << 48 |
203+
@as(usize, d[4]) << 40 |
204+
@as(usize, d[5]) << 32 |
205+
@as(usize, d[6]) << 24 |
206+
@as(usize, d[7]) << 16 |
207+
@as(usize, d[8]) << 8 |
208+
d[9];
209+
header_size = 10;
210+
}
211+
212+
const mask_size: usize = if (masked) 4 else 0;
213+
const total_frame_size = header_size + mask_size + payload_len;
214+
215+
if (!self.ensureBytes(client, total_frame_size)) return null;
216+
217+
const frame_data = self.available();
218+
219+
// Get mask key if present
220+
var mask_key: [4]u8 = undefined;
221+
if (masked) {
222+
@memcpy(&mask_key, frame_data[header_size..][0..4]);
223+
}
224+
225+
// Get payload and unmask
226+
const payload_start = header_size + mask_size;
227+
const payload = frame_data[payload_start..][0..payload_len];
228+
229+
if (masked) {
230+
for (payload, 0..) |*b, i| {
231+
b.* ^= mask_key[i % 4];
232+
}
233+
}
234+
235+
self.consume(total_frame_size);
236+
237+
return .{ .opcode = opcode, .payload = payload };
238+
}
239+
};
240+
241+
fn handleTextMessage(client: posix.socket_t, payload: []const u8) !void {
242+
// Command: force-close - close socket immediately without close frame
243+
if (std.mem.eql(u8, payload, "force-close")) {
244+
return error.ForceClose;
245+
}
246+
247+
// Command: send-large:N - send a message of N bytes
248+
if (std.mem.startsWith(u8, payload, "send-large:")) {
249+
const size_str = payload["send-large:".len..];
250+
const size = std.fmt.parseInt(usize, size_str, 10) catch return error.InvalidCommand;
251+
try sendLargeMessage(client, size);
252+
return;
253+
}
254+
255+
// Command: close:CODE:REASON - send close frame with specific code/reason
256+
if (std.mem.startsWith(u8, payload, "close:")) {
257+
const rest = payload["close:".len..];
258+
if (std.mem.indexOf(u8, rest, ":")) |sep| {
259+
const code = std.fmt.parseInt(u16, rest[0..sep], 10) catch 1000;
260+
const reason = rest[sep + 1 ..];
261+
try sendCloseFrame(client, code, reason);
262+
}
263+
return;
264+
}
265+
266+
// Default: echo with "echo-" prefix
267+
const prefix = "echo-";
268+
try sendFrame(client, 1, prefix, payload);
269+
}
270+
271+
fn handleBinaryMessage(client: posix.socket_t, payload: []const u8) !void {
272+
// Echo binary data back with byte 0xEE prepended as marker
273+
const marker = [_]u8{0xEE};
274+
try sendFrame(client, 2, &marker, payload);
275+
}
276+
277+
fn sendFrame(client: posix.socket_t, opcode: u8, prefix: []const u8, payload: []const u8) !void {
278+
const total_len = prefix.len + payload.len;
279+
280+
// Build header
281+
var header: [10]u8 = undefined;
282+
var header_len: usize = 2;
283+
284+
header[0] = 0x80 | opcode; // FIN + opcode
285+
286+
if (total_len <= 125) {
287+
header[1] = @intCast(total_len);
288+
} else if (total_len <= 65535) {
289+
header[1] = 126;
290+
header[2] = @intCast((total_len >> 8) & 0xFF);
291+
header[3] = @intCast(total_len & 0xFF);
292+
header_len = 4;
293+
} else {
294+
header[1] = 127;
295+
header[2] = @intCast((total_len >> 56) & 0xFF);
296+
header[3] = @intCast((total_len >> 48) & 0xFF);
297+
header[4] = @intCast((total_len >> 40) & 0xFF);
298+
header[5] = @intCast((total_len >> 32) & 0xFF);
299+
header[6] = @intCast((total_len >> 24) & 0xFF);
300+
header[7] = @intCast((total_len >> 16) & 0xFF);
301+
header[8] = @intCast((total_len >> 8) & 0xFF);
302+
header[9] = @intCast(total_len & 0xFF);
303+
header_len = 10;
304+
}
305+
306+
_ = try posix.write(client, header[0..header_len]);
307+
if (prefix.len > 0) {
308+
_ = try posix.write(client, prefix);
309+
}
310+
if (payload.len > 0) {
311+
_ = try posix.write(client, payload);
312+
}
313+
}
314+
315+
fn sendLargeMessage(client: posix.socket_t, size: usize) !void {
316+
// Build header
317+
var header: [10]u8 = undefined;
318+
var header_len: usize = 2;
319+
320+
header[0] = 0x81; // FIN + text
321+
322+
if (size <= 125) {
323+
header[1] = @intCast(size);
324+
} else if (size <= 65535) {
325+
header[1] = 126;
326+
header[2] = @intCast((size >> 8) & 0xFF);
327+
header[3] = @intCast(size & 0xFF);
328+
header_len = 4;
329+
} else {
330+
header[1] = 127;
331+
header[2] = @intCast((size >> 56) & 0xFF);
332+
header[3] = @intCast((size >> 48) & 0xFF);
333+
header[4] = @intCast((size >> 40) & 0xFF);
334+
header[5] = @intCast((size >> 32) & 0xFF);
335+
header[6] = @intCast((size >> 24) & 0xFF);
336+
header[7] = @intCast((size >> 16) & 0xFF);
337+
header[8] = @intCast((size >> 8) & 0xFF);
338+
header[9] = @intCast(size & 0xFF);
339+
header_len = 10;
340+
}
341+
342+
_ = try posix.write(client, header[0..header_len]);
343+
344+
// Send payload in chunks - pattern of 'A'-'Z' repeating
345+
var sent: usize = 0;
346+
var chunk: [4096]u8 = undefined;
347+
while (sent < size) {
348+
const to_send = @min(chunk.len, size - sent);
349+
for (chunk[0..to_send], 0..) |*b, i| {
350+
b.* = @intCast('A' + ((sent + i) % 26));
351+
}
352+
_ = try posix.write(client, chunk[0..to_send]);
353+
sent += to_send;
354+
}
355+
}
356+
357+
fn sendCloseFrame(client: posix.socket_t, code: u16, reason: []const u8) !void {
358+
const reason_len = @min(reason.len, 123); // Max 123 bytes for reason
359+
const payload_len = 2 + reason_len;
360+
361+
var frame: [129]u8 = undefined; // 2 header + 2 code + 123 reason + 2 padding
362+
frame[0] = 0x88; // FIN + close
363+
frame[1] = @intCast(payload_len);
364+
frame[2] = @intCast((code >> 8) & 0xFF);
365+
frame[3] = @intCast(code & 0xFF);
366+
if (reason_len > 0) {
367+
@memcpy(frame[4..][0..reason_len], reason[0..reason_len]);
368+
}
369+
370+
_ = try posix.write(client, frame[0 .. 4 + reason_len]);
371+
}

0 commit comments

Comments
 (0)