Skip to content

Commit

Permalink
serialize message before adding to queue to fail on unsupported types
Browse files Browse the repository at this point in the history
  • Loading branch information
seanshahkarami committed Oct 14, 2021
1 parent e260830 commit 684335c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
6 changes: 5 additions & 1 deletion tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import json
from tempfile import TemporaryDirectory

import wagglemsg


class TestPlugin(unittest.TestCase):

Expand Down Expand Up @@ -88,14 +90,16 @@ def test_upload_file(self):
upload_path.write_bytes(data)

pl.upload_file(upload_path)
scope, msg = pl.outgoing_queue.get_nowait()
scope, body = pl.outgoing_queue.get_nowait()
msg = wagglemsg.load(body)
self.assertEqual(scope, "all")
self.assertEqual(msg.name, "upload")
self.assertIsNotNone(msg.timestamp)
self.assertIsInstance(msg.value, str)
self.assertIsNotNone(msg.meta)
self.assertIn("filename", msg.meta)


class TestUploader(unittest.TestCase):

def test_upload_file(self):
Expand Down
9 changes: 5 additions & 4 deletions waggle/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def subscribe(self, *topics):
def __publish(self, name, value, meta, timestamp, scope="all", timeout=None):
msg = wagglemsg.Message(name=name, value=value, timestamp=timestamp, meta=meta)
logger.debug("adding message to outgoing queue: %s", msg)
self.outgoing_queue.put((scope, msg), timeout=timeout)
self.outgoing_queue.put((scope, wagglemsg.dump(msg)), timeout=timeout)

def publish(self, name, value, meta={}, timestamp=None, scope="all", timeout=None):
if timestamp is None:
Expand Down Expand Up @@ -174,7 +174,7 @@ def process_subscribe_queue():
def process_publish_queue():
while self.running.is_set():
try:
scope, msg = self.outgoing_queue.get_nowait()
scope, body = self.outgoing_queue.get_nowait()
except Empty:
break
properties = pika.BasicProperties(
Expand All @@ -185,8 +185,9 @@ def process_publish_queue():
properties.app_id = self.config.app_id
# NOTE app_id is used by data service to validate and tag additional metadata provided by k3s scheduler.

body = wagglemsg.dump(msg)
logger.debug("publishing message to rabbitmq: %s", msg)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("publishing message to rabbitmq: %s", wagglemsg.load(body))

channel.basic_publish(
exchange="to-validator",
routing_key=scope,
Expand Down

0 comments on commit 684335c

Please sign in to comment.