@@ -51,20 +51,33 @@ public static void readVectored(VectoredReadable readable, List<? extends FileRa
5151 if (ranges .isEmpty ()) {
5252 return ;
5353 }
54+ readVectored (readable , ranges , ReadOptions .from (readable ));
55+ }
56+
57+ public static void readVectored (
58+ VectoredReadable readable , List <? extends FileRange > ranges , ReadOptions options )
59+ throws IOException {
60+ if (ranges .isEmpty ()) {
61+ return ;
62+ }
63+ requireNonNull (readable , "readable is null" );
64+ requireNonNull (options , "options is null" );
5465
5566 List <? extends FileRange > sortRanges = validateAndSortRanges (ranges );
5667 List <CombinedRange > combinedRanges =
57- mergeSortedRanges (sortRanges , readable .minSeekForVectorReads () );
68+ mergeSortedRanges (sortRanges , options .minSeekForVectorReads );
5869
59- int parallelism = readable .parallelismForVectorReads () ;
70+ int parallelism = options .parallelismForVectorReads ;
6071
61- if (combinedRanges .size () == 1 && readable instanceof SeekableInputStream ) {
72+ if (options .sequentialReadFallback
73+ && combinedRanges .size () == 1
74+ && readable instanceof SeekableInputStream ) {
6275 fallbackToReadSequence ((SeekableInputStream ) readable , sortRanges );
6376 return ;
6477 }
6578
6679 BlockingExecutor executor = new BlockingExecutor (IO_THREAD_POOL , parallelism );
67- long batchSize = readable .batchSizeForVectorReads () ;
80+ long batchSize = options .batchSizeForVectorReads ;
6881 for (CombinedRange combinedRange : combinedRanges ) {
6982 if (combinedRange .underlying .size () == 1 ) {
7083 FileRange fileRange = combinedRange .underlying .get (0 );
@@ -76,12 +89,95 @@ public static void readVectored(VectoredReadable readable, List<? extends FileRa
7689 List <CompletableFuture <byte []>> futures =
7790 splitBatches .stream ().map (FileRange ::getData ).collect (Collectors .toList ());
7891 CompletableFuture .allOf (futures .toArray (new CompletableFuture <?>[0 ]))
79- .thenAcceptAsync (
80- unused -> copyToFileRanges (combinedRange , futures ), IO_THREAD_POOL );
92+ .whenCompleteAsync (
93+ (unused , throwable ) -> {
94+ if (throwable == null ) {
95+ try {
96+ copyToFileRanges (combinedRange , futures );
97+ } catch (Throwable t ) {
98+ completeFileRangesExceptionally (combinedRange , t );
99+ }
100+ } else {
101+ completeFileRangesExceptionally (combinedRange , throwable );
102+ }
103+ },
104+ IO_THREAD_POOL );
81105 }
82106 }
83107 }
84108
109+ /** Options for vectored reads. */
110+ public static class ReadOptions {
111+
112+ private final int minSeekForVectorReads ;
113+ private final long batchSizeForVectorReads ;
114+ private final int parallelismForVectorReads ;
115+ private final boolean sequentialReadFallback ;
116+
117+ public static ReadOptions from (VectoredReadable readable ) {
118+ return new ReadOptions (
119+ readable .minSeekForVectorReads (),
120+ readable .batchSizeForVectorReads (),
121+ readable .parallelismForVectorReads (),
122+ true );
123+ }
124+
125+ public ReadOptions (
126+ int minSeekForVectorReads ,
127+ long batchSizeForVectorReads ,
128+ int parallelismForVectorReads ,
129+ boolean sequentialReadFallback ) {
130+ checkArgument (
131+ minSeekForVectorReads >= 0 ,
132+ "minSeekForVectorReads must be non-negative: %s" ,
133+ minSeekForVectorReads );
134+ checkArgument (
135+ batchSizeForVectorReads > 0 ,
136+ "batchSizeForVectorReads must be positive: %s" ,
137+ batchSizeForVectorReads );
138+ checkArgument (
139+ parallelismForVectorReads > 0 ,
140+ "parallelismForVectorReads must be positive: %s" ,
141+ parallelismForVectorReads );
142+ this .minSeekForVectorReads = minSeekForVectorReads ;
143+ this .batchSizeForVectorReads = batchSizeForVectorReads ;
144+ this .parallelismForVectorReads = parallelismForVectorReads ;
145+ this .sequentialReadFallback = sequentialReadFallback ;
146+ }
147+
148+ public ReadOptions withMinSeekForVectorReads (int minSeekForVectorReads ) {
149+ return new ReadOptions (
150+ minSeekForVectorReads ,
151+ batchSizeForVectorReads ,
152+ parallelismForVectorReads ,
153+ sequentialReadFallback );
154+ }
155+
156+ public ReadOptions withBatchSizeForVectorReads (long batchSizeForVectorReads ) {
157+ return new ReadOptions (
158+ minSeekForVectorReads ,
159+ batchSizeForVectorReads ,
160+ parallelismForVectorReads ,
161+ sequentialReadFallback );
162+ }
163+
164+ public ReadOptions withParallelismForVectorReads (int parallelismForVectorReads ) {
165+ return new ReadOptions (
166+ minSeekForVectorReads ,
167+ batchSizeForVectorReads ,
168+ parallelismForVectorReads ,
169+ sequentialReadFallback );
170+ }
171+
172+ public ReadOptions withSequentialReadFallback (boolean sequentialReadFallback ) {
173+ return new ReadOptions (
174+ minSeekForVectorReads ,
175+ batchSizeForVectorReads ,
176+ parallelismForVectorReads ,
177+ sequentialReadFallback );
178+ }
179+ }
180+
85181 private static void fallbackToReadSequence (
86182 SeekableInputStream in , List <? extends FileRange > ranges ) throws IOException {
87183 for (FileRange range : ranges ) {
@@ -126,6 +222,13 @@ private static void copyToFileRanges(
126222 }
127223 }
128224
225+ private static void completeFileRangesExceptionally (
226+ CombinedRange combinedRange , Throwable throwable ) {
227+ for (FileRange fileRange : combinedRange .underlying ) {
228+ fileRange .getData ().completeExceptionally (throwable );
229+ }
230+ }
231+
129232 private static void copyMultiBytesToBytes (
130233 List <byte []> segments , int offset , byte [] bytes , int numBytes ) {
131234 int remainSize = numBytes ;
0 commit comments