Skip to content

Commit 8fa4b09

Browse files
author
Andy Ford
authored
Merge pull request #7 from ECFMP/event-bus
Event bus
2 parents 79acb9f + 4bf533e commit 8fa4b09

22 files changed

Lines changed: 678 additions & 232 deletions

include/flow-sdk/EventBus.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#pragma once
2+
#include "../src/eventbus/InternalEventStream.h"
3+
#include <any>
4+
#include <mutex>
5+
#include <typeindex>
6+
#include <unordered_map>
7+
8+
namespace FlowSdk::EventBus {
9+
10+
template<typename EventType>
11+
class EventStreamFactory;
12+
13+
class EventBus
14+
{
15+
public:
16+
virtual ~EventBus() = default;
17+
/**
18+
* Subscribes the given listener to the event stream.
19+
*/
20+
template<typename EventType>
21+
void Subscribe(std::shared_ptr<NewEventListener<EventType>> listener)
22+
{
23+
Subscribe<EventType>(listener, nullptr);
24+
};
25+
26+
template<typename EventType>
27+
void Subscribe(
28+
std::shared_ptr<NewEventListener<EventType>> listener, std::shared_ptr<NewEventFilter<EventType>> filter
29+
)
30+
{
31+
GetStream<EventType>().Subscribe(listener, filter);
32+
};
33+
34+
/**
35+
* Subscribes the given listener to the event stream, but only for the next event.
36+
*/
37+
template<typename EventType>
38+
void SubscribeOnce(std::shared_ptr<NewEventListener<EventType>> listener)
39+
{
40+
SubscribeOnce<EventType>(listener, nullptr);
41+
}
42+
43+
template<typename EventType>
44+
void SubscribeOnce(
45+
std::shared_ptr<NewEventListener<EventType>> listener, std::shared_ptr<NewEventFilter<EventType>> filter
46+
)
47+
{
48+
GetStream<EventType>().SubscribeOnce(listener, filter);
49+
};
50+
51+
private:
52+
template<typename EventType>
53+
auto GetStream() -> InternalEventStream<EventType>&
54+
{
55+
auto lock = std::lock_guard(mutex);
56+
57+
const auto index = std::type_index(typeid(EventType));
58+
if (!streams.contains(index)) {
59+
streams.insert({index, std::any(std::make_shared<InternalEventStream<EventType>>())});
60+
}
61+
62+
return *std::any_cast<std::shared_ptr<InternalEventStream<EventType>>>(streams.at(index));
63+
}
64+
65+
// Protects the streams map
66+
std::mutex mutex;
67+
68+
// All the streams
69+
std::unordered_map<std::type_index, std::any> streams;
70+
};
71+
}// namespace FlowSdk::EventBus

include/flow-sdk/EventStream.h

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#pragma once
2+
#include <memory>
3+
#include <mutex>
4+
#include <vector>
5+
6+
namespace FlowSdk::EventBus {
7+
8+
template<typename EventType>
9+
class NewEventListener;
10+
template<typename EventType>
11+
class NewEventFilter;
12+
13+
template<typename EventType>
14+
struct EventSubscription {
15+
std::shared_ptr<NewEventListener<EventType>> listener;
16+
std::shared_ptr<NewEventFilter<EventType>> filter;
17+
bool once;
18+
};
19+
20+
template<typename EventType>
21+
class EventStream
22+
{
23+
public:
24+
virtual ~EventStream() = default;
25+
26+
/**
27+
* Subscribes the given listener to the event stream.
28+
*/
29+
void Subscribe(std::shared_ptr<NewEventListener<EventType>> listener)
30+
{
31+
Subscribe(listener, nullptr);
32+
};
33+
34+
virtual void Subscribe(
35+
std::shared_ptr<NewEventListener<EventType>> listener, std::shared_ptr<NewEventFilter<EventType>> filter
36+
)
37+
{
38+
if (listener == nullptr) {
39+
throw std::invalid_argument("listener cannot be null");
40+
}
41+
42+
auto guard = std::lock_guard(mutex);
43+
subscriptions.push_back(EventSubscription<EventType>{listener, filter, false});
44+
};
45+
46+
/**
47+
* Subscribes the given listener to the event stream, but only for the next event.
48+
*/
49+
virtual void SubscribeOnce(std::shared_ptr<NewEventListener<EventType>> listener)
50+
{
51+
SubscribeOnce(listener, nullptr);
52+
}
53+
54+
virtual void SubscribeOnce(
55+
std::shared_ptr<NewEventListener<EventType>> listener, std::shared_ptr<NewEventFilter<EventType>> filter
56+
)
57+
{
58+
if (listener == nullptr) {
59+
throw std::invalid_argument("listener cannot be null");
60+
}
61+
62+
auto guard = std::lock_guard(mutex);
63+
subscriptions.push_back(EventSubscription<EventType>{listener, filter, true});
64+
};
65+
66+
protected:
67+
std::mutex mutex;
68+
69+
// All subscriptions to this event stream.
70+
std::vector<EventSubscription<EventType>> subscriptions;
71+
};
72+
}// namespace FlowSdk::EventBus

include/flow-sdk/NewEventFilter.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
namespace FlowSdk::EventBus {
4+
5+
template<typename EventType>
6+
class NewEventFilter
7+
{
8+
public:
9+
virtual ~NewEventFilter() = default;
10+
virtual bool ShouldProcess(const EventType&) = 0;
11+
};
12+
}// namespace FlowSdk::EventBus
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
namespace FlowSdk::EventBus {
4+
template<typename EventType>
5+
class NewEventListener
6+
{
7+
public:
8+
virtual ~NewEventListener() = default;
9+
virtual void OnEvent(const EventType&) = 0;
10+
};
11+
}// namespace FlowSdk::EventBus

include/flow-sdk/Sdk.h

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
#pragma once
22

3-
namespace FlowSdk::FlowMeasure {
4-
class FlowMeasure;
5-
}// namespace FlowSdk::FlowMeasure
3+
namespace FlowSdk {
4+
namespace EventBus {
5+
class EventBus;
6+
}// namespace EventBus
7+
namespace FlowMeasure {
8+
class FlowMeasure;
9+
}// namespace FlowMeasure
10+
}// namespace FlowSdk
611

712
namespace FlowSdk::Plugin {
813

9-
class SdkEventListeners;
10-
1114
/**
1215
* Represents the public-facing facade of the SDK.
1316
*/
@@ -16,7 +19,16 @@ namespace FlowSdk::Plugin {
1619
public:
1720
virtual ~Sdk() = default;
1821

19-
[[nodiscard]] virtual auto Listeners() const noexcept -> SdkEventListeners& = 0;
22+
/**
23+
* The event bus for the SDK, which can be used to subscribe to events.
24+
*/
25+
[[nodiscard]] virtual auto EventBus() const noexcept -> EventBus::EventBus& = 0;
26+
27+
/**
28+
* This method should be called once every second by the plugin using this SDK (as part of EuroScopes timer
29+
* tick method).
30+
*/
31+
virtual void OnEuroscopeTimerTick() = 0;
2032

2133
/**
2234
* Destroys this SDK instance. Once destroyed it cannot be reused.

include/flow-sdk/SdkEventListenerTypes.h

Lines changed: 0 additions & 32 deletions
This file was deleted.

include/flow-sdk/SdkEventListeners.h

Lines changed: 0 additions & 51 deletions
This file was deleted.

include/flow-sdk/SdkEvents.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#pragma once
2+
3+
namespace FlowSdk::FlowMeasure {
4+
class FlowMeasure;
5+
}// namespace FlowSdk::FlowMeasure
6+
7+
namespace FlowSdk::Plugin {
8+
using FlowMeasureNotifiedEvent = struct FlowMeasureNotifiedEvent {
9+
const FlowMeasure::FlowMeasure& measure;
10+
};
11+
12+
using FlowMeasureActivatedEvent = struct FlowMeasureActivatedEvent {
13+
const FlowMeasure::FlowMeasure& measure;
14+
};
15+
16+
using FlowMeasureExpiredEvent = struct FlowMeasureExpiredEvent {
17+
const FlowMeasure::FlowMeasure& measure;
18+
};
19+
20+
using FlowMeasureWithdrawnEvent = struct FlowMeasureWithdrawnEvent {
21+
const FlowMeasure::FlowMeasure& measure;
22+
};
23+
24+
using FlowMeasureReissuedEvent = struct FlowMeasureReissuedEvent {
25+
const FlowMeasure::FlowMeasure& original;
26+
const FlowMeasure::FlowMeasure& reissued;
27+
};
28+
}// namespace FlowSdk::Plugin

include/flow-sdk/flow-sdk.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@
1919
#include "MultipleLevelFilter.h"
2020
#include "RouteFilter.h"
2121
#include "Sdk.h"
22-
#include "SdkEventListenerTypes.h"
22+
#include "SdkEvents.h"

src/CMakeLists.txt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@ set(src_flight_information_regions
1414
set(src_events
1515
event/ConcreteEvent.cpp event/ConcreteEvent.h
1616
event/ConcreteEventParticipant.cpp "event/ConcreteEventParticipant.h"
17-
event/Event.cpp)
17+
event/Event.cpp ../include/flow-sdk/EventStream.h ../include/flow-sdk/NewEventListener.h ../include/flow-sdk/NewEventFilter.h)
18+
19+
set(src_eventbus
20+
../include/flow-sdk/EventStream.h
21+
../include/flow-sdk/NewEventListener.h
22+
../include/flow-sdk/NewEventFilter.h eventbus/InternalEventStream.h ../include/flow-sdk/EventBus.h eventbus/InternalEventBus.h)
1823

1924
set(src_flowmeasures
2025
flowmeasure/ConcreteFlowMeasure.cpp flowmeasure/ConcreteFlowMeasure.h flowmeasure/ConcreteAirportFilter.cpp flowmeasure/ConcreteAirportFilter.h flowmeasure/ConcreteEventFilter.cpp flowmeasure/ConcreteEventFilter.h flowmeasure/ConcreteLevelRangeFilter.cpp flowmeasure/ConcreteLevelRangeFilter.h flowmeasure/ConcreteRouteFilter.cpp flowmeasure/ConcreteRouteFilter.h flowmeasure/ConcreteMeasure.cpp flowmeasure/ConcreteMeasure.h flowmeasure/ConcreteMeasureFactory.cpp flowmeasure/ConcreteMeasureFactory.h flowmeasure/ConcreteFlowMeasureFilters.cpp flowmeasure/ConcreteFlowMeasureFilters.h api/FlowMeasureFilterParserInterface.h ../include/flow-sdk/MultipleLevelFilter.h flowmeasure/ConcreteMultipleLevelFilter.cpp flowmeasure/ConcreteMultipleLevelFilter.h ../include/mock/MultipleLevelFilterMock.h ../include/flow-sdk/RangeToDestinationFilter.h flowmeasure/ConcreteRangeToDestinationFilter.cpp flowmeasure/ConcreteRangeToDestinationFilter.h)
@@ -28,12 +33,13 @@ set(src_plugin
2833
plugin/ConcreteSdk.h
2934
plugin/ConcreteSdk.cpp
3035
plugin/InternalEventListeners.h
31-
plugin/ConcreteEventListeners.cpp plugin/ConcreteEventListeners.h plugin/ConcreteSdkEventListeners.cpp plugin/ConcreteSdkEventListeners.h plugin/SdkFactory.cpp)
36+
plugin/ConcreteEventListeners.cpp plugin/ConcreteEventListeners.h plugin/SdkFactory.cpp ../include/flow-sdk/SdkEvents.h)
3237

3338
set(ALL_FILES
3439
${src_api}
3540
${src_date}
3641
${src_events}
42+
${src_eventbus}
3743
${src_flowmeasures}
3844
${src_flight_information_regions}
3945
${src_log}

0 commit comments

Comments
 (0)