Fix ddp tests + .test() #2512
Changes from all commits
@@ -122,6 +122,8 @@ def train_fx(trial_hparams, cluster_manager, _):
 from time import sleep
 import numpy as np
 from os.path import abspath
+from torch import distributed as dist
+import queue

 import torch
 from pytorch_lightning import _logger as log
@@ -163,6 +165,10 @@ def train_fx(trial_hparams, cluster_manager, _):
 else:
     XLA_AVAILABLE = True

+pid = os.getpid()
+rng1 = np.random.RandomState(pid)
+RANDOM_PORTS = rng1.randint(10000, 19999, 100)
+

 class TrainerDDPMixin(ABC):

Review comment (on RANDOM_PORTS): does this cause a failure for a distributed cluster > 100 nodes?
Reply: the random_port thing is only used in non-multi node ddp.
@@ -178,6 +184,7 @@ class TrainerDDPMixin(ABC): | |
use_tpu: bool | ||
default_root_dir: str | ||
progress_bar_callback: ... | ||
checkpoint_callback: ... | ||
num_processes: int | ||
num_nodes: int | ||
node_rank: int | ||
|
@@ -377,17 +384,19 @@ def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids):
         # don't make this debug... this is good UX
         rank_zero_info(f'CUDA_VISIBLE_DEVICES: [{os.environ["CUDA_VISIBLE_DEVICES"]}]')

-    def set_random_port(self):
+    def set_random_port(self, force=False):
         """
         When running DDP NOT managed by SLURM, the ports might collide
         """
-        try:
-            default_port = os.environ['MASTER_PORT']
-        except Exception:
-            # use the process id as a seed to a generator for port only
-            pid = os.getpid()
-            rng1 = np.random.RandomState(pid)
-            default_port = rng1.randint(10000, 19999, 1)[0]
+        # pick a random port first
+        assert self.num_nodes == 1, 'random port can only be called from single node training'
+        global RANDOM_PORTS
+        default_port = RANDOM_PORTS[-1]
+        RANDOM_PORTS = RANDOM_PORTS[:-1]
+
+        # when not forced, use the user port
+        if not force:
+            default_port = os.environ.get('MASTER_PORT', default_port)

         os.environ['MASTER_PORT'] = str(default_port)

Review comment (on set_random_port): @jeremyjordan this function is only ever called from ddp on a single node... not distributed
Review comment (on the assert): @jeremyjordan added this to make sure it's used as expected
Reply: If I'm understanding this correctly, it looks like this will disable multi-node support (at least, I'm not able to run across multiple nodes anymore due to this assertion - see issue here: flatironinstitute/deepblast#46)
Reply: @mortonjt can you open a github issue about this and explain how you launched your script?
Reply: Sure thing. See #2578
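To make the new behaviour easier to follow outside the diff, here is a minimal, self-contained sketch of the port-picking logic. This is a rough reconstruction for illustration only; the class name PortPickerSketch and the free-standing layout are made up and are not the Lightning code.

# Minimal standalone sketch of the port-picking scheme above
# (illustrative reconstruction; not the library implementation).
import os
import numpy as np

# module-level pool of 100 candidate ports, seeded from the pid
pid = os.getpid()
rng1 = np.random.RandomState(pid)
RANDOM_PORTS = rng1.randint(10000, 19999, 100)


class PortPickerSketch:
    def __init__(self, num_nodes=1):
        self.num_nodes = num_nodes

    def set_random_port(self, force=False):
        global RANDOM_PORTS
        # single-node only: this is the assertion the review thread discusses
        assert self.num_nodes == 1, 'random port can only be called from single node training'

        # pop one candidate port from the end of the pool
        default_port = RANDOM_PORTS[-1]
        RANDOM_PORTS = RANDOM_PORTS[:-1]

        # a user-supplied MASTER_PORT still wins unless force=True
        if not force:
            default_port = os.environ.get('MASTER_PORT', default_port)

        os.environ['MASTER_PORT'] = str(default_port)


picker = PortPickerSketch(num_nodes=1)
picker.set_random_port(force=True)
print(os.environ['MASTER_PORT'])   # one of the 100 pre-drawn ports
# PortPickerSketch(num_nodes=2).set_random_port()  # would trip the assert

Calling it with num_nodes > 1 trips the new assertion, which is exactly the multi-node concern raised in the thread above.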
@@ -446,15 +455,24 @@ def spawn_ddp_children(self, model):
         sleep(delay)

         local_rank = 0
-        self.ddp_train(local_rank, model, is_master=True)
+        results = self.ddp_train(local_rank, q=None, model=model, is_master=True)

         del os.environ['WORLD_SIZE']

-    def ddp_train(self, process_idx, model, is_master=False, proc_offset=0):
+        return results
+
+    def ddp_train(self, process_idx, q, model, is_master=False, proc_offset=0):
         """
-        Entry point into a DP thread
-        :param gpu_idx:
-        :param model:
-        :param cluster_obj:
-        :return:
+        Entry point for ddp
+
+        Args:
+            process_idx:
+            q:
+            model:
+            is_master:
+            proc_offset:
+
+        Returns:
+
         """
         # offset the process id if requested
         process_idx = process_idx + proc_offset

Review comment (on the q argument): rather a longer var name than `q`
@@ -535,7 +553,17 @@ def ddp_train(self, process_idx, model, is_master=False, proc_offset=0):
             model = model.configure_ddp(model, device_ids)

         # continue training routine
-        self.run_pretrain_routine(model)
+        results = self.run_pretrain_routine(model)
+
+        # clean up memory
+        torch.cuda.empty_cache()
+
+        if self.global_rank == 0 and q is not None:
+            q.put(self.checkpoint_callback.best_model_path)
+            q.put(results)
+
+        if self.global_rank == 0 and self.distributed_backend != 'ddp_spawn':
+            return results

     def save_spawn_weights(self, model):
         """

Review comment (on the q.put calls): this feels hacky, what are we trying to do here? return the state of a callback to the main node? why put this specific attribute in the queue?
Reply: I think he did this so that one can call .test in the main process, which will access the best model to test on. But I agree with you, this seems fragile and dangerous to modify state across processes, there's gotta be a better way. Could one put the whole trainer in the queue in theory?
Reply: i tried that but it didn't work. i think in a different PR we can do something like state_dict for the trainer.
Reply: i agree this isn't optimal but let's get a release that fixes all the test issues (which this PR does) and then we can figure out a longer term strategy
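On the queue discussion above: the plumbing follows the usual torch.multiprocessing pattern, where the parent creates a queue, hands it to the spawned workers, and rank 0 pushes whatever the parent needs before the workers exit. Below is a hedged, self-contained sketch of that pattern; the worker body, the result dict, and the checkpoint path are placeholders, not the actual Lightning code.

# Hedged sketch of handing results from a spawned rank-0 worker back to the
# parent process via a queue (placeholder values; not the Lightning code).
import torch.multiprocessing as mp


def worker(process_idx, q):
    # ... DDP setup and training would happen here ...
    results = {'val_loss': 0.123}        # stand-in for run_pretrain_routine's return value
    best_model_path = '/tmp/best.ckpt'   # stand-in for checkpoint_callback.best_model_path

    if process_idx == 0 and q is not None:
        # rank 0 pushes what the parent needs, in a fixed order
        q.put(best_model_path)
        q.put(results)


if __name__ == '__main__':
    smp = mp.get_context('spawn')
    q = smp.SimpleQueue()
    mp.spawn(worker, args=(q,), nprocs=2)

    # the parent reads items back in the same order they were put
    best_model_path = q.get()
    results = q.get()
    print(best_model_path, results)

Because the queue is FIFO, the parent must read the items in the same order rank 0 put them, which is why the best_model_path / results ordering in the hunk matters.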
Review comment (on the hunk above): I think this is not needed anymore?
Reply: @williamFalcon