Skip to content

Commit f9cec75

Browse files
committed
TCP: try all addrinfo entries, add connect timeout for blocking mode, guard repeated stop requests in TCP streamer
1 parent a568c6a commit f9cec75

4 files changed

Lines changed: 83 additions & 87 deletions

File tree

Source/IO/Network.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -542,9 +542,10 @@ namespace IO
542542

543543
if (SendTo((data[i].getNMEA() + "\r\n").c_str()) < 0)
544544
{
545-
if (!persistent)
545+
if (!persistent && !stop_requested)
546546
{
547547
Error() << "TCP feed: requesting termination.";
548+
stop_requested = true;
548549
StopRequest();
549550
}
550551
}
@@ -557,9 +558,10 @@ namespace IO
557558

558559
if (SendTo((data[i].getJSON() + "\r\n").c_str()) < 0)
559560
{
560-
if (!persistent)
561+
if (!persistent && !stop_requested)
561562
{
562563
Error() << "TCP feed: requesting termination.";
564+
stop_requested = true;
563565
StopRequest();
564566
}
565567
}
@@ -576,9 +578,10 @@ namespace IO
576578

577579
formatInto(data[i], tag, include_sample_start, uuid, "\r\n");
578580

579-
if (SendTo(json.data(), (int)json.size()) < 0 && !persistent)
581+
if (SendTo(json.data(), (int)json.size()) < 0 && !persistent && !stop_requested)
580582
{
581583
Error() << "TCP feed: requesting termination.";
584+
stop_requested = true;
582585
StopRequest();
583586
}
584587
}
@@ -594,9 +597,10 @@ namespace IO
594597
json.clear();
595598
builder.stringify(data[i], json, "\r\n");
596599
if (SendTo(json.data(), (int)json.size()) < 0)
597-
if (!persistent)
600+
if (!persistent && !stop_requested)
598601
{
599602
Critical() << "TCP feed: requesting termination.";
603+
stop_requested = true;
600604
StopRequest();
601605
}
602606
}

Source/IO/Network.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ namespace IO
185185
bool persistent = true;
186186
std::string uuid;
187187
bool include_sample_start = false;
188+
bool stop_requested = false;
188189

189190
public:
190191
TCPClientStreamer() : OutputMessage("TCP Client") { fmt = MessageFormat::NMEA; }

Source/IO/Protocol.cpp

Lines changed: 72 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ namespace Protocol
9393
{
9494
state = DISCONNECTED;
9595

96-
int r;
97-
9896
struct addrinfo h;
9997

10098
std::memset(&h, 0, sizeof(h));
@@ -116,8 +114,33 @@ namespace Protocol
116114
return false;
117115
}
118116

119-
sock = socket(ai.get()->ai_family, ai.get()->ai_socktype, ai.get()->ai_protocol);
117+
for (struct addrinfo *p = ai.get(); p != nullptr; p = p->ai_next)
118+
{
119+
if (connectAddress(p))
120+
return true;
121+
}
122+
123+
if (stats)
124+
stats->connect_fail++;
125+
126+
return persistent;
127+
}
128+
129+
// One connect attempt against a single addrinfo entry.
130+
// Returns true if this attempt owns a live socket (READY, or CONNECTING in persistent mode).
131+
// Returns false if this address failed — sock is closed and reset to -1 so the caller can try the next.
132+
bool TCP::connectAddress(struct addrinfo *p)
133+
{
134+
auto fail = [this]() -> bool {
135+
if (sock != -1)
136+
{
137+
closesocket(sock);
138+
sock = -1;
139+
}
140+
return false;
141+
};
120142

143+
sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
121144
if (sock == -1)
122145
return false;
123146

@@ -127,13 +150,7 @@ namespace Protocol
127150
#else
128151
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&yes, sizeof(yes)) == -1)
129152
#endif
130-
{
131-
if (stats)
132-
stats->connect_fail++;
133-
134-
disconnect();
135-
return false;
136-
}
153+
return fail();
137154

138155
if (keep_alive)
139156
{
@@ -143,108 +160,81 @@ namespace Protocol
143160
#else
144161
if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&yes, sizeof(yes)))
145162
#endif
146-
{
147-
if (stats)
148-
stats->connect_fail++;
149-
150-
disconnect();
151-
return false;
152-
}
163+
return fail();
153164
#if defined(__APPLE__)
154165
if (setsockopt(sock, IPPROTO_TCP, TCP_KEEPALIVE, &idle, sizeof(idle)))
155-
{
156-
if (stats)
157-
stats->connect_fail++;
158-
159-
disconnect();
160-
return false;
161-
}
166+
return fail();
162167
#elif defined(_WIN32)
163-
// Windows specific keepalive
164168
int interval = 5;
165169
struct tcp_keepalive keepalive;
166170
keepalive.onoff = 1;
167171
keepalive.keepalivetime = idle * 1000;
168172
keepalive.keepaliveinterval = interval * 1000;
169173
DWORD br;
170174
if (WSAIoctl(sock, SIO_KEEPALIVE_VALS, &keepalive, sizeof(keepalive), NULL, 0, &br, NULL, NULL) == SOCKET_ERROR)
171-
{
172-
if (stats)
173-
stats->connect_fail++;
174-
175-
disconnect();
176-
return false;
177-
}
175+
return fail();
178176
#elif defined(__ANDROID__)
179-
// Android uses same config as Linux
180177
int interval = 5;
181178
int count = 2;
182179
if (setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)) ||
183180
setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)) ||
184181
setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, &count, sizeof(count)))
185-
{
186-
if (stats)
187-
stats->connect_fail++;
188-
189-
disconnect();
190-
return false;
191-
}
182+
return fail();
192183
#else
193184
int interval = 5;
194185
int count = 2;
195186
if (setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)) ||
196187
setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)) ||
197188
setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &count, sizeof(count)))
198-
{
199-
if (stats)
200-
stats->connect_fail++;
201-
202-
disconnect();
203-
return false;
204-
}
189+
return fail();
205190
#endif
206191
}
207192

208193
if (persistent)
209194
{
210195
#ifndef _WIN32
211-
r = fcntl(sock, F_GETFL, 0);
212-
if (r == -1)
196+
int fl = fcntl(sock, F_GETFL, 0);
197+
if (fl == -1)
213198
{
214-
if (stats)
215-
stats->connect_fail++;
216-
217-
disconnect();
218199
Error() << "TCP (" << host << ":" << port << "): fcntl F_GETFL failed: " << strerror(errno);
219-
return false;
200+
return fail();
220201
}
221202

222-
r = fcntl(sock, F_SETFL, r | O_NONBLOCK);
223-
if (r == -1)
203+
if (fcntl(sock, F_SETFL, fl | O_NONBLOCK) == -1)
224204
{
225-
if (stats)
226-
stats->connect_fail++;
227-
228-
disconnect();
229205
Error() << "TCP (" << host << ":" << port << "): fcntl F_SETFL failed: " << strerror(errno);
230-
return false;
206+
return fail();
231207
}
232208
#else
233-
u_long mode = 1; // 1 to enable non-blocking socket
209+
u_long mode = 1;
234210
if (ioctlsocket(sock, FIONBIO, &mode) != 0)
235211
{
236-
if (stats)
237-
stats->connect_fail++;
238-
239-
disconnect();
240212
Error() << "TCP (" << host << ":" << port << "): ioctlsocket failed. Error code: " << WSAGetLastError();
241-
return false;
213+
return fail();
242214
}
243215
#endif
244216
}
217+
else if (timeout > 0)
218+
{
219+
// Non-persistent mode keeps the socket blocking. Cap blocking I/O (and the
220+
// synchronous ::connect) via SO_SNDTIMEO / SO_RCVTIMEO so a dead host can't
221+
// hang us for the OS default. Best-effort: Windows applies these to connect,
222+
// Linux also does for blocking sockets; other POSIX may not.
223+
#ifdef _WIN32
224+
DWORD tv_ms = (DWORD)timeout * 1000;
225+
if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv_ms, sizeof(tv_ms)) ||
226+
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv_ms, sizeof(tv_ms)))
227+
return fail();
228+
#else
229+
struct timeval tv = { timeout, 0 };
230+
if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) ||
231+
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)))
232+
return fail();
233+
#endif
234+
}
245235

246236
stamp = time(nullptr);
247-
r = ::connect(sock, ai.get()->ai_addr, (int)ai.get()->ai_addrlen);
237+
int r = ::connect(sock, p->ai_addr, (int)p->ai_addrlen);
248238

249239
if (r != -1)
250240
{
@@ -267,25 +257,25 @@ namespace Protocol
267257
}
268258
#ifndef _WIN32
269259
if (errno != EINPROGRESS)
270-
{
271-
if (stats)
272-
stats->connect_fail++;
273-
274-
disconnect();
275-
return false;
276-
}
260+
return fail();
277261
#else
278262
if (WSAGetLastError() != WSAEWOULDBLOCK)
279-
{
280-
if (stats)
281-
stats->connect_fail++;
263+
return fail();
264+
#endif
282265

283-
disconnect();
266+
// Non-blocking connect in progress.
267+
if (isConnected(timeout))
268+
return true;
269+
270+
// isConnected() may have internally detected async failure and reset sock to -1.
271+
// In that case fall through so the caller tries the next address.
272+
if (sock == -1)
284273
return false;
285-
}
286-
#endif
287274

288-
return isConnected(timeout) || persistent;
275+
if (persistent)
276+
return true;
277+
278+
return fail();
289279
}
290280

291281
bool TCP::isConnected(int t)

Source/IO/Protocol.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ namespace Protocol
258258
persistent = Util::Parse::Switch(value);
259259
break;
260260
case AIS::KEY_SETTING_TIMEOUT:
261-
timeout = std::stoi(value);
261+
timeout = Util::Parse::Integer(value, 0, 3600);
262262
break;
263263
case AIS::KEY_SETTING_KEEP_ALIVE:
264264
keep_alive = Util::Parse::Switch(value);
@@ -315,6 +315,7 @@ namespace Protocol
315315

316316
void updateState();
317317
bool isConnected(int t);
318+
bool connectAddress(struct addrinfo *p);
318319

319320
bool reconnect()
320321
{

0 commit comments

Comments
 (0)