Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions inc/finalmq/streamconnection/ConnectionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,49 @@
#include <chrono>
#include <string>

#include "OpenSsl.h"
#include "finalmq/helpers/FmqDefines.h"
#include "finalmq/variant/Variant.h"

namespace finalmq
{
enum class ConnectionState
{
CONNECTIONSTATE_CREATED = 0,
CONNECTIONSTATE_CONNECTING = 1,
CONNECTIONSTATE_CONNECTING_FAILED = 2,
CONNECTIONSTATE_CONNECTED = 3,
CONNECTIONSTATE_DISCONNECTED = 4,
CONNECTIONSTATE_RECONNECT = 1,
CONNECTIONSTATE_CONNECTING = 2,
CONNECTIONSTATE_CONNECTING_FAILED = 3,
CONNECTIONSTATE_CONNECTED = 4,
CONNECTIONSTATE_DISCONNECTED = 5,
};

struct BindProperties
{
CertificateData certificateData{};
Variant protocolData{};
Variant formatData{}; ///< data for the serialization format
};

struct ConnectConfig
{
ConnectConfig(int r = 1000, int t = -1)
: reconnectInterval(r)
, totalReconnectDuration(t)
{
}
int reconnectInterval{1000}; ///< if the server is not available, you can pass a reconnection intervall in [ms]
int totalReconnectDuration{-1}; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever.
};

struct ConnectProperties
{
CertificateData certificateData{};
ConnectConfig config{};
Variant protocolData{};
Variant formatData{}; ///< data for the serialization format
};


struct ConnectionData
{
std::int64_t connectionId = 0;
Expand All @@ -58,6 +88,7 @@ struct ConnectionData
std::chrono::time_point<std::chrono::steady_clock> startTime{};
bool ssl = false;
ConnectionState connectionState = ConnectionState::CONNECTIONSTATE_CREATED;
ConnectProperties connectionProperties{};
};

} // namespace finalmq
23 changes: 2 additions & 21 deletions inc/finalmq/streamconnection/StreamConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,10 @@
#include "finalmq/helpers/hybrid_ptr.h"
#include "finalmq/poller/Poller.h"
#include "finalmq/streamconnection/IMessage.h"
#include "finalmq/variant/Variant.h"

namespace finalmq
{

struct BindProperties
{
CertificateData certificateData{};
Variant protocolData{};
Variant formatData{}; ///< data for the serialization format
};

struct ConnectConfig
{
int reconnectInterval = 1000; ///< if the server is not available, you can pass a reconnection intervall in [ms]
int totalReconnectDuration = -1; ///< if the server is not available, you can pass a duration in [ms] how long the reconnect shall happen. -1 means: try for ever.
};

struct ConnectProperties
{
CertificateData certificateData{};
ConnectConfig config{};
Variant protocolData{};
Variant formatData{}; ///< data for the serialization format
};

struct IStreamConnection;
typedef std::shared_ptr<IStreamConnection> IStreamConnectionPtr;
Expand Down Expand Up @@ -90,6 +69,7 @@ struct IStreamConnectionPrivate : public IStreamConnection
{
virtual bool connect() = 0;
virtual SocketPtr getSocketPrivate() = 0;
virtual void setSocket(const SocketPtr& socket) = 0;
virtual bool sendPendingMessages() = 0;
virtual bool checkEdgeConnected() = 0;
virtual bool doReconnect() = 0;
Expand Down Expand Up @@ -122,6 +102,7 @@ class SYMBOLEXP StreamConnection : public IStreamConnectionPrivate
// IStreamConnectionPrivate
virtual bool connect() override;
virtual SocketPtr getSocketPrivate() override;
virtual void setSocket(const SocketPtr& socket) override;
virtual bool sendPendingMessages() override;
virtual bool checkEdgeConnected() override;
virtual bool doReconnect() override;
Expand Down
2 changes: 1 addition & 1 deletion inc/finalmq/streamconnection/StreamConnectionContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer
IStreamConnectionPrivatePtr findConnectionBySdOnlyForPollerLoop(SOCKET sd);
IStreamConnectionPrivatePtr findConnectionById(std::int64_t connectionId);
bool createSocket(const IStreamConnectionPtr& streamConnection, ConnectionData& connectionData, const ConnectProperties& connectionProperties);
void removeConnection(const SocketDescriptorPtr& sd, std::int64_t connectionId);
void disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd);
IStreamConnectionPrivatePtr addConnection(const SocketPtr& socket, ConnectionData& connectionData, hybrid_ptr<IStreamConnectionCallback> callback);
void handleConnectionEvents(const IStreamConnectionPrivatePtr& connection, const SocketPtr& socket, const DescriptorInfo& info);
Expand All @@ -109,6 +108,7 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer
std::unordered_map<std::int64_t, IStreamConnectionPrivatePtr> m_connectionId2Connection{};
std::unordered_map<SOCKET, IStreamConnectionPrivatePtr> m_sd2Connection{};
std::unordered_map<SOCKET, IStreamConnectionPrivatePtr> m_sd2ConnectionPollerLoop{};
std::vector<SocketDescriptorPtr> m_socketErase;
std::atomic_flag m_connectionsStable{};
static std::atomic_int64_t m_nextConnectionId;
std::atomic_bool m_terminatePollerLoop{false};
Expand Down
37 changes: 20 additions & 17 deletions src/poller/PollerImplEpoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,30 @@ void PollerImplEpoll::addSocketEnableRead(const SocketDescriptorPtr& fd)

void PollerImplEpoll::removeSocket(const SocketDescriptorPtr& fd)
{
std::unique_lock<std::mutex> locker(m_mutex);
auto it = m_socketDescriptors.find(fd);
if (it != m_socketDescriptors.end())
if (fd)
{
epoll_event ev; // to be compatible with linux versions before 2.6.9
ev.events = 0;
ev.data.fd = fd->getDescriptor();
int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev);
if (res == -1)
std::unique_lock<std::mutex> locker(m_mutex);
auto it = m_socketDescriptors.find(fd);
if (it != m_socketDescriptors.end())
{
streamFatal << "epoll_ctl failed with errno: " << errno;
epoll_event ev; // to be compatible with linux versions before 2.6.9
ev.events = 0;
ev.data.fd = fd->getDescriptor();
int res = OperatingSystem::instance().epoll_ctl(m_fdEpoll, EPOLL_CTL_DEL, fd->getDescriptor(), &ev);
if (res == -1)
{
streamFatal << "epoll_ctl failed with errno: " << errno;
}
assert(res != -1);
m_socketDescriptors.erase(it);
sockedDescriptorHasChanged();
}
assert(res != -1);
m_socketDescriptors.erase(it);
sockedDescriptorHasChanged();
}
else
{
// socket not added
else
{
// socket not added
}
locker.unlock();
}
locker.unlock();
}

void PollerImplEpoll::enableRead(const SocketDescriptorPtr& fd)
Expand Down
2 changes: 1 addition & 1 deletion src/protocolsession/ProtocolSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ IProtocolPtr ProtocolSession::createRequestConnection()
protocol->setCallback(shared_from_this());
protocol->setConnection(connection);

m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep)
//m_connectionProperties.config.totalReconnectDuration = 0; // only try once, do not make retries to connect (in case of SyncReqRep)

bool res = m_streamConnectionContainer->connect(m_endpointStreamConnection, connection, m_connectionProperties);
if (res)
Expand Down
107 changes: 60 additions & 47 deletions src/streamconnection/StreamConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <thread>

#include "finalmq/streamconnection/AddressHelpers.h"
#include "finalmq/helpers/ModulenameFinalmq.h"

#define RELEASE_DISCONNECT 1

Expand All @@ -48,54 +49,51 @@ void StreamConnection::sendMessage(const IMessagePtr& msg)
return;
}
std::unique_lock<std::mutex> lock(m_mutex);
if (m_socketPrivate)
ssize_t size = msg->getTotalSendBufferSize();
if (size > 0)
{
ssize_t size = msg->getTotalSendBufferSize();
if (size > 0)
const auto& payloads = msg->getAllSendBuffers();
if (!m_pendingMessages.empty() || m_connectionData.connectionState != ConnectionState::CONNECTIONSTATE_CONNECTED)
{
m_pendingMessages.push_back({msg, payloads.begin(), 0});
}
else
{
const auto& payloads = msg->getAllSendBuffers();
if (!m_pendingMessages.empty() || m_connectionData.connectionState != ConnectionState::CONNECTIONSTATE_CONNECTED)
{
m_pendingMessages.push_back({msg, payloads.begin(), 0});
}
else
{
#if 0
m_pendingMessages.push_back({msg, payloads.begin(), 0});
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
m_pendingMessages.push_back({msg, payloads.begin(), 0});
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
#else
bool ex = false;
for (auto it = payloads.begin(); it != payloads.end() && !ex; )
{
const BufferRef& payload = *it;
++it;
bool ex = false;
for (auto it = payloads.begin(); it != payloads.end() && !ex; )
{
const BufferRef& payload = *it;
++it;

#ifdef __QNX__
int flags = 0;
int flags = 0;
#else
bool last = (it == payloads.end());
int flags = last ? 0 : MSG_MORE; // win32: MSG_PARTIAL
bool last = (it == payloads.end());
int flags = last ? 0 : MSG_MORE; // win32: MSG_PARTIAL
#endif

#if !defined WIN32
flags |= MSG_NOSIGNAL; // no sigpipe
flags |= MSG_NOSIGNAL; // no sigpipe
#endif
int err = m_socketPrivate->send(payload.first, static_cast<int>(payload.second), flags);
if (err != payload.second)
int err = m_socketPrivate->send(payload.first, static_cast<int>(payload.second), flags);
if (err != payload.second)
{
if (err < 0)
{
if (err < 0)
{
err = 0;
}
assert(err < payload.second);
--it;
m_pendingMessages.push_back({msg, it, err});
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
ex = true;
err = 0;
}
assert(err < payload.second);
--it;
m_pendingMessages.push_back({msg, it, err});
m_poller->enableWrite(m_socketPrivate->getSocketDescriptor());
ex = true;
}
#endif
}
#endif
}
}
lock.unlock();
Expand All @@ -120,10 +118,19 @@ std::int64_t StreamConnection::getConnectionId() const

SocketPtr StreamConnection::getSocketPrivate()
{
// do not mutex lock here, because the removeSocket and getSocketPrivate will be called from same thread.
std::unique_lock<std::mutex> lock(m_mutex);
return m_socketPrivate;
}

void StreamConnection::setSocket(const SocketPtr& socket)
{
std::unique_lock<std::mutex> lock(m_mutex);
assert(m_socket == nullptr);
assert(m_socketPrivate == nullptr);
m_socket = socket;
m_socketPrivate = socket;
}

SocketPtr StreamConnection::getSocket()
{
std::unique_lock<std::mutex> lock(m_mutex);
Expand All @@ -140,18 +147,25 @@ bool StreamConnection::connect()
{
bool connecting = false;
std::unique_lock<std::mutex> lock(m_mutex);
if ((m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED || m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_socketPrivate)
const int sockaddrSize = static_cast<int>(m_connectionData.sockaddr.size());
if ((m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED ||
m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_RECONNECT ||
m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_socketPrivate && (sockaddrSize > 0))
{
int ret = m_socketPrivate->connect(reinterpret_cast<const sockaddr*>(m_connectionData.sockaddr.c_str()), static_cast<int>(m_connectionData.sockaddr.size()));
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING;
int ret = m_socketPrivate->connect(reinterpret_cast<const sockaddr*>(m_connectionData.sockaddr.c_str()), sockaddrSize);
if (ret == 0)
{
connecting = true;
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING;
SocketDescriptorPtr sd = m_socketPrivate->getSocketDescriptor();
assert(sd);
m_poller->addSocketEnableRead(sd);
m_poller->enableWrite(sd);
}
else
{
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED;
}
}
return connecting;
}
Expand Down Expand Up @@ -222,7 +236,8 @@ bool StreamConnection::checkEdgeConnected()
{
std::unique_lock<std::mutex> lock(m_mutex);
bool edgeConnected = false;
if (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING)
const auto state = m_connectionData.connectionState;
if (state == ConnectionState::CONNECTIONSTATE_CONNECTING)
{
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTED;
edgeConnected = true;
Expand All @@ -233,15 +248,16 @@ bool StreamConnection::checkEdgeConnected()
bool StreamConnection::doReconnect()
{
bool reconnecting = false;
if (!m_connectionData.incomingConnection && m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED && m_connectionData.reconnectInterval >= 0)
if (!m_connectionData.incomingConnection && (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_connectionData.reconnectInterval >= 0)
{
std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
std::chrono::duration<double> dur = now - m_lastReconnectTime;
int delta = static_cast<int>(dur.count() * 1000);
if (delta < 0 || delta >= m_connectionData.reconnectInterval)
{
m_lastReconnectTime = now;
reconnecting = connect();
streamInfo << "reconnect " << m_connectionData.endpoint;
reconnecting = true;
}
}
return reconnecting;
Expand All @@ -262,22 +278,19 @@ bool StreamConnection::changeStateForDisconnect()
}
else
{
assert(m_socketPrivate);
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED;
m_poller->removeSocket(m_socketPrivate->getSocketDescriptor());
}
}

if (m_disconnectFlag || (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTED) || reconnectExpired)
{
removeConnection = true;

assert(m_socketPrivate);
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_DISCONNECTED;
m_poller->removeSocket(m_socketPrivate->getSocketDescriptor());
m_socketPrivate = nullptr;
m_socket = nullptr;
}

m_socketPrivate = nullptr;
m_socket = nullptr;

return removeConnection;
}

Expand Down
Loading