Skip to content

Commit af38b09

Browse files
config: enable configuration override for publisher history length (#226)
Issue: SILKIT-1777
1 parent 7c1185b commit af38b09

11 files changed

Lines changed: 280 additions & 3 deletions

SilKit/IntegrationTests/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,12 @@ add_silkit_test_to_executable(SilKitIntegrationTests
184184
SOURCES ITest_LinDynamicResponse.cpp
185185
)
186186

187+
# Pub/Sub
188+
189+
add_silkit_test_to_executable(SilKitIntegrationTests
190+
SOURCES ITest_PubHistory.cpp
191+
)
192+
187193
# RPC
188194

189195
add_silkit_test_to_executable(SilKitInternalIntegrationTests
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
// SPDX-FileCopyrightText: 2025 Vector Informatik GmbH
2+
// SPDX-License-Identifier: MIT
3+
4+
#include "gtest/gtest.h"
5+
#include "gmock/gmock.h"
6+
7+
#include "silkit/SilKit.hpp"
8+
#include "silkit/config/IParticipantConfiguration.hpp"
9+
#include "silkit/participant/IParticipant.hpp"
10+
#include "silkit/services/pubsub/IDataPublisher.hpp"
11+
#include "silkit/services/pubsub/IDataSubscriber.hpp"
12+
#include "silkit/vendor/CreateSilKitRegistry.hpp"
13+
14+
#include <atomic>
15+
#include <chrono>
16+
#include <future>
17+
#include <thread>
18+
19+
namespace {
20+
21+
using namespace std::chrono_literals;
22+
23+
constexpr const char* PUBLISHER_NAME = "P";
24+
25+
constexpr const char* EMPTY_PART_CONF = R"(
26+
)";
27+
28+
constexpr const char* HISTORY_ZERO_PART_CONF = R"(
29+
DataPublishers:
30+
- Name: P
31+
History: 0
32+
)";
33+
34+
constexpr const char* HISTORY_ONE_PART_CONF = R"(
35+
DataPublishers:
36+
- Name: P
37+
History: 1
38+
)";
39+
40+
constexpr const char* HISTORY_TWO_PART_CONF = R"(
41+
DataPublishers:
42+
- Name: P
43+
History: 2
44+
)";
45+
46+
struct Participant
47+
{
48+
std::shared_ptr<SilKit::Config::IParticipantConfiguration> configuration;
49+
std::unique_ptr<SilKit::IParticipant> participant;
50+
51+
void SetupParticipant(const char* configurationString, const std::string& participantName,
52+
const std::string& registryUri)
53+
{
54+
this->configuration = SilKit::Config::ParticipantConfigurationFromString(configurationString);
55+
this->participant = SilKit::CreateParticipant(configuration, participantName, registryUri);
56+
}
57+
};
58+
59+
struct PublisherParticipant : Participant
60+
{
61+
SilKit::Services::PubSub::IDataPublisher* publisher{nullptr};
62+
63+
void SetupPublisher(const std::string& name, const SilKit::Services::PubSub::PubSubSpec& spec, std::size_t history)
64+
{
65+
this->publisher = this->participant->CreateDataPublisher(name, spec, history);
66+
}
67+
};
68+
69+
struct SubscriberParticipant : Participant
70+
{
71+
SilKit::Services::PubSub::IDataSubscriber* subscriber{nullptr};
72+
std::atomic<std::uint64_t> counter{0};
73+
74+
void SetupSubscriber(const std::string& name, const SilKit::Services::PubSub::PubSubSpec& spec)
75+
{
76+
this->subscriber =
77+
this->participant->CreateDataSubscriber(name, spec, [this](auto, const auto&) { this->counter += 1; });
78+
}
79+
};
80+
81+
void ExpectMessage(SubscriberParticipant& s, std::chrono::nanoseconds timeout)
82+
{
83+
const auto deadline = std::chrono::steady_clock::now() + timeout;
84+
85+
while (std::chrono::steady_clock::now() < deadline)
86+
{
87+
if (s.counter > 0)
88+
{
89+
break;
90+
}
91+
92+
std::this_thread::sleep_for(100ms);
93+
}
94+
95+
ASSERT_GT(s.counter, 0);
96+
}
97+
98+
void ExpectNoMessage(SubscriberParticipant& s, std::chrono::nanoseconds timeout)
99+
{
100+
const auto deadline = std::chrono::steady_clock::now() + timeout;
101+
102+
while (std::chrono::steady_clock::now() < deadline)
103+
{
104+
ASSERT_EQ(s.counter, 0);
105+
std::this_thread::sleep_for(100ms);
106+
}
107+
}
108+
109+
struct ITest_PubHistory : testing::Test
110+
{
111+
void SetUp() override
112+
{
113+
this->registryConfiguration = SilKit::Config::ParticipantConfigurationFromString("");
114+
this->spec = SilKit::Services::PubSub::PubSubSpec{"P", "application/octet-stream"};
115+
this->data = std::vector<std::uint8_t>{0xCA, 0xFE, 0xAF, 0xFE};
116+
}
117+
118+
std::shared_ptr<SilKit::Config::IParticipantConfiguration> registryConfiguration;
119+
SilKit::Services::PubSub::PubSubSpec spec;
120+
std::vector<std::uint8_t> data;
121+
};
122+
123+
TEST_F(ITest_PubHistory, history_api_one_conf_empty)
124+
{
125+
const auto registry = SilKit::Vendor::Vector::CreateSilKitRegistry(registryConfiguration);
126+
const auto registryUri = registry->StartListening("silkit://127.0.0.1:0");
127+
128+
PublisherParticipant a;
129+
a.SetupParticipant(EMPTY_PART_CONF, "A", registryUri);
130+
a.SetupPublisher(PUBLISHER_NAME, spec, 1);
131+
132+
// publish a single message, which will be historized and picked up by the subscriber
133+
a.publisher->Publish(SilKit::Util::ToSpan(data));
134+
135+
SubscriberParticipant b;
136+
b.SetupParticipant(EMPTY_PART_CONF, "B", registryUri);
137+
b.SetupSubscriber("S", spec);
138+
139+
ExpectMessage(b, 2s);
140+
}
141+
142+
TEST_F(ITest_PubHistory, history_api_zero_conf_empty)
143+
{
144+
const auto registry = SilKit::Vendor::Vector::CreateSilKitRegistry(registryConfiguration);
145+
const auto registryUri = registry->StartListening("silkit://127.0.0.1:0");
146+
147+
PublisherParticipant a;
148+
a.SetupParticipant(EMPTY_PART_CONF, "A", registryUri);
149+
a.SetupPublisher(PUBLISHER_NAME, spec, 0);
150+
151+
// publish a single message, which will not be historized
152+
a.publisher->Publish(SilKit::Util::ToSpan(data));
153+
154+
SubscriberParticipant b;
155+
b.SetupParticipant(EMPTY_PART_CONF, "B", registryUri);
156+
b.SetupSubscriber("S", spec);
157+
158+
ExpectNoMessage(b, 2s);
159+
}
160+
161+
TEST_F(ITest_PubHistory, history_api_one_conf_one)
162+
{
163+
const auto registry = SilKit::Vendor::Vector::CreateSilKitRegistry(registryConfiguration);
164+
const auto registryUri = registry->StartListening("silkit://127.0.0.1:0");
165+
166+
PublisherParticipant a;
167+
a.SetupParticipant(HISTORY_ONE_PART_CONF, "A", registryUri);
168+
a.SetupPublisher(PUBLISHER_NAME, spec, 1);
169+
170+
// publish a single message, which will be historized and picked up by the subscriber
171+
a.publisher->Publish(SilKit::Util::ToSpan(data));
172+
173+
SubscriberParticipant b;
174+
b.SetupParticipant(EMPTY_PART_CONF, "B", registryUri);
175+
b.SetupSubscriber("S", spec);
176+
177+
ExpectMessage(b, 2s);
178+
}
179+
180+
TEST_F(ITest_PubHistory, history_api_zero_conf_one)
181+
{
182+
const auto registry = SilKit::Vendor::Vector::CreateSilKitRegistry(registryConfiguration);
183+
const auto registryUri = registry->StartListening("silkit://127.0.0.1:0");
184+
185+
PublisherParticipant a;
186+
a.SetupParticipant(HISTORY_ONE_PART_CONF, "A", registryUri);
187+
a.SetupPublisher(PUBLISHER_NAME, spec, 0);
188+
189+
// publish a single message, which will be historized and picked up by the subscriber
190+
a.publisher->Publish(SilKit::Util::ToSpan(data));
191+
192+
SubscriberParticipant b;
193+
b.SetupParticipant(EMPTY_PART_CONF, "B", registryUri);
194+
b.SetupSubscriber("S", spec);
195+
196+
ExpectMessage(b, 2s);
197+
}
198+
199+
TEST_F(ITest_PubHistory, history_api_one_conf_zero)
200+
{
201+
const auto registry = SilKit::Vendor::Vector::CreateSilKitRegistry(registryConfiguration);
202+
const auto registryUri = registry->StartListening("silkit://127.0.0.1:0");
203+
204+
PublisherParticipant a;
205+
a.SetupParticipant(HISTORY_ZERO_PART_CONF, "A", registryUri);
206+
a.SetupPublisher(PUBLISHER_NAME, spec, 1);
207+
208+
// publish a single message, which will not be historized
209+
a.publisher->Publish(SilKit::Util::ToSpan(data));
210+
211+
SubscriberParticipant b;
212+
b.SetupParticipant(EMPTY_PART_CONF, "B", registryUri);
213+
b.SetupSubscriber("S", spec);
214+
215+
ExpectNoMessage(b, 2s);
216+
}
217+
218+
TEST_F(ITest_PubHistory, history_api_zero_conf_zero)
219+
{
220+
const auto registry = SilKit::Vendor::Vector::CreateSilKitRegistry(registryConfiguration);
221+
const auto registryUri = registry->StartListening("silkit://127.0.0.1:0");
222+
223+
PublisherParticipant a;
224+
a.SetupParticipant(HISTORY_ZERO_PART_CONF, "A", registryUri);
225+
a.SetupPublisher(PUBLISHER_NAME, spec, 0);
226+
227+
// publish a single message, which will not be historized
228+
a.publisher->Publish(SilKit::Util::ToSpan(data));
229+
230+
SubscriberParticipant b;
231+
b.SetupParticipant(EMPTY_PART_CONF, "B", registryUri);
232+
b.SetupSubscriber("S", spec);
233+
234+
ExpectNoMessage(b, 2s);
235+
}
236+
237+
TEST_F(ITest_PubHistory, history_api_zero_conf_two_error)
238+
{
239+
const auto registry = SilKit::Vendor::Vector::CreateSilKitRegistry(registryConfiguration);
240+
const auto registryUri = registry->StartListening("silkit://127.0.0.1:0");
241+
242+
PublisherParticipant a;
243+
a.SetupParticipant(HISTORY_TWO_PART_CONF, "A", registryUri);
244+
EXPECT_THROW(a.SetupPublisher(PUBLISHER_NAME, spec, 0), SilKit::ConfigurationError);
245+
}
246+
247+
} //end namespace

SilKit/source/config/ParticipantConfiguration.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ struct DataPublisher
176176
std::optional<std::vector<Label>> labels;
177177

178178
//! \brief History length of a DataPublisher.
179-
std::optional<size_t> history{0};
179+
std::optional<size_t> history;
180180

181181
std::vector<std::string> useTraceSinks;
182182
Replay replay;

SilKit/source/config/ParticipantConfiguration.schema.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,12 @@
439439
"$ref": "#/definitions/MatchingLabel"
440440
}
441441
},
442+
"History": {
443+
"type": "integer",
444+
"description": "Override the history length (must be `0` or `1`)",
445+
"minimum": 0,
446+
"maximum": 1
447+
},
442448
"Logging": {
443449
"type": "object",
444450
"description": "Configures the properties of the SIL Kit Logging Service",
@@ -601,6 +607,9 @@
601607
},
602608
"Labels": {
603609
"$ref": "#/definitions/Labels"
610+
},
611+
"History": {
612+
"$ref": "#/definitions/History"
604613
}
605614
},
606615
"additionalProperties": false,

SilKit/source/config/ParticipantConfiguration_Full.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
{
151151
"Name": "Publisher1",
152152
"Topic": "Temperature",
153+
"History": 1,
153154
"Labels": [
154155
{
155156
"Key": "A",

SilKit/source/config/ParticipantConfiguration_Full.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ FlexrayControllers:
117117
DataPublishers:
118118
- Name: Publisher1
119119
Topic: Temperature
120+
History: 1
120121
Labels:
121122
- Key: A
122123
Value: B

SilKit/source/config/YamlReader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ void YamlReader::Read(SilKit::Config::DataPublisher& obj)
339339
ReadKeyValue(obj.name, "Name");
340340
OptionalRead(obj.topic, "Topic");
341341
OptionalRead(obj.labels, "Labels");
342+
OptionalRead(obj.history, "History");
342343
OptionalRead(obj.useTraceSinks, "UseTraceSinks");
343344
OptionalRead(obj.replay, "Replay");
344345
}

SilKit/source/config/YamlValidator.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ const std::set<std::string> schemaPaths_v1 = {
121121
"/DataPublishers/Replay/MdfChannel/GroupSource",
122122
"/DataPublishers/Replay/UseTraceSource",
123123
"/DataPublishers/Topic",
124+
"/DataPublishers/History",
124125
"/DataPublishers/UseTraceSinks",
125126
"/DataSubscribers",
126127
"/DataSubscribers/Labels",

SilKit/source/config/YamlWriter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ void YamlWriter::Write(const SilKit::Config::DataPublisher& obj)
403403
WriteKeyValue("Name", obj.name);
404404
OptionalWrite(obj.topic, "Topic");
405405
OptionalWrite(obj.labels, "Labels");
406-
//OptionalWrite(obj.history, "History");
406+
OptionalWrite(obj.history, "History");
407407
OptionalWrite(obj.useTraceSinks, "UseTraceSinks");
408408
OptionalWrite(obj.replay, "Replay");
409409
}

SilKit/source/core/participant/Participant_impl.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,9 +511,15 @@ auto Participant<SilKitConnectionT>::CreateDataPublisher(const std::string& cano
511511
SilKit::Config::DataPublisher controllerConfig =
512512
GetConfigByControllerName(_participantConfig.dataPublishers, canonicalName);
513513
UpdateOptionalConfigValue(canonicalName, controllerConfig.topic, dataSpec.Topic());
514+
UpdateOptionalConfigValue(canonicalName, controllerConfig.history, history);
514515
UpdateOptionalConfigValue(canonicalName, controllerConfig.labels,
515516
SilKit::Config::V1::Label::VectorFromPublicApi(dataSpec.Labels()));
516517

518+
if (controllerConfig.history.value() > 1)
519+
{
520+
throw SilKit::ConfigurationError("DataPublishers do not support history > 1.");
521+
}
522+
517523
auto sortedConfigLabels = controllerConfig.labels.value();
518524
std::sort(sortedConfigLabels.begin(), sortedConfigLabels.end(),
519525
[](const auto& v1, const auto& v2) { return v1.key < v2.key; });
@@ -536,7 +542,7 @@ auto Participant<SilKitConnectionT>::CreateDataPublisher(const std::string& cano
536542
controllerConfig, network, std::move(supplementalData), true, true, &_timeProvider, configuredDataNodeSpec,
537543
network, controllerConfig);
538544

539-
_connection.SetHistoryLengthForLink(history, controller);
545+
_connection.SetHistoryLengthForLink(controllerConfig.history.value(), controller);
540546

541547
if (GetLogger()->GetLogLevel() <= Logging::Level::Trace)
542548
{

0 commit comments

Comments
 (0)