diff --git a/testlib/dbus.py b/testlib/dbus.py index 40a07f5..3a96adc 100644 --- a/testlib/dbus.py +++ b/testlib/dbus.py @@ -282,6 +282,20 @@ def pool_uuid(pool_path): return iface.Get(StratisDbus._POOL_IFACE, "Uuid", timeout=StratisDbus._TIMEOUT) + @staticmethod + def pool_encrypted(pool_path): + """ + Find a pool Encrypted value given an object path. + """ + iface = dbus.Interface( + StratisDbus._BUS.get_object(StratisDbus._BUS_NAME, pool_path), + dbus.PROPERTIES_IFACE, + ) + + return iface.Get( + StratisDbus._POOL_IFACE, "Encrypted", timeout=StratisDbus._TIMEOUT + ) + @staticmethod def pool_create( pool_name, diff --git a/testlib/infra.py b/testlib/infra.py index 996f045..df8f6ed 100644 --- a/testlib/infra.py +++ b/testlib/infra.py @@ -240,6 +240,33 @@ def _check_thin_meta_allocations(self, metadata): "total size of thin meta spare device.", ) + def _check_encryption_information_consistency(self, pool_object_path, metadata): + """ + Check whether D-Bus and metadata agree about encryption state of pool. + """ + encrypted = bool(StratisDbus.pool_encrypted(pool_object_path)) + features = metadata.get("features") + + if encrypted: + self.assertIsNotNone(features) + self.assertIn("Encryption", metadata["features"]) + elif features is not None: + self.assertNotIn("Encryption", metadata["features"]) + + def _check_crypt_metadata_allocs(self, metadata): + """ + Check that all metadata allocs exist and are non-zero. + """ + crypt_meta_allocs = metadata["backstore"]["cap"].get("crypt_meta_allocs") + self.assertIsNotNone(crypt_meta_allocs) + self.assertIsInstance(crypt_meta_allocs, list) + self.assertEqual(len(crypt_meta_allocs), 1) + + crypt_meta_allocs = crypt_meta_allocs[0] + self.assertIsInstance(crypt_meta_allocs, list) + self.assertEqual(crypt_meta_allocs[0], 0) + self.assertGreater(crypt_meta_allocs[1], 0) + def run_check(self, stop_time): """ Run the check. @@ -270,6 +297,9 @@ def run_check(self, stop_time): self._check_thin_meta_allocations(written) + self._check_encryption_information_consistency(object_path, written) + self._check_crypt_metadata_allocs(written) + else: current_message = ( "" if current_return_code == _OK else current_message