Skip to content

Commit b579d11

Browse files
committed
Fix reconnect
1 parent 2c87eb9 commit b579d11

14 files changed

Lines changed: 258 additions & 165 deletions

inc/finalmq/streamconnection/ConnectionData.h

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,49 @@
2525
#include <chrono>
2626
#include <string>
2727

28+
#include "OpenSsl.h"
2829
#include "finalmq/helpers/FmqDefines.h"
30+
#include "finalmq/variant/Variant.h"
2931

3032
namespace finalmq
3133
{
3234
enum class ConnectionState
3335
{
3436
CONNECTIONSTATE_CREATED = 0,
35-
CONNECTIONSTATE_CONNECTING = 1,
36-
CONNECTIONSTATE_CONNECTING_FAILED = 2,
37-
CONNECTIONSTATE_CONNECTED = 3,
38-
CONNECTIONSTATE_DISCONNECTED = 4,
37+
CONNECTIONSTATE_RECONNECT = 1,
38+
CONNECTIONSTATE_CONNECTING = 2,
39+
CONNECTIONSTATE_CONNECTING_FAILED = 3,
40+
CONNECTIONSTATE_CONNECTED = 4,
41+
CONNECTIONSTATE_DISCONNECTED = 5,
3942
};
4043

44+
struct BindProperties
45+
{
46+
CertificateData certificateData{};
47+
Variant protocolData{};
48+
Variant formatData{}; ///< data for the serialization format
49+
};
50+
51+
struct ConnectConfig
52+
{
53+
ConnectConfig(int r = 1000, int t = -1)
54+
: reconnectInterval(r)
55+
, totalReconnectDuration(t)
56+
{
57+
}
58+
int reconnectInterval{1000}; ///< if the server is not available, you can pass a reconnection intervall in [ms]
59+
int totalReconnectDuration{-1}; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever.
60+
};
61+
62+
struct ConnectProperties
63+
{
64+
CertificateData certificateData{};
65+
ConnectConfig config{};
66+
Variant protocolData{};
67+
Variant formatData{}; ///< data for the serialization format
68+
};
69+
70+
4171
struct ConnectionData
4272
{
4373
std::int64_t connectionId = 0;
@@ -58,6 +88,7 @@ struct ConnectionData
5888
std::chrono::time_point<std::chrono::steady_clock> startTime{};
5989
bool ssl = false;
6090
ConnectionState connectionState = ConnectionState::CONNECTIONSTATE_CREATED;
91+
ConnectProperties connectionProperties{};
6192
};
6293

6394
} // namespace finalmq

inc/finalmq/streamconnection/StreamConnection.h

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,31 +36,10 @@
3636
#include "finalmq/helpers/hybrid_ptr.h"
3737
#include "finalmq/poller/Poller.h"
3838
#include "finalmq/streamconnection/IMessage.h"
39-
#include "finalmq/variant/Variant.h"
4039

4140
namespace finalmq
4241
{
4342

44-
struct BindProperties
45-
{
46-
CertificateData certificateData{};
47-
Variant protocolData{};
48-
Variant formatData{}; ///< data for the serialization format
49-
};
50-
51-
struct ConnectConfig
52-
{
53-
int reconnectInterval = 1000; ///< if the server is not available, you can pass a reconnection intervall in [ms]
54-
int totalReconnectDuration = -1; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever.
55-
};
56-
57-
struct ConnectProperties
58-
{
59-
CertificateData certificateData{};
60-
ConnectConfig config{};
61-
Variant protocolData{};
62-
Variant formatData{}; ///< data for the serialization format
63-
};
6443

6544
struct IStreamConnection;
6645
typedef std::shared_ptr<IStreamConnection> IStreamConnectionPtr;
@@ -90,6 +69,7 @@ struct IStreamConnectionPrivate : public IStreamConnection
9069
{
9170
virtual bool connect() = 0;
9271
virtual SocketPtr getSocketPrivate() = 0;
72+
virtual void setSocket(const SocketPtr& socket) = 0;
9373
virtual bool sendPendingMessages() = 0;
9474
virtual bool checkEdgeConnected() = 0;
9575
virtual bool doReconnect() = 0;
@@ -122,6 +102,7 @@ class SYMBOLEXP StreamConnection : public IStreamConnectionPrivate
122102
// IStreamConnectionPrivate
123103
virtual bool connect() override;
124104
virtual SocketPtr getSocketPrivate() override;
105+
virtual void setSocket(const SocketPtr& socket) override;
125106
virtual bool sendPendingMessages() override;
126107
virtual bool checkEdgeConnected() override;
127108
virtual bool doReconnect() override;

inc/finalmq/streamconnection/StreamConnectionContainer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer
9595
IStreamConnectionPrivatePtr findConnectionBySdOnlyForPollerLoop(SOCKET sd);
9696
IStreamConnectionPrivatePtr findConnectionById(std::int64_t connectionId);
9797
bool createSocket(const IStreamConnectionPtr& streamConnection, ConnectionData& connectionData, const ConnectProperties& connectionProperties);
98-
void removeConnection(const SocketDescriptorPtr& sd, std::int64_t connectionId);
9998
void disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd);
10099
IStreamConnectionPrivatePtr addConnection(const SocketPtr& socket, ConnectionData& connectionData, hybrid_ptr<IStreamConnectionCallback> callback);
101100
void handleConnectionEvents(const IStreamConnectionPrivatePtr& connection, const SocketPtr& socket, const DescriptorInfo& info);
@@ -109,6 +108,7 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer
109108
std::unordered_map<std::int64_t, IStreamConnectionPrivatePtr> m_connectionId2Connection{};
110109
std::unordered_map<SOCKET, IStreamConnectionPrivatePtr> m_sd2Connection{};
111110
std::unordered_map<SOCKET, IStreamConnectionPrivatePtr> m_sd2ConnectionPollerLoop{};
111+
std::vector<SocketDescriptorPtr> m_socketErase;
112112
std::atomic_flag m_connectionsStable{};
113113
static std::atomic_int64_t m_nextConnectionId;
114114
std::atomic_bool m_terminatePollerLoop{false};

src/poller/PollerImplEpoll.cpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -111,27 +111,30 @@ void PollerImplEpoll::addSocketEnableRead(const SocketDescriptorPtr& fd)
111111

112112
void PollerImplEpoll::removeSocket(const SocketDescriptorPtr& fd)
113113
{
114-
std::unique_lock<std::mutex> locker(m_mutex);
115-
auto it = m_socketDescriptors.find(fd);
116-
if (it != m_socketDescriptors.end())
114+
if (fd)
117115
{
118-
epoll_event ev; // to be compatible with linux versions before 2.6.9
119-
ev.events = 0;
120-
ev.data.fd = fd->getDescriptor();
121-
int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev);
122-
if (res == -1)
116+
std::unique_lock<std::mutex> locker(m_mutex);
117+
auto it = m_socketDescriptors.find(fd);
118+
if (it != m_socketDescriptors.end())
123119
{
124-
streamFatal << "epoll_ctl failed with errno: " << errno;
120+
epoll_event ev; // to be compatible with linux versions before 2.6.9
121+
ev.events = 0;
122+
ev.data.fd = fd->getDescriptor();
123+
int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev);
124+
if (res == -1)
125+
{
126+
streamFatal << "epoll_ctl failed with errno: " << errno;
127+
}
128+
assert(res != -1);
129+
m_socketDescriptors.erase(it);
130+
sockedDescriptorHasChanged();
125131
}
126-
assert(res != -1);
127-
m_socketDescriptors.erase(it);
128-
sockedDescriptorHasChanged();
129-
}
130-
else
131-
{
132-
// socket not added
132+
else
133+
{
134+
// socket not added
135+
}
136+
locker.unlock();
133137
}
134-
locker.unlock();
135138
}
136139

137140
void PollerImplEpoll::enableRead(const SocketDescriptorPtr& fd)

src/protocolsession/ProtocolSession.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ IProtocolPtr ProtocolSession::createRequestConnection()
382382
protocol->setCallback(shared_from_this());
383383
protocol->setConnection(connection);
384384

385-
m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep)
385+
//m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep)
386386

387387
bool res = m_streamConnectionContainer->connect(m_endpointStreamConnection, connection, m_connectionProperties);
388388
if (res)

src/streamconnection/StreamConnection.cpp

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <thread>
2626

2727
#include "finalmq/streamconnection/AddressHelpers.h"
28+
#include "finalmq/helpers/ModulenameFinalmq.h"
2829

2930
#define RELEASE_DISCONNECT 1
3031

@@ -48,54 +49,51 @@ void StreamConnection::sendMessage(const IMessagePtr& msg)
4849
return;
4950
}
5051
std::unique_lock<std::mutex> lock(m_mutex);
51-
if (m_socketPrivate)
52+
ssize_t size = msg->getTotalSendBufferSize();
53+
if (size > 0)
5254
{
53-
ssize_t size = msg->getTotalSendBufferSize();
54-
if (size > 0)
55+
const auto& payloads = msg->getAllSendBuffers();
56+
if (!m_pendingMessages.empty() || m_connectionData.connectionState != ConnectionState::CONNECTIONSTATE_CONNECTED)
57+
{
58+
m_pendingMessages.push_back({msg, payloads.begin(), 0});
59+
}
60+
else
5561
{
56-
const auto& payloads = msg->getAllSendBuffers();
57-
if (!m_pendingMessages.empty() || m_connectionData.connectionState != ConnectionState::CONNECTIONSTATE_CONNECTED)
58-
{
59-
m_pendingMessages.push_back({msg, payloads.begin(), 0});
60-
}
61-
else
62-
{
6362
#if 0
64-
m_pendingMessages.push_back({msg, payloads.begin(), 0});
65-
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
63+
m_pendingMessages.push_back({msg, payloads.begin(), 0});
64+
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
6665
#else
67-
bool ex = false;
68-
for (auto it = payloads.begin(); it != payloads.end() && !ex; )
69-
{
70-
const BufferRef& payload = *it;
71-
++it;
66+
bool ex = false;
67+
for (auto it = payloads.begin(); it != payloads.end() && !ex; )
68+
{
69+
const BufferRef& payload = *it;
70+
++it;
7271

7372
#ifdef __QNX__
74-
int flags = 0;
73+
int flags = 0;
7574
#else
76-
bool last = (it == payloads.end());
77-
int flags = last ? 0 : MSG_MORE; // win32: MSG_PARTIAL
75+
bool last = (it == payloads.end());
76+
int flags = last ? 0 : MSG_MORE; // win32: MSG_PARTIAL
7877
#endif
7978

8079
#if !defined WIN32
81-
flags |= MSG_NOSIGNAL; // no sigpipe
80+
flags |= MSG_NOSIGNAL; // no sigpipe
8281
#endif
83-
int err = m_socketPrivate->send(payload.first, static_cast<int>(payload.second), flags);
84-
if (err != payload.second)
82+
int err = m_socketPrivate->send(payload.first, static_cast<int>(payload.second), flags);
83+
if (err != payload.second)
84+
{
85+
if (err < 0)
8586
{
86-
if (err < 0)
87-
{
88-
err = 0;
89-
}
90-
assert(err < payload.second);
91-
--it;
92-
m_pendingMessages.push_back({msg, it, err});
93-
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
94-
ex = true;
87+
err = 0;
9588
}
89+
assert(err < payload.second);
90+
--it;
91+
m_pendingMessages.push_back({msg, it, err});
92+
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
93+
ex = true;
9694
}
97-
#endif
9895
}
96+
#endif
9997
}
10098
}
10199
lock.unlock();
@@ -120,10 +118,19 @@ std::int64_t StreamConnection::getConnectionId() const
120118

121119
SocketPtr StreamConnection::getSocketPrivate()
122120
{
123-
// do not mutex lock here, because the removeSocket and getSocketPrivate will be called from same thread.
121+
std::unique_lock<std::mutex> lock(m_mutex);
124122
return m_socketPrivate;
125123
}
126124

125+
void StreamConnection::setSocket(const SocketPtr& socket)
126+
{
127+
std::unique_lock<std::mutex> lock(m_mutex);
128+
assert(m_socket == nullptr);
129+
assert(m_socketPrivate == nullptr);
130+
m_socket = socket;
131+
m_socketPrivate = socket;
132+
}
133+
127134
SocketPtr StreamConnection::getSocket()
128135
{
129136
std::unique_lock<std::mutex> lock(m_mutex);
@@ -140,18 +147,25 @@ bool StreamConnection::connect()
140147
{
141148
bool connecting = false;
142149
std::unique_lock<std::mutex> lock(m_mutex);
143-
if ((m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED || m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_socketPrivate)
150+
const int sockaddrSize = static_cast<int>(m_connectionData.sockaddr.size());
151+
if ((m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED ||
152+
m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_RECONNECT ||
153+
m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_socketPrivate && (sockaddrSize > 0))
144154
{
145-
int ret = m_socketPrivate->connect(reinterpret_cast<const sockaddr*>(m_connectionData.sockaddr.c_str()), static_cast<int>(m_connectionData.sockaddr.size()));
155+
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING;
156+
int ret = m_socketPrivate->connect(reinterpret_cast<const sockaddr*>(m_connectionData.sockaddr.c_str()), sockaddrSize);
146157
if (ret == 0)
147158
{
148159
connecting = true;
149-
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING;
150160
SocketDescriptorPtr sd = m_socketPrivate->getSocketDescriptor();
151161
assert(sd);
152162
m_poller->addSocketEnableRead(sd);
153163
m_poller->enableWrite(sd);
154164
}
165+
else
166+
{
167+
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED;
168+
}
155169
}
156170
return connecting;
157171
}
@@ -222,7 +236,8 @@ bool StreamConnection::checkEdgeConnected()
222236
{
223237
std::unique_lock<std::mutex> lock(m_mutex);
224238
bool edgeConnected = false;
225-
if (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING)
239+
const auto state = m_connectionData.connectionState;
240+
if (state == ConnectionState::CONNECTIONSTATE_CONNECTING)
226241
{
227242
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTED;
228243
edgeConnected = true;
@@ -233,15 +248,16 @@ bool StreamConnection::checkEdgeConnected()
233248
bool StreamConnection::doReconnect()
234249
{
235250
bool reconnecting = false;
236-
if (!m_connectionData.incomingConnection && m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED && m_connectionData.reconnectInterval >= 0)
251+
if (!m_connectionData.incomingConnection && (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_connectionData.reconnectInterval >= 0)
237252
{
238253
std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
239254
std::chrono::duration<double> dur = now - m_lastReconnectTime;
240255
int delta = static_cast<int>(dur.count() * 1000);
241256
if (delta < 0 || delta >= m_connectionData.reconnectInterval)
242257
{
243258
m_lastReconnectTime = now;
244-
reconnecting = connect();
259+
streamInfo << "reconnect " << m_connectionData.endpoint;
260+
reconnecting = true;
245261
}
246262
}
247263
return reconnecting;
@@ -262,22 +278,19 @@ bool StreamConnection::changeStateForDisconnect()
262278
}
263279
else
264280
{
265-
assert(m_socketPrivate);
266281
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED;
267-
m_poller->removeSocket(m_socketPrivate->getSocketDescriptor());
268282
}
269283
}
270284

271285
if (m_disconnectFlag || (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTED) || reconnectExpired)
272286
{
273287
removeConnection = true;
274-
275-
assert(m_socketPrivate);
276288
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_DISCONNECTED;
277-
m_poller->removeSocket(m_socketPrivate->getSocketDescriptor());
278-
m_socketPrivate = nullptr;
279-
m_socket = nullptr;
280289
}
290+
291+
m_socketPrivate = nullptr;
292+
m_socket = nullptr;
293+
281294
return removeConnection;
282295
}
283296

0 commit comments

Comments
 (0)