
Train Your Large Model on Multiple GPUs with Fully Sharded Data Parallelism
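The complete pretraining script below builds a small Llama-style model from scratch, shards it across GPUs with PyTorch's FSDP2 API (fully_shard), and pretrains it on a FineWeb sample. It is meant to be launched with torchrun, one process per GPU, since it reads the LOCAL_RANK environment variable that torchrun sets; for example (the script name fsdp_pretrain.py is an assumption), torchrun --nproc_per_node=4 fsdp_pretrain.py.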


import dataclasses
import functools
import os

import datasets
import tokenizers
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler
import tqdm
from torch import Tensor
from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (
    apply_activation_checkpointing,
    checkpoint_wrapper,
)
from torch.distributed.checkpoint import load, save
from torch.distributed.checkpoint.state_dict import (
    StateDictOptions,
    get_state_dict,
    set_state_dict,
)
from torch.distributed.fsdp import (
    CPUOffloadPolicy,
    FSDPModule,
    MixedPrecisionPolicy,
    fully_shard,
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
from torch.utils.data.distributed import DistributedSampler

# Build the model
@dataclasses.dataclass
class LlamaConfig:
    """Define the Llama model hyperparameters."""
    vocab_size: int = 50000  # Size of the tokenizer vocabulary
    max_position_embeddings: int = 2048  # Maximum sequence length
    hidden_size: int = 768  # Dimension of the hidden layers
    intermediate_size: int = 4*768  # Dimension of the MLP's hidden layer
    num_hidden_layers: int = 12  # Number of transformer layers
    num_attention_heads: int = 12  # Number of attention heads
    num_key_value_heads: int = 3  # Number of key-value heads for GQA


class RotaryPositionEncoding(nn.Module):
    """Rotary position encoding."""

    def __init__(self, dim: int, max_position_embeddings: int) -> None:
        """Initialize the RotaryPositionEncoding module.

        Args:
            dim: The hidden dimension of the input tensor to which RoPE is applied
            max_position_embeddings: The maximum sequence length of the input tensor
        """
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        # compute a matrix of n*theta_i
        N = 10_000.0
        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))
        inv_freq = torch.cat((inv_freq, inv_freq), dim=-1)
        position = torch.arange(max_position_embeddings)
        sinusoid_inp = torch.outer(position, inv_freq)
        # save the cosine and sine matrices as buffers, not parameters
        self.register_buffer("cos", sinusoid_inp.cos())
        self.register_buffer("sin", sinusoid_inp.sin())

    def forward(self, x: Tensor) -> Tensor:
        """Apply RoPE to tensor x.

        Args:
            x: Input tensor of shape (batch_size, seq_length, num_heads, head_dim)

        Returns:
            Output tensor of shape (batch_size, seq_length, num_heads, head_dim)
        """
        batch_size, seq_len, num_heads, head_dim = x.shape
        device = x.device
        dtype = x.dtype
        # reshape the cosine and sine matrices to 4D tensors with the same dtype as x
        cos = self.cos.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        sin = self.sin.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        # apply RoPE to x: rotate half of x along the head dimension
        x1, x2 = x.chunk(2, dim=-1)
        rotated = torch.cat((-x2, x1), dim=-1)
        output = (x * cos) + (rotated * sin)
        return output

class LlamaAttention(nn.Module):
    """Grouped-query attention with rotary embeddings."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

        # hidden_size must be divisible by num_heads
        assert (self.head_dim * self.num_heads) == self.hidden_size

        # Linear layers for Q, K, V projections
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

    def reset_parameters(self):
        self.q_proj.reset_parameters()
        self.k_proj.reset_parameters()
        self.v_proj.reset_parameters()
        self.o_proj.reset_parameters()

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        bs, seq_len, dim = hidden_states.size()

        # Project inputs to Q, K, V
        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)
        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

        # Apply rotary position embeddings
        query_states = rope(query_states)
        key_states = rope(key_states)

        # Transpose tensors from BSHD to BHSD layout for scaled_dot_product_attention
        query_states = query_states.transpose(1, 2)
        key_states = key_states.transpose(1, 2)
        value_states = value_states.transpose(1, 2)

        # Use PyTorch's optimized attention implementation
        # setting is_causal=True is incompatible with passing an explicit attention mask
        attn_output = F.scaled_dot_product_attention(
            query_states,
            key_states,
            value_states,
            attn_mask=attn_mask,
            dropout_p=0.0,
            enable_gqa=True,
        )

        # Transpose the output tensor from BHSD back to BSHD layout, reshape to 3D, then project the output
        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)
        return attn_output

class LlamaMLP(nn.Module):
    """Feed-forward network with SwiGLU activation."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        # Two parallel projections for SwiGLU
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.act_fn = F.silu  # SwiGLU activation function
        # Project back to the hidden dimension
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def reset_parameters(self):
        self.gate_proj.reset_parameters()
        self.up_proj.reset_parameters()
        self.down_proj.reset_parameters()

    def forward(self, x: Tensor) -> Tensor:
        # SwiGLU activation: multiply gate and up-projected inputs
        gate = self.act_fn(self.gate_proj(x))
        up = self.up_proj(x)
        return self.down_proj(gate * up)

class LlamaDecoderLayer(nn.Module):
    """Single transformer layer of a Llama model."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.self_attn = LlamaAttention(config)
        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.mlp = LlamaMLP(config)

    def reset_parameters(self):
        self.input_layernorm.reset_parameters()
        self.self_attn.reset_parameters()
        self.post_attention_layernorm.reset_parameters()
        self.mlp.reset_parameters()

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        # First residual block: self-attention
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        attn_outputs = self.self_attn(hidden_states, rope=rope, attn_mask=attn_mask)
        hidden_states = attn_outputs + residual

        # Second residual block: MLP
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states) + residual
        return hidden_states

class LlamaModel(nn.Module):
    """The full Llama model without any pretraining heads."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.rotary_emb = RotaryPositionEncoding(
            config.hidden_size // config.num_attention_heads,
            config.max_position_embeddings,
        )

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([
            LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)
        ])
        self.norm = nn.RMSNorm(config.hidden_size, eps=1e-5)

    def reset_parameters(self):
        self.embed_tokens.reset_parameters()
        for layer in self.layers:
            layer.reset_parameters()
        self.norm.reset_parameters()

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        # Convert input token IDs to embeddings
        hidden_states = self.embed_tokens(input_ids)
        # Process through all transformer layers, then the final norm layer
        for layer in self.layers:
            hidden_states = layer(hidden_states, rope=self.rotary_emb, attn_mask=attn_mask)
        hidden_states = self.norm(hidden_states)
        # Return the final hidden states
        return hidden_states

class LlamaForPretraining(nn.Module):
    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.base_model = LlamaModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def reset_parameters(self):
        self.base_model.reset_parameters()
        self.lm_head.reset_parameters()

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        hidden_states = self.base_model(input_ids, attn_mask)
        return self.lm_head(hidden_states)

def create_causal_mask(batch: Tensor, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a causal mask for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        dtype: Data type of the mask

    Returns:
        Causal mask of shape (seq_len, seq_len)
    """
    batch_size, seq_len = batch.shape
    mask = torch.full(
        (seq_len, seq_len), float("-inf"), device=batch.device, dtype=dtype
    ).triu(diagonal=1)
    return mask

def create_padding_mask(batch: Tensor, padding_token_id: int, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a padding mask for a batch of sequences for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        padding_token_id: ID of the padding token
        dtype: Data type of the mask

    Returns:
        Padding mask of shape (batch_size, 1, seq_len, seq_len)
    """
    padded = torch.zeros_like(batch, device=batch.device, dtype=dtype).masked_fill(
        batch == padding_token_id, float("-inf")
    )
    mask = padded[:, :, None] + padded[:, None, :]
    return mask[:, None, :, :]
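The two helpers above produce additive masks: entries equal to 0 allow attention while entries equal to -inf block it, so the causal and padding masks can simply be summed before being passed to scaled_dot_product_attention, exactly as the training loop does later. A minimal sketch of that combination (the toy token ids and the padding id 0 are assumptions, not part of the original script):

# toy batch: one sequence of length 5 whose last two positions are padding (id 0)
# the (5, 5) causal mask broadcasts against the (1, 1, 5, 5) padding mask
toy_batch = torch.tensor([[5, 7, 9, 0, 0]])
toy_mask = create_causal_mask(toy_batch) + create_padding_mask(toy_batch, padding_token_id=0)
print(toy_mask.shape)  # torch.Size([1, 1, 5, 5])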

 

 

# Dataset that yields padded token sequences of a fixed length
class PretrainingDataset(torch.utils.data.Dataset):
    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,
                 seq_length: int):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        self.bot = tokenizer.token_to_id("[BOT]")
        self.eot = tokenizer.token_to_id("[EOT]")
        self.pad = tokenizer.token_to_id("[PAD]")

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index: int) -> tuple[Tensor, Tensor]:
        """Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens
        are added. Clipped and padded to the sequence length.
        """
        seq = self.dataset[index]["text"]
        tokens: list[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]
        # pad to the target sequence length
        toklen = len(tokens)
        if toklen < self.seq_length + 1:
            pad_length = self.seq_length + 1 - toklen
            tokens += [self.pad] * pad_length
        # return the input sequence and the target sequence (shifted by one token)
        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64)
        y = torch.tensor(tokens[1:self.seq_length+1], dtype=torch.int64)
        return x, y

 

def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    dist.barrier()
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True, cpu_offload=cpu_offload),
    )
    load(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    set_state_dict(
        model, optimizer,
        model_state_dict=model_state, optim_state_dict=optimizer_state,
        options=StateDictOptions(broadcast_from_rank0=True, full_state_dict=True, cpu_offload=cpu_offload),
    )
    scheduler.load_state_dict(
        torch.load("checkpoint-dist/lrscheduler.pt", map_location=device),
    )
    dist.barrier()

 

def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    dist.barrier()
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True, cpu_offload=cpu_offload),
    )
    save(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    if dist.get_rank() == 0:
        torch.save(scheduler.state_dict(), "checkpoint-dist/lrscheduler.pt")
    dist.barrier()

 

# Load the tokenizer and dataset
tokenizer = tokenizers.Tokenizer.from_file("bpe_50K.json")
dataset = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

# Initialize the distributed environment
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
device = torch.device(f"cuda:{local_rank}")
rank = dist.get_rank()
world_size = dist.get_world_size()
print(f"World size {world_size}, rank {rank}, local rank {local_rank}. Using {device}")

# Create the pretraining model on the meta device, on all ranks
with torch.device("meta"):
    model_config = LlamaConfig()
    model = LlamaForPretraining(model_config)

# Convert the model from the meta device to FSDP2; every component must be sharded
cpu_offload = False
fsdp_kwargs = {
    # optional: use mixed-precision training
    "mp_policy": MixedPrecisionPolicy(
        param_dtype=torch.bfloat16,
        reduce_dtype=torch.float32,
    ),
    # optional: CPU offloading
    "offload_policy": CPUOffloadPolicy() if cpu_offload else None,
    # optional: discard all-gathered parameters after the forward pass even on root modules
    # "reshard_after_forward": True,
}
for layer in model.base_model.layers:
    fully_shard(layer, **fsdp_kwargs)
fully_shard(model.base_model, **fsdp_kwargs)
fully_shard(model, **fsdp_kwargs)
model.to_empty(device="cpu" if cpu_offload else device)
model.reset_parameters()
assert isinstance(model, FSDPModule), f"Expected FSDPModule, got {type(model)}"
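As an optional sanity check (not part of the original script, and safe to omit), one can verify that after fully_shard every parameter has been converted to a DTensor holding only this rank's shard:

from torch.distributed.tensor import DTensor  # available in recent PyTorch releases

for name, param in model.named_parameters():
    # each parameter should now be a DTensor sharded across the ranks
    assert isinstance(param, DTensor), f"{name} was not sharded by fully_shard"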

 

# Set explicit prefetching on the layer modules
# more prefetching uses more memory, but allows more overlap of computation and communication
num_prefetch = 1
if num_prefetch > 1:
    modules = list(model.base_model.layers)
    for i, module in enumerate(modules):
        if i == len(modules) - 1:
            break
        module.set_modules_to_forward_prefetch(modules[i+1:i+num_prefetch+1])
    for i, module in enumerate(modules):
        if i == 0:
            continue
        module.set_modules_to_backward_prefetch(modules[max(0, i-num_prefetch):i])

 

# Optional: Apply gradient checkpointing on the distributed model (all ranks)
#wrap_policy = functools.partial(
#    transformer_auto_wrap_policy,
#    transformer_layer_cls={LlamaDecoderLayer, nn.Embedding},
#)
#apply_activation_checkpointing(
#    model,
#    checkpoint_wrapper_fn=checkpoint_wrapper,
#    auto_wrap_policy=wrap_policy,
#)

 

# Training parameters
epochs = 3
learning_rate = 1e-3
batch_size = 64 // world_size
seq_length = 512
num_warmup_steps = 1000
PAD_TOKEN_ID = tokenizer.token_to_id("[PAD]")
model.train()

 

# DataLoader, optimizer, scheduler, and loss function
# A sampler is required to shard the dataset across the world size
dataset = PretrainingDataset(dataset, tokenizer, seq_length)
sampler = DistributedSampler(dataset, shuffle=False, drop_last=True)
dataloader = torch.utils.data.DataLoader(
    dataset,
    sampler=sampler,
    batch_size=batch_size,
    pin_memory=True,  # optional
    shuffle=False,
    num_workers=2,
    prefetch_factor=2,
)
num_training_steps = len(dataloader) * epochs

 

optimizer = torch.optim.AdamW(
    model.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e-8, weight_decay=0.1,
)
warmup_scheduler = lr_scheduler.LinearLR(
    optimizer,
    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,
)
cosine_scheduler = lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=num_training_steps - num_warmup_steps,
    eta_min=0,
)
scheduler = lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[warmup_scheduler, cosine_scheduler],
    milestones=[num_warmup_steps],
)
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)

 

# Optional: Compile the model and the loss function
#model = torch.compile(model)
#loss_fn = torch.compile(loss_fn)

 

# if the checkpoint-dist directory exists, load the checkpoint into the model and optimizer
if os.path.exists("checkpoint-dist"):
    load_checkpoint(model, optimizer, scheduler)

 

# start training
for epoch in range(epochs):
    pbar = tqdm.tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}")
    for batch_id, batch in enumerate(pbar):
        if batch_id % 1000 == 0:
            save_checkpoint(model, optimizer, scheduler)
        # Explicit prefetching before sending any data to the model
        model.unshard()
        # Get the batched data, move it from CPU to GPU
        input_ids, target_ids = batch
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        # create the attention mask: causal mask + padding mask
        attn_mask = create_causal_mask(input_ids) + \
                    create_padding_mask(input_ids, PAD_TOKEN_ID)
        # Extract the output from the model
        logits = model(input_ids, attn_mask)
        # Compute loss: cross-entropy between logits and targets, ignoring padding tokens
        loss = loss_fn(logits.view(-1, logits.size(-1)), target_ids.view(-1))
        # Backward with loss and gradient clipping by L2 norm to 1.0
        # The optimizer and gradient clipping work on DTensor
        optimizer.zero_grad(set_to_none=False if cpu_offload else True)
        loss.backward()
        # All-reduce fails if using CPU offloading
        if not cpu_offload:
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        pbar.set_postfix(loss=loss.item())
        pbar.update(1)
    pbar.close()

 

# Save the model
save_checkpoint(model, optimizer, scheduler)

 

# Clean up the distributed environment
dist.destroy_process_group()
