# General Util functions
from __future__ import print_function
from builtins import str
from sibispy import sibislogger as slog
import sibispy
import subprocess
import re
import sys
import tempfile
import shutil
import os.path
import pandas
import hashlib
import glob

date_format_ymd = '%Y-%m-%d'

# "Safe" CSV export - this will catch IO errors from trying to write to a file
# that is currently being read and will retry a number of times before giving
# up. This function will also confirm whether the newly created file is
# different from an already existing file of the same name. Only changed files
# will be updated.
def safe_dataframe_to_csv(df, fname, verbose=False):
    import time
    import filecmp
    import os

    success = False
    retries = 10
    last_e = IOError(-999, "Default error - SHOULD NOT BE REACHED")
    while (not success) and (retries > 0):
        try:
            df.to_csv(fname + '.new', index=False)
            success = True
        except IOError as e:
            last_e = e
            if e.errno == 11:
                if verbose:
                    print("Failed to write to csv! Retrying in 5s...")
                time.sleep(5)
                retries -= 1
            else:
                retries = 0

    if not success:
        slog.info("safe_dataframe_to_csv",
                  f"ERROR: failed to write file {fname} with errno {last_e.errno}")
        return False

    # Check if the new file is equal to the old file
    if os.path.exists(fname) and filecmp.cmp(fname, fname + '.new', shallow=False):
        # Equal - remove the new file
        os.remove(fname + '.new')
    else:
        # Not equal or no old file: put the new file in its final place
        os.rename(fname + '.new', fname)
        if verbose:
            print("Updated", fname)

    return True

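# Example usage (illustrative sketch; the DataFrame contents and path are
# hypothetical):
#   scores = pandas.DataFrame({'subject': ['A', 'B'], 'score': [1, 2]})
#   safe_dataframe_to_csv(scores, '/tmp/scores.csv', verbose=True)
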
def dicom2bxh(dicom_path, bxh_file):
    cmd = "dicom2bxh "
    if dicom_path and bxh_file:
        cmd += "%s/* %s >& /dev/null" % (dicom_path, bxh_file)
    else:
        cmd += "--help"

    # Any return code other than 0 indicates that an error occurred
    return call_shell_program(cmd)

def htmldoc(args):
    return call_shell_program("htmldoc " + args)

# args needs to be defined up to where the dicom path is defined
def subset_dcm2image(dcmArgs, dcmPath, dcmNumFiles, verbose=False):
    # Copy the first dcmNumFiles DICOM files into a temporary directory and
    # run dcm2image on that subset only
    temp_dir = tempfile.mkdtemp()
    dcmFileList = sorted(glob.glob(dcmPath + "/*.dcm"))[:dcmNumFiles]
    for dcmFile in dcmFileList:
        shutil.copy2(dcmFile, temp_dir)

    args = '%s %s 2>&1' % (dcmArgs, temp_dir)
    result = dcm2image(args, verbose)
    if temp_dir != "":
        shutil.rmtree(temp_dir)

    return result

dcm2image_cmd = 'cmtk dcm2image '

def dcm2image(args, verbose=False):
    cmd = dcm2image_cmd + args
    if verbose:
        print(cmd)

    return call_shell_program(cmd)

def detect_adni_phantom(args):
    return call_shell_program('cmtk detect_adni_phantom ' + args)

def gzip(args):
    return call_shell_program('gzip ' + args)

def zip(baseDir, zipFile, fileNames):
    # If the file already exists, delete it first; otherwise zip returns a
    # non-zero code!
    absZipFile = os.path.join(baseDir, zipFile)
    if os.path.exists(absZipFile):
        os.remove(absZipFile)

    cmd = 'cd %s; /usr/bin/zip -rqu %s %s' % (baseDir, zipFile, fileNames)
    return call_shell_program(cmd)

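# Example usage (illustrative sketch; the paths are hypothetical):
#   errcode, out, err = zip('/tmp/export', 'archive.zip', 'session_a session_b')
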
def tar(args):
    cmd = 'tar ' + args
    return call_shell_program(cmd)

def untar(tarfile, out_dir):
    args = "-xzf " + tarfile + " --directory=" + out_dir
    return tar(args)

def make_nifti(args):
    cmd = "makenifti " + args
    return call_shell_program(cmd)

# Called by makenifti - we just have it here for testing that makenifti runs
def sprlioadd(args):
    cmd = "sprlioadd " + args
    return call_shell_program(cmd)

def mdb_export(args):
    cmd = "mdb-export " + args
    return call_shell_program(cmd)

def make_nifti_from_spiral(spiral_file, outfile):
    # outfile is expected to end in '.nii.gz'; strip the 7-character extension
    # for makenifti, then gzip the resulting '.nii' file
    errcode, stdout, stderr = make_nifti("-s 0 %s %s" % (spiral_file, outfile[:-7]))
    if os.path.exists(outfile[:-3]):
        gzip('-9 ' + outfile[:-3])

    return errcode, stdout, stderr

def Rscript(args):
    return call_shell_program('/usr/bin/Rscript ' + args)

def call_shell_program(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    (out, err) = process.communicate()
    return (process.returncode, out, err)

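# Example usage (illustrative sketch; note that out and err are bytes under
# Python 3):
#   (errcode, out, err) = call_shell_program('echo hello')
#   if errcode != 0:
#       print("Command failed:", err)
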
#
# Formerly in Rwrapper.py
#

# Label translation function - LimeSurvey to SRI/old REDCap style
def limesurvey_label_in_redcap(prefix, ls_label):
    # Lowercase, map 'subjid' to 'subject_id', collapse runs of underscores
    # and non-word characters into single underscores, then strip a trailing
    # underscore
    label = re.sub('subjid', 'subject_id', ls_label.lower())
    label = re.sub(r'[_\W]+', '_', label)
    label = re.sub('_$', '', label)
    return "%s_%s" % (prefix, label)

# Map labels in a list according to a dictionary
def map_labels_to_dict(labels, ldict):
    new_labels = list()
    for label in labels:
        if label in ldict:
            new_labels.append(ldict[label])
        else:
            new_labels.append(label)

    return new_labels

# Score one record by running an R script
def run_rscript(row, script, scores_key=None):
    tmpdir = tempfile.mkdtemp()
    data_csv = os.path.join(tmpdir, 'data.csv')
    scores_csv = os.path.join(tmpdir, 'scores.csv')
    pandas.DataFrame(row).T.to_csv(data_csv)
    args = script + " " + data_csv + " " + scores_csv
    (errcode, stdout, stderr) = Rscript(args)
    if errcode:
        # Because this function is run via apply, we need to raise an error
        raise slog.sibisExecutionError(
            'utils.run_rscript.' + hashlib.sha1(str(stderr).encode()).hexdigest()[0:6],
            'Error: Rscript failed!', err_msg=str(stderr), args=args)

    if scores_key:
        scores = pandas.read_csv(scores_csv, index_col=0)
        shutil.rmtree(tmpdir)
        return pandas.Series(name=row.name, data=scores.to_dict()[scores_key])

    scores = pandas.read_csv(scores_csv, index_col=None)
    shutil.rmtree(tmpdir)
    return scores.iloc[0]

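# Example usage (illustrative sketch; 'score_form.R' is a hypothetical script
# that reads the data CSV and writes a one-row scores CSV):
#   scores = data.apply(run_rscript, axis=1, args=('score_form.R',))
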
"""
https://github.com/ActiveState/code/blob/master/recipes/Python/577982_Recursively_walk_Python_objects/recipe-577982.py
"""
from collections import Mapping, Set, Sequence
# dual python 2/3 compatability, inspired by the "six" library
string_types = (str, str) if str is bytes else (str, bytes)
iteritems = lambda mapping: getattr(mapping, 'iteritems', mapping.items)()
def objwalk(obj, path=(), memo=None):
    if memo is None:
        memo = set()

    iterator = None
    if isinstance(obj, Mapping):
        iterator = iteritems
    elif isinstance(obj, (Sequence, Set)) and not isinstance(obj, string_types):
        iterator = enumerate

    if iterator:
        if id(obj) not in memo:
            memo.add(id(obj))
            for path_component, value in iterator(obj):
                for result in objwalk(value, path + (path_component,), memo):
                    yield result
            memo.remove(id(obj))
    else:
        yield path, obj

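# Example usage (illustrative sketch):
#   for path, value in objwalk({'a': [1, {'b': 2}]}):
#       print(path, value)
#   # -> ('a', 0) 1
#   #    ('a', 1, 'b') 2
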
def ncanda_id_lookup(base_id, print_keys=False):
    """
    Given an NCANDA ID, convert between site ids and case ids.
    Returns a string and only accepts one id at a time.
    """
    session = sibispy.Session()
    session.configure()
    ifc = session.connect_server('xnat', True)
    if not ifc:
        print("Error: could not connect to xnat server!")
        sys.exit()

    # Check if given an NCANDA S-number:
    match = re.search(r'NCANDA_S\d+', base_id)
    if match:
        search_field = 'xnat:subjectData/SUBJECT_ID'
        result_idx = 2
    else:
        search_field = 'xnat:subjectData/SUBJECT_LABEL'
        result_idx = 0

    fields_per_subject = ['xnat:subjectData/SUBJECT_ID',
                          'xnat:subjectData/PROJECT',
                          'xnat:subjectData/SUBJECT_LABEL']
    output = ""
    pattern = (search_field, 'LIKE', '%' + base_id + '%')
    subjects = list(ifc.search('xnat:subjectData',
                               fields_per_subject).where([pattern]).items())
    if print_keys:
        if len(subjects) > 0:
            fmt = '{0},{1}'
            res = [fmt.format(base_id,
                              record[result_idx]) for record in subjects]
            output += '\n'.join(res)
        else:
            output += base_id + ',\n'
    else:
        res = ['{0}\n'.format(record[result_idx]) for record in subjects]
        output += ''.join(res)

    return output
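
# Example usage (illustrative sketch; the id is hypothetical and a configured
# XNAT connection is required):
#   print(ncanda_id_lookup('NCANDA_S00001'))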