Skip to content

Commit 4b72c13

Browse files
committed
Further fixes
1 parent 3ef4ced commit 4b72c13

4 files changed

Lines changed: 41 additions & 33 deletions

File tree

inc/finalmq/streamconnection/StreamConnectionContainer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer
9595
IStreamConnectionPrivatePtr findConnectionBySdOnlyForPollerLoop(SOCKET sd);
9696
IStreamConnectionPrivatePtr findConnectionById(std::int64_t connectionId);
9797
bool createSocket(const IStreamConnectionPtr& streamConnection, ConnectionData& connectionData, const ConnectProperties& connectionProperties);
98-
void removeConnection(const SocketDescriptorPtr& sd, std::int64_t connectionId);
9998
void disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd);
10099
IStreamConnectionPrivatePtr addConnection(const SocketPtr& socket, ConnectionData& connectionData, hybrid_ptr<IStreamConnectionCallback> callback);
101100
void handleConnectionEvents(const IStreamConnectionPrivatePtr& connection, const SocketPtr& socket, const DescriptorInfo& info);
@@ -109,6 +108,7 @@ class SYMBOLEXP StreamConnectionContainer : public IStreamConnectionContainer
109108
std::unordered_map<std::int64_t, IStreamConnectionPrivatePtr> m_connectionId2Connection{};
110109
std::unordered_map<SOCKET, IStreamConnectionPrivatePtr> m_sd2Connection{};
111110
std::unordered_map<SOCKET, IStreamConnectionPrivatePtr> m_sd2ConnectionPollerLoop{};
111+
std::vector<SocketDescriptorPtr> m_socketErase;
112112
std::atomic_flag m_connectionsStable{};
113113
static std::atomic_int64_t m_nextConnectionId;
114114
std::atomic_bool m_terminatePollerLoop{false};

src/streamconnection/StreamConnection.cpp

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ SocketPtr StreamConnection::getSocketPrivate()
125125
void StreamConnection::setSocket(const SocketPtr& socket)
126126
{
127127
std::unique_lock<std::mutex> lock(m_mutex);
128+
assert(m_socket == nullptr);
129+
assert(m_socketPrivate == nullptr);
128130
m_socket = socket;
129131
m_socketPrivate = socket;
130132
}
@@ -148,16 +150,20 @@ bool StreamConnection::connect()
148150
const int sockaddrSize = static_cast<int>(m_connectionData.sockaddr.size());
149151
if ((m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED || m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED) && m_socketPrivate && (sockaddrSize > 0))
150152
{
153+
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING;
151154
int ret = m_socketPrivate->connect(reinterpret_cast<const sockaddr*>(m_connectionData.sockaddr.c_str()), sockaddrSize);
152155
if (ret == 0)
153156
{
154157
connecting = true;
155-
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING;
156158
SocketDescriptorPtr sd = m_socketPrivate->getSocketDescriptor();
157159
assert(sd);
158160
m_poller->addSocketEnableRead(sd);
159161
m_poller->enableWrite(sd);
160162
}
163+
else
164+
{
165+
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED;
166+
}
161167
}
162168
return connecting;
163169
}
@@ -228,7 +234,8 @@ bool StreamConnection::checkEdgeConnected()
228234
{
229235
std::unique_lock<std::mutex> lock(m_mutex);
230236
bool edgeConnected = false;
231-
if (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING)
237+
const auto state = m_connectionData.connectionState;
238+
if (state == ConnectionState::CONNECTIONSTATE_CONNECTING)
232239
{
233240
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTED;
234241
edgeConnected = true;
@@ -248,14 +255,8 @@ bool StreamConnection::doReconnect()
248255
{
249256
m_lastReconnectTime = now;
250257
std::unique_lock<std::mutex> lock(m_mutex);
251-
if (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CREATED || m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED)
258+
if (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED)
252259
{
253-
if (m_socketPrivate)
254-
{
255-
m_poller->removeSocket(m_socketPrivate->getSocketDescriptor());
256-
m_socketPrivate = nullptr;
257-
m_socket = nullptr;
258-
}
259260
reconnecting = true;
260261
}
261262
}
@@ -278,24 +279,19 @@ bool StreamConnection::changeStateForDisconnect()
278279
}
279280
else
280281
{
281-
assert(m_socketPrivate);
282282
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_CONNECTING_FAILED;
283-
m_poller->removeSocket(m_socketPrivate->getSocketDescriptor());
284283
}
285284
}
286285

287286
if (m_disconnectFlag || (m_connectionData.connectionState == ConnectionState::CONNECTIONSTATE_CONNECTED) || reconnectExpired)
288287
{
289288
removeConnection = true;
290-
291289
m_connectionData.connectionState = ConnectionState::CONNECTIONSTATE_DISCONNECTED;
292-
if (m_socketPrivate)
293-
{
294-
m_poller->removeSocket(m_socketPrivate->getSocketDescriptor());
295-
}
296-
m_socketPrivate = nullptr;
297-
m_socket = nullptr;
298290
}
291+
292+
m_socketPrivate = nullptr;
293+
m_socket = nullptr;
294+
299295
return removeConnection;
300296
}
301297

src/streamconnection/StreamConnectionContainer.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -325,9 +325,8 @@ bool StreamConnectionContainer::createSocket(const IStreamConnectionPtr& streamC
325325
SocketDescriptorPtr sd = socket->getSocketDescriptor();
326326
assert(sd);
327327
connectionData.sd = sd->getDescriptor();
328-
AddressHelpers::addr2peer(reinterpret_cast<sockaddr*>(const_cast<char*>(connectionData.sockaddr.c_str())), connectionData);
329328

330-
IStreamConnectionPrivatePtr streamConnectionPrivate;
329+
IStreamConnectionPrivatePtr streamConnectionPrivate;
331330
std::unique_lock<std::mutex> lock(m_mutex);
332331
auto it = m_connectionId2Connection.find(connectionData.connectionId);
333332
if (it != m_connectionId2Connection.end())
@@ -336,7 +335,9 @@ bool StreamConnectionContainer::createSocket(const IStreamConnectionPtr& streamC
336335
assert(streamConnectionPrivate);
337336
streamConnectionPrivate->setSocket(socket);
338337
streamConnectionPrivate->updateConnectionData(connectionData);
339-
m_sd2Connection[connectionData.sd] = it->second;
338+
auto& entry = m_sd2Connection[connectionData.sd];
339+
assert(entry == nullptr);
340+
entry = it->second;
340341
m_connectionsStable.clear(std::memory_order_release);
341342
}
342343
else
@@ -399,6 +400,7 @@ bool StreamConnectionContainer::connect(const std::string& endpoint, const IStre
399400
addrTcp.sin_addr.s_addr = ipAddress.s_addr;
400401
addrTcp.sin_port = htons(static_cast<std::int16_t>(connectionData.port));
401402
connectionData.sockaddr = std::string(reinterpret_cast<const char*>(&addrTcp), sizeof(sockaddr_in));
403+
AddressHelpers::addr2peer(reinterpret_cast<sockaddr*>(const_cast<char*>(connectionData.sockaddr.c_str())), connectionData);
402404
IStreamConnectionPrivatePtr streamConnectionPrivate;
403405
std::unique_lock<std::mutex> lock(m_mutex);
404406
auto it = m_connectionId2Connection.find(connectionData.connectionId);
@@ -427,6 +429,7 @@ bool StreamConnectionContainer::connect(const std::string& endpoint, const IStre
427429
else
428430
{
429431
connectionData.sockaddr = addr;
432+
AddressHelpers::addr2peer(reinterpret_cast<sockaddr*>(const_cast<char*>(connectionData.sockaddr.c_str())), connectionData);
430433
IStreamConnectionPrivatePtr streamConnectionPrivate;
431434
std::unique_lock<std::mutex> lock(m_mutex);
432435
auto it = m_connectionId2Connection.find(connectionData.connectionId);
@@ -486,24 +489,24 @@ IStreamConnectionPtr StreamConnectionContainer::getConnection(std::int64_t conne
486489

487490
//////////////
488491

489-
void StreamConnectionContainer::removeConnection(const SocketDescriptorPtr& sd, std::int64_t connectionId)
492+
void StreamConnectionContainer::disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd)
490493
{
494+
bool removeConn = connectionDisconnect->changeStateForDisconnect();
495+
m_poller->removeSocket(sd);
491496
std::unique_lock<std::mutex> lock(m_mutex);
492497
if (sd)
493498
{
499+
m_socketErase.push_back(sd);
494500
m_sd2Connection.erase(sd->getDescriptor());
495501
m_connectionsStable.clear(std::memory_order_release);
496502
}
497-
m_connectionId2Connection.erase(connectionId);
503+
if (removeConn)
504+
{
505+
m_connectionId2Connection.erase(connectionDisconnect->getConnectionId());
506+
}
498507
lock.unlock();
499-
}
500-
501-
void StreamConnectionContainer::disconnectIntern(const IStreamConnectionPrivatePtr& connectionDisconnect, const SocketDescriptorPtr& sd)
502-
{
503-
bool removeConn = connectionDisconnect->changeStateForDisconnect();
504508
if (removeConn)
505509
{
506-
removeConnection(sd, connectionDisconnect->getConnectionId());
507510
connectionDisconnect->disconnected(connectionDisconnect);
508511
}
509512
}
@@ -839,12 +842,20 @@ void StreamConnectionContainer::pollerLoop()
839842
m_lastReconnectTime = std::chrono::steady_clock::now();
840843
while (!m_terminatePollerLoop)
841844
{
845+
if (m_connectionsStable.test_and_set(std::memory_order_acq_rel))
846+
{
847+
std::unique_lock<std::mutex> lock(m_mutex);
848+
m_sd2ConnectionPollerLoop = m_sd2Connection;
849+
m_socketErase.clear();
850+
}
851+
842852
const PollerResult& result = m_poller->wait(1000);
843853

844854
if (m_connectionsStable.test_and_set(std::memory_order_acq_rel))
845855
{
846856
std::unique_lock<std::mutex> lock(m_mutex);
847857
m_sd2ConnectionPollerLoop = m_sd2Connection;
858+
m_socketErase.clear();
848859
}
849860

850861
if (result.releaseWait)
@@ -905,6 +916,7 @@ void StreamConnectionContainer::pollerLoop()
905916
SocketPtr socket = connection->getSocketPrivate();
906917
if (socket)
907918
{
919+
assert(info.sd == socket->getSocketDescriptor()->getDescriptor());
908920
handleConnectionEvents(connection, socket, info);
909921
}
910922
}

test/testIntegrationProtocolStreamSessionContainer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ TEST_F(TestIntegrationProtocolStreamSessionContainer, testConnectBind)
141141
int res = m_sessionContainer->bind("tcp://*:3333:stream", m_mockServerCallback);
142142
EXPECT_EQ(res, 0);
143143

144-
waitTillDone(expectConnected, 5000);
144+
waitTillDone(expectConnected, 2000);
145145

146146
EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_CONNECTED);
147147
EXPECT_EQ(m_sessionContainer->getSession(connection->getSessionId()), connection);
148148
}
149149

150-
TEST_F(TestIntegrationProtocolStreamSessionContainer, testConnectBindConnectPropertiesJson)
150+
TEST_F(TestIntegrationProtocolStreamSessionContainer, test_ConnectBindConnectPropertiesJson)
151151
{
152152
auto& expectConnected = EXPECT_CALL(*m_mockClientCallback, connected(_)).Times(1);
153153
EXPECT_CALL(*m_mockServerCallback, connected(_)).Times(1);
@@ -200,7 +200,7 @@ TEST_F(TestIntegrationProtocolStreamSessionContainer, testReconnectExpires)
200200
message->addSendPayload(MESSAGE1_BUFFER);
201201
connection->sendMessage(message);
202202

203-
waitTillDone(expectDisconnected, 5000);
203+
waitTillDone(expectDisconnected, 10000);
204204

205205
EXPECT_EQ(connection->getConnectionData().connectionState, ConnectionState::CONNECTIONSTATE_DISCONNECTED);
206206
EXPECT_EQ(m_sessionContainer->getSession(connection->getSessionId()), nullptr);

0 commit comments

Comments
 (0)