-
-
Notifications
You must be signed in to change notification settings - Fork 558
Expand file tree
/
Copy pathhttpIncomingMessage.ts
More file actions
93 lines (82 loc) · 2.65 KB
/
httpIncomingMessage.ts
File metadata and controls
93 lines (82 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import * as Headers from "@effect/platform/Headers"
import * as IncomingMessage from "@effect/platform/HttpIncomingMessage"
import * as UrlParams from "@effect/platform/UrlParams"
import * as Effect from "effect/Effect"
import * as FiberRef from "effect/FiberRef"
import * as Inspectable from "effect/Inspectable"
import * as Option from "effect/Option"
import type * as Stream from "effect/Stream"
import type * as Http from "node:http"
import * as NodeStream from "../NodeStream.js"
/**
 * Base implementation of `HttpIncomingMessage` backed by a Node.js
 * `http.IncomingMessage`.
 *
 * Every failure raised while reading or decoding the body is passed through
 * `onError` so that subclasses decide the concrete error type `E`.
 *
 * @internal
 */
export abstract class HttpIncomingMessageImpl<E> extends Inspectable.Class
  implements IncomingMessage.HttpIncomingMessage<E>
{
  readonly [IncomingMessage.TypeId]: IncomingMessage.TypeId

  /**
   * @param source The underlying Node.js message the body is read from.
   * @param onError Maps any failure (stream error, parse error) to `E`.
   * @param remoteAddressOverride When provided, reported by `remoteAddress`
   * instead of the address of the socket.
   */
  constructor(
    readonly source: Http.IncomingMessage,
    readonly onError: (error: unknown) => E,
    readonly remoteAddressOverride?: string
  ) {
    super()
    this[IncomingMessage.TypeId] = IncomingMessage.TypeId
  }

  /** Headers of the underlying message, converted with `Headers.fromInput`. */
  get headers() {
    return Headers.fromInput(this.source.headers as any)
  }

  /**
   * The explicit override when one was given, otherwise the socket's remote
   * address; `None` when neither is available.
   */
  get remoteAddress() {
    return Option.fromNullable(this.remoteAddressOverride ?? this.source.socket.remoteAddress)
  }

  // Memoised effect produced by the first access to `text`.
  private textEffect: Effect.Effect<string, E> | undefined

  /**
   * The body decoded as a string, limited by the `maxBodySize` fiber ref.
   *
   * The effect is wrapped in `Effect.cached` and stored on the instance, so
   * every access returns the same memoised effect.
   */
  get text(): Effect.Effect<string, E> {
    if (this.textEffect !== undefined) {
      return this.textEffect
    }
    this.textEffect = Effect.runSync(Effect.cached(
      Effect.flatMap(
        FiberRef.get(IncomingMessage.maxBodySize),
        (maxBodySize) =>
          NodeStream.toString(() => this.source, {
            onFailure: this.onError,
            maxBytes: Option.getOrUndefined(maxBodySize)
          })
      )
    ))
    return this.textEffect
  }

  /** Runs `text` with `Effect.runSync` and returns the resulting string. */
  get unsafeText(): string {
    return Effect.runSync(this.text)
  }

  /**
   * The body parsed as JSON. An empty body yields `null` instead of a parse
   * error; parse failures are mapped through `onError`.
   */
  get json(): Effect.Effect<unknown, E> {
    return Effect.tryMap(this.text, {
      try: (_) => _ === "" ? null : JSON.parse(_) as unknown,
      catch: this.onError
    })
  }

  /** Runs `json` with `Effect.runSync` and returns the parsed value. */
  get unsafeJson(): unknown {
    return Effect.runSync(this.json)
  }

  /**
   * The body interpreted as URL-encoded parameters (via `URLSearchParams`);
   * failures are mapped through `onError`.
   */
  get urlParamsBody(): Effect.Effect<UrlParams.UrlParams, E> {
    return Effect.flatMap(this.text, (_) =>
      Effect.try({
        try: () => UrlParams.fromInput(new URLSearchParams(_)),
        catch: this.onError
      }))
  }

  /**
   * The body as a stream of byte chunks read directly from `source`.
   * Unlike `text`, nothing is memoised here.
   */
  get stream(): Stream.Stream<Uint8Array, E> {
    return NodeStream.fromReadable<E, Uint8Array>(
      () => this.source,
      this.onError
    )
  }

  /**
   * The body as an `ArrayBuffer`, limited by the `maxBodySize` fiber ref.
   *
   * The returned buffer contains exactly the bytes of the body: a typed-array
   * view (in particular a Node.js `Buffer`) may be a window into a larger
   * backing `ArrayBuffer`, so the backing buffer is only returned as-is when
   * the view spans all of it; otherwise the relevant range is copied out.
   */
  get arrayBuffer(): Effect.Effect<ArrayBuffer, E> {
    return Effect.flatMap(
      FiberRef.get(IncomingMessage.maxBodySize),
      (maxBodySize) =>
        NodeStream.toUint8Array(() => this.source, {
          onFailure: this.onError,
          maxBytes: Option.getOrUndefined(maxBodySize)
        }).pipe(Effect.map((bytes) => {
          const { buffer, byteLength, byteOffset } = bytes
          const coversWholeBuffer = byteOffset === 0 && byteLength === buffer.byteLength
          return (coversWholeBuffer
            ? buffer
            : buffer.slice(byteOffset, byteOffset + byteLength)) as ArrayBuffer
        }))
    )
  }
}