@@ -154,6 +154,53 @@ public static ImmutablePQVectors encodeAndBuild(ProductQuantization pq, int vect
154154 return new ImmutablePQVectors (pq , chunks , vectorCount , vectorsPerChunk );
155155 }
156156
157+ /**
158+ * Build a PQVectors instance from the given RandomAccessVectorValues. The vectors are encoded in parallel
159+ * and split into chunks to avoid exceeding the maximum array size.
160+ *
161+ * @param pq the ProductQuantization to use
162+ * @param vectorCount the number of vectors to encode
163+ * @param ravv the RandomAccessVectorValues to encode
164+ * @param simdExecutor the ForkJoinPool to use for SIMD operations
165+ * @return the PQVectors instance
166+ */
167+ public static ImmutablePQVectors encodeAndBuild (ProductQuantization pq , int vectorCount , int [] ordinalsMapping , RandomAccessVectorValues ravv , ForkJoinPool simdExecutor ) {
168+ // Calculate if we need to split into multiple chunks
169+ int compressedDimension = pq .compressedVectorSize ();
170+ long totalSize = (long ) vectorCount * compressedDimension ;
171+ int vectorsPerChunk = totalSize <= PQVectors .MAX_CHUNK_SIZE ? vectorCount : PQVectors .MAX_CHUNK_SIZE / compressedDimension ;
172+
173+ int numChunks = vectorCount / vectorsPerChunk ;
174+ final ByteSequence <?>[] chunks = new ByteSequence <?>[numChunks ];
175+ int chunkSize = vectorsPerChunk * compressedDimension ;
176+ for (int i = 0 ; i < numChunks - 1 ; i ++)
177+ chunks [i ] = vectorTypeSupport .createByteSequence (chunkSize );
178+
179+ // Last chunk might be smaller
180+ int remainingVectors = vectorCount - (vectorsPerChunk * (numChunks - 1 ));
181+ chunks [numChunks - 1 ] = vectorTypeSupport .createByteSequence (remainingVectors * compressedDimension );
182+
183+ // Encode the vectors in parallel into the compressed data chunks
184+ // The changes are concurrent, but because they are coordinated and do not overlap, we can use parallel streams
185+ // and then we are guaranteed safe publication because we join the thread after completion.
186+ var ravvCopy = ravv .threadLocalSupplier ();
187+ simdExecutor .submit (() -> IntStream .range (0 , ordinalsMapping .length )
188+ .parallel ()
189+ .forEach (ordinal -> {
190+ // Retrieve the slice and mutate it.
191+ var localRavv = ravvCopy .get ();
192+ var slice = PQVectors .get (chunks , ordinal , vectorsPerChunk , pq .getSubspaceCount ());
193+ var vector = localRavv .getVector (ordinalsMapping [ordinal ]);
194+ if (vector != null )
195+ pq .encodeTo (vector , slice );
196+ else
197+ slice .zero ();
198+ }))
199+ .join ();
200+
201+ return new ImmutablePQVectors (pq , chunks , vectorCount , vectorsPerChunk );
202+ }
203+
157204 @ Override
158205 public void write (DataOutput out , int version ) throws IOException
159206 {
0 commit comments