Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.PHONY: build lint test test-all

build:
npm run compile

lint:
npm run lint

test: build lint
npm test

test-all: build lint
env HAZELCAST_ENTERPRISE_KEY=1 npm test

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,6 @@ This command will only run the tests matching the pattern. The pattern can be a

## Copyright

Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
Copyright (c) 2008-2026, Hazelcast, Inc. All Rights Reserved.

Visit [www.hazelcast.com](http://www.hazelcast.com) for more information.
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "hazelcast-client",
"version": "5.3.0",
"version": "5.6.0",
"description": "Hazelcast - a real-time stream processing platform - Node.js Client",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand All @@ -9,7 +9,7 @@
"lib/**/*.d.ts"
],
"dependencies": {
"@types/long": "4.0.0",
"@types/long": "4.0.2",
"long": "4.0.0"
},
"devDependencies": {
Expand All @@ -23,9 +23,9 @@
"eslint-plugin-mocha": "~9.0.0",
"husky": "~6.0.0",
"jsonschema": "~1.5.0",
"markdown-link-check": "~3.13.7",
"markdown-link-check": "^3.14.2",
"markdownlint-cli": "~0.32.2",
"mocha": "~9.2.2",
"mocha": "^11.7.5",
"mousse": "~0.3.1",
"nyc": "~15.1.0",
"path-exists-cli": "~2.0.0",
Expand All @@ -41,7 +41,7 @@
"yargs": "~17.5.1"
},
"engines": {
"node": ">=10.4.0"
"node": ">=20.20.0"
},
"scripts": {
"clean": "rimraf lib *.jar *.log *.xml coverage",
Expand Down
5 changes: 3 additions & 2 deletions src/invocation/InvocationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ export class InvocationService {
+ invocation.request.getCorrelationId() + ') reached its deadline.', error));
return true;
}
return false;
}

private registerInvocation(invocation: Invocation): void {
Expand All @@ -703,7 +704,7 @@ export class InvocationService {
deregisterInvocation(correlationId: number): void {
this.pending.delete(correlationId);
}

/**
* Returns `true` if we need to check the urgent invocations, by
* examining the local registry of the schema service.
Expand Down Expand Up @@ -743,7 +744,7 @@ export class InvocationService {
+ 'data and it is not safe to invoke it when the client is not '
+ 'yet initialized on the cluster');
}

return null;
}
}
43 changes: 35 additions & 8 deletions src/network/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class PipelinedWriter extends Writer {
private canWrite = true;
// reusable buffer for coalescing
private readonly coalesceBuf: Buffer;
private readonly drainListener: () => void;

constructor(
private readonly socket: net.Socket,
Expand All @@ -72,11 +73,13 @@ export class PipelinedWriter extends Writer {
super();
this.coalesceBuf = Buffer.allocUnsafe(threshold);

// write queued items on drain event
socket.on('drain', () => {
this.drainListener = () => {
this.canWrite = true;
this.schedule();
});
}

// write queued items on drain event
socket.on('drain', this.drainListener);
}

write(message: ClientMessage, resolver: DeferredPromise<void>): void {
Expand All @@ -94,6 +97,7 @@ export class PipelinedWriter extends Writer {
}
this.error = this.makeIOError(error);
this.canWrite = false;
this.socket.off('drain', this.drainListener);
// If we pass an error to destroy, an unhandled error will be thrown because we don't handle the error event
// So we don't pass anything to the socket. It is internal anyway.
this.socket.destroy();
Expand Down Expand Up @@ -214,6 +218,8 @@ export class DirectWriter extends Writer {
}

close(cause: Error): void {
// Remove all listeners from this emitter
this.removeAllListeners('write');
this.socket.destroy();
}
}
Expand Down Expand Up @@ -351,6 +357,8 @@ export class Connection {
private readonly writer: Writer;
private readonly reader: ClientMessageReader;
private readonly fragmentedMessageHandler: FragmentedClientMessageHandler;
private dataListener: ((buffer: Buffer) => void) | null = null;
private writeListener: (() => void) | null = null;

constructor(
private readonly connectionManager: ConnectionManager,
Expand All @@ -374,9 +382,10 @@ export class Connection {
this.connectedServerVersion = BuildInfo.UNKNOWN_VERSION_ID;
this.writer = enablePipelining ? new PipelinedWriter(this.socket, pipeliningThreshold, this.incrementBytesWrittenFn)
: new DirectWriter(this.socket, this.incrementBytesWrittenFn);
this.writer.on('write', () => {
this.writeListener = () => {
this.lastWriteTimeMillis = Date.now();
});
}
this.writer.on('write', this.writeListener);
this.reader = new ClientMessageReader();
this.fragmentedMessageHandler = new FragmentedClientMessageHandler(this.logger);
}
Expand Down Expand Up @@ -437,11 +446,28 @@ export class Connection {

this.logClose();

// Remove socket listeners before closing
this.removeListeners();

this.writer.close(this.closedCause ? this.closedCause : new Error(reason ? reason : 'Connection closed'));

this.connectionManager.onConnectionClose(this);
}

/**
* Removes all registered listeners from the socket and writer.
*/
private removeListeners(): void {
if (this.dataListener !== null) {
this.socket.off('data', this.dataListener);
this.dataListener = null;
}
if (this.writeListener !== null) {
this.writer.off('write', this.writeListener);
this.writeListener = null;
}
}

isAlive(): boolean {
return this.closedTime === 0;
}
Expand Down Expand Up @@ -483,7 +509,7 @@ export class Connection {
* @param callback
*/
registerResponseCallback(callback: ClientMessageHandler): void {
this.socket.on('data', (buffer: Buffer) => {
this.dataListener = (buffer: Buffer) => {
this.lastReadTimeMillis = Date.now();
this.reader.append(buffer);
let clientMessage = this.reader.read();
Expand All @@ -496,14 +522,15 @@ export class Connection {
clientMessage = this.reader.read();
}
this.incrementBytesReadFn(buffer.length);
});
}
this.socket.on('data', this.dataListener);
}

setClusterUuid(uuid: UUID): void {
this.clusterUuid = uuid;
}

getClusterUuid(): UUID {
getClusterUuid(): UUID {
return this.clusterUuid;
}

Expand Down
4 changes: 2 additions & 2 deletions src/network/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -665,11 +665,11 @@ export class ConnectionManager extends EventEmitter implements MembershipListene
}, this.connectionTimeoutMillis);

socket.once('secureConnect', () => {
clearInterval(connectTimeoutTimer);
clearTimeout(connectTimeoutTimer);
connectionResolver.resolve(socket);
});
socket.once('error', (err) => {
clearInterval(connectTimeoutTimer);
clearTimeout(connectTimeoutTimer);
connectionResolver.reject(err);
});

Expand Down
Loading