Skip to content

Commit 3fc794c

Browse files
committed
Modified everything to use 'res.abortCh'
1 parent 268c3df commit 3fc794c

6 files changed

Lines changed: 24 additions & 25 deletions

File tree

bun.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@ublitzjs/payload",
3-
"version": "1.0.2",
3+
"version": "1.0.3",
44
"types": "./dist/types/index.d.ts",
55
"files": ["dist"],
66
"tsd": {
@@ -31,7 +31,7 @@
3131
}
3232
},
3333
"dependencies": {
34-
"@ublitzjs/core": "^1.1.0",
34+
"@ublitzjs/core": "^1.2.0",
3535
"busboy": "^1.6.0",
3636
"nanoid": "^5.1.6",
3737
"tseep": "^1.3.1"

src/index.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export type FilesOnDisk<Repeated extends boolean> = FilesType<FileOnDisk, Repeat
3333
// type of "files" object, returned by parseFormDataBody with memory mode
3434
export type FilesInMemory<Repeated extends boolean> = FilesType<FileInMemory, Repeated>;
3535
/**
36-
* This function asynchronously parses multipart requests until the end, so that any work considering unfinished request wouldn't need to be undone.
36+
* This function asynchronously parses multipart requests until the end, so that any work considering unfinished request wouldn't need to be undone. It requires "regAbort" from "@ublitzjs/core" package to be used on response.
3737
* This utility expects "unpaused" response object and doesn't set any additional properties on it.
3838
* When limits are exceeded - acts as like because of an error (as described below)
3939
* @param limits - limits passed to "busboy"
@@ -54,7 +54,7 @@ export type FilesInMemory<Repeated extends boolean> = FilesType<FileInMemory, Re
5454
* }
5555
* },
5656
* // mode "memory" | "disk"
57-
* "disk",
57+
* "disk", It requires "regAbort" from "core" package to be used on response.
5858
* // whether to accept repeated files/fields
5959
* false
6060
* );
@@ -270,15 +270,14 @@ export async function parseFormDataBody<T extends "memory" | "disk", Repeated ex
270270
parserStream.write(copy);
271271
if (isLast) { parserStream.end();}
272272
});
273-
// calling emitter.emit many times won't hurt, as listener is ONE and emitEmergency might be called IN ONE PLACE
274273
var endReceivingBody: ()=>void | undefined
275274
function onAborted() {
276275
if(!lastError) { lastError = "aborted" }
277276
emitEmergency()
278277
}
279-
res.emitter.once("abort", onAborted);
278+
res.abortCh.sub(onAborted);
280279
await new Promise<void>((resolve) => { endReceivingBody = resolve });
281-
if(!res.aborted) res.emitter.off("abort", onAborted);
280+
if(!res.aborted) res.abortCh.unsub(onAborted);
282281
if(lastError && !emergency) { emitEmergency(); }
283282
if(emergency && emergency !== true) await (emergency as Promise<void[]>);
284283
return !lastError
@@ -287,7 +286,7 @@ export async function parseFormDataBody<T extends "memory" | "disk", Repeated ex
287286
}
288287
type AccumulatedBody<T extends boolean> = Buffer<T extends true ? SharedArrayBuffer : ArrayBuffer>
289288
/**
290-
* This utility just accumulates body and verifies if it stays within the given limit.
289+
* This utility just accumulates body and verifies if it stays within the given limit. It requires "regAbort" from "@ublitzjs/core" package to be used.
291290
* @param CL this is Content-Length to be compared against + acts as a preallocation amount.
292291
* If not given by developer OR == 0 - constantly happen memory reallocations with Buffer.concat
293292
* @param shared whether return Buffer of SharedArrayBuffer or simple ArrayBuffer. SharedArrayBuffer lets you pass data to a worker thread if work there is cpu intensive
@@ -318,11 +317,11 @@ export async function accumulateBody<T extends boolean = false>(
318317
data = (shared ? Buffer.from(new SharedArrayBuffer(CL)) : Buffer.allocUnsafe(CL)) as AccumulatedBody<T>
319318
return new Promise((resolve) => {
320319
function onAborted() { resolve(undefined) }
321-
res.emitter.once("abort", onAborted);
320+
res.abortCh.sub(onAborted);
322321
res.onData((ab, isLast) => {
323322
write(ab);
324323
writtenBytes += ab.byteLength;
325-
if (isLast) { res.emitter.off("abort", onAborted); resolve(data) }
324+
if (isLast) { res.abortCh.unsub(onAborted); resolve(data) }
326325
});
327326
})
328327
}

tests/k6/server.mjs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {App} from "uWebSockets.js"
2-
import {extendApp, registerAbort} from "@ublitzjs/core"
2+
import {extendApp, regAbort} from "@ublitzjs/core"
33
import {accumulateBody, parseFormDataBody} from "@ublitzjs/payload"
44
import {nanoid} from "nanoid"
55
import {mkdirSync, rm} from "node:fs"
@@ -8,7 +8,7 @@ var server = extendApp(App())
88
var outDir = "tmp/" + nanoid(7)
99
mkdirSync(outDir)
1010
server.post("/multipart-disk", async (res, req)=>{
11-
registerAbort(res)
11+
regAbort(res)
1212
var result = await parseFormDataBody({
1313
CT: req.getHeader("content-type"), res, outDir,
1414
parseLimits: {fileSize: 1024*1024+1}
@@ -24,7 +24,7 @@ server.post("/multipart-disk", async (res, req)=>{
2424
) + '}'))
2525
})
2626
server.post("/multipart-memory", async (res, req)=>{
27-
registerAbort(res)
27+
regAbort(res)
2828
var result = await parseFormDataBody({
2929
CT: req.getHeader("content-type"), res, outDir,
3030
parseLimits: {fileSize: 1024*1024+1}
@@ -40,7 +40,7 @@ server.post("/multipart-memory", async (res, req)=>{
4040
) + '}'))
4141
})
4242
server.post("/upload", async (res, req)=>{
43-
registerAbort(res)
43+
regAbort(res)
4444
var result = await accumulateBody(res, Number(req.getHeader("content-length")), false)
4545
res.cork(()=>res.end('{"receivedBytes": ' + result.byteLength + '}'))
4646
})

tests/vitest/http/form-data.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { ClientRequest } from "node:http";
22
import type { FileInMemory, FileOnDisk, parseFormDataBody } from "@ublitzjs/payload"
33
import type { Limits } from "busboy";
4-
import { registerAbort, type HttpRequest, type HttpResponse } from "@ublitzjs/core";
4+
import { regAbort, type HttpRequest, type HttpResponse } from "@ublitzjs/core";
55
import { expect } from "vitest";
66
import { readFileSync, rmSync } from "node:fs";
77

@@ -61,7 +61,7 @@ export function setupHandler(parseBody: typeof parseFormDataBody) {
6161
opts: { limits?: Limits, outDir?: string } = {}
6262
) {
6363
return async (res: HttpResponse, req: HttpRequest) => {
64-
registerAbort(res)
64+
regAbort(res)
6565
var result = await parseBody({
6666
res, CT: req.getHeader("content-type"), parseLimits: opts.limits,
6767
outDir: opts.outDir,

tests/vitest/http/index.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { registerAbort } from "@ublitzjs/core";
1+
import { regAbort } from "@ublitzjs/core";
22
import { request } from "node:http";
33
import {expectType} from "tsd"
44
//import { setTimeout as awaitTimeout } from "node:timers/promises"
@@ -37,7 +37,7 @@ export default async function(module: typeof import("@ublitzjs/payload")) {
3737
it("is type-safe", skipHelper(), ()=>{
3838
if(runningTsd) {
3939
server.post("/types", async (res, req)=>{
40-
registerAbort(res);
40+
regAbort(res);
4141
var resultMemorySingular = await module.parseFormDataBody({
4242
res, CT:req.getHeader("content-type")
4343
}, "memory", false);
@@ -238,7 +238,7 @@ export default async function(module: typeof import("@ublitzjs/payload")) {
238238
it("successfully aborts request", skipHelper(), async () => {
239239
await new Promise<void>((resolve) => {
240240
server.post("/files/memory/abort", async (res, req) => {
241-
registerAbort(res)
241+
regAbort(res)
242242
var result = await module.parseFormDataBody({
243243
CT: req.getHeader("content-type"),
244244
res
@@ -360,7 +360,7 @@ export default async function(module: typeof import("@ublitzjs/payload")) {
360360
function doIt(type: "/singular" | "/repeated") {
361361
return new Promise<void>((resolve) => {
362362
server.post("/files/disk/abort" + type, async (res, req) => {
363-
registerAbort(res)
363+
regAbort(res)
364364
var result = await module.parseFormDataBody({
365365
CT: req.getHeader("content-type"),
366366
res, outDir
@@ -399,7 +399,7 @@ export default async function(module: typeof import("@ublitzjs/payload")) {
399399
var outDir = "tmp/" + nanoid(10);
400400
mkdirSync(outDir)
401401
server.post("/files/disk/empty", async (res, req)=>{
402-
registerAbort(res)
402+
regAbort(res)
403403
var result = await module.parseFormDataBody({ res, CT: req.getHeader("content-type"), outDir },"disk", false)
404404
if (result.ok) {
405405
try {
@@ -437,7 +437,7 @@ export default async function(module: typeof import("@ublitzjs/payload")) {
437437
it("is type-safe", ()=>{
438438
if(runningTsd) {
439439
server.post("/body/types", async (res, req)=>{
440-
registerAbort(res)
440+
regAbort(res)
441441
var result = await module.accumulateBody(res, Number(req.getHeader("content-length")), false)
442442
if(res.aborted) return;
443443
expectType<Buffer<ArrayBuffer>|undefined>(result)
@@ -451,7 +451,7 @@ export default async function(module: typeof import("@ublitzjs/payload")) {
451451
it("accumulates larger bodies then 64KiB", async ()=>{
452452
var body = Buffer.allocUnsafeSlow(65*1024)
453453
server.post("/body/large", async (res, req)=>{
454-
registerAbort(res)
454+
regAbort(res)
455455
var result = await module.accumulateBody(res, Number(req.getHeader("content-length")))
456456
if(!result) return;
457457
expect(body.compare(result)).toBe(0)

0 commit comments

Comments
 (0)