@@ -142,13 +142,21 @@ class FileEmbeddingStore(
142142 withContext(Dispatchers .IO ) {
143143 if (embeddings.isEmpty()) return @withContext 0
144144
145- if (idToFileOffsetIndex.isEmpty() && file.exists() ) {
145+ if (idToFileOffsetIndex.isEmpty()) {
146146 load()
147147 }
148148
149149 val filteredNewEmbeddings = embeddings.filterNot { it.id in idToFileOffsetIndex }
150150 if (filteredNewEmbeddings.isEmpty()) return @withContext 0
151151
152+ for (embedding in filteredNewEmbeddings) {
153+ if (embedding.embedding.size != embeddingDimension) {
154+ throw SmartScanException .InvalidEmbeddingDimension (
155+ " Embedding dimension mismatch. Expected $embeddingDimension , got ${embedding.embedding.size} "
156+ )
157+ }
158+ }
159+
152160 RandomAccessFile (file, " rw" ).use { raf ->
153161 val channel = raf.channel
154162
@@ -162,6 +170,44 @@ class FileEmbeddingStore(
162170
163171 val newCount = existingCount + filteredNewEmbeddings.size
164172
173+ // Move to the end (append mode) or start of data section if new file
174+ val nextOffset = if (fileExistsAndHasContent) {
175+ channel.size()
176+ } else {
177+ headerSize.toLong()
178+ }
179+
180+ channel.position(nextOffset)
181+
182+ val targetChunkBytes = 4 * 1024 * 1024
183+ val chunkCapacity = maxOf(
184+ recordSize,
185+ (targetChunkBytes / recordSize).coerceAtLeast(1 ) * recordSize
186+ )
187+ val writeBuffer = ByteBuffer .allocateDirect(chunkCapacity).order(ByteOrder .LITTLE_ENDIAN )
188+
189+ fun flushBuffer () {
190+ writeBuffer.flip()
191+ while (writeBuffer.hasRemaining()) {
192+ channel.write(writeBuffer)
193+ }
194+ writeBuffer.clear()
195+ }
196+
197+ for (embedding in filteredNewEmbeddings) {
198+ if (writeBuffer.remaining() < recordSize) {
199+ flushBuffer()
200+ }
201+
202+ writeBuffer.putLong(embedding.id)
203+ writeBuffer.putLong(embedding.date)
204+ for (f in embedding.embedding) writeBuffer.putFloat(f)
205+ }
206+
207+ if (writeBuffer.position() > 0 ) {
208+ flushBuffer()
209+ }
210+
165211 // Write updated count back as little-endian
166212 val headerBuf = ByteBuffer .allocate(headerSize).order(ByteOrder .LITTLE_ENDIAN )
167213 headerBuf.putInt(newCount)
@@ -171,45 +217,23 @@ class FileEmbeddingStore(
171217 channel.write(headerBuf)
172218 }
173219
174- // Move to the end (append mode) or start of data section if new file
175- var nextOffset = headerSize.toLong()
176- if (fileExistsAndHasContent) {
177- nextOffset = channel.size()
178- } else {
179- channel.position(headerSize.toLong())
180- }
181-
182- for (embedding in filteredNewEmbeddings) {
183- if (embedding.embedding.size != embeddingDimension) {
184- throw SmartScanException .InvalidEmbeddingDimension (
185- " Embedding dimension mismatch. Expected $embeddingDimension , got ${embedding.embedding.size} "
186- )
187- }
188-
189- val buf = ByteBuffer .allocate(recordSize).order(ByteOrder .LITTLE_ENDIAN )
190- buf.putLong(embedding.id)
191- buf.putLong(embedding.date)
192- for (f in embedding.embedding) buf.putFloat(f)
193- buf.flip()
194-
195- channel.position(nextOffset)
196- while (buf.hasRemaining()) {
197- channel.write(buf)
198- }
220+ channel.force(false )
199221
200- // update in-memory file offset index for the newly appended entry and cache
201- idToFileOffsetIndex[embedding.id] = nextOffset
222+ // update in-memory file offset index for the newly appended entry and cache
223+ filteredNewEmbeddings.forEachIndexed { index, embedding ->
224+ idToFileOffsetIndex[embedding.id] = nextOffset + (index.toLong() * recordSize)
225+ }
202226
203- // Only add items to cache if it's not empty e.g after get() call, to keep it synchronized.
204- // This prevents edge cases that could result in partial cache overwriting on-disk data
205- // It also prevents unnecessarily keeping embeddings in memory
206- if (cache.isNotEmpty()) {
227+ // Only add items to cache if it's not empty e.g after get() call, to keep it synchronized.
228+ // This prevents edge cases that could result in partial cache overwriting on-disk data
229+ // It also prevents unnecessarily keeping embeddings in memory
230+ if (cache.isNotEmpty()) {
231+ for (embedding in filteredNewEmbeddings) {
207232 cache[embedding.id] = embedding
208233 }
209- nextOffset + = recordSize.toLong()
210234 }
211- channel.force(false )
212235 }
236+
213237 filteredNewEmbeddings.size
214238 }
215239 }
@@ -236,8 +260,18 @@ class FileEmbeddingStore(
236260 idToFileOffsetIndex.clear()
237261 }
238262
239- override suspend fun query (embedding : FloatArray , topK : Int , threshold : Float , ids : Set <Long >): List <Long > {
240- val storedEmbeddings = if (ids.isNotEmpty()) get().filter { it.id in ids } else get()
263+ override suspend fun query (embedding : FloatArray , topK : Int , threshold : Float , ids : Set <Long >, startDate : Long? , endDate : Long? ): List <Long > {
264+ val storedEmbeddings = get().asSequence()
265+ .let { seq ->
266+ if (ids.isNotEmpty()) seq.filter { it.id in ids } else seq
267+ }
268+ .let { seq ->
269+ if (startDate != null ) seq.filter { it.date >= startDate } else seq
270+ }
271+ .let { seq ->
272+ if (endDate != null ) seq.filter { it.date <= endDate } else seq
273+ }
274+ .toList()
241275
242276 if (storedEmbeddings.isEmpty()) return emptyList()
243277
0 commit comments