Skip to content

Commit 999464c

Browse files
committed
Allow timestamp-only rows in QWP UDP sender
1 parent ca47633 commit 999464c

3 files changed

Lines changed: 49 additions & 24 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpUdpSender.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -719,9 +719,6 @@ private void appendLongArrayValue(QwpTableBuffer.ColumnBuffer column, Object val
719719
}
720720

721721
private void atMicros(long timestampMicros) {
722-
if (inProgressRowValueCount == 0) {
723-
throw new LineSenderException("no columns were provided");
724-
}
725722
try {
726723
stageDesignatedTimestampValue(timestampMicros, false);
727724
commitCurrentRow();
@@ -732,9 +729,6 @@ private void atMicros(long timestampMicros) {
732729
}
733730

734731
private void atNanos(long timestampNanos) {
735-
if (inProgressRowValueCount == 0) {
736-
throw new LineSenderException("no columns were provided");
737-
}
738732
try {
739733
stageDesignatedTimestampValue(timestampNanos, true);
740734
commitCurrentRow();
@@ -817,10 +811,6 @@ private void clearTransientRowState() {
817811
}
818812

819813
private void commitCurrentRow() {
820-
if (inProgressRowValueCount == 0) {
821-
throw new LineSenderException("no columns were provided");
822-
}
823-
824814
long estimate = 0;
825815
long committedEstimateBeforeRow = 0;
826816
int targetRows = currentTableBuffer.getRowCount() + 1;

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1580,6 +1580,27 @@ public void testSymbolPrefixFlushKeepsSingleRetainedDictionaryEntry() throws Exc
15801580
});
15811581
}
15821582

1583+
@Test
1584+
public void testTimestampOnlyRows() throws Exception {
1585+
assertMemoryLeak(() -> {
1586+
CapturingNetworkFacade nf = new CapturingNetworkFacade();
1587+
try (QwpUdpSender sender = new QwpUdpSender(nf, 0, 0, 9000, 1)) {
1588+
// at() with no other columns: designated timestamp is staged
1589+
sender.table("t").at(1_000L, ChronoUnit.MICROS);
1590+
// atNow() with no other columns: server assigns the timestamp
1591+
sender.table("t").atNow();
1592+
sender.flush();
1593+
}
1594+
1595+
List<DecodedRow> rows = decodeRows(nf.packets);
1596+
Assert.assertEquals("expected 2 timestamp-only rows", 2, rows.size());
1597+
assertRowsEqual(Arrays.asList(
1598+
decodedRow("t", "", 1_000L),
1599+
decodedRow("t", "", null)
1600+
), rows);
1601+
});
1602+
}
1603+
15831604
@Test
15841605
public void testUnboundedSenderOmittedNullableAndNonNullableColumnsPreservesRows() throws Exception {
15851606
assertMemoryLeak(() -> {
@@ -1712,20 +1733,6 @@ public void testUtf8StringAndSymbolStagingSupportsCancelAndPacketSizing() throws
17121733
});
17131734
}
17141735

1715-
@Test
1716-
public void testZeroColumnRowsThrow() throws Exception {
1717-
assertMemoryLeak(() -> {
1718-
CapturingNetworkFacade nf = new CapturingNetworkFacade();
1719-
try (QwpUdpSender sender = new QwpUdpSender(nf, 0, 0, 9000, 1, 1024 * 1024)) {
1720-
sender.table("t");
1721-
1722-
assertThrowsContains("no columns were provided", sender::atNow);
1723-
assertThrowsContains("no columns were provided", () -> sender.at(1, ChronoUnit.MICROS));
1724-
assertThrowsContains("no columns were provided", () -> sender.at(1, ChronoUnit.NANOS));
1725-
}
1726-
});
1727-
}
1728-
17291736
private static void assertEstimateAtLeastActual(List<ScenarioRow> rows) throws Exception {
17301737
CapturingNetworkFacade nf = new CapturingNetworkFacade();
17311738
try (QwpUdpSender sender = new QwpUdpSender(nf, 0, 0, 9000, 1, 1024 * 1024)) {

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,34 @@ public void testResetClearsAllTableBuffersAndPendingRowCount() throws Exception
218218
});
219219
}
220220

221+
@Test
222+
public void testTimestampOnlyRows() throws Exception {
223+
assertMemoryLeak(() -> {
224+
// autoFlushRows=10_000 prevents auto-flush; bytes and interval disabled
225+
QwpWebSocketSender sender = QwpWebSocketSender.createForTesting(
226+
"localhost", 0, 10_000, 0, 0L, 1
227+
);
228+
try {
229+
setField(sender, "connected", true);
230+
setField(sender, "inFlightWindow", new InFlightWindow(1, InFlightWindow.DEFAULT_TIMEOUT_MS));
231+
232+
// at(micros) with no other columns
233+
sender.table("t").at(1_000L, ChronoUnit.MICROS);
234+
// atNow() with no other columns
235+
sender.table("t").atNow();
236+
237+
QwpTableBuffer tb = sender.getTableBuffer("t");
238+
Assert.assertEquals(
239+
"at() and atNow() with no other columns must each buffer a row",
240+
2, tb.getRowCount()
241+
);
242+
} finally {
243+
setField(sender, "connected", false);
244+
sender.close();
245+
}
246+
});
247+
}
248+
221249
private static void setField(Object target, String fieldName, Object value) throws Exception {
222250
Field f = target.getClass().getDeclaredField(fieldName);
223251
f.setAccessible(true);

0 commit comments

Comments
 (0)