
Conversation

Contributor

@zhangyuqin1998 zhangyuqin1998 commented Mar 5, 2025

PR Category

Distributed Strategy

PR Types

New features

Description

An implementation of DeepSeek-V3's DualPipeV pipeline schedule, based on https://github.com/deepseek-ai/DualPipe/blob/main/dualpipe/dualpipev.py

[Figure: the DualPipeV pipeline schedule]

For the pipeline schedule

Usage:
Set use_dualpipev=True for both your PipelineLayer and strategy.hybrid_configs.

The following code can be run with python -m paddle.distributed.launch --gpus="0,1,2,3" demo.py:

import numpy as np
import paddle
import paddle.nn as nn
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer
from paddle.io import Dataset, DataLoader

# Constants
BATCH_NUM = 20
BATCH_SIZE = 80
MICRO_BATCH_SIZE = 2
SEQ_LEN = 1024
HIDDEN_SIZE = 2048

# Dataset class
class RandomDataset(Dataset):
    def __init__(self, num_samples):
        self.num_samples = num_samples

    def __getitem__(self, idx):
        image = np.random.random([SEQ_LEN, HIDDEN_SIZE]).astype('float32')
        label = np.random.randint(0, 10, (SEQ_LEN)).astype('int64')
        return image, label

    def __len__(self):
        return self.num_samples

# Linear layer that unwraps the list of tensors passed between pipeline stages
class LinearPipe(nn.Linear):
    def forward(self, input):
        if isinstance(input, list):
            input = input[0]
        return paddle.matmul(input, self.weight)

# Loss function that unwraps the logits received from the last pipeline stage
class CrossEntropyLossPipe(nn.CrossEntropyLoss):
    def forward(self, logits, label):
        if isinstance(logits, list):
            logits = logits[0]
        return super().forward(logits, label)

# Pipeline description class
class SimplePipeDesc(PipelineLayer):
    def __init__(self, **kwargs):
        descs = [LayerDesc(LinearPipe, HIDDEN_SIZE, HIDDEN_SIZE) for _ in range(8)]
        super(SimplePipeDesc, self).__init__(
            layers=descs, loss_fn=CrossEntropyLossPipe(), **kwargs
        )

# Main function
if __name__ == "__main__":
    # Distributed strategy configuration
    strategy = fleet.DistributedStrategy()
    pipeline_parallel_size = 4
    strategy.hybrid_configs = {
        "pp_degree": pipeline_parallel_size
    }
    strategy.pipeline_configs = {
        "accumulate_steps": BATCH_SIZE // MICRO_BATCH_SIZE,
        "micro_batch_size": MICRO_BATCH_SIZE
    }
    strategy.hybrid_configs["pp_configs"].use_dualpipev = True

    # Initialize fleet
    fleet.init(is_collective=True, strategy=strategy)

    # Model and optimizer setup
    model = SimplePipeDesc(
        num_stages=pipeline_parallel_size,
        topology=fleet.get_hybrid_communicate_group()._topo,
        use_dualpipev=True  # must also be enabled on the PipelineLayer itself
    )
    model = fleet.distributed_model(model)

    scheduler = paddle.optimizer.lr.PiecewiseDecay(
        boundaries=[2], values=[0.001, 0.002], verbose=False
    )
    optimizer = paddle.optimizer.SGD(
        learning_rate=scheduler, parameters=model.parameters()
    )
    optimizer = fleet.distributed_optimizer(optimizer)

    # Data loader setup
    dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)
    train_reader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        drop_last=True,
        num_workers=2
    )

    # Training loop
    for i, (input, label) in enumerate(train_reader()):
        if i >= 5:
            break
        loss = model.train_batch([input, label], optimizer, scheduler)
        print("pp_loss:", loss.numpy())

For the SplitBW Linear

SplitBW Linear is used for the zero-bubble pipeline schedule proposed in https://arxiv.org/abs/2401.10241

Use paddle.distributed.fleet.meta_parallel.zero_bubble_utils.SplitBWLinear to replace the standard nn.Linear. Note that SplitBWLinear can only be used with DualPipeV; otherwise, users need to manage the WeightGradStore themselves to ensure that all weight gradients are actually computed.
[Figure: SplitBW Linear]
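
For reference, here is a minimal sketch of how the demo above could be adapted to use SplitBWLinear. It reuses the constants and classes from the demo script and assumes that SplitBWLinear accepts the same (in_features, out_features) constructor arguments and single-tensor forward signature as nn.Linear; treat those signatures as assumptions rather than the PR's tested API.

from paddle.distributed.fleet.meta_parallel.zero_bubble_utils import SplitBWLinear

# Hypothetical drop-in replacement for LinearPipe from the demo above.
# Assumption: SplitBWLinear mirrors the nn.Linear constructor and forward signature.
class SplitBWLinearPipe(SplitBWLinear):
    def forward(self, input):
        # Activations between pipeline stages may arrive as a single-element list
        if isinstance(input, list):
            input = input[0]
        return super().forward(input)

class SplitBWPipeDesc(PipelineLayer):
    def __init__(self, **kwargs):
        # Same 8-layer stack as SimplePipeDesc, built from SplitBWLinearPipe instead
        descs = [LayerDesc(SplitBWLinearPipe, HIDDEN_SIZE, HIDDEN_SIZE) for _ in range(8)]
        super().__init__(layers=descs, loss_fn=CrossEntropyLossPipe(), **kwargs)

The rest of the script stays the same: construct SplitBWPipeDesc instead of SimplePipeDesc and keep use_dualpipev=True in both places, since SplitBWLinear is only valid under DualPipeV.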

Pcard-76459


paddle-bot bot commented Mar 5, 2025

Your PR has been submitted. Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

Contributor

@zhangbo9674 zhangbo9674 left a comment


LGTM

Member

@ForFishes ForFishes left a comment


LGTM

Member

@ForFishes ForFishes left a comment


LGTM

@ForFishes ForFishes merged commit bffc15b into PaddlePaddle:develop Mar 14, 2025
31 checks passed
zhangyuqin1998 added a commit to zhangyuqin1998/Paddle that referenced this pull request Apr 14, 2025
* [Distribution] Support DualPipeV
ForFishes pushed a commit that referenced this pull request Apr 14, 2025
* [Distribution] Support DualPipeV (#71427)

* [Distribution] Support DualPipeV

* [Distributed] Add fail-fast for dualpipev (#71977)

* [Distribution] support ScheduleNode for overlapping in dualpipev (#71665)

* [Distribution] support ScheduleNode for overlapping in dualpipev

* fix

* opt mem

* [Bug fix] fix mem leakage in dualpipev (#72070)

* fix code style

* fix pipeline in dynamic_shape
YqGe585 pushed a commit to YqGe585/Paddle that referenced this pull request May 7, 2025
* [Distribution] Support DualPipeV

* fix

* fix
@zhangyuqin1998 zhangyuqin1998 deleted the dualpipev_pr branch May 9, 2025 06:18