Run Your First Distributed Training¶
This quickstart demonstrates how to run distributed training workloads in Run:AI. Distributed training allows you to split model training across multiple GPUs and nodes, dramatically reducing training time for large models.
What is Distributed Training?¶
Distributed training splits the training of a model across multiple processors. Multiple workers operate in parallel, enabling:
- Faster Training: Reduce training time from days to hours
- Larger Models: Train models that don't fit on a single GPU
- Better Resource Utilization: Leverage multiple GPUs efficiently
Prerequisites¶
Before starting, ensure you have:
- A project with a quota of at least 2 GPUs (for multi-worker training)
- Access to the Run:AI user interface or CLI
- A basic understanding of distributed training concepts
Method 1: Using the UI¶
1. Access the Run:AI Interface¶
Log in to your Run:AI user interface through your web browser.
2. Create a New Training Workload¶
- Navigate to Workload manager → Workloads
- Click "+NEW WORKLOAD"
- Select "Training" from the workload types
3. Configure Basic Settings¶
- Select Cluster and Project: Choose your target cluster and project
- Training Architecture: Select "Distributed"
- Framework: Choose "PyTorch" (most common) or "TensorFlow"
- Template: Select "Start from Scratch" or use an existing template
4. Environment Configuration¶
Container Image: Use an image that is set up for distributed training, for example:
# PyTorch distributed examples
kubeflow/pytorch-dist-mnist:latest
nvcr.io/nvidia/pytorch:23.10-py3
# TensorFlow distributed examples
tensorflow/tensorflow:latest-gpu
nvcr.io/nvidia/tensorflow:23.10-tf2-py3
5. Distributed Configuration¶
Worker Configuration:
- Number of Workers: Start with 2 workers
- Master Node: Automatically configured
- Worker Nodes: Will be distributed across available GPUs
Resource Allocation per Worker: Set the GPU, CPU, and memory requests for each worker; the values apply to every worker in the job (for example, 1 GPU, 4 CPUs, and 8 GiB of memory per worker, as in the YAML example in Method 3).
6. Submit the Training Job¶
- Review your distributed configuration
- Click "CREATE WORKLOAD"
- Monitor training progress in the workload manager
Method 2: Using the CLI¶
1. Login to Run:AI¶
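Authenticate the CLI with your Run:AI environment. Assuming the Run:AI CLI is already installed and configured for your cluster, this is typically:
runai login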
2. Submit Distributed Training Job¶
Basic PyTorch Distributed Training:
runai distributed submit "pytorch-dist-mnist" \
--framework PyTorch \
--image kubeflow/pytorch-dist-mnist:latest \
--workers 2 \
--gpu-request-type portion \
--gpu-portion-request 0.5 \
--cpu-request 2 \
--memory-request 4Gi
Advanced Configuration:
runai distributed submit "bert-distributed" \
--framework PyTorch \
--image huggingface/transformers-pytorch-gpu:latest \
--workers 4 \
--gpu-request-type whole \
--gpu-request 1 \
--cpu-request 8 \
--memory-request 16Gi \
--volume /data/datasets:/workspace/data \
--command "python train_distributed.py --epochs 10"
TensorFlow Distributed Training:
runai distributed submit "tf-distributed" \
--framework TensorFlow \
--image tensorflow/tensorflow:latest-gpu \
--workers 2 \
--gpu-request-type whole \
--gpu-request 1 \
--command "python -m tf_cnn_benchmarks --model=resnet50 --batch_size=64"
Method 3: Using YAML Configuration¶
Create a distributed training specification file:
# distributed-training.yaml
apiVersion: run.ai/v1
kind: DistributedWorkload
metadata:
  name: pytorch-distributed-example
  namespace: runai-project-team-a
spec:
  framework: PyTorch
  workers: 2
  template:
    spec:
      containers:
        - name: pytorch-dist
          image: kubeflow/pytorch-dist-mnist:latest
          resources:
            requests:
              nvidia.com/gpu: 1
              cpu: "4"
              memory: "8Gi"
            limits:
              nvidia.com/gpu: 1
              cpu: "8"
              memory: "16Gi"
          command: ["python", "/workspace/mnist.py"]
          env:
            - name: WORLD_SIZE
              value: "2"
            - name: EPOCHS
              value: "10"
Submit the configuration:
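How you submit the file depends on how your cluster exposes this resource; assuming you have kubectl access to the cluster, a minimal sketch is:
kubectl apply -f distributed-training.yaml
Afterwards, confirm that the workload appears with runai list.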
Monitoring Distributed Training¶
1. Check Training Status¶
Via UI:
- Navigate to your workload in the Run:AI dashboard
- Monitor individual worker status
- View aggregated logs and metrics
Via CLI:
# List distributed workloads
runai list
# Get detailed status
runai describe <workload-name>
# View logs from all workers
runai logs <workload-name> --all-workers
# View logs from specific worker
runai logs <workload-name> --worker 0
2. Monitor Training Progress¶
Real-time Metrics:
# Watch GPU utilization across workers
runai top <workload-name>
# Monitor training logs
runai logs <workload-name> --follow
Dashboard Metrics:
- GPU utilization per worker
- Memory usage across nodes
- Training loss and accuracy (if logged)
- Network bandwidth between workers
Example Training Scripts¶
PyTorch Distributed Data Parallel¶
# train_distributed.py
import os

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def setup():
    """Initialize distributed training"""
    dist.init_process_group(backend='nccl')
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))


def cleanup():
    """Clean up distributed training"""
    dist.destroy_process_group()


def train():
    setup()

    # Get distributed training info
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    device = torch.cuda.current_device()
    print(f"Worker {rank}/{world_size} using GPU {device}")

    # Create model and wrap with DDP
    # (YourModel and train_loader are placeholders for your own model and data pipeline)
    model = YourModel().to(device)
    model = DDP(model, device_ids=[device])

    # Create loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.6f}')

    cleanup()


if __name__ == "__main__":
    train()
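The script relies on its launcher to set the standard torch.distributed environment variables (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT) for each process; in a managed distributed workload these are normally injected for you. To sanity-check the script on a single node before submitting it, you can launch it with torchrun, which sets those variables itself (assuming 2 local GPUs):
torchrun --nproc_per_node=2 train_distributed.py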
TensorFlow Distributed Strategy¶
# tf_distributed.py
import tensorflow as tf


def create_strategy():
    """Create distributed strategy"""
    return tf.distribute.MultiWorkerMirroredStrategy()


def train_distributed():
    # Set up the distribution strategy
    strategy = create_strategy()

    with strategy.scope():
        # Create model
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])

        # Compile model
        model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

    # Load and prepare data
    # (load_your_dataset is a placeholder for your own input pipeline)
    dataset = load_your_dataset()

    # Train model
    model.fit(
        dataset,
        epochs=10,
        steps_per_epoch=100
    )


if __name__ == "__main__":
    train_distributed()
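MultiWorkerMirroredStrategy discovers its peers through the TF_CONFIG environment variable, which the training operator normally sets for each worker. For reference, a hand-written TF_CONFIG for worker 0 of a two-worker job looks roughly like this (hostnames and port are illustrative):

import json
import os

# Illustrative only: in a managed distributed workload the operator sets TF_CONFIG for you.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["worker-0.example:12345", "worker-1.example:12345"]
    },
    "task": {"type": "worker", "index": 0}
})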
Best Practices¶
Resource Allocation¶
Start Small:
# Begin with 2 workers
runai distributed submit "test-distributed" \
--workers 2 \
--gpu-portion-request 0.25
Scale Gradually:
# Scale to 4 workers once validated
runai distributed submit "production-training" \
--workers 4 \
--gpu-request 1
Performance Optimization¶
Communication Backend:
- Use NCCL for NVIDIA GPUs (default)
- Use Gloo for CPU-only training
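In PyTorch, the backend is chosen in the init_process_group call; a minimal sketch that falls back to Gloo when no GPU is visible:

import torch
import torch.distributed as dist

# Use NCCL when CUDA GPUs are available, otherwise fall back to Gloo
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend)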
Batch Size Scaling:
# Scale batch size with number of workers
base_batch_size = 32
world_size = int(os.environ.get('WORLD_SIZE', 1))
effective_batch_size = base_batch_size * world_size
Gradient Accumulation:
# Accumulate gradients to maintain effective batch size
accumulation_steps = 4

for i, batch in enumerate(dataloader):
    loss = model(batch) / accumulation_steps
    loss.backward()

    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
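With DistributedDataParallel, every backward() call also all-reduces gradients across workers. During the accumulation steps that traffic can be skipped with DDP's no_sync() context manager; a hedged sketch, assuming model is the DDP-wrapped module from the sketch above:

from contextlib import nullcontext

for i, batch in enumerate(dataloader):
    do_step = (i + 1) % accumulation_steps == 0
    # Skip the cross-worker gradient all-reduce except on the iteration that updates the weights
    with nullcontext() if do_step else model.no_sync():
        loss = model(batch) / accumulation_steps
        loss.backward()
    if do_step:
        optimizer.step()
        optimizer.zero_grad()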
Troubleshooting¶
Common Issues¶
Workers Not Communicating:
# Check network connectivity between nodes
runai exec <workload-name> -- ping <worker-ip>
# Verify NCCL environment
runai logs <workload-name> | grep NCCL
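For more detail from the communication layer itself, NCCL's own logging can be enabled through an environment variable on the workload (shown here in the same --env style used in this guide; verify the exact flag against your CLI version):
# Enable verbose NCCL logging
--env NCCL_DEBUG=INFO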
Out of Memory Errors:
# Reduce batch size per worker
--env BATCH_SIZE=16
# Use gradient checkpointing
--env GRADIENT_CHECKPOINTING=true
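Gradient checkpointing trades compute for memory: activations are recomputed during the backward pass instead of being stored. A minimal PyTorch sketch, where block stands in for any expensive submodule (a placeholder, not part of the examples above):

import torch.nn as nn
from torch.utils.checkpoint import checkpoint

class CheckpointedModel(nn.Module):
    def __init__(self, block):
        super().__init__()
        self.block = block  # placeholder for an expensive submodule

    def forward(self, x):
        # Recompute the block's activations in backward instead of keeping them in memory
        return checkpoint(self.block, x, use_reentrant=False)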
Slow Training:
# Check if data loading is the bottleneck
--env NUM_WORKERS=4
# Monitor GPU utilization
runai top <workload-name>
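Low GPU utilization combined with high CPU load usually points at the input pipeline. In PyTorch, the number of data-loading processes is set on the DataLoader; a minimal sketch (dataset and batch size are placeholders):

from torch.utils.data import DataLoader

loader = DataLoader(
    dataset,          # placeholder for your Dataset
    batch_size=32,
    num_workers=4,    # parallel data-loading processes in each worker pod
    pin_memory=True,  # faster host-to-GPU transfers
)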
Debugging Commands¶
# Get detailed worker information
runai describe <workload-name> --show-events
# Check resource allocation
runai get workload <workload-name> -o yaml
# Test communication between workers
runai exec <workload-name> -- python -c "
import torch.distributed as dist
dist.init_process_group(backend='nccl')
print(f'Rank: {dist.get_rank()}, World Size: {dist.get_world_size()}')
"
Next Steps¶
Now that you can run distributed training:
- Experiment with Scaling: Try different numbers of workers
- Optimize Communication: Tune NCCL settings for your network
- Advanced Patterns: Explore model parallelism and pipeline parallelism
- Monitoring: Set up comprehensive training metrics
Related Guides¶
- GPU Fractions Tutorial - Optimize resource usage
- Standard Training Quickstart - Single GPU training
- Preemption Exercise - Understanding workload priorities