Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 75 additions & 25 deletions src/Internet/Logging/Loom_MongoDB/Loom_MongoDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ bool Loom_MongoDB::publish(Loom_BatchSD &batchSD) {
return false;
}

char line[MAX_JSON_SIZE];
int packetNumber = 0, index = 0;
char c;
char line[MAX_JSON_SIZE] = {};
int packetNumber = 0;
int index = 0;
bool lineOverflow = false;
if (moduleInitialized) {
// TIMER_DISABLE;
if (batchSD.shouldPublish()) {
Expand All @@ -153,42 +154,91 @@ bool Loom_MongoDB::publish(Loom_BatchSD &batchSD) {
File fileOutput = batchSD.getBatch();

bool allDataSuccess = true;
auto publishCurrentLine = [&]() {
packetNumber++;

if (lineOverflow) {
snprintf_P(output, OUTPUT_SIZE,
PSTR("Dropped oversized packet #%i (len >= %i bytes)"), packetNumber,
MAX_JSON_SIZE);
WARNING(output);
allDataSuccess = false;
lineOverflow = false;
index = 0;
line[0] = '\0';
return;
}

/* Utilize a stream so it doesn't matter how much data we have as its read in one by one
*/
while (fileOutput.available()) {
c = fileOutput.read();
if (index <= 0) {
line[0] = '\0';
return;
}

// Ensure null-termination before publish
line[index] = '\0';

// \r Marks the end of a line, at this point we want to publish that whole packet
if (c == '\r') {
snprintf_P(output, OUTPUT_SIZE, PSTR("Publishing Packet %i of %i with len=%d"),
packetNumber, batchSD.getBatchSize(), index);
LOG(output);

// Track the packet number we are currently publishing
snprintf_P(output, OUTPUT_SIZE, PSTR("Publishing Packet %i of %i"),
packetNumber + 1, batchSD.getBatchSize());
LOG(output);
if (!publishMessage(topic, line)) {
snprintf_P(output, OUTPUT_SIZE, PSTR("Failed to publish packet #%i"),
packetNumber);
WARNING(output);
allDataSuccess = false;
}

// Replace the \r with a null character
line[index] = '\0';
delay(500);
index = 0;
line[0] = '\0';
};

if (!publishMessage(topic, line)) { // This fails if the line is greater than
// 2000 bytes Or if the line is malformed
snprintf(output, OUTPUT_SIZE, PSTR("Failed to publish packet #%i"),
packetNumber + 1);
WARNING(output);
/* Utilize a stream so it doesn't matter how much data we have as its read in one by one
*/
while (fileOutput.available()) {
int readValue = fileOutput.read();
if (readValue < 0) {
WARNING(F("Failed to read from batch file."));
allDataSuccess = false;
break;
}
char c = (char)readValue;

/* Attempt to reconnect if connection has been stopped during publishMessage
* The previous packet that was lost due a stopped connected will not be
* retransmitted.
*/
if (!isConnected()) {
connectToBroker();
if (!isConnected()) {
WARNING(F("Connection lost during batch publish."));
allDataSuccess = false;
break;
}
}

delay(500);
index = 0;
packetNumber++;
// Handle both CR and LF line endings.
if (c == '\r' || c == '\n') {
// Ignore empty lines and LF from CRLF pairs.
if (index > 0 || lineOverflow) {
publishCurrentLine();
}
continue;
}

// If not just add the packet to the line array
else {
// Add data while leaving room for null terminator.
if (index < (MAX_JSON_SIZE - 1)) {
line[index] = c;
index++;
} else {
lineOverflow = true;
}
}

// Flush trailing data if file does not end in CR/LF.
if (index > 0 || lineOverflow) {
publishCurrentLine();
}
fileOutput.close();

// Check if we actually sent all the data successfully
Expand Down
19 changes: 19 additions & 0 deletions src/Internet/Logging/MQTTComponent/MQTTComponent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,25 @@ bool MQTTComponent::connectToBroker() {
//////////////////////////////////////////////////////////////////////////////////////////////////////
bool MQTTComponent::publishMessage(const char *topic, const char *message, bool retain, int qos) {
FUNCTION_START;
if (topic == nullptr || message == nullptr) {
ERROR(F("Topic or message pointer was null."));
FUNCTION_END;
return false;
}

size_t topicLen = strlen(topic);
if (topicLen >= MAX_TOPIC_LENGTH) {
ERROR(F("Topic length exceeds MAX_TOPIC_LENGTH."));
FUNCTION_END;
return false;
}

size_t messageLen = strlen(message);
if (messageLen >= MAX_JSON_SIZE) {
ERROR(F("Message length exceeds MAX_JSON_SIZE."));
FUNCTION_END;
return false;
}

// Make sure the module is initialized
if (moduleInitialized && internetClient.moduleInitialized) {
Expand Down