Skip to content

Commit

Permalink
fixed consumer queue
Browse files Browse the repository at this point in the history
  • Loading branch information
seanshahkarami committed Jul 11, 2022
1 parent 623535b commit 0dcb553
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/waggle/plugin/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
task.done.wait()

def subscribe(self, *topics):
self.tasks.append(RabbitMQConsumer(topics, self.config, self.send, self.stop))
self.tasks.append(RabbitMQConsumer(topics, self.config, self.recv, self.stop))
# TODO(sean) add mock or integration testing against rabbitmq to actually test this

def get(self, timeout=None):
try:
Expand Down
2 changes: 1 addition & 1 deletion src/waggle/plugin/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __flush_messages(self, ch):
properties = pika.BasicProperties(
delivery_mode=2,
user_id=self.params.credentials.username)

# NOTE app_id is used by data service to validate and tag additional metadata provided by k3s scheduler.
if self.config.app_id != "":
properties.app_id = self.config.app_id
Expand Down
8 changes: 8 additions & 0 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from waggle.plugin import Plugin, PluginConfig, Uploader, get_timestamp
import wagglemsg

# TODO(sean) add integration testing against rabbitmq
# TODO(sean) clean up the queue interface. it would be better to not know about the plugin.send / plugin.recv queues explicitly.


class TestPlugin(unittest.TestCase):

Expand All @@ -33,6 +36,11 @@ def test_get(self):
with self.assertRaises(TimeoutError):
plugin.get(timeout=0.001)

msg = wagglemsg.Message("test", 1.0, 0, {})
plugin.recv.put(msg)
msg2 = plugin.get(timeout=0)
self.assertEqual(msg, msg2)

def test_get_timestamp(self):
ts = get_timestamp()
self.assertIsInstance(ts, int)
Expand Down

0 comments on commit 0dcb553

Please sign in to comment.