
Commit 399ef9a

mzrmeta-codesync[bot] authored and committed
wireproto: per-handler idle watchdog to unblock H2 migration
Summary:
Adds a per-handler idle watchdog to the wireproto websocket-upgrade path so that handlers stuck on a wedged HTTP/2 stream get killed (and the Arcs / response buffers they hold get released) instead of accumulating indefinitely. The watchdog is gated behind `scm/mononoke:wireproto_idle_kill_seconds` (defaults to 0 = disabled until intentionally rolled out, per `fbcode/eden/.llms/rules/rust_unwrap_safety.md`).

This is intended to unblock re-enabling HTTP/2 between revproxy and mononoke without waiting on full wireproto deprecation.

## Why the leak existed (S530959)

Each wireproto session in `slapi_server/repo_listener` spawns a `fwd` task that drains stdout/stderr/keepalive into a `WireprotoSink<W>` where `W` is the writer half of the HTTP body returned from `hyper::upgrade::on(...)`. The session tear-down chain is strictly serial:

```
1. request_handler returns (forward(...) errors when its stdout mpsc::Sender errors)
2. keep_alive.abort()
3. join_handle.await (waits for fwd to drain + flush + close wr)
```

Step 1 fires only when the mpsc::Sender to `stdout` returns Err, which only happens once the receiver inside `fwd` is dropped, which only happens once `fwd` exits, which only happens once `wr.poll_*` returns Ready (either Ok or Err).

Under HTTP/1.1, this is fine: peer disconnect closes TCP, the SSL stream sees FIN, `wr.poll_*` returns Err on the next poll, the chain unwinds in milliseconds.

Under HTTP/2 extended CONNECT (D71055412), one wireproto websocket is a single H2 stream multiplexed onto a long-lived H2 connection (with proxygen pool config `keep_alive_timeout_ms: 900000` / `max_pooled_conns_per_server: 10` post-D70000450). When a peer stops draining a stream without sending RST_STREAM (common when the connection is being kept open for other streams), `H2Upgraded::poll_ready` and `poll_flush` can pend on the H2 flow-control window indefinitely.
None of the existing layers will ever return Err; `request_handler` keeps holding `Arc<Mononoke<Repo>>`, `RepoClient` (repo + push_redirector_args + session + logging), the boxed `HgProtoHandler::outstream`, and any in-flight response payload (`getbundle` responses can be megabytes per session).

This was the leak shape behind S530959 ("memory is being held in wireproto codepath" after H2 enablement in D70000450) and the reason both H2 *and* connection pooling for the mononoke backend pool have been kept off in revproxy ever since (see D86877171: "removes selective http/2 disablement for mononoke (no longer used as http/2 has been rolled back everywhere)"). The S530805 sapling-404 incident was a separate proxygen-side bug (D74514382 fixed it), but the memory pressure is what's actually keeping H2 disabled today.

## How this fixes it

`WireprotoSinkData::last_successful_io` is already updated on every successful poll on the sink (via `WireprotoSinkData::peek_io`, called from each `WireprotoSink` poll method). Until now nothing read it outside the failure-scuba path. This diff:

1. Shares `WireprotoSinkData` between the `fwd` task's `WireprotoSink` and a new `wireproto_idle_watchdog` future in `handle_wireproto`, via `Arc<Mutex<WireprotoSinkData>>`. The mutex is uncontended in steady state (one writer, one periodic reader) and is never held across an `.await`.
2. Races `request_handler` against the watchdog via `tokio::select!`. If the watchdog wins (no successful sink poll for `wireproto_idle_kill_seconds`):
   - `request_handler` is dropped, releasing the Arc chain immediately.
   - `join_handle.abort()` is called explicitly so the `await` below doesn't itself hang against the same wedged sink, and so `wr` (the upgraded H2 IO) is dropped promptly.
3. Bumps `STATS::wireproto_idle_killed` whenever the watchdog fires, so we have a leak-detection signal during the rollout.
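
For readers who want the idle test from step 2 in isolation, here is a minimal sketch under stated assumptions. It is not the code in this diff: `SinkData`, `record_io`, and `is_wedged` are hypothetical names, `std::time::Instant` stands in for the chrono timestamps used by `WireprotoSinkData`, and the async plumbing (`tokio::select!`, the sleep loop) is left out. It only illustrates the "copy the timestamp under a brief lock, then compare against the threshold" shape.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `WireprotoSinkData`; only the field the
/// watchdog reads is modelled, and it uses `Instant` rather than chrono.
#[derive(Default)]
struct SinkData {
    last_successful_io: Option<Instant>,
}

/// Writer side: record a successful poll. The lock is taken and released
/// inside this call, so it is never held across a suspension point.
fn record_io(data: &Arc<Mutex<SinkData>>, now: Instant) {
    data.lock().expect("lock poisoned").last_successful_io = Some(now);
}

/// Reader side: copy the timestamp out under a brief lock, then decide.
/// Returns true only if some I/O was recorded and it is older than
/// `threshold`; a sink with no recorded I/O is left alone.
fn is_wedged(data: &Arc<Mutex<SinkData>>, now: Instant, threshold: Duration) -> bool {
    let last = data.lock().expect("lock poisoned").last_successful_io;
    match last {
        Some(t) => now.saturating_duration_since(t) > threshold,
        None => false,
    }
}
```

Passing `now` in explicitly keeps the comparison deterministic, which makes the threshold boundary easy to unit-test without sleeping.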
The keepalive task writes an empty `Bytes` to the sink every 5 seconds, so any healthy session (even one whose protocol layer is doing slow CPU work) refreshes `last_successful_io` continuously. Only an actually-wedged sink fails to update.

## Why this only affects wireproto

`handle_wireproto` is reached exclusively from `MononokeHttpService::handle_websocket_request`, which is only called when `is_websocket_req(...)` returns true, i.e., the request is an HTTP/1.1 `Upgrade: websocket` or HTTP/2 extended CONNECT (`:protocol = websocket`). Every other path through `MononokeHttpService::handle` (SLAPI, EdenAPI, `/control`, `/health_check`, `/netspeedtest`) returns before reaching the wireproto upgrade and is unaffected by this change. There is no global timeout here that could trim long-running edenapi streams, and no new behavior on the H2 connection itself.

## Configuration / rollout

- New JK: `scm/mononoke:wireproto_idle_kill_seconds` (i64). `<= 0` ⇒ watchdog inert. Suggested production value: 120 (wireproto is interactive; 2 minutes of zero successful sink polls means wedged).
- Default in `just_knobs.json` should be 0 so this is a no-op until intentionally enabled.
- Killswitch: setting the JK to 0 instantly disables the watchdog without a code push. The `tokio::select!` arm then can never fire (the watchdog stays in `continue` forever).
- Fail-safe on JK errors: if the JK system itself returns Err (e.g., during a config push), the watchdog logs once a minute and stays inert rather than killing handlers.
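
The knob semantics listed above can be condensed into one small mapping. This is a sketch only: `effective_threshold` is a hypothetical helper, not a function in this diff, and it collapses "disabled" and "knob read failed" into the same `None` result, whereas the actual watchdog also logs and backs off on read errors.

```rust
use std::time::Duration;

/// Hypothetical helper: map the raw knob read to an effective threshold.
/// `None` means the watchdog stays inert for this tick.
fn effective_threshold<E>(knob: Result<i64, E>) -> Option<Duration> {
    match knob {
        // Positive value: watchdog armed with this many seconds.
        Ok(secs) if secs > 0 => Some(Duration::from_secs(secs as u64)),
        // Zero or negative: off switch / killswitch.
        Ok(_) => None,
        // Knob system unavailable: stay inert rather than kill handlers.
        Err(_) => None,
    }
}
```

Treating a read error the same as "off" is the conservative choice described in the fail-safe bullet: a knob outage never causes handlers to be killed.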
## Wireproto-only memory release

When the watchdog fires we drop:

- `request_handler` future ⇒ `RepoClient`, `HgProtoHandler::outstream`, all in-flight response Bytes
- `Stdio { stdin, stdout, stderr }` ⇒ `mpsc::Sender`s, framed reader/writer halves
- After `keep_alive.abort()` + `join_handle.abort()`: the `WireprotoSink<W>` and the `H2Upgraded` (or `SslStream`-backed) `W` it owns

So a handler that would have leaked indefinitely instead releases everything within (`POLL_INTERVAL` + `wireproto_idle_kill_seconds`) of going idle.

Reviewed By: gustavoavena, YousefSalama

Differential Revision: D104390582

fbshipit-source-id: 5a177db7826f6b19c4220a6652fe2f4461d8cbfd
1 parent 8ffb844 commit 399ef9a

2 files changed

Lines changed: 212 additions & 42 deletions


eden/mononoke/servers/slapi/slapi_server/repo_listener/src/connection_acceptor.rs

Lines changed: 188 additions & 30 deletions
@@ -11,15 +11,18 @@ use std::io::Write;
1111
use std::net::SocketAddr;
1212
use std::path::PathBuf;
1313
use std::sync::Arc;
14+
use std::sync::Mutex;
1415
use std::sync::atomic::AtomicBool;
1516
use std::sync::atomic::AtomicUsize;
1617
use std::sync::atomic::Ordering;
1718
use std::time::Duration;
1819

1920
use anyhow::Context;
2021
use anyhow::Result;
22+
use anyhow::anyhow;
2123
use bytes::Bytes;
2224
use cached_config::ConfigStore;
25+
use chrono::Utc;
2326
use connection_security_checker::ConnectionSecurityChecker;
2427
use fbinit::FacebookInit;
2528
use futures::channel::mpsc;
@@ -82,11 +85,16 @@ use crate::errors::ErrorKind;
8285
use crate::http_service::MononokeHttpService;
8386
use crate::request_handler::request_handler;
8487
use crate::wireproto_sink::WireprotoSink;
88+
use crate::wireproto_sink::WireprotoSinkData;
8589

8690
define_stats! {
8791
prefix = "mononoke.connection_acceptor";
8892
http_accepted: timeseries(Sum),
8993
open_connections: singleton_counter(),
94+
// Number of wireproto handlers killed by the per-handler idle watchdog
95+
// (see `wireproto_idle_watchdog`). Non-zero means we caught a wedged
96+
// handler that would otherwise have leaked Arcs / response buffers.
97+
wireproto_idle_killed: timeseries(Sum),
9098
}
9199

92100
pub trait MononokeStream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static {}
@@ -359,6 +367,7 @@ where
359367
stderr,
360368
keep_alive,
361369
join_handle,
370+
sink_data,
362371
} = ChannelConn::setup(framed, conn.clone(), metadata.clone());
363372

364373
if metadata.client_debug() {
@@ -372,9 +381,7 @@ where
372381
stderr,
373382
};
374383

375-
// Don't immediately return error here, we need to cleanup our
376-
// handlers like keep alive, otherwise they will run forever.
377-
let result = request_handler(
384+
let request_fut = request_handler(
378385
conn.pending.acceptor.fb,
379386
reponame,
380387
Arc::clone(&conn.pending.acceptor.mononoke),
@@ -385,21 +392,153 @@ where
385392
conn.pending.acceptor.scribe.clone(),
386393
conn.pending.acceptor.qps.clone(),
387394
conn.pending.acceptor.readonly,
388-
)
389-
.await
390-
.context("Failed to execute request_handler");
395+
);
396+
397+
// Race `request_handler` against a per-handler idle watchdog. The watchdog
398+
// observes the WireprotoSink's `last_successful_io` timestamp; if no write
399+
// has succeeded for the configured threshold, the handler is presumed
400+
// wedged (stuck on a half-dead H2 stream's flow-control window, typically)
401+
// and we kill it. Don't call `?` here — we still need to run cleanup.
402+
let (watchdog_fired, result) = tokio::select! {
403+
res = request_fut => {
404+
(false, res.context("Failed to execute request_handler"))
405+
}
406+
_ = wireproto_idle_watchdog(sink_data) => {
407+
STATS::wireproto_idle_killed.add_value(1);
408+
(true, Err(anyhow!("Wireproto handler killed by idle watchdog")))
409+
}
410+
};
391411

392412
// Shutdown our keepalive handler
393413
keep_alive.abort();
394414

395-
join_handle
396-
.await
397-
.context("Failed to join ChannelConn")?
398-
.context("Failed to close ChannelConn")?;
415+
if watchdog_fired {
416+
// The fwd task is almost certainly stuck on the same WireprotoSink
417+
// poll that the watchdog flagged. Aborting it ensures the
418+
// `join_handle.await` below doesn't itself hang and that `wr` (and
419+
// the upgraded H2 IO it owns) is dropped promptly so memory is
420+
// released.
421+
join_handle.abort();
422+
let _ = join_handle.await;
423+
} else {
424+
join_handle
425+
.await
426+
.context("Failed to join ChannelConn")?
427+
.context("Failed to close ChannelConn")?;
428+
}
399429

400430
result
401431
}
402432

433+
/// Per-handler idle watchdog for wireproto sessions.
434+
///
435+
/// ## Why this exists
436+
///
437+
/// Each wireproto session spawns a `fwd` task that drains stdout/stderr/
438+
/// keepalive streams into a `WireprotoSink<W>`, where `W` is the writer half
439+
/// of the upgraded HTTP body. Under HTTP/1.1, when the client disconnects,
440+
/// TCP closes and `W::poll_*` returns `Err` immediately — the existing
441+
/// cleanup chain (drop senders → fwd ends → request_handler unwinds) runs to
442+
/// completion in milliseconds.
443+
///
444+
/// Under HTTP/2 extended CONNECT (RFC 8441), a wireproto stream lives inside
445+
/// a multiplexed H2 connection that may persist for the pool's
446+
/// `keep_alive_timeout_ms` (15 minutes in production). If the peer stops
447+
/// draining a stream without sending RST_STREAM — e.g., a misbehaving
448+
/// proxygen pool that's holding the connection open for other streams —
449+
/// `W::poll_ready` / `W::poll_flush` can pend on the H2 flow-control window
450+
/// indefinitely. Nothing in the existing code path will ever return `Err`,
451+
/// so request_handler stays alive holding `Arc<Mononoke<Repo>>`,
452+
/// `RepoClient`, the boxed `HgProtoHandler::outstream`, and any in-flight
453+
/// response payload (`getbundle` responses can be megabytes). This is the
454+
/// memory-leak shape observed in S530959 ("memory is being held in wireproto
455+
/// codepath" after the H2 enablement in D70000450).
456+
///
457+
/// ## What this does
458+
///
459+
/// Polls the shared `WireprotoSinkData::last_successful_io` timestamp. If the
460+
/// configured threshold has elapsed since the last successful write (or
461+
/// poll-ready) on the sink, returns — which fires the `tokio::select!` arm
462+
/// in `handle_wireproto` and cancels `request_handler`, releasing all the
463+
/// held Arcs and buffers. The keepalive task writes to the sink every 5s, so
464+
/// a healthy session — even one whose protocol layer is doing CPU work —
465+
/// always has a fresh `last_successful_io`. Only an actually-wedged sink
466+
/// fails to update.
467+
///
468+
/// ## Scope
469+
///
470+
/// Only invoked from `handle_wireproto`, which is reached exclusively from
471+
/// `MononokeHttpService::handle_websocket_request` when `is_websocket_req`
472+
/// returns true. SLAPI/EdenAPI/control/health/netspeedtest paths bypass this
473+
/// entirely — none of them go through the wireproto upgrade. Adding an idle
474+
/// kill here therefore cannot affect non-wireproto traffic.
475+
///
476+
/// ## Configuration
477+
///
478+
/// Gated by `scm/mononoke:wireproto_idle_kill_seconds` (i64). A value <= 0
479+
/// disables the watchdog. Defined in just_knobs.json so the default lives in
480+
/// configuration, not in code (per `fbcode/eden/.llms/rules/rust_unwrap_safety.md`).
481+
async fn wireproto_idle_watchdog(data: Arc<Mutex<WireprotoSinkData>>) {
482+
// Granular enough to react quickly; coarse enough that the JK lookup is
483+
// negligible.
484+
const POLL_INTERVAL: Duration = Duration::from_secs(10);
485+
// Back-off when the JK system itself is unhappy: don't spam logs more than
486+
// once a minute, and keep the watchdog disabled (i.e., return Pending) in
487+
// the meantime — failing closed here would kill every wireproto handler
488+
// when the JK fetcher hiccups, which is precisely the kind of blast
489+
// radius this watchdog is meant to prevent.
490+
const JK_ERROR_BACKOFF: Duration = Duration::from_secs(60);
491+
492+
loop {
493+
tokio::time::sleep(POLL_INTERVAL).await;
494+
match check_wireproto_idle(&data) {
495+
Ok(true) => return,
496+
Ok(false) => continue,
497+
Err(e) => {
498+
warn!(
499+
"wireproto_idle_watchdog: JK read failed, watchdog inert: {:#}",
500+
e
501+
);
502+
tokio::time::sleep(JK_ERROR_BACKOFF).await;
503+
}
504+
}
505+
}
506+
}
507+
508+
/// Returns `Ok(true)` iff the watchdog should fire (handler is idle past the
509+
/// configured threshold). Sync; safe to call from the watchdog loop without
510+
/// holding any guard across an await.
511+
fn check_wireproto_idle(data: &Arc<Mutex<WireprotoSinkData>>) -> Result<bool> {
512+
let threshold_secs =
513+
justknobs::get_as::<i64>("scm/mononoke:wireproto_idle_kill_seconds", None)?;
514+
if threshold_secs <= 0 {
515+
// Off switch: not configured for this rollout, or explicitly disabled.
516+
return Ok(false);
517+
}
518+
let threshold_secs = threshold_secs as u64;
519+
520+
// Briefly lock to copy the timestamp; immediately released.
521+
let last = data
522+
.lock()
523+
.expect("WireprotoSinkData lock poisoned")
524+
.last_successful_io;
525+
526+
if let Some(t) = last {
527+
let elapsed = (Utc::now() - t).num_seconds();
528+
if elapsed > 0 && (elapsed as u64) > threshold_secs {
529+
warn!(
530+
"wireproto handler idle for {}s (threshold {}s); aborting",
531+
elapsed, threshold_secs
532+
);
533+
return Ok(true);
534+
}
535+
}
536+
// Within threshold, or no I/O recorded yet (still initializing): leave it
537+
// alone. If the handler genuinely never writes anything it will be
538+
// bounded by the connection-level lifetime, not by us.
539+
Ok(false)
540+
}
541+
403542
pub struct FramedConn<R, W> {
404543
rd: FramedRead<R, SshDecoder>,
405544
wr: FramedWrite<W, SshEncoder>,
@@ -424,6 +563,11 @@ pub struct ChannelConn {
424563
stderr: mpsc::UnboundedSender<Bytes>,
425564
keep_alive: AbortHandle,
426565
join_handle: JoinHandle<Result<(), io::Error>>,
566+
// Shared with the `WireprotoSink` running inside the `fwd` task. Read by
567+
// `wireproto_idle_watchdog` to detect a wedged writer half (the leak
568+
// failure mode that motivated this; see the doc comment on
569+
// `wireproto_idle_watchdog`).
570+
sink_data: Arc<Mutex<WireprotoSinkData>>,
427571
}
428572

429573
impl ChannelConn {
@@ -446,6 +590,12 @@ impl ChannelConn {
446590
}
447591
}));
448592

593+
// Shared between the `fwd` task's `WireprotoSink` and the idle
594+
// watchdog launched in `handle_wireproto`. The Mutex is only ever
595+
// locked briefly to read/write timestamp+counter fields and is never
596+
// held across an `.await`.
597+
let sink_data = Arc::new(Mutex::new(WireprotoSinkData::new()));
598+
449599
let (stdout, stderr, keep_alive, join_handle) = {
450600
let (otx, orx) = mpsc::channel(1);
451601
let (etx, erx) = mpsc::unbounded();
@@ -461,9 +611,10 @@ impl ChannelConn {
461611
.map_ok(|v| SshMsg::new(IoStream::Stderr, v));
462612
let krx = krx.map_ok(|v| SshMsg::new(IoStream::Stderr, v));
463613

614+
let fwd_sink_data = sink_data.clone();
464615
// Glue them together
465616
let fwd = async move {
466-
let wr = WireprotoSink::new(wr);
617+
let wr = WireprotoSink::with_shared_data(wr, fwd_sink_data);
467618

468619
futures::pin_mut!(wr);
469620

@@ -473,27 +624,33 @@ impl ChannelConn {
473624
.await;
474625

475626
if let Err(e) = res.as_ref() {
476-
let projected_wr = wr.as_mut().project();
477-
let data = projected_wr.data;
478-
479627
let mut scuba = conn.pending.acceptor.wireproto_scuba.clone();
480628
scuba.add_metadata(&metadata);
481-
scuba.add_opt(
482-
"last_successful_flush",
483-
data.last_successful_flush.map(|dt| dt.timestamp()),
484-
);
485-
scuba.add_opt(
486-
"last_successful_io",
487-
data.last_successful_io.map(|dt| dt.timestamp()),
488-
);
489-
scuba.add_opt(
490-
"last_failed_io",
491-
data.last_failed_io.map(|dt| dt.timestamp()),
492-
);
493-
scuba.add("stdout_bytes", data.stdout.bytes);
494-
scuba.add("stdout_messages", data.stdout.messages);
495-
scuba.add("stderr_bytes", data.stderr.bytes);
496-
scuba.add("stderr_messages", data.stderr.messages);
629+
{
630+
// Scope the guard tightly: WireprotoSinkData is now
631+
// shared, and we must drop it before `.await` below.
632+
let projected_wr = wr.as_mut().project();
633+
let data = projected_wr
634+
.data
635+
.lock()
636+
.expect("WireprotoSinkData lock poisoned");
637+
scuba.add_opt(
638+
"last_successful_flush",
639+
data.last_successful_flush.map(|dt| dt.timestamp()),
640+
);
641+
scuba.add_opt(
642+
"last_successful_io",
643+
data.last_successful_io.map(|dt| dt.timestamp()),
644+
);
645+
scuba.add_opt(
646+
"last_failed_io",
647+
data.last_failed_io.map(|dt| dt.timestamp()),
648+
);
649+
scuba.add("stdout_bytes", data.stdout.bytes);
650+
scuba.add("stdout_messages", data.stdout.messages);
651+
scuba.add("stderr_bytes", data.stderr.bytes);
652+
scuba.add("stderr_messages", data.stderr.messages);
653+
}
497654
scuba.log_with_msg("Forwarding failed", format!("{:#}", e));
498655
}
499656

@@ -540,6 +697,7 @@ impl ChannelConn {
540697
stderr,
541698
keep_alive,
542699
join_handle,
700+
sink_data,
543701
}
544702
}
545703
}

eden/mononoke/servers/slapi/slapi_server/repo_listener/src/wireproto_sink.rs

Lines changed: 24 additions & 12 deletions
@@ -6,6 +6,8 @@
66
*/
77

88
use std::pin::Pin;
9+
use std::sync::Arc;
10+
use std::sync::Mutex;
911

1012
use chrono::DateTime;
1113
use chrono::Utc;
@@ -20,15 +22,15 @@ use sshrelay::SshMsg;
2022
pub struct WireprotoSink<T> {
2123
#[pin]
2224
inner: T,
23-
pub data: WireprotoSinkData,
25+
// Shared with the wireproto idle watchdog in
26+
// connection_acceptor::handle_wireproto. The lock is held only briefly to
27+
// record stat updates / read timestamps — never across an `.await`.
28+
pub data: Arc<Mutex<WireprotoSinkData>>,
2429
}
2530

2631
impl<T> WireprotoSink<T> {
27-
pub fn new(inner: T) -> Self {
28-
Self {
29-
inner,
30-
data: WireprotoSinkData::new(),
31-
}
32+
pub fn with_shared_data(inner: T, data: Arc<Mutex<WireprotoSinkData>>) -> Self {
33+
Self { inner, data }
3234
}
3335
}
3436

@@ -41,28 +43,38 @@ where
4143
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
4244
let this = self.project();
4345
let ret = this.inner.poll_ready(cx);
44-
this.data.peek_io(&ret);
46+
this.data
47+
.lock()
48+
.expect("WireprotoSinkData lock poisoned")
49+
.peek_io(&ret);
4550
ret
4651
}
4752

4853
fn start_send(self: Pin<&mut Self>, item: SshMsg) -> Result<(), Self::Error> {
4954
let this = self.project();
50-
this.data.peek_message(&item);
55+
this.data
56+
.lock()
57+
.expect("WireprotoSinkData lock poisoned")
58+
.peek_message(&item);
5159
this.inner.start_send(item)
5260
}
5361

5462
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
5563
let this = self.project();
5664
let ret = this.inner.poll_flush(cx);
57-
this.data.peek_io(&ret);
58-
this.data.peek_flush(&ret);
65+
let mut guard = this.data.lock().expect("WireprotoSinkData lock poisoned");
66+
guard.peek_io(&ret);
67+
guard.peek_flush(&ret);
5968
ret
6069
}
6170

6271
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
6372
let this = self.project();
6473
let ret = this.inner.poll_close(cx);
65-
this.data.peek_io(&ret);
74+
this.data
75+
.lock()
76+
.expect("WireprotoSinkData lock poisoned")
77+
.peek_io(&ret);
6678
ret
6779
}
6880
}
@@ -76,7 +88,7 @@ pub struct WireprotoSinkData {
7688
}
7789

7890
impl WireprotoSinkData {
79-
fn new() -> Self {
91+
pub fn new() -> Self {
8092
Self {
8193
last_successful_flush: None,
8294
last_successful_io: None,
