
[feature] Access to ParallelFor values and set_paralellism per Op #7454

Closed
mikwieczorek opened this issue Mar 23, 2022 · 3 comments
Assignees
Labels
area/backend area/sdk kind/feature lifecycle/stale The issue / pull request is stale, any activities remove this label.

Comments

@mikwieczorek

mikwieczorek commented Mar 23, 2022

Feature Area

/area backend
/area sdk

What feature would you like to see?

I stumbled upon a case where two features could be useful when dynamically building a pipeline that is controlled by an external config.

What is the use case or pain point?

Let's say we have a number of datasets and a number of models we want to train. Not all models should run on all datasets, so we use a config to specify the pipeline's content (examples in the code below). Moreover, we would like each per-dataset run to execute in parallel with the others, and since some tasks may be CPU/RAM heavy, we would like to limit parallelism per Op type.
This is a case where we want to run a pipeline from GitHub CI/CD to test newly pushed code on a set of datasets and models, to verify its validity and performance.
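Outside of KFP, the per-Op-type limit described above behaves like a semaphore guarding each op type: heavy ops never exceed their own cap even when the overall worker pool is larger. A minimal pure-Python sketch of that idea (plain `threading`, not KFP API; `OP_LIMITS` and `run_op` are hypothetical names used only for illustration):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Assumed caps per op type -- the analogue of a per-Op parallelism setting.
OP_LIMITS = {"train": 2, "print": 8}
semaphores = {op: threading.Semaphore(n) for op, n in OP_LIMITS.items()}

def run_op(op_type, fn, *args):
    # At most OP_LIMITS[op_type] calls of this op type run concurrently,
    # regardless of how large the surrounding pool is.
    with semaphores[op_type]:
        return fn(*args)

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(run_op, "train", lambda x: x * 2, i) for i in range(6)]
    results = sorted(f.result() for f in futures)
# results == [0, 2, 4, 6, 8, 10]; only 2 "train" calls ever ran at once
```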

Is there a workaround currently?

A semi-workaround is presented in the example below: a function takes the config and returns a pipeline function, iterating over the config without using ParallelFor. However, this approach seems problematic for limiting parallelism per Op.

For the parallelism problem I found a related issue, #4089. I also know that Argo allows setting parallelism per Task/Step, so having that in Kubeflow would be nice.
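For reference, Argo Workflows exposes a `parallelism` field at the template level, which caps the concurrent children of that template only. A minimal sketch (field names per the Argo Workflows spec; the template names, image, and item values are purely illustrative):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: parallel-limit-
spec:
  entrypoint: main
  templates:
    - name: main
      parallelism: 2          # caps concurrent children of this template only
      steps:
        - - name: heavy-op
            template: heavy
            withItems: [a, b, c, d]
    - name: heavy
      container:
        image: alpine
        command: [echo, heavy]
```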

Using separate pipelines per dataset is also a somewhat workable option.

```python
import json

import kfp
import kfp.dsl as dsl
from kfp.components import func_to_container_op

### Functions
def print_fun(calculation: str) -> str:
    print("Calculation Type:", calculation)
    return calculation

def add(a: float, b: float) -> float:
    return a + b

def multiply(a: float, b: float) -> float:
    return a * b

def divide(a: float, b: float) -> float:
    return a / b

def subtract(a: float, b: float) -> float:
    return a - b

### Container ops
print_op = func_to_container_op(print_fun)
add_op = func_to_container_op(add)
multiply_op = func_to_container_op(multiply)
divide_op = func_to_container_op(divide)
subtract_op = func_to_container_op(subtract)

### Dict to easily fetch an op according to the config
name2operator = {
    "add": add_op,
    "multiply": multiply_op,
    "divide": divide_op,
    "subtract": subtract_op,
}

### Example config
master_config = [
    {
        "name": "name1",
        "a": 1,
        "b": 1,
        "models": ["add", "subtract", "multiply", "divide"],
    },
    {
        "name": "name2",
        "a": 0,
        "b": 1,
        "models": ["multiply", "divide"],
    },
    {
        "name": "name3",
        "a": 100,
        "b": 2,
        "models": ["subtract", "multiply", "divide"],
    },
]
```
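Stripped of KFP, the config-driven dispatch above is just a name-to-callable lookup per dataset entry. A self-contained plain-Python sketch of that resolution step (`name2fn` and `results_for` are hypothetical names; the entry mirrors `name3` from the config above):

```python
# Plain-Python sketch (no KFP): resolve one config entry into concrete results,
# mirroring the name2operator lookup used in the pipeline.
def add(a, b): return a + b
def subtract(a, b): return a - b
def multiply(a, b): return a * b
def divide(a, b): return a / b

name2fn = {"add": add, "subtract": subtract, "multiply": multiply, "divide": divide}

def results_for(item):
    # Run every model listed for this dataset entry and collect the results.
    return {m: name2fn[m](item["a"], item["b"]) for m in item["models"]}

entry = {"name": "name3", "a": 100, "b": 2, "models": ["subtract", "multiply", "divide"]}
results = results_for(entry)  # {"subtract": 98, "multiply": 200, "divide": 50.0}
```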

Semi-working solution

```python
### Workaround: use a plain for-loop and create model tasks according to the config.
### ParallelFor won't work here, since the config values are not accessible
### at pipeline-definition time.
def get_pipeline(config):
    @dsl.pipeline(
        name='Parallel pipeline mock test',
        description='Pipeline with for-loop and config-based operators.'
    )
    def multi_pipeline():
        for config_item in config:
            root_op = print_op(config_item['name'])

            for model_name in config_item['models']:
                new_op = name2operator[model_name](config_item['a'], config_item['b'])
                new_op.after(root_op)
    return multi_pipeline

client = kfp.Client()
client.create_run_from_pipeline_func(get_pipeline(master_config), arguments={})
```

Example of my thought process in solving the problem: changing ParallelFor

```python
@dsl.pipeline(
    name='Parallel pipeline mock test',
    description='Pipeline with for-loop and config-based operators.'
)
def multi_pipeline(config):
    root_ops = []
    # I only want a parallelism limit on print_op
    for idx, item in enumerate(dsl.ParallelFor(config, parallelism=2)):
        root_op = print_op(item['name'])
        root_ops.append(root_op)

    # Currently impossible: config is a PipelineParam and is not iterable,
    # but it would be nice if it were
    for root_idx, item in enumerate(config):
        # item['models'] is also not iterable
        for model_name in item['models']:
            new_op = name2operator[model_name](item.a, item.b)
            new_op.after(root_ops[root_idx])

client = kfp.Client()
client.create_run_from_pipeline_func(multi_pipeline, arguments={
    "config": json.dumps(master_config)
})
```

Love this idea? Give it a 👍. We prioritize fulfilling features with the most 👍.


github-actions bot commented May 5, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label May 5, 2024

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

@HumairAK
Collaborator

HumairAK commented May 27, 2024

I believe this falls under: #10798
