Skip to content

Commit cfb2d61

Browse files
authored
Fix temporary directory race condition in sklearn estimators (#529)
Caution : for the tests, MPI must be disabled otherwise a segfault would occur because of a huge use of resources
1 parent 9645665 commit cfb2d61

File tree

3 files changed

+114
-21
lines changed

3 files changed

+114
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
### Fixed
1212
- (General) Automatic Credentials Discovery-based credential retrieval for
1313
Google cloud storage (GCS).
14+
- (`sklearn`) Temporary directory race condition in estimators.
1415

1516
## 11.0.0.0 - 2025-12-19
1617

khiops/sklearn/estimators.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,6 @@ def fit(self, X, y=None, **kwargs):
362362

363363
# Create temporary directory and tables
364364
computation_dir = self._create_computation_dir("fit")
365-
initial_runner_temp_dir = kh.get_runner().root_temp_dir
366-
kh.get_runner().root_temp_dir = computation_dir
367365

368366
# Create the dataset, fit the model and reset in case of any failure
369367
try:
@@ -377,7 +375,6 @@ def fit(self, X, y=None, **kwargs):
377375
# Cleanup and restore the runner's temporary dir
378376
finally:
379377
self._cleanup_computation_dir(computation_dir)
380-
kh.get_runner().root_temp_dir = initial_runner_temp_dir
381378

382379
# If on "fitted" state then:
383380
# - self.model_ must be a DictionaryDomain
@@ -963,15 +960,13 @@ def _simplify(
963960
computation_dir = self._create_computation_dir("simplify")
964961
output_dir = self._get_output_dir(computation_dir)
965962
simplify_log_file_path = fs.get_child_path(output_dir, "khiops_simplify_cc.log")
966-
initial_runner_temp_dir = kh.get_runner().root_temp_dir
967963
full_coclustering_file_path = fs.get_child_path(
968964
output_dir, "FullCoclustering.khcj"
969965
)
970966
simplified_coclustering_file_path = fs.get_child_path(
971967
output_dir, "Coclustering.khcj"
972968
)
973969
self.model_report_.write_khiops_json_file(full_coclustering_file_path)
974-
kh.get_runner().root_temp_dir = computation_dir
975970
try:
976971
# - simplify_coclustering, then
977972
# - prepare_coclustering_deployment
@@ -1040,7 +1035,6 @@ def _simplify(
10401035
)
10411036
finally:
10421037
self._cleanup_computation_dir(computation_dir)
1043-
kh.get_runner().root_temp_dir = initial_runner_temp_dir
10441038
return simplified_cc
10451039

10461040
def simplify(
@@ -1101,8 +1095,6 @@ def predict(self, X):
11011095
"""
11021096
# Create temporary directory
11031097
computation_dir = self._create_computation_dir("predict")
1104-
initial_runner_temp_dir = kh.get_runner().root_temp_dir
1105-
kh.get_runner().root_temp_dir = computation_dir
11061098

11071099
# Create the input dataset
11081100
ds = Dataset(X)
@@ -1119,7 +1111,6 @@ def predict(self, X):
11191111
# Cleanup and restore the runner's temporary dir
11201112
finally:
11211113
self._cleanup_computation_dir(computation_dir)
1122-
kh.get_runner().root_temp_dir = initial_runner_temp_dir
11231114

11241115
# Transform to numpy.array
11251116
y_pred = y_pred.to_numpy()
@@ -1557,8 +1548,6 @@ def predict(self, X):
15571548
"""
15581549
# Create temporary directory
15591550
computation_dir = self._create_computation_dir("predict")
1560-
initial_runner_temp_dir = kh.get_runner().root_temp_dir
1561-
kh.get_runner().root_temp_dir = computation_dir
15621551

15631552
try:
15641553
# Create the input dataset
@@ -1575,10 +1564,6 @@ def predict(self, X):
15751564
# Cleanup and restore the runner's temporary dir
15761565
finally:
15771566
self._cleanup_computation_dir(computation_dir)
1578-
kh.get_runner().root_temp_dir = initial_runner_temp_dir
1579-
1580-
# Restore the runner's temporary dir
1581-
kh.get_runner().root_temp_dir = initial_runner_temp_dir
15821567

15831568
# Return pd.Series in the monotable + pandas case
15841569
assert isinstance(y_pred, (str, pd.DataFrame)), "Expected str or DataFrame"
@@ -1997,8 +1982,6 @@ def predict_proba(self, X):
19971982
"""
19981983
# Create temporary directory and tables
19991984
computation_dir = self._create_computation_dir("predict_proba")
2000-
initial_runner_temp_dir = kh.get_runner().root_temp_dir
2001-
kh.get_runner().root_temp_dir = computation_dir
20021985

20031986
# Create the input dataset
20041987

@@ -2015,7 +1998,6 @@ def predict_proba(self, X):
20151998
# Cleanup and restore the runner's temporary dir
20161999
finally:
20172000
self._cleanup_computation_dir(computation_dir)
2018-
kh.get_runner().root_temp_dir = initial_runner_temp_dir
20192001

20202002
# - Reorder the columns to that of self.classes_
20212003
# - Transform to np.ndarray
@@ -2649,8 +2631,6 @@ def transform(self, X):
26492631
"""
26502632
# Create temporary directory
26512633
computation_dir = self._create_computation_dir("transform")
2652-
initial_runner_temp_dir = kh.get_runner().root_temp_dir
2653-
kh.get_runner().root_temp_dir = computation_dir
26542634

26552635
# Create and transform the dataset
26562636
try:
@@ -2665,7 +2645,6 @@ def transform(self, X):
26652645
# Cleanup and restore the runner's temporary dir
26662646
finally:
26672647
self._cleanup_computation_dir(computation_dir)
2668-
kh.get_runner().root_temp_dir = initial_runner_temp_dir
26692648
return X_transformed.to_numpy(copy=False)
26702649

26712650
def _transform_prepare_deployment_for_transform(self, ds):

tests/test_sklearn.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,24 @@
66
######################################################################################
77
"""Tests parameter transfer between Khiops sklearn and core APIs"""
88
import contextlib
9+
import io
910
import os
1011
import shutil
1112
import unittest
1213
import warnings
14+
from concurrent.futures import as_completed
15+
from concurrent.futures.thread import ThreadPoolExecutor
1316
from itertools import product
1417

1518
import numpy as np
19+
import pandas as pd
1620
from sklearn.exceptions import NotFittedError
1721
from sklearn.utils.estimator_checks import check_estimator
1822
from sklearn.utils.validation import check_is_fitted
1923

2024
import khiops.core as kh
25+
import khiops.core.internals.filesystems as fs
26+
from khiops.core.internals.runner import KhiopsLocalRunner
2127
from khiops.sklearn.estimators import (
2228
KhiopsClassifier,
2329
KhiopsCoclustering,
@@ -1773,6 +1779,27 @@ def test_sklearn_check_estimator(self):
17731779
print("Done")
17741780

17751781

1782+
def no_mpi(func):
1783+
"""Disable MPI (in setting proc number to 1) to save resources"""
1784+
1785+
def inner(self):
1786+
# set up a single cpu runner
1787+
initial_runner = kh.get_runner()
1788+
initial_proc_number = os.environ["KHIOPS_PROC_NUMBER"]
1789+
os.environ["KHIOPS_PROC_NUMBER"] = "1"
1790+
single_cpu_runner = KhiopsLocalRunner()
1791+
kh.set_runner(single_cpu_runner)
1792+
1793+
# call the initial test function
1794+
func(self)
1795+
1796+
# restore the runner
1797+
os.environ["KHIOPS_PROC_NUMBER"] = initial_proc_number
1798+
kh.set_runner(initial_runner)
1799+
1800+
return inner
1801+
1802+
17761803
class KhiopsSklearnVariousTests(unittest.TestCase):
17771804
"""Miscelanous sklearn classes tests"""
17781805

@@ -1827,3 +1854,89 @@ def test_export_operations_raise_when_not_fitted(self):
18271854
with self.subTest(export_operation=export_operation, estimator=estimator):
18281855
with self.assertRaises(NotFittedError):
18291856
getattr(estimator, export_operation)("report.khj")
1857+
1858+
@no_mpi
1859+
def test_concurrency_safe_operations(self):
1860+
"""Ensure no race condition occurs when running concurrent operations"""
1861+
1862+
# Define all the function calls that will be submitted to the threads
1863+
def predict_func(clf, X):
1864+
return clf.predict(X)
1865+
1866+
def predict_proba_func(clf, X):
1867+
return clf.predict_proba(X)
1868+
1869+
def encoder_fit_transform_func(khe, X, y):
1870+
return khe.fit_transform(X, y)
1871+
1872+
def estimator_fit_func(khcc, X, id_column):
1873+
return khcc.fit(X, id_column=id_column)
1874+
1875+
def coclustering_simplify_func(khcc):
1876+
return khcc.simplify()
1877+
1878+
def coclustering_predict_func(khcc, X):
1879+
return khcc.predict(X)
1880+
1881+
clf = KhiopsClassifier(n_trees=0)
1882+
adult_df = pd.read_csv(
1883+
f"{kh.get_samples_dir()}/Adult/Adult.txt", sep="\t", header=0
1884+
)
1885+
X = adult_df.drop("class", axis=1)
1886+
clf.fit(X, adult_df["class"])
1887+
1888+
# Test `predict`, `predict_proba` of `KhiopsPredictor` and its children
1889+
# (`KhiopsClassifier` and `KhiopsRegressor`)
1890+
with ThreadPoolExecutor(max_workers=5) as executor:
1891+
futures = {executor.submit(predict_func, clf, X): i for i in range(5)}
1892+
for future in as_completed(futures):
1893+
print(future.result())
1894+
futures = {executor.submit(predict_proba_func, clf, X): i for i in range(5)}
1895+
for future in as_completed(futures):
1896+
print(future.result())
1897+
1898+
# Test `transform` of `KhiopsEncoder`
1899+
khe = KhiopsEncoder()
1900+
1901+
y = adult_df["class"]
1902+
with ThreadPoolExecutor(max_workers=5) as executor:
1903+
futures = {
1904+
executor.submit(encoder_fit_transform_func, khe, X, y): i
1905+
for i in range(5)
1906+
}
1907+
for future in as_completed(futures):
1908+
print(future.result())
1909+
1910+
# Test `fit`, `simplify` and `predict` of
1911+
# `KhiopsCoclustering` and `KhiopsEstimator`
1912+
splice_data_dir = fs.get_child_path(
1913+
kh.get_runner().samples_dir, "SpliceJunction"
1914+
)
1915+
splice_data_file_path = fs.get_child_path(
1916+
splice_data_dir, "SpliceJunctionDNA.txt"
1917+
)
1918+
1919+
# Read the splice junction secondary datatable
1920+
with io.BytesIO(fs.read(splice_data_file_path)) as splice_data_file:
1921+
splice_df = pd.read_csv(splice_data_file, sep="\t")
1922+
1923+
khcc = KhiopsCoclustering()
1924+
1925+
with ThreadPoolExecutor(max_workers=5) as executor:
1926+
futures = {
1927+
executor.submit(estimator_fit_func, khcc, splice_df, "SampleId"): i
1928+
for i in range(5)
1929+
}
1930+
for future in as_completed(futures):
1931+
print(future.result())
1932+
futures = {
1933+
executor.submit(coclustering_simplify_func, khcc): i for i in range(5)
1934+
}
1935+
for future in as_completed(futures):
1936+
print(future.result())
1937+
futures = {
1938+
executor.submit(coclustering_predict_func, khcc, splice_df): i
1939+
for i in range(5)
1940+
}
1941+
for future in as_completed(futures):
1942+
print(future.result())

0 commit comments

Comments
 (0)