Skip to content

Commit

Permalink
Adding a retain flag supported by mqtt in the gateway and status topi…
Browse files Browse the repository at this point in the history
…cs in mqtt.

Some brokers (like AWS iot core) do not support retain messages.
Add option to transport to specify if the retain is supported by broker.

New MQTT topics can be used in the case where the mqtt broker does not support retained message
and we would like to know the status of the gateways as soon as possible.

The concerned topics in the mqtt broker are :
gw-request/get_gw_status
gw-response/get_gw_status/<gw-id>/<sink-id>
  • Loading branch information
LePailleurThibault committed Oct 27, 2022
1 parent d9d43d3 commit 643764f
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 8 deletions.
12 changes: 12 additions & 0 deletions python_transport/tests/test_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
env_vars["WM_SERVICES_MQTT_PERSIST_SESSION"] = True
env_vars["WM_SERVICES_MQTT_FORCE_UNSECURE"] = True
env_vars["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] = True
env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"] = True

env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000
env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128
Expand Down Expand Up @@ -60,11 +61,13 @@
file_vars["gateway_version"] = env_vars["WM_GW_VERSION"]
file_vars["ignored_endpoints_filter"] = env_vars["WM_GW_IGNORED_ENDPOINTS_FILTER"]
file_vars["whitened_endpoints_filter"] = env_vars["WM_GW_WHITENED_ENDPOINTS_FILTER"]
file_vars["mqtt_retain_flag_supported"] = env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"]

booleans = [
"WM_SERVICES_MQTT_PERSIST_SESSION",
"WM_SERVICES_MQTT_FORCE_UNSECURE",
"WM_SERVICES_MQTT_ALLOW_UNTRUSTED",
"WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED",
]


Expand Down Expand Up @@ -139,6 +142,14 @@ def content_tests(settings, vcopy):
vcopy["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] == settings.mqtt_allow_untrusted
)

if "WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED" not in vcopy:
assert settings.mqtt_retain_flag_supported is False
else:
assert (
vcopy["WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED"]
== settings.mqtt_retain_flag_supported
)

assert vcopy["WM_SERVICES_MQTT_RECONNECT_DELAY"] == settings.mqtt_reconnect_delay
assert (
vcopy["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"]
Expand Down Expand Up @@ -199,6 +210,7 @@ def test_defaults():
assert settings.mqtt_persist_session is False
assert settings.mqtt_force_unsecure is False
assert settings.mqtt_allow_untrusted is False
assert settings.mqtt_retain_flag_supported is False
assert settings.mqtt_reconnect_delay == 0
assert settings.buffering_max_buffered_packets == 0
assert settings.buffering_max_delay_without_publish == 0
Expand Down
12 changes: 11 additions & 1 deletion python_transport/wirepas_gateway/protocol/mqtt_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def __init__(
# Keep track of latest published packet
self._publish_monitor = PublishMonitor()

# load special settings for broker compatibility
self.retain_supported = settings.mqtt_retain_flag_supported
self.logger.info(
"MQTT retain flag supported is set to %s",
settings.mqtt_retain_flag_supported
)

if settings.mqtt_use_websocket:
transport = "websockets"
self._use_websockets = True
Expand Down Expand Up @@ -234,7 +241,7 @@ def _get_socket(self):

def _set_last_will(self, topic, data):
# Set Last wil message
self._client.will_set(topic, data, qos=2, retain=True)
self._client.will_set(topic, data, qos=2, retain=self.retain_supported)

def run(self):
self.running = True
Expand Down Expand Up @@ -279,6 +286,9 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain):
retain: Is it a retain message
"""
# Clear retain flag if not supported
retain = retain and self.retain_supported

mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid
self._unpublished_mid_set.add(mid)

Expand Down
8 changes: 8 additions & 0 deletions python_transport/wirepas_gateway/protocol/topic_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def make_otap_set_target_scratchpad_request_topic(gw_id="+", sink_id="+"):
def make_get_gateway_info_request_topic(gw_id):
return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)])

@staticmethod
def make_get_gw_status_request_topic():
return TopicGenerator._make_request_topic("get_gw_status", [])

##################
# Response Part
##################
Expand Down Expand Up @@ -125,6 +129,10 @@ def make_otap_set_target_scratchpad_response_topic(gw_id="+", sink_id="+"):
def make_get_gateway_info_response_topic(gw_id):
return TopicGenerator._make_response_topic("get_gw_info", [str(gw_id)])

@staticmethod
def make_get_gw_status_response_topic(gw_id):
return TopicGenerator._make_response_topic("get_gw_status", [str(gw_id)])

##################
# Event Part
##################
Expand Down
44 changes: 37 additions & 7 deletions python_transport/wirepas_gateway/transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ def __init__(self, settings, logger=None, **kwargs):
self.gw_model = settings.gateway_model
self.gw_version = settings.gateway_version

# Does broker support retain flag
self.retain_supported = settings.mqtt_retain_flag_supported

self.whitened_ep_filter = settings.whitened_endpoints_filter

last_will_topic = TopicGenerator.make_status_topic(self.gw_id)
self.status_topic = TopicGenerator.make_status_topic(self.gw_id)
last_will_message = wmm.StatusEvent(
self.gw_id, wmm.GatewayState.OFFLINE
).payload
Expand All @@ -200,7 +203,7 @@ def __init__(self, settings, logger=None, **kwargs):
self.logger,
self._on_mqtt_wrapper_termination_cb,
self._on_connect,
last_will_topic,
self.status_topic,
last_will_message,
)

Expand Down Expand Up @@ -246,9 +249,9 @@ def _on_mqtt_wrapper_termination_cb(self):
def _set_status(self):
event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE)

topic = TopicGenerator.make_status_topic(self.gw_id)

self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)
self.mqtt_wrapper.publish(
self.status_topic, event_online.payload, qos=1, retain=self.retain_supported
)

def _on_connect(self):
# Register for get gateway info
Expand Down Expand Up @@ -290,13 +293,17 @@ def _on_connect(self):
topic, self._on_otap_set_target_scratchpad_request_received
)

topic = TopicGenerator.make_get_gw_status_request_topic()
self.mqtt_wrapper.subscribe(
topic, self._on_get_gw_status_request_received
)

# Register ourself to our status in case someone else (by mistake)
# update our status.
# It will work only if we are allowed to register for event topic
# at broker level
topic = TopicGenerator.make_status_topic(self.gw_id)
self.mqtt_wrapper.subscribe(
topic, self._on_own_status_received
self.status_topic, self._on_own_status_received
)

self._set_status()
Expand Down Expand Up @@ -753,6 +760,29 @@ def _on_otap_set_target_scratchpad_request_received(

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_get_gw_status_request_received(
self, client, userdata, message
):
# pylint: disable=unused-argument
res = wmm.GatewayResultCode.GW_RES_OK
self.logger.info("Get gateway status request received")
try:
request = wmm.GetGatewayStatusRequest.from_payload(
message.payload
)
except wmm.GatewayAPIParsingException as e:
self.logger.error(str(e))
return

response = wmm.GetGatewayStatusResponse(
request.req_id, self.gw_id, res, wmm.GatewayState.ONLINE
)
topic = TopicGenerator.make_get_gw_status_response_topic(
self.gw_id
)

self.mqtt_wrapper.publish(topic, response.payload, qos=1)

def parse_setting_list(list_setting):
""" This function parse ep list specified from setting file or cmd line
Expand Down
9 changes: 9 additions & 0 deletions python_transport/wirepas_gateway/utils/argument_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,15 @@ def add_mqtt(self):
),
)

self.mqtt.add_argument(
"--mqtt_retain_flag_supported",
default=os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED", True),
type=self.str2bool,
nargs="?",
const=True,
help=("Set to true if broker support retain flag"),
)

def add_buffering_settings(self):
""" Parameters used to avoid black hole case """
self.buffering.add_argument(
Expand Down

0 comments on commit 643764f

Please sign in to comment.