Skip to content

Commit

Permalink
change dict to dataclass for pendind_download_ops
Browse files Browse the repository at this point in the history
  • Loading branch information
egor-voynov-aiven committed Mar 25, 2024
1 parent 115d2b8 commit 6ebf4ea
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions pghoard/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from queue import Empty, Queue
Expand All @@ -29,6 +30,14 @@
from pghoard.version import __version__


@dataclass(frozen=True)
class PendingDownloadOp:
started_at: float
target_path: str
filetype: str
filename: str


class PoolMixIn(ThreadingMixIn):
def process_request(self, request, client_address):
self.pool.submit(self.process_request_thread, request, client_address)
Expand Down Expand Up @@ -100,15 +109,15 @@ def process_queue_item(self, download_result):
os.unlink(download_result.payload["target_path"])
return
if download_result.success:
if os.path.isfile(op["target_path"]):
if os.path.isfile(op.target_path):
self.log.warning("Target path for %r already exists, skipping", key)
return
# verify wal
src_tmp_file_path = download_result.payload["target_path"]
dst_file_path = op["target_path"]
if op["filetype"] == "xlog":
dst_file_path = op.target_path
if op.filetype == "xlog":
try:
wal.verify_wal(wal_name=op["filename"], filepath=src_tmp_file_path)
wal.verify_wal(wal_name=op.filename, filepath=src_tmp_file_path)
self.log.info("WAL verification successful %s", src_tmp_file_path)
except ValueError:
self.log.warning("WAL verification failed %s. Unlink file", src_tmp_file_path)
Expand All @@ -119,7 +128,7 @@ def process_queue_item(self, download_result):
metadata = download_result.payload["metadata"] or {}
self.log.info(
"Renamed %s to %s. Original upload from %r, hash %s:%s", download_result.payload["target_path"],
op["target_path"], metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash")
op.target_path, metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash")
)
else:
ex = download_result.exception or Error
Expand All @@ -129,7 +138,7 @@ def process_queue_item(self, download_result):
else:
self.log.warning(
"Fetching %r failed (%s), took: %.3fs", key, ex.__class__.__name__,
time.monotonic() - op["started_at"]
time.monotonic() - op.started_at
)

def stop(self):
Expand Down Expand Up @@ -379,7 +388,7 @@ def _create_fetch_operation(self, key, site, filetype, filename, max_age=-1, sup
# Don't fetch again if we already have pending fetch operation unless the operation
# has been ongoing longer than given max age and has potentially became stale
existing = self.server.pending_download_ops.get(key)
if existing and (max_age < 0 or time.monotonic() - existing["started_at"] <= max_age):
if existing and (max_age < 0 or time.monotonic() - existing.started_at <= max_age):
return

xlog_dir = get_pg_wal_directory(self.server.config["backup_sites"][site])
Expand All @@ -403,7 +412,7 @@ def _create_fetch_operation(self, key, site, filetype, filename, max_age=-1, sup
"Fetching site: %r, filename: %r, filetype: %r, tmp_target_path: %r", site, filename, filetype, tmp_target_path
)
target_path = os.path.join(xlog_dir, "{}.pghoard.prefetch".format(filename))
self.server.pending_download_ops[key] = dict(
self.server.pending_download_ops[key] = PendingDownloadOp(
started_at=time.monotonic(), target_path=target_path, filetype=filetype, filename=filename
)
self.server.transfer_queue.put(
Expand Down

0 comments on commit 6ebf4ea

Please sign in to comment.