Skip to content

Commit 8c72f27

Browse files
committed
Merge remote-tracking branch 'origin/jh_experiment_new_ilp' into jh_experiment_new_ilp
2 parents 504721d + 3fa1a07 commit 8c72f27

3 files changed

Lines changed: 173 additions & 18 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpTableBuffer.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ public static class ColumnBuffer implements QuietCloseable {
583583
private byte[] arrayDims;
584584
private int arrayShapeOffset;
585585
private int[] arrayShapes;
586-
// Off-heap auxiliary buffer for global symbol IDs (SYMBOL type only)
586+
// Optional auxiliary buffer used by symbol encoders that need sideband IDs.
587587
private OffHeapAppendMemory auxBuffer;
588588
// Off-heap data buffer for fixed-width types
589589
private OffHeapAppendMemory dataBuffer;
@@ -601,10 +601,11 @@ public static class ColumnBuffer implements QuietCloseable {
601601
private long nullBufPtr;
602602
private QwpWebSocketSender sender;
603603
private int size; // Total row count (including nulls)
604+
// Symbol specific (dictionary stays on-heap)
605+
private boolean storeGlobalSymbolIdsOnly;
604606
private OffHeapAppendMemory stringData;
605607
// Off-heap storage for string/varchar column data
606608
private OffHeapAppendMemory stringOffsets;
607-
// Symbol specific (dictionary stays on-heap)
608609
private CharSequenceIntHashMap symbolDict;
609610
private ObjList<String> symbolList;
610611
private StringSink symbolLookupSink;
@@ -1087,6 +1088,9 @@ public void addSymbol(CharSequence value) {
10871088
addSymbolWithGlobalId(value, globalId);
10881089
return;
10891090
}
1091+
if (storeGlobalSymbolIdsOnly) {
1092+
throw new LineSenderException("column '" + name + "' cannot mix global symbol IDs with local symbol dictionary values");
1093+
}
10901094
int idx = getOrAddLocalSymbol(value);
10911095
dataBuffer.putInt(idx);
10921096
valueCount++;
@@ -1114,6 +1118,9 @@ public void addSymbolUtf8(long ptr, int len) {
11141118
addSymbolWithGlobalId(lookupSink, globalId);
11151119
return;
11161120
}
1121+
if (storeGlobalSymbolIdsOnly) {
1122+
throw new LineSenderException("column '" + name + "' cannot mix global symbol IDs with local symbol dictionary values");
1123+
}
11171124
int idx = getOrAddLocalSymbol(lookupSink);
11181125
dataBuffer.putInt(idx);
11191126
valueCount++;
@@ -1125,13 +1132,27 @@ public void addSymbolWithGlobalId(CharSequence value, int globalId) {
11251132
addNull();
11261133
return;
11271134
}
1128-
int localIdx = getOrAddLocalSymbol(value);
1129-
dataBuffer.putInt(localIdx);
1130-
11311135
if (auxBuffer == null) {
11321136
auxBuffer = new OffHeapAppendMemory(64);
11331137
}
1134-
auxBuffer.putInt(globalId);
1138+
if (!storeGlobalSymbolIdsOnly) {
1139+
if (symbolList != null && symbolList.size() > 0) {
1140+
int localIdx = getOrAddLocalSymbol(value);
1141+
dataBuffer.putInt(localIdx);
1142+
if (auxBuffer == null) {
1143+
auxBuffer = new OffHeapAppendMemory(64);
1144+
}
1145+
auxBuffer.putInt(globalId);
1146+
if (globalId > maxGlobalSymbolId) {
1147+
maxGlobalSymbolId = globalId;
1148+
}
1149+
valueCount++;
1150+
size++;
1151+
return;
1152+
}
1153+
storeGlobalSymbolIdsOnly = true;
1154+
}
1155+
dataBuffer.putInt(globalId);
11351156

11361157
if (globalId > maxGlobalSymbolId) {
11371158
maxGlobalSymbolId = globalId;
@@ -1174,6 +1195,18 @@ public void close() {
11741195
}
11751196
}
11761197

1198+
public void ensureNullBitmapCapacity(int minRows) {
1199+
if (nullBufPtr == 0 || nullBufCapRows >= minRows) {
1200+
return;
1201+
}
1202+
int newCapRows = Math.max(nullBufCapRows * 2, ((minRows + 63) >>> 6) << 6);
1203+
long newSizeBytes = (long) newCapRows >>> 3;
1204+
long oldSizeBytes = (long) nullBufCapRows >>> 3;
1205+
nullBufPtr = Unsafe.realloc(nullBufPtr, oldSizeBytes, newSizeBytes, MemoryTag.NATIVE_ILP_RSS);
1206+
Vect.memset(nullBufPtr + oldSizeBytes, newSizeBytes - oldSizeBytes, 0);
1207+
nullBufCapRows = newCapRows;
1208+
}
1209+
11771210
public int getArrayDataOffset() {
11781211
return arrayDataOffset;
11791212
}
@@ -1295,6 +1328,9 @@ public String[] getSymbolDictionary() {
12951328
}
12961329

12971330
public int getSymbolDictionarySize() {
1331+
if (storeGlobalSymbolIdsOnly) {
1332+
return 0;
1333+
}
12981334
return symbolList != null ? symbolList.size() : 0;
12991335
}
13001336

@@ -1347,6 +1383,7 @@ public void reset() {
13471383
symbolDict.clear();
13481384
symbolList.clear();
13491385
}
1386+
storeGlobalSymbolIdsOnly = false;
13501387
maxGlobalSymbolId = -1;
13511388
arrayShapeOffset = 0;
13521389
arrayDataOffset = 0;
@@ -1463,6 +1500,7 @@ public void truncateTo(int newSize) {
14631500
decimalScale = -1;
14641501
geohashPrecision = -1;
14651502
maxGlobalSymbolId = -1;
1503+
storeGlobalSymbolIdsOnly = false;
14661504
if (symbolDict != null) {
14671505
symbolDict.clear();
14681506
symbolList.clear();
@@ -1674,18 +1712,6 @@ private void ensureArrayCapacity(int nDims, int dataElements) {
16741712
}
16751713
}
16761714

1677-
public void ensureNullBitmapCapacity(int minRows) {
1678-
if (nullBufPtr == 0 || nullBufCapRows >= minRows) {
1679-
return;
1680-
}
1681-
int newCapRows = Math.max(nullBufCapRows * 2, ((minRows + 63) >>> 6) << 6);
1682-
long newSizeBytes = (long) newCapRows >>> 3;
1683-
long oldSizeBytes = (long) nullBufCapRows >>> 3;
1684-
nullBufPtr = Unsafe.realloc(nullBufPtr, oldSizeBytes, newSizeBytes, MemoryTag.NATIVE_ILP_RSS);
1685-
Vect.memset(nullBufPtr + oldSizeBytes, newSizeBytes - oldSizeBytes, 0);
1686-
nullBufCapRows = newCapRows;
1687-
}
1688-
16891715
private int getOrAddLocalSymbol(CharSequence value) {
16901716
int idx = symbolDict.get(value);
16911717
if (idx == CharSequenceIntHashMap.NO_ENTRY_VALUE) {
@@ -1709,6 +1735,7 @@ private void resetEmptyMetadata() {
17091735
decimalScale = -1;
17101736
geohashPrecision = -1;
17111737
maxGlobalSymbolId = -1;
1738+
storeGlobalSymbolIdsOnly = false;
17121739
if (symbolDict != null) {
17131740
symbolDict.clear();
17141741
symbolList.clear();
@@ -1778,6 +1805,11 @@ private void retainStringValue(int valueIndex) {
17781805
private void retainSymbolValue(int valueIndex) {
17791806
retainFixedWidthValue(valueIndex);
17801807

1808+
if (storeGlobalSymbolIdsOnly) {
1809+
maxGlobalSymbolId = Unsafe.getUnsafe().getInt(dataBuffer.pageAddress());
1810+
return;
1811+
}
1812+
17811813
int localIndex = Unsafe.getUnsafe().getInt(dataBuffer.pageAddress());
17821814
String symbol = symbolList.get(localIndex);
17831815

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketEncoderTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.junit.Assert;
3333
import org.junit.Test;
3434

35+
import java.nio.charset.StandardCharsets;
36+
3537
import static io.questdb.client.cutlass.qwp.protocol.QwpConstants.*;
3638
import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
3739

@@ -961,6 +963,43 @@ public void testEncodeWithDeltaDict_withConfirmed_sendsOnlyNew() throws Exceptio
961963
});
962964
}
963965

966+
@Test
967+
public void testEncodeWithDeltaDict_readsGlobalIdsFromDataBuffer() throws Exception {
968+
assertMemoryLeak(() -> {
969+
try (QwpWebSocketEncoder encoder = new QwpWebSocketEncoder();
970+
QwpTableBuffer buffer = new QwpTableBuffer("test_table")) {
971+
GlobalSymbolDictionary globalDict = new GlobalSymbolDictionary();
972+
for (int i = 0; i < 8; i++) {
973+
globalDict.getOrAddSymbol("SYM_" + i);
974+
}
975+
976+
QwpTableBuffer.ColumnBuffer col = buffer.getOrCreateColumn("ticker", TYPE_SYMBOL, false);
977+
col.addSymbolWithGlobalId("SYM_5", 5);
978+
buffer.nextRow();
979+
col.addSymbolWithGlobalId("SYM_7", 7);
980+
buffer.nextRow();
981+
982+
Assert.assertEquals(0, col.getAuxDataAddress());
983+
984+
int size = encoder.encodeWithDeltaDict(buffer, globalDict, 7, 7, false);
985+
Assert.assertTrue(size > 12);
986+
987+
Cursor cursor = new Cursor(encoder.getBuffer().getBufferPtr() + HEADER_SIZE);
988+
Assert.assertEquals(8, cursor.readVarint());
989+
Assert.assertEquals(0, cursor.readVarint());
990+
991+
Assert.assertEquals("test_table", cursor.readString());
992+
Assert.assertEquals(2, cursor.readVarint());
993+
Assert.assertEquals(1, cursor.readVarint());
994+
Assert.assertEquals(SCHEMA_MODE_FULL, cursor.readByte());
995+
Assert.assertEquals("ticker", cursor.readString());
996+
Assert.assertEquals(TYPE_SYMBOL, cursor.readByte());
997+
Assert.assertEquals(5, cursor.readVarint());
998+
Assert.assertEquals(7, cursor.readVarint());
999+
}
1000+
});
1001+
}
1002+
9641003
// ==================== SCHEMA REFERENCE TESTS ====================
9651004

9661005
@Test
@@ -1359,4 +1398,40 @@ public void testReset() throws Exception {
13591398
}
13601399
});
13611400
}
1401+
1402+
private static final class Cursor {
1403+
private long address;
1404+
1405+
private Cursor(long address) {
1406+
this.address = address;
1407+
}
1408+
1409+
private byte readByte() {
1410+
return Unsafe.getUnsafe().getByte(address++);
1411+
}
1412+
1413+
private String readString() {
1414+
int len = readVarint();
1415+
byte[] bytes = new byte[len];
1416+
for (int i = 0; i < len; i++) {
1417+
bytes[i] = Unsafe.getUnsafe().getByte(address + i);
1418+
}
1419+
String value = new String(bytes, StandardCharsets.UTF_8);
1420+
address += len;
1421+
return value;
1422+
}
1423+
1424+
private int readVarint() {
1425+
int value = 0;
1426+
int shift = 0;
1427+
while (true) {
1428+
int b = Unsafe.getUnsafe().getByte(address++) & 0xff;
1429+
value |= (b & 0x7f) << shift;
1430+
if ((b & 0x80) == 0) {
1431+
return value;
1432+
}
1433+
shift += 7;
1434+
}
1435+
}
1436+
}
13621437
}

core/src/test/java/io/questdb/client/test/cutlass/qwp/protocol/QwpTableBufferTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,29 @@ public void testAddSymbolUtf8ReusesExistingDictionaryEntry() throws Exception {
328328
});
329329
}
330330

331+
@Test
332+
public void testAddSymbolWithGlobalIdStoresOnlyGlobalIds() throws Exception {
333+
assertMemoryLeak(() -> {
334+
try (QwpTableBuffer table = new QwpTableBuffer("test")) {
335+
QwpTableBuffer.ColumnBuffer col = table.getOrCreateColumn("sym", QwpConstants.TYPE_SYMBOL, true);
336+
col.addSymbolWithGlobalId("alpha", 7);
337+
table.nextRow();
338+
col.addSymbolWithGlobalId("beta", 11);
339+
table.nextRow();
340+
341+
assertEquals(2, col.getSize());
342+
assertEquals(2, col.getValueCount());
343+
assertEquals(0, col.getSymbolDictionarySize());
344+
assertEquals(0, col.getAuxDataAddress());
345+
assertEquals(11, col.getMaxGlobalSymbolId());
346+
347+
long dataAddress = col.getDataAddress();
348+
assertEquals(7, Unsafe.getUnsafe().getInt(dataAddress));
349+
assertEquals(11, Unsafe.getUnsafe().getInt(dataAddress + Integer.BYTES));
350+
}
351+
});
352+
}
353+
331354
@Test
332355
public void testCancelRowResetsDecimalScaleOnLateAddedColumn() throws Exception {
333356
assertMemoryLeak(() -> {
@@ -406,6 +429,31 @@ public void testCancelRowResetsSymbolDictOnLateAddedColumn() throws Exception {
406429
});
407430
}
408431

432+
@Test
433+
public void testCancelRowRetainsGlobalSymbolIdWithoutLocalDictionary() throws Exception {
434+
assertMemoryLeak(() -> {
435+
try (QwpTableBuffer table = new QwpTableBuffer("test")) {
436+
table.getOrCreateColumn("a", QwpConstants.TYPE_LONG, false).addLong(0);
437+
table.nextRow();
438+
439+
table.getOrCreateColumn("a", QwpConstants.TYPE_LONG, false).addLong(1);
440+
QwpTableBuffer.ColumnBuffer colS = table.getOrCreateColumn("s", QwpConstants.TYPE_SYMBOL, true);
441+
colS.addSymbolWithGlobalId("stale", 4);
442+
table.cancelCurrentRow();
443+
444+
table.getOrCreateColumn("a", QwpConstants.TYPE_LONG, false).addLong(1);
445+
colS.addSymbolWithGlobalId("fresh", 9);
446+
table.nextRow();
447+
448+
assertEquals(2, colS.getSize());
449+
assertEquals(1, colS.getValueCount());
450+
assertEquals(0, colS.getSymbolDictionarySize());
451+
assertEquals(9, colS.getMaxGlobalSymbolId());
452+
assertEquals(9, Unsafe.getUnsafe().getInt(colS.getDataAddress()));
453+
}
454+
});
455+
}
456+
409457
@Test
410458
public void testCancelRowRewindsDoubleArrayOffsets() throws Exception {
411459
assertMemoryLeak(() -> {

0 commit comments

Comments
 (0)