Skip to content

Commit 0130846

Browse files
authored
pubsub / rpc: centralize label prefiltering and matching (#299)
In SpecificDiscoveryStore, add the label set along with the handler info to the DiscoveryCluster as "Controller" info. This allows us to do the label matching centralized in the DiscoveryStore * Adds a GetSupplementalDataValue function for the specified key with proper error handling included * ServiceDiscovery: add test for two publishers and one subscriber * Add test to test whether the optional label prefiltering works when the publishers are started before the Subscriber * IntegrationTests: fix datarace for DataPubSub * SpecificDiscoveryStore: improve algorithm docs * Add comments to better explain the used algorithm, since it is somewhat convoluted Co-authored-by: Jan Kraemer <jan.kraemer@vector.com> Co-authored-by: Konrad Breitsprecher <Konrad.Breitsprecher@vector.com> Co-authored-by: Marius Börschig <Marius.Boerschig@vector.com> Co-authored-by: Daniel Edwards <Daniel.Edwards@vector.com> Signed-off-by: Jan Kraemer <jan.kraemer@vector.com> Signed-off-by: Konrad Breitsprecher <Konrad.Breitsprecher@vector.com> Signed-off-by: Marius Börschig <Marius.Boerschig@vector.com> Signed-off-by: Daniel Edwards <Daniel.Edwards@vector.com>
1 parent c7486b9 commit 0130846

13 files changed

Lines changed: 252 additions & 102 deletions

File tree

SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,4 +805,83 @@ TEST_F(ITest_Internals_DataPubSub, test_1pub_1sub_async_rejoin)
805805
ShutdownSystem();
806806
}
807807

808+
809+
// Two publishers (optional label1), one subscriber (optional label1, label2); publishers start first; subscriber joins
810+
TEST_F(ITest_Internals_DataPubSub, test_2pub_1sub_async_starting_order)
811+
{
812+
const uint32_t numMsgToPublish = 1;
813+
const uint32_t numMsgToReceive = 1 * numMsgToPublish;
814+
815+
std::vector<PubSubParticipant> publishers;
816+
publishers.push_back({"Pub1",
817+
{{"PubCtrl1",
818+
"TopicA",
819+
{"A"},
820+
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}},
821+
1,
822+
defaultMsgSize,
823+
numMsgToPublish}},
824+
{}});
825+
publishers.push_back({"Pub2",
826+
{{"PubCtrl1",
827+
"TopicA",
828+
{"A"},
829+
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}},
830+
1,
831+
defaultMsgSize,
832+
numMsgToPublish}},
833+
{}});
834+
835+
std::vector<PubSubParticipant> subscribers;
836+
std::vector<std::vector<uint8_t>> expectedDataUnordered;
837+
expectedDataUnordered.reserve(numMsgToReceive);
838+
for (uint32_t d = 0; d < numMsgToReceive; d++)
839+
{
840+
// Receive the same blob several times (once from every publisher)
841+
expectedDataUnordered.emplace_back(std::vector<uint8_t>(defaultMsgSize, 0));
842+
}
843+
subscribers.push_back(
844+
{"Sub1",
845+
{},
846+
{{
847+
"SubCtrl1",
848+
"TopicA",
849+
{"A"},
850+
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional},
851+
{"K2", "V2", SilKit::Services::MatchingLabel::Kind::Optional}
852+
}, // BUGHUNT: Second label breaks communication
853+
defaultMsgSize,
854+
numMsgToReceive,
855+
1,
856+
expectedDataUnordered,
857+
}}});
858+
859+
for (auto& sub : subscribers)
860+
{
861+
sub.communicationTimeout = std::chrono::milliseconds(1000);
862+
}
863+
864+
_testSystem.SetupRegistryAndSystemMaster("silkit://localhost:0", false, {});
865+
866+
867+
//BUGHUNT: Subscribers start first fails SOMETIMES
868+
//RunParticipants(subscribers, _testSystem.GetRegistryUri(), false);
869+
870+
//BUGHUNT: Publishers start first fails ALWAYS
871+
872+
// Start publishers
873+
RunParticipants(publishers, _testSystem.GetRegistryUri(), false);
874+
for (auto& p : publishers)
875+
{
876+
p.WaitForAllSent();
877+
}
878+
879+
// Start subscriber
880+
RunParticipants(subscribers, _testSystem.GetRegistryUri(), false);
881+
882+
883+
JoinPubSubThreads();
884+
ShutdownSystem();
885+
}
886+
808887
} // anonymous namespace

SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ class ITest_Internals_DataPubSub : public testing::Test
178178
, name{newName}
179179
, dataSubscribers{newDataSubscribers}
180180
, dataPublishers{newDataPublishers}
181+
, allReceived{std::make_unique<std::atomic<bool>>(false)}
181182
{
182183
}
183184

@@ -186,6 +187,7 @@ class ITest_Internals_DataPubSub : public testing::Test
186187
std::string name;
187188
std::vector<DataSubscriberInfo> dataSubscribers;
188189
std::vector<DataPublisherInfo> dataPublishers;
190+
std::unique_ptr<std::atomic<bool>> allReceived;
189191
std::unique_ptr<SilKit::IParticipant> participant;
190192
SilKit::Core::IParticipantInternal* participantImpl = nullptr;
191193

@@ -196,7 +198,6 @@ class ITest_Internals_DataPubSub : public testing::Test
196198
std::promise<void> allDiscoveredPromise;
197199
bool allDiscovered{false};
198200
std::promise<void> allReceivedPromise;
199-
bool allReceived{false};
200201
// Pub
201202
std::promise<void> allSentPromise;
202203
bool allSent{false};
@@ -208,7 +209,7 @@ class ITest_Internals_DataPubSub : public testing::Test
208209
if (std::all_of(dataSubscribers.begin(), dataSubscribers.end(),
209210
[](const auto& dsInfo) { return dsInfo.numMsgToReceive == 0; }))
210211
{
211-
allReceived = true;
212+
*allReceived = true;
212213
allReceivedPromise.set_value();
213214
}
214215
}
@@ -224,11 +225,11 @@ class ITest_Internals_DataPubSub : public testing::Test
224225

225226
void CheckAllReceivedPromise()
226227
{
227-
if (!allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) {
228+
if (!*allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) {
228229
return dsInfo.allReceived;
229230
}))
230231
{
231-
allReceived = true;
232+
*allReceived = true;
232233
allReceivedPromise.set_value();
233234
}
234235
}

SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ class TestInfrastructure
2525
{
2626
std::stringstream ss;
2727
ss << "Something went wrong: " << error.what() << std::endl;
28-
_systemMaster.systemController->AbortSimulation();
28+
if (_systemMaster.systemController)
29+
{
30+
_systemMaster.systemController->AbortSimulation();
31+
}
2932
FAIL() << ss.str();
3033
}
3134

@@ -127,7 +130,7 @@ class TestInfrastructure
127130
struct SystemMaster
128131
{
129132
std::unique_ptr<IParticipant> participant;
130-
SilKit::Experimental::Services::Orchestration::ISystemController* systemController;
133+
SilKit::Experimental::Services::Orchestration::ISystemController* systemController{nullptr};
131134
ISystemMonitor* systemMonitor;
132135
ILifecycleService* lifecycleService;
133136

SilKit/include/silkit/services/datatypes.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <cstdint>
88
#include <string>
9+
#include <tuple>
910

1011
#include "silkit/util/HandlerId.hpp"
1112

@@ -44,6 +45,11 @@ struct MatchingLabel
4445
std::string key; //!< The label's key.
4546
std::string value; //!< The label's key.
4647
Kind kind; //!< The matching kind to apply for this label.
48+
49+
friend bool operator==(const MatchingLabel& lhs, const MatchingLabel& rhs) noexcept
50+
{
51+
return std::tie(lhs.key, lhs.value, lhs.kind) == std::tie(rhs.key, rhs.value, rhs.kind);
52+
}
4753
};
4854

4955
using SilKit::Util::HandlerId;

SilKit/source/core/internal/ServiceDescriptor.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class ServiceDescriptor
8686
inline bool GetSupplementalDataItem(const std::string& key, std::string& value) const;
8787
inline void SetSupplementalDataItem(std::string key, std::string val);
8888

89+
inline std::string GetSupplementalDataValue(const std::string& key) const;
90+
8991
inline auto GetSimulationName() const -> const std::string&;
9092
inline void SetSimulationName(const std::string& simulationName);
9193

@@ -137,6 +139,17 @@ void ServiceDescriptor::SetSupplementalDataItem(std::string key, std::string val
137139
_supplementalData[key] = std::move(val);
138140
}
139141

142+
std::string ServiceDescriptor::GetSupplementalDataValue(const std::string& key) const
143+
{
144+
std::string tmp;
145+
if (GetSupplementalDataItem(key, tmp) == false)
146+
{
147+
throw SilKit::StateError{"Unknown key in supplementalData"};
148+
}
149+
150+
return tmp;
151+
}
152+
140153
auto ServiceDescriptor::GetParticipantId() const -> ParticipantId
141154
{
142155
return _participantId;

0 commit comments

Comments
 (0)