Skip to content

Commit

Permalink
[wip]
Browse files Browse the repository at this point in the history
  • Loading branch information
OrdiNeu committed Jan 10, 2025
1 parent 7d94b16 commit 8024daf
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 3 deletions.
Binary file added candig_federation/cache.db
Binary file not shown.
60 changes: 60 additions & 0 deletions candig_federation/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import datetime
import sqlite3

CACHE_FLUSH_PERIOD = datetime.timedelta(hours=1)

def get_connection():
con = sqlite3.connect("cache.db", check_same_thread=False)
cur = con.cursor()
return con, cur

def initialize():
"""
Initialize the sqlite3 database of cached requests
"""
con, cur = get_connection()

# Delete any old cache
cur.execute("DROP TABLE IF EXISTS Cache;")

# Create a new cache
cur.execute("CREATE TABLE Cache(hash TINYTEXT PRIMARY KEY, data MEDIUMTEXT, last_access TIMESTAMP)")
con.commit()

def get_cache(hash):
"""
Get the given cache entry, if it exists. Does not modify the last_access entry.
Returns None if it does not exist
"""
_, cur = get_connection()
cur.execute("SELECT data FROM Cache WHERE hash=(?)", (hash,))
res = cur.fetchone()

# Get the data field if successful
if res is not None:
res = res[0]

return res

def set_cache(hash, data):
"""
Insert the given cache entry
"""
con, cur = get_connection()
cur.execute("INSERT INTO Cache VALUES(?, ?, ?)", (hash, data, datetime.datetime.now()))
con.commit()

def touch_cache(hash):
"""
Reset the last_access on the given cache entry
"""
con, cur = get_connection()
cur.execute("UPDATE Cache SET last_access=(?) WHERE hash=(?)", (datetime.datetime.now(), hash))
con.commit()

def flush_cache(period=CACHE_FLUSH_PERIOD):
"""
Remove all entries in the cache that are older than a certain time period
"""
con, cur = get_connection()
cur.execute("DELETE FROM Cache WHERE last_access < (?)", (datetime.datetime.now() - period))
27 changes: 25 additions & 2 deletions candig_federation/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
Provides methods to handle both local and federated requests
"""


import json
import requests
import asyncio
import aiohttp
import database
from network import get_registered_servers, get_registered_services
from heartbeat import get_live_servers
from candigv2_logging.logging import CanDIGLogger
Expand All @@ -34,15 +34,21 @@ class FederationResponse:
:type service: str
:param return_mimetype: HTTP content-type, default is application/json
:type return_mimetype: str
:param page: Page number of results in request. If given, federation will truncate federated results to the given page, and return page information
:type timeout: int
:param page_size: Number of results per page to query. Use only in conjunction with the page parameter
:type timeout: int
:param timeout: Wait time before a request times out, default is 5 seconds
:type timeout: int
:param unsafe: If true, shortcuts around non-responding servers by checking the heartbeat list
:type unsafe: bool
"""

# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-arguments

def __init__(self, request, endpoint_path, endpoint_payload, request_dict, endpoint_service, return_mimetype='application/json',
timeout=60, unsafe=False):
page=None, page_size=10, timeout=60, unsafe=False):
"""Constructor method
"""
self.results = {}
Expand All @@ -56,6 +62,8 @@ def __init__(self, request, endpoint_path, endpoint_payload, request_dict, endpo
self.request_dict = request_dict
self.servers = get_registered_servers()
self.services = get_registered_services()
self.page = page
self.page_size = page_size
self.unsafe = unsafe

try:
Expand Down Expand Up @@ -178,6 +186,21 @@ def post_service(self, service, endpoint_path, endpoint_payload):
logger.debug(self.message)
return

async def get_num_results_per_server(self):
"""
For each server, grab the given number of results, loading/saving to cache when possible
"""
# Create a hash for this request, minus page/page_size

hashable_attr = self.request + self.endpoint_path + str(self.endpoint_payload) + str(self.request_dict) + self.endpoint_service
cached_data = database.get_cache(hash(hashable_attr))

# TODO: Cache invalidation when?
if cached_data is not None:
# Determine how many results each server has
for server in self.servers:


async def handle_server_request(self, request, endpoint_path, endpoint_payload, endpoint_service, header):
"""
Make peer server data requests and update the results and status for a FederationResponse
Expand Down
13 changes: 13 additions & 0 deletions candig_federation/federation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,19 @@ components:
in: header
schema:
type: string
page:
name: page
description: Page number of results in request. If given, federation will truncate federated results to the given page, and return page information
in: header
schema:
type: integer
page_size:
name: page_size
description: Number of results per page to query. Use only in conjunction with the page parameter
in: header
schema:
type: integer
default: 10
register:
name: register
in: query
Expand Down
4 changes: 3 additions & 1 deletion candig_federation/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ async def post_search():
endpoint_payload=endpoint_payload,
request_dict=connexion.request,
endpoint_service=endpoint_service,
unsafe="unsafe" in data
unsafe="unsafe" in data,
page=data["page"] if "page" in data else None,
page_size=data["page_size"] if "page_size" in data else 10
)

resp, status = await federation_response.get_response_object()
Expand Down
3 changes: 3 additions & 0 deletions candig_federation/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from flask_cors import CORS
import connexion
import candigv2_logging.logging
import database

candigv2_logging.logging.initialize()

Expand All @@ -12,6 +13,8 @@
app.add_api('federation.yaml', strict_validation=True, validate_responses=True)

def main():
database.initialize()

# Create the application instance
app = connexion.FlaskApp(__name__, specification_dir='./')
CORS(app.app)
Expand Down

0 comments on commit 8024daf

Please sign in to comment.