Skip to content

Commit 0df5a19

Browse files
MQTT/Protobuf Middleware API (#390)
1 parent c17e763 commit 0df5a19

10 files changed

Lines changed: 337 additions & 9 deletions

File tree

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
[submodule "dev/OpenOCD"]
33
path = dev/OpenOCD
44
url = https://github.com/STMicroelectronics/OpenOCD.git
5+
[submodule "dev/nanopb/nanopb"]
6+
path = dev/nanopb/nanopb
7+
url = https://github.com/nanopb/nanopb.git

NetX/inc/u_nx_ethernet.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,17 @@ UINT ethernet_mqtt_reconnect(void);
131131

132132
/**
133133
* @brief Retrieves the time from PTP stack.
134-
* @return The UTC time
134+
* @param datetime Buffer for the retrieved datetime info.
135+
* @return U_SUCCESS if successful, U_ERROR is not successful.
135136
*/
136-
NX_PTP_DATE_TIME ethernet_get_time(void);
137+
int ethernet_get_time(NX_PTP_DATE_TIME* datetime);
138+
139+
/**
140+
* @brief Gets the number of microseconds since the Unix epoch (1970-01-01 00:00:00 UTC), using the PTP stack.
141+
* @param buffer The buffer for the retrieved time.
142+
* @return U_SUCCESS if successful, U_ERROR is not successful.
143+
*/
144+
int ethernet_ptp_get_unix_microseconds(uint64_t* buffer);
137145

138146
/**
139147
* Debugging, print the status of ARP statistics

NetX/inc/u_nx_protobuf.h

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#pragma once
2+
3+
// clang-format off
4+
5+
/*
6+
* Wrapper for structuring and sending protobuf Ethernet messages over MQTT.
7+
* This API sits above the lower-level u_nx_ethernet.h driver layer. Messages with nonstandard formats can still be sent using the u_nx_ethernet.h API directly.
8+
*/
9+
10+
#include <stddef.h>
11+
#include <stdbool.h>
12+
#include "serverdata.pb.h"
13+
14+
/* Helper macros. */
15+
#define PB_STR_HELPER(x) #x // Helper for PB_TOSTR(). Probably should never use directly.
16+
#define PB_TOSTR(x) PB_STR_HELPER(x) // Converts a macro's value into a string.
17+
#define PB_COUNT_ARGS(...) (sizeof((float[]){ __VA_ARGS__ }) / sizeof(float)) // Returns the number of arguments passed into it.
18+
#define PB_STR_LEN(s) (sizeof(s) - 1) // Returns the length of a string literal.
19+
20+
/* CONFIG: Compile-time validation of topic size, unit size, and number of values. */
21+
#define PB_MAX_TOPIC_LENGTH 100 // Maximum length of topic string literal (in characters).
22+
#define PB_MAX_UNIT_LENGTH 15 // Maximum length of unit string literal (in characters).
23+
#define PB_MIN_DATAPOINTS 1 // Minimum number of datapoints (i.e., variable `...` arguments passed into `nx_protobuf_mqtt_message_create()`).
24+
#define PB_MAX_DATAPOINTS 5 // Maximum number of datapoints (i.e., variable `...` arguments passed into `nx_protobuf_mqtt_message_create()`).
25+
#define PB_VALIDATE_ARGS(topic, unit, num_values) \
26+
do { \
27+
_Static_assert( \
28+
PB_STR_LEN(topic) <= PB_MAX_TOPIC_LENGTH, \
29+
"MQTT topic parameter exceeds maximum length of " PB_TOSTR(PB_MAX_TOPIC_LENGTH) " allowed by `nx_protobuf_mqtt_message_create()`."\
30+
); \
31+
_Static_assert( \
32+
PB_STR_LEN(unit) <= PB_MAX_UNIT_LENGTH, \
33+
"MQTT unit parameter exceeds maximum length of " PB_TOSTR(PB_MAX_UNIT_LENGTH) " allowed by `nx_protobuf_mqtt_message_create()`." \
34+
); \
35+
_Static_assert( \
36+
(num_values) >= PB_MIN_DATAPOINTS, \
37+
"Must pass at least " PB_TOSTR(PB_MIN_DATAPOINTS) " value into the variable argument of `nx_protobuf_mqtt_message_create()`." \
38+
); \
39+
_Static_assert( \
40+
(num_values) <= PB_MAX_DATAPOINTS, \
41+
"Cannot pass more than " PB_TOSTR(PB_MAX_DATAPOINTS) " values into the variable argument of `nx_protobuf_mqtt_message_create()`." \
42+
); \
43+
} while (0)
44+
45+
/**
46+
* @brief Creates and formats a `ethernet_mqtt_message_t` object, and returns it to the caller.
47+
* @param topic (const char*) String literal representing the message's MQTT topic name.
48+
* @param unit (const char*) String literal representing the unit of the message's data.
49+
* @param ... (float) The data to be sent in the message. This is a variable argument, so it can be repeated depending on how many datapoints you want to send. If you pass in more datapoints than allowed, you will get a compile-time error.
50+
* @return An `ethernet_mqtt_message_t` object.
51+
* @note If message creation was not completed for any reason, .initialized will be false in the returned `ethernet_mqtt_message_t` object. You may still use the object as you please (including attempting to initialize it again), but attempting to send the message (via `nx_protobuf_mqtt_message_send()`) will return an error.
52+
*/
53+
#define nx_protobuf_mqtt_message_create(topic, unit, ...) \
54+
({ \
55+
PB_VALIDATE_ARGS(topic, unit, PB_COUNT_ARGS(__VA_ARGS__)); \
56+
_nx_protobuf_mqtt_message_create( \
57+
(topic), PB_STR_LEN(topic), \
58+
(unit), PB_STR_LEN(unit), \
59+
(float[]){ __VA_ARGS__ }, \
60+
PB_COUNT_ARGS(__VA_ARGS__) \
61+
); \
62+
})
63+
64+
65+
/* Ethernet MQTT Message. */
66+
typedef struct {
67+
const char* topic;
68+
int topic_size;
69+
serverdata_v2_ServerData protobuf;
70+
bool initialized;
71+
} ethernet_mqtt_message_t;
72+
73+
/**
74+
* @brief Dispatches a `ethernet_mqtt_message_t` message over MQTT.
75+
* @param message The message to send.
76+
* @return U_SUCCESS if successful, U_ERROR is not successful.
77+
*/
78+
int nx_protobuf_mqtt_message_send(ethernet_mqtt_message_t* message);
79+
80+
/* MACRO IMPLEMENTATIONS */
81+
ethernet_mqtt_message_t _nx_protobuf_mqtt_message_create(const char* topic, size_t topic_size, const char* unit, size_t unit_len, const float values[], int values_count);
82+
83+
// clang-format on

NetX/src/u_nx_debug.c

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
#include "u_nx_debug.h"
22
#include "nxd_ptp_client.h"
33

4+
#if defined(__has_include)
5+
#if __has_include("nxd_mqtt_client.h")
6+
#include "nxd_mqtt_client.h"
7+
#define U_NX_DEBUG_HAS_MQTT 1
8+
#endif
9+
#endif
10+
11+
#ifndef U_NX_DEBUG_HAS_MQTT
12+
#define U_NX_DEBUG_HAS_MQTT 0
13+
#endif
14+
415
// clang-format off
516

617
/* Converts a NetX status macro to a printable string. */
@@ -74,8 +85,11 @@ const char* nx_status_toString(UINT status) {
7485
case NX_CONTINUE: return "NX_CONTINUE";
7586
case NX_TCPIP_OFFLOAD_ERROR: return "NX_TCPIP_OFFLOAD_ERROR";
7687

77-
/* MQTT-specific stuff. */
88+
/* MQTT-specific stuff. */
89+
#if U_NX_DEBUG_HAS_MQTT
90+
#if (NXD_MQTT_SUCCESS != NX_SUCCESS)
7891
case NXD_MQTT_SUCCESS: return "NXD_MQTT_SUCCESS";
92+
#endif
7993
case NXD_MQTT_ALREADY_CONNECTED: return "NXD_MQTT_ALREADY_CONNECTED";
8094
case NXD_MQTT_NOT_CONNECTED: return "NXD_MQTT_NOT_CONNECTED";
8195
case NXD_MQTT_MUTEX_FAILURE: return "NXD_MQTT_MUTEX_FAILURE";
@@ -100,6 +114,7 @@ const char* nx_status_toString(UINT status) {
100114
case NXD_MQTT_ERROR_SERVER_UNAVAILABLE: return "NXD_MQTT_ERROR_SERVER_UNAVAILABLE";
101115
case NXD_MQTT_ERROR_BAD_USERNAME_PASSWORD: return "NXD_MQTT_ERROR_BAD_USERNAME_PASSWORD";
102116
case NXD_MQTT_ERROR_NOT_AUTHORIZED: return "NXD_MQTT_ERROR_NOT_AUTHORIZED";
117+
#endif
103118

104119
/* PTP-specific stuff. */
105120
case NX_PTP_CLIENT_NOT_STARTED: return "NX_PTP_CLIENT_NOT_STARTED";

NetX/src/u_nx_ethernet.c

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include "c_utils.h"
99
#include "nx_api.h"
1010
#include "u_tx_general.h"
11+
#include <inttypes.h>
12+
#include "serial.h"
1113
#include <string.h>
1214
#include <stdio.h>
1315
#include <sys/_types.h>
@@ -382,6 +384,7 @@ UINT ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_
382384
/* Mark device as initialized. */
383385
device.is_initialized = true;
384386

387+
PRINTLN_INFO("Ran ethernet_init()");
385388
return NX_SUCCESS;
386389
}
387390

@@ -523,16 +526,84 @@ UINT ethernet_mqtt_reconnect(void) {
523526
}
524527
#endif
525528

526-
NX_PTP_DATE_TIME ethernet_get_time(void) {
527-
NX_PTP_TIME tm;
528-
NX_PTP_DATE_TIME date;
529+
int ethernet_get_time(NX_PTP_DATE_TIME* datetime) {
530+
NX_PTP_TIME tm = { 0 };
531+
NX_PTP_DATE_TIME dt = { 0 };
532+
NX_PTP_CLIENT_SYNC sync = { 0 };
533+
USHORT flags;
534+
535+
/* If not initialized, don't try to read PTP yet. */
536+
if(!device.is_initialized) {
537+
PRINTLN_ERROR("Tried getting PTP time before device has been initialized.");
538+
return U_ERROR;
539+
}
540+
529541
/* read the PTP clock */
530-
nx_ptp_client_time_get(&device.ptp_client, &tm);
542+
int status = nx_ptp_client_time_get(&device.ptp_client, &tm);
543+
if(status != NX_SUCCESS) {
544+
PRINTLN_ERROR("Failed to call nx_ptp_client_time_get() (Status: %d/%s).", status, nx_status_toString(status));
545+
return U_ERROR;
546+
}
547+
548+
PRINTLN_INFO("ptp nanoseconds: %ld", tm.nanosecond);
549+
550+
/* Set utc_offset. */
551+
const SHORT utc_offset = 0;
552+
PRINTLN_INFO("utc offset: %d", utc_offset);
531553

532554
/* convert PTP time to UTC date and time */
533-
nx_ptp_client_utility_convert_time_to_date(&tm, 0, &date);
555+
status = nx_ptp_client_utility_convert_time_to_date(&tm, utc_offset, &dt);
556+
if(status != NX_SUCCESS) {
557+
PRINTLN_ERROR("Failed to call nx_ptp_client_utility_convert_time_to_date() (Status: %d/%s).", status, nx_status_toString(status));
558+
return U_ERROR;
559+
}
560+
561+
*datetime = dt;
562+
return U_SUCCESS;
563+
}
564+
565+
/* Gets the number of microseconds since the Unix epoch (1970-01-01 00:00:00 UTC)*/
566+
int ethernet_ptp_get_unix_microseconds(uint64_t* buffer)
567+
{
568+
569+
NX_PTP_DATE_TIME datetime = { 0 };
570+
571+
/* Get PTP datetime. */
572+
int status = ethernet_get_time(&datetime);
573+
if(status != U_SUCCESS) {
574+
PRINTLN_ERROR("Failed to call ethernet_get_time() (Status: %d).", status);
575+
return U_ERROR;
576+
}
577+
578+
serial_monitor("datetime", "nanoseconds", "%d", datetime.nanosecond);
579+
serial_monitor("datetime", "year", "%d", datetime.year);
580+
serial_monitor("datetime", "month", "%d", datetime.month);
581+
serial_monitor("datetime", "day", "%d", datetime.day);
534582

535-
return date;
583+
int y = datetime.year;
584+
int m = datetime.month;
585+
int d = datetime.day;
586+
587+
/* Adjust year and month for March-based counting (simplifies leap year handling) */
588+
if (m <= 2)
589+
{
590+
y--;
591+
m += 12;
592+
}
593+
594+
/* Days from epoch (1970-01-01) using the Rata Die algorithm */
595+
uint32_t days = 365 * y + y / 4 - y / 100 + y / 400
596+
+ (153 * (m - 3) + 2) / 5 + d - 719469;
597+
598+
uint64_t us = (uint64_t)days * 86400LL * 1000000LL
599+
+ (uint64_t)datetime.hour * 3600LL * 1000000LL
600+
+ (uint64_t)datetime.minute * 60LL * 1000000LL
601+
+ (uint64_t)datetime.second * 1000000LL
602+
+ (uint64_t)(datetime.nanosecond / 1000);
603+
604+
*buffer = us;
605+
serial_monitor("datetime", "microseconds from epoch", "%" PRIu64, us);
606+
return U_SUCCESS;
536607
}
537608

538609
UINT ethernet_print_arp_status(void) {

NetX/src/u_nx_protobuf.c

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#include <string.h>
2+
#include "u_tx_debug.h"
3+
#include "u_nx_debug.h"
4+
#include "u_nx_protobuf.h"
5+
#include "u_nx_ethernet.h"
6+
#include "pb.h"
7+
#include "pb_encode.h"
8+
#include "tx_api.h"
9+
#include "nxd_mqtt_client.h"
10+
11+
ethernet_mqtt_message_t _nx_protobuf_mqtt_message_create(const char* topic, size_t topic_len, const char* unit, size_t unit_len, const float values[], int values_size) {
12+
/* Zero-initialize the protobuf struct and the sendable ethernet_mqtt_message_t message. */
13+
serverdata_v2_ServerData protobuf = serverdata_v2_ServerData_init_zero;
14+
ethernet_mqtt_message_t message = { 0 };
15+
message.initialized = false;
16+
17+
/* Enforce topic length. */
18+
if(topic_len > PB_MAX_TOPIC_LENGTH) {
19+
PRINTLN_ERROR("MQTT Message topic exceeds maximum length of %d (Topic: %s, Current Topic Length: %d).", PB_MAX_TOPIC_LENGTH, topic, topic_len);
20+
return message; // Return empty, uninitialized message.
21+
}
22+
// NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason.
23+
24+
/* Enforce unit length. */
25+
if(unit_len > PB_MAX_UNIT_LENGTH) {
26+
PRINTLN_ERROR("MQTT Unit string length exceeds maximum length of %d (Topic: %s, Current Unit String Length: %d).", PB_MAX_UNIT_LENGTH, topic, unit_len);
27+
return message; // Return empty, uninitialized message.
28+
}
29+
// NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason.
30+
31+
/* Enforce minimum number of datapoints. */
32+
if(values_size < PB_MIN_DATAPOINTS) {
33+
PRINTLN_ERROR("Message must have at least %d datapoints (Topic: %s, Current values_size: %d).", PB_MIN_DATAPOINTS, topic, values_size);
34+
return message; // Return empty, uninitialized message.
35+
}
36+
// NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason.
37+
38+
/* Enforce maximum number of datapoints. */
39+
if(values_size > PB_MAX_DATAPOINTS) {
40+
PRINTLN_ERROR("Message cannot have more than %d datapoints (Topic: %s, Current values_size: %d).", PB_MAX_DATAPOINTS, topic, values_size);
41+
return message; // Return empty, uninitialized message.
42+
}
43+
// NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason.
44+
45+
/* Get the PTP time and convert to appropriate protobuf time. */
46+
uint64_t datetime = 0;
47+
int status = ethernet_ptp_get_unix_microseconds(&datetime);
48+
if(status != U_SUCCESS) {
49+
PRINTLN_ERROR("Failed to get PTP Unix time (Status: %d).", status);
50+
return message; // Return empty, uninitialized message.
51+
}
52+
53+
54+
PRINTLN_INFO("got time: %d", datetime);
55+
56+
/* Pack the protobuf message. */
57+
// CURRENT PROTOBUF SCHEMA LOOKS LIKE THIS:
58+
// typedef struct _serverdata_v2_ServerData {
59+
// char unit[15];
60+
// uint64_t time_us;
61+
// pb_size_t values_count;
62+
// float values[5];
63+
// } serverdata_v2_ServerData;
64+
memcpy(protobuf.unit, unit, unit_len);
65+
protobuf.time_us = datetime;
66+
protobuf.values_count = values_size;
67+
memcpy(protobuf.values, values, values_size * sizeof(float));
68+
69+
/* Pack the `ethernet_mqtt_message_t` object and return it as successfully initialized. */
70+
message.topic = topic;
71+
message.topic_size = topic_len + 1; // u_TODO - for some reason you need to do + 1 here or else it will cut the last letter off of the topic in MQTT ui. The macro probably just calculates the topic length with one less than it should or something
72+
message.protobuf = protobuf;
73+
message.initialized = true;
74+
return message;
75+
}
76+
77+
int nx_protobuf_mqtt_message_send(ethernet_mqtt_message_t* message) {
78+
/* Make sure message isn't nullptr. */
79+
if(!message) {
80+
PRINTLN_ERROR("Null pointer to `ethernet_mqtt_message_t` message.");
81+
return U_ERROR;
82+
}
83+
84+
/* Make sure message is initialized. */
85+
if(!message->initialized) {
86+
PRINTLN_ERROR("Attempting to send an uninitialized `ethernet_mqtt_message_t` message.");
87+
return U_ERROR;
88+
}
89+
90+
/* Set up the buffer. */
91+
unsigned char buffer[serverdata_v2_ServerData_size];
92+
pb_ostream_t stream = pb_ostream_from_buffer(buffer, sizeof(buffer));
93+
94+
/* Encode the protobuf. */
95+
int status = pb_encode(&stream, serverdata_v2_ServerData_fields, &message->protobuf);
96+
if(status != true) {
97+
PRINTLN_ERROR("Failed to serialize protobuf message (Topic: %s): %s", message->topic, PB_GET_ERROR(&stream));
98+
return U_ERROR;
99+
}
100+
101+
/* Publish over MQTT. */
102+
status = ethernet_mqtt_publish(message->topic, message->topic_size, (char*)buffer, stream.bytes_written); // u_TODO - ethernet_mqtt_publish should return U_SUCCESS/U_ERROR instead of the internal netx error macro
103+
if(status != NXD_MQTT_SUCCESS) {
104+
PRINTLN_WARNING("Failed to publish MQTT message (Topic: %s, Status: %d).", message->topic, status);
105+
106+
/* If disconnected, attempt reconnection. */
107+
if(status == NXD_MQTT_NOT_CONNECTED) {
108+
PRINTLN_WARNING("Detected disconnect from MQTT. Attempting reconnection...");
109+
do {
110+
tx_thread_sleep(1000);
111+
status = ethernet_mqtt_reconnect();
112+
PRINTLN_WARNING("Attempting MQTT reconnection (Status: %d/%s).", status, nx_status_toString(status));
113+
} while ((status != NXD_MQTT_SUCCESS) && (status != NXD_MQTT_ALREADY_CONNECTED));
114+
PRINTLN_WARNING("MQTT reconnection successful.");
115+
}
116+
117+
return U_ERROR;
118+
}
119+
120+
/* Return successful! */
121+
PRINTLN_INFO("Sent MQTT message (Topic: %s).", message->topic);
122+
return U_SUCCESS;
123+
}

dev/nanopb/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
cmake_minimum_required(VERSION 3.22)
2+
3+
set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/nanopb/extra)
4+
find_package(Nanopb REQUIRED)
5+
6+
nanopb_generate_cpp(TARGET proto serverdata.proto)
7+
8+
target_link_libraries(${CMAKE_PROJECT_NAME} proto)

dev/nanopb/nanopb

Submodule nanopb added at c716db1

dev/nanopb/serverdata.options

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
serverdata.v2.ServerData.unit max_size: 15
2+
serverdata.v2.ServerData.values max_count: 5

0 commit comments

Comments
 (0)