@@ -801,6 +801,8 @@ class ReaderItem {
801801 }
802802 }
803803
804+ reset_extras ( ) { }
805+
804806 shift_o ( sz ) {
805807 this . o += sz ;
806808 while ( ( this . o >= this . view_len ) && this . view_len ) {
@@ -1042,6 +1044,10 @@ class StringReaderItem extends ReaderItem {
10421044 this . off0 = 0 ;
10431045 }
10441046
1047+ reset_extras ( ) {
1048+ this . off0 = 0 ;
1049+ }
1050+
10451051 func ( tgtobj ) {
10461052 const tmp = { } ;
10471053 this . items [ 0 ] . func ( tmp ) ;
@@ -1101,6 +1107,10 @@ class CollectionReaderItem extends ReaderItem {
11011107 items [ 1 ] . set_not_simple ( ) ;
11021108 }
11031109
1110+ reset_extras ( ) {
1111+ this . off0 = 0 ;
1112+ }
1113+
11041114 func ( tgtobj ) {
11051115 const arr = [ ] , tmp = { } ;
11061116 this . items [ 0 ] . func ( tmp ) ;
@@ -1204,20 +1214,27 @@ async function rntupleProcess(rntuple, selector, args = {}) {
12041214 } ;
12051215
12061216 function readNextPortion ( inc_cluster ) {
1207- if ( inc_cluster ) {
1208- handle . current_cluster ++ ;
1209- handle . current_cluster_first_entry = handle . current_cluster_last_entry ;
1210- }
1217+ let do_again = true , numClusterEntries , locations ;
12111218
1212- const locations = rntuple . builder . pageLocations [ handle . current_cluster ] ;
1213- if ( ! locations ) {
1214- selector . Terminate ( true ) ;
1215- return selector ;
1216- }
1219+ while ( do_again ) {
1220+ if ( inc_cluster ) {
1221+ handle . current_cluster ++ ;
1222+ handle . current_cluster_first_entry = handle . current_cluster_last_entry ;
1223+ }
12171224
1218- const numClusterEntries = rntuple . builder . clusterSummaries [ handle . current_cluster ] . numEntries ;
1225+ locations = rntuple . builder . pageLocations [ handle . current_cluster ] ;
1226+ if ( ! locations ) {
1227+ selector . Terminate ( true ) ;
1228+ return selector ;
1229+ }
12191230
1220- handle . current_cluster_last_entry = handle . current_cluster_first_entry + numClusterEntries ;
1231+ numClusterEntries = rntuple . builder . clusterSummaries [ handle . current_cluster ] . numEntries ;
1232+
1233+ handle . current_cluster_last_entry = handle . current_cluster_first_entry + numClusterEntries ;
1234+
1235+ do_again = inc_cluster && handle . process_entries &&
1236+ ( handle . process_entries [ handle . process_entries_indx ] >= handle . current_cluster_last_entry ) ;
1237+ }
12211238
12221239 // calculate entries which can be extracted from the cluster
12231240 let emin , emax ;
@@ -1244,8 +1261,9 @@ async function rntupleProcess(rntuple, selector, args = {}) {
12441261 } ) . then ( unzipBlobs => {
12451262 unzipBlobs . map ( ( rawblob , idx ) => itemsToRead [ idx ] . reconstructBlob ( rawblob , pagesToRead [ idx ] ) ) ;
12461263
1247- for ( let indx = 0 ; indx < handle . columns . length ; ++ indx )
1248- handle . columns [ indx ] . init_o ( ) ;
1264+ // reset reading pointer after all buffers are there
1265+ handle . columns . forEach ( item => item . init_o ( ) ) ;
1266+ handle . arr . forEach ( item => item . reset_extras ( ) ) ;
12491267
12501268 let skip_entries = handle . current_entry - handle . current_cluster_first_entry ;
12511269
0 commit comments