Skip to content

Commit 9c80a1d

Browse files
mattsu2020oech3
authored and committed
fix(sort): GNU sort-continue.sh test (uutils#9107)
* feat: dynamically adjust merge batch size based on file descriptor limits - Add `effective_merge_batch_size()` function to calculate batch size considering fd soft limit, with minimums and safety margins. - Generalize fd limit handling from Linux-only to Unix systems using `fd_soft_limit()`. - Update merge logic to use dynamic batch size instead of fixed `settings.merge_batch_size` to prevent fd exhaustion. * fix(sort): update rlimit fetching to use fd_soft_limit with error handling Replace direct call to get_rlimit()? with fd_soft_limit(), adding a check for None value to return a usage error if rlimit cannot be fetched. This improves robustness on Linux by ensuring proper error handling when retrieving the file descriptor soft limit. * refactor(sort): restrict nix::libc and fd_soft_limit to Linux Update conditional compilation attributes from #[cfg(unix)] to #[cfg(target_os = "linux")] for the nix::libc import and fd_soft_limit function implementations, ensuring these features are only enabled on Linux systems to improve portability and avoid issues on other Unix-like platforms. * refactor: improve thread management and replace unsafe libc calls Replace unsafe libc::getrlimit calls in fd_soft_limit with safe nix crate usage. Update Rayon thread configuration to use ThreadPoolBuilder instead of environment variables for better control. Add documentation comment to effective_merge_batch_size function for clarity. * refactor(linux): improve error handling in fd_soft_limit function Extract the rlimit fetching logic into a separate `get_rlimit` function that returns `UResult<usize>` and properly handles errors with `UUsageError`, instead of silently returning `None` on failure or infinity. This provides better error reporting for resource limit issues on Linux platforms. 
* refactor(sort): reorder imports in get_rlimit for consistency Reordered the nix::sys::resource imports to group constants first (RLIM_INFINITY), then types (Resource), and finally functions (getrlimit), improving code readability and adhering to import style guidelines.
1 parent 9258427 commit 9c80a1d

2 files changed

Lines changed: 70 additions & 17 deletions

File tree

src/uu/sort/src/merge.rs

Lines changed: 32 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ use uucore::error::{FromIo, UResult};
3030
use crate::{
3131
GlobalSettings, Output, SortError,
3232
chunks::{self, Chunk, RecycledChunk},
33-
compare_by, open,
33+
compare_by, fd_soft_limit, open,
3434
tmp_dir::TmpDirWrapper,
3535
};
3636

@@ -62,6 +62,28 @@ fn replace_output_file_in_input_files(
6262
Ok(())
6363
}
6464

65+
/// Determine the effective merge batch size, enforcing a minimum and respecting the
66+
/// file-descriptor soft limit after reserving stdio/output and a safety margin.
67+
fn effective_merge_batch_size(settings: &GlobalSettings) -> usize {
68+
const MIN_BATCH_SIZE: usize = 2;
69+
const RESERVED_STDIO: usize = 3;
70+
const RESERVED_OUTPUT: usize = 1;
71+
const SAFETY_MARGIN: usize = 1;
72+
let mut batch_size = settings.merge_batch_size.max(MIN_BATCH_SIZE);
73+
74+
if let Some(limit) = fd_soft_limit() {
75+
let reserved = RESERVED_STDIO + RESERVED_OUTPUT + SAFETY_MARGIN;
76+
let available_inputs = limit.saturating_sub(reserved);
77+
if available_inputs >= MIN_BATCH_SIZE {
78+
batch_size = batch_size.min(available_inputs);
79+
} else {
80+
batch_size = MIN_BATCH_SIZE;
81+
}
82+
}
83+
84+
batch_size
85+
}
86+
6587
/// Merge pre-sorted `Box<dyn Read>`s.
6688
///
6789
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
@@ -94,18 +116,21 @@ pub fn merge_with_file_limit<
94116
output: Output,
95117
tmp_dir: &mut TmpDirWrapper,
96118
) -> UResult<()> {
97-
if files.len() <= settings.merge_batch_size {
119+
let batch_size = effective_merge_batch_size(settings);
120+
debug_assert!(batch_size >= 2);
121+
122+
if files.len() <= batch_size {
98123
let merger = merge_without_limit(files, settings);
99124
merger?.write_all(settings, output)
100125
} else {
101126
let mut temporary_files = vec![];
102-
let mut batch = vec![];
127+
let mut batch = Vec::with_capacity(batch_size);
103128
for file in files {
104129
batch.push(file);
105-
if batch.len() >= settings.merge_batch_size {
106-
assert_eq!(batch.len(), settings.merge_batch_size);
130+
if batch.len() >= batch_size {
131+
assert_eq!(batch.len(), batch_size);
107132
let merger = merge_without_limit(batch.into_iter(), settings)?;
108-
batch = vec![];
133+
batch = Vec::with_capacity(batch_size);
109134

110135
let mut tmp_file =
111136
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
@@ -115,7 +140,7 @@ pub fn merge_with_file_limit<
115140
}
116141
// Merge any remaining files that didn't get merged in a full batch above.
117142
if !batch.is_empty() {
118-
assert!(batch.len() < settings.merge_batch_size);
143+
assert!(batch.len() < batch_size);
119144
let merger = merge_without_limit(batch.into_iter(), settings)?;
120145

121146
let mut tmp_file =

src/uu/sort/src/sort.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,13 +1073,27 @@ fn make_sort_mode_arg(mode: &'static str, short: char, help: String) -> Arg {
10731073

10741074
#[cfg(target_os = "linux")]
10751075
fn get_rlimit() -> UResult<usize> {
1076-
use nix::sys::resource::{Resource, getrlimit};
1076+
use nix::sys::resource::{RLIM_INFINITY, Resource, getrlimit};
10771077

1078-
getrlimit(Resource::RLIMIT_NOFILE)
1079-
.map(|(rlim_cur, _)| rlim_cur as usize)
1078+
let (rlim_cur, _rlim_max) = getrlimit(Resource::RLIMIT_NOFILE)
1079+
.map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))?;
1080+
if rlim_cur == RLIM_INFINITY {
1081+
return Err(UUsageError::new(2, translate!("sort-failed-fetch-rlimit")));
1082+
}
1083+
usize::try_from(rlim_cur)
10801084
.map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))
10811085
}
10821086

1087+
#[cfg(target_os = "linux")]
1088+
pub(crate) fn fd_soft_limit() -> Option<usize> {
1089+
get_rlimit().ok()
1090+
}
1091+
1092+
#[cfg(not(target_os = "linux"))]
1093+
pub(crate) fn fd_soft_limit() -> Option<usize> {
1094+
None
1095+
}
1096+
10831097
const STDIN_FILE: &str = "-";
10841098

10851099
/// Legacy `+POS1 [-POS2]` syntax is permitted unless `_POSIX2_VERSION` is in
@@ -1232,12 +1246,12 @@ fn default_merge_batch_size() -> usize {
12321246
#[cfg(target_os = "linux")]
12331247
{
12341248
// Adjust merge batch size dynamically based on available file descriptors.
1235-
match get_rlimit() {
1236-
Ok(limit) => {
1249+
match fd_soft_limit() {
1250+
Some(limit) => {
12371251
let usable_limit = limit.saturating_div(LINUX_BATCH_DIVISOR);
12381252
usable_limit.clamp(LINUX_BATCH_MIN, LINUX_BATCH_MAX)
12391253
}
1240-
Err(_) => 64,
1254+
None => 64,
12411255
}
12421256
}
12431257

@@ -1366,9 +1380,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
13661380
settings.threads = matches
13671381
.get_one::<String>(options::PARALLEL)
13681382
.map_or_else(|| "0".to_string(), String::from);
1369-
unsafe {
1370-
env::set_var("RAYON_NUM_THREADS", &settings.threads);
1371-
}
1383+
let num_threads = match settings.threads.parse::<usize>() {
1384+
Ok(0) | Err(_) => std::thread::available_parallelism()
1385+
.map(|n| n.get())
1386+
.unwrap_or(1),
1387+
Ok(n) => n,
1388+
};
1389+
let _ = rayon::ThreadPoolBuilder::new()
1390+
.num_threads(num_threads)
1391+
.build_global();
13721392
}
13731393

13741394
if let Some(size_str) = matches.get_one::<String>(options::BUF_SIZE) {
@@ -1419,7 +1439,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
14191439

14201440
translate!(
14211441
"sort-maximum-batch-size-rlimit",
1422-
"rlimit" => get_rlimit()?
1442+
"rlimit" => {
1443+
let Some(rlimit) = fd_soft_limit() else {
1444+
return Err(UUsageError::new(
1445+
2,
1446+
translate!("sort-failed-fetch-rlimit"),
1447+
));
1448+
};
1449+
rlimit
1450+
}
14231451
)
14241452
}
14251453
#[cfg(not(target_os = "linux"))]

0 commit comments

Comments
 (0)