Skip to content

Commit ac61b4f

Browse files
committed
fix: clippy and fmt — range contains, allow explicit_auto_deref
1 parent 4c5dfb2 commit ac61b4f

49 files changed

Lines changed: 3422 additions & 648 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 1721 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/connector-aws-synthetic/src/lib.rs

Lines changed: 149 additions & 81 deletions
Large diffs are not rendered by default.

crates/connector-gcp-synthetic/src/lib.rs

Lines changed: 151 additions & 78 deletions
Large diffs are not rendered by default.

crates/parallax-cli/src/main.rs

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,18 @@ async fn main() -> Result<()> {
108108
}
109109

110110
match cli.command {
111-
Command::Serve { host, port, data_dir } => {
111+
Command::Serve {
112+
host,
113+
port,
114+
data_dir,
115+
} => {
112116
cmd_serve(&host, port, &data_dir).await?;
113117
}
114-
Command::Query { pql, data_dir, limit } => {
118+
Command::Query {
119+
pql,
120+
data_dir,
121+
limit,
122+
} => {
115123
cmd_query(&pql, &data_dir, limit)?;
116124
}
117125
Command::Stats { data_dir } => {
@@ -139,7 +147,7 @@ async fn cmd_serve(host: &str, port: u16, data_dir: &str) -> Result<()> {
139147
fn cmd_query(pql: &str, data_dir: &str, limit: usize) -> Result<()> {
140148
use parallax_graph::GraphReader;
141149
use parallax_query::{execute, parse, plan, IndexStats, QueryLimits, QueryResult};
142-
use parallax_store::{StoreConfig, StorageEngine};
150+
use parallax_store::{StorageEngine, StoreConfig};
143151
use std::collections::HashMap;
144152

145153
let config = StoreConfig::new(data_dir);
@@ -152,16 +160,27 @@ fn cmd_query(pql: &str, data_dir: &str, limit: usize) -> Result<()> {
152160
let mut class_counts: HashMap<String, usize> = HashMap::new();
153161
for e in &all {
154162
*type_counts.entry(e._type.as_str().to_owned()).or_insert(0) += 1;
155-
*class_counts.entry(e._class.as_str().to_owned()).or_insert(0) += 1;
163+
*class_counts
164+
.entry(e._class.as_str().to_owned())
165+
.or_insert(0) += 1;
156166
}
157-
let stats = IndexStats::new(type_counts, class_counts, snap.entity_count(), snap.relationship_count());
167+
let stats = IndexStats::new(
168+
type_counts,
169+
class_counts,
170+
snap.entity_count(),
171+
snap.relationship_count(),
172+
);
158173

159174
let ast = parse(pql).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
160175
let query_plan = plan(ast, &stats).map_err(|e| anyhow::anyhow!("Plan error: {e}"))?;
161176

162177
let graph = GraphReader::new(&snap);
163-
let limits = QueryLimits { max_results: limit, ..QueryLimits::default() };
164-
let result = execute(&query_plan, &graph, limits).map_err(|e| anyhow::anyhow!("Exec error: {e}"))?;
178+
let limits = QueryLimits {
179+
max_results: limit,
180+
..QueryLimits::default()
181+
};
182+
let result =
183+
execute(&query_plan, &graph, limits).map_err(|e| anyhow::anyhow!("Exec error: {e}"))?;
165184

166185
let count = result.count();
167186
println!("Results: {count}");
@@ -205,18 +224,22 @@ fn cmd_query(pql: &str, data_dir: &str, limit: usize) -> Result<()> {
205224
}
206225

207226
fn cmd_stats(data_dir: &str) -> Result<()> {
208-
use parallax_store::{StoreConfig, StorageEngine};
227+
use parallax_store::{StorageEngine, StoreConfig};
209228

210229
let config = StoreConfig::new(data_dir);
211230
let engine = StorageEngine::open(config)?;
212231
let snap = engine.snapshot();
213232

214233
let all = snap.all_entities();
215-
let mut type_counts: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
216-
let mut class_counts: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
234+
let mut type_counts: std::collections::HashMap<String, usize> =
235+
std::collections::HashMap::new();
236+
let mut class_counts: std::collections::HashMap<String, usize> =
237+
std::collections::HashMap::new();
217238
for e in &all {
218239
*type_counts.entry(e._type.as_str().to_owned()).or_insert(0) += 1;
219-
*class_counts.entry(e._class.as_str().to_owned()).or_insert(0) += 1;
240+
*class_counts
241+
.entry(e._class.as_str().to_owned())
242+
.or_insert(0) += 1;
220243
}
221244

222245
println!("Parallax Graph Statistics");
@@ -244,8 +267,8 @@ fn cmd_wal_dump(data_dir: &str, verbose: bool) -> Result<()> {
244267
use parallax_store::{dump_wal, WriteOp};
245268
use std::path::Path;
246269

247-
let entries = dump_wal(Path::new(data_dir))
248-
.map_err(|e| anyhow::anyhow!("WAL read error: {e}"))?;
270+
let entries =
271+
dump_wal(Path::new(data_dir)).map_err(|e| anyhow::anyhow!("WAL read error: {e}"))?;
249272

250273
if entries.is_empty() {
251274
println!("WAL is empty (data_dir: {data_dir})");
@@ -274,7 +297,8 @@ fn cmd_wal_dump(data_dir: &str, verbose: bool) -> Result<()> {
274297
println!(" - entity id={id}");
275298
}
276299
WriteOp::UpsertRelationship(r) => {
277-
println!(" + rel [{cls}] {from} → {to}",
300+
println!(
301+
" + rel [{cls}] {from} → {to}",
278302
cls = r._class.as_str(),
279303
from = r.from_id,
280304
to = r.to_id,

crates/parallax-connect/src/builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,9 @@ mod tests {
206206

207207
#[test]
208208
fn entity_builder_derives_id() {
209-
let e = entity("host", "h1").class("Host").build("acme", test_source());
209+
let e = entity("host", "h1")
210+
.class("Host")
211+
.build("acme", test_source());
210212
assert_eq!(e.id, EntityId::derive("acme", "host", "h1"));
211213
assert_eq!(e._type.as_str(), "host");
212214
assert_eq!(e._class.as_str(), "Host");

crates/parallax-connect/src/connector.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ impl StepDefinitionBuilder {
4949
}
5050

5151
pub fn build(self) -> StepDefinition {
52-
StepDefinition { id: self.id, description: self.description, depends_on: self.depends_on }
52+
StepDefinition {
53+
id: self.id,
54+
description: self.description,
55+
depends_on: self.depends_on,
56+
}
5357
}
5458
}
5559

@@ -70,11 +74,13 @@ pub struct PriorStepData {
7074

7175
impl PriorStepData {
7276
pub fn get(&self, entity_type: &str, entity_key: &str) -> Option<&EntityBuilder> {
73-
self.entities.get(&(entity_type.to_owned(), entity_key.to_owned()))
77+
self.entities
78+
.get(&(entity_type.to_owned(), entity_key.to_owned()))
7479
}
7580

7681
pub(crate) fn insert(&mut self, b: &EntityBuilder) {
77-
self.entities.insert((b.entity_type.clone(), b.entity_key.clone()), b.clone());
82+
self.entities
83+
.insert((b.entity_type.clone(), b.entity_key.clone()), b.clone());
7884
}
7985
}
8086

@@ -156,11 +162,7 @@ impl StepContext {
156162
}
157163

158164
/// Look up an entity emitted by a prior step.
159-
pub fn get_prior_entity(
160-
&self,
161-
entity_type: &str,
162-
entity_key: &str,
163-
) -> Option<&EntityBuilder> {
165+
pub fn get_prior_entity(&self, entity_type: &str, entity_key: &str) -> Option<&EntityBuilder> {
164166
self.prior_entities.get(entity_type, entity_key)
165167
}
166168

@@ -198,8 +200,7 @@ pub trait Connector: Send + Sync {
198200

199201
/// Validate that step dependencies are acyclic (INV-C05).
200202
pub fn validate_steps(steps: &[StepDefinition]) -> Result<(), ConnectorError> {
201-
let step_ids: std::collections::HashSet<&str> =
202-
steps.iter().map(|s| s.id.as_str()).collect();
203+
let step_ids: std::collections::HashSet<&str> = steps.iter().map(|s| s.id.as_str()).collect();
203204

204205
// Check all declared dependencies exist.
205206
for step in steps {
@@ -311,7 +312,8 @@ pub fn topological_order(steps: &[StepDefinition]) -> Vec<&StepDefinition> {
311312
.map(|(&id, _)| id)
312313
.collect();
313314

314-
let step_map: HashMap<&str, &StepDefinition> = steps.iter().map(|s| (s.id.as_str(), s)).collect();
315+
let step_map: HashMap<&str, &StepDefinition> =
316+
steps.iter().map(|s| (s.id.as_str(), s)).collect();
315317
let mut result = Vec::new();
316318

317319
while let Some(id) = queue.pop_front() {
@@ -355,13 +357,19 @@ mod tests {
355357
#[test]
356358
fn validate_cyclic_steps_error() {
357359
let steps = make_steps(&[("a", &["b"]), ("b", &["a"])]);
358-
assert!(matches!(validate_steps(&steps), Err(ConnectorError::DependencyCycle { .. })));
360+
assert!(matches!(
361+
validate_steps(&steps),
362+
Err(ConnectorError::DependencyCycle { .. })
363+
));
359364
}
360365

361366
#[test]
362367
fn topological_order_respects_dependencies() {
363368
let steps = make_steps(&[("a", &[]), ("b", &["a"]), ("c", &["b"])]);
364-
let order: Vec<&str> = topological_order(&steps).iter().map(|s| s.id.as_str()).collect();
369+
let order: Vec<&str> = topological_order(&steps)
370+
.iter()
371+
.map(|s| s.id.as_str())
372+
.collect();
365373
let a = order.iter().position(|&s| s == "a").unwrap();
366374
let b = order.iter().position(|&s| s == "b").unwrap();
367375
let c = order.iter().position(|&s| s == "c").unwrap();

crates/parallax-connect/src/error.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,17 @@ pub enum ConnectorError {
1313
AuthFailed { reason: String },
1414

1515
#[error("API request failed: {endpoint} returned {status}: {body}")]
16-
ApiError { endpoint: String, status: u16, body: String },
16+
ApiError {
17+
endpoint: String,
18+
status: u16,
19+
body: String,
20+
},
1721

1822
#[error("Rate limited by {service}. Retry after {retry_after:?}")]
19-
RateLimited { service: String, retry_after: Option<Duration> },
23+
RateLimited {
24+
service: String,
25+
retry_after: Option<Duration>,
26+
},
2027

2128
#[error("Entity validation failed: {reason}")]
2229
ValidationFailed { reason: String },

crates/parallax-connect/src/event.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,30 @@ use crate::error::ConnectorError;
99
/// Events emitted during a sync cycle for observability.
1010
#[derive(Debug)]
1111
pub enum SyncEvent {
12-
Started { connector_id: String, sync_id: String },
13-
StepStarted { step_id: String },
14-
StepCompleted { step_id: String, entities: u64, relationships: u64, duration: Duration },
15-
StepFailed { step_id: String, error: ConnectorError },
16-
SyncCompleted { sync_id: String, entities_created: u64, entities_deleted: u64 },
17-
SyncFailed { sync_id: String, error: String },
12+
Started {
13+
connector_id: String,
14+
sync_id: String,
15+
},
16+
StepStarted {
17+
step_id: String,
18+
},
19+
StepCompleted {
20+
step_id: String,
21+
entities: u64,
22+
relationships: u64,
23+
duration: Duration,
24+
},
25+
StepFailed {
26+
step_id: String,
27+
error: ConnectorError,
28+
},
29+
SyncCompleted {
30+
sync_id: String,
31+
entities_created: u64,
32+
entities_deleted: u64,
33+
},
34+
SyncFailed {
35+
sync_id: String,
36+
error: String,
37+
},
1838
}

crates/parallax-connect/src/scheduler.rs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ use crate::connector::{topological_waves, Connector, PriorStepData, StepContext}
2323
use crate::error::{ConnectorError, SyncError};
2424
use crate::event::SyncEvent;
2525

26-
type StepOutcome = (String, Result<(StepContext, std::time::Duration), ConnectorError>);
26+
type StepOutcome = (
27+
String,
28+
Result<(StepContext, std::time::Duration), ConnectorError>,
29+
);
2730

2831
/// Output collected from a connector run, ready to be committed.
2932
///
@@ -56,10 +59,13 @@ pub async fn run_connector(
5659
) -> Result<ConnectorOutput, SyncError> {
5760
let connector_id = connector.name().to_owned();
5861

59-
emit(&event_tx, SyncEvent::Started {
60-
connector_id: connector_id.clone(),
61-
sync_id: sync_id.to_owned(),
62-
})
62+
emit(
63+
&event_tx,
64+
SyncEvent::Started {
65+
connector_id: connector_id.clone(),
66+
sync_id: sync_id.to_owned(),
67+
},
68+
)
6369
.await;
6470

6571
let steps = connector.steps();
@@ -72,7 +78,13 @@ pub async fn run_connector(
7278
for wave in waves {
7379
// Emit StepStarted for each step in the wave before spawning.
7480
for step_id in &wave {
75-
emit(&event_tx, SyncEvent::StepStarted { step_id: step_id.clone() }).await;
81+
emit(
82+
&event_tx,
83+
SyncEvent::StepStarted {
84+
step_id: step_id.clone(),
85+
},
86+
)
87+
.await;
7688
}
7789

7890
// Spawn all steps in the wave concurrently.
@@ -111,12 +123,15 @@ pub async fn run_connector(
111123
new_prior.insert(b);
112124
}
113125

114-
emit(&event_tx, SyncEvent::StepCompleted {
115-
step_id: step_id.clone(),
116-
entities: step_entities,
117-
relationships: step_rels,
118-
duration: elapsed,
119-
})
126+
emit(
127+
&event_tx,
128+
SyncEvent::StepCompleted {
129+
step_id: step_id.clone(),
130+
entities: step_entities,
131+
relationships: step_rels,
132+
duration: elapsed,
133+
},
134+
)
120135
.await;
121136

122137
// Collect materialised entities and relationships.
@@ -132,10 +147,13 @@ pub async fn run_connector(
132147
}
133148
Ok((step_id, Err(e))) => {
134149
warn!(step_id = %step_id, error = %e, "Step failed");
135-
emit(&event_tx, SyncEvent::StepFailed {
136-
step_id: step_id.clone(),
137-
error: e,
138-
})
150+
emit(
151+
&event_tx,
152+
SyncEvent::StepFailed {
153+
step_id: step_id.clone(),
154+
error: e,
155+
},
156+
)
139157
.await;
140158
// Continue with other steps in the wave (INV-C06).
141159
}
@@ -173,20 +191,24 @@ async fn emit(tx: &Option<&tokio::sync::mpsc::Sender<SyncEvent>>, event: SyncEve
173191
#[cfg(test)]
174192
mod tests {
175193
use super::*;
176-
use async_trait::async_trait;
177194
use crate::builder::entity;
178195
use crate::connector::{step, StepDefinition};
179196
use crate::error::ConnectorError;
197+
use async_trait::async_trait;
180198

181199
struct MockConnector {
182200
steps: Vec<StepDefinition>,
183201
}
184202

185203
#[async_trait]
186204
impl Connector for MockConnector {
187-
fn name(&self) -> &str { "mock" }
205+
fn name(&self) -> &str {
206+
"mock"
207+
}
188208

189-
fn steps(&self) -> Vec<StepDefinition> { self.steps.clone() }
209+
fn steps(&self) -> Vec<StepDefinition> {
210+
self.steps.clone()
211+
}
190212

191213
async fn execute_step(
192214
&self,
@@ -202,7 +224,9 @@ mod tests {
202224

203225
#[async_trait]
204226
impl Connector for FailingConnector {
205-
fn name(&self) -> &str { "failing" }
227+
fn name(&self) -> &str {
228+
"failing"
229+
}
206230

207231
fn steps(&self) -> Vec<StepDefinition> {
208232
vec![

0 commit comments

Comments
 (0)