Skip to content

add asp commands#1701

Draft
nickpoindexter wants to merge 4 commits into
mongodb:mainfrom
nickpoindexter:add_asp_commands
Draft

add asp commands#1701
nickpoindexter wants to merge 4 commits into
mongodb:mainfrom
nickpoindexter:add_asp_commands

Conversation

@nickpoindexter
Copy link
Copy Markdown

Summary

Adds first-class driver support for Atlas Stream Processing (ASP), implementing the ASP driver spec. Today users have to drop down to Database::run_command against a workspace endpoint; this PR adds a dedicated client/handles/operations layer.

What's new

Public module (driver/src/stream_processing.rs, driver/src/stream_processing/)

  • StreamProcessingClient — workspace-scoped client distinct from Client. Validates the workspace URI (atlas-stream-*.<region>.a.query.mongodb.net and the .mongodb-<env>.net staging variants), enforces TLS, defaults authSource=admin. Constructor pair: with_uri_str and with_options. Static is_workspace_uri(&str) -> bool is exposed for callers that want to gate before connecting.
  • StreamProcessorscreate() / get() / get_info() for managing processors in a workspace.
  • StreamProcessorstart() / stop() / drop() / stats() / samples() for a named processor.
  • StreamProcessorInfo — typed deserialization target for getStreamProcessor. Fields the spec marks as Optional (e.g. id, pipeline_version) are Option<T>.
  • StreamProcessorSamples — result of samples(), exposes cursor_id, documents, is_exhausted(). - Options: CreateStreamProcessorOptions, StartStreamProcessorOptions, FailoverOptions, GetStreamProcessorStatsOptions, GetStreamProcessorSamplesOptions. All #[derive(TypedBuilder, Serialize, Deserialize)] with #[skip_serializing_none] to match the rest of the driver's options pattern.

Operation layer (driver/src/operation/stream_processing/)
One OperationWithDefaults impl per wire command: createStreamProcessor, startStreamProcessor, stopStreamProcessor, dropStreamProcessor, getStreamProcessor, getStreamProcessorStats, startSampleStreamProcessor, getMoreSampleStreamProcessor. All target the admin database via OperationTarget::admin(client). Retryable reads (getStreamProcessor, getStreamProcessorStats) declare
Retryability::read(options) per the spec's retryability matrix.

Notable spec / server alignment

  • startAfter (in StartStreamProcessorOptions) is intentionally not exposed — the spec marks it RESERVED for future use and explicitly forbids drivers from sending it. - Dev-server response shape deviations are accommodated, matching the workarounds in the Python POC and PHP PR:
    • GetStreamProcessor unwraps a top-level result wrapper when the server returns { ok: 1, result: { … } }.
    • GetMoreSampleStreamProcessor accepts both nextBatch (spec) and messages (current dev server). - StreamProcessorInfo::id and pipeline_version are Option<T> rather than required.
  • samples() is a single-command dispatch: absent / zero cursor_idstartSampleStreamProcessor (returns cursor_id, empty docs); non-zero cursor_idgetMoreSampleStreamProcessor. Callers stop when cursor_id ==0.
  • State strings are returned as plain String, not an enum — per the spec, drivers MUST surface unknown state values as-is rather than mapping to a closed set.
  • Internal-only fields (tenantID, projectId, processorId) are not surfaced in the user-facing API.

Test plan

  • cargo check -p mongodb clean
  • cargo clippy -p mongodb --all-targets -- -D warnings — no new warnings on the new code (only pre-existing CSFLE-feature-gated dead-code warnings in client.rs / tracking_arc.rs that are present on main)
  • cargo +nightly fmt --check -- --unstable-features clean
  • cargo test -p mongodb --lib stream_processing::tests — 10 unit tests pass (URI detection for prod / staging / creds-and-port / case-insensitivity / four reject cases; async rejection paths for non-workspace URI and SRVscheme)
  • Functional smoke test stream_processing::lifecycle in driver/src/test/stream_processing.rs runs the full create → start → stats → sample → stop → drop lifecycle. Self-skips via log_uncaptured unless MONGODB_STREAM_PROCESSING_URI is set to a real workspace endpoint — needs an Evergreen variant configured with workspace creds before it actually exercises in CI.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant