Skip to content

Commit 6c7e3b9

Browse files
committed
wip: close sockets on instance closed
1 parent 072eb1e commit 6c7e3b9

File tree

4 files changed

+112
-65
lines changed

4 files changed

+112
-65
lines changed

src/cloud-sql-instance.ts

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ import {RSAKeys} from './rsa-keys';
2424
import {SslCert} from './ssl-cert';
2525
import {getRefreshInterval, isExpirationTimeValid} from './time';
2626
import {AuthTypes} from './auth-types';
27+
import {CloudSQLConnectorError} from './errors';
28+
29+
// Private types that describe exactly the methods
30+
// needed from tls.Socket to be able to close
31+
// sockets when the DNS Name changes.
32+
type EventFn = () => void;
33+
type ClosableSocket = {
34+
destroy: (error?: Error) => void;
35+
once: (name: string, handler: EventFn) => void;
36+
};
2737

2838
interface Fetcher {
2939
getInstanceMetadata({
@@ -45,7 +55,7 @@ interface CloudSQLInstanceOptions {
4555
ipType: IpAddressTypes;
4656
limitRateInterval?: number;
4757
sqlAdminFetcher: Fetcher;
48-
checkDomainInterval?: number
58+
checkDomainInterval?: number;
4959
}
5060

5161
interface RefreshResult {
@@ -78,11 +88,12 @@ export class CloudSQLInstance {
7888
// The ongoing refresh promise is referenced by the `next` property
7989
private next?: Promise<RefreshResult>;
8090
private scheduledRefreshID?: ReturnType<typeof setTimeout> | null = undefined;
81-
private checkDomainID?: ReturnType<typeof setTimeout> | null = undefined;
91+
private checkDomainID?: ReturnType<typeof setInterval> | null = undefined;
8292
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
8393
private throttle?: any;
8494
private closed = false;
85-
private checkDomainInterval:number;
95+
private checkDomainInterval: number;
96+
private sockets = new Set<ClosableSocket>();
8697

8798
public readonly instanceInfo: InstanceConnectionInfo;
8899
public ephemeralCert?: SslCert;
@@ -105,7 +116,7 @@ export class CloudSQLInstance {
105116
this.ipType = options.ipType || IpAddressTypes.PUBLIC;
106117
this.limitRateInterval = options.limitRateInterval || 30 * 1000; // 30 seconds
107118
this.sqlAdminFetcher = options.sqlAdminFetcher;
108-
this.checkDomainInterval = options.checkDomainInterval || 30* 1000;
119+
this.checkDomainInterval = options.checkDomainInterval || 30 * 1000;
109120
}
110121

111122
// p-throttle library has to be initialized in an async scope in order to
@@ -137,9 +148,9 @@ export class CloudSQLInstance {
137148
// in progress, the promise will resolve immediately.
138149
refreshComplete(): Promise<void> {
139150
return new Promise(resolve => {
140-
// setTimeout() to yield execution to allow other refresh background
151+
// setImmediate() to yield execution to allow other refresh background
141152
// tasks to start.
142-
setTimeout(() => {
153+
setImmediate(() => {
143154
if (this.next) {
144155
// If there is a refresh promise in progress, resolve this promise
145156
// when the refresh is complete.
@@ -148,7 +159,7 @@ export class CloudSQLInstance {
148159
// Else resolve immediately.
149160
resolve();
150161
}
151-
}, 0);
162+
});
152163
});
153164
}
154165

@@ -158,10 +169,13 @@ export class CloudSQLInstance {
158169
this.next = undefined;
159170
return Promise.reject('closed');
160171
}
161-
if(this?.instanceInfo?.domainName && ! this.checkDomainID){
162-
this.checkDomainID = setInterval(()=>{
163-
this.checkDomainChanged()
164-
}, this.checkDomainInterval || 30*1000)
172+
if (this?.instanceInfo?.domainName && !this.checkDomainID) {
173+
this.checkDomainID = setInterval(
174+
() => {
175+
this.checkDomainChanged();
176+
},
177+
this.checkDomainInterval || 30 * 1000
178+
);
165179
}
166180

167181
const currentRefreshId = this.scheduledRefreshID;
@@ -307,7 +321,7 @@ export class CloudSQLInstance {
307321
// If refresh has not yet started, then cancel the setTimeout
308322
if (this.scheduledRefreshID) {
309323
clearTimeout(this.scheduledRefreshID);
310-
this.scheduledRefreshID = null
324+
this.scheduledRefreshID = null;
311325
}
312326
}
313327

@@ -323,9 +337,17 @@ export class CloudSQLInstance {
323337
close(): void {
324338
this.closed = true;
325339
this.cancelRefresh();
326-
if(this.checkDomainID){
340+
if (this.checkDomainID) {
327341
clearInterval(this.checkDomainID);
328-
this.checkDomainID = null
342+
this.checkDomainID = null;
343+
}
344+
for (const socket of this.sockets) {
345+
socket.destroy(
346+
new CloudSQLConnectorError({
347+
code: 'ERRCLOSED',
348+
message: 'The connector was closed.',
349+
})
350+
);
329351
}
330352
}
331353

@@ -346,4 +368,17 @@ export class CloudSQLInstance {
346368
this.close();
347369
}
348370
}
371+
addSocket(socket: ClosableSocket) {
372+
if (!this.instanceInfo.domainName) {
373+
// This was not connected by domain name. Ignore all sockets.
374+
return;
375+
}
376+
377+
// Add the socket to the list
378+
this.sockets.add(socket);
379+
// When the socket is closed, remove it.
380+
socket.once('closed', () => {
381+
this.sockets.delete(socket);
382+
});
383+
}
349384
}

src/connector.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ export declare interface ConnectionOptions {
4444
ipType?: IpAddressTypes;
4545
instanceConnectionName: string;
4646
domainName?: string;
47-
checkDomainInterval?: number
48-
limitRateInterval?: number
47+
checkDomainInterval?: number;
48+
limitRateInterval?: number;
4949
}
5050

5151
export declare interface SocketConnectionOptions extends ConnectionOptions {
@@ -143,7 +143,7 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
143143
ipType: opts.ipType || IpAddressTypes.PUBLIC,
144144
limitRateInterval: opts.limitRateInterval || 30 * 1000, // 30 sec
145145
sqlAdminFetcher: this.sqlAdminFetcher,
146-
checkDomainInterval: opts.checkDomainInterval
146+
checkDomainInterval: opts.checkDomainInterval,
147147
});
148148
this.set(this.cacheKey(opts), new CacheEntry(promise));
149149

@@ -248,6 +248,9 @@ export class Connector {
248248
tlsSocket.once('secureConnect', async () => {
249249
cloudSqlInstance.setEstablishedConnection();
250250
});
251+
252+
cloudSqlInstance.addSocket(tlsSocket);
253+
251254
return tlsSocket;
252255
}
253256

test/cloud-sql-instance.ts

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ t.test('CloudSQLInstance', async t => {
6767
instanceConnectionName: 'my-project:us-east1:my-instance',
6868
sqlAdminFetcher: fetcher,
6969
});
70-
t.after(() =>instance.close())
70+
t.after(() => instance.close());
7171

7272
t.same(
7373
instance.ephemeralCert.cert,
@@ -116,7 +116,7 @@ t.test('CloudSQLInstance', async t => {
116116
limitRateInterval: 50,
117117
},
118118
});
119-
t.after(() =>instance.close())
119+
t.after(() => instance.close());
120120

121121
await t.rejects(
122122
instance.refresh(),
@@ -137,7 +137,7 @@ t.test('CloudSQLInstance', async t => {
137137
limitRateInterval: 50,
138138
},
139139
});
140-
t.after(() =>instance.close())
140+
t.after(() => instance.close());
141141
instance.refresh = () => {
142142
if (refreshCount === 2) {
143143
const end = Date.now();
@@ -180,7 +180,7 @@ t.test('CloudSQLInstance', async t => {
180180
limitRateInterval: 50,
181181
},
182182
});
183-
t.after(() =>instance.close())
183+
t.after(() => instance.close());
184184
await (() =>
185185
new Promise((res): void => {
186186
let refreshCount = 0;
@@ -237,7 +237,7 @@ t.test('CloudSQLInstance', async t => {
237237
limitRateInterval: 50,
238238
},
239239
});
240-
t.after(() =>instance.close())
240+
t.after(() => instance.close());
241241
await (() =>
242242
new Promise((res): void => {
243243
let refreshCount = 0;
@@ -268,7 +268,7 @@ t.test('CloudSQLInstance', async t => {
268268
limitRateInterval: 50,
269269
},
270270
});
271-
t.after(() =>instance.close())
271+
t.after(() => instance.close());
272272

273273
await instance.refresh();
274274

@@ -308,7 +308,7 @@ t.test('CloudSQLInstance', async t => {
308308
limitRateInterval: 50,
309309
},
310310
});
311-
t.after(() =>instance.close())
311+
t.after(() => instance.close());
312312

313313
let cancelRefreshCalled = false;
314314
let refreshCalled = false;
@@ -347,7 +347,7 @@ t.test('CloudSQLInstance', async t => {
347347
sqlAdminFetcher: fetcher,
348348
},
349349
});
350-
t.after(() =>instance.close())
350+
t.after(() => instance.close());
351351

352352
const start = Date.now();
353353
// starts regular refresh cycle
@@ -387,7 +387,7 @@ t.test('CloudSQLInstance', async t => {
387387
sqlAdminFetcher: fetcher,
388388
},
389389
});
390-
t.after(()=> instance.close())
390+
t.after(() => instance.close());
391391
const start = Date.now();
392392
// starts out refresh logic
393393
let refreshCount = 1;
@@ -435,7 +435,7 @@ t.test('CloudSQLInstance', async t => {
435435
limitRateInterval: 50,
436436
},
437437
});
438-
t.after(() =>instance.close())
438+
t.after(() => instance.close());
439439

440440
// starts a new refresh cycle but do not await on it
441441
instance.refresh();
@@ -463,7 +463,7 @@ t.test('CloudSQLInstance', async t => {
463463
limitRateInterval: 50,
464464
},
465465
});
466-
t.after(() =>instance.close())
466+
t.after(() => instance.close());
467467

468468
// simulates an ongoing instance, already has data
469469
await instance.refresh();
@@ -500,7 +500,7 @@ t.test('CloudSQLInstance', async t => {
500500
limitRateInterval: 50,
501501
},
502502
});
503-
t.after(() =>instance.close())
503+
t.after(() => instance.close());
504504

505505
await instance.refresh();
506506
instance.setEstablishedConnection();
@@ -536,7 +536,7 @@ t.test('CloudSQLInstance', async t => {
536536
limitRateInterval: 50,
537537
},
538538
});
539-
t.after(() =>instance.close())
539+
t.after(() => instance.close());
540540

541541
await instance.refresh();
542542
instance.setEstablishedConnection();
@@ -605,7 +605,7 @@ t.test('CloudSQLInstance', async t => {
605605
limitRateInterval: 0,
606606
},
607607
});
608-
t.after(() =>instance.close())
608+
t.after(() => instance.close());
609609
await (() =>
610610
new Promise((res): void => {
611611
let refreshCount = 0;
@@ -629,5 +629,6 @@ t.test('CloudSQLInstance', async t => {
629629
instance.refresh();
630630
instance.setEstablishedConnection();
631631
}))();
632-
});
632+
}
633+
);
633634
});

test/connector.ts

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,6 @@ t.test('Connector by domain resolves and creates instance', async t => {
639639
connector.close();
640640
});
641641

642-
643642
t.test('Connector by domain resolves and creates instance', async t => {
644643
const th = setupConnectorModule(t);
645644
const connector = new th.Connector();
@@ -681,38 +680,47 @@ t.test('Connector by domain resolves and creates instance', async t => {
681680
connector.close();
682681
});
683682

683+
t.test(
684+
'Connector checks if name changes in background and closes connector',
685+
async t => {
686+
const th = setupConnectorModule(t);
687+
const connector = new th.Connector();
688+
t.after(() => {
689+
connector.close();
690+
});
684691

685-
t.test('Connector checks if name changes in background and closes connector', async t => {
686-
const th = setupConnectorModule(t);
687-
const connector = new th.Connector();
688-
t.after(() => {
689-
connector.close();
690-
});
691-
692-
// Get options loads the instance
693-
await connector.getOptions({
694-
ipType: 'PUBLIC',
695-
authType: 'PASSWORD',
696-
domainName: 'db.example.com',
697-
checkDomainInterval: 10, // 10ms for testing
698-
});
699-
700-
// Ensure there is only one entry.
701-
t.same(connector.instances.size, 1);
702-
const oldInstance = connector.instances.get(
703-
'db.example.com-PASSWORD-PUBLIC'
704-
).instance;
705-
t.same(oldInstance.instanceInfo.domainName, 'db.example.com');
706-
t.same(oldInstance.instanceInfo.instanceId, 'instance');
707-
708-
// getOptions after DNS response changes closes the old instance
709-
// and loads a new one.
710-
th.resolveTxtResponse = 'project:region2:instance2';
711-
await new Promise((res) =>{
712-
setTimeout(res, 50);
713-
})
692+
// Get options loads the instance
693+
await connector.getOptions({
694+
ipType: 'PUBLIC',
695+
authType: 'PASSWORD',
696+
domainName: 'db.example.com',
697+
checkDomainInterval: 10, // 10ms for testing
698+
});
714699

715-
t.same(oldInstance.isClosed(), true, 'old instance is closed');
700+
// Ensure there is only one entry.
701+
t.same(connector.instances.size, 1);
702+
const oldInstance = connector.instances.get(
703+
'db.example.com-PASSWORD-PUBLIC'
704+
).instance;
705+
t.same(oldInstance.instanceInfo.domainName, 'db.example.com');
706+
t.same(oldInstance.instanceInfo.instanceId, 'instance');
707+
708+
// add a mock socket to the old instance
709+
const mockSocket = {
710+
destroyed: false,
711+
once(e:string, fn:()=>void) {},
712+
destroy(){this.destroyed = true}
713+
}
714+
oldInstance.addSocket(mockSocket);
715+
716+
// getOptions after DNS response changes closes the old instance
717+
// and loads a new one.
718+
th.resolveTxtResponse = 'project:region2:instance2';
719+
await new Promise(res => {
720+
setTimeout(res, 50);
721+
});
716722

717-
connector.close();
718-
});
723+
t.same(oldInstance.isClosed(), true, 'old instance is closed');
724+
t.same(mockSocket.destroyed, true, 'old instance closed its sockets');
725+
}
726+
);

0 commit comments

Comments
 (0)