diff --git a/tests/test_plugin.py b/tests/test_plugin.py index b98d4af..42e18a4 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -6,6 +6,8 @@ import json from tempfile import TemporaryDirectory +import wagglemsg + class TestPlugin(unittest.TestCase): @@ -88,7 +90,8 @@ def test_upload_file(self): upload_path.write_bytes(data) pl.upload_file(upload_path) - scope, msg = pl.outgoing_queue.get_nowait() + scope, body = pl.outgoing_queue.get_nowait() + msg = wagglemsg.load(body) self.assertEqual(scope, "all") self.assertEqual(msg.name, "upload") self.assertIsNotNone(msg.timestamp) @@ -96,6 +99,7 @@ def test_upload_file(self): self.assertIsNotNone(msg.meta) self.assertIn("filename", msg.meta) + class TestUploader(unittest.TestCase): def test_upload_file(self): diff --git a/waggle/plugin.py b/waggle/plugin.py index 9ddc063..d86085c 100644 --- a/waggle/plugin.py +++ b/waggle/plugin.py @@ -107,7 +107,7 @@ def subscribe(self, *topics): def __publish(self, name, value, meta, timestamp, scope="all", timeout=None): msg = wagglemsg.Message(name=name, value=value, timestamp=timestamp, meta=meta) logger.debug("adding message to outgoing queue: %s", msg) - self.outgoing_queue.put((scope, msg), timeout=timeout) + self.outgoing_queue.put((scope, wagglemsg.dump(msg)), timeout=timeout) def publish(self, name, value, meta={}, timestamp=None, scope="all", timeout=None): if timestamp is None: @@ -174,7 +174,7 @@ def process_subscribe_queue(): def process_publish_queue(): while self.running.is_set(): try: - scope, msg = self.outgoing_queue.get_nowait() + scope, body = self.outgoing_queue.get_nowait() except Empty: break properties = pika.BasicProperties( @@ -185,8 +185,9 @@ def process_publish_queue(): properties.app_id = self.config.app_id # NOTE app_id is used by data service to validate and tag additional metadata provided by k3s scheduler. - body = wagglemsg.dump(msg) - logger.debug("publishing message to rabbitmq: %s", msg) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("publishing message to rabbitmq: %s", wagglemsg.load(body)) + channel.basic_publish( exchange="to-validator", routing_key=scope,