diff --git a/dicee/config.py b/dicee/config.py
index 22cd7844..5c5e69a8 100644
--- a/dicee/config.py
+++ b/dicee/config.py
@@ -85,7 +85,6 @@ def __init__(self, **kwargs):
         self.label_smoothing_rate: float = 0.0
-
         self.num_core: int = 0
         """Number of CPUs to be used in the mini-batch loading process"""
@@ -139,6 +138,9 @@ def __init__(self, **kwargs):
         self.continual_learning=None
         "Path of a pretrained model size of LLM"
 
+        self.auto_batch_finding=False
+        "A flag for using auto batch finding"
+
     def __iter__(self):
         # Iterate
         for k, v in self.__dict__.items():
diff --git a/dicee/scripts/run.py b/dicee/scripts/run.py
index 390d25d0..20ad784f 100755
--- a/dicee/scripts/run.py
+++ b/dicee/scripts/run.py
@@ -123,6 +123,9 @@ def get_default_arguments(description=None):
    parser.add_argument("--swa",
                        action="store_true",
                        help="Stochastic weight averaging")
+    parser.add_argument("--auto_batch_finding",
+                        action="store_true",
+                        help="Find a batch size fitting in GPUs. Only available for TP trainer")
    parser.add_argument('--degree', type=int, default=0,
                        help='degree for polynomial embeddings')
diff --git a/dicee/static_funcs.py b/dicee/static_funcs.py
index e6b20168..7627fa13 100644
--- a/dicee/static_funcs.py
+++ b/dicee/static_funcs.py
@@ -684,16 +684,15 @@ def download_pretrained_model(url: str) -> str:
     download_files_from_url(url_to_download_from, destination_folder=dir_name)
     return dir_name
 
-def write_csv_from_model_parallel(path: str) -> None:
+def write_csv_from_model_parallel(path: str) :
     """Create"""
     assert os.path.exists(path), "Path does not exist"
-
     # Detect files that start with model_ and end with .pt
     model_files = [f for f in os.listdir(path) if f.startswith("model_") and f.endswith(".pt")]
     model_files.sort()  # Sort to maintain order if necessary (e.g., model_0.pt, model_1.pt)
 
-    entity_csv_path = os.path.join(path, "entity_embeddings.csv")
-    relation_csv_path = os.path.join(path, "relation_embeddings.csv")
+    entity_embeddings=[]
+    relation_embeddings=[]
 
     # Process each model file
     for model_file in model_files:
@@ -702,65 +701,50 @@ def write_csv_from_model_parallel(path: str) -> None:
         model = torch.load(model_path)
         # Assuming model has a get_embeddings method
         entity_emb, relation_emb = model["_orig_mod.entity_embeddings.weight"], model["_orig_mod.relation_embeddings.weight"]
-        # Convert to numpy
-        entity_emb = entity_emb.numpy()
-        relation_emb = relation_emb.numpy()
-
-        # Write or append to CSV
-        if not os.path.exists(entity_csv_path) or not os.path.exists(relation_csv_path):
-            # If CSV files do not exist, create them
-            pd.DataFrame(entity_emb).to_csv(entity_csv_path, index=True, header=False)
-            pd.DataFrame(relation_emb).to_csv(relation_csv_path, index=True, header=False)
-        else:
-            # If CSV files exist, concatenate to the existing rows
-            existing_entity_df = pd.read_csv(entity_csv_path, header=None)
-            existing_relation_df = pd.read_csv(relation_csv_path, header=None)
+        entity_embeddings.append(entity_emb)
+        relation_embeddings.append(relation_emb)
 
-            # Concatenate along the columns (axis=1)
-            new_entity_df = pd.concat([existing_entity_df, pd.DataFrame(entity_emb)], axis=1)
-            new_relation_df = pd.concat([existing_relation_df, pd.DataFrame(relation_emb)], axis=1)
+    return torch.cat(entity_embeddings, dim=1), torch.cat(relation_embeddings, dim=1)
 
-            # Write the updated data back to the CSV files
-            new_entity_df.to_csv(entity_csv_path, index=False, header=False)
-            new_relation_df.to_csv(relation_csv_path, index=False, header=False)
 
 def from_pretrained_model_write_embeddings_into_csv(path: str) -> None:
     """ """
     assert os.path.exists(path), "Path does not exist"
     config = load_json(path + '/configuration.json')
-    if config["trainer"]=="MP":
-        write_csv_from_model_parallel(path)
+    entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv")
+    relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv")
+
+    if config["trainer"]=="TP":
+        entity_emb, relation_emb = write_csv_from_model_parallel(path)
     else:
-        entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv")
-        relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv")
         # Load model
         model = torch.load(os.path.join(path, "model.pt"))
         # Assuming model has a get_embeddings method
         entity_emb, relation_emb = model["entity_embeddings.weight"], model["relation_embeddings.weight"]
-        str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"]
-        assert str_entity.index.is_monotonic_increasing
-        str_entity=str_entity.to_list()
-        # Write entity embeddings with headers and indices
-        with open(entity_csv_path, "w", newline="") as f:
-            writer = csv.writer(f)
-            # Add header (e.g., "", "0", "1", ..., "N")
-            headers = [""] + [f"{i}" for i in range(entity_emb.size(1))]
-            writer.writerow(headers)
-            # Add rows with index
-            for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)):
-                writer.writerow([name] + row.tolist())
-        str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"]
-        assert str_relations.index.is_monotonic_increasing
-
-        # Write relation embeddings with headers and indices
-        with open(relation_csv_path, "w", newline="") as f:
-            writer = csv.writer(f)
-            # Add header (e.g., "", "0", "1", ..., "N")
-            headers = [""] + [f"{i}" for i in range(relation_emb.size(1))]
-            writer.writerow(headers)
-            # Add rows with index
-            for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)):
-                writer.writerow([name]+ row.tolist())
+    str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"]
+    assert str_entity.index.is_monotonic_increasing
+    str_entity=str_entity.to_list()
+    # Write entity embeddings with headers and indices
+    with open(entity_csv_path, "w", newline="") as f:
+        writer = csv.writer(f)
+        # Add header (e.g., "", "0", "1", ..., "N")
+        headers = [""] + [f"{i}" for i in range(entity_emb.size(1))]
+        writer.writerow(headers)
+        # Add rows with index
+        for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)):
+            writer.writerow([name] + row.tolist())
+    str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"]
+    assert str_relations.index.is_monotonic_increasing
+
+    # Write relation embeddings with headers and indices
+    with open(relation_csv_path, "w", newline="") as f:
+        writer = csv.writer(f)
+        # Add header (e.g., "", "0", "1", ..., "N")
+        headers = [""] + [f"{i}" for i in range(relation_emb.size(1))]
+        writer.writerow(headers)
+        # Add rows with index
+        for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)):
+            writer.writerow([name]+ row.tolist())
 """
diff --git a/dicee/trainer/model_parallelism.py b/dicee/trainer/model_parallelism.py
index ffea1a46..9b10941e 100644
--- a/dicee/trainer/model_parallelism.py
+++ b/dicee/trainer/model_parallelism.py
@@ -29,8 +29,8 @@ def find_good_batch_size(train_loader,ensemble_model, max_available_gpu_memory:f
     if batch_size >= len(train_loader.dataset):
         return batch_size
     first_batch_size = train_loader.batch_size
-
-    print("Automatic batch size finding")
+    num_datapoints=len(train_loader.dataset)
+    print(f"Increment the batch size by {first_batch_size} until the Free/Total GPU memory is reached to {1-max_available_gpu_memory} or batch_size={num_datapoints} is achieved.")
     while True:
         # () Initialize a dataloader with a current batch_size
         train_dataloaders = torch.utils.data.DataLoader(train_loader.dataset,
@@ -52,15 +52,16 @@ def find_good_batch_size(train_loader,ensemble_model, max_available_gpu_memory:f
             avg_global_free_memory.append(global_free_memory / total_memory)
             if i==3:
                 break
-
         avg_global_free_memory=sum(avg_global_free_memory)/len(avg_global_free_memory)
         print(f"Random Batch Loss: {loss}\tFree/Total GPU Memory: {avg_global_free_memory}\tBatch Size:{batch_size}")
-        # () Stepping criterion
-        if avg_global_free_memory > max_available_gpu_memory and batch_size < len(train_loader.dataset) :
-            # Increment the current batch size
-            batch_size+=first_batch_size
+        if avg_global_free_memory > max_available_gpu_memory and batch_size < num_datapoints :
+            if batch_size+first_batch_size <= num_datapoints:
+                batch_size+=first_batch_size
+            else:
+                batch_size=num_datapoints
         else:
-            if batch_size >= len(train_loader.dataset):
+            assert batch_size<=num_datapoints
+            if batch_size == num_datapoints:
                 print("Batch size equals to the training dataset size")
             else:
                 print(f"Max GPU memory used\tFree/Total GPU Memory:{avg_global_free_memory}")
@@ -88,6 +89,7 @@ class TensorParallel(AbstractTrainer):
     def __init__(self, args, callbacks):
         super().__init__(args, callbacks)
         self.models=[]
+
     def get_ensemble(self):
         return self.models
 
@@ -104,18 +106,20 @@ def fit(self, *args, **kwargs):
         # ()
         train_dataloader = kwargs['train_dataloaders']
         # ()
-        train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
-                                                       batch_size=find_good_batch_size(train_dataloader, ensemble_model),
-                                                       shuffle=True,
-                                                       sampler=None,
-                                                       batch_sampler=None,
-                                                       num_workers=self.attributes.num_core,
-                                                       collate_fn=train_dataloader.dataset.collate_fn,
-                                                       pin_memory=False,
-                                                       drop_last=False,
-                                                       timeout=0,
-                                                       worker_init_fn=None,
-                                                       persistent_workers=False)
+        if self.attributes.auto_batch_finding:
+            train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
+                                                           batch_size=find_good_batch_size(train_dataloader, ensemble_model),
+                                                           shuffle=True,
+                                                           sampler=None,
+                                                           batch_sampler=None,
+                                                           num_workers=self.attributes.num_core,
+                                                           collate_fn=train_dataloader.dataset.collate_fn,
+                                                           pin_memory=False,
+                                                           drop_last=False,
+                                                           timeout=0,
+                                                           worker_init_fn=None,
+                                                           persistent_workers=False)
+
         num_of_batches = len(train_dataloader)
         # () Start training.
         for epoch in (tqdm_bar := make_iterable_verbose(range(self.attributes.num_epochs),
diff --git a/tests/test_saving_embeddings.py b/tests/test_saving_embeddings.py
index 0534b3c4..f83a03e0 100644
--- a/tests/test_saving_embeddings.py
+++ b/tests/test_saving_embeddings.py
@@ -2,6 +2,7 @@
 from dicee.static_funcs import from_pretrained_model_write_embeddings_into_csv
 from dicee.executer import Execute
 from dicee.config import Namespace
+import torch
 
 class TestSavingEmbeddings:
     def test_saving_embeddings(self):
@@ -23,9 +24,10 @@ def test_saving_embeddings(self):
 
     def test_model_parallel_saving_embeddings(self):
         # (1) Train a KGE model
-        import torch
-
         if torch.cuda.is_available():
+            from dicee.config import Namespace
+            from dicee.executer import Execute
+            import os
             args = Namespace()
             args.model = 'Keci'
             args.p = 0
@@ -34,9 +36,12 @@ def test_model_parallel_saving_embeddings(self):
             args.scoring_technique = "KvsAll"
             args.path_single_kg = "KGs/Family/family-benchmark_rich_background.owl"
             args.backend = "rdflib"
+            args.trainer = "TP"
             args.num_epochs = 1
             args.batch_size = 1024
             args.lr = 0.1
-            args.embedding_dim = 512
+            args.embedding_dim = 32
+            args.save_embeddings_as_csv = True
             result = Execute(args).start()
-            from_pretrained_model_write_embeddings_into_csv(result["path_experiment_folder"])
\ No newline at end of file
+            assert os.path.exists(result["path_experiment_folder"] + "/Keci_entity_embeddings.csv")
+            assert os.path.exists(result["path_experiment_folder"] + "/Keci_relation_embeddings.csv")
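
Usage sketch: with this change, the TP (tensor parallel) trainer only searches for a batch size when --auto_batch_finding (i.e. args.auto_batch_finding) is set, and from_pretrained_model_write_embeddings_into_csv writes <model>_entity_embeddings.csv / <model>_relation_embeddings.csv, concatenating the per-GPU model_*.pt shards along dim=1 when trainer == "TP". The snippet below is a minimal, illustrative end-to-end run mirroring the updated test; it assumes a CUDA-capable machine and the Family KG shipped with the repository, and the hyperparameters are only examples.

from dicee.config import Namespace
from dicee.executer import Execute
from dicee.static_funcs import from_pretrained_model_write_embeddings_into_csv

args = Namespace()
args.model = 'Keci'
args.p = 0
args.trainer = "TP"               # tensor-parallel trainer
args.auto_batch_finding = True    # new flag: grow the batch size until the GPU memory threshold or dataset size is reached
args.scoring_technique = "KvsAll"
args.path_single_kg = "KGs/Family/family-benchmark_rich_background.owl"
args.backend = "rdflib"
args.num_epochs = 1
args.batch_size = 1024
args.lr = 0.1
args.embedding_dim = 32
result = Execute(args).start()

# Writes Keci_entity_embeddings.csv and Keci_relation_embeddings.csv into the run folder;
# for trainer == "TP" the per-GPU model_*.pt checkpoints are stitched together first.
from_pretrained_model_write_embeddings_into_csv(result["path_experiment_folder"])

Alternatively, setting args.save_embeddings_as_csv = True, as the updated test does, produces the same CSV files as part of the run itself.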