Skip to content

Commit f077ae3

Browse files
committed
Optimize getMsgData using OpenMP parallelization
1 parent 9d13097 commit f077ae3

1 file changed

Lines changed: 24 additions & 12 deletions

File tree

src/gxs/rsgenexchange.cc

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1552,37 +1552,49 @@ bool RsGenExchange::getMsgData(uint32_t token, GxsMsgDataMap &msgItems)
15521552
const RsGxsGroupId& grpId = mit->first;
15531553
std::vector<RsGxsMsgItem*>& gxsMsgItems = msgItems[grpId];
15541554
std::vector<RsNxsMsg*>& nxsMsgsV = mit->second;
1555-
std::vector<RsNxsMsg*>::iterator vit = nxsMsgsV.begin();
1556-
for(; vit != nxsMsgsV.end(); ++vit)
1555+
1556+
// Pre-allocate a temporary vector for results to avoid locking in the parallel loop
1557+
std::vector<RsGxsMsgItem*> tempItems(nxsMsgsV.size(), nullptr);
1558+
1559+
#pragma omp parallel for
1560+
for(size_t i = 0; i < nxsMsgsV.size(); ++i)
15571561
{
1558-
RsNxsMsg*& msg = *vit;
1562+
RsNxsMsg* msg = nxsMsgsV[i];
15591563
RsItem* item = NULL;
15601564

15611565
if(msg->msg.bin_len != 0)
15621566
item = mSerialiser->deserialise(msg->msg.bin_data, &msg->msg.bin_len);
15631567

15641568
if (item)
15651569
{
1566-
RsGxsMsgItem* mItem = dynamic_cast<RsGxsMsgItem*>(item);
1570+
// Use static_cast as we expect the serializer to return the correct type for this service
1571+
// dynamic_cast can be slower and we want speed here.
1572+
RsGxsMsgItem* mItem = static_cast<RsGxsMsgItem*>(item);
15671573
if (mItem)
15681574
{
1569-
mItem->meta = *((*vit)->metaData); // get meta info from nxs msg
1570-
gxsMsgItems.push_back(mItem);
1571-
count++;
1575+
mItem->meta = *(msg->metaData); // get meta info from nxs msg
1576+
tempItems[i] = mItem;
15721577
}
15731578
else
15741579
{
1575-
std::cerr << "RsGenExchange::getMsgData() deserialisation/dynamic_cast ERROR";
1576-
std::cerr << std::endl;
1580+
// Should almost never happen if serializer is correct
15771581
delete item;
15781582
}
15791583
}
15801584
else
15811585
{
1582-
std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR";
1583-
std::cerr << std::endl;
1586+
// Deserialization failed (corrupt data?)
1587+
// std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR" << std::endl;
1588+
}
1589+
delete msg;
1590+
}
1591+
1592+
// Serial merge of successful items
1593+
for(size_t i = 0; i < tempItems.size(); ++i) {
1594+
if(tempItems[i]) {
1595+
gxsMsgItems.push_back(tempItems[i]);
1596+
count++;
15841597
}
1585-
delete msg;
15861598
}
15871599
}
15881600
// [TRACE] Log the number of items processed

0 commit comments

Comments
 (0)