Skip to content

Commit 91e8e41

Browse files
peckato1troglobit
authored andcommitted
http: add optional callbacks to EventStream
In some cases, it might be useful to have a callback that is called when the client disconnects or when the connection is lost. For us, this will be useful in the future subscribed notifications implementation where we would like to clean up stuff after client disconnects. Change-Id: Icfc2959e38b812b7c18f45976415209b29151c7b Signed-off-by: Mattias Walström <lazzer@gmail.com>
1 parent cb0c00c commit 91e8e41

2 files changed

Lines changed: 23 additions & 5 deletions

File tree

src/http/EventStream.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@ EventStream::EventStream(const server::request& req,
2525
Termination& termination,
2626
EventSignal& signal,
2727
const std::chrono::seconds keepAlivePingInterval,
28-
const std::optional<std::string>& initialEvent)
28+
const std::optional<std::string>& initialEvent,
29+
const std::function<void()>& onTerminationCb,
30+
const std::function<void()>& onClientDisconnectedCb)
2931
: res{res}
3032
, ping{res.io_service()}
3133
, peer{peer_from_request(req)}
3234
, m_keepAlivePingInterval(keepAlivePingInterval)
35+
, onTerminationCb(onTerminationCb)
36+
, onClientDisconnectedCb(onClientDisconnectedCb)
3337
{
3438
if (initialEvent) {
3539
enqueue(FIELD_DATA, *initialEvent);
@@ -41,6 +45,9 @@ EventStream::EventStream(const server::request& req,
4145

4246
terminateSub = termination.connect([this]() {
4347
spdlog::trace("{}: will terminate", peer);
48+
if (this->onTerminationCb) {
49+
this->onTerminationCb();
50+
}
4451
std::lock_guard lock{mtx};
4552
if (state == Closed) { // we are late to the party, res is already gone
4653
return;
@@ -80,6 +87,9 @@ void EventStream::activate()
8087
myself->eventSub.disconnect();
8188
myself->terminateSub.disconnect();
8289
myself->state = Closed;
90+
if (myself->onClientDisconnectedCb) {
91+
myself->onClientDisconnectedCb();
92+
}
8393
});
8494

8595
res.end([myself](uint8_t* destination, std::size_t len, uint32_t* data_flags) {
@@ -187,9 +197,11 @@ std::shared_ptr<EventStream> EventStream::create(const nghttp2::asio_http2::serv
187197
Termination& terminate,
188198
EventSignal& signal,
189199
const std::chrono::seconds keepAlivePingInterval,
190-
const std::optional<std::string>& initialEvent)
200+
const std::optional<std::string>& initialEvent,
201+
const std::function<void()>& onTerminationCb,
202+
const std::function<void()>& onClientDisconnectedCb)
191203
{
192-
auto stream = std::shared_ptr<EventStream>(new EventStream(req, res, terminate, signal, keepAlivePingInterval, initialEvent));
204+
auto stream = std::shared_ptr<EventStream>(new EventStream(req, res, terminate, signal, keepAlivePingInterval, initialEvent, onTerminationCb, onClientDisconnectedCb));
193205
stream->activate();
194206
return stream;
195207
}

src/http/EventStream.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ class EventStream : public std::enable_shared_from_this<EventStream> {
3636
Termination& terminate,
3737
EventSignal& signal,
3838
const std::chrono::seconds keepAlivePingInterval,
39-
const std::optional<std::string>& initialEvent = std::nullopt);
39+
const std::optional<std::string>& initialEvent = std::nullopt,
40+
const std::function<void()>& onTerminationCb = std::function<void()>(),
41+
const std::function<void()>& onClientDisconnectedCb = std::function<void()>());
4042

4143
private:
4244
const nghttp2::asio_http2::server::response& res;
@@ -54,6 +56,8 @@ class EventStream : public std::enable_shared_from_this<EventStream> {
5456
boost::signals2::scoped_connection eventSub, terminateSub;
5557
const std::string peer;
5658
const std::chrono::seconds m_keepAlivePingInterval;
59+
std::function<void()> onTerminationCb; ///< optional callback when the stream is terminated
60+
std::function<void()> onClientDisconnectedCb; ///< optional callback invoked in client.on_close()
5761

5862
size_t send_chunk(uint8_t* destination, std::size_t len, uint32_t* data_flags);
5963
ssize_t process(uint8_t* destination, std::size_t len, uint32_t* data_flags);
@@ -66,7 +70,9 @@ class EventStream : public std::enable_shared_from_this<EventStream> {
6670
Termination& terminate,
6771
EventSignal& signal,
6872
const std::chrono::seconds keepAlivePingInterval,
69-
const std::optional<std::string>& initialEvent = std::nullopt);
73+
const std::optional<std::string>& initialEvent = std::nullopt,
74+
const std::function<void()>& onTerminationCb = std::function<void()>(),
75+
const std::function<void()>& onClientDisconnectedCb = std::function<void()>());
7076
void activate();
7177
};
7278
}

0 commit comments

Comments
 (0)