@@ -33,11 +33,14 @@ async fn transform_join_produces_sum_on_both_inputs() {
3333 let runtime = Arc :: new ( TokioAdapter :: new ( ) . unwrap ( ) ) ;
3434 let mut builder = AimDbBuilder :: new ( ) . runtime ( runtime) ;
3535
36+ // SpmcRing inputs (vs SingleLatest) so that values produced before the join
37+ // transform's forwarders subscribe are still buffered and replayed — removes
38+ // a startup race where the test might otherwise need a hand-tuned barrier.
3639 builder. configure :: < ValueA > ( "test::A" , |reg| {
37- reg. buffer ( BufferCfg :: SingleLatest ) ;
40+ reg. buffer ( BufferCfg :: SpmcRing { capacity : 16 } ) ;
3841 } ) ;
3942 builder. configure :: < ValueB > ( "test::B" , |reg| {
40- reg. buffer ( BufferCfg :: SingleLatest ) ;
43+ reg. buffer ( BufferCfg :: SpmcRing { capacity : 16 } ) ;
4144 } ) ;
4245 builder. configure :: < Sum > ( "test::Sum" , |reg| {
4346 reg. buffer ( BufferCfg :: SpmcRing { capacity : 16 } )
@@ -92,3 +95,104 @@ async fn transform_join_produces_sum_on_both_inputs() {
9295 . unwrap ( ) ;
9396 assert_eq ! ( s. 0 , 22 , "expected 2+20=22" ) ;
9497}
98+
99+ /// Stress test for the bounded(64) Tokio fan-in channel: pushes well over 64
100+ /// events through a single-input join while the handler intentionally yields
101+ /// between receives. If backpressure is wired correctly, this completes
102+ /// without deadlock and every produced value is observed in order.
103+ #[ tokio:: test]
104+ async fn transform_join_bounded_fanin_backpressure_no_deadlock ( ) {
105+ const N : u32 = 200 ;
106+ const SENTINEL : u32 = u32:: MAX ;
107+ let cap = ( N + 16 ) as usize ;
108+
109+ let runtime = Arc :: new ( TokioAdapter :: new ( ) . unwrap ( ) ) ;
110+ let mut builder = AimDbBuilder :: new ( ) . runtime ( runtime) ;
111+
112+ // Input/output rings sized larger than the bounded(64) fan-in so the SpmcRing
113+ // itself isn't the limiter — we want the bounded channel to be the bottleneck.
114+ builder. configure :: < ValueA > ( "stress::A" , |reg| {
115+ reg. buffer ( BufferCfg :: SpmcRing { capacity : cap } ) ;
116+ } ) ;
117+ builder. configure :: < Sum > ( "stress::Echo" , |reg| {
118+ reg. buffer ( BufferCfg :: SpmcRing { capacity : cap } )
119+ . transform_join ( |b| {
120+ b. input :: < ValueA > ( "stress::A" )
121+ . on_triggers ( |mut rx, producer| async move {
122+ while let Ok ( trigger) = rx. recv ( ) . await {
123+ if let Some ( v) = trigger. as_input :: < ValueA > ( ) . copied ( ) {
124+ // Yield between receives to keep the fan-in channel
125+ // pressured well above its 64-slot capacity.
126+ tokio:: task:: yield_now ( ) . await ;
127+ let _ = producer. produce ( Sum ( v. 0 ) ) . await ;
128+ }
129+ }
130+ } )
131+ } ) ;
132+ } ) ;
133+
134+ let db = builder. build ( ) . await . unwrap ( ) ;
135+ let mut echo_rx = db. subscribe :: < Sum > ( "stress::Echo" ) . unwrap ( ) ;
136+
137+ // Warm-up: keep producing a sentinel until its echo lands. SpmcRing buffers
138+ // are tokio broadcast channels, so subscribers (including the join input
139+ // forwarder) only see values produced after they subscribe — the round-trip
140+ // gives us a deterministic barrier for "forwarder is up".
141+ {
142+ let warmup_db = db. clone ( ) ;
143+ let warmup = tokio:: spawn ( async move {
144+ loop {
145+ warmup_db
146+ . produce :: < ValueA > ( "stress::A" , ValueA ( SENTINEL ) )
147+ . await
148+ . unwrap ( ) ;
149+ tokio:: time:: sleep ( Duration :: from_millis ( 5 ) ) . await ;
150+ }
151+ } ) ;
152+ loop {
153+ let s = tokio:: time:: timeout ( Duration :: from_secs ( 2 ) , echo_rx. recv ( ) )
154+ . await
155+ . expect ( "warm-up: join forwarder did not subscribe in time" )
156+ . unwrap ( ) ;
157+ if s. 0 == SENTINEL {
158+ break ;
159+ }
160+ }
161+ warmup. abort ( ) ;
162+ let _ = warmup. await ;
163+ }
164+
165+ // Drain any remaining warm-up echoes so the burst checker sees a clean stream.
166+ while let Ok ( Ok ( s) ) = tokio:: time:: timeout ( Duration :: from_millis ( 50 ) , echo_rx. recv ( ) ) . await {
167+ assert_eq ! (
168+ s. 0 , SENTINEL ,
169+ "only warm-up sentinels should be in flight here"
170+ ) ;
171+ }
172+
173+ // Burst N events. The join handler yields between every receive, so the
174+ // bounded(64) fan-in fills up and backpressures the input forwarder. A
175+ // missing or broken backpressure path would deadlock here.
176+ let producer_db = db. clone ( ) ;
177+ let producer_task = tokio:: spawn ( async move {
178+ for i in 0 ..N {
179+ producer_db
180+ . produce :: < ValueA > ( "stress::A" , ValueA ( i) )
181+ . await
182+ . unwrap ( ) ;
183+ }
184+ } ) ;
185+
186+ for expected in 0 ..N {
187+ let s = tokio:: time:: timeout ( Duration :: from_secs ( 5 ) , echo_rx. recv ( ) )
188+ . await
189+ . expect ( "backpressured fan-in should not deadlock" )
190+ . unwrap ( ) ;
191+ assert_eq ! (
192+ s. 0 , expected,
193+ "values must arrive in order under backpressure"
194+ ) ;
195+ }
196+
197+ producer_task. await . unwrap ( ) ;
198+ }
0 commit comments