Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -87,7 +88,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
boolean jsonAsString = MapUtils.getFlag(settings,
ClientConfigProperties.serverSetting(ServerSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING), false);
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString,
defaultTypeHintMap);
defaultTypeHintMap, ByteBuffer::allocate);
if (schema != null) {
setSchema(schema);
}
Expand Down Expand Up @@ -208,13 +209,18 @@ public <T> T readValue(int colIndex) {
if (colIndex < 1 || colIndex > getSchema().getColumns().size()) {
throw new ClientException("Column index out of bounds: " + colIndex);
}
return (T) currentRecord[colIndex - 1];

T value = (T) currentRecord[colIndex - 1];
if (value instanceof BinaryString) {
return (T) ((BinaryString) value).asString();
}
return value;
}

@SuppressWarnings("unchecked")
@Override
public <T> T readValue(String colName) {
return (T) currentRecord[getSchema().nameToIndex(colName)];
return readValue(getSchema().nameToColumnIndex(colName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.math.BigInteger;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -33,6 +35,7 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.function.Function;

/**
* This class is not thread safe and should not be shared between multiple threads.
Expand All @@ -51,6 +54,8 @@

private final ByteBufferAllocator bufferAllocator;

private final StringBufferAllocator stringBufferAllocator;

private final boolean jsonAsString;

private final Class<?> arrayDefaultTypeHint;
Expand All @@ -69,11 +74,17 @@
* @param jsonAsString - use string to serialize/deserialize JSON columns
* @param typeHintMapping - what type use as hint if hint is not set or may not be known.
*/
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator, boolean jsonAsString, Map<ClickHouseDataType, Class<?>> typeHintMapping) {
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log,
ByteBufferAllocator bufferAllocator,
boolean jsonAsString,
Map<ClickHouseDataType,
Class<?>> typeHintMapping,
StringBufferAllocator stringBufferAllocator) {
this.log = log == null ? NOPLogger.NOP_LOGGER : log;
this.timeZone = timeZone;
this.input = input;
this.bufferAllocator = bufferAllocator;
this.stringBufferAllocator = stringBufferAllocator;
this.jsonAsString = jsonAsString;

this.arrayDefaultTypeHint = typeHintMapping == null ||
Expand Down Expand Up @@ -121,13 +132,11 @@
switch (dataType) {
// Primitives
case FixedString: {
byte[] bytes = precision > STRING_BUFF.length ?
new byte[precision] : STRING_BUFF;
readNBytes(input, bytes, 0, precision);
return (T) new String(bytes, 0, precision, StandardCharsets.UTF_8);
return (T) readBytesToBuffer(precision, stringBufferAllocator::allocate);
}
case String: {
return (T) readString();
int len = readVarInt(input);
return (T) readBytesToBuffer(len, stringBufferAllocator::allocate);
}
case Int8:
return (T) Byte.valueOf(readByte());
Expand Down Expand Up @@ -1111,6 +1120,28 @@
return new String(dest, 0, len, StandardCharsets.UTF_8);
}

public BinaryString readBytesToBuffer(int len, Function<Integer, ByteBuffer> bufferAllocator) throws IOException {

Check warning on line 1123 in client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this code to use the more specialised Functional Interface 'IntFunction<ByteBuffer>'

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ1FwnRMMypVGK4GgbNA&open=AZ1FwnRMMypVGK4GgbNA&pullRequest=2813
ByteBuffer buffer = null;
if (len > 0) {
buffer = bufferAllocator.apply(len);
if (buffer == null) {
throw new IOException("bufferAllocator returned `null`");
}
if (buffer.hasArray()) {
readNBytes(input, buffer.array(), 0, len);
} else {
int left = len;
while (left > 0) {
int chunkSize = Math.min(STRING_BUFF.length, left);
readNBytes(input, STRING_BUFF, 0, chunkSize);
buffer.put(STRING_BUFF, 0, chunkSize);
left -= chunkSize;
}
}
}
return buffer == null ? null : new BinaryStringImpl(buffer);
}

/**
* Reads a decimal value from input stream.
* @param input - source of bytes
Expand All @@ -1137,6 +1168,10 @@
byte[] allocate(int size);
}

public interface StringBufferAllocator {
ByteBuffer allocate(int size);
}

/**
* Byte allocator that creates a new byte array for each request.
*/
Expand Down Expand Up @@ -1391,4 +1426,63 @@
}
return obj;
}

static final class BinaryStringImpl implements BinaryString {

private final ByteBuffer buffer;
private final int len;
private CharBuffer charBuffer = null;
private String strValue = null;

BinaryStringImpl(ByteBuffer buffer) {
this.buffer = buffer;
this.len = buffer.position();
}

@Override
public ByteBuffer rawBuffer() {
return buffer;
}

@Override
public String asString() {
if (strValue == null) {
if (buffer.hasArray()) {
strValue = new String(buffer.array(), StandardCharsets.UTF_8);
} else {
ensureCharBuffer();
strValue = charBuffer.toString();
}
}
return strValue;
}

@Override
public int length() {
return len;
}

@Override
public char charAt(int index) {
ensureCharBuffer();
return charBuffer.charAt(index);
}

@Override
public CharSequence subSequence(int start, int end) {
ensureCharBuffer();
return charBuffer.subSequence(start, end);
}

private void ensureCharBuffer() {
if (charBuffer == null) {
charBuffer = buffer.asCharBuffer();
}
}

@Override
public int compareTo(String o) {
return asString().compareTo(o);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.clickhouse.client.api.data_formats.internal;

import java.nio.ByteBuffer;

public interface BinaryString extends Comparable<String>, CharSequence {

/**
* Returns a backing byte buffer or creates one
* @return ByteBuffer instance.
*/
ByteBuffer rawBuffer();

/**
* Converts raw bytes to a string whenever size is.
* @return String object
*/
String asString();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.clickhouse.client.api.ClickHouseException;
import com.clickhouse.client.api.DataTypeUtils;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.data_formats.internal.BinaryString;
import com.clickhouse.client.api.data_formats.internal.ValueConverters;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;

Expand Down Expand Up @@ -36,6 +38,8 @@

private final ArrayAsStringWriter arrayAsStringWriter = new ArrayAsStringWriter();

private final ValueConverters valueConverters = new ValueConverters();

Check warning on line 41 in client-v2/src/main/java/com/clickhouse/client/api/internal/DataTypeConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused "valueConverters" private field.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ1FwnR5MypVGK4GgbNB&open=AZ1FwnR5MypVGK4GgbNB&pullRequest=2813

public String convertToString(Object value, ClickHouseColumn column) {
if (value == null) {
return null;
Expand Down Expand Up @@ -72,21 +76,29 @@
}

public String stringToString(Object bytesOrString, ClickHouseColumn column) {
StringBuilder sb = new StringBuilder();
if (column.isArray()) {
StringBuilder sb = new StringBuilder();
sb.append(QUOTE);
}
if (bytesOrString instanceof CharSequence) {
sb.append(((CharSequence) bytesOrString));
} else if (bytesOrString instanceof byte[]) {
sb.append(new String((byte[]) bytesOrString));
} else {
sb.append(bytesOrString);
}
if (column.isArray()) {
if (bytesOrString instanceof BinaryString) {
sb.append(((BinaryString)bytesOrString).asString()); // string will be cached
} else if (bytesOrString instanceof CharSequence) {
sb.append(((CharSequence) bytesOrString));
} else if (bytesOrString instanceof byte[]) {
sb.append(new String((byte[]) bytesOrString));
} else {
sb.append(bytesOrString);
}
sb.append(QUOTE);
return sb.toString();
} else {
if (bytesOrString instanceof BinaryString) {
return ((BinaryString)bytesOrString).asString(); // string will be cached
} else if (bytesOrString instanceof byte[]) {
return new String((byte[]) bytesOrString);
} else {
return bytesOrString.toString();
}
}
return sb.toString();
}

public static ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
Expand Down
Loading