diff --git a/candig_federation/cache.db b/candig_federation/cache.db new file mode 100644 index 0000000..15c4a0a Binary files /dev/null and b/candig_federation/cache.db differ diff --git a/candig_federation/database.py b/candig_federation/database.py new file mode 100644 index 0000000..30bbc90 --- /dev/null +++ b/candig_federation/database.py @@ -0,0 +1,60 @@ +import datetime +import sqlite3 + +CACHE_FLUSH_PERIOD = datetime.timedelta(hours=1) + +def get_connection(): + con = sqlite3.connect("cache.db", check_same_thread=False) + cur = con.cursor() + return con, cur + +def initialize(): + """ + Initialize the sqlite3 database of cached requests + """ + con, cur = get_connection() + + # Delete any old cache + cur.execute("DROP TABLE IF EXISTS Cache;") + + # Create a new cache + cur.execute("CREATE TABLE Cache(hash TINYTEXT PRIMARY KEY, data MEDIUMTEXT, last_access TIMESTAMP)") + con.commit() + +def get_cache(hash): + """ + Get the given cache entry, if it exists. Does not modify the last_access entry. + Returns None if it does not exist + """ + _, cur = get_connection() + cur.execute("SELECT data FROM Cache WHERE hash=(?)", (hash,)) + res = cur.fetchone() + + # Get the data field if successful + if res is not None: + res = res[0] + + return res + +def set_cache(hash, data): + """ + Insert the given cache entry + """ + con, cur = get_connection() + cur.execute("INSERT INTO Cache VALUES(?, ?, ?)", (hash, data, datetime.datetime.now())) + con.commit() + +def touch_cache(hash): + """ + Reset the last_access on the given cache entry + """ + con, cur = get_connection() + cur.execute("UPDATE Cache SET last_access=(?) WHERE hash=(?)", (datetime.datetime.now(), hash)) + con.commit() + +def flush_cache(period=CACHE_FLUSH_PERIOD): + """ + Remove all entries in the cache that are older than a certain time period + """ + con, cur = get_connection() + cur.execute("DELETE FROM Cache WHERE last_access < (?)", (datetime.datetime.now() - period)) diff --git a/candig_federation/federation.py b/candig_federation/federation.py index c62ad7f..bcd15e0 100644 --- a/candig_federation/federation.py +++ b/candig_federation/federation.py @@ -3,11 +3,11 @@ Provides methods to handle both local and federated requests """ - import json import requests import asyncio import aiohttp +import database from network import get_registered_servers, get_registered_services from heartbeat import get_live_servers from candigv2_logging.logging import CanDIGLogger @@ -34,15 +34,21 @@ class FederationResponse: :type service: str :param return_mimetype: HTTP content-type, default is application/json :type return_mimetype: str + :param page: Page number of results in request. If given, federation will truncate federated results to the given page, and return page information + :type timeout: int + :param page_size: Number of results per page to query. Use only in conjunction with the page parameter + :type timeout: int :param timeout: Wait time before a request times out, default is 5 seconds :type timeout: int + :param unsafe: If true, shortcuts around non-responding servers by checking the heartbeat list + :type unsafe: bool """ # pylint: disable=too-many-instance-attributes # pylint: disable=too-many-arguments def __init__(self, request, endpoint_path, endpoint_payload, request_dict, endpoint_service, return_mimetype='application/json', - timeout=60, unsafe=False): + page=None, page_size=10, timeout=60, unsafe=False): """Constructor method """ self.results = {} @@ -56,6 +62,8 @@ def __init__(self, request, endpoint_path, endpoint_payload, request_dict, endpo self.request_dict = request_dict self.servers = get_registered_servers() self.services = get_registered_services() + self.page = page + self.page_size = page_size self.unsafe = unsafe try: @@ -178,6 +186,21 @@ def post_service(self, service, endpoint_path, endpoint_payload): logger.debug(self.message) return + async def get_num_results_per_server(self): + """ + For each server, grab the given number of results, loading/saving to cache when possible + """ + # Create a hash for this request, minus page/page_size + + hashable_attr = self.request + self.endpoint_path + str(self.endpoint_payload) + str(self.request_dict) + self.endpoint_service + cached_data = database.get_cache(hash(hashable_attr)) + + # TODO: Cache invalidation when? + if cached_data is not None: + # Determine how many results each server has + for server in self.servers: + + async def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header): """ Make peer server data requests and update the results and status for a FederationResponse diff --git a/candig_federation/federation.yaml b/candig_federation/federation.yaml index 3421560..f25e636 100644 --- a/candig_federation/federation.yaml +++ b/candig_federation/federation.yaml @@ -199,6 +199,19 @@ components: in: header schema: type: string + page: + name: page + description: Page number of results in request. If given, federation will truncate federated results to the given page, and return page information + in: header + schema: + type: integer + page_size: + name: page_size + description: Number of results per page to query. Use only in conjunction with the page parameter + in: header + schema: + type: integer + default: 10 register: name: register in: query diff --git a/candig_federation/operations.py b/candig_federation/operations.py index 03203e4..84287de 100644 --- a/candig_federation/operations.py +++ b/candig_federation/operations.py @@ -223,7 +223,9 @@ async def post_search(): endpoint_payload=endpoint_payload, request_dict=connexion.request, endpoint_service=endpoint_service, - unsafe="unsafe" in data + unsafe="unsafe" in data, + page=data["page"] if "page" in data else None, + page_size=data["page_size"] if "page_size" in data else 10 ) resp, status = await federation_response.get_response_object() diff --git a/candig_federation/server.py b/candig_federation/server.py index 7bdddf7..c629441 100644 --- a/candig_federation/server.py +++ b/candig_federation/server.py @@ -2,6 +2,7 @@ from flask_cors import CORS import connexion import candigv2_logging.logging +import database candigv2_logging.logging.initialize() @@ -12,6 +13,8 @@ app.add_api('federation.yaml', strict_validation=True, validate_responses=True) def main(): + database.initialize() + # Create the application instance app = connexion.FlaskApp(__name__, specification_dir='./') CORS(app.app)