-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchemfarm_scripts_tools.py
More file actions
259 lines (214 loc) · 9.09 KB
/
chemfarm_scripts_tools.py
File metadata and controls
259 lines (214 loc) · 9.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Module for chemfarm deployment scripts.
Created on Thu Jun 16 17:42:40 2022
>>> import chemfarm_scripts_tools as chest
package for making chemfarm-grade simulation folders with bash scripts
which is derived from a single tape/vtu scanning a block of parameters
An example of a simulation folder
folder
job_script_00
parameters.json => {"a": [0,1], "b":[2,4]}
tape => A=3,B=8,C=9
x0000_a0b0
parameters.json => {"a": 0, "b": 2}
tape => A=0,B=2,C=9
x0001_a0b1
parameters.json => {"a": 0, "b": 4}
tape => A=0,B=4,C=9
x0002_a1b0
parameters.json => {"a": 1, "b": 2}
tape => A=1,B=2,C=9
x0003_a1b1
parameters.json => {"a": 1, "b": 4}
tape => A=1,B=4,C=9
Param is a named tuple which represents a parameter of the simulation
This is used in dictionaries
param_dict = { "key": Param(value,id,name) }
"key{id}" is what appears in the folder name, and the tape is updated with "name=value"
The subfolders are derived from a single tape and a dictionary of Params
all_param_dict = {"a": Param(value=[0,1],id=None,name='A'), "b": Param(value=[2,4],id=None,name='B')}
The `iterate_params` function generates a param_dict for each subfolder. In the example, subfolder 2 gets
list(iterate_params(all_param_dict))[2] == {"a": Param(value=1,id=1,name='A'), "b": Param(value=2,id=0,name='B')}
the name of the folder is generated by `make_dirname`.
`write_parameters_file` records the param_dict into a parameters.json file.
the tapes are derived using the `modify_tapetext` and `write_derivative_tape` functions
`make_scripts` function creates PBS scripts for running the folders
See the `file.py` module, which has structures to represent such a completed simulation folder.
"""
import os
import json
import datetime
from collections import namedtuple
from itertools import cycle, product, repeat, islice
def batched(iterable, n):
    """Yield successive tuples of up to n items taken from iterable.

    Every tuple holds exactly n items except possibly the last one, which
    holds whatever is left over. Adapted from the itertools recipes in the
    Python documentation.

    This is a generator, so the ValueError for n < 1 is raised when the
    first batch is requested, not when batched() is called.

    >>> list(batched('ABCDEFG', 3))
    [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    # iter(callable, sentinel) keeps calling until the callable returns the
    # sentinel; an exhausted islice produces the empty tuple.
    for chunk in iter(lambda: tuple(islice(source, n)), ()):
        yield chunk
# One simulation parameter:
#   value - the value (or, before iteration, the sequence of values to scan)
#   id    - index of the value within its scan; shown in the folder name
#   name  - name of the matching entry in the tape ("name=value")
Param = namedtuple("Param", "value id name")
def get_pval(d, key, default_val=None):
    """Return the ``value`` of the Param stored under key in d.

    d is a dictionary mapping keys to Param. When key is not present,
    default_val is returned instead.
    """
    missing = object()  # unique marker, cannot collide with a stored entry
    found = d.get(key, missing)
    if found is missing:
        return default_val
    return found.value
def iterate_params(all_param_dict):
    """Yield one param_dict per combination of parameter values.

    all_param_dict maps each key to a Param whose ``value`` is an iterable
    of candidate values. Every yielded dictionary has the same keys; each
    maps to Param(value, id, name) where value is one candidate, id is its
    position within the candidates and name is copied from the input Param.
    Combinations are produced by itertools.product, so the last key varies
    fastest.
    """
    keys = []
    names = []
    indexed_values = []
    for key, param in all_param_dict.items():
        keys.append(key)
        names.append(param.name)
        indexed_values.append(enumerate(param.value))

    for combination in product(*indexed_values):
        yield {
            key: Param(value, index, name)
            for key, name, (index, value) in zip(keys, names, combination)
        }
def write_parameters_file(folder, param_dict):
    """Write folder/parameters.json mapping each key to its Param value.

    Only the ``value`` field of each Param is recorded, e.g.
    {"a": Param(0, 0, 'A'), "b": Param(2, 0, 'B')} is stored as
    {"a": 0, "b": 2}. An existing parameters.json is overwritten.
    """
    values = {}
    for key, param in param_dict.items():
        values[key] = param.value

    target = os.path.join(folder, "parameters.json")
    with open(target, "w") as handle:
        json.dump(values, handle)
def make_dirname(num_sim, param_dict, numsize=(4, 2)):
    """Build a subfolder name in the style x0032_a01b02.

    The name is "x", then num_sim zero-padded to numsize[0] characters, then
    "_", followed for every entry of param_dict (in order) by the key and
    the Param's ``id`` zero-padded to numsize[1] characters.
    """
    pieces = ["x", format(num_sim, f"0{numsize[0]}"), "_"]
    for key, param in param_dict.items():
        pieces.append(f"{key}")
        pieces.append(format(param.id, f"0{numsize[1]}"))
    return "".join(pieces)
def modify_tapetext(text, param_dict, base_temperature=20, extra_temperature_names=None):
    """Return tape text updated with the parameters in param_dict.

    The tape is processed line by line. Each line is split on "=" and the
    part before the first "=" is the entry name. When a Param in param_dict
    has that ``name``, everything after the name is replaced by the Param's
    ``value``. Params whose ``name`` is None are ignored, and lines matching
    no Param are copied through unchanged.

    Special behavior for temperature: when param_dict contains a Param named
    'temperature', the entries named
    {'w', 'f', 'F', 'xk0', 'kx2', 'pressure', 'xkA0', 'adhesion_strength'}
    (plus any names in extra_temperature_names) are multiplied by
    temperature / base_temperature. The scaling is applied after any direct
    replacement of the same entry. Entries that have no value (a line
    without "=") are left untouched.

    Parameters
    ----------
    text : tape contents, one "name=value" entry per line.
    param_dict : dictionary of Param; only ``name`` and ``value`` are used.
    base_temperature : temperature at which the tape values are valid.
    extra_temperature_names : optional iterable of further entry names to
        rescale with temperature.

    Returns
    -------
    The modified lines joined with "\\n" (a trailing newline of text is not
    reproduced).

    Raises
    ------
    ValueError if the value of a rescaled entry is not a number.
    TypeError if a rescaled line contains more than one "=".
    ZeroDivisionError if a temperature is given and base_temperature is 0.

    example: change 'A' to 5 and temperature to 25 (factor 25/20 = 1.25)
    >>> params = {'a': Param(value=5, id=9, name='A'),
    ...           'blah': Param(value=25, id=-12, name='temperature')}
    >>> modify_tapetext("A=2\\nB=4\\nF=6\\nw=8", params)
    'A=5\\nB=4\\nF=7.5\\nw=10.0'
    """
    reprules = {param.name: param.value
                for param in param_dict.values() if param.name is not None}

    # An empty set means "rescale nothing", so the loop needs no extra flag.
    temperature_reprules = set()
    factor = None
    if 'temperature' in reprules:
        temperature_reprules = {'w', 'f', 'F', 'xk0', 'kx2', 'pressure',
                                'xkA0', 'adhesion_strength'}
        if extra_temperature_names:
            temperature_reprules.update(extra_temperature_names)
        factor = reprules['temperature'] / base_temperature

    newlines = []
    for line in text.splitlines():
        name, *rest = line.split("=")
        if name in reprules:
            rest = (str(reprules[name]),)
        # `rest` is empty for a line without "=": there is no value to
        # rescale, and float() of nothing would silently invent a 0.0.
        if rest and name in temperature_reprules:
            rest = (str(float(*rest) * factor),)
        newlines.append("=".join((name, *rest)))
    return "\n".join(newlines)
def write_derivative_tape(folder, base_tape, param_dict):
    """Derive a tape from base_tape and param_dict and write it to folder/tape.

    The base tape is read, passed through modify_tapetext (with its default
    base temperature) and the result is written to the file "tape" inside
    folder, overwriting any existing file.

    Parameters
    ----------
    folder : directory that receives the derived tape.
    base_tape : path of the tape to derive from, or None.
    param_dict : dictionary of Param describing the modifications.

    Returns
    -------
    The text of the derived tape, or None when base_tape is None (in which
    case nothing is read or written).
    """
    if base_tape is None:
        # Nothing to derive from; return explicitly so no unbound local
        # can ever be reached.
        return None

    with open(base_tape, "r") as tape_file:
        text = tape_file.read()
    text = modify_tapetext(text, param_dict)
    with open(os.path.join(folder, "tape"), "w") as tape_file:
        tape_file.write(text)
    return text
def job_script_head(name, queue="long", select="1",
                    ncpus="1", mem="10000", walltime=None,
                    email="yoav.ravid@weizmann.ac.il"):
    """Return the head of a PBS job script: shebang, #PBS directives, date.

    Parameters
    ----------
    name : job name, written to the ``#PBS -N`` directive.
    queue : queue name, written to the ``#PBS -q`` directive.
    select, ncpus, mem : written to ``#PBS -l select=..:ncpus=..:mem=..mb``
        (mem is suffixed with "mb").
    walltime : number of hours, written as ``#PBS -l walltime=<walltime>:00:00``.
        The directive is omitted when walltime is None (or otherwise falsy).
    email : address written to the ``#PBS -M`` directive. Defaults to the
        address that was previously hard-coded.

    Returns
    -------
    The script head as a string ending with a newline. Besides the
    directives, it contains commands printing the date and the host name.
    """
    # The directive carries its own newline, so without a walltime the
    # placeholder line below is simply left empty.
    walltime_directive = f"#PBS -l walltime={walltime}:00:00\n" if walltime else ""
    return f"""#!/bin/bash
#
#PBS -N {name}
#PBS -j oe
#PBS -q {queue}
#PBS -m eb
#PBS -M {email}
#PBS -l select={select}:ncpus={ncpus}:mem={mem}mb
{walltime_directive}
# Print time and date, beginning of the simulation
date
echo `hostname`
"""
def job_script_generate_note(script_name, today):
    """Return a three-line "#" banner recording what generated the script.

    The middle line names script_name and today; the lines above and below
    are rows of "#" of the same length. The banner ends with an empty line.
    """
    banner = f"# GENERATED BY PYTHON SCRIPT {script_name} on {today} #"
    rule = "#" * len(banner)
    return "\n".join((rule, banner, rule)) + "\n\n"
def _shell_assignment_value(value):
    """Format value for the right-hand side of a bash ``var=value`` line."""
    import shlex  # local import keeps this block self-contained

    text = f"{value}"
    if not text:
        return text
    try:
        words = shlex.split(text)
    except ValueError:
        # Unbalanced quotes: cannot be used as written, quote everything.
        return shlex.quote(text)
    if len(words) > 1:
        # Unquoted, bash would parse "var=a b" as a temporary assignment
        # followed by the command "b", leaving var unset in the script.
        return shlex.quote(text)
    # A single shell word (plain or already quoted by the caller) is kept
    # exactly as given.
    return text


def job_script_tail(dir_names, trisurf_name,
                    start_params, continue_params):
    """Return the tail of a job script: run trisurf in every directory.

    The generated bash code changes to $PBS_O_WORKDIR, then enters each
    directory in turn and starts trisurf in the background: with
    continue_params when a dump.bin file exists there, with start_params
    otherwise. It then waits for all runs and prints the date.

    Parameters
    ----------
    dir_names : iterable of directory names (strings); written as a bash
        array, four names per line.
    trisurf_name : path of the trisurf program, written verbatim.
    start_params, continue_params : command line options. A value made of
        several words (e.g. "--restore timestep_000000.vtu") is shell-quoted
        in the assignment so that bash stores all of it in the variable;
        single-word and empty values are written unchanged. The variables
        are expanded unquoted on the command line, so the words are passed
        to trisurf as separate arguments.

    Returns
    -------
    The script tail as a string ending with a newline.
    """
    names = list(dir_names)
    rows = [' '.join(names[start:start + 4]) for start in range(0, len(names), 4)]
    directories = "(" + "\n".join(rows) + ")"
    start_value = _shell_assignment_value(start_params)
    continue_value = _shell_assignment_value(continue_params)
    s = f"""### work in the PBS_O_WORKDIR ###
cd $PBS_O_WORKDIR
# now move to each dir, and run trisurf
# a directory with the appropriate tape and vtu should be prepared ahead of time
# start and continue params are here for easier sed manipulation
trisurf_prog={trisurf_name}
start_params={start_value}
continue_params={continue_value}
directories={directories}
for dir in ${{directories[@]}} ; do
cd $dir && {{
sleep 2
if [[ -e dump.bin ]] ; then
time $trisurf_prog $continue_params &
else
time $trisurf_prog $start_params &
fi
cd -
}}
done;
# wait for all jobs to finish
wait
# print the time and date at the end
date
"""
    return s
def make_scripts(script_name, job_name, subdirectories,
                 base_tape, base_vtu=None, chunks=None,
                 mem_per_sim=1200, max_time=360, queue="idle", opmode=None,
                 trisurf_path="~/apps/bin/trisurf"):
    """Generate PBS compatible scripts for running trisurf in a folder.

    The subdirectories are split into consecutive chunks, each with its own
    script "job_script_##". Chunk sizes are taken from chunks in a cycle
    (default (24, 12)); directories left over at the end go into one final,
    smaller script. Each script requests one cpu and mem_per_sim megabytes
    per directory and max_time hours of walltime on queue.

    When opmode is None the trisurf options are "--force" if base_vtu is
    None and "--restore timestep_000000.vtu" otherwise. base_tape is
    accepted but not used by this function.

    Returns a list of [script_name, script_text] pairs (two-element lists).
    """
    if opmode is None:
        opmode = "--force" if base_vtu is None else "--restore timestep_000000.vtu"
    if chunks is None:
        chunks = (24, 12)
    sizes = cycle(chunks)
    note = job_script_generate_note(script_name, datetime.date.today())
    directories = list(subdirectories)
    # number of digits for the script number, derived from the number of
    # directories, i.e. 1000 directories give job_script_0000
    width = len(str(len(directories)))
    scripts = []

    def emit(batch, size):
        """Append the script running the directories in batch."""
        index = len(scripts)  # scripts are numbered in creation order
        head = job_script_head(f"{job_name}_{index:0{width}}",
                               queue, 1, size,
                               size * mem_per_sim,
                               max_time)
        tail = job_script_tail(batch, trisurf_path, f'{opmode}', '')
        scripts.append([f"job_script_{index:0{width}}",
                        "".join((head, note, tail))])

    target = next(sizes)
    pending = []
    for directory in directories:
        pending.append(directory)
        if len(pending) != target:
            continue
        emit(pending, target)
        pending = []
        target = next(sizes)

    if pending:
        # leftover directories that did not fill a whole chunk
        emit(pending, len(pending))
    return scripts