Skip to content

Commit

Permalink
Merge pull request #82 from CanDIG/daisieh/updates
Browse files Browse the repository at this point in the history
  • Loading branch information
daisieh authored Nov 29, 2024
2 parents 3452782 + 524c12e commit cb134f1
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 146 deletions.
60 changes: 37 additions & 23 deletions candig_federation/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

import json
import requests
import asyncio
import aiohttp
from network import get_registered_servers, get_registered_services
from heartbeat import get_live_servers
from candigv2_logging.logging import CanDIGLogger
import gevent

logger = CanDIGLogger(__file__)

Expand Down Expand Up @@ -177,7 +178,7 @@ def post_service(self, service, endpoint_path, endpoint_payload):
logger.debug(self.message)
return

def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
async def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
"""
Make peer server data requests and update the results and status for a FederationResponse
Expand All @@ -199,7 +200,7 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
:type header: object
:return: List of ResponseObjects, this specific return is used only in testing
"""
future_responses = self.async_requests(request=request,
future_responses = await self.async_requests(request=request,
header=header,
endpoint_payload=endpoint_payload,
endpoint_path=endpoint_path,
Expand All @@ -210,16 +211,17 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
location = future_response["location"]
try:
response = future_response["response"]
status_code = future_response["status_code"]

# If the call was successful append the results
if response.status_code in [200, 201]:
self.results[future_response_id] = response.json()['results']
self.status[future_response_id] = response.status_code
elif response.status_code == 405:
self.status[future_response_id] = response.status_code
self.message[future_response_id] = f"Unauthorized: {response.text}"
if status_code in [200, 201]:
self.results[future_response_id] = response['results']
self.status[future_response_id] = status_code
elif status_code == 405:
self.status[future_response_id] = status_code
self.message[future_response_id] = f"Unauthorized: {response}"
else:
self.status[future_response_id] = response.status_code
self.status[future_response_id] = status_code
self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}"
except AttributeError:
if isinstance(future_response, requests.exceptions.ConnectionError):
Expand All @@ -242,13 +244,13 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
continue
except Exception as e:
self.status[future_response_id] = 500
self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}: {type(e)}: {str(e)} {response.text}"
self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}: {type(e)}: {str(e)} {response}"
continue

# Return is used for testing individual methods
return self.results

def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
async def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
"""Send requests to each CanDIG node in the network asynchronously using FutureSession. The
futures are returned back to and handled by handle_server_requests()
Expand All @@ -269,7 +271,7 @@ def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_serv
args = {"method": request, "path": endpoint_path,
"payload": endpoint_payload, "service": endpoint_service}
responses = {}
jobs = {}
jobs = []
for server in self.servers.values():
try:
response = {}
Expand All @@ -282,19 +284,31 @@ def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_serv
# self.announce_fed_out(request_type, url, endpoint_path, endpoint_payload)
url = f"{server['server']['url']}/federation/v1/fanout"

# spawn each request in a gevent
jobs[server['server']['id']] = gevent.spawn(requests.post, url, json=args, headers=header, timeout=self.timeout)
# spawn each request in an async task
jobs.append((server['server']['id'], url))
responses[server['server']['id']] = response

except Exception as e:
jobs[server['server']['id']] = f"{type(e)} {str(e)}"
responses[server['server']['id']] = f"async_requests {server['server']['id']}: {type(e)} {str(e)}"
# wait for all of the gevents to come back
gevent.joinall(jobs.values())
for job in jobs:
responses[job]['response'] = jobs[job].value
async with aiohttp.ClientSession() as session:
tasks = [self.send_request(id, url, args, header, session) for (id, url) in jobs]
results = await asyncio.gather(*tasks)

for (id, response, status) in results:
responses[id]['response'] = response
responses[id]['status_code'] = status
return responses


async def send_request(self, id, url, args, header, session):
async with session.post(url, json=args, headers=header, timeout=self.timeout) as response:
try:
data = await response.json()
return id, data, response.status
except:
message = await response.text()
return id, message, response.status

def merge_status(self, statuses):
"""Returns a single status to represent the federated query.
Expand Down Expand Up @@ -331,7 +345,7 @@ def merge_status(self, statuses):
else:
return 500

def get_response_object(self):
async def get_response_object(self):
"""Driver method to communicate with other CanDIG nodes.
1. Check if federation is needed
Expand All @@ -347,7 +361,7 @@ def get_response_object(self):

if self.federate_check():

self.handle_server_request(request="GET",
await self.handle_server_request(request="GET",
endpoint_path=self.endpoint_path,
endpoint_payload=self.endpoint_payload,
endpoint_service=self.endpoint_service,
Expand All @@ -360,7 +374,7 @@ def get_response_object(self):
else:

if self.federate_check():
self.handle_server_request(request="POST",
await self.handle_server_request(request="POST",
endpoint_path=self.endpoint_path,
endpoint_payload=self.endpoint_payload,
endpoint_service=self.endpoint_service,
Expand Down
3 changes: 1 addition & 2 deletions candig_federation/federation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ components:
description: Flag for federated query. Federation assumed unless false is specified
in: header
schema:
type: boolean
default: true
type: string
register:
name: register
in: query
Expand Down
1 change: 0 additions & 1 deletion candig_federation/gunicorn.conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
bind = "0.0.0.0:4232"
workers = int(os.getenv("WORKERS", 4))
threads = int(os.getenv("THREADS", 4))
worker_class = "gevent"
user = "candig"
group = "candig"
loglevel = 'debug'
Expand Down
11 changes: 6 additions & 5 deletions candig_federation/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import requests
import os.path
import os
from candigv2_logging.logging import initialize, CanDIGLogger

initialize()
logger = CanDIGLogger(__file__)

def check_pulse():
servers = get_registered_servers()
Expand All @@ -18,10 +22,11 @@ def check_pulse():
if server['server']['id'] != os.getenv("FEDERATION_SELF_SERVER_ID", 'internal-1'):
try:
url = f"{server['server']['url']}/hello"
log += f"\ntesting {url}"
service_info = requests.get(url, timeout=2)
if service_info.ok:
live_servers.append(server['server']['id'])
else:
logger.warning(f"Heartbeat not successful for {server['server']['id']} {url}: {service_info.status_code}, {service_info.text}")
except Exception as e:
log += "\n" + str(e)
else:
Expand All @@ -32,10 +37,6 @@ def check_pulse():
with open('/app/federation/live_servers.txt', 'w') as f:
f.write("|".join(live_servers))

with open('/app/federation/log.txt', 'w') as f:
f.write(log)


def get_live_servers():
live_servers = []
if os.path.isfile("/app/federation/live_servers.txt"):
Expand Down
70 changes: 39 additions & 31 deletions candig_federation/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from authz import is_site_admin
import connexion
from werkzeug.exceptions import UnsupportedMediaType
from flask import request, Flask
from flask import Flask
from federation import FederationResponse
from network import get_registered_servers, get_registered_services, register_server, register_service, unregister_server, unregister_service
from candigv2_logging.logging import CanDIGLogger
Expand Down Expand Up @@ -44,15 +44,15 @@ def list_servers():
if servers is not None:
result = map(lambda x: x["server"], servers.values())
return list(result), 200
logger.debug(f"Couldn't list servers", request)
logger.debug(f"Couldn't list servers", connexion.request)
return {"message": "Couldn't list servers"}, 500


def add_server(register=False):
async def add_server(register=False):
"""
:return: Server added.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
try:
# if register=True, list known servers in Vault and register them all
Expand All @@ -63,24 +63,25 @@ def add_server(register=False):
try:
register_server(existing_servers[server])
except Exception as e:
logger.debug(f"failed to register {existing_servers[server]['server']['id']} {type(e)} {str(e)}", request)
logger.debug(f"failed to register {existing_servers[server]['server']['id']} {type(e)} {str(e)}", connexion.request)
errors[existing_servers[server]['server']['id']] = f"{type(e)} {str(e)}"
if len(errors) > 0:
return errors, 500
except Exception as e:
logger.debug(f"Couldn't register servers: {type(e)} {str(e)}", request)
logger.debug(f"Couldn't register servers: {type(e)} {str(e)}", connexion.request)
return {"message": f"Couldn't register servers: {type(e)} {str(e)} {connexion.request}"}, 500
try:
if connexion.request.json is not None and 'server' in connexion.request.json:
new_server = connexion.request.json
req = await connexion.request.json()
if req is not None and 'server' in req:
new_server = req
if register_server(new_server) is None:
return {"message": f"Server {new_server['server']['id']} already present"}, 204
return get_registered_servers()[new_server['server']['id']]['server'], 201
except UnsupportedMediaType as e:
# this is the exception that gets thrown if the requestbody is null
return get_registered_servers(), 200
except Exception as e:
logger.debug(f"Couldn't register server", request)
logger.debug(f"Couldn't add server", connexion.request)
return {"message": f"Couldn't add server: {type(e)} {str(e)} {connexion.request}"}, 500


Expand All @@ -93,7 +94,7 @@ def get_server(server_id):
if servers is not None and server_id in servers:
return servers[server_id], 200
else:
logger.debug(f"Couldn't find server {server_id}", request)
logger.debug(f"Couldn't find server {server_id}", connexion.request)
return {"message": f"Couldn't find server {server_id}"}, 404


Expand All @@ -102,11 +103,11 @@ def delete_server(server_id):
"""
:return: Server deleted.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
result = unregister_server(server_id)
if result is None:
logger.debug(f"Server not found", request)
logger.debug(f"Server not found", connexion.request)
return {"message": f"Server {server_id} not found"}, 404
return result, 200

Expand All @@ -118,7 +119,7 @@ def list_services():
services = get_registered_services()
if services is not None:
return list(services.values()), 200
logger.debug(f"Couldn't list services", request)
logger.debug(f"Couldn't list services", connexion.request)
return {"message": "Couldn't list services"}, 500


Expand All @@ -131,48 +132,54 @@ def get_service(service_id):
if services is not None and service_id in services:
return services[service_id], 200
else:
logger.debug(f"Couldn't find service {service_id}", request)
logger.debug(f"Couldn't find service {service_id}", connexion.request)
return {"message": f"Couldn't find service {service_id}"}, 404


def add_service(register=False):
async def add_service(register=False):
"""
:return: Service added.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
try:
# if register=True, list known services in Vault and register them all
if register:
# if register=True, list known services in Vault and register them all
if register:
try:
existing_services = get_registered_services()
for service in existing_services:
register_service(existing_services[service])
new_service = connexion.request.json
register_service(new_service)
except Exception as e:
logger.debug(f"Couldn't register existing services", connexion.request)
return {"message": f"Couldn't register existing services: {type(e)} {str(e)}"}, 500
try:
new_service = await connexion.request.json()
if new_service is not None:
register_service(new_service)
return get_registered_services()[new_service['id']], 200
except UnsupportedMediaType as e:
# this is the exception that gets thrown if the requestbody is null
return get_registered_services(), 200
except Exception as e:
logger.debug(f"Couldn't add service", request)
logger.debug(f"Couldn't add service", connexion.request)
return {"message": f"Couldn't add service: {type(e)} {str(e)} {connexion.request}"}, 500
return get_registered_services()[new_service['id']], 200
return get_registered_services(), 200


@app.route('/services/<path:service_id>')
def delete_service(service_id):
"""
:return: Service deleted.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
result = unregister_service(service_id)
if result is None:
logger.debug(f"Couldn't find service", request)
logger.debug(f"Couldn't find service", connexion.request)
return {"message": f"Service {service_id} not found"}, 404
return result, 200


def post_search():
async def post_search():
"""
Send a POST request to CanDIG services and possibly federate it.
Method defined by federation.yaml OpenAPI document.
Expand All @@ -196,8 +203,8 @@ def post_search():
ServiceName - Name of service (used for logstash tagging)
"""
try:
logger.debug("Sending federated request", request)
data = connexion.request.json
logger.debug("Sending federated request", connexion.request)
data = await connexion.request.json()
request_type = data["method"]
endpoint_path = data["path"]
if endpoint_path[0] == "/":
Expand All @@ -214,20 +221,21 @@ def post_search():
request=request_type,
endpoint_path=endpoint_path,
endpoint_payload=endpoint_payload,
request_dict=request,
request_dict=connexion.request,
endpoint_service=endpoint_service,
unsafe="unsafe" in data
)

return federation_response.get_response_object()
resp, status = await federation_response.get_response_object()
return resp, status

except Exception as e:
"""
Due to Connexion parsing the args prior this code running, it will be assumed that we
have a valid request_type, endpoint_path and endpoint_payload. A KeyError occuring here
will be due to the service dictionary receiving an invalid key.
"""
logger.error(f"{type(e)} {str(e)}", request)
logger.error(f"post_search {type(e)} {str(e)}", connexion.request)
return {
"response": f"{type(e)} {str(e)}",
"status": 404,
Expand Down
Loading

0 comments on commit cb134f1

Please sign in to comment.