Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
cdcc9ff
refactor!: Make the StorageManager more like StorageInstanceManager f…
janbuchar Apr 20, 2026
02c89b7
ServiceLocator.storageInstanceManager is class-level
janbuchar Apr 20, 2026
afc367c
Simplify StorageInstanceManager
janbuchar Apr 20, 2026
e6191c3
Fix type
janbuchar Apr 22, 2026
f8ae6d5
Fix cache eviction (See https://github.com/apify/crawlee-python/pull/…
janbuchar Apr 23, 2026
72b58e9
Minor nits
janbuchar Apr 23, 2026
fe46682
Do not initialize RequestQueue repeatedly
janbuchar Apr 23, 2026
481abe9
Handle missing storage client cache key better
janbuchar Apr 23, 2026
f1106f0
Clean up unused locks
janbuchar Apr 23, 2026
4f69d63
Refactor to match crawlee-python better and avoid race conditions
janbuchar Apr 23, 2026
a238fb9
Improve JSDoc wording
janbuchar Apr 29, 2026
5a78768
Improve interface of client creation methods
janbuchar Apr 29, 2026
1ac08cf
Fix build
janbuchar Apr 30, 2026
40ccbd5
Merge remote-tracking branch 'origin/v4' into refactor-storage-manager
janbuchar May 4, 2026
6116429
Move StorageOpenOptions out of the storage instance manager file
janbuchar May 4, 2026
7868f7d
Clean up the storage_instance_manager file
janbuchar May 4, 2026
0b14900
Do not pass implicit configuration to storage frontend constructors
janbuchar May 4, 2026
921dadd
Rename type variable
janbuchar May 4, 2026
09cfd75
Do not check for cached storage subclient instances
janbuchar May 7, 2026
673b586
Remove unnecessary ServiceLocator.clearStorageManagerCache
janbuchar May 7, 2026
ebac3ad
Correctly check for pre-existing storages
janbuchar May 7, 2026
3cda48c
Hide ugly workarounds inside memory storage
janbuchar May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,8 @@ export class BasicCrawler<
// we need to purge the default RQ to allow processing the same requests again - this is important so users can
// pass in failed requests back to the `crawler.run()`, otherwise they would be considered as handled and
// ignored - as a failed requests is still handled.
if (this.requestQueue?.name === 'default' && purgeRequestQueue) {
const isDefaultQueue = this.requestQueue?.name === 'default';
if (isDefaultQueue && purgeRequestQueue && this.requestQueue) {
await this.requestQueue.drop();
this.requestQueue = await this._getRequestQueue();
this.requestManager = undefined;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/crawlers/crawler_commons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { Session } from '../session_pool/session.js';
import type { Dataset } from '../storages/dataset.js';
import { KeyValueStore, type RecordOptions } from '../storages/key_value_store.js';
import type { RequestQueueOperationOptions } from '../storages/request_provider.js';
import type { StorageIdentifier } from '../storages/storage_manager.js';
import type { StorageIdentifier } from '../storages/storage_instance_manager.js';

/** @internal */
export type IsAny<T> = 0 extends 1 & T ? true : false;
Expand Down
53 changes: 13 additions & 40 deletions packages/core/src/service_locator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import type { EventManager } from './events/event_manager.js';
import { LocalEventManager } from './events/local_event_manager.js';
import type { CrawleeLogger } from './log.js';
import { ApifyLogAdapter } from './log.js';
import type { IStorage, StorageManager } from './storages/storage_manager.js';
import type { Constructor } from './typedefs.js';
import { StorageInstanceManager } from './storages/storage_instance_manager.js';

interface ServiceLocatorInterface {
/**
Expand Down Expand Up @@ -77,15 +76,10 @@ interface ServiceLocatorInterface {
*/
getChildLog(prefix: string): CrawleeLogger;

getStorageManager(constructor: Constructor<IStorage>): StorageManager | undefined;

setStorageManager(constructor: Constructor<IStorage>, storageManager: StorageManager): void;

/**
* Clears all storage manager caches.
* @internal
* Get the storage instance manager (shared across all storage types).
*/
clearStorageManagerCache(): void;
getStorageInstanceManager(): StorageInstanceManager;

/**
* Resets the service locator to its initial state.
Expand Down Expand Up @@ -134,10 +128,11 @@ export class ServiceLocator implements ServiceLocatorInterface {
private logger?: CrawleeLogger;

/**
* Storage managers for Dataset, KeyValueStore, and RequestQueue.
* Manages caching and lifecycle of storage instances.
* Unified storage instance manager for Dataset, KeyValueStore, and RequestQueue.
* Shared across all ServiceLocator instances (global singleton), matching crawlee-python.
* Per-crawler isolation is achieved via `clientCacheKey`, not separate manager instances.
*/
private storageManagers = new Map<Constructor<IStorage>, StorageManager>();
private static storageInstanceManager?: StorageInstanceManager;

/**
* Creates a new ServiceLocator instance.
Expand Down Expand Up @@ -264,42 +259,20 @@ export class ServiceLocator implements ServiceLocatorInterface {
return this.getLogger().child({ prefix });
}

getStorageManager(constructor: Constructor<IStorage>): StorageManager | undefined {
return this.storageManagers.get(constructor);
}

setStorageManager(constructor: Constructor<IStorage>, storageManager: StorageManager): void {
if (this.storageManagers.has(constructor)) {
throw new ServiceConflictError(
`StorageManager(${constructor.name})`,
storageManager,
this.storageManagers.get(constructor),
);
getStorageInstanceManager(): StorageInstanceManager {
if (!ServiceLocator.storageInstanceManager) {
ServiceLocator.storageInstanceManager = new StorageInstanceManager();
}

this.storageManagers.set(constructor, storageManager);
}

clearStorageManagerCache(): void {
this.storageManagers.forEach((manager) => {
// KeyValueStore has a clearCache method on its instances
// TODO this uses fragile string matching and `any` casts into private fields - remove as part of
// https://github.com/apify/crawlee/issues/3075 (Storage instance management will be reworked significantly)
if ((manager as any).name === 'KeyValueStore') {
(manager as any).cache?.forEach((item: any) => {
item.clearCache?.();
});
}
});
this.storageManagers.clear();
return ServiceLocator.storageInstanceManager;
}

reset(): void {
this.configuration = undefined;
this.eventManager = undefined;
this.storageClient = undefined;
this.logger = undefined;
this.clearStorageManagerCache();
ServiceLocator.storageInstanceManager?.clearCache();
ServiceLocator.storageInstanceManager = undefined;
}
}

Expand Down
23 changes: 14 additions & 9 deletions packages/core/src/storages/dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import { serviceLocator } from '../service_locator.js';
import type { Awaitable } from '../typedefs.js';
import { checkStorageAccess } from './access_checking.js';
import { KeyValueStore } from './key_value_store.js';
import type { StorageIdentifier, StorageManagerOptions } from './storage_manager.js';
import { StorageManager } from './storage_manager.js';
import type { StorageIdentifier } from './storage_instance_manager.js';
import type { StorageOpenOptions } from './utils.js';
import { resolveStorageIdentifier } from './storage_instance_manager.js';
import { purgeDefaultStorages } from './utils.js';

/** @internal */
Expand Down Expand Up @@ -681,8 +682,7 @@ export class Dataset<Data extends Dictionary = Dictionary> {
checkStorageAccess();

await this.client.delete();
const manager = StorageManager.getManager(Dataset);
manager.closeStorage(this);
serviceLocator.getStorageInstanceManager().removeFromCache(this);
}

/**
Expand All @@ -702,7 +702,7 @@ export class Dataset<Data extends Dictionary = Dictionary> {
*/
static async open<Data extends Dictionary = Dictionary>(
identifier?: string | StorageIdentifier | null,
options: StorageManagerOptions = {},
options: StorageOpenOptions = {},
): Promise<Dataset<Data>> {
checkStorageAccess();

Expand All @@ -715,13 +715,18 @@ export class Dataset<Data extends Dictionary = Dictionary> {
);

options.config ??= Configuration.getGlobalConfig();
options.storageClient ??= serviceLocator.getStorageClient();

await purgeDefaultStorages({ onlyPurgeOnce: true, client: options.storageClient, config: options.config });
const client = options.storageClient ?? serviceLocator.getStorageClient();

const manager = StorageManager.getManager<Dataset<Data>>(this);
await purgeDefaultStorages({ onlyPurgeOnce: true, client, config: options.config });

return manager.openStorage(identifier, options.storageClient);
const resolved = await resolveStorageIdentifier(identifier, client, 'Dataset');

return serviceLocator.getStorageInstanceManager().openStorage<Dataset<Data>>(this, {
...resolved,
clientOpener: () => client.createDatasetClient(resolved),
clientCacheKey: client.getStorageClientCacheKey?.() ?? client.constructor.name,
});
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/storages/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export * from './request_provider.js';
export { RequestQueueV1 } from './request_queue.js';
export { RequestQueue } from './request_queue_v2.js';
export { RequestQueue as RequestQueueV2 } from './request_queue_v2.js';
export * from './storage_manager.js';
export * from './storage_instance_manager.js';
export * from './utils.js';
export * from './access_checking.js';
export * from './sitemap_request_list.js';
Expand Down
22 changes: 13 additions & 9 deletions packages/core/src/storages/key_value_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import { Configuration } from '../configuration.js';
import { serviceLocator } from '../service_locator.js';
import type { Awaitable } from '../typedefs.js';
import { checkStorageAccess } from './access_checking.js';
import type { StorageIdentifier, StorageManagerOptions } from './storage_manager.js';
import { StorageManager } from './storage_manager.js';
import type { StorageIdentifier } from './storage_instance_manager.js';
import type { StorageOpenOptions } from './utils.js';
import { resolveStorageIdentifier } from './storage_instance_manager.js';
import { purgeDefaultStorages } from './utils.js';

/**
Expand Down Expand Up @@ -417,8 +418,7 @@ export class KeyValueStore {
checkStorageAccess();

await this.client.delete();
const manager = StorageManager.getManager(KeyValueStore);
manager.closeStorage(this);
serviceLocator.getStorageInstanceManager().removeFromCache(this);
}

/** @internal */
Expand Down Expand Up @@ -599,7 +599,7 @@ export class KeyValueStore {
*/
static async open(
identifier?: string | StorageIdentifier | null,
options: StorageManagerOptions = {},
options: StorageOpenOptions = {},
): Promise<KeyValueStore> {
checkStorageAccess();

Expand All @@ -612,13 +612,17 @@ export class KeyValueStore {
);

options.config ??= Configuration.getGlobalConfig();
options.storageClient ??= serviceLocator.getStorageClient();
const client = options.storageClient ?? serviceLocator.getStorageClient();

await purgeDefaultStorages({ onlyPurgeOnce: true, client: options.storageClient, config: options.config });
await purgeDefaultStorages({ onlyPurgeOnce: true, client, config: options.config });

const manager = StorageManager.getManager(this);
const resolved = await resolveStorageIdentifier(identifier, client, 'KeyValueStore');

return manager.openStorage(identifier, options.storageClient);
return serviceLocator.getStorageInstanceManager().openStorage<KeyValueStore>(this, {
...resolved,
clientOpener: () => client.createKeyValueStoreClient(resolved),
clientCacheKey: client.getStorageClientCacheKey?.() ?? client.constructor.name,
});
}

/**
Expand Down
61 changes: 36 additions & 25 deletions packages/core/src/storages/request_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import type { ProxyConfiguration } from '../proxy_configuration.js';
import type { InternalSource, RequestOptions, Source } from '../request.js';
import { Request } from '../request.js';
import { serviceLocator } from '../service_locator.js';
import type { Constructor } from '../typedefs.js';
import { checkStorageAccess } from './access_checking.js';
import type { IStorage, StorageIdentifier, StorageManagerOptions } from './storage_manager.js';
import { StorageManager } from './storage_manager.js';
import type { IStorage, StorageIdentifier } from './storage_instance_manager.js';
import type { StorageOpenOptions } from './utils.js';
import { resolveStorageIdentifier } from './storage_instance_manager.js';
import { getRequestId, purgeDefaultStorages, QUERY_HEAD_MIN_LENGTH } from './utils.js';

export type RequestsLike = AsyncIterable<Source | string> | Iterable<Source | string> | (Source | string)[];
Expand Down Expand Up @@ -120,6 +120,7 @@ export abstract class RequestProvider implements IStorage, IRequestManager {

private initialCount = 0;
private initialHandledCount = 0; // We track this separately from `assumedHandledCount` which is used non-trivially by RequestQueueV1
private isInitialized = false;

protected queueHeadIds = new ListDictionary<string>();
protected requestCache: LruCache<RequestLruItem>;
Expand All @@ -140,7 +141,7 @@ export abstract class RequestProvider implements IStorage, IRequestManager {

constructor(
options: InternalRequestProviderOptions,
protected readonly config: Configuration,
protected readonly config: Configuration = Configuration.getGlobalConfig(),
) {
this.id = options.id;
this.name = options.name;
Expand Down Expand Up @@ -722,8 +723,7 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
checkStorageAccess();

await this.client.delete();
const manager = StorageManager.getManager(this.constructor as Constructor<IStorage>);
manager.closeStorage(this);
serviceLocator.getStorageInstanceManager().removeFromCache(this);
}

/**
Expand Down Expand Up @@ -861,7 +861,7 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
*/
static async open(
identifier?: string | StorageIdentifier | null,
options: StorageManagerOptions = {},
options: StorageOpenOptions = {},
): Promise<RequestProvider> {
checkStorageAccess();

Expand All @@ -875,30 +875,41 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
}),
);

options.storageClient ??= serviceLocator.getStorageClient();
const client = options.storageClient ?? serviceLocator.getStorageClient();
const config = options.config ?? serviceLocator.getConfiguration();

await purgeDefaultStorages({ onlyPurgeOnce: true, client: options.storageClient, config: options.config });
await purgeDefaultStorages({ onlyPurgeOnce: true, client, config });

const manager = StorageManager.getManager(this as typeof BuiltRequestProvider);
const queue = await manager.openStorage(identifier, options.storageClient);
const resolved = await resolveStorageIdentifier(identifier, client, 'RequestQueue');

const queue = await serviceLocator
.getStorageInstanceManager()
.openStorage<RequestProvider>(this as typeof BuiltRequestProvider, {
...resolved,
clientOpener: () => client.createRequestQueueClient(resolved),
clientCacheKey: client.getStorageClientCacheKey?.() ?? client.constructor.name,
});
queue.proxyConfiguration = options.proxyConfiguration;
queue.httpClient = options.httpClient;

// Re-create the request queue client with clientKey and timeoutSecs so that
// request locking works correctly for API-backed implementations.
// TODO: clientKey/timeoutSecs are Apify-platform concerns and should eventually be pushed
// down into the Apify SDK's client implementation, aligning with crawlee-python's approach
// where locking is handled internally by the client (see crawlee-python PR #1194).
queue.client = await options.storageClient.createRequestQueueClient({
id: queue.id,
clientKey: queue.clientKey,
timeoutSecs: queue.timeoutSecs,
});
if (!queue.isInitialized) {
// Re-create the request queue client with clientKey and timeoutSecs so that
// request locking works correctly for API-backed implementations.
// TODO: clientKey/timeoutSecs are Apify-platform concerns and should eventually be pushed
// down into the Apify SDK's client implementation, aligning with crawlee-python's approach
// where locking is handled internally by the client (see crawlee-python PR #1194).
queue.client = await client.createRequestQueueClient({
id: queue.id,
clientKey: queue.clientKey,
timeoutSecs: queue.timeoutSecs,
});

const queueInfo = await queue.client.getMetadata();
const queueInfo = await queue.client.getMetadata();

queue.initialCount = queueInfo.totalRequestCount;
queue.initialHandledCount = queueInfo.handledRequestCount;
queue.httpClient = options.httpClient;
queue.initialCount = queueInfo.totalRequestCount;
queue.initialHandledCount = queueInfo.handledRequestCount;
queue.isInitialized = true;
}

return queue;
}
Expand Down
Loading
Loading