WIP: Check size of input files to handle nfs sync issues#62
Conversation
|
Note that the finished file is also put in the finished.tar.gz file at job cleanup. |
494f778 to
0f2e67b
Compare
critias
left a comment
There was a problem hiding this comment.
The overall approach looks good to me, let's see if it works as expected.
|
Side note: This change does not break old setups. If no size info is available for a given job / task, no checks are run. |
| @@ -210,6 +210,10 @@ def file_caching(path): | |||
| WAIT_PERIOD_JOB_CLEANUP = 10 | |||
| #: How many seconds should all inputs be available before starting a job to avoid file system synchronization problems | |||
| WAIT_PERIOD_MTIME_OF_INPUTS = 60 | |||
There was a problem hiding this comment.
WAIT_PERIOD_MTIME_OF_INPUTS can be removed since the check inside of task is removed. Setting it should also be removed in toolkit.setup_script_mode
| except AttributeError: | ||
| job_dir = job | ||
|
|
||
| if os.path.exists(os.path.join(job_dir, gs.JOB_FINISHED_ARCHIVE)): |
There was a problem hiding this comment.
This could alternatively also be checked doing something like this:
import tarfile
tf=tarfile(os.path.join(job_dir, gs.JOB_FINISHED_ARCHIVE))
for n in tf:
if n.path.startswith('usage.'):
usage = literal_eval(tf.extractfile(n.path).read())
but I'm not sure if this would be worth the effort. The given solution should work in nearly all cases if the clean up timeout is large enough.
| # If the job has been cleaned up, no size info is available, but we can safely | ||
| # assume that enough time has passed so that all files are synced. | ||
| if other_job_sizes: | ||
| expected_sizes[i.rel_path()] = other_job_sizes[i.rel_path()] |
There was a problem hiding this comment.
This fails if a path is only used as prefix without representing a real file. We could change it to something like this:
try:
expected_sizes[rel_path] = other_job_sizes[rel_path]
except KeyError:
for k, v in other_job_sizes.items():
if k.startswith(rel_path):
expected_sizes[k] = v
This could also be used if the path is pointing to a directory.
| if time.time() - start > timeout: | ||
| logging.error("%s not synced for more than %ds, file_stats still empty.", fn, timeout) | ||
| raise TimeoutError | ||
| logging.info("%s not synced yet, file_stats still empty.", fn) |
There was a problem hiding this comment.
This will cause problems if a path is available and should be used before the job is finished e.g. training of a neural model.
We could require that these paths have a special attribute set. They could then be excluded from this check.
| while True: | ||
| with open(fn) as f: | ||
| try: | ||
| stats = literal_eval(f.read())["file_stats"] |
There was a problem hiding this comment.
It can happen that this raises a SyntaxError if the file is accessed while it's being written.
| logging.info("Inputs:\n%s", "\n".join( str(i) for i in self._job._sis_inputs)) | ||
|
|
||
| try: | ||
| self._wait_for_input_to_sync() |
There was a problem hiding this comment.
It would be could to be able to switch off the new check with an entry in the setting file if something goes wrong. Even better: switch back to the old timeout.
| try: | ||
| self._wait_for_input_to_sync() | ||
| except TimeoutError: | ||
| self.error(task_id, True) |
There was a problem hiding this comment.
This doesn't really stop the task from running. It's sets the error marker and then continues. Once the task is finished a finished marker is set and the error marker is ignored.
|
@critias : I wanted to get this PR merged before "vacation" (=no kindergarden) starts, which didn't happen because I spent last week in bed, not in front of a computer screen. As I won't be able to do so until 8/13, I invite you to take over this PR. :-) |
|
@critias I got sidetracked for quite some time ... How about the current situation of the cluster? Is it running so smoothly that this PR has become obsolete? Starting next week I'd have time to implement the changes you suggested. |
|
I continued working on it here: https://github.com/rwth-i6/sisyphus/tree/check-output-size |
After a discussion with @critias , we opted for the following design, handling sync issues both between jobs and between tasks.
LoggingThreadwrites a the size and the mtime of all files belowworkandoutputto theressourcesfile (->Job._sis_get_file_stats)Task._wait_for_input_to_sync. There the expected sizes are obtained by callingJob._sis_get_expected_file_sizes(job_dir, task)...There are two new config keys: