Model loading #25

Merged (14 commits) on Sep 24, 2024
Changes from 10 commits
13 changes: 8 additions & 5 deletions submitit_train.py
@@ -6,19 +6,22 @@

if __name__ == "__main__":
executor = submitit.AutoExecutor(folder="~/slurm_jobs/titan/job_%j")
n_gpus = 4
n_gpus = 8
executor.update_parameters(
name="titan", timeout_min=15,
name="titan", timeout_min=3 * 24 * 60,
gpus_per_node=n_gpus,
nodes=1, mem_gb=40, cpus_per_task=n_gpus * 2
nodes=1, mem_gb=80, cpus_per_task=n_gpus * 4,
slurm_additional_parameters={
"partition": "h100"
}
)

jobs = []
with executor.batch():
for _ in range(1):
# train_config = './train_configs/chemlactica_125m.toml'
train_config = './train_configs/chemlactica_125m.toml'
# train_config = './train_configs/chemlactica_1.3b.toml'
train_config = './train_configs/llama3_8b.toml'
# train_config = './train_configs/llama3_8b.toml'
# train_config = './train_configs/debug_model.toml'
function = submitit.helpers.CommandFunction([
'python3', '-m', 'torch.distributed.run',
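The updated launcher asks SLURM for a single node with 8 GPUs, 80 GB of RAM, 32 CPUs per task, and a 3-day time limit on an "h100" partition. Since the torchrun command is truncated in this view, here is a self-contained sketch of what the full submission script plausibly looks like; the config path and the torchrun arguments are illustrative, not a verbatim copy of the file.

```python
import submitit

# Sketch: request one node with 8 GPUs on the "h100" partition for 3 days.
executor = submitit.AutoExecutor(folder="~/slurm_jobs/titan/job_%j")
n_gpus = 8
executor.update_parameters(
    name="titan",
    timeout_min=3 * 24 * 60,          # 3 days, in minutes
    gpus_per_node=n_gpus,
    nodes=1,
    mem_gb=80,
    cpus_per_task=n_gpus * 4,
    slurm_additional_parameters={"partition": "h100"},
)

# Launch a torchrun command; these arguments are illustrative since the
# original command is cut off in the diff above.
function = submitit.helpers.CommandFunction([
    "python3", "-m", "torch.distributed.run",
    f"--nproc_per_node={n_gpus}",
    "train.py", "--job.config_file", "./train_configs/chemlactica_125m.toml",
])
job = executor.submit(function)
print(job.job_id)
```

Passing the SLURM-only option through `slurm_additional_parameters` keeps the rest of the parameters backend-agnostic for submitit's AutoExecutor.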
18 changes: 12 additions & 6 deletions torchtitan/datasets/hf_datasets.py
@@ -6,6 +6,9 @@

import pickle
from typing import Any, Dict, List, Optional
from pathlib import Path
import glob
import os

import numpy as np

@@ -33,7 +36,8 @@
_supported_datasets = {
"c4_test": "test/assets/c4_test",
"c4": "allenai/c4",
"chemlactica_train_mini": "test/assets/chemlactica_train_mini"
"chemlactica_train_mini": "test/assets/chemlactica_train_mini",
"chemlactica_train": "/nfs/dgx/raid/chem/data/rdkit_computed_rel+form/train_rdkit_computed_rel+form"
}

_supported_data_processing_styles = {
@@ -111,13 +115,16 @@ def __init__(
# c4 is huge, and requires both streaming and language selection
# (we default to en)
ds = load_dataset(dataset_path, name="en", split="train", streaming=True)
else:
elif dataset_name == "c4_test":
ds = load_dataset(dataset_path, split="train")

else:
dataset_files = glob.glob(os.path.join(dataset_path, "*.jsonl"))
ds = load_dataset("text", data_files=dataset_files, split="train", streaming=True)
try:
data_processing_fn = _supported_data_processing_styles[data_processing_style]
except KeyError as e:
raise ValueError(f"Unsupported data processing style: {data_processing_style}")
# data_processing_fn = lambda x, e: str(x)

# TODO: support shuffling and checkpointing
self.dataset_name = dataset_name
@@ -217,9 +224,8 @@ class DPAwareDataLoader(StatefulDataLoader, Stateful):
"""
A wrapper around the StatefulDataLoader that ensures that the state is stored only once per DP rank.
"""

def __init__(self, dp_rank: int, hf_ds: IterableDataset, batch_size: int, pin_memory: bool, num_workers: int):
super().__init__(hf_ds, batch_size)
super().__init__(hf_ds, batch_size, num_workers=num_workers)
self._dp_rank = dp_rank
self._rank_id = f"dp_rank_{dp_rank}"

@@ -251,7 +257,7 @@ def build_hf_data_loader(
rank,
infinite: bool = True,
pin_memory: bool = False,
num_workers: int = 0,
num_workers: int = 2,
special_mode = None,
context = "train",
):
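The new `chemlactica_train` entry points at a directory of `.jsonl` shards rather than a Hugging Face Hub dataset, and the loader branch above streams those shards through the generic `text` builder. A minimal sketch of that loading path, assuming the shards are line-delimited JSON; the directory is the one registered in `_supported_datasets`.

```python
import glob
import os

from datasets import load_dataset

# Stream every .jsonl shard in the dataset directory as raw text lines.
# Each line is later parsed with json.loads by the chemlactica-style processing function.
dataset_path = "/nfs/dgx/raid/chem/data/rdkit_computed_rel+form/train_rdkit_computed_rel+form"
dataset_files = glob.glob(os.path.join(dataset_path, "*.jsonl"))

ds = load_dataset("text", data_files=dataset_files, split="train", streaming=True)

for sample in ds.take(2):
    print(sample["text"][:80])  # each record is {"text": "<one raw JSON line>"}
```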
6 changes: 6 additions & 0 deletions torchtitan/logging.py
@@ -27,6 +27,9 @@ def init_logger(log_level):
# suppress verbose torch.profiler logging
os.environ["KINETO_LOG_LEVEL"] = "5"

# enable dataloading logging for logging the type of dataloading used
enable_dataloader_logging()


class LogLevel(Enum):
DEBUG = "DEBUG"
@@ -46,3 +49,6 @@ def from_string(cls, value: str):
def validate_log_level(value):
return LogLevel.from_string(value)


def enable_dataloader_logging():
logging.getLogger('datasets.iterable_dataset').setLevel(logging.INFO)
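`enable_dataloader_logging` just lowers the threshold of the `datasets.iterable_dataset` logger to INFO so that the library's messages about how the streamed dataset is being iterated show up next to the trainer's own logs. A standalone sketch of the same idea; the `basicConfig` call stands in for whatever handler setup `init_logger` already performs.

```python
import logging

def enable_dataloader_logging() -> None:
    # Let INFO-level messages from the datasets streaming machinery through.
    logging.getLogger("datasets.iterable_dataset").setLevel(logging.INFO)

if __name__ == "__main__":
    # Stand-in for init_logger: without a configured handler, INFO records
    # would still be dropped by the default last-resort handler.
    logging.basicConfig(level=logging.INFO)
    enable_dataloader_logging()
```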
2 changes: 1 addition & 1 deletion torchtitan/metrics.py
@@ -127,7 +127,7 @@ def build_metric_logger(
"""
dump_dir = job_config.job.dump_folder
aim_config = job_config.metrics
save_aim_folder = aim_config.save_aim_folder
save_aim_folder = os.path.join(job_config.job.dump_folder, aim_config.save_aim_folder)
# since we don't have run id, use current minute as the identifier
datetime_str = datetime.now().strftime("%Y%m%d-%H%M")
log_dir = os.path.join(dump_dir, datetime_str)
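With this change the Aim repository is resolved inside the job's dump folder rather than as a bare relative path, so runs from different jobs land under one predictable root. A tiny illustration, using the dump folder from the debug config in this PR as an assumed example:

```python
import os

# Values taken from the training configs in this PR.
dump_folder = "/nfs/dgx/raid/chem/titan_outputs"
save_aim_folder = "aim"

print(os.path.join(dump_folder, save_aim_folder))
# -> /nfs/dgx/raid/chem/titan_outputs/aim
```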
5 changes: 2 additions & 3 deletions torchtitan/models/opt/model.py
@@ -24,18 +24,17 @@ class ModelArgs:
n_heads: int = 12
n_kv_heads: Optional[int] = None
vocab_size: int = -1 # defined later by tokenizer
multiple_of: int = 256 # make SwiGLU hidden layer size multiple of large power of 2
multiple_of: int = 256
ffn_dim_multiplier: Optional[float] = None
norm_eps: float = 1e-5
rope_theta: float = 10000
dropout_p: float = 0.1

max_batch_size: int = 32
max_seq_len: int = 2048
# If `True`, then each transformer block init uses its layer ID, and if
# `False`, each uses the total number of transformer blocks
depth_init: bool = True
norm_type: str = "layersnorm"
norm_type: str = "layernorm_bias"


def repeat_kv(x: torch.Tensor, n_rep: int) -> torch.Tensor:
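The default `norm_type` changes from "layersnorm", which matches no standard norm name, to "layernorm_bias", i.e. a standard LayerNorm with a learnable bias as used by OPT/Galactica. A minimal sketch of what such a norm builder might look like; the function name and the string-to-module mapping are assumptions, not the fork's actual implementation.

```python
import torch
from torch import nn

def build_norm(norm_type: str, dim: int, eps: float = 1e-5) -> nn.Module:
    # Hypothetical helper mapping the config string to a module.
    if norm_type == "layernorm_bias":
        # nn.LayerNorm carries both a weight and a bias by default,
        # which is what OPT/Galactica checkpoints expect.
        return nn.LayerNorm(dim, eps=eps)
    raise ValueError(f"Unsupported norm_type: {norm_type}")

x = torch.randn(2, 16, 768)
print(build_norm("layernorm_bias", 768)(x).shape)  # torch.Size([2, 16, 768])
```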
2 changes: 1 addition & 1 deletion torchtitan/models/opt/utils.py
@@ -73,7 +73,7 @@ def export_opt_weights(model: OPT, save_dir: str, token_embedding_size: int):
"""
write docs
"""
hf_model = OPTForCausalLM.from_pretrained(map_n_layers_to_model_name(model.n_layers))
hf_model = OPTForCausalLM.from_pretrained(map_n_layers_to_model_name(model.n_layers), tie_word_embeddings=False)
hf_model.resize_token_embeddings(new_num_tokens=token_embedding_size)
keys_mapping = get_hf_opt_state_dict_keys_mapping(model.n_layers)
state_dict = model.state_dict()
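Loading the reference checkpoint with `tie_word_embeddings=False` unties the input embedding from the LM head, presumably so that both tensors can be populated independently from the torchtitan state dict after the token embedding matrix is resized. A hedged usage sketch: the model name follows the `load_folder` used in the configs, while the vocabulary size is a placeholder.

```python
from transformers import OPTForCausalLM

hf_name = "facebook/galactica-125m"   # matches load_folder in the configs
new_vocab_size = 50066                # hypothetical size after adding special tokens

# Untie embeddings so embed_tokens and lm_head can be assigned independently.
hf_model = OPTForCausalLM.from_pretrained(hf_name, tie_word_embeddings=False)
hf_model.resize_token_embeddings(new_num_tokens=new_vocab_size)

print(hf_model.get_input_embeddings().weight.shape)
print(hf_model.get_output_embeddings().weight.shape)
```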
3 changes: 3 additions & 0 deletions torchtitan/tokenizers/tokenizer/custom.py
@@ -7,10 +7,13 @@
# copied and adjusted from https://github.com/facebookresearch/llama/blob/main/llama/tokenizer.py

from typing import List
import os

from torchtitan.logging import logger
from transformers import AutoTokenizer

os.environ["TOKENIZER_PARALLELISM"] = "true"


class CustomTokenizer:
"""
3 changes: 2 additions & 1 deletion torchtitan/utils/dataset_utils.py
@@ -32,13 +32,14 @@ def load_jsonl_line(jsonl_line):

def chemlactica_style_data_processing(sample_json, rng):
try:
sample_json = json.loads(sample_json["text"])
compound = delete_empty_tags(sample_json)
sample_json = generate_formatted_string(
compound, rng
)
except Exception as e:
print(e)
sample_json = {}
sample_json = ""
return sample_json


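Because the `text` builder yields records of the form `{"text": "<raw JSONL line>"}`, the processing function now has to `json.loads` the line itself, and on any failure it falls back to an empty string instead of an empty dict, so downstream tokenization always receives text. A self-contained sketch of that shape; `delete_empty_tags` and `generate_formatted_string` are replaced by simplified stand-ins, not the real helpers.

```python
import json
import numpy as np

def chemlactica_style_data_processing(sample, rng):
    # Sketch of the error-handling shape only.
    try:
        compound = json.loads(sample["text"])  # raw JSONL line -> dict
        compound = {k: v for k, v in compound.items() if v not in ("", None)}
        keys = list(compound)
        rng.shuffle(keys)                      # stand-in for the formatting step
        text = "".join(f"[{k.upper()}]{compound[k]}[/{k.upper()}]" for k in keys)
    except Exception as e:
        print(e)
        text = ""  # return a string, not {}, so the tokenizer never sees a dict
    return text

rng = np.random.default_rng(0)
print(chemlactica_style_data_processing({"text": '{"SMILES": "CCO", "QED": 0.4}'}, rng))
```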
11 changes: 7 additions & 4 deletions train.py
@@ -7,6 +7,7 @@
import contextlib
import os
import time
import logging
from datetime import timedelta

import torch
@@ -284,12 +285,14 @@ def loss_fn(pred, labels):
# need to free to before bwd to avoid peaking memory
del pred
loss.backward()

for m in model_parts:
torch.nn.utils.clip_grad_norm_(
m.parameters(), job_config.training.max_norm, foreach=True
)

if force_finish_train:
break
for m in model_parts:
torch.nn.utils.clip_grad_norm_(
m.parameters(), job_config.training.max_norm, foreach=True
)

# sync float8 amaxes and scales
float8_handler.sync_float8_amax_and_scale_history(model_parts)
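The clipping block is moved and an early `break` on `force_finish_train` is added before it. Indentation is lost in this view, so the exact nesting is not visible, but together with the `gradient_accumulation_steps` option introduced in the configs the natural reading is that gradients are clipped once per optimizer step, after all micro-batches have been accumulated. A small sketch of that ordering; the function and variable names are illustrative, and loss scaling across micro-batches is omitted for brevity.

```python
import torch

def training_step(model, optimizer, micro_batches, loss_fn, max_norm=1.0):
    # Accumulate gradients over several micro-batches...
    for inputs, labels in micro_batches:
        pred = model(inputs)
        loss = loss_fn(pred, labels)
        del pred                  # drop the logits reference before backward to reduce peak memory
        loss.backward()

    # ...then clip once on the accumulated gradient, just before the update,
    # so max_norm bounds the full per-step gradient.
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm, foreach=True)
    optimizer.step()
    optimizer.zero_grad()
```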
31 changes: 17 additions & 14 deletions train_configs/chemlactica_1.3b.toml
@@ -15,49 +15,52 @@ save_memory_snapshot_folder = "memory_snapshot"
[metrics]
log_freq = 1
enable_color_printing = true
enable_tensorboard = true
save_tb_folder = "tb"
enable_aim = true
save_aim_folder = "aim"

[model]
name = "opt"
flavor = "1.3B"
# norm_type = "layernorm_bias" # layernorm / np_layernorm / rmsnorm / compiled_rmsnorm / fused_rmsnorm
norm_type = "rmsnorm"
norm_type = "layernorm_bias" # layernorm / np_layernorm / rmsnorm / compiled_rmsnorm / fused_rmsnorm
# test tokenizer.model, for debug purpose only
tokenizer_path = "./test/assets/test_tiktoken.model"
# tokenizer_path = "./test/assets/test_tiktoken.model"
tokenizer_path = "./torchtitan/tokenizers/chemlactica-125m"

[optimizer]
name = "AdamW"
lr = 8e-4
lr = 1.0e-4

[training]
batch_size = 10
batch_size = 13
gradient_accumulation_steps = 9
seq_len = 2048
warmup_steps = 2 # lr scheduler warm up, normally 20% of the train steps
warmup_steps = 500 # lr scheduler warm up, normally 20% of the train steps
max_norm = 1.0 # grad norm clipping
steps = 50
steps = 18000
data_parallel_degree = -1
tensor_parallel_degree = 1
compile = false
dataset = "c4" # supported datasets: c4_test (2K), c4 (177M)
compile = true
# dataset = "c4" # supported datasets: c4_test (2K), c4 (177M)
# dataset = "chemlactica_train_mini" # supported datasets: c4_test (2K), c4 (177M), chemlactica_train_mini (4K)
dataset = "chemlactica_train"
data_process_style="chemlactica_style"

[experimental]
pipeline_parallel_degree = 1
enable_async_tensor_parallel = false

[checkpoint]
enable_checkpoint = true
create_seed_checkpoint = false
load_folder = "facebook/galactica-1.3b"
save_folder = "yerevann/chemlactica-1.3b"
interval_type = "steps"
interval = 100
interval = 2000
model_weights_only = false
export_dtype = "float32"
async_mode = "async_with_pinned_mem" # ["disabled", "async", "async_with_pinned_mem"]

[activation_checkpoint]
mode = 'selective' # ['none', 'selective', 'full']
mode = 'none' # ['none', 'selective', 'full']
selective_ac_option = '2' # 'int' = ac every positive int layer or 'op', ac based on ops policy

[float8]
20 changes: 11 additions & 9 deletions train_configs/chemlactica_125m.toml
@@ -15,7 +15,7 @@ save_memory_snapshot_folder = "memory_snapshot"
[metrics]
log_freq = 1
enable_color_printing = true
enable_aim = false
enable_aim = true
save_aim_folder = "aim"
#aim_hash = "c6b4d8b340f74287b82ef928"
#aim_experiment_name = "hello"
@@ -29,19 +29,21 @@ tokenizer_path = "./torchtitan/tokenizers/chemlactica-125m"

[optimizer]
name = "AdamW"
lr = 8e-4
lr = 1.4e-3

[training]
batch_size = 1
batch_size = 20
gradient_accumulation_steps = 8
seq_len = 2048
warmup_steps = 2 # lr scheduler warm up, normally 20% of the train steps
warmup_steps = 500 # lr scheduler warm up, normally 20% of the train steps
max_norm = 1.0 # grad norm clipping
steps = 50
steps = 18000
data_parallel_degree = -1
tensor_parallel_degree = 1
compile = false
compile = true
# dataset = "c4" # supported datasets: c4_test (2K), c4 (177M)
dataset = "chemlactica_train_mini" # supported datasets: c4_test (2K), c4 (177M), chemlactica_train_mini (4K)
# dataset = "chemlactica_train_mini" # supported datasets: c4_test (2K), c4 (177M), chemlactica_train_mini (4K)
dataset = "chemlactica_train"
data_process_style="chemlactica_style"

[experimental]
@@ -54,13 +56,13 @@ create_seed_checkpoint = false
load_folder = "facebook/galactica-125m"
save_folder = "yerevann/chemlactica-125m"
interval_type = "steps"
interval = 100
interval = 2000
model_weights_only = false
export_dtype = "float32"
async_mode = "async_with_pinned_mem" # ["disabled", "async", "async_with_pinned_mem"]

[activation_checkpoint]
mode = 'selective' # ['none', 'selective', 'full']
mode = 'none' # ['none', 'selective', 'full']
selective_ac_option = '2' # 'int' = ac every positive int layer or 'op', ac based on ops policy

[float8]
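For a rough sense of scale, the new 125m settings imply the tokens per optimizer step computed below, assuming the data-parallel degree resolves to the 8 GPUs requested in submitit_train.py (`data_parallel_degree = -1` is left to fill the remaining world size); the GPU count is an assumption, not something the config states.

```python
# Rough throughput arithmetic for the 125m config; the DP degree of 8 is an
# assumption based on the submitit launcher, not a value from this file.
batch_size = 20
gradient_accumulation_steps = 8
seq_len = 2048
data_parallel_degree = 8

tokens_per_optimizer_step = batch_size * gradient_accumulation_steps * seq_len * data_parallel_degree
print(f"{tokens_per_optimizer_step:,}")  # 2,621,440 tokens per step
```

Under the same assumption, 18,000 steps correspond to roughly 47 billion tokens.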
72 changes: 72 additions & 0 deletions train_configs/chemlactica_debug.toml
@@ -0,0 +1,72 @@
# torchtitan Config.toml

[job]
dump_folder = "/nfs/dgx/raid/chem/titan_outputs"
description = "Galactica training"
use_for_integration_test = false

[profiling]
enable_profiling = false
save_traces_folder = "profile_trace"
profile_freq = 10
enable_memory_snapshot = false
save_memory_snapshot_folder = "memory_snapshot"

[metrics]
log_freq = 1
enable_color_printing = true
enable_aim = false
save_aim_folder = "aim"
#aim_hash = "c6b4d8b340f74287b82ef928"
#aim_experiment_name = "hello"

[model]
name = "opt"
flavor = "debugmodel"
norm_type = "layernorm_bias" # layernorm / np_layernorm / rmsnorm / compiled_rmsnorm / fused_rmsnorm
# test tokenizer.model, for debug purpose only
tokenizer_path = "./torchtitan/tokenizers/chemlactica-125m"

[optimizer]
name = "AdamW"
lr = 1.4e-3

[training]
batch_size = 1
gradient_accumulation_steps = 1
seq_len = 2048
warmup_steps = 500 # lr scheduler warm up, normally 20% of the train steps
max_norm = 1.0 # grad norm clipping
steps = 50
data_parallel_degree = -1
tensor_parallel_degree = 1
compile = true
# dataset = "c4" # supported datasets: c4_test (2K), c4 (177M)
dataset = "chemlactica_train_mini" # supported datasets: c4_test (2K), c4 (177M), chemlactica_train_mini (4K)
# dataset = "chemlactica_train"
data_process_style="chemlactica_style"

[dataloader]
num_workers = 1

[experimental]
pipeline_parallel_degree = 1
enable_async_tensor_parallel = false

[checkpoint]
enable_checkpoint = false
create_seed_checkpoint = false
load_folder = "facebook/galactica-125m"
save_folder = "yerevann/chemlactica-125m"
interval_type = "steps"
interval = 2000
model_weights_only = false
export_dtype = "float32"
async_mode = "async_with_pinned_mem" # ["disabled", "async", "async_with_pinned_mem"]

[activation_checkpoint]
mode = 'none' # ['none', 'selective', 'full']
selective_ac_option = '2' # 'int' = ac every positive int layer or 'op', ac based on ops policy

[float8]
enable_float8_linear = false