diff --git a/stratis_cli_cert.py b/stratis_cli_cert.py index 20a7eb6..aa0b81f 100644 --- a/stratis_cli_cert.py +++ b/stratis_cli_cert.py @@ -184,7 +184,8 @@ def setUp(self): """ super().setUp() - RunPostTestChecks.setUp(self) + self._post_test_checks = RunPostTestChecks() + self._post_test_checks.setUp() def tearDown(self): """ @@ -194,7 +195,7 @@ def tearDown(self): """ super().tearDown() - RunPostTestChecks.tearDown(self) + self._post_test_checks.tearDown() def _test_permissions(self, command_line, permissions, exp_stdout_empty): """ diff --git a/stratisd_cert.py b/stratisd_cert.py index c2b9538..47aeaf7 100644 --- a/stratisd_cert.py +++ b/stratisd_cert.py @@ -180,7 +180,8 @@ def setUp(self): """ super().setUp() - RunPostTestChecks.setUp(self) + self._post_test_checks = RunPostTestChecks() + self._post_test_checks.setUp() def tearDown(self): """ @@ -190,7 +191,7 @@ def tearDown(self): """ super().tearDown() - RunPostTestChecks.tearDown(self) + self._post_test_checks.tearDown() def _unittest_set_property( self, object_path, param_iface, dbus_param, dbus_value, exception_name diff --git a/testlib/dbus.py b/testlib/dbus.py index 40a07f5..3a96adc 100644 --- a/testlib/dbus.py +++ b/testlib/dbus.py @@ -282,6 +282,20 @@ def pool_uuid(pool_path): return iface.Get(StratisDbus._POOL_IFACE, "Uuid", timeout=StratisDbus._TIMEOUT) + @staticmethod + def pool_encrypted(pool_path): + """ + Find a pool Encrypted value given an object path. + """ + iface = dbus.Interface( + StratisDbus._BUS.get_object(StratisDbus._BUS_NAME, pool_path), + dbus.PROPERTIES_IFACE, + ) + + return iface.Get( + StratisDbus._POOL_IFACE, "Encrypted", timeout=StratisDbus._TIMEOUT + ) + @staticmethod def pool_create( pool_name, diff --git a/testlib/infra.py b/testlib/infra.py index 812653e..a9539e5 100644 --- a/testlib/infra.py +++ b/testlib/infra.py @@ -221,11 +221,97 @@ class PoolMetadataMonitor(unittest.TestCase): Manage verification of consistency of pool-level metadata. """ - def run_check(self): + def _check_thin_meta_allocations(self, metadata): + """ + Check whether sizes of thin meta and thin meta spare match. + """ + (thin_meta_size, thin_meta_spare_size) = [ + sum(x[1] for x in entries) + for entries in [ + metadata["flex_devs"]["thin_meta_dev"], + metadata["flex_devs"]["thin_meta_dev_spare"], + ] + ] + + self.assertEqual( + thin_meta_size, + thin_meta_spare_size, + "Total size of thin meta device is not equal to " + "total size of thin meta spare device.", + ) + + def _check_encryption_information_consistency(self, pool_object_path, metadata): + """ + Check whether D-Bus and metadata agree about encryption state of pool. + """ + encrypted = bool(StratisDbus.pool_encrypted(pool_object_path)) + features = metadata.get("features") + + if encrypted: + self.assertIsNotNone(features) + self.assertIn("Encryption", metadata["features"]) + elif features is not None: + self.assertNotIn("Encryption", metadata["features"]) + + def _check_crypt_meta_allocs(self, metadata): + """ + Check that all crypt metadata allocs exist and have non-zero length. + """ + crypt_meta_allocs = metadata["backstore"]["cap"].get("crypt_meta_allocs") + self.assertIsNotNone(crypt_meta_allocs) + self.assertIsInstance(crypt_meta_allocs, list) + self.assertGreater(len(crypt_meta_allocs), 0) + + crypt_meta_allocs = crypt_meta_allocs[0] + self.assertIsInstance(crypt_meta_allocs, list) + self.assertEqual(crypt_meta_allocs[0], 0) + self.assertGreater(crypt_meta_allocs[1], 0) + + def _check_raid_meta_allocs(self, metadata): + """ + Check that all raid_meta_allocs exist and have non-zero length. + """ + for raid_meta_allocs in [ + a["raid_meta_allocs"] + for a in metadata["backstore"]["data_tier"]["blockdev"]["devs"] + ]: + self.assertIsNotNone(raid_meta_allocs) + self.assertIsInstance(raid_meta_allocs, list) + self.assertGreater(len(raid_meta_allocs), 0) + + raid_meta_allocs = raid_meta_allocs[0] + self.assertIsInstance(raid_meta_allocs, list) + self.assertGreater(raid_meta_allocs[0], 0) + self.assertGreater(raid_meta_allocs[1], 0) + + def _check_integrity_meta_allocs(self, metadata): + """ + Check that all integrity_meta_allocs exist and have non-zero length. + """ + for integrity_meta_allocs in [ + a["integrity_meta_allocs"] + for a in metadata["backstore"]["data_tier"]["blockdev"]["devs"] + ]: + self.assertIsNotNone(integrity_meta_allocs) + self.assertIsInstance(integrity_meta_allocs, list) + self.assertGreater(len(integrity_meta_allocs), 0) + + integrity_meta_allocs = integrity_meta_allocs[0] + self.assertIsInstance(integrity_meta_allocs, list) + self.assertGreater(integrity_meta_allocs[0], 0) + self.assertGreater(integrity_meta_allocs[1], 0) + + def run_check(self, stop_time): """ Run the check. + + :param int stop_time: the time the test completed """ if PoolMetadataMonitor.verify: # pylint: disable=no-member + + # Wait for D-Bus to settle, so D-Bus and metadata can be compared + time.sleep(sleep_time(stop_time, 16)) + for object_path, _ in StratisDbus.pool_list(): (current, current_return_code, current_message) = ( StratisDbus.pool_get_metadata(object_path) @@ -243,20 +329,14 @@ def run_check(self): msg="previously written metadata and current metadata are not the same", ) - (thin_meta_size, thin_meta_spare_size) = [ - sum(x[1] for x in entries) - for entries in [ - written["flex_devs"]["thin_meta_dev"], - written["flex_devs"]["thin_meta_dev_spare"], - ] - ] + self._check_thin_meta_allocations(written) - self.assertEqual( - thin_meta_size, - thin_meta_spare_size, - "Total size of thin meta device is not equal to " - "total size of thin meta spare device.", - ) + self._check_encryption_information_consistency(object_path, written) + self._check_crypt_meta_allocs(written) + + self._check_raid_meta_allocs(written) + + self._check_integrity_meta_allocs(written) else: current_message = ( @@ -570,20 +650,17 @@ class RunPostTestChecks(unittest.TestCase): """ def setUp(self): - DbusMonitor.setUp(self) + self.dbus_monitor = DbusMonitor() + self.dbus_monitor.setUp() def tearDown(self): stop_time = time.monotonic_ns() + self.dbus_monitor.run_check(stop_time) - SysfsMonitor.run_check(self) - - SymlinkMonitor.run_check(self) - - DbusMonitor.run_check(self, stop_time) - - FilesystemSymlinkMonitor.run_check(self, stop_time) - - PoolMetadataMonitor.run_check(self) + SysfsMonitor().run_check() + SymlinkMonitor().run_check() + FilesystemSymlinkMonitor().run_check(stop_time) + PoolMetadataMonitor().run_check(stop_time) @staticmethod def set_from_post_test_check_option(post_test_check):