Skip to content

Commit b8d7458

Browse files
lutter and claude committed
graph, store: Add periodic probe for unavailable database recovery
When a database shard is marked unavailable due to connection timeouts, StateTracker now allows one probe request through every few seconds (configurable via GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY, default 2s) to check if the database has recovered. This prevents the recovery deadlock where mark_available() only fires on successful connection, but no connections are attempted when unavailable. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 36faa01 commit b8d7458

3 files changed

Lines changed: 62 additions & 3 deletions

File tree

docs/environment-variables.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,11 @@ those.
216216
decisions. Set to `true` to turn simulation on, defaults to `false`
217217
- `GRAPH_STORE_CONNECTION_TIMEOUT`: How long to wait to connect to a
218218
database before assuming the database is down in ms. Defaults to 5000ms.
219+
- `GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY`: When a database shard is marked
220+
unavailable due to connection timeouts, this controls how often to allow a
221+
single probe request through to check if the database has recovered. Only one
222+
request per interval will attempt a connection; all others fail instantly.
223+
Value is in seconds and defaults to 2s.
219224
- `EXPERIMENTAL_SUBGRAPH_VERSION_SWITCHING_MODE`: default is `instant`, set
220225
to `synced` to only switch a named subgraph to a new deployment once it
221226
has synced, making the new deployment the "Pending" version.

graph/src/env/store.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ pub struct EnvVarsStore {
171171
/// Set to 0 to always validate (previous behavior).
172172
/// Set by `GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS`. Default is 30 seconds.
173173
pub connection_validation_idle_secs: Duration,
174+
/// When a database shard is marked unavailable due to connection timeouts,
175+
/// this controls how often to allow a single probe request through to check
176+
/// if the database has recovered. Only one request per interval will attempt
177+
/// a connection; all others fail instantly with DatabaseUnavailable.
178+
/// Set by `GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY`. Default is 2 seconds.
179+
pub connection_unavailable_retry: Duration,
174180
}
175181

176182
// This does not print any values, to avoid accidentally leaking any sensitive env vars
@@ -234,6 +240,9 @@ impl TryFrom<InnerStore> for EnvVarsStore {
234240
disable_call_cache: x.disable_call_cache,
235241
disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache,
236242
connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs),
243+
connection_unavailable_retry: Duration::from_secs(
244+
x.connection_unavailable_retry_in_secs,
245+
),
237246
};
238247
if let Some(timeout) = vars.batch_timeout {
239248
if timeout < 2 * vars.batch_target_duration {
@@ -345,6 +354,8 @@ pub struct InnerStore {
345354
disable_chain_head_ptr_cache: bool,
346355
#[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")]
347356
connection_validation_idle_secs: u64,
357+
#[envconfig(from = "GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY", default = "2")]
358+
connection_unavailable_retry_in_secs: u64,
348359
}
349360

350361
#[derive(Clone, Copy, Debug)]

store/postgres/src/pool/manager.rs

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ use graph::slog::Logger;
2020

2121
use std::collections::HashMap;
2222
use std::sync::atomic::AtomicBool;
23+
use std::sync::atomic::AtomicU64;
2324
use std::sync::atomic::Ordering;
2425
use std::sync::Arc;
25-
use std::time::{Duration, Instant};
26+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2627

2728
use crate::pool::AsyncPool;
2829

@@ -141,6 +142,9 @@ pub(super) struct StateTracker {
141142
logger: Logger,
142143
available: Arc<AtomicBool>,
143144
ignore_timeout: Arc<AtomicBool>,
145+
/// Timestamp (as millis since UNIX_EPOCH) when we can next probe.
146+
/// 0 means available/no limit.
147+
next_probe_at: Arc<AtomicU64>,
144148
}
145149

146150
impl StateTracker {
@@ -149,14 +153,16 @@ impl StateTracker {
149153
logger,
150154
available: Arc::new(AtomicBool::new(true)),
151155
ignore_timeout: Arc::new(AtomicBool::new(false)),
156+
next_probe_at: Arc::new(AtomicU64::new(0)),
152157
}
153158
}
154159

155160
pub(super) fn mark_available(&self) {
156161
if !self.is_available() {
157-
info!(self.logger, "Conection checkout"; "event" => "available");
162+
info!(self.logger, "Connection checkout"; "event" => "available");
158163
}
159164
self.available.store(true, Ordering::Relaxed);
165+
self.next_probe_at.store(0, Ordering::Relaxed);
160166
}
161167

162168
pub(super) fn mark_unavailable(&self, waited: Duration) {
@@ -171,10 +177,47 @@ impl StateTracker {
171177
}
172178
}
173179
self.available.store(false, Ordering::Relaxed);
180+
181+
// Set next probe time
182+
let retry_interval = ENV_VARS.store.connection_unavailable_retry.as_millis() as u64;
183+
let next_probe = SystemTime::now()
184+
.duration_since(UNIX_EPOCH)
185+
.unwrap()
186+
.as_millis() as u64
187+
+ retry_interval;
188+
self.next_probe_at.store(next_probe, Ordering::Relaxed);
174189
}
175190

176191
pub(super) fn is_available(&self) -> bool {
177-
AtomicBool::load(&self.available, Ordering::Relaxed)
192+
if AtomicBool::load(&self.available, Ordering::Relaxed) {
193+
return true;
194+
}
195+
196+
// Allow one probe through every `connection_unavailable_retry` interval
197+
let next_probe = AtomicU64::load(&self.next_probe_at, Ordering::Relaxed);
198+
let now_millis = SystemTime::now()
199+
.duration_since(UNIX_EPOCH)
200+
.unwrap()
201+
.as_millis() as u64;
202+
203+
if now_millis >= next_probe {
204+
// Try to claim this probe slot with CAS
205+
let retry_interval = ENV_VARS.store.connection_unavailable_retry.as_millis() as u64;
206+
if self
207+
.next_probe_at
208+
.compare_exchange(
209+
next_probe,
210+
now_millis + retry_interval,
211+
Ordering::Relaxed,
212+
Ordering::Relaxed,
213+
)
214+
.is_ok()
215+
{
216+
// We claimed the probe - allow this request through
217+
return true;
218+
}
219+
}
220+
false
178221
}
179222

180223
pub(super) fn timeout_is_ignored(&self) -> bool {

0 commit comments

Comments
 (0)