Skip to content

Commit 3fad4fb

Browse files
authored
fix: LiveQuery subscriptions leak when a client reuses a subscribe requestId (#10499)
1 parent c700ebd commit 3fad4fb

4 files changed

Lines changed: 260 additions & 15 deletions

File tree

spec/ParseLiveQuery.spec.js

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,3 +1485,212 @@ describe('ParseLiveQuery', function () {
14851485
});
14861486
});
14871487
});
1488+
1489+
describe('ParseLiveQuery duplicate requestId handling', function () {
1490+
const WebSocket = require('ws');
1491+
1492+
const waitFor = async predicate => {
1493+
const deadline = Date.now() + 4000;
1494+
while (Date.now() < deadline) {
1495+
if (predicate()) {
1496+
return;
1497+
}
1498+
await sleep(20);
1499+
}
1500+
throw new Error('timed out waiting for condition');
1501+
};
1502+
1503+
let sockets;
1504+
1505+
beforeEach(() => {
1506+
Parse.CoreManager.getLiveQueryController().setDefaultLiveQueryClient(null);
1507+
sockets = [];
1508+
});
1509+
1510+
afterEach(() => {
1511+
for (const socket of sockets) {
1512+
if (socket.readyState === WebSocket.OPEN) {
1513+
socket.close();
1514+
}
1515+
}
1516+
sockets = [];
1517+
});
1518+
1519+
const configureServer = async () => {
1520+
const parseServer = await reconfigureServer({
1521+
liveQuery: { classNames: ['LQDupA', 'LQDupB'] },
1522+
startLiveQueryServer: true,
1523+
verbose: false,
1524+
silent: true,
1525+
});
1526+
return parseServer.liveQueryServer;
1527+
};
1528+
1529+
// Opens a raw LiveQuery WebSocket client and returns a small protocol helper.
1530+
const openClient = async () => {
1531+
const socket = new WebSocket('ws://localhost:8378/1');
1532+
sockets.push(socket);
1533+
const messages = [];
1534+
socket.on('message', data => messages.push(JSON.parse(data.toString())));
1535+
await new Promise((resolve, reject) => {
1536+
socket.on('open', resolve);
1537+
socket.on('error', reject);
1538+
});
1539+
socket.send(JSON.stringify({ op: 'connect', applicationId: Parse.applicationId }));
1540+
const client = {
1541+
socket,
1542+
messages,
1543+
subscribe(requestId, className, where) {
1544+
socket.send(JSON.stringify({ op: 'subscribe', requestId, query: { className, where } }));
1545+
},
1546+
update(requestId, className, where) {
1547+
socket.send(JSON.stringify({ op: 'update', requestId, query: { className, where } }));
1548+
},
1549+
countOp(op) {
1550+
return messages.filter(message => message.op === op).length;
1551+
},
1552+
waitForOpCount(op, count) {
1553+
return waitFor(() => this.countOp(op) === count);
1554+
},
1555+
};
1556+
await waitFor(() => messages.some(message => message.op === 'connected'));
1557+
return client;
1558+
};
1559+
1560+
it('replaces rather than leaks subscriptions when a client reuses a requestId with different queries', async () => {
1561+
const lqServer = await configureServer();
1562+
const client = await openClient();
1563+
1564+
for (let i = 0; i < 5; i++) {
1565+
client.subscribe(7, 'LQDupA', { marker: `ws-${i}` });
1566+
}
1567+
await client.waitForOpCount('subscribed', 5);
1568+
1569+
// Reusing one requestId must keep a single active subscription, not one per frame.
1570+
expect(lqServer.subscriptions.get('LQDupA').size).toBe(1);
1571+
1572+
client.socket.close();
1573+
await waitFor(() => lqServer.clients.size === 0);
1574+
1575+
// No stale subscriptions may survive the disconnect.
1576+
expect(lqServer.subscriptions.get('LQDupA')?.size ?? 0).toBe(0);
1577+
});
1578+
1579+
it('does not leak subscriptions when a client reuses a requestId with the same query', async () => {
1580+
const lqServer = await configureServer();
1581+
const client = await openClient();
1582+
1583+
for (let i = 0; i < 5; i++) {
1584+
client.subscribe(7, 'LQDupA', { marker: 'same' });
1585+
}
1586+
await client.waitForOpCount('subscribed', 5);
1587+
1588+
expect(lqServer.subscriptions.get('LQDupA').size).toBe(1);
1589+
1590+
client.socket.close();
1591+
await waitFor(() => lqServer.clients.size === 0);
1592+
1593+
expect(lqServer.subscriptions.get('LQDupA')?.size ?? 0).toBe(0);
1594+
});
1595+
1596+
it('cleans up the prior subscription when a client reuses a requestId on a different class', async () => {
1597+
const lqServer = await configureServer();
1598+
const client = await openClient();
1599+
1600+
client.subscribe(7, 'LQDupA', { marker: 'a' });
1601+
await client.waitForOpCount('subscribed', 1);
1602+
client.subscribe(7, 'LQDupB', { marker: 'b' });
1603+
await client.waitForOpCount('subscribed', 2);
1604+
client.subscribe(7, 'LQDupA', { marker: 'a2' });
1605+
await client.waitForOpCount('subscribed', 3);
1606+
1607+
// Only the most recent subscription survives; the prior class entry is pruned.
1608+
expect(lqServer.subscriptions.get('LQDupA').size).toBe(1);
1609+
expect(lqServer.subscriptions.has('LQDupB')).toBe(false);
1610+
1611+
client.socket.close();
1612+
await waitFor(() => lqServer.clients.size === 0);
1613+
1614+
expect(lqServer.subscriptions.get('LQDupA')?.size ?? 0).toBe(0);
1615+
expect(lqServer.subscriptions.has('LQDupB')).toBe(false);
1616+
});
1617+
1618+
it('does not tear down a subscription still held by another client when a client reuses a requestId', async () => {
1619+
const lqServer = await configureServer();
1620+
const clientA = await openClient();
1621+
const clientB = await openClient();
1622+
1623+
// Both clients share the same query, so they share one Subscription.
1624+
clientA.subscribe(7, 'LQDupA', { marker: 'shared' });
1625+
await clientA.waitForOpCount('subscribed', 1);
1626+
clientB.subscribe(9, 'LQDupA', { marker: 'shared' });
1627+
await clientB.waitForOpCount('subscribed', 1);
1628+
expect(lqServer.subscriptions.get('LQDupA').size).toBe(1);
1629+
1630+
// Client A reuses its requestId with a different query.
1631+
clientA.subscribe(7, 'LQDupA', { marker: 'other' });
1632+
await clientA.waitForOpCount('subscribed', 2);
1633+
1634+
// The shared subscription must survive (B still holds it), alongside A's new one.
1635+
expect(lqServer.subscriptions.get('LQDupA').size).toBe(2);
1636+
1637+
// The shared subscription still delivers events to B, but not to A anymore.
1638+
const shared = new Parse.Object('LQDupA');
1639+
shared.set('marker', 'shared');
1640+
await shared.save(null, { useMasterKey: true });
1641+
await clientB.waitForOpCount('create', 1);
1642+
expect(clientB.countOp('create')).toBe(1);
1643+
expect(clientA.countOp('create')).toBe(0);
1644+
1645+
clientA.socket.close();
1646+
clientB.socket.close();
1647+
await waitFor(() => lqServer.clients.size === 0);
1648+
expect(lqServer.subscriptions.get('LQDupA')?.size ?? 0).toBe(0);
1649+
});
1650+
1651+
it('delivers events only for the replacement query after a client reuses a requestId', async () => {
1652+
const lqServer = await configureServer();
1653+
const client = await openClient();
1654+
1655+
client.subscribe(7, 'LQDupA', { marker: 'old' });
1656+
await client.waitForOpCount('subscribed', 1);
1657+
client.subscribe(7, 'LQDupA', { marker: 'new' });
1658+
await client.waitForOpCount('subscribed', 2);
1659+
expect(lqServer.subscriptions.get('LQDupA').size).toBe(1);
1660+
1661+
const oldObject = new Parse.Object('LQDupA');
1662+
oldObject.set('marker', 'old');
1663+
await oldObject.save(null, { useMasterKey: true });
1664+
1665+
const newObject = new Parse.Object('LQDupA');
1666+
newObject.set('marker', 'new');
1667+
await newObject.save(null, { useMasterKey: true });
1668+
1669+
await client.waitForOpCount('create', 1);
1670+
// Only the replacement query (marker 'new') may produce an event.
1671+
expect(client.countOp('create')).toBe(1);
1672+
expect(client.messages.find(message => message.op === 'create').object.marker).toBe('new');
1673+
});
1674+
1675+
it('keeps the update op working after the duplicate-subscribe cleanup', async () => {
1676+
const lqServer = await configureServer();
1677+
const client = await openClient();
1678+
1679+
client.subscribe(7, 'LQDupA', { marker: 'old' });
1680+
await client.waitForOpCount('subscribed', 1);
1681+
client.update(7, 'LQDupA', { marker: 'new' });
1682+
await client.waitForOpCount('subscribed', 2);
1683+
1684+
expect(lqServer.subscriptions.get('LQDupA').size).toBe(1);
1685+
1686+
const updated = new Parse.Object('LQDupA');
1687+
updated.set('marker', 'new');
1688+
await updated.save(null, { useMasterKey: true });
1689+
await client.waitForOpCount('create', 1);
1690+
expect(client.countOp('create')).toBe(1);
1691+
1692+
client.socket.close();
1693+
await waitFor(() => lqServer.clients.size === 0);
1694+
expect(lqServer.subscriptions.get('LQDupA')?.size ?? 0).toBe(0);
1695+
});
1696+
});

spec/helper.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,12 +657,18 @@ global.fdescribe_only = validator => {
657657

658658
const libraryCache = {};
659659
jasmine.mockLibrary = function (library, name, mock) {
660-
const original = require(library)[name];
661660
if (!libraryCache[library]) {
662661
libraryCache[library] = {};
663662
}
663+
// Cache the original implementation only the first time an export is mocked.
664+
// Re-mocking the same export (e.g. swapping the mock mid-test) must not
665+
// overwrite the cached original with another mock, otherwise restoreLibrary
666+
// would restore a mock instead of the real implementation and leak it into
667+
// later specs.
668+
if (!(name in libraryCache[library])) {
669+
libraryCache[library][name] = require(library)[name];
670+
}
664671
require(library)[name] = mock;
665-
libraryCache[library][name] = original;
666672
};
667673

668674
jasmine.restoreLibrary = function (library, name) {

src/LiveQuery/ParseLiveQueryServer.ts

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -523,12 +523,14 @@ class ParseLiveQueryServer {
523523

524524
// If there is no client which is subscribing this subscription, remove it from subscriptions
525525
const classSubscriptions = this.subscriptions.get(subscription.className);
526-
if (!subscription.hasSubscribingClient()) {
527-
classSubscriptions.delete(subscription.hash);
528-
}
529-
// If there is no subscriptions under this class, remove it from subscriptions
530-
if (classSubscriptions.size === 0) {
531-
this.subscriptions.delete(subscription.className);
526+
if (classSubscriptions) {
527+
if (!subscription.hasSubscribingClient()) {
528+
classSubscriptions.delete(subscription.hash);
529+
}
530+
// If there is no subscriptions under this class, remove it from subscriptions
531+
if (classSubscriptions.size === 0) {
532+
this.subscriptions.delete(subscription.className);
533+
}
532534
}
533535
}
534536

@@ -1164,6 +1166,28 @@ class ParseLiveQueryServer {
11641166
// Validate regex patterns in the subscription query
11651167
this._validateQueryConstraints(request.query.where);
11661168

1169+
// If this client already has a subscription registered under this
1170+
// requestId, replace it by tearing down the previous subscription before
1171+
// creating the new one. The client-side metadata map is keyed only by
1172+
// requestId, so a duplicate `subscribe` frame would otherwise overwrite it
1173+
// while the previous Subscription stays in the server-wide map, leaking it
1174+
// for the lifetime of the process (disconnect cleanup only walks the
1175+
// surviving client metadata and never reaches the orphaned subscription).
1176+
const previousSubscriptionInfo = client.getSubscriptionInfo(request.requestId);
1177+
if (previousSubscriptionInfo) {
1178+
const previousSubscription = previousSubscriptionInfo.subscription;
1179+
previousSubscription.deleteClientSubscription(parseWebsocket.clientId, request.requestId);
1180+
const previousClassSubscriptions = this.subscriptions.get(previousSubscription.className);
1181+
if (previousClassSubscriptions) {
1182+
if (!previousSubscription.hasSubscribingClient()) {
1183+
previousClassSubscriptions.delete(previousSubscription.hash);
1184+
}
1185+
if (previousClassSubscriptions.size === 0) {
1186+
this.subscriptions.delete(previousSubscription.className);
1187+
}
1188+
}
1189+
}
1190+
11671191
// Get subscription from subscriptions, create one if necessary
11681192
const subscriptionHash = queryHash(request.query);
11691193
// Add className to subscriptions if necessary
@@ -1286,12 +1310,14 @@ class ParseLiveQueryServer {
12861310
subscription.deleteClientSubscription(parseWebsocket.clientId, requestId);
12871311
// If there is no client which is subscribing this subscription, remove it from subscriptions
12881312
const classSubscriptions = this.subscriptions.get(className);
1289-
if (!subscription.hasSubscribingClient()) {
1290-
classSubscriptions.delete(subscription.hash);
1291-
}
1292-
// If there is no subscriptions under this class, remove it from subscriptions
1293-
if (classSubscriptions.size === 0) {
1294-
this.subscriptions.delete(className);
1313+
if (classSubscriptions) {
1314+
if (!subscription.hasSubscribingClient()) {
1315+
classSubscriptions.delete(subscription.hash);
1316+
}
1317+
// If there is no subscriptions under this class, remove it from subscriptions
1318+
if (classSubscriptions.size === 0) {
1319+
this.subscriptions.delete(className);
1320+
}
12951321
}
12961322
runLiveQueryEventHandlers({
12971323
client,

src/LiveQuery/Subscription.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ class Subscription {
2222
this.clientRequestIds.set(clientId, []);
2323
}
2424
const requestIds = this.clientRequestIds.get(clientId);
25-
requestIds.push(requestId);
25+
// Keep (clientId, requestId) pairs unique so a duplicate registration cannot
26+
// leave a residual entry that survives cleanup.
27+
if (!requestIds.includes(requestId)) {
28+
requestIds.push(requestId);
29+
}
2630
}
2731

2832
deleteClientSubscription(clientId: number, requestId: number): void {

0 commit comments

Comments
 (0)