Skip to content

Commit e60a71e

Browse files
committed
ASP driver changes
1 parent 1a0ec07 commit e60a71e

19 files changed

Lines changed: 2146 additions & 1 deletion

package-lock.json

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

poc_asp.ts

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/**
2+
* POC: Atlas Stream Processing — create / start / stop / drop a stream processor.
3+
*
4+
* Run with:
5+
* npx ts-node --skipProject poc_asp.ts
6+
*
7+
* Pipeline used:
8+
* $source → sample_stream_solar
9+
* $emit → __testLog
10+
*/
11+
12+
import { MongoServerError } from './src/error';
13+
import { StreamProcessingClient } from './src/stream_processing/stream_processing_client';
14+
15+
// ---------------------------------------------------------------------------
16+
// Configuration
17+
// ---------------------------------------------------------------------------
18+
19+
const WORKSPACE_URI =
20+
'mongodb://atlas-stream-69ed590869155100cecc8b33-lulzki.virginia-usa.a.query.mongodb-dev.net/';
21+
const USERNAME = 'streams';
22+
const PASSWORD = 'letsdostreaming123';
23+
24+
const PROCESSOR_NAME = 'simpletestSP_node';
25+
26+
const PIPELINE = [
27+
{
28+
$source: {
29+
connectionName: 'sample_stream_solar'
30+
}
31+
},
32+
{
33+
$emit: {
34+
connectionName: '__testLog'
35+
}
36+
}
37+
];
38+
39+
// ---------------------------------------------------------------------------
40+
// Helpers
41+
// ---------------------------------------------------------------------------
42+
43+
const sleep = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));
44+
45+
// ---------------------------------------------------------------------------
46+
// POC steps
47+
// ---------------------------------------------------------------------------
48+
49+
async function main(): Promise<void> {
50+
const client = new StreamProcessingClient(WORKSPACE_URI, {
51+
auth: { username: USERNAME, password: PASSWORD }
52+
});
53+
54+
try {
55+
const sps = client.streamProcessors();
56+
57+
// ------------------------------------------------------------------
58+
// 1. Create
59+
// ------------------------------------------------------------------
60+
console.log(`\n[1] Creating processor '${PROCESSOR_NAME}' ...`);
61+
try {
62+
await sps.create(PROCESSOR_NAME, PIPELINE);
63+
console.log(' Created OK');
64+
} catch (e) {
65+
if (e instanceof MongoServerError) {
66+
throw new Error(` Create failed (code ${e.code}): ${e.message}`);
67+
}
68+
throw e;
69+
}
70+
71+
// ------------------------------------------------------------------
72+
// 2. Inspect before starting
73+
// ------------------------------------------------------------------
74+
console.log('\n[2] Getting info ...');
75+
let info = await sps.getInfo(PROCESSOR_NAME);
76+
console.log(` state : ${info.state}`);
77+
console.log(` pipelineVersion : ${info.pipelineVersion}`);
78+
console.log(` hasStarted : ${info.hasStarted}`);
79+
80+
// ------------------------------------------------------------------
81+
// 3. Start
82+
// ------------------------------------------------------------------
83+
const proc = sps.get(PROCESSOR_NAME);
84+
console.log('\n[3] Starting processor ...');
85+
try {
86+
await proc.start();
87+
console.log(' Start command sent OK');
88+
} catch (e) {
89+
if (e instanceof MongoServerError) {
90+
throw new Error(` Start failed (code ${e.code}): ${e.message}`);
91+
}
92+
throw e;
93+
}
94+
95+
await sleep(2000);
96+
97+
info = await sps.getInfo(PROCESSOR_NAME);
98+
console.log(` state after start: ${info.state}`);
99+
100+
// ------------------------------------------------------------------
101+
// 4. Stats
102+
// ------------------------------------------------------------------
103+
console.log('\n[4] Fetching stats ...');
104+
try {
105+
const rawStats = await proc.stats();
106+
console.dir(rawStats, { depth: null });
107+
} catch (e) {
108+
if (e instanceof MongoServerError) {
109+
console.log(` Stats unavailable (code ${e.code}): ${e.message}`);
110+
} else {
111+
throw e;
112+
}
113+
}
114+
115+
// ------------------------------------------------------------------
116+
// 5. Sample (up to 5 docs)
117+
// Note: breaking manually after N docs because the dev server does not
118+
// signal cursor exhaustion with cursorId=0 as the spec requires.
119+
// ------------------------------------------------------------------
120+
console.log('\n[5] Sampling up to 5 documents ...');
121+
try {
122+
let count = 0;
123+
for await (const doc of proc.sample()) {
124+
console.log(` doc: ${JSON.stringify(doc)}`);
125+
count += 1;
126+
if (count >= 5) break;
127+
}
128+
console.log(` Sampled ${count} document(s)`);
129+
} catch (e) {
130+
if (e instanceof MongoServerError) {
131+
console.log(` Sample unavailable (code ${e.code}): ${e.message}`);
132+
} else {
133+
throw e;
134+
}
135+
}
136+
137+
// ------------------------------------------------------------------
138+
// 6. Stop
139+
// ------------------------------------------------------------------
140+
console.log('\n[6] Stopping processor ...');
141+
try {
142+
await proc.stop();
143+
console.log(' Stop command sent OK');
144+
} catch (e) {
145+
if (e instanceof MongoServerError) {
146+
throw new Error(` Stop failed (code ${e.code}): ${e.message}`);
147+
}
148+
throw e;
149+
}
150+
151+
await sleep(1000);
152+
153+
info = await sps.getInfo(PROCESSOR_NAME);
154+
console.log(` state after stop : ${info.state}`);
155+
156+
// ------------------------------------------------------------------
157+
// 7. Drop (permanent — comment out to keep the processor alive)
158+
// ------------------------------------------------------------------
159+
console.log('\n[7] Dropping processor ...');
160+
try {
161+
await proc.drop();
162+
console.log(' Dropped OK');
163+
} catch (e) {
164+
if (e instanceof MongoServerError) {
165+
throw new Error(` Drop failed (code ${e.code}): ${e.message}`);
166+
}
167+
throw e;
168+
}
169+
170+
console.log('\nDone.');
171+
} finally {
172+
await client.close();
173+
}
174+
}
175+
176+
main().catch(err => {
177+
console.error(err);
178+
process.exitCode = 1;
179+
});

src/index.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ export {
4242
MongoBulkWriteError
4343
} from './bulk/common';
4444
export { ClientEncryption } from './client-side-encryption/client_encryption';
45+
// Atlas Stream Processing (experimental)
4546
export { ChangeStreamCursor } from './cursor/change_stream_cursor';
4647
export { ExplainableCursor } from './cursor/explainable_cursor';
4748
export {
@@ -87,6 +88,12 @@ export {
8788
MongoWriteConcernError,
8889
WriteConcernErrorResult
8990
} from './error';
91+
export {
92+
SampleCursor,
93+
StreamProcessingClient,
94+
StreamProcessor,
95+
StreamProcessors
96+
} from './stream_processing';
9097
export {
9198
AbstractCursor,
9299
// Actual driver classes exported
@@ -612,6 +619,15 @@ export type {
612619
WithTransactionCallback
613620
} from './sessions';
614621
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
622+
export type {
623+
CreateStreamProcessorOptions,
624+
GetStreamProcessorSamplesOptions,
625+
GetStreamProcessorSamplesResult,
626+
GetStreamProcessorStatsOptions,
627+
StartStreamProcessorOptions,
628+
StreamProcessorInfo,
629+
StreamProcessorTier
630+
} from './stream_processing';
615631
export type {
616632
CSOTTimeoutContext,
617633
CSOTTimeoutContextOptions,
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { type Connection } from '../..';
2+
import type { Document } from '../../bson';
3+
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
4+
import type { ClientSession } from '../../sessions';
5+
import type { CreateStreamProcessorOptions } from '../../stream_processing/types';
6+
import { CommandOperation, type CommandOperationOptions } from '../command';
7+
import { Aspect, defineAspects } from '../operation';
8+
9+
/** @internal */
10+
export class CreateStreamProcessorOperation extends CommandOperation<Document> {
11+
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
12+
13+
constructor(
14+
readonly processorName: string,
15+
readonly pipeline: Document[],
16+
readonly aspOptions?: CreateStreamProcessorOptions,
17+
options?: CommandOperationOptions
18+
) {
19+
super(undefined, options);
20+
}
21+
22+
override get commandName() {
23+
return 'createStreamProcessor' as const;
24+
}
25+
26+
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
27+
const cmd: Document = {
28+
createStreamProcessor: this.processorName,
29+
pipeline: this.pipeline
30+
};
31+
32+
if (this.aspOptions) {
33+
const optsDoc: Document = {};
34+
if (this.aspOptions.dlq != null) optsDoc.dlq = this.aspOptions.dlq;
35+
if (this.aspOptions.streamMetaFieldName != null)
36+
optsDoc.streamMetaFieldName = this.aspOptions.streamMetaFieldName;
37+
if (this.aspOptions.tier != null) optsDoc.tier = this.aspOptions.tier;
38+
if (this.aspOptions.failover != null) optsDoc.failover = this.aspOptions.failover;
39+
if (Object.keys(optsDoc).length > 0) cmd.options = optsDoc;
40+
}
41+
42+
return cmd;
43+
}
44+
}
45+
46+
defineAspects(CreateStreamProcessorOperation, [Aspect.WRITE_OPERATION]);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { type Connection } from '../..';
2+
import type { Document } from '../../bson';
3+
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
4+
import type { ClientSession } from '../../sessions';
5+
import { CommandOperation, type CommandOperationOptions } from '../command';
6+
import { Aspect, defineAspects } from '../operation';
7+
8+
/** @internal */
9+
export class DropStreamProcessorOperation extends CommandOperation<Document> {
10+
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
11+
12+
constructor(
13+
readonly processorName: string,
14+
options?: CommandOperationOptions
15+
) {
16+
super(undefined, options);
17+
}
18+
19+
override get commandName() {
20+
return 'dropStreamProcessor' as const;
21+
}
22+
23+
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
24+
return { dropStreamProcessor: this.processorName };
25+
}
26+
}
27+
28+
defineAspects(DropStreamProcessorOperation, [Aspect.WRITE_OPERATION]);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { type Connection } from '../..';
2+
import type { Document } from '../../bson';
3+
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
4+
import type { ClientSession } from '../../sessions';
5+
import { CommandOperation, type CommandOperationOptions } from '../command';
6+
import { Aspect, defineAspects } from '../operation';
7+
8+
/** @internal */
9+
export class GetMoreSampleStreamProcessorOperation extends CommandOperation<Document> {
10+
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
11+
12+
constructor(
13+
readonly processorName: string,
14+
readonly cursorId: bigint | number,
15+
readonly batchSize?: number,
16+
options?: CommandOperationOptions
17+
) {
18+
super(undefined, options);
19+
}
20+
21+
override get commandName() {
22+
return 'getMoreSampleStreamProcessor' as const;
23+
}
24+
25+
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
26+
const cmd: Document = {
27+
getMoreSampleStreamProcessor: this.processorName,
28+
cursorId: this.cursorId
29+
};
30+
if (this.batchSize != null) cmd.batchSize = this.batchSize;
31+
return cmd;
32+
}
33+
}
34+
35+
defineAspects(GetMoreSampleStreamProcessorOperation, [Aspect.WRITE_OPERATION]);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { type Connection } from '../..';
2+
import type { Document } from '../../bson';
3+
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
4+
import type { ClientSession } from '../../sessions';
5+
import { CommandOperation, type CommandOperationOptions } from '../command';
6+
import { Aspect, defineAspects } from '../operation';
7+
8+
/** @internal */
9+
export class GetStreamProcessorOperation extends CommandOperation<Document> {
10+
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
11+
12+
constructor(
13+
readonly processorName: string,
14+
options?: CommandOperationOptions
15+
) {
16+
super(undefined, options);
17+
}
18+
19+
override get commandName() {
20+
return 'getStreamProcessor' as const;
21+
}
22+
23+
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
24+
return { getStreamProcessor: this.processorName };
25+
}
26+
}
27+
28+
defineAspects(GetStreamProcessorOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE]);

0 commit comments

Comments
 (0)