Skip to content

Commit 82f5857

Browse files
authored
Merge pull request #14 from vectordotdev/feat/fetch-since-and-compact
Add --since to fetch commands and compact for deduplication
2 parents fb6ad77 + 50b2858 commit 82f5857

6 files changed

Lines changed: 569 additions & 40 deletions

File tree

src/commands/compact.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use anyhow::{Context, Result};
2+
use serde_json::Value;
3+
use std::collections::HashMap;
4+
use std::fs;
5+
use std::path::Path;
6+
7+
/// Deduplicate JSON year files in a directory.
8+
/// For issues/PRs: dedup by "id". For discussions: dedup by "number".
9+
pub fn run(dir: &str) -> Result<()> {
10+
let path = Path::new(dir);
11+
if !path.is_dir() {
12+
anyhow::bail!("{dir} is not a directory");
13+
}
14+
15+
let mut entries: Vec<_> = fs::read_dir(path)?
16+
.filter_map(|e| e.ok())
17+
.filter(|e| e.path().extension().is_some_and(|ext| ext == "json"))
18+
.collect();
19+
entries.sort_by_key(|e| e.path());
20+
21+
if entries.is_empty() {
22+
println!("No JSON files found in {dir}");
23+
return Ok(());
24+
}
25+
26+
// Detect key field from first file
27+
let first_json = fs::read_to_string(entries[0].path())?;
28+
let first_items: Vec<Value> = serde_json::from_str(&first_json)?;
29+
let key_field = if first_items.first().and_then(|v| v.get("number")).is_some()
30+
&& first_items.first().and_then(|v| v.get("id")).is_none()
31+
{
32+
"number"
33+
} else {
34+
"id"
35+
};
36+
println!("Deduplicating by \"{key_field}\" in {dir}");
37+
38+
let mut total_before = 0;
39+
let mut total_after = 0;
40+
41+
for entry in entries {
42+
let file_path = entry.path();
43+
let json = fs::read_to_string(&file_path)
44+
.with_context(|| format!("Failed to read {}", file_path.display()))?;
45+
let items: Vec<Value> = serde_json::from_str(&json)?;
46+
let before = items.len();
47+
48+
// Dedup: last occurrence wins (preserves most recent state)
49+
let mut seen: HashMap<String, usize> = HashMap::new();
50+
let mut deduped: Vec<Value> = Vec::new();
51+
for item in items {
52+
let key = item[key_field].to_string();
53+
if let Some(idx) = seen.get(&key) {
54+
deduped[*idx] = item;
55+
} else {
56+
seen.insert(key, deduped.len());
57+
deduped.push(item);
58+
}
59+
}
60+
61+
let after = deduped.len();
62+
total_before += before;
63+
total_after += after;
64+
65+
if before != after {
66+
let json_str = serde_json::to_string_pretty(&deduped)?;
67+
fs::write(&file_path, json_str)?;
68+
println!(
69+
" {}: {} -> {} (removed {} duplicates)",
70+
file_path.display(),
71+
before,
72+
after,
73+
before - after
74+
);
75+
} else {
76+
println!(" {}: {} items (no duplicates)", file_path.display(), after);
77+
}
78+
}
79+
80+
println!(
81+
"Total: {} -> {} (removed {} duplicates)",
82+
total_before,
83+
total_after,
84+
total_before - total_after
85+
);
86+
Ok(())
87+
}

src/commands/fetch_discussions.rs

Lines changed: 153 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use crate::commands::fetch_issues::parse_since;
12
use crate::config::Config;
23
use anyhow::{Context, Result};
34
use reqwest::blocking::Client;
45
use serde::{Deserialize, Serialize};
5-
use serde_json::json;
6+
use serde_json::{json, Value};
7+
use std::collections::BTreeMap;
68
use std::fs;
79
use std::path::Path;
810

@@ -45,6 +47,42 @@ query($owner: String!, $name: String!, $first: Int!, $after: String) {
4547
}
4648
"#;
4749

50+
const QUERY_SINCE: &str = r#"
51+
query($owner: String!, $name: String!, $first: Int!, $after: String) {
52+
repository(owner: $owner, name: $name) {
53+
discussions(first: $first, after: $after, orderBy: {field: UPDATED_AT, direction: DESC}) {
54+
pageInfo {
55+
endCursor
56+
hasNextPage
57+
}
58+
nodes {
59+
number
60+
title
61+
bodyText
62+
url
63+
createdAt
64+
updatedAt
65+
closedAt
66+
closed
67+
stateReason
68+
isAnswered
69+
locked
70+
author {
71+
login
72+
}
73+
category {
74+
name
75+
}
76+
comments {
77+
totalCount
78+
}
79+
upvoteCount
80+
}
81+
}
82+
}
83+
}
84+
"#;
85+
4886
#[derive(Deserialize, Serialize)]
4987
#[serde(rename_all = "camelCase")]
5088
pub struct Discussion {
@@ -111,28 +149,129 @@ struct GraphQlResponse {
111149
errors: Option<serde_json::Value>,
112150
}
113151

114-
pub fn run(config: &Config) -> Result<()> {
115-
run_with_client(&Client::new(), config)
152+
pub fn run(config: &Config, since: Option<&str>) -> Result<()> {
153+
run_with_client(&Client::new(), config, since)
116154
}
117155

118-
pub fn run_with_client(client: &Client, config: &Config) -> Result<()> {
119-
let discussions = fetch_all_discussions(client, config)?;
156+
pub fn run_with_client(client: &Client, config: &Config, since: Option<&str>) -> Result<()> {
157+
let since_ts = since.map(parse_since).transpose()?;
158+
159+
let discussions = if let Some(ref ts) = since_ts {
160+
println!("Fetching discussions updated since {ts}...");
161+
fetch_discussions_since(client, config, ts)?
162+
} else {
163+
fetch_all_discussions(client, config)?
164+
};
120165

121166
println!("Total discussions fetched: {}", discussions.len());
122167

123-
let out_dir = Path::new("out/historical/discussions");
124-
fs::create_dir_all(out_dir)?;
125-
let out_file = out_dir.join(format!(
126-
"{}_{}_discussions.json",
127-
config.repo_owner, config.repo_name
128-
));
129-
let json = serde_json::to_string_pretty(&discussions)?;
130-
fs::write(&out_file, json)?;
131-
println!("Saved to '{}'", out_file.display());
168+
let repo_prefix = format!("{}_{}", config.repo_owner, config.repo_name);
169+
let out_dir = Path::new("data").join(&repo_prefix).join("discussions");
170+
fs::create_dir_all(&out_dir)?;
171+
172+
// Bucket by year based on createdAt
173+
let mut by_year: BTreeMap<String, Vec<Discussion>> = BTreeMap::new();
174+
for d in discussions {
175+
let year = d.created_at.get(..4).unwrap_or("unknown").to_string();
176+
by_year.entry(year).or_default().push(d);
177+
}
178+
179+
for (year, items) in &by_year {
180+
let path = out_dir.join(format!("{year}.json"));
181+
let mut existing: Vec<Value> = if path.exists() {
182+
let json = fs::read_to_string(&path)?;
183+
serde_json::from_str(&json).unwrap_or_default()
184+
} else {
185+
Vec::new()
186+
};
187+
let new_values: Vec<Value> = items.iter()
188+
.map(|d| serde_json::to_value(d).unwrap())
189+
.collect();
190+
existing.extend(new_values);
191+
let json_str = serde_json::to_string_pretty(&existing)?;
192+
fs::write(&path, json_str)?;
193+
println!(" {year}.json: appended {} discussions", items.len());
194+
}
132195

133196
Ok(())
134197
}
135198

199+
fn fetch_discussions_since(client: &Client, config: &Config, since: &str) -> Result<Vec<Discussion>> {
200+
let mut discussions = Vec::new();
201+
let mut after: Option<String> = None;
202+
let mut page = 1;
203+
204+
loop {
205+
println!("Fetching page {page} of discussions...");
206+
207+
let body = json!({
208+
"query": QUERY_SINCE,
209+
"variables": {
210+
"owner": config.repo_owner,
211+
"name": config.repo_name,
212+
"first": PAGE_SIZE,
213+
"after": after,
214+
}
215+
});
216+
217+
let response = client
218+
.post(GRAPHQL_URL)
219+
.header("Authorization", format!("Bearer {}", config.github_token))
220+
.header("Accept", "application/vnd.github+json")
221+
.header("User-Agent", "github-tools")
222+
.json(&body)
223+
.send()
224+
.context("Failed to send GraphQL request")?;
225+
226+
if !response.status().is_success() {
227+
let status = response.status();
228+
let text = response.text().unwrap_or_default();
229+
eprintln!("Warning: GraphQL request failed ({status}): {text}");
230+
break;
231+
}
232+
233+
let result: GraphQlResponse = response
234+
.json()
235+
.context("Failed to parse GraphQL response")?;
236+
237+
if let Some(errors) = result.errors {
238+
eprintln!("Warning: GraphQL errors: {errors}");
239+
break;
240+
}
241+
242+
let page_data = result
243+
.data
244+
.context("Missing 'data' in GraphQL response")?
245+
.repository
246+
.discussions;
247+
248+
let has_next = page_data.page_info.has_next_page;
249+
after = page_data.page_info.end_cursor;
250+
251+
let mut hit_boundary = false;
252+
for d in page_data.nodes {
253+
if d.updated_at.as_str() < since {
254+
hit_boundary = true;
255+
} else {
256+
discussions.push(d);
257+
}
258+
}
259+
260+
println!("Fetched {} discussions so far...", discussions.len());
261+
262+
if hit_boundary {
263+
println!("Reached discussions older than --since cutoff.");
264+
break;
265+
}
266+
if !has_next {
267+
break;
268+
}
269+
page += 1;
270+
}
271+
272+
Ok(discussions)
273+
}
274+
136275
fn fetch_all_discussions(client: &Client, config: &Config) -> Result<Vec<Discussion>> {
137276
let mut discussions = Vec::new();
138277
let mut after: Option<String> = None;

0 commit comments

Comments
 (0)