forked from Cosmotheka/Cosmotheka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_cls.py
410 lines (349 loc) · 15.2 KB
/
run_cls.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
#!/usr/bin/python
from xcell.cls.data import Data
import os
import sys
import time
import subprocess
import numpy as np
import re
from datetime import datetime
from glob import glob
##############################################################################
def get_mem(data, trs, compute):
    """Estimate the memory requirement for a job at nside 4096.

    Each tracer contributes a fixed amount depending on its spin (0 or 2)
    and on the kind of computation ('cls' or 'cov'). The units are
    presumably GB (cf. the --mem command-line option) — TODO confirm.

    Raises
    ------
    ValueError
        If `compute` is neither 'cls' nor 'cov'.
    """
    # Per-spin cost table, keyed by the computation kind.
    cost_table = {'cls': {0: 16, 2: 25},
                  'cov': {0: 16, 2: 47}}
    if compute not in cost_table:
        raise ValueError('{} not defined'.format(compute))
    cost = cost_table[compute]
    return sum(cost[data.get_mapper(tr).get_spin()] for tr in trs)
def get_queued_jobs():
    """Return the current queue listing (output of ``q -tn``) as a string.

    NOTE(review): the ``q`` command is cluster-specific (apparently the
    glamdring scheduler front-end) — this will fail elsewhere.
    """
    proc = subprocess.run(['q', '-tn'], stdout=subprocess.PIPE)
    return proc.stdout.decode('utf-8')
def check_skip(data, skip, trs):
    """Return True if any tracer in `trs` should be skipped.

    A tracer is skipped when its full name (e.g. 'DELS__0') or its bare
    name (e.g. 'DELS') appears in `skip`.
    """
    return any((tr in skip) or (data.get_tracer_bare_name(tr) in skip)
               for tr in trs)
def get_pyexec(comment, nc, queue, mem, onlogin, outdir, batches=False, logfname=None):
    """Build the command prefix used to launch a job.

    On a login-node run (``onlogin=True``) this is just the Python
    interpreter (or the empty string for batch shell scripts). Otherwise
    the command is wrapped in an ``addqueue`` submission, logging to
    ``outdir/log/<comment>.log`` unless `logfname` is given.
    """
    executable = "" if batches else sys.executable
    if onlogin:
        return executable
    # Queued run: make sure the log directory exists and wrap in addqueue.
    logdir = os.path.join(outdir, 'log')
    os.makedirs(logdir, exist_ok=True)
    if logfname is None:
        logfname = os.path.join(logdir, comment + '.log')
    return "addqueue -o {} -c {} -n 1x{} -s -q {} -m {} {}".format(
        logfname, comment, nc, queue, mem, executable)
def get_jobs_with_same_cwsp(data):
    """Group covariance blocks by their shared covariance workspace.

    Returns a dict mapping each covariance-workspace file path to the
    list of tracer quadruples whose covariance uses that workspace.
    """
    trs_all = data.get_cov_trs_names(wsp=True) + data.get_cov_trs_names(wsp=False)
    trs_all = np.unique(trs_all, axis=0).tolist()
    groups = {}
    for trs in trs_all:
        # Instantiating the Cov class just to ask for the workspace path is
        # too slow, so the file name is rebuilt from the four mask names.
        # NOTE(review): this duplicates the naming convention in cov.py;
        # any change there will not be seen here.
        masks = [data.data['tracers'][t]["mask_name"] for t in trs]
        fname = os.path.join(data.data['output'],
                             'cov/cw__{}__{}__{}__{}.fits'.format(*masks))
        groups.setdefault(fname, []).append(trs)
    return groups
def clean_lock(data):
    """Clean up lock files left behind by failed covariance batch jobs.

    Batch jobs create a ``.lock`` file per covariance workspace and a
    ``*_remove_locks.sh`` cleaning script. Cleaning scripts whose job is no
    longer queued are executed and deleted; lock files not owned by any
    still-running job are removed directly.
    """
    qjobs = get_queued_jobs()
    # Regexp without the path since it does not appear when in queuing.
    # The dot before 'sh' is escaped so only real '.sh' names match.
    batches = re.findall(r'batch.*\.sh', qjobs)
    batches_dir = os.path.join(data.data['output'], "run_batches")
    rm_lock_files = glob(os.path.join(batches_dir, "*_remove_locks.sh"))
    # Remove from rm_lock_files those that are still running
    rm_lf_tokeep = []
    for batch in batches:
        name = os.path.basename(batch).replace('.sh', '')
        for rm_lf in rm_lock_files.copy():
            if name in rm_lf:
                rm_lock_files.remove(rm_lf)
                rm_lf_tokeep.append(rm_lf)
    # Run the cleaning scripts of the processes that failed
    for rm_lf in rm_lock_files:
        print(f"Running cleaning script: {rm_lf}")
        os.system(f"/bin/bash {rm_lf}")
        os.remove(rm_lf)
    # Clean lock files of jobs that did not get to create the rm files,
    # keeping those owned by still-running jobs (listed in rm_lf_tokeep)
    cov_dir = os.path.join(data.data['output'], "cov")
    lock_files = glob(os.path.join(cov_dir, "*.lock"))
    for rm_lf in rm_lf_tokeep:
        with open(rm_lf, 'r') as f:
            for line in f:
                if 'rm' not in line:
                    continue
                match = re.search(r"rm .*\.lock", line)
                if match is None:
                    # The line mentions 'rm' but is not a lock removal (e.g.
                    # an echo message); the unguarded .group() used to raise
                    # AttributeError here.
                    continue
                # [3:] strips the leading 'rm '
                fname = match.group()[3:]
                try:
                    lock_files.remove(fname)
                except ValueError as e:
                    # Lock file already gone from the list; nothing to keep.
                    if 'not in list' in str(e):
                        continue
                    raise e
    for lf in lock_files:
        os.remove(lf)
    print("Finished cleaning lock files")
def launch_cov_batches(data, queue, njobs, nc, mem, onlogin=False, skip=None,
                       remove_cwsp=False, nnodes=1):
    """Launch the covariance blocks grouped in batch jobs.

    All blocks that share a covariance workspace are run inside the same
    job so the workspace is only computed once; with ``remove_cwsp`` the
    workspace file is deleted when the batch finishes.

    Parameters
    ----------
    data : Data instance with the run configuration.
    queue : str, queue name passed to addqueue.
    njobs : int, maximum number of batch jobs per node.
    nc : int, cores per job.
    mem : memory per core passed to addqueue.
    onlogin : bool, run on the login node instead of queuing.
    skip : list or None, tracers (full or bare names) to skip.
    remove_cwsp : bool, delete the covariance workspace after the batch.
    nnodes : int, number of nodes to spread the batches over.
    """
    # Avoid the shared-mutable-default pitfall of ``skip=[]``.
    skip = [] if skip is None else skip

    def create_lock_file(fname):
        # Empty marker file flagging that `fname` is being processed.
        with open(fname + '.lock', 'w') as f:
            f.write('')

    def write_cw_sh(cw, covs_tbc):
        # Create a temporal file so that this cw script is not run elsewhere.
        # It will be removed once the script finishes (independently of
        # whether it succeeds or fails).
        create_lock_file(cw)
        s = f"echo Running {cw}\n"
        s += f"{sys.executable} run_cwsp_batch.py {args.INPUT}"
        if remove_cwsp:
            s += " --no_save_cw"
        for covi in covs_tbc:
            s += " -trs {} {} {} {}".format(*covi)
        s += "\n\n"
        s += f"echo Removing lock file: {cw}.lock\n"
        s += f'rm {cw}.lock\n\n'
        s += f"echo Finished running {cw}\n\n"
        return s

    outdir = data.data['output']
    cwsp = get_jobs_with_same_cwsp(data)
    # Folder for the batch scripts. This is because the script could not be
    # passed through STDIN.
    outdir_batches = os.path.join(outdir, 'run_batches')
    logfolder = os.path.join(outdir, 'log')
    os.makedirs(outdir_batches, exist_ok=True)
    # Create the covariance output directory
    covdir = os.path.join(outdir, "cov")
    os.makedirs(covdir, exist_ok=True)
    c = 0
    cw_tbc = []
    sh_tbc = []
    for cw, trs_list in cwsp.items():
        if c >= njobs * nnodes:
            break
        elif os.path.isfile(cw + '.lock'):
            # Another job is already processing this workspace
            continue
        # Find the covariances still to be computed
        covs_tbc = []
        for trs in trs_list:
            fname = os.path.join(covdir, 'cov_{}_{}_{}_{}.npz'.format(*trs))
            recompute = data.data['recompute']['cov'] or data.data['recompute']['cmcm']
            if (os.path.isfile(fname) and (not recompute)) or \
                    check_skip(data, skip, trs):
                continue
            covs_tbc.append(trs)
        # Avoid writing and launching an empty file (which would fail if
        # remove_cwsp is True when it tries to remove the cw)
        if len(covs_tbc) == 0:
            continue
        cw_tbc.append(cw)
        sh_tbc.append(write_cw_sh(cw, covs_tbc))
        c += 1
    n_total_jobs = len(cw_tbc)
    # TODO(review): datetime.utcnow() is deprecated since Python 3.12;
    # switch to datetime.now(timezone.utc) when supported versions allow.
    date = datetime.utcnow()
    timestamp = date.strftime("%Y%m%d%H%M%S")
    for nodei in range(nnodes):
        if njobs > n_total_jobs / nnodes:
            njobs = int(n_total_jobs / nnodes + 1)
        # ~1min per job for nside 1024
        time_expected = njobs / (60 * 24)
        name = f"batch{nodei}-{njobs}_{timestamp}"
        # The backslashes escape the parentheses for the shell command below
        comment = f"{name}\\(~{time_expected:.1f}days\\)"
        sh_name = os.path.join(outdir_batches, f'{name}.sh')
        rm_name = os.path.join(outdir_batches, f'{name}_remove_locks.sh')
        logfname = os.path.join(logfolder, f'{name}.sh.log')
        # Create the script that will be launched
        with open(sh_name, 'w') as f:
            f.write('#!/bin/bash\n\n')
            for i, shi in enumerate(sh_tbc[nodei * njobs: (nodei + 1) * njobs], 1):
                f.write(f"# Step {i} / {njobs}\n")
                f.write(f"echo Step {i} / {njobs}\n")
                f.write(shi)
            f.write("echo Removing cleaning script\n")
            f.write(f"rm {rm_name}")
        # Create a file that will be used to remove the orphan lock files
        with open(rm_name, 'w') as f:
            f.write('#!/bin/bash\n\n')
            f.write('echo Removing lock files\n')
            for cwi in cw_tbc[nodei * njobs: (nodei + 1) * njobs]:
                f.write(f"[ -f {cwi}.lock ] && rm {cwi}.lock\n")
            f.write('echo Finished removing lock files\n')
        pyexec = get_pyexec(comment, nc, queue, mem, onlogin, outdir,
                            batches=True, logfname=logfname)
        print("##################################")
        print(pyexec + " " + sh_name)
        print("##################################")
        print()
        os.system(f"chmod +x {sh_name}")
        os.system(pyexec + " " + sh_name)
        time.sleep(1)
def launch_cls(data, queue, njobs, nc, mem, fiducial=False, onlogin=False, skip=None):
    """Launch one job per tracer pair whose Cl file is missing or stale.

    Parameters
    ----------
    data : Data instance with the run configuration.
    queue : str, queue name passed to addqueue.
    njobs : int, maximum number of jobs to launch.
    nc : int, cores per job.
    mem : memory per core passed to addqueue.
    fiducial : bool, compute the fiducial Cls (output under 'fiducial/').
    onlogin : bool, run on the login node instead of queuing.
    skip : list or None, tracers (full or bare names) to skip.
    """
    # Avoid the shared-mutable-default pitfall of ``skip=[]``.
    skip = [] if skip is None else skip
    cl_tracers = data.get_cl_trs_names(wsp=True)
    cl_tracers += data.get_cl_trs_names(wsp=False)
    # Remove duplicates
    cl_tracers = np.unique(cl_tracers, axis=0).tolist()
    outdir = data.data['output']
    if fiducial:
        outdir = os.path.join(outdir, 'fiducial')
    # The queue listing command is presumably only available on the
    # 'glamdring' cluster — TODO confirm
    if os.uname()[1] == 'glamdring':
        qjobs = get_queued_jobs()
    else:
        qjobs = ''
    c = 0
    for tr1, tr2 in cl_tracers:
        comment = 'cl_{}_{}'.format(tr1, tr2)
        if c >= njobs:
            break
        elif comment in qjobs:
            # Already queued
            continue
        elif check_skip(data, skip, [tr1, tr2]):
            continue
        # TODO: don't hard-code it!
        trreq = data.get_tracers_bare_name_pair(tr1, tr2, '_')
        fname = os.path.join(outdir, trreq, comment + '.npz')
        recompute_cls = data.data['recompute']['cls']
        recompute_mcm = data.data['recompute']['mcm']
        recompute = recompute_cls or recompute_mcm
        if os.path.isfile(fname) and (not recompute):
            continue
        if not fiducial:
            pyexec = get_pyexec(comment, nc, queue, mem, onlogin, outdir)
            pyrun = '-m xcell.cls.cl {} {} {}'.format(args.INPUT, tr1, tr2)
        else:
            # Fiducial Cls are cheap: 2 (GB, presumably) per core is enough
            pyexec = get_pyexec(comment, nc, queue, 2, onlogin, outdir)
            pyrun = '-m xcell.cls.cl {} {} {} --fiducial'.format(args.INPUT, tr1, tr2)
        print(pyexec + " " + pyrun)
        os.system(pyexec + " " + pyrun)
        c += 1
        time.sleep(1)
def launch_cov(data, queue, njobs, nc, mem, onlogin=False, skip=None):
    """Launch one job per covariance block whose file is missing or stale.

    Parameters
    ----------
    data : Data instance with the run configuration.
    queue : str, queue name passed to addqueue.
    njobs : int, maximum number of jobs to launch.
    nc : int, cores per job.
    mem : memory per core passed to addqueue.
    onlogin : bool, run on the login node instead of queuing.
    skip : list or None, tracers (full or bare names) to skip.
    """
    # Avoid the shared-mutable-default pitfall of ``skip=[]``.
    skip = [] if skip is None else skip
    cov_tracers = data.get_cov_trs_names(wsp=True)
    cov_tracers += data.get_cov_trs_names(wsp=False)
    # Remove duplicates
    cov_tracers = np.unique(cov_tracers, axis=0).tolist()
    outdir = data.data['output']
    # The queue listing command is presumably only available on the
    # 'glamdring' cluster — TODO confirm
    if os.uname()[1] == 'glamdring':
        qjobs = get_queued_jobs()
    else:
        qjobs = ''
    c = 0
    for trs in cov_tracers:
        comment = 'cov_{}_{}_{}_{}'.format(*trs)
        if c >= njobs:
            break
        elif comment in qjobs:
            # Already queued
            continue
        elif check_skip(data, skip, trs):
            continue
        fname = os.path.join(outdir, 'cov', comment + '.npz')
        recompute = data.data['recompute']['cov'] or data.data['recompute']['cmcm']
        if os.path.isfile(fname) and (not recompute):
            continue
        pyexec = get_pyexec(comment, nc, queue, mem, onlogin, outdir)
        pyrun = '-m xcell.cls.cov {} {} {} {} {}'.format(args.INPUT, *trs)
        print(pyexec + " " + pyrun)
        os.system(pyexec + " " + pyrun)
        c += 1
        time.sleep(1)
def launch_to_sacc(data, name, use, queue, nc, mem, onlogin=False):
    """Launch the job that collects the results into a sacc file.

    `use` selects what goes into the file: 'nl' adds ``--use_nl``,
    'fiducial' adds ``--use_fiducial``, anything else runs without flags.
    Does nothing if the output file already exists.
    """
    outdir = data.data['output']
    if os.path.isfile(os.path.join(outdir, name)):
        # Output already present; nothing to do.
        return
    extra_flag = {'nl': ' --use_nl', 'fiducial': ' --use_fiducial'}.get(use, '')
    pyexec = get_pyexec('to_sacc', nc, queue, mem, onlogin, outdir)
    pyrun = '-m xcell.cls.to_sacc {} {}'.format(args.INPUT, name) + extra_flag
    command = pyexec + " " + pyrun
    print(command)
    os.system(command)
##############################################################################
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Compute Cls and cov from data.yml file",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('INPUT', type=str, help='Input YAML data file')
    parser.add_argument('compute', type=str, help='Compute: cls, cov or to_sacc.')
    parser.add_argument('-n', '--nc', type=int, default=28, help='Number of cores to use')
    # Fix: the default used to be the float ``7.`` — argparse does not run
    # ``type`` on defaults, so the default silently stayed a float.
    parser.add_argument('-m', '--mem', type=int, default=7, help='Memory (in GB) per core to use')
    parser.add_argument('-q', '--queue', type=str, default='berg', help='SLURM queue to use')
    parser.add_argument('-N', '--nnodes', type=int, default=1, help='Number of nodes to use. If given, the jobs will be launched all in the same node')
    parser.add_argument('-j', '--njobs', type=int, default=100000, help='Maximum number of jobs to launch')
    parser.add_argument('--to_sacc_name', type=str, default='cls_cov.fits', help='Sacc file name')
    parser.add_argument('--to_sacc_use_nl', default=False, action='store_true',
                        help='Set if you want to use nl and cov extra (if present) instead of cls and covG ')
    parser.add_argument('--to_sacc_use_fiducial', default=False, action='store_true',
                        help="Set if you want to use the fiducial Cl and covG instead of data cls")
    parser.add_argument('--cls_fiducial', default=False, action='store_true', help='Set to compute the fiducial cls')
    parser.add_argument('--onlogin', default=False, action='store_true', help='Run the jobs in the login screen instead appending them to the queue')
    parser.add_argument('--skip', default=[], nargs='+', help='Skip the following tracers. It can be given as DELS__0 to skip only DELS__0 tracer or DELS to skip all DELS tracers')
    parser.add_argument('--override_yaml', default=False, action='store_true', help='Override the YAML file if already stored. Be ware that this could cause compatibility problems in your data!')
    parser.add_argument('--batches', default=False, action='store_true',
                        help='Run the covariances in batches with all the ' +
                             'blocks sharing the same covariance workspace in a ' +
                             'single job')
    parser.add_argument('--remove_cwsp', default=False, action='store_true',
                        help='Remove the covariance workspace once the ' +
                             'batch job has finished')
    parser.add_argument('--clean_lock', default=False, action="store_true",
                        help="Remove lock files from failed runs.")
    args = parser.parse_args()
    ##########################################################################

    data = Data(data_path=args.INPUT, override=args.override_yaml)

    queue = args.queue
    njobs = args.njobs
    onlogin = args.onlogin
    nnodes = args.nnodes

    # Dispatch: --clean_lock takes precedence over any compute mode.
    if args.clean_lock:
        clean_lock(data)
    elif args.compute == 'cls':
        launch_cls(data, queue, njobs, args.nc, args.mem, args.cls_fiducial, onlogin, args.skip)
    elif args.compute == 'cov':
        if args.batches:
            launch_cov_batches(data, queue, njobs, args.nc, args.mem, onlogin,
                               args.skip, args.remove_cwsp, nnodes)
        else:
            launch_cov(data, queue, njobs, args.nc, args.mem, onlogin,
                       args.skip)
    elif args.compute == 'to_sacc':
        if args.to_sacc_use_nl and args.to_sacc_use_fiducial:
            raise ValueError(
                'Only one of --to_sacc_use_nl or --to_sacc_use_fiducial can be set')
        elif args.to_sacc_use_nl:
            use = 'nl'
        elif args.to_sacc_use_fiducial:
            use = 'fiducial'
        else:
            use = 'cls'
        launch_to_sacc(data, args.to_sacc_name, use, queue, args.nc, args.mem, onlogin)
    else:
        raise ValueError(
            "Compute value '{}' not understood".format(args.compute))