From 0dcb5538e163e0296f2b217df2ea2c1bcf597d60 Mon Sep 17 00:00:00 2001 From: Sean Shahkarami Date: Mon, 11 Jul 2022 15:27:03 -0500 Subject: [PATCH] fixed consumer queue --- src/waggle/plugin/plugin.py | 3 ++- src/waggle/plugin/rabbitmq.py | 2 +- tests/test_plugin.py | 8 ++++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/waggle/plugin/plugin.py b/src/waggle/plugin/plugin.py index 0806c32..3252851 100644 --- a/src/waggle/plugin/plugin.py +++ b/src/waggle/plugin/plugin.py @@ -60,7 +60,8 @@ def __exit__(self, exc_type, exc_value, exc_traceback): task.done.wait() def subscribe(self, *topics): - self.tasks.append(RabbitMQConsumer(topics, self.config, self.send, self.stop)) + self.tasks.append(RabbitMQConsumer(topics, self.config, self.recv, self.stop)) + # TODO(sean) add mock or integration testing against rabbitmq to actually test this def get(self, timeout=None): try: diff --git a/src/waggle/plugin/rabbitmq.py b/src/waggle/plugin/rabbitmq.py index f6be12b..890b406 100644 --- a/src/waggle/plugin/rabbitmq.py +++ b/src/waggle/plugin/rabbitmq.py @@ -62,7 +62,7 @@ def __flush_messages(self, ch): properties = pika.BasicProperties( delivery_mode=2, user_id=self.params.credentials.username) - + # NOTE app_id is used by data service to validate and tag additional metadata provided by k3s scheduler. if self.config.app_id != "": properties.app_id = self.config.app_id diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 6ad20ac..49d2b5c 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -8,6 +8,9 @@ from waggle.plugin import Plugin, PluginConfig, Uploader, get_timestamp import wagglemsg +# TODO(sean) add integration testing against rabbitmq +# TODO(sean) clean up the queue interface. it would be better to not know about the plugin.send / plugin.recv queues explicitly. + class TestPlugin(unittest.TestCase): @@ -33,6 +36,11 @@ def test_get(self): with self.assertRaises(TimeoutError): plugin.get(timeout=0.001) + msg = wagglemsg.Message("test", 1.0, 0, {}) + plugin.recv.put(msg) + msg2 = plugin.get(timeout=0) + self.assertEqual(msg, msg2) + def test_get_timestamp(self): ts = get_timestamp() self.assertIsInstance(ts, int)