diff --git a/pghoard/webserver.py b/pghoard/webserver.py index bc0ae332..81112a37 100644 --- a/pghoard/webserver.py +++ b/pghoard/webserver.py @@ -14,6 +14,7 @@ from collections import deque from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager, suppress +from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from queue import Empty, Queue @@ -29,6 +30,14 @@ from pghoard.version import __version__ +@dataclass(frozen=True) +class PendingDownloadOp: + started_at: float + target_path: str + filetype: str + filename: str + + class PoolMixIn(ThreadingMixIn): def process_request(self, request, client_address): self.pool.submit(self.process_request_thread, request, client_address) @@ -100,15 +109,15 @@ def process_queue_item(self, download_result): os.unlink(download_result.payload["target_path"]) return if download_result.success: - if os.path.isfile(op["target_path"]): + if os.path.isfile(op.target_path): self.log.warning("Target path for %r already exists, skipping", key) return # verify wal src_tmp_file_path = download_result.payload["target_path"] - dst_file_path = op["target_path"] - if op["filetype"] == "xlog": + dst_file_path = op.target_path + if op.filetype == "xlog": try: - wal.verify_wal(wal_name=op["filename"], filepath=src_tmp_file_path) + wal.verify_wal(wal_name=op.filename, filepath=src_tmp_file_path) self.log.info("WAL verification successful %s", src_tmp_file_path) except ValueError: self.log.warning("WAL verification failed %s. Unlink file", src_tmp_file_path) @@ -119,7 +128,7 @@ def process_queue_item(self, download_result): metadata = download_result.payload["metadata"] or {} self.log.info( "Renamed %s to %s. Original upload from %r, hash %s:%s", download_result.payload["target_path"], - op["target_path"], metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash") + op.target_path, metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash") ) else: ex = download_result.exception or Error @@ -129,7 +138,7 @@ def process_queue_item(self, download_result): else: self.log.warning( "Fetching %r failed (%s), took: %.3fs", key, ex.__class__.__name__, - time.monotonic() - op["started_at"] + time.monotonic() - op.started_at ) def stop(self): @@ -379,7 +388,7 @@ def _create_fetch_operation(self, key, site, filetype, filename, max_age=-1, sup # Don't fetch again if we already have pending fetch operation unless the operation # has been ongoing longer than given max age and has potentially became stale existing = self.server.pending_download_ops.get(key) - if existing and (max_age < 0 or time.monotonic() - existing["started_at"] <= max_age): + if existing and (max_age < 0 or time.monotonic() - existing.started_at <= max_age): return xlog_dir = get_pg_wal_directory(self.server.config["backup_sites"][site]) @@ -403,7 +412,7 @@ def _create_fetch_operation(self, key, site, filetype, filename, max_age=-1, sup "Fetching site: %r, filename: %r, filetype: %r, tmp_target_path: %r", site, filename, filetype, tmp_target_path ) target_path = os.path.join(xlog_dir, "{}.pghoard.prefetch".format(filename)) - self.server.pending_download_ops[key] = dict( + self.server.pending_download_ops[key] = PendingDownloadOp( started_at=time.monotonic(), target_path=target_path, filetype=filetype, filename=filename ) self.server.transfer_queue.put(