Skip to content

Commit 8a329b3

Browse files
kylebarronclaude
andauthored
fix(geotiff): revert ConcurrencyLimiter to a chunkd SourceMiddleware (#572)
chunkd#1697 shipped (@chunkd/source 11.4.1, @chunkd/middleware 11.3.1), so SourceView now forwards the request signal to middleware and SourceChunk forwards it on multi-chunk sub-requests. That removes the reason for the LimitedSource source-wrapper workaround (#557): a middleware can now observe an abort, so a request whose caller aborts while queued for a slot is dropped before any network I/O. Replace the internal LimitedSource with LimiterMiddleware again, composed into the header SourceView *after* SourceChunk/SourceCache (so cache hits short-circuit and never burn a slot) and into a tiny SourceView wrapping the data source. Bump the @chunkd/source, @chunkd/source-http, and @chunkd/middleware floors to the fixed releases. Internal-only: GeoTIFF.fromUrl options, ConcurrencyLimiter, Priority, and PerOriginSemaphore are unchanged. The integration test that aborts a queued header read through the real fromUrl -> SourceView -> limiter path is unchanged and still passes, proving the signal now reaches the middleware end to end. Closes #565. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6fbeb5d commit 8a329b3

6 files changed

Lines changed: 163 additions & 177 deletions

File tree

dev-docs/specs/2026-05-19-concurrency-limiter-design.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ class LimiterMiddleware implements SourceMiddleware {
112112

113113
`LimiterMiddleware` is also internal: the limiter is wired by `GeoTIFF.fromUrl` (where chunkd's `Source` / `SourceView` types are already in scope), so callers don't need to compose middleware themselves.
114114

115-
> **Implementation note (temporary).** The `SourceMiddleware` shape above is the intended design, but it doesn't work yet: chunkd's `SourceView` doesn't forward the request `signal` to middleware, so a middleware can't observe an abort (only the underlying source receives the read options via `SourceView`'s terminal handler). Until that's fixed upstream ([chunkd#1697](https://github.com/blacha/chunkd/pull/1697)), the limiter is implemented as an internal **source wrapper**`LimitedSource` — composed *beneath* `SourceChunk` / `SourceCache` (as the `SourceView`'s source) rather than as a middleware on top. Cache hits still short-circuit in `SourceCache` before reaching it. This is internal-only; the public API is unchanged either way. Revert to `LimiterMiddleware` once chunkd ships the fix — tracked in [#565](https://github.com/developmentseed/deck.gl-raster/issues/565).
116-
117115
## Integration
118116

119117
### `@developmentseed/geotiff`

packages/geotiff/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@
5050
"extends": "../../package.json"
5151
},
5252
"dependencies": {
53-
"@chunkd/middleware": "^11.3.0",
54-
"@chunkd/source": "^11.4.0",
55-
"@chunkd/source-http": "^11.4.0",
53+
"@chunkd/middleware": "^11.3.1",
54+
"@chunkd/source": "^11.4.1",
55+
"@chunkd/source-http": "^11.4.1",
5656
"@chunkd/source-memory": "^11.2.0",
5757
"@cogeotiff/core": "^9.5.0",
5858
"@developmentseed/affine": "workspace:^",

packages/geotiff/src/geotiff.ts

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { parseGDALMetadata } from "./gdal-metadata.js";
1313
import type { CachedTags, GeoKeyDirectory } from "./ifd.js";
1414
import { extractGeoKeyDirectory, prefetchTags } from "./ifd.js";
1515
import type { ConcurrencyLimiter, Priority } from "./limiter.js";
16-
import { LimitedSource } from "./limiter.js";
16+
import { LimiterMiddleware } from "./limiter.js";
1717
import { Overview } from "./overview.js";
1818
import type { DecoderPool } from "./pool/pool.js";
1919
import type { Tile } from "./tile.js";
@@ -327,30 +327,32 @@ export class GeoTIFF {
327327
// unbounded length. Remove once the upstream fix lands.
328328
source.metadata = { size: Number.POSITIVE_INFINITY };
329329

330-
// When a limiter is supplied, gate every network read through it by
331-
// wrapping the raw source. The header `SourceView` composes SourceChunk +
332-
// SourceCache *on top* of this wrapped source, so a cache hit
333-
// short-circuits in SourceCache and never reaches — never burns a slot on
334-
// — the limiter; only reads that escape the cache (and every data read,
335-
// which bypasses the cache) are gated. The same wrapped source backs both
336-
// the header view and the data source, so both share one per-origin pool.
337-
//
338-
// Gating here as a source wrapper rather than a chunkd SourceMiddleware is
339-
// a workaround for chunkd not forwarding the abort signal to middleware;
340-
// see LimitedSource. Once that's fixed upstream this can become a
341-
// middleware again. Tracked in
342-
// https://github.com/developmentseed/deck.gl-raster/issues/565
343-
const limitedSource = concurrencyLimiter
344-
? new LimitedSource(source, { limiter: concurrencyLimiter, getPriority })
345-
: source;
346-
347-
const view = new SourceView(limitedSource, [
330+
// When a limiter is supplied, slot a LimiterMiddleware into both
331+
// sources' middleware stacks. On the header source, it sits *after*
332+
// SourceChunk + SourceCache so a cache hit short-circuits and never
333+
// consumes a slot — only network reads that escape the cache are gated.
334+
// On the data source (no caching), every fetch is gated. Both stacks
335+
// share one LimiterMiddleware instance, so they share one per-origin pool.
336+
const limiter = concurrencyLimiter
337+
? new LimiterMiddleware({
338+
url: new URL(url),
339+
limiter: concurrencyLimiter,
340+
getPriority,
341+
})
342+
: null;
343+
344+
const view = new SourceView(source, [
348345
new SourceChunk({ size: chunkSize }),
349346
new SourceCache({ size: cacheSize }),
347+
...(limiter ? [limiter] : []),
350348
]);
351349

350+
const dataSource: Pick<Source, "fetch"> = limiter
351+
? new SourceView(source, [limiter])
352+
: source;
353+
352354
return await GeoTIFF.open({
353-
dataSource: limitedSource,
355+
dataSource,
354356
headerSource: view,
355357
signal,
356358
debug,

packages/geotiff/src/limiter.ts

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import type { Source, SourceMetadata } from "@chunkd/source";
1+
import type {
2+
SourceCallback,
3+
SourceMiddleware,
4+
SourceRequest,
5+
} from "@chunkd/source";
26

37
/**
48
* Numeric priority used to order waiters in a {@link Semaphore}'s queue. Lower
@@ -63,7 +67,7 @@ interface Waiter {
6367

6468
/**
6569
* Counting semaphore with abort-aware acquire and dynamic priority. Internal
66-
* primitive used by {@link PerOriginSemaphore} and {@link LimitedSource}.
70+
* primitive used by {@link PerOriginSemaphore} and {@link LimiterMiddleware}.
6771
*
6872
* Hands out up to `maxRequests` concurrent slots. Further `acquire()`s queue.
6973
* On every slot-open, the queue is searched for the lowest-priority waiter
@@ -222,91 +226,66 @@ export class PerOriginSemaphore implements ConcurrencyLimiter {
222226
}
223227
}
224228

225-
/** Options for {@link LimitedSource}. */
226-
interface LimitedSourceOptions {
227-
/** The {@link ConcurrencyLimiter} to gate through. The wrapped source's
228-
* own `url` is passed to `limiter.acquire` for per-origin routing. */
229+
/** Options for {@link LimiterMiddleware}. */
230+
interface LimiterMiddlewareOptions {
231+
/** The URL the wrapped source is reading from. Passed to
232+
* `limiter.acquire(url, signal?)` on every fetch — the limiter uses it for
233+
* per-origin routing. */
234+
url: URL;
235+
/** The {@link ConcurrencyLimiter} to gate through. */
229236
limiter: ConcurrencyLimiter;
230-
/** Optional dynamic priority for every fetch through this source. The
237+
/** Optional dynamic priority for every fetch through this middleware. The
231238
* limiter re-invokes this callback on each slot-open, so closures over
232239
* dynamic state (e.g. layer viewport center) re-sort the queue when that
233240
* state changes. Lower = serviced sooner. */
234241
getPriority?: () => Priority;
235242
}
236243

237244
/**
238-
* Wraps a {@link Source} so every `fetch` holds a {@link ConcurrencyLimiter}
239-
* slot for its duration — acquiring before the read, releasing when it settles
240-
* (resolve or reject). Forwards the read's `signal` to `limiter.acquire`, so a
241-
* request whose caller aborts while it is still queued for a slot is dropped
242-
* before any network I/O fires.
245+
* chunkd middleware that holds a {@link ConcurrencyLimiter} slot for the
246+
* duration of each underlying `fetch` — releasing on resolve, on reject, and
247+
* never otherwise interfering. Forwards the request's `signal` to
248+
* `limiter.acquire`, so if the caller aborts while the call is queued the
249+
* request is dropped before any network I/O fires.
243250
*
244-
* Compose this *beneath* `SourceChunk` / `SourceCache` (i.e. as the
245-
* `SourceView`'s underlying source), so a cache hit short-circuits in
246-
* `SourceCache` and never reaches — never burns a slot on — the limiter:
251+
* Composed into a {@link SourceView}'s middleware list alongside the chunkd
252+
* middlewares (`SourceChunk`, `SourceCache`, …). Place it after caching so
253+
* cache hits don't burn a slot.
247254
*
248255
* @example
249256
* ```ts
250257
* import { SourceView } from "@chunkd/source";
251258
* import { SourceCache, SourceChunk } from "@chunkd/middleware";
252259
*
253-
* const limited = new LimitedSource(source, { limiter });
254-
* const view = new SourceView(limited, [
260+
* const view = new SourceView(source, [
255261
* new SourceChunk({ size: 64 * 1024 }),
256262
* new SourceCache({ size: 8 * 1024 * 1024 }),
263+
* new LimiterMiddleware({ url, limiter }),
257264
* ]);
258265
* ```
259266
*
260-
* **Why a source wrapper and not a chunkd `SourceMiddleware`** (which would
261-
* compose more naturally): chunkd's `SourceView` does not forward the request
262-
* `signal` to its middleware, so a middleware cannot observe an abort — only
263-
* the underlying source receives the read options (incl. `signal`) via
264-
* `SourceView`'s terminal handler. Wrapping the source is therefore the only
265-
* layer that can drop a queued request on abort. Revert to a `SourceMiddleware`
266-
* once chunkd forwards the signal (https://github.com/blacha/chunkd/pull/1697);
267-
* tracked in https://github.com/developmentseed/deck.gl-raster/issues/565.
268-
*
269267
* @internal
270268
*/
271-
export class LimitedSource implements Source {
272-
private readonly source: Source;
269+
export class LimiterMiddleware implements SourceMiddleware {
270+
readonly name = "limiter";
271+
private readonly url: URL;
273272
private readonly limiter: ConcurrencyLimiter;
274273
private readonly getPriority?: () => Priority;
275274

276-
constructor(source: Source, opts: LimitedSourceOptions) {
277-
this.source = source;
275+
constructor(opts: LimiterMiddlewareOptions) {
276+
this.url = opts.url;
278277
this.limiter = opts.limiter;
279278
this.getPriority = opts.getPriority;
280279
}
281280

282-
get type(): string {
283-
return this.source.type;
284-
}
285-
286-
get url(): URL {
287-
return this.source.url;
288-
}
289-
290-
get metadata(): SourceMetadata | undefined {
291-
return this.source.metadata;
292-
}
293-
294-
head(options?: { signal: AbortSignal }): Promise<SourceMetadata> {
295-
return this.source.head(options);
296-
}
297-
298-
async fetch(
299-
offset: number,
300-
length?: number,
301-
options?: { signal: AbortSignal },
302-
): Promise<ArrayBuffer> {
281+
async fetch(req: SourceRequest, next: SourceCallback): Promise<ArrayBuffer> {
303282
const release = await this.limiter.acquire(
304-
this.source.url,
305-
options?.signal,
283+
this.url,
284+
req.signal,
306285
this.getPriority,
307286
);
308287
try {
309-
return await this.source.fetch(offset, length, options);
288+
return await next(req);
310289
} finally {
311290
release();
312291
}

0 commit comments

Comments
 (0)