From 1a173d8c910003de7fccd81ac6b9c62b18b15ccf Mon Sep 17 00:00:00 2001
From: Mariia Trofimova
Date: Sun, 6 Oct 2019 23:03:35 +0300
Subject: [PATCH 01/12] feat: add reducelronplateau callback

---
 pytorch_lightning/callbacks/__init__.py     |  3 ++-
 pytorch_lightning/callbacks/pt_callbacks.py | 30 +++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/pytorch_lightning/callbacks/__init__.py b/pytorch_lightning/callbacks/__init__.py
index 9538036563d79..cb843197edc2f 100644
--- a/pytorch_lightning/callbacks/__init__.py
+++ b/pytorch_lightning/callbacks/__init__.py
@@ -1,7 +1,8 @@
-from .pt_callbacks import EarlyStopping, ModelCheckpoint, GradientAccumulationScheduler
+from .pt_callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateauScheduler, GradientAccumulationScheduler
 
 __all__ = [
     'EarlyStopping',
     'ModelCheckpoint',
+    'ReduceLROnPlateauScheduler',
     'GradientAccumulationScheduler',
 ]

diff --git a/pytorch_lightning/callbacks/pt_callbacks.py b/pytorch_lightning/callbacks/pt_callbacks.py
index 877f74948ae62..aa85571b9fcb0 100644
--- a/pytorch_lightning/callbacks/pt_callbacks.py
+++ b/pytorch_lightning/callbacks/pt_callbacks.py
@@ -144,6 +144,36 @@ def on_train_end(self, logs=None):
             print('Epoch %05d: early stopping' % (self.stopped_epoch + 1))
 
 
+class ReduceLROnPlateauScheduler(Callback):
+    """
+    Reduce learning rate when the monitored metric has stopped improving.
+    Wrapper for torch.optim.lr_scheduler.ReduceLROnPlateau learning rate
+    schedulers.
+
+    # Arguments
+        schedulers: list of torch.optim.lr_scheduler.ReduceLROnPlateau
+        monitor: quantity to be monitored.
+    """
+
+    def __init__(self, schedulers, monitor='val_loss'):
+        super(ReduceLROnPlateauScheduler, self).__init__()
+
+        self.monitor = monitor
+        self.schedulers = schedulers
+
+    def on_epoch_end(self, epoch, logs=None):
+        current = logs.get(self.monitor)
+        stop_training = False
+        if current is None:
+            print('ReduceLROnPlateau conditioned on metric `%s` '
+                  'which is not available. Available metrics are: %s' %
+                  (self.monitor, ','.join(list(logs.keys()))), RuntimeWarning)
+            exit(-1)
+
+        for scheduler in self.schedulers:
+            scheduler.step(current, epoch=epoch)
+
+
 class ModelCheckpoint(Callback):
     """Save the model after every epoch.
`filepath` can contain named formatting options, From acb30c0f42c143b25c6335d46beb77dd2584e678 Mon Sep 17 00:00:00 2001 From: Mariia Trofimova Date: Sun, 6 Oct 2019 23:04:38 +0300 Subject: [PATCH 02/12] feat: use reducelronplateau callback in trainer --- pytorch_lightning/trainer/trainer.py | 431 ++++++++++++--------------- 1 file changed, 184 insertions(+), 247 deletions(-) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 10cf2c57aa6a5..305d48c0fb4f7 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -15,13 +15,13 @@ from torch.optim.optimizer import Optimizer from pytorch_lightning.root_module.root_module import LightningModule -from pytorch_lightning.root_module import memory +from pytorch_lightning.root_module.memory import get_gpu_memory_map from pytorch_lightning.logging import TestTubeLogger from pytorch_lightning.trainer.trainer_io import TrainerIO from pytorch_lightning.pt_overrides.override_data_parallel import ( LightningDistributedDataParallel, LightningDataParallel) from pytorch_lightning.callbacks import GradientAccumulationScheduler, \ - ModelCheckpoint, EarlyStopping + ReduceLROnPlateauScheduler, ModelCheckpoint, EarlyStopping from pytorch_lightning.utilities.debugging import MisconfigurationException import pdb from pytorch_lightning.trainer import ignored_warnings @@ -66,7 +66,7 @@ def __init__(self, process_position=0, nb_gpu_nodes=1, gpus=None, - log_gpu_memory=None, + log_gpu_memory=False, show_progress_bar=True, overfit_pct=0.0, track_grad_norm=-1, @@ -98,7 +98,7 @@ def __init__(self, :param process_position: shown in the tqdm bar :param nb_gpu_nodes: number of GPU nodes :param gpus: int. (ie: 2 gpus) OR list to specify which GPUs [0, 1] or '0,1' - :param log_gpu_memory: str. None, 'min_max', 'all' + :param log_gpu_memory: Bool. If true, adds memory logs :param show_progress_bar: Bool. If true shows tqdm bar :param overfit_pct: float. uses this much of all datasets :param track_grad_norm: int. -1 no tracking. 
Otherwise tracks that norm @@ -174,13 +174,13 @@ def __init__(self, verbose=True, mode='min' ) + self.lr_scheduler_callback = None # configure logger self.logger = logger if self.logger is None: self.logger = TestTubeLogger( save_dir=self.default_save_path, - version=self.slurm_job_id, name='lightning_logs' ) @@ -241,15 +241,6 @@ def __init__(self, self.amp_level = amp_level self.__init_amp(use_amp) - @property - def slurm_job_id(self): - try: - job_id = os.environ['SLURM_JOB_ID'] - job_id = int(job_id) - except Exception as e: - job_id = None - return job_id - def __configure_weights_path(self, checkpoint_callback, weights_save_path): """ Weight path set in this priority: @@ -285,8 +276,6 @@ def __init_amp(self, use_amp): raise ModuleNotFoundError(msg) def __configure_accumulated_gradients(self, accumulate_grad_batches): - self.accumulate_grad_batches = None - if isinstance(accumulate_grad_batches, dict): self.accumulation_scheduler = GradientAccumulationScheduler(accumulate_grad_batches) elif isinstance(accumulate_grad_batches, int): @@ -436,7 +425,7 @@ def __set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids): @property def data_parallel(self): - return self.use_dp or self.use_ddp or self.use_ddp2 + return self.use_dp or self.use_ddp def __determine_data_use_amount(self, train_percent_check, val_percent_check, test_percent_check, overfit_pct): @@ -536,7 +525,7 @@ def __evaluation_forward(self, model, batch, batch_idx, dataloader_idx, test=Fal args.append(dataloader_idx) # handle DP, DDP forward - if self.use_ddp or self.use_dp or self.use_ddp2: + if self.use_ddp or self.use_dp: output = model(*args) return output @@ -563,6 +552,7 @@ def evaluate(self, model, dataloaders, max_batches, test=False): :param model: PT model :param dataloaders: list of PT dataloaders :param max_batches: Scalar + :param dataloader_idx: :param test: boolean :return: """ @@ -591,10 +581,7 @@ def evaluate(self, model, dataloaders, max_batches, test=False): # ----------------- # RUN EVALUATION STEP # ----------------- - output = self.__evaluation_forward(model, - batch, - batch_idx, - dataloader_idx, + output = self.__evaluation_forward(model, batch, batch_idx, dataloader_idx, test) # track outputs for collation @@ -636,100 +623,96 @@ def get_dataloaders(self, model): self.get_test_dataloaders = model.test_dataloader self.get_val_dataloaders = model.val_dataloader - # call warnings from proc zero only which triggers dataloaders - # if those have to download data it will only happen on proc 0 - if self.proc_rank == 0: - on_ddp = self.use_ddp or self.use_ddp2 - if on_ddp and not isinstance(self.get_train_dataloader().sampler, DistributedSampler): - msg = """ - You're using multiple gpus and multiple nodes without using a DistributedSampler - to assign a subset of your data to each process. To silence this warning, pass a - DistributedSampler to your DataLoader. - - ie: this: - dataset = myDataset() - dataloader = Dataloader(dataset) - - becomes: - dataset = myDataset() - dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) - dataloader = Dataloader(dataset, sampler=dist_sampler) - - If you want each process to load the full dataset, ignore this warning. - """ - warnings.warn(msg) + if self.use_ddp and not isinstance(self.get_train_dataloader().sampler, DistributedSampler): + msg = """ + You're using multiple gpus and multiple nodes without using a DistributedSampler + to assign a subset of your data to each process. 
To silence this warning, pass a + DistributedSampler to your DataLoader. + + ie: this: + dataset = myDataset() + dataloader = Dataloader(dataset) + + becomes: + dataset = myDataset() + dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) + dataloader = Dataloader(dataset, sampler=dist_sampler) - if on_ddp and self.get_val_dataloaders() is not None: - for dataloader in self.get_val_dataloaders(): - if not isinstance(dataloader.sampler, DistributedSampler): - msg = """ - Your val_dataloader(s) don't use DistributedSampler. - - You're using multiple gpus and multiple nodes without using a - DistributedSampler to assign a subset of your data to each process. - To silence this warning, pass a DistributedSampler to your DataLoader. - - ie: this: - dataset = myDataset() - dataloader = Dataloader(dataset) - - becomes: - dataset = myDataset() - dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) - dataloader = Dataloader(dataset, sampler=dist_sampler) - - If you want each process to load the full dataset, ignore this warning. - """ - warnings.warn(msg) - break - - if on_ddp and self.get_test_dataloaders() is not None: - for dataloader in self.get_test_dataloaders(): - if not isinstance(dataloader.sampler, DistributedSampler): - msg = """ - Your test_dataloader(s) don't use DistributedSampler. - - You're using multiple gpus and multiple nodes without using a - DistributedSampler to assign a subset of your data to each process. - To silence this warning, pass a DistributedSampler to your DataLoader. - - ie: this: - dataset = myDataset() - dataloader = Dataloader(dataset) - - becomes: - dataset = myDataset() - dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) - dataloader = Dataloader(dataset, sampler=dist_sampler) - - If you want each process to load the full dataset, ignore this warning. - """ - warnings.warn(msg) - break - - if self.use_ddp or self.use_ddp2: - # wait for all processes to catch up - dist.barrier() - - # load each dataloader - self.get_train_dataloader() - self.get_test_dataloaders() - self.get_val_dataloaders() + If you want each process to load the full dataset, ignore this warning. + """ + warnings.warn(msg) + + if self.use_ddp and self.get_val_dataloaders is not None: + for dataloader in self.get_val_dataloaders(): + if not isinstance(dataloader.sampler, DistributedSampler): + msg = """ + Your val_dataloader(s) don't use DistributedSampler. + You're using multiple gpus and multiple nodes without using a DistributedSampler + to assign a subset of your data to each process. To silence this warning, pass a + DistributedSampler to your DataLoader. + + ie: this: + dataset = myDataset() + dataloader = Dataloader(dataset) + + becomes: + dataset = myDataset() + dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) + dataloader = Dataloader(dataset, sampler=dist_sampler) + + If you want each process to load the full dataset, ignore this warning. + """ + warnings.warn(msg) + break + + if self.use_ddp and self.get_test_dataloaders is not None: + for dataloader in self.get_test_dataloaders(): + if not isinstance(dataloader.sampler, DistributedSampler): + msg = """ + Your test_dataloader(s) don't use DistributedSampler. + You're using multiple gpus and multiple nodes without using a DistributedSampler + to assign a subset of your data to each process. To silence this warning, pass a + DistributedSampler to your DataLoader. 
+ + ie: this: + dataset = myDataset() + dataloader = Dataloader(dataset) + + becomes: + dataset = myDataset() + dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) + dataloader = Dataloader(dataset, sampler=dist_sampler) + + If you want each process to load the full dataset, ignore this warning. + """ + warnings.warn(msg) + break # ----------------------------- # MODEL TRAINING # ----------------------------- def fit(self, model): # when using multi-node or DDP within a node start each module in a separate process - if self.use_ddp2: - task = int(os.environ['SLURM_LOCALID']) - self.ddp_train(task, model) + if self.use_ddp: - elif self.use_ddp: - if self.is_slurm_managing_tasks: + if self.use_ddp2: + task = int(os.environ['SLURM_LOCALID']) + self.ddp_train(task, model) + + elif self.is_slurm_managing_tasks: task = int(os.environ['SLURM_LOCALID']) self.ddp_train(task, model) else: + nb_gpus = self.nb_requested_gpus + nb_tasks = self.nb_slurm_tasks + msg = f""" + You requested {nb_gpus}s GPUs but launched {nb_tasks}s slurm tasks. + We will launch {nb_gpus}s processes for you. + We recommend you let slurm manage the processes by setting: + --ntasks-per-node={nb_gpus}s + If you're not using SLURM, ignore this message! + """ + warnings.warn(msg) mp.spawn(self.ddp_train, nprocs=self.num_gpus, args=(model, )) # 1 gpu or dp option triggers training using DP module @@ -766,12 +749,25 @@ def init_optimizers(self, optimizers): # two lists elif len(optimizers) == 2 and isinstance(optimizers[0], list): optimizers, lr_schedulers = optimizers + lr_schedulers = self.configure_schedulers(lr_schedulers) return optimizers, lr_schedulers # single list or tuple elif isinstance(optimizers, list) or isinstance(optimizers, tuple): return optimizers, [] + def configure_schedulers(self, schedulers): + custom_schedulers = [] + i = 0 + while i < len(schedulers): + if isinstance(schedulers[i], torch.optim.lr_scheduler.ReduceLROnPlateau): + custom_schedulers.append(schedulers.pop(i)) + i += 1 + if custom_schedulers: + self.lr_scheduler_callback = ReduceLROnPlateauScheduler(custom_schedulers, + monitor='val_loss') + return schedulers + def __single_gpu_train(self, model): # CHOOSE OPTIMIZER # allow for lr schedulers as well @@ -906,25 +902,12 @@ def __init_tcp_connection(self): :param tries: :return: """ - - # use slurm job id for the port number - # guarantees unique ports across jobs from same grid search - try: - # use the last 4 numbers in the job id as the id - default_port = os.environ['SLURM_JOB_ID'] - default_port = default_port[-4:] - - # all ports should be in the 10k+ range - default_port = int(default_port) + 15000 - - except Exception as e: - default_port = 12910 - - # if user gave a port number, use that one instead + # sets the appropriate port try: - default_port = os.environ['MASTER_PORT'] + port = os.environ['MASTER_PORT'] except Exception: - os.environ['MASTER_PORT'] = str(default_port) + port = 12910 + os.environ['MASTER_PORT'] = str(port) # figure out the root node addr try: @@ -934,6 +917,7 @@ def __init_tcp_connection(self): root_node = self.resolve_root_node_address(root_node) os.environ['MASTER_ADDR'] = root_node + dist.init_process_group("nccl", rank=self.proc_rank, world_size=self.world_size) def resolve_root_node_address(self, root_node): @@ -1090,8 +1074,7 @@ def run_training_epoch(self): # --------------- # RUN TRAIN STEP # --------------- - output = self.__run_training_batch(batch, batch_nb) - batch_result, grad_norm_dic, batch_step_metrics = output + batch_result, 
grad_norm_dic = self.__run_training_batch(batch, batch_nb) early_stop_epoch = batch_result == -1 # --------------- @@ -1110,9 +1093,29 @@ def run_training_epoch(self): # when metrics should be logged if batch_nb % self.row_log_interval == 0 or early_stop_epoch: + # count items in memory + # nb_params, nb_tensors = count_mem_items() + + model = self.__get_model() + metrics = self.__training_tqdm_dict - # logs user requested information to logger - self.__log_metrics(batch_step_metrics, grad_norm_dic) + # add gpu memory + if self.on_gpu and self.log_gpu_memory: + mem_map = get_gpu_memory_map() + metrics.update(mem_map) + + # add norms + metrics.update(grad_norm_dic) + + if self.__is_function_implemented('on_training_metrics'): + model.on_training_metrics(metrics) + + # log metrics + scalar_metrics = self.__metrics_to_scalars( + metrics, blacklist=self.__log_vals_blacklist()) + if self.proc_rank == 0 and self.logger is not None: + self.logger.log_metrics(scalar_metrics, step_num=self.global_step) + self.logger.save() # end epoch early if early_stop_epoch: @@ -1123,32 +1126,6 @@ def run_training_epoch(self): model = self.__get_model() model.on_epoch_end() - def __log_metrics(self, metrics, grad_norm_dic): - """ - Logs the metric dict passed in - :param metrics: - :param grad_norm_dic: - :return: - """ - # added metrics by Lightning for convenience - metrics['epoch'] = self.current_epoch - - # add gpu memory - if self.on_gpu and self.log_gpu_memory: - mem_map = memory.get_memory_profile(self.log_gpu_memory) - metrics.update(mem_map) - - # add norms - metrics.update(grad_norm_dic) - - # turn all tensors to scalars - scalar_metrics = self.__metrics_to_scalars(metrics) - - # log actual metrics - if self.proc_rank == 0 and self.logger is not None: - self.logger.log_metrics(scalar_metrics, step_num=self.global_step) - self.logger.save() - def test(self, model=None): if model is not None: self.testing = True @@ -1156,7 +1133,7 @@ def test(self, model=None): else: self.__run_evaluation(test=True) - def __metrics_to_scalars(self, metrics): + def __metrics_to_scalars(self, metrics, blacklist=set()): new_metrics = {} for k, v in metrics.items(): if type(v) is torch.Tensor: @@ -1165,6 +1142,9 @@ def __metrics_to_scalars(self, metrics): if type(v) is dict: v = self.__metrics_to_scalars(v) + if k not in blacklist: + new_metrics[k] = float(v) + return new_metrics def __log_vals_blacklist(self): @@ -1218,7 +1198,7 @@ def __training_forward(self, batch, batch_nb, opt_idx): if len(self.optimizers) > 1: args.append(opt_idx) - if self.use_ddp or self.use_ddp2: + if self.use_ddp: output = self.model(*args) elif self.use_dp: output = self.model(*args) @@ -1233,64 +1213,41 @@ def __training_forward(self, batch, batch_nb, opt_idx): else: output = self.model.training_step(*args) - # format and reduce outputs accordingly - loss, progress_bar_metrics, log_metrics = self.__process_output(output, train=True) - return loss, progress_bar_metrics, log_metrics - - def __process_output(self, output, train=False): - """ - Reduces output according to the training mode. 
- Separates loss from logging and tqdm metrics - :param output: - :return: - """ + # --------------- + # TQDM metrics + # --------------- try: - progress_output = output['progress_bar'] + progress_output = output['progress'] # reduce progress metrics for tqdm when using dp - if train and self.use_dp or self.use_ddp2: + if self.use_dp or self.use_ddp2: nb_gpus = self.num_gpus progress_output = reduce_distributed_output(progress_output, nb_gpus) - progress_bar_metrics = progress_output - except Exception: - progress_bar_metrics = {} - - # extract metrics to log to experiment - try: - log_output = output['log'] - - # reduce progress metrics for tqdm when using dp - if train and self.use_dp or self.use_ddp2: - nb_gpus = self.num_gpus - log_output = reduce_distributed_output(log_output, nb_gpus) - - log_metrics = log_output + model_specific_tqdm_metrics_dic = progress_output except Exception: - log_metrics = {} + model_specific_tqdm_metrics_dic = {} # --------------- # EXTRACT LOSS # --------------- # if output dict doesn't have the keyword loss # then assume the output=loss if scalar - loss = None - if train: - try: - loss = output['loss'] - except Exception: - if type(output) is torch.Tensor: - loss = output - else: - raise RuntimeError( - 'No `loss` value in the dictionary returned from `model.training_step()`.' - ) + try: + loss = output['loss'] + except Exception: + if type(output) is torch.Tensor: + loss = output + else: + raise RuntimeError( + 'No `loss` value in the dictionary returned from `model.training_step()`.' + ) - # when using dp need to reduce the loss - if self.use_dp or self.use_ddp2: - loss = reduce_distributed_output(loss, self.num_gpus) + # when using dp need to reduce the loss + if self.use_dp or self.use_ddp2: + loss = reduce_distributed_output(loss, self.num_gpus) - return loss, progress_bar_metrics, log_metrics + return loss, model_specific_tqdm_metrics_dic def __clip_gradients(self): if self.gradient_clip_val > 0: @@ -1307,9 +1264,6 @@ def __run_training_batch(self, batch, batch_nb): # track grad norms grad_norm_dic = {} - # track metrics to log - all_log_metrics = [] - if batch is None: return 0, grad_norm_dic @@ -1327,37 +1281,27 @@ def __run_training_batch(self, batch, batch_nb): # call training_step once per optimizer for opt_idx, optimizer in enumerate(self.optimizers): - # wrap the forward step in a closure so second order methods work - def optimizer_closure(): - # forward pass - output = self.__training_forward(batch, batch_nb, opt_idx) - closure_loss, progress_bar_metrics, log_metrics = output + # forward pass + loss, model_specific_tqdm_metrics = self.__training_forward(batch, batch_nb, opt_idx) - # track progress bar metrics - self.__add_tqdm_metrics(progress_bar_metrics) + # track metrics + self.__add_tqdm_metrics(model_specific_tqdm_metrics) - all_log_metrics.append(log_metrics) + # accumulate loss + # (if accumulate_grad_batches = 1 no effect) + loss = loss / self.accumulate_grad_batches - # accumulate loss - # (if accumulate_grad_batches = 1 no effect) - closure_loss = closure_loss / self.accumulate_grad_batches - - # backward pass - if self.use_amp: - with amp.scale_loss(closure_loss, optimizer) as scaled_loss: - scaled_loss.backward() - else: - closure_loss.backward() - - # insert after step hook - if self.__is_function_implemented('on_after_backward'): - model_ref = self.__get_model() - model_ref.on_after_backward() - - return closure_loss + # backward pass + if self.use_amp: + with amp.scale_loss(loss, optimizer) as scaled_loss: + 
scaled_loss.backward() + else: + loss.backward() - # calculate loss - loss = optimizer_closure() + # insert after step hook + if self.__is_function_implemented('on_after_backward'): + model_ref = self.__get_model() + model_ref.on_after_backward() # nan grads if self.print_nan_grads: @@ -1381,15 +1325,14 @@ def optimizer_closure(): # calls .step(), .zero_grad() # override function to modify this behavior model = self.__get_model() - model.optimizer_step(self.current_epoch, batch_nb, - optimizer, opt_idx, optimizer_closure) + model.optimizer_step(self.current_epoch, batch_nb, optimizer, opt_idx) # calculate running loss for display self.running_loss.append(self.batch_loss_value) self.batch_loss_value = 0 self.avg_loss = np.mean(self.running_loss[-100:]) - # update progress bar + # update progressbar if self.show_progress_bar: # add model specific metrics tqdm_metrics = self.__training_tqdm_dict @@ -1400,10 +1343,7 @@ def optimizer_closure(): model = self.__get_model() model.on_batch_end() - # collapse all metrics into one dict - all_log_metrics = {k: v for d in all_log_metrics for k, v in d.items()} - - return 0, grad_norm_dic, all_log_metrics + return 0, grad_norm_dic def __run_evaluation(self, test=False): # when testing make sure user defined a test step @@ -1438,19 +1378,11 @@ def __run_evaluation(self, test=False): if self.fast_dev_run: max_batches = 1 - # run evaluation - eval_results = self.evaluate(self.model, - dataloaders, - max_batches, - test) - - _, progress_bar_metrics, log_metrics = self.__process_output(eval_results) - - # add metrics to prog bar - self.__add_tqdm_metrics(progress_bar_metrics) - - # log metrics - self.__log_metrics(log_metrics, {}) + eval_out_metrics = self.evaluate(self.model, + dataloaders, + max_batches, + test) + self.__add_tqdm_metrics(eval_out_metrics) # hook model.on_post_performance_check() @@ -1460,6 +1392,11 @@ def __run_evaluation(self, test=False): tqdm_metrics = self.__training_tqdm_dict self.progress_bar.set_postfix(**tqdm_metrics) + # reduce learning rate based on metrics + if self.lr_scheduler_callback is not None and not test: + self.lr_scheduler_callback.on_epoch_end(epoch=self.current_epoch, + logs=self.__training_tqdm_dict) + # model checkpointing if self.proc_rank == 0 and self.checkpoint_callback is not None and not test: print('save callback...') From 682726298f9feed3c7dae01ed9744aa525c4d2d2 Mon Sep 17 00:00:00 2001 From: Mariia Trofimova Date: Sun, 6 Oct 2019 23:05:51 +0300 Subject: [PATCH 03/12] feat: only on unsupported lr schedulers --- pytorch_lightning/trainer/trainer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 305d48c0fb4f7..f91ca19483e31 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -1038,7 +1038,7 @@ def __train(self): # update LR schedulers if self.lr_schedulers is not None: for lr_scheduler in self.lr_schedulers: - lr_scheduler.step(self.current_epoch) + lr_scheduler.step(epoch=self.current_epoch) # early stopping met_min_epochs = epoch_nb > self.min_nb_epochs From d232a7c07edbb63191330228a79309c038a7829b Mon Sep 17 00:00:00 2001 From: Mariia Trofimova Date: Wed, 9 Oct 2019 16:29:06 +0300 Subject: [PATCH 04/12] feat: last but not the least merge of master --- pytorch_lightning/trainer/trainer.py | 351 ++++++++++++++++----------- 1 file changed, 207 insertions(+), 144 deletions(-) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py 
index 06ebf60f434d8..74c868316bb5a 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -15,7 +15,7 @@ from torch.optim.optimizer import Optimizer from pytorch_lightning.root_module.root_module import LightningModule -from pytorch_lightning.root_module.memory import get_gpu_memory_map +from pytorch_lightning.root_module import memory from pytorch_lightning.logging import TestTubeLogger from pytorch_lightning.trainer.trainer_io import TrainerIO from pytorch_lightning.pt_overrides.override_data_parallel import ( @@ -66,7 +66,7 @@ def __init__(self, process_position=0, nb_gpu_nodes=1, gpus=None, - log_gpu_memory=False, + log_gpu_memory=None, show_progress_bar=True, overfit_pct=0.0, track_grad_norm=-1, @@ -98,7 +98,7 @@ def __init__(self, :param process_position: shown in the tqdm bar :param nb_gpu_nodes: number of GPU nodes :param gpus: int. (ie: 2 gpus) OR list to specify which GPUs [0, 1] or '0,1' - :param log_gpu_memory: Bool. If true, adds memory logs + :param log_gpu_memory: str. None, 'min_max', 'all' :param show_progress_bar: Bool. If true shows tqdm bar :param overfit_pct: float. uses this much of all datasets :param track_grad_norm: int. -1 no tracking. Otherwise tracks that norm @@ -182,6 +182,7 @@ def __init__(self, if self.logger is None: self.logger = TestTubeLogger( save_dir=self.default_save_path, + version=self.slurm_job_id, name='lightning_logs' ) self.logger.rank = 0 @@ -243,6 +244,15 @@ def __init__(self, self.amp_level = amp_level self.__init_amp(use_amp) + @property + def slurm_job_id(self): + try: + job_id = os.environ['SLURM_JOB_ID'] + job_id = int(job_id) + except Exception as e: + job_id = None + return job_id + def __configure_weights_path(self, checkpoint_callback, weights_save_path): """ Weight path set in this priority: @@ -278,6 +288,8 @@ def __init_amp(self, use_amp): raise ModuleNotFoundError(msg) def __configure_accumulated_gradients(self, accumulate_grad_batches): + self.accumulate_grad_batches = None + if isinstance(accumulate_grad_batches, dict): self.accumulation_scheduler = GradientAccumulationScheduler(accumulate_grad_batches) elif isinstance(accumulate_grad_batches, int): @@ -427,7 +439,7 @@ def __set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids): @property def data_parallel(self): - return self.use_dp or self.use_ddp + return self.use_dp or self.use_ddp or self.use_ddp2 def __determine_data_use_amount(self, train_percent_check, val_percent_check, test_percent_check, overfit_pct): @@ -527,7 +539,7 @@ def __evaluation_forward(self, model, batch, batch_idx, dataloader_idx, test=Fal args.append(dataloader_idx) # handle DP, DDP forward - if self.use_ddp or self.use_dp: + if self.use_ddp or self.use_dp or self.use_ddp2: output = model(*args) return output @@ -554,7 +566,6 @@ def evaluate(self, model, dataloaders, max_batches, test=False): :param model: PT model :param dataloaders: list of PT dataloaders :param max_batches: Scalar - :param dataloader_idx: :param test: boolean :return: """ @@ -583,7 +594,10 @@ def evaluate(self, model, dataloaders, max_batches, test=False): # ----------------- # RUN EVALUATION STEP # ----------------- - output = self.__evaluation_forward(model, batch, batch_idx, dataloader_idx, + output = self.__evaluation_forward(model, + batch, + batch_idx, + dataloader_idx, test) # track outputs for collation @@ -625,96 +639,100 @@ def get_dataloaders(self, model): self.get_test_dataloaders = model.test_dataloader self.get_val_dataloaders = model.val_dataloader - if 
self.use_ddp and not isinstance(self.get_train_dataloader().sampler, DistributedSampler): - msg = """ - You're using multiple gpus and multiple nodes without using a DistributedSampler - to assign a subset of your data to each process. To silence this warning, pass a - DistributedSampler to your DataLoader. - - ie: this: - dataset = myDataset() - dataloader = Dataloader(dataset) - - becomes: - dataset = myDataset() - dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) - dataloader = Dataloader(dataset, sampler=dist_sampler) - - If you want each process to load the full dataset, ignore this warning. - """ - warnings.warn(msg) - - if self.use_ddp and self.get_val_dataloaders is not None: - for dataloader in self.get_val_dataloaders(): - if not isinstance(dataloader.sampler, DistributedSampler): - msg = """ - Your val_dataloader(s) don't use DistributedSampler. - You're using multiple gpus and multiple nodes without using a DistributedSampler - to assign a subset of your data to each process. To silence this warning, pass a - DistributedSampler to your DataLoader. - - ie: this: - dataset = myDataset() - dataloader = Dataloader(dataset) - - becomes: - dataset = myDataset() - dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) - dataloader = Dataloader(dataset, sampler=dist_sampler) - - If you want each process to load the full dataset, ignore this warning. - """ - warnings.warn(msg) - break + # call warnings from proc zero only which triggers dataloaders + # if those have to download data it will only happen on proc 0 + if self.proc_rank == 0: + on_ddp = self.use_ddp or self.use_ddp2 + if on_ddp and not isinstance(self.get_train_dataloader().sampler, DistributedSampler): + msg = """ + You're using multiple gpus and multiple nodes without using a DistributedSampler + to assign a subset of your data to each process. To silence this warning, pass a + DistributedSampler to your DataLoader. + + ie: this: + dataset = myDataset() + dataloader = Dataloader(dataset) + + becomes: + dataset = myDataset() + dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) + dataloader = Dataloader(dataset, sampler=dist_sampler) + + If you want each process to load the full dataset, ignore this warning. + """ + warnings.warn(msg) - if self.use_ddp and self.get_test_dataloaders is not None: - for dataloader in self.get_test_dataloaders(): - if not isinstance(dataloader.sampler, DistributedSampler): - msg = """ - Your test_dataloader(s) don't use DistributedSampler. - You're using multiple gpus and multiple nodes without using a DistributedSampler - to assign a subset of your data to each process. To silence this warning, pass a - DistributedSampler to your DataLoader. - - ie: this: - dataset = myDataset() - dataloader = Dataloader(dataset) - - becomes: - dataset = myDataset() - dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) - dataloader = Dataloader(dataset, sampler=dist_sampler) - - If you want each process to load the full dataset, ignore this warning. - """ - warnings.warn(msg) - break + if on_ddp and self.get_val_dataloaders() is not None: + for dataloader in self.get_val_dataloaders(): + if not isinstance(dataloader.sampler, DistributedSampler): + msg = """ + Your val_dataloader(s) don't use DistributedSampler. + + You're using multiple gpus and multiple nodes without using a + DistributedSampler to assign a subset of your data to each process. + To silence this warning, pass a DistributedSampler to your DataLoader. 
+ + ie: this: + dataset = myDataset() + dataloader = Dataloader(dataset) + + becomes: + dataset = myDataset() + dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) + dataloader = Dataloader(dataset, sampler=dist_sampler) + + If you want each process to load the full dataset, ignore this warning. + """ + warnings.warn(msg) + break + + if on_ddp and self.get_test_dataloaders() is not None: + for dataloader in self.get_test_dataloaders(): + if not isinstance(dataloader.sampler, DistributedSampler): + msg = """ + Your test_dataloader(s) don't use DistributedSampler. + + You're using multiple gpus and multiple nodes without using a + DistributedSampler to assign a subset of your data to each process. + To silence this warning, pass a DistributedSampler to your DataLoader. + + ie: this: + dataset = myDataset() + dataloader = Dataloader(dataset) + + becomes: + dataset = myDataset() + dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset) + dataloader = Dataloader(dataset, sampler=dist_sampler) + + If you want each process to load the full dataset, ignore this warning. + """ + warnings.warn(msg) + break + + if self.use_ddp or self.use_ddp2: + # wait for all processes to catch up + dist.barrier() + + # load each dataloader + self.get_train_dataloader() + self.get_test_dataloaders() + self.get_val_dataloaders() # ----------------------------- # MODEL TRAINING # ----------------------------- def fit(self, model): # when using multi-node or DDP within a node start each module in a separate process - if self.use_ddp: + if self.use_ddp2: + task = int(os.environ['SLURM_LOCALID']) + self.ddp_train(task, model) - if self.use_ddp2: - task = int(os.environ['SLURM_LOCALID']) - self.ddp_train(task, model) - - elif self.is_slurm_managing_tasks: + elif self.use_ddp: + if self.is_slurm_managing_tasks: task = int(os.environ['SLURM_LOCALID']) self.ddp_train(task, model) else: - nb_gpus = self.nb_requested_gpus - nb_tasks = self.nb_slurm_tasks - msg = f""" - You requested {nb_gpus}s GPUs but launched {nb_tasks}s slurm tasks. - We will launch {nb_gpus}s processes for you. - We recommend you let slurm manage the processes by setting: - --ntasks-per-node={nb_gpus}s - If you're not using SLURM, ignore this message! 
- """ - warnings.warn(msg) mp.spawn(self.ddp_train, nprocs=self.num_gpus, args=(model, )) # 1 gpu or dp option triggers training using DP module @@ -904,12 +922,25 @@ def __init_tcp_connection(self): :param tries: :return: """ - # sets the appropriate port + + # use slurm job id for the port number + # guarantees unique ports across jobs from same grid search + try: + # use the last 4 numbers in the job id as the id + default_port = os.environ['SLURM_JOB_ID'] + default_port = default_port[-4:] + + # all ports should be in the 10k+ range + default_port = int(default_port) + 15000 + + except Exception as e: + default_port = 12910 + + # if user gave a port number, use that one instead try: - port = os.environ['MASTER_PORT'] + default_port = os.environ['MASTER_PORT'] except Exception: - port = 12910 - os.environ['MASTER_PORT'] = str(port) + os.environ['MASTER_PORT'] = str(default_port) # figure out the root node addr try: @@ -919,7 +950,6 @@ def __init_tcp_connection(self): root_node = self.resolve_root_node_address(root_node) os.environ['MASTER_ADDR'] = root_node - dist.init_process_group("nccl", rank=self.proc_rank, world_size=self.world_size) def resolve_root_node_address(self, root_node): @@ -1083,7 +1113,8 @@ def run_training_epoch(self): # --------------- # RUN TRAIN STEP # --------------- - batch_result, grad_norm_dic = self.__run_training_batch(batch, batch_nb) + output = self.__run_training_batch(batch, batch_nb) + batch_result, grad_norm_dic, batch_step_metrics = output early_stop_epoch = batch_result == -1 # --------------- @@ -1102,29 +1133,9 @@ def run_training_epoch(self): # when metrics should be logged if batch_nb % self.row_log_interval == 0 or early_stop_epoch: - # count items in memory - # nb_params, nb_tensors = count_mem_items() - - model = self.__get_model() - metrics = self.__training_tqdm_dict - # add gpu memory - if self.on_gpu and self.log_gpu_memory: - mem_map = get_gpu_memory_map() - metrics.update(mem_map) - - # add norms - metrics.update(grad_norm_dic) - - if self.__is_function_implemented('on_training_metrics'): - model.on_training_metrics(metrics) - - # log metrics - scalar_metrics = self.__metrics_to_scalars( - metrics, blacklist=self.__log_vals_blacklist()) - if self.proc_rank == 0 and self.logger is not None: - self.logger.log_metrics(scalar_metrics, step_num=self.global_step) - self.logger.save() + # logs user requested information to logger + self.__log_metrics(batch_step_metrics, grad_norm_dic) # end epoch early if early_stop_epoch: @@ -1135,6 +1146,32 @@ def run_training_epoch(self): model = self.__get_model() model.on_epoch_end() + def __log_metrics(self, metrics, grad_norm_dic): + """ + Logs the metric dict passed in + :param metrics: + :param grad_norm_dic: + :return: + """ + # added metrics by Lightning for convenience + metrics['epoch'] = self.current_epoch + + # add gpu memory + if self.on_gpu and self.log_gpu_memory: + mem_map = memory.get_memory_profile(self.log_gpu_memory) + metrics.update(mem_map) + + # add norms + metrics.update(grad_norm_dic) + + # turn all tensors to scalars + scalar_metrics = self.__metrics_to_scalars(metrics) + + # log actual metrics + if self.proc_rank == 0 and self.logger is not None: + self.logger.log_metrics(scalar_metrics, step_num=self.global_step) + self.logger.save() + def test(self, model=None): if model is not None: self.testing = True @@ -1142,7 +1179,7 @@ def test(self, model=None): else: self.__run_evaluation(test=True) - def __metrics_to_scalars(self, metrics, blacklist=set()): + def __metrics_to_scalars(self, 
metrics): new_metrics = {} for k, v in metrics.items(): if isinstance(v, torch.Tensor): @@ -1158,6 +1195,7 @@ def __metrics_to_scalars(self, metrics, blacklist=set()): def __log_vals_blacklist(self): """avoid logging some vals lightning uses to maintain state""" blacklist = {'batch_nb', 'v_nb', 'gpu'} + return blacklist def transfer_batch_to_gpu(self, batch, gpu_id): # base case: object can be directly moved using `cuda` or `to` @@ -1205,7 +1243,7 @@ def __training_forward(self, batch, batch_nb, opt_idx): if len(self.optimizers) > 1: args.append(opt_idx) - if self.use_ddp: + if self.use_ddp or self.use_ddp2: output = self.model(*args) elif self.use_dp: output = self.model(*args) @@ -1239,35 +1277,50 @@ def __process_output(self, output, train=False): callback_metrics[k] = v try: - progress_output = output['progress'] + progress_output = output['progress_bar'] # reduce progress metrics for tqdm when using dp - if self.use_dp or self.use_ddp2: + if train and self.use_dp or self.use_ddp2: nb_gpus = self.num_gpus progress_output = reduce_distributed_output(progress_output, nb_gpus) - model_specific_tqdm_metrics_dic = progress_output + progress_bar_metrics = progress_output + except Exception: + progress_bar_metrics = {} + + # extract metrics to log to experiment + try: + log_output = output['log'] + + # reduce progress metrics for tqdm when using dp + if train and self.use_dp or self.use_ddp2: + nb_gpus = self.num_gpus + log_output = reduce_distributed_output(log_output, nb_gpus) + + log_metrics = log_output except Exception: - model_specific_tqdm_metrics_dic = {} + log_metrics = {} # --------------- # EXTRACT LOSS # --------------- # if output dict doesn't have the keyword loss # then assume the output=loss if scalar - try: - loss = output['loss'] - except Exception: - if type(output) is torch.Tensor: - loss = output - else: - raise RuntimeError( - 'No `loss` value in the dictionary returned from `model.training_step()`.' - ) + loss = None + if train: + try: + loss = output['loss'] + except Exception: + if type(output) is torch.Tensor: + loss = output + else: + raise RuntimeError( + 'No `loss` value in the dictionary returned from `model.training_step()`.' 
+ ) - # when using dp need to reduce the loss - if self.use_dp or self.use_ddp2: - loss = reduce_distributed_output(loss, self.num_gpus) + # when using dp need to reduce the loss + if self.use_dp or self.use_ddp2: + loss = reduce_distributed_output(loss, self.num_gpus) return loss, progress_bar_metrics, log_metrics, callback_metrics @@ -1322,17 +1375,26 @@ def optimizer_closure(): self.__add_tqdm_metrics(progress_bar_metrics) all_log_metrics.append(log_metrics) - # backward pass - if self.use_amp: - with amp.scale_loss(loss, optimizer) as scaled_loss: - scaled_loss.backward() - else: - loss.backward() + # accumulate loss + # (if accumulate_grad_batches = 1 no effect) + closure_loss = closure_loss / self.accumulate_grad_batches + + # backward pass + if self.use_amp: + with amp.scale_loss(closure_loss, optimizer) as scaled_loss: + scaled_loss.backward() + else: + closure_loss.backward() + + # insert after step hook + if self.__is_function_implemented('on_after_backward'): + model_ref = self.__get_model() + model_ref.on_after_backward() + + return closure_loss - # insert after step hook - if self.__is_function_implemented('on_after_backward'): - model_ref = self.__get_model() - model_ref.on_after_backward() + # calculate loss + loss = optimizer_closure() # nan grads if self.print_nan_grads: @@ -1356,14 +1418,15 @@ def optimizer_closure(): # calls .step(), .zero_grad() # override function to modify this behavior model = self.__get_model() - model.optimizer_step(self.current_epoch, batch_nb, optimizer, opt_idx) + model.optimizer_step(self.current_epoch, batch_nb, + optimizer, opt_idx, optimizer_closure) # calculate running loss for display self.running_loss.append(self.batch_loss_value) self.batch_loss_value = 0 self.avg_loss = np.mean(self.running_loss[-100:]) - # update progressbar + # update progress bar if self.show_progress_bar: # add model specific metrics tqdm_metrics = self.__training_tqdm_dict From 26844acc4f11a55ed39d422c306559226991585f Mon Sep 17 00:00:00 2001 From: Mariia Trofimova Date: Sun, 20 Oct 2019 12:30:07 +0300 Subject: [PATCH 05/12] feat: merge master --- pytorch_lightning/trainer/trainer.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index cc54f629ed415..9c1a29fc38381 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -190,7 +190,7 @@ def __init__(self, else: self.early_stop_callback = early_stop_callback self.enable_early_stop = True - self.lr_scheduler_callback = None + self.val_loss_drop_lr_callback = None # configure logger if logger is True: @@ -794,7 +794,7 @@ def init_optimizers(self, optimizers): # two lists elif len(optimizers) == 2 and isinstance(optimizers[0], list): optimizers, lr_schedulers = optimizers - lr_schedulers = self.configure_schedulers(lr_schedulers) + lr_schedulers, self.val_loss_drop_lr_callback = self.configure_schedulers(lr_schedulers) return optimizers, lr_schedulers # single list or tuple @@ -802,16 +802,12 @@ def init_optimizers(self, optimizers): return optimizers, [] def configure_schedulers(self, schedulers): - custom_schedulers = [] - i = 0 - while i < len(schedulers): + for i in range(len(schedulers)): if isinstance(schedulers[i], torch.optim.lr_scheduler.ReduceLROnPlateau): - custom_schedulers.append(schedulers.pop(i)) - i += 1 - if custom_schedulers: - self.lr_scheduler_callback = ReduceLROnPlateauScheduler(custom_schedulers, - monitor='val_loss') - return schedulers 
+ val_loss_drop_lr_callback = ReduceLROnPlateauScheduler(schedulers.pop(i), + monitor='val_loss') + return schedulers, val_loss_drop_lr_callback + return schedulers, None def __single_gpu_train(self, model): # CHOOSE OPTIMIZER @@ -1559,9 +1555,9 @@ def __run_evaluation(self, test=False): self.progress_bar.set_postfix(**tqdm_metrics) # reduce learning rate based on metrics - if self.lr_scheduler_callback is not None and not test: - self.lr_scheduler_callback.on_epoch_end(epoch=self.current_epoch, - logs=self.__training_tqdm_dict) + if self.val_loss_drop_lr_callback is not None and not test: + self.val_loss_drop_lr_callback.on_epoch_end(epoch=self.current_epoch, + logs=callback_metrics) # model checkpointing if self.proc_rank == 0 and self.checkpoint_callback is not None and not test: From cae83caff413f6ab96496b89d48348ac052d0ba7 Mon Sep 17 00:00:00 2001 From: Mariia Trofimova Date: Sun, 20 Oct 2019 12:29:19 +0300 Subject: [PATCH 06/12] feat: support only on scheduler in reduceLrOnPlateauScheduler --- pytorch_lightning/callbacks/pt_callbacks.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pytorch_lightning/callbacks/pt_callbacks.py b/pytorch_lightning/callbacks/pt_callbacks.py index 89b3eb09f349e..5016cef089da9 100644 --- a/pytorch_lightning/callbacks/pt_callbacks.py +++ b/pytorch_lightning/callbacks/pt_callbacks.py @@ -156,11 +156,11 @@ class ReduceLROnPlateauScheduler(Callback): monitor: quantity to be monitored. """ - def __init__(self, schedulers, monitor='val_loss'): + def __init__(self, scheduler, monitor='val_loss'): super(ReduceLROnPlateauScheduler, self).__init__() self.monitor = monitor - self.schedulers = schedulers + self.scheduler = scheduler def on_epoch_end(self, epoch, logs=None): current = logs.get(self.monitor) @@ -171,8 +171,7 @@ def on_epoch_end(self, epoch, logs=None): (self.monitor, ','.join(list(logs.keys()))), RuntimeWarning) exit(-1) - for scheduler in self.schedulers: - scheduler.step(current, epoch=epoch) + self.scheduler.step(current, epoch=epoch) class ModelCheckpoint(Callback): From 78b975bee6d6c04e9cbfb35aee4ba6fece532fb3 Mon Sep 17 00:00:00 2001 From: Mariia Trofimova Date: Thu, 7 Nov 2019 12:41:58 +0300 Subject: [PATCH 07/12] refactor: code style --- pytorch_lightning/trainer/train_loop_mixin.py | 5 ++--- pytorch_lightning/trainer/trainer.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pytorch_lightning/trainer/train_loop_mixin.py b/pytorch_lightning/trainer/train_loop_mixin.py index 593430c1d489d..42398a8e0c55a 100644 --- a/pytorch_lightning/trainer/train_loop_mixin.py +++ b/pytorch_lightning/trainer/train_loop_mixin.py @@ -1,5 +1,4 @@ import numpy as np -import tqdm try: from apex import amp @@ -67,8 +66,8 @@ def train(self): val_loss = self.callback_metrics.get('val_loss') if val_loss is None: print('ReduceLROnPlateau conditioned on metric `%s` ' - 'which is not available. Available metrics are: %s' % - ('val_loss', ','.join(list(self.callback_metrics.keys()))), RuntimeWarning) + 'which is not available. 
Available metrics are: %s' % + ('val_loss', ','.join(list(self.callback_metrics.keys()))), RuntimeWarning) exit(-1) self.reduce_lr_on_plateau_scheduler.step(val_loss, epoch=self.current_epoch) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 922f3ff0c9041..b8d2e4e7ae1d1 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -185,7 +185,7 @@ def __init__(self, self.early_stop_callback = None self.configure_early_stopping(early_stop_callback, logger) - self.reduce_lr_on_plateau_scheduler= None + self.reduce_lr_on_plateau_scheduler = None # configure checkpoint callback self.checkpoint_callback = checkpoint_callback From c9d561857fdbf4d83076f5256ba4e64da9ec751a Mon Sep 17 00:00:00 2001 From: William Falcon Date: Sun, 1 Dec 2019 04:53:07 -0500 Subject: [PATCH 08/12] Update pt_callbacks.py --- pytorch_lightning/callbacks/pt_callbacks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pytorch_lightning/callbacks/pt_callbacks.py b/pytorch_lightning/callbacks/pt_callbacks.py index 03e9576adc2e5..c55a48849696f 100644 --- a/pytorch_lightning/callbacks/pt_callbacks.py +++ b/pytorch_lightning/callbacks/pt_callbacks.py @@ -315,7 +315,6 @@ def on_epoch_end(self, epoch, logs=None): f'\nEpoch {epoch:05d}: {self.monitor} reached', f'{current:0.5f} (best {self.best:0.5f}), saving model to', f'{filepath} as top {self.save_top_k}') - self.best = current self._save_model(filepath) else: From 75faae58e6dd9c31d44ce76de991b8695dd024a4 Mon Sep 17 00:00:00 2001 From: William Falcon Date: Sun, 1 Dec 2019 04:54:59 -0500 Subject: [PATCH 09/12] Update trainer.py --- pytorch_lightning/trainer/trainer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index e87baa2198c89..ae78a4c1ec3dd 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -386,8 +386,8 @@ def init_optimizers(self, optimizers): return optimizers, [] def configure_schedulers(self, schedulers): - for i in range(len(schedulers)): - if isinstance(schedulers[i], torch.optim.lr_scheduler.ReduceLROnPlateau): + for i, scheduler for enumerate(schedulers): + if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau): reduce_lr_on_plateau_scheduler = schedulers.pop(i) return schedulers, reduce_lr_on_plateau_scheduler return schedulers, None From 5465466ee7ffeef23ce1f17f98132a6687a492c0 Mon Sep 17 00:00:00 2001 From: William Falcon Date: Sun, 1 Dec 2019 05:02:28 -0500 Subject: [PATCH 10/12] Update train_loop_mixin.py --- pytorch_lightning/trainer/train_loop_mixin.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pytorch_lightning/trainer/train_loop_mixin.py b/pytorch_lightning/trainer/train_loop_mixin.py index e7f2b2c48e185..8ede30c44c887 100644 --- a/pytorch_lightning/trainer/train_loop_mixin.py +++ b/pytorch_lightning/trainer/train_loop_mixin.py @@ -151,6 +151,8 @@ def training_step(self, batch, batch_nb): import numpy as np +from pytorch_lightning.utilities.debugging import MisconfigurationException + try: from apex import amp @@ -216,10 +218,10 @@ def train(self): if self.reduce_lr_on_plateau_scheduler is not None: val_loss = self.callback_metrics.get('val_loss') if val_loss is None: - print('ReduceLROnPlateau conditioned on metric `%s` ' - 'which is not available. 
Available metrics are: %s' % - ('val_loss', ','.join(list(self.callback_metrics.keys()))), RuntimeWarning) - exit(-1) + avail_metrics = ','.join(list(self.callback_metrics.keys())) + m = f'ReduceLROnPlateau conditioned on metric val_loss' \ + 'which is not available. Available metrics are: {avail_metrics}' + raise MisconfigurationException(m) self.reduce_lr_on_plateau_scheduler.step(val_loss, epoch=self.current_epoch) # early stopping From d16a6c04ce89b970478e3d97e0f685582defe7d0 Mon Sep 17 00:00:00 2001 From: William Falcon Date: Sun, 1 Dec 2019 05:11:34 -0500 Subject: [PATCH 11/12] Update trainer.py --- pytorch_lightning/trainer/trainer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index ae78a4c1ec3dd..b783e4406aac0 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -386,7 +386,7 @@ def init_optimizers(self, optimizers): return optimizers, [] def configure_schedulers(self, schedulers): - for i, scheduler for enumerate(schedulers): + for i, scheduler in enumerate(schedulers): if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau): reduce_lr_on_plateau_scheduler = schedulers.pop(i) return schedulers, reduce_lr_on_plateau_scheduler From 7650fa4640dd39dabe00558e830d5267b81d7527 Mon Sep 17 00:00:00 2001 From: William Falcon Date: Sun, 1 Dec 2019 05:12:32 -0500 Subject: [PATCH 12/12] Update train_loop_mixin.py --- pytorch_lightning/trainer/train_loop_mixin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pytorch_lightning/trainer/train_loop_mixin.py b/pytorch_lightning/trainer/train_loop_mixin.py index 8ede30c44c887..5ee7ae06890ba 100644 --- a/pytorch_lightning/trainer/train_loop_mixin.py +++ b/pytorch_lightning/trainer/train_loop_mixin.py @@ -219,8 +219,8 @@ def train(self): val_loss = self.callback_metrics.get('val_loss') if val_loss is None: avail_metrics = ','.join(list(self.callback_metrics.keys())) - m = f'ReduceLROnPlateau conditioned on metric val_loss' \ - 'which is not available. Available metrics are: {avail_metrics}' + m = f'ReduceLROnPlateau conditioned on metric val_loss ' \ + f'which is not available. Available metrics are: {avail_metrics}' raise MisconfigurationException(m) self.reduce_lr_on_plateau_scheduler.step(val_loss, epoch=self.current_epoch)
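
For context, here is how the feature added by this series is meant to be used from a LightningModule. This sketch is not part of the patches: the toy model, the random data, and hook/argument names such as `validation_end`, `batch_nb` and `max_nb_epochs` follow the Lightning API of this era and are assumptions for illustration only. The relevant part is `configure_optimizers`: a ReduceLROnPlateau instance is returned like any other scheduler, `Trainer.configure_schedulers` splits it out of the scheduler list, and at the end of each training epoch the trainer steps it with the monitored `val_loss` from `callback_metrics` (raising MisconfigurationException if that metric is missing).

# Hypothetical usage sketch for this PR (see caveats above).
import torch
from torch.utils.data import DataLoader, TensorDataset

import pytorch_lightning as pl


class PlateauExample(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(32, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_nb):
        x, y = batch
        return {'loss': torch.nn.functional.mse_loss(self(x), y)}

    def validation_step(self, batch, batch_nb):
        x, y = batch
        return {'val_loss': torch.nn.functional.mse_loss(self(x), y)}

    def validation_end(self, outputs):
        # 'val_loss' must end up in the returned metrics: the trainer reads it
        # from callback_metrics before stepping the ReduceLROnPlateau scheduler.
        return {'val_loss': torch.stack([out['val_loss'] for out in outputs]).mean()}

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        # Returned like any other scheduler; configure_schedulers() detects the
        # ReduceLROnPlateau instance and steps it on val_loss instead of the epoch.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=2)
        return [optimizer], [scheduler]

    def train_dataloader(self):
        dataset = TensorDataset(torch.randn(64, 32), torch.randn(64, 1))
        return DataLoader(dataset, batch_size=8)

    def val_dataloader(self):
        dataset = TensorDataset(torch.randn(64, 32), torch.randn(64, 1))
        return DataLoader(dataset, batch_size=8)


if __name__ == '__main__':
    trainer = pl.Trainer(max_nb_epochs=5)
    trainer.fit(PlateauExample())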