-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathos_util.py
586 lines (490 loc) · 23.4 KB
/
os_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
#! /usr/bin/env python3
#
# os_util.py
#
# Copyright 2019 Luan Carvalho Martins <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
import itertools
import re
import os
import shutil
import subprocess
from io import TextIOBase
from collections import namedtuple
import traceback
from ast import literal_eval
import time
import rdkit.Chem
# Named verbosity thresholds. local_print() shows a message when the current verbosity is greater than or equal to
# the message level; messages at the error level are always shown.
verbosity_level = namedtuple(
    "VerbosityLevel",
    ("error", "default", "warning", "info", "debug", "extra_debug", "timing"),
)(error=-1, default=0, warning=1, info=2, debug=3, extra_debug=4, timing=5)
def date_fmt():
    """ Build a timestamp string from the current local time

    :return: current time formatted as hour-minute-second, underscore, day-month-year (``HHMMSS_DDMMYYYY``)
    :rtype: str
    """

    timestamp_layout = '%H%M%S_%d%m%Y'
    return time.strftime(timestamp_layout)
def makedir(dir_name, error_if_exists=False, parents=False, verbosity=0):
    """ Safely create a directory

    :param str dir_name: name of the directory to be created
    :param bool error_if_exists: throw an error if dir_name exists
    :param bool parents: create parent dirs as needed
    :param int verbosity: sets the verbosity level
    :raises SystemExit: if parents=False, dir_name already exists and error_if_exists=True
    :raises FileExistsError: if parents=True and os.makedirs reports that the path already exists
    :raises OSError: if parents=False and the directory cannot be created for any other reason
    """

    local_print('Entering makedir(dir_name={}, error_if_exists={})'.format(dir_name, error_if_exists),
                msg_verbosity=verbosity_level.debug, current_verbosity=verbosity)

    if not parents:
        try:
            os.mkdir(dir_name)
        except FileExistsError:
            # An existing path is only a problem when the caller asked for it to be one
            if error_if_exists:
                local_print('Directory {} exists (and makedir was called with error_if_exists=True).'
                            ''.format(dir_name), msg_verbosity=verbosity_level.error, current_verbosity=verbosity)
                raise SystemExit(1)
        except OSError as error:
            local_print('Could not create directory {}. Error was {}'.format(dir_name, error),
                        msg_verbosity=verbosity_level.error, current_verbosity=verbosity)
            # Re-raise the original exception so that errno, filename and traceback are preserved
            raise
    else:
        try:
            os.makedirs(dir_name, exist_ok=(not error_if_exists))
        except FileExistsError as error:
            local_print('Could not create directory {}. Error was {}'.format(dir_name, error),
                        msg_verbosity=verbosity_level.error, current_verbosity=verbosity)
            # Re-raise the original exception so that errno, filename and traceback are preserved
            raise
def read_file_to_buffer(filename, die_on_error=False, return_as_list=False, error_message=None, verbosity=0):
    """Read and return file contents

    Parameters
    ----------
    filename : str
        File to be read
    die_on_error : bool
        Throw an error if file cannot be read
    return_as_list : bool
        Return data as list. Default: return as str
    error_message : str
        If die_on_error=True, print this message before the default error message
    verbosity : int
        Sets the verbosity level

    Returns
    -------
    str or list or bool
        If return_as_list is False, a str will be returned. If return_as_list is True, a list will be returned. If
        die_on_error is False and the file cannot be read, False will be returned. Note that an empty file yields an
        empty str or list, which is falsy as well.

    Raises
    ------
    IOError or TypeError
        The error raised while opening or reading the file is re-raised when die_on_error is True
    """

    try:
        with open(filename, 'r') as input_file:
            if return_as_list:
                data_buffer = input_file.readlines()
            else:
                data_buffer = input_file.read()
    except (IOError, TypeError) as error:
        if not die_on_error:
            return False

        # This branch is only reached when die_on_error is True, so the message must say so
        default_message = 'Could not read file {} (and read_file_to_buffer was called with ' \
                          'die_on_error=True). Error was: {}'.format(filename, error)
        if error_message is None:
            error_message = default_message
        else:
            error_message += '\n' + default_message
        local_print(error_message, msg_verbosity=verbosity_level.error, current_verbosity=verbosity)
        raise
    else:
        return data_buffer
def run_gmx(gmx_bin, arg_list, input_data='', output_file=None, alt_environment=None, cwd=None, die_on_error=True,
            verbosity=0):
    """ Run gmx_bin with arg_list

    :param [str, list] gmx_bin: path to Gromacs binary, or a list holding the binary followed by leading arguments
    :param list arg_list: pass these args to gmx
    :param str input_data: data to be sent to gmx, empty str (default) to send nothing
    :param str output_file: save the captured stdout to this file (default: None = don't save)
    :param dict alt_environment: environment to be passed (on top of current) to Gromacs
    :param str cwd: run in this directory
    :param bool die_on_error: raise error if command returns an error code
    :param int verbosity: verbose level
    :raises TypeError: if gmx_bin is neither a str nor a list
    :raises SystemExit: if die_on_error is True and the command returns a non-zero code
    :return: namedtuple with the fields stdout, stderr and code
    """

    this_env = os.environ.copy()
    if alt_environment is not None:
        this_env.update(alt_environment)

    if isinstance(gmx_bin, list):
        final_arg_list = gmx_bin[:]
    elif isinstance(gmx_bin, str):
        final_arg_list = [gmx_bin]
    else:
        local_print('Could not understand gmx bin input to run_gmx. gmx_bin = {} (type = {}). Invalid type'
                    ''.format(gmx_bin, type(gmx_bin)),
                    msg_verbosity=verbosity_level.error, current_verbosity=verbosity)
        raise TypeError("expected str or list, not {}".format(type(gmx_bin)))
    final_arg_list.extend(arg_list)

    gmx_handler = subprocess.Popen(final_arg_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   stdin=subprocess.PIPE, universal_newlines=True, env=this_env, cwd=cwd)
    # An empty input_data means "send nothing", which communicate() expresses as None
    stdout, stderr = gmx_handler.communicate(input_data or None)

    if die_on_error and gmx_handler.returncode != 0:
        # Report the command line that was actually executed; [gmx_bin] + arg_list would nest a list when gmx_bin is
        # itself a list. Also tolerate an empty arg_list when naming the sub-command.
        subcommand = arg_list[0] if arg_list else ''
        local_print('Failed to run {} {}. Error code {}.\nCommand line was: {}\n\nstdout:\n{}\n\nstderr:\n{}'
                    ''.format(gmx_bin, subcommand, gmx_handler.returncode, final_arg_list, stdout, stderr),
                    msg_verbosity=verbosity_level.error, current_verbosity=verbosity)
        raise SystemExit(1)

    if output_file is not None:
        with open(output_file, 'w') as fh:
            fh.write(stdout)

    return namedtuple('ReturnData', 'stdout stderr code')(stdout, stderr, gmx_handler.returncode)
def assemble_shell_command(gmx_bin, arg_list, input_data='', output_file=None, cwd=None, die_on_error=True,
                           verbosity=0):
    """ Return a shell command line that runs gmx_bin with arg_list

    :param [str, list] gmx_bin: path to GROMACS binary
    :param list arg_list: pass these args to gmx
    :param str input_data: data to be send to gmx, empty str (default) to send nothing
    :param str output_file: pipe stdout + stderr to this file
    :param str cwd: run in this directory
    :param bool die_on_error: test for return code
    :param int verbosity: verbose level
    :raises TypeError: if gmx_bin is neither a str nor a list
    :rtype: str
    """

    local_print('Entering assemble_shell_command(gmx_bin={}, arg_list={}, input_data={}, output_file={}, verbosity={})'
                ''.format(gmx_bin, arg_list, input_data, output_file, verbosity),
                msg_verbosity=verbosity_level.debug, current_verbosity=verbosity)

    # When cwd is given, the whole command runs inside a subshell, so that the cd does not leak
    full_shell_command = '( cd {} && '.format(cwd) if cwd else ''

    # Escape newlines and double quotes, because input_data is embedded in a double-quoted printf argument
    # NOTE(review): arguments, cwd and output_file are not shell-quoted here; callers must pass shell-safe values
    for old, new in {'\n': r'\n', '"': r'\"'}.items():
        input_data = input_data.replace(old, new)
    full_shell_command += 'printf "{}" | '.format(input_data) if input_data else ''

    if isinstance(gmx_bin, list):
        full_shell_command += ' '.join(gmx_bin + arg_list)
    elif isinstance(gmx_bin, str):
        full_shell_command += ' '.join([gmx_bin] + arg_list)
    else:
        local_print('Could not understand gmx bin input to assemble_shell_command. gmx_bin = {} (type = {}). '
                    'Invalid type'.format(gmx_bin, type(gmx_bin)),
                    msg_verbosity=verbosity_level.error, current_verbosity=verbosity)
        raise TypeError("expected str or list, not {}".format(type(gmx_bin)))

    full_shell_command += ' > {} 2>&1'.format(output_file) if output_file else ''
    full_shell_command += ' )' if cwd else ''
    # NOTE(review): the failure message interpolates the (escaped) input_data, not the command itself - confirm
    # whether that is intended
    full_shell_command += ' || {{ echo "Failed to run command {} at line ${{LINENO}}" && exit; }}' \
                          ''.format(input_data) if die_on_error else ''

    return full_shell_command
def detect_type(value, test_for_boolean=True, test_for_dict=False, test_for_list=False, list_max_split=0, verbosity=0):
    """ Detect and convert the type of an input. Python literal syntax is tried first, then more flexible formats.

    Parameters
    ----------
    value : Any
        The value to be converted. Anything that is not a non-empty str is returned unchanged
    test_for_boolean : bool
        Also tests for boolean values (false/true, on/off, yes/no), case-insensitively
    test_for_dict : bool
        Also tests for flexible-formatted dicts: pairs separated by ; or newline, key and value separated by : or =,
        # starts a comment
    test_for_list : bool
        Also tests for flexible-formatted lists: items separated by ; , or newline, or whitespace-separated numbers
    list_max_split : int
        If a list is detected and list_max_split is nonzero, at most list_max_split splits occur and the remainder of
        the string is returned as the final element of the list
    verbosity : int
        sets the verbosity level

    Returns
    -------
    Any
        The converted value (tuples are returned as lists), or the stripped str if no conversion applies
    """

    # Only non-empty strings need processing
    if not (value and isinstance(value, str)):
        return value

    value = value.strip()

    try:
        literal = literal_eval(value)
    except (ValueError, SyntaxError):
        # Not a Python literal, go on to the flexible formats below
        pass
    else:
        # Convert tuples to lists (to make sure the return is mutable)
        return list(literal) if isinstance(literal, tuple) else literal

    if test_for_boolean:
        lowered = value.lower()
        if lowered in ('false', 'off', 'no'):
            return False
        if lowered in ('true', 'on', 'yes'):
            return True

    if test_for_dict:
        try:
            parsed_dict = {}
            for each_pair in re.split('[;\n]', value):
                stripped_pair = each_pair.strip()
                # Skip empty entries and whole-line comments
                if not stripped_pair or stripped_pair.startswith('#'):
                    continue
                # Unpacking fails with ValueError unless there is exactly one separator in the pair
                raw_key, raw_value = re.split('[:=]', each_pair)
                parsed_key = detect_type(raw_key)
                parsed_dict[parsed_key] = detect_type(raw_value.split('#')[0], test_for_list=True,
                                                      verbosity=verbosity)
        except (ValueError, IndexError):
            if ';' in value or ',' in value:
                local_print('Your input "{}" seems to be a dictionary, but could not be parsed as such. Maybe you '
                            'want to check your input.'.format(value), msg_verbosity=verbosity_level.warning,
                            current_verbosity=verbosity)
        else:
            return parsed_dict

    if test_for_list:
        items = [detect_type(each_value) for each_value in
                 re.split('[;,\n]', value, maxsplit=list_max_split) if each_value]
        if len(items) > 1:
            return items

        # No separator was found: try whitespace-separated integers, then floats
        fields = [field for field in re.split(r'\s+', value, maxsplit=list_max_split) if field]
        for caster in (int, float):
            try:
                return [caster(field) for field in fields]
            except ValueError:
                continue

        if ';' in value or ',' in value:
            local_print('Your input "{}" seems to be a list, but could not be parsed as such. Maybe '
                        'you want to check your input.'.format(value),
                        msg_verbosity=verbosity_level.warning, current_verbosity=verbosity)
        return value

    return value
def local_print(this_string, msg_verbosity=0, logfile=None, current_verbosity=0):
    """ Print a message, tagged with its level, when the current verbosity allows it

    :param str this_string: string to be printed
    :param int msg_verbosity: verbosity level of the message
    :param logfile: file name (str) or text file object; the unformatted message is written to it as well
    :param int current_verbosity: current verbosity level
    :raises TypeError: if logfile is given but is neither a str nor a TextIOBase instance
    """

    level_tags = {verbosity_level.error: 'ERROR',
                  verbosity_level.warning: 'WARNING',
                  verbosity_level.info: 'INFO',
                  verbosity_level.debug: 'DEBUG',
                  verbosity_level.extra_debug: 'EXTRA_DEBUG',
                  verbosity_level.timing: 'TIMING'}

    # Error messages are printed regardless of the current verbosity
    if current_verbosity >= msg_verbosity or msg_verbosity == verbosity_level.error:
        if msg_verbosity == verbosity_level.default:
            # Default-level messages are printed verbatim
            formatted_string = this_string
        else:
            # Prefix every line of the message with its level tag
            tag = level_tags[msg_verbosity]
            formatted_string = '[{}] {}'.format(tag, '\n[{}] '.format(tag).join(this_string.split('\n')))
            if msg_verbosity == verbosity_level.error:
                # Errors are preceded by the current call stack
                stack_banner = '\n{:=^50}\n{}{:=^50}\n'.format(' STACK INFO ', ''.join(traceback.format_stack()),
                                                              ' STACK INFO ')
                formatted_string = stack_banner + formatted_string
        print(formatted_string)

    # NOTE(review): the docstring says all messages go to the logfile, so this is not gated by the verbosity test
    # above - confirm against the upstream file
    if not logfile:
        return

    log_line = '{}\n'.format(this_string)
    if isinstance(logfile, str):
        # Logfile is a filename, append to it
        with open(logfile, 'a+') as fh:
            fh.write(log_line)
    elif isinstance(logfile, TextIOBase):
        # Logfile is textfile object, write to it
        logfile.write(log_line)
    else:
        raise TypeError("logfile must be str or TextIO, got {} instead".format(type(logfile)))
def recursive_map(function_f, iterable_i, args=(), kwargs=None):
    """ Recursively apply function_f to every str in iterable_i, descending into nested dicts

    :param function_f: callable, used as function_f(str_value, *args, **kwargs)
    :param iterable_i: a str, a (possibly nested) dict, or any other object
    :param tuple args: extra positional arguments forwarded to function_f
    :param dict kwargs: extra keyword arguments forwarded to function_f
    :return: the result of function_f for a str; a shallow copy of the dict with converted values for a dict; the
             object itself for anything else
    """

    if kwargs is None:
        kwargs = {}

    if isinstance(iterable_i, str):
        return function_f(iterable_i, *args, **kwargs)

    if isinstance(iterable_i, dict):
        # copy() leaves the input untouched
        mapped = iterable_i.copy()
        for inner_key, inner_value in iterable_i.items():
            # args and kwargs must be forwarded by keyword: unpacking them here would bind them to the wrong
            # parameters of recursive_map
            mapped[inner_key] = recursive_map(function_f, inner_value, args=args, kwargs=kwargs)
        return mapped

    return iterable_i
def recursive_update(base, updater):
    """ Update a dict in place, merging nested dicts instead of replacing them

    :param base: dictionary to be updated (modified in place)
    :param updater: source of new values
    :return: base, after the update
    :rtype: dict
    """

    for key, new_value in updater.items():
        if not isinstance(new_value, dict):
            base[key] = new_value
            continue

        # A missing key behaves like an empty dict, so the nested values are merged into a fresh dict
        current_value = base.get(key, {})
        if isinstance(current_value, dict):
            base[key] = recursive_update(current_value, new_value)
        else:
            base[key] = new_value

    return base
def natural_sort_key(s):
    """ Prepare a natural sorting key list. Adapted from
    https://stackoverflow.com/questions/4836710/does-python-have-a-built-in-function-for-string-natural-sort

    :param str s: string for which the key will be prepared
    :return: alternating lower-cased text chunks and int values of the digit runs
    :rtype: list
    """

    # Splitting on a capturing group alternates text and digit runs, so odd positions are always [0-9]+ matches.
    # Using the position, instead of str.isdigit(), avoids int() on characters such as superscript digits and keeps
    # ints out of the text positions.
    return [int(chunk) if position % 2 else chunk.lower()
            for position, chunk in enumerate(re.split('([0-9]+)', s))]
def wrapper_fn(fn, args, kwargs):
    """ Call fn(*args, **kwargs)

    Used by starmap_unpack as the function handed to pool.starmap. Kept at module level, presumably so that it can be
    sent to worker processes.
    """
    return fn(*args, **kwargs)
def starmap_unpack(function, pool, args_iter=None, kwargs_iter=None):
    """ Wrapper around multiprocessing.starmap

    :param function: run this function with *args and **kwargs
    :param multiprocessing.Pool pool: use this pool
    :param iter args_iter: use args from this iter; if None, every call gets no positional arguments
    :param iter kwargs_iter: use kwargs from this iter; if None, every call gets no keyword arguments
    :raises ValueError: if both args_iter and kwargs_iter are None
    :return: list with the return value of each call
    :rtype: list
    """

    # Test against None explicitly: an empty list is a valid (if trivial) input
    if args_iter is None and kwargs_iter is None:
        raise ValueError('args_iter or kwargs_iter mandatory')

    if args_iter is None:
        args_iter = itertools.repeat(())
    if kwargs_iter is None:
        kwargs_iter = itertools.repeat({})

    # zip stops at the shortest input, so the repeat() iterators do not make this endless
    assembled_args = zip(itertools.repeat(function), args_iter, kwargs_iter)
    return pool.starmap(wrapper_fn, list(assembled_args))
def inner_search(needle, haystack, apply_filter=None, find_last=False, die_on_error=False):
    """ Search for needle in the items of haystack, returning the index of the first (or last) matching item

    :param needle: what to search. If callable, needle will be called for each item in haystack. If needle and the
                   item are both sets, a subset test is done. Otherwise, ``needle in item`` is tested, falling back to
                   ``needle == item`` when the item does not support ``in``.
    :param list haystack: where to search
    :param [str, function] apply_filter: skip the items for which this function returns true or, if str, the items
                                         that start with apply_filter
    :param bool find_last: search for the last occurrence instead of the first one
    :param bool die_on_error: when needle is not in haystack, if True, raise ValueError, if False, return False
    :raises ValueError: if nothing matches and die_on_error is True
    :return: int (or False if nothing matches and die_on_error is False)
    """

    def is_match(candidate):
        if callable(needle):
            return needle(candidate)
        if isinstance(needle, set) and isinstance(candidate, set):
            return needle.issubset(candidate)
        try:
            return needle in candidate
        except TypeError:
            return needle == candidate

    if apply_filter is None or callable(apply_filter):
        should_skip = apply_filter
    else:
        skip_prefix = apply_filter

        def should_skip(line):
            return line.startswith(skip_prefix)

    last_match = -1
    for position, candidate in enumerate(haystack):
        if should_skip is not None and should_skip(candidate):
            continue
        if not is_match(candidate):
            continue
        if not find_last:
            # First occurrence requested, no need to look any further
            return position
        last_match = position

    # Getting here means either nothing matched, or the last occurrence was requested
    if last_match == -1:
        if die_on_error:
            raise ValueError("{} not in the iterator".format(needle))
        return False

    return last_match
# FIXME: remove this
def file_copy(src, dest, follow_symlinks=True, error_if_exists=False, verbosity=0):
    """ Copy file, data and metadata, optionally returning an error if dest exists

    :param str src: source file
    :param str dest: destination file or path
    :param bool follow_symlinks: if false, and src is a symbolic link, dst will be created as a symbolic link; if true
                                 and src is a symbolic link, dst will be a copy of the file src refers to.
    :param bool error_if_exists: raise an error if file exists
    :param int verbosity: verbosity level
    :raises FileExistsError: if error_if_exists is True and the destination file already exists
    :return: str
    """

    if error_if_exists:
        # When dest is a directory, the file is written inside it under the base name of src, so that is the path
        # that has to be tested
        destfile = os.path.join(dest, os.path.basename(src)) if os.path.isdir(dest) else dest
        if os.path.exists(destfile):
            raise FileExistsError("File {} exists".format(destfile))

    local_print('Copying {} to {}'.format(src, dest),
                current_verbosity=verbosity, msg_verbosity=verbosity_level.debug)
    return shutil.copy2(src, dest, follow_symlinks=follow_symlinks)
def parse_simple_config_file(input_data, verbosity=0):
    """ Parse a simple key=value config file or data using ConfigParser

    Parameters
    ----------
    input_data : str or dict
        Path to a file or key:val data to be parsed. A dict is returned as it is, without further processing
    verbosity : int
        Sets the verbosity level

    Returns
    -------
    all_classes.Namespace
        Values read, with types detected. An empty Namespace is returned if input_data is empty
    """

    from configparser import ConfigParser
    from all_classes import Namespace

    if not input_data:
        return Namespace()

    if isinstance(input_data, dict):
        # Already a dict, nothing to parse
        return input_data

    # First try input_data as a file name; if nothing could be read, interpret it as the config data itself
    config_text = read_file_to_buffer(input_data, return_as_list=False, verbosity=verbosity) or input_data

    # ConfigParser requires a section header, so a dummy one is prepended to the data
    parser = ConfigParser()
    parser.read_string("[dummy]\n" + config_text)
    raw_options = dict(parser.items('dummy'))

    return Namespace(recursive_map(detect_type, raw_options))
from functools import wraps
def _get_scope(f, args):
    """Build a dotted scope name (module[.class].function) for the given function.

    The class name is included only when the first positional argument's class has an attribute named like f.
    """
    from inspect import getmodule

    scope_parts = [getmodule(f).__name__]

    # guess that function is a method of its class
    try:
        owner_class = args[0].__class__
    except IndexError:
        # No positional arguments, so there is no candidate class
        owner_class = None

    if owner_class is not None and f.__name__ in dir(owner_class):
        scope_parts.append(owner_class.__name__)
    scope_parts.append(f.__name__)

    return '.'.join(scope_parts)
def trace_function(f):
    """Display argument and context call information of given function.

    The decorated function prints a debug message with its scope and arguments every time it is called. The current
    verbosity is read from the ``verbosity`` keyword argument of the call (1 if it is not given).
    """

    def describe(value):
        """Return a readable summary for objects that behave like RDKit molecules, other values are unchanged."""
        try:
            return (f'<rdkit.Chem.rdchem.Mol object at {hex(id(value))} '
                    f'(Name="{value.GetProp("_Name")}"; '
                    f'SMILES={rdkit.Chem.MolToSmiles(value)})>')
        except (AttributeError, KeyError):
            # Not a molecule (no GetProp) or a molecule without a _Name property
            return value

    @wraps(f)
    def wrap_trace(*args, **kwargs):
        formatted_args = [describe(each_arg) for each_arg in args]
        formatted_args_kwargs = {each_key: describe(each_arg) for each_key, each_arg in kwargs.items()}

        # Print the formatted keyword arguments; the raw kwargs would show molecules by their default repr
        local_print("Entering {} with: {} {}".format(_get_scope(f, args), formatted_args, formatted_args_kwargs),
                    msg_verbosity=verbosity_level.debug, current_verbosity=kwargs.get('verbosity', 1))
        return f(*args, **kwargs)

    return wrap_trace
def time_function(f):
    """ Decorator that reports how long each call to a function takes

    The elapsed time is printed as a timing-level message after the call returns. Nothing is printed if the call
    raises.
    """
    from time import perf_counter

    @wraps(f)
    def wrap_time(*args, **kwargs):
        started_at = perf_counter()
        result = f(*args, **kwargs)
        elapsed = perf_counter() - started_at
        local_print("Execution of {} took {:0.4f} seconds".format(_get_scope(f, args), elapsed),
                    msg_verbosity=verbosity_level.timing, current_verbosity=5)
        return result

    return wrap_time
def flatten(iterable):
    """ Lazily flatten nested lists and tuples

    Only list and tuple items are descended into; any other item (including str) is yielded as it is.
    """
    for element in iterable:
        if isinstance(element, (list, tuple)):
            yield from flatten(element)
        else:
            yield element