-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathindex.js
More file actions
289 lines (249 loc) · 11.4 KB
/
index.js
File metadata and controls
289 lines (249 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
import { DurableObject } from "cloudflare:workers";
import { instantiate } from "asyncify-wasm";
import mod from "./$$PROJECT_NAME_UNDERSCORE$$_queue.wasm";
const wasmModule = mod;
/**
* Durable Object for Uzumibi::KV storage
*/
export class UzumibiKVObject extends DurableObject {
async get(key) {
const value = await this.ctx.storage.get(key);
return value ?? null;
}
async set(key, value) {
await this.ctx.storage.put(key, value);
}
}
export default {
async fetch(request, env, ctx) {
return new Response("This endpoint is for queue processing. Please send messages to the queue instead.", { status: 400 });
},
async queue(batch, env, ctx) {
const decoder = new TextDecoder();
const encoder = new TextEncoder();
// Durable Object stub (if binding exists)
const doStub = env.UZUMIBI_KV_DATA
? env.UZUMIBI_KV_DATA.getByName("default")
: null;
// Look up message by ID for concurrency safety
const getMessage = (id) => {
const message = batch.messages.find((m) => m.id === id);
if (!message) throw new Error(`Message not found for id: ${id}`);
return message;
};
const importObject = {
env: {
debug_console_log: (ptr, size) => {
const memory = exports.memory;
const buffer = new Uint8Array(memory.buffer, ptr, size);
console.log(`[debug]: ${decoder.decode(buffer)}`);
return 0;
},
// Fetch.fetch(url, method, body, headers) -> packed Uzumibi::Response
uzumibi_cf_fetch: async (
urlPtr, urlSize,
methodPtr, methodSize,
bodyPtr, bodySize,
headersPtr, headersSize,
resultPtr, resultMaxSize,
) => {
const memory = exports.memory;
const url = decoder.decode(new Uint8Array(memory.buffer, urlPtr, urlSize));
const method = decoder.decode(new Uint8Array(memory.buffer, methodPtr, methodSize));
const body = bodySize > 0
? decoder.decode(new Uint8Array(memory.buffer, bodyPtr, bodySize))
: null;
const fetchOptions = { method };
if (body && method !== "GET" && method !== "HEAD") {
fetchOptions.body = body;
}
// Unpack request headers: u16 LE count, then (u16 LE key_size, key, u16 LE value_size, value) * count
if (headersSize >= 2) {
const hView = new DataView(memory.buffer, headersPtr, headersSize);
const hCount = hView.getUint16(0, true);
if (hCount > 0) {
const reqHeaders = {};
let hPos = 2;
for (let i = 0; i < hCount; i++) {
const kLen = hView.getUint16(hPos, true);
hPos += 2;
const k = decoder.decode(new Uint8Array(memory.buffer, headersPtr + hPos, kLen));
hPos += kLen;
const vLen = hView.getUint16(hPos, true);
hPos += 2;
const v = decoder.decode(new Uint8Array(memory.buffer, headersPtr + hPos, vLen));
hPos += vLen;
reqHeaders[k] = v;
}
fetchOptions.headers = reqHeaders;
}
}
const response = await fetch(url, fetchOptions);
const responseBody = await response.text();
const respHeaders = [];
response.headers.forEach((value, key) => {
respHeaders.push({ key, value });
});
const resultView = new DataView(memory.buffer, resultPtr, resultMaxSize);
const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize);
let pos = 0;
resultView.setUint16(pos, response.status, true);
pos += 2;
resultView.setUint16(pos, respHeaders.length, true);
pos += 2;
for (const header of respHeaders) {
const keyBytes = encoder.encode(header.key);
resultView.setUint16(pos, keyBytes.length, true);
pos += 2;
resultBuffer.set(keyBytes, pos);
pos += keyBytes.length;
const valueBytes = encoder.encode(header.value);
resultView.setUint16(pos, valueBytes.length, true);
pos += 2;
resultBuffer.set(valueBytes, pos);
pos += valueBytes.length;
}
const bodyBytes = encoder.encode(responseBody);
resultView.setUint32(pos, bodyBytes.length, true);
pos += 4;
const bodyLen = Math.min(bodyBytes.length, resultMaxSize - pos);
resultBuffer.set(bodyBytes.slice(0, bodyLen), pos);
pos += bodyLen;
return pos;
},
// KV.get(key) -> value string (via Durable Object)
uzumibi_cf_durable_object_get: async (keyPtr, keySize, resultPtr, resultMaxSize) => {
if (!doStub) return -1;
const memory = exports.memory;
const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize));
const value = await doStub.get(key);
if (value === null) return -1;
const valueBytes = encoder.encode(value);
const length = Math.min(valueBytes.length, resultMaxSize);
const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize);
resultBuffer.set(valueBytes.slice(0, length));
return length;
},
// KV.set(key, value) (via Durable Object)
uzumibi_cf_durable_object_set: async (keyPtr, keySize, valuePtr, valueSize) => {
if (!doStub) return -1;
const memory = exports.memory;
const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize));
const value = decoder.decode(new Uint8Array(memory.buffer, valuePtr, valueSize));
await doStub.set(key, value);
return 0;
},
// Queue.send(queue_name, message)
uzumibi_cf_queue_send: async (queueNamePtr, queueNameSize, messagePtr, messageSize) => {
const memory = exports.memory;
const queueName = decoder.decode(new Uint8Array(memory.buffer, queueNamePtr, queueNameSize));
const message = decoder.decode(new Uint8Array(memory.buffer, messagePtr, messageSize));
const queue = env[queueName];
if (!queue) {
console.error(`Queue binding '${queueName}' not found`);
return -1;
}
await queue.send(message);
return 0;
},
uzumibi_cf_message_ack: async (idPtr, idSize) => {
const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize));
getMessage(id).ack();
return 0;
},
uzumibi_cf_message_retry: async (idPtr, idSize, delaySeconds) => {
const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize));
getMessage(id).retry({ delaySeconds });
return 0;
},
},
};
const instance = await instantiate(wasmModule, importObject);
const exports = instance.exports;
for (const message of batch.messages) {
const idBytes = encoder.encode(message.id);
const timestampBytes = encoder.encode(
message.timestamp.toISOString(),
);
const bodyBytes = encoder.encode(
typeof message.body === "string"
? message.body
: JSON.stringify(message.body),
);
const attempts = message.attempts;
// Pack message data:
// u16 LE id_size, id bytes,
// u16 LE timestamp_size, timestamp bytes,
// u32 LE body_size, body bytes,
// u32 LE attempts
const totalSize =
2 +
idBytes.length +
2 +
timestampBytes.length +
4 +
bodyBytes.length +
4;
const msgResult =
await exports.uzumibi_initialize_message(totalSize);
const msgOffset = Number(msgResult & 0xffffffffn);
if (msgOffset === 0) {
const errOffset = Number(
(msgResult >> 32n) & 0xffffffffn,
);
const buffer = new Uint8Array(
exports.memory.buffer,
errOffset,
);
let errStr = "";
for (let i = 0; buffer[i] !== 0; i++) {
errStr += String.fromCharCode(buffer[i]);
}
throw new Error(
`Failed to initialize message: ${errStr}`,
);
}
const msgBuffer = new Uint8Array(
exports.memory.buffer,
msgOffset,
totalSize,
);
const dataView = new DataView(
exports.memory.buffer,
msgOffset,
);
let pos = 0;
// id
dataView.setUint16(pos, idBytes.length, true);
pos += 2;
msgBuffer.set(idBytes, pos);
pos += idBytes.length;
// timestamp
dataView.setUint16(pos, timestampBytes.length, true);
pos += 2;
msgBuffer.set(timestampBytes, pos);
pos += timestampBytes.length;
// body
dataView.setUint32(pos, bodyBytes.length, true);
pos += 4;
msgBuffer.set(bodyBytes, pos);
pos += bodyBytes.length;
// attempts
dataView.setUint32(pos, attempts, true);
const result = await exports.uzumibi_start_message();
if (result !== 0) {
const buffer = new Uint8Array(
exports.memory.buffer,
result,
);
let errStr = "";
for (let i = 0; buffer[i] !== 0; i++) {
errStr += String.fromCharCode(buffer[i]);
}
throw new Error(
`Failed to process message: ${errStr}`,
);
}
}
},
};