Skip to content

Commit 2b5fe52

Browse files
committed
feat: automatically purge stale prepared statements from cache and retry query
1 parent 3adca00 commit 2b5fe52

2 files changed

Lines changed: 121 additions & 2 deletions

File tree

lib/base/connection.js

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const Packets = require('../packets/index.js');
2828
const Commands = require('../commands/index.js');
2929
const ConnectionConfig = require('../connection_config.js');
3030
const CharsetToEncoding = require('../constants/charset_encodings.js');
31+
const { ER_UNKNOWN_STMT_HANDLER } = require('../constants/errors.js');
3132
const {
3233
traceCallback,
3334
tracePromise,
@@ -814,13 +815,45 @@ class BaseConnection extends EventEmitter {
814815
this.addCommand(executeCommand);
815816
};
816817

818+
// We need to intercept and retry prepareAndExecute if we had a stale prepared statement in the cache that the server already released
819+
const key = BaseConnection.statementKey(options);
820+
const cacheHasKey = this._statements.has(key);
821+
817822
if (executeCommand.onResult) {
818823
// Callback mode: traceCallback wraps the callback with tracing lifecycle, or calls through directly when no subscribers are registered
819824
const origExecCb = executeCommand.onResult;
820825
traceCallback(
821826
executeChannel,
822827
(wrappedCb) => {
823-
executeCommand.onResult = wrappedCb;
828+
if (cacheHasKey) {
829+
executeCommand.onResult = (err, ...rest) => {
830+
if (err && err.errno === ER_UNKNOWN_STMT_HANDLER) {
831+
const origEmit = executeCommand.emit.bind(executeCommand);
832+
executeCommand.emit = (eventName, ...rest) => {
833+
if (eventName === 'end') {
834+
// Intercept the 'end' event that will be emitted after this 'error' event is emitted
835+
executeCommand.emit = origEmit;
836+
return false;
837+
}
838+
839+
return origEmit(eventName, ...rest);
840+
};
841+
842+
// Listeners may have been added to this execute command, so we re-use it
843+
executeCommand.next = null;
844+
845+
this._statements.delete(key);
846+
executeCommand.onResult = wrappedCb;
847+
prepareAndExecute(wrappedCb);
848+
return;
849+
}
850+
851+
wrappedCb(err, ...rest);
852+
};
853+
} else {
854+
executeCommand.onResult = wrappedCb;
855+
}
856+
824857
prepareAndExecute(wrappedCb);
825858
},
826859
0,
@@ -837,7 +870,43 @@ class BaseConnection extends EventEmitter {
837870
null,
838871
origExecCb
839872
);
840-
} else if (shouldTrace(executeChannel)) {
873+
874+
return executeCommand;
875+
}
876+
877+
if (cacheHasKey) {
878+
const origEmit = executeCommand.emit.bind(executeCommand);
879+
executeCommand.emit = (eventName, firstArg, ...rest) => {
880+
if (
881+
eventName === 'error' &&
882+
firstArg &&
883+
firstArg.errno === ER_UNKNOWN_STMT_HANDLER
884+
) {
885+
executeCommand.emit = (eventName, ...rest) => {
886+
if (eventName === 'end') {
887+
// Intercept the 'end' event that will be emitted after this 'error' event is emitted
888+
executeCommand.emit = origEmit;
889+
return false;
890+
}
891+
892+
return origEmit(eventName, ...rest);
893+
};
894+
895+
// Listeners may have been added to this execute command, so we re-use it
896+
executeCommand.next = null;
897+
898+
this._statements.delete(key);
899+
prepareAndExecute((err) => {
900+
executeCommand.emit('error', err);
901+
});
902+
return false;
903+
}
904+
905+
return origEmit(eventName, firstArg, ...rest);
906+
};
907+
}
908+
909+
if (shouldTrace(executeChannel)) {
841910
// Event-emitter mode: tracePromise wraps the async lifecycle
842911
tracePromise(
843912
executeChannel,

test/integration/connection/test-execute-cached.test.mts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,55 @@ await describe('Execute Cached', async () => {
4141
strict.deepEqual(rows3, [{ test: 126 }]);
4242
});
4343

44+
await it('should discard cached prepared statements that no longer exist on the server and retry automatically', async () => {
45+
// Remove on the server but leave it in the cache
46+
// @ts-expect-error: internal access
47+
connection._statements.get(key).close();
48+
49+
// Remember the id
50+
// @ts-expect-error: internal access
51+
const { id: oldStatementId } = connection._statements.get(key);
52+
53+
// @ts-expect-error: internal access
54+
strict(connection._statements.size === 1);
55+
// @ts-expect-error: internal access
56+
strict(connection._statements.get(key).query === q);
57+
// @ts-expect-error: internal access
58+
strict(connection._statements.get(key).parameters.length === 1);
59+
// @ts-expect-error: internal access
60+
strict(connection._statements.get(key).id === oldStatementId);
61+
62+
const rows1 = await new Promise<TestRow[]>((resolve, reject) => {
63+
connection.execute<TestRow[]>(q, [123], (err, _rows) =>
64+
err ? reject(err) : resolve(_rows)
65+
);
66+
});
67+
68+
const rows2 = await new Promise<TestRow[]>((resolve, reject) => {
69+
connection.execute<TestRow[]>(q, [124], (err, _rows) =>
70+
err ? reject(err) : resolve(_rows)
71+
);
72+
});
73+
74+
const rows3 = await new Promise<TestRow[]>((resolve, reject) => {
75+
connection.execute<TestRow[]>(q, [125], (err, _rows) =>
76+
err ? reject(err) : resolve(_rows)
77+
);
78+
});
79+
80+
// @ts-expect-error: internal access
81+
strict(connection._statements.size === 1);
82+
// @ts-expect-error: internal access
83+
strict(connection._statements.get(key).query === q);
84+
// @ts-expect-error: internal access
85+
strict(connection._statements.get(key).parameters.length === 1);
86+
// @ts-expect-error: internal access
87+
strict(connection._statements.get(key).id !== oldStatementId);
88+
89+
strict.deepEqual(rows1, [{ test: 124 }]);
90+
strict.deepEqual(rows2, [{ test: 125 }]);
91+
strict.deepEqual(rows3, [{ test: 126 }]);
92+
});
93+
4494
connection.end();
4595
});

0 commit comments

Comments
 (0)