Skip to content

Commit

Permalink
add func: run_parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
xyluo25 committed Mar 5, 2024
1 parent 70f6677 commit 41e68ad
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 3 deletions.
1 change: 1 addition & 0 deletions pyufunc/pkg_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"create" : [],
"find" : [],
"calc" : [],
"run" : [],
"group" : [],
"check" : [],
"validate" : [],
Expand Down
78 changes: 77 additions & 1 deletion pyufunc/pkg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@
import datetime
from functools import wraps
import copy
from multiprocessing import Pool
from typing import Iterable, Callable, Iterator
import os


# specify the available utility functions for importing all
__all__ = ["import_package",
"requires",
"func_running_time",
"func_time",
"run_parallel",
"get_user_defined_func",
"is_user_defined_func",
"is_module_importable",
]

"""TODO
a decorator to run function in multiple processors
a decorator to run function in multiple threads
...
Expand Down Expand Up @@ -355,3 +358,76 @@ def passer(*args, **kwargs):
return passer

return inner


@func_running_time
def run_parallel(func: Callable, iterable: Iterable,
num_processes: int = os.cpu_count() - 1, chunksize: int = 0) -> Iterator:
"""Run a function in parallel with multiple processors.
Args:
func (callable): The function to run in parallel.
iterable (Iterable): The input iterable to the function.
num_processes (int, optional): The number of processors to use. Defaults to os.cpu_count() - 1.
chunksize (int, optional): The chunksize for the parallel processing. Defaults to "".
Returns:
Iterator: The results of the function.
Raises:
TypeError: If the input function is not callable,
or if chunksize or num_processes are not integers,
or if iterable is not an Iterable.
TypeError: if the input iterable is not an Iterable
The input iterable should be an Iterable.
TypeError: if the input number of processors is not an integer
TypeError: if the input chunksize should be an integer
ValueError: if the input number of processors is not greater than 0
ValueError: if the input chunksize is not greater than 0
Examples:
>>> import numpy as np
>>> from pyufunc import run_parallel
>>> def my_func(x):
print(f"running {x}")
return x**2
>>> results = run_parallel(my_func, list(range(10)), num_processes=7)
>>> for res in results:
print(res)
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
"""

# TDD, test-driven development: check inputs

if not callable(func):
raise TypeError("The input function should be a callable.")
if not isinstance(iterable, Iterable):
raise TypeError("The input iterable should be an Iterable.")
if not isinstance(num_processes, int):
raise TypeError("The input number of processors should be an integer.")
if not isinstance(chunksize, int):
raise TypeError("The input chunksize should be an integer.")

# check the number of processors and chunksize are greater than 0
if num_processes <= 0:
raise ValueError("The input number of processors should be greater than 0.")
if chunksize < 0:
raise ValueError("The input chunksize should be greater than 0.")

# Step 1: get the number of processors to use
num_processors = min(num_processes, os.cpu_count() - 1)
print(f" :Info: using {num_processors} processors to run {func.__name__}...")

# Step 2: check the chunksize
if chunksize >= len(iterable):
chunksize = len(iterable) // num_processors

# Step 3: run the function in parallel
with Pool(num_processors) as pool:
if chunksize == 0:
results = pool.map(func, iterable)
else:
results = pool.map(func, iterable, chunksize=chunksize)

return results
3 changes: 2 additions & 1 deletion utility_function_by_category.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Note: we may not update available functions in time, please run code below to ch
pyufunc.show_util_func_by_category()
```

Available utility functions in pyUFunc (52):
Available utility functions in pyUFunc (53):

- util_common:
- show_docstring_headers
Expand Down Expand Up @@ -76,6 +76,7 @@ Available utility functions in pyUFunc (52):
- requires
- func_running_time
- func_time
- run_parallel
- get_user_defined_func
- is_user_defined_func
- is_module_importable
Expand Down
5 changes: 4 additions & 1 deletion utility_function_by_keyword.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Note: we may not update available functions in time, please run code below to ch
pyufunc.show_util_func_by_keyword()
```

Available utility functions in pyUFunc (52):
Available utility functions in pyUFunc (53):

- non-keywords:
- gmns_geo
Expand Down Expand Up @@ -62,6 +62,9 @@ Available utility functions in pyUFunc (52):
- calc:
- calc_distance_on_unit_sphere

- run:
- run_parallel

- group:
- group_dt_yearly
- group_dt_monthly
Expand Down
2 changes: 2 additions & 0 deletions utility_function_package_review.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ Updating...
| | | | |
| | | | |

psutil
pyhelpers
pyutilator - @print_return_value @print_kwargs @print_args
pyutil 3.3.6
common-pyutil 0.8.7
imutils
mxnet

0 comments on commit 41e68ad

Please sign in to comment.