Skip to content

Commit

Permalink
[DPE-3887] Avoid replication slot deletion (#680)
Browse files Browse the repository at this point in the history
* Add integration test

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Re-patch pod labels

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Improve fix and add initial test logic to crash the stop hook

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Add failure injection

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Use pebble ready event for fix and fix test checks order

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Simplify stop hook failure

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

---------

Signed-off-by: Marcelo Henrique Neppel <[email protected]>
  • Loading branch information
marceloneppel authored Sep 17, 2024
1 parent 9cc58c7 commit ea87ed7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 21 deletions.
44 changes: 25 additions & 19 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,9 @@ def _create_pgdata(self, container: Container):

def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None:
"""Event handler for PostgreSQL container on PebbleReadyEvent."""
if self._endpoint in self._endpoints:
self._fix_pod()

# TODO: move this code to an "_update_layer" method in order to also utilize it in
# config-changed hook.
# Get the postgresql container so we can configure/manipulate it.
Expand Down Expand Up @@ -1033,25 +1036,7 @@ def is_blocked(self) -> bool:
return isinstance(self.unit.status, BlockedStatus)

def _on_upgrade_charm(self, _) -> None:
# Recreate k8s resources and add labels required for replication
# when the pod loses them (like when it's deleted).
if self.upgrade.idle:
try:
self._create_services()
except ApiError:
logger.exception("failed to create k8s services")
self.unit.status = BlockedStatus("failed to create k8s services")
return

try:
self._patch_pod_labels(self.unit.name)
except ApiError as e:
logger.error("failed to patch pod")
self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()
self._fix_pod()

def _patch_pod_labels(self, member: str) -> None:
"""Add labels required for replication to the current pod.
Expand Down Expand Up @@ -1263,6 +1248,27 @@ def _on_get_primary(self, event: ActionEvent) -> None:
except RetryError as e:
logger.error(f"failed to get primary with error {e}")

def _fix_pod(self) -> None:
    """Restore k8s resources and pod labels lost when the pod was recreated.

    Recreates the k8s services and re-applies the replication labels
    (e.g. after the pod was deleted), then refreshes the sync-standby
    endpoint shared through the async replication relation. On an API
    failure the unit is set to blocked and the method returns early.
    Does nothing while an upgrade is in progress.
    """
    # Skip entirely while an upgrade is running; the upgrade logic
    # manages the pod resources itself.
    if not self.upgrade.idle:
        return

    try:
        self._create_services()
    except ApiError:
        logger.exception("failed to create k8s services")
        self.unit.status = BlockedStatus("failed to create k8s services")
        return

    try:
        self._patch_pod_labels(self.unit.name)
    except ApiError as e:
        logger.error("failed to patch pod")
        self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
        return

    # Update the sync-standby endpoint in the async replication data.
    self.async_replication.update_async_replication_data()

def _on_stop(self, _):
# Remove data from the drive when scaling down to zero to prevent
# the cluster from getting stuck when scaling back up.
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,16 @@ async def is_secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_wr
return True


async def remove_charm_code(ops_test: OpsTest, unit_name: str) -> None:
    """Remove src/charm.py from the PostgreSQL unit."""
    # The juju agent directory encodes the unit name with a dash
    # instead of a slash (e.g. "postgresql-k8s/0" -> "unit-postgresql-k8s-0").
    agent_dir_name = unit_name.replace("/", "-")
    charm_code_path = f"/var/lib/juju/agents/unit-{agent_dir_name}/charm/src/charm.py"
    # Run in the "charm" container, where the juju agent and charm code live.
    await run_command_on_unit(ops_test, unit_name, f"rm {charm_code_path}", "charm")


async def send_signal_to_process(
ops_test: OpsTest, unit_name: str, process: str, signal: str, use_ssh: bool = False
) -> None:
Expand Down
115 changes: 115 additions & 0 deletions tests/integration/ha_tests/test_restart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import logging

import pytest as pytest
from kubernetes import config, dynamic
from kubernetes.client import api_client
from pytest_operator.plugin import OpsTest

from ..helpers import (
APPLICATION_NAME,
DATABASE_APP_NAME,
app_name,
build_and_deploy,
get_application_units,
get_cluster_members,
get_primary,
get_unit_address,
)
from .helpers import (
are_writes_increasing,
check_writes,
remove_charm_code,
start_continuous_writes,
)

logger = logging.getLogger(__name__)


CLUSTER_SIZE = 3


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_deploy(ops_test: OpsTest) -> None:
    """Build and deploy a PostgreSQL cluster and a test application."""
    model = ops_test.model

    await build_and_deploy(ops_test, CLUSTER_SIZE, wait_for_idle=False)

    # Deploy the continuous-writes test application only when it is not
    # already present in the model (tests may reuse an existing deployment).
    if not await app_name(ops_test, APPLICATION_NAME):
        await model.deploy(APPLICATION_NAME, num_units=1)

    # Speed up hook dispatch while waiting for both applications to settle.
    async with ops_test.fast_forward():
        await model.wait_for_idle(
            apps=[DATABASE_APP_NAME, APPLICATION_NAME],
            status="active",
            timeout=1000,
            raise_on_error=False,
        )


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_restart(ops_test: OpsTest, continuous_writes) -> None:
    """Test restart of all the units simultaneously.

    Scenario: crash one non-primary unit's stop hook (by deleting its charm
    code), delete every pod so the whole cluster restarts at once, then
    verify the cluster recovers with all replication slots intact and no
    continuous writes lost.
    """
    logger.info("starting continuous writes to the database")
    await start_continuous_writes(ops_test, DATABASE_APP_NAME)

    logger.info("checking whether writes are increasing")
    await are_writes_increasing(ops_test)

    logger.info(
        "removing charm code from one non-primary unit to simulate a crash and prevent firing the update-charm hook"
    )
    primary = await get_primary(ops_test)
    status = await ops_test.model.get_status()
    # Iterating the units mapping yields unit names; pick the first unit
    # that is not the current primary.
    for unit in status.applications[DATABASE_APP_NAME].units:
        if unit != primary:
            non_primary = unit
            break
    # With src/charm.py gone, the next hook dispatch on this unit fails,
    # leaving the unit in "error" state after the pod restart.
    await remove_charm_code(ops_test, non_primary)
    logger.info(f"removed charm code from {non_primary}")

    logger.info("restarting all the units by deleting their pods")
    client = dynamic.DynamicClient(api_client.ApiClient(configuration=config.load_kube_config()))
    api = client.resources.get(api_version="v1", kind="Pod")
    # Delete all the application's pods at once via the shared app label.
    api.delete(
        namespace=ops_test.model.info.name,
        label_selector=f"app.kubernetes.io/name={DATABASE_APP_NAME}",
    )
    # Wait until the crashed (code-less) unit reports an error status.
    await ops_test.model.block_until(
        lambda: all(
            unit.workload_status == "error"
            for unit in ops_test.model.units.values()
            if unit.name == non_primary
        )
    )

    # Resolve the error on the non-primary unit.
    for unit in ops_test.model.units.values():
        if unit.name == non_primary and unit.workload_status == "error":
            logger.info(f"resolving {non_primary} error")
            # retry=False: do not re-run the failed hook, just clear the error.
            await unit.resolved(retry=False)
            break

    # Let the whole cluster settle back to active.
    async with ops_test.fast_forward():
        await ops_test.model.wait_for_idle(
            apps=[DATABASE_APP_NAME], status="active", raise_on_error=False, timeout=300
        )

    # Check that all replication slots are present in the primary
    # (by checking the list of cluster members).
    logger.info(
        "checking that all the replication slots are present in the primary by checking the list of cluster members"
    )
    primary = await get_primary(ops_test)
    address = await get_unit_address(ops_test, primary)
    assert get_cluster_members(address) == get_application_units(ops_test, DATABASE_APP_NAME)

    logger.info("ensure continuous_writes after the crashed unit")
    await are_writes_increasing(ops_test)

    # Verify that no writes to the database were missed after stopping the writes
    # (check that all the units have all the writes).
    logger.info("checking whether no writes were lost")
    await check_writes(ops_test)
7 changes: 5 additions & 2 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,18 +666,21 @@ def resource_exists(client: Client, resource: GenericNamespacedResource) -> bool
return False


async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) -> str:
async def run_command_on_unit(
ops_test: OpsTest, unit_name: str, command: str, container: str = "postgresql"
) -> str:
"""Run a command on a specific unit.
Args:
ops_test: The ops test framework instance
unit_name: The name of the unit to run the command on
command: The command to run
container: The container to run the command in (default: postgresql)
Returns:
the command output if it succeeds, otherwise raises an exception.
"""
complete_command = f"ssh --container postgresql {unit_name} {command}"
complete_command = f"ssh --container {container} {unit_name} {command}"
return_code, stdout, stderr = await ops_test.juju(*complete_command.split())
if return_code != 0:
raise Exception(
Expand Down

0 comments on commit ea87ed7

Please sign in to comment.