|
| 1 | +// Hand-written runtime support for the generated `createWasmRawCallbacks` |
| 2 | +// adapter (`./generated/host-callbacks-adapter.ts`). The adapter is mechanical |
| 3 | +// (decode params, call the typed host callback, read the result); the pieces |
| 4 | +// here are the genuinely bespoke runtime plumbing it leans on: stream driving |
| 5 | +// and the chain-connection handle. |
| 6 | + |
| 7 | +import { type GenericError, type Result } from "@parity/truapi"; |
| 8 | +import { hexToBytes } from "@parity/truapi/scale"; |
| 9 | + |
| 10 | +import type { ChainConnect, ChainConnection } from "./runtime.js"; |
| 11 | +import type { HostCallbacks } from "./generated/host-callbacks.js"; |
| 12 | + |
| 13 | +type WireResult<T, E> = |
| 14 | + | { success: true; value: T } |
| 15 | + | { success: false; value: E }; |
| 16 | + |
| 17 | +type StreamResult<T, E> = Result<T, E> | WireResult<T, E>; |
| 18 | + |
| 19 | +type MaybeAsyncIterable<T> = AsyncIterable<T> | Iterable<T>; |
| 20 | + |
| 21 | +/** |
| 22 | + * Normalize both generated `Result<T, GenericError>` values and the plain |
| 23 | + * `{ success, value }` envelope used by some JS fixtures into a raw item. |
| 24 | + */ |
| 25 | +function unwrapStreamResult<T>(item: StreamResult<T, GenericError>): T { |
| 26 | + if ("success" in item) { |
| 27 | + if (item.success === false) { |
| 28 | + throw new Error(item.value.reason); |
| 29 | + } |
| 30 | + return item.value; |
| 31 | + } |
| 32 | + if (item.isErr()) { |
| 33 | + throw new Error(item.error.reason); |
| 34 | + } |
| 35 | + return item.value; |
| 36 | +} |
| 37 | + |
| 38 | +/** |
| 39 | + * Accept sync and async host streams behind one async-iterator interface. |
| 40 | + * Host callbacks often use async iterables in production, while tests can use |
| 41 | + * small synchronous fixtures without a custom wrapper. |
| 42 | + */ |
| 43 | +function toAsyncIterator<T>(stream: MaybeAsyncIterable<T>): AsyncIterator<T> { |
| 44 | + const asyncIterable = stream as AsyncIterable<T>; |
| 45 | + if (typeof asyncIterable[Symbol.asyncIterator] === "function") { |
| 46 | + return asyncIterable[Symbol.asyncIterator](); |
| 47 | + } |
| 48 | + const iterator = (stream as Iterable<T>)[Symbol.iterator](); |
| 49 | + const asyncIterator: AsyncIterator<T> = { |
| 50 | + next: async () => iterator.next(), |
| 51 | + }; |
| 52 | + if (iterator.return) { |
| 53 | + asyncIterator.return = async () => iterator.return!(); |
| 54 | + } |
| 55 | + return asyncIterator; |
| 56 | +} |
| 57 | + |
| 58 | +/** |
| 59 | + * Drain an async iterator into a sink until disposed. This is used for |
| 60 | + * callback streams where the Rust core owns cancellation but JS owns the |
| 61 | + * iterator and any transport cleanup behind `return()`. |
| 62 | + */ |
| 63 | +function pumpIterator<T>( |
| 64 | + iterator: AsyncIterator<T>, |
| 65 | + onItem: (value: T) => void, |
| 66 | + label: string, |
| 67 | +): () => void { |
| 68 | + let stopped = false; |
| 69 | + void (async () => { |
| 70 | + try { |
| 71 | + while (!stopped) { |
| 72 | + const next = await iterator.next(); |
| 73 | + if (next.done) return; |
| 74 | + onItem(next.value); |
| 75 | + } |
| 76 | + } catch (err) { |
| 77 | + console.error(`[truapi host callbacks] ${label} failed:`, err); |
| 78 | + } |
| 79 | + })(); |
| 80 | + return () => { |
| 81 | + stopped = true; |
| 82 | + void iterator.return?.(); |
| 83 | + }; |
| 84 | +} |
| 85 | + |
| 86 | +/** |
| 87 | + * Drive a typed host stream of `Result` items into the core's `sendItem` |
| 88 | + * sink, unwrapping each `Result` (or throwing on its error). Returns a |
| 89 | + * disposer that stops iteration. |
| 90 | + */ |
| 91 | +export function driveResultStream<T>( |
| 92 | + stream: MaybeAsyncIterable<StreamResult<T, GenericError>>, |
| 93 | + sendItem: (value: T) => void, |
| 94 | +): () => void { |
| 95 | + return pumpIterator( |
| 96 | + toAsyncIterator(stream), |
| 97 | + (value) => sendItem(unwrapStreamResult(value)), |
| 98 | + "subscription", |
| 99 | + ); |
| 100 | +} |
| 101 | + |
| 102 | +/** |
| 103 | + * Bridge the typed `ChainProvider.connect` callback onto the raw |
| 104 | + * `chainConnect` the WASM core invokes: decode the genesis hash, pump the |
| 105 | + * connection's `responses()` stream into `onResponse`, and expose |
| 106 | + * `send`/`close`. |
| 107 | + */ |
| 108 | +export function chainConnectAdapter( |
| 109 | + host: Pick<HostCallbacks, "connect">, |
| 110 | +): ChainConnect { |
| 111 | + return async (genesisHash, onResponse): Promise<ChainConnection | null> => { |
| 112 | + const connection = await host.connect(hexToBytes(genesisHash)); |
| 113 | + const iterator = connection.responses()[Symbol.asyncIterator](); |
| 114 | + const stopResponses = pumpIterator(iterator, onResponse, "chain responses"); |
| 115 | + return { |
| 116 | + send(request: string): void { |
| 117 | + connection.send(request); |
| 118 | + }, |
| 119 | + close(): void { |
| 120 | + stopResponses(); |
| 121 | + connection.close(); |
| 122 | + }, |
| 123 | + }; |
| 124 | + }; |
| 125 | +} |
0 commit comments