From d4f4cc43eabe354896d93190283969afc34f774f Mon Sep 17 00:00:00 2001 From: Daisie Huang Date: Wed, 8 Jan 2025 12:05:57 -0800 Subject: [PATCH] use authx methods instead of directly calling opa --- etc/tests/test_integration.py | 42 ++++++++++++++++------------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/etc/tests/test_integration.py b/etc/tests/test_integration.py index 8538e3c7..7526f4cf 100644 --- a/etc/tests/test_integration.py +++ b/etc/tests/test_integration.py @@ -10,6 +10,7 @@ import urllib.parse import pprint import time +import authx.auth REPO_DIR = os.path.abspath(f"{os.path.dirname(os.path.realpath(__file__))}/../..") sys.path.insert(0, os.path.abspath(f"{REPO_DIR}")) @@ -20,6 +21,17 @@ ENV = get_env() +class AuthzRequest: + headers = {} + method = None + path = None + + def __init__(self, headers, method, path): + self.headers = headers + self.method = method + self.path = path + + ## Keycloak tests: @@ -96,26 +108,15 @@ def get_katsu_datasets(user): username = ENV[f"{user}_USER"] password = ENV[f"{user}_PASSWORD"] token = get_token(username=username, password=password, access_token=True) + headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json; charset=utf-8" } - payload = { - "input": {"body": {"path": "/v3/discovery/", "method": "GET"}, "token": token} - } + request = AuthzRequest(headers, "GET", "/v3/authorized/") + response = authx.auth.get_opa_datasets(request) - katsu_headers = { - "Content-Type": "application/json", - "Accept": "application/json", - "Authorization": f"Bearer {get_site_admin_token()}" - } - - response = requests.post( - f"{ENV['CANDIG_ENV']['OPA_URL']}/v1/data/permissions/datasets", - json=payload, - headers=katsu_headers, - ) - return response.json()["result"] + return response def add_program_authorization(program: str, curators: list, @@ -265,14 +266,9 @@ def test_site_admin(user, is_admin): "Authorization": f"Bearer {token}" } - payload["input"]["token"] = token - response = requests.post( - f"{ENV['CANDIG_ENV']['OPA_URL']}/v1/data/permissions/site_admin", - json=payload, - headers=headers, - ) - print(response.json()) - assert ("result" in response.json()) == is_admin + request = AuthzRequest(headers, None, None) + + assert authx.auth.is_site_admin(request) == is_admin def test_add_remove_site_admin():