1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15- //! Planning logic for mature-merge: pure, synchronous functions that decide
16- //! which splits to merge and how to group them. No I/O, no actors.
17- //!
18- //! The main entry point is [`plan_merge_operations_for_index`].
19-
2015use std:: collections:: HashMap ;
2116use std:: time:: Duration ;
2217
@@ -27,6 +22,12 @@ use time::OffsetDateTime;
2722use crate :: mature_merge:: MatureMergeConfig ;
2823use crate :: merge_policy:: MergeOperation ;
2924
25+ pub const SECS_PER_DAY : i64 = 60 * 60 * 24 ;
26+
27+ /// Wait several hours after the split became mature to be extra sure no merge
28+ /// process is still running on it.
29+ pub const MATURITY_BUFFER : Duration = Duration :: from_hours ( 6 ) ;
30+
3031/// Computes the earliest UTC-day midnight (seconds since epoch) that is safe to merge,
3132/// given the index's retention policy and the current time.
3233fn retention_safety_cutoff_secs (
@@ -43,7 +44,7 @@ fn retention_safety_cutoff_secs(
4344 }
4445 let cutoff_raw = now_secs - period. as_secs ( ) as i64 + retention_safety_buffer. as_secs ( ) as i64 ;
4546 // Round up to the next day boundary so we never partially exclude a day bucket.
46- Some ( ( cutoff_raw / 86400 + 1 ) * 86400 )
47+ Some ( ( cutoff_raw / SECS_PER_DAY + 1 ) * SECS_PER_DAY )
4748}
4849
4950/// Converts a single day-bucket group of eligible splits into one or more balanced
@@ -55,12 +56,6 @@ fn plan_operations_for_group(
5556 if group_splits. len ( ) < config. min_merge_group_size {
5657 return Vec :: new ( ) ;
5758 }
58- if !group_splits
59- . iter ( )
60- . all ( |s| s. num_docs < config. input_split_max_num_docs )
61- {
62- return Vec :: new ( ) ;
63- }
6459 // Sort ascending by end time so each sub-operation covers the most compact range.
6560 group_splits. sort_by_key ( |s| s. time_range . as_ref ( ) . map ( |r| * r. end ( ) ) . unwrap_or ( 0 ) ) ;
6661
@@ -92,6 +87,11 @@ fn plan_operations_for_group(
9287/// fall on the same UTC day (i.e., the split does not span midnight).
9388/// - Immature splits are excluded.
9489/// - Splits whose `time_range.end()` falls within the retention safety buffer are excluded.
90+ ///
91+ /// Important: This plan merges splits across sources. It can be problematic if
92+ /// the IndexingSettings are different (e.g. a different maturation period), which
93+ /// was made possible on Kafka sources by specifying an override in the
94+ /// client_params.
9595pub fn plan_merge_operations_for_index (
9696 index_config : & IndexConfig ,
9797 splits : Vec < SplitMetadata > ,
@@ -107,7 +107,12 @@ pub fn plan_merge_operations_for_index(
107107
108108 for split in splits {
109109 // Only splits that have been mature for a while
110- if !split. is_mature ( now - Duration :: from_hours ( 6 ) ) {
110+ if !split. is_mature ( now - MATURITY_BUFFER ) {
111+ continue ;
112+ }
113+
114+ // Enforce the max size for splits to be considered for merging.
115+ if split. num_docs > config. input_split_max_num_docs {
111116 continue ;
112117 }
113118
@@ -116,8 +121,8 @@ pub fn plan_merge_operations_for_index(
116121 continue ;
117122 } ;
118123
119- let start_day = time_range. start ( ) / 86400 ;
120- let end_day = time_range. end ( ) / 86400 ;
124+ let start_day = time_range. start ( ) / SECS_PER_DAY ;
125+ let end_day = time_range. end ( ) / SECS_PER_DAY ;
121126
122127 // Both endpoints must fall on the same UTC day.
123128 if start_day != end_day {
@@ -131,7 +136,7 @@ pub fn plan_merge_operations_for_index(
131136 continue ;
132137 }
133138
134- let day_bucket = start_day * 86400 ;
139+ let day_bucket = start_day * SECS_PER_DAY ;
135140 let key = (
136141 split. partition_id ,
137142 split. doc_mapping_uid . to_string ( ) ,
@@ -161,7 +166,7 @@ mod tests {
161166 /// Builds a mature [`SplitMetadata`] for use in tests.
162167 ///
163168 /// - `day_bucket`: UTC day expressed as seconds-since-epoch (midnight). For example `day_bucket
164- /// = 0` means 1970-01-01, `day_bucket = 86400 ` means 1970-01-02.
169+ /// = 0` means 1970-01-01, `day_bucket = SECS_PER_DAY ` means 1970-01-02.
165170 fn mature_split_for_test (
166171 split_id : & str ,
167172 index_uid : & IndexUid ,
@@ -199,11 +204,11 @@ mod tests {
199204
200205 // UTC day 0 = 1970-01-01. Use a recent-ish day to avoid the retention buffer.
201206 // We use day 20000 (approx 2024-10) so splits are "recent" relative to a "now" we control.
202- const RECENT_DAY : i64 = 20_000 * 86400 ;
207+ const RECENT_DAY : i64 = 20_000 * SECS_PER_DAY ;
203208
204209 fn now_well_after_recent_day ( ) -> OffsetDateTime {
205210 // 1 day after the splits' day — they are mature but not in a retention buffer.
206- OffsetDateTime :: from_unix_timestamp ( RECENT_DAY + 86400 + 1 ) . unwrap ( )
211+ OffsetDateTime :: from_unix_timestamp ( RECENT_DAY + SECS_PER_DAY + 1 ) . unwrap ( )
207212 }
208213
209214 #[ test]
@@ -238,7 +243,7 @@ mod tests {
238243 fn test_plan_below_threshold ( ) {
239244 let index_uid = IndexUid :: for_test ( "test-index" , 0 ) ;
240245 let doc_mapping_uid = DocMappingUid :: random ( ) ;
241- // Only 4 splits — below MIN_MERGE_GROUP_SIZE (6 ).
246+ // Only 4 splits — below the min_merge_group_size (5 ).
242247 let splits: Vec < SplitMetadata > = ( 0 ..4 )
243248 . map ( |i| {
244249 mature_split_for_test (
@@ -256,7 +261,10 @@ mod tests {
256261 & index_config_no_retention ( ) ,
257262 splits,
258263 now_well_after_recent_day ( ) ,
259- & MatureMergeConfig :: default ( ) ,
264+ & MatureMergeConfig {
265+ min_merge_group_size : 5 ,
266+ ..Default :: default ( )
267+ } ,
260268 ) ;
261269
262270 assert ! ( operations. is_empty( ) , "expected no operations for 4 splits" ) ;
@@ -342,7 +350,7 @@ mod tests {
342350 // Then: cutoff_raw = (RECENT_DAY + 91d) - 90d + 30d = RECENT_DAY + 31d
343351 // cutoff = RECENT_DAY + 32d (rounded up to next day boundary)
344352 // Because RECENT_DAY + 3600 < cutoff, splits should be excluded.
345- let now_ts = RECENT_DAY + 91 * 86400 ;
353+ let now_ts = RECENT_DAY + 91 * SECS_PER_DAY ;
346354 let now = OffsetDateTime :: from_unix_timestamp ( now_ts) . unwrap ( ) ;
347355
348356 let splits: Vec < SplitMetadata > = ( 0 ..10 )
0 commit comments