Skip to content

Commit b91cfab

Browse files
committed
Add lobby history retrieval protocol: probe/request/data items, chunked transfer, local dedup & merge
1 parent ca004ea commit b91cfab

8 files changed

Lines changed: 413 additions & 5 deletions

File tree

src/chat/distributedchat.cc

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,10 @@ bool DistributedChatService::handleRecvItem(RsChatItem *item)
443443
case RS_PKT_SUBTYPE_CHAT_LOBBY_UNSUBSCRIBE: handleFriendUnsubscribeLobby (dynamic_cast<RsChatLobbyUnsubscribeItem *>(item)) ; break ;
444444
case RS_PKT_SUBTYPE_CHAT_LOBBY_LIST_REQUEST: handleRecvChatLobbyListRequest (dynamic_cast<RsChatLobbyListRequestItem *>(item)) ; break ;
445445
case RS_PKT_SUBTYPE_CHAT_LOBBY_LIST: handleRecvChatLobbyList (dynamic_cast<RsChatLobbyListItem *>(item)) ; break ;
446+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE: handleRecvLobbyHistoryProbe (dynamic_cast<RsChatLobbyHistoryProbeItem *>(item)) ; break ;
447+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE_RESP: handleRecvLobbyHistoryProbeResponse(dynamic_cast<RsChatLobbyHistoryProbeResponseItem *>(item)) ; break ;
448+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_REQUEST: handleRecvLobbyHistoryRequest (dynamic_cast<RsChatLobbyHistoryRequestItem *>(item)) ; break ;
449+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_DATA: handleRecvLobbyHistoryData (dynamic_cast<RsChatLobbyHistoryDataItem *>(item)) ; break ;
446450
default: return false ;
447451
}
448452
return true ;
@@ -2269,3 +2273,244 @@ bool DistributedChatService::processLoadListItem(const RsItem *item)
22692273
return false ;
22702274
}
22712275

2276+
/***************** Lobby History Retrieval Protocol *****************/
2277+
2278+
#include "chat/rschatitems.h"
2279+
#include "pqi/p3historymgr.h"
2280+
2281+
bool DistributedChatService::requestLobbyHistory(const ChatLobbyId& lobby_id)
2282+
{
2283+
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/
2284+
2285+
std::map<ChatLobbyId,ChatLobbyEntry>::iterator it = _chat_lobbys.find(lobby_id) ;
2286+
2287+
if(it == _chat_lobbys.end())
2288+
{
2289+
std::cerr << "(EE) requestLobbyHistory(): lobby " << std::hex << lobby_id << std::dec << " not found." << std::endl;
2290+
return false ;
2291+
}
2292+
2293+
// Send a probe to every direct friend participating in this lobby
2294+
2295+
for(std::set<RsPeerId>::const_iterator fit(it->second.participating_friends.begin()); fit != it->second.participating_friends.end(); ++fit)
2296+
{
2297+
if(mServControl->isPeerConnected(mServType, *fit))
2298+
{
2299+
RsChatLobbyHistoryProbeItem *item = new RsChatLobbyHistoryProbeItem ;
2300+
item->lobby_id = lobby_id ;
2301+
item->PeerId(*fit) ;
2302+
2303+
sendChatItem(item) ;
2304+
2305+
std::cerr << "requestLobbyHistory(): sent probe to peer " << *fit << " for lobby " << std::hex << lobby_id << std::dec << std::endl;
2306+
}
2307+
}
2308+
2309+
return true ;
2310+
}
2311+
2312+
void DistributedChatService::handleRecvLobbyHistoryProbe(RsChatLobbyHistoryProbeItem *item)
2313+
{
2314+
if(!item) return ;
2315+
2316+
// Check we are subscribed to this lobby
2317+
{
2318+
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/
2319+
2320+
if(_chat_lobbys.find(item->lobby_id) == _chat_lobbys.end())
2321+
{
2322+
std::cerr << "(WW) handleRecvLobbyHistoryProbe(): lobby " << std::hex << item->lobby_id << std::dec << " not found. Ignoring." << std::endl;
2323+
return ;
2324+
}
2325+
}
2326+
2327+
// Retrieve our local history for this lobby
2328+
std::list<HistoryMsg> msgs ;
2329+
mHistMgr->getMessages(ChatId(item->lobby_id), msgs, 0) ; // 0 = get all available
2330+
2331+
uint32_t available_count = (uint32_t)msgs.size() ;
2332+
uint32_t oldest_ts = 0 ;
2333+
2334+
if(!msgs.empty())
2335+
oldest_ts = msgs.front().sendTime ;
2336+
2337+
// Send response back to the requesting peer
2338+
RsChatLobbyHistoryProbeResponseItem *response = new RsChatLobbyHistoryProbeResponseItem ;
2339+
response->lobby_id = item->lobby_id ;
2340+
response->available_count = available_count ;
2341+
response->oldest_timestamp = oldest_ts ;
2342+
response->PeerId(item->PeerId()) ;
2343+
2344+
sendChatItem(response) ;
2345+
2346+
std::cerr << "handleRecvLobbyHistoryProbe(): responded to " << item->PeerId() << "" << available_count << " msgs available, oldest TS=" << oldest_ts << std::endl;
2347+
}
2348+
2349+
void DistributedChatService::handleRecvLobbyHistoryProbeResponse(RsChatLobbyHistoryProbeResponseItem *item)
2350+
{
2351+
if(!item) return ;
2352+
2353+
std::cerr << "handleRecvLobbyHistoryProbeResponse(): peer " << item->PeerId()
2354+
<< " has " << item->available_count << " messages for lobby " << std::hex << item->lobby_id << std::dec
2355+
<< ", oldest TS=" << item->oldest_timestamp << std::endl;
2356+
2357+
auto ev = std::make_shared<RsChatLobbyEvent>();
2358+
ev->mEventCode = RsChatLobbyEventCode::CHAT_LOBBY_EVENT_HISTORY_PROBE_RESPONSE;
2359+
ev->mPeerId = item->PeerId() ;
2360+
ev->mLobbyId = item->lobby_id ;
2361+
ev->mGenericCount = item->available_count ;
2362+
ev->mTimeShift = item->oldest_timestamp ; // using mTimeShift to store the timestamp
2363+
rsEvents->postEvent(ev);
2364+
}
2365+
2366+
bool DistributedChatService::requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts)
2367+
{
2368+
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/
2369+
2370+
std::map<ChatLobbyId,ChatLobbyEntry>::iterator it = _chat_lobbys.find(lobby_id) ;
2371+
2372+
if(it == _chat_lobbys.end())
2373+
{
2374+
std::cerr << "(EE) requestLobbyHistoryFromPeer(): lobby " << std::hex << lobby_id << std::dec << " not found." << std::endl;
2375+
return false ;
2376+
}
2377+
2378+
if(!mServControl->isPeerConnected(mServType, peer_id))
2379+
{
2380+
std::cerr << "(EE) requestLobbyHistoryFromPeer(): peer " << peer_id << " is not connected." << std::endl;
2381+
return false ;
2382+
}
2383+
2384+
RsChatLobbyHistoryRequestItem *request = new RsChatLobbyHistoryRequestItem ;
2385+
request->lobby_id = lobby_id ;
2386+
request->max_count = max_count ;
2387+
request->oldest_timestamp = oldest_ts ;
2388+
request->PeerId(peer_id) ;
2389+
2390+
sendChatItem(request) ;
2391+
2392+
std::cerr << "requestLobbyHistoryFromPeer(): sent request to peer " << peer_id << " for lobby " << std::hex << lobby_id << std::dec << " (max " << max_count << " msgs, oldest TS=" << oldest_ts << ")" << std::endl;
2393+
return true ;
2394+
}
2395+
2396+
void DistributedChatService::handleRecvLobbyHistoryRequest(RsChatLobbyHistoryRequestItem *item)
2397+
{
2398+
if(!item) return ;
2399+
2400+
// Check we are subscribed to this lobby
2401+
{
2402+
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/
2403+
2404+
if(_chat_lobbys.find(item->lobby_id) == _chat_lobbys.end())
2405+
{
2406+
std::cerr << "(WW) handleRecvLobbyHistoryRequest(): lobby " << std::hex << item->lobby_id << std::dec << " not found. Ignoring." << std::endl;
2407+
return ;
2408+
}
2409+
}
2410+
2411+
// Retrieve local history
2412+
std::list<HistoryMsg> msgs ;
2413+
mHistMgr->getMessages(ChatId(item->lobby_id), msgs, item->max_count) ;
2414+
2415+
RsChatLobbyHistoryDataItem *data_item = new RsChatLobbyHistoryDataItem ;
2416+
data_item->lobby_id = item->lobby_id ;
2417+
data_item->PeerId(item->PeerId()) ;
2418+
size_t count = 0;
2419+
size_t current_chunk_size = 0;
2420+
const size_t MAX_CHUNK_SIZE = 120 * 1024; // ~120 KB to stay safely under 128KB limit
2421+
2422+
for(std::list<HistoryMsg>::const_iterator it(msgs.begin()); it != msgs.end(); ++it)
2423+
{
2424+
if(it->sendTime >= item->oldest_timestamp)
2425+
{
2426+
LobbyHistoryMsgEntry entry ;
2427+
// HistoryMsg only stores the author's nick (peerName) and peerId, not the GXS ID.
2428+
entry.author_id = RsGxsId() ;
2429+
entry.nick = it->peerName ;
2430+
entry.send_time = it->sendTime ;
2431+
entry.message = it->message ;
2432+
entry.incoming = it->incoming ;
2433+
2434+
// Calculate approximate size of this entry
2435+
// author_id (std::string approx 32 bytes) + nick + timestamp (4 bytes) + message + incoming (1 byte)
2436+
size_t entry_size = 32 + entry.nick.size() + 4 + entry.message.size() + 1 + 50; // +50 for serialization overhead
2437+
2438+
// If adding this message exceeds the chunk size (and the chunk isn't empty), send the current chunk first
2439+
if (current_chunk_size + entry_size > MAX_CHUNK_SIZE && !data_item->msgs.empty()) {
2440+
std::cerr << "handleRecvLobbyHistoryRequest(): sending chunk of " << data_item->msgs.size() << " msgs (" << current_chunk_size << " bytes) to " << item->PeerId() << std::endl;
2441+
sendChatItem(data_item) ;
2442+
2443+
// Create a new data item for the next chunk
2444+
data_item = new RsChatLobbyHistoryDataItem ;
2445+
data_item->lobby_id = item->lobby_id ;
2446+
data_item->PeerId(item->PeerId()) ;
2447+
current_chunk_size = 0;
2448+
}
2449+
2450+
data_item->msgs.push_back(entry) ;
2451+
current_chunk_size += entry_size;
2452+
count++;
2453+
}
2454+
}
2455+
2456+
if (!data_item->msgs.empty()) {
2457+
std::cerr << "handleRecvLobbyHistoryRequest(): sending final chunk of " << data_item->msgs.size() << " msgs to " << item->PeerId() << std::endl;
2458+
sendChatItem(data_item) ;
2459+
} else {
2460+
// Clean up the unused item if we sent everything cleanly in batches, or if there were 0 messages.
2461+
delete data_item;
2462+
}
2463+
2464+
std::cerr << "handleRecvLobbyHistoryRequest(): finished. Sent a total of " << count << " messages." << std::endl;
2465+
}
2466+
2467+
void DistributedChatService::handleRecvLobbyHistoryData(RsChatLobbyHistoryDataItem *item)
2468+
{
2469+
if(!item) return ;
2470+
2471+
std::cerr << "handleRecvLobbyHistoryData(): received " << item->msgs.size() << " messages from peer " << item->PeerId()
2472+
<< " for lobby " << std::hex << item->lobby_id << std::dec << std::endl;
2473+
2474+
// Deduplicate and save to local history database
2475+
std::list<HistoryMsg> existingMsgs;
2476+
mHistMgr->getMessages(ChatId(item->lobby_id), existingMsgs, 0);
2477+
2478+
std::set<std::pair<uint32_t, std::string> > existingSet;
2479+
for (std::list<HistoryMsg>::const_iterator it = existingMsgs.begin(); it != existingMsgs.end(); ++it) {
2480+
existingSet.insert(std::make_pair(it->sendTime, it->message));
2481+
}
2482+
2483+
int addedCount = 0;
2484+
for (std::vector<LobbyHistoryMsgEntry>::const_iterator it = item->msgs.begin(); it != item->msgs.end(); ++it) {
2485+
if (existingSet.find(std::make_pair(it->send_time, it->message)) == existingSet.end()) {
2486+
// Not a duplicate
2487+
ChatMessage cm;
2488+
cm.chat_id = ChatId(item->lobby_id);
2489+
cm.lobby_peer_gxs_id = it->author_id;
2490+
cm.peer_alternate_nickname = it->nick;
2491+
cm.chatflags = 0;
2492+
cm.sendTime = it->send_time;
2493+
cm.recvTime = it->send_time; // Keep the original send_time for chronological sorting
2494+
cm.msg = it->message;
2495+
cm.incoming = it->incoming;
2496+
cm.online = true;
2497+
2498+
mHistMgr->addMessage(cm);
2499+
addedCount++;
2500+
2501+
// Add to our set just in case the received chunk contains duplicates within itself
2502+
existingSet.insert(std::make_pair(it->send_time, it->message));
2503+
}
2504+
}
2505+
2506+
std::cerr << "handleRecvLobbyHistoryData(): merged " << addedCount << " new messages into local history." << std::endl;
2507+
2508+
auto ev = std::make_shared<RsChatLobbyEvent>();
2509+
ev->mEventCode = RsChatLobbyEventCode::CHAT_LOBBY_EVENT_HISTORY_DATA;
2510+
ev->mPeerId = item->PeerId() ;
2511+
ev->mLobbyId = item->lobby_id ;
2512+
ev->mHistoryMsgs = item->msgs ; // Copy the received messages to the event
2513+
rsEvents->postEvent(ev);
2514+
}
2515+
2516+

src/chat/distributedchat.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ class RsChatItem ;
4646
class RsChatMsgItem ;
4747
class RsGixs ;
4848

49+
class RsChatLobbyHistoryProbeItem ;
50+
class RsChatLobbyHistoryProbeResponseItem ;
51+
class RsChatLobbyHistoryRequestItem ;
52+
class RsChatLobbyHistoryDataItem ;
53+
4954
class DistributedChatService
5055
{
5156
public:
@@ -79,6 +84,10 @@ class DistributedChatService
7984
void getListOfNearbyChatLobbies(std::vector<VisibleChatLobbyRecord>& public_lobbies) ;
8085
bool joinVisibleChatLobby(const ChatLobbyId& id, const RsGxsId &gxs_id) ;
8186

87+
// Lobby history retrieval protocol
88+
bool requestLobbyHistory(const ChatLobbyId& lobby_id) ;
89+
bool requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts) ;
90+
8291
protected:
8392
bool handleRecvItem(RsChatItem *) ;
8493

@@ -125,6 +134,12 @@ class DistributedChatService
125134
void sendLobbyStatusNewPeer(const ChatLobbyId& lobby_id) ;
126135
void sendLobbyStatusKeepAlive(const ChatLobbyId&) ;
127136

137+
// Lobby history retrieval handlers
138+
void handleRecvLobbyHistoryProbe(RsChatLobbyHistoryProbeItem *item) ;
139+
void handleRecvLobbyHistoryProbeResponse(RsChatLobbyHistoryProbeResponseItem *item) ;
140+
void handleRecvLobbyHistoryRequest(RsChatLobbyHistoryRequestItem *item) ;
141+
void handleRecvLobbyHistoryData(RsChatLobbyHistoryDataItem *item) ;
142+
128143
bool locked_initLobbyBouncableObject(const ChatLobbyId& id,RsChatLobbyBouncingObject&) ;
129144
void locked_printDebugInfo() const ;
130145
RsGxsId locked_getDefaultIdentity();

src/chat/p3chatservice.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,16 @@ ChatLobbyId p3ChatService::createChatLobby(const std::string& lobby_name,const R
554554
return DistributedChatService::createChatLobby(lobby_name,lobby_identity,lobby_topic,invited_friends,privacy_type) ;
555555
}
556556

557+
bool p3ChatService::requestLobbyHistory(const ChatLobbyId& lobby_id)
558+
{
559+
return DistributedChatService::requestLobbyHistory(lobby_id) ;
560+
}
561+
562+
bool p3ChatService::requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts)
563+
{
564+
return DistributedChatService::requestLobbyHistoryFromPeer(lobby_id, peer_id, max_count, oldest_ts) ;
565+
}
566+
557567

558568
void p3ChatService::sendChatItem(RsChatItem *item)
559569
{

src/chat/p3chatservice.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ class p3ChatService :
138138
virtual void getDefaultIdentityForChatLobby(RsGxsId& nick_name) override;
139139
virtual void setLobbyAutoSubscribe(const ChatLobbyId& lobby_id, const bool autoSubscribe) override;
140140
virtual bool getLobbyAutoSubscribe(const ChatLobbyId& lobby_id) override;
141+
virtual bool requestLobbyHistory(const ChatLobbyId& lobby_id) override;
142+
virtual bool requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts) override;
141143

142144
/** methods that will call the DistantChatService parent
143145
*/

src/chat/rschatitems.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ RsItem *RsChatSerialiser::create_item(uint16_t service_id,uint8_t item_sub_id) c
5656
case RS_PKT_SUBTYPE_CHAT_AVATAR_INFO: return new RsChatAvatarInfoItem();
5757
case RS_PKT_SUBTYPE_CHAT_AVATAR_CONFIG: return new RsChatAvatarConfigItem();
5858
case RS_PKT_SUBTYPE_OUTGOING_MAP: return new PrivateOugoingMapItem();
59+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE: return new RsChatLobbyHistoryProbeItem();
60+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE_RESP: return new RsChatLobbyHistoryProbeResponseItem();
61+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_REQUEST: return new RsChatLobbyHistoryRequestItem();
62+
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_DATA: return new RsChatLobbyHistoryDataItem();
5963
default:
6064
std::cerr << "Unknown packet type in chat!" << std::endl;
6165
return NULL;
@@ -220,3 +224,32 @@ void PrivateOugoingMapItem::serial_process(
220224
RsGenericSerializer::SerializeJob j,
221225
RsGenericSerializer::SerializeContext& ctx )
222226
{ RS_SERIAL_PROCESS(store); }
227+
228+
/***************** Lobby History Retrieval Protocol *****************/
229+
230+
231+
232+
void RsChatLobbyHistoryProbeItem::serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx)
233+
{
234+
RsTypeSerializer::serial_process<uint64_t>(j,ctx,lobby_id,"lobby_id") ;
235+
}
236+
237+
void RsChatLobbyHistoryProbeResponseItem::serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx)
238+
{
239+
RsTypeSerializer::serial_process<uint64_t>(j,ctx,lobby_id, "lobby_id") ;
240+
RsTypeSerializer::serial_process<uint32_t>(j,ctx,available_count, "available_count") ;
241+
RsTypeSerializer::serial_process<uint32_t>(j,ctx,oldest_timestamp, "oldest_timestamp") ;
242+
}
243+
244+
void RsChatLobbyHistoryRequestItem::serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx)
245+
{
246+
RsTypeSerializer::serial_process<uint64_t>(j,ctx,lobby_id, "lobby_id") ;
247+
RsTypeSerializer::serial_process<uint32_t>(j,ctx,max_count, "max_count") ;
248+
RsTypeSerializer::serial_process<uint32_t>(j,ctx,oldest_timestamp, "oldest_timestamp") ;
249+
}
250+
251+
void RsChatLobbyHistoryDataItem::serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx)
252+
{
253+
RsTypeSerializer::serial_process<uint64_t>(j,ctx,lobby_id,"lobby_id") ;
254+
RsTypeSerializer::serial_process (j,ctx,msgs, "msgs") ;
255+
}

0 commit comments

Comments
 (0)