Skip to content

Commit

Permalink
upload event tracker
Browse files Browse the repository at this point in the history
Introduces UploadEventProgressTracker, a thread that monitors the progress of each individual UploadEvent and logs a warning if an event has made no progress for a specified period of time. This helps ensure that file uploads keep making
progress and do not get stuck unnoticed.
  • Loading branch information
kathia-barahona committed Nov 29, 2023
1 parent ae00595 commit 6dfb7e1
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Vagrant.configure("2") do |config|
sed -i "s/^#create_main_cluster.*/create_main_cluster=false/g" /etc/postgresql-common/createcluster.conf
apt-get install -y python{3.8,3.9,3.10} python{3.8,3.9,3.10}-dev python{3.8,3.9,3.10}-venv
apt-get install -y postgresql-{10,11,12,13,14} postgresql-server-dev-{10,11,12,13,14}
apt-get install -y postgresql-{11,12,13,14,15} postgresql-server-dev-{11,12,13,14,15}
username="$(< /dev/urandom tr -dc a-z | head -c${1:-32};echo;)"
password=$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c${1:-32};echo;)
Expand Down
6 changes: 0 additions & 6 deletions pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,13 @@ def run_piped_basebackup(self):
})
metadata.update(self.metadata)

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": False})

self.transfer_queue.put(
UploadEvent(
file_type=FileType.Basebackup,
backup_site_name=self.site,
file_path=basebackup_path,
callback_queue=self.callback_queue,
file_size=compressed_file_size,
incremental_progress_callback=callback,
source_data=stream_target,
remove_after_upload=True,
metadata=metadata
Expand Down Expand Up @@ -615,7 +611,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
data_file_format,
compressed_base,
delta_stats=delta_stats,
delta=delta,
file_type=FileType.Basebackup_chunk
)

Expand Down Expand Up @@ -725,7 +720,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
callback_queue=self.callback_queue,
chunk_path=Path(data_file_format(0)), # pylint: disable=too-many-format-args
temp_dir=compressed_base,
delta=delta,
files_to_backup=control_files,
file_type=FileType.Basebackup,
extra_metadata={
Expand Down
9 changes: 0 additions & 9 deletions pghoard/basebackup/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def tar_one_file(
chunk_path,
files_to_backup,
callback_queue: CallbackQueue,
delta: bool,
file_type: FileType = FileType.Basebackup_chunk,
extra_metadata: Optional[Dict[str, Any]] = None,
delta_stats: Optional[DeltaStats] = None
Expand Down Expand Up @@ -194,9 +193,6 @@ def tar_one_file(

middle_path, chunk_name = ChunkUploader.chunk_path_to_middle_path_name(Path(chunk_path), file_type)

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": delta})

self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
Expand All @@ -205,7 +201,6 @@ def callback(n_bytes: int) -> None:
file_path=middle_path / chunk_name,
source_data=chunk_path,
metadata=metadata,
incremental_progress_callback=callback,
backup_site_name=self.site,
)
)
Expand All @@ -221,15 +216,13 @@ def handle_single_chunk(
chunks,
index: int,
temp_dir: Path,
delta: bool,
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk
) -> Dict[str, Any]:
one_chunk_files = chunks[index]
chunk_name, input_size, result_size = self.tar_one_file(
callback_queue=chunk_callback_queue,
chunk_path=chunk_path,
delta=delta,
temp_dir=temp_dir,
files_to_backup=one_chunk_files,
delta_stats=delta_stats,
Expand Down Expand Up @@ -270,7 +263,6 @@ def create_and_upload_chunks(
chunks,
data_file_format: Callable[[int], str],
temp_base_dir: Path,
delta: bool,
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk,
chunks_max_progress: float = 100.0
Expand Down Expand Up @@ -307,7 +299,6 @@ def create_and_upload_chunks(
chunks=chunks,
index=i,
temp_dir=temp_base_dir,
delta=delta,
delta_stats=delta_stats,
file_type=file_type
)
Expand Down
5 changes: 0 additions & 5 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ def progress_callback(n_bytes: int = 1) -> None:

dest_path = Path("basebackup_delta") / result_digest

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True})

self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
Expand All @@ -205,7 +202,6 @@ def callback(n_bytes: int) -> None:
backup_site_name=self.site,
metadata=metadata,
file_path=dest_path,
incremental_progress_callback=callback,
source_data=chunk_path
)
)
Expand Down Expand Up @@ -370,7 +366,6 @@ def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[Uplo
chunk_files = self.chunk_uploader.create_and_upload_chunks(
chunks=delta_chunks,
data_file_format=self.data_file_format,
delta=True,
temp_base_dir=self.compressed_base,
file_type=FileType.Basebackup_delta_chunk,
chunks_max_progress=chunks_max_progress,
Expand Down
13 changes: 11 additions & 2 deletions pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
)
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
from pghoard.transfer import (TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker)
from pghoard.walreceiver import WALReceiver
from pghoard.webserver import WebServer

Expand Down Expand Up @@ -143,6 +143,9 @@ def __init__(self, config_path):
self.requested_basebackup_sites = set()
self.inotify_adapter = InotifyAdapter(self.compression_queue)
self.inotify = InotifyWatcher(self.inotify_adapter)

self.upload_tracker = UploadEventProgressTracker(metrics=self.metrics)

self.webserver = WebServer(
self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
)
Expand All @@ -167,6 +170,7 @@ def __init__(self, config_path):
config=self.config,
mp_manager=self.mp_manager,
transfer_queue=self.transfer_queue,
upload_tracker=self.upload_tracker,
metrics=self.metrics,
shared_state_dict=self.transfer_agent_state
)
Expand Down Expand Up @@ -695,6 +699,7 @@ def startup_walk_for_missed_files(self):
def start_threads_on_startup(self):
# Startup threads
self.inotify.start()
self.upload_tracker.start()
self.webserver.start()
self.wal_file_deleter.start()
for compressor in self.compressors:
Expand Down Expand Up @@ -971,7 +976,11 @@ def load_config(self, _signal=None, _frame=None): # pylint: disable=unused-argu

def _get_all_threads(self):
all_threads = []
# on first config load webserver isn't initialized yet

# on first config load upload_tracker and webserver aren't initialized yet
if hasattr(self, "upload_tracker"):
all_threads.append(self.upload_tracker)

if hasattr(self, "webserver"):
all_threads.append(self.webserver)
all_threads.extend(self.basebackups.values())
Expand Down
162 changes: 155 additions & 7 deletions pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,28 @@
import enum
import logging
import os
import threading
import time
from contextlib import suppress
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from functools import partial
from io import BytesIO
from pathlib import Path
from queue import Empty
from threading import Lock
from typing import Any, BinaryIO, Dict, Optional, Union
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Union

from rohmu import get_transfer
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.object_storage.base import (BaseTransfer, IncrementalProgressCallbackType)
from rohmu.object_storage.base import BaseTransfer
from rohmu.typing import Metadata

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
from pghoard.metrics import Metrics

_STATS_LOCK = Lock()
_last_stats_transmit_time = 0
Expand Down Expand Up @@ -55,7 +58,6 @@ class UploadEvent(BaseTransferEvent):
file_size: Optional[int]
remove_after_upload: bool = True
retry_number: int = 0
incremental_progress_callback: Optional[IncrementalProgressCallbackType] = None

@property
def operation(self):
Expand Down Expand Up @@ -100,15 +102,156 @@ def operation(self):
TransferQueue = Queue


@dataclass
class TransferIncrement:
    """One progress report recorded for an upload that is being tracked."""

    # Byte count handed to the tracker for this report.
    # NOTE(review): the tracker forwards this value as a metric ``inc_value``, so it is
    # presumably a per-callback delta rather than a running total - confirm against the caller.
    total_bytes_uploaded: float
    # Monotonic clock reading taken when the record is created (unless given explicitly).
    tracked_at: float = dataclasses.field(default_factory=time.monotonic)


@dataclass
class UploadEventProgress:
    """Progress bookkeeping for a single upload that is currently being tracked."""

    # Identifier of the upload; the tracker uses the same value as its dictionary key.
    key: str
    # Size of the file being uploaded, when known.
    file_size: Optional[int]
    # Kind of file being uploaded; the tracker uses it to choose which metric to bump.
    file_type: FileType
    # Progress reports received so far, in arrival order (a fresh list per instance).
    increments: List[TransferIncrement] = dataclasses.field(default_factory=list)
    # Monotonic clock reading taken when tracking of this upload began.
    started_at: float = dataclasses.field(default_factory=time.monotonic)


class UploadEventProgressTracker(PGHoardThread):
    """Thread that watches tracked uploads and logs a warning when one stops progressing.

    Uploads are registered with ``track_upload_event()``, report progress through
    ``increment()`` and are removed with ``untrack_upload_event()``. While the thread is
    running it wakes up every ``CHECK_FREQUENCY`` seconds and warns about every tracked
    upload that has been silent for longer than ``WARNING_TIMEOUT`` plus its average gap
    between progress reports. All access to the tracked-events mapping is serialized by
    ``_tracked_events_lock``.
    """

    CHECK_FREQUENCY = 5.0  # check every 5 seconds for progress
    WARNING_TIMEOUT = 5.0 * 60  # log a warning in case there is no progress during last 5 minutes

    def __init__(self, metrics: Metrics) -> None:
        """Create an idle tracker; ``metrics`` receives the upload byte counters."""
        self.metrics = metrics
        self.log = logging.getLogger("UploadEventProgressTracker")

        self.running: bool = False

        self._tracked_events: Dict[str, UploadEventProgress] = {}
        self._tracked_events_lock = threading.Lock()
        self.log.debug("UploadEventProgressTracker initialized")

        super().__init__()

    def track_upload_event(self, file_key: str, file_type: FileType, file_size: Optional[int]) -> None:
        """Start tracking ``file_key``, replacing any progress already stored for that key."""
        with self._tracked_events_lock:
            self.log.info("Tracking upload event for file %s", file_key)
            self._tracked_events[file_key] = UploadEventProgress(key=file_key, file_type=file_type, file_size=file_size)

    def untrack_upload_event(self, file_key: str) -> None:
        """Stop tracking ``file_key``; unknown keys are ignored."""
        # Test-and-remove happens atomically under the lock: checking membership outside of
        # it allowed a concurrent reset()/untrack to remove the key first and turn the
        # subsequent strict pop() into a KeyError.
        with self._tracked_events_lock:
            self._tracked_events.pop(file_key, None)

    def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
        """Record a progress report for ``file_key`` and bump the matching byte metric.

        Basebackup file types increase ``pghoard.basebackup_bytes_uploaded`` (tagged with
        whether the type is a delta type); WAL and timeline files increase
        ``pghoard.compressed_file_upload``. Other file types are recorded for stall
        detection only.

        Raises:
            Exception: if ``file_key`` is not currently tracked.
        """
        metric_data: Dict[str, Any] = {}
        with self._tracked_events_lock:
            progress = self._tracked_events.get(file_key)
            if progress is None:
                raise Exception(f"UploadEvent for {file_key} is not being tracked.")

            file_type = progress.file_type
            if file_type in (
                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
            ):
                metric_data = {
                    "metric": "pghoard.basebackup_bytes_uploaded",
                    "inc_value": total_bytes_uploaded,
                    "tags": {"delta": file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk)},
                }
            elif file_type in (FileType.Wal, FileType.Timeline):
                metric_data = {"metric": "pghoard.compressed_file_upload", "inc_value": total_bytes_uploaded}

            progress.increments.append(TransferIncrement(total_bytes_uploaded=total_bytes_uploaded))

        # File types outside the two groups above have no associated metric. Calling
        # increase() with an empty kwargs dict would omit the metric name entirely, so
        # only emit when there is something to report.
        if metric_data:
            self.metrics.increase(**metric_data)

    def reset(self) -> None:
        """Forget every tracked upload and mark the tracker as not running."""
        with self._tracked_events_lock:
            self._tracked_events = {}
            self.running = False

    def run_safe(self):
        """Thread body: check all tracked uploads every ``CHECK_FREQUENCY`` seconds until stopped."""
        try:
            self.running = True

            while self.running:
                # _check_increment_rate() iterates the shared mapping, so hold the lock for the scan.
                with self._tracked_events_lock:
                    self._check_increment_rate()

                time.sleep(self.CHECK_FREQUENCY)
        except Exception:  # pylint: disable=broad-except
            self.log.exception("Failed to update transfer rate %s", "pghoard.compressed_file_upload")
            self.metrics.increase("pghoard.transfer_operation.errors")
            self.reset()
            self.stop()

        self.log.debug("Quitting UploadEventProgressTracker")

    def stop(self) -> None:
        """Ask the loop in ``run_safe()`` to exit; it notices after its current sleep ends."""
        self.running = False

    def _check_increment_rate(self) -> None:
        """
        Check if the transfer operation is progressing by comparing the time elapsed since
        last increment with the average time it took for previous increments. If the operation has been inactive,
        a warning will be logged.

        The caller must hold ``_tracked_events_lock``.
        """
        now = time.monotonic()
        for ue_progress in self._tracked_events.values():
            # Without any increment yet, inactivity is measured from when tracking started.
            last_increment_at = ue_progress.started_at
            avg_rate = 0.

            increments = ue_progress.increments
            if increments:
                last_increment_at = increments[-1].tracked_at
                if len(increments) > 1:
                    # The gaps between consecutive increments sum up to (last - first), so the
                    # average gap does not require walking (or copying) the whole list.
                    avg_rate = (last_increment_at - increments[0].tracked_at) / (len(increments) - 1)

            # log warning in case we have not tracked any progress for the operation since
            # the last check
            if last_increment_at and (now - last_increment_at) >= avg_rate + self.WARNING_TIMEOUT:
                self.log.warning(
                    "Upload for file %s has been inactive since %s seconds.", ue_progress.key, now - last_increment_at
                )


@contextmanager
def track_upload_event(progress_tracker: UploadEventProgressTracker, file_key: str, upload_event: UploadEvent) -> Iterator:
    """Keep ``upload_event`` registered with ``progress_tracker`` for the duration of the ``with`` body.

    The event is registered under ``file_key`` on entry and always unregistered on exit,
    including when the body raises.
    """
    event_type = upload_event.file_type
    event_size = upload_event.file_size
    progress_tracker.track_upload_event(file_key=file_key, file_type=event_type, file_size=event_size)
    try:
        yield
    finally:
        # Runs on both normal exit and exceptions so finished or failed uploads are never left tracked.
        progress_tracker.untrack_upload_event(file_key)


class TransferAgent(PGHoardThread):
def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, shared_state_dict):
def __init__(
self,
config,
mp_manager,
transfer_queue: TransferQueue,
upload_tracker: UploadEventProgressTracker,
metrics: Metrics,
shared_state_dict,
):
super().__init__()
self.log = logging.getLogger("TransferAgent")
self.config = config
self.metrics = metrics
self.mp_manager = mp_manager
self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage)
self.transfer_queue = transfer_queue
self.upload_tracker = upload_tracker
self.running = True
self.sleep = time.sleep
self.state = shared_state_dict
Expand Down Expand Up @@ -321,12 +464,16 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
f = file_to_transfer.source_data
else:
f = open(file_to_transfer.source_data, "rb")
with f:
with f, track_upload_event(progress_tracker=self.upload_tracker, file_key=key, upload_event=file_to_transfer):
metadata = file_to_transfer.metadata.copy()
if file_to_transfer.file_size:
metadata["Content-Length"] = str(file_to_transfer.file_size)
upload_progress_fn = partial(self.upload_tracker.increment, file_key=key)
storage.store_file_object(
key, f, metadata=metadata, upload_progress_fn=file_to_transfer.incremental_progress_callback
key,
f,
metadata=metadata,
upload_progress_fn=lambda n_bytes: upload_progress_fn(total_bytes_uploaded=n_bytes),
)
if unlink_local:
if isinstance(file_to_transfer.source_data, Path):
Expand Down Expand Up @@ -357,6 +504,7 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
"Problem in moving file: %r, need to retry (%s: %s)", file_to_transfer.source_data,
ex.__class__.__name__, ex
)

file_to_transfer = dataclasses.replace(file_to_transfer, retry_number=file_to_transfer.retry_number + 1)
if file_to_transfer.retry_number > self.config["upload_retries_warning_limit"]:
create_alert_file(self.config, "upload_retries_warning")
Expand Down
Loading

0 comments on commit 6dfb7e1

Please sign in to comment.