From ea87ed7eb9c42f96eb112ac27b266ad49db61648 Mon Sep 17 00:00:00 2001
From: Marcelo Henrique Neppel
Date: Tue, 17 Sep 2024 11:27:22 -0300
Subject: [PATCH] [DPE-3887] Avoid replication slot deletion (#680)

* Add integration test

Signed-off-by: Marcelo Henrique Neppel

* Re-patch pod labels

Signed-off-by: Marcelo Henrique Neppel

* Improve fix and add initial test logic to crash the stop hook

Signed-off-by: Marcelo Henrique Neppel

* Add failure injection

Signed-off-by: Marcelo Henrique Neppel

* Use pebble ready event for fix and fix test checks order

Signed-off-by: Marcelo Henrique Neppel

* Simplify stop hook failure

Signed-off-by: Marcelo Henrique Neppel

---------

Signed-off-by: Marcelo Henrique Neppel
---
 src/charm.py                               |  44 ++++----
 tests/integration/ha_tests/helpers.py      |  10 ++
 tests/integration/ha_tests/test_restart.py | 115 +++++++++++++++++++++
 tests/integration/helpers.py               |   7 +-
 4 files changed, 155 insertions(+), 21 deletions(-)
 create mode 100644 tests/integration/ha_tests/test_restart.py

diff --git a/src/charm.py b/src/charm.py
index 8628ed2627..e264314281 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -884,6 +884,9 @@ def _create_pgdata(self, container: Container):

     def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None:
         """Event handler for PostgreSQL container on PebbleReadyEvent."""
+        if self._endpoint in self._endpoints:
+            self._fix_pod()
+
         # TODO: move this code to an "_update_layer" method in order to also utilize it in
         # config-changed hook.
         # Get the postgresql container so we can configure/manipulate it.
@@ -1033,25 +1036,7 @@ def is_blocked(self) -> bool:
         return isinstance(self.unit.status, BlockedStatus)

     def _on_upgrade_charm(self, _) -> None:
-        # Recreate k8s resources and add labels required for replication
-        # when the pod loses them (like when it's deleted).
-        if self.upgrade.idle:
-            try:
-                self._create_services()
-            except ApiError:
-                logger.exception("failed to create k8s services")
-                self.unit.status = BlockedStatus("failed to create k8s services")
-                return
-
-            try:
-                self._patch_pod_labels(self.unit.name)
-            except ApiError as e:
-                logger.error("failed to patch pod")
-                self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
-                return
-
-            # Update the sync-standby endpoint in the async replication data.
-            self.async_replication.update_async_replication_data()
+        self._fix_pod()

     def _patch_pod_labels(self, member: str) -> None:
         """Add labels required for replication to the current pod.
@@ -1263,6 +1248,27 @@ def _on_get_primary(self, event: ActionEvent) -> None:
         except RetryError as e:
             logger.error(f"failed to get primary with error {e}")

+    def _fix_pod(self) -> None:
+        # Recreate k8s resources and add labels required for replication
+        # when the pod loses them (like when it's deleted).
+        if self.upgrade.idle:
+            try:
+                self._create_services()
+            except ApiError:
+                logger.exception("failed to create k8s services")
+                self.unit.status = BlockedStatus("failed to create k8s services")
+                return
+
+            try:
+                self._patch_pod_labels(self.unit.name)
+            except ApiError as e:
+                logger.error("failed to patch pod")
+                self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
+                return
+
+            # Update the sync-standby endpoint in the async replication data.
+            self.async_replication.update_async_replication_data()
+
     def _on_stop(self, _):
         # Remove data from the drive when scaling down to zero to prevent
         # the cluster from getting stuck when scaling back up.
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index c7b1adfc12..a88b56c054 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -771,6 +771,16 @@ async def is_secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_wr
     return True


+async def remove_charm_code(ops_test: OpsTest, unit_name: str) -> None:
+    """Remove src/charm.py from the PostgreSQL unit."""
+    await run_command_on_unit(
+        ops_test,
+        unit_name,
+        f'rm /var/lib/juju/agents/unit-{unit_name.replace("/", "-")}/charm/src/charm.py',
+        "charm",
+    )
+
+
 async def send_signal_to_process(
     ops_test: OpsTest, unit_name: str, process: str, signal: str, use_ssh: bool = False
 ) -> None:
diff --git a/tests/integration/ha_tests/test_restart.py b/tests/integration/ha_tests/test_restart.py
new file mode 100644
index 0000000000..e64f4a1833
--- /dev/null
+++ b/tests/integration/ha_tests/test_restart.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+# Copyright 2024 Canonical Ltd.
+# See LICENSE file for licensing details.
+import logging
+
+import pytest as pytest
+from kubernetes import config, dynamic
+from kubernetes.client import api_client
+from pytest_operator.plugin import OpsTest
+
+from ..helpers import (
+    APPLICATION_NAME,
+    DATABASE_APP_NAME,
+    app_name,
+    build_and_deploy,
+    get_application_units,
+    get_cluster_members,
+    get_primary,
+    get_unit_address,
+)
+from .helpers import (
+    are_writes_increasing,
+    check_writes,
+    remove_charm_code,
+    start_continuous_writes,
+)
+
+logger = logging.getLogger(__name__)
+
+
+CLUSTER_SIZE = 3
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_deploy(ops_test: OpsTest) -> None:
+    """Build and deploy a PostgreSQL cluster and a test application."""
+    await build_and_deploy(ops_test, CLUSTER_SIZE, wait_for_idle=False)
+    if not await app_name(ops_test, APPLICATION_NAME):
+        await ops_test.model.deploy(APPLICATION_NAME, num_units=1)
+
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(
+            apps=[DATABASE_APP_NAME, APPLICATION_NAME],
+            status="active",
+            timeout=1000,
+            raise_on_error=False,
+        )
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_restart(ops_test: OpsTest, continuous_writes) -> None:
+    """Test restart of all the units simultaneously."""
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    logger.info(
+        "removing charm code from one non-primary unit to simulate a crash and prevent firing the update-charm hook"
+    )
+    primary = await get_primary(ops_test)
+    status = await ops_test.model.get_status()
+    for unit in status.applications[DATABASE_APP_NAME].units:
+        if unit != primary:
+            non_primary = unit
+            break
+    await remove_charm_code(ops_test, non_primary)
+    logger.info(f"removed charm code from {non_primary}")
+
+    logger.info("restarting all the units by deleting their pods")
+    client = dynamic.DynamicClient(api_client.ApiClient(configuration=config.load_kube_config()))
+    api = client.resources.get(api_version="v1", kind="Pod")
+    api.delete(
+        namespace=ops_test.model.info.name,
+        label_selector=f"app.kubernetes.io/name={DATABASE_APP_NAME}",
+    )
+    await ops_test.model.block_until(
+        lambda: all(
+            unit.workload_status == "error"
+            for unit in ops_test.model.units.values()
+            if unit.name == non_primary
+        )
+    )
+
+    # Resolve the error on the non-primary unit.
+    for unit in ops_test.model.units.values():
+        if unit.name == non_primary and unit.workload_status == "error":
+            logger.info(f"resolving {non_primary} error")
+            await unit.resolved(retry=False)
+            break
+
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(
+            apps=[DATABASE_APP_NAME], status="active", raise_on_error=False, timeout=300
+        )
+
+    # Check that all replication slots are present in the primary
+    # (by checking the list of cluster members).
+    logger.info(
+        "checking that all the replication slots are present in the primary by checking the list of cluster members"
+    )
+    primary = await get_primary(ops_test)
+    address = await get_unit_address(ops_test, primary)
+    assert get_cluster_members(address) == get_application_units(ops_test, DATABASE_APP_NAME)
+
+    logger.info("ensure continuous_writes after the crashed unit")
+    await are_writes_increasing(ops_test)
+
+    # Verify that no writes to the database were missed after stopping the writes
+    # (check that all the units have all the writes).
+    logger.info("checking whether no writes were lost")
+    await check_writes(ops_test)
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 94cc9a822c..4997b125d9 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -666,18 +666,21 @@ def resource_exists(client: Client, resource: GenericNamespacedResource) -> bool
         return False


-async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) -> str:
+async def run_command_on_unit(
+    ops_test: OpsTest, unit_name: str, command: str, container: str = "postgresql"
+) -> str:
     """Run a command on a specific unit.

     Args:
         ops_test: The ops test framework instance
         unit_name: The name of the unit to run the command on
         command: The command to run
+        container: The container to run the command in (default: postgresql)

     Returns:
         the command output if it succeeds, otherwise raises an exception.
     """
-    complete_command = f"ssh --container postgresql {unit_name} {command}"
+    complete_command = f"ssh --container {container} {unit_name} {command}"
     return_code, stdout, stderr = await ops_test.juju(*complete_command.split())
     if return_code != 0:
         raise Exception(