Changestream parallel#40
Conversation
|
auggie review |
🤖 Augment PR SummarySummary: Adds an initial parallel change stream implementation and CI support to validate it across MongoDB versions/topologies. Changes:
Technical Notes: Integration tests are keyed off naming ( 🤖 Was this summary useful? React with 👍 or 👎 |
|
|
||
| var events []bson.Raw | ||
| for { | ||
| if !cs.TryNext(sctx) { |
There was a problem hiding this comment.
mongotools/changestream/changestream.go:233 If the server closes/exhausts the underlying change stream, cs.TryNext will keep returning false with cs.Err()==nil, and this loop will repeatedly sendBatch and spin forever. That can turn a normal stream end into a tight loop that never exits and continuously emits empty batches.
Severity: high
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| blocking bool, | ||
| sess *mongo.Session, | ||
| ) bool { | ||
| for len(pcs.curChanBatch[i].Events) == 0 { |
There was a problem hiding this comment.
mongotools/changestream/changestream.go:335 In blocking mode, fillBatch waits until each stream has at least one event (len(Events)>0), so ParallelChangeStream.Next can hang indefinitely when any partition is idle. This seems likely in practice for hashed dispatching, where some streams may legitimately have no events for long periods.
Severity: high
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| return false | ||
| case batch, ok := <-pcs.channels[i]: | ||
| if !ok { | ||
| pcs.nextErr = fmt.Errorf("channel %d closed unexpectedly", i) |
There was a problem hiding this comment.
mongotools/changestream/changestream.go:343 Treating a closed per-thread channel as "unexpected" can produce misleading errors on normal shutdown paths (e.g., Close() or a thread exiting after setting errFuture). Consider propagating the underlying thread error/cancellation cause here instead of always creating a new error.
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| } | ||
|
|
||
| func TestIntegration_EventOrdering(t *testing.T) { | ||
| legacytools.SetDriverCompatibility("4.0") |
There was a problem hiding this comment.
mongotools/changestream/integration_test.go:52 legacytools.SetDriverCompatibility mutates global mongo-driver state, so leaving it modified after this test can interfere with any other tests that run in the same go test ./... invocation (especially if packages/tests run in parallel).
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| t.Logf("Plain change stream captured %d events", len(plainEvents)) | ||
|
|
||
| pcs, err := NewParallel(tctx, db, Options{ | ||
| Streams: 1, |
There was a problem hiding this comment.
mongotools/changestream/integration_test.go:134 This test sets Streams: 1, so it doesn’t actually exercise the multi-stream merge logic or ordering guarantees that ParallelChangeStream is meant to provide.
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| Unit tests are written in the usual Go way. | ||
|
|
||
| Integration tests MUST include the word `Integration` in the test’s name. Call | ||
| `internal.GetConnStr()` to fetch the connection string for use in your tests. |
No description provided.