Skip to content

Commit

Permalink
refactor: modify test cases for system client.
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Nithin <[email protected]>
  • Loading branch information
ganesh-nithin committed Dec 4, 2024
1 parent 814f1fb commit 85cc103
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 100 deletions.
2 changes: 1 addition & 1 deletion nisystemlink/clients/system/_system_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def query_jobs(self, query: models.QueryJobsRequest) -> models.QueryJobsResponse
@post("cancel-jobs")
def cancel_jobs(
self, job_ids: List[models.CancelJobRequest]
) -> models.CancelJobResponse:
) -> models.CancelJobResponse | None:
"""Cancel the jobs.
Args:
Expand Down
2 changes: 1 addition & 1 deletion nisystemlink/clients/system/models/_create_job_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class CreateJobResponse(CreateJobRequest):
"""Model for response of create job request."""

error: ApiError
error: Optional[ApiError] = None
"""Represents the standard error structure."""

jid: Optional[str] = None
Expand Down
13 changes: 8 additions & 5 deletions nisystemlink/clients/system/models/_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,26 @@ class Job(JsonModel):
id: Optional[str] = None
"""The ID of the system that the job targets."""

created_timestamp: datetime
created_timestamp: Optional[datetime] = None
"""The timestamp representing when the job was created."""

last_updated_timestamp: datetime
last_updated_timestamp: Optional[datetime] = None
"""The timestamp representing when the job was last updated."""

dispatched_timestamp: Optional[datetime] = None
"""The timestamp representing when the job was dispatched."""

state: JobState
state: Optional[JobState] = None
"""The state of the job."""

metadata: Optional[Dict[str, Any]] = None
"""The metadata associated with the job."""

config: JobConfig
config: Optional[JobConfig] = None
"""The configuration of the job."""

result: JobResult
result: Optional[JobResult] = None
"""The result of the job."""

class Config:
orm_mode = True
195 changes: 102 additions & 93 deletions tests/integration/system/test_system_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
QueryJobsRequest,
)

TARGET_SYSTEM = (
"HVM_domU--SN-ec200972-eeca-062e-5bf5-33g3g3g3d73b2--MAC-0A-E1-20-D6-96-2B"
)
METADATA = {"queued": True, "refresh_minion_cache": {"grains": True}}
SAMPLE_FUN_1 = "system.sample_function_one"
SAMPLE_FUN_2 = "system.sample_function_two"


@pytest.fixture(scope="class")
def client(enterprise_config: HttpConfiguration) -> SystemClient:
Expand All @@ -33,26 +40,29 @@ def _create_job(job: CreateJobRequest) -> CreateJobResponse:
yield _create_job

job_requests = [
CancelJobRequest(jid=response.jid, tgt=response.tgt[0])
CancelJobRequest(jid=response.jid, system_id=TARGET_SYSTEM)
for response in responses
if response.tgt is not None
if response.jid != ""
]

client.cancel_jobs(job_requests)
if len(job_requests):
client.cancel_jobs(job_requests)


@pytest.fixture(scope="class")
@pytest.fixture(scope="class", autouse=True)
def create_multiple_jobs(
create_job,
):
"""Fixture to create multiple jobs."""
responses = []
arg_1 = [["A description"]]
arg_2 = [["Another description"]]
tgt = ["HVM_domU--SN-ec200972-eeca-062e-5bf5-33g3g3g3d73b2--MAC-0A-E1-20-D6-96-2B"]
fun_1 = ["system.set_computer_desc"]
fun_2 = ["system.set_computer_asc"]
metadata = {"queued": True, "refresh_minion_cache": {"grains": True}}
arg_1 = [["sample argument one"]]
arg_2 = [["sample argument two"]]
tgt = [TARGET_SYSTEM]
fun_1 = [SAMPLE_FUN_1]
fun_2 = [SAMPLE_FUN_2]

metadata = METADATA

job_1 = CreateJobRequest(
arg=arg_1,
tgt=tgt,
Expand All @@ -69,92 +79,87 @@ def create_multiple_jobs(
)
responses.append(create_job(job_2))

job_3 = CreateJobRequest(
arg=arg_1,
tgt=tgt,
fun=fun_2,
metadata=metadata,
)
responses.append(create_job(job_3))

return responses


@pytest.mark.integration
@pytest.mark.enterprise
class TestSystemClient:
def test__create_job__one_job_created_with_right_field_values(
def test__create_a_job__job_is_created_with_right_field_values(
self,
create_job,
):
arg = [["A description"]]
tgt = [
"HVM_domU--SN-ec200972-eeca-062e-5bf5-017a25451b39--MAC-0A-E1-20-D6-96-2B"
]
fun = ["system.set_computer_desc"]
metadata = {"queued": True, "refresh_minion_cache": {"grains": True}}
arg = [["sample argument"]]
tgt = [TARGET_SYSTEM]
fun = ["system.set_computer_desc_sample_function"]

job = CreateJobRequest(
arg=arg,
tgt=tgt,
fun=fun,
metadata=metadata,
metadata=METADATA,
)

response = create_job(job)

assert response is not None
assert response.jid != ""
assert response.arg == arg
assert response.tgt == tgt
assert response.metadata == metadata
assert response.fun == fun
assert response.error is None

def test__list_jobs__list_single_job_succeeds(
self, create_job, client: SystemClient
def test__get_job_using_target_and_job_id__returns_job_matches_target_and_job_id(
self, create_multiple_jobs, client: SystemClient
):
arg = [["A description"]]
tgt = [
"HVM_domU--SN-ec200972-eeca-062e-5bf5-017a25451b39--MAC-0A-E1-20-D6-96-2B"
]
fun = ["system.set_computer_desc"]
metadata = {"queued": True, "refresh_minion_cache": {"grains": True}}
job = CreateJobRequest(
arg=arg,
tgt=tgt,
fun=fun,
metadata=metadata,
)
create_job_response = create_job(job)

response = client.list_jobs(jid=create_job_response.jid)
assert response is not None
[first_job, *_] = create_multiple_jobs
print(first_job.jid)
response = client.list_jobs(system_id=TARGET_SYSTEM, jid=first_job.jid)
assert len(response) == 1

[response_job] = response

assert response_job.jid == create_job_response.jid
print(response)
assert response_job.jid == first_job.jid
assert response_job.config is not None
assert response_job.config.arg == arg
assert response_job.config.tgt == tgt
assert response_job.metadata == metadata
assert response_job.config.fun == fun
assert response_job.config.tgt == first_job.tgt

def test__list_jobs__list_multiple_jobs_succeeds(
def test__get_jobs_using_target_and_function__return_jobs_match_target_and_function(
self, create_multiple_jobs, client: SystemClient
):
response = client.list_jobs(system_id=create_multiple_jobs[0].tgt[0])
[_, second_job, third_job] = create_multiple_jobs
response = client.list_jobs(system_id=TARGET_SYSTEM, function=SAMPLE_FUN_2)
assert len(response) == 2

def test__list_jobs__list_multiple_jobs_take_one_succeeds(
self, create_multiple_jobs, client: SystemClient
):
response = client.list_jobs(system_id=create_multiple_jobs[0].tgt[0], take=1)
assert len(response) == 1
[response_second_job, response_third_job] = response

assert response_second_job.jid == second_job.jid
assert response_second_job.config is not None
assert response_second_job.config.tgt == second_job.tgt

def test__list_jobs__list_multiple_jobs_skip_one_succeeds(
assert response_third_job.jid == third_job.jid
assert response_third_job.config is not None
assert response_third_job.config.tgt == third_job.tgt

def test__get_job_by_taking_one__return_only_one_job(
self, create_multiple_jobs, client: SystemClient
):
response = client.list_jobs(system_id=create_multiple_jobs[0].tgt[0], skip=1)
response = client.list_jobs(system_id=TARGET_SYSTEM, skip=1)
assert len(response) == 1

def test__list_jobs__invalid_system_id(self, client: SystemClient):
def test__get_jobs_using_invalid_system_id__returns_empty_list(
self, client: SystemClient
):
response = client.list_jobs(system_id="Invalid_system_id")
assert len(response) == 0

def test__list_jobs__invalid_jid(self, client: SystemClient):
def test__get_jobs_using_invalid_jid__returns_empty_list(
self, client: SystemClient
):
response = client.list_jobs(jid="Invalid_jid")
assert len(response) == 0

Expand All @@ -167,101 +172,105 @@ def test__get_job_summary__returns_job_summary(self, client: SystemClient):
assert response.succeeded_count is not None
assert response.error is None

def test__query_jobs__take_one_job_succeeds(self, client: SystemClient):
query = QueryJobsRequest(take=1)
def test__query_jobs_by_taking_one__returns_one_job(self, client: SystemClient):
query = QueryJobsRequest(skip=0, take=1)
response = client.query_jobs(query=query)

assert response is not None
assert response.data is not None
assert len(response.data) == response.count == 1

def test__query_jobs__filter_config_fun_succeeds(self, client: SystemClient):
query = QueryJobsRequest(
filter='config.fun.Contains("system.set_computer_desc")'
)
def test__query_jobs_by_filtering_config__return_jobs_matches_filter(
self, client: SystemClient
):
query = QueryJobsRequest(skip=0, filter=f"config.fun.Contains({SAMPLE_FUN_2})")
response = client.query_jobs(query=query)

assert response is not None
assert response.data is not None
assert response.count is not None
assert len(response.data) == response.count > 0

def test__query_jobs__filter_config_fun_fails(self, client: SystemClient):
def test__query_jobs_by_filtering_invalid_filter__raises_ApiException(
self, client: SystemClient
):
query = QueryJobsRequest(
filter='config.fun.Contains("system.set_computer_desc")'
skip=0, filter='config.fun.Contains("failed_function")'
)
with pytest.raises(ApiException):
client.query_jobs(query=query)

def test__query_jobs__filter_config_jid_succeeds(
def test__query_jobs_by_filtering_jid__returns_job_matches_jid(
self, create_multiple_jobs, client: SystemClient
):
query = QueryJobsRequest(filter=f"jid={create_multiple_jobs[0].jid}")
query = QueryJobsRequest(skip=0, filter=f"jid={create_multiple_jobs[0].jid}")
response = client.query_jobs(query=query)

assert response is not None
assert response.data is not None
assert len(response.data) == response.count == 1

def test__query_jobs__filter_config_jid_fails(self, client: SystemClient):
query = QueryJobsRequest(filter="jid=Invalid_jid")
def test__query_jobs_by_filtering_invalid_jid__raises_ApiException(
self, client: SystemClient
):
query = QueryJobsRequest(skip=0, filter="jid=Invalid_jid")
with pytest.raises(ApiException):
client.query_jobs(query=query)

def test__cancel_jobs__cancel_single_job_succeeds(self, client: SystemClient):
arg = [["A description"]]
tgt = [
"HVM_domU--SN-ec200972-eeca-062e-5bf5-017a25451b39--MAC-0A-E1-20-D6-96-2B"
]
fun = ["system.set_computer_desc"]
metadata = {"queued": True, "refresh_minion_cache": {"grains": True}}
def test__cancel_single_job__cancel_single_job_succeeds(self, client: SystemClient):
arg = [["sample argument"]]
tgt = [TARGET_SYSTEM]
fun = [SAMPLE_FUN_1]
job = CreateJobRequest(
arg=arg,
tgt=tgt,
fun=fun,
metadata=metadata,
metadata=METADATA,
)
response = client.create_job(job)

cancel_job_request = CancelJobRequest(jid=response.jid, tgt=tgt[0])
cancel_job_request = CancelJobRequest(jid=response.jid, tgt=TARGET_SYSTEM)
cancel_response = client.cancel_jobs([cancel_job_request])

assert cancel_response.error is None
assert cancel_response is None

def test__cancel_multiple_jobs__cancel_multiple_job_succeeds(
self, client: SystemClient
):
arg_1 = [["sample argument one"]]
arg_2 = [["sample argument two"]]
tgt = [TARGET_SYSTEM]
fun = [SAMPLE_FUN_2]

def test__cancel_jobs__cancel_multiple_job_succeeds(self, client: SystemClient):
arg_1 = [["A description"]]
arg_2 = [["Another description"]]
tgt = [
"HVM_domU--SN-ec200972-eeca-062e-5bf5-017a25451b39--MAC-0A-E1-20-D6-96-2B"
]
fun = ["system.set_computer_desc"]
metadata = {"queued": True, "refresh_minion_cache": {"grains": True}}
job_1 = CreateJobRequest(
arg=arg_1,
tgt=tgt,
fun=fun,
metadata=metadata,
metadata=METADATA,
)
response_1 = client.create_job(job_1)
job_2 = CreateJobRequest(
arg=arg_2,
tgt=tgt,
fun=fun,
metadata=metadata,
metadata=METADATA,
)
response_2 = client.create_job(job_2)

cancel_job_request_1 = CancelJobRequest(jid=response_1.jid, tgt=tgt[0])
cancel_job_request_2 = CancelJobRequest(jid=response_2.jid, tgt=tgt[0])
cancel_job_request_1 = CancelJobRequest(jid=response_1.jid, tgt=TARGET_SYSTEM)
cancel_job_request_2 = CancelJobRequest(jid=response_2.jid, tgt=TARGET_SYSTEM)
cancel_response = client.cancel_jobs(
[cancel_job_request_1, cancel_job_request_2]
)

assert cancel_response.error is None
assert cancel_response is None

def test__cancel_jobs__cancel_with_invalid_jid_fails(self, client: SystemClient):
def test__cancel_with_invalid_jid__cancel_job_returns_error(
self, client: SystemClient
):
cancel_job_request = CancelJobRequest(jid="Invalid_jid", tgt="Invalid_tgt")
cancel_response = client.cancel_jobs([cancel_job_request])

assert cancel_response is not None
assert cancel_response.error is not None
assert cancel_response.error.message is not None

0 comments on commit 85cc103

Please sign in to comment.