@@ -443,6 +443,10 @@ bool DistributedChatService::handleRecvItem(RsChatItem *item)
443443 case RS_PKT_SUBTYPE_CHAT_LOBBY_UNSUBSCRIBE: handleFriendUnsubscribeLobby (dynamic_cast <RsChatLobbyUnsubscribeItem *>(item)) ; break ;
444444 case RS_PKT_SUBTYPE_CHAT_LOBBY_LIST_REQUEST: handleRecvChatLobbyListRequest (dynamic_cast <RsChatLobbyListRequestItem *>(item)) ; break ;
445445 case RS_PKT_SUBTYPE_CHAT_LOBBY_LIST: handleRecvChatLobbyList (dynamic_cast <RsChatLobbyListItem *>(item)) ; break ;
446+ case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE: handleRecvLobbyHistoryProbe (dynamic_cast <RsChatLobbyHistoryProbeItem *>(item)) ; break ;
447+ case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE_RESP: handleRecvLobbyHistoryProbeResponse (dynamic_cast <RsChatLobbyHistoryProbeResponseItem *>(item)) ; break ;
448+ case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_REQUEST: handleRecvLobbyHistoryRequest (dynamic_cast <RsChatLobbyHistoryRequestItem *>(item)) ; break ;
449+ case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_DATA: handleRecvLobbyHistoryData (dynamic_cast <RsChatLobbyHistoryDataItem *>(item)) ; break ;
446450 default : return false ;
447451 }
448452 return true ;
@@ -2263,3 +2267,244 @@ bool DistributedChatService::processLoadListItem(const RsItem *item)
22632267 return false ;
22642268}
22652269
2270+ /* **************** Lobby History Retrieval Protocol *****************/
2271+
2272+ #include " chat/rschatitems.h"
2273+ #include " pqi/p3historymgr.h"
2274+
2275+ bool DistributedChatService::requestLobbyHistory (const ChatLobbyId& lobby_id)
2276+ {
2277+ RsStackMutex stack (mDistributedChatMtx ); /* ********* STACK LOCKED MTX ******/
2278+
2279+ std::map<ChatLobbyId,ChatLobbyEntry>::iterator it = _chat_lobbys.find (lobby_id) ;
2280+
2281+ if (it == _chat_lobbys.end ())
2282+ {
2283+ std::cerr << " (EE) requestLobbyHistory(): lobby " << std::hex << lobby_id << std::dec << " not found." << std::endl;
2284+ return false ;
2285+ }
2286+
2287+ // Send a probe to every direct friend participating in this lobby
2288+
2289+ for (std::set<RsPeerId>::const_iterator fit (it->second .participating_friends .begin ()); fit != it->second .participating_friends .end (); ++fit)
2290+ {
2291+ if (mServControl ->isPeerConnected (mServType , *fit))
2292+ {
2293+ RsChatLobbyHistoryProbeItem *item = new RsChatLobbyHistoryProbeItem ;
2294+ item->lobby_id = lobby_id ;
2295+ item->PeerId (*fit) ;
2296+
2297+ sendChatItem (item) ;
2298+
2299+ std::cerr << " requestLobbyHistory(): sent probe to peer " << *fit << " for lobby " << std::hex << lobby_id << std::dec << std::endl;
2300+ }
2301+ }
2302+
2303+ return true ;
2304+ }
2305+
2306+ void DistributedChatService::handleRecvLobbyHistoryProbe (RsChatLobbyHistoryProbeItem *item)
2307+ {
2308+ if (!item) return ;
2309+
2310+ // Check we are subscribed to this lobby
2311+ {
2312+ RsStackMutex stack (mDistributedChatMtx ); /* ********* STACK LOCKED MTX ******/
2313+
2314+ if (_chat_lobbys.find (item->lobby_id ) == _chat_lobbys.end ())
2315+ {
2316+ std::cerr << " (WW) handleRecvLobbyHistoryProbe(): lobby " << std::hex << item->lobby_id << std::dec << " not found. Ignoring." << std::endl;
2317+ return ;
2318+ }
2319+ }
2320+
2321+ // Retrieve our local history for this lobby
2322+ std::list<HistoryMsg> msgs ;
2323+ mHistMgr ->getMessages (ChatId (item->lobby_id ), msgs, 0 ) ; // 0 = get all available
2324+
2325+ uint32_t available_count = (uint32_t )msgs.size () ;
2326+ uint32_t oldest_ts = 0 ;
2327+
2328+ if (!msgs.empty ())
2329+ oldest_ts = msgs.front ().sendTime ;
2330+
2331+ // Send response back to the requesting peer
2332+ RsChatLobbyHistoryProbeResponseItem *response = new RsChatLobbyHistoryProbeResponseItem ;
2333+ response->lobby_id = item->lobby_id ;
2334+ response->available_count = available_count ;
2335+ response->oldest_timestamp = oldest_ts ;
2336+ response->PeerId (item->PeerId ()) ;
2337+
2338+ sendChatItem (response) ;
2339+
2340+ std::cerr << " handleRecvLobbyHistoryProbe(): responded to " << item->PeerId () << " — " << available_count << " msgs available, oldest TS=" << oldest_ts << std::endl;
2341+ }
2342+
2343+ void DistributedChatService::handleRecvLobbyHistoryProbeResponse (RsChatLobbyHistoryProbeResponseItem *item)
2344+ {
2345+ if (!item) return ;
2346+
2347+ std::cerr << " handleRecvLobbyHistoryProbeResponse(): peer " << item->PeerId ()
2348+ << " has " << item->available_count << " messages for lobby " << std::hex << item->lobby_id << std::dec
2349+ << " , oldest TS=" << item->oldest_timestamp << std::endl;
2350+
2351+ auto ev = std::make_shared<RsChatLobbyEvent>();
2352+ ev->mEventCode = RsChatLobbyEventCode::CHAT_LOBBY_EVENT_HISTORY_PROBE_RESPONSE;
2353+ ev->mPeerId = item->PeerId () ;
2354+ ev->mLobbyId = item->lobby_id ;
2355+ ev->mGenericCount = item->available_count ;
2356+ ev->mTimeShift = item->oldest_timestamp ; // using mTimeShift to store the timestamp
2357+ rsEvents->postEvent (ev);
2358+ }
2359+
2360+ bool DistributedChatService::requestLobbyHistoryFromPeer (const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts)
2361+ {
2362+ RsStackMutex stack (mDistributedChatMtx ); /* ********* STACK LOCKED MTX ******/
2363+
2364+ std::map<ChatLobbyId,ChatLobbyEntry>::iterator it = _chat_lobbys.find (lobby_id) ;
2365+
2366+ if (it == _chat_lobbys.end ())
2367+ {
2368+ std::cerr << " (EE) requestLobbyHistoryFromPeer(): lobby " << std::hex << lobby_id << std::dec << " not found." << std::endl;
2369+ return false ;
2370+ }
2371+
2372+ if (!mServControl ->isPeerConnected (mServType , peer_id))
2373+ {
2374+ std::cerr << " (EE) requestLobbyHistoryFromPeer(): peer " << peer_id << " is not connected." << std::endl;
2375+ return false ;
2376+ }
2377+
2378+ RsChatLobbyHistoryRequestItem *request = new RsChatLobbyHistoryRequestItem ;
2379+ request->lobby_id = lobby_id ;
2380+ request->max_count = max_count ;
2381+ request->oldest_timestamp = oldest_ts ;
2382+ request->PeerId (peer_id) ;
2383+
2384+ sendChatItem (request) ;
2385+
2386+ std::cerr << " requestLobbyHistoryFromPeer(): sent request to peer " << peer_id << " for lobby " << std::hex << lobby_id << std::dec << " (max " << max_count << " msgs, oldest TS=" << oldest_ts << " )" << std::endl;
2387+ return true ;
2388+ }
2389+
2390+ void DistributedChatService::handleRecvLobbyHistoryRequest (RsChatLobbyHistoryRequestItem *item)
2391+ {
2392+ if (!item) return ;
2393+
2394+ // Check we are subscribed to this lobby
2395+ {
2396+ RsStackMutex stack (mDistributedChatMtx ); /* ********* STACK LOCKED MTX ******/
2397+
2398+ if (_chat_lobbys.find (item->lobby_id ) == _chat_lobbys.end ())
2399+ {
2400+ std::cerr << " (WW) handleRecvLobbyHistoryRequest(): lobby " << std::hex << item->lobby_id << std::dec << " not found. Ignoring." << std::endl;
2401+ return ;
2402+ }
2403+ }
2404+
2405+ // Retrieve local history
2406+ std::list<HistoryMsg> msgs ;
2407+ mHistMgr ->getMessages (ChatId (item->lobby_id ), msgs, item->max_count ) ;
2408+
2409+ RsChatLobbyHistoryDataItem *data_item = new RsChatLobbyHistoryDataItem ;
2410+ data_item->lobby_id = item->lobby_id ;
2411+ data_item->PeerId (item->PeerId ()) ;
2412+ size_t count = 0 ;
2413+ size_t current_chunk_size = 0 ;
2414+ const size_t MAX_CHUNK_SIZE = 120 * 1024 ; // ~120 KB to stay safely under 128KB limit
2415+
2416+ for (std::list<HistoryMsg>::const_iterator it (msgs.begin ()); it != msgs.end (); ++it)
2417+ {
2418+ if (it->sendTime >= item->oldest_timestamp )
2419+ {
2420+ LobbyHistoryMsgEntry entry ;
2421+ // HistoryMsg only stores the author's nick (peerName) and peerId, not the GXS ID.
2422+ entry.author_id = RsGxsId () ;
2423+ entry.nick = it->peerName ;
2424+ entry.send_time = it->sendTime ;
2425+ entry.message = it->message ;
2426+ entry.incoming = it->incoming ;
2427+
2428+ // Calculate approximate size of this entry
2429+ // author_id (std::string approx 32 bytes) + nick + timestamp (4 bytes) + message + incoming (1 byte)
2430+ size_t entry_size = 32 + entry.nick .size () + 4 + entry.message .size () + 1 + 50 ; // +50 for serialization overhead
2431+
2432+ // If adding this message exceeds the chunk size (and the chunk isn't empty), send the current chunk first
2433+ if (current_chunk_size + entry_size > MAX_CHUNK_SIZE && !data_item->msgs .empty ()) {
2434+ std::cerr << " handleRecvLobbyHistoryRequest(): sending chunk of " << data_item->msgs .size () << " msgs (" << current_chunk_size << " bytes) to " << item->PeerId () << std::endl;
2435+ sendChatItem (data_item) ;
2436+
2437+ // Create a new data item for the next chunk
2438+ data_item = new RsChatLobbyHistoryDataItem ;
2439+ data_item->lobby_id = item->lobby_id ;
2440+ data_item->PeerId (item->PeerId ()) ;
2441+ current_chunk_size = 0 ;
2442+ }
2443+
2444+ data_item->msgs .push_back (entry) ;
2445+ current_chunk_size += entry_size;
2446+ count++;
2447+ }
2448+ }
2449+
2450+ if (!data_item->msgs .empty ()) {
2451+ std::cerr << " handleRecvLobbyHistoryRequest(): sending final chunk of " << data_item->msgs .size () << " msgs to " << item->PeerId () << std::endl;
2452+ sendChatItem (data_item) ;
2453+ } else {
2454+ // Clean up the unused item if we sent everything cleanly in batches, or if there were 0 messages.
2455+ delete data_item;
2456+ }
2457+
2458+ std::cerr << " handleRecvLobbyHistoryRequest(): finished. Sent a total of " << count << " messages." << std::endl;
2459+ }
2460+
2461+ void DistributedChatService::handleRecvLobbyHistoryData (RsChatLobbyHistoryDataItem *item)
2462+ {
2463+ if (!item) return ;
2464+
2465+ std::cerr << " handleRecvLobbyHistoryData(): received " << item->msgs .size () << " messages from peer " << item->PeerId ()
2466+ << " for lobby " << std::hex << item->lobby_id << std::dec << std::endl;
2467+
2468+ // Deduplicate and save to local history database
2469+ std::list<HistoryMsg> existingMsgs;
2470+ mHistMgr ->getMessages (ChatId (item->lobby_id ), existingMsgs, 0 );
2471+
2472+ std::set<std::pair<uint32_t , std::string> > existingSet;
2473+ for (std::list<HistoryMsg>::const_iterator it = existingMsgs.begin (); it != existingMsgs.end (); ++it) {
2474+ existingSet.insert (std::make_pair (it->sendTime , it->message ));
2475+ }
2476+
2477+ int addedCount = 0 ;
2478+ for (std::vector<LobbyHistoryMsgEntry>::const_iterator it = item->msgs .begin (); it != item->msgs .end (); ++it) {
2479+ if (existingSet.find (std::make_pair (it->send_time , it->message )) == existingSet.end ()) {
2480+ // Not a duplicate
2481+ ChatMessage cm;
2482+ cm.chat_id = ChatId (item->lobby_id );
2483+ cm.lobby_peer_gxs_id = it->author_id ;
2484+ cm.peer_alternate_nickname = it->nick ;
2485+ cm.chatflags = 0 ;
2486+ cm.sendTime = it->send_time ;
2487+ cm.recvTime = it->send_time ; // Keep the original send_time for chronological sorting
2488+ cm.msg = it->message ;
2489+ cm.incoming = it->incoming ;
2490+ cm.online = true ;
2491+
2492+ mHistMgr ->addMessage (cm);
2493+ addedCount++;
2494+
2495+ // Add to our set just in case the received chunk contains duplicates within itself
2496+ existingSet.insert (std::make_pair (it->send_time , it->message ));
2497+ }
2498+ }
2499+
2500+ std::cerr << " handleRecvLobbyHistoryData(): merged " << addedCount << " new messages into local history." << std::endl;
2501+
2502+ auto ev = std::make_shared<RsChatLobbyEvent>();
2503+ ev->mEventCode = RsChatLobbyEventCode::CHAT_LOBBY_EVENT_HISTORY_DATA;
2504+ ev->mPeerId = item->PeerId () ;
2505+ ev->mLobbyId = item->lobby_id ;
2506+ ev->mHistoryMsgs = item->msgs ; // Copy the received messages to the event
2507+ rsEvents->postEvent (ev);
2508+ }
2509+
2510+
0 commit comments