Skip to content

Commit

Permalink
DownloadResultsProcessor split run func
Browse files Browse the repository at this point in the history
  • Loading branch information
egor-voynov-aiven committed Mar 25, 2024
1 parent 649dcb6 commit 115d2b8
Showing 1 changed file with 46 additions and 43 deletions.
89 changes: 46 additions & 43 deletions pghoard/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, msg=None, headers=None, status=500):

class DownloadResultsProcessor(PGHoardThread):
"""
Processes download_results results, validates WAL and renames tmp file to target (".prefetch")
Processes download_results queue, validates WAL and renames tmp file to target (".prefetch")
"""
def __init__(self, lock, log, download_results, pending_download_ops, prefetch_404):
super().__init__(name=self.__class__.__name__)
Expand All @@ -82,53 +82,56 @@ def run_safe(self):
self.running = True
while self.running:
try:
result = self.download_results.get(block=True, timeout=0.1)
key = result.opaque
with self.lock:
op = self.pending_download_ops.pop(key, None)
if not op:
self.log.warning("Orphaned download operation %r completed: %r", key, result)
if result.success:
with suppress(OSError):
os.unlink(result.payload["target_path"])
continue
if result.success:
if os.path.isfile(op["target_path"]):
self.log.warning("Target path for %r already exists, skipping", key)
continue
# verify wal
src_tmp_file_path = result.payload["target_path"]
dst_file_path = op["target_path"]
if op["filetype"] == "xlog":
try:
wal.verify_wal(wal_name=op["filename"], filepath=src_tmp_file_path)
self.log.info("WAL verification successful %s", src_tmp_file_path)
except ValueError:
self.log.warning("WAL verification failed %s. Unlink file", src_tmp_file_path)
with suppress(OSError):
os.unlink(src_tmp_file_path)
continue
os.rename(src_tmp_file_path, dst_file_path)
metadata = result.payload["metadata"] or {}
self.log.info(
"Renamed %s to %s. Original upload from %r, hash %s:%s", result.payload["target_path"],
op["target_path"], metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash")
)
else:
ex = result.exception or Error
if isinstance(ex, FileNotFoundFromStorageError):
# don't try prefetching this file again
self.prefetch_404.append(key)
else:
self.log.warning(
"Fetching %r failed (%s), took: %.3fs", key, ex.__class__.__name__,
time.monotonic() - op["started_at"]
)
item = self.download_results.get(block=True, timeout=0.1)
self.process_queue_item(item)
except Empty:
pass
except Exception: # pylint: disable=broad-except
self.log.exception("Unhandled exception in %s", self.__class__.__name__)

def process_queue_item(self, download_result):
    """
    Handle one completed download taken from the download_results queue.

    The pending operation registered under ``download_result.opaque`` is
    removed from ``self.pending_download_ops`` while holding ``self.lock``.
    Depending on the outcome the downloaded temporary file is then either
    renamed to the operation's target path or removed:

    * no pending operation (orphaned result): a successfully downloaded
      temporary file is unlinked and nothing else happens;
    * failed download: ``FileNotFoundFromStorageError`` adds the key to
      ``self.prefetch_404`` so the file is not prefetched again, any other
      failure is logged together with the elapsed time;
    * target path already exists: the redundant temporary file is unlinked;
    * ``"xlog"`` operations are passed to ``wal.verify_wal`` first; when it
      raises ``ValueError`` the temporary file is unlinked instead of renamed.

    :param download_result: completed transfer result; the attributes
        ``opaque``, ``success``, ``payload`` and ``exception`` are read.
    :raises OSError: if renaming the temporary file to the target fails.
    """
    key = download_result.opaque
    with self.lock:
        op = self.pending_download_ops.pop(key, None)
        if not op:
            self.log.warning("Orphaned download operation %r completed: %r", key, download_result)
            if download_result.success:
                # Nobody is waiting for this file any more, drop the temporary copy.
                with suppress(OSError):
                    os.unlink(download_result.payload["target_path"])
            return

    if not download_result.success:
        ex = download_result.exception
        if isinstance(ex, FileNotFoundFromStorageError):
            # don't try prefetching this file again
            self.prefetch_404.append(key)
        else:
            # Without an exception object report the generic Error class by its
            # own name; taking __class__ of the class itself would log "type".
            failure_name = type(ex).__name__ if ex else Error.__name__
            self.log.warning(
                "Fetching %r failed (%s), took: %.3fs", key, failure_name,
                time.monotonic() - op["started_at"]
            )
        return

    dst_file_path = op["target_path"]
    if os.path.isfile(dst_file_path):
        self.log.warning("Target path for %r already exists, skipping", key)
        # The fresh download is redundant; remove it like the other early exits
        # do so temporary files do not pile up next to the target.
        with suppress(OSError):
            redundant_tmp_path = download_result.payload["target_path"]
            if redundant_tmp_path != dst_file_path:
                os.unlink(redundant_tmp_path)
        return

    src_tmp_file_path = download_result.payload["target_path"]
    if op["filetype"] == "xlog":
        # Only WAL files are verified before being exposed under the target name.
        try:
            wal.verify_wal(wal_name=op["filename"], filepath=src_tmp_file_path)
        except ValueError:
            self.log.warning("WAL verification failed %s. Unlink file", src_tmp_file_path)
            with suppress(OSError):
                os.unlink(src_tmp_file_path)
            return
        self.log.info("WAL verification successful %s", src_tmp_file_path)

    os.rename(src_tmp_file_path, dst_file_path)
    metadata = download_result.payload["metadata"] or {}
    self.log.info(
        "Renamed %s to %s. Original upload from %r, hash %s:%s", src_tmp_file_path, dst_file_path,
        metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash")
    )

def stop(self):
    """
    Ask the processing loop to finish by clearing the ``running`` flag.

    Only the flag is changed here; this method does not wait for the thread.
    """
    self.running = False

Expand Down

0 comments on commit 115d2b8

Please sign in to comment.