Skip to content

Commit 47cc5db

Browse files
committed
feat(sync): add --local-only flag for faster incremental sync
1 parent 9255502 commit 47cc5db

3 files changed

Lines changed: 385 additions & 2 deletions

File tree

src/app/sync.rs

Lines changed: 353 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use crate::store::UpsertMessageParams;
33
use anyhow::{Context, Result};
44
use chrono::{DateTime, Utc};
55
use grammers_client::types::{Media, Message as TgMessage, Peer};
6-
use grammers_session::defs::PeerRef;
6+
use grammers_session::defs::{PeerId, PeerRef};
7+
use grammers_session::Session;
78
use grammers_tl_types as tl;
89
use std::collections::HashSet;
910
use std::path::Path;
@@ -31,6 +32,8 @@ pub struct SyncOptions {
3132
pub show_progress: bool,
3233
pub incremental: bool,
3334
pub messages_per_chat: usize,
35+
/// Skip `iter_dialogs()` and sync only chats that already exist in the local DB and have a sync checkpoint.
36+
pub local_only: bool,
3437
}
3538

3639
/// Get media type string and file extension from grammers Media enum
@@ -143,6 +146,66 @@ pub struct SyncResult {
143146
}
144147

145148
impl App {
149+
/// Try to resolve a chat ID to a PeerRef from the session cache (no API calls).
150+
/// Returns None if the chat is not cached in the session.
151+
fn resolve_peer_from_session(&self, chat_id: i64, kind: &str) -> Option<PeerRef> {
152+
// Try based on the known type first
153+
match kind {
154+
"channel" | "group" => {
155+
// Channels and megagroups use channel peer IDs
156+
let channel_peer_id = PeerId::channel(chat_id);
157+
if let Some(info) = self.tg.session.peer(channel_peer_id) {
158+
return Some(PeerRef {
159+
id: channel_peer_id,
160+
auth: info.auth(),
161+
});
162+
}
163+
}
164+
"user" => {
165+
let user_peer_id = PeerId::user(chat_id);
166+
if let Some(info) = self.tg.session.peer(user_peer_id) {
167+
return Some(PeerRef {
168+
id: user_peer_id,
169+
auth: info.auth(),
170+
});
171+
}
172+
}
173+
_ => {}
174+
}
175+
176+
// Fallback: try all peer types
177+
// Try as channel first (most common for groups)
178+
let channel_peer_id = PeerId::channel(chat_id);
179+
if let Some(info) = self.tg.session.peer(channel_peer_id) {
180+
return Some(PeerRef {
181+
id: channel_peer_id,
182+
auth: info.auth(),
183+
});
184+
}
185+
186+
// Try as user
187+
let user_peer_id = PeerId::user(chat_id);
188+
if let Some(info) = self.tg.session.peer(user_peer_id) {
189+
return Some(PeerRef {
190+
id: user_peer_id,
191+
auth: info.auth(),
192+
});
193+
}
194+
195+
// Try as small group chat (basic groups have different IDs)
196+
if chat_id > 0 && chat_id <= 999999999999 {
197+
let chat_peer_id = PeerId::chat(chat_id);
198+
if let Some(info) = self.tg.session.peer(chat_peer_id) {
199+
return Some(PeerRef {
200+
id: chat_peer_id,
201+
auth: info.auth(),
202+
});
203+
}
204+
}
205+
206+
None
207+
}
208+
146209
/// Download media from a message if present and return (media_type, media_path)
147210
async fn download_message_media(
148211
&self,
@@ -233,6 +296,295 @@ impl App {
233296
let mut last_progress_time = std::time::Instant::now();
234297
let progress_interval = Duration::from_millis(500);
235298

299+
// Local-only mode: skip iter_dialogs(), only sync chats already in local DB
300+
if opts.local_only {
301+
if opts.show_progress {
302+
eprint!("\rLocal sync... 0 chats, 0 messages");
303+
}
304+
305+
// Get all chats that have sync checkpoints
306+
let chats_with_checkpoints = self.store.list_chats_with_checkpoint().await?;
307+
let total_chats = chats_with_checkpoints.len();
308+
309+
for (idx, chat) in chats_with_checkpoints.into_iter().enumerate() {
310+
// Skip ignored chats
311+
if should_ignore(chat.id, &chat.kind) {
312+
continue;
313+
}
314+
315+
// Try to resolve peer from session (no API call)
316+
let peer_ref = match self.resolve_peer_from_session(chat.id, &chat.kind) {
317+
Some(p) => p,
318+
None => {
319+
log::debug!(
320+
"Skipping chat {} ({}) - not in session cache",
321+
chat.name,
322+
chat.id
323+
);
324+
continue;
325+
}
326+
};
327+
328+
chats_stored += 1;
329+
330+
// Get the last synced message ID
331+
let last_sync_id = chat.last_sync_message_id;
332+
333+
// Fetch messages incrementally
334+
let mut message_iter = client.iter_messages(peer_ref);
335+
let mut count = 0;
336+
let mut latest_ts: Option<DateTime<Utc>> = None;
337+
let mut highest_msg_id: Option<i64> = None;
338+
let mut topic_counts: std::collections::HashMap<i32, u64> =
339+
std::collections::HashMap::new();
340+
341+
while let Some(msg) = message_iter.next().await.with_context(|| {
342+
format!(
343+
"Failed to fetch messages for chat {} ({})",
344+
chat.name, chat.id
345+
)
346+
})? {
347+
let msg_id = msg.id() as i64;
348+
349+
// Stop when we hit a message we've already seen
350+
if let Some(last_id) = last_sync_id {
351+
if msg_id <= last_id {
352+
log::debug!(
353+
"Chat {}: reached last synced message {} (stopping at {})",
354+
chat.id,
355+
last_id,
356+
msg_id
357+
);
358+
break;
359+
}
360+
}
361+
362+
if count >= INCREMENTAL_MAX_MESSAGES {
363+
break;
364+
}
365+
count += 1;
366+
367+
// Track the highest message ID we've seen
368+
if highest_msg_id.is_none() || msg_id > highest_msg_id.unwrap() {
369+
highest_msg_id = Some(msg_id);
370+
}
371+
372+
let msg_ts = msg.date();
373+
if latest_ts.is_none() || msg_ts > latest_ts.unwrap() {
374+
latest_ts = Some(msg_ts);
375+
}
376+
377+
let sender_id = msg.sender().map(|s| s.id().bare_id()).unwrap_or(0);
378+
let from_me = msg.outgoing();
379+
380+
let text = msg.text().to_string();
381+
let reply_to_id = msg.reply_to_message_id().map(|id| id as i64);
382+
let topic_id = if chat.is_forum {
383+
extract_topic_id(&msg)
384+
} else {
385+
None
386+
};
387+
388+
// Track per-topic counts for forums
389+
if let Some(tid) = topic_id {
390+
*topic_counts.entry(tid).or_insert(0) += 1;
391+
}
392+
393+
// Download media if enabled
394+
let (media_type, media_path) = if opts.download_media {
395+
self.download_message_media(&msg, chat.id).await?
396+
} else {
397+
(msg.media().map(|_| "media".to_string()), None)
398+
};
399+
400+
// Clone media_type for use in stream output after the move
401+
let media_type_out = media_type.clone();
402+
403+
self.store
404+
.upsert_message(UpsertMessageParams {
405+
id: msg.id() as i64,
406+
chat_id: chat.id,
407+
sender_id,
408+
ts: msg_ts,
409+
edit_ts: msg.edit_date(),
410+
from_me,
411+
text: text.clone(),
412+
media_type,
413+
media_path,
414+
reply_to_id,
415+
topic_id,
416+
})
417+
.await?;
418+
messages_stored += 1;
419+
420+
// Show progress periodically
421+
if opts.show_progress && last_progress_time.elapsed() >= progress_interval {
422+
eprint!(
423+
"\rLocal sync... {}/{} chats, {} messages",
424+
idx + 1,
425+
total_chats,
426+
messages_stored
427+
);
428+
last_progress_time = std::time::Instant::now();
429+
}
430+
431+
// Output
432+
match opts.output {
433+
OutputMode::Text => {
434+
let from_label = if from_me {
435+
"me".to_string()
436+
} else {
437+
sender_id.to_string()
438+
};
439+
let short_text = text.replace('\n', " ");
440+
let short_text = if short_text.len() > 100 {
441+
let truncate_at = short_text
442+
.char_indices()
443+
.take_while(|(i, _)| *i < 100)
444+
.last()
445+
.map(|(i, c)| i + c.len_utf8())
446+
.unwrap_or(0);
447+
format!("{}…", &short_text[..truncate_at])
448+
} else {
449+
short_text
450+
};
451+
println!(
452+
"from={} chat={} id={} text={}",
453+
from_label,
454+
chat.id,
455+
msg.id(),
456+
short_text
457+
);
458+
}
459+
OutputMode::Json => {
460+
let obj = serde_json::json!({
461+
"from_me": from_me,
462+
"sender": sender_id,
463+
"chat": chat.id,
464+
"id": msg.id(),
465+
"timestamp": msg_ts.to_rfc3339(),
466+
"text": text,
467+
});
468+
println!("{}", serde_json::to_string(&obj).unwrap_or_default());
469+
}
470+
OutputMode::Stream => {
471+
use std::io::Write;
472+
let obj = serde_json::json!({
473+
"type": "message",
474+
"from_me": from_me,
475+
"sender_id": sender_id,
476+
"chat_id": chat.id,
477+
"id": msg.id(),
478+
"ts": msg_ts.to_rfc3339(),
479+
"text": text,
480+
"topic_id": topic_id,
481+
"media_type": media_type_out,
482+
});
483+
println!("{}", serde_json::to_string(&obj).unwrap_or_default());
484+
let _ = std::io::stdout().flush();
485+
}
486+
OutputMode::None => {}
487+
}
488+
}
489+
490+
// Update chat's last_message_ts if we got new messages
491+
if let Some(ts) = latest_ts {
492+
self.store
493+
.upsert_chat(
494+
chat.id,
495+
&chat.kind,
496+
&chat.name,
497+
chat.username.as_deref(),
498+
Some(ts),
499+
chat.is_forum,
500+
)
501+
.await?;
502+
}
503+
504+
// Update last_sync_message_id for incremental sync
505+
if let Some(high_id) = highest_msg_id {
506+
self.store
507+
.update_last_sync_message_id(chat.id, high_id)
508+
.await?;
509+
}
510+
511+
// Track per-chat summary if messages were synced
512+
if count > 0 {
513+
// Build topic summaries for forums
514+
let new_topics: Vec<TopicSyncSummary> =
515+
if chat.is_forum && !topic_counts.is_empty() {
516+
let mut topic_summaries = Vec::new();
517+
for (tid, msg_count) in &topic_counts {
518+
let topic_name = self
519+
.store
520+
.get_topic(chat.id, *tid)
521+
.await
522+
.ok()
523+
.flatten()
524+
.map(|t| t.name.clone())
525+
.unwrap_or_else(|| format!("Topic {}", tid));
526+
topic_summaries.push(TopicSyncSummary {
527+
topic_id: *tid,
528+
topic_name,
529+
messages_synced: *msg_count,
530+
});
531+
}
532+
topic_summaries
533+
} else {
534+
Vec::new()
535+
};
536+
537+
per_chat_map
538+
.entry(chat.id)
539+
.and_modify(|existing| {
540+
existing.messages_synced += count as u64;
541+
for new_topic in &new_topics {
542+
if let Some(existing_topic) = existing
543+
.topics
544+
.iter_mut()
545+
.find(|t| t.topic_id == new_topic.topic_id)
546+
{
547+
existing_topic.messages_synced += new_topic.messages_synced;
548+
} else {
549+
existing.topics.push(new_topic.clone());
550+
}
551+
}
552+
})
553+
.or_insert(ChatSyncSummary {
554+
chat_id: chat.id,
555+
chat_name: chat.name.clone(),
556+
messages_synced: count as u64,
557+
topics: new_topics,
558+
});
559+
}
560+
}
561+
562+
if opts.show_progress {
563+
eprint!("\r\x1b[K"); // Clear line
564+
}
565+
eprintln!(
566+
"Local sync complete: {} chats checked, {} messages",
567+
chats_stored, messages_stored
568+
);
569+
570+
// Convert HashMap to Vec and sort topics by message count descending
571+
let per_chat: Vec<ChatSyncSummary> = per_chat_map
572+
.into_values()
573+
.map(|mut summary| {
574+
summary
575+
.topics
576+
.sort_by(|a, b| b.messages_synced.cmp(&a.messages_synced));
577+
summary
578+
})
579+
.collect();
580+
581+
return Ok(SyncResult {
582+
messages_stored,
583+
chats_stored,
584+
per_chat,
585+
});
586+
}
587+
236588
// Phase 1: Bootstrap — fetch recent dialogs and their messages
237589
if opts.show_progress {
238590
eprint!("\rSyncing... 0 chats, 0 messages");

0 commit comments

Comments
 (0)