From 59f63a52763571e6b43b088304a8d799ec103a6c Mon Sep 17 00:00:00 2001 From: Ella Charlaix <80481427+echarlaix@users.noreply.github.com> Date: Fri, 3 Nov 2023 19:00:00 +0100 Subject: [PATCH] Fix transformers v4.35.0 compatibility (#471) * fix trainer * fix * format * fix version --- .github/workflows/test_inc.yml | 3 +- .../intel/neural_compressor/quantization.py | 60 +++- optimum/intel/neural_compressor/trainer.py | 321 +++++++++++------ optimum/intel/openvino/trainer.py | 332 ++++++++++++------ setup.py | 5 +- tests/neural_compressor/test_modeling.py | 3 + tests/neural_compressor/test_onnx.py | 2 +- 7 files changed, 508 insertions(+), 218 deletions(-) diff --git a/.github/workflows/test_inc.yml b/.github/workflows/test_inc.yml index fd5fd16509..3a15214f99 100644 --- a/.github/workflows/test_inc.yml +++ b/.github/workflows/test_inc.yml @@ -30,7 +30,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install .[neural-compressor,ipex,diffusers,tests] + pip install .[neural-compressor,diffusers,tests] + pip install intel-extension-for-pytorch - name: Test with Pytest run: | pytest tests/neural_compressor/ diff --git a/optimum/intel/neural_compressor/quantization.py b/optimum/intel/neural_compressor/quantization.py index 36f16524c2..d4846adc15 100644 --- a/optimum/intel/neural_compressor/quantization.py +++ b/optimum/intel/neural_compressor/quantization.py @@ -15,6 +15,7 @@ import copy import inspect import logging +import warnings from enum import Enum from itertools import chain from pathlib import Path @@ -30,16 +31,25 @@ from neural_compressor.quantization import fit from torch.utils.data import DataLoader, RandomSampler from transformers import ( + AutoModelForCausalLM, + AutoModelForMaskedLM, + AutoModelForMultipleChoice, + AutoModelForQuestionAnswering, + AutoModelForSeq2SeqLM, + AutoModelForSequenceClassification, + AutoModelForTokenClassification, + AutoModelForVision2Seq, DataCollator, PretrainedConfig, PreTrainedModel, + XLNetLMHeadModel, default_data_collator, ) from optimum.exporters import TasksManager from optimum.exporters.onnx import OnnxConfig from optimum.onnxruntime import ORTModel -from optimum.onnxruntime.modeling_decoder import ORTModelDecoder +from optimum.onnxruntime.modeling_decoder import ORTModelForCausalLM from optimum.onnxruntime.modeling_seq2seq import ORTModelForConditionalGeneration from optimum.onnxruntime.utils import ONNX_DECODER_NAME from optimum.quantization_base import OptimumQuantizer @@ -256,7 +266,7 @@ def quantize( if isinstance(self._original_model, ORTModelForConditionalGeneration): raise RuntimeError("ORTModelForConditionalGeneration not supported for quantization") - if isinstance(self._original_model, ORTModelDecoder): + if isinstance(self._original_model, ORTModelForCausalLM): model_or_path = self._original_model.onnx_paths if len(model_or_path) > 1: raise RuntimeError( @@ -528,3 +538,49 @@ def _apply_quantization_from_config(q_config: Dict, model: torch.nn.Module) -> t q_model = convert(q_model, mapping=q_mapping, inplace=True) return q_model + + +class IncQuantizedModel(INCModel): + @classmethod + def from_pretrained(cls, *args, **kwargs): + warnings.warn( + f"The class `{cls.__name__}` has been depreciated and will be removed in optimum-intel v1.12, please use " + f"`{cls.__name__.replace('IncQuantized', 'INC')}` instead." + ) + return super().from_pretrained(*args, **kwargs) + + +class IncQuantizedModelForQuestionAnswering(IncQuantizedModel): + auto_model_class = AutoModelForQuestionAnswering + + +class IncQuantizedModelForSequenceClassification(IncQuantizedModel): + auto_model_class = AutoModelForSequenceClassification + + +class IncQuantizedModelForTokenClassification(IncQuantizedModel): + auto_model_class = AutoModelForTokenClassification + + +class IncQuantizedModelForMultipleChoice(IncQuantizedModel): + auto_model_class = AutoModelForMultipleChoice + + +class IncQuantizedModelForSeq2SeqLM(IncQuantizedModel): + auto_model_class = AutoModelForSeq2SeqLM + + +class IncQuantizedModelForCausalLM(IncQuantizedModel): + auto_model_class = AutoModelForCausalLM + + +class IncQuantizedModelForMaskedLM(IncQuantizedModel): + auto_model_class = AutoModelForMaskedLM + + +class IncQuantizedModelForXLNetLM(IncQuantizedModel): + auto_model_class = XLNetLMHeadModel + + +class IncQuantizedModelForVision2Seq(IncQuantizedModel): + auto_model_class = AutoModelForVision2Seq diff --git a/optimum/intel/neural_compressor/trainer.py b/optimum/intel/neural_compressor/trainer.py index 8e8fec1758..918a2e4885 100644 --- a/optimum/intel/neural_compressor/trainer.py +++ b/optimum/intel/neural_compressor/trainer.py @@ -15,12 +15,21 @@ import copy import math import os +import shutil import sys import time from collections.abc import Mapping from itertools import chain from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union + +# Integrations must be imported before ML frameworks: +# isort: off +from transformers.integrations import hp_params +from transformers.integrations.deepspeed import deepspeed_init, deepspeed_load_checkpoint, is_deepspeed_available + +# isort: on + import datasets import torch import torch.distributed as dist @@ -28,38 +37,35 @@ from neural_compressor.compression import DistillationCallbacks from neural_compressor.conf.pythonic_config import _BaseQuantizationConfig from neural_compressor.experimental.export import torch_to_fp32_onnx, torch_to_int8_onnx - -# from packaging import version +from packaging import version from torch import nn from torch.utils.data import Dataset, RandomSampler -from torch.utils.data.dataloader import DataLoader -from torch.utils.data.distributed import DistributedSampler -from tqdm.auto import tqdm from transformers import Trainer from transformers.data.data_collator import DataCollator from transformers.debug_utils import DebugOption, DebugUnderflowOverflow -from transformers.deepspeed import deepspeed_init -from transformers.file_utils import WEIGHTS_NAME - -# Integrations must be imported before ML frameworks: -from transformers.integrations import hp_params from transformers.modeling_utils import PreTrainedModel, get_parameter_dtype, unwrap_model from transformers.models.auto.modeling_auto import MODEL_FOR_CAUSAL_LM_MAPPING_NAMES from transformers.pytorch_utils import is_torch_less_than_1_11 from transformers.tokenization_utils_base import PreTrainedTokenizerBase from transformers.trainer import TRAINER_STATE_NAME from transformers.trainer_callback import TrainerCallback, TrainerState -from transformers.trainer_pt_utils import IterableDatasetShard +from transformers.trainer_pt_utils import get_dataloader_sampler, get_model_param_count from transformers.trainer_utils import ( EvalPrediction, HPSearchBackend, - ShardedDDPOption, TrainOutput, has_length, speed_metrics, ) -from transformers.training_args import TrainingArguments -from transformers.utils import is_apex_available, is_sagemaker_mp_enabled, logging +from transformers.training_args import ParallelMode, TrainingArguments +from transformers.utils import ( + WEIGHTS_NAME, + is_accelerate_available, + is_apex_available, + is_sagemaker_mp_enabled, + is_torch_tpu_available, + logging, +) from optimum.exporters import TasksManager @@ -68,12 +74,31 @@ from .configuration import INCConfig +if is_accelerate_available(): + from accelerate import __version__ as accelerate_version + from accelerate import skip_first_batches + + if version.parse(accelerate_version) > version.parse("0.20.3"): + pass + DATA_SAMPLERS = [RandomSampler] + if version.parse(accelerate_version) > version.parse("0.23.0"): + from accelerate.data_loader import SeedableRandomSampler + + DATA_SAMPLERS += [SeedableRandomSampler] + + if is_deepspeed_available(): + pass + + if is_apex_available(): from apex import amp if is_sagemaker_mp_enabled(): import smdistributed.modelparallel.torch as smp +if is_torch_tpu_available(check_device=False): + import torch_xla.core.xla_model as xm + if TYPE_CHECKING: from optimum.exporters.onnx import OnnxConfig @@ -109,6 +134,8 @@ def __init__( task: Optional[str] = None, save_onnx_model: bool = False, ): + self.neftune_noise_alpha = None + super().__init__( model, args, @@ -178,7 +205,9 @@ def __init__( def _inner_training_loop( self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None ): + self.accelerator.free_memory() self._train_batch_size = batch_size + logger.debug(f"Currently training with a batch size of: {self._train_batch_size}") # Data loader and number of training steps train_dataloader = self.get_train_dataloader() @@ -186,9 +215,10 @@ def _inner_training_loop( # number of training epochs: num_train_epochs # number of training steps per epoch: num_update_steps_per_epoch # total number of training steps to execute: max_steps - total_train_batch_size = args.train_batch_size * args.gradient_accumulation_steps * args.world_size + total_train_batch_size = self._train_batch_size * args.gradient_accumulation_steps * args.world_size len_dataloader = None + num_train_tokens = None if has_length(train_dataloader): len_dataloader = len(train_dataloader) num_update_steps_per_epoch = len_dataloader // args.gradient_accumulation_steps @@ -230,58 +260,106 @@ def _inner_training_loop( else: debug_overflow = DebugUnderflowOverflow(self.model) # noqa - delay_optimizer_creation = ( - self.sharded_ddp is not None - and self.sharded_ddp != ShardedDDPOption.SIMPLE - or is_sagemaker_mp_enabled() - or self.fsdp is not None - ) - if args.deepspeed: - deepspeed_engine, optimizer, lr_scheduler = deepspeed_init( - self, num_training_steps=max_steps, resume_from_checkpoint=resume_from_checkpoint - ) - self.model = deepspeed_engine.module - self.model_wrapped = deepspeed_engine - self.deepspeed = deepspeed_engine - self.optimizer = optimizer - self.lr_scheduler = lr_scheduler - elif not delay_optimizer_creation: + delay_optimizer_creation = is_sagemaker_mp_enabled() or self.fsdp is not None or self.is_fsdp_enabled + + if self.is_deepspeed_enabled: + self.optimizer, self.lr_scheduler = deepspeed_init(self, num_training_steps=max_steps) + + if not delay_optimizer_creation: self.create_optimizer_and_scheduler(num_training_steps=max_steps) self.state = TrainerState() self.state.is_hyper_param_search = trial is not None + # Compute absolute values for logging, eval, and save if given as ratio + if args.logging_steps is not None: + if args.logging_steps < 1: + self.state.logging_steps = math.ceil(max_steps * args.logging_steps) + else: + self.state.logging_steps = args.logging_steps + if args.eval_steps is not None: + if args.eval_steps < 1: + self.state.eval_steps = math.ceil(max_steps * args.eval_steps) + else: + self.state.eval_steps = args.eval_steps + if args.save_steps is not None: + if args.save_steps < 1: + self.state.save_steps = math.ceil(max_steps * args.save_steps) + else: + self.state.save_steps = args.save_steps + # Activate gradient checkpointing if needed if args.gradient_checkpointing: - self.model.gradient_checkpointing_enable() + if args.gradient_checkpointing_kwargs is None: + gradient_checkpointing_kwargs = {} + else: + gradient_checkpointing_kwargs = args.gradient_checkpointing_kwargs + + self.model.gradient_checkpointing_enable(gradient_checkpointing_kwargs=gradient_checkpointing_kwargs) model = self._wrap_model(self.model_wrapped) - if is_sagemaker_mp_enabled() and resume_from_checkpoint is not None: - self._load_from_checkpoint(resume_from_checkpoint, model) + # as the model is wrapped, don't use `accelerator.prepare` + # this is for unhandled cases such as + # FSDP-XLA, SageMaker MP/DP, DataParallel, IPEX + use_accelerator_prepare = True if model is self.model else False + + if delay_optimizer_creation: + if use_accelerator_prepare: + self.model = self.accelerator.prepare(self.model) + self.create_optimizer_and_scheduler(num_training_steps=max_steps) + + # prepare using `accelerator` prepare + if use_accelerator_prepare: + self.model.train() + if hasattr(self.lr_scheduler, "step"): + if self.use_apex: + model = self.accelerator.prepare(self.model) + else: + model, self.optimizer = self.accelerator.prepare(self.model, self.optimizer) + else: + # to handle cases wherein we pass "DummyScheduler" such as when it is specified in DeepSpeed config. + model, self.optimizer, self.lr_scheduler = self.accelerator.prepare( + self.model, self.optimizer, self.lr_scheduler + ) + + if self.is_fsdp_enabled: + self.model = self.model_wrapped = model # for the rest of this function `model` is the outside model, whether it was wrapped or not if model is not self.model: self.model_wrapped = model - if delay_optimizer_creation: - self.create_optimizer_and_scheduler(num_training_steps=max_steps) + # backward compatibility + if self.is_deepspeed_enabled: + self.deepspeed = self.model_wrapped + + # ckpt loading + if resume_from_checkpoint is not None: + if self.is_deepspeed_enabled: + deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) + elif is_sagemaker_mp_enabled() or self.is_fsdp_enabled: + self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped) # Check if saved optimizer or scheduler states exist self._load_optimizer_and_scheduler(resume_from_checkpoint) # important: at this point: # self.model is the Transformers Model - # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), etc. + # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), + # FSDP(Transformers Model), Dynamo Optimized Module(Transformers Model) etc. # Train! logger.info("***** Running training *****") - logger.info(f" Num examples = {num_examples}") - logger.info(f" Num Epochs = {num_train_epochs}") - logger.info(f" Instantaneous batch size per device = {args.per_device_train_batch_size}") - logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size}") + logger.info(f" Num examples = {num_examples:,}") + logger.info(f" Num Epochs = {num_train_epochs:,}") + logger.info(f" Instantaneous batch size per device = {self.args.per_device_train_batch_size:,}") + if self.args.per_device_train_batch_size != self._train_batch_size: + logger.info(f" Training with DataParallel so batch size has been adjusted to: {self._train_batch_size:,}") + logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size:,}") logger.info(f" Gradient Accumulation steps = {args.gradient_accumulation_steps}") - logger.info(f" Total optimization steps = {max_steps}") + logger.info(f" Total optimization steps = {max_steps:,}") + logger.info(f" Number of trainable parameters = {get_model_param_count(model, trainable_only=True):,}") self.state.epoch = 0 start_time = time.time() @@ -306,20 +384,19 @@ def _inner_training_loop( logger.info(f" Continuing training from global step {self.state.global_step}") if not args.ignore_data_skip: logger.info( - f" Will skip the first {epochs_trained} epochs then the first {steps_trained_in_current_epoch} " - "batches in the first epoch. If this takes a lot of time, you can add the `--ignore_data_skip` " - "flag to your launch command, but you will resume the training on data already seen by your model." + f" Will skip the first {epochs_trained} epochs then the first" + f" {steps_trained_in_current_epoch} batches in the first epoch." ) - if self.is_local_process_zero() and not args.disable_tqdm: - steps_trained_progress_bar = tqdm(total=steps_trained_in_current_epoch) - steps_trained_progress_bar.set_description("Skipping the first batches") # Update the references self.callback_handler.model = self.model self.callback_handler.optimizer = self.optimizer self.callback_handler.lr_scheduler = self.lr_scheduler self.callback_handler.train_dataloader = train_dataloader - self.state.trial_name = self.hp_name(trial) if self.hp_name is not None else None + if self.hp_name is not None and self._trial is not None: + # use self._trial because the SigOpt/Optuna hpo only call `_hp_search_setup(trial)` instead of passing trial + # parameter to Train when using DDP. + self.state.trial_name = self.hp_name(self._trial) if trial is not None: assignments = trial.assignments if self.hp_search_backend == HPSearchBackend.SIGOPT else trial self.state.trial_params = hp_params(assignments) @@ -347,26 +424,26 @@ def _inner_training_loop( # Skip the first epochs_trained epochs to get the random state of the dataloader at the right point. if not args.ignore_data_skip: for epoch in range(epochs_trained): - is_random_sampler = hasattr(train_dataloader, "sampler") and isinstance( - train_dataloader.sampler, RandomSampler - ) + sampler = get_dataloader_sampler(train_dataloader) + sampler_kinds = [RandomSampler] + if version.parse(accelerate_version) > version.parse("0.23.0"): + sampler_kinds.append(SeedableRandomSampler) + is_random_sampler = isinstance(sampler, tuple(sampler_kinds)) if is_torch_less_than_1_11 or not is_random_sampler: # We just need to begin an iteration to create the randomization of the sampler. - # That was before PyTorch 1.11 however... for _ in train_dataloader: break else: # Otherwise we need to call the whooooole sampler cause there is some random operation added # AT THE VERY END! - _ = list(train_dataloader.sampler) + sampler = sampler if sampler is not None else [] + _ = list(sampler) + total_batched_samples = 0 for epoch in range(epochs_trained, num_train_epochs): - if isinstance(train_dataloader, DataLoader) and isinstance(train_dataloader.sampler, DistributedSampler): - train_dataloader.sampler.set_epoch(epoch) - elif hasattr(train_dataloader, "dataset") and isinstance(train_dataloader.dataset, IterableDatasetShard): - train_dataloader.dataset.set_epoch(epoch) - epoch_iterator = train_dataloader + if hasattr(epoch_iterator, "set_epoch"): + epoch_iterator.set_epoch(epoch) # Reset the past mems state at the beginning of each epoch if necessary. if args.past_index >= 0: @@ -385,8 +462,21 @@ def _inner_training_loop( if epoch == epochs_trained and resume_from_checkpoint is not None and steps_trained_in_current_epoch == 0: self._load_rng_state(resume_from_checkpoint) + rng_to_sync = False + steps_skipped = 0 + if steps_trained_in_current_epoch > 0: + epoch_iterator = skip_first_batches(epoch_iterator, steps_trained_in_current_epoch) + steps_skipped = steps_trained_in_current_epoch + steps_trained_in_current_epoch = 0 + rng_to_sync = True + step = -1 for step, inputs in enumerate(epoch_iterator): + total_batched_samples += 1 + if rng_to_sync: + self._load_rng_state(resume_from_checkpoint) + rng_to_sync = False + # Skip past any already trained steps if resuming training if steps_trained_in_current_epoch > 0: steps_trained_in_current_epoch -= 1 @@ -404,18 +494,14 @@ def _inner_training_loop( if self._compression_manager is not None: self._compression_manager.callbacks.on_step_begin(step) - if ( - ((step + 1) % args.gradient_accumulation_steps != 0) - and args.local_rank != -1 - and args._no_sync_in_gradient_accumulation - ): - # Avoid unnecessary DDP synchronization since there will be no backward pass on this example. - with model.no_sync(): - tr_loss_step = self.training_step(model, inputs) - else: + with self.accelerator.accumulate(model): tr_loss_step = self.training_step(model, inputs) - if args.logging_nan_inf_filter and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step)): + if ( + args.logging_nan_inf_filter + and not is_torch_tpu_available() + and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step)) + ): # if loss is nan or inf simply add the average of previous logged losses tr_loss += tr_loss / (1 + self.state.global_step - self._globalstep_last_logged) else: @@ -423,35 +509,38 @@ def _inner_training_loop( self.current_flos += float(self.floating_point_ops(inputs)) - # Optimizer step for deepspeed must be called on every step regardless of the value of gradient_accumulation_steps - if self.deepspeed: - self.deepspeed.step() + is_last_step_and_steps_less_than_grad_acc = ( + steps_in_epoch <= args.gradient_accumulation_steps and (step + 1) == steps_in_epoch + ) - if (step + 1) % args.gradient_accumulation_steps == 0 or ( + if ( + total_batched_samples % args.gradient_accumulation_steps == 0 + or # last step in epoch but step is always smaller than gradient_accumulation_steps - steps_in_epoch <= args.gradient_accumulation_steps - and (step + 1) == steps_in_epoch + is_last_step_and_steps_less_than_grad_acc ): + # the `or` condition of `is_last_step_and_steps_less_than_grad_acc` is not covered + # in accelerate. So, explicitly enable sync gradients to True in that case. + if is_last_step_and_steps_less_than_grad_acc or ( + version.parse(accelerate_version) <= version.parse("0.20.3") + ): + self.accelerator.gradient_state._set_sync_gradients(True) + # Gradient clipping - if args.max_grad_norm is not None and args.max_grad_norm > 0 and not self.deepspeed: + if args.max_grad_norm is not None and args.max_grad_norm > 0: # deepspeed does its own clipping - if self.do_grad_scaling: - # AMP: gradients need unscaling - self.scaler.unscale_(self.optimizer) - if is_sagemaker_mp_enabled() and args.fp16: self.optimizer.clip_master_grads(args.max_grad_norm) - elif hasattr(self.optimizer, "clip_grad_norm"): - # Some optimizers (like the sharded optimizer) have a specific way to do gradient clipping - self.optimizer.clip_grad_norm(args.max_grad_norm) - elif hasattr(model, "clip_grad_norm_"): - # Some models (like FullyShardedDDP) have a specific way to do gradient clipping - model.clip_grad_norm_(args.max_grad_norm) - else: + elif self.use_apex: # Revert to normal clipping otherwise, handling Apex or full precision nn.utils.clip_grad_norm_( - amp.master_params(self.optimizer) if self.use_apex else model.parameters(), + amp.master_params(self.optimizer), + args.max_grad_norm, + ) + else: + self.accelerator.clip_grad_norm_( + model.parameters(), args.max_grad_norm, ) @@ -459,27 +548,20 @@ def _inner_training_loop( self._compression_manager.callbacks.on_before_optimizer_step() # Optimizer step - optimizer_was_run = True - if self.deepspeed: - pass # called outside the loop - elif self.do_grad_scaling: - scale_before = self.scaler.get_scale() - self.scaler.step(self.optimizer) - self.scaler.update() - scale_after = self.scaler.get_scale() - optimizer_was_run = scale_before <= scale_after - else: - self.optimizer.step() + self.optimizer.step() if self._compression_manager is not None: self._compression_manager.callbacks.on_after_optimizer_step() - if optimizer_was_run and not self.deepspeed: - self.lr_scheduler.step() + optimizer_was_run = not self.accelerator.optimizer_step_was_skipped + if optimizer_was_run: + # Delay optimizer scheduling until metrics are generated + if not isinstance(self.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau): + self.lr_scheduler.step() model.zero_grad() self.state.global_step += 1 - self.state.epoch = epoch + (step + 1) / steps_in_epoch + self.state.epoch = epoch + (step + 1 + steps_skipped) / steps_in_epoch self.control = self.callback_handler.on_step_end(args, self.state, self.control) if self._compression_manager is not None: self._compression_manager.callbacks.on_step_end() @@ -501,7 +583,6 @@ def _inner_training_loop( self.control = self.callback_handler.on_epoch_end(args, self.state, self.control) if self._compression_manager is not None: self._compression_manager.callbacks.on_epoch_end() - self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval) if self.control.should_training_stop: @@ -513,9 +594,10 @@ def _inner_training_loop( logger.info("\n\nTraining completed. Do not forget to share your model on huggingface.co/models =)\n\n") if args.load_best_model_at_end and self.state.best_model_checkpoint is not None: - # Wait for everyone to get here so we are sur the model has been saved by process 0. - - if args.local_rank != -1: + # Wait for everyone to get here so we are sure the model has been saved by process 0. + if is_torch_tpu_available(): + xm.rendezvous("load_best_model_at_end") + elif args.parallel_mode == ParallelMode.DISTRIBUTED: dist.barrier() elif is_sagemaker_mp_enabled(): smp.barrier() @@ -526,7 +608,13 @@ def _inner_training_loop( self._total_loss_scalar += tr_loss.item() train_loss = self._total_loss_scalar / self.state.global_step - metrics = speed_metrics("train", start_time, num_samples=num_train_samples, num_steps=self.state.max_steps) + metrics = speed_metrics( + "train", + start_time, + num_samples=num_train_samples, + num_steps=self.state.max_steps, + num_tokens=num_train_tokens, + ) self.store_flos() metrics["total_flos"] = self.state.total_flos metrics["train_loss"] = train_loss @@ -537,7 +625,26 @@ def _inner_training_loop( self.log(metrics) + run_dir = self._get_output_dir(trial) + checkpoints_sorted = self._sorted_checkpoints(use_mtime=False, output_dir=run_dir) + + # Delete the last checkpoint when save_total_limit=1 if it's different from the best checkpoint and process allowed to save. + if self.args.should_save and self.state.best_model_checkpoint is not None and self.args.save_total_limit == 1: + for checkpoint in checkpoints_sorted: + if not os.path.samefile(checkpoint, self.state.best_model_checkpoint): + logger.info(f"Deleting older checkpoint [{checkpoint}] due to args.save_total_limit") + shutil.rmtree(checkpoint) + self.control = self.callback_handler.on_train_end(args, self.state, self.control) + + # Wait for the checkpoint to be uploaded. + self._finish_current_push() + + # After training we make sure to retrieve back the original forward pass method + # for the embedding layer by removing the forward post hook. + if self.neftune_noise_alpha is not None: + self._deactivate_neftune(self.model) + if self._compression_manager is not None: self._compression_manager.callbacks.on_train_end() diff --git a/optimum/intel/openvino/trainer.py b/optimum/intel/openvino/trainer.py index 0bba054ad3..17b0aa7058 100644 --- a/optimum/intel/openvino/trainer.py +++ b/optimum/intel/openvino/trainer.py @@ -16,6 +16,7 @@ import io import math import os +import shutil import sys import time from collections import defaultdict @@ -23,8 +24,15 @@ from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple, Type, Union + +# Integrations must be imported before ML frameworks: +# isort: off +from transformers.integrations import hp_params +from transformers.integrations.deepspeed import deepspeed_init, deepspeed_load_checkpoint, is_deepspeed_available + +# isort: on + import openvino -import openvino.runtime import torch import torch.distributed as dist import torch.nn.functional as F @@ -46,40 +54,39 @@ compress_quantize_weights_transformation, ) from openvino.runtime import Core, PartialShape, save_model +from packaging import version +from torch import nn from torch.onnx import export as onnx_export from torch.utils._pytree import tree_map -from torch.utils.data import DataLoader, Dataset, RandomSampler -from torch.utils.data.distributed import DistributedSampler -from tqdm.auto import tqdm +from torch.utils.data import Dataset, RandomSampler from transformers import Trainer from transformers.data.data_collator import DataCollator from transformers.debug_utils import DebugOption, DebugUnderflowOverflow -from transformers.deepspeed import deepspeed_init -from transformers.integrations import hp_params from transformers.modeling_utils import PreTrainedModel, unwrap_model from transformers.pytorch_utils import is_torch_less_than_1_11 from transformers.tokenization_utils_base import PreTrainedTokenizerBase from transformers.trainer import TRAINER_STATE_NAME, TRAINING_ARGS_NAME from transformers.trainer_callback import TrainerCallback, TrainerState -from transformers.trainer_pt_utils import IterableDatasetShard +from transformers.trainer_pt_utils import get_dataloader_sampler, get_model_param_count from transformers.trainer_utils import ( EvalPrediction, HPSearchBackend, - ShardedDDPOption, TrainOutput, has_length, speed_metrics, ) +from transformers.training_args import ParallelMode from transformers.utils import ( WEIGHTS_NAME, + is_accelerate_available, is_apex_available, is_sagemaker_mp_enabled, is_torch_tpu_available, logging, ) +from optimum.exporters import TasksManager from optimum.exporters.onnx import OnnxConfig -from optimum.exporters.tasks import TasksManager from ..utils.constant import _TASK_ALIASES from ..utils.import_utils import is_transformers_version @@ -95,6 +102,22 @@ ) +if is_accelerate_available(): + from accelerate import __version__ as accelerate_version + from accelerate import skip_first_batches + + if version.parse(accelerate_version) > version.parse("0.20.3"): + pass + DATA_SAMPLERS = [RandomSampler] + if version.parse(accelerate_version) > version.parse("0.23.0"): + from accelerate.data_loader import SeedableRandomSampler + + DATA_SAMPLERS += [SeedableRandomSampler] + + if is_deepspeed_available(): + pass + + if is_apex_available(): from apex import amp @@ -171,6 +194,8 @@ def __init__( task: Optional[str] = None, feature: Optional[str] = None, ): + self.neftune_noise_alpha = None + super().__init__( model, args, @@ -244,7 +269,9 @@ def _set_signature_columns_if_needed(self): def _inner_training_loop( self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None ): + self.accelerator.free_memory() self._train_batch_size = batch_size + logger.debug(f"Currently training with a batch size of: {self._train_batch_size}") # Data loader and number of training steps train_dataloader = self.get_train_dataloader() @@ -252,9 +279,10 @@ def _inner_training_loop( # number of training epochs: num_train_epochs # number of training steps per epoch: num_update_steps_per_epoch # total number of training steps to execute: max_steps - total_train_batch_size = args.train_batch_size * args.gradient_accumulation_steps * args.world_size + total_train_batch_size = self._train_batch_size * args.gradient_accumulation_steps * args.world_size len_dataloader = None + num_train_tokens = None if has_length(train_dataloader): len_dataloader = len(train_dataloader) num_update_steps_per_epoch = len_dataloader // args.gradient_accumulation_steps @@ -268,10 +296,16 @@ def _inner_training_loop( # May be slightly incorrect if the last batch in the training dataloader has a smaller size but it's # the best we can do. num_train_samples = args.max_steps * total_train_batch_size + if args.include_tokens_per_second: + num_train_tokens = ( + self.num_tokens(train_dataloader, args.max_steps) * args.gradient_accumulation_steps + ) else: max_steps = math.ceil(args.num_train_epochs * num_update_steps_per_epoch) num_train_epochs = math.ceil(args.num_train_epochs) num_train_samples = self.num_examples(train_dataloader) * args.num_train_epochs + if args.include_tokens_per_second: + num_train_tokens = self.num_tokens(train_dataloader) * args.num_train_epochs elif args.max_steps > 0: # Rely on max_steps when dataloader does not have a working size max_steps = args.max_steps # Setting a very large number of epochs so we go as many times as necessary over the iterator. @@ -279,6 +313,8 @@ def _inner_training_loop( num_update_steps_per_epoch = max_steps num_examples = total_train_batch_size * args.max_steps num_train_samples = args.max_steps * total_train_batch_size + if args.include_tokens_per_second: + num_train_tokens = self.num_tokens(train_dataloader, args.max_steps) * args.gradient_accumulation_steps else: raise ValueError( "args.max_steps must be set to a positive value if dataloader does not have a length, was" @@ -287,7 +323,7 @@ def _inner_training_loop( if DebugOption.UNDERFLOW_OVERFLOW in self.args.debug: if self.args.n_gpu > 1: - # torch.nn.DataParallel(model) replicates the model, creating new variables and module + # nn.DataParallel(model) replicates the model, creating new variables and module # references registered here no longer work on other gpus, breaking the module raise ValueError( "Currently --debug underflow_overflow is not supported under DP. Please use DDP" @@ -296,30 +332,47 @@ def _inner_training_loop( else: debug_overflow = DebugUnderflowOverflow(self.model) # noqa - delay_optimizer_creation = ( - self.sharded_ddp is not None - and self.sharded_ddp != ShardedDDPOption.SIMPLE - or is_sagemaker_mp_enabled() - or self.fsdp is not None - ) - if args.deepspeed: - deepspeed_engine, optimizer, lr_scheduler = deepspeed_init( - self, num_training_steps=max_steps, resume_from_checkpoint=resume_from_checkpoint - ) - self.model = deepspeed_engine.module - self.model_wrapped = deepspeed_engine - self.deepspeed = deepspeed_engine - self.optimizer = optimizer - self.lr_scheduler = lr_scheduler - elif not delay_optimizer_creation: + delay_optimizer_creation = is_sagemaker_mp_enabled() or self.fsdp is not None or self.is_fsdp_enabled + + # We need to reset the scheduler, as its parameters may be different on subsequent calls + if self._created_lr_scheduler: + self.lr_scheduler = None + self._created_lr_scheduler = False + + if self.is_deepspeed_enabled: + self.optimizer, self.lr_scheduler = deepspeed_init(self, num_training_steps=max_steps) + + if not delay_optimizer_creation: self.create_optimizer_and_scheduler(num_training_steps=max_steps) self.state = TrainerState() self.state.is_hyper_param_search = trial is not None + # Compute absolute values for logging, eval, and save if given as ratio + if args.logging_steps is not None: + if args.logging_steps < 1: + self.state.logging_steps = math.ceil(max_steps * args.logging_steps) + else: + self.state.logging_steps = args.logging_steps + if args.eval_steps is not None: + if args.eval_steps < 1: + self.state.eval_steps = math.ceil(max_steps * args.eval_steps) + else: + self.state.eval_steps = args.eval_steps + if args.save_steps is not None: + if args.save_steps < 1: + self.state.save_steps = math.ceil(max_steps * args.save_steps) + else: + self.state.save_steps = args.save_steps + # Activate gradient checkpointing if needed if args.gradient_checkpointing: - self.model.gradient_checkpointing_enable() + if args.gradient_checkpointing_kwargs is None: + gradient_checkpointing_kwargs = {} + else: + gradient_checkpointing_kwargs = args.gradient_checkpointing_kwargs + + self.model.gradient_checkpointing_enable(gradient_checkpointing_kwargs=gradient_checkpointing_kwargs) if is_transformers_version("<", "4.29.0"): is_distributed = self.args.local_rank != -1 @@ -333,31 +386,67 @@ def _inner_training_loop( model = self._wrap_model(self.model_wrapped) - if is_sagemaker_mp_enabled() and resume_from_checkpoint is not None: - self._load_from_checkpoint(resume_from_checkpoint, model) + # as the model is wrapped, don't use `accelerator.prepare` + # this is for unhandled cases such as + # FSDP-XLA, SageMaker MP/DP, DataParallel, IPEX + use_accelerator_prepare = True if model is self.model else False + + if delay_optimizer_creation: + if use_accelerator_prepare: + self.model = self.accelerator.prepare(self.model) + self.create_optimizer_and_scheduler(num_training_steps=max_steps) + + # prepare using `accelerator` prepare + if use_accelerator_prepare: + self.model.train() + if hasattr(self.lr_scheduler, "step"): + if self.use_apex: + model = self.accelerator.prepare(self.model) + else: + model, self.optimizer = self.accelerator.prepare(self.model, self.optimizer) + else: + # to handle cases wherein we pass "DummyScheduler" such as when it is specified in DeepSpeed config. + model, self.optimizer, self.lr_scheduler = self.accelerator.prepare( + self.model, self.optimizer, self.lr_scheduler + ) + + if self.is_fsdp_enabled: + self.model = self.model_wrapped = model # for the rest of this function `model` is the outside model, whether it was wrapped or not if model is not self.model: self.model_wrapped = model - if delay_optimizer_creation: - self.create_optimizer_and_scheduler(num_training_steps=max_steps) + # backward compatibility + if self.is_deepspeed_enabled: + self.deepspeed = self.model_wrapped + + # ckpt loading + if resume_from_checkpoint is not None: + if self.is_deepspeed_enabled: + deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) + elif is_sagemaker_mp_enabled() or self.is_fsdp_enabled: + self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped) # Check if saved optimizer or scheduler states exist self._load_optimizer_and_scheduler(resume_from_checkpoint) # important: at this point: # self.model is the Transformers Model - # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), etc. + # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), + # FSDP(Transformers Model), Dynamo Optimized Module(Transformers Model) etc. # Train! logger.info("***** Running training *****") - logger.info(f" Num examples = {num_examples}") - logger.info(f" Num Epochs = {num_train_epochs}") - logger.info(f" Instantaneous batch size per device = {args.per_device_train_batch_size}") - logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size}") + logger.info(f" Num examples = {num_examples:,}") + logger.info(f" Num Epochs = {num_train_epochs:,}") + logger.info(f" Instantaneous batch size per device = {self.args.per_device_train_batch_size:,}") + if self.args.per_device_train_batch_size != self._train_batch_size: + logger.info(f" Training with DataParallel so batch size has been adjusted to: {self._train_batch_size:,}") + logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size:,}") logger.info(f" Gradient Accumulation steps = {args.gradient_accumulation_steps}") - logger.info(f" Total optimization steps = {max_steps}") + logger.info(f" Total optimization steps = {max_steps:,}") + logger.info(f" Number of trainable parameters = {get_model_param_count(model, trainable_only=True):,}") self.state.epoch = 0 start_time = time.time() @@ -382,20 +471,19 @@ def _inner_training_loop( logger.info(f" Continuing training from global step {self.state.global_step}") if not args.ignore_data_skip: logger.info( - f" Will skip the first {epochs_trained} epochs then the first {steps_trained_in_current_epoch} " - "batches in the first epoch. If this takes a lot of time, you can add the `--ignore_data_skip` " - "flag to your launch command, but you will resume the training on data already seen by your model." + f" Will skip the first {epochs_trained} epochs then the first" + f" {steps_trained_in_current_epoch} batches in the first epoch." ) - if self.is_local_process_zero() and not args.disable_tqdm: - steps_trained_progress_bar = tqdm(total=steps_trained_in_current_epoch) - steps_trained_progress_bar.set_description("Skipping the first batches") # Update the references self.callback_handler.model = self.model self.callback_handler.optimizer = self.optimizer self.callback_handler.lr_scheduler = self.lr_scheduler self.callback_handler.train_dataloader = train_dataloader - self.state.trial_name = self.hp_name(trial) if self.hp_name is not None else None + if self.hp_name is not None and self._trial is not None: + # use self._trial because the SigOpt/Optuna hpo only call `_hp_search_setup(trial)` instead of passing trial + # parameter to Train when using DDP. + self.state.trial_name = self.hp_name(self._trial) if trial is not None: assignments = trial.assignments if self.hp_search_backend == HPSearchBackend.SIGOPT else trial self.state.trial_params = hp_params(assignments) @@ -408,6 +496,7 @@ def _inner_training_loop( self.state.is_local_process_zero = self.is_local_process_zero() self.state.is_world_process_zero = self.is_world_process_zero() + # tr_loss is a tensor to avoid synchronization of TPUs through .item() tr_loss = torch.tensor(0.0).to(args.device) self.compression_metrics = defaultdict(lambda: torch.tensor(0.0).to(args.device)) # _total_loss_scalar is updated everytime .item() has to be called on tr_loss and stores the sum of all losses @@ -420,31 +509,33 @@ def _inner_training_loop( # Skip the first epochs_trained epochs to get the random state of the dataloader at the right point. if not args.ignore_data_skip: for epoch in range(epochs_trained): - is_random_sampler = hasattr(train_dataloader, "sampler") and isinstance( - train_dataloader.sampler, RandomSampler - ) + sampler = get_dataloader_sampler(train_dataloader) + sampler_kinds = [RandomSampler] + if version.parse(accelerate_version) > version.parse("0.23.0"): + sampler_kinds.append(SeedableRandomSampler) + is_random_sampler = isinstance(sampler, tuple(sampler_kinds)) if is_torch_less_than_1_11 or not is_random_sampler: # We just need to begin an iteration to create the randomization of the sampler. - # That was before PyTorch 1.11 however... for _ in train_dataloader: break else: - # Otherwise we need to call the whole sampler cause there is some random operation added + # Otherwise we need to call the whooooole sampler cause there is some random operation added # AT THE VERY END! - _ = list(train_dataloader.sampler) + sampler = sampler if sampler is not None else [] + _ = list(sampler) + total_batched_samples = 0 for epoch in range(epochs_trained, num_train_epochs): - if isinstance(train_dataloader, DataLoader) and isinstance(train_dataloader.sampler, DistributedSampler): - train_dataloader.sampler.set_epoch(epoch) - elif hasattr(train_dataloader, "dataset") and isinstance(train_dataloader.dataset, IterableDatasetShard): - train_dataloader.dataset.set_epoch(epoch) + epoch_iterator = train_dataloader + if hasattr(epoch_iterator, "set_epoch"): + epoch_iterator.set_epoch(epoch) # Reset the past mems state at the beginning of each epoch if necessary. if args.past_index >= 0: self._past = None steps_in_epoch = ( - len(train_dataloader) + len(epoch_iterator) if len_dataloader is not None else args.max_steps * args.gradient_accumulation_steps ) @@ -460,8 +551,21 @@ def _inner_training_loop( if epoch == epochs_trained and resume_from_checkpoint is not None and steps_trained_in_current_epoch == 0: self._load_rng_state(resume_from_checkpoint) + rng_to_sync = False + steps_skipped = 0 + if steps_trained_in_current_epoch > 0: + epoch_iterator = skip_first_batches(epoch_iterator, steps_trained_in_current_epoch) + steps_skipped = steps_trained_in_current_epoch + steps_trained_in_current_epoch = 0 + rng_to_sync = True + step = -1 - for step, inputs in enumerate(train_dataloader): + for step, inputs in enumerate(epoch_iterator): + total_batched_samples += 1 + if rng_to_sync: + self._load_rng_state(resume_from_checkpoint) + rng_to_sync = False + # Skip past any already trained steps if resuming training if steps_trained_in_current_epoch > 0: steps_trained_in_current_epoch -= 1 @@ -480,17 +584,14 @@ def _inner_training_loop( # Must be called at the beginning of each training step to prepare the compression method self.compression_controller.scheduler.step() + with self.accelerator.accumulate(model): + tr_loss_step = self.training_step(model, inputs) + if ( - ((step + 1) % args.gradient_accumulation_steps != 0) - and args.local_rank != -1 - and args._no_sync_in_gradient_accumulation + args.logging_nan_inf_filter + and not is_torch_tpu_available() + and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step)) ): - # Avoid unnecessary DDP synchronization since there will be no backward pass on this example. - with model.no_sync(): - tr_loss_step = self.training_step(model, inputs) - else: - tr_loss_step = self.training_step(model, inputs) - if args.logging_nan_inf_filter and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step)): # if loss is nan or inf simply add the average of previous logged losses tr_loss += tr_loss / (1 + self.state.global_step - self._globalstep_last_logged) else: @@ -498,57 +599,52 @@ def _inner_training_loop( self.current_flos += float(self.floating_point_ops(inputs)) - # Optimizer step for deepspeed must be called on every step regardless of the value of gradient_accumulation_steps - if self.deepspeed: - self.deepspeed.step() + is_last_step_and_steps_less_than_grad_acc = ( + steps_in_epoch <= args.gradient_accumulation_steps and (step + 1) == steps_in_epoch + ) - if (step + 1) % args.gradient_accumulation_steps == 0 or ( + if ( + total_batched_samples % args.gradient_accumulation_steps == 0 + or # last step in epoch but step is always smaller than gradient_accumulation_steps - steps_in_epoch <= args.gradient_accumulation_steps - and (step + 1) == steps_in_epoch + is_last_step_and_steps_less_than_grad_acc ): + # the `or` condition of `is_last_step_and_steps_less_than_grad_acc` is not covered + # in accelerate. So, explicitly enable sync gradients to True in that case. + if is_last_step_and_steps_less_than_grad_acc or ( + version.parse(accelerate_version) <= version.parse("0.20.3") + ): + self.accelerator.gradient_state._set_sync_gradients(True) + # Gradient clipping - if args.max_grad_norm is not None and args.max_grad_norm > 0 and not self.deepspeed: + if args.max_grad_norm is not None and args.max_grad_norm > 0: # deepspeed does its own clipping - if self.do_grad_scaling: - # AMP: gradients need unscaling - self.scaler.unscale_(self.optimizer) - if is_sagemaker_mp_enabled() and args.fp16: self.optimizer.clip_master_grads(args.max_grad_norm) - elif hasattr(self.optimizer, "clip_grad_norm"): - # Some optimizers (like the sharded optimizer) have a specific way to do gradient clipping - self.optimizer.clip_grad_norm(args.max_grad_norm) - elif hasattr(model, "clip_grad_norm_"): - # Some models (like FullyShardedDDP) have a specific way to do gradient clipping - model.clip_grad_norm_(args.max_grad_norm) - else: + elif self.use_apex: # Revert to normal clipping otherwise, handling Apex or full precision - torch.nn.utils.clip_grad_norm_( - amp.master_params(self.optimizer) if self.use_apex else model.parameters(), + nn.utils.clip_grad_norm_( + amp.master_params(self.optimizer), + args.max_grad_norm, + ) + else: + self.accelerator.clip_grad_norm_( + model.parameters(), args.max_grad_norm, ) # Optimizer step - optimizer_was_run = True - if self.deepspeed: - pass # called outside the loop - elif self.do_grad_scaling: - scale_before = self.scaler.get_scale() - self.scaler.step(self.optimizer) - self.scaler.update() - scale_after = self.scaler.get_scale() - optimizer_was_run = scale_before <= scale_after - else: - self.optimizer.step() - - if optimizer_was_run and not self.deepspeed: - self.lr_scheduler.step() + self.optimizer.step() + optimizer_was_run = not self.accelerator.optimizer_step_was_skipped + if optimizer_was_run: + # Delay optimizer scheduling until metrics are generated + if not isinstance(self.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau): + self.lr_scheduler.step() model.zero_grad() self.state.global_step += 1 - self.state.epoch = epoch + (step + 1) / steps_in_epoch + self.state.epoch = epoch + (step + 1 + steps_skipped) / steps_in_epoch self.control = self.callback_handler.on_step_end(args, self.state, self.control) self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval) @@ -559,7 +655,7 @@ def _inner_training_loop( break if step < 0: logger.warning( - "There seems to be not a single sample in your train_dataloader, stopping training at step" + "There seems to be not a single sample in your epoch_iterator, stopping training at step" f" {self.state.global_step}! This is expected if you're using an IterableDataset and set" f" num_steps ({max_steps}) higher than the number of available samples." ) @@ -577,8 +673,10 @@ def _inner_training_loop( logger.info("\n\nTraining completed. Do not forget to share your model on huggingface.co/models =)\n\n") if args.load_best_model_at_end and self.state.best_model_checkpoint is not None: - # Wait for everyone to get here so we are sur the model has been saved by process 0. - if args.local_rank != -1: + # Wait for everyone to get here so we are sure the model has been saved by process 0. + if is_torch_tpu_available(): + xm.rendezvous("load_best_model_at_end") + elif args.parallel_mode == ParallelMode.DISTRIBUTED: dist.barrier() elif is_sagemaker_mp_enabled(): smp.barrier() @@ -589,7 +687,13 @@ def _inner_training_loop( self._total_loss_scalar += tr_loss.item() train_loss = self._total_loss_scalar / self.state.global_step - metrics = speed_metrics("train", start_time, num_samples=num_train_samples, num_steps=self.state.max_steps) + metrics = speed_metrics( + "train", + start_time, + num_samples=num_train_samples, + num_steps=self.state.max_steps, + num_tokens=num_train_tokens, + ) self.store_flos() metrics["total_flos"] = self.state.total_flos metrics["train_loss"] = train_loss @@ -600,8 +704,26 @@ def _inner_training_loop( self.log(metrics) + run_dir = self._get_output_dir(trial) + checkpoints_sorted = self._sorted_checkpoints(use_mtime=False, output_dir=run_dir) + + # Delete the last checkpoint when save_total_limit=1 if it's different from the best checkpoint and process allowed to save. + if self.args.should_save and self.state.best_model_checkpoint is not None and self.args.save_total_limit == 1: + for checkpoint in checkpoints_sorted: + if not os.path.samefile(checkpoint, self.state.best_model_checkpoint): + logger.info(f"Deleting older checkpoint [{checkpoint}] due to args.save_total_limit") + shutil.rmtree(checkpoint) + self.control = self.callback_handler.on_train_end(args, self.state, self.control) + # Wait for the checkpoint to be uploaded. + self._finish_current_push() + + # After training we make sure to retrieve back the original forward pass method + # for the embedding layer by removing the forward post hook. + if self.neftune_noise_alpha is not None: + self._deactivate_neftune(self.model) + return TrainOutput(self.state.global_step, train_loss, metrics) def compute_distillation_loss(self, inputs, student_outputs): diff --git a/setup.py b/setup.py index 6d81b98b2a..0c1feace30 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ INSTALL_REQUIRE = [ "optimum>=1.13.0", - "transformers>=4.20.0", + "transformers", "datasets>=1.4.0", "sentencepiece", "scipy", @@ -41,8 +41,9 @@ "neural-compressor>=2.2.0", "onnx", "onnxruntime<1.15.0", + "transformers>=4.33.0", ], - "openvino": ["openvino>=2023.1.0", "onnx", "onnxruntime"], + "openvino": ["openvino>=2023.1.0", "onnx", "onnxruntime", "transformers>=4.33.0"], "nncf": ["nncf>=2.6.0"], "ipex": ["transformers<4.32.0", "intel-extension-for-pytorch", "onnx"], "diffusers": ["diffusers"], diff --git a/tests/neural_compressor/test_modeling.py b/tests/neural_compressor/test_modeling.py index fc2a310595..8098f011c5 100644 --- a/tests/neural_compressor/test_modeling.py +++ b/tests/neural_compressor/test_modeling.py @@ -19,6 +19,7 @@ import unittest import torch +from packaging.version import Version, parse from parameterized import parameterized from transformers import AutoTokenizer, pipeline, set_seed @@ -39,6 +40,7 @@ INCTrainer, ) from optimum.intel.neural_compressor.utils import _HEAD_TO_AUTOMODELS, WEIGHTS_NAME +from optimum.version import __version__ as _optimum_version os.environ["CUDA_VISIBLE_DEVICES"] = "" @@ -133,6 +135,7 @@ def test_pipeline(self, model_id, task): pipe(*inputs) + @unittest.skipIf(parse(_optimum_version) < Version("1.14.0"), "not supported, needs optimum>=v1.14.0") def test_compare_with_and_without_past_key_values(self): model_id = "echarlaix/tiny-random-gpt2-torchscript" tokenizer = AutoTokenizer.from_pretrained(model_id) diff --git a/tests/neural_compressor/test_onnx.py b/tests/neural_compressor/test_onnx.py index f5dc0b7c66..387c369dd1 100644 --- a/tests/neural_compressor/test_onnx.py +++ b/tests/neural_compressor/test_onnx.py @@ -54,7 +54,7 @@ def test_static_quantization(self, task, model_name, expected_quantized_matmuls) tokenizer.pad_token = tokenizer.eos_token quantizer = INCQuantizer.from_pretrained(model, task=task) calibration_dataset = _generate_dataset(quantizer, tokenizer, num_samples=num_samples) - save_onnx_model = True + save_onnx_model = False op_type_dict = ( {"Embedding": {"weight": {"dtype": ["fp32"]}, "activation": {"dtype": ["fp32"]}}} if save_onnx_model