Skip to content

Commit 26a3b10

Browse files
authored
Merge pull request #37 from hapinessjs/next
version(v1.4.3)
2 parents 829c9e3 + 35e134d commit 26a3b10

6 files changed

Lines changed: 33 additions & 5 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,8 @@ To set up your development environment:
453453
[Back to top](#table-of-contents)
454454

455455
## Change History
456+
* v1.4.3 (2018-08-20)
457+
* Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager
456458
* v1.4.2 (2018-06-11)
457459
* Do not retry to connect if closing server
458460
* v1.4.1 (2018-05-31)

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@hapiness/rabbitmq",
3-
"version": "1.4.2",
3+
"version": "1.4.3",
44
"description": "Hapiness module for rabbitmq",
55
"main": "commonjs/index.js",
66
"types": "index.d.ts",

src/module/managers/connection-manager.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { ChannelStore } from './channel-store';
88
import { ChannelManager } from './channel-manager';
99

1010
const debug = require('debug')('hapiness:rabbitmq');
11+
const retryLimitErr: any = new Error('Retry limit exceeded');
12+
retryLimitErr.code = 'RETRY_LIMIT_EXCEEDED';
1113

1214
export class ConnectionManager extends EventEmitter {
1315
private _connection: Connection;
@@ -99,7 +101,7 @@ export class ConnectionManager extends EventEmitter {
99101
.delay(this._options.retry.delay)
100102
.takeWhile((attempts) => attempts < this._options.retry.maximum_attempts && !this._closingServer)
101103
.take(this._options.retry.maximum_attempts)
102-
.concat(Observable.throw(new Error('Retry limit exceeded')))
104+
.concat(Observable.throw(retryLimitErr))
103105
});
104106
}
105107

@@ -108,7 +110,6 @@ export class ConnectionManager extends EventEmitter {
108110
return Observable.of(null);
109111
}
110112

111-
112113
this._closingServer = false;
113114
this._isConnecting = true;
114115

@@ -117,6 +118,10 @@ export class ConnectionManager extends EventEmitter {
117118
this.emitEvent('connecting');
118119
const obs = this.openConnection();
119120
return obs
121+
.catch(err => {
122+
this.emitEvent('error', err);
123+
return Observable.throw(err);
124+
})
120125
.flatMap(con => {
121126
debug('connected, creating default channel ...');
122127
this._connection = con;

src/module/rabbitmq.extension.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ export class RabbitMQExt implements OnExtensionLoad, OnModuleInstantiated, OnShu
6161
connection
6262
.connect()
6363
.flatMap(() => RegisterAnnotations.bootstrap(module, connection))
64-
.subscribe(() => {}, err => errorHandler(err));
64+
.subscribe(() => {}, err => {
65+
errorHandler(err);
66+
});
6567
});
6668

6769
return RegisterAnnotations.bootstrap(module, connection);

test/unit/managers/connection.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,23 @@ export class ConnectionUnitTest {
136136
instance.setDefaultPrefetch(5);
137137
unit.number(instance.getDefaultPrefetch()).is(5);
138138
}
139+
140+
@test(' - Test openConnection error')
141+
testOpenConnectionError(done) {
142+
const instance = new ConnectionManager({ retry: { delay: 100, maximum_attempts: 1 } });
143+
instance['_connection'] = <any>new RabbitConnectionMock();
144+
unit.stub(instance, '_connect').returns(Promise.reject(Observable.throw(new Error('Woopsie'))));
145+
146+
Observable.forkJoin([
147+
Observable.fromEvent(instance, 'error').map(err => {
148+
unit.object(err).isInstanceOf(Error).hasProperty('code', 'RETRY_LIMIT_EXCEEDED');
149+
}),
150+
instance.connect()
151+
.catch(err => {
152+
unit.object(err).isInstanceOf(Error).hasProperty('code', 'RETRY_LIMIT_EXCEEDED');
153+
return Observable.throw(null);
154+
})
155+
.flatMap(() => Observable.throw(new Error('Cannot be here')))
156+
]).subscribe(() => done(), err => done(err));
157+
}
139158
}

0 commit comments

Comments
 (0)