Skip to content

Commit 6fb1a25

Browse files
committed
remove estimator bandaid
1 parent 872e0c7 commit 6fb1a25

2 files changed

Lines changed: 112 additions & 2 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpUdpSender.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
*/
6666
public class QwpUdpSender implements Sender {
6767
private static final int VARINT_INT_UPPER_BOUND = 5;
68-
private static final int SAFETY_MARGIN_BYTES = 8;
6968
private static final Logger LOG = LoggerFactory.getLogger(QwpUdpSender.class);
7069

7170
private final UdpLineChannel channel;
@@ -85,15 +84,24 @@ public class QwpUdpSender implements Sender {
8584
private int inProgressRowValueCount;
8685
private QwpTableBuffer currentTableBuffer;
8786
private String currentTableName;
87+
88+
// prefix* arrays: per-column snapshots captured before the in-progress row,
89+
// used to encode and flush only the committed prefix when a row is still being built.
90+
// Indexed by column index. -1 means the column has no in-progress data.
8891
private int[] prefixArrayDataOffsetBefore = new int[8];
8992
private int[] prefixArrayShapeOffsetBefore = new int[8];
9093
private long[] prefixStringDataSizeBefore = new long[8];
9194
private int[] prefixSymbolDictionarySizeBefore = new int[8];
9295
private int[] prefixSizeBefore = new int[8];
9396
private int[] prefixValueCountBefore = new int[8];
97+
98+
// columns that need NULL/default fill for the current row (columns not yet written to)
9499
private QwpTableBuffer.ColumnBuffer[] rowFillColumns = new QwpTableBuffer.ColumnBuffer[8];
100+
// maps column index -> 1-based position in rowFillColumns (0 means absent)
95101
private int[] rowFillColumnPositions = new int[8];
102+
// per-column marks to detect duplicate writes within a single row; compared against currentRowMark
96103
private int[] stagedColumnMarks = new int[8];
104+
// monotonically increasing mark; incremented per row to invalidate stagedColumnMarks without clearing
97105
private int currentRowMark = 1;
98106
private int rowFillColumnCount;
99107
private int inProgressColumnCount;
@@ -845,7 +853,6 @@ private long estimateBaseForCurrentSchema() {
845853
estimate += 1;
846854
}
847855
}
848-
estimate += SAFETY_MARGIN_BYTES;
849856
return estimate;
850857
}
851858

@@ -1085,6 +1092,11 @@ public QwpTableBuffer currentTableBufferForTest() {
10851092
return currentTableBuffer;
10861093
}
10871094

1095+
@TestOnly
1096+
public long committedDatagramEstimateForTest() {
1097+
return committedDatagramEstimate;
1098+
}
1099+
10881100
private void captureInProgressColumnPrefixState() {
10891101
int columnCount = currentTableBuffer.getColumnCount();
10901102
ensurePrefixColumnCapacity(columnCount);
@@ -1299,6 +1311,11 @@ private static int utf8Length(CharSequence s) {
12991311
return len;
13001312
}
13011313

1314+
/**
1315+
* Captures the state of a column buffer at the moment the in-progress row starts
1316+
* writing to it. The snapshot allows the sender to compute incremental datagram
1317+
* size estimates and to roll back the column to its pre-row state on error or cancel.
1318+
*/
13021319
private static final class InProgressColumnState {
13031320
private int arrayDataOffsetBefore;
13041321
private int arrayShapeOffsetBefore;

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public void testBoundedSenderMixedNullablePaddingPreservesRowsAndPacketLimit() t
139139
});
140140
}
141141

142+
@Test
143+
public void testEstimateMatchesActualEncodedSize() throws Exception {
144+
assertMemoryLeak(() -> {
145+
auditEstimateWithStableSchemaAndNullableValues();
146+
auditEstimateAcrossSymbolDictionaryVarintBoundary();
147+
});
148+
}
149+
142150
@Test
143151
public void testBoundedSenderNullableStringNullAcrossOverflowBoundaryPreservesRowsAndPacketLimit() throws Exception {
144152
assertMemoryLeak(() -> {
@@ -1702,6 +1710,91 @@ private static DoubleArrayValue doubleArrayValue(int[] shape, double... values)
17021710
return new DoubleArrayValue(dims, elems);
17031711
}
17041712

1713+
private static void auditEstimateAcrossSymbolDictionaryVarintBoundary() throws Exception {
1714+
ArrayList<ScenarioRow> rows = new ArrayList<>();
1715+
for (int i = 0; i < 160; i++) {
1716+
final int rowId = i;
1717+
rows.add(row(
1718+
"sym_audit",
1719+
sender -> sender.table("sym_audit")
1720+
.longColumn("x", rowId)
1721+
.symbol("sym", "sym-" + rowId)
1722+
.atNow(),
1723+
"x", (long) rowId,
1724+
"sym", "sym-" + rowId
1725+
));
1726+
}
1727+
assertEstimateAtLeastActual(rows);
1728+
}
1729+
1730+
private static void auditEstimateWithStableSchemaAndNullableValues() throws Exception {
1731+
ArrayList<ScenarioRow> rows = new ArrayList<>();
1732+
for (int i = 0; i < 96; i++) {
1733+
final int rowId = i;
1734+
final String stringValue = (i & 1) == 0 ? "tokyo-" + i + "-" + repeat('x', (i % 31) + 1) : null;
1735+
final long[] longArray = i % 3 == 0 ? new long[]{i, i + 1L, i + 2L} : null;
1736+
final double[][] doubleArray = i % 5 == 0 ? new double[][]{{i + 0.5, i + 1.5}, {i + 2.5, i + 3.5}} : null;
1737+
final Decimal64 decimal64 = i % 7 == 0 ? Decimal64.fromLong(i * 100L + 7, 2) : null;
1738+
final Decimal128 decimal128 = i % 11 == 0 ? Decimal128.fromLong(i * 1000L + 11, 4) : null;
1739+
final Decimal256 decimal256 = i % 13 == 0 ? Decimal256.fromLong(i * 10000L + 13, 3) : null;
1740+
1741+
rows.add(row(
1742+
"audit",
1743+
sender -> {
1744+
sender.table("audit")
1745+
.longColumn("l", rowId)
1746+
.doubleColumn("d", rowId + 0.25)
1747+
.symbol("sym", "stable");
1748+
if (stringValue != null) {
1749+
sender.stringColumn("s", stringValue);
1750+
}
1751+
if (longArray != null) {
1752+
sender.longArray("la", longArray);
1753+
}
1754+
if (doubleArray != null) {
1755+
sender.doubleArray("da", doubleArray);
1756+
}
1757+
if (decimal64 != null) {
1758+
sender.decimalColumn("d64", decimal64);
1759+
}
1760+
if (decimal128 != null) {
1761+
sender.decimalColumn("d128", decimal128);
1762+
}
1763+
if (decimal256 != null) {
1764+
sender.decimalColumn("d256", decimal256);
1765+
}
1766+
sender.at(rowId + 1L, ChronoUnit.MICROS);
1767+
},
1768+
"l", (long) rowId,
1769+
"d", rowId + 0.25,
1770+
"sym", "stable",
1771+
"s", stringValue,
1772+
"la", longArray == null ? null : longArrayValue(shape(longArray.length), longArray),
1773+
"da", doubleArray == null ? null : doubleArrayValue(shape(doubleArray.length, doubleArray[0].length), flatten(doubleArray)),
1774+
"d64", decimal64 == null ? null : decimal(i * 100L + 7, 2),
1775+
"d128", decimal128 == null ? null : decimal(i * 1000L + 11, 4),
1776+
"d256", decimal256 == null ? null : decimal(i * 10000L + 13, 3),
1777+
"", rowId + 1L
1778+
));
1779+
}
1780+
assertEstimateAtLeastActual(rows);
1781+
}
1782+
1783+
private static void assertEstimateAtLeastActual(List<ScenarioRow> rows) throws Exception {
1784+
CapturingNetworkFacade nf = new CapturingNetworkFacade();
1785+
try (QwpUdpSender sender = new QwpUdpSender(nf, 0, 0, 9000, 1, 1024 * 1024)) {
1786+
for (int i = 0; i < rows.size(); i++) {
1787+
rows.get(i).writer.accept(sender);
1788+
long estimate = sender.committedDatagramEstimateForTest();
1789+
long actual = fullPacketSize(rows.subList(0, i + 1));
1790+
Assert.assertTrue(
1791+
"row " + i + " estimate underflow: estimate=" + estimate + ", actual=" + actual,
1792+
estimate >= actual
1793+
);
1794+
}
1795+
}
1796+
}
1797+
17051798
private static List<DecodedRow> expectedRows(List<ScenarioRow> rows) {
17061799
ArrayList<DecodedRow> expected = new ArrayList<>(rows.size());
17071800
for (ScenarioRow row : rows) {

0 commit comments

Comments
 (0)