Skip to content

Commit

Permalink
use authx methods instead of directly calling opa
Browse files Browse the repository at this point in the history
  • Loading branch information
daisieh committed Jan 8, 2025
1 parent 465dffe commit d4f4cc4
Showing 1 changed file with 19 additions and 23 deletions.
42 changes: 19 additions & 23 deletions etc/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import urllib.parse
import pprint
import time
import authx.auth

REPO_DIR = os.path.abspath(f"{os.path.dirname(os.path.realpath(__file__))}/../..")
sys.path.insert(0, os.path.abspath(f"{REPO_DIR}"))
Expand All @@ -20,6 +21,17 @@
ENV = get_env()


class AuthzRequest:
headers = {}
method = None
path = None

def __init__(self, headers, method, path):
self.headers = headers
self.method = method
self.path = path


## Keycloak tests:


Expand Down Expand Up @@ -96,26 +108,15 @@ def get_katsu_datasets(user):
username = ENV[f"{user}_USER"]
password = ENV[f"{user}_PASSWORD"]
token = get_token(username=username, password=password, access_token=True)

headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8"
}
payload = {
"input": {"body": {"path": "/v3/discovery/", "method": "GET"}, "token": token}
}
request = AuthzRequest(headers, "GET", "/v3/authorized/")
response = authx.auth.get_opa_datasets(request)

katsu_headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {get_site_admin_token()}"
}

response = requests.post(
f"{ENV['CANDIG_ENV']['OPA_URL']}/v1/data/permissions/datasets",
json=payload,
headers=katsu_headers,
)
return response.json()["result"]
return response


def add_program_authorization(program: str, curators: list,
Expand Down Expand Up @@ -265,14 +266,9 @@ def test_site_admin(user, is_admin):
"Authorization": f"Bearer {token}"
}

payload["input"]["token"] = token
response = requests.post(
f"{ENV['CANDIG_ENV']['OPA_URL']}/v1/data/permissions/site_admin",
json=payload,
headers=headers,
)
print(response.json())
assert ("result" in response.json()) == is_admin
request = AuthzRequest(headers, None, None)

assert authx.auth.is_site_admin(request) == is_admin


def test_add_remove_site_admin():
Expand Down

0 comments on commit d4f4cc4

Please sign in to comment.