Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio
// Phase 1: server ACKs every frame. Sender writes a few rows,
// flushes, then close() blocks for the default 5s drain — by the
// time close returns, every frame has been ACK'd.
int port1 = TestPorts.findUnusedPort();
AckHandler ack1 = new AckHandler();
try (TestWebSocketServer s1 = new TestWebSocketServer(port1, ack1)) {
try (TestWebSocketServer s1 = new TestWebSocketServer(ack1)) {
s1.start();
Assert.assertTrue(s1.awaitStart(5, TimeUnit.SECONDS));

int port1 = s1.getPort();
String cfg1 = "ws::addr=localhost:" + port1
+ ";sf_dir=" + sfDir + ";";
try (Sender sender = Sender.fromConfig(cfg1)) {
Expand All @@ -105,12 +105,12 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio
// SAME slot dir. There is no unacked work — both rings should agree
// there's nothing to send. The expected count of binary frames at
// server 2 is zero.
int port2 = port1 + 50;
AckHandler ack2 = new AckHandler();
try (TestWebSocketServer s2 = new TestWebSocketServer(port2, ack2)) {
try (TestWebSocketServer s2 = new TestWebSocketServer(ack2)) {
s2.start();
Assert.assertTrue(s2.awaitStart(5, TimeUnit.SECONDS));

int port2 = s2.getPort();
String cfg2 = "ws::addr=localhost:" + port2
+ ";sf_dir=" + sfDir + ";";
try (Sender ignored = Sender.fromConfig(cfg2)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ public void testCloseBlocksUntilAckArrives() throws Exception {
// Server delays every ACK by 800ms. With the default
// close_flush_timeout_millis=60000, close() must wait for that
// ACK before returning. Pre-fix close() returned within milliseconds.
int port = TestPorts.findUnusedPort();
long ackDelayMs = 800;
DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
String cfg = "ws::addr=localhost:" + port + ";"; // memory mode
long elapsedMs;
try (Sender sender = Sender.fromConfig(cfg)) {
Expand All @@ -82,13 +82,13 @@ public void testCloseFastWhenTimeoutIsZero() throws Exception {
// Same delayed-ACK server, but with close_flush_timeout_millis=0
// (fast close). close() must return immediately, well before the
// ACK delay would have elapsed.
int port = TestPorts.findUnusedPort();
long ackDelayMs = 1500;
DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
String cfg = "ws::addr=localhost:" + port
+ ";close_flush_timeout_millis=0;";
long elapsedMs;
Expand All @@ -115,13 +115,13 @@ public void testCloseFastWhenTimeoutIsMinusOne() throws Exception {
// sentinel in LineSenderBuilder, so the build path silently substitutes
// DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS (60s) and close() blocks for the
// full ACK delay instead of returning fast.
int port = TestPorts.findUnusedPort();
long ackDelayMs = 1500;
DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
String cfg = "ws::addr=localhost:" + port
+ ";close_flush_timeout_millis=-1;";
long elapsedMs;
Expand All @@ -144,13 +144,13 @@ public void testCloseDrainTimesOutWhenAcksNeverArrive() throws Exception {
// Server that buffers frames silently and never ACKs. close() must
// throw a drain-timeout LineSenderException after roughly the
// configured timeout — not hang forever and not return immediately.
int port = TestPorts.findUnusedPort();
long timeoutMs = 500;
SilentHandler handler = new SilentHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
String cfg = "ws::addr=localhost:" + port
+ ";close_flush_timeout_millis=" + timeoutMs + ";";
long elapsedMs;
Expand Down Expand Up @@ -182,13 +182,13 @@ public void testDrainBlocksUntilAckArrivesAndReturnsTrue() throws Exception {
// testCloseBlocksUntilAckArrives, but the wait happens inside the
// explicit drain() call. The subsequent close() should be a near-
// instant no-op because everything is already acked.
int port = TestPorts.findUnusedPort();
long ackDelayMs = 600;
DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
String cfg = "ws::addr=localhost:" + port + ";";
try (Sender sender = Sender.fromConfig(cfg)) {
sender.table("foo").longColumn("v", 1L).atNow();
Expand Down Expand Up @@ -218,12 +218,12 @@ public void testDrainReturnsFalseOnTimeoutAndSenderStillUsable() throws Exceptio
// usable for further row writes after a false return; the
// outstanding frames remain pending and close()'s own drain still
// runs.
int port = TestPorts.findUnusedPort();
SilentHandler handler = new SilentHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
String cfg = "ws::addr=localhost:" + port + ";close_flush_timeout_millis=0;";
try (Sender sender = Sender.fromConfig(cfg)) {
sender.table("foo").longColumn("v", 1L).atNow();
Expand All @@ -246,12 +246,12 @@ public void testDrainNonZeroTimeoutOnFastServerReturnsImmediately() throws Excep
// Fast server: every frame is acked promptly. drain(longTimeout)
// must return true quickly -- no spurious wait when there is
// nothing to wait for.
int port = TestPorts.findUnusedPort();
DelayingAckHandler handler = new DelayingAckHandler(0);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
String cfg = "ws::addr=localhost:" + port + ";";
try (Sender sender = Sender.fromConfig(cfg)) {
sender.table("foo").longColumn("v", 1L).atNow();
Expand All @@ -266,9 +266,9 @@ public void testDrainNonZeroTimeoutOnFastServerReturnsImmediately() throws Excep

@Test
public void testAsyncCloseDrainSucceedsWhenServerStartsDuringDrain() throws Exception {
int port = TestPorts.findUnusedPort();
DelayingAckHandler handler = new DelayingAckHandler(0);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
int port = server.getPort();
String cfg = "ws::addr=localhost:" + port
+ sfDirOpt()
+ ";initial_connect_retry=async"
Expand Down Expand Up @@ -303,12 +303,12 @@ public void testAsyncCloseDrainSucceedsWhenServerStartsDuringDrain() throws Exce

@Test
public void testAsyncCloseDrainSucceedsWhenServerWasUpAllAlong() throws Exception {
int port = TestPorts.findUnusedPort();
DelayingAckHandler handler = new DelayingAckHandler(0);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

int port = server.getPort();
for (int i = 0; i < 20; i++) {
String cfg = "ws::addr=localhost:" + port
+ sfDirOpt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public void testAsyncAuthFailureDeliversToErrorInbox() throws Exception {
// Server returns HTTP 401 on every upgrade attempt. Auth failures
// are terminal at the I/O thread; in async mode they are
// delivered as a SenderError, not thrown from fromConfig.
int port = TestPorts.findUnusedPort();
try (Always401Fixture fixture = new Always401Fixture(port)) {
try (Always401Fixture fixture = new Always401Fixture()) {
fixture.start();
int port = fixture.getPort();
ErrorInbox inbox = new ErrorInbox();
String cfg = "ws::addr=localhost:" + port
+ sfDirOpt() + ";initial_connect_retry=async"
Expand Down Expand Up @@ -159,9 +159,9 @@ public void testAsyncDeliversBufferedRowsWhenServerArrivesLate() {
// appended to the cursor SF engine on the producer thread. The
// I/O thread retries connect in the background; once the server
// comes up, the buffered frame is sent and ACKed.
int port = TestPorts.findUnusedPort();
AckHandler handler = new AckHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
int port = server.getPort();
String cfg = "ws::addr=localhost:" + port
+ sfDirOpt() + ";initial_connect_retry=async"
+ ";reconnect_max_duration_millis=10000"
Expand Down Expand Up @@ -239,9 +239,9 @@ public void testConnectionLostBudgetExhaustionTagsDifferently() {
// Because the loop did connect at least once before the outage,
// the SenderError must use the connection-lost tag and the sender
// must report wasEverConnected()==true.
int port = TestPorts.findUnusedPort();
AckHandler handler = new AckHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
int port = server.getPort();
server.start();
Assert.assertTrue(server.awaitStart(5, java.util.concurrent.TimeUnit.SECONDS));

Expand Down Expand Up @@ -299,8 +299,8 @@ public void testWasEverConnectedTrueImmediatelyInSyncMode() {
// caller — there is no observable "never connected" window in
// those modes, so misclassifying a budget exhaustion as
// never-connected is impossible.
int port = TestPorts.findUnusedPort();
try (TestWebSocketServer server = new TestWebSocketServer(port, new AckHandler())) {
try (TestWebSocketServer server = new TestWebSocketServer(new AckHandler())) {
int port = server.getPort();
server.start();
Assert.assertTrue(server.awaitStart(5, java.util.concurrent.TimeUnit.SECONDS));
String cfg = "ws::addr=localhost:" + port
Expand Down Expand Up @@ -446,8 +446,16 @@ private static class Always401Fixture implements AutoCloseable {
private Thread acceptThread;
private volatile boolean running;

Always401Fixture(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
Always401Fixture() throws IOException {
// Bind the listener up front on an OS-assigned loopback port and
// hold it for the fixture's lifetime; read it back via getPort().
// Owning the port from allocation to teardown avoids the bind race
// a pre-selected port would carry.
this.serverSocket = new ServerSocket(0, 50, java.net.InetAddress.getLoopbackAddress());
}

int getPort() {
return serverSocket.getLocalPort();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ public void testWithRetryGivesUpAfterCap() {
}

@Test
public void testWithRetrySucceedsWhenServerComesUpInTime() {
public void testWithRetrySucceedsWhenServerComesUpInTime() throws Exception {
// initial_connect_retry=true; we open the sender BEFORE starting
// the server, then start the server in a background thread after
// a short delay. The retry loop should see the server come up and
// proceed cleanly.
int port = TestPorts.findUnusedPort();
AckHandler handler = new AckHandler();
TestWebSocketServer server = new TestWebSocketServer(port, handler);
TestWebSocketServer server = new TestWebSocketServer(handler);
int port = server.getPort();
Thread starter = new Thread(() -> {
try {
Thread.sleep(300);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public class IoThreadErrorSurfacedOnRowApiTest {

@Test
public void testRowApiMethodSurfacesIoThreadTerminalError() throws Exception {
int port = TestPorts.findUnusedPort();
ErrorAckHandler handler = new ErrorAckHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
int port = server.getPort();
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ public class PrReviewRedTestsE2e {
@Test
public void testC4_handlerMustObserveTerminalErrorWhenInvoked() throws Exception {
TestUtils.assertMemoryLeak(() -> {
int port = TestPorts.findUnusedPort();
int iterations = 30;
AtomicInteger nullObservations = new AtomicInteger();
AtomicInteger totalObservations = new AtomicInteger();

ParseErrorAckHandler serverHandler = new ParseErrorAckHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, serverHandler)) {
try (TestWebSocketServer server = new TestWebSocketServer(serverHandler)) {
int port = server.getPort();
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

Expand Down Expand Up @@ -161,9 +161,9 @@ public void testC4_handlerMustObserveTerminalErrorWhenInvoked() throws Exception
@Test
public void testC11_postHaltFlushThrowsTypedLineSenderServerException() throws Exception {
TestUtils.assertMemoryLeak(() -> {
int port = TestPorts.findUnusedPort();
ParseErrorAckHandler serverHandler = new ParseErrorAckHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, serverHandler)) {
try (TestWebSocketServer server = new TestWebSocketServer(serverHandler)) {
int port = server.getPort();
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

Expand Down
Loading
Loading