# Reconstructed from the whitespace-collapsed patch "add func: run_parallel"
# (commit 41e68ade46c6ec44bd21de73df75f8b58dd16a24, file pyufunc/pkg_utils.py).
# The same patch also adds an empty "run" keyword entry to
# pyufunc/pkg_configs.py, lists "run_parallel" in this module's __all__, and
# updates the markdown function indexes (function count 52 -> 53).

from multiprocessing import Pool
from typing import Iterable, Callable, Iterator
import os


# NOTE: func_running_time is a decorator defined earlier in pkg_utils.py
# (outside this excerpt); it is applied exactly as in the original patch.
@func_running_time
def run_parallel(func: Callable, iterable: Iterable,
                 num_processes: int = max((os.cpu_count() or 1) - 1, 1),
                 chunksize: int = 0) -> list:
    """Run a function over an iterable in parallel with multiple processes.

    Each element of ``iterable`` is passed to ``func`` in a worker process via
    ``multiprocessing.Pool.map``.

    Args:
        func (Callable): The function to apply to every element.
        iterable (Iterable): The inputs. Iterables without ``__len__``
            (e.g. generators) are materialized into a list first.
        num_processes (int, optional): The number of worker processes
            requested. Defaults to ``os.cpu_count() - 1`` (at least 1).
            The value is capped at ``os.cpu_count() - 1`` (at least 1).
        chunksize (int, optional): Number of items sent to a worker per task.
            ``0`` lets ``Pool.map`` choose. Defaults to 0. A value greater
            than or equal to the number of items is replaced by
            ``len(iterable) // workers``.

    Returns:
        list: What ``Pool.map`` returns, i.e. one result per input element in
            input order; an empty list for empty input.

    Raises:
        TypeError: If ``func`` is not callable, ``iterable`` is not an
            Iterable, or ``num_processes`` / ``chunksize`` is not an integer.
        ValueError: If ``num_processes`` is not greater than 0, or
            ``chunksize`` is negative.

    Note:
        Per the multiprocessing documentation, ``func`` and the items must be
        picklable, and with the "spawn" start method the call should live
        under an ``if __name__ == "__main__":`` guard.

    Examples:
        >>> from pyufunc import run_parallel
        >>> def my_func(x):
        ...     return x ** 2
        >>> run_parallel(my_func, list(range(10)), num_processes=7)  # doctest: +SKIP
        [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    """

    # Validate input types first so callers get a clear error message.
    if not callable(func):
        raise TypeError("The input function should be a callable.")
    if not isinstance(iterable, Iterable):
        raise TypeError("The input iterable should be an Iterable.")
    if not isinstance(num_processes, int):
        raise TypeError("The input number of processors should be an integer.")
    if not isinstance(chunksize, int):
        raise TypeError("The input chunksize should be an integer.")

    # Validate value ranges; chunksize == 0 is allowed and means "automatic".
    if num_processes <= 0:
        raise ValueError("The input number of processors should be greater than 0.")
    if chunksize < 0:
        raise ValueError("The input chunksize should be greater than or equal to 0.")

    # Step 1: cap the worker count. os.cpu_count() may return None and
    # cpu_count() - 1 is 0 on a single-core host, so clamp to at least 1
    # (Pool rejects a process count below 1).
    max_workers = max((os.cpu_count() or 1) - 1, 1)
    num_processors = min(num_processes, max_workers)

    # functools.partial objects and callable instances have no __name__.
    func_name = getattr(func, "__name__", repr(func))
    print(f" :Info: using {num_processors} processors to run {func_name}...")

    # Step 2: generators and other unsized iterables have no len(); Pool.map
    # would materialize them anyway, so do it once here.
    if not hasattr(iterable, "__len__"):
        iterable = list(iterable)
    total_items = len(iterable)

    # Nothing to do: avoid starting worker processes for empty input.
    if total_items == 0:
        return []

    # Step 3: an oversized chunksize would hand all the work to one worker;
    # fall back to an even split (which may be 0, i.e. automatic).
    if chunksize >= total_items:
        chunksize = total_items // num_processors

    # Step 4: run in parallel. chunksize=None is Pool.map's own default and
    # lets it pick a chunk size automatically.
    with Pool(num_processors) as pool:
        return pool.map(func, iterable, chunksize=chunksize or None)