Skip to content

Commit 49a5bc1

Browse files
aCombraycopybara-github
authored andcommitted
Internal change
PiperOrigin-RevId: 893956786
1 parent c568500 commit 49a5bc1

4 files changed

Lines changed: 104 additions & 62 deletions

File tree

src/devtools/mobileharness/shared/util/comm/dualconduit/conduit/pump.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func handleIngressConnection(c *Conduit, conn net.Conn) {
7272

7373
// AcceptStream handles an incoming RSocket channel, dials the target service, and bridges the streams.
7474
// This implements the Responder Role.
75-
func AcceptStream(c *Conduit, destEndpoint string, incoming flux.Flux) flux.Flux {
75+
func AcceptStream(c *Conduit, destEndpoint string, downstream flux.Flux) flux.Flux {
7676
conn, err := net.Dial("tcp", destEndpoint)
7777
if err != nil {
7878
return flux.Error(err)
@@ -86,8 +86,11 @@ func AcceptStream(c *Conduit, destEndpoint string, incoming flux.Flux) flux.Flux
8686

8787
streamCtx, cancel := context.WithCancel(c.Context())
8888

89-
// Read from RSocket and write to backend TCP.
90-
rc.FromFlux(streamCtx, incoming)
89+
// 1. Read from RSocket and write to backend TCP.
90+
rc.FromFlux(streamCtx, downstream)
91+
92+
// 2. Read from backend TCP and send to RSocket.
93+
upstream := rc.ToFlux(streamCtx)
9194

9295
// Clean up TCP connection when streaming finishes.
9396
go func() {
@@ -96,6 +99,5 @@ func AcceptStream(c *Conduit, destEndpoint string, incoming flux.Flux) flux.Flux
9699
conn.Close()
97100
}()
98101

99-
// Read from backend TCP and send to RSocket.
100-
return rc.ToFlux(streamCtx)
102+
return upstream
101103
}

src/devtools/mobileharness/shared/util/comm/dualconduit/conduit/pump_test.go

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -308,33 +308,36 @@ func TestCombinedLoopAndAccept(t *testing.T) {
308308
defer conn.Close()
309309

310310
// 3. Simultaneously run an AcceptStream toward another local server
311-
l2, err := net.Listen("tcp", "localhost:0")
311+
lis2, err := net.Listen("tcp", "localhost:0")
312312
if err != nil {
313313
t.Fatalf("Listen failed: %v", err)
314314
}
315-
defer l2.Close()
316-
port2 := l2.Addr().(*net.TCPAddr).Port
315+
defer lis2.Close()
316+
port2 := lis2.Addr().(*net.TCPAddr).Port
317317

318318
go func() {
319-
c, err := l2.Accept()
319+
conn, err := lis2.Accept()
320320
if err != nil {
321321
if errors.Is(err, net.ErrClosed) {
322322
return
323323
}
324-
t.Errorf("l2.Accept() got error: %v", err)
324+
t.Errorf("lis2.Accept() got error: %v", err)
325325
return
326326
}
327-
if c != nil {
328-
c.Close()
329-
}
327+
defer conn.Close()
328+
329+
io.Copy(io.Discard, conn) // Drain unread data to avoid RST
330330
}()
331331

332332
f := AcceptStream(c, fmt.Sprintf("localhost:%d", port2), flux.Just(payload.New([]byte("hello"), nil)))
333333
if f == nil {
334334
t.Fatalf("AcceptStream got nil, want non-nil flux")
335335
}
336336

337-
// Wait, no panic = success
337+
_, err = f.BlockLast(ctx)
338+
if err != nil && !errors.Is(err, io.EOF) {
339+
t.Fatalf("BlockLast failed: %v", err)
340+
}
338341
}
339342

340343
func TestHalfClose_Ingress_ClientCloseWrite(t *testing.T) {
@@ -344,19 +347,9 @@ func TestHalfClose_Ingress_ClientCloseWrite(t *testing.T) {
344347
completed := make(chan struct{})
345348
fakeRSocket := rsocket.NewAbstractSocket(
346349
rsocket.RequestChannel(func(msgs flux.Flux) flux.Flux {
347-
// Track when upstream is completed (client CloseWrite)
348-
go func() {
349-
_, err := msgs.BlockLast(ctx)
350-
if err == nil {
351-
close(completed)
352-
}
353-
}()
354-
// Return downstream data slowly
355-
f := flux.Create(func(ctx context.Context, sink flux.Sink) {
356-
sink.Next(payload.New([]byte("delayed-response"), nil))
357-
sink.Complete()
350+
return msgs.DoOnComplete(func() {
351+
close(completed)
358352
})
359-
return f
360353
}),
361354
)
362355

@@ -390,8 +383,8 @@ func TestHalfClose_Ingress_ClientCloseWrite(t *testing.T) {
390383
t.Fatalf("Unexpected read error: %v", err)
391384
}
392385

393-
if string(buf[:n]) != "delayed-response" {
394-
t.Fatalf("During half-close read got %q, want %q", string(buf[:n]), "delayed-response")
386+
if string(buf[:n]) != "req" {
387+
t.Fatalf("During half-close read got %q, want %q", string(buf[:n]), "req")
395388
}
396389

397390
select {
@@ -411,8 +404,9 @@ func TestHalfClose_Egress_ServerCloseWrite(t *testing.T) {
411404
defer l.Close()
412405
port := l.Addr().(*net.TCPAddr).Port
413406

414-
var testPassed atomic.Int32
407+
serverDone := make(chan struct{})
415408
go func() {
409+
defer close(serverDone)
416410
conn, err := l.Accept()
417411
if err != nil {
418412
return
@@ -425,21 +419,21 @@ func TestHalfClose_Egress_ServerCloseWrite(t *testing.T) {
425419

426420
buf := make([]byte, 1024)
427421
n, err := tcpConn.Read(buf) // Should get "req"
428-
if err != nil {
422+
if err != nil && err != io.EOF {
429423
return
430424
}
431-
if string(buf[:n]) == "req" {
425+
if n > 0 && string(buf[:n]) == "req" {
432426
tcpConn.Write([]byte("response"))
433427
// Trigger half-close from server end
434428
tcpConn.CloseWrite()
435429
}
436430

437-
// Wait to see if we get more data? Actually in Acceptance stream,
438-
// remote already pushed all.
439-
// Wait to observe EOF from client because RSocket complete propagated?
440-
_, err = tcpConn.Read(buf)
441-
if err == io.EOF {
442-
testPassed.Store(1)
431+
// Read remaining data until EOF if not already reached
432+
if err != io.EOF {
433+
_, err = tcpConn.Read(buf)
434+
}
435+
if err != io.EOF {
436+
t.Errorf("Server side did not receive EOF, got %v", err)
443437
}
444438
}()
445439

@@ -452,7 +446,7 @@ func TestHalfClose_Egress_ServerCloseWrite(t *testing.T) {
452446

453447
outFlux := AcceptStream(c, fmt.Sprintf("localhost:%d", port), f)
454448

455-
res, err := outFlux.BlockFirst(ctx)
449+
res, err := outFlux.BlockLast(ctx)
456450
if err != nil {
457451
t.Fatalf("outFlux blockFirst error: %v", err)
458452
}
@@ -462,7 +456,5 @@ func TestHalfClose_Egress_ServerCloseWrite(t *testing.T) {
462456

463457
// Since f completed, and rxconn mirrors EOF locally (CloseWrite on the dialed connection)
464458
// the server should have read an EOF.
465-
if testPassed.Load() != 1 {
466-
t.Fatalf("Server side did not receive EOF (rxconn did not mirror Complete)")
467-
}
459+
<-serverDone // Wait for server to finish reading EOF and close
468460
}

src/devtools/mobileharness/shared/util/comm/dualconduit/rxconn/rxconn.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package rxconn
33

44
import (
55
"context"
6+
"errors"
67
"io"
78
"net"
89
"sync"
10+
"sync/atomic"
911

1012
"github.com/rsocket/rsocket-go/payload"
1113
"github.com/rsocket/rsocket-go/rx/flux"
@@ -14,8 +16,9 @@ import (
1416

1517
// Conn bridges a standard network connection with reactive streams.
1618
type Conn struct {
17-
conn net.Conn
18-
wg sync.WaitGroup
19+
conn net.Conn
20+
wg sync.WaitGroup
21+
subscribed atomic.Bool // Guard to enforce the single-subscriber policy
1922
}
2023

2124
// New creates a new Conn bridging the active network connection to reactive fluxes.
@@ -29,12 +32,26 @@ func New(conn net.Conn) (*Conn, error) {
2932
// ToFlux returns a Flux that reads from the underlying active socket.
3033
// The context controls pump lifecycle: cancellation tears down background pumps.
3134
// It may be used after ToFlux returns, as pumps are run in background goroutines.
35+
//
36+
// ToFlux enforces a single-subscriber policy using an atomic guard to prevent
37+
// multiple subscribers from corrupting the stream (net.Conn is stateful and
38+
// cannot be shared by multiple Read Pumps).
3239
func (c *Conn) ToFlux(ctx context.Context) flux.Flux {
33-
upstreamChan := make(chan []byte, 32)
34-
errChan := make(chan error, 1)
35-
40+
// Eager Tracking: Protect c.wg.Wait() immediately.
41+
// This ensures that Wait() does not exit prematurely if the RSocket
42+
// subscription is slightly delayed or happens after Wait() is called.
3643
c.wg.Add(1)
3744
return flux.Create(func(subscriberCtx context.Context, sink flux.Sink) {
45+
if c.subscribed.Swap(true) {
46+
sink.Error(errors.New("ToFlux allows only one subscriber"))
47+
return
48+
}
49+
50+
// Lazy Pumping: Initialize channels and start goroutines ONLY when
51+
// a subscriber is ready.
52+
upstreamChan := make(chan []byte, 32)
53+
errChan := make(chan error, 1)
54+
3855
// 1. The TCP Read Pump
3956
go func() {
4057
defer close(upstreamChan)

src/devtools/mobileharness/shared/util/comm/dualconduit/rxconn/rxconn_test.go

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ func TestRxConn_ToFlux_EOF(t *testing.T) {
1616
clientSide, serverSide := net.Pipe()
1717
defer clientSide.Close()
1818

19-
rxConnObj, err := New(serverSide)
19+
conn, err := New(serverSide)
2020
if err != nil {
2121
t.Fatalf("Failed to create rxConn: %v", err)
2222
}
2323
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
2424
defer cancel()
2525

26-
f := rxConnObj.ToFlux(ctx)
26+
f := conn.ToFlux(ctx)
2727
resultChan := make(chan payload.Payload, 10)
2828
errChan := make(chan error, 1)
2929
doneChan := make(chan struct{})
@@ -74,13 +74,13 @@ func TestRxConn_ToFlux_ContextCancellation(t *testing.T) {
7474
clientSide, serverSide := net.Pipe()
7575
defer clientSide.Close()
7676

77-
rxConnObj, err := New(serverSide)
77+
conn, err := New(serverSide)
7878
if err != nil {
7979
t.Fatalf("Failed to create rxConn: %v", err)
8080
}
8181
ctx, cancel := context.WithCancel(t.Context())
8282

83-
f := rxConnObj.ToFlux(ctx)
83+
f := conn.ToFlux(ctx)
8484
errChan := make(chan error, 1)
8585

8686
f.Subscribe(t.Context(),
@@ -107,7 +107,7 @@ func TestRxConn_FromFlux_DataAndEOFPropagation(t *testing.T) {
107107
clientSide, serverSide := net.Pipe()
108108
defer clientSide.Close()
109109

110-
rxConnObj, err := New(serverSide)
110+
conn, err := New(serverSide)
111111
if err != nil {
112112
t.Fatalf("Failed to create rxConn: %v", err)
113113
}
@@ -121,7 +121,7 @@ func TestRxConn_FromFlux_DataAndEOFPropagation(t *testing.T) {
121121
// Subscribe in a goroutine because FromFlux will synchronously write to the pipe,
122122
// which blocks until the test reads from it below.
123123
go func() {
124-
rxConnObj.FromFlux(t.Context(), f)
124+
conn.FromFlux(t.Context(), f)
125125
}()
126126

127127
// Read from pipe to verify
@@ -149,14 +149,14 @@ func TestRxConn_ToFlux_Backpressure(t *testing.T) {
149149
defer clientSide.Close()
150150
defer serverSide.Close()
151151

152-
rxConnObj, err := New(serverSide)
152+
conn, err := New(serverSide)
153153
if err != nil {
154154
t.Fatalf("Failed to create rxConn: %v", err)
155155
}
156156
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
157157
defer cancel()
158158

159-
f := rxConnObj.ToFlux(ctx)
159+
f := conn.ToFlux(ctx)
160160

161161
// We DON'T consume the flux immediately, forcing the upstream channel to fill up.
162162
// The channel buffer is 32. We can send 32 bursts without blocking, then it should block.
@@ -205,13 +205,13 @@ func TestRxConn_ToFlux_Backpressure(t *testing.T) {
205205
func TestRxConn_Wait_CloseFromFluxFirst(t *testing.T) {
206206
clientSide, serverSide := net.Pipe()
207207

208-
rxConnObj, err := New(serverSide)
208+
conn, err := New(serverSide)
209209
if err != nil {
210210
t.Fatalf("Failed to create rxConn: %v", err)
211211
}
212212
ctx := t.Context()
213213

214-
fTo := rxConnObj.ToFlux(ctx)
214+
fTo := conn.ToFlux(ctx)
215215

216216
stopFrom := make(chan struct{})
217217
fFrom := flux.Create(func(subCtx context.Context, sink flux.Sink) {
@@ -223,11 +223,11 @@ func TestRxConn_Wait_CloseFromFluxFirst(t *testing.T) {
223223
})
224224

225225
fTo.Subscribe(ctx)
226-
go rxConnObj.FromFlux(ctx, fFrom)
226+
go conn.FromFlux(ctx, fFrom)
227227

228228
waitDone := make(chan struct{})
229229
go func() {
230-
rxConnObj.Wait()
230+
conn.Wait()
231231
close(waitDone)
232232
}()
233233

@@ -262,13 +262,13 @@ func TestRxConn_Wait_CloseFromFluxFirst(t *testing.T) {
262262
func TestRxConn_Wait_CloseToFluxFirst(t *testing.T) {
263263
clientSide, serverSide := net.Pipe()
264264

265-
rxConnObj, err := New(serverSide)
265+
conn, err := New(serverSide)
266266
if err != nil {
267267
t.Fatalf("Failed to create rxConn: %v", err)
268268
}
269269
ctx := t.Context()
270270

271-
fTo := rxConnObj.ToFlux(ctx)
271+
fTo := conn.ToFlux(ctx)
272272

273273
stopFrom := make(chan struct{})
274274
fFrom := flux.Create(func(subCtx context.Context, sink flux.Sink) {
@@ -280,11 +280,11 @@ func TestRxConn_Wait_CloseToFluxFirst(t *testing.T) {
280280
})
281281

282282
fTo.Subscribe(ctx)
283-
go rxConnObj.FromFlux(ctx, fFrom)
283+
go conn.FromFlux(ctx, fFrom)
284284

285285
waitDone := make(chan struct{})
286286
go func() {
287-
rxConnObj.Wait()
287+
conn.Wait()
288288
close(waitDone)
289289
}()
290290

@@ -315,3 +315,34 @@ func TestRxConn_Wait_CloseToFluxFirst(t *testing.T) {
315315
t.Fatal("Wait() blocked even after both pumps stopped")
316316
}
317317
}
318+
319+
func TestRxConn_ToFlux_SingleSubscriberPolicy(t *testing.T) {
320+
_, serverSide := net.Pipe()
321+
defer serverSide.Close()
322+
323+
conn, err := New(serverSide)
324+
if err != nil {
325+
t.Fatalf("Failed to create rxConn: %v", err)
326+
}
327+
ctx := t.Context()
328+
329+
f := conn.ToFlux(ctx)
330+
331+
// Subscribe once
332+
f.Subscribe(ctx)
333+
334+
// Subscribe second time - should fail
335+
errChan := make(chan error, 1)
336+
f.Subscribe(ctx, rx.OnError(func(err error) {
337+
errChan <- err
338+
}))
339+
340+
select {
341+
case err := <-errChan:
342+
if err.Error() != "ToFlux allows only one subscriber" {
343+
t.Errorf("ToFlux() error = %q, want %q", err.Error(), "ToFlux allows only one subscriber")
344+
}
345+
case <-time.After(time.Second):
346+
t.Fatal("timeout waiting for error on second subscription")
347+
}
348+
}

0 commit comments

Comments
 (0)