Skip to content

Commit

Permalink
Implement large tests for subdagio
Browse files Browse the repository at this point in the history
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
  • Loading branch information
3 people committed Oct 1, 2024
1 parent 6ce09a0 commit 00fb964
Show file tree
Hide file tree
Showing 11 changed files with 406 additions and 22 deletions.
6 changes: 3 additions & 3 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error {
}
glog.V(4).Info("producer: ", producer)
currentTask := producer
var outputArtifactKey string = taskOutput.GetOutputArtifactKey()
outputArtifactKey := taskOutput.GetOutputArtifactKey()
currentSubTaskMaybeDAG := true
// Continue looping until we reach a sub-task that is NOT a DAG.
for currentSubTaskMaybeDAG {
Expand All @@ -1371,9 +1371,9 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error {
glog.V(4).Infof("Deserialized outputArtifacts: %v", outputArtifacts)
// Adding support for multiple output artifacts
var subTaskName string
value := outputArtifacts[outputArtifactKey].GetArtifactSelectors()
artifactSelectors := outputArtifacts[outputArtifactKey].GetArtifactSelectors()

for _, v := range value {
for _, v := range artifactSelectors {
glog.V(4).Infof("v: %v", v)
glog.V(4).Infof("v.ProducerSubtask: %v", v.ProducerSubtask)
glog.V(4).Infof("v.OutputArtifactKey: %v", v.OutputArtifactKey)
Expand Down
65 changes: 48 additions & 17 deletions samples/v2/sample_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import inspect
import os
from pprint import pprint
from typing import List
import unittest

import component_with_optional_inputs
import hello_world
import kfp
from kfp.dsl.graph_component import GraphComponent
import component_with_optional_inputs
import pipeline_container_no_input
import pipeline_with_env
import hello_world
import producer_consumer_param
import pipeline_container_no_input
import two_step_pipeline_containerized

_MINUTE = 60 # seconds
Expand All @@ -38,16 +40,21 @@ class TestCase:


class SampleTest(unittest.TestCase):
_kfp_host_and_port = os.getenv('KFP_API_HOST_AND_PORT', 'http://localhost:8888')
_kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT', 'http://localhost:8080')
_kfp_host_and_port = os.getenv('KFP_API_HOST_AND_PORT',
'http://localhost:8888')
_kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT',
'http://localhost:8080')
_client = kfp.Client(host=_kfp_host_and_port, ui_host=_kfp_ui_and_port)

def test(self):
test_cases: List[TestCase] = [
TestCase(pipeline_func=hello_world.pipeline_hello_world),
TestCase(pipeline_func=producer_consumer_param.producer_consumer_param_pipeline),
TestCase(pipeline_func=pipeline_container_no_input.pipeline_container_no_input),
TestCase(pipeline_func=two_step_pipeline_containerized.two_step_pipeline_containerized),
TestCase(pipeline_func=producer_consumer_param
.producer_consumer_param_pipeline),
TestCase(pipeline_func=pipeline_container_no_input
.pipeline_container_no_input),
TestCase(pipeline_func=two_step_pipeline_containerized
.two_step_pipeline_containerized),
TestCase(pipeline_func=component_with_optional_inputs.pipeline),
TestCase(pipeline_func=pipeline_with_env.pipeline_with_env),

Expand All @@ -56,27 +63,51 @@ def test(self):
# TestCase(pipeline_func=pipeline_with_volume.pipeline_with_volume),
# TestCase(pipeline_func=pipeline_with_secret_as_volume.pipeline_secret_volume),
# TestCase(pipeline_func=pipeline_with_secret_as_env.pipeline_secret_env),

# This next set of tests needs to be commented out until issue
# https://github.com/kubeflow/pipelines/issues/11239#issuecomment-2374792592
# is addressed or the driver image that is used in CI is updated
# because otherwise the tests are run against incompatible version
# of the driver. In the meantime, for local validation, these tests
# can be executed (once you've manually deployed an updated driver
# image).

# TestCase(pipeline_func=subdagio.parameter.crust),
# TestCase(pipeline_func=subdagio.parameter_cache.crust),
# TestCase(pipeline_func=subdagio.artifact_cache.crust),
# TestCase(pipeline_func=subdagio.artifact.crust),
# TestCase(pipeline_func=subdagio.multiple_parameters.crust),
# TestCase(pipeline_func=subdagio.multiple_parameters_namedtuple.crust)
# TestCase(pipeline_func=subdagio.multiple_artifacts_namedtuple.crust),
]

with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.run_test_case, test_case.pipeline_func, test_case.timeout)
for test_case in test_cases
executor.submit(self.run_test_case, test_case.pipeline_func,
test_case.timeout) for test_case in test_cases
]
for future in as_completed(futures):
future.result()

def run_test_case(self, pipeline_func: GraphComponent, timeout: int):
with self.subTest(pipeline=pipeline_func, msg=pipeline_func.name):
run_result = self._client.create_run_from_pipeline_func(pipeline_func=pipeline_func)
print(
f'Running pipeline: {inspect.getmodule(pipeline_func.pipeline_func).__name__}/{pipeline_func.name}.'
)
run_result = self._client.create_run_from_pipeline_func(
pipeline_func=pipeline_func)

run_response = run_result.wait_for_run_completion(timeout)

pprint(run_response.run_details)
print("Run details page URL:")
print(f"{self._kfp_ui_and_port}/#/runs/details/{run_response.run_id}")
print('Run details page URL:')
print(
f'{self._kfp_ui_and_port}/#/runs/details/{run_response.run_id}')

self.assertEqual(run_response.state, "SUCCEEDED")
self.assertEqual(run_response.state, 'SUCCEEDED')
print(
f'Pipeline, {inspect.getmodule(pipeline_func.pipeline_func).__name__}/{pipeline_func.name}, succeeded.'
)


if __name__ == '__main__':
Expand Down
7 changes: 7 additions & 0 deletions samples/v2/subdagio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from subdagio import artifact
from subdagio import artifact_cache
from subdagio import multiple_artifacts_namedtuple
from subdagio import multiple_parameters
from subdagio import multiple_parameters_namedtuple
from subdagio import parameter
from subdagio import parameter_cache
47 changes: 47 additions & 0 deletions samples/v2/subdagio/artifact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os

from kfp import Client
from kfp import dsl


@dsl.component
def core_comp(dataset: dsl.Output[dsl.Dataset]):
    """Write the literal payload 'foo' into the output dataset artifact."""
    payload = 'foo'
    with open(dataset.path, 'w') as out_file:
        out_file.write(payload)


@dsl.component
def crust_comp(input: dsl.Dataset):
    """Read the upstream dataset artifact and print its contents."""
    with open(input.path, 'r') as in_file:
        contents = in_file.read()
    print('input: ', contents)


@dsl.pipeline
def core() -> dsl.Dataset:
    """Innermost DAG: produce a dataset artifact with caching disabled."""
    producer = core_comp()
    producer.set_caching_options(False)
    return producer.output


@dsl.pipeline
def mantle() -> dsl.Dataset:
    """Middle DAG: forward the dataset produced by the `core` sub-DAG."""
    inner = core()
    inner.set_caching_options(False)
    return inner.output


@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
def crust():
    """Outermost DAG: consume the artifact surfaced through two nested DAGs."""
    inner = mantle()
    inner.set_caching_options(False)
    consumer = crust_comp(input=inner.output)
    consumer.set_caching_options(False)


if __name__ == '__main__':
    # Local alternative: compile to a YAML package instead of submitting.
    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
    # Submit the pipeline to the KFP endpoint resolved by Client()
    # (requires a reachable Kubeflow Pipelines deployment).
    client = Client()
    client.create_run_from_pipeline_func(crust)
42 changes: 42 additions & 0 deletions samples/v2/subdagio/artifact_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os

from kfp import Client
from kfp import dsl


@dsl.component
def core_comp(dataset: dsl.Output[dsl.Dataset]):
    """Write the literal payload 'foo' into the output dataset artifact."""
    payload = 'foo'
    with open(dataset.path, 'w') as out_file:
        out_file.write(payload)


@dsl.component
def crust_comp(input: dsl.Dataset):
    """Read the upstream dataset artifact and print its contents."""
    with open(input.path, 'r') as in_file:
        contents = in_file.read()
    print('input: ', contents)


@dsl.pipeline
def core() -> dsl.Dataset:
    """Innermost DAG: produce a dataset artifact (caching intentionally left on)."""
    producer = core_comp()
    return producer.output


@dsl.pipeline
def mantle() -> dsl.Dataset:
    """Middle DAG: forward the `core` sub-DAG's dataset output."""
    inner = core()
    return inner.output


@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
def crust():
    """Outermost DAG: consume the artifact from nested DAGs with caching enabled."""
    inner = mantle()
    crust_comp(input=inner.output)


if __name__ == '__main__':
    # Local alternative: compile to a YAML package instead of submitting.
    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
    # Submit the pipeline to the KFP endpoint resolved by Client()
    # (requires a reachable Kubeflow Pipelines deployment).
    client = Client()
    client.create_run_from_pipeline_func(crust)
66 changes: 66 additions & 0 deletions samples/v2/subdagio/multiple_artifacts_namedtuple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
from typing import NamedTuple

from kfp import Client
from kfp import dsl


@dsl.component
def core_comp(ds1: dsl.Output[dsl.Dataset], ds2: dsl.Output[dsl.Dataset]):
    """Emit two dataset artifacts with fixed payloads ('foo' and 'bar')."""
    with open(ds1.path, 'w') as first_out:
        first_out.write('foo')
    with open(ds2.path, 'w') as second_out:
        second_out.write('bar')


@dsl.component
def crust_comp(
    ds1: dsl.Dataset,
    ds2: dsl.Dataset,
):
    """Print the contents of both upstream dataset artifacts."""
    with open(ds1.path, 'r') as first_in:
        print('ds1: ', first_in.read())
    with open(ds2.path, 'r') as second_in:
        print('ds2: ', second_in.read())


@dsl.pipeline
def core() -> NamedTuple(
        'outputs',
        ds1=dsl.Dataset,
        ds2=dsl.Dataset,
):  # type: ignore
    """Innermost DAG: produce two named dataset outputs, caching disabled."""
    producer = core_comp()
    producer.set_caching_options(False)
    return producer.outputs


@dsl.pipeline
def mantle() -> NamedTuple(
        'outputs',
        ds1=dsl.Dataset,
        ds2=dsl.Dataset,
):  # type: ignore
    """Middle DAG: forward both named outputs from the `core` sub-DAG."""
    inner = core()
    inner.set_caching_options(False)
    return inner.outputs


@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
def crust():
    """Outermost DAG: consume both artifacts forwarded through nested DAGs."""
    inner = mantle()
    inner.set_caching_options(False)
    consumer = crust_comp(
        ds1=inner.outputs['ds1'],
        ds2=inner.outputs['ds2'],
    )
    consumer.set_caching_options(False)


if __name__ == '__main__':
    # Local alternative: compile to a YAML package instead of submitting.
    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
    # Submit the pipeline to the KFP endpoint resolved by Client()
    # (requires a reachable Kubeflow Pipelines deployment).
    client = Client()
    client.create_run_from_pipeline_func(crust)
48 changes: 48 additions & 0 deletions samples/v2/subdagio/multiple_parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os

from kfp import Client
from kfp import dsl
from kfp.compiler import Compiler


@dsl.component
def core_comp() -> int:
    """Produce the constant integer parameter consumed downstream."""
    result = 1
    return result


@dsl.component
def crust_comp(x: int, y: int):
    """Print the sum of the two integer inputs."""
    total = x + y
    print('sum :', total)


@dsl.pipeline
def core() -> int:
    """Innermost DAG: emit an integer parameter, caching disabled."""
    producer = core_comp()
    producer.set_caching_options(False)
    return producer.output


@dsl.pipeline
def mantle() -> int:
    """Middle DAG: forward the integer output of the `core` sub-DAG."""
    inner = core()
    inner.set_caching_options(False)
    return inner.output


@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
def crust():
    """Outermost DAG: sum a literal with the integer from nested DAGs."""
    inner = mantle()
    inner.set_caching_options(False)
    consumer = crust_comp(x=2, y=inner.output)
    consumer.set_caching_options(False)


if __name__ == '__main__':
    # Compile the pipeline to a YAML package next to this file, then
    # submit it to the KFP endpoint resolved by Client()
    # (requires a reachable Kubeflow Pipelines deployment).
    Compiler().compile(
        pipeline_func=crust,
        package_path=f"{__file__.removesuffix('.py')}.yaml")
    client = Client()
    client.create_run_from_pipeline_func(crust)
Loading

0 comments on commit 00fb964

Please sign in to comment.