Skip to content

Changestream parallel#40

Draft
FGasper wants to merge 32 commits into
mainfrom
changestream_parallel
Draft

Changestream parallel#40
FGasper wants to merge 32 commits into
mainfrom
changestream_parallel

Conversation

@FGasper

@FGasper FGasper commented Jun 24, 2026

Copy link
Copy Markdown
Collaborator

No description provided.

Copilot AI review requested due to automatic review settings June 24, 2026 19:30

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review this pull request because it exceeds the maximum number of lines (20,000). Try reducing the number of changed lines and requesting a review from Copilot again.

Copilot AI review requested due to automatic review settings June 24, 2026 19:48

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review this pull request because it exceeds the maximum number of lines (20,000). Try reducing the number of changed lines and requesting a review from Copilot again.

Copilot AI review requested due to automatic review settings June 24, 2026 23:55

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review this pull request because it exceeds the maximum number of lines (20,000). Try reducing the number of changed lines and requesting a review from Copilot again.

Copilot AI review requested due to automatic review settings June 25, 2026 00:20

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review this pull request because it exceeds the maximum number of lines (20,000). Try reducing the number of changed lines and requesting a review from Copilot again.

@FGasper

FGasper commented Jun 25, 2026

Copy link
Copy Markdown
Collaborator Author

auggie review

@augmentcode

augmentcode Bot commented Jun 25, 2026

Copy link
Copy Markdown
🤖 Augment PR Summary

Summary: Adds an initial parallel change stream implementation and CI support to validate it across MongoDB versions/topologies.

Changes:

  • Introduces mongotools/changestream with ParallelChangeStream, which runs multiple change streams in parallel and merges events while attempting to preserve order.
  • Adds an integration test that compares events from a plain database-level watch vs. the parallel stream.
  • Adds internal/test.go helpers to read integration-test env vars (URI/topology/version) and skip when unset.
  • Adds legacytools.SetDriverCompatibility to relax mongo-driver’s minimum supported server/wire versions for older clusters.
  • Adds a GitHub Actions “Integration” workflow that provisions MongoDB via Docker and runs integration tests over a version/topology matrix.
  • Bumps dependencies (notably go.mongodb.org/mongo-driver/v2 and github.com/samber/lo) and updates third-party notices/vendor accordingly.

Technical Notes: Integration tests are keyed off naming (-run Integration) and consume connection details via env vars provided by the workflow.

🤖 Was this summary useful? React with 👍 or 👎

@augmentcode augmentcode Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 8 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.


var events []bson.Raw
for {
if !cs.TryNext(sctx) {

@augmentcode augmentcode Bot Jun 25, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mongotools/changestream/changestream.go:233 If the server closes/exhausts the underlying change stream, cs.TryNext will keep returning false with cs.Err()==nil, and this loop will repeatedly sendBatch and spin forever. That can turn a normal stream end into a tight loop that never exits and continuously emits empty batches.

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Comment thread mongotools/changestream/changestream.go Outdated
Comment thread mongotools/changestream/changestream.go Outdated
Comment thread mongotools/changestream/changestream.go Outdated
blocking bool,
sess *mongo.Session,
) bool {
for len(pcs.curChanBatch[i].Events) == 0 {

@augmentcode augmentcode Bot Jun 25, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mongotools/changestream/changestream.go:335 In blocking mode, fillBatch waits until each stream has at least one event (len(Events)>0), so ParallelChangeStream.Next can hang indefinitely when any partition is idle. This seems likely in practice for hashed dispatching, where some streams may legitimately have no events for long periods.

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

return false
case batch, ok := <-pcs.channels[i]:
if !ok {
pcs.nextErr = fmt.Errorf("channel %d closed unexpectedly", i)

@augmentcode augmentcode Bot Jun 25, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mongotools/changestream/changestream.go:343 Treating a closed per-thread channel as "unexpected" can produce misleading errors on normal shutdown paths (e.g., Close() or a thread exiting after setting errFuture). Consider propagating the underlying thread error/cancellation cause here instead of always creating a new error.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

}

func TestIntegration_EventOrdering(t *testing.T) {
legacytools.SetDriverCompatibility("4.0")

@augmentcode augmentcode Bot Jun 25, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mongotools/changestream/integration_test.go:52 legacytools.SetDriverCompatibility mutates global mongo-driver state, so leaving it modified after this test can interfere with any other tests that run in the same go test ./... invocation (especially if packages/tests run in parallel).

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

t.Logf("Plain change stream captured %d events", len(plainEvents))

pcs, err := NewParallel(tctx, db, Options{
Streams: 1,

@augmentcode augmentcode Bot Jun 25, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mongotools/changestream/integration_test.go:134 This test sets Streams: 1, so it doesn’t actually exercise the multi-stream merge logic or ordering guarantees that ParallelChangeStream is meant to provide.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Comment thread README.md
Unit tests are written in the usual Go way.

Integration tests MUST include the word `Integration` in the test’s name. Call
`internal.GetConnStr()` to fetch the connection string for use in your tests.

@augmentcode augmentcode Bot Jun 25, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

README.md:44 The README says to call internal.GetConnStr(), but the helper in internal/test.go requires a *testing.T argument (internal.GetConnStr(t)).

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copilot AI review requested due to automatic review settings June 25, 2026 01:33

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review this pull request because it exceeds the maximum number of lines (20,000). Try reducing the number of changed lines and requesting a review from Copilot again.

Copilot AI review requested due to automatic review settings June 25, 2026 19:17

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review this pull request because it exceeds the maximum number of lines (20,000). Try reducing the number of changed lines and requesting a review from Copilot again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants