diff --git a/inc/finalmq/streamconnection/ConnectionData.h b/inc/finalmq/streamconnection/ConnectionData.h index 305c28d9..b3632ae8 100644 --- a/inc/finalmq/streamconnection/ConnectionData.h +++ b/inc/finalmq/streamconnection/ConnectionData.h @@ -25,19 +25,49 @@ #include #include +#include "OpenSsl.h" #include "finalmq/helpers/FmqDefines.h" +#include "finalmq/variant/Variant.h" namespace finalmq { enum class ConnectionState { CONNECTIONSTATE_CREATED = 0, - CONNECTIONSTATE_CONNECTING = 1, - CONNECTIONSTATE_CONNECTING_FAILED = 2, - CONNECTIONSTATE_CONNECTED = 3, - CONNECTIONSTATE_DISCONNECTED = 4, + CONNECTIONSTATE_RECONNECT = 1, + CONNECTIONSTATE_CONNECTING = 2, + CONNECTIONSTATE_CONNECTING_FAILED = 3, + CONNECTIONSTATE_CONNECTED = 4, + CONNECTIONSTATE_DISCONNECTED = 5, }; +struct BindProperties +{ + CertificateData certificateData{}; + Variant protocolData{}; + Variant formatData{}; ///< data for the serialization format +}; + +struct ConnectConfig +{ + ConnectConfig(int r = 1000, int t = -1) + : reconnectInterval(r) + , totalReconnectDuration(t) + { + } + int reconnectInterval{1000}; ///< if the server is not available, you can pass a reconnection intervall in [ms] + int totalReconnectDuration{-1}; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever. +}; + +struct ConnectProperties +{ + CertificateData certificateData{}; + ConnectConfig config{}; + Variant protocolData{}; + Variant formatData{}; ///< data for the serialization format +}; + + struct ConnectionData { std::int64_t connectionId = 0; @@ -58,6 +88,7 @@ struct ConnectionData std::chrono::time_point startTime{}; bool ssl = false; ConnectionState connectionState = ConnectionState::CONNECTIONSTATE_CREATED; + ConnectProperties connectionProperties{}; }; } // namespace finalmq diff --git a/inc/finalmq/streamconnection/StreamConnection.h b/inc/finalmq/streamconnection/StreamConnection.h index b7bcbd63..fecd7441 100644 --- a/inc/finalmq/streamconnection/StreamConnection.h +++ b/inc/finalmq/streamconnection/StreamConnection.h @@ -36,31 +36,10 @@ #include "finalmq/helpers/hybrid_ptr.h" #include "finalmq/poller/Poller.h" #include "finalmq/streamconnection/IMessage.h" -#include "finalmq/variant/Variant.h" namespace finalmq { -struct BindProperties -{ - CertificateData certificateData{}; - Variant protocolData{}; - Variant formatData{}; ///< data for the serialization format -}; - -struct ConnectConfig -{ - int reconnectInterval = 1000; ///< if the server is not available, you can pass a reconnection intervall in [ms] - int totalReconnectDuration = -1; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever. -}; - -struct ConnectProperties -{ - CertificateData certificateData{}; - ConnectConfig config{}; - Variant protocolData{}; - Variant formatData{}; ///< data for the serialization format -}; struct IStreamConnection; typedef std::shared_ptr IStreamConnectionPtr; @@ -90,6 +69,7 @@ struct IStreamConnectionPrivate : public IStreamConnection { virtual bool connect() = 0; virtual SocketPtr getSocketPrivate() = 0; + virtual void setSocket(const SocketPtr& socket) = 0; virtual bool sendPendingMessages() = 0; virtual bool checkEdgeConnected() = 0; virtual bool doReconnect() = 0; @@ -122,6 +102,7 @@ class SYMBOLEXP StreamConnection : public IStreamConnectionPrivate // IStreamConnectionPrivate virtual bool connect() override; virtual SocketPtr getSocketPrivate() override; + virtual void setSocket(const SocketPtr& socket) override; virtual bool sendPendingMessages() override; virtual bool checkEdgeConnected() override; virtual bool doReconnect() override; diff --git a/inc/finalmq/streamconnection/StreamConnectionContainer.h b/inc/finalmq/streamconnection/StreamConnectionContainer.h index b9d66f61..fcdea1e1 100644 --- a/inc/finalmq/streamconnection/StreamConnectionContainer.h +++ b/inc/finalmq/streamconnection/StreamConnectionContainer.h @@ -95,7 +95,6 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer IStreamConnectionPrivatePtr findConnectionBySdOnlyForPollerLoop(SOCKET sd); IStreamConnectionPrivatePtr findConnectionById(std::int64_t connectionId); bool createSocket(const IStreamConnectionPtr& streamConnection, ConnectionData& connectionData, const ConnectProperties& connectionProperties); - void removeConnection(const SocketDescriptorPtr& sd, std::int64_t connectionId); void disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd); IStreamConnectionPrivatePtr addConnection(const SocketPtr& socket, ConnectionData& connectionData, hybrid_ptr callback); void handleConnectionEvents(const IStreamConnectionPrivatePtr& connection, const SocketPtr& socket, const DescriptorInfo& info); @@ -109,6 +108,7 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer std::unordered_map m_connectionId2Connection{}; std::unordered_map m_sd2Connection{}; std::unordered_map m_sd2ConnectionPollerLoop{}; + std::vector m_socketErase; std::atomic_flag m_connectionsStable{}; static std::atomic_int64_t m_nextConnectionId; std::atomic_bool m_terminatePollerLoop{false}; diff --git a/src/poller/PollerImplEpoll.cpp b/src/poller/PollerImplEpoll.cpp index 65201b36..cc097c4e 100644 --- a/src/poller/PollerImplEpoll.cpp +++ b/src/poller/PollerImplEpoll.cpp @@ -111,27 +111,30 @@ void PollerImplEpoll::addSocketEnableRead(const SocketDescriptorPtr& fd) void PollerImplEpoll::removeSocket(const SocketDescriptorPtr& fd) { - std::unique_lock locker(m_mutex); - auto it = m_socketDescriptors.find(fd); - if (it != m_socketDescriptors.end()) + if (fd) { - epoll_event ev; // to be compatible with linux versions before 2.6.9 - ev.events = 0; - ev.data.fd = fd->getDescriptor(); - int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev); - if (res == -1) + std::unique_lock locker(m_mutex); + auto it = m_socketDescriptors.find(fd); + if (it != m_socketDescriptors.end()) { - streamFatal << "epoll_ctl failed with errno: " << errno; + epoll_event ev; // to be compatible with linux versions before 2.6.9 + ev.events = 0; + ev.data.fd = fd->getDescriptor(); + int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev); + if (res == -1) + { + streamFatal << "epoll_ctl failed with errno: " << errno; + } + assert(res != -1); + m_socketDescriptors.erase(it); + sockedDescriptorHasChanged(); } - assert(res != -1); - m_socketDescriptors.erase(it); - sockedDescriptorHasChanged(); - } - else - { - // socket not added + else + { + // socket not added + } + locker.unlock(); } - locker.unlock(); } void PollerImplEpoll::enableRead(const SocketDescriptorPtr& fd) diff --git a/src/protocolsession/ProtocolSession.cpp b/src/protocolsession/ProtocolSession.cpp index 7249b372..180e768e 100644 --- a/src/protocolsession/ProtocolSession.cpp +++ b/src/protocolsession/ProtocolSession.cpp @@ -382,7 +382,7 @@ IProtocolPtr ProtocolSession::createRequestConnection() protocol->setCallback(shared_from_this()); protocol->setConnection(connection); - m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep) + //m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep) bool res = m_streamConnectionContainer->connect(m_endpointStreamConnection, connection, m_connectionProperties); if (res) diff --git a/src/streamconnection/StreamConnection.cpp b/src/streamconnection/StreamConnection.cpp index 5af457be..5521fe2b 100644 --- a/src/streamconnection/StreamConnection.cpp +++ b/src/streamconnection/StreamConnection.cpp @@ -25,6 +25,7 @@ #include #include "finalmq/streamconnection/AddressHelpers.h" +#include "finalmq/helpers/ModulenameFinalmq.h" #define RELEASE_DISCONNECT 1 @@ -48,54 +49,51 @@ void StreamConnection::sendMessage(const IMessagePtr& msg) return; } std::unique_lock lock(m_mutex); - if (m_socketPrivate) + ssize_t size = msg->getTotalSendBufferSize(); + if (size > 0) { - ssize_t size = msg->getTotalSendBufferSize(); - if (size > 0) + const auto& payloads = msg->getAllSendBuffers(); + if (!m_pendingMessages.empty() || m_connectionData.connectionState != ConnectionState::CONNECTIONSTATE_CONNECTED) + { + m_pendingMessages.push_back({msg, payloads.begin(), 0}); + } + else { - const auto& payloads = msg->getAllSendBuffers(); - if (!m_pendingMessages.empty() || m_connectionData.connectionState != ConnectionState::CONNECTIONSTATE_CONNECTED) - { - m_pendingMessages.push_back({msg, payloads.begin(), 0}); - } - else - { #if 0 - m_pendingMessages.push_back({msg, payloads.begin(), 0}); - m_poller->enableWrite(m_socketPrivate->getSocketDescriptor()); + m_pendingMessages.push_back({msg, payloads.begin(), 0}); + m_poller->enableWrite(m_socketPrivate->getSocketDescriptor()); #else - bool ex = false; - for (auto it = payloads.begin(); it != payloads.end() && !ex; ) - { - const BufferRef& payload = *it; - ++it; + bool ex = false; + for (auto it = payloads.begin(); it != payloads.end() && !ex; ) + { + const BufferRef& payload = *it; + ++it; #ifdef __QNX__ - int flags = 0; + int flags = 0; #else - bool last = (it == payloads.end()); - int flags = last ? 0 : MSG_MORE; // win32: MSG_PARTIAL + bool last = (it == payloads.end()); + int flags = last ? 0 : MSG_MORE; // win32: MSG_PARTIAL #endif #if !defined WIN32 - flags |= MSG_NOSIGNAL; // no sigpipe + flags |= MSG_NOSIGNAL; // no sigpipe #endif - int err = m_socketPrivate->send(payload.first, static_cast(payload.second), flags); - if (err != payload.second) + int err = m_socketPrivate->send(payload.first, static_cast(payload.second), flags); + if (err != payload.second) + { + if (err < 0) { - if (err < 0) - { - err = 0; - } - assert(err < payload.second); - --it; - m_pendingMessages.push_back({msg, it, err}); - m_poller->enableWrite(m_socketPrivate->getSocketDescriptor()); - ex = true; + err = 0; } + assert(err < payload.second); + --it; + m_pendingMessages.push_back({msg, it, err}); + m_poller->enableWrite(m_socketPrivate->getSocketDescriptor()); + ex = true; } -#endif } +#endif } } lock.unlock(); @@ -120,10 +118,19 @@ std::int64_t StreamConnection::getConnectionId() const SocketPtr StreamConnection::getSocketPrivate() { - // do not mutex lock here, because the removeSocket and getSocketPrivate will be called from same thread. + std::unique_lock lock(m_mutex); return m_socketPrivate; } +void StreamConnection::setSocket(const SocketPtr& socket) +{ + std::unique_lock lock(m_mutex); + assert(m_socket == nullptr); + assert(m_socketPrivate == nullptr); + m_socket = socket; + m_socketPrivate = socket; +} + SocketPtr StreamConnection::getSocket() { std::unique_lock lock(m_mutex); @@ -140,18 +147,25 @@ bool StreamConnection::connect() { bool connecting = false; std::unique_lock lock(m_mutex); - if ((m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED || m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_socketPrivate) + const int sockaddrSize = static_cast(m_connectionData.sockaddr.size()); + if ((m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED || + m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_RECONNECT || + m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_socketPrivate && (sockaddrSize > 0)) { - int ret = m_socketPrivate->connect(reinterpret_cast(m_connectionData.sockaddr.c_str()), static_cast(m_connectionData.sockaddr.size())); + m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING; + int ret = m_socketPrivate->connect(reinterpret_cast(m_connectionData.sockaddr.c_str()), sockaddrSize); if (ret == 0) { connecting = true; - m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING; SocketDescriptorPtr sd = m_socketPrivate->getSocketDescriptor(); assert(sd); m_poller->addSocketEnableRead(sd); m_poller->enableWrite(sd); } + else + { + m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED; + } } return connecting; } @@ -222,7 +236,8 @@ bool StreamConnection::checkEdgeConnected() { std::unique_lock lock(m_mutex); bool edgeConnected = false; - if (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING) + const auto state = m_connectionData.connectionState; + if (state == ConnectionState::CONNECTIONSTATE_CONNECTING) { m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTED; edgeConnected = true; @@ -233,7 +248,7 @@ bool StreamConnection::checkEdgeConnected() bool StreamConnection::doReconnect() { bool reconnecting = false; - if (!m_connectionData.incomingConnection && m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED && m_connectionData.reconnectInterval >= 0) + if (!m_connectionData.incomingConnection && (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_connectionData.reconnectInterval >= 0) { std::chrono::time_point now = std::chrono::steady_clock::now(); std::chrono::duration dur = now - m_lastReconnectTime; @@ -241,7 +256,8 @@ bool StreamConnection::doReconnect() if (delta < 0 || delta >= m_connectionData.reconnectInterval) { m_lastReconnectTime = now; - reconnecting = connect(); + streamInfo << "reconnect " << m_connectionData.endpoint; + reconnecting = true; } } return reconnecting; @@ -262,22 +278,19 @@ bool StreamConnection::changeStateForDisconnect() } else { - assert(m_socketPrivate); m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED; - m_poller->removeSocket(m_socketPrivate->getSocketDescriptor()); } } if (m_disconnectFlag || (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTED) || reconnectExpired) { removeConnection = true; - - assert(m_socketPrivate); m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_DISCONNECTED; - m_poller->removeSocket(m_socketPrivate->getSocketDescriptor()); - m_socketPrivate = nullptr; - m_socket = nullptr; } + + m_socketPrivate = nullptr; + m_socket = nullptr; + return removeConnection; } diff --git a/src/streamconnection/StreamConnectionContainer.cpp b/src/streamconnection/StreamConnectionContainer.cpp index 0d81cf2a..ef36e1d9 100644 --- a/src/streamconnection/StreamConnectionContainer.cpp +++ b/src/streamconnection/StreamConnectionContainer.cpp @@ -28,6 +28,7 @@ #include "finalmq/streamconnection/AddressHelpers.h" #include "finalmq/streamconnection/Socket.h" #include "finalmq/streamconnection/streamconnection.fmq.h" +#include "finalmq/helpers/ModulenameFinalmq.h" #if defined(WIN32) || defined(__MINGW32__) || defined(__QNX__) #include "finalmq/poller/PollerImplSelect.h" @@ -37,6 +38,7 @@ #if !defined(WIN32) && !defined(__MINGW32__) #include +#include #include #include @@ -54,6 +56,7 @@ #define RELEASE_TERMINATE 4 + namespace finalmq { std::atomic_int64_t StreamConnectionContainer::m_nextConnectionId{1}; @@ -288,11 +291,11 @@ IStreamConnectionPtr StreamConnectionContainer::createConnection(hybrid_ptr(); + //SocketPtr socket = std::make_shared(); IStreamConnectionPrivatePtr connection; - connection = addConnection(socket, connectionData, callback); + connection = addConnection(nullptr, connectionData, callback); assert(connection); return connection; @@ -300,11 +303,7 @@ IStreamConnectionPtr StreamConnectionContainer::createConnection(hybrid_ptrgetSocket(); - if (socket == nullptr) - { - return false; - } + SocketPtr socket = std::make_shared(); // the endpoint should not been set twice! assert(socket->getSocketDescriptor() == nullptr); @@ -321,24 +320,24 @@ bool StreamConnectionContainer::createSocket(const IStreamConnectionPtr& streamC ret = socket->create(connectionData.af, connectionData.type, connectionData.protocol); } - (void)connectionProperties; - if (ret) { SocketDescriptorPtr sd = socket->getSocketDescriptor(); assert(sd); connectionData.sd = sd->getDescriptor(); - AddressHelpers::addr2peer(reinterpret_cast(const_cast(connectionData.sockaddr.c_str())), connectionData); - IStreamConnectionPrivatePtr streamConnectionPrivate; + IStreamConnectionPrivatePtr streamConnectionPrivate; std::unique_lock lock(m_mutex); auto it = m_connectionId2Connection.find(connectionData.connectionId); if (it != m_connectionId2Connection.end()) { streamConnectionPrivate = it->second; assert(streamConnectionPrivate); + streamConnectionPrivate->setSocket(socket); streamConnectionPrivate->updateConnectionData(connectionData); - m_sd2Connection[connectionData.sd] = it->second; + auto& entry = m_sd2Connection[connectionData.sd]; + assert(entry == nullptr); + entry = it->second; m_connectionsStable.clear(std::memory_order_release); } else @@ -367,56 +366,98 @@ bool StreamConnectionContainer::connect(const std::string& endpoint, const IStre ConnectProperties connectionPropertiesToUse = connectionProperties; getConnectPropertiesFromEndpoint(endpoint, connectionPropertiesToUse); - ConnectionData connectionData = AddressHelpers::endpoint2ConnectionData(endpoint); - connectionData.connectionId = connection->getConnectionId(); - connectionData.incomingConnection = false; - connectionData.reconnectInterval = connectionPropertiesToUse.config.reconnectInterval; - connectionData.totalReconnectDuration = connectionPropertiesToUse.config.totalReconnectDuration; - connectionData.startTime = std::chrono::steady_clock::now(); - connectionData.ssl = connectionPropertiesToUse.certificateData.ssl; - connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CREATED; + ConnectionData connectionData = connection->getConnectionData(); + if (connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED) + { + connectionData = AddressHelpers::endpoint2ConnectionData(endpoint); + connectionData.connectionId = connection->getConnectionId(); + connectionData.incomingConnection = false; + connectionData.reconnectInterval = connectionPropertiesToUse.config.reconnectInterval; + connectionData.totalReconnectDuration = connectionPropertiesToUse.config.totalReconnectDuration; + connectionData.startTime = std::chrono::steady_clock::now(); + connectionData.ssl = connectionPropertiesToUse.certificateData.ssl; + connectionData.connectionProperties = connectionPropertiesToUse; + } + else + { + connectionData.connectionState = ConnectionState::CONNECTIONSTATE_RECONNECT; + } connection->updateConnectionData(connectionData); + bool doAsyncGetHostByName = false; - std::string addr = AddressHelpers::makeSocketAddress(connectionData.hostname, connectionData.port, connectionData.af, true, doAsyncGetHostByName); - bool ret = false; - if (doAsyncGetHostByName) - { - ret = true; - std::string hostname = connectionData.hostname; - m_executorWorker->addAction([this, connectionData, connection, connectionPropertiesToUse]() mutable { - struct in_addr ipAddress; - bool ok = AddressHelpers::getHostByName(connectionData.hostname, ipAddress); - if (ok) - { - struct sockaddr_in addrTcp; - memset(&addrTcp, 0, sizeof(sockaddr_in)); + std::string addr = connectionData.sockaddr; + if (addr.empty()) // don't do getHostByName for reconnect + { + addr = AddressHelpers::makeSocketAddress(connectionData.hostname, connectionData.port, connectionData.af, true, doAsyncGetHostByName); + } + bool ret = createSocket(connection, connectionData, connectionPropertiesToUse); + if (ret) + { + if (doAsyncGetHostByName) + { + m_executorWorker->addAction([this, connectionData, connection, connectionPropertiesToUse]() mutable { + struct in_addr ipAddress; + bool ok = AddressHelpers::getHostByName(connectionData.hostname, ipAddress); + if (ok) + { + struct sockaddr_in addrTcp; + memset(&addrTcp, 0, sizeof(sockaddr_in)); #ifdef WIN32 - addrTcp.sin_family = static_cast(connectionData.af); + addrTcp.sin_family = static_cast(connectionData.af); #else - addrTcp.sin_family = static_cast(connectionData.af); + addrTcp.sin_family = static_cast(connectionData.af); #endif - addrTcp.sin_addr.s_addr = ipAddress.s_addr; - addrTcp.sin_port = htons(static_cast(connectionData.port)); - connectionData.sockaddr = std::string(reinterpret_cast(&addrTcp), sizeof(sockaddr_in)); - ok = createSocket(connection, connectionData, connectionPropertiesToUse); - if (ok) + addrTcp.sin_addr.s_addr = ipAddress.s_addr; + addrTcp.sin_port = htons(static_cast(connectionData.port)); + connectionData.sockaddr = std::string(reinterpret_cast(&addrTcp), sizeof(sockaddr_in)); + AddressHelpers::addr2peer(reinterpret_cast(const_cast(connectionData.sockaddr.c_str())), connectionData); + IStreamConnectionPrivatePtr streamConnectionPrivate; + std::unique_lock lock(m_mutex); + auto it = m_connectionId2Connection.find(connectionData.connectionId); + if (it != m_connectionId2Connection.end()) + { + streamConnectionPrivate = it->second; + assert(streamConnectionPrivate); + streamConnectionPrivate->updateConnectionData(connectionData); + } + else + { + ok = false; + } + lock.unlock(); + if (ok) + { + ok = connection->connect(); + } + } + if (!ok) { - ok = connection->connect(); + connection->disconnect(); } + }); + } + else + { + connectionData.sockaddr = addr; + AddressHelpers::addr2peer(reinterpret_cast(const_cast(connectionData.sockaddr.c_str())), connectionData); + IStreamConnectionPrivatePtr streamConnectionPrivate; + std::unique_lock lock(m_mutex); + auto it = m_connectionId2Connection.find(connectionData.connectionId); + if (it != m_connectionId2Connection.end()) + { + streamConnectionPrivate = it->second; + assert(streamConnectionPrivate); + streamConnectionPrivate->updateConnectionData(connectionData); } - if (!ok) + else { - connection->disconnect(); + ret = false; + } + lock.unlock(); + if (ret) + { + ret = connection->connect(); } - }); - } - else - { - connectionData.sockaddr = addr; - ret = createSocket(connection, connectionData, connectionPropertiesToUse); - if (ret) - { - ret = connection->connect(); } } if (!ret) @@ -458,24 +499,24 @@ IStreamConnectionPtr StreamConnectionContainer::getConnection(std::int64_t conne ////////////// -void StreamConnectionContainer::removeConnection(const SocketDescriptorPtr& sd, std::int64_t connectionId) +void StreamConnectionContainer::disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd) { + bool removeConn = connectionDisconnect->changeStateForDisconnect(); + m_poller->removeSocket(sd); std::unique_lock lock(m_mutex); if (sd) { + m_socketErase.push_back(sd); m_sd2Connection.erase(sd->getDescriptor()); m_connectionsStable.clear(std::memory_order_release); } - m_connectionId2Connection.erase(connectionId); + if (removeConn) + { + m_connectionId2Connection.erase(connectionDisconnect->getConnectionId()); + } lock.unlock(); -} - -void StreamConnectionContainer::disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd) -{ - bool removeConn = connectionDisconnect->changeStateForDisconnect(); if (removeConn) { - removeConnection(sd, connectionDisconnect->getConnectionId()); connectionDisconnect->disconnected(connectionDisconnect); } } @@ -776,7 +817,14 @@ void StreamConnectionContainer::doReconnect() for (const auto& connection : connections) { - connection->doReconnect(); + IStreamConnectionPtr streamConnection = connection; + bool reconnecting = connection->doReconnect(); + if (reconnecting) + { + //SocketPtr socket = streamConnection->getSocket(); + //disconnectIntern(connection, socket ? socket->getSocketDescriptor() : SocketDescriptorPtr(nullptr)); + connect(connection->getConnectionData().endpoint, streamConnection, connection->getConnectionData().connectionProperties); + } } } @@ -805,12 +853,20 @@ void StreamConnectionContainer::pollerLoop() m_lastReconnectTime = std::chrono::steady_clock::now(); while (!m_terminatePollerLoop) { + if (!m_connectionsStable.test_and_set(std::memory_order_acq_rel)) + { + std::unique_lock lock(m_mutex); + m_sd2ConnectionPollerLoop = m_sd2Connection; + m_socketErase.clear(); + } + const PollerResult& result = m_poller->wait(1000); - if (m_connectionsStable.test_and_set(std::memory_order_acq_rel)) + if (!m_connectionsStable.test_and_set(std::memory_order_acq_rel)) { std::unique_lock lock(m_mutex); m_sd2ConnectionPollerLoop = m_sd2Connection; + m_socketErase.clear(); } if (result.releaseWait) @@ -842,15 +898,23 @@ void StreamConnectionContainer::pollerLoop() // last chance to send pending messages connectionDisconnect->sendPendingMessages(); SocketPtr socket = connectionDisconnect->getSocketPrivate(); + SocketDescriptorPtr sd; if (socket) { - SocketDescriptorPtr sd = socket->getSocketDescriptor(); - disconnectIntern(connectionDisconnect, sd); + sd = socket->getSocketDescriptor(); } + disconnectIntern(connectionDisconnect, sd); } } } + if (!m_connectionsStable.test_and_set(std::memory_order_acq_rel)) + { + std::unique_lock lock(m_mutex); + m_sd2ConnectionPollerLoop = m_sd2Connection; + m_socketErase.clear(); + } + if (result.error) { // error of the poller @@ -870,6 +934,7 @@ void StreamConnectionContainer::pollerLoop() SocketPtr socket = connection->getSocketPrivate(); if (socket) { + assert(info.sd == socket->getSocketDescriptor()->getDescriptor()); // remove for performance handleConnectionEvents(connection, socket, info); } } diff --git a/test/testIntegrationProtocolDelimiter.cpp b/test/testIntegrationProtocolDelimiter.cpp index 10b4b469..68bae8fb 100644 --- a/test/testIntegrationProtocolDelimiter.cpp +++ b/test/testIntegrationProtocolDelimiter.cpp @@ -222,7 +222,7 @@ TEST_F(TestIntegrationProtocolDelimiterSessionContainer, testReconnectExpires) message->addSendPayload(MESSAGE1_BUFFER); connection->sendMessage(message); - waitTillDone(expectDisconnected, 5000); + waitTillDone(expectDisconnected, 10000); EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_DISCONNECTED); EXPECT_EQ(m_sessionContainer->getSession(connection->getSessionId()), nullptr); diff --git a/test/testIntegrationProtocolDelimiterSsl.cpp b/test/testIntegrationProtocolDelimiterSsl.cpp index 4bb45192..6ee5c1b8 100644 --- a/test/testIntegrationProtocolDelimiterSsl.cpp +++ b/test/testIntegrationProtocolDelimiterSsl.cpp @@ -189,7 +189,7 @@ TEST_F(TestIntegrationProtocolDelimiterLinefeedSessionContainerSsl, testReconnec message->addSendPayload(MESSAGE1_BUFFER); connection->sendMessage(message); - waitTillDone(expectDisconnected, 5000); + waitTillDone(expectDisconnected, 10000); EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_DISCONNECTED); EXPECT_EQ(m_sessionContainer->getSession(connection->getSessionId()), nullptr); diff --git a/test/testIntegrationProtocolHeaderBinarySize.cpp b/test/testIntegrationProtocolHeaderBinarySize.cpp index f8b47427..a3a16446 100644 --- a/test/testIntegrationProtocolHeaderBinarySize.cpp +++ b/test/testIntegrationProtocolHeaderBinarySize.cpp @@ -180,7 +180,7 @@ TEST_F(TestIntegrationProtocolHeaderBinarySize, testReconnectExpires) message->addSendPayload(MESSAGE1_BUFFER); connection->sendMessage(message); - waitTillDone(expectDisconnected, 5000); + waitTillDone(expectDisconnected, 10000); EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_DISCONNECTED); EXPECT_EQ(m_sessionContainer->getSession(connection->getSessionId()), nullptr); diff --git a/test/testIntegrationProtocolHttp.cpp b/test/testIntegrationProtocolHttp.cpp index a494b031..aeeea799 100644 --- a/test/testIntegrationProtocolHttp.cpp +++ b/test/testIntegrationProtocolHttp.cpp @@ -169,7 +169,7 @@ TEST_F(TestIntegrationProtocolHttp, testReconnectExpires) message->addSendPayload(MESSAGE1_BUFFER); connection->sendMessage(message); - for (int i = 0; i < 5000 / 10; ++i) + for (int i = 0; i < 10000 / 10; ++i) { if (connection->getConnectionData().connectionState == ConnectionState::CONNECTIONSTATE_DISCONNECTED) { @@ -259,7 +259,7 @@ TEST_F(TestIntegrationProtocolHttp, testSendMultipleMessages) EXPECT_EQ(res, 0); auto& expectConnectedClient = EXPECT_CALL(*m_mockClientCallback, connected(_)).Times(1); - EXPECT_CALL(*m_mockServerCallback, connected(_)).Times(testing::Between(6, 10000)); + EXPECT_CALL(*m_mockServerCallback, connected(_)).Times(testing::Between(4, 10000)); auto& expectReceive = EXPECT_CALL(*m_mockServerCallback, received(_, ReceivedMessage(MESSAGE1_BUFFER))) .Times(10000) .WillRepeatedly(Invoke([](const IProtocolSessionPtr& session, const IMessagePtr& message) { diff --git a/test/testIntegrationProtocolStreamSessionContainer.cpp b/test/testIntegrationProtocolStreamSessionContainer.cpp index 9e69424a..3d270afe 100644 --- a/test/testIntegrationProtocolStreamSessionContainer.cpp +++ b/test/testIntegrationProtocolStreamSessionContainer.cpp @@ -141,13 +141,13 @@ TEST_F(TestIntegrationProtocolStreamSessionContainer, testConnectBind) int res = m_sessionContainer->bind("tcp://*:3333:stream", m_mockServerCallback); EXPECT_EQ(res, 0); - waitTillDone(expectConnected, 5000); + waitTillDone(expectConnected, 2000); EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_CONNECTED); EXPECT_EQ(m_sessionContainer->getSession(connection->getSessionId()), connection); } -TEST_F(TestIntegrationProtocolStreamSessionContainer, testConnectBindConnectPropertiesJson) +TEST_F(TestIntegrationProtocolStreamSessionContainer, test_ConnectBindConnectPropertiesJson) { auto& expectConnected = EXPECT_CALL(*m_mockClientCallback, connected(_)).Times(1); EXPECT_CALL(*m_mockServerCallback, connected(_)).Times(1); @@ -200,7 +200,7 @@ TEST_F(TestIntegrationProtocolStreamSessionContainer, testReconnectExpires) message->addSendPayload(MESSAGE1_BUFFER); connection->sendMessage(message); - waitTillDone(expectDisconnected, 5000); + waitTillDone(expectDisconnected, 10000); EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_DISCONNECTED); EXPECT_EQ(m_sessionContainer->getSession(connection->getSessionId()), nullptr); diff --git a/test/testIntegrationStreamConnectionContainer.cpp b/test/testIntegrationStreamConnectionContainer.cpp index 135cf991..1b1e8779 100644 --- a/test/testIntegrationStreamConnectionContainer.cpp +++ b/test/testIntegrationStreamConnectionContainer.cpp @@ -203,7 +203,7 @@ TEST_F(TestIntegrationStreamConnectionContainer, testReconnectExpires) message->addSendPayload(MESSAGE1_BUFFER); connection->sendMessage(message); - waitTillDone(expectDisconnected, 5000); + waitTillDone(expectDisconnected, 10000); EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_DISCONNECTED); EXPECT_EQ(m_connectionContainer->getConnection(connection->getConnectionData().connectionId), nullptr); diff --git a/test/testIntegrationStreamConnectionContainerSsl.cpp b/test/testIntegrationStreamConnectionContainerSsl.cpp index a164470d..cdec06f1 100644 --- a/test/testIntegrationStreamConnectionContainerSsl.cpp +++ b/test/testIntegrationStreamConnectionContainerSsl.cpp @@ -209,7 +209,7 @@ TEST_F(TestIntegrationStreamConnectionContainerSsl, testReconnectExpires) message->addSendPayload(MESSAGE1_BUFFER); connection->sendMessage(message); - waitTillDone(expectDisconnected, 5000); + waitTillDone(expectDisconnected, 10000); EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_DISCONNECTED); EXPECT_EQ(m_connectionContainer->getConnection(connection->getConnectionData().connectionId), nullptr);