Skip to content

Commit

Permalink
Added support for Plugin context manager. (#23)
Browse files Browse the repository at this point in the history
* added support for with Plugin() as plugin block

* remove trace messages

* updated docs
  • Loading branch information
seanshahkarami authored Dec 9, 2021
1 parent 0a7807e commit 258f4e1
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 103 deletions.
100 changes: 35 additions & 65 deletions docs/writing-a-plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,36 +63,14 @@ touch requirements.txt
Create a new file called `main.py` with the following code:

```python
from waggle import plugin
from time import sleep

plugin.init()

counter = 0

while True:
sleep(1)
print("publishing value", counter)
plugin.publish("hello.world.counter", counter)
counter += 1
```

Let's walk through the pywaggle related details. First, we import the pywaggle plugin module:

```python
from waggle import plugin
```

Next, we initialize our plugin. This will prepare our plugin to interface with Waggle system services.

```python
plugin.init()
```

Finally, we publish our counter value. This will queue up our measurement name and value along with the current timestamp and will eventually be shipped to our data repository where it can be accessed.
from waggle.plugin import Plugin
import time

```python
plugin.publish("hello.world.counter", counter)
with Plugin() as plugin:
for i in range(10):
print("publishing value", i)
plugin.publish("hello.world.value", i)
time.sleep(1)
```

### 5. Run plugin
Expand Down Expand Up @@ -200,15 +178,15 @@ Plugins can subscribe to measurements published by other plugins running on the
The followng basic example simply waits for measurements named "my.sensor.name" and prints the value it received.

```python
from waggle.plugin import plugin
from waggle.plugin import Plugin
from random import random
plugin.init()
plugin.subscribe("my.sensor.name")
with Plugin() as plugin:
plugin.subscribe("my.sensor.name")
while True:
msg = plugin.get()
print("Another plugin published my.sensor.name value", msg.value)
while True:
msg = plugin.get()
print("Another plugin published my.sensor.name value", msg.value)
```

In the case you need multiple multiple measurements, you can simply use:
Expand Down Expand Up @@ -255,21 +233,17 @@ pywaggle provides a simple abstraction to cameras and microphones.
### Accessing a video stream

```python
from waggle import plugin
from waggle.plugin import Plugin
from waggle.data.vision import Camera
import time
plugin.init()
# open default local camera
camera = Camera()
# process samples from video stream
for sample in camera.stream():
count = count_cars_in_image(sample.data)
if count > 10:
sample.save("cars.jpg")
plugin.upload_file("cars.jpg")
with Plugin() as plugin, Camera() as camera:
# process samples from video stream
for sample in camera.stream():
count = count_cars_in_image(sample.data)
if count > 10:
sample.save("cars.jpg")
plugin.upload_file("cars.jpg")
```

The `camera.stream()` function yields a sequence of `ImageSample` with the following properties:
Expand All @@ -296,20 +270,17 @@ camera = Camera("bottom_camera")
### Recording audio data

```python
from waggle import plugin
from waggle.plugin import Plugin
from waggle.data.audio import Microphone
import time
plugin.init()
microphone = Microphone()
# record and upload a 10s sample periodically
while True:
sample = microphone.record(10)
sample.save("sample.ogg")
plugin.upload_file("sample.ogg")
time.sleep(300)
with Plugin() as plugin, Microphone() as microphone:
# record and upload a 10s sample periodically
while True:
sample = microphone.record(10)
sample.save("sample.ogg")
plugin.upload_file("sample.ogg")
time.sleep(300)
```

Similar to `ImageSample`, `AudioSample` provide the following properties:
Expand Down Expand Up @@ -377,7 +348,7 @@ camera = ImageFolder(format=BGR)
If we run the basic example, the only thing we'll see is the message "publishing a value!" every second. If you need to see more details, pywaggle is designed to easily interface with Python's standard logging module. To enable debug logging, simply make the following additions:

```python
from waggle import plugin
from waggle.plugin import Plugin
from time import sleep
# 1. import standard logging module
Expand All @@ -386,12 +357,11 @@ import logging
# 2. enable debug logging
logging.basicConfig(level=logging.DEBUG)
plugin.init()
while True:
sleep(1)
print("publishing a value!")
plugin.publish("my.sensor.name", 123)
with Plugin() as plugin:
while True:
sleep(1)
print("publishing a value!")
plugin.publish("my.sensor.name", 123)
```

You should see a lot of information like:
Expand Down
29 changes: 16 additions & 13 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ def tearDown(self):
plugin.stop()

def test_publish(self):
plugin.publish('test.int', 1)
plugin.publish('test.float', 2.0)
plugin.publish('test.bytes', b'three')
plugin.publish('cows.total', 391, meta={
"camera": "bottom_left",
})
with Plugin() as plugin:
plugin.publish('test.int', 1)
plugin.publish('test.float', 2.0)
plugin.publish('test.bytes', b'three')
plugin.publish('cows.total', 391, meta={
"camera": "bottom_left",
})

def test_publish_check_reserved(self):
with self.assertRaises(ValueError):
plugin.publish("upload", "path/to/data")
with Plugin() as plugin:
with self.assertRaises(ValueError):
plugin.publish("upload", "path/to/data")

def test_get(self):
plugin.subscribe('raw.#')
with self.assertRaises(TimeoutError):
plugin.get(timeout=0)
with self.assertRaises(TimeoutError):
plugin.get(timeout=0.001)
with Plugin() as plugin:
plugin.subscribe('raw.#')
with self.assertRaises(TimeoutError):
plugin.get(timeout=0)
with self.assertRaises(TimeoutError):
plugin.get(timeout=0.001)

def test_get_timestamp(self):
ts = plugin.get_timestamp()
Expand Down
64 changes: 39 additions & 25 deletions waggle/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,25 @@ def raise_for_invalid_publish_name(s):

class Plugin:

def __init__(self, config, uploader=None):
def __init__(self, config=None, uploader=None):
# default config from env vars
if config is None:
config = PluginConfig(
username=getenv("WAGGLE_PLUGIN_USERNAME", "plugin"),
password=getenv("WAGGLE_PLUGIN_PASSWORD", "plugin"),
host=getenv("WAGGLE_PLUGIN_HOST", "rabbitmq"),
port=int(getenv("WAGGLE_PLUGIN_PORT", 5672)),
app_id=getenv("WAGGLE_APP_ID", ""),
)

self.config = config

# default upload directory
if uploader is None:
uploader = Uploader(Path(getenv("WAGGLE_PLUGIN_UPLOAD_PATH", "/run/waggle/uploads")))

self.uploader = uploader

self.connection_parameters = pika.ConnectionParameters(
host=config.host,
port=config.port,
Expand All @@ -81,15 +97,29 @@ def __init__(self, config, uploader=None):
self.incoming_queue = Queue()
self.subscribe_queue = Queue()

self.uploader = uploader
def __enter__(self):
self.init()
# self.publish("plugin.status", "start")
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
# if exc_type is None:
# self.publish("plugin.status", "stop")
# else:
# self.publish("plugin.status", "error")
self.stop()

def init(self):
logger.debug("starting plugin worker thread")
if self.running.is_set():
raise RuntimeError("cannot init already running plugin")
self.running.set()
Thread(target=self.run_rabbitmq_worker, daemon=True).start()

def stop(self):
def stop(self, timeout=None):
logger.debug("stopping plugin worker thread")
self.running.clear()
self.stopped.wait(timeout=timeout)

def get(self, timeout=None):
try:
Expand Down Expand Up @@ -126,28 +156,21 @@ def upload_file(self, path, meta={}, timestamp=None, keep=False):
self.__publish("upload", upload_path.name, meta, timestamp)

def run_rabbitmq_worker(self):
if self.running.is_set():
logger.warning("already have an instance of rabbitmq worker running")
return

try:
self.running.set()
self.stopped.clear()

while self.running.is_set():
try:
logger.debug("connecting to rabbitmq broker at %s:%d with username %r",
self.connection_parameters.host,
self.connection_parameters.port,
self.connection_parameters.credentials.username)
self.connection_parameters.host,
self.connection_parameters.port,
self.connection_parameters.credentials.username)
with pika.BlockingConnection(self.connection_parameters) as connection:
logger.debug("connected to rabbitmq broker")
self.rabbitmq_worker_mainloop(connection)
except Exception as exc:
logger.debug("rabbitmq connection error: %s", exc)
time.sleep(1)
finally:
self.running.clear()
self.stopped.set()

def rabbitmq_worker_mainloop(self, connection):
Expand Down Expand Up @@ -198,7 +221,7 @@ def process_queues_and_events():
process_subscribe_queue()
process_publish_queue()
if self.running.is_set():
connection.call_later(0.001, process_queues_and_events)
connection.call_later(0.01, process_queues_and_events)
else:
logger.debug("stopping rabbitmq processing loop")
channel.stop_consuming()
Expand All @@ -207,7 +230,7 @@ def process_queues_and_events():
queue = channel.queue_declare("", exclusive=True).method.queue
channel.basic_consume(queue, subscriber_callback, auto_ack=True)
# setup periodic publish and subscribe to topic checks
connection.call_later(0.001, process_queues_and_events)
connection.call_later(0.01, process_queues_and_events)
logger.debug("starting rabbitmq processing loop")
channel.start_consuming()

Expand Down Expand Up @@ -270,17 +293,8 @@ def write_json_file(path, obj):
json.dump(obj, f, separators=(',', ':'), sort_keys=True)


# define global default instance of Uploader
uploader = Uploader(Path(getenv("WAGGLE_PLUGIN_UPLOAD_PATH", "/run/waggle/uploads")))

# define global default instance of Plugin
plugin = Plugin(PluginConfig(
username=getenv("WAGGLE_PLUGIN_USERNAME", "plugin"),
password=getenv("WAGGLE_PLUGIN_PASSWORD", "plugin"),
host=getenv("WAGGLE_PLUGIN_HOST", "rabbitmq"),
port=int(getenv("WAGGLE_PLUGIN_PORT", 5672)),
app_id=getenv("WAGGLE_APP_ID", ""),
), uploader=uploader)
plugin = Plugin()
init = plugin.init
stop = plugin.stop
subscribe = plugin.subscribe
Expand Down

0 comments on commit 258f4e1

Please sign in to comment.