diff --git a/candig_federation/federation.py b/candig_federation/federation.py index 3070026..c62ad7f 100644 --- a/candig_federation/federation.py +++ b/candig_federation/federation.py @@ -6,10 +6,11 @@ import json import requests +import asyncio +import aiohttp from network import get_registered_servers, get_registered_services from heartbeat import get_live_servers from candigv2_logging.logging import CanDIGLogger -import gevent logger = CanDIGLogger(__file__) @@ -177,7 +178,7 @@ def post_service(self, service, endpoint_path, endpoint_payload): logger.debug(self.message) return - def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header): + async def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header): """ Make peer server data requests and update the results and status for a FederationResponse @@ -199,7 +200,7 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi :type header: object :return: List of ResponseObjects, this specific return is used only in testing """ - future_responses = self.async_requests(request=request, + future_responses = await self.async_requests(request=request, header=header, endpoint_payload=endpoint_payload, endpoint_path=endpoint_path, @@ -210,16 +211,17 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi location = future_response["location"] try: response = future_response["response"] + status_code = future_response["status_code"] # If the call was successful append the results - if response.status_code in [200, 201]: - self.results[future_response_id] = response.json()['results'] - self.status[future_response_id] = response.status_code - elif response.status_code == 405: - self.status[future_response_id] = response.status_code - self.message[future_response_id] = f"Unauthorized: {response.text}" + if status_code in [200, 201]: + self.results[future_response_id] = response['results'] + self.status[future_response_id] = status_code + elif status_code == 405: + self.status[future_response_id] = status_code + self.message[future_response_id] = f"Unauthorized: {response}" else: - self.status[future_response_id] = response.status_code + self.status[future_response_id] = status_code self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}" except AttributeError: if isinstance(future_response, requests.exceptions.ConnectionError): @@ -242,13 +244,13 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi continue except Exception as e: self.status[future_response_id] = 500 - self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}: {type(e)}: {str(e)} {response.text}" + self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}: {type(e)}: {str(e)} {response}" continue # Return is used for testing individual methods return self.results - def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_service, header): + async def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_service, header): """Send requests to each CanDIG node in the network asynchronously using FutureSession. The futures are returned back to and handled by handle_server_requests() @@ -269,7 +271,7 @@ def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_serv args = {"method": request, "path": endpoint_path, "payload": endpoint_payload, "service": endpoint_service} responses = {} - jobs = {} + jobs = [] for server in self.servers.values(): try: response = {} @@ -282,19 +284,31 @@ def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_serv # self.announce_fed_out(request_type, url, endpoint_path, endpoint_payload) url = f"{server['server']['url']}/federation/v1/fanout" - # spawn each request in a gevent - jobs[server['server']['id']] = gevent.spawn(requests.post, url, json=args, headers=header, timeout=self.timeout) + # spawn each request in an async task + jobs.append((server['server']['id'], url)) responses[server['server']['id']] = response except Exception as e: - jobs[server['server']['id']] = f"{type(e)} {str(e)}" responses[server['server']['id']] = f"async_requests {server['server']['id']}: {type(e)} {str(e)}" - # wait for all of the gevents to come back - gevent.joinall(jobs.values()) - for job in jobs: - responses[job]['response'] = jobs[job].value + async with aiohttp.ClientSession() as session: + tasks = [self.send_request(id, url, args, header, session) for (id, url) in jobs] + results = await asyncio.gather(*tasks) + + for (id, response, status) in results: + responses[id]['response'] = response + responses[id]['status_code'] = status return responses + + async def send_request(self, id, url, args, header, session): + async with session.post(url, json=args, headers=header, timeout=self.timeout) as response: + try: + data = await response.json() + return id, data, response.status + except: + message = await response.text() + return id, message, response.status + def merge_status(self, statuses): """Returns a single status to represent the federated query. @@ -331,7 +345,7 @@ def merge_status(self, statuses): else: return 500 - def get_response_object(self): + async def get_response_object(self): """Driver method to communicate with other CanDIG nodes. 1. Check if federation is needed @@ -347,7 +361,7 @@ def get_response_object(self): if self.federate_check(): - self.handle_server_request(request="GET", + await self.handle_server_request(request="GET", endpoint_path=self.endpoint_path, endpoint_payload=self.endpoint_payload, endpoint_service=self.endpoint_service, @@ -360,7 +374,7 @@ def get_response_object(self): else: if self.federate_check(): - self.handle_server_request(request="POST", + await self.handle_server_request(request="POST", endpoint_path=self.endpoint_path, endpoint_payload=self.endpoint_payload, endpoint_service=self.endpoint_service, diff --git a/candig_federation/federation.yaml b/candig_federation/federation.yaml index c06e17b..3421560 100644 --- a/candig_federation/federation.yaml +++ b/candig_federation/federation.yaml @@ -198,8 +198,7 @@ components: description: Flag for federated query. Federation assumed unless false is specified in: header schema: - type: boolean - default: true + type: string register: name: register in: query diff --git a/candig_federation/gunicorn.conf.py b/candig_federation/gunicorn.conf.py index 10ae6bb..4b30f33 100644 --- a/candig_federation/gunicorn.conf.py +++ b/candig_federation/gunicorn.conf.py @@ -3,7 +3,6 @@ bind = "0.0.0.0:4232" workers = int(os.getenv("WORKERS", 4)) threads = int(os.getenv("THREADS", 4)) -worker_class = "gevent" user = "candig" group = "candig" loglevel = 'debug' diff --git a/candig_federation/heartbeat.py b/candig_federation/heartbeat.py index 9e8971d..2afaa09 100644 --- a/candig_federation/heartbeat.py +++ b/candig_federation/heartbeat.py @@ -4,6 +4,10 @@ import requests import os.path import os +from candigv2_logging.logging import initialize, CanDIGLogger + +initialize() +logger = CanDIGLogger(__file__) def check_pulse(): servers = get_registered_servers() @@ -18,10 +22,11 @@ def check_pulse(): if server['server']['id'] != os.getenv("FEDERATION_SELF_SERVER_ID", 'internal-1'): try: url = f"{server['server']['url']}/hello" - log += f"\ntesting {url}" service_info = requests.get(url, timeout=2) if service_info.ok: live_servers.append(server['server']['id']) + else: + logger.warning(f"Heartbeat not successful for {server['server']['id']} {url}: {service_info.status_code}, {service_info.text}") except Exception as e: log += "\n" + str(e) else: @@ -32,10 +37,6 @@ def check_pulse(): with open('/app/federation/live_servers.txt', 'w') as f: f.write("|".join(live_servers)) - with open('/app/federation/log.txt', 'w') as f: - f.write(log) - - def get_live_servers(): live_servers = [] if os.path.isfile("/app/federation/live_servers.txt"): diff --git a/candig_federation/operations.py b/candig_federation/operations.py index 2996102..03203e4 100644 --- a/candig_federation/operations.py +++ b/candig_federation/operations.py @@ -5,7 +5,7 @@ from authz import is_site_admin import connexion from werkzeug.exceptions import UnsupportedMediaType -from flask import request, Flask +from flask import Flask from federation import FederationResponse from network import get_registered_servers, get_registered_services, register_server, register_service, unregister_server, unregister_service from candigv2_logging.logging import CanDIGLogger @@ -44,15 +44,15 @@ def list_servers(): if servers is not None: result = map(lambda x: x["server"], servers.values()) return list(result), 200 - logger.debug(f"Couldn't list servers", request) + logger.debug(f"Couldn't list servers", connexion.request) return {"message": "Couldn't list servers"}, 500 -def add_server(register=False): +async def add_server(register=False): """ :return: Server added. """ - if not is_site_admin(request): + if not is_site_admin(connexion.request): return {"message": "User is not authorized to POST"}, 403 try: # if register=True, list known servers in Vault and register them all @@ -63,16 +63,17 @@ def add_server(register=False): try: register_server(existing_servers[server]) except Exception as e: - logger.debug(f"failed to register {existing_servers[server]['server']['id']} {type(e)} {str(e)}", request) + logger.debug(f"failed to register {existing_servers[server]['server']['id']} {type(e)} {str(e)}", connexion.request) errors[existing_servers[server]['server']['id']] = f"{type(e)} {str(e)}" if len(errors) > 0: return errors, 500 except Exception as e: - logger.debug(f"Couldn't register servers: {type(e)} {str(e)}", request) + logger.debug(f"Couldn't register servers: {type(e)} {str(e)}", connexion.request) return {"message": f"Couldn't register servers: {type(e)} {str(e)} {connexion.request}"}, 500 try: - if connexion.request.json is not None and 'server' in connexion.request.json: - new_server = connexion.request.json + req = await connexion.request.json() + if req is not None and 'server' in req: + new_server = req if register_server(new_server) is None: return {"message": f"Server {new_server['server']['id']} already present"}, 204 return get_registered_servers()[new_server['server']['id']]['server'], 201 @@ -80,7 +81,7 @@ def add_server(register=False): # this is the exception that gets thrown if the requestbody is null return get_registered_servers(), 200 except Exception as e: - logger.debug(f"Couldn't register server", request) + logger.debug(f"Couldn't add server", connexion.request) return {"message": f"Couldn't add server: {type(e)} {str(e)} {connexion.request}"}, 500 @@ -93,7 +94,7 @@ def get_server(server_id): if servers is not None and server_id in servers: return servers[server_id], 200 else: - logger.debug(f"Couldn't find server {server_id}", request) + logger.debug(f"Couldn't find server {server_id}", connexion.request) return {"message": f"Couldn't find server {server_id}"}, 404 @@ -102,11 +103,11 @@ def delete_server(server_id): """ :return: Server deleted. """ - if not is_site_admin(request): + if not is_site_admin(connexion.request): return {"message": "User is not authorized to POST"}, 403 result = unregister_server(server_id) if result is None: - logger.debug(f"Server not found", request) + logger.debug(f"Server not found", connexion.request) return {"message": f"Server {server_id} not found"}, 404 return result, 200 @@ -118,7 +119,7 @@ def list_services(): services = get_registered_services() if services is not None: return list(services.values()), 200 - logger.debug(f"Couldn't list services", request) + logger.debug(f"Couldn't list services", connexion.request) return {"message": "Couldn't list services"}, 500 @@ -131,31 +132,37 @@ def get_service(service_id): if services is not None and service_id in services: return services[service_id], 200 else: - logger.debug(f"Couldn't find service {service_id}", request) + logger.debug(f"Couldn't find service {service_id}", connexion.request) return {"message": f"Couldn't find service {service_id}"}, 404 -def add_service(register=False): +async def add_service(register=False): """ :return: Service added. """ - if not is_site_admin(request): + if not is_site_admin(connexion.request): return {"message": "User is not authorized to POST"}, 403 - try: - # if register=True, list known services in Vault and register them all - if register: + # if register=True, list known services in Vault and register them all + if register: + try: existing_services = get_registered_services() for service in existing_services: register_service(existing_services[service]) - new_service = connexion.request.json - register_service(new_service) + except Exception as e: + logger.debug(f"Couldn't register existing services", connexion.request) + return {"message": f"Couldn't register existing services: {type(e)} {str(e)}"}, 500 + try: + new_service = await connexion.request.json() + if new_service is not None: + register_service(new_service) + return get_registered_services()[new_service['id']], 200 except UnsupportedMediaType as e: # this is the exception that gets thrown if the requestbody is null return get_registered_services(), 200 except Exception as e: - logger.debug(f"Couldn't add service", request) + logger.debug(f"Couldn't add service", connexion.request) return {"message": f"Couldn't add service: {type(e)} {str(e)} {connexion.request}"}, 500 - return get_registered_services()[new_service['id']], 200 + return get_registered_services(), 200 @app.route('/services/') @@ -163,16 +170,16 @@ def delete_service(service_id): """ :return: Service deleted. """ - if not is_site_admin(request): + if not is_site_admin(connexion.request): return {"message": "User is not authorized to POST"}, 403 result = unregister_service(service_id) if result is None: - logger.debug(f"Couldn't find service", request) + logger.debug(f"Couldn't find service", connexion.request) return {"message": f"Service {service_id} not found"}, 404 return result, 200 -def post_search(): +async def post_search(): """ Send a POST request to CanDIG services and possibly federate it. Method defined by federation.yaml OpenAPI document. @@ -196,8 +203,8 @@ def post_search(): ServiceName - Name of service (used for logstash tagging) """ try: - logger.debug("Sending federated request", request) - data = connexion.request.json + logger.debug("Sending federated request", connexion.request) + data = await connexion.request.json() request_type = data["method"] endpoint_path = data["path"] if endpoint_path[0] == "/": @@ -214,12 +221,13 @@ def post_search(): request=request_type, endpoint_path=endpoint_path, endpoint_payload=endpoint_payload, - request_dict=request, + request_dict=connexion.request, endpoint_service=endpoint_service, unsafe="unsafe" in data ) - return federation_response.get_response_object() + resp, status = await federation_response.get_response_object() + return resp, status except Exception as e: """ @@ -227,7 +235,7 @@ def post_search(): have a valid request_type, endpoint_path and endpoint_payload. A KeyError occuring here will be due to the service dictionary receiving an invalid key. """ - logger.error(f"{type(e)} {str(e)}", request) + logger.error(f"post_search {type(e)} {str(e)}", connexion.request) return { "response": f"{type(e)} {str(e)}", "status": 404, diff --git a/candig_federation/server.py b/candig_federation/server.py index 0e3102c..7bdddf7 100644 --- a/candig_federation/server.py +++ b/candig_federation/server.py @@ -1,53 +1,23 @@ -#!/usr/bin/env python3 - -""" -Driver program for service -""" -from gevent import monkey - -monkey.patch_all() - -import connexion # from prometheus_flask_exporter import PrometheusMetrics from flask_cors import CORS -import os.path -import os +import connexion import candigv2_logging.logging candigv2_logging.logging.initialize() +# Create the application instance +app = connexion.FlaskApp(__name__, specification_dir='./') +CORS(app.app) -def main(): - """ - Main Routine - - Set up server and service dictionaries - """ - - CONFIG_DIR = os.getenv("CONFIG_DIR", "../config") - - return APP +app.add_api('federation.yaml', strict_validation=True, validate_responses=True) -def configure_app(): - """ - Set up base flask app from Connexion +def main(): + # Create the application instance + app = connexion.FlaskApp(__name__, specification_dir='./') + CORS(app.app) - App pulled out as global variable to allow import into - testing files to access application context - """ - app = connexion.FlaskApp(__name__, options={"swagger_url": "/"}) app.add_api('federation.yaml', strict_validation=True, validate_responses=True) return app - -APP = configure_app() -APPLICATION = main() - -# expose flask app for uwsgi - -application = APPLICATION.app -# metrics = PrometheusMetrics(application) -CORS(application) - if __name__ == '__main__': - APPLICATION.run() + app.run() diff --git a/entrypoint.sh b/entrypoint.sh index 89e75c6..208cb10 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -17,4 +17,4 @@ bash candig_federation/heartbeat.sh & # use the following instead for production deployment cd candig_federation -gunicorn server:application \ No newline at end of file +gunicorn -k uvicorn.workers.UvicornWorker server:app diff --git a/requirements.txt b/requirements.txt index f596416..dbb690a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,16 @@ -attrs~=23.1.0 +aiohttp>=3.11.8 candigv2-authx@git+https://github.com/CanDIG/candigv2-authx.git@v2.6.0 candigv2-logging@git+https://github.com/CanDIG/candigv2-logging.git@v1.0.0 -connexion==2.14.1 +connexion==3.1.0 +connexion[swagger-ui] +connexion[flask] decorator==4.4.0 -flask==2.2.5 +Flask==3.1.0 +prometheus-flask-exporter==0.13.0 +Flask-Cors==5.0.0 +requests-mock>=1.12.1 +gunicorn>=23.0.0 +uvicorn[standard]==0.30.6 pyyaml>=4.2b1 pytest==7.2.0 pytest-cov==2.7.1 -swagger-ui-bundle==0.0.5 -prometheus-flask-exporter==0.13.0 -Flask-Cors==5.0.0 -requests-mock>=1.11.0 -gunicorn[gevent]>=23.0.0 -gevent>=24.2.1 diff --git a/tests/test_uniform_federation.py b/tests/test_uniform_federation.py index 2d03ac4..9e8fdb2 100644 --- a/tests/test_uniform_federation.py +++ b/tests/test_uniform_federation.py @@ -270,67 +270,67 @@ def mocked_async_local_TimeOUt_p1_Timeout_requests_post(*args, **kwargs): # Test basic service requests -------------------------------------------------------------------- @patch('federation.requests.Session.get', side_effect=mocked_service_get) -def test_valid_noFed_get(mock_requests, client, two_servers): +async def test_valid_noFed_get(mock_requests, client, two_servers): with client: FR = get_federation_response("GET") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert RO["status"] == 200 assert RO["results"] == GetResponse["j1"] @patch('federation.requests.Session.post', side_effect=mocked_service_post) -def test_valid_noFed_post(mock_requests, client, two_servers): +async def test_valid_noFed_post(mock_requests, client, two_servers): with client: FR = get_federation_response("POST") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert RO["status"] == 200 assert RO["results"] == PostListV1 # Test basic service errors -------------------------------------------------------------------- @patch('federation.requests.Session.get', side_effect=exceptions.ConnectionError) -def test_invalid_noFed_get(mock_requests, client, two_servers): +async def test_invalid_noFed_get(mock_requests, client, two_servers): with client: FR = get_federation_response("GET") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert RO["status"] == 404 assert RO["results"] == {} @patch('federation.requests.Session.post', side_effect=exceptions.ConnectionError) -def test_invalid_noFed_post(mock_requests, client, two_servers): +async def test_invalid_noFed_post(mock_requests, client, two_servers): with client: FR = get_federation_response("POST") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert RO["status"] == 404 assert RO["results"] == {} @patch('federation.requests.Session.get', side_effect=exceptions.Timeout) -def test_timeout_noFed_get(mock_requests, client, two_servers): +async def test_timeout_noFed_get(mock_requests, client, two_servers): with client: FR = get_federation_response("GET") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert RO["status"] == 504 assert RO["results"] == {} @patch('federation.requests.Session.post', side_effect=exceptions.Timeout) -def test_timeout_noFed_post(mock_requests, client, two_servers): +async def test_timeout_noFed_post(mock_requests, client, two_servers): with client: FR = get_federation_response("POST") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert RO["status"] == 504 assert RO["results"] == {} # Test the async request function -------------------------------------------------------------------- @patch('federation.requests.post', side_effect=mocked_async_requests_get) -def test_valid_asyncRequests_two_servers_get(mock_requests, client, two_servers): +async def test_valid_asyncRequests_two_servers_get(mock_requests, client, two_servers): with client: FR = get_federation_response("POST", "Federate") - resp = FR.async_requests(request='GET', + resp = await FR.async_requests(request='GET', endpoint_path=TestParams["path"], endpoint_payload="", header=TestParams["Headers"], @@ -343,10 +343,10 @@ def test_valid_asyncRequests_two_servers_get(mock_requests, client, two_servers) @patch('federation.requests.post', side_effect=mocked_async_requests_post) -def test_valid_asyncRequests_two_servers_post(mock_requests, client, two_servers): +async def test_valid_asyncRequests_two_servers_post(mock_requests, client, two_servers): with client: FR = get_federation_response("POST") - resp = FR.async_requests(request='POST', + resp = await FR.async_requests(request='POST', endpoint_path=TestParams["path"], endpoint_payload="", header=TestParams["Headers"], @@ -362,10 +362,10 @@ def test_valid_asyncRequests_two_servers_post(mock_requests, client, two_servers @patch('federation.requests.Session.get', side_effect=mocked_service_get) @patch('federation.requests.post', side_effect=mocked_async_requests_get) -def test_valid_ServerRequest_one_server_get(mock_requests, mock_session, client, two_servers): +async def test_valid_ServerRequest_one_server_get(mock_requests, mock_session, client, two_servers): with client: FR = get_federation_response("GET", "Federate") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert Status == 200 for server in RO: loc = server["location"]["name"] @@ -374,7 +374,7 @@ def test_valid_ServerRequest_one_server_get(mock_requests, mock_session, client, @patch('federation.requests.Session.get', side_effect=mocked_service_get) @patch('federation.requests.post', side_effect=mocked_async_requests_get) -def test_valid_federated_query_one_server_get(mock_requests, mock_session, client, two_servers): +async def test_valid_federated_query_one_server_get(mock_requests, mock_session, client, two_servers): with client: with APP.app.test_request_context( data=json.dumps({"path": TestParams["path"], @@ -385,7 +385,7 @@ def test_valid_federated_query_one_server_get(mock_requests, mock_session, clien }), headers=Headers(fedHeader.headers) ): - RO, Status = operations.post_search() + RO, Status = await operations.post_search() assert Status == 200 for server in RO: @@ -395,10 +395,10 @@ def test_valid_federated_query_one_server_get(mock_requests, mock_session, clien @patch('federation.requests.Session.post', side_effect=mocked_service_post) @patch('federation.requests.post', side_effect=mocked_async_requests_post) -def test_valid_ServerRequest_one_server_post(mock_session, mock_requests, client, two_servers): +async def test_valid_ServerRequest_one_server_post(mock_session, mock_requests, client, two_servers): with client: FR = get_federation_response("POST", "Federate") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert Status == 200 for server in RO: @@ -408,7 +408,7 @@ def test_valid_ServerRequest_one_server_post(mock_session, mock_requests, client @patch('federation.requests.Session.post', side_effect=mocked_service_post) @patch('federation.requests.post', side_effect=mocked_async_requests_post) -def test_valid_federated_query_one_server_post(mock_requests, mock_session, client, two_servers): +async def test_valid_federated_query_one_server_post(mock_requests, mock_session, client, two_servers): with client: with APP.app.test_request_context( data=json.dumps({"path": TestParams["path"], @@ -419,7 +419,7 @@ def test_valid_federated_query_one_server_post(mock_requests, mock_session, clie }), headers=Headers(fedHeader.headers) ): - RO, Status = operations.post_search() + RO, Status = await operations.post_search() assert Status == 200 for server in RO: @@ -433,10 +433,10 @@ def test_valid_federated_query_one_server_post(mock_requests, mock_session, clie @patch('federation.requests.Session.get', side_effect=mocked_service_get) @patch('federation.requests.post', side_effect=mocked_async_requests_get) -def test_valid_ServerRequest_two_server_get(mock_requests, mock_session, client, three_servers): +async def test_valid_ServerRequest_two_server_get(mock_requests, mock_session, client, three_servers): with client: FR = get_federation_response("GET", "Federate") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert Status == 200 for server in RO: @@ -447,7 +447,7 @@ def test_valid_ServerRequest_two_server_get(mock_requests, mock_session, client, @patch('federation.requests.Session.get', side_effect=mocked_service_get) @patch('federation.requests.post', side_effect=mocked_async_requests_get) -def test_valid_federated_query_two_server_get(mock_requests, mock_session, client, three_servers): +async def test_valid_federated_query_two_server_get(mock_requests, mock_session, client, three_servers): with client: with APP.app.test_request_context( data=json.dumps({"path": TestParams["path"], @@ -458,7 +458,7 @@ def test_valid_federated_query_two_server_get(mock_requests, mock_session, clien }), headers=Headers(fedHeader.headers) ): - RO, Status = operations.post_search() + RO, Status = await operations.post_search() assert Status == 200 for server in RO: @@ -469,10 +469,10 @@ def test_valid_federated_query_two_server_get(mock_requests, mock_session, clien @patch('federation.requests.Session.post', side_effect=mocked_service_post) @patch('federation.requests.post', side_effect=mocked_async_requests_post) -def test_valid_ServerRequest_two_server_post(mock_session, mock_requests, client, three_servers): +async def test_valid_ServerRequest_two_server_post(mock_session, mock_requests, client, three_servers): with client: FR = get_federation_response("POST", "Federate") - RO, Status = FR.get_response_object() + RO, Status = await FR.get_response_object() assert Status == 200 for server in RO: @@ -483,7 +483,7 @@ def test_valid_ServerRequest_two_server_post(mock_session, mock_requests, client @patch('federation.requests.Session.post', side_effect=mocked_service_post) @patch('federation.requests.post', side_effect=mocked_async_requests_post) -def test_valid_federated_query_two_server_post(mock_requests, mock_session, client, three_servers): +async def test_valid_federated_query_two_server_post(mock_requests, mock_session, client, three_servers): with client: with APP.app.test_request_context( data=json.dumps({"path": TestParams["path"], @@ -494,7 +494,7 @@ def test_valid_federated_query_two_server_post(mock_requests, mock_session, clie }), headers=Headers(fedHeader.headers) ): - RO, Status = operations.post_search() + RO, Status = await operations.post_search() assert Status == 200 for server in RO: @@ -505,7 +505,7 @@ def test_valid_federated_query_two_server_post(mock_requests, mock_session, clie @patch('federation.requests.Session.get', side_effect=mocked_service_get) @patch('federation.requests.post', side_effect=mocked_async_requests_get) -def test_invalid_backslash_endpoint_start(mock_requests, mock_session, client, two_servers): +async def test_invalid_backslash_endpoint_start(mock_requests, mock_session, client, two_servers): with client: with APP.app.test_request_context( data=json.dumps({"path": "/fail/this/path", @@ -516,7 +516,7 @@ def test_invalid_backslash_endpoint_start(mock_requests, mock_session, client, t }), headers=Headers(fedHeader.headers) ): - RO, Status = operations.post_search() + RO, Status = await operations.post_search() assert Status == 400