From 0a094546414e1a4aebbc250ed10fb6658ca654b6 Mon Sep 17 00:00:00 2001 From: iHsin Date: Fri, 26 Jun 2026 03:09:47 +0800 Subject: [PATCH 1/2] fix(wind-core): reap idle half-closed relays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `copy_io` delegates to `copy_bidirectional`, which only returns once BOTH directions reach EOF. When the outbound peer half-closes — e.g. an origin server FINs after its response while the downstream TUIC client leaves its upload half of the QUIC bi-stream open — the inbound→outbound direction never EOFs, so the outbound TCP socket sits in CLOSE_WAIT and the relay task hangs for the entire lifetime of the long-lived QUIC connection. Proxied TCP connections then only get cleaned up when the connection itself tears down (client disconnect or server kill). #49 reaps connection children on connection teardown only, not per stream. Arm an idle reaper once the outbound peer closes (detected when `copy_bidirectional` shuts down the inbound writer): after RELAY_HALF_CLOSE_TIMEOUT (30s) with no traffic the half-open relay is torn down, dropping both streams and closing the lingering sockets. Any byte moved resets the window, so a slow-but-live transfer is never cut off; a fully-open idle tunnel (keep-alive / long-poll) is never reaped because the timer only arms after the outbound side has closed. Co-Authored-By: Claude Opus 4.8 --- crates/wind-core/src/io.rs | 293 +++++++++++++++++++++++++++++++++++-- 1 file changed, 281 insertions(+), 12 deletions(-) diff --git a/crates/wind-core/src/io.rs b/crates/wind-core/src/io.rs index 71ff06e..4b43f70 100644 --- a/crates/wind-core/src/io.rs +++ b/crates/wind-core/src/io.rs @@ -1,4 +1,14 @@ -use tokio::io::{AsyncRead, AsyncWrite}; +use std::{ + pin::Pin, + sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, + }, + task::{Context, Poll}, + time::Duration, +}; + +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; /// Per-direction copy buffer size used by [`copy_io`]. /// @@ -11,25 +21,205 @@ use tokio::io::{AsyncRead, AsyncWrite}; /// stream receive windows. pub const RELAY_BUF_SIZE: usize = 64 * 1024; -/// Bidirectionally relay bytes between two duplex streams until BOTH sides -/// have closed. +/// How long a *half-closed* relay may sit idle before it is torn down. +/// +/// `copy_bidirectional` only returns once BOTH directions reach EOF. When the +/// outbound peer half-closes — e.g. the origin server sends its response and +/// FINs while the downstream TUIC client leaves its upload half of the QUIC +/// bi-stream open — the inbound→outbound direction never EOFs, so the outbound +/// TCP socket would sit in `CLOSE_WAIT` for the entire (long-lived) QUIC +/// connection. That is the "proxied TCP connections never terminate until the +/// server is killed" symptom. +/// +/// Once the outbound side has closed, [`copy_io`] gives the surviving direction +/// this long to finish. Any byte moved resets the window, so a slow-but-live +/// transfer is never cut off; only a genuinely idle half-open relay is reaped. +pub const RELAY_HALF_CLOSE_TIMEOUT: Duration = Duration::from_secs(30); + +/// Shared, per-relay progress state read by the half-close reaper. +#[derive(Default)] +struct RelayMeters { + /// Bytes read from the inbound side `a` (inbound→outbound traffic). + a2b: AtomicU64, + /// Bytes read from the outbound side `b` (outbound→inbound traffic). + b2a: AtomicU64, + /// Set once the inbound writer is shut down — which `copy_bidirectional` + /// only does after the *outbound* reader hit EOF, i.e. the outbound peer + /// closed. Until this flips, the reaper stays disarmed (a fully-open but + /// idle tunnel — keep-alive, long-poll — must never be reaped). + half_closed: AtomicBool, +} + +impl RelayMeters { + /// Monotonic "bytes moved in either direction" counter; a stalled relay is + /// one whose `activity()` does not change across a timeout window. + fn activity(&self) -> u64 { + self.a2b.load(Ordering::Relaxed).wrapping_add(self.b2a.load(Ordering::Relaxed)) + } +} + +/// One side of a relay, wrapping the real stream to (1) count throughput per +/// direction and (2) observe the half-close that arms the idle reaper. +struct Tracked<'s, S: ?Sized> { + inner: &'s mut S, + meters: Arc, + /// `true` for the inbound stream `a`: its reads are inbound→outbound bytes, + /// and a shutdown of its writer signals the outbound peer closed. + is_inbound: bool, +} + +impl AsyncRead for Tracked<'_, S> { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let this = self.get_mut(); + let before = buf.filled().len(); + let res = Pin::new(&mut *this.inner).poll_read(cx, buf); + if let Poll::Ready(Ok(())) = &res { + let n = (buf.filled().len() - before) as u64; + if n > 0 { + let counter = if this.is_inbound { &this.meters.a2b } else { &this.meters.b2a }; + counter.fetch_add(n, Ordering::Relaxed); + } + } + res + } +} + +impl AsyncWrite for Tracked<'_, S> { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut *this.inner).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut *this.inner).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + // `copy_bidirectional` shuts down a writer when the OPPOSITE reader hits + // EOF. Shutting down the inbound writer therefore means the outbound peer + // closed: arm the reaper so the still-open inbound→outbound direction can + // no longer linger forever. The outbound writer's shutdown (the inbound + // peer closed first) is the benign case `copy_bidirectional` already + // handles — leave the reaper disarmed there so a slow outbound response is + // not cut short. + if this.is_inbound { + this.meters.half_closed.store(true, Ordering::Release); + } + Pin::new(&mut *this.inner).poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut *this.inner).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +/// Bidirectionally relay bytes between two duplex streams. /// -/// Delegates to [`tokio::io::copy_bidirectional_with_sizes`] (with +/// By convention `a` is the **inbound** stream (closer to the originating +/// client) and `b` the **outbound** stream (closer to the target). +/// +/// The hot path delegates to [`tokio::io::copy_bidirectional_with_sizes`] (with /// [`RELAY_BUF_SIZE`] per direction), which correctly handles half-close: when /// one direction sees EOF, it calls `shutdown()` on the opposite writer and -/// continues pumping the remaining direction. The previous hand-rolled -/// implementation broke out of the outer loop on the FIRST EOF, dropping any -/// in-flight bytes flowing the other way — a common problem for HTTP, where a -/// client sends its request and FINs while the server is still streaming the -/// response. +/// continues pumping the remaining direction. (A previous hand-rolled +/// implementation broke out of the loop on the FIRST EOF, dropping in-flight +/// bytes flowing the other way — truncating responses for HTTP clients that FIN +/// after their request while the server is still streaming.) +/// +/// On top of that, once the **outbound** peer closes, the surviving +/// inbound→outbound direction is bounded by [`RELAY_HALF_CLOSE_TIMEOUT`]: if it +/// moves no bytes for that long it is torn down, so a half-open connection the +/// downstream client never closes can no longer pin the outbound socket in +/// `CLOSE_WAIT` for the life of the QUIC connection. pub async fn copy_io(a: &mut A, b: &mut B) -> (usize, usize, Option) where A: AsyncRead + AsyncWrite + Unpin + ?Sized, B: AsyncRead + AsyncWrite + Unpin + ?Sized, { - match tokio::io::copy_bidirectional_with_sizes(a, b, RELAY_BUF_SIZE, RELAY_BUF_SIZE).await { - Ok((a2b, b2a)) => (a2b as usize, b2a as usize, None), - Err(e) => (0, 0, Some(e)), + copy_io_with_timeout(a, b, RELAY_HALF_CLOSE_TIMEOUT).await +} + +/// [`copy_io`] with an injectable half-close timeout (kept private so tests can +/// exercise the reaper without waiting the production grace period). +async fn copy_io_with_timeout( + a: &mut A, + b: &mut B, + half_close_timeout: Duration, +) -> (usize, usize, Option) +where + A: AsyncRead + AsyncWrite + Unpin + ?Sized, + B: AsyncRead + AsyncWrite + Unpin + ?Sized, +{ + let meters = Arc::new(RelayMeters::default()); + let mut ta = Tracked { + inner: a, + meters: meters.clone(), + is_inbound: true, + }; + let mut tb = Tracked { + inner: b, + meters: meters.clone(), + is_inbound: false, + }; + + let relay = tokio::io::copy_bidirectional_with_sizes(&mut ta, &mut tb, RELAY_BUF_SIZE, RELAY_BUF_SIZE); + tokio::pin!(relay); + + tokio::select! { + res = &mut relay => match res { + Ok((a2b, b2a)) => (a2b as usize, b2a as usize, None), + Err(e) => ( + meters.a2b.load(Ordering::Relaxed) as usize, + meters.b2a.load(Ordering::Relaxed) as usize, + Some(e), + ), + }, + // The relay went half-closed (outbound peer gone) and then idle past the + // grace period: the surviving direction is a dead half-open connection. + // Returning drops both `Tracked`s — and with them the inner streams — + // closing the lingering sockets instead of leaking them. + () = reap_when_half_open(&meters, half_close_timeout) => ( + meters.a2b.load(Ordering::Relaxed) as usize, + meters.b2a.load(Ordering::Relaxed) as usize, + None, + ), + } +} + +/// Resolve once the relay has been half-closed AND has then moved no bytes for +/// `idle_timeout`. Any activity, or a relay that has not (yet) half-closed, +/// resets the window — so an active transfer is never interrupted and a +/// fully-open idle tunnel is never reaped. +async fn reap_when_half_open(meters: &RelayMeters, idle_timeout: Duration) { + // Sample several times per window so reaping lands within ~`idle_timeout` of + // going quiet rather than up to 2× it, while staying cheap for the 30 s + // production value (a 6 s tick). + let poll = (idle_timeout / 5).clamp(Duration::from_millis(50), Duration::from_secs(5)); + let mut last_activity = meters.activity(); + let mut idle_for = Duration::ZERO; + loop { + tokio::time::sleep(poll).await; + let activity = meters.activity(); + if activity != last_activity || !meters.half_closed.load(Ordering::Acquire) { + last_activity = activity; + idle_for = Duration::ZERO; + continue; + } + idle_for += poll; + if idle_for >= idle_timeout { + return; + } } } @@ -98,3 +288,82 @@ pub mod quinn { } } } + +#[cfg(test)] +mod tests { + use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + + use super::*; + + /// A clean full-duplex exchange where both peers close completes promptly + /// and reports the bytes relayed in each direction. The reaper must not fire + /// on this path. + #[tokio::test] + async fn clean_full_close_completes() { + let (mut a, mut a_peer) = tokio::io::duplex(1024); + let (mut b, mut b_peer) = tokio::io::duplex(1024); + + let relay = tokio::spawn(async move { copy_io_with_timeout(&mut a, &mut b, Duration::from_secs(30)).await }); + + // inbound → outbound: a's peer sends + closes, b's peer drains to EOF. + a_peer.write_all(b"ping").await.unwrap(); + a_peer.shutdown().await.unwrap(); + let mut forward = Vec::new(); + b_peer.read_to_end(&mut forward).await.unwrap(); + assert_eq!(forward, b"ping"); + + // outbound → inbound: b's peer replies + closes, a's peer drains to EOF. + b_peer.write_all(b"pong!").await.unwrap(); + b_peer.shutdown().await.unwrap(); + let mut back = Vec::new(); + a_peer.read_to_end(&mut back).await.unwrap(); + assert_eq!(back, b"pong!"); + + let (a2b, b2a, err) = relay.await.unwrap(); + assert!(err.is_none(), "clean close should not surface an error: {err:?}"); + assert_eq!(a2b, 4); + assert_eq!(b2a, 5); + } + + /// The regression: the outbound peer closes, the inbound peer leaves its + /// upload half open and silent. `copy_bidirectional` alone would hang here + /// forever (leaking the outbound socket); the reaper must tear it down once + /// the half-open relay has been idle past the timeout. + #[tokio::test] + async fn half_open_idle_relay_is_reaped() { + let (mut a, _a_peer) = tokio::io::duplex(1024); + let (mut b, b_peer) = tokio::io::duplex(1024); + + // Outbound closes immediately: dropping b's peer makes b read EOF. + drop(b_peer); + + // `_a_peer` stays alive and silent, so the inbound→outbound direction + // never EOFs. Without the reaper copy_io would never return. + let result = tokio::time::timeout( + Duration::from_secs(5), + copy_io_with_timeout(&mut a, &mut b, Duration::from_millis(150)), + ) + .await; + + let (_a2b, _b2a, err) = result.expect("half-open relay was not reaped — copy_io hung"); + assert!(err.is_none(), "reaping a half-open relay is not an error: {err:?}"); + } + + /// A fully-open but idle tunnel (neither side has closed) must NOT be reaped: + /// the half-close timer only arms once the outbound peer is gone. + #[tokio::test] + async fn fully_open_idle_relay_is_not_reaped() { + let (mut a, _a_peer) = tokio::io::duplex(1024); + let (mut b, _b_peer) = tokio::io::duplex(1024); + + // Both peers stay alive and silent — this models an idle keep-alive + // connection. The relay must still be running after several timeouts. + let result = tokio::time::timeout( + Duration::from_millis(600), + copy_io_with_timeout(&mut a, &mut b, Duration::from_millis(100)), + ) + .await; + + assert!(result.is_err(), "an idle but fully-open relay must not be reaped"); + } +} From 9e8dbcb86dcfe3e424f494e753bc4620522a311d Mon Sep 17 00:00:00 2001 From: iHsin Date: Fri, 26 Jun 2026 03:13:53 +0800 Subject: [PATCH 2/2] style(wind-core): apply nightly rustfmt to half-close reaper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap `RelayMeters::activity` and the reaper match per the repo's nightly rustfmt config (`wrap_comments`, `imports_granularity`, …). No behavior change. Co-Authored-By: Claude Opus 4.8 --- crates/wind-core/src/io.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/wind-core/src/io.rs b/crates/wind-core/src/io.rs index 4b43f70..0b052a4 100644 --- a/crates/wind-core/src/io.rs +++ b/crates/wind-core/src/io.rs @@ -54,7 +54,9 @@ impl RelayMeters { /// Monotonic "bytes moved in either direction" counter; a stalled relay is /// one whose `activity()` does not change across a timeout window. fn activity(&self) -> u64 { - self.a2b.load(Ordering::Relaxed).wrapping_add(self.b2a.load(Ordering::Relaxed)) + self.a2b + .load(Ordering::Relaxed) + .wrapping_add(self.b2a.load(Ordering::Relaxed)) } } @@ -296,8 +298,8 @@ mod tests { use super::*; /// A clean full-duplex exchange where both peers close completes promptly - /// and reports the bytes relayed in each direction. The reaper must not fire - /// on this path. + /// and reports the bytes relayed in each direction. The reaper must not + /// fire on this path. #[tokio::test] async fn clean_full_close_completes() { let (mut a, mut a_peer) = tokio::io::duplex(1024); @@ -349,8 +351,8 @@ mod tests { assert!(err.is_none(), "reaping a half-open relay is not an error: {err:?}"); } - /// A fully-open but idle tunnel (neither side has closed) must NOT be reaped: - /// the half-close timer only arms once the outbound peer is gone. + /// A fully-open but idle tunnel (neither side has closed) must NOT be + /// reaped: the half-close timer only arms once the outbound peer is gone. #[tokio::test] async fn fully_open_idle_relay_is_not_reaped() { let (mut a, _a_peer) = tokio::io::duplex(1024);