3333import org .slf4j .Logger ;
3434import org .slf4j .LoggerFactory ;
3535
36+ import java .util .ArrayList ;
3637import java .util .List ;
3738
3839import static io .milvus .param .Constant .NO_CACHE_ID ;
40+ import static io .milvus .param .Constant .MAX_BATCH_SIZE ;
3941import static io .milvus .param .Constant .UNLIMITED ;
4042
4143public class QueryIterator {
@@ -96,7 +98,7 @@ public QueryIterator(QueryIteratorReq queryIteratorReq,
9698 // perform a query to get the first time stamp check point
9799 // the time stamp will be input for the next query to skip something
98100 private void setupTsByRequest () {
99- QueryResults response = executeQuery (expr , 0L , 1L , 0L );
101+ QueryResults response = executeQuery (expr , 0L , 1L , 0L , true );
100102 if (response .getSessionTs () <= 0 ) {
101103 logger .warn ("Failed to get mvccTs from milvus server, use client-side ts instead" );
102104 // fall back to latest session ts by local time
@@ -114,11 +116,19 @@ private void seek() {
114116 return ;
115117 }
116118
117- QueryResults response = executeQuery (expr , 0L , offset , this .sessionTs );
118- QueryResultsWrapper queryWrapper = new QueryResultsWrapper (response );
119- List <QueryResultsWrapper .RowRecord > res = queryWrapper .getRowRecords ();
120- int resultIndex = Math .min (res .size (), (int ) offset );
121- updateCursor (res .subList (0 , resultIndex ));
119+ long currentOffset = offset ;
120+ while (currentOffset > 0 ) {
121+ long limit = Math .min (MAX_BATCH_SIZE , currentOffset );
122+ String currentExpr = setupNextExpr ();
123+ QueryResults response = executeQuery (currentExpr , 0L , limit , this .sessionTs , true );
124+ QueryResultsWrapper queryWrapper = new QueryResultsWrapper (response );
125+ List <QueryResultsWrapper .RowRecord > res = queryWrapper .getRowRecords ();
126+ if (res .isEmpty ()) {
127+ break ;
128+ }
129+ updateCursor (res );
130+ currentOffset -= res .size ();
131+ }
122132 offset = 0 ;
123133 }
124134
@@ -133,7 +143,7 @@ public List<QueryResultsWrapper.RowRecord> next() {
133143 iteratorCache .releaseCache (cacheIdInUse );
134144 String currentExpr = setupNextExpr ();
135145 logger .debug ("Query iterator next expression: " + currentExpr );
136- QueryResults response = executeQuery (currentExpr , offset , batchSize , this .sessionTs );
146+ QueryResults response = executeQuery (currentExpr , offset , batchSize , this .sessionTs , false );
137147 QueryResultsWrapper queryWrapper = new QueryResultsWrapper (response );
138148 List <QueryResultsWrapper .RowRecord > res = queryWrapper .getRowRecords ();
139149 maybeCache (res );
@@ -197,13 +207,18 @@ private boolean isResSufficient(List<QueryResultsWrapper.RowRecord> ret) {
197207 return ret != null && ret .size () >= batchSize ;
198208 }
199209
200- private QueryResults executeQuery (String expr , long offset , long limit , long ts ) {
210+ private QueryResults executeQuery (String expr , long offset , long limit , long ts , boolean isSeek ) {
211+ // for seeking offset, no need to return output fields
212+ List <String > outputFields = new ArrayList <>();
213+ if (!isSeek ) {
214+ outputFields = queryIteratorParam .getOutFields ();
215+ }
201216 QueryParam queryParam = QueryParam .newBuilder ()
202217 .withDatabaseName (queryIteratorParam .getDatabaseName ())
203218 .withCollectionName (queryIteratorParam .getCollectionName ())
204219 .withConsistencyLevel (queryIteratorParam .getConsistencyLevel ())
205220 .withPartitionNames (queryIteratorParam .getPartitionNames ())
206- .withOutFields (queryIteratorParam . getOutFields () )
221+ .withOutFields (outputFields )
207222 .withExpr (expr )
208223 .withOffset (offset )
209224 .withLimit (limit )
0 commit comments