Skip to content

Commit 2e38f0a

Browse files
committed
debug
1 parent 562c236 commit 2e38f0a

5 files changed

Lines changed: 64 additions & 24 deletions

File tree

inc/finalmq/streamconnection/ConnectionData.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,13 @@ struct BindProperties
4949

5050
struct ConnectConfig
5151
{
52-
int reconnectInterval = 1000; ///< if the server is not available, you can pass a reconnection intervall in [ms]
53-
int totalReconnectDuration = -1; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever.
52+
ConnectConfig(int r = 1000, int t = -1)
53+
: reconnectInterval(r)
54+
, totalReconnectDuration(t)
55+
{
56+
}
57+
int reconnectInterval{1000}; ///< if the server is not available, you can pass a reconnection intervall in [ms]
58+
int totalReconnectDuration{-1}; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever.
5459
};
5560

5661
struct ConnectProperties

src/poller/PollerImplEpoll.cpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -111,27 +111,30 @@ void PollerImplEpoll::addSocketEnableRead(const SocketDescriptorPtr& fd)
111111

112112
void PollerImplEpoll::removeSocket(const SocketDescriptorPtr& fd)
113113
{
114-
std::unique_lock<std::mutex> locker(m_mutex);
115-
auto it = m_socketDescriptors.find(fd);
116-
if (it != m_socketDescriptors.end())
114+
if (fd)
117115
{
118-
epoll_event ev; // to be compatible with linux versions before 2.6.9
119-
ev.events = 0;
120-
ev.data.fd = fd->getDescriptor();
121-
int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev);
122-
if (res == -1)
116+
std::unique_lock<std::mutex> locker(m_mutex);
117+
auto it = m_socketDescriptors.find(fd);
118+
if (it != m_socketDescriptors.end())
123119
{
124-
streamFatal << "epoll_ctl failed with errno: " << errno;
120+
epoll_event ev; // to be compatible with linux versions before 2.6.9
121+
ev.events = 0;
122+
ev.data.fd = fd->getDescriptor();
123+
int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev);
124+
if (res == -1)
125+
{
126+
streamFatal << "epoll_ctl failed with errno: " << errno;
127+
}
128+
assert(res != -1);
129+
m_socketDescriptors.erase(it);
130+
sockedDescriptorHasChanged();
125131
}
126-
assert(res != -1);
127-
m_socketDescriptors.erase(it);
128-
sockedDescriptorHasChanged();
129-
}
130-
else
131-
{
132-
// socket not added
132+
else
133+
{
134+
// socket not added
135+
}
136+
locker.unlock();
133137
}
134-
locker.unlock();
135138
}
136139

137140
void PollerImplEpoll::enableRead(const SocketDescriptorPtr& fd)

src/protocolsession/ProtocolSession.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ IProtocolPtr ProtocolSession::createRequestConnection()
382382
protocol->setCallback(shared_from_this());
383383
protocol->setConnection(connection);
384384

385-
m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep)
385+
//m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep)
386386

387387
bool res = m_streamConnectionContainer->connect(m_endpointStreamConnection, connection, m_connectionProperties);
388388
if (res)

src/streamconnection/StreamConnection.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "finalmq/streamconnection/StreamConnection.h"
2424

2525
#include <thread>
26+
#include <iostream>
2627

2728
#include "finalmq/streamconnection/AddressHelpers.h"
2829
#include "finalmq/helpers/ModulenameFinalmq.h"
@@ -246,7 +247,11 @@ bool StreamConnection::checkEdgeConnected()
246247
bool StreamConnection::doReconnect()
247248
{
248249
bool reconnecting = false;
249-
if (!m_connectionData.incomingConnection && m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED && m_connectionData.reconnectInterval >= 0)
250+
if (!m_connectionData.incomingConnection)
251+
{
252+
std::cout << "state: " << (int)m_connectionData.connectionState << std::endl;
253+
}
254+
if (!m_connectionData.incomingConnection && (/*m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING ||*/ m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_connectionData.reconnectInterval >= 0)
250255
{
251256
std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
252257
std::chrono::duration<double> dur = now - m_lastReconnectTime;
@@ -255,7 +260,7 @@ bool StreamConnection::doReconnect()
255260
{
256261
m_lastReconnectTime = now;
257262
std::unique_lock<std::mutex> lock(m_mutex);
258-
if (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED)
263+
if (/*m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING ||*/ m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED)
259264
{
260265
reconnecting = true;
261266
}
@@ -275,6 +280,7 @@ bool StreamConnection::changeStateForDisconnect()
275280
int delta = static_cast<int>(dur.count() * 1000);
276281
if (m_connectionData.totalReconnectDuration >= 0 && (delta < 0 || delta >= m_connectionData.totalReconnectDuration))
277282
{
283+
std::cout << "EXPIRED " << m_connectionData.totalReconnectDuration << " " << delta << std::endl;
278284
reconnectExpired = true;
279285
}
280286
else

src/streamconnection/StreamConnectionContainer.cpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#endif
4949

5050
#include <thread>
51+
#include <iostream>
5152

5253
#include <assert.h>
5354

@@ -335,6 +336,7 @@ bool StreamConnectionContainer::createSocket(const IStreamConnectionPtr& streamC
335336
assert(streamConnectionPrivate);
336337
streamConnectionPrivate->setSocket(socket);
337338
streamConnectionPrivate->updateConnectionData(connectionData);
339+
std::cout << "create connect socket " << connectionData.sd << std::endl;
338340
auto& entry = m_sd2Connection[connectionData.sd];
339341
assert(entry == nullptr);
340342
entry = it->second;
@@ -496,6 +498,7 @@ void StreamConnectionContainer::disconnectIntern(const IStreamConnectionPrivateP
496498
std::unique_lock<std::mutex> lock(m_mutex);
497499
if (sd)
498500
{
501+
std::cout << "REMOVE SOCKET " << sd->getDescriptor() << std::endl;
499502
m_socketErase.push_back(sd);
500503
m_sd2Connection.erase(sd->getDescriptor());
501504
m_connectionsStable.clear(std::memory_order_release);
@@ -507,6 +510,7 @@ void StreamConnectionContainer::disconnectIntern(const IStreamConnectionPrivateP
507510
lock.unlock();
508511
if (removeConn)
509512
{
513+
std::cout << "CONNECTION GONE" << std::endl;
510514
connectionDisconnect->disconnected(connectionDisconnect);
511515
}
512516
}
@@ -538,6 +542,7 @@ IStreamConnectionPrivatePtr StreamConnectionContainer::addConnection(const Socke
538542
m_connectionId2Connection[connectionId] = connection;
539543
if (connectionData.sd != INVALID_SOCKET)
540544
{
545+
std::cout << "create BIND socket " << connectionData.sd << std::endl;
541546
m_sd2Connection[connectionData.sd] = connection;
542547
m_connectionsStable.clear(std::memory_order_release);
543548
}
@@ -812,6 +817,9 @@ void StreamConnectionContainer::doReconnect()
812817
if (reconnecting)
813818
{
814819
streamInfo << "reconnect " << connection->getConnectionData().endpoint;
820+
SocketPtr socket = streamConnection->getSocket();
821+
std::cout << "reconnect" << std::endl;
822+
disconnectIntern(connection, socket ? socket->getSocketDescriptor() : SocketDescriptorPtr(nullptr));
815823
connect(connection->getConnectionData().endpoint, streamConnection, connection->getConnectionData().connectionProperties);
816824
}
817825
}
@@ -842,20 +850,22 @@ void StreamConnectionContainer::pollerLoop()
842850
m_lastReconnectTime = std::chrono::steady_clock::now();
843851
while (!m_terminatePollerLoop)
844852
{
845-
if (m_connectionsStable.test_and_set(std::memory_order_acq_rel))
853+
if (!m_connectionsStable.test_and_set(std::memory_order_acq_rel))
846854
{
847855
std::unique_lock<std::mutex> lock(m_mutex);
848856
m_sd2ConnectionPollerLoop = m_sd2Connection;
849857
m_socketErase.clear();
858+
std::cout << "A" << std::endl;
850859
}
851860

852861
const PollerResult& result = m_poller->wait(1000);
853862

854-
if (m_connectionsStable.test_and_set(std::memory_order_acq_rel))
863+
if (!m_connectionsStable.test_and_set(std::memory_order_acq_rel))
855864
{
856865
std::unique_lock<std::mutex> lock(m_mutex);
857866
m_sd2ConnectionPollerLoop = m_sd2Connection;
858867
m_socketErase.clear();
868+
std::cout << "B" << std::endl;
859869
}
860870

861871
if (result.releaseWait)
@@ -897,6 +907,21 @@ void StreamConnectionContainer::pollerLoop()
897907
}
898908
}
899909

910+
if (!m_connectionsStable.test_and_set(std::memory_order_acq_rel))
911+
{
912+
std::unique_lock<std::mutex> lock(m_mutex);
913+
m_sd2ConnectionPollerLoop = m_sd2Connection;
914+
m_socketErase.clear();
915+
std::cout << "C" << std::endl;
916+
}
917+
918+
std::cout << "descriptors: ";
919+
for (auto it = m_sd2ConnectionPollerLoop.begin(); it != m_sd2ConnectionPollerLoop.end(); ++it)
920+
{
921+
std::cout << it->first << " ";
922+
}
923+
std::cout << std::endl;
924+
900925
if (result.error)
901926
{
902927
// error of the poller
@@ -916,6 +941,7 @@ void StreamConnectionContainer::pollerLoop()
916941
SocketPtr socket = connection->getSocketPrivate();
917942
if (socket)
918943
{
944+
std::cout << info.sd << " " << socket->getSocketDescriptor()->getDescriptor() << std::endl;
919945
assert(info.sd == socket->getSocketDescriptor()->getDescriptor());
920946
handleConnectionEvents(connection, socket, info);
921947
}

0 commit comments

Comments
 (0)