Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEN-1412: Implement load test logic #19155

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ ingestion/requirements.txt
ingestion/.python-version
ingestion/venv2/**
.python-version
ingestion/tests/load/summaries/*.csv

# MLFlow
mlruns/
Expand Down
14 changes: 9 additions & 5 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Python Dependencies
"""

import sys
from typing import Dict, List, Set

from setuptools import setup
Expand All @@ -25,9 +26,9 @@
"boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3
"geoalchemy2": "GeoAlchemy2~=0.12",
"google-cloud-monitoring": "google-cloud-monitoring>=2.0.0",
"google-cloud-storage": "google-cloud-storage>=1.43.0",
"gcsfs": "gcsfs>=2023.1.0",
"great-expectations": "great-expectations>=0.18.0,<0.18.14",
"google-cloud-storage": "google-cloud-storage==1.43.0",
"gcsfs": "gcsfs>=2023.10.0",
"great-expectations": "great-expectations==0.18.14",
"grpc-tools": "grpcio-tools>=1.47.2",
"msal": "msal~=1.2",
"neo4j": "neo4j~=5.3.0",
Expand Down Expand Up @@ -171,15 +172,15 @@
"bigtable": {"google-cloud-bigtable>=2.0.0", VERSIONS["pandas"], VERSIONS["numpy"]},
"clickhouse": {
"clickhouse-driver~=0.2",
"clickhouse-sqlalchemy~=0.2",
"clickhouse-sqlalchemy~=0.2.0",
DATA_DIFF["clickhouse"],
},
"dagster": {
"croniter<3",
VERSIONS["pymysql"],
"psycopg2-binary",
VERSIONS["geoalchemy2"],
"dagster_graphql~=1.1",
"dagster_graphql>=1.8.0",
},
"dbt": {
"google-cloud",
Expand Down Expand Up @@ -406,6 +407,9 @@
*plugins["mssql"],
}

if sys.version_info >= (3, 9):
test.add("locust~=2.32.0")

e2e_test = {
# playwright dependencies
"pytest-playwright",
Expand Down
22 changes: 22 additions & 0 deletions ingestion/src/_openmetadata_testutils/helpers/login_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Base class for resource tests."""

from locust.contrib.fasthttp import FastHttpSession
from requests.auth import AuthBase


class BearerAuth(AuthBase):
def __init__(self, token):
self.token = token

def __call__(self, r):
r.headers["authorization"] = "Bearer " + self.token
return r


def login_user(client: FastHttpSession) -> BearerAuth:
resp = client.post(
"/api/v1/users/login",
json={"email": "[email protected]", "password": "YWRtaW4="},
)
token = resp.json().get("accessToken")
return BearerAuth(token)
129 changes: 129 additions & 0 deletions ingestion/tests/load/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
## Adding a new resource to load tests
Add a new `*.py` file to `test_resources/tasks`. The naming does not matter, but we use the resource name as defined in Java, but seperated by `_` (e.g. `TestCaseResource` becomes `test_case_tasks.py`).

In your newly created file, you'll need to import at minimum 1 package
```python
from locust import task, TaskSet
```
`task` will be used as a decorator to define our task that will run as part of our load test. `TaskSet` wil be inherited by our task set class.

Here is an example of a locust task definition. The integer argument in `@task` will give a specific weigth to the task (i.e. increasing its probability to be ran)
```
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""

def _list_test_case_results(self, start_ts: int, end_ts: int, days_range: str):
"""List test case results for a given time range

Args:
start_ts (int): start timestamp
end_ts (int): end timestamp
range (str):
"""
for test_case in self.test_cases:
fqn = test_case.get("fullyQualifiedName")
if fqn:
self.client.get(
f"{TEST_CASE_RESULT_RESOURCE_PATH}/{fqn}",
params={ # type: ignore
"startTs": start_ts,
"endTs": end_ts,
},
auth=self.bearer,
name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"
)

@task(3)
def list_test_case_results_30_days(self):
"""List test case results for the last 30 days. Weighted 3"""
now = datetime.now()
last_30_days = int((now - timedelta(days=30)).timestamp() * 1000)
self._list_test_case_results(last_30_days, int(now.timestamp() * 1000), "30_days")
```

Notice how we use `self.client.get` to perform the request. This is provided by locust `HttpSession`. If the request needs to be authenticated, you can use `auth=self.bearer`. You will need to first define `self.bearer`, you can achieve this using the `on_start` hook from locust.

```python
from _openmetadata_testutils.helpers.login_user import login_user

class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
[...]

def on_start(self):
"""Get a list of test cases to fetch results for"""
self.bearer = login_user(self.client)
resp = self.client.get(f"{TEST_CASE_RESOURCE_PATH}", params={"limit": 100}, auth=self.bearer)
json = resp.json()
self.test_cases = json.get("data", [])
```

**IMPORTANT**
You MUST define a `def stop(self)` methodd in your `TaskSet` class as shown below so that control is given back to the parent user class.

```python
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
[...]

@task
def stop(self):
self.interrupt()
```

If your request contains a parameter (i.e. `/api/v1/dataQuality/testCases/testCaseResults/{fqn}`) you can name your request so all the request sent you will be grouped together like this

```python
self.client.get(
f"{TEST_CASE_RESULT_RESOURCE_PATH}/{fqn}",
params={ # type: ignore
"startTs": start_ts,
"endTs": end_ts,
},
auth=self.bearer,
name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"
)
```

Notice the argument `name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"`, this will define under which name the requests will be grouped. Example of statistics summary below grouped by the request `name`

```csv
Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%
GET,/api/v1/dataQuality/testCases/testCaseResults/[fqn]/60_days,3510,0,13,16.2354597524217,5.146791999997902,100.67633299999557,84567.57407407407,49.30531562959204,0.0,13,17,20,21,28,35,45,56,92,100,100
```

As a final step in `test_resources/manifest.yaml` add the resources, the metrics and the thresholds you want to test.

```yaml
/api/v1/dataQuality/testCases/testCaseResults/[fqn]/30_days:
type: GET
99%: 100

/api/v1/dataQuality/testCases/testCaseResults/[fqn]/60_days:
type: GET
99%: 100
```

This will test that our GET request for the defined resources are running 99% of the time in less than 100 milliseconds (0.1 seconds).

Below is a list of all the metrics you can use:
- Request Count
- Failure Count
- Median Response Time
- Average Response Time
- Min Response Time
- Max Response Time
- Average Content Size
- Requests/s
- Failures/s
- 50%
- 66%
- 75%
- 80%
- 90%
- 95%
- 98%
- 99%
- 99.9%
- 99.99%
- 100%
Empty file.
Empty file.
73 changes: 73 additions & 0 deletions ingestion/tests/load/test_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Run test case result resource load test"""

import csv
import os
import sys
from pathlib import Path
from unittest import TestCase, skipIf

import yaml

from ingestion.tests.load.utils import run_load_test


def run_all_resources(summary_file: str, locust_file: str):
"""Test test case result resource"""
args = [
"locust",
"--headless",
"-H",
"http://localhost:8585",
"--user",
os.getenv("LOCUST_USER", "50"),
"--spawn-rate",
"1",
"-f",
str(locust_file),
"--run-time",
os.getenv("LOCUST_RUNTIME", "1m"),
"--only-summary",
"--csv",
str(summary_file),
]

run_load_test(args)


class TestAllResources(TestCase):
"""Test class to run all resources load test"""

@skipIf(sys.version_info < (3, 9), "locust is not supported on python 3.8")
def test_all_resources(self):
"""Test all resources"""
directory = Path(__file__).parent
test_resources_dir = directory.joinpath("test_resources")

locust_file = test_resources_dir.joinpath("all_resources.py")
summary_file = directory.parent.joinpath("load/summaries/all_")
manifest_file = test_resources_dir.joinpath("manifest.yaml")

run_all_resources(str(summary_file), str(locust_file))

with open(manifest_file, "r", encoding="utf-8") as f:
manifest = yaml.safe_load(f)

with open(str(summary_file) + "_stats.csv", "r", encoding="utf-8") as f:
reader = csv.DictReader(f)

for row in reader:
name = row.get("Name")
if name in manifest:
resource = manifest.get(name)
type_ = resource.get("type")
if type_ == row.get("Type"):
for metric, threshold in resource.items():
with self.subTest(stat=metric, resource=name, type_=type_):
stat = row.get(metric)
if stat:
stat = int(stat)
self.assertLessEqual(
stat,
threshold,
msg=f"{metric} for {name} is greater than threshold",
)
Empty file.
50 changes: 50 additions & 0 deletions ingestion/tests/load/test_resources/all_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Test class to run all resources load test"""

import importlib.util
import inspect
import logging
from pathlib import Path
from typing import List

from locust import HttpUser, TaskSet, constant

TASKS_DIR = "tasks"

logger = logging.getLogger(__name__)


def get_all_tasks_set() -> List:
resource_classes = []
wd = Path(__file__).parent.joinpath(TASKS_DIR)
for file_path in wd.glob("*.py"):
if not str(file_path).startswith("base_"):
module_path = str(file_path)
module_name = file_path.stem
spec = importlib.util.spec_from_file_location(module_name, module_path)
if not spec:
logger.error(f"Could not load module {module_name}")
continue
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) # type: ignore

for _, obj in inspect.getmembers(module, inspect.isclass):
if obj.__module__ == module_name:
resource_classes.append(obj)

return resource_classes


class AllResources(TaskSet):
"""Execute tasks for all resources"""

@classmethod
def set_tasks(cls):
tasks = get_all_tasks_set()
cls.tasks = set(tasks)


class All(HttpUser):
host = "http://localhost:8585"
wait_time = constant(1) # closed workload
AllResources.set_tasks()
tasks = [AllResources]
19 changes: 19 additions & 0 deletions ingestion/tests/load/test_resources/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Description: This file contains the manifest for the test resources.
# You can add as a key of a resource any metric available in `summaries/all__summaries.csv` file.
# times should be expressed in milliseconds (e.g. 1000ms = 1s)

/api/v1/dataQuality/testCases/testCaseResults/[fqn]/30_days:
type: GET
99%: 1000

/api/v1/dataQuality/testCases/testCaseResults/[fqn]/60_days:
type: GET
99%: 1000

/api/v1/dataQuality/testCases/testCaseResults/[fqn]/180_days:
type: GET
99%: 1000

/api/v1/dataQuality/testCases?limit=10:
type: GET
99%: 6000
Empty file.
Loading
Loading