Skip to content

Commit 8745cfd

Browse files
committed
add network transport modules
1 parent e7e78ea commit 8745cfd

7 files changed

Lines changed: 1330 additions & 0 deletions

File tree

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#include <ZeroKernel.h>
2+
#include <modules/net/ZeroHttpPump.h>
3+
4+
using zerokernel::Kernel;
5+
using zerokernel::modules::net::ZeroHttpPump;
6+
7+
namespace {
8+
9+
ZeroHttpPump g_httpPump;
10+
bool g_connectPending = true;
11+
unsigned long g_requestId = 0;
12+
const Kernel::TopicKey kCompletionTopic = Kernel::makeTopicKey("http.done");
13+
14+
ZeroHttpPump::StepResult connectStep(const ZeroHttpPump::Request&, void*) {
15+
if (g_connectPending) {
16+
g_connectPending = false;
17+
return ZeroHttpPump::kStepPending;
18+
}
19+
20+
return ZeroHttpPump::kStepComplete;
21+
}
22+
23+
ZeroHttpPump::StepResult writeStep(const ZeroHttpPump::Request&, void*) {
24+
return ZeroHttpPump::kStepComplete;
25+
}
26+
27+
ZeroHttpPump::StepResult readStep(const ZeroHttpPump::Request&, void*) {
28+
return ZeroHttpPump::kStepComplete;
29+
}
30+
31+
ZeroHttpPump::StepResult closeStep(const ZeroHttpPump::Request&, void*) {
32+
g_connectPending = true;
33+
return ZeroHttpPump::kStepComplete;
34+
}
35+
36+
void onCompletion(const char*, const Kernel::EventValue& value) {
37+
if (value.type != Kernel::kEventBool) {
38+
return;
39+
}
40+
41+
Serial.print("http.done => ");
42+
Serial.println(value.boolValue ? "ok" : "fail");
43+
}
44+
45+
void queueRequestTask() {
46+
static char payload[48];
47+
const int written = snprintf(payload, sizeof(payload), "{\"seq\":%lu}", g_requestId++);
48+
if (written <= 0) {
49+
return;
50+
}
51+
52+
ZeroHttpPump::Request request;
53+
request.path = "/api/data";
54+
request.contentType = "application/json";
55+
request.body = payload;
56+
request.bodyLength = static_cast<uint16_t>(written);
57+
request.completionTopicKey = kCompletionTopic;
58+
g_httpPump.enqueue(request);
59+
}
60+
61+
void reportTask() {
62+
const zerokernel::modules::net::ZeroTransportMetrics::Snapshot snapshot =
63+
g_httpPump.metrics().snapshot();
64+
65+
Serial.print("http queue=");
66+
Serial.print(g_httpPump.queuedCount());
67+
Serial.print(" sent_ok=");
68+
Serial.print(snapshot.sendSuccesses);
69+
Serial.print(" sent_fail=");
70+
Serial.print(snapshot.sendFailures);
71+
Serial.print(" connect_attempts=");
72+
Serial.println(snapshot.connectAttempts);
73+
}
74+
75+
} // namespace
76+
77+
void setup() {
78+
Serial.begin(115200);
79+
delay(50);
80+
81+
ZeroKernel.begin();
82+
ZeroKernel.subscribeTypedFast(kCompletionTopic, onCompletion);
83+
84+
ZeroHttpPump::Config config;
85+
config.retryBaseMs = 100;
86+
config.retryMaxMs = 500;
87+
config.maxRetries = 1;
88+
89+
g_httpPump.begin(ZeroKernel, connectStep, writeStep, readStep, closeStep, config);
90+
91+
ZeroKernel.addTask("HttpQueue", queueRequestTask, 1500, 0);
92+
ZeroKernel.addTask("HttpReport", reportTask, 1000, 0);
93+
}
94+
95+
void loop() {
96+
ZeroKernel.tick();
97+
g_httpPump.tick();
98+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#include <ZeroKernel.h>
2+
#include <modules/net/ZeroMqttPump.h>
3+
4+
using zerokernel::Kernel;
5+
using zerokernel::modules::net::ZeroMqttPump;
6+
7+
namespace {
8+
9+
ZeroMqttPump g_mqttPump;
10+
bool g_brokerConnected = false;
11+
bool g_failNextPublish = true;
12+
unsigned long g_publishValue = 0;
13+
const Kernel::TopicKey kBrokerStateTopic = Kernel::makeTopicKey("mqtt.state");
14+
const Kernel::TopicKey kPublishTopic = Kernel::makeTopicKey("mqtt.out");
15+
16+
bool linkProbe() {
17+
return g_brokerConnected;
18+
}
19+
20+
bool connectStep(void*) {
21+
g_brokerConnected = true;
22+
return true;
23+
}
24+
25+
void loopStep(void*) {}
26+
27+
bool publishStep(Kernel::TopicKey, const Kernel::EventValue&, void*) {
28+
if (g_failNextPublish) {
29+
g_failNextPublish = false;
30+
return false;
31+
}
32+
33+
return true;
34+
}
35+
36+
void onStateEvent(const char*, const Kernel::EventValue& value) {
37+
if (value.type != Kernel::kEventBool) {
38+
return;
39+
}
40+
41+
Serial.print("mqtt.state => ");
42+
Serial.println(value.boolValue ? "connected" : "disconnected");
43+
}
44+
45+
void queuePublishTask() {
46+
g_mqttPump.enqueue(kPublishTopic, Kernel::EventValue::fromUnsigned(g_publishValue++), 1);
47+
}
48+
49+
void reportTask() {
50+
const zerokernel::modules::net::ZeroTransportMetrics::Snapshot snapshot =
51+
g_mqttPump.metrics().snapshot();
52+
53+
Serial.print("mqtt queue=");
54+
Serial.print(g_mqttPump.queuedCount());
55+
Serial.print(" sent_ok=");
56+
Serial.print(snapshot.sendSuccesses);
57+
Serial.print(" sent_fail=");
58+
Serial.print(snapshot.sendFailures);
59+
Serial.print(" loop_calls=");
60+
Serial.println(snapshot.loopCalls);
61+
}
62+
63+
} // namespace
64+
65+
void setup() {
66+
Serial.begin(115200);
67+
delay(50);
68+
69+
ZeroKernel.begin();
70+
ZeroKernel.subscribeTypedFast(kBrokerStateTopic, onStateEvent);
71+
72+
ZeroMqttPump::Config config;
73+
config.pollIntervalMs = 50;
74+
config.retryBaseMs = 100;
75+
config.retryMaxMs = 500;
76+
config.maxRetries = 1;
77+
config.stateTopicKey = kBrokerStateTopic;
78+
79+
g_mqttPump.begin(ZeroKernel, linkProbe, connectStep, loopStep, publishStep, config);
80+
81+
ZeroKernel.addTask("MqttQueue", queuePublishTask, 1200, 0);
82+
ZeroKernel.addTask("MqttReport", reportTask, 1000, 0);
83+
}
84+
85+
void loop() {
86+
ZeroKernel.tick();
87+
g_mqttPump.tick();
88+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#include <ZeroKernel.h>
2+
#include <modules/net/ZeroTransportMetrics.h>
3+
4+
using zerokernel::modules::net::ZeroTransportMetrics;
5+
6+
namespace {
7+
8+
ZeroTransportMetrics g_metrics;
9+
10+
void updateMetricsTask() {
11+
static bool toggle = false;
12+
toggle = !toggle;
13+
14+
g_metrics.recordConnectAttempt();
15+
g_metrics.recordConnectResult(toggle, toggle ? 2 : 5);
16+
g_metrics.recordSendQueued(2);
17+
g_metrics.recordSendAttempt();
18+
g_metrics.recordSendResult(toggle, toggle ? 1 : 4, toggle ? 3 : 8);
19+
if (!toggle) {
20+
g_metrics.recordQueueDrop();
21+
g_metrics.recordBackoffSchedule();
22+
}
23+
g_metrics.recordLoopCall();
24+
}
25+
26+
void reportTask() {
27+
const ZeroTransportMetrics::Snapshot snapshot = g_metrics.snapshot();
28+
29+
Serial.print("connect_ok=");
30+
Serial.print(snapshot.connectSuccesses);
31+
Serial.print(" connect_fail=");
32+
Serial.print(snapshot.connectFailures);
33+
Serial.print(" send_ok=");
34+
Serial.print(snapshot.sendSuccesses);
35+
Serial.print(" send_fail=");
36+
Serial.println(snapshot.sendFailures);
37+
}
38+
39+
} // namespace
40+
41+
void setup() {
42+
Serial.begin(115200);
43+
delay(50);
44+
45+
ZeroKernel.begin();
46+
ZeroKernel.addTask("MetricsTick", updateMetricsTask, 500, 0);
47+
ZeroKernel.addTask("MetricsReport", reportTask, 1000, 0);
48+
}
49+
50+
void loop() {
51+
ZeroKernel.tick();
52+
}

0 commit comments

Comments
 (0)