diff --git a/docs/user/pytorch.md b/docs/user/pytorch.md
new file mode 100644
index 00000000000..44013bd9f50
--- /dev/null
+++ b/docs/user/pytorch.md
@@ -0,0 +1,77 @@
+# DAOS PyTorch interface
+
+PyTorch is a fully featured framework for building and training deep learning models.
+It is widely used in the research community and in industry.
+PyTorch can load data from various sources, and DAOS can be used as a storage backend for training data and model checkpoints.
+
+The [DFS plugin](https://github.com/daos-stack/daos/tree/master/src/client/pydaos/torch) implements the PyTorch interfaces for loading data from DAOS: map-style and iterable-style datasets.
+This makes it possible to use all features of `torch.utils.data.DataLoader` to load data from DAOS POSIX containers, including parallel data loading, batching, shuffling, etc.
+
+## Installation
+
+To install the plugin, you need to have PyTorch installed. Please follow the official [PyTorch installation guide](https://pytorch.org/get-started/).
+The `pydaos.torch` module comes with the DAOS client package. Please refer to the DAOS installation guide for your distribution.
+
+
+## Usage
+
+To use DAOS as a storage backend for PyTorch, the DAOS agent must be running on the nodes where PyTorch runs, and the container ACLs must be configured to grant access.
+
+Here's an example of how to use the map-style dataset with DAOS directly:
+
+```python
+import torch
+from torch.utils.data import DataLoader
+from pydaos.torch import Dataset
+
+dataset = Dataset(pool='pool', container='container', path='/training/samples')
+# That's it: when the Dataset is created, it connects to DAOS, scans the namespace of the container
+# and is ready to load data from it.
+
+for i, sample in enumerate(dataset):
+    print(f"Sample {i} size: {len(sample)}")
+```
+
+To use the Dataset with a DataLoader, pass it directly to the DataLoader constructor:
+
+```python
+
+dataloader = DataLoader(dataset,
+                        batch_size=4,
+                        shuffle=True,
+                        num_workers=4,
+                        worker_init_fn=dataset.worker_init)
+
+# and use the DataLoader as usual
+for batch in dataloader:
+    print(f"Batch size: {len(batch)}")
+```
+
+The only notable difference is that you need to set the DataLoader's `worker_init_fn` to the dataset's `worker_init` method to correctly initialize the DAOS connection in the worker processes.
+
+## Checkpoints
+
+DAOS can be used to store model checkpoints as well.
+PyTorch provides a way to save and load model checkpoints using the [torch.save](https://pytorch.org/docs/main/generated/torch.save.html) and [torch.load](https://pytorch.org/docs/main/generated/torch.load.html) functions.
+
+`pydaos.torch` provides a way to save and load model checkpoints directly to/from a DAOS container (which can be the same container as the training data or a different one):
+
+```python
+import torch
+from pydaos.torch import Checkpoint
+
+# ...
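+# (Illustration only: the original example elides the model setup here.
+# Assume a model object already exists, e.g. a hypothetical `model = torch.nn.Linear(16, 4)`.)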
+
+chkp = Checkpoint(pool, cont, prefix='/training/checkpoints')
+
+with chkp.writer('model.pt') as w:
+    torch.save(model.state_dict(), w)
+
+# Later, to load the model
+
+with chkp.reader('model.pt') as r:
+    torch.load(r)
+
+```
+
+See the [pydaos.torch](https://github.com/daos-stack/daos/blob/master/src/client/pydaos/torch/Readme.md) plugin for an example of how to use checkpoints with the DLIO benchmark.
diff --git a/requirements-ftest.txt b/requirements-ftest.txt
index c0c0f2b8eb1..7b86d613036 100644
--- a/requirements-ftest.txt
+++ b/requirements-ftest.txt
@@ -4,3 +4,4 @@ avocado-framework-plugin-varianter-yaml-to-mux==82
 clustershell
 paramiko
 distro
+torch
diff --git a/src/client/pydaos/torch/Readme.md b/src/client/pydaos/torch/Readme.md
index b2a39cc9b53..85d8e388b68 100644
--- a/src/client/pydaos/torch/Readme.md
+++ b/src/client/pydaos/torch/Readme.md
@@ -62,3 +62,74 @@ for i in range(1, cols * rows + 1):
     plt.imshow(img.squeeze(), cmap="gray")
 plt.show()
 ```
+
+
+### Checkpoint interface
+
+The Torch framework provides a way to save and load a model's checkpoints: the `torch.save` and `torch.load` functions are used to save and load the model state dictionary.
+The `torch.save` function expects a state dictionary object and a file-like object `Union[str, PathLike, BinaryIO, IO[bytes]]`.
+To implement such an interface, the `pydaos.torch.WriteBuffer` class is introduced, which is a wrapper around an `io.BufferedIOBase` object, behaving like a writable stream.
+`WriteBuffer` can operate in two modes: in-memory buffer and chunked buffer. The in-memory buffer accumulates data in memory and writes it to the DAOS container when the `close()` method is called.
+The chunked buffer writes the data to the DAOS container in chunks of fixed size. There are optional parameters to limit the number of chunks in flight and the number of worker processes to use.
+The implementation of the loader is straightforward: it reads the data from the file with the existing API and returns it as a buffer.
+
+For convenience, the `pydaos.torch.Checkpoint` class is provided; it manages the DAOS connections and provides the `reader` and `writer` methods.
+
+
+An example of using the checkpointing interface in the DLIO benchmark:
+
+```python
+import logging
+import torch
+from pydaos.torch import Checkpoint as DaosCheckpoint
+
+from dlio_benchmark.checkpointing.base_checkpointing import BaseCheckpointing
+from dlio_benchmark.utils.utility import Profile
+from dlio_benchmark.utils.config import ConfigArguments
+
+from dlio_benchmark.common.constants import MODULE_CHECKPOINT
+
+dlp = Profile(MODULE_CHECKPOINT)
+
+
+class PyDaosTorchCheckpointing(BaseCheckpointing):
+    __instance = None
+
+    @staticmethod
+    def get_instance():
+        """ Static access method.
""" + if PyDaosTorchCheckpointing.__instance is None: + logging.basicConfig(level=logging.INFO) + PyDaosTorchCheckpointing.__instance = PyDaosTorchCheckpointing() + return PyDaosTorchCheckpointing.__instance + + @dlp.log_init + def __init__(self): + super().__init__("pt") + + args = ConfigArguments.get_instance() + prefix = args.checkpoint_folder + pool = args.checkpoint_daos_pool + cont = args.checkpoint_daos_cont + + logging.info(f"Checkpointing is set to DAOS pool: {pool}, container: {cont} with prefix: {prefix}") + self.ckpt = DaosCheckpoint(pool, cont, prefix) + + @dlp.log + def get_tensor(self, size): + return torch.randint(high=1, size=(size,), dtype=torch.int8) + + @dlp.log + def save_state(self, suffix, state): + name = self.get_name(suffix) + with self.ckpt.writer(name) as f: + torch.save(state, f) + + @dlp.log + def checkpoint(self, epoch, step_number): + super().checkpoint(epoch, step_number) + + @dlp.log + def finalize(self): + super().finalize() +``` diff --git a/src/client/pydaos/torch/__init__.py b/src/client/pydaos/torch/__init__.py index 31c901c8891..7d3ac476338 100644 --- a/src/client/pydaos/torch/__init__.py +++ b/src/client/pydaos/torch/__init__.py @@ -1,6 +1,7 @@ -# (C) Copyright 2024 Intel Corporation. -# (C) Copyright 2024 Google LLC -# (C) Copyright 2024 Enakta Labs Ltd +# (C) Copyright 2024-2025 Intel Corporation. +# (C) Copyright 2025 Hewlett Packard Enterprise Development LP +# (C) Copyright 2024-2025 Google LLC +# (C) Copyright 2024-2025 Enakta Labs Ltd # # SPDX-License-Identifier: BSD-2-Clause-Patent # @@ -8,7 +9,6 @@ """ PyTorch DAOS Module allowing using DFS as Dataset """ - import atexit from . import torch_shim # pylint: disable=relative-beyond-top-level,import-self @@ -16,17 +16,57 @@ DAOS_MAGIC = 0x7A8B -# The module loader procedure guarantees that __init__.py is going to be run only once -_rc = torch_shim.module_init() -if _rc != 0: - raise ValueError(f"Could not initialize DAOS module: rc={_rc}") +class DaosClient(): + # pylint: disable=too-few-public-methods + # pylint: disable=attribute-defined-outside-init + """ + DaosClient is responsible for handling DAOS init/fini. + + The class implements the Singleton pattern and only + allows a single instance to be instantiated during + the lifetime of a process. 
+ """ + _instance = None + + @classmethod + def cleanup(cls): + """Trigger the instance cleanup process.""" + if cls._instance is None: + return + cls._instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + # pylint: disable=protected-access + cls._instance._open() + return cls._instance + + def _open(self): + # Initialize DAOS + self.connected = False + _rc = torch_shim.module_init(DAOS_MAGIC) + if _rc != 0: + raise ValueError(f"Could not initialize DAOS module: rc={_rc}") + self.connected = True + + def _close(self): + if not self.connected: + return + _rc = torch_shim.module_fini(DAOS_MAGIC) + if _rc != 0: + raise ValueError(f"Could not finalize DAOS module: rc={_rc}") + self.connected = False + + def __del__(self): + if not torch_shim or not self.connected: + return + self._close() @atexit.register -def _fini(): - rc = torch_shim.module_fini() - if rc != 0: - raise ValueError(f"Could not finalize DAOS module, rc={rc}") +def _cleanup(): + DaosClient.cleanup() from .torch_api import * # noqa: F403,E402 diff --git a/src/client/pydaos/torch/torch_api.py b/src/client/pydaos/torch/torch_api.py index fb402170bde..3feb815b744 100644 --- a/src/client/pydaos/torch/torch_api.py +++ b/src/client/pydaos/torch/torch_api.py @@ -1,6 +1,6 @@ # -# (C) Copyright 2024 Google LLC -# (C) Copyright 2024 Enakta Labs Ltd +# (C) Copyright 2024-2025 Google LLC +# (C) Copyright 2024-2025 Enakta Labs Ltd # # SPDX-License-Identifier: BSD-2-Clause-Patent # @@ -9,16 +9,17 @@ to access training data on DAOS DFS via POSIX container. """ -import concurrent +import io import math import os -from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor +import stat +from multiprocessing import Process, Queue from torch.utils.data import Dataset as TorchDataset from torch.utils.data import IterableDataset as TorchIterableDataset from torch.utils.data import get_worker_info -from . import DAOS_MAGIC, torch_shim +from . import DAOS_MAGIC, DaosClient, torch_shim ITER_BATCH_SIZE = 32 READDIR_BATCH_SIZE = 128 @@ -253,6 +254,250 @@ def __load_batch(self, items): return [self._transform_fn(x) for x in result] +class WriteBuffer(io.BufferedIOBase): + """ + Class representing stream like write buffer for saving PyTorch model checkpoints to DAOS DFS. + + It provides two ways of writing data: + + In-memory buffer: all data will be written to the buffer + and flushed to the storage on close() call. To use this mode set transfer_chunk_size to 0. + + Chunked write: data will be written in chunks and saved to the storage in parallel, + using multiple workers. To use this mode set transfer_chunk_size to non-zero value. + + chunks_limit parameter is used to limit memory usage (only in chunked write mode): + no more than chunks_limit chunks will be queued for writing to the storage. + + This class is not intended to be used directly: Checkpoint class is the main interface. 
+ """ + + # pylint: disable=too-many-arguments,too-many-instance-attributes + def __init__(self, dfs, path, mode, open_flags, class_name, + file_chunk_size, transfer_chunk_size, chunks_limit, workers): + super().__init__() + + self._dfs = dfs + self._path = path + self._buffer = bytearray() + # offset is used to track the offset in the file for chunked writes + self._offset = 0 + # position is used to track how much was written in the buffer + self._position = 0 + self._closed = False + self._mode = mode + self._oflags = open_flags + self._class_name = class_name + self._file_chunk_size = file_chunk_size + self._transfer_chunk_size = transfer_chunk_size + + self._workers = [] + if self._transfer_chunk_size > 0: + if chunks_limit == 0: + self._queue = Queue() + else: + self._queue = Queue(chunks_limit) + + for _ in range(workers): + worker = Process(target=self._worker_fn, args=(self._queue,)) + worker.start() + self._workers.append(worker) + + def _worker_fn(self, queue): + self._dfs.worker_init() + while True: + work = queue.get() + if work is None: + break + + (offset, chunk) = work + self._dfs.write(self._path, self._mode, self._oflags, + self._class_name, self._file_chunk_size, offset, chunk) + + def write(self, data): + """ Writes data to the buffer.""" + + if self.closed: + raise ValueError("I/O operation on closed file") + + # In case of no chunking, we just extend the existing buffer without any limits + if self._transfer_chunk_size == 0: + self._buffer.extend(data) + self._position += len(data) + return len(data) + + written = len(data) + while len(data) > 0: + fit = min(len(data), self._transfer_chunk_size - len(self._buffer)) + chunk = data[:fit] + self._buffer.extend(chunk) + self._position += len(chunk) + + if len(self._buffer) == self._transfer_chunk_size: + self._submit_chunk(self._offset, self._buffer) + self._offset += len(self._buffer) + self._buffer = bytearray() + + data = data[len(chunk):] + + return written + + def tell(self): + """Return current stream position.""" + if self.closed: + raise ValueError("I/O operation on closed file") + return self._position + + def close(self): + """Upload any data left in buffer to storage and close.""" + if self.closed: + return + + self._flush() + self._closed = True + + for _ in self._workers: + self._queue.put(None) + for worker in self._workers: + worker.join() + + super().close() + + def _flush(self): + """Write if anything left and wait for any outstanding transfers""" + if self.closed: + raise ValueError("I/O operation on closed file") + + if len(self._buffer) > 0 and self._transfer_chunk_size == 0: + self._dfs.write(self._path, self._mode, self._oflags, + self._class_name, self._file_chunk_size, 0, self._buffer) + return + + if len(self._buffer) > 0: + self._submit_chunk(self._offset, self._buffer) + self._offset += len(self._buffer) + + def _submit_chunk(self, offset, chunk): + """ Submits chunk for writing to the container. + + It will block if the queue is full and has a size limit, forcing the caller to wait + until some of the chunks are written to the storage. 
+ """ + + self._queue.put((offset, chunk)) + + @property + def closed(self): + """Return True if the file is closed.""" + return self._closed + + def writable(self): + """Return True if the file is writable.""" + return True + + def readable(self): + return False + + def seekable(self): + """Return True if the file is seekable.""" + return False + + +class Checkpoint(): + """ + Class representing checkpoint interface for pytorch to save and load + model's stave over DAOS DFS. + + Attributes + ---------- + pool : string + Pool label or UUID string + cont: string + Container label or UUID string + prefix : string (optional) + Prefix as a directory to store checkpoint files, default is root of the container. + mode : int (optional) + File mode to be used for checkpoint files, default is 0o744. + open_flags : int (optional) + Open flags to be used for checkpoint files, default is to create a file. + class_name : string (optional) + Object class name to be used for checkpoint files, default is OC_UNKNOWN. + file_chunk_size : int (optional) + Chunk size to be used for checkpoint files, default is 0. + transfer_chunk_size : int (optional) + Chunk size for data buffering/transfer, default is 0, which means no chunking: + All writes go to in memory buffer and actual flush to storage will happen on close() call. + chunks_limits: int (optional) + Number of chunks to be used for buffering and transfer, default is 0, which means no limit. + It's used only when transfer_chunk_size is set to non-zero value and provides the mechanism + to limit memory usage. + workers: int (optional) + Number of workers to be used for parallel chunked writes. + This parameter is used only when transfer_chunk_size is set to non-zero value. + + Methods + ------- + reader(fname): + Reads the checkpoint file and returns its content as BytesIO object. + + writer(fname): + Returns write buffer to save the checkpoint file. 
+ """ + + # pylint: disable=too-many-arguments,too-many-instance-attributes + def __init__(self, pool, cont, prefix=os.sep, + mode=stat.S_IFREG | stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH, + open_flags=os.O_CREAT | os.O_RDWR, + class_name="OC_UNKNOWN", + file_chunk_size=0, + transfer_chunk_size=0, + chunks_limit=0, + workers=4, + ): + self._pool = pool + self._cont = cont + self._prefix = prefix + self._mode = mode + self._oflags = open_flags + self._class_name = class_name + self._file_chunk_size = file_chunk_size + self._transfer_chunk_size = transfer_chunk_size + self._chunks_limit = chunks_limit + self._workers = workers + self._dfs = _Dfs(pool=pool, cont=cont, rd_only=False) + + def __del__(self): + """ Cleanups the used resources and connection """ + + if self._dfs is None: + return + self._dfs.disconnect() + self._dfs = None + + def reader(self, fname): + """ Reads the checkpoint file and returns its content as BytesIO object """ + + if fname is None: + raise ValueError("fname is required") + + fpath = os.path.join(self._prefix, fname) + + size = self._dfs.get_file_size(fpath) + buf = self._dfs.read(fpath, size) + return io.BytesIO(buf) + + def writer(self, fname): + """ Returns write buffer to save the checkpoint file """ + + if fname is None: + raise ValueError("fname is required") + + fpath = os.path.join(self._prefix, fname) + return WriteBuffer(self._dfs, fpath, self._mode, self._oflags, + self._class_name, self._file_chunk_size, self._transfer_chunk_size, + self._chunks_limit, self._workers) + + class _Dfs(): """ Class encapsulating libdfs interface to load PyTorch Dataset @@ -260,9 +505,12 @@ class _Dfs(): """ def __init__(self, pool=None, cont=None, rd_only=True): - if (pool is None or cont is None): - raise ValueError("invalid pool or container labels") + if pool is None: + raise ValueError("pool label or UUID is required") + if cont is None: + raise ValueError("container label or UUID is required") + self._dc = DaosClient() (ret, dfs) = torch_shim.torch_connect(DAOS_MAGIC, pool, cont, rd_only) if ret != 0: raise OSError(ret, os.strerror(ret), f"could not connect to {pool}:{cont}") @@ -280,28 +528,44 @@ def disconnect(self): raise OSError(ret, os.strerror(ret)) self._dfs = None - def worker_fn(self, work, readdir_batch_size=READDIR_BATCH_SIZE): + def list_worker_fn(self, in_work, out_dirs, out_files, readdir_batch_size=READDIR_BATCH_SIZE): """ - Reads the directory with indexed anchor. - Returns separate lists for files and directories, ready to be consumed by other workers. + Worker function to scan directory in parallel. + It expects to receive tuples (path, index) to scan the directory with an anchor index, + from the `in_work` queue. + It should emit tuples (scanned, to_scan) to the `out_dirs` queue, where `scanned` is the + number of scanned directories and `to_scan` is the list of directories to scan in parallel. + Upon completion it should emit the list of files in the `out_files` queue. 
""" - (path, index) = work + self.worker_init() - dirs = [] - files = [] - ret = torch_shim.torch_list_with_anchor(DAOS_MAGIC, self._dfs, - path, index, files, dirs, readdir_batch_size - ) - if ret != 0: - raise OSError(ret, os.strerror(ret), path) + result = [] + while True: + work = in_work.get() + if work is None: + break + + (path, index) = work + + dirs = [] + files = [] + ret = torch_shim.torch_list_with_anchor(DAOS_MAGIC, self._dfs, + path, index, files, dirs, readdir_batch_size + ) + if ret != 0: + raise OSError(ret, os.strerror(ret), path) + + dirs = [chunk for d in dirs for chunk in self.split_dir_for_parallel_scan( + os.path.join(path, d)) + ] + # Even if there are no dirs, we should emit the tuple to notify the main process + out_dirs.put((1, dirs)) - dirs = [chunk for d in dirs for chunk in self.split_dir_for_parallel_scan( - os.path.join(path, d)) - ] + files = [(os.path.join(path, fname), size) for (fname, size) in files] + result.extend(files) - files = [(os.path.join(path, fname), size) for (fname, size) in files] - return files, dirs + out_files.put(result) def split_dir_for_parallel_scan(self, path): """ @@ -309,11 +573,11 @@ def split_dir_for_parallel_scan(self, path): It returns list of tuples (dirname, anchor index) to be consumed by worker function """ - ret = torch_shim.torch_recommended_dir_split(DAOS_MAGIC, self._dfs, path) - if ret < 0: - raise OSError(-ret, os.strerror(-ret), path) + ret, splits = torch_shim.torch_recommended_dir_split(DAOS_MAGIC, self._dfs, path) + if ret != 0: + raise OSError(ret, os.strerror(ret), path) - return [(path, idx) for idx in range(0, ret)] + return [(path, idx) for idx in range(0, splits)] def parallel_list(self, path=None, readdir_batch_size=READDIR_BATCH_SIZE, @@ -331,28 +595,36 @@ def parallel_list(self, path=None, if not path.startswith(os.sep): raise ValueError("relative path is unacceptable") - result = [] - inprogress = set() - dirs = self.split_dir_for_parallel_scan(path) - with ProcessPoolExecutor(max_workers=workers, initializer=self.worker_init) as pool: - while True: - batch = dirs[:workers] - dirs = dirs[len(batch):] - - futures = [pool.submit(self.worker_fn, dir, readdir_batch_size) for dir in batch] - - inprogress.update(futures) - (complete, incomplete) = concurrent.futures.wait( - inprogress, return_when=FIRST_COMPLETED) + procs = [] + work = Queue() + dirs = Queue() + files = Queue() + for _ in range(workers): + worker = Process(target=self.list_worker_fn, args=( + work, dirs, files, readdir_batch_size)) + worker.start() + procs.append(worker) + + queued = 0 + processed = 0 + for anchored_dir in self.split_dir_for_parallel_scan(path): + work.put(anchored_dir) + queued += 1 + + while processed < queued: + (scanned, to_scan) = dirs.get() + processed += scanned + for d in to_scan: + work.put(d) + queued += 1 - for fut in complete: - files, to_process = fut.result() - dirs.extend(to_process) - result.extend(files) + result = [] + for _ in range(workers): + work.put(None) + result.extend(files.get()) - inprogress = incomplete - if len(dirs) == 0 and len(inprogress) == 0: - break + for worker in procs: + worker.join() return result @@ -366,6 +638,15 @@ def read(self, path, size): return buf + # pylint: disable=too-many-arguments + def write(self, path, mode, open_flags, class_name, chunk_size, offset, data): + """ Writes data to the file """ + + ret = torch_shim.torch_write(DAOS_MAGIC, self._dfs, path, mode, + open_flags, class_name, chunk_size, offset, data) + if ret != 0: + raise OSError(ret, os.strerror(ret), path) + 
def batch_read(self, items): """ parallel read of multiple files """ @@ -383,3 +664,11 @@ def worker_init(self): ret = torch_shim.torch_worker_init(DAOS_MAGIC, self._dfs) if ret != 0: raise OSError(ret, os.strerror(ret), "could not re-initialize DAOS for worker") + + def get_file_size(self, path): + """ Returns file size by its path """ + + ret, size = torch_shim.torch_get_fsize(DAOS_MAGIC, self._dfs, path) + if ret != 0: + raise OSError(ret, os.strerror(ret), path) + return size diff --git a/src/client/pydaos/torch/torch_shim.c b/src/client/pydaos/torch/torch_shim.c index db1ffe8e2e8..9c848a1b1bd 100644 --- a/src/client/pydaos/torch/torch_shim.c +++ b/src/client/pydaos/torch/torch_shim.c @@ -1,13 +1,15 @@ /** * (C) Copyright 2019-2024 Intel Corporation. - * (C) Copyright 2024 Google LLC - * (C) Copyright 2024 Enakta Labs Ltd + * (C) Copyright 2024-2025 Google LLC + * (C) Copyright 2024-2025 Enakta Labs Ltd + * (C) Copyright 2025 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ #include +#include #include /* S_ISDIR */ #include /* dirname() */ @@ -67,8 +69,8 @@ __shim_handle__module_init(PyObject *self, PyObject *args) { int rc = daos_init(); if (rc) { - PyErr_Format(PyExc_TypeError, "Could not initialize DAOS module %s (rc=%d)", - d_errstr(rc), rc); + PyErr_Format(PyExc_TypeError, "Could not initialize DAOS: %s (rc=%d)", d_errstr(rc), + rc); return NULL; } @@ -87,9 +89,10 @@ __shim_handle__module_fini(PyObject *self, PyObject *args) { int rc = daos_fini(); if (rc) { - rc = daos_der2errno(rc); + PyErr_Format(PyExc_TypeError, "Could not finalize DAOS: %s (rc=%d)", d_errstr(rc), + rc); + return NULL; } - return PyLong_FromLong(rc); } @@ -97,6 +100,7 @@ static PyObject * __shim_handle__torch_connect(PyObject *self, PyObject *args) { int rc = 0; + int rc2 = 0; char *pool = NULL; char *cont = NULL; int rd_only = 1; @@ -109,6 +113,12 @@ __shim_handle__torch_connect(PyObject *self, PyObject *args) RETURN_NULL_IF_FAILED_TO_PARSE(args, "ssp", &pool, &cont, &rd_only); + rc = dfs_init(); + if (rc) { + D_ERROR("Could not initialize DFS: %s (rc=%d)", strerror(rc), rc); + return PyLong_FromLong(rc); + } + D_ALLOC_PTR(hdl); if (hdl == NULL) { rc = ENOMEM; @@ -116,12 +126,6 @@ __shim_handle__torch_connect(PyObject *self, PyObject *args) } hdl->flags = rd_only ? 
O_RDONLY : O_RDWR; - rc = dfs_init(); - if (rc) { - D_ERROR("Could not initialize DFS: %s (rc=%d)", strerror(rc), rc); - goto out; - } - rc = dfs_connect(pool, NULL, cont, hdl->flags, NULL, &hdl->dfs); if (rc) { D_ERROR("Could not connect to %s:%s: %s (rc=%d)", pool, cont, strerror(rc), rc); @@ -157,17 +161,27 @@ __shim_handle__torch_connect(PyObject *self, PyObject *args) } hdl->eq_owner_pid = getpid(); + PyList_SetItem(result, 0, PyLong_FromLong(rc)); + PyList_SetItem(result, 1, PyLong_FromVoidPtr(hdl)); + + return result; + out: - if (rc) { - dfs_disconnect(hdl->dfs); + rc2 = dfs_disconnect(hdl->dfs); + if (rc2) { + D_ERROR("Could not disconnect DFS: %s (rc=%d)", strerror(rc2), rc2); + } + + D_FREE(hdl->global.iov_buf); + D_FREE(hdl); - D_FREE(hdl->global.iov_buf); - D_FREE(hdl); - hdl = NULL; + rc2 = dfs_fini(); + if (rc2) { + D_ERROR("Could not finalize DFS: %s (rc=%d)", strerror(rc2), rc2); } PyList_SetItem(result, 0, PyLong_FromLong(rc)); - PyList_SetItem(result, 1, PyLong_FromVoidPtr(hdl)); + PyList_SetItem(result, 1, PyLong_FromVoidPtr(NULL)); return result; } @@ -266,20 +280,16 @@ __shim_handle__torch_recommended_dir_split(PyObject *self, PyObject *args) int rc = dfs_lookup(hdl->dfs, path, O_RDONLY, &obj, NULL, NULL); if (rc) { - return PyLong_FromLong(-rc); + return Py_BuildValue("iI", rc, nr); } rc = dfs_obj_anchor_split(obj, &nr, NULL); if (rc) { - return PyLong_FromLong(-rc); + return Py_BuildValue("iI", rc, nr); } rc = dfs_release(obj); - if (rc) { - return PyLong_FromLong(-rc); - } - - return PyLong_FromLong(nr); + return Py_BuildValue("iI", rc, nr); } static PyObject * @@ -602,9 +612,6 @@ complete_read_op(struct dfs_handle *hdl, struct io_op *op) return rc; } -/* - - */ static PyObject * __shim_handle__torch_batch_read(PyObject *self, PyObject *args) { @@ -680,6 +687,170 @@ __shim_handle__torch_batch_read(PyObject *self, PyObject *args) return PyLong_FromLong(rc); } +static int +split_path(const char *path, char **dir, char **name) +{ + assert(dir != NULL); + assert(name != NULL); + + int rc = 0; + char *cp1 = NULL; + char *cp2 = NULL; + char *dir_name = NULL; + char *file_name = NULL; + + D_STRNDUP(cp1, path, PATH_MAX); + if (cp1 == NULL) { + return ENOMEM; + } + D_STRNDUP(cp2, path, PATH_MAX); + if (cp2 == NULL) { + rc = ENOMEM; + goto out; + } + + dir_name = dirname(cp1); + file_name = basename(cp2); + + D_STRNDUP(*dir, dir_name, PATH_MAX); + if (*dir == NULL) { + rc = ENOMEM; + goto out; + } + D_STRNDUP(*name, file_name, PATH_MAX); + if (*name == NULL) { + D_FREE(*dir); + rc = ENOMEM; + goto out; + } + +out: + D_FREE(cp1); + D_FREE(cp2); + + return rc; +} + +static PyObject * +__shim_handle__torch_write(PyObject *self, PyObject *args) +{ + int rc = 0; + int rc2 = 0; + struct dfs_handle *hdl = NULL; + char *path = NULL; + char *dir_name = NULL; + char *file_name = NULL; + dfs_obj_t *dir = NULL; + dfs_obj_t *obj = NULL; + PyObject *buffer = NULL; + int oflags = 0; + mode_t mode = 0; + int chunk_size = 0; + char *class_name = NULL; + daos_size_t offset = 0; + daos_oclass_id_t cid = OC_UNKNOWN; + + RETURN_NULL_IF_FAILED_TO_PARSE(args, "LsIisiKO", &hdl, &path, &mode, &oflags, &class_name, + &chunk_size, &offset, &buffer); + assert(hdl->dfs != NULL); + + mode |= S_IFREG; /* In case when only acl bits were set */ + cid = daos_oclass_name2id(class_name); + + if (!PyObject_CheckBuffer(buffer)) { + PyErr_SetString(PyExc_TypeError, + "Expected an object that supports the buffer protocol"); + return NULL; + } + + Py_buffer bview; + if (PyObject_GetBuffer(buffer, &bview, 
PyBUF_READ) == -1) { + return NULL; + } + + if (!PyBuffer_IsContiguous(&bview, 'C')) { + PyErr_SetString(PyExc_BufferError, "Buffer is not contiguous"); + PyBuffer_Release(&bview); + return NULL; + } + + rc = split_path(path, &dir_name, &file_name); + if (rc) { + goto out; + } + + rc = dfs_lookup(hdl->dfs, dir_name, O_RDWR, &dir, NULL, NULL); + if (rc) { + D_ERROR("Could not lookup '%s': %s (rc=%d)", dir_name, strerror(rc), rc); + goto out; + } + + rc = dfs_open(hdl->dfs, dir, file_name, mode, oflags, cid, chunk_size, NULL, &obj); + if (rc) { + D_ERROR("Could not open '%s': %s (rc=%d)", path, strerror(rc), rc); + goto out; + } + + d_iov_t iov; + d_iov_set(&iov, bview.buf, bview.len); + + d_sg_list_t sgl = { + .sg_nr = 1, + .sg_nr_out = 0, + .sg_iovs = &iov, + }; + + rc = dfs_write(hdl->dfs, obj, &sgl, offset, NULL); + if (rc) { + D_ERROR("Could not write to '%s': %s (rc=%d)", path, strerror(rc), rc); + goto out; + } + +out: + PyBuffer_Release(&bview); + + if (obj) { + rc2 = dfs_release(obj); + if (rc2) { + D_ERROR("Could not release object '%s': %s (rc=%d)", path, strerror(rc2), + rc2); + } + } + if (dir) { + rc2 = dfs_release(dir); + if (rc2) { + D_ERROR("Could not release dir '%s': %s (rc=%d)", dir_name, strerror(rc2), + rc2); + } + } + + D_FREE(dir_name); + D_FREE(file_name); + + return PyLong_FromLong(rc); +} + +static PyObject * +__shim_handle__torch_get_fsize(PyObject *self, PyObject *args) +{ + struct dfs_handle *hdl = NULL; + char *path = NULL; + dfs_obj_t *obj = NULL; + struct stat st = {0}; + + RETURN_NULL_IF_FAILED_TO_PARSE(args, "Ls", &hdl, &path); + + assert(hdl->dfs != NULL); + + int rc = dfs_lookup(hdl->dfs, path, O_RDONLY, &obj, NULL, &st); + if (rc) { + return Py_BuildValue("iK", rc, st.st_size); + } + + rc = dfs_release(obj); + return Py_BuildValue("iK", rc, st.st_size); +} + /** * Python shim module */ @@ -694,9 +865,11 @@ static PyMethodDef torchMethods[] = { EXPORT_PYTHON_METHOD(torch_disconnect), EXPORT_PYTHON_METHOD(torch_worker_init), EXPORT_PYTHON_METHOD(torch_read), + EXPORT_PYTHON_METHOD(torch_write), EXPORT_PYTHON_METHOD(torch_batch_read), EXPORT_PYTHON_METHOD(torch_recommended_dir_split), EXPORT_PYTHON_METHOD(torch_list_with_anchor), + EXPORT_PYTHON_METHOD(torch_get_fsize), EXPORT_PYTHON_METHOD(module_init), EXPORT_PYTHON_METHOD(module_fini), diff --git a/src/tests/ftest/SConscript b/src/tests/ftest/SConscript index bfa26143d51..a36fb58e843 100644 --- a/src/tests/ftest/SConscript +++ b/src/tests/ftest/SConscript @@ -22,7 +22,7 @@ def scons(): 'server', 'soak', 'erasurecode', 'datamover', 'scripts', 'dbench', 'harness', 'telemetry', 'deployment', 'performance', - 'scrubber', 'vmd'] + 'scrubber', 'vmd', 'pytorch'] for sub_dir in dirs: env.Install(os.path.join(ftest_install_dir, sub_dir), Glob(f'{sub_dir}/*.*')) diff --git a/src/tests/ftest/directory_tree.py b/src/tests/ftest/directory_tree.py index e4003730aec..3991fcaddd1 100644 --- a/src/tests/ftest/directory_tree.py +++ b/src/tests/ftest/directory_tree.py @@ -1,5 +1,7 @@ """ (C) Copyright 2018-2024 Intel Corporation. + (C) Copyright 2025 Hewlett Packard Enterprise Development LP + (C) Copyright 2025 Google LLC SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -56,6 +58,8 @@ def __init__(self, root, height=1, subdirs_per_node=1, files_per_node=1): self._needles_prefix = "" self._needles_count = 0 self._needles_paths = [] + self._file_size_min = 0 + self._file_size_max = 0 def create(self): """Populate the directory-tree. 
@@ -110,6 +114,12 @@ def get_probe(self): needle_name = os.path.basename(needle_path) return needle_name, needle_path + def set_file_size(self, fmin=0, fmax=0): + """ Set the minimum and maximum file size """ + + self._file_size_min = fmin + self._file_size_max = fmax + def _create_dir_tree(self, current_path, current_height): """Create the actual directory tree using depth-first search approach. @@ -124,8 +134,7 @@ def _create_dir_tree(self, current_path, current_height): # create files for _ in range(self._files_per_node): - fd, _ = tempfile.mkstemp(dir=current_path, suffix=".file") - os.close(fd) + self._mktemp_file(where=current_path, suffix=".file") # create nested directories for _ in range(self._subdirs_per_node): @@ -144,8 +153,7 @@ def _created_remaining_needles(self): for count in range(self._needles_count): new_path = os.path.dirname(random.choice(self._needles_paths)) # nosec suffix = f"_{count:05d}.needle" - fd, _ = tempfile.mkstemp(dir=new_path, prefix=self._needles_prefix, suffix=suffix) - os.close(fd) + self._mktemp_file(where=new_path, prefix=self._needles_prefix, suffix=suffix) def _create_needle(self, current_path, current_height): """Create a *.needle file if we reach the bottom of the tree. @@ -162,14 +170,32 @@ def _create_needle(self, current_path, current_height): self._needles_count -= 1 suffix = "_{:05d}.needle".format(self._needles_count) - fd, file_name = tempfile.mkstemp( - dir=current_path, prefix=self._needles_prefix, suffix=suffix) - os.close(fd) + fname = self._mktemp_file(where=current_path, prefix=self._needles_prefix, suffix=suffix) + self._needles_paths.append(fname) + + def _mktemp_file(self, where=None, prefix=None, suffix=None): + """Create a temporary file. + If the file size is 0, the file will be empty. + If the file size is greater than 0, the file will be filled with random data. + If min and max file size are different, the file size will be a random between min and max. + """ - self._needles_paths.append(file_name) + fd, fname = tempfile.mkstemp(dir=where, prefix=prefix, suffix=suffix) + if self._file_size_min == 0: + os.close(fd) + return fname + + size = self._file_size_min + if self._file_size_max > self._file_size_min: + size = random.randrange(self._file_size_min, self._file_size_max) # nosec + + with os.fdopen(fd, 'wb') as f: + f.write(os.urandom(size)) + return fname -def _populate_dir_tree(path, height, subdirs_per_node, files_per_node, needles, prefix): +def _populate_dir_tree(path, height, subdirs_per_node, files_per_node, needles, prefix, + file_size_min, file_size_max): """Create a directory tree and its needle files. 
Args: @@ -179,11 +205,14 @@ def _populate_dir_tree(path, height, subdirs_per_node, files_per_node, needles, files_per_node (int): number of files created per directory needles (int): number of needles prefix (str): needle prefix + file_size_min (int): min size of the files (0 for empty file) + file_size_max (int): max size of the files """ logger.info('Populating: %s', path) dir_tree = DirTree(path, height, subdirs_per_node, files_per_node) dir_tree.set_needles_prefix(prefix) dir_tree.set_number_of_needles(needles) + dir_tree.set_file_size(file_size_min, file_size_max) tree_path = dir_tree.create() logger.info('Directory tree created at: %s', tree_path) @@ -203,11 +232,16 @@ def main(): '--needles', type=int, default=1, help='number of files in the bottom directory') parser.add_argument( '--prefix', type=str, required=True, help='bottom directory file prefix') + parser.add_argument( + '--file-size-min', type=int, default=0, help='min size of the files') + parser.add_argument( + '--file-size-max', type=int, default=0, help='max size of the files') args = parser.parse_args() try: _populate_dir_tree( - args.path, args.height, args.subdirs, args.files, args.needles, args.prefix) + args.path, args.height, args.subdirs, args.files, args.needles, + args.prefix, args.file_size_min, args.file_size_max) except Exception as error: # pylint: disable=broad-except logger.error('Error detected: %s', str(error)) logger.debug("Stacktrace", exc_info=True) diff --git a/src/tests/ftest/pytorch/checkpoint.py b/src/tests/ftest/pytorch/checkpoint.py new file mode 100644 index 00000000000..ab114ed43ae --- /dev/null +++ b/src/tests/ftest/pytorch/checkpoint.py @@ -0,0 +1,99 @@ +""" + (C) Copyright 2025 Hewlett Packard Enterprise Development LP + (C) Copyright 2025 Google LLC + (C) Copyright 2025 Enakta Labs Ltd + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +import os +import uuid + +from apricot import TestWithServers +from pydaos.torch import Checkpoint + + +class PytorchCheckpointTest(TestWithServers): + """Test Pytorch Checkpoint interface + + :avocado: recursive + """ + + def test_checkpoint_no_chunking(self): + """Test Pytorch Checkpoint interface without chunking + + Test Description: Ensure that single or multiple writes are read back correctly + + :avocado: tags=all,full_regression + :avocado: tags=vm + :avocado: tags=pytorch + :avocado: tags=PytorchCheckpointTest,test_checkpoint_no_chunking + """ + pool = self.get_pool() + container = self.get_container(pool) + + min_size = self.params.get("min_size", "/run/checkpoint_no_chunking/*", 1) + max_size = self.params.get("max_size", "/run/checkpoint_no_chunking/*", 4 * 1024 * 1024) + num_writes = self.params.get("writes", "/run/checkpoint_no_chunking/*", 7) + + writes = [] + for _ in range(num_writes): + writes.append(os.urandom(self.random.randint(min_size, max_size))) + self._test_checkpoint(pool.identifier, container.identifier, writes) + + def test_checkpoint_chunking(self): + """Test Pytorch Checkpoint interface with chunking and parallel writes + + Test Description: Ensure that single or multiple writes are read back correctly + + :avocado: tags=all,full_regression + :avocado: tags=vm + :avocado: tags=pytorch + :avocado: tags=PytorchCheckpointTest,test_checkpoint_chunking + """ + + pool = self.get_pool() + container = self.get_container(pool) + + min_size = self.params.get("min_size", "/run/checkpoint_chunking/*", 1) + max_size = self.params.get("max_size", "/run/checkpoint_chunking/*", 4 * 1024 * 1024) + num_writes = self.params.get("writes", 
"/run/checkpoint_chunking/*", 8) + chunk_sizes = self.params.get("chunk_sizes", "/run/checkpoint_chunking/*") + chunks_limits = self.params.get("chunks_limits", "/run/checkpoint_chunking/*") + workers = self.params.get("workers", "/run/checkpoint_chunking/*") + + if len(chunk_sizes) == 0 or len(workers) == 0 or len(chunks_limits) == 0: + self.fail("chunk_sizes, chunks_limits and workers must be provided") + + writes = [] + for _ in range(num_writes): + writes.append(os.urandom(self.random.randint(min_size, max_size))) + for chunk_size in chunk_sizes: + for chunks_limit in chunks_limits: + for worker in workers: + self._test_checkpoint(pool.identifier, container.identifier, writes, + chunk_size=chunk_size, chunks_limit=chunks_limit, + workers=worker) + + def _test_checkpoint(self, pool, cont, writes, chunk_size=0, chunks_limit=0, workers=0): + """Creates a checkpoint with the given parameters, writes the given data to it, + then reads written data back from it and compares it with the expected writes. + """ + + self.log.info("Checkpoint test: writes=%s, chunk_size=%s, chunks_limit=%s, workers=%s", + len(writes), chunk_size, chunks_limit, workers) + chkp = Checkpoint(pool, cont, transfer_chunk_size=chunk_size, chunks_limit=chunks_limit, + workers=workers) + + expected = bytearray() + fname = str(uuid.uuid4()) + with chkp.writer(fname) as w: + for chunk in writes: + w.write(chunk) + expected.extend(chunk) + + actual = chkp.reader(fname) + if expected != actual.getvalue(): + self.fail( + f"checkpoint did not read back the expected content for {len(writes)} writes," + f"chunk_size={chunk_size}, chunks_limit={chunks_limit}, workers={workers}") + del chkp diff --git a/src/tests/ftest/pytorch/checkpoint.yaml b/src/tests/ftest/pytorch/checkpoint.yaml new file mode 100644 index 00000000000..76c4b9c07d4 --- /dev/null +++ b/src/tests/ftest/pytorch/checkpoint.yaml @@ -0,0 +1,31 @@ +hosts: + test_servers: 1 + test_clients: 1 +server_config: + name: daos_server + engines_per_host: 1 + engines: + 0: + targets: 4 + nr_xs_helpers: 0 + storage: + 0: + class: ram + scm_mount: /mnt/daos + system_ram_reserved: 1 +pool: + size: 8G +container: + type: POSIX + control_method: daos + +timeout: 2000 + +checkpoint_no_chunking: + writes: 8 + +checkpoint_chunking: + writes: 4 + chunk_sizes: [449, 4096, 1048576, 4194304] + chunks_limits: [0, 1, 2, 3, 8] + workers: [1, 2, 3, 4] diff --git a/src/tests/ftest/pytorch/dataset.py b/src/tests/ftest/pytorch/dataset.py new file mode 100644 index 00000000000..12b7a4aeafc --- /dev/null +++ b/src/tests/ftest/pytorch/dataset.py @@ -0,0 +1,249 @@ +""" + (C) Copyright 2025 Hewlett Packard Enterprise Development LP + (C) Copyright 2025 Google LLC + (C) Copyright 2025 Enakta Labs Ltd + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +import hashlib + +from apricot import TestWithServers +from dfuse_utils import get_dfuse, start_dfuse +from io_utilities import DirectoryTreeCommand +from pydaos.torch import Dataset, IterableDataset +from run_utils import run_remote +from torch.utils.data import DataLoader + + +class PytorchDatasetsTest(TestWithServers): + """Test Pytorch Map Style Dataset. + + :avocado: recursive + """ + + def test_map_style_dataset(self): + """Test Map Style Dataset directly without DataLoader + + Test Description: Ensure that the dataset can read all the samples that were seeded. 
+ + :avocado: tags=all,full_regression + :avocado: tags=vm + :avocado: tags=dfuse,pytorch + :avocado: tags=PytorchDatasetsTest,test_map_style_dataset + """ + pool = self.get_pool() + container = self.get_container(pool) + dfuse = get_dfuse(self, self.hostlist_clients) + start_dfuse(self, dfuse, pool, container) + + root_dir = dfuse.mount_dir.value + + height = self.params.get("tree_height", "/run/map_style_dataset/*") + subdirs = self.params.get("subdirs", "/run/map_style_dataset/*") + files_per_node = self.params.get("files_per_node", "/run/map_style_dataset/*") + file_min_size = self.params.get("file_min_size", "/run/map_style_dataset/*", 4096) + file_max_size = self.params.get("file_max_size", "/run/map_style_dataset/*", 128 * 1024) + + self._create_test_files(root_dir, height, subdirs, files_per_node, + file_min_size, file_max_size) + + expected = self._get_test_files_hashmap(root_dir, self.hostlist_clients) + + dataset = Dataset(pool.identifier, container.identifier) + + actual = {} + for _, content in enumerate(dataset): + h = hashlib.md5(content, usedforsecurity=False).hexdigest() + if h not in actual: + actual[h] = 1 + else: + actual[h] += 1 + + if actual != expected: + self.fail("dataset did not fetch all samples") + + def test_iterable_dataset(self): + """Test Iterable Dataset directly without DataLoader + + Test Description: Ensure that the dataset can read all the samples that were seeded. + + :avocado: tags=all,full_regression + :avocado: tags=vm + :avocado: tags=dfuse,pytorch + :avocado: tags=PytorchDatasetsTest,test_iterable_dataset + """ + pool = self.get_pool() + container = self.get_container(pool) + dfuse = get_dfuse(self, self.hostlist_clients) + start_dfuse(self, dfuse, pool, container) + + root_dir = dfuse.mount_dir.value + + height = self.params.get("tree_height", "/run/iterable_dataset/*") + subdirs = self.params.get("subdirs", "/run/iterable_dataset/*") + files_per_node = self.params.get("files_per_node", "/run/iterable_dataset/*") + file_min_size = self.params.get("file_min_size", "/run/iterable_dataset/*", 4096) + file_max_size = self.params.get("file_max_size", "/run/iterable_dataset/*", 128 * 1024) + + self._create_test_files(root_dir, height, subdirs, files_per_node, + file_min_size, file_max_size) + + expected = self._get_test_files_hashmap(root_dir, self.hostlist_clients) + + dataset = IterableDataset(pool.identifier, container.identifier) + + actual = {} + for _, content in enumerate(dataset): + h = hashlib.md5(content, usedforsecurity=False).hexdigest() + if h not in actual: + actual[h] = 1 + else: + actual[h] += 1 + + if actual != expected: + self.fail("dataset did not fetch all samples") + + def test_map_dataset_with_dataloader(self): + """Test Map Style Dataset with DataLoader. + + Test Description: Ensure that the DataLoader can read all the samples that were seeded. 
+ + :avocado: tags=all,full_regression + :avocado: tags=vm + :avocado: tags=dfuse,pytorch + :avocado: tags=PytorchDatasetsTest,test_map_dataset_with_dataloader + """ + pool = self.get_pool() + container = self.get_container(pool) + dfuse = get_dfuse(self, self.hostlist_clients) + start_dfuse(self, dfuse, pool, container) + + root_dir = dfuse.mount_dir.value + + height = self.params.get("tree_height", "/run/map_dataset_with_dataloader/*") + subdirs = self.params.get("subdirs", "/run/map_dataset_with_dataloader/*") + files_per_node = self.params.get("files_per_node", "/run/map_dataset_with_dataloader/*") + + # DataLoader requires that samples are of the same size + file_min_size = self.params.get("file_min_size", "/run/map_dataset_with_dataloader/*", 4096) + file_max_size = self.params.get("file_max_size", "/run/map_dataset_with_dataloader/*", 4096) + + batch_sizes = self.params.get("batch_size", "/run/map_dataset_with_dataloader/*") + processes = self.params.get("processes", "/run/map_dataset_with_dataloader/*") + + self._create_test_files(root_dir, height, subdirs, files_per_node, + file_min_size, file_max_size) + + expected = self._get_test_files_hashmap(root_dir, self.hostlist_clients) + + dataset = Dataset(pool.identifier, container.identifier) + for procs in processes: + for batch_size in batch_sizes: + self._test_dataloader(dataset, expected, batch_size, procs) + + def test_iterable_dataset_with_dataloader(self): + """Test Iterable Dataset with DataLoader. + + Test Description: Ensure that the DataLoader can read all the samples that were seeded. + + :avocado: tags=all,full_regression + :avocado: tags=vm + :avocado: tags=dfuse,pytorch + :avocado: tags=PytorchDatasetsTest,test_iterable_dataset_with_dataloader + """ + pool = self.get_pool() + container = self.get_container(pool) + dfuse = get_dfuse(self, self.hostlist_clients) + start_dfuse(self, dfuse, pool, container) + + root_dir = dfuse.mount_dir.value + + height = self.params.get("tree_height", "/run/iterable_dataset_with_dataloader/*") + subdirs = self.params.get("subdirs", "/run/iterable_dataset_with_dataloader/*") + files_per_node = self.params.get( + "files_per_node", "/run/iterable_dataset_with_dataloader/*") + + # DataLoader requires that samples are of the same size + file_min_size = self.params.get( + "file_min_size", "/run/iterable_dataset_with_dataloader/*", 4096) + file_max_size = self.params.get( + "file_max_size", "/run/iterable_dataset_with_dataloader/*", 4096) + + batch_sizes = self.params.get("batch_size", "/run/iterable_dataset_with_dataloader/*") + processes = self.params.get("processes", "/run/iterable_dataset_with_dataloader/*") + + self._create_test_files(root_dir, height, subdirs, files_per_node, + file_min_size, file_max_size) + + expected = self._get_test_files_hashmap(root_dir, self.hostlist_clients) + + dataset = IterableDataset(pool.identifier, container.identifier) + for procs in processes: + for batch_size in batch_sizes: + self._test_dataloader(dataset, expected, batch_size, procs) + + def _test_dataloader(self, dataset, expected, batch_size, processes): + """With the given dataset and parameters load all samples using DataLoader + and check if all expected samples are fetched""" + + loader = DataLoader(dataset, + batch_size=batch_size, + num_workers=processes, + # no collation, otherwise tensors are returned + collate_fn=lambda x: x, + worker_init_fn=dataset.worker_init, + drop_last=False) + + actual = {} + for batch in loader: + for content in batch: + h = hashlib.md5(content, 
usedforsecurity=False).hexdigest() + if h not in actual: + actual[h] = 1 + else: + actual[h] += 1 + + if actual != expected: + self.fail( + f"DataLoader with nproc={processes} and bs={batch_size} did not fetch all samples") + + def _create_test_files(self, path, height, subdirs, files_per_node, min_size, max_size): + """Create a directory tree""" + + dir_tree = DirectoryTreeCommand(self.hostlist_clients) + dir_tree.path.value = path + dir_tree.height.value = height + dir_tree.subdirs.value = subdirs + dir_tree.files.value = files_per_node + dir_tree.prefix.value = "samples" + dir_tree.needles.value = 0 + dir_tree.file_size_min.value = min_size + dir_tree.file_size_max.value = max_size + + self.log.info("Populating: %s", path) + result = dir_tree.run() + if not result.passed: + self.fail( + f"Error running '{dir_tree.command}' for '{path}' on {result.failed_hosts}") + + def _get_test_files_hashmap(self, root_dir, hostlist): + """Map all files in the directory tree to their md5 hash""" + + cmd = f'find {root_dir} -type f -exec md5sum {{}} + ' + result = run_remote(self.log, hostlist, cmd) + + if not result.passed: + self.fail(f'"{cmd}" failed on {result.failed_hosts}') + + hashes = {} + for line in result.output[0].stdout: + parts = line.split() + if len(parts) != 2: + self.fail(f'unexpected result from md5sum: {line}') + h = parts[0] + if h not in hashes: + hashes[h] = 1 + else: + hashes[h] += 1 + + return hashes diff --git a/src/tests/ftest/pytorch/dataset.yaml b/src/tests/ftest/pytorch/dataset.yaml new file mode 100644 index 00000000000..aac16c17f2c --- /dev/null +++ b/src/tests/ftest/pytorch/dataset.yaml @@ -0,0 +1,44 @@ +hosts: + test_servers: 1 + test_clients: 1 +server_config: + name: daos_server + engines_per_host: 1 + engines: + 0: + targets: 4 + nr_xs_helpers: 0 + storage: + 0: + class: ram + scm_mount: /mnt/daos + system_ram_reserved: 1 +pool: + size: 8G +container: + type: POSIX + control_method: daos + +map_style_dataset: + tree_height: 4 + subdirs: 3 + files_per_node: 5 + +iterable_dataset: + tree_height: 3 + subdirs: 3 + files_per_node: 6 + +map_dataset_with_dataloader: + tree_height: 3 + subdirs: 5 + files_per_node: 7 + processes: [0, 1, 2, 3, 4, 8] + batch_size: [2, 4, 8, 16] + +iterable_dataset_with_dataloader: + tree_height: 3 + subdirs: 5 + files_per_node: 7 + processes: [0, 1, 2, 3, 4, 8] + batch_size: [2, 4, 8, 16] diff --git a/src/tests/ftest/util/io_utilities.py b/src/tests/ftest/util/io_utilities.py index af7cd37e8ca..47d1d80794b 100644 --- a/src/tests/ftest/util/io_utilities.py +++ b/src/tests/ftest/util/io_utilities.py @@ -1,5 +1,7 @@ """ (C) Copyright 2018-2024 Intel Corporation. 
+ (C) Copyright 2025 Hewlett Packard Enterprise Development LP + (C) Copyright 2025 Google LLC SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -357,6 +359,8 @@ def __init__(self, hosts, namespace="/run/directory_tree/*"): self.files = FormattedParameter("--files {}") self.needles = FormattedParameter("--needles {}") self.prefix = FormattedParameter("--prefix {}") + self.file_size_min = FormattedParameter("--file-size-min {}") + self.file_size_max = FormattedParameter("--file-size-max {}") # run options self.hosts = hosts.copy() diff --git a/utils/cq/words.dict b/utils/cq/words.dict index 102ca21a882..4a8d20041b8 100644 --- a/utils/cq/words.dict +++ b/utils/cq/words.dict @@ -409,6 +409,7 @@ scancel scm scons scontrol +seekable sharedctypes shlex simul diff --git a/utils/node_local_test.py b/utils/node_local_test.py index 727f2214ecf..6d7b5e56ac4 100755 --- a/utils/node_local_test.py +++ b/utils/node_local_test.py @@ -2,6 +2,7 @@ """Node local test (NLT). (C) Copyright 2020-2024 Intel Corporation. +(C) Copyright 2025 Hewlett Packard Enterprise Development LP SPDX-License-Identifier: BSD-2-Clause-Patent @@ -4543,66 +4544,6 @@ def import_torch(self, server): return importlib.import_module('pydaos.torch') - @needs_dfuse_with_opt(caching_variants=[False]) - def test_torch_map_dataset(self): - """Check that all files in container are read regardless of the directory level""" - test_files = [ - {"name": "0.txt", "content": b"0", "seen": 0}, - {"name": "1/l1.txt", "content": b"1", "seen": 0}, - {"name": "1/2/l2.txt", "content": b"2", "seen": 0}, - {"name": "1/2/3/l3.txt", "content": b"3", "seen": 0}, - ] - - for tf in test_files: - file = join(self.dfuse.dir, tf["name"]) - os.makedirs(os.path.dirname(file), exist_ok=True) - with open(file, 'wb') as f: - f.write(tf["content"]) - - torch = self.import_torch(self.server) - dataset = torch.Dataset(pool=self.pool.uuid, cont=self.container.uuid) - - assert len(dataset) == len(test_files) - - for _, content in enumerate(dataset): - for f in test_files: - if f["content"] == content: - f["seen"] += 1 - - for f in test_files: - assert f["seen"] == 1 - - del dataset - - @needs_dfuse_with_opt(caching_variants=[False]) - def test_torch_iter_dataset(self): - """Check that all files in container are read regardless of the directory level""" - test_files = [ - {"name": "0.txt", "content": b"0", "seen": 0}, - {"name": "1/l1.txt", "content": b"1", "seen": 0}, - {"name": "1/2/l2.txt", "content": b"2", "seen": 0}, - {"name": "1/2/3/l3.txt", "content": b"3", "seen": 0}, - ] - - for tf in test_files: - file = join(self.dfuse.dir, tf["name"]) - os.makedirs(os.path.dirname(file), exist_ok=True) - with open(file, 'wb') as f: - f.write(tf["content"]) - - torch = self.import_torch(self.server) - dataset = torch.IterableDataset(pool=self.pool.uuid, cont=self.container.uuid) - - for content in dataset: - for f in test_files: - if f["content"] == content: - f["seen"] += 1 - - for f in test_files: - assert f["seen"] == 1 - - del dataset - class NltStdoutWrapper(): """Class for capturing stdout from threads"""