Skip to content

Commit 6f886c5

Browse files
committed
Drop Options generic and thread signal directly
`AsyncReadable<Options>` existed so stores could receive arbitrary per-call state. Auth headers, presigning context, cancellation signals, even chunk-layer concerns like caching and prefetch priority (#296, vole-core's `wrapArray`). It was a catch-all for extensions that had nowhere else to live, and the cost was a pile of type magic: higher-kinded-type encoding in the store middleware system (#384), threading generic types that didn't actually provide that much type safety. (TypeScript could often bail out to `any` when inference broke.) Those extensions now have proper homes. Store middleware (#384) gives transport-layer concerns (auth, presigning, request transformation) a proper extension point, and the custom `fetch` option on `FetchStore` (#388) handles the per-store cases at the callsite. Chunk-layer concerns will move to `zarr.extendArray` in a follow-up. What's left is `signal`, which now lives properly on the (non-generic) `AsyncReadable` interface and can be passed directly in `zarr.get` and `zarr.set` from the caller: // Before interface AsyncReadable<Options = unknown> { get(key: AbsolutePath, opts?: Options): Promise<Uint8Array | undefined>; } await zarr.get(arr, null, { opts: { signal: ctl.signal } }); // After interface AsyncReadable { get(key: AbsolutePath, opts?: { signal?: AbortSignal }): Promise<Uint8Array | undefined>; } await zarr.get(arr, null, { signal: ctl.signal }); Batched caller signals in `withRangeBatching` are now merged with `AbortSignal.any` instead of a user-supplied `mergeOptions` reducer. The deprecated `opts?: { signal? }` shape still works for one major version and is folded into the new `signal` via `AbortSignal.any`.
1 parent 5f1b5d0 commit 6f886c5

20 files changed

Lines changed: 347 additions & 480 deletions

File tree

packages/@zarrita-ndarray/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export async function get<
4949
>(
5050
arr: zarr.Array<D, Store>,
5151
selection: Sel | null = null,
52-
opts: zarr.GetOptions<Parameters<Store["get"]>[1]> = {},
52+
opts: zarr.GetOptions = {},
5353
): Promise<
5454
null extends Sel[number]
5555
? ndarray.NdArray<zarr.TypedArray<D>>

packages/@zarrita-storage/src/fetch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ interface FetchStoreOptions {
131131
* });
132132
* ```
133133
*/
134-
class FetchStore implements AsyncReadable<RequestInit> {
134+
class FetchStore implements AsyncReadable {
135135
#fetch: (request: Request) => Promise<Response>;
136136
#overrides: RequestInit;
137137
#useSuffixRequest: boolean;

packages/@zarrita-storage/src/fs.ts

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ import { Buffer } from "node:buffer";
22
import * as fs from "node:fs";
33
import * as path from "node:path";
44

5-
import type { AbsolutePath, AsyncMutable, RangeQuery } from "./types.js";
5+
import type {
6+
AbsolutePath,
7+
AsyncMutable,
8+
GetOptions,
9+
RangeQuery,
10+
} from "./types.js";
611
import { stripPrefix } from "./util.js";
712

813
function isErrorNoEntry(err: unknown): err is { code: "ENOENT" } {
@@ -13,18 +18,26 @@ function isErrorNoEntry(err: unknown): err is { code: "ENOENT" } {
1318
class FileSystemStore implements AsyncMutable {
1419
constructor(public root: string) {}
1520

16-
async get(key: AbsolutePath): Promise<Uint8Array | undefined> {
21+
async get(
22+
key: AbsolutePath,
23+
opts: GetOptions = {},
24+
): Promise<Uint8Array | undefined> {
25+
opts.signal?.throwIfAborted();
1726
let fp = path.join(this.root, stripPrefix(key));
18-
return fs.promises.readFile(fp).catch((err: NodeJS.ErrnoException) => {
19-
if (err.code === "ENOENT") return undefined;
20-
throw err;
21-
});
27+
return fs.promises
28+
.readFile(fp, { signal: opts.signal })
29+
.catch((err: NodeJS.ErrnoException) => {
30+
if (err.code === "ENOENT") return undefined;
31+
throw err;
32+
});
2233
}
2334

2435
async getRange(
2536
key: AbsolutePath,
2637
range: RangeQuery,
38+
opts: GetOptions = {},
2739
): Promise<Uint8Array | undefined> {
40+
opts.signal?.throwIfAborted();
2841
let fp = path.join(this.root, stripPrefix(key));
2942
let filehandle: fs.promises.FileHandle | undefined;
3043
try {
@@ -38,10 +51,12 @@ class FileSystemStore implements AsyncMutable {
3851
range.suffixLength,
3952
stats.size - range.suffixLength,
4053
);
54+
opts.signal?.throwIfAborted();
4155
return data;
4256
}
4357
let data = Buffer.alloc(range.length);
4458
await filehandle.read(data, 0, range.length, range.offset);
59+
opts.signal?.throwIfAborted();
4560
return data;
4661
} catch (err: unknown) {
4762
// return undefined is no file or directory

packages/@zarrita-storage/src/ref.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ function parseReferencesJson(refsJson: unknown): Map<string, ResolvedEntry> {
163163
*
164164
* @experimental
165165
*/
166-
class ReferenceStore implements AsyncReadable<RequestInit> {
166+
class ReferenceStore implements AsyncReadable {
167167
#inner: FetchStore;
168168

169169
constructor(

packages/@zarrita-storage/src/types.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,25 @@ export type RangeQuery =
99
suffixLength: number;
1010
};
1111

12-
export type Readable<GetOptions = unknown> =
13-
| AsyncReadable<GetOptions>
14-
| SyncReadable<GetOptions>;
15-
export interface AsyncReadable<Options = unknown> {
16-
get(key: AbsolutePath, opts?: Options): Promise<Uint8Array | undefined>;
12+
export interface GetOptions {
13+
signal?: AbortSignal;
14+
}
15+
16+
export type Readable = AsyncReadable | SyncReadable;
17+
export interface AsyncReadable {
18+
get(key: AbsolutePath, opts?: GetOptions): Promise<Uint8Array | undefined>;
1719
getRange?(
1820
key: AbsolutePath,
1921
range: RangeQuery,
20-
opts?: Options,
22+
opts?: GetOptions,
2123
): Promise<Uint8Array | undefined>;
2224
}
23-
export interface SyncReadable<Options = unknown> {
24-
get(key: AbsolutePath, opts?: Options): Uint8Array | undefined;
25+
export interface SyncReadable {
26+
get(key: AbsolutePath, opts?: GetOptions): Uint8Array | undefined;
2527
getRange?(
2628
key: AbsolutePath,
2729
range: RangeQuery,
28-
opts?: Options,
30+
opts?: GetOptions,
2931
): Uint8Array | undefined;
3032
}
3133

@@ -37,10 +39,6 @@ export interface SyncWritable {
3739
set(key: AbsolutePath, value: Uint8Array): void;
3840
}
3941

40-
export type AsyncMutable<GetOptions = unknown> = AsyncReadable<GetOptions> &
41-
AsyncWritable;
42-
export type SyncMutable<GetOptions = unknown> = SyncReadable<GetOptions> &
43-
SyncWritable;
44-
export type Mutable<GetOptions = unknown> =
45-
| AsyncMutable<GetOptions>
46-
| SyncMutable<GetOptions>;
42+
export type AsyncMutable = AsyncReadable & AsyncWritable;
43+
export type SyncMutable = SyncReadable & SyncWritable;
44+
export type Mutable = AsyncMutable | SyncMutable;

packages/zarrita/__tests__/batched-fetch.test.ts

Lines changed: 19 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -306,97 +306,44 @@ describe("withRangeBatching", () => {
306306
});
307307
});
308308

309-
describe("mergeOptions", () => {
310-
it("uses first caller's options by default", async () => {
309+
describe("signal merging", () => {
310+
it("merges caller signals via AbortSignal.any across a batch", async () => {
311311
let inner = fakeStore();
312312
let store = withRangeBatching(inner);
313313

314+
let a = new AbortController();
315+
let b = new AbortController();
314316
await Promise.all([
315317
store.getRange(
316318
"/data/chunk",
317319
{ offset: 0, length: 100 },
318-
{ headers: { "x-req": "first" } },
320+
{ signal: a.signal },
319321
),
320322
store.getRange(
321323
"/data/chunk",
322324
{ offset: 100, length: 100 },
323-
{ headers: { "x-req": "second" } },
325+
{ signal: b.signal },
324326
),
325327
]);
326328

327-
expect(inner.getRange.mock.calls[0][2]?.headers).toEqual({
328-
"x-req": "first",
329-
});
329+
let passedSignal = inner.getRange.mock.calls[0][2]?.signal;
330+
expect(passedSignal).toBeInstanceOf(AbortSignal);
331+
// Aborting either upstream signal should propagate to the merged one.
332+
a.abort(new Error("a aborted"));
333+
expect(passedSignal?.aborted).toBe(true);
330334
});
331335

332-
it("rejects pending requests and recovers when mergeOptions throws", async () => {
336+
it("passes a single signal through unchanged", async () => {
333337
let inner = fakeStore();
334-
let shouldThrow = true;
335-
let store = withRangeBatching(inner, {
336-
mergeOptions: () => {
337-
if (shouldThrow) throw new Error("bad merge");
338-
return undefined;
339-
},
340-
});
338+
let store = withRangeBatching(inner);
339+
let ctl = new AbortController();
341340

342-
// First batch: mergeOptions throws, all requests should reject
343-
let results = await Promise.allSettled([
344-
store.getRange("/data/chunk", { offset: 0, length: 100 }),
345-
store.getRange("/data/chunk", { offset: 100, length: 100 }),
346-
]);
347-
expect(results[0].status).toBe("rejected");
348-
expect(results[1].status).toBe("rejected");
349-
expect((results[0] as PromiseRejectedResult).reason.message).toBe(
350-
"bad merge",
341+
await store.getRange(
342+
"/data/chunk",
343+
{ offset: 0, length: 100 },
344+
{ signal: ctl.signal },
351345
);
352-
353-
// Second batch: store should not be deadlocked
354-
shouldThrow = false;
355-
let r = await store.getRange("/data/chunk", { offset: 0, length: 100 });
356-
expect(r?.length).toBe(100);
357-
});
358-
359-
it("applies mergeOptions reducer across batched callers", async () => {
360-
interface TaggedOptions {
361-
tags: string[];
362-
}
363-
let inner = {
364-
get: vi.fn((_key: AbsolutePath, _opts?: TaggedOptions) =>
365-
Promise.resolve<Uint8Array | undefined>(new Uint8Array(0)),
366-
),
367-
getRange: vi.fn(
368-
(
369-
_key: AbsolutePath,
370-
range: RangeQuery,
371-
_options?: TaggedOptions,
372-
): Promise<Uint8Array | undefined> => {
373-
if ("suffixLength" in range) {
374-
return Promise.resolve(new Uint8Array(range.suffixLength));
375-
}
376-
return Promise.resolve(new Uint8Array(range.length));
377-
},
378-
),
379-
};
380-
let store = withRangeBatching(inner, {
381-
mergeOptions: (batch) => ({
382-
tags: batch.flatMap((o) => o?.tags ?? []),
383-
}),
384-
});
385-
386-
await Promise.all([
387-
store.getRange(
388-
"/data/chunk",
389-
{ offset: 0, length: 100 },
390-
{ tags: ["a"] },
391-
),
392-
store.getRange(
393-
"/data/chunk",
394-
{ offset: 100, length: 100 },
395-
{ tags: ["b"] },
396-
),
397-
]);
398-
399-
expect(inner.getRange.mock.calls[0][2]?.tags).toEqual(["a", "b"]);
346+
expect(inner.getRange.mock.calls[0][2]?.signal).toBe(ctl.signal);
400347
});
401348
});
402349

packages/zarrita/__tests__/get-types.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ describe("Chunk type per union dtype", () => {
162162
expectType(chunk.data).toMatchInlineSnapshot(
163163
`
164164
| string[]
165-
| unknown[]
166165
| Int8Array<ArrayBufferLike>
167166
| Int16Array<ArrayBufferLike>
168167
| Int32Array<ArrayBufferLike>
@@ -177,6 +176,7 @@ describe("Chunk type per union dtype", () => {
177176
| zarr.BoolArray<ArrayBufferLike>
178177
| zarr.UnicodeStringArray<ArrayBufferLike>
179178
| zarr.ByteStringArray<ArrayBufferLike>
179+
| unknown[]
180180
`,
181181
);
182182
});

packages/zarrita/__tests__/middleware-types.test.ts

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,13 @@ describe("extendStore", () => {
1010
expectType(store).toMatchInlineSnapshot(`Promise<zarr.FetchStore>`);
1111
});
1212

13-
test("direct form in pipeline infers store options", () => {
13+
test("direct form in pipeline", () => {
1414
let store = zarr.extendStore(new zarr.FetchStore(""), (s) =>
15-
zarr.withRangeBatching(s, {
16-
mergeOptions: (batch) => {
17-
expectType(batch).toMatchInlineSnapshot(
18-
`ReadonlyArray<RequestInit | undefined>`,
19-
);
20-
return batch[0];
21-
},
22-
}),
15+
zarr.withRangeBatching(s),
2316
);
2417
expectType(store).toMatchInlineSnapshot(`
2518
Promise<
26-
Required<AsyncReadable<RequestInit>> & {
19+
Required<AsyncReadable> & {
2720
stats: Readonly<zarr.RangeBatchingStats>;
2821
url: string | URL;
2922
}
@@ -36,12 +29,12 @@ describe("extendStore", () => {
3629
return zarr.extendStore(
3730
new zarr.FetchStore(""),
3831
zarr.withConsolidation,
39-
(s) => zarr.withRangeBatching(s, { mergeOptions: (batch) => batch[0] }),
32+
(s) => zarr.withRangeBatching(s),
4033
);
4134
}
4235
expectType(check).toMatchInlineSnapshot(`
4336
() => Promise<
44-
Required<AsyncReadable<RequestInit>> & {
37+
Required<AsyncReadable> & {
4538
stats: Readonly<zarr.RangeBatchingStats>;
4639
url: string | URL;
4740
contents: () => { path: AbsolutePath; kind: "array" | "group" }[];
@@ -68,46 +61,15 @@ describe("defineStoreMiddleware", () => {
6861
let store = withCustom(new zarr.FetchStore(""), { flag: true });
6962
expectType(store).toMatchInlineSnapshot(
7063
`
71-
Required<AsyncReadable<RequestInit>> & {
64+
Required<AsyncReadable> & {
7265
url: string | URL;
7366
hello: () => string;
7467
}
7568
`,
7669
);
7770
});
7871

79-
test("generic: store Options flows into opts parameter", () => {
80-
interface ThingOptions<O> {
81-
storeOptions?: O;
82-
retries?: number;
83-
}
84-
85-
let withThing = defineStoreMiddleware(
86-
<O>(store: AsyncReadable<O>, opts: ThingOptions<O>) => {
87-
return {
88-
async get(key: AbsolutePath, options?: O) {
89-
return store.get(key, options ?? opts.storeOptions);
90-
},
91-
retries: opts.retries ?? 3,
92-
};
93-
},
94-
);
95-
96-
let store = withThing(new zarr.FetchStore(""), {
97-
storeOptions: { signal: AbortSignal.timeout(1000) },
98-
retries: 5,
99-
});
100-
expectType(store).toMatchInlineSnapshot(
101-
`
102-
Required<AsyncReadable<RequestInit>> & {
103-
url: string | URL;
104-
retries: number;
105-
}
106-
`,
107-
);
108-
});
109-
110-
test("chaining preserves Options through wrappers", () => {
72+
test("chaining preserves store through wrappers", () => {
11173
let withA = defineStoreMiddleware(
11274
(store: AsyncReadable, _opts: { a: number }) => {
11375
return {
@@ -134,7 +96,7 @@ describe("defineStoreMiddleware", () => {
13496
);
13597
let store = withB(withA(new zarr.FetchStore(""), { a: 1 }), { b: "x" });
13698
expectType(store).toMatchInlineSnapshot(`
137-
Required<AsyncReadable<RequestInit>> & {
99+
Required<AsyncReadable> & {
138100
url: string | URL;
139101
methodB: () => string;
140102
methodA: () => number;

packages/zarrita/src/codecs/sharding.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
import type { Readable } from "@zarrita/storage";
1+
import type { GetOptions, Readable } from "@zarrita/storage";
22
import { createCodecPipeline } from "../codecs.js";
33
import type { Location } from "../hierarchy.js";
44
import type { Chunk } from "../metadata.js";
55
import { assert, type ShardingCodecMetadata } from "../util.js";
66

77
const MAX_BIG_UINT = 18446744073709551615n;
88

9-
export function createShardedChunkGetter<Store extends Readable>(
10-
location: Location<Store>,
9+
export function createShardedChunkGetter(
10+
location: Location<Readable>,
1111
shardShape: number[],
1212
encodeShardKey: (coord: number[]) => string,
1313
shardingConfig: ShardingCodecMetadata["configuration"],
@@ -24,10 +24,7 @@ export function createShardedChunkGetter<Store extends Readable>(
2424
let checksumSize = 4;
2525
let indexSize = 16 * indexShape.reduce((a, b) => a * b, 1);
2626
let cache: Record<string, Promise<Chunk<"uint64"> | null>> = {};
27-
return async (
28-
chunkCoord: number[],
29-
options?: Parameters<Store["get"]>[1],
30-
) => {
27+
return async (chunkCoord: number[], options?: GetOptions) => {
3128
let shardCoord = chunkCoord.map((d, i) => Math.floor(d / indexShape[i]));
3229
let shardPath = location.resolve(encodeShardKey(shardCoord)).path;
3330

0 commit comments

Comments
 (0)