diff --git a/src/gxs/rsdataservice.cc b/src/gxs/rsdataservice.cc index 61900e171a..61fb075710 100644 --- a/src/gxs/rsdataservice.cc +++ b/src/gxs/rsdataservice.cc @@ -26,6 +26,8 @@ * #define RS_DATA_SERVICE_DEBUG_CACHE 1 ****/ +//#define GXSPROFILING + #include #include #include @@ -1174,11 +1176,21 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, int resultCount = 0; #endif +#ifdef GXSPROFILING + // [TRACE] Start the database retrieval timer + RsDbg() << "GXSPROFILING [DataService]: START retrieveNxsMsgs for " << reqIds.size() << " groups"; + auto start_all = std::chrono::steady_clock::now(); +#endif + for(auto mit = reqIds.begin(); mit != reqIds.end(); ++mit) { - const RsGxsGroupId& grpId = mit->first; +#ifdef GXSPROFILING + // [TRACE] Start timer for this specific group + auto start_group = std::chrono::steady_clock::now(); +#endif + // if vector empty then request all messages const std::set& msgIdV = mit->second; std::vector msgSet; @@ -1222,6 +1234,14 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, msg[grpId] = msgSet; +#ifdef GXSPROFILING + // [TRACE] Log time per group to monitor progress + auto end_group = std::chrono::steady_clock::now(); + auto group_ms = std::chrono::duration_cast(end_group - start_group).count(); + RsDbg() << "GXSPROFILING [DataService]: Group " << grpId.toStdString() + << " (Total " << msgSet.size() << " msgs) processed in " << group_ms << "ms"; +#endif + msgSet.clear(); } @@ -1229,6 +1249,13 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, std::cerr << "RsDataService::retrieveNxsMsgs() " << mDbName << ", Requests: " << reqIds.size() << ", Results: " << resultCount << ", Time: " << timer.duration() << std::endl; #endif +#ifdef GXSPROFILING + // [TRACE] Log total database time + auto end_all = std::chrono::steady_clock::now(); + auto total_ms = std::chrono::duration_cast(end_all - start_all).count(); + RsDbg() << "GXSPROFILING [DataService]: END retrieveNxsMsgs total time: " << total_ms << "ms"; +#endif + return 1; } diff --git a/src/gxs/rsgenexchange.cc b/src/gxs/rsgenexchange.cc index 21b4eff0ca..b8ac502d85 100644 --- a/src/gxs/rsgenexchange.cc +++ b/src/gxs/rsgenexchange.cc @@ -73,6 +73,8 @@ static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes * #define GEN_EXCH_DEBUG 1 */ +//#define GXSPROFILING + #if defined(GEN_EXCH_DEBUG) static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_FORUMS;// use this to allow to this service id only, or 0 for all services // warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums) @@ -1569,22 +1571,34 @@ bool RsGenExchange::getGroupData(const uint32_t &token, std::vectorgetMsgData(token, msgResult); if(ok) { + uint32_t count = 0; NxsMsgDataResult::iterator mit = msgResult.begin(); for(; mit != msgResult.end(); ++mit) { const RsGxsGroupId& grpId = mit->first; std::vector& gxsMsgItems = msgItems[grpId]; std::vector& nxsMsgsV = mit->second; - std::vector::iterator vit = nxsMsgsV.begin(); - for(; vit != nxsMsgsV.end(); ++vit) + + // Pre-allocate a temporary vector for results to avoid locking in the parallel loop + std::vector tempItems(nxsMsgsV.size(), nullptr); + + // THREAD-SAFETY NOTE: This OMP loop performs in-memory deserialization only. + // The SQLite/SQLCipher query has already completed above (getMsgData). + // The serialiser (mSerialiser) must remain stateless/re-entrant for this to be safe. + #pragma omp parallel for + for(size_t i = 0; i < nxsMsgsV.size(); ++i) { - RsNxsMsg*& msg = *vit; + RsNxsMsg* msg = nxsMsgsV[i]; RsItem* item = NULL; if(msg->msg.bin_len != 0) @@ -1595,25 +1609,44 @@ bool RsGenExchange::getMsgData(uint32_t token, GxsMsgDataMap &msgItems) RsGxsMsgItem* mItem = dynamic_cast(item); if (mItem) { - mItem->meta = *((*vit)->metaData); // get meta info from nxs msg - gxsMsgItems.push_back(mItem); + mItem->meta = *(msg->metaData); // get meta info from nxs msg + tempItems[i] = mItem; } else { - std::cerr << "RsGenExchange::getMsgData() deserialisation/dynamic_cast ERROR"; - std::cerr << std::endl; + // Should almost never happen if serializer is correct delete item; } } else { - std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR"; - std::cerr << std::endl; + // Deserialization failed (corrupt data?) + // std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR" << std::endl; + } + delete msg; + } + + // Serial merge of successful items + for(size_t i = 0; i < tempItems.size(); ++i) { + if(tempItems[i]) { + gxsMsgItems.push_back(tempItems[i]); + count++; } - delete msg; } } + // [TRACE] Log the number of items processed +#ifdef GXSPROFILING + RsDbg() << "GXSPROFILING [GenExch]: Deserialized " << count << " items"; +#endif } + +#ifdef GXSPROFILING + // [TRACE] End timer and log total processing time + auto end_time = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(end_time - start_time).count(); + RsDbg() << "GXSPROFILING [GenExch]: getMsgData (Token: " << token << ") total time: " << elapsed << "ms"; +#endif + return ok; } diff --git a/src/gxs/rsgenexchange.h b/src/gxs/rsgenexchange.h index 257b7bc5f7..73dbc96d97 100644 --- a/src/gxs/rsgenexchange.h +++ b/src/gxs/rsgenexchange.h @@ -170,14 +170,14 @@ class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxs */ virtual void service_tick() = 0; - /*! - * - * @return handle to token service handle for making - * request to this gxs service - */ - RsTokenService* getTokenService(); - - void threadTick() override; /// @see RsTickingThread + /*! + * + * @return handle to token service handle for making + * request to this gxs service + */ + RsTokenService* getTokenService(); + + void threadTick() override; /// @see RsTickingThread /*! * Policy bit pattern portion @@ -948,7 +948,7 @@ class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxs RsGxsDataAccess* mDataAccess; RsGeneralDataService* mDataStore; RsNetworkExchangeService *mNetService; - RsSerialType *mSerialiser; + RsSerialType *mSerialiser; // WARNING: used concurrently via OpenMP in getMsgData() — must remain stateless/re-entrant /// service type uint16_t mServType; RsGixs* mGixs; diff --git a/src/gxs/rsgxsnetservice.cc b/src/gxs/rsgxsnetservice.cc index 9f74520b3a..c174cbac88 100644 --- a/src/gxs/rsgxsnetservice.cc +++ b/src/gxs/rsgxsnetservice.cc @@ -288,6 +288,8 @@ //#define NXS_FRAG +//#define GXSPROFILING + // The constant below have a direct influence on how fast forums/channels/posted/identity groups propagate and on the overloading of queues: // // Channels/forums will update at a rate of SYNC_PERIOD*MAX_REQLIST_SIZE/60 messages per minute. @@ -3515,6 +3517,11 @@ void RsGxsNetService::runVetting() void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr) { +#ifdef GXSPROFILING + // [TRACE] Start global timer for the network transaction + auto start_net = std::chrono::steady_clock::now(); +#endif + #ifdef NXS_NET_DEBUG_0 GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << "locked_genSendMsgsTransaction() Generating Msg data send fron TransN: " << tr->mTransaction->transactionNumber << std::endl; #endif @@ -3702,8 +3709,16 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr) delete newTr; } +#ifdef GXSPROFILING + // [TRACE] End global timer and log with the exact same format as V3 + auto end_net = std::chrono::steady_clock::now(); + auto net_ms = std::chrono::duration_cast(end_net - start_net).count(); + RsDbg() << "GXSPROFILING [NetService]: TOTAL locked_genSendMsgsTransaction for " << tr->mItems.size() << " items took " << net_ms << "ms"; +#endif + return; } + uint32_t RsGxsNetService::locked_getTransactionId() { return ++mTransactionN; diff --git a/src/libretroshare.pro b/src/libretroshare.pro index 812c593856..0af0f2e386 100644 --- a/src/libretroshare.pro +++ b/src/libretroshare.pro @@ -21,6 +21,10 @@ DESTDIR = lib QMAKE_CXXFLAGS += -fPIC +# OpenMP support for parallel deserialization in GXS (rsgenexchange.cc) +QMAKE_CXXFLAGS += -fopenmp +LIBS += -fopenmp + ## Uncomment to enable Unfinished Services. #CONFIG += wikipoos #CONFIG += gxsthewire diff --git a/src/use_libretroshare.pri b/src/use_libretroshare.pri index 85377d0429..dfa65e5e29 100644 --- a/src/use_libretroshare.pri +++ b/src/use_libretroshare.pri @@ -106,9 +106,11 @@ rs_jsonapi { linux-* { mLibs += dl + # OpenMP runtime needed for parallel deserialization in rsgenexchange.cc + LIBS += -fopenmp } -rs_deep_channels_index | rs_deep_files_index | rs_deep_forums_index { +rs_deep_channels_index | rs_deep_files_index { mLibs += xapian win32-g++|win32-clang-g++:mLibs += rpcrt4 } diff --git a/src/util/retrodb.cc b/src/util/retrodb.cc index cff63fa9f4..bd2ea560ce 100644 --- a/src/util/retrodb.cc +++ b/src/util/retrodb.cc @@ -35,6 +35,8 @@ //#define RETRODB_DEBUG +//#define GXSPROFILING + const int RetroDb::OPEN_READONLY = SQLITE_OPEN_READONLY; const int RetroDb::OPEN_READWRITE = SQLITE_OPEN_READWRITE; const int RetroDb::OPEN_READWRITE_CREATE = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE; @@ -241,6 +243,11 @@ bool RetroDb::execSQL(const std::string &query){ RetroCursor* RetroDb::sqlQuery(const std::string& tableName, const std::list& columns, const std::string& selection, const std::string& orderBy){ +#ifdef GXSPROFILING + // [TRACE] Start individual query timer + auto start_sql = std::chrono::steady_clock::now(); +#endif + if(tableName.empty() || columns.empty()){ std::cerr << "RetroDb::sqlQuery(): No table or columns given" << std::endl; return NULL; @@ -279,7 +286,17 @@ RetroCursor* RetroDb::sqlQuery(const std::string& tableName, const std::list(end_sql - start_sql).count(); + + RsDbg() << "GXSPROFILING [RetroDb]: Batch SQL for group individual_query took " << sql_ms << "ms"; +#endif + + return cursor; } bool RetroDb::isOpen() const {