From 258f4e1dc2cc312b8a20bd9e4504db47ca0c1c1e Mon Sep 17 00:00:00 2001 From: Sean Shahkarami Date: Thu, 9 Dec 2021 10:42:25 -0600 Subject: [PATCH] Added support for Plugin context manager. (#23) * added support for with Plugin() as plugin block * remove trace messages * updated docs --- docs/writing-a-plugin.md | 100 ++++++++++++++------------------------- tests/test_plugin.py | 29 +++++++----- waggle/plugin.py | 64 +++++++++++++++---------- 3 files changed, 90 insertions(+), 103 deletions(-) diff --git a/docs/writing-a-plugin.md b/docs/writing-a-plugin.md index 96d0e54..1587229 100644 --- a/docs/writing-a-plugin.md +++ b/docs/writing-a-plugin.md @@ -63,36 +63,14 @@ touch requirements.txt Create a new file called `main.py` with the following code: ```python -from waggle import plugin -from time import sleep - -plugin.init() - -counter = 0 - -while True: - sleep(1) - print("publishing value", counter) - plugin.publish("hello.world.counter", counter) - counter += 1 -``` - -Let's walk through the pywaggle related details. First, we import the pywaggle plugin module: - -```python -from waggle import plugin -``` - -Next, we initialize our plugin. This will prepare our plugin to interface with Waggle system services. - -```python -plugin.init() -``` - -Finally, we publish our counter value. This will queue up our measurement name and value along with the current timestamp and will eventually be shipped to our data repository where it can be accessed. +from waggle.plugin import Plugin +import time -```python -plugin.publish("hello.world.counter", counter) +with Plugin() as plugin: + for i in range(10): + print("publishing value", i) + plugin.publish("hello.world.value", i) + time.sleep(1) ``` ### 5. Run plugin @@ -200,15 +178,15 @@ Plugins can subscribe to measurements published by other plugins running on the The followng basic example simply waits for measurements named "my.sensor.name" and prints the value it received. ```python -from waggle.plugin import plugin +from waggle.plugin import Plugin from random import random -plugin.init() -plugin.subscribe("my.sensor.name") +with Plugin() as plugin: + plugin.subscribe("my.sensor.name") -while True: - msg = plugin.get() - print("Another plugin published my.sensor.name value", msg.value) + while True: + msg = plugin.get() + print("Another plugin published my.sensor.name value", msg.value) ``` In the case you need multiple multiple measurements, you can simply use: @@ -255,21 +233,17 @@ pywaggle provides a simple abstraction to cameras and microphones. ### Accessing a video stream ```python -from waggle import plugin +from waggle.plugin import Plugin from waggle.data.vision import Camera import time -plugin.init() - -# open default local camera -camera = Camera() - -# process samples from video stream -for sample in camera.stream(): - count = count_cars_in_image(sample.data) - if count > 10: - sample.save("cars.jpg") - plugin.upload_file("cars.jpg") +with Plugin() as plugin, Camera() as camera: + # process samples from video stream + for sample in camera.stream(): + count = count_cars_in_image(sample.data) + if count > 10: + sample.save("cars.jpg") + plugin.upload_file("cars.jpg") ``` The `camera.stream()` function yields a sequence of `ImageSample` with the following properties: @@ -296,20 +270,17 @@ camera = Camera("bottom_camera") ### Recording audio data ```python -from waggle import plugin +from waggle.plugin import Plugin from waggle.data.audio import Microphone import time -plugin.init() - -microphone = Microphone() - -# record and upload a 10s sample periodically -while True: - sample = microphone.record(10) - sample.save("sample.ogg") - plugin.upload_file("sample.ogg") - time.sleep(300) +with Plugin() as plugin, Microphone() as microphone: + # record and upload a 10s sample periodically + while True: + sample = microphone.record(10) + sample.save("sample.ogg") + plugin.upload_file("sample.ogg") + time.sleep(300) ``` Similar to `ImageSample`, `AudioSample` provide the following properties: @@ -377,7 +348,7 @@ camera = ImageFolder(format=BGR) If we run the basic example, the only thing we'll see is the message "publishing a value!" every second. If you need to see more details, pywaggle is designed to easily interface with Python's standard logging module. To enable debug logging, simply make the following additions: ```python -from waggle import plugin +from waggle.plugin import Plugin from time import sleep # 1. import standard logging module @@ -386,12 +357,11 @@ import logging # 2. enable debug logging logging.basicConfig(level=logging.DEBUG) -plugin.init() - -while True: - sleep(1) - print("publishing a value!") - plugin.publish("my.sensor.name", 123) +with Plugin() as plugin: + while True: + sleep(1) + print("publishing a value!") + plugin.publish("my.sensor.name", 123) ``` You should see a lot of information like: diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 42e18a4..1049915 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -18,23 +18,26 @@ def tearDown(self): plugin.stop() def test_publish(self): - plugin.publish('test.int', 1) - plugin.publish('test.float', 2.0) - plugin.publish('test.bytes', b'three') - plugin.publish('cows.total', 391, meta={ - "camera": "bottom_left", - }) + with Plugin() as plugin: + plugin.publish('test.int', 1) + plugin.publish('test.float', 2.0) + plugin.publish('test.bytes', b'three') + plugin.publish('cows.total', 391, meta={ + "camera": "bottom_left", + }) def test_publish_check_reserved(self): - with self.assertRaises(ValueError): - plugin.publish("upload", "path/to/data") + with Plugin() as plugin: + with self.assertRaises(ValueError): + plugin.publish("upload", "path/to/data") def test_get(self): - plugin.subscribe('raw.#') - with self.assertRaises(TimeoutError): - plugin.get(timeout=0) - with self.assertRaises(TimeoutError): - plugin.get(timeout=0.001) + with Plugin() as plugin: + plugin.subscribe('raw.#') + with self.assertRaises(TimeoutError): + plugin.get(timeout=0) + with self.assertRaises(TimeoutError): + plugin.get(timeout=0.001) def test_get_timestamp(self): ts = plugin.get_timestamp() diff --git a/waggle/plugin.py b/waggle/plugin.py index d86085c..320280a 100644 --- a/waggle/plugin.py +++ b/waggle/plugin.py @@ -60,9 +60,25 @@ def raise_for_invalid_publish_name(s): class Plugin: - def __init__(self, config, uploader=None): + def __init__(self, config=None, uploader=None): + # default config from env vars + if config is None: + config = PluginConfig( + username=getenv("WAGGLE_PLUGIN_USERNAME", "plugin"), + password=getenv("WAGGLE_PLUGIN_PASSWORD", "plugin"), + host=getenv("WAGGLE_PLUGIN_HOST", "rabbitmq"), + port=int(getenv("WAGGLE_PLUGIN_PORT", 5672)), + app_id=getenv("WAGGLE_APP_ID", ""), + ) + self.config = config + # default upload directory + if uploader is None: + uploader = Uploader(Path(getenv("WAGGLE_PLUGIN_UPLOAD_PATH", "/run/waggle/uploads"))) + + self.uploader = uploader + self.connection_parameters = pika.ConnectionParameters( host=config.host, port=config.port, @@ -81,15 +97,29 @@ def __init__(self, config, uploader=None): self.incoming_queue = Queue() self.subscribe_queue = Queue() - self.uploader = uploader + def __enter__(self): + self.init() + # self.publish("plugin.status", "start") + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + # if exc_type is None: + # self.publish("plugin.status", "stop") + # else: + # self.publish("plugin.status", "error") + self.stop() def init(self): logger.debug("starting plugin worker thread") + if self.running.is_set(): + raise RuntimeError("cannot init already running plugin") + self.running.set() Thread(target=self.run_rabbitmq_worker, daemon=True).start() - def stop(self): + def stop(self, timeout=None): logger.debug("stopping plugin worker thread") self.running.clear() + self.stopped.wait(timeout=timeout) def get(self, timeout=None): try: @@ -126,20 +156,14 @@ def upload_file(self, path, meta={}, timestamp=None, keep=False): self.__publish("upload", upload_path.name, meta, timestamp) def run_rabbitmq_worker(self): - if self.running.is_set(): - logger.warning("already have an instance of rabbitmq worker running") - return - try: - self.running.set() self.stopped.clear() - while self.running.is_set(): try: logger.debug("connecting to rabbitmq broker at %s:%d with username %r", - self.connection_parameters.host, - self.connection_parameters.port, - self.connection_parameters.credentials.username) + self.connection_parameters.host, + self.connection_parameters.port, + self.connection_parameters.credentials.username) with pika.BlockingConnection(self.connection_parameters) as connection: logger.debug("connected to rabbitmq broker") self.rabbitmq_worker_mainloop(connection) @@ -147,7 +171,6 @@ def run_rabbitmq_worker(self): logger.debug("rabbitmq connection error: %s", exc) time.sleep(1) finally: - self.running.clear() self.stopped.set() def rabbitmq_worker_mainloop(self, connection): @@ -198,7 +221,7 @@ def process_queues_and_events(): process_subscribe_queue() process_publish_queue() if self.running.is_set(): - connection.call_later(0.001, process_queues_and_events) + connection.call_later(0.01, process_queues_and_events) else: logger.debug("stopping rabbitmq processing loop") channel.stop_consuming() @@ -207,7 +230,7 @@ def process_queues_and_events(): queue = channel.queue_declare("", exclusive=True).method.queue channel.basic_consume(queue, subscriber_callback, auto_ack=True) # setup periodic publish and subscribe to topic checks - connection.call_later(0.001, process_queues_and_events) + connection.call_later(0.01, process_queues_and_events) logger.debug("starting rabbitmq processing loop") channel.start_consuming() @@ -270,17 +293,8 @@ def write_json_file(path, obj): json.dump(obj, f, separators=(',', ':'), sort_keys=True) -# define global default instance of Uploader -uploader = Uploader(Path(getenv("WAGGLE_PLUGIN_UPLOAD_PATH", "/run/waggle/uploads"))) - # define global default instance of Plugin -plugin = Plugin(PluginConfig( - username=getenv("WAGGLE_PLUGIN_USERNAME", "plugin"), - password=getenv("WAGGLE_PLUGIN_PASSWORD", "plugin"), - host=getenv("WAGGLE_PLUGIN_HOST", "rabbitmq"), - port=int(getenv("WAGGLE_PLUGIN_PORT", 5672)), - app_id=getenv("WAGGLE_APP_ID", ""), -), uploader=uploader) +plugin = Plugin() init = plugin.init stop = plugin.stop subscribe = plugin.subscribe