1010import java .util .concurrent .BlockingQueue ;
1111import java .util .concurrent .TimeUnit ;
1212
13+ import com .clickhouse .client .config .ClickHouseClientOption ;
14+
1315/**
1416 * Extended input stream for read optimization.
1517 */
@@ -184,11 +186,17 @@ public long skip(long n) throws IOException {
184186
185187 static final class WrappedInputStream extends ClickHouseInputStream {
186188 private final InputStream in ;
189+ private final byte [] buffer ;
187190
191+ private int position ;
192+ private int limit ;
188193 private boolean closed ;
189194
190- WrappedInputStream (InputStream input ) {
195+ WrappedInputStream (InputStream input , int bufferSize ) {
191196 in = ClickHouseChecker .nonNull (input , "InputStream" );
197+ buffer = new byte [bufferSize ];
198+ position = 0 ;
199+ limit = 0 ;
192200 closed = false ;
193201 }
194202
@@ -198,26 +206,34 @@ private void ensureOpen() throws IOException {
198206 }
199207 }
200208
209+ private int updateBuffer () throws IOException {
210+ if (closed ) {
211+ return -1 ;
212+ }
213+
214+ position = 0 ;
215+ int count = in .read (buffer );
216+ limit = count > 0 ? count : 0 ;
217+ return count ;
218+ }
219+
201220 @ Override
202221 public int available () throws IOException {
203- return !closed ? in . available () : 0 ;
222+ return !closed && ( position < limit || updateBuffer () > 0 ) ? limit - position : 0 ;
204223 }
205224
206225 @ Override
207226 public byte readByte () throws IOException {
208- ensureOpen ();
209-
210- int v = in .read ();
211- if (v != -1 ) {
212- return (byte ) v ;
227+ if (position >= limit && updateBuffer () < 0 ) {
228+ try {
229+ close ();
230+ } catch (IOException e ) {
231+ // ignore
232+ }
233+ throw new EOFException ();
213234 }
214235
215- try {
216- close ();
217- } catch (IOException e ) {
218- // ignore
219- }
220- throw new EOFException ();
236+ return buffer [position ++];
221237 }
222238
223239 @ Override
@@ -232,26 +248,125 @@ public void close() throws IOException {
232248 in .close ();
233249 } finally {
234250 closed = true ;
251+ position = 0 ;
252+ limit = 0 ;
235253 }
236254 }
237255 }
238256
239257 @ Override
240258 public int read () throws IOException {
241259 ensureOpen ();
242- return in .read ();
260+
261+ int value = -1 ;
262+ if (position < limit || updateBuffer () > 0 ) {
263+ value = 0xFF & buffer [position ++];
264+ }
265+ return value ;
243266 }
244267
245268 @ Override
246269 public int read (byte [] b , int off , int len ) throws IOException {
270+ if (position >= limit && updateBuffer () < 0 ) {
271+ return -1 ;
272+ }
273+
247274 ensureOpen ();
248- return in .read (b , off , len );
275+
276+ int counter = 0 ;
277+ while (counter < len ) {
278+ int size = Math .min (limit - position , len - counter );
279+ System .arraycopy (buffer , position , b , off , size );
280+ position += size ;
281+ off += size ;
282+ counter += size ;
283+
284+ if (position >= limit && updateBuffer () < 0 ) {
285+ break ;
286+ }
287+ }
288+
289+ return counter ;
290+ }
291+
292+ @ Override
293+ public byte [] readBytes (int length ) throws IOException {
294+ if (length <= 0 ) {
295+ return EMPTY_BYTES ;
296+ }
297+
298+ ensureOpen ();
299+
300+ byte [] bytes = new byte [length ];
301+ int offset = 0 ;
302+ int counter = 0 ;
303+ while (counter < length ) {
304+ if (position >= limit && updateBuffer () < 0 ) {
305+ try {
306+ close ();
307+ } catch (IOException e ) {
308+ // ignore
309+ }
310+ throw counter == 0 ? new EOFException ()
311+ : new IOException (ClickHouseUtils
312+ .format ("Reached end of input stream after reading %d of %d bytes" , counter ,
313+ bytes .length ));
314+ }
315+
316+ int size = Math .min (limit - position , length - counter );
317+ System .arraycopy (buffer , position , bytes , offset , size );
318+ position += size ;
319+ offset += size ;
320+ counter += size ;
321+ }
322+
323+ return bytes ;
324+ }
325+
326+ @ Override
327+ public String readString (int byteLength , Charset charset ) throws IOException {
328+ ensureOpen ();
329+
330+ if (byteLength < 1 ) {
331+ return "" ;
332+ }
333+
334+ if (charset == null ) {
335+ charset = StandardCharsets .UTF_8 ;
336+ }
337+
338+ if (limit - position > byteLength ) {
339+ int offset = position ;
340+ position += byteLength ;
341+ return new String (buffer , offset , byteLength , charset );
342+ }
343+
344+ return new String (readBytes (byteLength ), charset );
249345 }
250346
251347 @ Override
252348 public long skip (long n ) throws IOException {
253349 ensureOpen ();
254- return in .skip (n );
350+
351+ long counter = 0L ;
352+ while (n > 0L ) {
353+ if (position >= limit && updateBuffer () < 0 ) {
354+ break ;
355+ } else {
356+ int remain = limit - position ;
357+ if (n > remain ) {
358+ n -= remain ;
359+ counter += remain ;
360+ position = limit ;
361+ } else {
362+ counter += n ;
363+ position += n ;
364+ n = 0L ;
365+ }
366+ }
367+ }
368+
369+ return counter ;
255370 }
256371 }
257372
@@ -274,7 +389,21 @@ public static ClickHouseInputStream of(BlockingQueue<ByteBuffer> queue, int time
274389 * {@link ClickHouseInputStream}
275390 */
276391 public static ClickHouseInputStream of (InputStream input ) {
277- return input instanceof ClickHouseInputStream ? (ClickHouseInputStream ) input : new WrappedInputStream (input );
392+ return of (input , (int ) ClickHouseClientOption .MAX_BUFFER_SIZE .getDefaultValue ());
393+ }
394+
395+ /**
396+ * Wraps the given input stream.
397+ *
398+ * @param input non-null input stream
399+ * @param bufferSize buffer size which is always greater than zero(usually 4096
400+ * or larger)
401+ * @return wrapped input, or the same input if it's instance of
402+ * {@link ClickHouseInputStream}
403+ */
404+ public static ClickHouseInputStream of (InputStream input , int bufferSize ) {
405+ return input instanceof ClickHouseInputStream ? (ClickHouseInputStream ) input
406+ : new WrappedInputStream (input , bufferSize );
278407 }
279408
280409 /**
0 commit comments