Skip to content

Commit dc9cbd2

Browse files
authored
Merge pull request #31 from hapinessjs/next
release(version): v1.4.0
2 parents aa935e1 + a97aa8a commit dc9cbd2

4 files changed

Lines changed: 16 additions & 5 deletions

File tree

src/module/managers/channel-manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export class ChannelManager extends EventEmitter {
3232
return this._global;
3333
}
3434

35-
public canCreateChannel() {
35+
public canCreateChannel(): boolean {
3636
return this._connectionManager.isConnected() && !this._connectionManager.isConnecting() && !MessageStore.isShutdownRunning();
3737
}
3838

src/module/managers/connection-manager.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export class ConnectionManager extends EventEmitter {
2121
private _connect: typeof connect;
2222
private _defaultPrefetch: number;
2323
private _channelStore: ChannelStore;
24+
private _isSIGTERMReceived: boolean;
2425

2526
constructor(config?: RabbitMQConfigConnection) {
2627
super();
@@ -51,6 +52,10 @@ export class ConnectionManager extends EventEmitter {
5152
this._uri = `amqp://${credentials}${host}:${port}${vhost}${params}`;
5253
}
5354

55+
// Will block new connection if SIGTERM is received
56+
process.once('SIGTERM', () => this._isSIGTERMReceived = true);
57+
process.once('SIGINT', () => this._isSIGTERMReceived = true);
58+
5459
this.setDefaultPrefetch(this._options.default_prefetch);
5560

5661
// Create a channel store for this connection
@@ -104,6 +109,10 @@ export class ConnectionManager extends EventEmitter {
104109
return Observable.of(null);
105110
}
106111

112+
if (this._isSIGTERMReceived) {
113+
return Observable.of(null);
114+
}
115+
107116
this._isConnecting = true;
108117

109118
debug('Connecting', this._uri);

src/module/managers/message-store.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { Observable } from 'rxjs';
44
import { Channel as ChannelInterface } from 'amqplib';
55
import { RabbitMessage } from '../interfaces/rabbit-message';
66
import { Config } from '@hapiness/config';
7+
import { ConnectionManager } from './connection-manager';
78

89
const debug = require('debug')('hapiness:rabbitmq');
910

@@ -78,7 +79,7 @@ export class MessageStoreClass extends EventEmitter {
7879
this.shutdown_running = false;
7980
}
8081

81-
shutdown(connection): Observable<any> {
82+
shutdown(connection: ConnectionManager): Observable<any> {
8283
if (this.shutdown_running) {
8384
debug('shutdown already running');
8485
return Observable.of(null);

src/module/managers/queue-manager.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ export class QueueManager {
107107
dispatcher,
108108
options,
109109
defaultDispatch
110-
}) {
110+
}): Observable<Replies.Consume> {
111111
const consumerChannel = this._ch.getChannel();
112112
return Observable.fromPromise(
113113
this._ch.getChannel().consume(this.getName(), message => {
@@ -150,6 +150,8 @@ export class QueueManager {
150150
)
151151
.do(res => MessageStore.addConsumer(consumerChannel, res.consumerTag))
152152
.do(res => {
153+
// If channel is closed, has en error or is reconnected the consumerTag is not valid
154+
// and needs to be removed
153155
['close', 'error', 'reconnected'].forEach(event => this._ch.once(event, () => {
154156
debug('removing consumer', res.consumerTag);
155157
MessageStore.removeConsumer(res.consumerTag);
@@ -161,8 +163,7 @@ export class QueueManager {
161163
message: RabbitMessage,
162164
{ storeMessage, options, err, consumerChannel }:
163165
{ storeMessage: StoreMessage,
164-
// options: { errorHandler: (err: Error, message: RabbitMessage, ch: ChannelInterface) => {} },
165-
options: any,
166+
options: { errorHandler: (err: Error, message: RabbitMessage, ch: ChannelInterface) => {} },
166167
err: Error, consumerChannel: ChannelInterface }
167168
) {
168169
if (MessageStore.isShutdownRunning()) {

0 commit comments

Comments
 (0)