Skip to content

Commit 6e4d100

Browse files
committed
iteration #5
1 parent 6f566ad commit 6e4d100

File tree

3 files changed

+376
-365
lines changed

3 files changed

+376
-365
lines changed
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
/*******************************************************************************
2+
* ___ _ ____ ____
3+
* / _ \ _ _ ___ ___| |_| _ \| __ )
4+
* | | | | | | |/ _ \/ __| __| | | | _ \
5+
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
6+
* \__\_\\__,_|\___||___/\__|____/|____/
7+
*
8+
* Copyright (c) 2014-2019 Appsicle
9+
* Copyright (c) 2019-2026 QuestDB
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*
23+
******************************************************************************/
24+
25+
package io.questdb.client.cutlass.qwp.client;
26+
27+
import io.questdb.client.cutlass.line.LineSenderException;
28+
import io.questdb.client.cutlass.qwp.protocol.QwpColumnDef;
29+
import io.questdb.client.cutlass.qwp.protocol.QwpGorillaEncoder;
30+
import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer;
31+
import io.questdb.client.std.Unsafe;
32+
33+
import static io.questdb.client.cutlass.qwp.protocol.QwpConstants.*;
34+
35+
/**
36+
* Transport-agnostic column encoder for ILP v4 table data.
37+
* <p>
38+
* Reads column data from {@link QwpTableBuffer.ColumnBuffer} and writes encoded
39+
* bytes to a {@link QwpBufferWriter}. Both {@link QwpWebSocketEncoder} and
40+
* {@link QwpUdpSender} delegate to this class for column encoding.
41+
*/
42+
class QwpColumnWriter {
43+
44+
private static final byte ENCODING_GORILLA = 0x01;
45+
private static final byte ENCODING_UNCOMPRESSED = 0x00;
46+
private final QwpGorillaEncoder gorillaEncoder = new QwpGorillaEncoder();
47+
private QwpBufferWriter buffer;
48+
49+
private void encodeColumn(QwpTableBuffer.ColumnBuffer col, QwpColumnDef colDef, int rowCount, boolean useGorilla, boolean useGlobalSymbols) {
50+
int valueCount = col.getValueCount();
51+
long dataAddr = col.getDataAddress();
52+
53+
if (colDef.isNullable()) {
54+
writeNullBitmap(col, rowCount);
55+
}
56+
57+
switch (col.getType()) {
58+
case TYPE_BOOLEAN:
59+
writeBooleanColumn(dataAddr, valueCount);
60+
break;
61+
case TYPE_BYTE:
62+
buffer.putBlockOfBytes(dataAddr, valueCount);
63+
break;
64+
case TYPE_SHORT:
65+
case TYPE_CHAR:
66+
buffer.putBlockOfBytes(dataAddr, (long) valueCount * 2);
67+
break;
68+
case TYPE_INT, TYPE_FLOAT:
69+
buffer.putBlockOfBytes(dataAddr, (long) valueCount * 4);
70+
break;
71+
case TYPE_LONG, TYPE_DATE, TYPE_DOUBLE:
72+
buffer.putBlockOfBytes(dataAddr, (long) valueCount * 8);
73+
break;
74+
case TYPE_TIMESTAMP:
75+
case TYPE_TIMESTAMP_NANOS:
76+
writeTimestampColumn(dataAddr, valueCount, useGorilla);
77+
break;
78+
case TYPE_GEOHASH:
79+
writeGeoHashColumn(dataAddr, valueCount, col.getGeoHashPrecision());
80+
break;
81+
case TYPE_STRING:
82+
case TYPE_VARCHAR:
83+
writeStringColumn(col, valueCount);
84+
break;
85+
case TYPE_SYMBOL:
86+
if (useGlobalSymbols) {
87+
writeSymbolColumnWithGlobalIds(col, valueCount);
88+
} else {
89+
writeSymbolColumn(col, valueCount);
90+
}
91+
break;
92+
case TYPE_UUID:
93+
// Stored as lo+hi contiguously, matching wire order
94+
buffer.putBlockOfBytes(dataAddr, (long) valueCount * 16);
95+
break;
96+
case TYPE_LONG256:
97+
// Stored as 4 contiguous longs per value
98+
buffer.putBlockOfBytes(dataAddr, (long) valueCount * 32);
99+
break;
100+
case TYPE_DOUBLE_ARRAY:
101+
writeDoubleArrayColumn(col, valueCount);
102+
break;
103+
case TYPE_LONG_ARRAY:
104+
writeLongArrayColumn(col, valueCount);
105+
break;
106+
case TYPE_DECIMAL64:
107+
writeDecimal64Column(col.getDecimalScale(), dataAddr, valueCount);
108+
break;
109+
case TYPE_DECIMAL128:
110+
writeDecimal128Column(col.getDecimalScale(), dataAddr, valueCount);
111+
break;
112+
case TYPE_DECIMAL256:
113+
writeDecimal256Column(col.getDecimalScale(), dataAddr, valueCount);
114+
break;
115+
default:
116+
throw new LineSenderException("Unknown column type: " + col.getType());
117+
}
118+
}
119+
120+
void encodeTable(QwpTableBuffer tableBuffer, boolean useSchemaRef, boolean useGlobalSymbols, boolean useGorilla) {
121+
QwpColumnDef[] columnDefs = tableBuffer.getColumnDefs();
122+
int rowCount = tableBuffer.getRowCount();
123+
124+
if (useSchemaRef) {
125+
writeTableHeaderWithSchemaRef(
126+
tableBuffer.getTableName(),
127+
rowCount,
128+
tableBuffer.getSchemaHash(),
129+
columnDefs.length
130+
);
131+
} else {
132+
writeTableHeaderWithSchema(tableBuffer.getTableName(), rowCount, columnDefs);
133+
}
134+
135+
for (int i = 0; i < tableBuffer.getColumnCount(); i++) {
136+
QwpTableBuffer.ColumnBuffer col = tableBuffer.getColumn(i);
137+
QwpColumnDef colDef = columnDefs[i];
138+
encodeColumn(col, colDef, rowCount, useGorilla, useGlobalSymbols);
139+
}
140+
}
141+
142+
void setBuffer(QwpBufferWriter buffer) {
143+
this.buffer = buffer;
144+
}
145+
146+
private void writeBooleanColumn(long addr, int count) {
147+
int packedSize = (count + 7) / 8;
148+
for (int i = 0; i < packedSize; i++) {
149+
byte b = 0;
150+
for (int bit = 0; bit < 8; bit++) {
151+
int idx = i * 8 + bit;
152+
if (idx < count && Unsafe.getUnsafe().getByte(addr + idx) != 0) {
153+
b |= (1 << bit);
154+
}
155+
}
156+
buffer.putByte(b);
157+
}
158+
}
159+
160+
private void writeDecimal128Column(byte scale, long addr, int count) {
161+
buffer.putByte(scale);
162+
for (int i = 0; i < count; i++) {
163+
long offset = (long) i * 16;
164+
long hi = Unsafe.getUnsafe().getLong(addr + offset);
165+
long lo = Unsafe.getUnsafe().getLong(addr + offset + 8);
166+
buffer.putLongBE(hi);
167+
buffer.putLongBE(lo);
168+
}
169+
}
170+
171+
private void writeDecimal256Column(byte scale, long addr, int count) {
172+
buffer.putByte(scale);
173+
for (int i = 0; i < count; i++) {
174+
long offset = (long) i * 32;
175+
buffer.putLongBE(Unsafe.getUnsafe().getLong(addr + offset));
176+
buffer.putLongBE(Unsafe.getUnsafe().getLong(addr + offset + 8));
177+
buffer.putLongBE(Unsafe.getUnsafe().getLong(addr + offset + 16));
178+
buffer.putLongBE(Unsafe.getUnsafe().getLong(addr + offset + 24));
179+
}
180+
}
181+
182+
private void writeDecimal64Column(byte scale, long addr, int count) {
183+
buffer.putByte(scale);
184+
for (int i = 0; i < count; i++) {
185+
buffer.putLongBE(Unsafe.getUnsafe().getLong(addr + (long) i * 8));
186+
}
187+
}
188+
189+
private void writeDoubleArrayColumn(QwpTableBuffer.ColumnBuffer col, int count) {
190+
byte[] dims = col.getArrayDims();
191+
int[] shapes = col.getArrayShapes();
192+
double[] data = col.getDoubleArrayData();
193+
194+
int shapeIdx = 0;
195+
int dataIdx = 0;
196+
for (int row = 0; row < count; row++) {
197+
int nDims = dims[row];
198+
buffer.putByte((byte) nDims);
199+
200+
int elemCount = 1;
201+
for (int d = 0; d < nDims; d++) {
202+
int dimLen = shapes[shapeIdx++];
203+
buffer.putInt(dimLen);
204+
elemCount = Math.multiplyExact(elemCount, dimLen);
205+
}
206+
207+
for (int e = 0; e < elemCount; e++) {
208+
buffer.putDouble(data[dataIdx++]);
209+
}
210+
}
211+
}
212+
213+
private void writeGeoHashColumn(long addr, int count, int precision) {
214+
if (precision < 1) {
215+
precision = 1;
216+
}
217+
buffer.putVarint(precision);
218+
int valueSize = (precision + 7) / 8;
219+
for (int i = 0; i < count; i++) {
220+
long value = Unsafe.getUnsafe().getLong(addr + (long) i * 8);
221+
for (int b = 0; b < valueSize; b++) {
222+
buffer.putByte((byte) (value >>> (b * 8)));
223+
}
224+
}
225+
}
226+
227+
private void writeLongArrayColumn(QwpTableBuffer.ColumnBuffer col, int count) {
228+
byte[] dims = col.getArrayDims();
229+
int[] shapes = col.getArrayShapes();
230+
long[] data = col.getLongArrayData();
231+
232+
int shapeIdx = 0;
233+
int dataIdx = 0;
234+
for (int row = 0; row < count; row++) {
235+
int nDims = dims[row];
236+
buffer.putByte((byte) nDims);
237+
238+
int elemCount = 1;
239+
for (int d = 0; d < nDims; d++) {
240+
int dimLen = shapes[shapeIdx++];
241+
buffer.putInt(dimLen);
242+
elemCount = Math.multiplyExact(elemCount, dimLen);
243+
}
244+
245+
for (int e = 0; e < elemCount; e++) {
246+
buffer.putLong(data[dataIdx++]);
247+
}
248+
}
249+
}
250+
251+
private void writeNullBitmap(QwpTableBuffer.ColumnBuffer col, int rowCount) {
252+
long nullAddr = col.getNullBitmapAddress();
253+
if (nullAddr != 0) {
254+
int bitmapSize = (rowCount + 7) / 8;
255+
buffer.putBlockOfBytes(nullAddr, bitmapSize);
256+
} else {
257+
int bitmapSize = (rowCount + 7) / 8;
258+
for (int i = 0; i < bitmapSize; i++) {
259+
buffer.putByte((byte) 0);
260+
}
261+
}
262+
}
263+
264+
private void writeStringColumn(QwpTableBuffer.ColumnBuffer col, int valueCount) {
265+
buffer.putBlockOfBytes(col.getStringOffsetsAddress(), (long) (valueCount + 1) * 4);
266+
buffer.putBlockOfBytes(col.getStringDataAddress(), col.getStringDataSize());
267+
}
268+
269+
private void writeSymbolColumn(QwpTableBuffer.ColumnBuffer col, int count) {
270+
long dataAddr = col.getDataAddress();
271+
String[] dictionary = col.getSymbolDictionary();
272+
273+
buffer.putVarint(dictionary.length);
274+
for (String symbol : dictionary) {
275+
buffer.putString(symbol);
276+
}
277+
278+
for (int i = 0; i < count; i++) {
279+
int idx = Unsafe.getUnsafe().getInt(dataAddr + (long) i * 4);
280+
buffer.putVarint(idx);
281+
}
282+
}
283+
284+
private void writeSymbolColumnWithGlobalIds(QwpTableBuffer.ColumnBuffer col, int count) {
285+
long auxAddr = col.getAuxDataAddress();
286+
if (auxAddr == 0) {
287+
long dataAddr = col.getDataAddress();
288+
for (int i = 0; i < count; i++) {
289+
int idx = Unsafe.getUnsafe().getInt(dataAddr + (long) i * 4);
290+
buffer.putVarint(idx);
291+
}
292+
} else {
293+
for (int i = 0; i < count; i++) {
294+
int globalId = Unsafe.getUnsafe().getInt(auxAddr + (long) i * 4);
295+
buffer.putVarint(globalId);
296+
}
297+
}
298+
}
299+
300+
private void writeTableHeaderWithSchema(String tableName, int rowCount, QwpColumnDef[] columns) {
301+
buffer.putString(tableName);
302+
buffer.putVarint(rowCount);
303+
buffer.putVarint(columns.length);
304+
buffer.putByte(SCHEMA_MODE_FULL);
305+
for (QwpColumnDef col : columns) {
306+
buffer.putString(col.getName());
307+
buffer.putByte(col.getWireTypeCode());
308+
}
309+
}
310+
311+
private void writeTableHeaderWithSchemaRef(String tableName, int rowCount, long schemaHash, int columnCount) {
312+
buffer.putString(tableName);
313+
buffer.putVarint(rowCount);
314+
buffer.putVarint(columnCount);
315+
buffer.putByte(SCHEMA_MODE_REFERENCE);
316+
buffer.putLong(schemaHash);
317+
}
318+
319+
private void writeTimestampColumn(long addr, int count, boolean useGorilla) {
320+
if (useGorilla && count > 2) {
321+
if (QwpGorillaEncoder.canUseGorilla(addr, count)) {
322+
buffer.putByte(ENCODING_GORILLA);
323+
int encodedSize = QwpGorillaEncoder.calculateEncodedSize(addr, count);
324+
buffer.ensureCapacity(encodedSize);
325+
int bytesWritten = gorillaEncoder.encodeTimestamps(
326+
buffer.getBufferPtr() + buffer.getPosition(),
327+
buffer.getCapacity() - buffer.getPosition(),
328+
addr,
329+
count
330+
);
331+
buffer.skip(bytesWritten);
332+
} else {
333+
buffer.putByte(ENCODING_UNCOMPRESSED);
334+
buffer.putBlockOfBytes(addr, (long) count * 8);
335+
}
336+
} else {
337+
if (useGorilla) {
338+
buffer.putByte(ENCODING_UNCOMPRESSED);
339+
}
340+
buffer.putBlockOfBytes(addr, (long) count * 8);
341+
}
342+
}
343+
}

0 commit comments

Comments
 (0)