11package dev.paulee.core.data
22
3- import dev.paulee.api.data.DataInfo
4- import dev.paulee.api.data.IDataService
5- import dev.paulee.api.data.PreFilter
6- import dev.paulee.api.data.VariantMapping
3+ import com.github.benmanes.caffeine.cache.Cache
4+ import com.github.benmanes.caffeine.cache.Caffeine
5+ import dev.paulee.api.data.*
76import dev.paulee.api.data.provider.IStorageProvider
87import dev.paulee.api.data.provider.ProviderStatus
98import dev.paulee.api.data.provider.QueryOrder
109import dev.paulee.api.internal.Embedding
1110import dev.paulee.core.data.analysis.Indexer
12- import dev.paulee.core.data.io.BufferedCSVReader
1311import dev.paulee.core.data.model.DataPool
1412import dev.paulee.core.data.provider.EmbeddingProvider
1513import dev.paulee.core.data.provider.StorageProvider
@@ -19,19 +17,22 @@ import kotlinx.coroutines.withContext
1917import org.slf4j.LoggerFactory.getLogger
2018import java.io.IOException
2119import java.nio.file.Path
20+ import java.time.Duration
2221import kotlin.coroutines.cancellation.CancellationException
2322import kotlin.io.path.*
2423import kotlin.math.ceil
2524
/** Key type of the page cache; keys are built as `Triple(pageCount, query, order)`. */
25+ typealias QueryKey = Triple <Int , String , QueryOrder ?>
26+
/** Value type of the page cache; results are built as `PageResult(entries, links)`. */
2627typealias PageResult = Pair <List <Map <String , String >>, Map <String , List <Map <String , String >>>>
2728
2829object DataServiceImpl : IDataService {
2930
3031 private val logger = getLogger(DataServiceImpl ::class .java)
3132
32- private const val PAGE_SIZE = 50
33+ private const val PAGE_SIZE = 100
3334
34- private const val CSV_READER_BATCH_SIZE = 300
35+ private const val BATCH_SIZE = 1000
3536
3637 private val variantPattern = Regex (" @([^:]+):(\\ S+)" )
3738
@@ -41,11 +42,10 @@ object DataServiceImpl : IDataService {
4142
4243 private var currentField: String? = null
4344
44- private val pageCache = object : LinkedHashMap <Triple <Int , String , QueryOrder ?>, PageResult > (6 , 0.75f , true ) {
45- override fun removeEldestEntry (eldest : MutableMap .MutableEntry <Triple <Int , String , QueryOrder ?>, PageResult >? ): Boolean {
46- return size > 6
47- }
48- }
/**
 * Cache of page results keyed by [QueryKey]; looked up with `getIfPresent` and filled with `put`.
 *
 * Built with `maximumSize(32)` and an access-based expiry of three days.
 */
45+ private val pageCache: Cache <QueryKey , PageResult > = Caffeine .newBuilder()
46+ .maximumSize(32 )
47+ .expireAfterAccess(Duration .ofDays(3 ))
48+ .build()
4949
5050 private val storageProvider = mutableMapOf<String , IStorageProvider >()
5151
@@ -60,11 +60,13 @@ object DataServiceImpl : IDataService {
6060 try {
6161 val poolPath = FileService .dataDir.resolve(dataInfo.name)
6262
63- val storageProvider = StorageProvider .of(dataInfo.storageType)
63+ val currentProvider = StorageProvider .of(dataInfo.storageType)
6464
65- val initStatus = storageProvider.init (dataInfo, poolPath)
66-
67- if (initStatus != ProviderStatus .SUCCESS ) return @withContext initStatus == ProviderStatus .EXISTS
65+ when (currentProvider.init (dataInfo, poolPath)) {
66+ ProviderStatus .Success -> Unit
67+ ProviderStatus .Exists -> return @withContext true
68+ else -> return @withContext false
69+ }
6870
6971 dataInfoToString(dataInfo)?.let { json ->
7072 poolPath.resolve(" info.json" ).writeText(json)
@@ -75,62 +77,45 @@ object DataServiceImpl : IDataService {
7577 DataPool (
7678 indexer = Indexer (poolPath.resolve(" index" ), dataInfo),
7779 dataInfo = dataInfo,
78- storageProvider = storageProvider
80+ storageProvider = currentProvider
7981 )
8082 }.getOrElse { e ->
8183 logger.error(" Exception: Failed to create data pool." , e)
8284 return @withContext false
8385 }
8486
85- val totalBatches = dataInfo.sources.sumOf { source ->
86- val sourcePath =
87- FileService .dataDir.resolve(source.name.let { if (it.endsWith(" .csv" )) it else " $it .csv" })
87+ val sourcesWithIndex = dataInfo.sources.filter { it.fields.any { field -> field is IndexField } }
8888
89- if (sourcePath.exists()) BufferedCSVReader .estimateBatches(sourcePath, CSV_READER_BATCH_SIZE )
90- else 0
91- }
// Batches are produced per source via `chunked(BATCH_SIZE)`, so the expected total is the sum of
// each source's rounded-up batch count. Rounding up the combined row count instead undercounts
// whenever several sources end in a partial batch, which lets the reported progress exceed 100%.
val totalBatches = sourcesWithIndex.sumOf { source ->
    ((currentProvider.count(source.name) + BATCH_SIZE - 1) / BATCH_SIZE).toInt()
}
9291
9392 var processedBatches = 0
9493 var lastPercentage = 0
9594 onProgress(0 )
9695
97- dataInfo.sources.forEach { source ->
98- val file = source.name
99-
100- if (file.isEmpty()) {
101- logger.warn(" No data source provided for ${file} ." )
102- return @forEach
103- }
96+ sourcesWithIndex
97+ .forEach { source ->
98+ val name = source.name
10499
105- val sourcePath =
106- FileService .dataDir.resolve(file.let { if (it.endsWith(" .csv" )) it else " $it .csv" })
100+ currentProvider.streamData(name)
101+ .chunked(BATCH_SIZE )
102+ .forEach { entries ->
103+ dataPool.indexer.indexEntries(name, entries)
107104
108- if (! sourcePath.exists()) {
109- logger.warn(" Source file '$sourcePath ' not found." )
110- return @forEach
111- }
105+ processedBatches++
112106
113- val idGenerator = generateSequence(1L ) { it + 1 }.iterator()
107+ val percentage =
108+ (if (totalBatches > 0 ) (processedBatches * 100 ) / totalBatches else 0 )
114109
115- BufferedCSVReader (sourcePath, batchSize = CSV_READER_BATCH_SIZE ).readLines { lines ->
116- val entries = lines.map { line ->
117- if (dataPool.hasIdentifier(file, line)) line
118- else line + (" ${file} _ag_id" to idGenerator.next().toString())
119- }
120-
121- dataPool.indexer.indexEntries(file, entries)
122- storageProvider.insert(file, entries)
123-
124- processedBatches++
125-
126- val percentage = if (totalBatches > 0 ) (processedBatches * 100 ) / totalBatches else 0
127-
128- if (percentage > lastPercentage) {
129- onProgress(percentage)
130- lastPercentage = percentage
131- }
110+ if (percentage > lastPercentage) {
111+ onProgress(percentage)
112+ lastPercentage = percentage
113+ }
114+ }
132115 }
133- }
116+
117+ dataPool.indexer.finish()
118+ EmbeddingProvider .finish()
134119
135120 dataPools[dataInfo.name] = dataPool
136121
@@ -170,7 +155,7 @@ object DataServiceImpl : IDataService {
170155
171156 val storageProvider = StorageProvider .of(dataInfo.storageType)
172157
173- if (storageProvider.init (dataInfo, child) == ProviderStatus .EXISTS ) {
158+ if (storageProvider.init (dataInfo, child) == ProviderStatus .Exists ) {
174159 dataPools[dataInfo.name] =
175160 DataPool (Indexer (child.resolve(" index" ), dataInfo), dataInfo, storageProvider)
176161
@@ -247,7 +232,7 @@ object DataServiceImpl : IDataService {
247232
248233 val key = Triple (pageCount, query, order)
249234
250- pageCache[ key] ?.let { return it }
235+ pageCache.getIfPresent( key) ?.let { return it }
251236
252237 val dataPool = this .dataPools[this .currentPool] ? : return Pair (emptyList(), emptyMap())
253238
@@ -284,7 +269,7 @@ object DataServiceImpl : IDataService {
284269
285270 val result = PageResult (entries, links)
286271
287- pageCache[ key] = result
272+ pageCache.put( key, result)
288273
289274 return result
290275 }
@@ -323,7 +308,7 @@ object DataServiceImpl : IDataService {
323308 return null
324309 }
325310
326- provider.init (dataInfo, path, true )
311+ provider.init (dataInfo, path)
327312
328313 return provider
329314 }
0 commit comments