Skip to content

Commit 970d717

Browse files
committed
implement support for storing global-symbol-IDs-only in QwpTableBuffer
this avoid extra work when just global dictionary is needed we need to keep the per-column dictionaries due to UDP. for now.
1 parent 8afd5f6 commit 970d717

4 files changed

Lines changed: 163 additions & 11 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,10 +1344,10 @@ private boolean shouldAutoFlush() {
13441344
if (autoFlushBytes > 0 && getPendingBytes() >= autoFlushBytes) {
13451345
return true;
13461346
}
1347-
if (autoFlushIntervalNanos > 0) {
1348-
long ageNanos = System.nanoTime() - firstPendingRowTimeNanos;
1349-
return ageNanos >= autoFlushIntervalNanos;
1350-
}
1347+
// if (autoFlushIntervalNanos > 0) {
1348+
// long ageNanos = System.nanoTime() - firstPendingRowTimeNanos;
1349+
// return ageNanos >= autoFlushIntervalNanos;
1350+
// }
13511351
return false;
13521352
}
13531353

core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpTableBuffer.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ public static class ColumnBuffer implements QuietCloseable {
583583
private byte[] arrayDims;
584584
private int arrayShapeOffset;
585585
private int[] arrayShapes;
586-
// Off-heap auxiliary buffer for global symbol IDs (SYMBOL type only)
586+
// Optional auxiliary buffer used by symbol encoders that need sideband IDs.
587587
private OffHeapAppendMemory auxBuffer;
588588
// Off-heap data buffer for fixed-width types
589589
private OffHeapAppendMemory dataBuffer;
@@ -605,6 +605,7 @@ public static class ColumnBuffer implements QuietCloseable {
605605
// Off-heap storage for string/varchar column data
606606
private OffHeapAppendMemory stringOffsets;
607607
// Symbol specific (dictionary stays on-heap)
608+
private boolean storeGlobalSymbolIdsOnly;
608609
private CharSequenceIntHashMap symbolDict;
609610
private ObjList<String> symbolList;
610611
private StringSink symbolLookupSink;
@@ -1101,6 +1102,9 @@ public void addSymbol(CharSequence value) {
11011102
return;
11021103
}
11031104
ensureNullBitmapCapacity();
1105+
if (storeGlobalSymbolIdsOnly) {
1106+
throw new LineSenderException("column '" + name + "' cannot mix global symbol IDs with local symbol dictionary values");
1107+
}
11041108
int idx = getOrAddLocalSymbol(value);
11051109
dataBuffer.putInt(idx);
11061110
valueCount++;
@@ -1129,6 +1133,9 @@ public void addSymbolUtf8(long ptr, int len) {
11291133
return;
11301134
}
11311135
ensureNullBitmapCapacity();
1136+
if (storeGlobalSymbolIdsOnly) {
1137+
throw new LineSenderException("column '" + name + "' cannot mix global symbol IDs with local symbol dictionary values");
1138+
}
11321139
int idx = getOrAddLocalSymbol(lookupSink);
11331140
dataBuffer.putInt(idx);
11341141
valueCount++;
@@ -1141,13 +1148,24 @@ public void addSymbolWithGlobalId(CharSequence value, int globalId) {
11411148
return;
11421149
}
11431150
ensureNullBitmapCapacity();
1144-
int localIdx = getOrAddLocalSymbol(value);
1145-
dataBuffer.putInt(localIdx);
1146-
1147-
if (auxBuffer == null) {
1148-
auxBuffer = new OffHeapAppendMemory(64);
1151+
if (!storeGlobalSymbolIdsOnly) {
1152+
if (symbolList != null && symbolList.size() > 0) {
1153+
int localIdx = getOrAddLocalSymbol(value);
1154+
dataBuffer.putInt(localIdx);
1155+
if (auxBuffer == null) {
1156+
auxBuffer = new OffHeapAppendMemory(64);
1157+
}
1158+
auxBuffer.putInt(globalId);
1159+
if (globalId > maxGlobalSymbolId) {
1160+
maxGlobalSymbolId = globalId;
1161+
}
1162+
valueCount++;
1163+
size++;
1164+
return;
1165+
}
1166+
storeGlobalSymbolIdsOnly = true;
11491167
}
1150-
auxBuffer.putInt(globalId);
1168+
dataBuffer.putInt(globalId);
11511169

11521170
if (globalId > maxGlobalSymbolId) {
11531171
maxGlobalSymbolId = globalId;
@@ -1312,6 +1330,9 @@ public String[] getSymbolDictionary() {
13121330
}
13131331

13141332
public int getSymbolDictionarySize() {
1333+
if (storeGlobalSymbolIdsOnly) {
1334+
return 0;
1335+
}
13151336
return symbolList != null ? symbolList.size() : 0;
13161337
}
13171338

@@ -1364,6 +1385,7 @@ public void reset() {
13641385
symbolDict.clear();
13651386
symbolList.clear();
13661387
}
1388+
storeGlobalSymbolIdsOnly = false;
13671389
maxGlobalSymbolId = -1;
13681390
arrayShapeOffset = 0;
13691391
arrayDataOffset = 0;
@@ -1480,6 +1502,7 @@ public void truncateTo(int newSize) {
14801502
decimalScale = -1;
14811503
geohashPrecision = -1;
14821504
maxGlobalSymbolId = -1;
1505+
storeGlobalSymbolIdsOnly = false;
14831506
if (symbolDict != null) {
14841507
symbolDict.clear();
14851508
symbolList.clear();
@@ -1729,6 +1752,7 @@ private void resetEmptyMetadata() {
17291752
decimalScale = -1;
17301753
geohashPrecision = -1;
17311754
maxGlobalSymbolId = -1;
1755+
storeGlobalSymbolIdsOnly = false;
17321756
if (symbolDict != null) {
17331757
symbolDict.clear();
17341758
symbolList.clear();
@@ -1798,6 +1822,11 @@ private void retainStringValue(int valueIndex) {
17981822
private void retainSymbolValue(int valueIndex) {
17991823
retainFixedWidthValue(valueIndex);
18001824

1825+
if (storeGlobalSymbolIdsOnly) {
1826+
maxGlobalSymbolId = Unsafe.getUnsafe().getInt(dataBuffer.pageAddress());
1827+
return;
1828+
}
1829+
18011830
int localIndex = Unsafe.getUnsafe().getInt(dataBuffer.pageAddress());
18021831
String symbol = symbolList.get(localIndex);
18031832

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketEncoderTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.junit.Assert;
3333
import org.junit.Test;
3434

35+
import java.nio.charset.StandardCharsets;
36+
3537
import static io.questdb.client.cutlass.qwp.protocol.QwpConstants.*;
3638
import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
3739

@@ -961,6 +963,43 @@ public void testEncodeWithDeltaDict_withConfirmed_sendsOnlyNew() throws Exceptio
961963
});
962964
}
963965

966+
@Test
967+
public void testEncodeWithDeltaDict_readsGlobalIdsFromDataBuffer() throws Exception {
968+
assertMemoryLeak(() -> {
969+
try (QwpWebSocketEncoder encoder = new QwpWebSocketEncoder();
970+
QwpTableBuffer buffer = new QwpTableBuffer("test_table")) {
971+
GlobalSymbolDictionary globalDict = new GlobalSymbolDictionary();
972+
for (int i = 0; i < 8; i++) {
973+
globalDict.getOrAddSymbol("SYM_" + i);
974+
}
975+
976+
QwpTableBuffer.ColumnBuffer col = buffer.getOrCreateColumn("ticker", TYPE_SYMBOL, false);
977+
col.addSymbolWithGlobalId("SYM_5", 5);
978+
buffer.nextRow();
979+
col.addSymbolWithGlobalId("SYM_7", 7);
980+
buffer.nextRow();
981+
982+
Assert.assertEquals(0, col.getAuxDataAddress());
983+
984+
int size = encoder.encodeWithDeltaDict(buffer, globalDict, 7, 7, false);
985+
Assert.assertTrue(size > 12);
986+
987+
Cursor cursor = new Cursor(encoder.getBuffer().getBufferPtr() + HEADER_SIZE);
988+
Assert.assertEquals(8, cursor.readVarint());
989+
Assert.assertEquals(0, cursor.readVarint());
990+
991+
Assert.assertEquals("test_table", cursor.readString());
992+
Assert.assertEquals(2, cursor.readVarint());
993+
Assert.assertEquals(1, cursor.readVarint());
994+
Assert.assertEquals(SCHEMA_MODE_FULL, cursor.readByte());
995+
Assert.assertEquals("ticker", cursor.readString());
996+
Assert.assertEquals(TYPE_SYMBOL, cursor.readByte());
997+
Assert.assertEquals(5, cursor.readVarint());
998+
Assert.assertEquals(7, cursor.readVarint());
999+
}
1000+
});
1001+
}
1002+
9641003
// ==================== SCHEMA REFERENCE TESTS ====================
9651004

9661005
@Test
@@ -1359,4 +1398,40 @@ public void testReset() throws Exception {
13591398
}
13601399
});
13611400
}
1401+
1402+
private static final class Cursor {
1403+
private long address;
1404+
1405+
private Cursor(long address) {
1406+
this.address = address;
1407+
}
1408+
1409+
private byte readByte() {
1410+
return Unsafe.getUnsafe().getByte(address++);
1411+
}
1412+
1413+
private String readString() {
1414+
int len = readVarint();
1415+
byte[] bytes = new byte[len];
1416+
for (int i = 0; i < len; i++) {
1417+
bytes[i] = Unsafe.getUnsafe().getByte(address + i);
1418+
}
1419+
String value = new String(bytes, StandardCharsets.UTF_8);
1420+
address += len;
1421+
return value;
1422+
}
1423+
1424+
private int readVarint() {
1425+
int value = 0;
1426+
int shift = 0;
1427+
while (true) {
1428+
int b = Unsafe.getUnsafe().getByte(address++) & 0xff;
1429+
value |= (b & 0x7f) << shift;
1430+
if ((b & 0x80) == 0) {
1431+
return value;
1432+
}
1433+
shift += 7;
1434+
}
1435+
}
1436+
}
13621437
}

core/src/test/java/io/questdb/client/test/cutlass/qwp/protocol/QwpTableBufferTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,29 @@ public void testAddSymbolUtf8ReusesExistingDictionaryEntry() throws Exception {
328328
});
329329
}
330330

331+
@Test
332+
public void testAddSymbolWithGlobalIdStoresOnlyGlobalIds() throws Exception {
333+
assertMemoryLeak(() -> {
334+
try (QwpTableBuffer table = new QwpTableBuffer("test")) {
335+
QwpTableBuffer.ColumnBuffer col = table.getOrCreateColumn("sym", QwpConstants.TYPE_SYMBOL, true);
336+
col.addSymbolWithGlobalId("alpha", 7);
337+
table.nextRow();
338+
col.addSymbolWithGlobalId("beta", 11);
339+
table.nextRow();
340+
341+
assertEquals(2, col.getSize());
342+
assertEquals(2, col.getValueCount());
343+
assertEquals(0, col.getSymbolDictionarySize());
344+
assertEquals(0, col.getAuxDataAddress());
345+
assertEquals(11, col.getMaxGlobalSymbolId());
346+
347+
long dataAddress = col.getDataAddress();
348+
assertEquals(7, Unsafe.getUnsafe().getInt(dataAddress));
349+
assertEquals(11, Unsafe.getUnsafe().getInt(dataAddress + Integer.BYTES));
350+
}
351+
});
352+
}
353+
331354
@Test
332355
public void testCancelRowResetsDecimalScaleOnLateAddedColumn() throws Exception {
333356
assertMemoryLeak(() -> {
@@ -406,6 +429,31 @@ public void testCancelRowResetsSymbolDictOnLateAddedColumn() throws Exception {
406429
});
407430
}
408431

432+
@Test
433+
public void testCancelRowRetainsGlobalSymbolIdWithoutLocalDictionary() throws Exception {
434+
assertMemoryLeak(() -> {
435+
try (QwpTableBuffer table = new QwpTableBuffer("test")) {
436+
table.getOrCreateColumn("a", QwpConstants.TYPE_LONG, false).addLong(0);
437+
table.nextRow();
438+
439+
table.getOrCreateColumn("a", QwpConstants.TYPE_LONG, false).addLong(1);
440+
QwpTableBuffer.ColumnBuffer colS = table.getOrCreateColumn("s", QwpConstants.TYPE_SYMBOL, true);
441+
colS.addSymbolWithGlobalId("stale", 4);
442+
table.cancelCurrentRow();
443+
444+
table.getOrCreateColumn("a", QwpConstants.TYPE_LONG, false).addLong(1);
445+
colS.addSymbolWithGlobalId("fresh", 9);
446+
table.nextRow();
447+
448+
assertEquals(2, colS.getSize());
449+
assertEquals(1, colS.getValueCount());
450+
assertEquals(0, colS.getSymbolDictionarySize());
451+
assertEquals(9, colS.getMaxGlobalSymbolId());
452+
assertEquals(9, Unsafe.getUnsafe().getInt(colS.getDataAddress()));
453+
}
454+
});
455+
}
456+
409457
@Test
410458
public void testCancelRowRewindsDoubleArrayOffsets() throws Exception {
411459
assertMemoryLeak(() -> {

0 commit comments

Comments
 (0)