Tensor parallel #276

Merged (2 commits, Nov 26, 2024)
Changes from all commits
4 changes: 3 additions & 1 deletion dicee/config.py
@@ -85,7 +85,6 @@ def __init__(self, **kwargs):

self.label_smoothing_rate: float = 0.0


self.num_core: int = 0
"""Number of CPUs to be used in the mini-batch loading process"""

@@ -139,6 +138,9 @@ def __init__(self, **kwargs):
self.continual_learning=None
"Path of a pretrained model size of LLM"

self.auto_batch_finding=False
"A flag for using auto batch finding"

def __iter__(self):
# Iterate
for k, v in self.__dict__.items():
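A minimal sketch of how the new flag can be toggled from Python, assuming only what this diff and the tests below already show: Namespace comes from dicee.config, and trainer and auto_batch_finding are plain attributes with the defaults added in this file.

from dicee.config import Namespace

args = Namespace()
args.trainer = "TP"               # tensor-parallel trainer targeted by this PR
args.auto_batch_finding = True    # new flag; defaults to False in config.py
args.num_core = 4                 # CPUs used in the mini-batch loading process (see above)
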
3 changes: 3 additions & 0 deletions dicee/scripts/run.py
@@ -123,6 +123,9 @@ def get_default_arguments(description=None):
parser.add_argument("--swa",
action="store_true",
help="Stochastic weight averaging")
parser.add_argument("--auto_batch_finding",
action="store_true",
help="Find a batch size fitting in GPUs. Only available for TP trainer")
parser.add_argument('--degree', type=int, default=0,
help='degree for polynomial embeddings')

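To show how the store_true flag behaves on the command line, here is a self-contained argparse sketch; it deliberately rebuilds a toy parser instead of importing dicee's get_default_arguments, and the default trainer value is purely illustrative.

import argparse

# Toy parser mirroring the two options relevant to this PR; not dicee's real parser.
parser = argparse.ArgumentParser(description="auto_batch_finding flag demo")
parser.add_argument("--trainer", type=str, default="TP")  # illustrative default only
parser.add_argument("--auto_batch_finding", action="store_true",
                    help="Find a batch size fitting into GPU memory. Only available for the TP trainer")

args = parser.parse_args(["--trainer", "TP", "--auto_batch_finding"])
assert args.auto_batch_finding is True                     # flag given -> True
assert parser.parse_args([]).auto_batch_finding is False   # flag omitted -> False
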
86 changes: 35 additions & 51 deletions dicee/static_funcs.py
@@ -684,16 +684,15 @@ def download_pretrained_model(url: str) -> str:
download_files_from_url(url_to_download_from, destination_folder=dir_name)
return dir_name

def write_csv_from_model_parallel(path: str) -> None:
def write_csv_from_model_parallel(path: str) :
"""Create"""
assert os.path.exists(path), "Path does not exist"

# Detect files that start with model_ and end with .pt
model_files = [f for f in os.listdir(path) if f.startswith("model_") and f.endswith(".pt")]
model_files.sort() # Sort to maintain order if necessary (e.g., model_0.pt, model_1.pt)

entity_csv_path = os.path.join(path, "entity_embeddings.csv")
relation_csv_path = os.path.join(path, "relation_embeddings.csv")
entity_embeddings=[]
relation_embeddings=[]

# Process each model file
for model_file in model_files:
@@ -702,65 +701,50 @@ def write_csv_from_model_parallel(path: str) -> None:
model = torch.load(model_path)
# Assuming model has a get_embeddings method
entity_emb, relation_emb = model["_orig_mod.entity_embeddings.weight"], model["_orig_mod.relation_embeddings.weight"]
# Convert to numpy
entity_emb = entity_emb.numpy()
relation_emb = relation_emb.numpy()

# Write or append to CSV
if not os.path.exists(entity_csv_path) or not os.path.exists(relation_csv_path):
# If CSV files do not exist, create them
pd.DataFrame(entity_emb).to_csv(entity_csv_path, index=True, header=False)
pd.DataFrame(relation_emb).to_csv(relation_csv_path, index=True, header=False)
else:
# If CSV files exist, concatenate to the existing rows
existing_entity_df = pd.read_csv(entity_csv_path, header=None)
existing_relation_df = pd.read_csv(relation_csv_path, header=None)
entity_embeddings.append(entity_emb)
relation_embeddings.append(relation_emb)

# Concatenate along the columns (axis=1)
new_entity_df = pd.concat([existing_entity_df, pd.DataFrame(entity_emb)], axis=1)
new_relation_df = pd.concat([existing_relation_df, pd.DataFrame(relation_emb)], axis=1)
return torch.cat(entity_embeddings, dim=1), torch.cat(relation_embeddings, dim=1)

# Write the updated data back to the CSV files
new_entity_df.to_csv(entity_csv_path, index=False, header=False)
new_relation_df.to_csv(relation_csv_path, index=False, header=False)

def from_pretrained_model_write_embeddings_into_csv(path: str) -> None:
""" """
assert os.path.exists(path), "Path does not exist"
config = load_json(path + '/configuration.json')
if config["trainer"]=="MP":
write_csv_from_model_parallel(path)
entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv")
relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv")

if config["trainer"]=="TP":
entity_emb, relation_emb = write_csv_from_model_parallel(path)
else:
entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv")
relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv")
# Load model
model = torch.load(os.path.join(path, "model.pt"))
# Assuming model has a get_embeddings method
entity_emb, relation_emb = model["entity_embeddings.weight"], model["relation_embeddings.weight"]
str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"]
assert str_entity.index.is_monotonic_increasing
str_entity=str_entity.to_list()
# Write entity embeddings with headers and indices
with open(entity_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(entity_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)):
writer.writerow([name] + row.tolist())
str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"]
assert str_relations.index.is_monotonic_increasing

# Write relation embeddings with headers and indices
with open(relation_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(relation_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)):
writer.writerow([name]+ row.tolist())
str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"]
assert str_entity.index.is_monotonic_increasing
str_entity=str_entity.to_list()
# Write entity embeddings with headers and indices
with open(entity_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(entity_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)):
writer.writerow([name] + row.tolist())
str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"]
assert str_relations.index.is_monotonic_increasing

# Write relation embeddings with headers and indices
with open(relation_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(relation_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)):
writer.writerow([name]+ row.tolist())

"""

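The main change above is that write_csv_from_model_parallel no longer writes CSV files itself; it loads every model_*.pt shard and concatenates the per-GPU embedding slices along the embedding dimension before returning them. A small illustrative sketch of that shard layout, with made-up shapes (two shards of a 4-dimensional embedding):

import torch

# Hypothetical shards from two GPUs: each holds half of every entity's vector.
shard_0 = torch.zeros(7, 2)   # (num_entities, dim/2), e.g. from model_0.pt
shard_1 = torch.ones(7, 2)    # (num_entities, dim/2), e.g. from model_1.pt

# torch.cat along dim=1 restores the full (num_entities, dim) matrix,
# which from_pretrained_model_write_embeddings_into_csv then writes to CSV.
full = torch.cat([shard_0, shard_1], dim=1)
assert full.shape == (7, 4)

The CSV written afterwards uses an empty-named first column for the entity (or relation) labels and numbered columns 0 to dim-1 for the values, matching the header construction in from_pretrained_model_write_embeddings_into_csv.
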
44 changes: 24 additions & 20 deletions dicee/trainer/model_parallelism.py
@@ -29,8 +29,8 @@ def find_good_batch_size(train_loader,ensemble_model, max_available_gpu_memory:f
if batch_size >= len(train_loader.dataset):
return batch_size
first_batch_size = train_loader.batch_size

print("Automatic batch size finding")
num_datapoints=len(train_loader.dataset)
print(f"Increment the batch size by {first_batch_size} until the Free/Total GPU memory is reached to {1-max_available_gpu_memory} or batch_size={num_datapoints} is achieved.")
while True:
# () Initialize a dataloader with a current batch_size
train_dataloaders = torch.utils.data.DataLoader(train_loader.dataset,
@@ -52,15 +52,16 @@
avg_global_free_memory.append(global_free_memory / total_memory)
if i==3:
break

avg_global_free_memory=sum(avg_global_free_memory)/len(avg_global_free_memory)
print(f"Random Batch Loss: {loss}\tFree/Total GPU Memory: {avg_global_free_memory}\tBatch Size:{batch_size}")
# () Stepping criterion
if avg_global_free_memory > max_available_gpu_memory and batch_size < len(train_loader.dataset) :
# Increment the current batch size
batch_size+=first_batch_size
if avg_global_free_memory > max_available_gpu_memory and batch_size < num_datapoints :
if batch_size+first_batch_size <= num_datapoints:
batch_size+=first_batch_size
else:
batch_size=num_datapoints
else:
if batch_size >= len(train_loader.dataset):
assert batch_size<=num_datapoints
if batch_size == num_datapoints:
print("Batch size equals to the training dataset size")
else:
print(f"Max GPU memory used\tFree/Total GPU Memory:{avg_global_free_memory}")
@@ -88,6 +89,7 @@ class TensorParallel(AbstractTrainer):
def __init__(self, args, callbacks):
super().__init__(args, callbacks)
self.models=[]

def get_ensemble(self):
return self.models

@@ -104,18 +106,20 @@ def fit(self, *args, **kwargs):
# ()
train_dataloader = kwargs['train_dataloaders']
# ()
train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
batch_size=find_good_batch_size(train_dataloader, ensemble_model),
shuffle=True,
sampler=None,
batch_sampler=None,
num_workers=self.attributes.num_core,
collate_fn=train_dataloader.dataset.collate_fn,
pin_memory=False,
drop_last=False,
timeout=0,
worker_init_fn=None,
persistent_workers=False)
if self.attributes.auto_batch_finding:
train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
batch_size=find_good_batch_size(train_dataloader, ensemble_model),
shuffle=True,
sampler=None,
batch_sampler=None,
num_workers=self.attributes.num_core,
collate_fn=train_dataloader.dataset.collate_fn,
pin_memory=False,
drop_last=False,
timeout=0,
worker_init_fn=None,
persistent_workers=False)

num_of_batches = len(train_dataloader)
# () Start training.
for epoch in (tqdm_bar := make_iterable_verbose(range(self.attributes.num_epochs),
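The stepping criterion in find_good_batch_size above now grows the batch size in first_batch_size increments and caps it at the dataset size instead of overshooting. A GPU-free sketch of just that rule, with faked free/total memory readings (the trainer reads the real values from the GPUs):

def step_batch_size(batch_size, first_batch_size, num_datapoints,
                    free_ratio, max_available_gpu_memory):
    # Grow while enough GPU memory is free, but never exceed the dataset size.
    if free_ratio > max_available_gpu_memory and batch_size < num_datapoints:
        if batch_size + first_batch_size <= num_datapoints:
            return batch_size + first_batch_size
        return num_datapoints
    return batch_size  # memory budget hit or dataset size reached: stop growing

bs = 1024
for free in (0.8, 0.5, 0.2, 0.05):   # fake free/total memory ratios per probe
    bs = step_batch_size(bs, 1024, 10_000, free, max_available_gpu_memory=0.1)
print(bs)  # 4096: growth stops once the free-memory ratio falls below 0.1
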
13 changes: 9 additions & 4 deletions tests/test_saving_embeddings.py
@@ -2,6 +2,7 @@
from dicee.static_funcs import from_pretrained_model_write_embeddings_into_csv
from dicee.executer import Execute
from dicee.config import Namespace
import torch

class TestSavingEmbeddings:
def test_saving_embeddings(self):
@@ -23,9 +24,10 @@ def test_saving_embeddings(self):

def test_model_parallel_saving_embeddings(self):
# (1) Train a KGE model
import torch

if torch.cuda.is_available():
from dicee.config import Namespace
from dicee.executer import Execute
import os
args = Namespace()
args.model = 'Keci'
args.p = 0
@@ -34,9 +36,12 @@ def test_model_parallel_saving_embeddings(self):
args.scoring_technique = "KvsAll"
args.path_single_kg = "KGs/Family/family-benchmark_rich_background.owl"
args.backend = "rdflib"
args.trainer = "TP"
args.num_epochs = 1
args.batch_size = 1024
args.lr = 0.1
args.embedding_dim = 512
args.embedding_dim = 32
args.save_embeddings_as_csv = True
result = Execute(args).start()
from_pretrained_model_write_embeddings_into_csv(result["path_experiment_folder"])
assert os.path.exists(result["path_experiment_folder"] + "/Keci_entity_embeddings.csv")
assert os.path.exists(result["path_experiment_folder"] + "/Keci_relation_embeddings.csv")
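A quick way to sanity-check the exported files after a run like the test above; the folder path is hypothetical and stands in for result["path_experiment_folder"], and pandas is already used elsewhere in dicee.static_funcs.

import pandas as pd

# Hypothetical experiment folder; substitute the value returned by Execute(args).start().
folder = "Experiments/2024-11-26_12-00-00"
entities = pd.read_csv(folder + "/Keci_entity_embeddings.csv", index_col=0)
relations = pd.read_csv(folder + "/Keci_relation_embeddings.csv", index_col=0)

# The unnamed first column (entity/relation labels) becomes the index;
# the remaining columns are the embedding dimensions 0..d-1.
print(entities.shape, relations.shape)
print(entities.index[:3].tolist())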