diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 000000000..5fa78441f --- /dev/null +++ b/.coveragerc @@ -0,0 +1,7 @@ +[run] +omit = + src/**/test_*.py,,src/codeflare_sdk/common/utils/unit_test_support.py + +[report] +exclude_lines = + pragma: no cover diff --git a/.github/workflows/coverage-badge.yaml b/.github/workflows/coverage-badge.yaml index bae1212df..ed1b08e4e 100644 --- a/.github/workflows/coverage-badge.yaml +++ b/.github/workflows/coverage-badge.yaml @@ -26,7 +26,7 @@ jobs: poetry install --with test - name: Generate coverage report run: | - coverage run -m pytest + coverage run -m pytest --omit="src/**/test_*.py,src/codeflare_sdk/common/utils/unit_test_support.py" - name: Coverage Badge uses: tj-actions/coverage-badge-py@v2 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 884632da6..820ad9139 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -76,7 +76,7 @@ pytest -v src/codeflare_sdk ### Local e2e Testing -- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/e2e.md) +- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/sphinx/user-docs/e2e.rst) #### Code Coverage diff --git a/src/codeflare_sdk/common/kueue/test_kueue.py b/src/codeflare_sdk/common/kueue/test_kueue.py index 77095d4d9..19593dd17 100644 --- a/src/codeflare_sdk/common/kueue/test_kueue.py +++ b/src/codeflare_sdk/common/kueue/test_kueue.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ..utils.unit_test_support import get_local_queue, createClusterConfig +from ..utils.unit_test_support import get_local_queue, create_cluster_config from unittest.mock import patch from codeflare_sdk.ray.cluster.cluster import Cluster, ClusterConfiguration import yaml @@ -46,7 +46,7 @@ def test_cluster_creation_no_aw_local_queue(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-kueue" config.write_to_file = True config.local_queue = "local-queue-default" @@ -59,7 +59,7 @@ def test_cluster_creation_no_aw_local_queue(mocker): ) # With resources loaded in memory, no Local Queue specified. - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-kueue" config.write_to_file = False cluster = Cluster(config) @@ -79,7 +79,7 @@ def test_aw_creation_local_queue(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = True @@ -93,7 +93,7 @@ def test_aw_creation_local_queue(mocker): ) # With resources loaded in memory, no Local Queue specified. - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = False @@ -114,7 +114,7 @@ def test_get_local_queue_exists_fail(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = True diff --git a/src/codeflare_sdk/common/utils/unit_test_support.py b/src/codeflare_sdk/common/utils/unit_test_support.py index 8e034378f..d3c9728a5 100644 --- a/src/codeflare_sdk/common/utils/unit_test_support.py +++ b/src/codeflare_sdk/common/utils/unit_test_support.py @@ -26,32 +26,34 @@ aw_dir = os.path.expanduser("~/.codeflare/resources/") -def createClusterConfig(): +def create_cluster_config(num_workers=2, write_to_file=False): config = ClusterConfiguration( name="unit-test-cluster", namespace="ns", - num_workers=2, + num_workers=num_workers, worker_cpu_requests=3, worker_cpu_limits=4, worker_memory_requests=5, worker_memory_limits=6, appwrapper=True, - write_to_file=False, + write_to_file=write_to_file, ) return config -def createClusterWithConfig(mocker): - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", - return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, - ) - cluster = Cluster(createClusterConfig()) +def create_cluster(mocker, num_workers=2, write_to_file=False): + cluster = Cluster(create_cluster_config(num_workers, write_to_file)) return cluster -def createClusterWrongType(): +def patch_cluster_with_dynamic_client(mocker, cluster, dynamic_client=None): + mocker.patch.object(cluster, "get_dynamic_client", return_value=dynamic_client) + mocker.patch.object(cluster, "down", return_value=None) + mocker.patch.object(cluster, "config_check", return_value=None) + # mocker.patch.object(cluster, "_throw_for_no_raycluster", return_value=None) + + +def create_cluster_wrong_type(): config = ClusterConfiguration( name="unit-test-cluster", namespace="ns", @@ -383,6 +385,48 @@ def mocked_ingress(port, cluster_name="unit-test-cluster", annotations: dict = N return mock_ingress +# Global dictionary to maintain state in the mock +cluster_state = {} + + +# The mock side_effect function for server_side_apply +def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwargs): + # Simulate the behavior of server_side_apply: + # Update a mock state that represents the cluster's current configuration. + # Stores the state in a global dictionary for simplicity. + + global cluster_state + + if not resource or not body or not name or not namespace: + raise ValueError("Missing required parameters for server_side_apply") + + # Extract worker count from the body if it exists + try: + worker_count = ( + body["spec"]["workerGroupSpecs"][0]["replicas"] + if "spec" in body and "workerGroupSpecs" in body["spec"] + else None + ) + except KeyError: + worker_count = None + + # Apply changes to the cluster_state mock + cluster_state[name] = { + "namespace": namespace, + "worker_count": worker_count, + "body": body, + } + + # Return a response that mimics the behavior of a successful apply + return { + "status": "success", + "applied": True, + "name": name, + "namespace": namespace, + "worker_count": worker_count, + } + + @patch.dict("os.environ", {"NB_PREFIX": "test-prefix"}) def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster: mocker.patch( diff --git a/src/codeflare_sdk/common/widgets/test_widgets.py b/src/codeflare_sdk/common/widgets/test_widgets.py index 12c238544..a7d3de923 100644 --- a/src/codeflare_sdk/common/widgets/test_widgets.py +++ b/src/codeflare_sdk/common/widgets/test_widgets.py @@ -15,7 +15,7 @@ import codeflare_sdk.common.widgets.widgets as cf_widgets import pandas as pd from unittest.mock import MagicMock, patch -from ..utils.unit_test_support import get_local_queue, createClusterConfig +from ..utils.unit_test_support import get_local_queue, create_cluster_config from codeflare_sdk.ray.cluster.cluster import Cluster from codeflare_sdk.ray.cluster.status import ( RayCluster, @@ -38,7 +38,7 @@ def test_cluster_up_down_buttons(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = Cluster(createClusterConfig()) + cluster = Cluster(create_cluster_config()) with patch("ipywidgets.Button") as MockButton, patch( "ipywidgets.Checkbox" diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py index a3f345547..e2b989ca6 100644 --- a/src/codeflare_sdk/ray/cluster/cluster.py +++ b/src/codeflare_sdk/ray/cluster/cluster.py @@ -52,6 +52,10 @@ import requests from kubernetes import config +from kubernetes.dynamic import DynamicClient +from kubernetes import client as k8s_client +from kubernetes.client.rest import ApiException + from kubernetes.client.rest import ApiException import warnings @@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration): if is_notebook(): cluster_up_down_buttons(self) + def get_dynamic_client(self): # pragma: no cover + """Return a dynamic client, optionally mocked in tests.""" + return DynamicClient(get_api_client()) + + def config_check(self): + """Return a dynamic client, optionally mocked in tests.""" + return config_check() + @property def _client_headers(self): k8_client = get_api_client() @@ -95,9 +107,7 @@ def _client_headers(self): @property def _client_verify_tls(self): - if not _is_openshift_cluster or not self.config.verify_tls: - return False - return True + return _is_openshift_cluster and self.config.verify_tls @property def job_client(self): @@ -121,7 +131,6 @@ def create_resource(self): Called upon cluster object creation, creates an AppWrapper yaml based on the specifications of the ClusterConfiguration. """ - if self.config.namespace is None: self.config.namespace = get_current_namespace() if self.config.namespace is None: @@ -130,7 +139,6 @@ def create_resource(self): raise TypeError( f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication." ) - return build_ray_cluster(self) # creates a new cluster with the provided or default spec @@ -139,10 +147,11 @@ def up(self): Applies the Cluster yaml, pushing the resource request onto the Kueue localqueue. """ - + print( + "WARNING: The up() function is planned for deprecation in favor of apply()." + ) # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError self._throw_for_no_raycluster() - namespace = self.config.namespace try: @@ -176,6 +185,52 @@ def up(self): except Exception as e: # pragma: no cover return _kube_api_error_handling(e) + # Applies a new cluster with the provided or default spec + def apply(self, force=False): + """ + Applies the Cluster yaml using server-side apply. + If 'force' is set to True, conflicts will be forced. + """ + # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError + self._throw_for_no_raycluster() + namespace = self.config.namespace + + try: + self.config_check() + api_instance = client.CustomObjectsApi(get_api_client()) + crds = self.get_dynamic_client().resources + api_instance = crds.get( + api_version="workload.codeflare.dev/v1beta2", kind="AppWrapper" + ) + if self.config.appwrapper: + if self.config.write_to_file: + with open(self.resource_yaml) as f: + aw = yaml.load(f, Loader=yaml.FullLoader) + api_instance.server_side_apply( + group="workload.codeflare.dev", + version="v1beta2", + namespace=namespace, + plural="appwrappers", + body=aw, + ) + else: + api_instance.server_side_apply( + group="workload.codeflare.dev", + version="v1beta2", + namespace=namespace, + plural="appwrappers", + body=self.resource_yaml, + ) + print(f"AppWrapper: '{self.config.name}' has successfully been created") + else: + api_instance = crds.get(api_version="ray.io/v1", kind="RayCluster") + self._component_resources_apply(namespace, api_instance) + print( + f"Ray Cluster: '{self.config.name}' has successfully been applied" + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + def _throw_for_no_raycluster(self): api_instance = client.CustomObjectsApi(get_api_client()) try: @@ -204,7 +259,7 @@ def down(self): resource_name = self.config.name self._throw_for_no_raycluster() try: - config_check() + self.config_check() api_instance = client.CustomObjectsApi(get_api_client()) if self.config.appwrapper: api_instance.delete_namespaced_custom_object( @@ -507,6 +562,16 @@ def _component_resources_up( else: _create_resources(self.resource_yaml, namespace, api_instance) + def _component_resources_apply( + self, namespace: str, api_instance: client.CustomObjectsApi + ): + if self.config.write_to_file: + with open(self.resource_yaml) as f: + ray_cluster = yaml.safe_load(f) + _apply_resources(ray_cluster, namespace, api_instance) + else: + _apply_resources(self.resource_yaml, namespace, api_instance) + def _component_resources_down( self, namespace: str, api_instance: client.CustomObjectsApi ): @@ -744,6 +809,20 @@ def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsA ) +def _apply_resources( + yamls, namespace: str, api_instance: client.CustomObjectsApi, force=False +): + api_instance.server_side_apply( + field_manager="cluster-manager", + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + body=yamls, + force_conflicts=force, # Allow forcing conflicts if needed + ) + + def _check_aw_exists(name: str, namespace: str) -> bool: try: config_check() diff --git a/src/codeflare_sdk/ray/cluster/test_cluster.py b/src/codeflare_sdk/ray/cluster/test_cluster.py index 5e83c82a8..298c416ed 100644 --- a/src/codeflare_sdk/ray/cluster/test_cluster.py +++ b/src/codeflare_sdk/ray/cluster/test_cluster.py @@ -19,16 +19,18 @@ list_all_queued, ) from codeflare_sdk.common.utils.unit_test_support import ( - createClusterWithConfig, + create_cluster, arg_check_del_effect, ingress_retrieval, arg_check_apply_effect, get_local_queue, - createClusterConfig, + create_cluster_config, get_ray_obj, get_obj_none, get_ray_obj_with_status, get_aw_obj_with_status, + patch_cluster_with_dynamic_client, + route_list_retrieval, ) from codeflare_sdk.ray.cluster.cluster import _is_openshift_cluster from pathlib import Path @@ -67,11 +69,189 @@ def test_cluster_up_down(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) cluster.up() cluster.down() +def test_cluster_apply_scale_up_scale_down(mocker): + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_dynamic_client = mocker.Mock() + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", + return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, + ) + + # Initialize test + initial_num_workers = 1 + scaled_up_num_workers = 2 + + # Step 1: Create cluster with initial workers + cluster = create_cluster(mocker, initial_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + cluster.apply() + + # Step 2: Scale up the cluster + cluster = create_cluster(mocker, scaled_up_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.apply() + + # Step 3: Scale down the cluster + cluster = create_cluster(mocker, initial_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.apply() + + # Tear down + cluster.down() + + +def test_cluster_apply_with_file(mocker): + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", + return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, + ) + + # Step 1: Create cluster with initial workers + cluster = create_cluster(mocker, 1, write_to_file=True) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + cluster.apply() + # Tear down + cluster.down() + + +def test_cluster_apply_with_appwrapper(mocker): + # Mock Kubernetes client and dynamic client methods + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "codeflare_sdk.ray.cluster.cluster._check_aw_exists", + return_value=True, + ) + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + + # Create a cluster configuration with appwrapper set to False + cluster = create_cluster(mocker, 1, write_to_file=False) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + + # Mock listing RayCluster to simulate it doesn't exist + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + # Call the apply method + cluster.apply() + + # Assertions + print("Cluster applied without AppWrapper.") + + +def test_cluster_apply_without_appwrapper_write_to_file(mocker): + # Mock Kubernetes client and dynamic client methods + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "codeflare_sdk.ray.cluster.cluster._check_aw_exists", + return_value=True, + ) + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + + # Create a cluster configuration with appwrapper set to False + cluster = create_cluster(mocker, 1, write_to_file=True) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.config.appwrapper = False + + # Mock listing RayCluster to simulate it doesn't exist + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + # Call the apply method + cluster.apply() + + # Assertions + print("Cluster applied without AppWrapper.") + + +def test_cluster_apply_without_appwrapper(mocker): + # Mock Kubernetes client and dynamic client methods + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + + # Create a cluster configuration with appwrapper set to False + cluster = create_cluster(mocker, 1, write_to_file=False) + cluster.config.appwrapper = None + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + + # Mock listing RayCluster to simulate it doesn't exist + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + + # Call the apply method + cluster.apply() + + # Assertions + print("Cluster applied without AppWrapper.") + + def test_cluster_up_down_no_mcad(mocker): mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") @@ -98,7 +278,7 @@ def test_cluster_up_down_no_mcad(mocker): "kubernetes.client.CustomObjectsApi.list_cluster_custom_object", return_value={"items": []}, ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-ray" config.appwrapper = False cluster = Cluster(config) @@ -117,7 +297,7 @@ def test_cluster_uris(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) mocker.patch( "kubernetes.client.NetworkingV1Api.list_namespaced_ingress", return_value=ingress_retrieval( @@ -147,6 +327,52 @@ def test_cluster_uris(mocker): == "Dashboard not available yet, have you run cluster.up()?" ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster._is_openshift_cluster", return_value=True + ) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value={ + "items": [ + { + "metadata": { + "name": "ray-dashboard-unit-test-cluster", + }, + "spec": { + "host": "ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org", + "tls": {}, # Indicating HTTPS + }, + } + ] + }, + ) + cluster = create_cluster(mocker) + assert ( + cluster.cluster_dashboard_uri() + == "http://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org" + ) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value={ + "items": [ + { + "metadata": { + "name": "ray-dashboard-unit-test-cluster", + }, + "spec": { + "host": "ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org", + "tls": {"termination": "passthrough"}, # Indicating HTTPS + }, + } + ] + }, + ) + cluster = create_cluster(mocker) + assert ( + cluster.cluster_dashboard_uri() + == "https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org" + ) + def test_ray_job_wrapping(mocker): import ray @@ -159,7 +385,7 @@ def ray_addr(self, *args): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) mocker.patch( "ray.job_submission.JobSubmissionClient._check_connection_and_version_with_url", return_value="None", diff --git a/src/codeflare_sdk/ray/cluster/test_config.py b/src/codeflare_sdk/ray/cluster/test_config.py index 3416fc28c..2d3aa7796 100644 --- a/src/codeflare_sdk/ray/cluster/test_config.py +++ b/src/codeflare_sdk/ray/cluster/test_config.py @@ -13,7 +13,7 @@ # limitations under the License. from codeflare_sdk.common.utils.unit_test_support import ( - createClusterWrongType, + create_cluster_wrong_type, get_local_queue, create_cluster_all_config_params, ) @@ -55,6 +55,7 @@ def test_default_appwrapper_creation(mocker): assert cluster.resource_yaml == expected_aw +@pytest.mark.filterwarnings("ignore::UserWarning") def test_config_creation_all_parameters(mocker): from codeflare_sdk.ray.cluster.config import DEFAULT_RESOURCE_MAPPING @@ -98,6 +99,7 @@ def test_config_creation_all_parameters(mocker): ) +@pytest.mark.filterwarnings("ignore::UserWarning") def test_all_config_params_aw(mocker): create_cluster_all_config_params(mocker, "aw-all-params", True) assert filecmp.cmp( @@ -109,11 +111,12 @@ def test_all_config_params_aw(mocker): def test_config_creation_wrong_type(): with pytest.raises(TypeError) as error_info: - createClusterWrongType() + create_cluster_wrong_type() assert len(str(error_info.value).splitlines()) == 4 +@pytest.mark.filterwarnings("ignore::UserWarning") def test_cluster_config_deprecation_conversion(mocker): config = ClusterConfiguration( name="test", diff --git a/tests/e2e/cluster_apply_kind_test.py b/tests/e2e/cluster_apply_kind_test.py new file mode 100644 index 000000000..398bf73b1 --- /dev/null +++ b/tests/e2e/cluster_apply_kind_test.py @@ -0,0 +1,156 @@ +from codeflare_sdk import Cluster, ClusterConfiguration +import pytest +from kubernetes import client + +from support import ( + initialize_kubernetes_client, + create_namespace, + delete_namespace, + get_ray_cluster, +) + + +@pytest.mark.kind +class TestRayClusterApply: + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + + def test_cluster_apply(self): + self.setup_method() + create_namespace(self) + + cluster_name = "test-cluster-apply" + namespace = self.namespace + + # Initial configuration with 1 worker + initial_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Create the cluster + cluster = Cluster(initial_config) + cluster.apply() + + # Wait for the cluster to be ready + cluster.wait_ready() + status = cluster.status() + assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" + + # Verify the cluster is created + ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ray_cluster is not None, "Cluster was not created successfully" + assert ( + ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 1 + ), "Initial worker count does not match" + + # Update configuration with 3 workers + updated_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=2, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Apply the updated configuration + cluster.config = updated_config + cluster.apply() + + # Wait for the updated cluster to be ready + cluster.wait_ready() + updated_status = cluster.status() + assert updated_status[ + "ready" + ], f"Cluster {cluster_name} is not ready after update: {updated_status}" + + # Verify the cluster is updated + updated_ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ( + updated_ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 2 + ), "Worker count was not updated" + + # Clean up + cluster.down() + ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ray_cluster is None, "Cluster was not deleted successfully" + + def test_apply_invalid_update(self): + self.setup_method() + create_namespace(self) + + cluster_name = "test-cluster-apply-invalid" + namespace = self.namespace + + # Initial configuration + initial_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Create the cluster + cluster = Cluster(initial_config) + cluster.apply() + + # Wait for the cluster to be ready + cluster.wait_ready() + status = cluster.status() + assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" + + # Update with an invalid configuration (e.g., immutable field change) + invalid_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=2, + head_cpu_requests="1", + head_cpu_limits="2", # Changing CPU limits (immutable) + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Try to apply the invalid configuration and expect failure + cluster.config = invalid_config + with pytest.raises(RuntimeError, match="Immutable fields detected"): + cluster.apply() + + # Clean up + cluster.down() diff --git a/tests/e2e/support.py b/tests/e2e/support.py index 2ff33e911..5e4ddbdf2 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -11,6 +11,22 @@ ) +def get_ray_cluster(cluster_name, namespace): + api = client.CustomObjectsApi() + try: + return api.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=cluster_name, + ) + except client.exceptions.ApiException as e: + if e.status == 404: + return None + raise + + def get_ray_image(): default_ray_image = "quay.io/modh/ray@sha256:0d715f92570a2997381b7cafc0e224cfa25323f18b9545acfd23bc2b71576d06" return os.getenv("RAY_IMAGE", default_ray_image)