Skip to content

Commit 0d6761e

Browse files
committed
Expand traffic monitoring to PostgreSQL and N2K outputs, handle empty AIS messages in psql
1 parent 31323d4 commit 0d6761e

5 files changed

Lines changed: 36 additions & 2 deletions

File tree

Source/DBMS/PostgreSQL.cpp

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,24 @@ namespace IO
3535
{
3636
if (PQstatus(con) != CONNECTION_OK)
3737
{
38+
stats.connected = 0;
3839
Warning() << "DBMS: Connection to PostgreSQL lost. Attempting to reset...";
3940
PQreset(con);
4041

4142
if (PQstatus(con) != CONNECTION_OK)
4243
{
4344
Error() << "DBMS: Could not reset connection. Aborting post.";
4445
conn_fails++;
46+
stats.connect_fail++;
4547
return;
4648
}
4749
else
4850
{
4951
Warning() << "DBMS: Connection successfully reset.";
5052
conn_fails = 0;
53+
stats.connected = 1;
54+
stats.connect_ok++;
55+
stats.reconnects++;
5156
}
5257
}
5358

@@ -92,7 +97,11 @@ namespace IO
9297
7, nullptr, params, nullptr, nullptr, 0);
9398

9499
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) > 0)
100+
{
95101
msg_id = PQgetvalue(res, 0, 0);
102+
for (const char *p : params)
103+
stats.bytes_out += strlen(p);
104+
}
96105
else
97106
{
98107
Error() << "DBMS: ais_message insert failed: " << PQerrorMessage(con);
@@ -122,6 +131,10 @@ namespace IO
122131
Error() << "DBMS: ais_nmea insert failed: " << PQerrorMessage(con);
123132
ok = false;
124133
}
134+
else
135+
{
136+
stats.bytes_out += nmea.length();
137+
}
125138
PQclear(res);
126139
if (!ok)
127140
break;
@@ -169,8 +182,8 @@ namespace IO
169182
}
170183
}
171184

172-
// 4. Vessel upsert
173-
if (ok && VD)
185+
// 4. Vessel upsert (requires identifiable mmsi)
186+
if (ok && VD && entry.mmsi != "0")
174187
ok = execVessel(entry, msg_id_ptr);
175188

176189
// 5. Property inserts
@@ -375,9 +388,14 @@ namespace IO
375388
con = PQconnectdb(conn_string.c_str());
376389

377390
if (con == nullptr || PQstatus(con) != CONNECTION_OK)
391+
{
392+
stats.connect_fail++;
378393
throw std::runtime_error("DBMS: cannot open database :" + std::string(PQerrorMessage(con)));
394+
}
379395

380396
conn_fails = 0;
397+
stats.connected = 1;
398+
stats.connect_ok++;
381399

382400
PGresult *res = PQexec(con, "SELECT key_id, key_str FROM ais_keys");
383401
if (PQresultStatus(res) != PGRES_TUPLES_OK)

Source/DBMS/PostgreSQL.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ namespace IO
9191

9292
#ifdef HASPSQL
9393
void process();
94+
using StreamIn<AIS::Message>::Receive;
95+
using StreamIn<AIS::GPS>::Receive;
96+
using StreamIn<JSON::JSON>::Receive;
9497
void Receive(const JSON::JSON *data, int len, TAG &tag);
9598
#endif
9699

Source/IO/N2KInterface.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ namespace N2K
223223
{
224224
std::lock_guard<std::mutex> lock(mtx);
225225
NMEA2000.SendMsg(N2kMsg);
226+
bytes_sent += N2kMsg.DataLen;
226227
}
227228
}
228229

Source/IO/N2KInterface.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ namespace N2K
7878
bool output = false;
7979

8080
bool connected = false;
81+
uint64_t bytes_sent = 0;
8182

8283
void onOpen();
8384
static void onOpenStatic();
@@ -128,6 +129,9 @@ namespace N2K
128129
}
129130

130131
void sendMsg(const tN2kMsg &N2kMsg);
132+
133+
bool isConnected() const { return connected; }
134+
uint64_t getBytesSent() const { return bytes_sent; }
131135
};
132136

133137
#else
@@ -136,6 +140,8 @@ namespace N2K
136140
public:
137141
void Start() {}
138142
void Stop() {}
143+
bool isConnected() const { return false; }
144+
uint64_t getBytesSent() const { return 0; }
139145
};
140146

141147
#endif

Source/IO/N2KStream.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,12 @@ namespace IO
811811
default:
812812
break;
813813
}
814+
815+
bool was_connected = stats.connected != 0;
816+
stats.connected = N2K::N2KInterface.isConnected() ? 1u : 0u;
817+
if (!was_connected && stats.connected)
818+
stats.connect_ok++;
819+
stats.bytes_out = N2K::N2KInterface.getBytesSent();
814820
return;
815821
}
816822
}

0 commit comments

Comments
 (0)