Skip to content

Commit 80c389b

Browse files
authored
Merge pull request #118 from CESNET/fix-forwarder-issues
Fix forwarder issues
2 parents 928fa3c + 68ec5a8 commit 80c389b

File tree

9 files changed

+83
-26
lines changed

9 files changed

+83
-26
lines changed

LICENSE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Copyright (C) 2015-2017 CESNET, z.s.p.o.
1+
Copyright (C) 2015-2025 CESNET, z.s.p.o.
22

33
Redistribution and use in source and binary forms, with or without
44
modification, are permitted provided that the following conditions

src/core/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ print_version()
7373
<< "Build type: " << IPX_BUILD_TYPE << "\n"
7474
<< "Architecture: " << IPX_BUILD_ARCH << " (" << IPX_BUILD_BYTE_ORDER << ")" << "\n"
7575
<< "Compiler: " << IPX_BUILD_COMPILER << "\n"
76-
<< "Copyright (C) 2018 CESNET z.s.p.o.\n";
76+
<< "Copyright (C) 2025 CESNET z.s.p.o.\n";
7777
}
7878

7979
/**

src/plugins/output/forwarder/src/Connection.cpp

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ Connection::forward_message(ipx_msg_ipfix_t *msg)
8787
sender.process_message(msg);
8888

8989
} catch (const ConnectionError &err) {
90-
// In case connection was lost, we have to resend templates when it reconnects
91-
sender.clear_templates();
90+
on_connection_lost();
9291
throw err;
9392
}
9493
}
@@ -107,31 +106,36 @@ Connection::advance_transfers()
107106

108107
IPX_CTX_DEBUG(m_log_ctx, "Waiting transfers on connection %s: %zu", m_ident.c_str(), m_transfers.size());
109108

110-
for (auto it = m_transfers.begin(); it != m_transfers.end(); ) {
109+
try {
110+
for (auto it = m_transfers.begin(); it != m_transfers.end(); ) {
111111

112-
Transfer &transfer = *it;
112+
Transfer &transfer = *it;
113113

114-
assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
114+
assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
115115

116-
ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset],
117-
transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL);
116+
ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset],
117+
transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL);
118118

119-
check_socket_error(ret);
119+
check_socket_error(ret);
120120

121-
size_t sent = std::max<ssize_t>(0, ret);
122-
IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str());
121+
size_t sent = std::max<ssize_t>(0, ret);
122+
IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str());
123123

124-
// Is the transfer done?
125-
if (transfer.offset + sent == transfer.data.size()) {
126-
it = m_transfers.erase(it);
127-
// Remove the transfer and continue with the next one
124+
// Is the transfer done?
125+
if (transfer.offset + sent == transfer.data.size()) {
126+
it = m_transfers.erase(it);
127+
// Remove the transfer and continue with the next one
128128

129-
} else {
130-
transfer.offset += sent;
129+
} else {
130+
transfer.offset += sent;
131131

132-
// Finish, cannot advance next transfer before the one before it is fully sent
133-
break;
132+
// Finish, cannot advance next transfer before the one before it is fully sent
133+
break;
134+
}
134135
}
136+
} catch (ConnectionError& err) {
137+
on_connection_lost();
138+
throw err;
135139
}
136140
}
137141

@@ -236,8 +240,8 @@ Connection::get_or_create_sender(ipx_msg_ipfix_t *msg)
236240
send_message(msg);
237241
},
238242
m_con_params.protocol == Protocol::TCP,
239-
m_tmplts_resend_pkts,
240-
m_tmplts_resend_secs)));
243+
m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_pkts : 0,
244+
m_con_params.protocol != Protocol::TCP ? m_tmplts_resend_secs : 0)));
241245
}
242246

243247
Sender &sender = *m_senders[odid].get();
@@ -261,3 +265,16 @@ Connection::check_socket_error(ssize_t sock_ret)
261265
throw ConnectionError(errbuf);
262266
}
263267
}
268+
269+
void
270+
Connection::on_connection_lost()
271+
{
272+
for (auto& p : m_senders) {
273+
// In case connection was lost, we have to resend templates when it reconnects
274+
Sender& sender = *p.second.get();
275+
sender.clear_templates();
276+
}
277+
278+
// Do not continue any of the transfers that haven't been finished so we don't end up in the middle of a message
279+
m_transfers.clear();
280+
}

src/plugins/output/forwarder/src/Connection.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,7 @@ class Connection {
181181

182182
void
183183
check_socket_error(ssize_t sock_ret);
184-
};
184+
185+
void
186+
on_connection_lost();
187+
};

src/plugins/output/forwarder/src/Forwarder.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ void Forwarder::handle_ipfix_message(ipx_msg_ipfix_t *msg)
105105
}
106106
}
107107

108+
void
109+
Forwarder::handle_periodic_message(ipx_msg_periodic_t *msg)
110+
{
111+
for (auto &host : m_hosts) {
112+
host->advance_transfers();
113+
}
114+
}
115+
108116
void
109117
Forwarder::forward_to_all(ipx_msg_ipfix_t *msg)
110118
{

src/plugins/output/forwarder/src/Forwarder.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ class Forwarder {
8080
void
8181
handle_ipfix_message(ipx_msg_ipfix_t *msg);
8282

83+
/**
84+
* \brief Handle a periodic message
85+
* \param msg The periodic message
86+
*/
87+
void
88+
handle_periodic_message(ipx_msg_periodic_t *msg);
89+
8390
/**
8491
* \brief The destructor - finalize the forwarder
8592
*/
@@ -101,4 +108,5 @@ class Forwarder {
101108

102109
void
103110
forward_round_robin(ipx_msg_ipfix_t *msg);
104-
};
111+
112+
};

src/plugins/output/forwarder/src/Host.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,17 @@ Host::forward_message(ipx_msg_ipfix_t *msg)
142142
return true;
143143
}
144144

145+
void
146+
Host::advance_transfers()
147+
{
148+
for (auto &p : m_session_to_connection) {
149+
Connection &connection = *p.second.get();
150+
if (connection.check_connected()) {
151+
connection.advance_transfers();
152+
}
153+
}
154+
}
155+
145156
Host::~Host()
146157
{
147158
for (auto &p : m_session_to_connection) {

src/plugins/output/forwarder/src/Host.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ class Host {
100100
bool
101101
forward_message(ipx_msg_ipfix_t *msg);
102102

103+
/**
104+
* \brief Advance the unfinished transfers
105+
*/
106+
void
107+
advance_transfers();
108+
103109
private:
104110
const std::string &m_ident;
105111

@@ -116,4 +122,4 @@ class Host {
116122
Connector &m_connector;
117123

118124
std::unordered_map<const ipx_session *, std::unique_ptr<Connection>> m_session_to_connection;
119-
};
125+
};

src/plugins/output/forwarder/src/main.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config)
8888
return IPX_ERR_DENIED;
8989
}
9090

91-
ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION;
91+
ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION | IPX_MSG_PERIODIC;
9292
ipx_ctx_subscribe(ctx, &mask, NULL);
9393
ipx_ctx_private_set(ctx, forwarder.release());
9494

@@ -121,6 +121,10 @@ ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg)
121121
forwarder.handle_session_message(ipx_msg_base2session(msg));
122122
break;
123123

124+
case IPX_MSG_PERIODIC:
125+
forwarder.handle_periodic_message(ipx_msg_base2periodic(msg));
126+
break;
127+
124128
default: assert(0);
125129
}
126130

0 commit comments

Comments
 (0)