Skip to content

Commit 462c5ad

Browse files
committed
gate mqtt on transport readiness
1 parent 802dc84 commit 462c5ad

File tree

3 files changed

+27
-3
lines changed

3 files changed

+27
-3
lines changed

examples/LiveNetworkNode/LiveNetworkNode.ino

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ void setup() {
401401
mqttLoopStep,
402402
mqttPublishStep,
403403
mqttConfig);
404+
g_mqttPump.setTransportProbe(isWiFiConnected);
404405

405406
#if defined(ARDUINO_ARCH_ESP8266)
406407
ZeroKernel.addTask("Sample", sampleTask, 100, 0);

src/modules/net/ZeroMqttPump.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class ZeroMqttPump {
2626
};
2727

2828
typedef bool (*LinkProbe)();
29+
typedef bool (*TransportProbe)();
2930
typedef bool (*ConnectStep)(void* context);
3031
typedef void (*LoopStep)(void* context);
3132
typedef bool (*PublishStep)(Kernel::TopicKey topicKey,
@@ -59,6 +60,7 @@ class ZeroMqttPump {
5960
ZeroMqttPump()
6061
: kernel_(NULL),
6162
linkProbe_(NULL),
63+
transportProbe_(NULL),
6264
connectStep_(NULL),
6365
loopStep_(NULL),
6466
publishStep_(NULL),
@@ -95,6 +97,10 @@ class ZeroMqttPump {
9597
currentRetryMs_ = config.retryBaseMs;
9698
}
9799

100+
void setTransportProbe(TransportProbe transportProbe) {
101+
transportProbe_ = transportProbe;
102+
}
103+
98104
void reset() {
99105
metrics_.reset();
100106
started_ = false;
@@ -159,6 +165,10 @@ class ZeroMqttPump {
159165
notifyStateIfChanged_();
160166

161167
if (!connected_) {
168+
if (transportProbe_ != NULL && !transportProbe_()) {
169+
return;
170+
}
171+
162172
if (pendingRetryAtMs_ != 0 && nowMs < pendingRetryAtMs_) {
163173
return;
164174
}
@@ -277,6 +287,7 @@ class ZeroMqttPump {
277287

278288
Kernel* kernel_;
279289
LinkProbe linkProbe_;
290+
TransportProbe transportProbe_;
280291
ConnectStep connectStep_;
281292
LoopStep loopStep_;
282293
PublishStep publishStep_;

tests/desktop/KernelTests.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ bool mqttLinkProbe() {
249249
return g_mqttLinkUp;
250250
}
251251

252+
bool mqttTransportProbe() {
253+
return g_wifiLinkUp;
254+
}
255+
252256
bool mqttConnectStep(void*) {
253257
++g_mqttConnectCalls;
254258
g_mqttLinkUp = true;
@@ -1058,6 +1062,7 @@ int testMqttPumpModule() {
10581062
g_mqttLoopCalls = 0;
10591063
g_mqttPublishCalls = 0;
10601064
g_mqttStateEvents = 0;
1065+
g_wifiLinkUp = false;
10611066
g_mqttLinkUp = false;
10621067
g_mqttLastState = false;
10631068
g_mqttPublishShouldFail = true;
@@ -1071,6 +1076,7 @@ int testMqttPumpModule() {
10711076
mqttLoopStep,
10721077
mqttPublishStep,
10731078
config);
1079+
pump.setTransportProbe(mqttTransportProbe);
10741080

10751081
expectTrue(pump.enqueue(zerokernel::Kernel::makeTopicKey("mqtt.out"),
10761082
zerokernel::Kernel::EventValue::fromUnsigned(7UL)),
@@ -1079,22 +1085,28 @@ int testMqttPumpModule() {
10791085
g_fakeNowMs = 1;
10801086
isolatedKernel.tick();
10811087
pump.tick();
1088+
expectTrue(g_mqttConnectCalls == 0, "mqtt pump waits for transport before connect");
1089+
1090+
g_wifiLinkUp = true;
1091+
g_fakeNowMs = 2;
1092+
isolatedKernel.tick();
1093+
pump.tick();
10821094
expectTrue(pump.isConnected(), "mqtt pump connects on first retry");
10831095
expectTrue(g_mqttStateEvents == 1, "mqtt pump publishes connected state");
10841096
expectTrue(g_mqttLastState, "mqtt pump reports connected");
10851097

1086-
g_fakeNowMs = 2;
1098+
g_fakeNowMs = 3;
10871099
isolatedKernel.tick();
10881100
pump.tick();
10891101
expectTrue(g_mqttPublishCalls == 1, "mqtt pump attempts first publish");
10901102
expectTrue(pump.queuedCount() == 1, "mqtt pump keeps message queued after failure");
10911103

1092-
g_fakeNowMs = 4;
1104+
g_fakeNowMs = 5;
10931105
isolatedKernel.tick();
10941106
pump.tick();
10951107
expectTrue(g_mqttPublishCalls == 1, "mqtt pump honors publish backoff");
10961108

1097-
g_fakeNowMs = 7;
1109+
g_fakeNowMs = 8;
10981110
isolatedKernel.tick();
10991111
pump.tick();
11001112
expectTrue(g_mqttPublishCalls == 2, "mqtt pump retries publish after backoff");

0 commit comments

Comments
 (0)