11use std:: collections:: HashMap ;
22use std:: sync:: Arc ;
3+ use std:: time:: Duration ;
34
45use arrow_array:: builder:: {
56 FixedSizeListBuilder , Float32Builder , Int32Builder , LargeBinaryBuilder , LargeStringBuilder ,
@@ -12,28 +13,98 @@ use arrow_array::{
1213 TimestampMicrosecondArray ,
1314} ;
1415use arrow_schema:: { ArrowError , DataType , Field , FieldRef , Schema , TimeUnit } ;
15- use chrono:: DateTime ;
16+ use chrono:: { DateTime , Timelike , Utc } ;
1617use futures:: TryStreamExt ;
18+ use lance:: dataset:: optimize:: { compact_files, CompactionMetrics , CompactionOptions } ;
1719use lance:: dataset:: { builder:: DatasetBuilder , Dataset , WriteMode , WriteParams } ;
1820use lance:: io:: ObjectStoreParams ;
1921use lance:: { Error as LanceError , Result as LanceResult } ;
22+ use tokio:: sync:: Mutex ;
23+ use tokio:: task:: JoinHandle ;
24+ use tracing:: { error, info, warn} ;
2025
2126use crate :: record:: { ContextRecord , SearchResult , StateMetadata } ;
2227
2328/// Embedding length used for the semantic index column.
2429const DEFAULT_EMBEDDING_DIM : i32 = 1536 ;
2530const DEFAULT_SEARCH_LIMIT : usize = 10 ;
2631
/// Configuration for background compaction.
///
/// Mirrors the tuning knobs on Lance's `CompactionOptions` and adds
/// scheduling controls (`enabled`, `check_interval_secs`, `quiet_hours`)
/// consumed by the background compaction task.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Whether background compaction is enabled.
    pub enabled: bool,
    /// Minimum number of fragments to trigger compaction.
    pub min_fragments: usize,
    /// Target rows per fragment after compaction.
    pub target_rows_per_fragment: usize,
    /// Maximum rows per row group.
    pub max_rows_per_group: usize,
    /// Whether to materialize (remove) deleted rows during compaction.
    pub materialize_deletions: bool,
    /// Deletion threshold (0.0-1.0) to trigger materialization.
    pub materialize_deletions_threshold: f32,
    /// Number of threads for compaction (None = auto).
    pub num_threads: Option<usize>,
    /// Interval in seconds between compaction checks.
    pub check_interval_secs: u64,
    /// Quiet hours during which compaction is skipped [(start_hour, end_hour)].
    /// Hours are compared against the UTC wall clock (see `should_compact`).
    pub quiet_hours: Vec<(u8, u8)>,
}
54+
55+ impl Default for CompactionConfig {
56+ fn default ( ) -> Self {
57+ Self {
58+ enabled : false ,
59+ min_fragments : 5 ,
60+ target_rows_per_fragment : 1_000_000 ,
61+ max_rows_per_group : 1024 ,
62+ materialize_deletions : true ,
63+ materialize_deletions_threshold : 0.1 ,
64+ num_threads : None ,
65+ check_interval_secs : 300 ,
66+ quiet_hours : vec ! [ ] ,
67+ }
68+ }
69+ }
70+
/// Statistics about compaction status and history.
///
/// Snapshot returned by `ContextStore::compaction_stats`; values reflect the
/// shared compaction state at the moment the snapshot was taken.
#[derive(Debug, Clone)]
pub struct CompactionStats {
    /// Current number of fragments in the dataset.
    pub total_fragments: usize,
    /// Whether a compaction is currently in progress.
    pub is_compacting: bool,
    /// Timestamp of the last successful compaction.
    pub last_compaction: Option<DateTime<Utc>>,
    /// Error message from the last failed compaction.
    pub last_error: Option<String>,
    /// Total number of successful compactions performed.
    pub total_compactions: u64,
}
85+
/// Internal state for tracking background compaction.
///
/// Shared between all clones of a [`ContextStore`] via `Arc<Mutex<_>>`, so
/// the `is_compacting` flag acts as a cross-clone mutual-exclusion guard.
struct CompactionState {
    // Handle to the spawned periodic compaction loop, if one is running.
    background_task: Option<JoinHandle<()>>,
    // True while a `compact` call (manual or background) is in flight.
    is_compacting: bool,
    // Timestamp of the most recent successful compaction.
    last_compaction: Option<DateTime<Utc>>,
    // Error string from the most recent failed compaction, cleared on success.
    last_error: Option<String>,
    // Running count of successful compactions.
    total_compactions: u64,
}
94+
/// Persistent Lance-backed context store.
///
/// Cloning is cheap for the compaction bookkeeping (`compaction_state` is a
/// shared `Arc`), but note each clone carries its own `Dataset` handle.
#[derive(Clone)]
pub struct ContextStore {
    // Handle to the underlying Lance dataset.
    dataset: Dataset,
    // Compaction bookkeeping shared across all clones of this store.
    compaction_state: Arc<Mutex<CompactionState>>,
    // Effective compaction configuration; public so callers can inspect/tune it.
    pub compaction_config: CompactionConfig,
}
32102
/// Additional configuration when opening a [`ContextStore`].
#[derive(Debug, Clone, Default)]
pub struct ContextStoreOptions {
    // Object-store configuration (e.g. S3 credentials) forwarded to Lance.
    pub storage_options: Option<HashMap<String, String>>,
    // Compaction settings applied to the opened store.
    pub compaction: CompactionConfig,
}
38109
39110impl ContextStoreOptions {
@@ -52,14 +123,30 @@ impl ContextStore {
52123 /// Open a dataset with explicit object store configuration (e.g. S3 credentials).
53124 pub async fn open_with_options ( uri : & str , options : ContextStoreOptions ) -> LanceResult < Self > {
54125 let storage_options = options. storage_options ( ) ;
55- match Self :: load_with_options ( uri, storage_options. clone ( ) ) . await {
56- Ok ( dataset) => Ok ( Self { dataset } ) ,
126+ let dataset = match Self :: load_with_options ( uri, storage_options. clone ( ) ) . await {
127+ Ok ( dataset) => dataset,
57128 Err ( LanceError :: DatasetNotFound { .. } ) => {
58- let dataset = Self :: create_with_options ( uri, storage_options) . await ?;
59- Ok ( Self { dataset } )
129+ Self :: create_with_options ( uri, storage_options) . await ?
60130 }
61- Err ( err) => Err ( err) ,
62- }
131+ Err ( err) => return Err ( err) ,
132+ } ;
133+
134+ let mut store = Self {
135+ dataset,
136+ compaction_state : Arc :: new ( Mutex :: new ( CompactionState {
137+ background_task : None ,
138+ is_compacting : false ,
139+ last_compaction : None ,
140+ last_error : None ,
141+ total_compactions : 0 ,
142+ } ) ) ,
143+ compaction_config : options. compaction ,
144+ } ;
145+
146+ // Start background compaction if enabled
147+ store. start_background_compaction ( ) . await ?;
148+
149+ Ok ( store)
63150 }
64151
65152 /// Append context records to the store and return the new dataset version.
@@ -146,6 +233,166 @@ impl ContextStore {
146233 Ok ( results)
147234 }
148235
236+ /// Manually trigger compaction to merge small fragments.
237+ pub async fn compact ( & mut self , options : Option < CompactionConfig > ) -> LanceResult < CompactionMetrics > {
238+ let config = options. unwrap_or_else ( || self . compaction_config . clone ( ) ) ;
239+
240+ info ! ( "Starting compaction: {} fragments" , self . dataset. count_fragments( ) ) ;
241+ let start = std:: time:: Instant :: now ( ) ;
242+
243+ // Mark as compacting
244+ {
245+ let mut state = self . compaction_state . lock ( ) . await ;
246+ if state. is_compacting {
247+ warn ! ( "Compaction already in progress, skipping" ) ;
248+ return Err ( LanceError :: from ( ArrowError :: InvalidArgumentError (
249+ "Compaction already in progress" . to_string ( ) ,
250+ ) ) ) ;
251+ }
252+ state. is_compacting = true ;
253+ }
254+
255+ // Build Lance CompactionOptions
256+ let lance_options = CompactionOptions {
257+ target_rows_per_fragment : config. target_rows_per_fragment ,
258+ max_rows_per_group : config. max_rows_per_group ,
259+ materialize_deletions : config. materialize_deletions ,
260+ materialize_deletions_threshold : config. materialize_deletions_threshold ,
261+ num_threads : config. num_threads ,
262+ ..Default :: default ( )
263+ } ;
264+
265+ // Run compaction
266+ let result = compact_files ( & mut self . dataset , lance_options, None ) . await ;
267+
268+ // Update state
269+ let mut state = self . compaction_state . lock ( ) . await ;
270+ state. is_compacting = false ;
271+
272+ match result {
273+ Ok ( metrics) => {
274+ state. last_compaction = Some ( Utc :: now ( ) ) ;
275+ state. total_compactions += 1 ;
276+ state. last_error = None ;
277+
278+ info ! (
279+ "Compaction completed in {:?}: removed {} fragments ({}files), added {} fragments ({} files)" ,
280+ start. elapsed( ) ,
281+ metrics. fragments_removed,
282+ metrics. files_removed,
283+ metrics. fragments_added,
284+ metrics. files_added
285+ ) ;
286+
287+ // Reload dataset to see new version
288+ self . dataset = Dataset :: open ( self . dataset . uri ( ) ) . await ?;
289+
290+ Ok ( metrics)
291+ }
292+ Err ( e) => {
293+ error ! ( "Compaction failed: {}" , e) ;
294+ state. last_error = Some ( e. to_string ( ) ) ;
295+ Err ( e)
296+ }
297+ }
298+ }
299+
300+ /// Check if compaction should run based on configuration thresholds.
301+ pub async fn should_compact ( & self ) -> LanceResult < bool > {
302+ let fragment_count = self . dataset . count_fragments ( ) ;
303+
304+ if fragment_count < self . compaction_config . min_fragments {
305+ return Ok ( false ) ;
306+ }
307+
308+ // Check quiet hours
309+ if !self . compaction_config . quiet_hours . is_empty ( ) {
310+ let now = Utc :: now ( ) ;
311+ let current_hour = now. hour ( ) as u8 ;
312+
313+ for ( start, end) in & self . compaction_config . quiet_hours {
314+ if current_hour >= * start && current_hour < * end {
315+ info ! ( "Skipping compaction during quiet hours ({}-{})" , start, end) ;
316+ return Ok ( false ) ;
317+ }
318+ }
319+ }
320+
321+ Ok ( true )
322+ }
323+
324+ /// Get current compaction statistics.
325+ pub async fn compaction_stats ( & self ) -> LanceResult < CompactionStats > {
326+ let state = self . compaction_state . lock ( ) . await ;
327+
328+ Ok ( CompactionStats {
329+ total_fragments : self . dataset . count_fragments ( ) ,
330+ is_compacting : state. is_compacting ,
331+ last_compaction : state. last_compaction ,
332+ last_error : state. last_error . clone ( ) ,
333+ total_compactions : state. total_compactions ,
334+ } )
335+ }
336+
    /// Start background compaction task if enabled.
    ///
    /// Spawns a tokio task that periodically calls `should_compact` and runs
    /// `compact` when it returns true. No-op when `compaction_config.enabled`
    /// is false or a task handle is already registered in the shared state.
    async fn start_background_compaction(&mut self) -> LanceResult<()> {
        if !self.compaction_config.enabled {
            return Ok(());
        }

        let mut state = self.compaction_state.lock().await;
        if state.background_task.is_some() {
            warn!("Background compaction already running");
            return Ok(());
        }

        info!(
            "Starting background compaction (interval: {}s, min fragments: {})",
            self.compaction_config.check_interval_secs, self.compaction_config.min_fragments
        );

        // NOTE(review): the task captures a clone of the whole store. The
        // clone shares `compaction_state` (so the `is_compacting` guard works
        // across clones), but it holds its OWN `Dataset` handle: compactions
        // commit to storage while this store's in-memory handle goes stale
        // until reopened — confirm callers tolerate or refresh that.
        let mut store_clone = self.clone();
        let interval_secs = self.compaction_config.check_interval_secs;

        let task = tokio::spawn(async move {
            // The first `tick` completes immediately, so a compaction check
            // runs right after startup rather than after one full interval.
            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));

            loop {
                interval.tick().await;

                match store_clone.should_compact().await {
                    Ok(true) => {
                        info!("Background compaction triggered");
                        // Failures are logged, not fatal: the loop keeps
                        // running and retries on the next interval.
                        if let Err(e) = store_clone.compact(None).await {
                            error!("Background compaction failed: {}", e);
                        }
                    }
                    Ok(false) => {
                        // Not needed or in quiet hours
                    }
                    Err(e) => {
                        error!("Error checking compaction need: {}", e);
                    }
                }
            }
        });

        state.background_task = Some(task);
        Ok(())
    }
383+
384+ /// Stop background compaction task.
385+ pub async fn stop_background_compaction ( & mut self ) -> LanceResult < ( ) > {
386+ let mut state = self . compaction_state . lock ( ) . await ;
387+
388+ if let Some ( task) = state. background_task . take ( ) {
389+ info ! ( "Stopping background compaction" ) ;
390+ task. abort ( ) ;
391+ }
392+
393+ Ok ( ( ) )
394+ }
395+
149396 /// Lance schema for the context store.
150397 pub fn schema ( ) -> Schema {
151398 Schema :: new ( vec ! [
@@ -368,6 +615,17 @@ impl ContextStore {
368615 }
369616}
370617
618+ impl Drop for ContextStore {
619+ fn drop ( & mut self ) {
620+ // Best-effort cleanup of background task
621+ if let Ok ( mut state) = self . compaction_state . try_lock ( ) {
622+ if let Some ( task) = state. background_task . take ( ) {
623+ task. abort ( ) ;
624+ }
625+ }
626+ }
627+ }
628+
371629fn batch_to_search_results ( batch : & RecordBatch ) -> LanceResult < Vec < SearchResult > > {
372630 let records = batch_to_records ( batch) ?;
373631
0 commit comments