Skip to content

Commit

Permalink
Updating authz tests to remove deprecated endpoint calls (#932)
Browse files Browse the repository at this point in the history
  • Loading branch information
daisieh authored Jan 8, 2025
1 parent c6a8386 commit 3bf1355
Showing 1 changed file with 44 additions and 66 deletions.
110 changes: 44 additions & 66 deletions etc/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import urllib.parse
import pprint
import time
import authx.auth

REPO_DIR = os.path.abspath(f"{os.path.dirname(os.path.realpath(__file__))}/../..")
sys.path.insert(0, os.path.abspath(f"{REPO_DIR}"))
Expand All @@ -20,6 +21,17 @@
ENV = get_env()


class AuthzRequest:
headers = {}
method = None
path = None

def __init__(self, headers, method, path):
self.headers = headers
self.method = method
self.path = path


## Keycloak tests:


Expand Down Expand Up @@ -82,42 +94,32 @@ def test_tyk():
## Opa tests:
## Test DAC user authorizations

## Can we get the correct dataset response for each user?
def user_auth_datasets():
## Can we get the correct program response for each user?
def user_auth_programs():
return [
("CANDIG_NOT_ADMIN2", "PROGRAM-2"),
("CANDIG_NOT_ADMIN", "PROGRAM-1"),
("CANDIG_NOT_ADMIN", "TEST_2"),
("CANDIG_SITE_ADMIN", "TEST_3"),
]


def get_katsu_datasets(user):
username = ENV[f"{user}_USER"]
password = ENV[f"{user}_PASSWORD"]
token = get_token(username=username, password=password, access_token=True)

headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8"
}
payload = {
"input": {"body": {"path": "/v3/discovery/", "method": "GET"}, "token": token}
}
request = AuthzRequest(headers, "GET", "/v3/authorized/")
response = authx.auth.get_opa_datasets(request)

katsu_headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {get_site_admin_token()}"
}

response = requests.post(
f"{ENV['CANDIG_ENV']['OPA_URL']}/v1/data/permissions/datasets",
json=payload,
headers=katsu_headers,
)
return response.json()["result"]
return response


def add_program_authorization(dataset: str, curators: list,
def add_program_authorization(program: str, curators: list,
team_members: list):
token = get_site_admin_token()
headers = {
Expand All @@ -127,7 +129,7 @@ def add_program_authorization(dataset: str, curators: list,

# create a program and its authorizations:
test_program = {
"program_id": dataset,
"program_id": program,
"program_curators": curators,
"team_members": team_members
}
Expand All @@ -142,58 +144,39 @@ def add_program_authorization(dataset: str, curators: list,
return response.json()


def delete_program_authorization(dataset: str):
def delete_program_authorization(program: str):
token = get_site_admin_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8",
}
response = requests.delete(f"{ENV['CANDIG_URL']}/ingest/program/{dataset}", headers=headers)
response = requests.delete(f"{ENV['CANDIG_URL']}/ingest/program/{program}", headers=headers)
print(response.text)
return response.json()
return response


## Can we add a program authorization and modify it?
@pytest.mark.parametrize("user, dataset", user_auth_datasets())
def test_add_remove_program_authorization(user, dataset):
add_program_authorization(dataset, [], [])
@pytest.mark.parametrize("user, program", user_auth_programs())
def test_add_remove_program_authorization(user, program):
add_program_authorization(program, [], [])
token = get_site_admin_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8",
}

# try adding a user to the program:
test_data = {
"email": ENV[f"{user}_USER"],
"program": dataset
}

response = requests.post(f"{ENV['CANDIG_URL']}/ingest/program/{test_data['program']}/email/{test_data['email']}", headers=headers)
# when the user has admin access, they should be allowed
print(f"{response.json()}, {response.status_code}")
assert response.status_code == 200

assert test_data["program"] in get_katsu_datasets(user)

# remove the user
response = requests.delete(f"{ENV['CANDIG_URL']}/ingest/program/{test_data['program']}/email/{test_data['email']}", headers=headers)
assert response.status_code == 200
assert test_data["email"] not in response.json()[test_data["program"]]["team_members"]

# remove the program
response = requests.delete(f"{ENV['CANDIG_URL']}/ingest/program/{test_data['program']}", headers=headers)
print(response.text)
response = delete_program_authorization(program)
assert response.status_code == 200

response = requests.get(f"{ENV['CANDIG_URL']}/ingest/program/{test_data['program']}", headers=headers)
response = requests.get(f"{ENV['CANDIG_URL']}/ingest/program/{program}", headers=headers)
assert response.status_code == 404


@pytest.mark.parametrize("user, dataset", user_auth_datasets())
def test_user_authorizations(user, dataset):
@pytest.mark.parametrize("user, program", user_auth_programs())
def test_user_authorizations(user, program):
# set up these programs to exist at all:
add_program_authorization(dataset, [], [])
add_program_authorization(program, [], [])

# add user to pending users
username = ENV[f"{user}_USER"]
Expand Down Expand Up @@ -232,11 +215,11 @@ def test_user_authorizations(user, dataset):
)
assert response.status_code == 200

# see if user can access dataset before authorizing
# see if user can access program before authorizing
katsu_datasets = get_katsu_datasets(user)
assert dataset not in katsu_datasets or user == "CANDIG_SITE_ADMIN"
assert program not in katsu_datasets or user == "CANDIG_SITE_ADMIN"

# add dataset to user's authz
# add program to user's authz
from datetime import date

TODAY = date.today()
Expand All @@ -245,18 +228,18 @@ def test_user_authorizations(user, dataset):
response = requests.post(
f"{ENV['CANDIG_URL']}/ingest/user/{safe_name}/authorize",
headers=headers,
json={"program_id": dataset, "start_date": "2000-01-01", "end_date": THE_FUTURE}
json={"program_id": program, "start_date": "2000-01-01", "end_date": THE_FUTURE}
)
print(f"hi {response.text}")
assert response.status_code == 200

# see if user can access dataset now
# see if user can access program now
katsu_datasets = get_katsu_datasets(user)
assert dataset in katsu_datasets
assert program in katsu_datasets

# remove the dataset
# remove the program
response = requests.delete(
f"{ENV['CANDIG_URL']}/ingest/user/{safe_name}/authorize/{dataset}",
f"{ENV['CANDIG_URL']}/ingest/user/{safe_name}/authorize/{program}",
headers=headers
)
assert response.status_code == 200
Expand All @@ -283,14 +266,9 @@ def test_site_admin(user, is_admin):
"Authorization": f"Bearer {token}"
}

payload["input"]["token"] = token
response = requests.post(
f"{ENV['CANDIG_ENV']['OPA_URL']}/v1/data/permissions/site_admin",
json=payload,
headers=headers,
)
print(response.json())
assert ("result" in response.json()) == is_admin
request = AuthzRequest(headers, None, None)

assert authx.auth.is_site_admin(request) == is_admin


def test_add_remove_site_admin():
Expand Down Expand Up @@ -361,7 +339,7 @@ def test_s3_credentials():
# -----------------
def clean_up_program(test_id):
"""
Deletes a dataset and all related objects in katsu, htsget and opa. Expected either
Deletes a program and all related objects in katsu, htsget and opa. Expected either
successful delete or not found if the programs are not ingested.
"""
print(f"deleting {test_id}")
Expand Down

0 comments on commit 3bf1355

Please sign in to comment.