develop_utils.py
import functools
import os

import numpy as np

# from pl_examples import LightningTemplateModel
from pytorch_lightning import seed_everything
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger, TestTubeLogger
from tests import TEMP_PATH, RANDOM_PORTS, RANDOM_SEEDS
from tests.base.model_template import EvalModelTemplate


def assert_speed_parity_relative(pl_times, pt_times, max_diff: float = 0.1):
    # assert speeds
    diffs = np.asarray(pl_times) - np.asarray(pt_times)
    # norm by vanilla time
    diffs = diffs / np.asarray(pt_times)
    assert np.alltrue(diffs < max_diff), \
        f"lightning {diffs} was slower than PT (threshold {max_diff})"


def assert_speed_parity_absolute(pl_times, pt_times, nb_epochs, max_diff: float = 0.6):
    # assert speeds
    diffs = np.asarray(pl_times) - np.asarray(pt_times)
    # norm by the number of epochs
    diffs = diffs / nb_epochs
    assert np.alltrue(diffs < max_diff), \
        f"lightning {diffs} was slower than PT (threshold {max_diff})"


def get_default_logger(save_dir, version=None):
    # set up logger object without actually saving logs
    logger = TensorBoardLogger(save_dir, name='lightning_logs', version=version)
    return logger


def get_data_path(expt_logger, path_dir=None):
    # some calls pass in only the experiment, not the complete logger,
    # but every logger has to expose `name` and `version`
    name, version = expt_logger.name, expt_logger.version
    # only the test-tube experiment provides its own data path
    if isinstance(expt_logger, TestTubeLogger):
        expt = expt_logger.experiment if hasattr(expt_logger, 'experiment') else expt_logger
        return expt.get_data_path(name, version)
    # the other loggers...
    if not path_dir:
        if hasattr(expt_logger, 'save_dir') and expt_logger.save_dir:
            path_dir = expt_logger.save_dir
        else:
            path_dir = TEMP_PATH
    path_expt = os.path.join(path_dir, name, 'version_%s' % version)
    # use the versioned sub-folder if it exists, otherwise fall back to the base dir
    if not os.path.isdir(path_expt):
        path_expt = path_dir
    return path_expt


def load_model_from_checkpoint(logger, root_weights_dir, module_class=EvalModelTemplate, path_expt=None):
    # `logger` and `path_expt` are accepted for call-site compatibility but are not used here
    trained_model = module_class.load_from_checkpoint(root_weights_dir)
    assert trained_model is not None, 'loading model failed'
    return trained_model


def assert_ok_model_acc(trainer, key='test_acc', thr=0.5):
    # the trained model is expected to beat the `thr` accuracy on the given metric
    acc = trainer.callback_metrics[key]
    assert acc > thr, f"Model failed to get expected {thr} accuracy. {key} = {acc}"


def reset_seed():
    seed = RANDOM_SEEDS.pop()
    seed_everything(seed)


def set_random_master_port():
    reset_seed()
    port = RANDOM_PORTS.pop()
    os.environ['MASTER_PORT'] = str(port)


def init_checkpoint_callback(logger):
    checkpoint = ModelCheckpoint(logger.save_dir)
    return checkpoint


def pl_multi_process_test(func):
    """Run the decorated test in a separate process and assert that it succeeded."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        from multiprocessing import Process, Queue
        queue = Queue()

        def inner_f(queue, **kwargs):
            try:
                func(**kwargs)
                queue.put(1)
            except Exception:
                import traceback
                traceback.print_exc()
                queue.put(-1)

        # note: only keyword arguments are forwarded to the wrapped test
        p = Process(target=inner_f, args=(queue,), kwargs=kwargs)
        p.start()
        p.join()
        result = queue.get()
        assert result == 1, f'expected 1, but returned {result}'

    return wrapper
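
Below is a minimal usage sketch of how these helpers could be combined in a test. It is not part of develop_utils.py; the test name, Trainer arguments, and single-epoch setup are assumptions for illustration and may not match the Lightning version pinned by the repository.

from pytorch_lightning import Trainer


@pl_multi_process_test
def test_fit_in_subprocess():  # hypothetical test, not taken from the repository
    # deterministic seed plus a fresh MASTER_PORT so parallel runs do not collide
    reset_seed()
    set_random_master_port()

    model = EvalModelTemplate()
    # Trainer arguments are illustrative only
    trainer = Trainer(max_epochs=1, logger=get_default_logger(TEMP_PATH))
    trainer.fit(model)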