Fix parallel states fake initialization for new MCore with MoE parallel folding #11433

Open · wants to merge 3 commits into base: main

This PR aligns `fake_initialize_model_parallel` with the new Megatron-Core MoE parallel folding: the dense decoder rank generator no longer carries expert parallelism (`ep=1`), a separate expert rank generator is built from expert-TP x EP x PP with its own expert data-parallel size, and expert groups are requested via `is_expert=True` instead of the removed `independent_ep=True`.
nemo/collections/nlp/modules/common/megatron/megatron_init.py (37 additions, 5 deletions)
```diff
@@ -254,6 +254,7 @@ def fake_initialize_model_parallel(
     pipeline_model_parallel_split_rank_=None,
     virtual_pipeline_model_parallel_size_=None,
     expert_model_parallel_size_=1,
+    expert_tensor_parallel_size_=None,
     context_parallel_size_=1,
     encoder_tensor_model_parallel_size_=0,
     encoder_pipeline_model_parallel_size_=0,
@@ -349,23 +350,54 @@ def fake_initialize_model_parallel(
 
     decoder_rank_generator = RankGenerator(
         tp=tensor_model_parallel_size,
-        ep=expert_model_parallel_size_,
+        ep=1,
         dp=data_parallel_size,
         pp=pipeline_model_parallel_size,
         cp=context_parallel_size,
         order='tp-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
         rank_offset=encoder_world_size,
     )
+    # Build expert rank generator
+    if expert_tensor_parallel_size_ is None:
+        expert_tensor_parallel_size_ = tensor_model_parallel_size
+    expert_tensor_model_pipeline_parallel_size = (
+        expert_tensor_parallel_size_ * expert_model_parallel_size_ * pipeline_model_parallel_size
+    )
+    expert_data_parallel_size = decoder_world_size // expert_tensor_model_pipeline_parallel_size
+    if decoder_world_size % expert_tensor_model_pipeline_parallel_size != 0:
+        raise RuntimeError(
+            f"decoder world_size ({decoder_world_size}) is not divisible by expert_tensor_model_pipeline_parallel size ({expert_tensor_model_pipeline_parallel_size})"
+        )
 
-    def generator_wrapper(group_type, **kwargs):
-        from itertools import cycle
-
+    # TODO: support expert specific ordering
+    expert_decoder_rank_generator = RankGenerator(
+        tp=expert_tensor_parallel_size_,
+        ep=expert_model_parallel_size_,
+        dp=expert_data_parallel_size,
+        pp=pipeline_model_parallel_size,
+        cp=1,
+        order='tp-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
+        rank_offset=encoder_world_size,
+    )
+
+    assert decoder_rank_generator.get_ranks("pp") == expert_decoder_rank_generator.get_ranks(
+        "pp"
+    ), f"Pipeline parallel groups are expected to be the same for Non-Expert and Expert part, \
+    but got {decoder_rank_generator.get_ranks('pp')} and {expert_decoder_rank_generator.get_ranks('pp')}"
+
+    def generator_wrapper(group_type, is_expert=False, **kwargs):
         """The `RankGenerator` class produces a hyper-rectangle for a given set of
         tensor, pipeline, data, expert, and context parallelism. If we have an encoder,
         in addition to the default decoder, we essentially instantiate two `RankGenerator`
         classes to construct the parallelism for each module separately, and we then have
         to stitch them together for the right groups. For now, this means pp and tp-pp."""
-        d_ranks = decoder_rank_generator.get_ranks(group_type, **kwargs)
+        from itertools import cycle
+
+        if is_expert:
+            d_ranks = expert_decoder_rank_generator.get_ranks(group_type, **kwargs)
+        else:
+            d_ranks = decoder_rank_generator.get_ranks(group_type, **kwargs)
+
         if encoder_rank_generator is None:
             for x in d_ranks:
                 yield x
```
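To make the sizing arithmetic above concrete, here is a minimal standalone sketch of the same computation. The concrete numbers (a 16-GPU decoder with tp=2, pp=2, cp=1, ep=2) are illustrative assumptions, not values taken from the PR:

```python
# Minimal sketch of the expert-parallel sizing added above (illustrative
# values, not part of the PR). expert_tensor_parallel_size defaults to the
# dense TP size when left unset.
decoder_world_size = 16
tensor_model_parallel_size = 2
pipeline_model_parallel_size = 2
context_parallel_size = 1
expert_model_parallel_size = 2
expert_tensor_parallel_size = None

if expert_tensor_parallel_size is None:
    expert_tensor_parallel_size = tensor_model_parallel_size

# Dense data parallelism now ignores EP entirely (the decoder generator uses ep=1).
data_parallel_size = decoder_world_size // (
    tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size
)

# The expert grid folds ETP * EP * PP into the same set of ranks.
expert_tensor_model_pipeline_parallel_size = (
    expert_tensor_parallel_size * expert_model_parallel_size * pipeline_model_parallel_size
)
assert decoder_world_size % expert_tensor_model_pipeline_parallel_size == 0
expert_data_parallel_size = decoder_world_size // expert_tensor_model_pipeline_parallel_size

print(data_parallel_size)         # 4
print(expert_data_parallel_size)  # 16 // (2 * 2 * 2) = 2
```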
```diff
@@ -446,7 +478,7 @@ def generator_wrapper(group_type, **kwargs):
     # EP rank
     expert_model_parallel_rank = 0
     if expert_model_parallel_size_ is not None and expert_model_parallel_size_ > 1:
-        for ranks in generator_wrapper('ep', independent_ep=True):
+        for ranks in generator_wrapper('ep', is_expert=True):
             if rank in ranks:
                 expert_model_parallel_rank = list(ranks).index(rank)
```
nemo/lightning/megatron_init.py (34 additions, 4 deletions)
```diff
@@ -254,6 +254,7 @@ def fake_initialize_model_parallel(
     pipeline_model_parallel_split_rank_=None,
     virtual_pipeline_model_parallel_size_=None,
     expert_model_parallel_size_=1,
+    expert_tensor_parallel_size_=None,
     context_parallel_size_=1,
     encoder_tensor_model_parallel_size_=0,
     encoder_pipeline_model_parallel_size_=0,
@@ -349,23 +350,52 @@ def fake_initialize_model_parallel(
 
     decoder_rank_generator = RankGenerator(
         tp=tensor_model_parallel_size,
-        ep=expert_model_parallel_size_,
+        ep=1,
         dp=data_parallel_size,
         pp=pipeline_model_parallel_size,
         cp=context_parallel_size,
         order='tp-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
         rank_offset=encoder_world_size,
     )
+    # Build expert rank generator
+    if expert_tensor_parallel_size_ is None:
+        expert_tensor_parallel_size_ = tensor_model_parallel_size
+    expert_tensor_model_pipeline_parallel_size = (
+        expert_tensor_parallel_size_ * expert_model_parallel_size_ * pipeline_model_parallel_size
+    )
+    expert_data_parallel_size = decoder_world_size // expert_tensor_model_pipeline_parallel_size
+    if decoder_world_size % expert_tensor_model_pipeline_parallel_size != 0:
+        raise RuntimeError(
+            f"decoder world_size ({decoder_world_size}) is not divisible by expert_tensor_model_pipeline_parallel size ({expert_tensor_model_pipeline_parallel_size})"
+        )
 
-    def generator_wrapper(group_type, **kwargs):
+    expert_decoder_rank_generator = RankGenerator(
+        tp=expert_tensor_parallel_size_,
+        ep=expert_model_parallel_size_,
+        dp=expert_data_parallel_size,
+        pp=pipeline_model_parallel_size,
+        cp=1,
+        order='tp-pp-dp' if use_tp_pp_dp_mapping else 'tp-cp-ep-dp-pp',
+        rank_offset=encoder_world_size,
+    )
+
+    assert decoder_rank_generator.get_ranks("pp") == expert_decoder_rank_generator.get_ranks(
+        "pp"
+    ), f"Pipeline parallel groups are expected to be the same for Non-Expert and Expert part, \
+    but got {decoder_rank_generator.get_ranks('pp')} and {expert_decoder_rank_generator.get_ranks('pp')}"
+
+    def generator_wrapper(group_type, is_expert=False, **kwargs):
         from itertools import cycle
 
         """The `RankGenerator` class produces a hyper-rectangle for a given set of
         tensor, pipeline, data, expert, and context parallelism. If we have an encoder,
         in addition to the default decoder, we essentially instantiate two `RankGenerator`
         classes to construct the parallelism for each module separately, and we then have
         to stitch them together for the right groups. For now, this means pp and tp-pp."""
-        d_ranks = decoder_rank_generator.get_ranks(group_type, **kwargs)
+        if is_expert:
+            d_ranks = expert_decoder_rank_generator.get_ranks(group_type, **kwargs)
+        else:
+            d_ranks = decoder_rank_generator.get_ranks(group_type, **kwargs)
         if encoder_rank_generator is None:
             for x in d_ranks:
                 yield x
```
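The assert on the "pp" groups guards the folding invariant: within each pipeline stage, the dense grid (tp x cp x dp, with ep=1) and the expert grid (etp x ep x expert-dp) must slice the same set of ranks. The toy enumeration below illustrates the idea with consecutive rank blocks per stage; it is not the real MCore `RankGenerator` ordering, and the sizes are illustrative assumptions:

```python
# Toy illustration of MoE parallel folding (not the MCore RankGenerator).
# Assumed sizes: dense grid tp=2, cp=1, dp=4 and expert grid etp=2, ep=2
# fold into the same 8 ranks of each of pp=2 pipeline stages.
tp, cp, dp, pp = 2, 1, 4, 2
etp, ep = 2, 2
edp = (tp * cp * dp) // (etp * ep)  # expert data-parallel size = 2

ranks_per_stage = tp * cp * dp
assert ranks_per_stage == etp * ep * edp  # same ranks, two different slicings

for stage in range(pp):
    stage_ranks = list(range(stage * ranks_per_stage, (stage + 1) * ranks_per_stage))
    # Dense view: dp groups of size tp*cp. Expert view: edp groups of size etp*ep.
    dense = [stage_ranks[i : i + tp * cp] for i in range(0, ranks_per_stage, tp * cp)]
    expert = [stage_ranks[i : i + etp * ep] for i in range(0, ranks_per_stage, etp * ep)]
    print(f"stage {stage}: dense tp-cp groups {dense}, expert etp-ep groups {expert}")
```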
```diff
@@ -446,7 +476,7 @@ def generator_wrapper(group_type, **kwargs):
     # EP rank
     expert_model_parallel_rank = 0
     if expert_model_parallel_size_ is not None and expert_model_parallel_size_ > 1:
-        for ranks in generator_wrapper('ep', independent_ep=True):
+        for ranks in generator_wrapper('ep', is_expert=True):
             if rank in ranks:
                 expert_model_parallel_rank = list(ranks).index(rank)
```
tests/collections/nlp/test_initialize.py (8 additions, 1 deletion)
```diff
@@ -124,7 +124,14 @@ def test_fake_initialize(nodes, num_gpu, tp, pp, cp, ep):
         n_data_parallel_size,
         n_pipeline_model_parallel_split_rank,
         n_virtual_pipeline_model_parallel_rank,
-    ) = fake_initialize_model_parallel(nodes * num_gpu, 0, tp, pp, None, None, ep, cp)
+    ) = fake_initialize_model_parallel(
+        world_size=nodes * num_gpu,
+        rank=0,
+        tensor_model_parallel_size_=tp,
+        pipeline_model_parallel_size_=pp,
+        expert_model_parallel_size_=ep,
+        context_parallel_size_=cp,
+    )
     assert m_tensor_model_parallel_rank == tensor_model_parallel_rank
     assert n_pipeline_model_parallel_rank == pipeline_model_parallel_rank
     assert n_expert_model_parallel_rank == expert_model_parallel_rank
```
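With the call spelled out as keywords, exercising the argument this PR adds is a one-line change. A hypothetical invocation sketch follows; the import path and the etp=1 override are assumptions for illustration, not part of this PR's test matrix:

```python
# Hypothetical sketch: override the new expert_tensor_parallel_size_ argument.
# With 16 ranks, etp=1 and tp=2 give an expert grid of 1 * 2 * 2 = 4 and an
# expert data-parallel size of 4. Import path assumed from the file this PR touches.
from nemo.lightning.megatron_init import fake_initialize_model_parallel

fake_initialize_model_parallel(
    world_size=16,
    rank=0,
    tensor_model_parallel_size_=2,
    pipeline_model_parallel_size_=2,
    expert_model_parallel_size_=2,
    context_parallel_size_=1,
    expert_tensor_parallel_size_=1,  # new in this PR; None means "same as TP"
)
```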