Skip to content

Commit d082441

Browse files
authored
fix do class inheritance, upgrade Effect to beta.57 (#423)
1 parent ffe8258 commit d082441

8 files changed

Lines changed: 191 additions & 170 deletions

File tree

.changeset/legal-streets-write.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"effect-workerd": patch
3+
"liminal": patch
4+
---
5+
6+
Actor now inherits from DurableObject. Effect updated to beta 57.

effect-workerd/D1.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ import * as Binding from "./Binding.ts"
55

66
export class D1 extends Context.Service<D1, D1Database>()("effect-workerd/D1") {}
77

8-
export const layer = Binding.layer(D1, ["exec"], (db) => D1Client.layer({ db }))
8+
export const layer = Binding.layer(D1, ["prepare", "batch", "exec"], (db) => D1Client.layer({ db }))

effect-workerd/Kv.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ export const Kv =
4141
decodeValue: S.decodeUnknownEffect(S.fromJsonString(S.toCodecJson(value))),
4242
}
4343

44-
const layer = Binding.layer(tag, ["put", "get", "delete"])
45-
46-
return Object.assign(tag, { definition, transcoders, layer })
44+
return Object.assign(tag, {
45+
definition,
46+
transcoders,
47+
layer: Binding.layer(tag, ["get", "put", "delete", "list", "getWithMetadata"]),
48+
})
4749
}
4850

4951
export const put = Effect.fnUntraced(function* <Self, Id extends string, D extends KvDefinition>(

effect-workerd/Worker.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,29 @@ export interface WorkerDefinition<PreludeROut, PreludeE, E> {
2626
}
2727

2828
export const make = <PreludeROut, PreludeE, E>({ handler, prelude }: WorkerDefinition<PreludeROut, PreludeE, E>) => {
29-
const runtime = ManagedRuntime.make(
30-
Layer.mergeAll(FetchHttpClient.layer, ConfigProvider.layer(ConfigProvider.fromUnknown(env))),
31-
)
32-
33-
const fetch = (request: Request, _env: unknown, ctx: globalThis.ExecutionContext): Promise<Response> =>
34-
handler.pipe(
29+
let runtime:
30+
| undefined
31+
| ManagedRuntime.ManagedRuntime<
32+
PreludeROut | Layer.Success<typeof HttpServer.layerServices> | HttpClient.HttpClient,
33+
PreludeE
34+
>
35+
const fetch = (request: Request, _env: unknown, ctx: globalThis.ExecutionContext): Promise<Response> => {
36+
runtime ??= ManagedRuntime.make(
37+
prelude.pipe(
38+
Layer.provideMerge(
39+
Layer.mergeAll(
40+
FetchHttpClient.layer,
41+
ConfigProvider.layer(ConfigProvider.fromUnknown(env)),
42+
HttpServer.layerServices,
43+
),
44+
),
45+
),
46+
)
47+
return handler.pipe(
3548
Effect.tapCause(logCause),
3649
Effect.catchCause(() => Effect.succeed(HttpServerResponse.empty({ status: 500 }))),
3750
Effect.map(HttpServerResponse.toWeb),
3851
Effect.provide([
39-
prelude,
40-
HttpServer.layerServices,
4152
Layer.succeed(ExecutionContext, ctx),
4253
Layer.succeed(NativeRequest, request),
4354
Layer.succeed(HttpServerRequest.HttpServerRequest, HttpServerRequest.fromWeb(request)),
@@ -51,6 +62,7 @@ export const make = <PreludeROut, PreludeE, E>({ handler, prelude }: WorkerDefin
5162
Effect.provideService(Layer.CurrentMemoMap, runtime.memoMap),
5263
runtime.runPromise,
5364
)
65+
}
5466

5567
return { fetch }
5668
}

liminal/workerd/WorkerdActorNamespace.ts

Lines changed: 73 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { DurableObject } from "cloudflare:workers"
12
import {
23
Layer,
34
Effect,
@@ -12,6 +13,7 @@ import {
1213
Array,
1314
Encoding,
1415
Option,
16+
Cause,
1517
} from "effect"
1618
import { Binding, DoState, NativeRequest } from "effect-workerd"
1719
import { SecWebSocketProtocol, close } from "effect-workerd/socket_util"
@@ -91,8 +93,10 @@ export interface ActorNamespace<
9193
PreludeE,
9294
RunROut,
9395
RunE,
94-
> extends Context.Service<NamespaceSelf, DurableObjectNamespace> {
95-
new (state: globalThis.DurableObjectState<{}>): Context.ServiceClass.Shape<NamespaceId, DurableObjectNamespace>
96+
> {
97+
new (state: DurableObjectState<{}>, env: Cloudflare.Env): DurableObject
98+
99+
readonly service: Context.ServiceClass<NamespaceSelf, NamespaceId, DurableObjectNamespace>
96100

97101
readonly definition: ActorNamespaceDefinition<
98102
ActorSelf,
@@ -111,7 +115,14 @@ export interface ActorNamespace<
111115
readonly upgrade: (
112116
name: Name["Type"],
113117
attachments: S.Struct<AttachmentFields>["Type"],
114-
) => Effect.Effect<HttpServerResponse.HttpServerResponse, S.SchemaError, NamespaceSelf | NativeRequest.NativeRequest>
118+
) => Effect.Effect<
119+
HttpServerResponse.HttpServerResponse,
120+
S.SchemaError | Encoding.EncodingError | Cause.NoSuchElementError,
121+
| NamespaceSelf
122+
| NativeRequest.NativeRequest
123+
| Name["EncodingServices"]
124+
| S.Struct<AttachmentFields>["EncodingServices"]
125+
>
115126

116127
readonly layer: (binding: string) => Layer.Layer<NamespaceSelf, S.SchemaError, never>
117128
}
@@ -161,7 +172,7 @@ export const Service =
161172
RunROut,
162173
RunE
163174
> => {
164-
const { hibernation, actor, prelude, handlers, layer: runLayer, onConnect } = definition
175+
const { hibernation, actor, prelude, handlers, layer, onConnect } = definition
165176
const {
166177
definition: {
167178
name: Name,
@@ -171,7 +182,6 @@ export const Service =
171182
} = actor
172183

173184
const encodeName = S.encodeEffect(Name)
174-
const decodeName = S.decodeUnknownEffect(Name)
175185

176186
const Attachments = S.Struct(AttachmentFields)
177187
const encodeAttachments = S.encodeEffect(S.toCodecJson(Attachments))
@@ -194,63 +204,56 @@ export const Service =
194204
encodeAttachments(attachments).pipe(Effect.andThen((v) => Effect.sync(() => socket.serializeAttachment(v)))),
195205
}
196206

197-
const tag = class tag extends Context.Service<NamespaceSelf, DurableObjectNamespace>()(id) {
207+
class NameDecoded extends Context.Service<NameDecoded, Name["Type"]>()(
208+
"liminal/WorkerdActorNamespace/NameDecoded",
209+
) {}
210+
211+
return class extends DurableObject {
212+
static definition = definition
213+
static service = Context.Service<NamespaceSelf, DurableObjectNamespace>()(id)
214+
static layer = Binding.layer(this.service, ["idFromName", "idFromString", "newUniqueId", "get"])
215+
198216
readonly runtime
199217
readonly directory = ClientDirectory.make(actor, transport)
200218

201-
constructor(...args: [never]) {
202-
super(...args)
203-
const [state, env] = args as never as [state: globalThis.DurableObjectState<{}>, env: unknown]
204-
219+
constructor(state: DurableObjectState<{}>, env: Cloudflare.Env) {
220+
super(state, env)
205221
if (hibernation) {
206222
Option.andThen(
207223
Duration.fromInput(hibernation),
208224
flow(Duration.toMillis, state.setHibernatableWebSocketEventTimeout),
209225
)
210226
}
211227

212-
const baseLayer = Layer.mergeAll(
213-
prelude.pipe(Layer.provideMerge(ConfigProvider.layer(ConfigProvider.fromUnknown(env)))),
228+
const Live = Layer.mergeAll(
214229
FetchHttpClient.layer,
215230
Layer.succeed(DoState.DoState, state),
216231
Mutex.layer,
217-
)
232+
Layer.effect(NameDecoded, S.decodeUnknownEffect(Name)(state.id.name)).pipe(
233+
Layer.provideMerge(prelude.pipe(Layer.provideMerge(ConfigProvider.layer(ConfigProvider.fromUnknown(env))))),
234+
),
235+
).pipe(boundLayer("actor"))
218236

219-
this.runtime = Effect.gen({ self: this }, function* () {
220-
this.#name = yield* Effect.tryPromise(() => state.storage.get("__liminal_name")).pipe(
221-
Effect.flatMap((v) => (typeof v === "string" ? decodeName(v) : Effect.succeed(undefined))),
222-
)
237+
const hydrateAttachments = Effect.gen({ self: this }, function* () {
223238
for (const socket of state.getWebSockets()) {
224239
const attachments = yield* decodeAttachments(socket.deserializeAttachment())
225240
yield* this.directory.register(socket, attachments)
226241
}
227-
}).pipe(
228-
Effect.tapCause(logCause),
229-
span("make_runtime"),
230-
Layer.effectDiscard,
231-
Layer.provideMerge(baseLayer),
232-
boundLayer("actor"),
233-
ManagedRuntime.make,
234-
)
242+
}).pipe(span("hydrateAttachments"), Effect.tapCause(logCause))
243+
244+
this.runtime = hydrateAttachments.pipe(Layer.effectDiscard, Layer.provideMerge(Live), ManagedRuntime.make)
235245
}
236246

237-
#name?: Name["Type"] | undefined
238-
fetch(request: Request): Promise<Response> {
247+
override fetch(request: Request): Promise<Response> {
239248
return Effect.gen({ self: this }, function* () {
240249
const url = new URL(request.url)
241-
const name = yield* decodeName(url.searchParams.get("__liminal_name"))
242250
const attachments = yield* decodeAttachmentsString(url.searchParams.get("__liminal_attachments"))
243-
if (!this.#name) {
244-
this.#name = name
245-
const state = yield* DoState.DoState
246-
const encoded = yield* S.encodeEffect(Name)(name)
247-
yield* Effect.promise(() => state.storage.put("__liminal_name", encoded))
248-
}
249251
const { 0: webSocket, 1: server } = new WebSocketPair()
250252
const state = yield* DoState.DoState
251253
state.acceptWebSocket(server)
252254
server.send(yield* encodeAuditionSuccess({ _tag: "Audition.Success" }))
253255
const currentClient = yield* this.directory.register(server, attachments)
256+
const name = yield* NameDecoded
254257
const ActorLive = Layer.succeed(actor, {
255258
name,
256259
clients: this.directory.handles,
@@ -260,7 +263,7 @@ export const Service =
260263
Effect.scoped,
261264
span("onConnect"),
262265
Effect.scoped,
263-
Effect.provide(Layer.provideMerge(runLayer, ActorLive)),
266+
Effect.provide(Layer.provideMerge(layer, ActorLive)),
264267
)
265268
yield* debug("ClientRegistered")
266269
return new Response(null, {
@@ -271,10 +274,10 @@ export const Service =
271274
}).pipe(Effect.tapCause(logCause), span("fetch"), this.runtime.runPromise)
272275
}
273276

274-
webSocketMessage(socket: WebSocket, raw: string | ArrayBuffer) {
277+
override webSocketMessage(socket: WebSocket, raw: string | ArrayBuffer) {
275278
Effect.gen({ self: this }, function* () {
276279
const currentClient = yield* this.directory.get(socket)
277-
const name = yield* Effect.fromNullishOr(this.#name)
280+
const name = yield* NameDecoded
278281
const ActorLive = Layer.succeed(actor, {
279282
name,
280283
clients: this.directory.handles,
@@ -308,54 +311,52 @@ export const Service =
308311
span("handler", { attributes: { _tag } }),
309312
Effect.andThen((v) => Effect.sync(() => socket.send(v))),
310313
Effect.scoped,
311-
Effect.provide(Layer.provideMerge(runLayer, ActorLive)),
314+
Effect.provide(Layer.provideMerge(layer, ActorLive)),
312315
)
313316
}).pipe(Effect.scoped, Mutex.task, Effect.tapCause(logCause), span("webSocketMessage"), this.runtime.runFork)
314317
}
315318

316-
webSocketClose(socket: WebSocket, _code: number, _reason: string, _wasClean: boolean) {
319+
override webSocketClose(socket: WebSocket, _code: number, _reason: string, _wasClean: boolean) {
317320
this.directory
318321
.unregister(socket)
319322
.pipe(Effect.tap(debug("SocketClosed")), Effect.tapCause(logCause), this.runtime.runFork)
320323
}
321324

322-
webSocketError(socket: WebSocket, cause: unknown) {
325+
override webSocketError(socket: WebSocket, cause: unknown) {
323326
Effect.gen({ self: this }, function* () {
324327
yield* debug("SocketErrored", { cause })
325328
yield* this.directory.unregister(socket)
326329
}).pipe(Effect.tapCause(logCause), span("SocketErrored", { attributes: { cause } }), this.runtime.runFork)
327330
}
328-
}
329-
330-
const upgrade = Effect.fnUntraced(function* (name: Name["Type"], attachments: (typeof Attachments)["Type"]) {
331-
yield* debug("UpgradeInitiated", { attachments })
332-
const namespace = yield* tag
333-
const nameEncoded = yield* encodeName(name)
334-
const stub = namespace.getByName(nameEncoded)
335-
const request = yield* NativeRequest.NativeRequest
336-
const protocols = yield* Effect.fromNullishOr(request.headers.get(SecWebSocketProtocol)).pipe(
337-
Effect.map(flow(String.split(","), Array.map(String.trim))),
338-
)
339-
const liminalTokenI = yield* Array.findFirstIndex(protocols, (v) => v === "liminal")
340-
const requestClientId = yield* Effect.fromNullishOr(protocols[liminalTokenI + 1]).pipe(
341-
Effect.flatMap((v) => Encoding.decodeBase64UrlString(v).asEffect()),
342-
)
343-
if (requestClientId !== clientId) {
344-
return close(
345-
yield* encodeAuditionFailure({
346-
_tag: "Audition.Failure",
347-
expected: clientId,
348-
actual: requestClientId,
349-
}),
350-
)
351-
}
352-
const url = new URL(request.url)
353-
url.searchParams.set("__liminal_name", nameEncoded)
354-
url.searchParams.set("__liminal_attachments", yield* encodeAttachmentsString(attachments))
355-
return yield* Effect.promise(() => stub.fetch(new Request(url, request))).pipe(Effect.map(HttpServerResponse.raw))
356-
}, span("upgrade"))
357331

358-
const layer = Binding.layer(tag, ["getByName"])
359-
360-
return Object.assign(tag, { definition, upgrade, layer }) as never
332+
static readonly upgrade = (name: Name["Type"], attachments: (typeof Attachments)["Type"]) =>
333+
Effect.gen({ self: this }, function* () {
334+
yield* debug("UpgradeInitiated", { attachments })
335+
const namespace = yield* this.service
336+
const nameEncoded = yield* encodeName(name)
337+
const stub = namespace.getByName(nameEncoded)
338+
const request = yield* NativeRequest.NativeRequest
339+
const protocols = yield* Effect.fromNullishOr(request.headers.get(SecWebSocketProtocol)).pipe(
340+
Effect.map(flow(String.split(","), Array.map(String.trim))),
341+
)
342+
const liminalTokenI = yield* Array.findFirstIndex(protocols, (v) => v === "liminal")
343+
const requestClientId = yield* Effect.fromNullishOr(protocols[liminalTokenI + 1]).pipe(
344+
Effect.flatMap((v) => Encoding.decodeBase64UrlString(v).asEffect()),
345+
)
346+
if (requestClientId !== clientId) {
347+
return close(
348+
yield* encodeAuditionFailure({
349+
_tag: "Audition.Failure",
350+
expected: clientId,
351+
actual: requestClientId,
352+
}),
353+
)
354+
}
355+
const url = new URL(request.url)
356+
url.searchParams.set("__liminal_attachments", yield* encodeAttachmentsString(attachments))
357+
return yield* Effect.promise(() => stub.fetch(new Request(url, request))).pipe(
358+
Effect.map(HttpServerResponse.raw),
359+
)
360+
}).pipe(span("upgrade"))
361+
}
361362
}

0 commit comments

Comments
 (0)