Run Your First Distributed Training

This quickstart demonstrates how to run distributed training workloads in Run:AI. Distributed training allows you to split model training across multiple GPUs and nodes, dramatically reducing training time for large models.

What is Distributed Training?

Distributed training splits the training of a model across multiple processors. Multiple workers operate in parallel to accelerate model training (the core data-parallel idea is sketched after the list below), enabling:

  • Faster Training: Reduce training time from days to hours
  • Larger Models: Train models that don't fit on a single GPU
  • Better Resource Utilization: Leverage multiple GPUs efficiently
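
In the data-parallel pattern used throughout this guide, each worker trains on a different shard of the data and gradients are averaged across workers at every step. The following minimal PyTorch sketch shows that synchronization by hand; in practice DistributedDataParallel (used later in this guide) performs it for you, and the function assumes an already-initialized process group.

# Average each parameter's gradient across all workers (what DDP automates)
import torch.distributed as dist

def average_gradients(model):
    """Call after loss.backward(); requires an initialized process group."""
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size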

Prerequisites

Before starting, ensure you have:

  • A project with a quota of at least 2 GPUs (for multi-worker training)
  • Access to the Run:AI user interface or CLI
  • A basic understanding of distributed training concepts

Method 1: Using the UI

1. Access the Run:AI Interface

Log in to your Run:AI user interface through your web browser.

2. Create a New Training Workload

  1. Navigate to Workload manager → Workloads
  2. Click "+NEW WORKLOAD"
  3. Select "Training" from the workload types

3. Configure Basic Settings

  1. Select Cluster and Project: Choose your target cluster and project
  2. Training Architecture: Select "Distributed"
  3. Framework: Choose "PyTorch" (most common) or "TensorFlow"
  4. Template: Select "Start from Scratch" or use an existing template

4. Environment Configuration

Container Image: Use an image built for distributed training:

# PyTorch distributed examples
kubeflow/pytorch-dist-mnist:latest
nvcr.io/nvidia/pytorch:23.10-py3

# TensorFlow distributed examples  
tensorflow/tensorflow:latest-gpu
nvcr.io/nvidia/tensorflow:23.10-tf2-py3

5. Distributed Configuration

Worker Configuration:

  • Number of Workers: Start with 2 workers
  • Master Node: Automatically configured
  • Worker Nodes: Distributed across available GPUs

Resource Allocation per Worker:

GPU Request: 1 GPU per worker
CPU: 4 cores per worker  
Memory: 8Gi per worker

6. Submit the Training Job

  1. Review your distributed configuration
  2. Click "CREATE WORKLOAD"
  3. Monitor training progress in the workload manager

Method 2: Using the CLI

1. Login to Run:AI

runai login

2. Submit Distributed Training Job

Basic PyTorch Distributed Training:

runai distributed submit "pytorch-dist-mnist" \
    --framework PyTorch \
    --image kubeflow/pytorch-dist-mnist:latest \
    --workers 2 \
    --gpu-request-type portion \
    --gpu-portion-request 0.5 \
    --cpu-request 2 \
    --memory-request 4Gi

Advanced Configuration:

runai distributed submit "bert-distributed" \
    --framework PyTorch \
    --image huggingface/transformers-pytorch-gpu:latest \
    --workers 4 \
    --gpu-request-type whole \
    --gpu-request 1 \
    --cpu-request 8 \
    --memory-request 16Gi \
    --volume /data/datasets:/workspace/data \
    --command "python train_distributed.py --epochs 10"

TensorFlow Distributed Training:

runai distributed submit "tf-distributed" \
    --framework TensorFlow \
    --image tensorflow/tensorflow:latest-gpu \
    --workers 2 \
    --gpu-request-type whole \
    --gpu-request 1 \
    --command "python -m tf_cnn_benchmarks --model=resnet50 --batch_size=64"

Method 3: Using YAML Configuration

Create a distributed training specification file:

# distributed-training.yaml
apiVersion: run.ai/v1
kind: DistributedWorkload
metadata:
  name: pytorch-distributed-example
  namespace: runai-project-team-a
spec:
  framework: PyTorch
  workers: 2
  template:
    spec:
      containers:
      - name: pytorch-dist
        image: kubeflow/pytorch-dist-mnist:latest
        resources:
          requests:
            nvidia.com/gpu: 1
            cpu: "4"
            memory: "8Gi"
          limits:
            nvidia.com/gpu: 1
            cpu: "8"
            memory: "16Gi"
        command: ["python", "/workspace/mnist.py"]
        env:
        - name: WORLD_SIZE
          value: "2"
        - name: EPOCHS
          value: "10"

Submit the configuration:

kubectl apply -f distributed-training.yaml
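
The env entries in the YAML above only take effect if the training script reads them. A minimal sketch of how a script such as the referenced mnist.py might consume WORLD_SIZE and EPOCHS (the variable names below are illustrative):

# Read the environment variables injected by the workload spec
import os

world_size = int(os.environ.get('WORLD_SIZE', '1'))
epochs = int(os.environ.get('EPOCHS', '10'))
print(f"Training for {epochs} epochs across {world_size} workers")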

Monitoring Distributed Training

1. Check Training Status

Via UI:

  • Navigate to your workload in the Run:AI dashboard
  • Monitor individual worker status
  • View aggregated logs and metrics

Via CLI:

# List distributed workloads
runai list

# Get detailed status
runai describe <workload-name>

# View logs from all workers
runai logs <workload-name> --all-workers

# View logs from specific worker
runai logs <workload-name> --worker 0

2. Monitor Training Progress

Real-time Metrics:

# Watch GPU utilization across workers
runai top <workload-name>

# Monitor training logs
runai logs <workload-name> --follow

Dashboard Metrics:

  • GPU utilization per worker
  • Memory usage across nodes
  • Training loss and accuracy (if logged)
  • Network bandwidth between workers

Example Training Scripts

PyTorch Distributed Data Parallel

# train_distributed.py
import os

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

def setup():
    """Initialize distributed training from the environment set by the launcher"""
    dist.init_process_group(backend='nccl')
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))

def cleanup():
    """Clean up distributed training"""
    dist.destroy_process_group()

class SimpleNet(nn.Module):
    """Small placeholder model; replace with your own architecture"""
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28 * 28, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        return self.net(x)

def train(num_epochs=2):
    setup()

    # Get distributed training info
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    device = torch.cuda.current_device()

    print(f"Worker {rank}/{world_size} using GPU {device}")

    # Synthetic MNIST-shaped data for illustration; swap in your real dataset
    dataset = TensorDataset(torch.randn(1024, 1, 28, 28),
                            torch.randint(0, 10, (1024,)))
    sampler = DistributedSampler(dataset)  # shards the data across workers
    train_loader = DataLoader(dataset, batch_size=32, sampler=sampler)

    # Create model and wrap with DDP
    model = SimpleNet().to(device)
    model = DDP(model, device_ids=[device])

    # Create loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle the shards each epoch
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.6f}')

    cleanup()

if __name__ == "__main__":
    train()

TensorFlow Distributed Strategy

# tf_distributed.py
import tensorflow as tf

def create_strategy():
    """Create a multi-worker data-parallel strategy (uses TF_CONFIG set by the launcher)"""
    return tf.distribute.MultiWorkerMirroredStrategy()

def make_dataset(batch_size=64):
    """Synthetic dataset for illustration; replace with your own input pipeline"""
    features = tf.random.normal([1024, 32])
    labels = tf.random.uniform([1024], maxval=10, dtype=tf.int32)
    return (tf.data.Dataset.from_tensor_slices((features, labels))
            .shuffle(1024).repeat().batch(batch_size))

def train_distributed():
    # Set up the distribution strategy before building the model
    strategy = create_strategy()

    with strategy.scope():
        # Create model (variables must be created inside the strategy scope)
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])

        # Compile model
        model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

    # Load and prepare data
    dataset = make_dataset()

    # Train model
    model.fit(
        dataset,
        epochs=10,
        steps_per_epoch=100
    )

if __name__ == "__main__":
    train_distributed()

Best Practices

Resource Allocation

Start Small:

# Begin with 2 workers
runai distributed submit "test-distributed" \
    --workers 2 \
    --gpu-portion-request 0.25

Scale Gradually:

# Scale to 4 workers once validated
runai distributed submit "production-training" \
    --workers 4 \
    --gpu-request 1

Performance Optimization

Communication Backend (backend selection is sketched below):

  • Use NCCL for NVIDIA GPUs (the default)
  • Use Gloo for CPU-only training
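
A minimal sketch of picking the backend at startup, assuming a standard PyTorch setup:

# Use NCCL when CUDA GPUs are available, fall back to Gloo otherwise
import torch
import torch.distributed as dist

backend = 'nccl' if torch.cuda.is_available() else 'gloo'
dist.init_process_group(backend=backend)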

Batch Size Scaling:

# Scale the global batch size with the number of workers (WORLD_SIZE is set by the launcher)
import os

base_batch_size = 32
world_size = int(os.environ.get('WORLD_SIZE', 1))
effective_batch_size = base_batch_size * world_size

Gradient Accumulation:

# Accumulate gradients to keep the effective batch size while fitting smaller per-step batches
accumulation_steps = 4
optimizer.zero_grad()
for i, (data, target) in enumerate(dataloader):
    # Scale the loss so the accumulated gradient matches a full-size batch
    loss = criterion(model(data), target) / accumulation_steps
    loss.backward()

    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

Troubleshooting

Common Issues

Workers Not Communicating:

# Check network connectivity between nodes
runai exec <workload-name> -- ping <worker-ip>

# Verify NCCL environment
runai logs <workload-name> | grep NCCL
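
NCCL only emits detailed initialization and transport logs when NCCL_DEBUG is set. A minimal sketch of enabling it from the training script before the process group is created (the same variables can be set as workload environment variables instead):

# Enable verbose NCCL logging so connectivity problems show up in worker logs
import os
import torch.distributed as dist

os.environ.setdefault('NCCL_DEBUG', 'INFO')
os.environ.setdefault('NCCL_DEBUG_SUBSYS', 'INIT,NET')

dist.init_process_group(backend='nccl')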

Out of Memory Errors:

# Reduce batch size per worker
--env BATCH_SIZE=16

# Use gradient checkpointing
--env GRADIENT_CHECKPOINTING=true
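
The GRADIENT_CHECKPOINTING variable above only helps if the training code acts on it. A minimal PyTorch sketch of gradient checkpointing, which recomputes activations during the backward pass to save memory (the environment variable name and the toy model are assumptions for illustration):

# Trade extra compute for lower activation memory
import os
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential

model = nn.Sequential(*[nn.Sequential(nn.Linear(1024, 1024), nn.ReLU())
                        for _ in range(8)])
x = torch.randn(16, 1024, requires_grad=True)

if os.environ.get('GRADIENT_CHECKPOINTING', 'false').lower() == 'true':
    # Recompute activations segment by segment instead of storing them all
    out = checkpoint_sequential(model, 4, x, use_reentrant=False)
else:
    out = model(x)
out.sum().backward()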

Slow Training:

# Check if data loading is the bottleneck
--env NUM_WORKERS=4

# Monitor GPU utilization
runai top <workload-name>
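
As with the flags above, NUM_WORKERS only matters if the training script reads it. A minimal sketch assuming a PyTorch DataLoader (the dataset here is synthetic, for illustration):

# Use multiple data-loading processes so the GPUs are not starved for input
import os
import torch
from torch.utils.data import DataLoader, TensorDataset

dataset = TensorDataset(torch.randn(1024, 32), torch.randint(0, 10, (1024,)))
num_workers = int(os.environ.get('NUM_WORKERS', '4'))
loader = DataLoader(dataset, batch_size=32, num_workers=num_workers,
                    pin_memory=True)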

Debugging Commands

# Get detailed worker information
runai describe <workload-name> --show-events

# Check resource allocation
runai get workload <workload-name> -o yaml

# Test communication between workers
runai exec <workload-name> -- python -c "
import torch.distributed as dist
dist.init_process_group(backend='nccl')
print(f'Rank: {dist.get_rank()}, World Size: {dist.get_world_size()}')
"

Next Steps

Now that you can run distributed training:

  1. Experiment with Scaling: Try different numbers of workers
  2. Optimize Communication: Tune NCCL settings for your network
  3. Advanced Patterns: Explore model parallelism and pipeline parallelism
  4. Monitoring: Set up comprehensive training metrics