Skip to content

Commit 09ca6df

Browse files
Merge pull request #519 from crypto-chassis/fix-threading
Fix threading
2 parents 6b023a3 + 450262f commit 09ca6df

53 files changed

Lines changed: 159 additions & 250 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Some breaking changes introduced
2-
* Rename "PRICE_TIMES_QUANTITY_MIN" to "QUOTE_QUANTITY_MIN". Rename "CUMULATIVE_FILLED_PRICE_TIMES_QUANTITY" to "CUMULATIVE_FILLED_QUOTE_QUANTITY".
2+
* The return type of `EventHandler::processEvent` has been changed from `bool` to `void`.
3+
* Please read [Handle events in "immediate" vs. "batching" mode](#handle-events-in-immediate-vs-batching-mode).
34

45
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
56

@@ -57,6 +58,8 @@
5758
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
5859

5960

61+
62+
6063
# ccapi
6164
* A header-only C++ library for streaming market data and executing trades directly from cryptocurrency exchanges (i.e. the connections are between your server and the exchange server without anything in-between).
6265
* Bindings for other languages such as Python, Java, C#, Go, and Javascript are provided.
@@ -225,9 +228,8 @@ Logger* Logger::logger = nullptr; // This line is needed.
225228
226229
class MyEventHandler : public EventHandler {
227230
public:
228-
bool processEvent(const Event& event, Session* sessionPtr) override {
231+
void processEvent(const Event& event, Session* sessionPtr) override {
229232
std::cout << "Received an event:\n" + event.toStringPretty(2, 2) << std::endl;
230-
return true;
231233
}
232234
};
233235
} /* namespace ccapi */
@@ -302,7 +304,7 @@ Logger* Logger::logger = nullptr; // This line is needed.
302304
303305
class MyEventHandler : public EventHandler {
304306
public:
305-
bool processEvent(const Event& event, Session* sessionPtr) override {
307+
void processEvent(const Event& event, Session* sessionPtr) override {
306308
if (event.getType() == Event::Type::SUBSCRIPTION_STATUS) {
307309
std::cout << "Received an event of type SUBSCRIPTION_STATUS:\n" + event.toStringPretty(2, 2) << std::endl;
308310
} else if (event.getType() == Event::Type::SUBSCRIPTION_DATA) {
@@ -314,7 +316,6 @@ class MyEventHandler : public EventHandler {
314316
}
315317
}
316318
}
317-
return true;
318319
}
319320
};
320321
} /* namespace ccapi */
@@ -494,9 +495,8 @@ Logger* Logger::logger = nullptr; // This line is needed.
494495
495496
class MyEventHandler : public EventHandler {
496497
public:
497-
bool processEvent(const Event& event, Session* sessionPtr) override {
498+
void processEvent(const Event& event, Session* sessionPtr) override {
498499
std::cout << "Received an event:\n" + event.toStringPretty(2, 2) << std::endl;
499-
return true;
500500
}
501501
};
502502
} /* namespace ccapi */
@@ -592,7 +592,7 @@ Logger* Logger::logger = nullptr; // This line is needed.
592592
593593
class MyEventHandler : public EventHandler {
594594
public:
595-
bool processEvent(const Event& event, Session* sessionPtr) override {
595+
void processEvent(const Event& event, Session* sessionPtr) override {
596596
if (event.getType() == Event::Type::SUBSCRIPTION_STATUS) {
597597
std::cout << "Received an event of type SUBSCRIPTION_STATUS:\n" + event.toStringPretty(2, 2) << std::endl;
598598
auto message = event.getMessageList().at(0);
@@ -609,7 +609,6 @@ class MyEventHandler : public EventHandler {
609609
} else if (event.getType() == Event::Type::SUBSCRIPTION_DATA) {
610610
std::cout << "Received an event of type SUBSCRIPTION_DATA:\n" + event.toStringPretty(2, 2) << std::endl;
611611
}
612-
return true;
613612
}
614613
};
615614
} /* namespace ccapi */
@@ -836,7 +835,7 @@ namespace ccapi {
836835
Logger* Logger::logger = nullptr; // This line is needed.
837836
class MyEventHandler : public EventHandler {
838837
public:
839-
bool processEvent(const Event& event, Session* sessionPtr) override {
838+
void processEvent(const Event& event, Session* sessionPtr) override {
840839
if (event.getType() == Event::Type::AUTHORIZATION_STATUS) {
841840
std::cout << "Received an event of type AUTHORIZATION_STATUS:\n" + event.toStringPretty(2, 2) << std::endl;
842841
auto message = event.getMessageList().at(0);
@@ -857,7 +856,6 @@ class MyEventHandler : public EventHandler {
857856
} else if (event.getType() == Event::Type::FIX) {
858857
std::cout << "Received an event of type FIX:\n" + event.toStringPretty(2, 2) << std::endl;
859858
}
860-
return true;
861859
}
862860
};
863861
} /* namespace ccapi */
@@ -955,21 +953,16 @@ Bye
955953
#### Handle events in "immediate" vs. "batching" mode
956954

957955
In general there are 2 ways to handle events.
958-
* When a `Session` is instantiated with an `eventHandler` argument, it will handle events in immediate mode. The `processEvent` method in the `eventHandler` will be invoked immediately when an `Event` is available.
959-
* When a `Session` is instantiated without an `eventHandler` argument, it will handle events in batching mode. The evetns will be batched into an internal `Queue<Event>` and can be retrieved by
956+
* When a `Session` is instantiated with an `eventHandler` argument, it will handle events in immediate mode. The `processEvent` method in the `eventHandler` will be invoked immediately when an `Event` is available, and the invocation will run on the thread where `boost::asio::io_context` runs. When a `Session` is instantiated with an `eventHandler` and an `eventDispatcher` argument, it will also handle events in immediate mode. The `processEvent` method in the `eventHandler` will also be invoked immediately when an `Event` is available, but the invocation will run in the thread(s) provided by the `eventDispatcher` therefore not blocking the thread where `boost::asio::io_context` runs. `EventHandler`s and/or `EventDispatcher`s can be shared among different sessions. Otherwise, different sessions are independent from each other.
957+
An example can be found [here](example/src/market_data_advanced_subscription/main.cpp).
958+
* When a `Session` is instantiated without an `eventHandler` argument, it will handle events in batching mode. The events will be batched into an internal `Queue<Event>` and can be retrieved by
960959
```
961960
std::vector<Event> eventList = session.getEventQueue().purge();
962961
```
963962
An example can be found [here](example/src/market_data_advanced_subscription/main.cpp).
964963

965964
#### Thread safety
966965
* The following methods are implemented to be thread-safe: `Session::sendRequest`, `Session::subscribe`, `Session::sendRequestByFix`, `Session::subscribeByFix`, `Session::setTimer`, all public methods in `Queue`.
967-
* The `processEvent` method in the `eventHandler` is invoked on one of the internal threads in the `eventDispatcher`. A default `EventDispatcher` with 1 internal thread will be created if no `eventDispatcher` argument is provided in `Session` instantiation. To dispatch events to multiple threads, instantiate `EventDispatcher` with `numDispatcherThreads` set to be the desired number. `EventHandler`s and/or `EventDispatcher`s can be shared among different sessions. Otherwise, different sessions are independent from each other.
968-
```
969-
EventDispatcher eventDispatcher(2);
970-
Session session(sessionOptions, sessionConfigs, &eventHandler, &eventDispatcher);
971-
```
972-
An example can be found [here](example/src/market_data_advanced_subscription/main.cpp).
973966

974967
#### Enable library logging
975968

@@ -1015,7 +1008,6 @@ sessionPtr->setTimer(
10151008
* Shorten constant strings used as key names in the returned `Element` (e.g. in CmakeLists.txt `add_compile_definitions(CCAPI_BEST_BID_N_PRICE="b")`).
10161009
* Only enable the services and exchanges that you need.
10171010
* Handle events in ["batching" mode](#handle-events-in-immediate-vs-batching-mode) if your application (e.g. market data archiver) isn't latency sensitive.
1018-
* Define macro `CCAPI_USE_SINGLE_THREAD`. It reduces locking overhead for single threaded applications.
10191011

10201012
## Known Issues and Workarounds
10211013
* Kraken invalid nonce errors. Give the API key a nonce window (https://support.kraken.com/hc/en-us/articles/360001148023-What-is-a-nonce-window-). We use unix timestamp with microsecond resolution as nonce and therefore a nonce window of 500000 translates to a tolerance of 0.5 second.

binding/csharp/example/execution_management_simple_request/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
44
System.Console.WriteLine(string.Format("Received an event:\n{0}", event_.ToStringPretty(2, 2)));
5-
return true;
6-
}
5+
}
76
}
87
static void Main(string[] args) {
98
if (System.Environment.GetEnvironmentVariable("OKX_API_KEY") is null) {

binding/csharp/example/execution_management_simple_subscription/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
44
if (event_.GetType_() == ccapi.Event.Type.SUBSCRIPTION_STATUS) {
55
System.Console.WriteLine(string.Format("Received an event of type SUBSCRIPTION_STATUS:\n{0}", event_.ToStringPretty(2, 2)));
66
var message = event_.GetMessageList()[0];
@@ -16,8 +16,7 @@ public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
1616
} else if (event_.GetType_() == ccapi.Event.Type.SUBSCRIPTION_DATA) {
1717
System.Console.WriteLine(string.Format("Received an event of type SUBSCRIPTION_DATA:\n{0}", event_.ToStringPretty(2, 2)));
1818
}
19-
return true;
20-
}
19+
}
2120
}
2221
static void Main(string[] args) {
2322
if (System.Environment.GetEnvironmentVariable("OKX_API_KEY") is null) {

binding/csharp/example/fix_simple/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
44
if (event_.GetType_() == ccapi.Event.Type.AUTHORIZATION_STATUS) {
55
System.Console.WriteLine(string.Format("Received an event of type AUTHORIZATION_STATUS:\n{0}", event_.ToStringPretty(2, 2)));
66
var message = event_.GetMessageList()[0];
@@ -21,8 +21,7 @@ public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
2121
} else if (event_.GetType_() == ccapi.Event.Type.FIX) {
2222
System.Console.WriteLine(string.Format("Received an event of type FIX:\n{0}", event_.ToStringPretty(2, 2)));
2323
}
24-
return true;
25-
}
24+
}
2625
}
2726
static void Main(string[] args) {
2827
if (System.Environment.GetEnvironmentVariable("COINBASE_API_KEY") is null) {

binding/csharp/example/handle_exception/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
44
try {
55
throw new System.Exception("oops");
66
} catch (System.Exception e) {
77
System.Console.WriteLine(e.ToString());
88
System.Environment.Exit(1);
99
}
10-
return true;
11-
}
10+
}
1211
}
1312
static void Main(string[] args) {
1413
var eventHandler = new MyEventHandler();

binding/csharp/example/market_data_multiple_subscription/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
44
if (event_.GetType_() == ccapi.Event.Type.SUBSCRIPTION_DATA) {
55
foreach (var message in event_.GetMessageList()) {
66
var correlationId = message.GetCorrelationIdList()[0];
@@ -15,8 +15,7 @@ public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
1515
}
1616
}
1717
}
18-
return true;
19-
}
18+
}
2019
}
2120
static void Main(string[] args) {
2221
var eventHandler = new MyEventHandler();

binding/csharp/example/market_data_simple_request/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
44
System.Console.WriteLine(string.Format("Received an event:\n{0}", event_.ToStringPretty(2, 2)));
5-
return true;
6-
}
5+
}
76
}
87
static void Main(string[] args) {
98
var eventHandler = new MyEventHandler();

binding/csharp/example/market_data_simple_subscription/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
44
if (event_.GetType_() == ccapi.Event.Type.SUBSCRIPTION_STATUS) {
55
System.Console.WriteLine(string.Format("Received an event of type SUBSCRIPTION_STATUS:\n{0}", event_.ToStringPretty(2, 2)));
66
} else if (event_.GetType_() == ccapi.Event.Type.SUBSCRIPTION_DATA) {
@@ -16,8 +16,7 @@ public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
1616
}
1717
}
1818
}
19-
return true;
20-
}
19+
}
2120
}
2221
static void Main(string[] args) {
2322
var eventHandler = new MyEventHandler();

binding/csharp/test/MainProgram.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
class MainProgram {
22
class MyEventHandler : ccapi.EventHandler {
3-
public override bool ProcessEvent(ccapi.Event event_, ccapi.Session session) {
4-
return true;
5-
}
3+
public override void ProcessEvent(ccapi.Event event_, ccapi.Session session) {
4+
}
65
}
76
static void Main(string[] args) {
87
var eventHandler = new MyEventHandler();

binding/go/example/execution_management_simple_request/main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@ type MyEventHandler struct {
1111
ccapi.EventHandler
1212
}
1313

14-
func (*MyEventHandler) ProcessEvent(event ccapi.Event, session ccapi.Session) bool {
14+
func (*MyEventHandler) ProcessEvent(event ccapi.Event, session ccapi.Session) {
1515
fmt.Printf("Received an event:\n%s\n", event.ToStringPretty(2, 2))
16-
return true
17-
}
16+
}
1817

1918
func main() {
2019
if len(os.Getenv("OKX_API_KEY")) == 0 {

0 commit comments

Comments
 (0)