From 7f63af5dbb99b99ddefb7c271743fbf1e673f6f8 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 22:56:10 -0400
Subject: [PATCH 01/10] remove barriers

---
 pytorch_lightning/trainer/trainer.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py
index 36664b55498bf..c037a489d7ff4 100644
--- a/pytorch_lightning/trainer/trainer.py
+++ b/pytorch_lightning/trainer/trainer.py
@@ -857,7 +857,7 @@ def fit(
             model.prepare_data()
             self._is_data_prepared = True

-        self.barrier('fit_prepare_data')
+        # no barrier needed because all non-download processes will wait at init distributed (the barrier)

         self.setup('fit')
         if self.is_function_implemented('setup', model):
@@ -1153,7 +1153,7 @@ def test(
         if self.is_function_implemented('setup', model_ref):
             model_ref.setup('test')

-        self.barrier('test_setup')
+        # no barrier needed because all processes will catch up once the distributed group boots

         if model is None and ckpt_path == 'best' and self.checkpoint_callback.save_top_k <= 0:
             raise MisconfigurationException(

From 9ea025d5d29bc68cc952c886ffad12d49fd03825 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 22:56:42 -0400
Subject: [PATCH 02/10] remove barriers

---
 pytorch_lightning/trainer/trainer.py | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py
index c037a489d7ff4..27f7b23767113 100644
--- a/pytorch_lightning/trainer/trainer.py
+++ b/pytorch_lightning/trainer/trainer.py
@@ -1256,14 +1256,6 @@ def check_model_configuration(self, model: LightningModule):
                 raise MisconfigurationException('You have defined `test_step()` but did not'
                                                 ' implement `test_dataloader` nor passed in `.test(test_dataloader)`.')

-    def barrier(self, name):
-        if self.use_ddp or self.use_ddp2:
-            torch_distrib.barrier()
-
-        if self.on_tpu and XLA_AVAILABLE:
-            # wait for all processes to catch up
-            torch_xla.core.xla_model.rendezvous(f'pl.Trainer.{name}')
-

 class _PatchDataLoader(object):
     r"""

From 867a0879a192e7e3cf65b934c01cc76b5bb8009e Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 23:03:57 -0400
Subject: [PATCH 03/10] remove barriers

---
 pytorch_lightning/trainer/trainer.py       | 2 --
 pytorch_lightning/trainer/training_loop.py | 6 ++++++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py
index 27f7b23767113..d3f2f2c0e0b3c 100644
--- a/pytorch_lightning/trainer/trainer.py
+++ b/pytorch_lightning/trainer/trainer.py
@@ -857,8 +857,6 @@ def fit(
             model.prepare_data()
             self._is_data_prepared = True

-        # no barrier needed because all non-download processes will wait at init distributed (the barrier)
-
         self.setup('fit')
         if self.is_function_implemented('setup', model):
             model.setup('fit')
diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py
index 54b8c0271582d..3400a312f962b 100644
--- a/pytorch_lightning/trainer/training_loop.py
+++ b/pytorch_lightning/trainer/training_loop.py
@@ -153,6 +153,7 @@ def training_step(self, batch, batch_idx):
 import numpy as np
 import torch
 from torch.utils.data import DataLoader
+import torch.distributed as torch_distrib

 from pytorch_lightning import _logger as log
 from pytorch_lightning.callbacks.base import Callback
@@ -701,6 +702,11 @@ def _get_optimizers_iterable(self):
     def run_training_teardown(self):
         if hasattr(self, '_teardown_already_run') and self._teardown_already_run:
             return
+
+        # clean up dist group
+        if self.use_ddp or self.use_ddp2:
+            torch_distrib.destroy_process_group()
+
         # Train end events
         with self.profiler.profile('on_train_end'):
             # callbacks

From f88114b46a4c93081478fa1a310f8ad2c7d64987 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 23:29:01 -0400
Subject: [PATCH 04/10] remove barriers

---
 docs/source/trainer.rst                    |  2 +-
 pytorch_lightning/core/hooks.py            |  2 +-
 .../trainer/distrib_data_parallel.py       |  5 ++++
 pytorch_lightning/trainer/distrib_parts.py | 19 +++++++++++++
 pytorch_lightning/trainer/trainer.py       | 28 +++++++++++++------
 5 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/docs/source/trainer.rst b/docs/source/trainer.rst
index 393650ccebeec..978eaec554e2a 100644
--- a/docs/source/trainer.rst
+++ b/docs/source/trainer.rst
@@ -9,7 +9,7 @@ Trainer
    :exclude-members:
       run_pretrain_routine,
       _abc_impl,
-      _Trainer_set_random_port,
+      set_random_port,
       _Trainer__set_root_gpu,
       _Trainer__init_optimizers,
       _Trainer__parse_gpu_ids,
diff --git a/pytorch_lightning/core/hooks.py b/pytorch_lightning/core/hooks.py
index a4f52711f972e..e4c5e67d851b9 100644
--- a/pytorch_lightning/core/hooks.py
+++ b/pytorch_lightning/core/hooks.py
@@ -22,7 +22,7 @@ def setup(self, stage: str):
         Called at the beginning of fit and test.

         Args:
-            stage: either 'fit' or 'test'
+            step: either 'fit' or 'test'
         """

     def teardown(self, stage: str):
diff --git a/pytorch_lightning/trainer/distrib_data_parallel.py b/pytorch_lightning/trainer/distrib_data_parallel.py
index 68629dc8c178c..7d5b5002c864b 100644
--- a/pytorch_lightning/trainer/distrib_data_parallel.py
+++ b/pytorch_lightning/trainer/distrib_data_parallel.py
@@ -475,6 +475,11 @@ def ddp_train(self, process_idx, model, is_master=False, proc_offset=0):
         model.trainer = self
         model.init_ddp_connection(self.global_rank, self.world_size, self.is_slurm_managing_tasks)

+        # call setup after the ddp process has connected
+        self.setup()
+        if self.is_function_implemented('setup'):
+            model.setup()
+
         # on world_size=0 let everyone know training is starting
         if self.is_global_zero:
             log.info('-' * 100)
diff --git a/pytorch_lightning/trainer/distrib_parts.py b/pytorch_lightning/trainer/distrib_parts.py
index 8807fd1cfc879..c298cc250143e 100644
--- a/pytorch_lightning/trainer/distrib_parts.py
+++ b/pytorch_lightning/trainer/distrib_parts.py
@@ -155,6 +155,11 @@ def __transfer_batch_to_device(self, batch: Any, device: torch.device):
         return move_data_to_device(batch, device)

     def single_gpu_train(self, model):
+        # call setup
+        self.setup('fit')
+        if self.is_function_implemented('setup', model):
+            model.setup('fit')
+
         model.cuda(self.root_gpu)

         # CHOOSE OPTIMIZER
@@ -171,6 +176,11 @@ def single_gpu_train(self, model):
         self.run_pretrain_routine(model)

     def tpu_train(self, tpu_core_idx, model):
+        # call setup after the ddp process has connected
+        self.setup('fit')
+        if self.is_function_implemented('setup', model):
+            model.setup('fit')
+
         # put model on tpu
         self._device = xm.xla_device(self.tpu_id) if self.tpu_id is not None else xm.xla_device()
         model.to(self._device)
@@ -205,6 +215,10 @@ def tpu_train(self, tpu_core_idx, model):
         self.save_spawn_weights(model)

     def dp_train(self, model):
+        # call setup after the ddp process has connected
+        self.setup('fit')
+        if self.is_function_implemented('setup', model):
+            model.setup('fit')

         # CHOOSE OPTIMIZER
         # allow for lr schedulers as well
@@ -246,6 +260,11 @@ def dp_train(self, model):
         model.forward = model_autocast_original_forward

     def horovod_train(self, model):
+        # call setup after the ddp process has connected
+        self.setup()
+        if self.is_function_implemented('setup', model):
+            model.setup()
+
         if torch.cuda.is_available() and self.on_gpu:
             # Horovod: pin GPU to local rank
             assert self.root_gpu == hvd.local_rank()
diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py
index d3f2f2c0e0b3c..4783e9ec74bfc 100644
--- a/pytorch_lightning/trainer/trainer.py
+++ b/pytorch_lightning/trainer/trainer.py
@@ -857,10 +857,6 @@ def fit(
             model.prepare_data()
             self._is_data_prepared = True

-        self.setup('fit')
-        if self.is_function_implemented('setup', model):
-            model.setup('fit')
-
         # Run auto batch size scaling
         if self.auto_scale_batch_size:
             if isinstance(self.auto_scale_batch_size, bool):
@@ -895,19 +891,19 @@ def fit(
                 self.ddp_train(task, model)

             elif self.distributed_backend == 'cpu_ddp':
-                self._set_random_port
+                self.set_random_port()
                 self.model = model
                 mp.spawn(self.ddp_train, nprocs=self.num_processes, args=(model,))

             elif self.distributed_backend == 'ddp_spawn':
-                self._set_random_port
+                self.set_random_port()
                 model.share_memory()

                 # spin up peers
                 mp.spawn(self.ddp_train, nprocs=self.num_processes, args=(model, ))

             elif self.distributed_backend == 'ddp':
-                self._set_random_port
+                self.set_random_port()
                 self.spawn_ddp_children(model)

         # 1 gpu or dp option triggers training using DP module
@@ -930,6 +926,9 @@ def fit(
             # track for predict
             self.model = model

+            # wait for all prepare data nodes to finish
+            self.barrier('setup')
+
             # train
             if self.tpu_id is not None:
                 self.tpu_train(self.tpu_id, model)
@@ -946,6 +945,11 @@ def fit(
             if self.use_amp:
                 raise MisconfigurationException('amp + cpu is not supported. Please use a GPU option')

+            # call setup after the ddp process has connected
+            self.setup('fit')
+            if self.is_function_implemented('setup', model):
+                model.setup('fit')
+
             # CHOOSE OPTIMIZER
             # allow for lr schedulers as well
             self.optimizers, self.lr_schedulers, self.optimizer_frequencies = self.init_optimizers(model)
@@ -1151,7 +1155,7 @@ def test(
         if self.is_function_implemented('setup', model_ref):
             model_ref.setup('test')

-        # no barrier needed because all processes will catch up once the distributed group boots
+        self.barrier('test_setup')

         if model is None and ckpt_path == 'best' and self.checkpoint_callback.save_top_k <= 0:
             raise MisconfigurationException(
@@ -1254,6 +1258,14 @@ def check_model_configuration(self, model: LightningModule):
                 raise MisconfigurationException('You have defined `test_step()` but did not'
                                                 ' implement `test_dataloader` nor passed in `.test(test_dataloader)`.')

+    def barrier(self, name):
+        if self.use_ddp or self.use_ddp2:
+            torch_distrib.barrier()
+
+        if self.on_tpu and XLA_AVAILABLE:
+            # wait for all processes to catch up
+            torch_xla.core.xla_model.rendezvous(f'pl.Trainer.{name}')
+

 class _PatchDataLoader(object):
     r"""

From d862fe7693f8ae852a306d0940407a055c13ece7 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 23:29:42 -0400
Subject: [PATCH 05/10] remove barriers

---
 pytorch_lightning/trainer/distrib_data_parallel.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pytorch_lightning/trainer/distrib_data_parallel.py b/pytorch_lightning/trainer/distrib_data_parallel.py
index 7d5b5002c864b..67a3c6d317e42 100644
--- a/pytorch_lightning/trainer/distrib_data_parallel.py
+++ b/pytorch_lightning/trainer/distrib_data_parallel.py
@@ -477,7 +477,7 @@ def ddp_train(self, process_idx, model, is_master=False, proc_offset=0):

         # call setup after the ddp process has connected
         self.setup()
-        if self.is_function_implemented('setup'):
+        if self.is_function_implemented('setup', model):
             model.setup()

         # on world_size=0 let everyone know training is starting

From 45a87f524abe47d375fb372865278b65bb347a29 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 23:31:00 -0400
Subject: [PATCH 06/10] remove barriers

---
 pytorch_lightning/core/hooks.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pytorch_lightning/core/hooks.py b/pytorch_lightning/core/hooks.py
index e4c5e67d851b9..71b3600ce474f 100644
--- a/pytorch_lightning/core/hooks.py
+++ b/pytorch_lightning/core/hooks.py
@@ -22,7 +22,7 @@ def setup(self, stage: str):
         Called at the beginning of fit and test.

         Args:
-            step: either 'fit' or 'test'
+            stage: either 'fit' or 'test'
         """

     def teardown(self, stage: str):
@@ -30,7 +30,7 @@ def teardown(self, stage: str):
         Called at the end of fit and test.

         Args:
-            step: either 'fit' or 'test'
+            stage: either 'fit' or 'test'
         """

     def on_fit_start(self):

From 1011c9a60c4dec06371f84a3671e44b6e4f98115 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 23:32:41 -0400
Subject: [PATCH 07/10] remove barriers

---
 pytorch_lightning/trainer/distrib_parts.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pytorch_lightning/trainer/distrib_parts.py b/pytorch_lightning/trainer/distrib_parts.py
index c298cc250143e..145afba576bce 100644
--- a/pytorch_lightning/trainer/distrib_parts.py
+++ b/pytorch_lightning/trainer/distrib_parts.py
@@ -261,9 +261,9 @@ def dp_train(self, model):

     def horovod_train(self, model):
         # call setup after the ddp process has connected
-        self.setup()
+        self.setup('fit')
         if self.is_function_implemented('setup', model):
-            model.setup()
+            model.setup('fit')

         if torch.cuda.is_available() and self.on_gpu:
             # Horovod: pin GPU to local rank

From 6a9379be082762f733673b6bf2f82b3798d8041a Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 23:45:31 -0400
Subject: [PATCH 08/10] remove barriers

---
 pytorch_lightning/trainer/distrib_data_parallel.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pytorch_lightning/trainer/distrib_data_parallel.py b/pytorch_lightning/trainer/distrib_data_parallel.py
index 67a3c6d317e42..9702967c590d8 100644
--- a/pytorch_lightning/trainer/distrib_data_parallel.py
+++ b/pytorch_lightning/trainer/distrib_data_parallel.py
@@ -184,6 +184,10 @@ class TrainerDDPMixin(ABC):
     node_rank: int
     tpu_cores: int

+    @property
+    def is_function_implemented(self, *args) -> bool:
+        """Warning: this is just empty shell for code implemented in other class."""
+
     @property
     def is_global_zero(self) -> int:
         """Warning: this is just empty shell for code implemented in other class."""

From 499a2a73850d553e531455553db5a69375bbbba6 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Thu, 18 Jun 2020 23:52:13 -0400
Subject: [PATCH 09/10] remove barriers

---
 pytorch_lightning/trainer/distrib_data_parallel.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pytorch_lightning/trainer/distrib_data_parallel.py b/pytorch_lightning/trainer/distrib_data_parallel.py
index 9702967c590d8..8888751da83c5 100644
--- a/pytorch_lightning/trainer/distrib_data_parallel.py
+++ b/pytorch_lightning/trainer/distrib_data_parallel.py
@@ -185,7 +185,7 @@ class TrainerDDPMixin(ABC):
     tpu_cores: int

     @property
-    def is_function_implemented(self, *args) -> bool:
+    def is_function_implemented(self, *args):
         """Warning: this is just empty shell for code implemented in other class."""

     @property

From 75e9793ed584296e2bf215d20e3aede7e25602d9 Mon Sep 17 00:00:00 2001
From: William Falcon
Date: Fri, 19 Jun 2020 00:28:38 -0400
Subject: [PATCH 10/10] remove barriers

---
 pytorch_lightning/trainer/distrib_data_parallel.py | 4 ----
 pytorch_lightning/trainer/training_loop.py         | 2 +-
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/pytorch_lightning/trainer/distrib_data_parallel.py b/pytorch_lightning/trainer/distrib_data_parallel.py
index 8888751da83c5..67a3c6d317e42 100644
--- a/pytorch_lightning/trainer/distrib_data_parallel.py
+++ b/pytorch_lightning/trainer/distrib_data_parallel.py
@@ -184,10 +184,6 @@ class TrainerDDPMixin(ABC):
     node_rank: int
     tpu_cores: int

-    @property
-    def is_function_implemented(self, *args):
-        """Warning: this is just empty shell for code implemented in other class."""
-
     @property
     def is_global_zero(self) -> int:
         """Warning: this is just empty shell for code implemented in other class."""
diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py
index 3400a312f962b..ce805161ce47f 100644
--- a/pytorch_lightning/trainer/training_loop.py
+++ b/pytorch_lightning/trainer/training_loop.py
@@ -259,7 +259,7 @@ def get_model(self) -> LightningModule:
         """Warning: this is just empty shell for code implemented in other class."""

     @abstractmethod
-    def is_function_implemented(self, *args):
+    def is_function_implemented(self, *args, **kwargs):
         """Warning: this is just empty shell for code implemented in other class."""

     @abstractmethod