-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrunner.py
More file actions
1561 lines (1343 loc) · 60.9 KB
/
Copy pathrunner.py
File metadata and controls
1561 lines (1343 loc) · 60.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
######################################################################################
# Copyright (c) 2023-2025 Orange. All rights reserved. #
# This software is distributed under the BSD 3-Clause-clear License, the text of #
# which is available at https://spdx.org/licenses/BSD-3-Clause-Clear.html or #
# see the "LICENSE.md" file for more details. #
######################################################################################
# Important note:
# To detect the installation environment, this module makes use of the path in
# `__file__`. If you move this module to another place, make sure you properly
# update any line using `Path(__file__)`.
"""Classes implementing the Khiops Python backend runners"""
import io
import os
import pathlib
import platform
import shlex
import shutil
import site
import subprocess
import sys
import tempfile
import uuid
import warnings
from abc import ABC, abstractmethod
from importlib.metadata import PackageNotFoundError, files
from pathlib import Path
import khiops
import khiops.core.internals.filesystems as fs
from khiops.core.exceptions import KhiopsEnvironmentError, KhiopsRuntimeError
from khiops.core.internals.common import (
CommandLineOptions,
SystemSettings,
invalid_keys_message,
is_string_like,
type_error_message,
)
from khiops.core.internals.io import KhiopsOutputWriter
from khiops.core.internals.task import KhiopsTask
from khiops.core.internals.version import KhiopsVersion
def _isdir_without_all_perms(dir_path):
"""Returns True if the path is a directory but missing of any of rwx permissions"""
return os.path.isdir(dir_path) and not os.access(
dir_path, os.R_OK | os.W_OK | os.X_OK
)
def get_default_samples_dir():
    """Returns the default samples directory

    The location is resolved with the following priorities:

    - all systems: the value of the ``KHIOPS_SAMPLES_DIR`` environment variable,
      used as-is, if it is set and non-empty
    - Windows: ``%PUBLIC%\\khiops_data\\samples`` if ``%PUBLIC%`` is defined
    - otherwise: ``khiops_data/samples`` within the home directory of the user, as
      given by `pathlib.Path.home`
    """
    # An empty KHIOPS_SAMPLES_DIR is treated as if it were not set
    custom_samples_dir = os.environ.get("KHIOPS_SAMPLES_DIR")
    if custom_samples_dir:
        return custom_samples_dir

    if platform.system() == "Windows" and "PUBLIC" in os.environ:
        return os.path.join(os.environ["PUBLIC"], "khiops_data", "samples")

    return str(pathlib.Path.home() / "khiops_data" / "samples")
def _get_dir_status(a_dir):
    """Returns the status of a local or remote directory

    A local directory is really checked on the filesystem. A remote directory is
    only detected as such, no check is performed on it.

    Returns
    -------
    str
        One of:

        - ``"remote-path"``: the path is not a local resource
        - ``"non-existent"``: the local path does not exist
        - ``"not-a-dir"``: the local path exists but is not a directory
        - ``"ok"``: the local path is an existing directory
    """
    if not fs.is_local_resource(a_dir):
        return "remote-path"

    # The resource created from a local path exposes the plain path in `.path`
    local_path = fs.create_resource(os.path.normpath(a_dir)).path
    if not os.path.exists(local_path):
        return "non-existent"
    if not os.path.isdir(local_path):
        return "not-a-dir"
    return "ok"
def _check_samples_dir(samples_dir):
    """Warns if the samples directory does not exist or is not a directory

    No warning is emitted for any other directory status.
    """
    # Beginning of the warning message for each problematic status
    problem_by_status = {
        "non-existent": "Sample datasets location does not exist ",
        "not-a-dir": "Sample datasets location is not a directory ",
    }
    problem = problem_by_status.get(_get_dir_status(samples_dir))
    if problem is not None:
        download_msg = (
            "Execute the kh-download-datasets script or "
            "the khiops.tools.download_datasets function to download them."
        )
        warnings.warn(f"{problem}({samples_dir}). {download_msg}", stacklevel=3)
def _khiops_env_file_exists(env_dir):
"""Check ``khiops_env`` exists relative to the specified environment dir"""
khiops_env_path = os.path.join(env_dir, "khiops_env")
if platform.system() == "Windows":
khiops_env_path += ".cmd"
return os.path.exists(khiops_env_path) and os.path.isfile(khiops_env_path)
def _infer_env_bin_dir_for_conda_based_installations():
    """Infers the binary directory of a *supposed* Conda-based Khiops installation

    The binary directory is located relative to the inferred base directory:

    - Windows: ``<base dir>\\Library\\bin``
    - Linux/macOS: ``<base dir>/bin``

    Returns
    -------
    str
        Path of the directory where the Khiops binaries are expected.

    .. note::
        Borderline case: when the inferred base directory is the empty string the
        result is the relative sub-path alone (``bin`` on Linux/macOS).
    """
    base_dir = str(_infer_base_dir_for_conda_based_or_pip_installations())
    if platform.system() == "Windows":
        bin_dir_parts = ("Library", "bin")
    else:
        bin_dir_parts = ("bin",)
    return os.path.join(base_dir, *bin_dir_parts)
def _infer_base_dir_for_conda_based_or_pip_installations():
    """Infers the reference directory (base directory) of a Khiops installation

    The base directory is an ancestor of this file, a fixed number of levels above
    it. The expected layouts are:

    - Windows (base directory 6 levels above this file), either:

      - ``%CONDA_PREFIX%\\Lib\\site-packages\\khiops\\core\\internals\\runner.py``
      - ``{python lib root}\\Lib\\dist-packages\\khiops\\core\\internals\\runner.py``
      - ``{virtual env root}\\Lib\\site-packages\\khiops\\core\\internals\\runner.py``

    - Linux/macOS (base directory 7 levels above this file), either:

      - ``$CONDA_PREFIX/[Ll]ib/python3.X/site-packages/khiops/core/internals/runner.py``
      - ``{python lib root}/[Ll]ib/python3.X/dist-packages/khiops/core/internals/runner.py``
      - ``{virtual env root}/[Ll]ib/python3.X/site-packages/khiops/core/internals/runner.py``

    Any installation in an unexpected location is regarded as borderline.

    Returns
    -------
    str
        An absolute path to the base directory.

    .. note::
        It returns an empty string if this file does not have enough ancestor
        directories (borderline installation).
    """
    assert os.path.basename(Path(__file__).parents[2]) == "khiops", (
        "Please fix the `Path.parents` in this method "
        "so it finds environment directory of this module"
    )

    # Ancestors of the normalized (OS-dependent and without symlinks) full path
    # of the current file
    ancestors = Path(__file__).resolve().parents

    # Index of the base directory among the ancestors: there is one more level on
    # Linux/macOS because of the `python3.X` directory
    base_dir_level = 5 if platform.system() == "Windows" else 6

    # Safeguard to prevent an IndexError on borderline installations
    if len(ancestors) <= base_dir_level:
        return ""
    return str(ancestors[base_dir_level])
def _check_conda_env_bin_dir(conda_env_bin_dir):
"""Checks inferred Conda environment binary directory really is one
A real Conda environment binary directory:
- should exist
- should not be directly under the root directory
- should coexist with `conda-meta` directory under the same parent
"""
conda_env_bin_dir_path = Path(conda_env_bin_dir)
# Conda env bin dir should end with `/bin`
assert conda_env_bin_dir_path.parts[-1] == "bin"
is_conda_env_bin_dir = False
# Conda env dir is not equal to its root dir
# Conda env bin dir exists, along with the `conda-meta` dir
# Note: On Windows, Conda env bin dir equals conda env dir\Library\bin
conda_env_dir_path = conda_env_bin_dir_path.parent
if platform.system() == "Windows":
conda_env_dir_path = conda_env_dir_path.parent
if (
str(conda_env_dir_path) != conda_env_dir_path.root # `.root` is an `str`
and conda_env_bin_dir_path.is_dir()
and conda_env_dir_path.joinpath("conda-meta").is_dir()
):
is_conda_env_bin_dir = True
return is_conda_env_bin_dir
def _infer_khiops_installation_method(trace=False):
    """Returns the Khiops installation method

    Definitions:

    - 'conda': the environment contains binaries, shared libraries and python
      libraries
    - 'conda-based': similar to 'conda' except that the environment was not
      activated previously nor during the execution. Thus the CONDA_PREFIX
      environment variable is undefined and the path to the `bin` directory inside
      the conda environment is not in PATH
    - 'binary+pip': the binaries and the shared libraries are installed
      system-wide, but the python libraries are kept in the python system folder,
      or in the Python folder inside the home directory of the user, or in a
      virtual environment (if one is used)

    Parameters
    ----------
    trace : bool, default ``False``
        If ``True`` prints the inferred environment binary directory (when it is
        computed) and the installation method.

    Returns
    -------
    str
        One of 'conda', 'conda-based' or 'binary+pip'.
    """
    installation_method = None

    # We are in a Conda environment if
    # - the CONDA_PREFIX environment variable exists and,
    # - the khiops_env script exists within:
    #   - `%CONDA_PREFIX%\Library\bin` on Windows
    #   - `$CONDA_PREFIX/bin` on Linux and MacOS
    # Note: The check that the Khiops binaries are actually executable is done
    #       afterwards by the initializations method.
    conda_prefix = os.environ.get("CONDA_PREFIX")
    if conda_prefix is not None:
        if platform.system() == "Windows":
            conda_bin_dir = os.path.join(conda_prefix, "Library", "bin")
        else:
            conda_bin_dir = os.path.join(conda_prefix, "bin")
        if _khiops_env_file_exists(conda_bin_dir):
            installation_method = "conda"

    # Otherwise we choose between conda-based and binary+pip (default choice)
    if installation_method is None:
        env_bin_dir = _infer_env_bin_dir_for_conda_based_installations()
        if trace:
            print(f"Environment binary dir: '{env_bin_dir}'")
        is_conda_based = _check_conda_env_bin_dir(
            env_bin_dir
        ) and _khiops_env_file_exists(env_bin_dir)
        installation_method = "conda-based" if is_conda_based else "binary+pip"

    if trace:
        print(f"Installation method: '{installation_method}'")
    assert installation_method in ("conda", "conda-based", "binary+pip")
    return installation_method
def _check_executable(bin_path):
if not os.path.isfile(bin_path):
raise KhiopsEnvironmentError(f"Non-regular executable file. Path: {bin_path}")
elif not os.access(bin_path, os.X_OK):
raise KhiopsEnvironmentError(
f"Executable has no execution rights. Path: {bin_path}"
)
def _get_current_library_installer():
"""Returns the installer of the python library
Returns
-------
str
installer name among : 'pip', 'conda' or 'unknown'
"""
try:
# Each time a python library is installed a 'dist-info' folder is created
# Normalized files can be found in this folder
installer_files = [path for path in files("khiops") if path.name == "INSTALLER"]
if len(installer_files) > 0:
try:
return installer_files[0].read_text().strip()
except FileNotFoundError:
# At this step a FileNotFoundError exception can still occur
# because the files list is read first from a RECORD file
# before the filesystem is actually accessed.
# The exception is ignored here because a warning
# for the general case of a missing INSTALLER file
# will be created below.
pass
# No "INSTALLER" file is found inside the package metadata
warnings.warn(
"The python library metadata exists ('khiops-*.dist-info') "
"but seems corrupted as no INSTALLER file can be found. "
"Please re-install using the same tool ('conda' or 'pip').",
stacklevel=3,
)
return "unknown"
except PackageNotFoundError:
# The python library is not installed via standard tools like conda, pip...
return "unknown"
def _build_khiops_process_environment():
"""Build a specific environment used for the execution of khiops in a process
This environment can be modified freely without interfering
with the global one.
"""
khiops_env = os.environ.copy()
# Ensure HOME is always set for OpenMPI 5+
# (using KHIOPS_MPI_HOME if it exists)
khiops_env["HOME"] = os.path.pathsep.join(
[khiops_env.get("KHIOPS_MPI_HOME", ""), khiops_env.get("HOME", "")]
)
return khiops_env
class KhiopsRunner(ABC):
    """Abstract Khiops Python runner to be re-implemented

    Child classes must implement the `_run` and `_initialize_khiops_version`
    methods.
    """

    def __init__(self):
        """See class docstring"""
        self._initialize_root_temp_dir()
        self._khiops_version = None
        self._samples_dir = None

        # Whether to write the Khiops Python library version of the scenarios
        # For development uses only
        self._write_version = True

    def _initialize_root_temp_dir(self):
        """Initializes the runner's root temporary directory

        It tries to set a proper root temporary directory. It tries the following
        strategies in order:

        - Check that ``$TEMP/khiops/python`` exists and use it
        - Try to create ``$TEMP/khiops/python`` and use it
        - Create a ``$TEMP/khiops_<HASH>/python`` and use it
        """
        default_dir = os.path.join(tempfile.gettempdir(), "khiops", "python")

        # Create the default directory if it doesn't exist
        # Any failure (path or parent being a file, missing permissions on a parent
        # directory, ...) makes the default directory unusable
        try:
            os.makedirs(default_dir, exist_ok=True)
        except OSError:
            default_dir_is_usable = False
        # The existing default directory must also have all permissions
        else:
            default_dir_is_usable = not _isdir_without_all_perms(default_dir)

        # Create the dir with a hash name if the default one is not usable
        if default_dir_is_usable:
            self._root_temp_dir = default_dir
        else:
            self._root_temp_dir = os.path.join(
                tempfile.mkdtemp(prefix="khiops_"), "python"
            )
            os.makedirs(self._root_temp_dir, exist_ok=True)

    @property
    def root_temp_dir(self):
        r"""str: The runner's temporary directory

        The temporary scenarios/templates and dictionary files created by
        the Khiops Python library are stored here.

        Default value:

        - Windows: ``%TEMP%\khiops\python``
        - Linux: ``$TMP/khiops/python``

        When set to a local path it tries to create the specified directory if it
        doesn't exist.

        Raises
        ------
        `.KhiopsEnvironmentError`
            If set to a local path: if it is a file or if it does not have ``+rwx``
            permissions.
        """
        return self._root_temp_dir

    @root_temp_dir.setter
    def root_temp_dir(self, dir_path):
        # Check existence, directory status and permissions for local paths
        if fs.is_local_resource(dir_path):
            real_dir_path = fs.create_resource(dir_path).path
            if os.path.exists(real_dir_path):
                if os.path.isfile(real_dir_path):
                    raise KhiopsEnvironmentError(
                        f"File at temporary directory path. Path: {real_dir_path}"
                    )
                elif _isdir_without_all_perms(real_dir_path):
                    raise KhiopsEnvironmentError(
                        "Temporary directory must have +rwx permissions. "
                        f"Path: {real_dir_path}"
                    )
            else:
                os.makedirs(real_dir_path)
        # There are no checks for non-local filesystems (no `else` statement)
        self._root_temp_dir = dir_path

    def create_temp_file(self, prefix, suffix):
        """Creates a unique temporary file in the runner's root temporary directory

        .. note::
            For remote filesystems no actual file is created, just a (highly
            probable) unique path is returned.

        Parameters
        ----------
        prefix : str
            Prefix for the file's name.
        suffix : str
            Suffix for the file's name.

        Returns
        -------
        str
            A unique path within the root temporary directory. The file is created
            only in the case of a local filesystem.
        """
        # Local resource: Effectively create the file with the python file API
        if fs.is_local_resource(self.root_temp_dir):
            # Extract the path from the potential URI
            root_temp_dir_path = fs.create_resource(self.root_temp_dir).path

            # Create the temporary file
            # It is closed right away: only its path is of interest
            tmp_file_fd, tmp_file_path = tempfile.mkstemp(
                prefix=prefix, suffix=suffix, dir=root_temp_dir_path
            )
            os.close(tmp_file_fd)
        # Remote resource: Just return a highly probable unique path
        else:
            tmp_file_path = fs.get_child_path(
                self.root_temp_dir, f"{prefix}{uuid.uuid4()}{suffix}"
            )
        return tmp_file_path

    def create_temp_dir(self, prefix):
        """Creates a unique directory in the runner's root temporary directory

        Parameters
        ----------
        prefix : str
            Prefix for the directory's name.

        Returns
        -------
        str
            A unique directory path within the root temporary directory. The
            directory is created only in the case of a local filesystem.
        """
        # Local resource: Effectively create the directory with the python file API
        if fs.is_local_resource(self.root_temp_dir):
            root_temp_dir_path = fs.create_resource(self.root_temp_dir).path
            temp_dir = tempfile.mkdtemp(prefix=prefix, dir=root_temp_dir_path)
        # Remote resource: Just return a highly probable unique path
        else:
            temp_dir = fs.get_child_path(self.root_temp_dir, f"{prefix}{uuid.uuid4()}")
        return temp_dir

    @property
    def samples_dir(self):
        r"""str: Location of the Khiops' sample datasets directory. May be an URL/URI

        Raises
        ------
        `TypeError`
            If set to a value which is not string-like.
        """
        return self._get_samples_dir()

    def _get_samples_dir(self):
        """To be overridden by subclasses"""
        return self._samples_dir

    @samples_dir.setter
    def samples_dir(self, samples_dir):
        if not is_string_like(samples_dir):
            raise TypeError(
                type_error_message("samples_dir", samples_dir, "string-like")
            )
        self._set_samples_dir(samples_dir)

    def _set_samples_dir(self, samples_dir):
        """To be overridden by child classes to add additional checks"""
        self._samples_dir = samples_dir

    @property
    def khiops_version(self):
        """`.KhiopsVersion`: The version of the Khiops backend of this runner"""
        return self._get_khiops_version()

    def _get_khiops_version(self):
        """khiops_version getter to be overridden by subclasses"""
        return self._khiops_version

    def _build_status_message(self):
        """Constructs the status message

        Descendant classes can add additional information.

        Returns
        -------
        tuple
            A 3-tuple containing in this order :

            - The status message
            - A list of error messages (always empty in this base implementation)
            - A list of warning messages: those emitted while obtaining the
              samples directory
        """
        # Capture the status of the samples dir
        warning_list = []
        with warnings.catch_warnings(record=True) as caught_warnings:
            samples_dir_path = self.samples_dir
        if caught_warnings is not None:
            # caught_warnings contains a list of WarningMessage
            warning_list.extend([w.message for w in caught_warnings])

        # the following path is accurate only if the current file
        # is still in the 'khiops.core.internals' package
        assert (
            os.path.basename(Path(__file__).parents[2]) == "khiops"
        ), "Please fix the `Path.parents` in this method "
        library_root_dir = Path(__file__).parents[2]

        status_msg = "Khiops Python library settings\n"
        status_msg += f"version : {khiops.__version__}\n"
        status_msg += f"runner class : {self.__class__.__name__}\n"
        status_msg += f"root temp dir : {self.root_temp_dir}\n"
        status_msg += f"sample datasets dir : {samples_dir_path}\n"
        status_msg += f"library root dir : {library_root_dir}\n"
        error_list = []
        return status_msg, error_list, warning_list

    def print_status(self):
        """Prints the status of the runner to stdout

        Returns
        -------
        int
            0 if no errors were detected, 1 otherwise.
        """
        # Obtains the status_msg, errors and warnings
        status_msg, error_list, warning_list = self._build_status_message()

        # Print status details
        print(status_msg, end="")
        if error_list or warning_list:
            print("Installation issues detected:\n")
            print("---\n")

        # Print the errors (if any)
        if error_list:
            print("Errors:")
            for error in error_list:
                print(f"\tError: {error}\n")

        # Print the warnings (if any)
        if warning_list:
            print("Warnings:")
            for warning in warning_list:
                print(f"\tWarning: {warning}\n")

        # The exit code is non-zero if there are errors
        if len(error_list) == 0:
            return 0
        return 1

    @abstractmethod
    def _initialize_khiops_version(self):
        """Initialization of `khiops_version` to be implemented in child classes"""

    def run(
        self,
        task,
        task_args,
        command_line_options=None,
        trace=False,
        system_settings=None,
        stdout_file_path="",
        stderr_file_path="",
        force_ansi_scenario=False,
        **kwargs,
    ):
        """Runs a Khiops Task

        Parameters
        ----------
        task : `.KhiopsTask`
            Khiops task to be run.
        task_args : dict
            Arguments for the task.
        command_line_options : `.CommandLineOptions`, optional
            Command line options for all tasks. If not set the default values are
            used. See the `.CommandLineOptions` for more information.
        trace : bool, default ``False``
            If True prints the command line executed of the process and does not
            delete any temporary files created.
        system_settings : `.SystemSettings`, optional
            *Advanced:* System settings for all tasks. If not set the default
            values are used. See the `.SystemSettings` class for more information.
        stdout_file_path : str, default ""
            *Advanced* Path to a file where the Khiops process writes its stdout
            stream. Normally Khiops should not write to this stream but MPI,
            filesystems plugins or debug versions may do it. The stream is captured
            with a UTF-8 encoding and replacing encoding errors. If equal to ""
            then it writes no file.
        stderr_file_path : str, default ""
            *Advanced* Path to a file where the Khiops process writes its stderr
            stream. Normally Khiops should not write to this stream but MPI,
            filesystems plugins or debug versions may do it. The stream is captured
            with a UTF-8 encoding and replacing encoding errors. If equal to ""
            then it writes no file.
        force_ansi_scenario : bool, default ``False``
            *Advanced:* If True the internal scenario generated by Khiops will force
            characters such as accentuated ones to be decoded with the UTF8->ANSI
            khiops transformation.

        Raises
        ------
        `TypeError`
            If ``trace``, ``stdout_file_path`` or ``stderr_file_path`` have an
            invalid type.
        `.KhiopsRuntimeError`
            - If an OS level error occurs while executing Khiops
            - If the Khiops execution reported errors or ended with a non-zero
              return code

        .. note::
            Any additional keyword argument is invalid: it is reported with a
            warning and otherwise ignored. The checks of ``command_line_options``
            may raise additional errors.
        """
        # Warn if there are still kwargs: At this point any keyword argument is
        # invalid
        if kwargs:
            warnings.warn(invalid_keys_message(kwargs), stacklevel=3)
            kwargs.clear()

        # Use the default command line options and system settings if not specified
        if command_line_options is None:
            command_line_options = CommandLineOptions()
        if system_settings is None:
            system_settings = SystemSettings()

        # Check the call arguments
        if not isinstance(trace, bool):
            raise TypeError(type_error_message("trace", trace, bool))
        command_line_options.check()
        if not isinstance(stdout_file_path, str):
            raise TypeError(
                type_error_message("stdout_file_path", stdout_file_path, str)
            )
        if not isinstance(stderr_file_path, str):
            raise TypeError(
                type_error_message("stderr_file_path", stderr_file_path, str)
            )

        # Write the scenarios file
        scenario_path = self._write_task_scenario_file(
            task,
            task_args,
            system_settings,
            force_ansi_scenario=force_ansi_scenario,
        )

        # If no log file specified: Use a temporary file
        # The original value is kept to restore the caller's object afterwards
        original_log_file_path = command_line_options.log_file_path
        tmp_log_file_path = None
        if not original_log_file_path:
            tmp_log_file_path = self.create_temp_file("_run_", ".log")
            command_line_options.log_file_path = tmp_log_file_path
        log_file_path = command_line_options.log_file_path

        # Execute Khiops
        try:
            # Disable pylint warning about abstract method _run returning None
            # pylint: disable=assignment-from-no-return
            return_code, stdout, stderr = self._run(
                task.tool_name,
                scenario_path,
                command_line_options,
                trace,
            )
            # pylint: enable=assignment-from-no-return
        # Catch an OS level error if any
        except OSError as error:
            raise KhiopsRuntimeError("Khiops execution failed.") from error
        # Report any errors raised by Khiops
        else:
            # Write the stdout and stderr streams if specified
            if stdout_file_path:
                fs.write(stdout_file_path, bytes(stdout, encoding="utf8"))
            if stderr_file_path:
                fs.write(stderr_file_path, bytes(stderr, encoding="utf8"))

            # Report the exit status
            self._report_exit_status(
                task.tool_name,
                return_code,
                stdout,
                stderr,
                log_file_path,
            )
        finally:
            # Do not leave the path of the temporary log in the caller's options:
            # otherwise a later run with the same object would write its log to a
            # stale path that is never cleaned
            if tmp_log_file_path is not None:
                command_line_options.log_file_path = original_log_file_path

            # Clean files unless trace mode is activated
            if trace:
                print(f"Khiops execution scenario: {scenario_path}")
                print(f"Khiops log file: {log_file_path}")
            else:
                fs.remove(scenario_path)
                if tmp_log_file_path is not None:
                    fs.remove(tmp_log_file_path)

    def _report_exit_status(
        self, tool_name, return_code, stdout, stderr, log_file_path
    ):
        """Reports the exit status of a Khiops execution

        Raises
        ------
        `.KhiopsRuntimeError`
            If the log contains errors or fatal errors, or if the return code is
            non-zero.
        """
        # Note:
        #   We report stdout and stderr below because we use a log file and thus
        #   normally Khiops doesn't write anything to these streams. In practice MPI
        #   and the remote filesystems plugins may write to them to report
        #   anomalies.
        # Report messages:
        # - The warnings in the log
        # - The errors and/or fatal errors in the log
        # - The stdout if not empty
        # - The stderr if not empty
        #
        # If there were any errors (fatal or not) or the return code is non-zero the
        # reporting is via an exception. Otherwise we show the message as a warning.
        #
        # Create the message reporting the errors and warnings
        error_msg = ""

        # If the log file exists: Collect the errors and warnings messages
        if fs.exists(log_file_path):
            errors, fatal_errors, warning_messages = self._collect_errors(log_file_path)
            if warning_messages:
                error_msg += "Warnings in log:\n" + "".join(warning_messages)
            if errors:
                if error_msg:
                    error_msg += "\n"
                error_msg += "Errors in log:\n" + "".join(errors)
            if fatal_errors:
                if error_msg:
                    error_msg += "\n"
                error_msg += "Fatal errors in log:\n" + "".join(fatal_errors)
        # Otherwise warn that the log file is missing
        else:
            warnings.warn(
                f"Log file not found after {tool_name} execution. "
                f"Path: {log_file_path}"
            )
            errors = fatal_errors = []

        # Add stdout to the warning message if non empty
        if stdout:
            if error_msg:
                error_msg += "\n"
            error_msg += f"Contents of stdout:\n{stdout}"

        # Add stderr to the warning message if non empty
        if stderr:
            if error_msg:
                error_msg += "\n"
            error_msg += f"Contents of stderr:\n{stderr}"

        # Report the message to the user if there were any
        if error_msg:
            # Raise an exception if there were errors
            if errors or fatal_errors or return_code != 0:
                raise KhiopsRuntimeError(
                    f"{tool_name} execution had errors (return code {return_code}):\n"
                    f"{error_msg}"
                )
            # Otherwise show the message as a warning
            else:
                error_msg = (
                    f"Khiops ended correctly but there were minor issues:\n{error_msg}"
                )
                warnings.warn(error_msg.rstrip())
        # Raise an exception anyway for a non-zero return code without any message
        else:
            if return_code != 0:
                raise KhiopsRuntimeError(
                    f"{tool_name} execution had errors (return code {return_code}) "
                    "but no message is available"
                )

    def _collect_errors(self, log_file_path):
        """Collects the errors, fatal errors and warnings found in a log file

        Returns
        -------
        tuple
            A 3-tuple containing in this order the lists of error, fatal error and
            warning lines. Each line is prefixed with its line number in the log.
        """
        errors = []
        fatal_errors = []
        warning_messages = []

        # Look in the log for error lines
        # The log is decoded as UTF-8 replacing any encoding error
        try:
            log_file_contents = fs.read(log_file_path)
            log_file_lines = io.TextIOWrapper(
                io.BytesIO(log_file_contents), encoding="utf8", errors="replace"
            )
            for line_number, line in enumerate(log_file_lines, start=1):
                if line.startswith("warning : "):
                    warning_messages.append(f"Line {line_number}: {line}")
                elif line.startswith("error : "):
                    errors.append(f"Line {line_number}: {line}")
                elif line.startswith("fatal error : "):
                    fatal_errors.append(f"Line {line_number}: {line}")
        # Warn on error for remote file handling. Replace with empty log file.
        except ImportError:
            warnings.warn(
                "Could not read remote log file and errors may not be "
                "reported. Make sure you have installed the extra "
                "dependencies for remote filesystems.",
                stacklevel=3,
            )
        return errors, fatal_errors, warning_messages

    def _create_scenario_file(self, task):
        """Creates the temporary scenario file of a task and returns its path"""
        assert isinstance(task, KhiopsTask)
        return self.create_temp_file(f"{task.name}_", "._kh")

    def _write_task_scenario_file(
        self, task, task_args, system_settings, force_ansi_scenario=False
    ):
        """Writes the scenario of a task to a temporary file and returns its path"""
        scenario_path = self._create_scenario_file(task)

        # The scenario is built in memory and then written in a single operation
        with io.BytesIO() as scenario_stream:
            writer = KhiopsOutputWriter(scenario_stream, force_ansi=force_ansi_scenario)
            if self._write_version:
                writer.writeln(f"// Generated by khiops-python {khiops.__version__}")
            self._write_task_scenario(writer, task, task_args, system_settings)
            fs.write(scenario_path, scenario_stream.getvalue())
        return scenario_path

    def _write_task_scenario(self, writer, task, task_args, system_settings):
        """Writes the full scenario of a task: prologue, task and exit statement"""
        assert isinstance(task, KhiopsTask)
        assert isinstance(task_args, dict)
        assert isinstance(system_settings, SystemSettings)

        # Write the task scenario
        self._write_scenario_prologue(writer, system_settings)
        task.write_execution_scenario(writer, task_args)
        self._write_scenario_exit_statement(writer)

    def _write_scenario_prologue(self, writer, system_settings):
        """Writes the system settings and the user-defined prologue (if any)"""
        # Write the system settings if any
        if (
            system_settings.max_cores
            or system_settings.memory_limit_mb
            or system_settings.temp_dir
        ):
            writer.writeln("// System settings")
            if system_settings.max_cores:
                writer.write("AnalysisSpec.SystemParameters.MaxCoreNumber ")
                writer.writeln(str(system_settings.max_cores))
            if system_settings.memory_limit_mb:
                writer.write("AnalysisSpec.SystemParameters.MemoryLimit ")
                writer.writeln(str(system_settings.memory_limit_mb))
            if system_settings.temp_dir:
                writer.write("AnalysisSpec.SystemParameters.TemporaryDirectoryName ")
                writer.writeln(system_settings.temp_dir)
            writer.writeln("")

        # Write the user defined prologue
        if system_settings.scenario_prologue:
            writer.writeln("// User-defined prologue")
            for line in system_settings.scenario_prologue.split("\n"):
                writer.writeln(line)
            writer.writeln("")

    def _write_scenario_exit_statement(self, writer):
        """Writes the statements that make Khiops quit at the end of the scenario"""
        writer.writeln("")
        writer.writeln("// Exit Khiops")
        writer.writeln("ClassManagement.Quit")
        writer.writeln("OK")

    @abstractmethod
    def _run(
        self,
        tool_name,
        scenario_path,
        command_line_options,
        trace,
    ):
        """Abstract run method to be implemented in child classes

        Returns
        -------
        tuple
            A 3-tuple containing the return code, the stdout and the stderr of the
            Khiops process.

        Raises
        ------
        `.KhiopsRuntimeError`
            If there were any errors in the Khiops execution.
        """
class KhiopsLocalRunner(KhiopsRunner):
r"""Implementation of a local Khiops runner
Requires either:
- This library installed through Conda and run from a Conda environment, or
- the ``khiops-core`` Linux native library installed on the local machine, or
- the Windows Khiops desktop application installed on the local machine
.. rubric:: Samples directory settings
Default values for the ``samples_dir`` attribute:
- The value of the ``KHIOPS_SAMPLES_DIR`` environment variable (path to the Khiops
sample datasets directory).
- Otherwise:
- Windows:
- ``%PUBLIC%\khiops_data\samples%`` if ``%PUBLIC%`` is defined
- ``%USERPROFILE%\khiops_data\samples%`` otherwise
- Linux and macOS:
- ``$HOME/khiops_data/samples``
"""
def __init__(self):
# Define specific attributes
self._mpi_command_args = None
self._khiops_path = None
self._khiops_coclustering_path = None
self._khiops_version = None
self._samples_dir = None
self._samples_dir_checked = False
# Call parent constructor
super().__init__()
# Initialize Khiops environment
self._initialize_khiops_environment()
def _initialize_khiops_environment(self):
# Check the `khiops_env` script
# On Windows native installations, rely on the `KHIOPS_HOME` environment
# variable set by the Khiops Desktop Application installer
installation_method = _infer_khiops_installation_method()
if platform.system() == "Windows" and installation_method == "binary+pip":
# KHIOPS_HOME variable by default
if "KHIOPS_HOME" in os.environ:
khiops_env_path = os.path.join(
os.environ["KHIOPS_HOME"], "bin", "khiops_env.cmd"
)
# Raise error if KHIOPS_HOME is not set
else:
raise KhiopsEnvironmentError(
"No environment variable named 'KHIOPS_HOME' found. "
"Make sure you have installed Khiops >= 10.2.3. "
"Go to https://khiops.org for more information."
)
# In Conda-based environments, `khiops_env` might not be in the PATH,
# hence its path must be inferred
elif installation_method == "conda-based":
khiops_env_path = os.path.join(
_infer_env_bin_dir_for_conda_based_installations(), "khiops_env"
)
if platform.system() == "Windows":
khiops_env_path += ".cmd"
# On UNIX or Conda, khiops_env is always in path for a proper installation
else:
khiops_env_path = shutil.which("khiops_env")
if khiops_env_path is None:
raise KhiopsEnvironmentError(
"The 'khiops_env' script not found for the current "
f"'{installation_method}' installation method. Make sure "
"you have installed khiops >= 10.2.3. "
"Go to https://khiops.org for more information."
)
with subprocess.Popen(
[khiops_env_path, "--env"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
) as khiops_env_process:
stdout, stderr = khiops_env_process.communicate()
if khiops_env_process.returncode != 0:
raise KhiopsEnvironmentError(