Skip to content

Commit

Permalink
Merge pull request #63 from CanDIG/feature/safe-check
Browse files Browse the repository at this point in the history
Add the ability to send requests with "safe" checking, exiting early if the service info cannot be quickly communicated with
  • Loading branch information
OrdiNeu authored Aug 21, 2024
2 parents 16fd64d + 7d7cd82 commit 3ce8939
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 6 deletions.
21 changes: 16 additions & 5 deletions candig_federation/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests
from requests_futures.sessions import FuturesSession
from network import get_registered_servers, get_registered_services
from heartbeat import get_live_servers
from candigv2_logging.logging import CanDIGLogger


Expand Down Expand Up @@ -41,7 +42,7 @@ class FederationResponse:
# pylint: disable=too-many-arguments

def __init__(self, request, endpoint_path, endpoint_payload, request_dict, endpoint_service, return_mimetype='application/json',
timeout=60):
timeout=60, safe=False):
"""Constructor method
"""
self.results = {}
Expand All @@ -55,6 +56,7 @@ def __init__(self, request, endpoint_path, endpoint_payload, request_dict, endpo
self.request_dict = request_dict
self.servers = get_registered_servers()
self.services = get_registered_services()
self.safe = safe

try:
self.token = self.request_dict.headers['Authorization']
Expand Down Expand Up @@ -196,6 +198,7 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
:type header: object
:return: List of ResponseObjects, this specific return is used only in testing
"""
logger.info(str(header))
future_responses = self.async_requests(request=request,
header=header,
endpoint_payload=endpoint_payload,
Expand Down Expand Up @@ -223,9 +226,12 @@ def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoi
if isinstance(future_response, requests.exceptions.ConnectionError):
self.status[future_response_id] = 404
self.message[future_response_id] = f'Connection Error. Peer server may be down. Location: {location["name"]}, {location["province"]}'
if isinstance(future_response, requests.exceptions.Timeout):
elif isinstance(future_response, requests.exceptions.Timeout):
self.status[future_response_id] = 504
self.message[future_response_id] = f'Peer server timed out, it may be down. Location: {location["name"]}, {location["province"]}'
else:
self.status[future_response_id] = 500
self.message[future_response_id] = f"handle_server_request failed on {future_response_id}, federation = {self.header['Federation']}: {future_response}"
continue
except requests.exceptions.ConnectionError:
self.status[future_response_id] = 404
Expand Down Expand Up @@ -270,12 +276,17 @@ def async_requests(self, request, endpoint_path, endpoint_payload, endpoint_serv

for server in self.servers.values():
try:
# self.announce_fed_out(request_type, url, endpoint_path, endpoint_payload)
response = {}
url = f"{server['server']['url']}/v1/fanout"
response["response"] = async_session.post(url, json=args, headers=header, timeout=self.timeout)
response["location"] = server['server']["location"]

if self.safe and not server['server']['id'] in get_live_servers():
# Do not ping servers that are not live according to the heartbeat service
response["response"] = f"Safe check abort: {server['server']['id']} is assumed to be down"
else:
# self.announce_fed_out(request_type, url, endpoint_path, endpoint_payload)
url = f"{server['server']['url']}/v1/fanout"
response["response"] = async_session.post(url, json=args, headers=header, timeout=self.timeout)

responses[server['server']['id']] = response

except Exception as e:
Expand Down
44 changes: 44 additions & 0 deletions candig_federation/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
import time
from network import get_registered_servers
import requests
import os.path

def check_pulse():
servers = get_registered_servers()
if len(servers) == 0:
return
# Determine which sites we have access to
live_servers = []
log = ""
try:
for server in servers.values():
url = f"{server['server']['url']}/v1/service-info"
log += f"\ntesting {url}"
service_info = requests.get(url, timeout=2)
if service_info.ok:
live_servers.append(server['server']['id'])

# Determine whether or not those sites are available by pinging Federation service-info
with open('/app/federation/live_servers.txt', 'w') as f:
f.write("|".join(live_servers))
except Exception as e:
log += "\n" + str(e)

with open('/app/federation/log.txt', 'w') as f:
f.write(log)
f.write(str(e))


def get_live_servers():
live_servers = []
if os.path.isfile("/app/federation/live_servers.txt"):
with open('/app/federation/live_servers.txt', 'r') as f:
live_servers_str = f.read().strip()
live_servers = live_servers_str.split('|')
return live_servers

if __name__ == "__main__":
while(True):
check_pulse()
time.sleep(30)
3 changes: 2 additions & 1 deletion candig_federation/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ def post_search():
endpoint_path=endpoint_path,
endpoint_payload=endpoint_payload,
request_dict=request,
endpoint_service=endpoint_service
endpoint_service=endpoint_service,
safe="safe" in data
)

return federation_response.get_response_object()
Expand Down
2 changes: 2 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ fi
# use the following for development
#python3 -m candig_federation

python candig_federation/heartbeat.py &

# use the following instead for production deployment
uwsgi federation.ini

0 comments on commit 3ce8939

Please sign in to comment.