From 115d2b84883939963944edc1ab63eff77dff7543 Mon Sep 17 00:00:00 2001 From: Egor Voynov Date: Mon, 25 Mar 2024 10:56:06 +0100 Subject: [PATCH] DownloadResultsProcessor split run func --- pghoard/webserver.py | 89 +++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/pghoard/webserver.py b/pghoard/webserver.py index e10c413d..bc0ae332 100644 --- a/pghoard/webserver.py +++ b/pghoard/webserver.py @@ -67,7 +67,7 @@ def __init__(self, msg=None, headers=None, status=500): class DownloadResultsProcessor(PGHoardThread): """ - Processes download_results results, validates WAL and renames tmp file to target (".prefetch") + Processes download_results queue, validates WAL and renames tmp file to target (".prefetch") """ def __init__(self, lock, log, download_results, pending_download_ops, prefetch_404): super().__init__(name=self.__class__.__name__) @@ -82,53 +82,56 @@ def run_safe(self): self.running = True while self.running: try: - result = self.download_results.get(block=True, timeout=0.1) - key = result.opaque - with self.lock: - op = self.pending_download_ops.pop(key, None) - if not op: - self.log.warning("Orphaned download operation %r completed: %r", key, result) - if result.success: - with suppress(OSError): - os.unlink(result.payload["target_path"]) - continue - if result.success: - if os.path.isfile(op["target_path"]): - self.log.warning("Target path for %r already exists, skipping", key) - continue - # verify wal - src_tmp_file_path = result.payload["target_path"] - dst_file_path = op["target_path"] - if op["filetype"] == "xlog": - try: - wal.verify_wal(wal_name=op["filename"], filepath=src_tmp_file_path) - self.log.info("WAL verification successful %s", src_tmp_file_path) - except ValueError: - self.log.warning("WAL verification failed %s. Unlink file", src_tmp_file_path) - with suppress(OSError): - os.unlink(src_tmp_file_path) - continue - os.rename(src_tmp_file_path, dst_file_path) - metadata = result.payload["metadata"] or {} - self.log.info( - "Renamed %s to %s. Original upload from %r, hash %s:%s", result.payload["target_path"], - op["target_path"], metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash") - ) - else: - ex = result.exception or Error - if isinstance(ex, FileNotFoundFromStorageError): - # don't try prefetching this file again - self.prefetch_404.append(key) - else: - self.log.warning( - "Fetching %r failed (%s), took: %.3fs", key, ex.__class__.__name__, - time.monotonic() - op["started_at"] - ) + item = self.download_results.get(block=True, timeout=0.1) + self.process_queue_item(item) except Empty: pass except Exception: # pylint: disable=broad-except self.log.exception("Unhandled exception in %s", self.__class__.__name__) + def process_queue_item(self, download_result): + key = download_result.opaque + with self.lock: + op = self.pending_download_ops.pop(key, None) + if not op: + self.log.warning("Orphaned download operation %r completed: %r", key, download_result) + if download_result.success: + with suppress(OSError): + os.unlink(download_result.payload["target_path"]) + return + if download_result.success: + if os.path.isfile(op["target_path"]): + self.log.warning("Target path for %r already exists, skipping", key) + return + # verify wal + src_tmp_file_path = download_result.payload["target_path"] + dst_file_path = op["target_path"] + if op["filetype"] == "xlog": + try: + wal.verify_wal(wal_name=op["filename"], filepath=src_tmp_file_path) + self.log.info("WAL verification successful %s", src_tmp_file_path) + except ValueError: + self.log.warning("WAL verification failed %s. Unlink file", src_tmp_file_path) + with suppress(OSError): + os.unlink(src_tmp_file_path) + return + os.rename(src_tmp_file_path, dst_file_path) + metadata = download_result.payload["metadata"] or {} + self.log.info( + "Renamed %s to %s. Original upload from %r, hash %s:%s", download_result.payload["target_path"], + op["target_path"], metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash") + ) + else: + ex = download_result.exception or Error + if isinstance(ex, FileNotFoundFromStorageError): + # don't try prefetching this file again + self.prefetch_404.append(key) + else: + self.log.warning( + "Fetching %r failed (%s), took: %.3fs", key, ex.__class__.__name__, + time.monotonic() - op["started_at"] + ) + def stop(self): self.running = False