Skip to content

Commit 280b7c8

Browse files
committed
chore: remove reate limits
1 parent 1157546 commit 280b7c8

4 files changed

Lines changed: 164 additions & 98 deletions

File tree

packages/core/api/status/src/route/actor.rs

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,17 @@ pub async fn status(
267267
)
268268
.await;
269269

270+
// Unwrap res
271+
match &test_res {
272+
Ok(Ok(())) => {}
273+
Ok(Err(err)) => {
274+
tracing::error!(?err, "status check error");
275+
}
276+
Err(_) => {
277+
tracing::error!("status check timeout");
278+
}
279+
}
280+
270281
// Destroy actor regardless of connection status
271282
{
272283
use actors_api::ActorsDestroyError::*;
@@ -331,15 +342,21 @@ pub async fn status(
331342

332343
#[tracing::instrument]
333344
async fn test_actor_connection(hostname: &str, port: u16, actor_origin: &str) -> GlobalResult<()> {
345+
tracing::info!("starting actor connection test");
346+
334347
// Look up IP for the actor's host
348+
tracing::info!(?hostname, port, "looking up DNS");
335349
let gg_ips = lookup_dns(hostname, port).await?;
350+
tracing::info!(?gg_ips, "DNS lookup complete");
351+
336352
ensure_with!(
337353
!gg_ips.is_empty(),
338354
INTERNAL_STATUS_CHECK_FAILED,
339355
error = format!("no IPs found for host {hostname}")
340356
);
341357

342358
// Test HTTP connectivity
359+
tracing::info!(ip_count = gg_ips.len(), "testing HTTP connectivity to all IPs");
343360
let test_http_res = futures_util::future::join_all(gg_ips.iter().cloned().map(|x| {
344361
test_http(
345362
actor_origin.to_string(),
@@ -348,6 +365,7 @@ async fn test_actor_connection(hostname: &str, port: u16, actor_origin: &str) ->
348365
)
349366
}))
350367
.await;
368+
351369
let failed_tests = gg_ips
352370
.iter()
353371
.zip(test_http_res.iter())
@@ -359,7 +377,14 @@ async fn test_actor_connection(hostname: &str, port: u16, actor_origin: &str) ->
359377
}
360378
})
361379
.collect::<Vec<_>>();
380+
362381
if !failed_tests.is_empty() {
382+
tracing::error!(
383+
failed_count = failed_tests.len(),
384+
total_count = gg_ips.len(),
385+
?failed_tests,
386+
"HTTP connectivity tests failed"
387+
);
363388
bail_with!(
364389
INTERNAL_STATUS_CHECK_FAILED,
365390
error = format!(
@@ -371,14 +396,21 @@ async fn test_actor_connection(hostname: &str, port: u16, actor_origin: &str) ->
371396
);
372397
}
373398

399+
tracing::info!("all HTTP connectivity tests passed");
400+
374401
// Test WebSocket connectivity
402+
tracing::info!("testing WebSocket connectivity");
375403
test_ws(actor_origin).await.map_err(|err| {
404+
tracing::error!(?err, "WebSocket connectivity test failed");
376405
err_code!(
377406
INTERNAL_STATUS_CHECK_FAILED,
378407
error = format!("ws failed: {err:?}")
379408
)
380409
})?;
381410

411+
tracing::info!("WebSocket connectivity test passed");
412+
tracing::info!("actor connection test completed successfully");
413+
382414
Ok(())
383415
}
384416

@@ -410,18 +442,49 @@ async fn test_http(
410442
hostname: String,
411443
addr: SocketAddr,
412444
) -> Result<(), reqwest::Error> {
445+
tracing::info!(?addr, "starting HTTP test");
446+
413447
// Resolve the host to the specific IP addr
414448
let client = reqwest::Client::builder()
415449
.resolve(&hostname, addr)
416450
.build()?;
417451

452+
let url = format!("{actor_origin}/health");
453+
tracing::info!(?url, "sending HTTP request");
454+
418455
// Test HTTP connectivity
419-
client
420-
.get(format!("{actor_origin}/health"))
456+
let response = client
457+
.get(&url)
421458
.send()
422459
.instrument(tracing::info_span!("health_request"))
423-
.await?
424-
.error_for_status()?;
460+
.await?;
461+
462+
let status = response.status();
463+
let headers = response.headers().clone();
464+
465+
// Check status before consuming the response for logging
466+
// This way we get proper reqwest::Error if status is not successful
467+
let response = match response.error_for_status() {
468+
Ok(resp) => resp,
469+
Err(e) => {
470+
// Try to read the body even on error for logging
471+
tracing::error!(?status, ?headers, ?e, "HTTP request returned error status");
472+
return Err(e);
473+
}
474+
};
475+
476+
// Read body (this consumes the response)
477+
let body = response.text().await?;
478+
479+
tracing::info!(
480+
?status,
481+
?headers,
482+
?body,
483+
body_len = body.len(),
484+
"HTTP response received"
485+
);
486+
487+
tracing::info!(?addr, "HTTP test completed successfully");
425488

426489
Ok(())
427490
}

packages/edge/infra/guard/core/src/proxy_service.rs

Lines changed: 93 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -533,38 +533,40 @@ impl ProxyState {
533533
ip_addr: std::net::IpAddr,
534534
actor_id: &Option<Uuid>,
535535
) -> GlobalResult<bool> {
536-
let Some(actor_id) = *actor_id else {
537-
// No rate limiting when actor_id is None
538-
return Ok(true);
539-
};
540-
541-
// Get actor-specific middleware config
542-
let middleware_config = self.get_middleware_config(&actor_id).await?;
543-
544-
let cache_key = (actor_id, ip_addr);
545-
546-
// Get existing limiter or create a new one
547-
let limiter_arc = if let Some(existing_limiter) = self.rate_limiters.get(&cache_key).await {
548-
existing_limiter
549-
} else {
550-
let new_limiter = Arc::new(Mutex::new(RateLimiter::new(
551-
middleware_config.rate_limit.requests,
552-
middleware_config.rate_limit.period,
553-
)));
554-
self.rate_limiters
555-
.insert(cache_key, new_limiter.clone())
556-
.await;
557-
metrics::RATE_LIMITER_COUNT.set(self.rate_limiters.entry_count() as i64);
558-
new_limiter
559-
};
560-
561-
// Try to acquire from the limiter
562-
let result = {
563-
let mut limiter = limiter_arc.lock().await;
564-
limiter.try_acquire()
565-
};
566-
567-
Ok(result)
536+
// let Some(actor_id) = *actor_id else {
537+
// // No rate limiting when actor_id is None
538+
// return Ok(true);
539+
// };
540+
//
541+
// // Get actor-specific middleware config
542+
// let middleware_config = self.get_middleware_config(&actor_id).await?;
543+
//
544+
// let cache_key = (actor_id, ip_addr);
545+
//
546+
// // Get existing limiter or create a new one
547+
// let limiter_arc = if let Some(existing_limiter) = self.rate_limiters.get(&cache_key).await {
548+
// existing_limiter
549+
// } else {
550+
// let new_limiter = Arc::new(Mutex::new(RateLimiter::new(
551+
// middleware_config.rate_limit.requests,
552+
// middleware_config.rate_limit.period,
553+
// )));
554+
// self.rate_limiters
555+
// .insert(cache_key, new_limiter.clone())
556+
// .await;
557+
// metrics::RATE_LIMITER_COUNT.set(self.rate_limiters.entry_count() as i64);
558+
// new_limiter
559+
// };
560+
//
561+
// // Try to acquire from the limiter
562+
// let result = {
563+
// let mut limiter = limiter_arc.lock().await;
564+
// limiter.try_acquire()
565+
// };
566+
//
567+
// Ok(result)
568+
569+
Ok(true)
568570
}
569571

570572
#[tracing::instrument(skip_all)]
@@ -573,53 +575,55 @@ impl ProxyState {
573575
ip_addr: std::net::IpAddr,
574576
actor_id: &Option<Uuid>,
575577
) -> GlobalResult<bool> {
576-
let Some(actor_id) = *actor_id else {
577-
// No in-flight limiting when actor_id is None
578-
return Ok(true);
579-
};
580-
581-
// Get actor-specific middleware config
582-
let middleware_config = self.get_middleware_config(&actor_id).await?;
583-
584-
let cache_key = (actor_id, ip_addr);
585-
586-
// Get existing counter or create a new one
587-
let counter_arc =
588-
if let Some(existing_counter) = self.in_flight_counters.get(&cache_key).await {
589-
existing_counter
590-
} else {
591-
let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
592-
middleware_config.max_in_flight.amount,
593-
)));
594-
self.in_flight_counters
595-
.insert(cache_key, new_counter.clone())
596-
.await;
597-
metrics::IN_FLIGHT_COUNTER_COUNT.set(self.in_flight_counters.entry_count() as i64);
598-
new_counter
599-
};
600-
601-
// Try to acquire from the counter
602-
let result = {
603-
let mut counter = counter_arc.lock().await;
604-
counter.try_acquire()
605-
};
606-
607-
Ok(result)
578+
// let Some(actor_id) = *actor_id else {
579+
// // No in-flight limiting when actor_id is None
580+
// return Ok(true);
581+
// };
582+
//
583+
// // Get actor-specific middleware config
584+
// let middleware_config = self.get_middleware_config(&actor_id).await?;
585+
//
586+
// let cache_key = (actor_id, ip_addr);
587+
//
588+
// // Get existing counter or create a new one
589+
// let counter_arc =
590+
// if let Some(existing_counter) = self.in_flight_counters.get(&cache_key).await {
591+
// existing_counter
592+
// } else {
593+
// let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
594+
// middleware_config.max_in_flight.amount,
595+
// )));
596+
// self.in_flight_counters
597+
// .insert(cache_key, new_counter.clone())
598+
// .await;
599+
// metrics::IN_FLIGHT_COUNTER_COUNT.set(self.in_flight_counters.entry_count() as i64);
600+
// new_counter
601+
// };
602+
//
603+
// // Try to acquire from the counter
604+
// let result = {
605+
// let mut counter = counter_arc.lock().await;
606+
// counter.try_acquire()
607+
// };
608+
//
609+
// Ok(result)
610+
611+
Ok(true)
608612
}
609613

610614
#[tracing::instrument(skip_all)]
611615
async fn release_in_flight(&self, ip_addr: std::net::IpAddr, actor_id: &Option<Uuid>) {
612-
// Skip if actor_id is None (no in-flight tracking)
613-
let actor_id = match actor_id {
614-
Some(id) => *id,
615-
None => return, // No in-flight tracking when actor_id is None
616-
};
617-
618-
let cache_key = (actor_id, ip_addr);
619-
if let Some(counter_arc) = self.in_flight_counters.get(&cache_key).await {
620-
let mut counter = counter_arc.lock().await;
621-
counter.release();
622-
}
616+
// // Skip if actor_id is None (no in-flight tracking)
617+
// let actor_id = match actor_id {
618+
// Some(id) => *id,
619+
// None => return, // No in-flight tracking when actor_id is None
620+
// };
621+
//
622+
// let cache_key = (actor_id, ip_addr);
623+
// if let Some(counter_arc) = self.in_flight_counters.get(&cache_key).await {
624+
// let mut counter = counter_arc.lock().await;
625+
// counter.release();
626+
// }
623627
}
624628
}
625629

@@ -728,19 +732,20 @@ impl ProxyService {
728732
let client_ip = self.remote_addr.ip();
729733

730734
// Apply rate limiting
731-
let res = if !self.state.check_rate_limit(client_ip, &actor_id).await? {
732-
Response::builder()
733-
.status(StatusCode::TOO_MANY_REQUESTS)
734-
.body(ResponseBody::Full(Full::<Bytes>::new(Bytes::new())))
735-
.map_err(Into::into)
736-
}
737-
// Check in-flight limit
738-
else if !self.state.acquire_in_flight(client_ip, &actor_id).await? {
739-
Response::builder()
740-
.status(StatusCode::TOO_MANY_REQUESTS)
741-
.body(ResponseBody::Full(Full::<Bytes>::new(Bytes::new())))
742-
.map_err(Into::into)
743-
} else {
735+
// let res = if !self.state.check_rate_limit(client_ip, &actor_id).await? {
736+
// Response::builder()
737+
// .status(StatusCode::TOO_MANY_REQUESTS)
738+
// .body(ResponseBody::Full(Full::<Bytes>::new(Bytes::new())))
739+
// .map_err(Into::into)
740+
// }
741+
// // Check in-flight limit
742+
// else if !self.state.acquire_in_flight(client_ip, &actor_id).await? {
743+
// Response::builder()
744+
// .status(StatusCode::TOO_MANY_REQUESTS)
745+
// .body(ResponseBody::Full(Full::<Bytes>::new(Bytes::new())))
746+
// .map_err(Into::into)
747+
// } else {
748+
let res = {
744749
// Increment metrics
745750
metrics::PROXY_REQUEST_PENDING.inc();
746751
metrics::PROXY_REQUEST_TOTAL.inc();

packages/edge/services/pegboard/src/keys/env.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ impl TuplePack for ActiveActorKey {
149149

150150
impl<'de> TupleUnpack<'de> for ActiveActorKey {
151151
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
152-
let (input, (_, environment_id, _, create_ts, actor_id)) =
153-
<(usize, Uuid, usize, i64, Uuid)>::unpack(input, tuple_depth)?;
152+
let (input, (_, environment_id, _, _, create_ts, actor_id)) =
153+
<(usize, Uuid, usize, usize, i64, Uuid)>::unpack(input, tuple_depth)?;
154154
let v = ActiveActorKey {
155155
environment_id,
156156
create_ts,

packages/edge/services/pegboard/src/ops/actor/list_for_env.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,9 @@ pub async fn pegboard_actor_list_for_env(
8989
}
9090
}
9191
}
92-
}
92+
} else {
93+
// Old idx with scanning
9394

94-
// Old idx with scanning
95-
if results.len() < input.limit {
9695
let actor_subspace =
9796
keys::subspace().subspace(&keys::env::ActorKey::subspace(input.env_id));
9897
let (start, end) = actor_subspace.range();
@@ -116,7 +115,6 @@ pub async fn pegboard_actor_list_for_env(
116115
// NOTE: Does not have to be serializable because we are listing, stale data does not matter
117116
SNAPSHOT,
118117
);
119-
let mut results = Vec::new();
120118

121119
while let Some(entry) = stream.try_next().await? {
122120
let actor_key = keys::subspace()

0 commit comments

Comments
 (0)