Skip to content

Commit 415fa9c

Browse files
committed
Move tests using mock websocket server to client module
1 parent 2821d91 commit 415fa9c

2 files changed

Lines changed: 691 additions & 0 deletions

File tree

Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
/*******************************************************************************
2+
* ___ _ ____ ____
3+
* / _ \ _ _ ___ ___| |_| _ \| __ )
4+
* | | | | | | |/ _ \/ __| __| | | | _ \
5+
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
6+
* \__\_\\__,_|\___||___/\__|____/|____/
7+
*
8+
* Copyright (c) 2014-2019 Appsicle
9+
* Copyright (c) 2019-2026 QuestDB
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*
23+
******************************************************************************/
24+
25+
package io.questdb.client.test.cutlass.qwp.client;
26+
27+
import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender;
28+
import io.questdb.client.cutlass.qwp.client.WebSocketResponse;
29+
import io.questdb.client.cutlass.qwp.websocket.WebSocketCloseCode;
30+
import io.questdb.client.test.AbstractTest;
31+
import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer;
32+
import org.junit.Assert;
33+
import org.junit.Test;
34+
35+
import java.io.IOException;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicLong;
38+
39+
/**
40+
* Integration tests for QWP v1 WebSocket ACK delivery mechanism.
41+
* These tests verify that the InFlightWindow and ACK responses work correctly end-to-end.
42+
*/
43+
public class QwpWebSocketAckIntegrationTest extends AbstractTest {
44+
45+
private static final int TEST_PORT = 19_500 + (int) (System.nanoTime() % 100);
46+
47+
@Test
48+
public void testAsyncFlushFailsFastOnInvalidAckPayload() throws Exception {
49+
InvalidAckPayloadHandler handler = new InvalidAckPayloadHandler();
50+
int port = TEST_PORT + 21;
51+
52+
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
53+
server.start();
54+
Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS));
55+
56+
boolean errorCaught = false;
57+
long start = System.currentTimeMillis();
58+
try (QwpWebSocketSender sender = QwpWebSocketSender.connectAsync(
59+
"localhost", port, false, 0, 0, 0)) {
60+
sender.table("test")
61+
.longColumn("value", 1)
62+
.atNow();
63+
sender.flush();
64+
} catch (Exception e) {
65+
errorCaught = true;
66+
Assert.assertTrue(
67+
e.getMessage().contains("Invalid ACK response payload")
68+
|| e.getMessage().contains("Error in send queue")
69+
);
70+
}
71+
72+
long duration = System.currentTimeMillis() - start;
73+
Assert.assertTrue("Expected invalid ACK error", errorCaught);
74+
Assert.assertTrue("Flush should fail quickly on invalid ACK [duration=" + duration + "ms]", duration < 10_000);
75+
}
76+
}
77+
78+
@Test
79+
public void testAsyncFlushFailsFastOnServerClose() throws Exception {
80+
ClosingServerHandler handler = new ClosingServerHandler();
81+
int port = TEST_PORT + 20;
82+
83+
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
84+
server.start();
85+
Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS));
86+
87+
boolean errorCaught = false;
88+
long start = System.currentTimeMillis();
89+
try (QwpWebSocketSender sender = QwpWebSocketSender.connectAsync(
90+
"localhost", port, false, 0, 0, 0)) {
91+
sender.table("test")
92+
.longColumn("value", 1)
93+
.atNow();
94+
sender.flush();
95+
} catch (Exception e) {
96+
errorCaught = true;
97+
Assert.assertTrue(
98+
e.getMessage().contains("closed")
99+
|| e.getMessage().contains("Error in send queue")
100+
|| e.getMessage().contains("failed")
101+
);
102+
}
103+
104+
long duration = System.currentTimeMillis() - start;
105+
Assert.assertTrue("Expected async close error", errorCaught);
106+
Assert.assertTrue("Flush should fail quickly on close [duration=" + duration + "ms]", duration < 10_000);
107+
}
108+
}
109+
110+
/**
111+
* Test that flush blocks until ACK is received.
112+
* Uses async mode to enable ACK handling via InFlightWindow.
113+
*/
114+
@Test
115+
public void testFlushBlocksUntilAcked() throws Exception {
116+
final long DELAY_MS = 300; // 300ms delay before ACK
117+
DelayedAckHandler handler = new DelayedAckHandler(DELAY_MS);
118+
119+
int port = TEST_PORT + 10;
120+
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
121+
server.start();
122+
Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS));
123+
124+
try (QwpWebSocketSender sender = QwpWebSocketSender.connectAsync(
125+
"localhost", port, false, 0, 0, 0)) {
126+
127+
sender.table("test")
128+
.longColumn("value", 42)
129+
.atNow();
130+
131+
long startTime = System.currentTimeMillis();
132+
sender.flush();
133+
long duration = System.currentTimeMillis() - startTime;
134+
135+
Assert.assertTrue("Flush should have waited for ACK (took " + duration + "ms, expected >= " + (DELAY_MS / 2) + "ms)",
136+
duration >= DELAY_MS / 2);
137+
138+
LOG.info("Flush waited {}ms for ACK", duration);
139+
}
140+
}
141+
}
142+
143+
@Test
144+
public void testSyncFlushFailsOnInvalidAckPayload() throws Exception {
145+
InvalidAckPayloadHandler handler = new InvalidAckPayloadHandler();
146+
int port = TEST_PORT + 22;
147+
148+
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
149+
server.start();
150+
Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS));
151+
152+
boolean errorCaught = false;
153+
long start = System.currentTimeMillis();
154+
try (QwpWebSocketSender sender = QwpWebSocketSender.connect("localhost", port, false)) {
155+
sender.table("test")
156+
.longColumn("value", 7)
157+
.atNow();
158+
sender.flush();
159+
} catch (Exception e) {
160+
errorCaught = true;
161+
Assert.assertTrue(
162+
e.getMessage().contains("Invalid ACK response payload")
163+
|| e.getMessage().contains("Failed to parse ACK response")
164+
);
165+
}
166+
167+
long duration = System.currentTimeMillis() - start;
168+
Assert.assertTrue("Expected invalid ACK error in sync mode", errorCaught);
169+
Assert.assertTrue("Sync invalid ACK path should fail quickly [duration=" + duration + "ms]", duration < 10_000);
170+
}
171+
}
172+
173+
@Test
174+
public void testSyncFlushIgnoresPingAndWaitsForAck() throws Exception {
175+
final long ackDelayMs = 300;
176+
PingThenDelayedAckHandler handler = new PingThenDelayedAckHandler(ackDelayMs);
177+
int port = TEST_PORT + 23;
178+
179+
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
180+
server.start();
181+
Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS));
182+
183+
try (QwpWebSocketSender sender = QwpWebSocketSender.connect("localhost", port, false)) {
184+
sender.table("test")
185+
.longColumn("value", 11)
186+
.atNow();
187+
188+
long start = System.currentTimeMillis();
189+
sender.flush();
190+
long duration = System.currentTimeMillis() - start;
191+
192+
Assert.assertTrue("Flush returned too early [duration=" + duration + "ms]", duration >= ackDelayMs / 2);
193+
}
194+
}
195+
}
196+
197+
/**
198+
* Creates a binary ACK response using WebSocketResponse format.
199+
* Format: status (1 byte) + sequence (8 bytes little-endian)
200+
*/
201+
private static byte[] createAckResponse(long sequence) {
202+
byte[] response = new byte[WebSocketResponse.MIN_RESPONSE_SIZE];
203+
204+
// Status OK (0)
205+
response[0] = WebSocketResponse.STATUS_OK;
206+
207+
// Sequence (little-endian)
208+
response[1] = (byte) (sequence & 0xFF);
209+
response[2] = (byte) ((sequence >> 8) & 0xFF);
210+
response[3] = (byte) ((sequence >> 16) & 0xFF);
211+
response[4] = (byte) ((sequence >> 24) & 0xFF);
212+
response[5] = (byte) ((sequence >> 32) & 0xFF);
213+
response[6] = (byte) ((sequence >> 40) & 0xFF);
214+
response[7] = (byte) ((sequence >> 48) & 0xFF);
215+
response[8] = (byte) ((sequence >> 56) & 0xFF);
216+
217+
return response;
218+
}
219+
220+
private static class ClosingServerHandler implements TestWebSocketServer.WebSocketServerHandler {
221+
@Override
222+
public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
223+
try {
224+
client.sendClose(WebSocketCloseCode.GOING_AWAY, "bye");
225+
} catch (IOException e) {
226+
LOG.error("Failed to send close frame", e);
227+
}
228+
}
229+
}
230+
231+
/**
232+
* Server handler that delays ACKs to test blocking behavior.
233+
*/
234+
private static class DelayedAckHandler implements TestWebSocketServer.WebSocketServerHandler {
235+
private final long delayMs;
236+
private final AtomicLong nextSequence = new AtomicLong(0);
237+
238+
DelayedAckHandler(long delayMs) {
239+
this.delayMs = delayMs;
240+
}
241+
242+
@Override
243+
public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
244+
long sequence = nextSequence.getAndIncrement();
245+
246+
LOG.debug("Server delaying ACK by {}ms", delayMs);
247+
248+
new Thread(() -> {
249+
try {
250+
Thread.sleep(delayMs);
251+
byte[] ackResponse = createAckResponse(sequence);
252+
client.sendBinary(ackResponse);
253+
LOG.debug("Server sent delayed ACK for seq {}", sequence);
254+
} catch (Exception e) {
255+
LOG.error("Failed to send delayed ACK", e);
256+
}
257+
}).start();
258+
}
259+
}
260+
261+
private static class InvalidAckPayloadHandler implements TestWebSocketServer.WebSocketServerHandler {
262+
@Override
263+
public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
264+
try {
265+
client.sendBinary(new byte[]{1, 2, 3});
266+
} catch (IOException e) {
267+
LOG.error("Failed to send invalid payload", e);
268+
}
269+
}
270+
}
271+
272+
private static class PingThenDelayedAckHandler implements TestWebSocketServer.WebSocketServerHandler {
273+
private final long delayMs;
274+
private final AtomicLong nextSequence = new AtomicLong(0);
275+
276+
private PingThenDelayedAckHandler(long delayMs) {
277+
this.delayMs = delayMs;
278+
}
279+
280+
@Override
281+
public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
282+
long sequence = nextSequence.getAndIncrement();
283+
try {
284+
client.sendPing(new byte[]{42});
285+
} catch (IOException e) {
286+
LOG.error("Failed to send ping", e);
287+
}
288+
289+
new Thread(() -> {
290+
try {
291+
Thread.sleep(delayMs);
292+
client.sendBinary(createAckResponse(sequence));
293+
} catch (Exception e) {
294+
LOG.error("Failed to send delayed ACK", e);
295+
}
296+
}).start();
297+
}
298+
}
299+
}

0 commit comments

Comments
 (0)