From e60a71e7025ae1fcc6720daaf294dc724acc45b7 Mon Sep 17 00:00:00 2001 From: Rohan Pota Date: Fri, 15 May 2026 10:01:29 -0400 Subject: [PATCH] ASP driver changes --- package-lock.json | 1 - poc_asp.ts | 179 ++++ src/index.ts | 16 + .../create_stream_processor.ts | 46 ++ .../drop_stream_processor.ts | 28 + .../get_more_sample_stream_processor.ts | 35 + .../stream_processing/get_stream_processor.ts | 28 + .../get_stream_processor_stats.ts | 37 + .../start_sample_stream_processor.ts | 31 + .../start_stream_processor.ts | 48 ++ .../stop_stream_processor.ts | 28 + src/stream_processing/index.ts | 18 + src/stream_processing/sample_cursor.ts | 104 +++ .../stream_processing_client.ts | 88 ++ src/stream_processing/stream_processors.ts | 281 +++++++ src/stream_processing/types.ts | 154 ++++ .../atlas_stream_processing.prose.test.ts | 242 ++++++ test/mongodb_all.ts | 9 + .../stream_processing.test.ts | 774 ++++++++++++++++++ 19 files changed, 2146 insertions(+), 1 deletion(-) create mode 100644 poc_asp.ts create mode 100644 src/operations/stream_processing/create_stream_processor.ts create mode 100644 src/operations/stream_processing/drop_stream_processor.ts create mode 100644 src/operations/stream_processing/get_more_sample_stream_processor.ts create mode 100644 src/operations/stream_processing/get_stream_processor.ts create mode 100644 src/operations/stream_processing/get_stream_processor_stats.ts create mode 100644 src/operations/stream_processing/start_sample_stream_processor.ts create mode 100644 src/operations/stream_processing/start_stream_processor.ts create mode 100644 src/operations/stream_processing/stop_stream_processor.ts create mode 100644 src/stream_processing/index.ts create mode 100644 src/stream_processing/sample_cursor.ts create mode 100644 src/stream_processing/stream_processing_client.ts create mode 100644 src/stream_processing/stream_processors.ts create mode 100644 src/stream_processing/types.ts create mode 100644 test/integration/atlas-stream-processing/atlas_stream_processing.prose.test.ts create mode 100644 test/unit/stream_processing/stream_processing.test.ts diff --git a/package-lock.json b/package-lock.json index d584873227e..3dbae71a69b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -34,7 +34,6 @@ "@typescript-eslint/eslint-plugin": "^8.46.3", "@typescript-eslint/parser": "^8.31.1", "aws4": "^1.13.2", - "baseline-browser-mapping": "^2.10.0", "chai": "^4.4.1", "chai-subset": "^1.6.0", "chalk": "^4.1.2", diff --git a/poc_asp.ts b/poc_asp.ts new file mode 100644 index 00000000000..cccda3da65f --- /dev/null +++ b/poc_asp.ts @@ -0,0 +1,179 @@ +/** + * POC: Atlas Stream Processing — create / start / stop / drop a stream processor. + * + * Run with: + * npx ts-node --skipProject poc_asp.ts + * + * Pipeline used: + * $source → sample_stream_solar + * $emit → __testLog + */ + +import { MongoServerError } from './src/error'; +import { StreamProcessingClient } from './src/stream_processing/stream_processing_client'; + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +const WORKSPACE_URI = + 'mongodb://atlas-stream-69ed590869155100cecc8b33-lulzki.virginia-usa.a.query.mongodb-dev.net/'; +const USERNAME = 'streams'; +const PASSWORD = 'letsdostreaming123'; + +const PROCESSOR_NAME = 'simpletestSP_node'; + +const PIPELINE = [ + { + $source: { + connectionName: 'sample_stream_solar' + } + }, + { + $emit: { + connectionName: '__testLog' + } + } +]; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const sleep = (ms: number): Promise => new Promise(resolve => setTimeout(resolve, ms)); + +// --------------------------------------------------------------------------- +// POC steps +// --------------------------------------------------------------------------- + +async function main(): Promise { + const client = new StreamProcessingClient(WORKSPACE_URI, { + auth: { username: USERNAME, password: PASSWORD } + }); + + try { + const sps = client.streamProcessors(); + + // ------------------------------------------------------------------ + // 1. Create + // ------------------------------------------------------------------ + console.log(`\n[1] Creating processor '${PROCESSOR_NAME}' ...`); + try { + await sps.create(PROCESSOR_NAME, PIPELINE); + console.log(' Created OK'); + } catch (e) { + if (e instanceof MongoServerError) { + throw new Error(` Create failed (code ${e.code}): ${e.message}`); + } + throw e; + } + + // ------------------------------------------------------------------ + // 2. Inspect before starting + // ------------------------------------------------------------------ + console.log('\n[2] Getting info ...'); + let info = await sps.getInfo(PROCESSOR_NAME); + console.log(` state : ${info.state}`); + console.log(` pipelineVersion : ${info.pipelineVersion}`); + console.log(` hasStarted : ${info.hasStarted}`); + + // ------------------------------------------------------------------ + // 3. Start + // ------------------------------------------------------------------ + const proc = sps.get(PROCESSOR_NAME); + console.log('\n[3] Starting processor ...'); + try { + await proc.start(); + console.log(' Start command sent OK'); + } catch (e) { + if (e instanceof MongoServerError) { + throw new Error(` Start failed (code ${e.code}): ${e.message}`); + } + throw e; + } + + await sleep(2000); + + info = await sps.getInfo(PROCESSOR_NAME); + console.log(` state after start: ${info.state}`); + + // ------------------------------------------------------------------ + // 4. Stats + // ------------------------------------------------------------------ + console.log('\n[4] Fetching stats ...'); + try { + const rawStats = await proc.stats(); + console.dir(rawStats, { depth: null }); + } catch (e) { + if (e instanceof MongoServerError) { + console.log(` Stats unavailable (code ${e.code}): ${e.message}`); + } else { + throw e; + } + } + + // ------------------------------------------------------------------ + // 5. Sample (up to 5 docs) + // Note: breaking manually after N docs because the dev server does not + // signal cursor exhaustion with cursorId=0 as the spec requires. + // ------------------------------------------------------------------ + console.log('\n[5] Sampling up to 5 documents ...'); + try { + let count = 0; + for await (const doc of proc.sample()) { + console.log(` doc: ${JSON.stringify(doc)}`); + count += 1; + if (count >= 5) break; + } + console.log(` Sampled ${count} document(s)`); + } catch (e) { + if (e instanceof MongoServerError) { + console.log(` Sample unavailable (code ${e.code}): ${e.message}`); + } else { + throw e; + } + } + + // ------------------------------------------------------------------ + // 6. Stop + // ------------------------------------------------------------------ + console.log('\n[6] Stopping processor ...'); + try { + await proc.stop(); + console.log(' Stop command sent OK'); + } catch (e) { + if (e instanceof MongoServerError) { + throw new Error(` Stop failed (code ${e.code}): ${e.message}`); + } + throw e; + } + + await sleep(1000); + + info = await sps.getInfo(PROCESSOR_NAME); + console.log(` state after stop : ${info.state}`); + + // ------------------------------------------------------------------ + // 7. Drop (permanent — comment out to keep the processor alive) + // ------------------------------------------------------------------ + console.log('\n[7] Dropping processor ...'); + try { + await proc.drop(); + console.log(' Dropped OK'); + } catch (e) { + if (e instanceof MongoServerError) { + throw new Error(` Drop failed (code ${e.code}): ${e.message}`); + } + throw e; + } + + console.log('\nDone.'); + } finally { + await client.close(); + } +} + +main().catch(err => { + console.error(err); + process.exitCode = 1; +}); diff --git a/src/index.ts b/src/index.ts index 74803dfa2a0..a57c337de31 100644 --- a/src/index.ts +++ b/src/index.ts @@ -42,6 +42,7 @@ export { MongoBulkWriteError } from './bulk/common'; export { ClientEncryption } from './client-side-encryption/client_encryption'; +// Atlas Stream Processing (experimental) export { ChangeStreamCursor } from './cursor/change_stream_cursor'; export { ExplainableCursor } from './cursor/explainable_cursor'; export { @@ -87,6 +88,12 @@ export { MongoWriteConcernError, WriteConcernErrorResult } from './error'; +export { + SampleCursor, + StreamProcessingClient, + StreamProcessor, + StreamProcessors +} from './stream_processing'; export { AbstractCursor, // Actual driver classes exported @@ -612,6 +619,15 @@ export type { WithTransactionCallback } from './sessions'; export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort'; +export type { + CreateStreamProcessorOptions, + GetStreamProcessorSamplesOptions, + GetStreamProcessorSamplesResult, + GetStreamProcessorStatsOptions, + StartStreamProcessorOptions, + StreamProcessorInfo, + StreamProcessorTier +} from './stream_processing'; export type { CSOTTimeoutContext, CSOTTimeoutContextOptions, diff --git a/src/operations/stream_processing/create_stream_processor.ts b/src/operations/stream_processing/create_stream_processor.ts new file mode 100644 index 00000000000..1a206df42e5 --- /dev/null +++ b/src/operations/stream_processing/create_stream_processor.ts @@ -0,0 +1,46 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import type { CreateStreamProcessorOptions } from '../../stream_processing/types'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class CreateStreamProcessorOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + readonly pipeline: Document[], + readonly aspOptions?: CreateStreamProcessorOptions, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'createStreamProcessor' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + const cmd: Document = { + createStreamProcessor: this.processorName, + pipeline: this.pipeline + }; + + if (this.aspOptions) { + const optsDoc: Document = {}; + if (this.aspOptions.dlq != null) optsDoc.dlq = this.aspOptions.dlq; + if (this.aspOptions.streamMetaFieldName != null) + optsDoc.streamMetaFieldName = this.aspOptions.streamMetaFieldName; + if (this.aspOptions.tier != null) optsDoc.tier = this.aspOptions.tier; + if (this.aspOptions.failover != null) optsDoc.failover = this.aspOptions.failover; + if (Object.keys(optsDoc).length > 0) cmd.options = optsDoc; + } + + return cmd; + } +} + +defineAspects(CreateStreamProcessorOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/stream_processing/drop_stream_processor.ts b/src/operations/stream_processing/drop_stream_processor.ts new file mode 100644 index 00000000000..d214c62a1fb --- /dev/null +++ b/src/operations/stream_processing/drop_stream_processor.ts @@ -0,0 +1,28 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class DropStreamProcessorOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'dropStreamProcessor' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + return { dropStreamProcessor: this.processorName }; + } +} + +defineAspects(DropStreamProcessorOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/stream_processing/get_more_sample_stream_processor.ts b/src/operations/stream_processing/get_more_sample_stream_processor.ts new file mode 100644 index 00000000000..7d307e2509a --- /dev/null +++ b/src/operations/stream_processing/get_more_sample_stream_processor.ts @@ -0,0 +1,35 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class GetMoreSampleStreamProcessorOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + readonly cursorId: bigint | number, + readonly batchSize?: number, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'getMoreSampleStreamProcessor' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + const cmd: Document = { + getMoreSampleStreamProcessor: this.processorName, + cursorId: this.cursorId + }; + if (this.batchSize != null) cmd.batchSize = this.batchSize; + return cmd; + } +} + +defineAspects(GetMoreSampleStreamProcessorOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/stream_processing/get_stream_processor.ts b/src/operations/stream_processing/get_stream_processor.ts new file mode 100644 index 00000000000..789c9251fb1 --- /dev/null +++ b/src/operations/stream_processing/get_stream_processor.ts @@ -0,0 +1,28 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class GetStreamProcessorOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'getStreamProcessor' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + return { getStreamProcessor: this.processorName }; + } +} + +defineAspects(GetStreamProcessorOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE]); diff --git a/src/operations/stream_processing/get_stream_processor_stats.ts b/src/operations/stream_processing/get_stream_processor_stats.ts new file mode 100644 index 00000000000..9812335e690 --- /dev/null +++ b/src/operations/stream_processing/get_stream_processor_stats.ts @@ -0,0 +1,37 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import type { GetStreamProcessorStatsOptions } from '../../stream_processing/types'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class GetStreamProcessorStatsOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + readonly aspOptions?: GetStreamProcessorStatsOptions, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'getStreamProcessorStats' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + const cmd: Document = { getStreamProcessorStats: this.processorName }; + + if (this.aspOptions) { + if (this.aspOptions.scale != null) cmd.scale = this.aspOptions.scale; + if (this.aspOptions.verbose != null) cmd.verbose = this.aspOptions.verbose; + } + + return cmd; + } +} + +defineAspects(GetStreamProcessorStatsOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE]); diff --git a/src/operations/stream_processing/start_sample_stream_processor.ts b/src/operations/stream_processing/start_sample_stream_processor.ts new file mode 100644 index 00000000000..68323a627be --- /dev/null +++ b/src/operations/stream_processing/start_sample_stream_processor.ts @@ -0,0 +1,31 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class StartSampleStreamProcessorOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + readonly limit?: number, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'startSampleStreamProcessor' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + const cmd: Document = { startSampleStreamProcessor: this.processorName }; + if (this.limit != null) cmd.limit = this.limit; + return cmd; + } +} + +defineAspects(StartSampleStreamProcessorOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/stream_processing/start_stream_processor.ts b/src/operations/stream_processing/start_stream_processor.ts new file mode 100644 index 00000000000..4ecae5974c0 --- /dev/null +++ b/src/operations/stream_processing/start_stream_processor.ts @@ -0,0 +1,48 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import type { StartStreamProcessorOptions } from '../../stream_processing/types'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class StartStreamProcessorOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + readonly aspOptions?: StartStreamProcessorOptions, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'startStreamProcessor' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + const cmd: Document = { startStreamProcessor: this.processorName }; + + if (this.aspOptions) { + if (this.aspOptions.workers != null) cmd.workers = this.aspOptions.workers; + + const optsDoc: Document = {}; + if (this.aspOptions.clearCheckpoints != null) + optsDoc.clearCheckpoints = this.aspOptions.clearCheckpoints; + if (this.aspOptions.startAtOperationTime != null) + optsDoc.startAtOperationTime = this.aspOptions.startAtOperationTime; + if (this.aspOptions.startAfter != null) optsDoc.startAfter = this.aspOptions.startAfter; + if (this.aspOptions.tier != null) optsDoc.tier = this.aspOptions.tier; + if (this.aspOptions.enableAutoScaling != null) + optsDoc.enableAutoScaling = this.aspOptions.enableAutoScaling; + if (this.aspOptions.failover != null) optsDoc.failover = this.aspOptions.failover; + if (Object.keys(optsDoc).length > 0) cmd.options = optsDoc; + } + + return cmd; + } +} + +defineAspects(StartStreamProcessorOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/stream_processing/stop_stream_processor.ts b/src/operations/stream_processing/stop_stream_processor.ts new file mode 100644 index 00000000000..b8d00916281 --- /dev/null +++ b/src/operations/stream_processing/stop_stream_processor.ts @@ -0,0 +1,28 @@ +import { type Connection } from '../..'; +import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; +import type { ClientSession } from '../../sessions'; +import { CommandOperation, type CommandOperationOptions } from '../command'; +import { Aspect, defineAspects } from '../operation'; + +/** @internal */ +export class StopStreamProcessorOperation extends CommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + constructor( + readonly processorName: string, + options?: CommandOperationOptions + ) { + super(undefined, options); + } + + override get commandName() { + return 'stopStreamProcessor' as const; + } + + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + return { stopStreamProcessor: this.processorName }; + } +} + +defineAspects(StopStreamProcessorOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/stream_processing/index.ts b/src/stream_processing/index.ts new file mode 100644 index 00000000000..77f4d7e000a --- /dev/null +++ b/src/stream_processing/index.ts @@ -0,0 +1,18 @@ +/** + * Atlas Stream Processing (experimental) + * + * Errors from ASP commands are surfaced as `MongoServerError`. The following + * codes are known to be returned, but the list is non-exhaustive and may grow: + * + * 9 FailedToParse Invalid pipeline or command document + * 72 InvalidOptions Invalid option values + * 125 CommandFailed General command execution failure + * 1 InternalError Unexpected server-side error + * + * Do NOT branch on this list as if it were closed — server may return new codes. + */ + +export { SampleCursor } from './sample_cursor'; +export { StreamProcessingClient } from './stream_processing_client'; +export { StreamProcessor, StreamProcessors } from './stream_processors'; +export * from './types'; diff --git a/src/stream_processing/sample_cursor.ts b/src/stream_processing/sample_cursor.ts new file mode 100644 index 00000000000..defd60a30ff --- /dev/null +++ b/src/stream_processing/sample_cursor.ts @@ -0,0 +1,104 @@ +import type { Document } from '../bson'; +import type { StreamProcessor } from './stream_processors'; +import type { GetStreamProcessorSamplesOptions } from './types'; + +/** + * An async-iterable cursor over Atlas Stream Processing sample output. + * + * Does not extend `AbstractCursor`; uses the dedicated + * `startSampleStreamProcessor` / `getMoreSampleStreamProcessor` wire commands. + * Server-side exhaustion is signalled by `cursorId === 0`. + * + * @public + * @experimental + */ +export class SampleCursor implements AsyncIterable { + private buffer: Document[] = []; + private currentCursorId: bigint | number | null = null; + private exhausted = false; + private closed = false; + + /** + * @param processor - The `StreamProcessor` to sample from. + * @param limit - Maximum documents requested on the initial wire call. + * @param batchSize - Documents per continuation wire call. + */ + constructor( + private readonly processor: StreamProcessor, + private readonly limit?: number, + private readonly batchSize?: number + ) {} + + /** + * The current server-assigned cursor ID. + * `null` before the first wire call; `0` or `0n` when exhausted. + * @experimental + */ + get cursorId(): bigint | number | null { + return this.currentCursorId; + } + + /** + * `true` if the cursor has not been exhausted or closed. + * @experimental + */ + get alive(): boolean { + return !this.exhausted && !this.closed; + } + + /** + * Marks the cursor closed. ASP has no kill-cursors equivalent; + * the server reclaims the cursor independently. + * + * @experimental + */ + async close(): Promise { + this.closed = true; + // ASP has no kill-cursors equivalent; server cleans up on its own. + } + + /** + * Returns an async iterator over the sampled documents. + * Implements the `AsyncIterable` contract. + * + * @experimental + */ + [Symbol.asyncIterator](): AsyncIterableIterator { + return this.iterator(); + } + + /** + * Async generator that yields documents from the server in batches. + * Continues until the cursor is exhausted (`cursorId === 0`) or closed. + * + * @experimental + */ + async *iterator(): AsyncIterableIterator { + while (!this.closed) { + const doc = this.buffer.shift(); + if (doc != null) { + yield doc; + continue; + } + if (this.exhausted) return; + await this.refill(); + // Guard: empty batch with non-zero cursorId continues iteration on next loop. + if (this.buffer.length === 0 && this.exhausted) return; + } + } + + private async refill(): Promise { + if (this.exhausted || this.closed) return; + // Mirrors Python's AsyncSampleCursor._refill exactly: + // null currentCursorId means not yet opened → send limit only. + // Non-null means continuation → send cursorId + batchSize only. + const opts: GetStreamProcessorSamplesOptions = + this.currentCursorId == null + ? { limit: this.limit } + : { cursorId: this.currentCursorId, batchSize: this.batchSize }; + const result = await this.processor.getStreamProcessorSamples(opts); + this.currentCursorId = result.cursorId; + this.buffer.push(...result.documents); + if (Number(result.cursorId) === 0) this.exhausted = true; + } +} diff --git a/src/stream_processing/stream_processing_client.ts b/src/stream_processing/stream_processing_client.ts new file mode 100644 index 00000000000..c123d7eec72 --- /dev/null +++ b/src/stream_processing/stream_processing_client.ts @@ -0,0 +1,88 @@ +import { MongoParseError } from '../error'; +import { MongoClient, type MongoClientOptions } from '../mongo_client'; +import { StreamProcessors } from './stream_processors'; + +/** + * A client for Atlas Stream Processing workspaces. + * + * Wraps a standard `MongoClient` with ASP-specific validation: + * requires a `mongodb://` URI, enforces TLS, and defaults `authSource` to `admin`. + * + * @public + * @experimental + */ +export class StreamProcessingClient { + /** @internal */ + readonly _mongoClient: MongoClient; + + /** + * @param url - A `mongodb://` connection string for the ASP workspace. + * `mongodb+srv://` is rejected. + * @param options - Standard `MongoClientOptions`. TLS is forced on; `ssl: false` is rejected. + */ + constructor(url: string, options?: MongoClientOptions) { + if (url.startsWith('mongodb+srv://')) { + throw new MongoParseError( + 'Atlas Stream Processing does not support mongodb+srv:// URIs; use mongodb:// instead' + ); + } + + const mergedOptions: MongoClientOptions = { ...options }; + + if (mergedOptions.tls === false || mergedOptions.ssl === false) { + throw new MongoParseError('TLS cannot be disabled for Atlas Stream Processing connections'); + } + + const qsMark = url.indexOf('?'); + if (qsMark !== -1) { + const params = new URLSearchParams(url.slice(qsMark + 1)); + if (params.get('tls') === 'false' || params.get('ssl') === 'false') { + throw new MongoParseError('TLS cannot be disabled for Atlas Stream Processing connections'); + } + } + + delete (mergedOptions as Record).ssl; + mergedOptions.tls = true; + + const hasAuthSourceInUrl = + qsMark !== -1 && new URLSearchParams(url.slice(qsMark + 1)).has('authSource'); + if (!hasAuthSourceInUrl && !mergedOptions.authSource) { + mergedOptions.authSource = 'admin'; + } + + this._mongoClient = new MongoClient(url, mergedOptions); + } + + /** + * Returns a handle for managing stream processors in this workspace. + * + * @returns A `StreamProcessors` instance bound to this client. + * @experimental + */ + streamProcessors(): StreamProcessors { + return new StreamProcessors(this); + } + + /** + * Closes the underlying connection and releases all resources. + * + * @experimental + */ + async close(): Promise { + await this._mongoClient.close(); + } + + /** + * Alias for {@link StreamProcessingClient.close} for use with `await using`. + * + * @experimental + */ + async [Symbol.asyncDispose](): Promise { + await this.close(); + } +} + +/** @internal */ +export function isWorkspaceEndpoint(host: string): boolean { + return host.startsWith('atlas-stream-') || host.endsWith('.a.query.mongodb.net'); +} diff --git a/src/stream_processing/stream_processors.ts b/src/stream_processing/stream_processors.ts new file mode 100644 index 00000000000..052cadfe38f --- /dev/null +++ b/src/stream_processing/stream_processors.ts @@ -0,0 +1,281 @@ +import type { Document } from '../bson'; +import { MongoInvalidArgumentError } from '../error'; +import { executeOperation } from '../operations/execute_operation'; +import { CreateStreamProcessorOperation } from '../operations/stream_processing/create_stream_processor'; +import { DropStreamProcessorOperation } from '../operations/stream_processing/drop_stream_processor'; +import { GetMoreSampleStreamProcessorOperation } from '../operations/stream_processing/get_more_sample_stream_processor'; +import { GetStreamProcessorOperation } from '../operations/stream_processing/get_stream_processor'; +import { GetStreamProcessorStatsOperation } from '../operations/stream_processing/get_stream_processor_stats'; +import { StartSampleStreamProcessorOperation } from '../operations/stream_processing/start_sample_stream_processor'; +import { StartStreamProcessorOperation } from '../operations/stream_processing/start_stream_processor'; +import { StopStreamProcessorOperation } from '../operations/stream_processing/stop_stream_processor'; +import { SampleCursor } from './sample_cursor'; +import type { StreamProcessingClient } from './stream_processing_client'; +import type { + CreateStreamProcessorOptions, + GetStreamProcessorSamplesOptions, + GetStreamProcessorSamplesResult, + GetStreamProcessorStatsOptions, + StartStreamProcessorOptions, + StreamProcessorInfo +} from './types'; + +// NOTE: Per the ASP driver spec, server errors MUST be surfaced as-is. +// Do NOT introduce error-code branching, rewrapping, or filtering anywhere +// in this module. Known codes are documented in src/stream_processing/index.ts +// for reference only — they are not runtime invariants. + +const VALID_TIERS = new Set(['SP2', 'SP5', 'SP10', 'SP30', 'SP50']); + +function assertName(name: string): void { + if (!name?.trim()) { + throw new MongoInvalidArgumentError('Stream processor name must be a non-empty string'); + } +} + +function toStreamProcessorInfo(doc: Document): StreamProcessorInfo { + return { + id: doc.id, + name: doc.name, + state: doc.state, + pipeline: doc.pipeline ?? [], + pipelineVersion: doc.pipelineVersion, + tier: doc.tier, + dlq: doc.dlq, + streamMetaFieldName: doc.streamMetaFieldName, + enableAutoScaling: doc.enableAutoScaling, + failoverEnabled: doc.failoverEnabled, + activeRegion: doc.activeRegion, + lastModifiedAt: doc.lastModifiedAt, + modifiedBy: doc.modifiedBy, + lastStateChange: doc.lastStateChange, + lastHeartbeat: doc.lastHeartbeat, + hasStarted: doc.hasStarted, + stats: doc.stats, + errorMsg: doc.errorMsg, + errorCode: doc.errorCode, + errorRetryable: doc.errorRetryable, + raw: doc + }; +} + +/** + * Provides collection-level operations for stream processors in an ASP workspace. + * Obtained via {@link StreamProcessingClient.streamProcessors}. + * + * @public + * @experimental + */ +export class StreamProcessors { + constructor(private readonly client: StreamProcessingClient) {} + + /** + * Creates a new stream processor in the workspace. + * + * Sends the `createStreamProcessor` wire command. + * @remarks This operation is not retryable. + * + * @param name - Processor name; must be non-empty. + * @param pipeline - Aggregation pipeline; must be non-empty. + * @param options - Additional creation options. + * @experimental + */ + async create( + name: string, + pipeline: Document[], + options?: CreateStreamProcessorOptions + ): Promise { + assertName(name); + if (!pipeline || pipeline.length === 0) { + throw new MongoInvalidArgumentError('createStreamProcessor requires a non-empty pipeline'); + } + const op = new CreateStreamProcessorOperation(name, pipeline, options); + await executeOperation(this.client._mongoClient, op); + } + + /** + * Returns a `StreamProcessor` handle for an existing processor. + * No wire command is sent. + * + * @param name - Processor name; must be non-empty. + * @returns A `StreamProcessor` bound to the named processor. + * @experimental + */ + get(name: string): StreamProcessor { + assertName(name); + return new StreamProcessor(this.client, name); + } + + /** + * Retrieves the current state snapshot for a stream processor. + * + * Sends the `getStreamProcessor` wire command. + * @remarks This operation is a retryable read. + * + * @param name - Processor name; must be non-empty. + * @returns A `StreamProcessorInfo` snapshot with the full server response on `.raw`. + * @experimental + */ + async getInfo(name: string): Promise { + assertName(name); + const op = new GetStreamProcessorOperation(name); + const response = await executeOperation(this.client._mongoClient, op); + // Dev server wraps response in { ok, result }; Atlas returns flat. + const doc = (response.result as Document | undefined) ?? response; + return toStreamProcessorInfo(doc); + } +} + +/** + * Represents a single stream processor and exposes its lifecycle operations. + * Obtained via {@link StreamProcessors.get}. + * + * @public + * @experimental + */ +export class StreamProcessor { + constructor( + private readonly client: StreamProcessingClient, + /** The name of the stream processor. */ + public readonly name: string + ) { + assertName(name); + } + + /** + * Starts the stream processor. + * + * Sends the `startStreamProcessor` wire command. + * @remarks This operation is not retryable. + * + * @param options - Start options. `startAfter` and `startAtOperationTime` are mutually exclusive. + * @experimental + */ + async start(options?: StartStreamProcessorOptions): Promise { + if (options?.startAfter && options?.startAtOperationTime) { + throw new MongoInvalidArgumentError( + 'startAfter and startAtOperationTime are mutually exclusive' + ); + } + if (options?.tier != null && !VALID_TIERS.has(options.tier)) { + throw new MongoInvalidArgumentError(`Invalid tier: ${options.tier}`); + } + if (options?.workers != null && options.workers <= 0) { + throw new MongoInvalidArgumentError('workers must be a positive integer'); + } + const op = new StartStreamProcessorOperation(this.name, options); + await executeOperation(this.client._mongoClient, op); + } + + /** + * Stops the stream processor. + * + * Sends the `stopStreamProcessor` wire command. + * @remarks This operation is not retryable. + * + * @experimental + */ + async stop(): Promise { + const op = new StopStreamProcessorOperation(this.name); + await executeOperation(this.client._mongoClient, op); + } + + /** + * Permanently deletes the stream processor. + * + * Sends the `dropStreamProcessor` wire command. + * @remarks This operation is not retryable. + * + * @experimental + */ + async drop(): Promise { + const op = new DropStreamProcessorOperation(this.name); + await executeOperation(this.client._mongoClient, op); + } + + /** + * Returns statistics for the stream processor. + * + * Sends the `getStreamProcessorStats` wire command. + * @remarks This operation is a retryable read. + * + * @param options - Stats options including optional scale factor. + * @returns Raw stats document from the server. + * @experimental + */ + async stats(options?: GetStreamProcessorStatsOptions): Promise { + if (options?.scale != null && options.scale <= 0) { + throw new MongoInvalidArgumentError('scale must be a positive integer'); + } + const op = new GetStreamProcessorStatsOperation(this.name, options); + return await executeOperation(this.client._mongoClient, op); + } + + /** + * Spec-literal entry point for the two-phase sample protocol. + * + * When `options.cursorId` is absent, sends `startSampleStreamProcessor` (initial call). + * When `options.cursorId` is present and non-zero, sends `getMoreSampleStreamProcessor`. + * @remarks This operation is not retryable. + * + * @param options - Sample options; use `cursorId` to continue an existing cursor. + * @returns `cursorId` and a batch of documents. `cursorId === 0` means exhausted. + * @experimental + */ + async getStreamProcessorSamples( + options?: GetStreamProcessorSamplesOptions + ): Promise { + const opts = options ?? {}; + + if (opts.cursorId != null && Number(opts.cursorId) === 0) { + throw new MongoInvalidArgumentError( + 'Sample cursor is exhausted; cursorId 0 cannot be continued' + ); + } + if (opts.cursorId != null && Number(opts.cursorId) < 0) { + throw new MongoInvalidArgumentError('cursorId must be a non-negative value'); + } + if (opts.limit != null && opts.limit < 0) { + throw new MongoInvalidArgumentError('limit must be a non-negative integer'); + } + if (opts.batchSize != null && opts.batchSize < 0) { + throw new MongoInvalidArgumentError('batchSize must be a non-negative integer'); + } + + let response: Document; + let documents: Document[]; + + if (opts.cursorId == null) { + // Initial: startSampleStreamProcessor with limit (NO batchSize) + response = await executeOperation( + this.client._mongoClient, + new StartSampleStreamProcessorOperation(this.name, opts.limit) + ); + // Real server returns `messages`; spec says `firstBatch`. Tolerate both. + documents = (response.messages as Document[] | undefined) ?? response.firstBatch ?? []; + } else { + // Continuation: getMoreSampleStreamProcessor with batchSize (NO limit) + response = await executeOperation( + this.client._mongoClient, + new GetMoreSampleStreamProcessorOperation(this.name, opts.cursorId, opts.batchSize) + ); + // Real server returns `messages`; spec says `nextBatch`. Tolerate both. + documents = (response.messages as Document[] | undefined) ?? response.nextBatch ?? []; + } + + const cursorIdRaw = response.cursorId ?? 0; + const cursorId = typeof cursorIdRaw === 'bigint' ? cursorIdRaw : Number(cursorIdRaw); + return { cursorId, documents }; + } + + /** + * Returns a `SampleCursor` that asynchronously iterates over stream processor output. + * + * @param options - Optional `limit` (initial call) and `batchSize` (continuation calls). + * @returns A `SampleCursor` ready for `for await...of` iteration. + * @experimental + */ + sample(options?: { limit?: number; batchSize?: number }): SampleCursor { + return new SampleCursor(this, options?.limit, options?.batchSize); + } +} diff --git a/src/stream_processing/types.ts b/src/stream_processing/types.ts new file mode 100644 index 00000000000..7584ac91caa --- /dev/null +++ b/src/stream_processing/types.ts @@ -0,0 +1,154 @@ +import type { Document, Timestamp } from '../bson'; + +/** + * Options for creating a stream processor. + * @public + * @experimental + */ +export interface CreateStreamProcessorOptions { + /** Dead letter queue connection configuration. */ + dlq?: Document; + /** Name of the field that contains stream metadata. */ + streamMetaFieldName?: string; + /** Compute tier for the processor. */ + tier?: StreamProcessorTier; + /** Whether to enable failover for the processor. */ + failover?: boolean; +} + +/** + * Options for starting a stream processor. + * @public + * @experimental + */ +export interface StartStreamProcessorOptions { + /** Number of worker threads. Sent at the top level of the command document. */ + workers?: number; + /** If true, clear any existing checkpoints before starting. */ + clearCheckpoints?: boolean; + /** + * Resume from this cluster time. + * Mutually exclusive with `startAfter`. + */ + startAtOperationTime?: Timestamp; + /** + * Resume after the given resume token document. + * Mutually exclusive with `startAtOperationTime`. + */ + startAfter?: Document; + /** Compute tier to use when starting. */ + tier?: StreamProcessorTier; + /** Enable auto-scaling for the processor. */ + enableAutoScaling?: boolean; + /** Failover configuration document. */ + failover?: Document; +} + +/** + * Options for `getStreamProcessorStats`. + * @public + * @experimental + */ +export interface GetStreamProcessorStatsOptions { + /** + * Scaling factor for size fields. + * `1` = bytes (default), `1024` = KiB. + */ + scale?: number; + /** If true, include additional verbose fields in the response. */ + verbose?: boolean; +} + +/** + * Options for the spec-literal `getStreamProcessorSamples` entry point. + * @public + * @experimental + */ +export interface GetStreamProcessorSamplesOptions { + /** + * Absent or `0` opens a new cursor; non-zero continues an existing one. + * Must be positive (non-zero) when used for continuation. + */ + cursorId?: bigint | number; + /** Maximum number of documents to return on the initial call. */ + limit?: number; + /** Number of documents to return per continuation call. */ + batchSize?: number; +} + +/** + * Result returned by `getStreamProcessorSamples`. + * `cursorId === 0` means the cursor is exhausted. + * @public + * @experimental + */ +export interface GetStreamProcessorSamplesResult { + /** Server-assigned cursor ID. `0` or `0n` indicates exhaustion. */ + cursorId: bigint | number; + /** Batch of sampled documents from the stream. */ + documents: Document[]; +} + +/** + * Spec-defined compute tier values for stream processors. + * @public + * @experimental + */ +export type StreamProcessorTier = 'SP2' | 'SP5' | 'SP10' | 'SP30' | 'SP50'; + +/** + * Snapshot of a stream processor's state as returned by `getStreamProcessor`. + * + * Per spec, `state` is a plain string — the driver must NOT enumerate it. + * The full server response is preserved on `raw` so unknown fields survive. + * + * @public + * @experimental + */ +export interface StreamProcessorInfo { + /** Unique processor ID. Dev server may omit this field. */ + id?: string; + /** Processor name. */ + name: string; + /** + * Current lifecycle state (e.g. `"CREATED"`, `"STARTED"`, `"STOPPED"`). + * Treated as an open string — do not branch on a closed enum. + */ + state: string; + /** The aggregation pipeline that defines the processor's logic. */ + pipeline: Document[]; + /** Pipeline schema version. Dev server may omit this field. */ + pipelineVersion?: number; + /** Compute tier. May be one of the known `StreamProcessorTier` values or an unrecognized string. */ + tier?: StreamProcessorTier | string; + /** Dead letter queue configuration. */ + dlq?: Document; + /** Name of the stream metadata field. */ + streamMetaFieldName?: string; + /** Whether auto-scaling is enabled. */ + enableAutoScaling?: boolean; + /** Whether failover is enabled. */ + failoverEnabled?: boolean; + /** The active deployment region. */ + activeRegion?: string; + /** Timestamp of the last modification. */ + lastModifiedAt?: Date; + /** Identity of the last modifier. */ + modifiedBy?: string; + /** Timestamp of the last state transition. */ + lastStateChange?: Date; + /** Timestamp of the last heartbeat. */ + lastHeartbeat?: Date; + /** Whether the processor has been started at least once. */ + hasStarted?: boolean; + /** Latest statistics snapshot from the server. */ + stats?: Document; + /** Human-readable error message if the processor is in an error state. */ + errorMsg?: string; + /** Numeric error code if the processor is in an error state. */ + errorCode?: number; + /** Whether the error is retryable. */ + errorRetryable?: boolean; + /** Full server response, preserved verbatim so unknown fields survive. */ + raw: Document; +} diff --git a/test/integration/atlas-stream-processing/atlas_stream_processing.prose.test.ts b/test/integration/atlas-stream-processing/atlas_stream_processing.prose.test.ts new file mode 100644 index 00000000000..c651a4b3770 --- /dev/null +++ b/test/integration/atlas-stream-processing/atlas_stream_processing.prose.test.ts @@ -0,0 +1,242 @@ +import { expect } from 'chai'; +import { setTimeout } from 'timers/promises'; + +import { + type Document, + MongoServerError, + SampleCursor, + StreamProcessingClient, + StreamProcessor, + StreamProcessors +} from '../../mongodb'; + +/** + * Integration tests for Atlas Stream Processing. + * + * Prerequisites: + * MONGODB_ASP_URI - mongodb:// URI for an ASP workspace + * MONGODB_ASP_USERNAME - username with permission to manage stream processors + * MONGODB_ASP_PASSWORD - password for that user + * + * Optional: + * MONGODB_ASP_SOURCE_CONNECTION - connection name for $source (default: sample_stream_solar) + * MONGODB_ASP_SINK_CONNECTION - connection name for $emit (default: __testLog) + * + * Run with: + * MONGODB_ASP_URI='mongodb://...' MONGODB_ASP_USERNAME=streams MONGODB_ASP_PASSWORD=... \ + * npx mocha --timeout 60000 test/integration/atlas-stream-processing + */ + +const ASP_URI = process.env.MONGODB_ASP_URI; +const ASP_USERNAME = process.env.MONGODB_ASP_USERNAME; +const ASP_PASSWORD = process.env.MONGODB_ASP_PASSWORD; +const SOURCE_CONNECTION = process.env.MONGODB_ASP_SOURCE_CONNECTION ?? 'sample_stream_solar'; +const SINK_CONNECTION = process.env.MONGODB_ASP_SINK_CONNECTION ?? '__testLog'; + +const PROCESSOR_NAME = `node_driver_test_${Date.now()}`; + +const PIPELINE: Document[] = [ + { $source: { connectionName: SOURCE_CONNECTION } }, + { $emit: { connectionName: SINK_CONNECTION } } +]; + +describe('Atlas Stream Processing', function () { + before(function () { + if (!ASP_URI) { + this.skipReason = 'MONGODB_ASP_URI is not set; skipping ASP integration tests'; + this.skip(); + } + }); + + describe('StreamProcessingClient', function () { + it('rejects mongodb+srv:// URIs', function () { + expect( + () => new StreamProcessingClient('mongodb+srv://host/db', { auth: { username: 'u', password: 'p' } }) + ).to.throw(Error, /mongodb\+srv/); + }); + + it('rejects tls: false in options', function () { + expect( + () => new StreamProcessingClient(ASP_URI!, { tls: false }) + ).to.throw(Error, /TLS cannot be disabled/); + }); + + it('rejects ssl=false in the URI query string', function () { + const uri = ASP_URI!.includes('?') + ? `${ASP_URI}&ssl=false` + : `${ASP_URI}?ssl=false`; + expect(() => new StreamProcessingClient(uri)).to.throw(Error, /TLS cannot be disabled/); + }); + }); + + describe('Processor lifecycle', function () { + let client: StreamProcessingClient; + let sps: StreamProcessors; + let proc: StreamProcessor; + + before(async function () { + client = new StreamProcessingClient(ASP_URI!, { + auth: { username: ASP_USERNAME, password: ASP_PASSWORD } + }); + sps = client.streamProcessors(); + proc = sps.get(PROCESSOR_NAME); + }); + + after(async function () { + // Best-effort cleanup: drop the processor if it still exists. + try { + await proc.drop(); + } catch { + // Ignore — processor may have already been dropped by the test. + } + await client?.close(); + }); + + it('creates a stream processor', async function () { + await sps.create(PROCESSOR_NAME, PIPELINE); + }); + + it('getInfo returns CREATED state after creation', async function () { + const info = await sps.getInfo(PROCESSOR_NAME); + expect(info.name).to.equal(PROCESSOR_NAME); + expect(info.state).to.be.a('string'); + expect(info.pipeline).to.be.an('array').with.lengthOf(2); + }); + + it('starts the processor', async function () { + await proc.start(); + // Brief pause so the server can transition state. + await setTimeout(2000); + const info = await sps.getInfo(PROCESSOR_NAME); + expect(info.state).to.be.a('string'); + }); + + it('returns stats without throwing', async function () { + let stats: Document | undefined; + try { + stats = await proc.stats(); + } catch (err) { + if (err instanceof MongoServerError) { + // Stats may be unavailable on dev deployments — not a failure. + this.skipReason = `Stats unavailable (code ${err.code}): ${err.message}`; + this.skip(); + } + throw err; + } + expect(stats).to.be.an('object'); + }); + + it('sample() returns a SampleCursor and yields documents', async function () { + let cursor: SampleCursor | undefined; + try { + cursor = proc.sample({ limit: 5 }); + expect(cursor).to.be.instanceOf(SampleCursor); + expect(cursor.alive).to.be.true; + + let count = 0; + for await (const doc of cursor) { + expect(doc).to.be.an('object'); + count += 1; + if (count >= 5) break; + } + // At least acknowledge the cursor was iterable — count may be 0 on an empty stream. + expect(count).to.be.at.least(0); + } catch (err) { + if (err instanceof MongoServerError) { + this.skipReason = `Sample unavailable (code ${err.code}): ${err.message}`; + this.skip(); + } + throw err; + } finally { + await cursor?.close(); + } + }); + + it('stops the processor', async function () { + await proc.stop(); + await setTimeout(1000); + const info = await sps.getInfo(PROCESSOR_NAME); + expect(info.state).to.be.a('string'); + }); + + it('drops the processor', async function () { + await proc.drop(); + }); + }); + + describe('Argument validation', function () { + let client: StreamProcessingClient; + let sps: StreamProcessors; + + before(async function () { + client = new StreamProcessingClient(ASP_URI!, { + auth: { username: ASP_USERNAME, password: ASP_PASSWORD } + }); + sps = client.streamProcessors(); + }); + + after(async function () { + await client?.close(); + }); + + it('create() rejects an empty processor name', async function () { + const err = await sps.create('', PIPELINE).catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/non-empty/); + }); + + it('create() rejects an empty pipeline', async function () { + const err = await sps.create('validName', []).catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/non-empty pipeline/); + }); + + it('get() rejects an empty processor name', function () { + expect(() => sps.get('')).to.throw(Error, /non-empty/); + }); + + it('start() rejects mutually exclusive startAfter + startAtOperationTime', async function () { + const proc = sps.get('dummy'); + const err = await proc + .start({ startAfter: {}, startAtOperationTime: { t: 1, i: 0 } as any }) + .catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/mutually exclusive/); + }); + + it('start() rejects non-positive workers', async function () { + const proc = sps.get('dummy'); + const err = await proc.start({ workers: 0 }).catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/workers/); + }); + + it('start() rejects an invalid tier', async function () { + const proc = sps.get('dummy'); + const err = await proc.start({ tier: 'SP999' as any }).catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/Invalid tier/); + }); + + it('stats() rejects non-positive scale', async function () { + const proc = sps.get('dummy'); + const err = await proc.stats({ scale: 0 }).catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/scale/); + }); + + it('getStreamProcessorSamples() rejects cursorId of 0 (exhausted)', async function () { + const proc = sps.get('dummy'); + const err = await proc.getStreamProcessorSamples({ cursorId: 0 }).catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/exhausted/); + }); + + it('getStreamProcessorSamples() rejects negative cursorId', async function () { + const proc = sps.get('dummy'); + const err = await proc.getStreamProcessorSamples({ cursorId: -1 }).catch(e => e); + expect(err).to.be.instanceOf(Error); + expect(err.message).to.match(/non-negative/); + }); + }); +}); diff --git a/test/mongodb_all.ts b/test/mongodb_all.ts index d98518f6f34..0329787b27c 100644 --- a/test/mongodb_all.ts +++ b/test/mongodb_all.ts @@ -108,6 +108,14 @@ export * from '../src/operations/run_command'; export * from '../src/operations/search_indexes/create'; export * from '../src/operations/search_indexes/drop'; export * from '../src/operations/search_indexes/update'; +export * from '../src/operations/stream_processing/create_stream_processor'; +export * from '../src/operations/stream_processing/drop_stream_processor'; +export * from '../src/operations/stream_processing/get_more_sample_stream_processor'; +export * from '../src/operations/stream_processing/get_stream_processor'; +export * from '../src/operations/stream_processing/get_stream_processor_stats'; +export * from '../src/operations/stream_processing/start_sample_stream_processor'; +export * from '../src/operations/stream_processing/start_stream_processor'; +export * from '../src/operations/stream_processing/stop_stream_processor'; export * from '../src/operations/set_profiling_level'; export * from '../src/operations/stats'; export * from '../src/operations/update'; @@ -127,6 +135,7 @@ export * from '../src/sdam/topology'; export * from '../src/sdam/topology_description'; export * from '../src/sessions'; export * from '../src/sort'; +export * from '../src/stream_processing'; export * from '../src/timeout'; export * from '../src/transactions'; export * from '../src/utils'; diff --git a/test/unit/stream_processing/stream_processing.test.ts b/test/unit/stream_processing/stream_processing.test.ts new file mode 100644 index 00000000000..9b305e423ba --- /dev/null +++ b/test/unit/stream_processing/stream_processing.test.ts @@ -0,0 +1,774 @@ +import * as fs from 'fs'; +import * as path from 'path'; + +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { + Aspect, + CreateStreamProcessorOperation, + DropStreamProcessorOperation, + GetMoreSampleStreamProcessorOperation, + GetStreamProcessorOperation, + GetStreamProcessorStatsOperation, + MongoInvalidArgumentError, + MongoParseError, + MongoServerError, + SampleCursor, + StartSampleStreamProcessorOperation, + StartStreamProcessorOperation, + StopStreamProcessorOperation, + StreamProcessingClient, + StreamProcessor, + StreamProcessors +} from '../../mongodb'; +import * as executeOperationModule from '../../../src/operations/execute_operation'; +import { isWorkspaceEndpoint } from '../../../src/stream_processing/stream_processing_client'; + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +/** Build a minimal StreamProcessingClient without opening any sockets. */ +function makeClient(url = 'mongodb://localhost:27017/', opts = {}) { + return new StreamProcessingClient(url, opts); +} + +/** Build StreamProcessors backed by a fake client. */ +function makeStreamProcessors(url = 'mongodb://localhost:27017/') { + return makeClient(url).streamProcessors(); +} + +// --------------------------------------------------------------------------- +// StreamProcessingClient construction +// --------------------------------------------------------------------------- + +describe('StreamProcessingClient construction', function () { + it('rejects mongodb+srv:// URI', function () { + expect(() => makeClient('mongodb+srv://host/db')).to.throw( + MongoParseError, + /mongodb\+srv/ + ); + }); + + it('rejects tls: false option', function () { + expect(() => makeClient('mongodb://localhost/', { tls: false })).to.throw( + MongoParseError, + /TLS cannot be disabled/ + ); + }); + + it('rejects ssl: false option', function () { + expect(() => makeClient('mongodb://localhost/', { ssl: false })).to.throw( + MongoParseError, + /TLS cannot be disabled/ + ); + }); + + it('rejects ?tls=false in URI query string', function () { + expect(() => makeClient('mongodb://localhost/?tls=false')).to.throw( + MongoParseError, + /TLS cannot be disabled/ + ); + }); + + it('rejects ?ssl=false in URI query string', function () { + expect(() => makeClient('mongodb://localhost/?ssl=false')).to.throw( + MongoParseError, + /TLS cannot be disabled/ + ); + }); + + it('forces tls: true on the underlying MongoClient', function () { + const client = makeClient('mongodb://localhost/'); + expect(client._mongoClient.options).to.have.property('tls', true); + }); + + it('defaults authSource to admin when not set in URI or options', function () { + // authSource ends up in credentials.source after MongoClient parses options; + // credentials are only created when auth is present, so we must pass auth. + const client = makeClient('mongodb://localhost/', { auth: { username: 'u', password: 'p' } }); + expect(client._mongoClient.options.credentials?.source).to.equal('admin'); + }); + + it('preserves explicit authSource from options', function () { + const client = makeClient('mongodb://localhost/', { + auth: { username: 'u', password: 'p' }, + authSource: 'mydb' + }); + expect(client._mongoClient.options.credentials?.source).to.equal('mydb'); + }); + + it('preserves explicit authSource from URI query string', function () { + const client = makeClient('mongodb://u:p@localhost/?authSource=mydb'); + expect(client._mongoClient.options.credentials?.source).to.equal('mydb'); + }); + + it('drops the ssl option so it does not shadow tls', function () { + // ssl is deleted from mergedOptions before passing to MongoClient + // so the internal options should only show tls: true, not ssl + const client = makeClient('mongodb://localhost/'); + expect(client._mongoClient.options).not.to.have.property('ssl'); + expect(client._mongoClient.options).to.have.property('tls', true); + }); +}); + +// --------------------------------------------------------------------------- +// isWorkspaceEndpoint +// --------------------------------------------------------------------------- + +describe('isWorkspaceEndpoint', function () { + it('returns true for atlas-stream- prefix', function () { + expect(isWorkspaceEndpoint('atlas-stream-foo.virginia-usa.a.query.mongodb.net')).to.be.true; + }); + + it('returns true for .a.query.mongodb.net suffix', function () { + expect(isWorkspaceEndpoint('something.a.query.mongodb.net')).to.be.true; + }); + + it('returns false for a normal Atlas cluster hostname', function () { + expect(isWorkspaceEndpoint('cluster0.mongodb.net')).to.be.false; + }); + + it('returns false for localhost', function () { + expect(isWorkspaceEndpoint('localhost')).to.be.false; + }); +}); + +// --------------------------------------------------------------------------- +// Operation command documents & aspects +// --------------------------------------------------------------------------- + +describe('Operation command documents', function () { + const conn = undefined as any; + + describe('CreateStreamProcessorOperation', function () { + it('builds minimal command with no options', function () { + const op = new CreateStreamProcessorOperation('sp1', [{ $source: {} }]); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.deep.equal({ createStreamProcessor: 'sp1', pipeline: [{ $source: {} }] }); + }); + + it('includes options sub-doc when aspOptions are given', function () { + const op = new CreateStreamProcessorOperation('sp1', [{ $source: {} }], { + tier: 'SP10', + streamMetaFieldName: '__stream', + failover: true + }); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('options'); + expect(cmd.options).to.deep.include({ tier: 'SP10', streamMetaFieldName: '__stream', failover: true }); + }); + + it('omits options key when aspOptions object is empty after filtering', function () { + const op = new CreateStreamProcessorOperation('sp1', [{ $source: {} }], {}); + const cmd = op.buildCommandDocument(conn); + expect(cmd).not.to.have.property('options'); + }); + + it('is a WRITE operation, not retryable read', function () { + const op = new CreateStreamProcessorOperation('sp1', [{ $source: {} }]); + expect(op.hasAspect(Aspect.WRITE_OPERATION)).to.be.true; + expect(op.canRetryRead).to.be.false; + }); + }); + + describe('DropStreamProcessorOperation', function () { + it('builds correct command', function () { + const op = new DropStreamProcessorOperation('sp1'); + expect(op.buildCommandDocument(conn)).to.deep.equal({ dropStreamProcessor: 'sp1' }); + }); + + it('is a WRITE operation', function () { + const op = new DropStreamProcessorOperation('sp1'); + expect(op.hasAspect(Aspect.WRITE_OPERATION)).to.be.true; + }); + }); + + describe('StopStreamProcessorOperation', function () { + it('builds correct command', function () { + const op = new StopStreamProcessorOperation('sp1'); + expect(op.buildCommandDocument(conn)).to.deep.equal({ stopStreamProcessor: 'sp1' }); + }); + + it('is a WRITE operation', function () { + const op = new StopStreamProcessorOperation('sp1'); + expect(op.hasAspect(Aspect.WRITE_OPERATION)).to.be.true; + }); + }); + + describe('GetStreamProcessorOperation', function () { + it('builds correct command', function () { + const op = new GetStreamProcessorOperation('sp1'); + expect(op.buildCommandDocument(conn)).to.deep.equal({ getStreamProcessor: 'sp1' }); + }); + + it('is a retryable READ operation', function () { + const op = new GetStreamProcessorOperation('sp1'); + expect(op.hasAspect(Aspect.READ_OPERATION)).to.be.true; + expect(op.hasAspect(Aspect.RETRYABLE)).to.be.true; + expect(op.canRetryRead).to.be.true; + }); + }); + + describe('GetStreamProcessorStatsOperation', function () { + it('builds minimal command with no options', function () { + const op = new GetStreamProcessorStatsOperation('sp1'); + expect(op.buildCommandDocument(conn)).to.deep.equal({ getStreamProcessorStats: 'sp1' }); + }); + + it('includes scale when provided', function () { + const op = new GetStreamProcessorStatsOperation('sp1', { scale: 1024 }); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('scale', 1024); + }); + + it('includes verbose when provided', function () { + const op = new GetStreamProcessorStatsOperation('sp1', { verbose: true }); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('verbose', true); + }); + + it('is a retryable READ operation', function () { + const op = new GetStreamProcessorStatsOperation('sp1'); + expect(op.hasAspect(Aspect.READ_OPERATION)).to.be.true; + expect(op.hasAspect(Aspect.RETRYABLE)).to.be.true; + expect(op.canRetryRead).to.be.true; + }); + }); + + describe('StartStreamProcessorOperation', function () { + it('builds minimal command with no options', function () { + const op = new StartStreamProcessorOperation('sp1'); + expect(op.buildCommandDocument(conn)).to.deep.equal({ startStreamProcessor: 'sp1' }); + }); + + it('places workers at the top level', function () { + const op = new StartStreamProcessorOperation('sp1', { workers: 3 }); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('workers', 3); + expect(cmd).not.to.have.nested.property('options.workers'); + }); + + it('places clearCheckpoints, tier, enableAutoScaling, failover inside options sub-doc', function () { + const op = new StartStreamProcessorOperation('sp1', { + clearCheckpoints: true, + tier: 'SP30', + enableAutoScaling: false, + failover: { enabled: true } + }); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('options'); + expect(cmd.options).to.deep.include({ + clearCheckpoints: true, + tier: 'SP30', + enableAutoScaling: false, + failover: { enabled: true } + }); + }); + + it('omits options sub-doc when none of the nested opts are given', function () { + const op = new StartStreamProcessorOperation('sp1', { workers: 2 }); + const cmd = op.buildCommandDocument(conn); + expect(cmd).not.to.have.property('options'); + }); + + it('is a WRITE operation', function () { + const op = new StartStreamProcessorOperation('sp1'); + expect(op.hasAspect(Aspect.WRITE_OPERATION)).to.be.true; + }); + }); + + describe('StartSampleStreamProcessorOperation', function () { + it('builds minimal command with no limit', function () { + const op = new StartSampleStreamProcessorOperation('sp1'); + expect(op.buildCommandDocument(conn)).to.deep.equal({ startSampleStreamProcessor: 'sp1' }); + }); + + it('includes limit when provided', function () { + const op = new StartSampleStreamProcessorOperation('sp1', 10); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('limit', 10); + }); + + it('is a WRITE operation', function () { + const op = new StartSampleStreamProcessorOperation('sp1'); + expect(op.hasAspect(Aspect.WRITE_OPERATION)).to.be.true; + }); + }); + + describe('GetMoreSampleStreamProcessorOperation', function () { + it('builds command with cursorId', function () { + const op = new GetMoreSampleStreamProcessorOperation('sp1', 42); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('getMoreSampleStreamProcessor', 'sp1'); + expect(cmd).to.have.property('cursorId', 42); + }); + + it('includes batchSize when provided', function () { + const op = new GetMoreSampleStreamProcessorOperation('sp1', 42, 5); + const cmd = op.buildCommandDocument(conn); + expect(cmd).to.have.property('batchSize', 5); + }); + + it('omits batchSize when not provided', function () { + const op = new GetMoreSampleStreamProcessorOperation('sp1', 42); + expect(op.buildCommandDocument(conn)).not.to.have.property('batchSize'); + }); + + it('is a WRITE operation', function () { + const op = new GetMoreSampleStreamProcessorOperation('sp1', 42); + expect(op.hasAspect(Aspect.WRITE_OPERATION)).to.be.true; + }); + }); +}); + +// --------------------------------------------------------------------------- +// StreamProcessors (stub executeOperation) +// --------------------------------------------------------------------------- + +describe('StreamProcessors', function () { + let executeStub: sinon.SinonStub; + let sps: StreamProcessors; + + beforeEach(function () { + executeStub = sinon.stub(executeOperationModule, 'executeOperation'); + sps = makeStreamProcessors(); + }); + + afterEach(function () { + sinon.restore(); + }); + + describe('create()', function () { + it('sends createStreamProcessor command', async function () { + executeStub.resolves({}); + await sps.create('sp1', [{ $source: {} }]); + expect(executeStub.calledOnce).to.be.true; + const op: CreateStreamProcessorOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(CreateStreamProcessorOperation); + expect(op.processorName).to.equal('sp1'); + expect(op.pipeline).to.deep.equal([{ $source: {} }]); + }); + + it('rejects empty processor name before wire call', async function () { + const err = await sps.create('', [{ $source: {} }]).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/non-empty/); + expect(executeStub.called).to.be.false; + }); + + it('rejects whitespace-only processor name before wire call', async function () { + const err = await sps.create(' ', [{ $source: {} }]).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(executeStub.called).to.be.false; + }); + + it('rejects empty pipeline before wire call', async function () { + const err = await sps.create('sp1', []).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/non-empty pipeline/); + expect(executeStub.called).to.be.false; + }); + }); + + describe('get()', function () { + it('returns a StreamProcessor handle without making a wire call', function () { + const proc = sps.get('sp1'); + expect(proc).to.be.instanceOf(StreamProcessor); + expect(proc.name).to.equal('sp1'); + expect(executeStub.called).to.be.false; + }); + + it('rejects empty name', function () { + expect(() => sps.get('')).to.throw(MongoInvalidArgumentError, /non-empty/); + }); + }); + + describe('getInfo()', function () { + it('sends getStreamProcessor command', async function () { + executeStub.resolves({ name: 'sp1', state: 'CREATED', pipeline: [] }); + await sps.getInfo('sp1'); + const op: GetStreamProcessorOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(GetStreamProcessorOperation); + expect(op.processorName).to.equal('sp1'); + }); + + it('unwraps the { ok, result } envelope from dev server', async function () { + const inner = { name: 'sp1', state: 'STARTED', pipeline: [{ $source: {} }], pipelineVersion: 2, unknownField: 'kept' }; + executeStub.resolves({ ok: 1, result: inner }); + const info = await sps.getInfo('sp1'); + expect(info.state).to.equal('STARTED'); + expect(info.pipelineVersion).to.equal(2); + }); + + it('handles a flat response from Atlas (no result envelope)', async function () { + const flat = { name: 'sp1', state: 'STOPPED', pipeline: [], hasStarted: true }; + executeStub.resolves(flat); + const info = await sps.getInfo('sp1'); + expect(info.state).to.equal('STOPPED'); + expect(info.hasStarted).to.be.true; + }); + + it('preserves unknown fields on .raw', async function () { + const doc = { name: 'sp1', state: 'CREATED', pipeline: [], brandNewField: 'future' }; + executeStub.resolves(doc); + const info = await sps.getInfo('sp1'); + expect(info.raw).to.have.property('brandNewField', 'future'); + }); + + it('works when id and pipelineVersion are absent', async function () { + executeStub.resolves({ name: 'sp1', state: 'CREATED', pipeline: [] }); + const info = await sps.getInfo('sp1'); + expect(info.id).to.be.undefined; + expect(info.pipelineVersion).to.be.undefined; + }); + + it('returns an unknown state string as-is (not enumerated)', async function () { + executeStub.resolves({ name: 'sp1', state: 'SOME_FUTURE_STATE', pipeline: [] }); + const info = await sps.getInfo('sp1'); + expect(info.state).to.equal('SOME_FUTURE_STATE'); + }); + + it('defaults pipeline to [] when absent from server response', async function () { + executeStub.resolves({ name: 'sp1', state: 'CREATED' }); + const info = await sps.getInfo('sp1'); + expect(info.pipeline).to.deep.equal([]); + }); + }); +}); + +// --------------------------------------------------------------------------- +// StreamProcessor lifecycle (stub executeOperation) +// --------------------------------------------------------------------------- + +describe('StreamProcessor lifecycle', function () { + let executeStub: sinon.SinonStub; + let proc: StreamProcessor; + + beforeEach(function () { + executeStub = sinon.stub(executeOperationModule, 'executeOperation'); + proc = makeStreamProcessors().get('sp1'); + }); + + afterEach(function () { + sinon.restore(); + }); + + describe('start()', function () { + it('sends startStreamProcessor command', async function () { + executeStub.resolves({}); + await proc.start(); + const op: StartStreamProcessorOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(StartStreamProcessorOperation); + expect(op.processorName).to.equal('sp1'); + }); + + it('rejects startAfter + startAtOperationTime together before wire call', async function () { + const err = await proc.start({ startAfter: {}, startAtOperationTime: { t: 1, i: 0 } as any }).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/mutually exclusive/); + expect(executeStub.called).to.be.false; + }); + + it('rejects an invalid tier before wire call', async function () { + const err = await proc.start({ tier: 'SP999' as any }).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/Invalid tier/); + expect(executeStub.called).to.be.false; + }); + + it('rejects workers <= 0 before wire call', async function () { + const err = await proc.start({ workers: 0 }).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/workers/); + expect(executeStub.called).to.be.false; + }); + }); + + describe('stop()', function () { + it('sends stopStreamProcessor command', async function () { + executeStub.resolves({}); + await proc.stop(); + const op: StopStreamProcessorOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(StopStreamProcessorOperation); + expect(op.processorName).to.equal('sp1'); + }); + }); + + describe('drop()', function () { + it('sends dropStreamProcessor command', async function () { + executeStub.resolves({}); + await proc.drop(); + const op: DropStreamProcessorOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(DropStreamProcessorOperation); + expect(op.processorName).to.equal('sp1'); + }); + }); + + describe('stats()', function () { + it('sends getStreamProcessorStats command', async function () { + executeStub.resolves({ bytesIn: 100 }); + await proc.stats(); + const op: GetStreamProcessorStatsOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(GetStreamProcessorStatsOperation); + expect(op.processorName).to.equal('sp1'); + }); + + it('returns the raw response preserving unknown fields', async function () { + const raw = { bytesIn: 100, futureField: 'x' }; + executeStub.resolves(raw); + const result = await proc.stats(); + expect(result).to.deep.equal(raw); + }); + + it('rejects scale <= 0 before wire call', async function () { + const err = await proc.stats({ scale: 0 }).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/scale/); + expect(executeStub.called).to.be.false; + }); + }); + + describe('getStreamProcessorSamples()', function () { + it('sends startSampleStreamProcessor when no cursorId given', async function () { + executeStub.resolves({ cursorId: 1, messages: [] }); + await proc.getStreamProcessorSamples(); + const op: StartSampleStreamProcessorOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(StartSampleStreamProcessorOperation); + }); + + it('sends getMoreSampleStreamProcessor when cursorId is present', async function () { + executeStub.resolves({ cursorId: 0, messages: [] }); + await proc.getStreamProcessorSamples({ cursorId: 42 }); + const op: GetMoreSampleStreamProcessorOperation = executeStub.firstCall.args[1]; + expect(op).to.be.instanceOf(GetMoreSampleStreamProcessorOperation); + }); + + it('rejects cursorId of 0 (exhausted) before wire call', async function () { + const err = await proc.getStreamProcessorSamples({ cursorId: 0 }).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/exhausted/); + expect(executeStub.called).to.be.false; + }); + + it('rejects negative cursorId before wire call', async function () { + const err = await proc.getStreamProcessorSamples({ cursorId: -1 }).catch(e => e); + expect(err).to.be.instanceOf(MongoInvalidArgumentError); + expect(err.message).to.match(/non-negative/); + expect(executeStub.called).to.be.false; + }); + + it('handles messages shape (Atlas real server)', async function () { + const docs = [{ a: 1 }, { a: 2 }]; + executeStub.resolves({ cursorId: 0, messages: docs }); + const result = await proc.getStreamProcessorSamples(); + expect(result.documents).to.deep.equal(docs); + }); + + it('handles firstBatch shape (spec)', async function () { + const docs = [{ b: 1 }]; + executeStub.resolves({ cursorId: 0, firstBatch: docs }); + const result = await proc.getStreamProcessorSamples(); + expect(result.documents).to.deep.equal(docs); + }); + + it('handles nextBatch shape (dev server continuation)', async function () { + const docs = [{ c: 1 }]; + executeStub.resolves({ cursorId: 0, nextBatch: docs }); + const result = await proc.getStreamProcessorSamples({ cursorId: 99 }); + expect(result.documents).to.deep.equal(docs); + }); + + it('normalises bigint cursorId to number in result', async function () { + executeStub.resolves({ cursorId: BigInt(7), messages: [] }); + const result = await proc.getStreamProcessorSamples(); + expect(result.cursorId).to.equal(BigInt(7)); + }); + + it('defaults documents to [] when batch key is absent', async function () { + executeStub.resolves({ cursorId: 0 }); + const result = await proc.getStreamProcessorSamples(); + expect(result.documents).to.deep.equal([]); + }); + }); +}); + +// --------------------------------------------------------------------------- +// SampleCursor +// --------------------------------------------------------------------------- + +describe('SampleCursor', function () { + let getSampleStub: sinon.SinonStub; + let fakeProc: StreamProcessor; + + beforeEach(function () { + fakeProc = makeStreamProcessors().get('sp1'); + getSampleStub = sinon.stub(fakeProc, 'getStreamProcessorSamples'); + }); + + afterEach(function () { + sinon.restore(); + }); + + it('does not extend AbstractCursor or any other base class', function () { + expect(Object.getPrototypeOf(SampleCursor.prototype)).to.equal(Object.prototype); + }); + + it('starts with cursorId null, alive true', function () { + const cursor = new SampleCursor(fakeProc); + expect(cursor.cursorId).to.be.null; + expect(cursor.alive).to.be.true; + }); + + it('close() sets alive to false', async function () { + const cursor = new SampleCursor(fakeProc); + await cursor.close(); + expect(cursor.alive).to.be.false; + }); + + it('cursorId goes null → N → 0 across lifecycle', async function () { + getSampleStub.onFirstCall().resolves({ cursorId: 5, documents: [{ x: 1 }] }); + getSampleStub.onSecondCall().resolves({ cursorId: 0, documents: [] }); + + const cursor = new SampleCursor(fakeProc); + expect(cursor.cursorId).to.be.null; + + const iter = cursor[Symbol.asyncIterator](); + await iter.next(); // drains doc from first batch + expect(cursor.cursorId).to.equal(5); + + await iter.next(); // triggers refill → cursorId 0 + expect(Number(cursor.cursorId)).to.equal(0); + expect(cursor.alive).to.be.false; + }); + + it('drains a single batch and stops', async function () { + getSampleStub.resolves({ cursorId: 0, documents: [{ a: 1 }, { a: 2 }] }); + + const docs: unknown[] = []; + for await (const doc of new SampleCursor(fakeProc)) { + docs.push(doc); + } + expect(docs).to.deep.equal([{ a: 1 }, { a: 2 }]); + expect(getSampleStub.calledOnce).to.be.true; + }); + + it('drains multiple batches (3 calls, last returns cursorId 0)', async function () { + getSampleStub.onFirstCall().resolves({ cursorId: 1, documents: [{ n: 1 }] }); + getSampleStub.onSecondCall().resolves({ cursorId: 2, documents: [{ n: 2 }] }); + getSampleStub.onThirdCall().resolves({ cursorId: 0, documents: [{ n: 3 }] }); + + const docs: unknown[] = []; + for await (const doc of new SampleCursor(fakeProc)) { + docs.push(doc); + } + expect(docs).to.deep.equal([{ n: 1 }, { n: 2 }, { n: 3 }]); + expect(getSampleStub.calledThrice).to.be.true; + }); + + it('continues polling on empty batch with non-zero cursorId', async function () { + getSampleStub.onFirstCall().resolves({ cursorId: 9, documents: [] }); + getSampleStub.onSecondCall().resolves({ cursorId: 0, documents: [{ z: 1 }] }); + + const docs: unknown[] = []; + for await (const doc of new SampleCursor(fakeProc)) { + docs.push(doc); + } + expect(docs).to.deep.equal([{ z: 1 }]); + expect(getSampleStub.calledTwice).to.be.true; + }); + + it('makes no extra wire call after cursorId 0', async function () { + getSampleStub.resolves({ cursorId: 0, documents: [{ a: 1 }] }); + + const docs: unknown[] = []; + for await (const doc of new SampleCursor(fakeProc)) { + docs.push(doc); + } + expect(getSampleStub.calledOnce).to.be.true; + }); + + it('stops iteration when close() is called mid-stream', async function () { + getSampleStub.onFirstCall().resolves({ cursorId: 1, documents: [{ a: 1 }, { a: 2 }, { a: 3 }] }); + + const cursor = new SampleCursor(fakeProc); + const docs: unknown[] = []; + for await (const doc of cursor) { + docs.push(doc); + if (docs.length === 1) await cursor.close(); + } + expect(docs).to.have.lengthOf(1); + expect(cursor.alive).to.be.false; + }); + + it('passes limit to the initial call', async function () { + getSampleStub.resolves({ cursorId: 0, documents: [] }); + const cursor = new SampleCursor(fakeProc, 7); + for await (const _ of cursor) { /* drain */ } + expect(getSampleStub.firstCall.args[0]).to.deep.include({ limit: 7 }); + expect(getSampleStub.firstCall.args[0]).not.to.have.property('cursorId'); + }); + + it('passes batchSize to continuation calls', async function () { + getSampleStub.onFirstCall().resolves({ cursorId: 3, documents: [{ a: 1 }] }); + getSampleStub.onSecondCall().resolves({ cursorId: 0, documents: [] }); + + const cursor = new SampleCursor(fakeProc, undefined, 20); + for await (const _ of cursor) { /* drain */ } + + const secondCallArg = getSampleStub.secondCall.args[0]; + expect(secondCallArg).to.have.property('batchSize', 20); + expect(secondCallArg).to.have.property('cursorId', 3); + expect(secondCallArg).not.to.have.property('limit'); + }); +}); + +// --------------------------------------------------------------------------- +// Error propagation +// --------------------------------------------------------------------------- + +describe('Error propagation', function () { + let executeStub: sinon.SinonStub; + let proc: StreamProcessor; + let sps: StreamProcessors; + + beforeEach(function () { + executeStub = sinon.stub(executeOperationModule, 'executeOperation'); + sps = makeStreamProcessors(); + proc = sps.get('sp1'); + }); + + afterEach(function () { + sinon.restore(); + }); + + const serverError = new MongoServerError({ message: 'boom', code: 125 }); + + for (const [label, action] of [ + ['create()', () => sps.create('sp1', [{ $source: {} }])], + ['getInfo()', () => sps.getInfo('sp1')], + ['start()', () => proc.start()], + ['stop()', () => proc.stop()], + ['drop()', () => proc.drop()], + ['stats()', () => proc.stats()], + ['getStreamProcessorSamples()', () => proc.getStreamProcessorSamples()], + ] as const) { + it(`propagates MongoServerError from ${label} unchanged`, async function () { + executeStub.rejects(serverError); + const err = await action().catch(e => e); + expect(err).to.equal(serverError); + expect(err.code).to.equal(125); + }); + } + + it('src/stream_processing/ files contain no catch blocks', function () { + const dir = path.resolve(__dirname, '../../../src/stream_processing'); + const files = fs.readdirSync(dir).filter(f => f.endsWith('.ts')); + const catchPattern = /\bcatch\s*\(/; + for (const file of files) { + const src = fs.readFileSync(path.join(dir, file), 'utf8'); + expect(src, `${file} must not contain catch blocks`).to.not.match(catchPattern); + } + }); +});