Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DIG-1836: Fix Snyk vulnerabilities #82

Merged
merged 9 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions candig_federation/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

import json
import requests
import asyncio
import aiohttp
from network import get_registered_servers, get_registered_services
from heartbeat import get_live_servers
from candigv2_logging.logging import CanDIGLogger
import gevent

logger = CanDIGLogger(__file__)

Expand Down Expand Up @@ -177,7 +178,7 @@ def post_service(self, service, endpoint_path, endpoint_payload):
logger.debug(self.message)
return

def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
async def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
"""
Make peer server data requests and update the results and status for a FederationResponse

Expand All @@ -199,7 +200,7 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
:type header: object
:return: List of ResponseObjects, this specific return is used only in testing
"""
future_responses = self.async_requests(request=request,
future_responses = await self.async_requests(request=request,
header=header,
endpoint_payload=endpoint_payload,
endpoint_path=endpoint_path,
Expand All @@ -210,16 +211,17 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
location = future_response["location"]
try:
response = future_response["response"]
status_code = future_response["status_code"]

# If the call was successful append the results
if response.status_code in [200, 201]:
self.results[future_response_id] = response.json()['results']
self.status[future_response_id] = response.status_code
elif response.status_code == 405:
self.status[future_response_id] = response.status_code
self.message[future_response_id] = f"Unauthorized: {response.text}"
if status_code in [200, 201]:
self.results[future_response_id] = response['results']
self.status[future_response_id] = status_code
elif status_code == 405:
self.status[future_response_id] = status_code
self.message[future_response_id] = f"Unauthorized: {response}"
else:
self.status[future_response_id] = response.status_code
self.status[future_response_id] = status_code
self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}"
except AttributeError:
if isinstance(future_response, requests.exceptions.ConnectionError):
Expand All @@ -242,13 +244,13 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
continue
except Exception as e:
self.status[future_response_id] = 500
self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}: {type(e)}: {str(e)} {response.text}"
self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}: {type(e)}: {str(e)} {response}"
continue

# Return is used for testing individual methods
return self.results

def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
async def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
"""Send requests to each CanDIG node in the network asynchronously using FutureSession. The
responses are returned to and handled by handle_server_request()

Expand All @@ -269,7 +271,7 @@ def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_serv
args = {"method": request, "path": endpoint_path,
"payload": endpoint_payload, "service": endpoint_service}
responses = {}
jobs = {}
jobs = []
for server in self.servers.values():
try:
response = {}
Expand All @@ -282,19 +284,31 @@ def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_serv
# self.announce_fed_out(request_type, url, endpoint_path, endpoint_payload)
url = f"{server['server']['url']}/federation/v1/fanout"

# spawn each request in a gevent
jobs[server['server']['id']] = gevent.spawn(requests.post, url, json=args, headers=header, timeout=self.timeout)
# spawn each request in an async task
jobs.append((server['server']['id'], url))
responses[server['server']['id']] = response

except Exception as e:
jobs[server['server']['id']] = f"{type(e)} {str(e)}"
responses[server['server']['id']] = f"async_requests {server['server']['id']}: {type(e)} {str(e)}"
# wait for all of the gevents to come back
gevent.joinall(jobs.values())
for job in jobs:
responses[job]['response'] = jobs[job].value
async with aiohttp.ClientSession() as session:
tasks = [self.send_request(id, url, args, header, session) for (id, url) in jobs]
results = await asyncio.gather(*tasks)

for (id, response, status) in results:
responses[id]['response'] = response
responses[id]['status_code'] = status
return responses


async def send_request(self, id, url, args, header, session):
    """
    POST one federated request to a single peer server and return its reply.

    :param id: Identifier of the target server; returned unchanged so the caller can
        match the reply to the server it came from
    :param url: URL the request is POSTed to
    :param args: Object sent as the JSON body of the request
    :param header: Headers sent with the request
    :param session: Open client session whose ``post`` returns an async context manager
    :return: Tuple of (id, body, status). ``body`` is the decoded JSON body when it can
        be decoded, otherwise the raw response text; ``status`` is the HTTP status code
    """
    async with session.post(url, json=args, headers=header, timeout=self.timeout) as response:
        try:
            body = await response.json()
        except Exception:
            # The body could not be decoded as JSON, so fall back to the raw text.
            # Exception (not a bare except) is caught on purpose: asyncio.CancelledError
            # derives from BaseException and must propagate so cancellation still works.
            body = await response.text()
        return id, body, response.status

def merge_status(self, statuses):
"""Returns a single status to represent the federated query.

Expand Down Expand Up @@ -331,7 +345,7 @@ def merge_status(self, statuses):
else:
return 500

def get_response_object(self):
async def get_response_object(self):
"""Driver method to communicate with other CanDIG nodes.

1. Check if federation is needed
Expand All @@ -347,7 +361,7 @@ def get_response_object(self):

if self.federate_check():

self.handle_server_request(request="GET",
await self.handle_server_request(request="GET",
endpoint_path=self.endpoint_path,
endpoint_payload=self.endpoint_payload,
endpoint_service=self.endpoint_service,
Expand All @@ -360,7 +374,7 @@ def get_response_object(self):
else:

if self.federate_check():
self.handle_server_request(request="POST",
await self.handle_server_request(request="POST",
endpoint_path=self.endpoint_path,
endpoint_payload=self.endpoint_payload,
endpoint_service=self.endpoint_service,
Expand Down
3 changes: 1 addition & 2 deletions candig_federation/federation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ components:
description: Flag for federated query. Federation assumed unless false is specified
in: header
schema:
type: boolean
default: true
type: string
register:
name: register
in: query
Expand Down
1 change: 0 additions & 1 deletion candig_federation/gunicorn.conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
bind = "0.0.0.0:4232"
workers = int(os.getenv("WORKERS", 4))
threads = int(os.getenv("THREADS", 4))
worker_class = "gevent"
user = "candig"
group = "candig"
loglevel = 'debug'
Expand Down
11 changes: 6 additions & 5 deletions candig_federation/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import requests
import os.path
import os
from candigv2_logging.logging import initialize, CanDIGLogger

initialize()
logger = CanDIGLogger(__file__)

def check_pulse():
servers = get_registered_servers()
Expand All @@ -18,10 +22,11 @@ def check_pulse():
if server['server']['id'] != os.getenv("FEDERATION_SELF_SERVER_ID", 'internal-1'):
try:
url = f"{server['server']['url']}/hello"
log += f"\ntesting {url}"
service_info = requests.get(url, timeout=2)
if service_info.ok:
live_servers.append(server['server']['id'])
else:
logger.warning(f"Heartbeat not successful for {server['server']['id']} {url}: {service_info.status_code}, {service_info.text}")
except Exception as e:
log += "\n" + str(e)
else:
Expand All @@ -32,10 +37,6 @@ def check_pulse():
with open('/app/federation/live_servers.txt', 'w') as f:
f.write("|".join(live_servers))

with open('/app/federation/log.txt', 'w') as f:
f.write(log)


def get_live_servers():
live_servers = []
if os.path.isfile("/app/federation/live_servers.txt"):
Expand Down
70 changes: 39 additions & 31 deletions candig_federation/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from authz import is_site_admin
import connexion
from werkzeug.exceptions import UnsupportedMediaType
from flask import request, Flask
from flask import Flask
from federation import FederationResponse
from network import get_registered_servers, get_registered_services, register_server, register_service, unregister_server, unregister_service
from candigv2_logging.logging import CanDIGLogger
Expand Down Expand Up @@ -44,15 +44,15 @@ def list_servers():
if servers is not None:
result = map(lambda x: x["server"], servers.values())
return list(result), 200
logger.debug(f"Couldn't list servers", request)
logger.debug(f"Couldn't list servers", connexion.request)
return {"message": "Couldn't list servers"}, 500


def add_server(register=False):
async def add_server(register=False):
"""
:return: Server added.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
try:
# if register=True, list known servers in Vault and register them all
Expand All @@ -63,24 +63,25 @@ def add_server(register=False):
try:
register_server(existing_servers[server])
except Exception as e:
logger.debug(f"failed to register {existing_servers[server]['server']['id']} {type(e)} {str(e)}", request)
logger.debug(f"failed to register {existing_servers[server]['server']['id']} {type(e)} {str(e)}", connexion.request)
errors[existing_servers[server]['server']['id']] = f"{type(e)} {str(e)}"
if len(errors) > 0:
return errors, 500
except Exception as e:
logger.debug(f"Couldn't register servers: {type(e)} {str(e)}", request)
logger.debug(f"Couldn't register servers: {type(e)} {str(e)}", connexion.request)
return {"message": f"Couldn't register servers: {type(e)} {str(e)} {connexion.request}"}, 500
try:
if connexion.request.json is not None and 'server' in connexion.request.json:
new_server = connexion.request.json
req = await connexion.request.json()
if req is not None and 'server' in req:
new_server = req
if register_server(new_server) is None:
return {"message": f"Server {new_server['server']['id']} already present"}, 204
return get_registered_servers()[new_server['server']['id']]['server'], 201
except UnsupportedMediaType as e:
# this is the exception that gets thrown if the requestbody is null
return get_registered_servers(), 200
except Exception as e:
logger.debug(f"Couldn't register server", request)
logger.debug(f"Couldn't add server", connexion.request)
return {"message": f"Couldn't add server: {type(e)} {str(e)} {connexion.request}"}, 500


Expand All @@ -93,7 +94,7 @@ def get_server(server_id):
if servers is not None and server_id in servers:
return servers[server_id], 200
else:
logger.debug(f"Couldn't find server {server_id}", request)
logger.debug(f"Couldn't find server {server_id}", connexion.request)
return {"message": f"Couldn't find server {server_id}"}, 404


Expand All @@ -102,11 +103,11 @@ def delete_server(server_id):
"""
:return: Server deleted.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
result = unregister_server(server_id)
if result is None:
logger.debug(f"Server not found", request)
logger.debug(f"Server not found", connexion.request)
return {"message": f"Server {server_id} not found"}, 404
return result, 200

Expand All @@ -118,7 +119,7 @@ def list_services():
services = get_registered_services()
if services is not None:
return list(services.values()), 200
logger.debug(f"Couldn't list services", request)
logger.debug(f"Couldn't list services", connexion.request)
return {"message": "Couldn't list services"}, 500


Expand All @@ -131,48 +132,54 @@ def get_service(service_id):
if services is not None and service_id in services:
return services[service_id], 200
else:
logger.debug(f"Couldn't find service {service_id}", request)
logger.debug(f"Couldn't find service {service_id}", connexion.request)
return {"message": f"Couldn't find service {service_id}"}, 404


def add_service(register=False):
async def add_service(register=False):
"""
:return: Service added.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
try:
# if register=True, list known services in Vault and register them all
if register:
# if register=True, list known services in Vault and register them all
if register:
try:
existing_services = get_registered_services()
for service in existing_services:
register_service(existing_services[service])
new_service = connexion.request.json
register_service(new_service)
except Exception as e:
logger.debug(f"Couldn't register existing services", connexion.request)
return {"message": f"Couldn't register existing services: {type(e)} {str(e)}"}, 500
try:
new_service = await connexion.request.json()
if new_service is not None:
register_service(new_service)
return get_registered_services()[new_service['id']], 200
except UnsupportedMediaType as e:
# this is the exception that gets thrown if the requestbody is null
return get_registered_services(), 200
except Exception as e:
logger.debug(f"Couldn't add service", request)
logger.debug(f"Couldn't add service", connexion.request)
return {"message": f"Couldn't add service: {type(e)} {str(e)} {connexion.request}"}, 500
return get_registered_services()[new_service['id']], 200
return get_registered_services(), 200


@app.route('/services/<path:service_id>')
def delete_service(service_id):
"""
:return: Service deleted.
"""
if not is_site_admin(request):
if not is_site_admin(connexion.request):
return {"message": "User is not authorized to POST"}, 403
result = unregister_service(service_id)
if result is None:
logger.debug(f"Couldn't find service", request)
logger.debug(f"Couldn't find service", connexion.request)
return {"message": f"Service {service_id} not found"}, 404
return result, 200


def post_search():
async def post_search():
"""
Send a POST request to CanDIG services and possibly federate it.
Method defined by federation.yaml OpenAPI document.
Expand All @@ -196,8 +203,8 @@ def post_search():
ServiceName - Name of service (used for logstash tagging)
"""
try:
logger.debug("Sending federated request", request)
data = connexion.request.json
logger.debug("Sending federated request", connexion.request)
data = await connexion.request.json()
request_type = data["method"]
endpoint_path = data["path"]
if endpoint_path[0] == "/":
Expand All @@ -214,20 +221,21 @@ def post_search():
request=request_type,
endpoint_path=endpoint_path,
endpoint_payload=endpoint_payload,
request_dict=request,
request_dict=connexion.request,
endpoint_service=endpoint_service,
unsafe="unsafe" in data
)

return federation_response.get_response_object()
resp, status = await federation_response.get_response_object()
return resp, status

except Exception as e:
"""
Because Connexion parses the args before this code runs, it is assumed that we
have a valid request_type, endpoint_path and endpoint_payload. A KeyError occurring here
will be due to the service dictionary receiving an invalid key.
"""
logger.error(f"{type(e)} {str(e)}", request)
logger.error(f"post_search {type(e)} {str(e)}", connexion.request)
return {
"response": f"{type(e)} {str(e)}",
"status": 404,
Expand Down
Loading