Skip to content

Commit 8fadf52

Browse files
author
mshaposhnikov@w3-edge.com
committed
dont send expired messages
1 parent 6e19e5f commit 8fadf52

2 files changed

Lines changed: 63 additions & 29 deletions

File tree

client.js

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ class Client {
4242
this._clientId = p.clientId || 1;
4343

4444
// build protocol bytes object
45+
const timeoutMs = p.timeoutMs || 30000;
4546
this._protocolBytes = new ProtocolBytes();
4647
this._rateLimiter = new RateLimiter((data) => {
4748
return this._protocolBytes.sendFieldset(data);
48-
}, 45, 1000, 5000);
49+
}, 45, 1000, timeoutMs);
4950

5051
this._protocolBytes.on('message_fieldset', (o) => {
5152
this._onMessageFieldset(o);
@@ -68,7 +69,7 @@ class Client {
6869

6970
// attach messages handler
7071
this._incomeHandler = new IncomeFieldsetHandler({
71-
timeoutMs: p.timeoutMs || 30000,
72+
timeoutMs,
7273
eventEmitter: this._emitter
7374
});
7475

@@ -97,6 +98,12 @@ class Client {
9798

9899

99100

101+
_sendFieldsetExpirable(fields) {
102+
this._rateLimiter.runExpirable(fields);
103+
}
104+
105+
106+
100107
_sendStartApi() {
101108
const START_API = 71;
102109
const VERSION = 2;
@@ -126,7 +133,7 @@ class Client {
126133

127134
async getCurrentTime() {
128135
/* Asks the current system time on the server side. */
129-
this._sendFieldsetRateLimited([
136+
this._sendFieldsetExpirable([
130137
OutcomeMessageType.REQ_CURRENT_TIME,
131138
1 /* VERSION */
132139
]);
@@ -136,11 +143,11 @@ class Client {
136143
}
137144

138145

146+
139147
/*########################################################################
140148
################## Market Data
141149
##########################################################################*/
142150

143-
144151
/*
145152
Starts to stream market data
146153
see reqMktData for parameters
@@ -179,12 +186,13 @@ class Client {
179186
p.genericTickList = p.genericTickList || '';
180187
p.snapshot = true;
181188
p.regulatorySnapshot = p.regulatorySnapshot || false;
182-
this._sendFieldsetRateLimited(request_mktData(this._serverVersion, p));
189+
this._sendFieldsetExpirable(request_mktData(this._serverVersion, p));
183190

184191
return await this._incomeHandler.awaitRequestId(p.requestId);
185192
}
186193

187194

195+
188196
async reqMarketDataType(marketDataType) {
189197
/* The API can receive frozen market data from Trader
190198
Workstation. Frozen market data is the last data recorded in our system.
@@ -544,7 +552,7 @@ class Client {
544552
async cancelOrder(orderId) {
545553
/* cancel an order. */
546554

547-
this._sendFieldsetRateLimited([
555+
this._sendFieldsetExpirable([
548556
OutcomeMessageType.CANCEL_ORDER,
549557
1 /* VERSION */,
550558
orderId
@@ -563,7 +571,7 @@ class Client {
563571
orderId will be generated. This association will persist over multiple
564572
API and TWS sessions. */
565573

566-
this._sendFieldsetRateLimited([
574+
this._sendFieldsetExpirable([
567575
OutcomeMessageType.REQ_OPEN_ORDERS,
568576
1 /* VERSION */
569577
]);
@@ -600,7 +608,7 @@ class Client {
600608
Note: No association is made between the returned orders and the
601609
requesting client. */
602610

603-
this._sendFieldsetRateLimited([
611+
this._sendFieldsetExpirable([
604612
OutcomeMessageType.REQ_ALL_OPEN_ORDERS,
605613
1 /* VERSION */
606614
]);
@@ -752,7 +760,7 @@ class Client {
752760

753761
const VERSION = 1;
754762

755-
this._sendFieldsetRateLimited([OutcomeMessageType.REQ_POSITIONS, VERSION]);
763+
this._sendFieldsetExpirable([OutcomeMessageType.REQ_POSITIONS, VERSION]);
756764
return await this._incomeHandler.awaitMessageType(IncomeMessageType.POSITION_END);
757765
}
758766

@@ -1063,7 +1071,7 @@ class Client {
10631071
flds.push(contract.secId);
10641072
}
10651073

1066-
this._sendFieldsetRateLimited(flds);
1074+
this._sendFieldsetExpirable(flds);
10671075
return await this._incomeHandler.awaitRequestId(requestId);
10681076
}
10691077

@@ -1477,7 +1485,7 @@ class Client {
14771485
flds.push(chartOptionsStr);
14781486
}
14791487

1480-
this._sendFieldsetRateLimited(flds);
1488+
this._sendFieldsetExpirable(flds);
14811489
return await this._incomeHandler.awaitRequestId(requestId);
14821490
}
14831491

@@ -1502,10 +1510,7 @@ class Client {
15021510

15031511
async getHeadTimeStamp(p) {
15041512
/*
1505-
contract:,
1506-
whatToShow: str,
1507-
useRth: int,
1508-
formatDate: int
1513+
whatToShow: str, useRth: int, formatDate: int
15091514
15101515
Note that formatData parameter affects intraday bars only
15111516
1-day bars always return with date in YYYYMMDD format
@@ -1542,7 +1547,7 @@ class Client {
15421547
formatDate
15431548
];
15441549

1545-
this._sendFieldsetRateLimited(flds);
1550+
this._sendFieldsetExpirable(flds);
15461551
return await this._incomeHandler.awaitRequestId(requestId);
15471552
}
15481553

@@ -1558,7 +1563,7 @@ class Client {
15581563

15591564

15601565

1561-
async reqHistogramData(p) {
1566+
async getHistogramData(p) {
15621567
/*
15631568
contract: Contract,
15641569
useRth: bool,
@@ -1573,7 +1578,7 @@ class Client {
15731578
throw new Error("It does not support histogram requests.");
15741579
}
15751580

1576-
this._sendFieldsetRateLimited([
1581+
this._sendFieldsetExpirable([
15771582
OutcomeMessageType.REQ_HISTOGRAM_DATA,
15781583
requestId,
15791584
contract.conId,
@@ -1666,7 +1671,7 @@ class Client {
16661671

16671672
flds.push(miscOptionsString);
16681673

1669-
this._sendFieldsetRateLimited(flds);
1674+
this._sendFieldsetExpirable(flds);
16701675
return await this._incomeHandler.awaitRequestId(requestId);
16711676
}
16721677

@@ -1757,7 +1762,7 @@ class Client {
17571762
}
17581763
}
17591764

1760-
this._sendFieldsetRateLimited(flds);
1765+
this._sendFieldsetExpirable(flds);
17611766
return await this._incomeHandler.awaitRequestId(requestId);
17621767
}
17631768

@@ -2190,7 +2195,7 @@ class Client {
21902195
throw new Error("It does not support security definition option request.");
21912196
}
21922197

2193-
this._sendFieldsetRateLimited([OutcomeMessageType.REQ_SEC_DEF_OPT_PARAMS,
2198+
this._sendFieldsetExpirable([OutcomeMessageType.REQ_SEC_DEF_OPT_PARAMS,
21942199
requestId,
21952200
underlyingSymbol,
21962201
futFopExchange,

rate-limiter.js

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,45 @@
1+
import util from 'util';
2+
const debuglog = util.debuglog('ib-tws-api');
3+
4+
15

26
export default class RateLimiter {
3-
constructor(workload, callsPerSlot, slotIntervalMs, maxQueueLength = Infinity) {
7+
constructor(workload, callsPerSlot, slotIntervalMs, timeoutMs) {
48
this._slotIntervalMs = slotIntervalMs;
59
this._callsPerSlot = callsPerSlot;
6-
this._maxQueueLength = maxQueueLength;
10+
this._timeoutMs = timeoutMs;
711
this._queue = [];
812
this._workload = workload;
913
}
1014

1115

1216

1317
run(item) {
14-
if (this.queueLength >= this.maxQueueLength) {
15-
throw new Error('Too many requests queued');
16-
}
18+
return new Promise((resolve, reject) => {
19+
this._queue.push({
20+
item,
21+
resolve,
22+
reject
23+
});
1724

25+
if (this._queue.length == 1) {
26+
process.nextTick(() => {
27+
this._processQueue();
28+
});
29+
}
30+
});
31+
}
32+
33+
34+
35+
runExpirable(item) {
1836
return new Promise((resolve, reject) => {
19-
this._queue.push({ item, resolve, reject });
37+
this._queue.push({
38+
item,
39+
resolve,
40+
reject,
41+
expireDate: Date.now() + this._timeoutMs
42+
});
2043

2144
if (this._queue.length == 1) {
2245
process.nextTick(() => {
@@ -43,9 +66,15 @@ export default class RateLimiter {
4366

4467
if (this._slot_remaining > 0) {
4568
const i = this._queue.shift();
46-
this._slot_remaining--;
4769

48-
Promise.resolve(this._workload(i.item)).then(i.resolve, i.reject);
70+
if (i.expireDate && i.expireDate < Date.now()) {
71+
// expired requests just ignored since timeout handled by
72+
// IncomeFieldsetHandler.awaitRequestId
73+
debuglog('RateLimiter: expired item ignored ' + i.item);
74+
} else {
75+
this._slot_remaining--;
76+
Promise.resolve(this._workload(i.item)).then(i.resolve, i.reject);
77+
}
4978
} else {
5079
if (this._timer) {
5180
clearTimeout(this._timer);

0 commit comments

Comments
 (0)