Skip to content

Commit 81457c5

Browse files
JohnMcLearclaude
andcommitted
fix: use persistent socket listeners to avoid missing messages in CI
Replace sequential waitForSocketEvent loops with single persistent listeners that filter messages inline. This prevents race conditions where messages arrive between off/on listener cycles, causing timeouts on slower CI runners. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3bc8bd1 commit 81457c5

1 file changed

Lines changed: 57 additions & 56 deletions

File tree

src/tests/backend/specs/undo_clear_authorship.ts

Lines changed: 57 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -72,42 +72,57 @@ describe(__filename, function () {
7272
};
7373

7474
/**
75-
* Wait for an ACCEPT_COMMIT message, skipping any other COLLABROOM messages
76-
* (like USER_NEWINFO, NEW_CHANGES, etc.) that may arrive first.
75+
* Wait for an ACCEPT_COMMIT or disconnect message, ignoring other messages.
76+
* Uses a single persistent listener to avoid missing messages between on/off cycles.
7777
*/
78-
const waitForAcceptCommit = async (socket: any, wantRev: number) => {
79-
for (;;) {
80-
const msg = await common.waitForSocketEvent(socket, 'message');
81-
if (msg.disconnect) {
82-
throw new Error(`Unexpected disconnect: ${JSON.stringify(msg)}`);
83-
}
84-
if (msg.type === 'COLLABROOM' && msg.data?.type === 'ACCEPT_COMMIT') {
85-
assert.equal(msg.data.newRev, wantRev);
86-
return;
87-
}
88-
// Skip non-ACCEPT_COMMIT messages (USER_NEWINFO, NEW_CHANGES, etc.)
89-
}
78+
const waitForCommitOrDisconnect = (socket: any, timeoutMs = 10000): Promise<any> => {
79+
return new Promise((resolve, reject) => {
80+
const timeout = setTimeout(() => {
81+
socket.off('message', handler);
82+
reject(new Error(`timed out waiting for ACCEPT_COMMIT or disconnect after ${timeoutMs}ms`));
83+
}, timeoutMs);
84+
const handler = (msg: any) => {
85+
if (msg.disconnect) {
86+
clearTimeout(timeout);
87+
socket.off('message', handler);
88+
resolve(msg);
89+
} else if (msg.type === 'COLLABROOM' && msg.data?.type === 'ACCEPT_COMMIT') {
90+
clearTimeout(timeout);
91+
socket.off('message', handler);
92+
resolve(msg);
93+
}
94+
// Ignore USER_NEWINFO, NEW_CHANGES, etc.
95+
};
96+
socket.on('message', handler);
97+
});
9098
};
9199

92-
/**
93-
* Drain messages from a socket until we get an ACCEPT_COMMIT or disconnect.
94-
* Returns the message for assertion.
95-
*/
96-
const waitForNextCommitOrDisconnect = async (socket: any): Promise<any> => {
97-
for (;;) {
98-
const msg = await common.waitForSocketEvent(socket, 'message');
99-
if (msg.disconnect) return msg;
100-
if (msg.type === 'COLLABROOM' && msg.data?.type === 'ACCEPT_COMMIT') return msg;
101-
// Skip USER_NEWINFO, NEW_CHANGES, etc.
100+
const waitForAcceptCommit = async (socket: any, wantRev: number) => {
101+
const msg = await waitForCommitOrDisconnect(socket);
102+
if (msg.disconnect) {
103+
throw new Error(`Unexpected disconnect: ${JSON.stringify(msg)}`);
102104
}
105+
assert.equal(msg.data.newRev, wantRev);
103106
};
104107

105108
/**
106-
* Drain non-ACCEPT_COMMIT messages so the socket is ready for the next operation.
107-
* Waits briefly then consumes any queued messages.
109+
* Wait for a specific message type, ignoring others. Used for cross-user sync.
108110
*/
109-
const drainMessages = async (socket: any) => {
110-
await new Promise(resolve => setTimeout(resolve, 500));
111+
const waitForNewChanges = (socket: any, timeoutMs = 10000): Promise<void> => {
112+
return new Promise((resolve, reject) => {
113+
const timeout = setTimeout(() => {
114+
socket.off('message', handler);
115+
reject(new Error(`timed out waiting for NEW_CHANGES after ${timeoutMs}ms`));
116+
}, timeoutMs);
117+
const handler = (msg: any) => {
118+
if (msg.type === 'COLLABROOM' && msg.data?.type === 'NEW_CHANGES') {
119+
clearTimeout(timeout);
120+
socket.off('message', handler);
121+
resolve();
122+
}
123+
};
124+
socket.on('message', handler);
125+
});
111126
};
112127

113128
describe('undo of clear authorship colors (bug #2802)', function () {
@@ -129,24 +144,22 @@ describe(__filename, function () {
129144
revA += 1;
130145

131146
// Step 3: Connect User B (after User A's text is committed)
132-
await drainMessages(socketA);
133147
const userB = await connectUser();
134148
socketB = userB.socket;
135149
revB = userB.rev;
136-
// User B joins and sees the pad at the current head revision
137-
await drainMessages(socketA);
138150

139151
// Step 4: User B types " world" with their author attribute
140152
const apoolB = new AttributePool();
141153
apoolB.putAttrib(['author', userB.author]);
154+
const userASeesB = waitForNewChanges(socketA);
142155
await Promise.all([
143156
waitForAcceptCommit(socketB, revB + 1),
144157
sendUserChanges(socketB, revB, 'Z:6>6=5*0+6$ world', apoolB),
145158
]);
146159
revB += 1;
147160

148-
// Wait for User A to see the change
149-
await drainMessages(socketA);
161+
// Wait for User A to see User B's change
162+
await userASeesB;
150163
revA = revB;
151164

152165
// The pad now has "hello world\n" with two different authors
@@ -160,27 +173,21 @@ describe(__filename, function () {
160173
sendUserChanges(socketB, revB, 'Z:c>0*0=b$', clearPool),
161174
]);
162175
revB += 1;
163-
await drainMessages(socketA);
164-
revA = revB;
165176

166177
// Step 6: User B undoes the clear authorship
167178
// This is the critical part - the undo changeset re-applies the original
168179
// author attributes, which include User A's author ID.
169-
// The server currently rejects this because User B is submitting changes
170-
// with User A's author ID.
171180
const undoPool = new AttributePool();
172181
undoPool.putAttrib(['author', userA.author]); // 0 = author A
173182
undoPool.putAttrib(['author', userB.author]); // 1 = author B
174183
// Undo restores: "hello" with author A (5 chars), " world" with author B (6 chars)
175184
const undoChangeset = 'Z:c>0*0=5*1=6$';
176185

177186
// This should NOT disconnect User B - that's the bug (#2802)
178-
const result = await Promise.all([
179-
waitForNextCommitOrDisconnect(socketB),
180-
sendUserChanges(socketB, revB, undoChangeset, undoPool),
181-
]);
187+
const resultP = waitForCommitOrDisconnect(socketB);
188+
await sendUserChanges(socketB, revB, undoChangeset, undoPool);
189+
const msg = await resultP;
182190

183-
const msg = result[0];
184191
assert.notDeepEqual(msg, {disconnect: 'badChangeset'},
185192
'User was disconnected with badChangeset - bug #2802');
186193
assert.equal(msg.type, 'COLLABROOM');
@@ -250,24 +257,19 @@ describe(__filename, function () {
250257
socketA = userA.socket;
251258
revA = userA.rev;
252259

253-
await drainMessages(socketA);
254-
255260
const userB = await connectUser();
256261
socketB = userB.socket;
257262
revB = userB.rev;
258263

259-
await drainMessages(socketA);
260-
261264
// User B tries to insert text attributed to User A - this should be rejected
262265
const fakePool = new AttributePool();
263266
fakePool.putAttrib(['author', userA.author]);
264267

265-
const result = await Promise.all([
266-
waitForNextCommitOrDisconnect(socketB),
267-
sendUserChanges(socketB, revB, 'Z:1>5*0+5$hello', fakePool),
268-
]);
268+
const resultP = waitForCommitOrDisconnect(socketB);
269+
await sendUserChanges(socketB, revB, 'Z:1>5*0+5$hello', fakePool);
270+
const msg = await resultP;
269271

270-
assert.deepEqual(result[0], {disconnect: 'badChangeset'},
272+
assert.deepEqual(msg, {disconnect: 'badChangeset'},
271273
'Should reject changeset that impersonates another author for new text');
272274
});
273275

@@ -291,12 +293,11 @@ describe(__filename, function () {
291293
const fakePool = new AttributePool();
292294
fakePool.putAttrib(['author', 'a.fabricatedAuthorId']);
293295

294-
const result = await Promise.all([
295-
waitForNextCommitOrDisconnect(socketA),
296-
sendUserChanges(socketA, revA, 'Z:6>0*0=5$', fakePool),
297-
]);
296+
const resultP = waitForCommitOrDisconnect(socketA);
297+
await sendUserChanges(socketA, revA, 'Z:6>0*0=5$', fakePool);
298+
const msg = await resultP;
298299

299-
assert.deepEqual(result[0], {disconnect: 'badChangeset'},
300+
assert.deepEqual(msg, {disconnect: 'badChangeset'},
300301
'Should reject = op with fabricated author not in pad pool');
301302
});
302303
});

0 commit comments

Comments
 (0)