Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/otap as chunks #44

Merged
merged 5 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
- name: Set Version for PR
if: ${{ github.event_name == 'pull_request' }}
run: echo "VERSION=PR_${{ github.event.pull_request.number }}" >> $GITHUB_ENV

- name: Set Version for manual triggering
if: ${{ github.event_name == 'workflow_dispatch' }}
run: echo "VERSION=${{ github.event.inputs.version_name }}" >> $GITHUB_ENV
Expand All @@ -72,10 +72,10 @@ jobs:

- name: Build the wheel
run: python setup.py sdist bdist_wheel

- name: Save produced wheel name and path
run: echo "WHEEL_FILE=$(ls dist/*-py3-none-any.whl)" >> $GITHUB_ENV

- name: Store artifacts
uses: actions/upload-artifact@v4
with:
Expand All @@ -99,7 +99,7 @@ jobs:
user: __token__
password: ${{ secrets.PYPI_TEST_PWD }}
repository_url: https://test.pypi.org/legacy/

- name: Publish package to PyPI for release
if: ${{ github.event_name == 'release' }}
uses: pypa/[email protected]
Expand Down
9 changes: 4 additions & 5 deletions .github/workflows/sphinx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,22 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '3.7'

- name: Install pip packages
run: pip install sphinx==4.2.0 sphinx-rtd-theme==0.5.1

- name: Insatll package requirement
run: pip install -r requirements.txt

- name: Build the doc
run: |
cd docs
make html

- uses: actions/upload-artifact@v4
with:
name: DocumentationHTML
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
wirepas_mesh_messaging==1.2.5
wirepas_mesh_messaging==1.2.6rc1
paho_mqtt==1.5.1
peewee~=3.17.6
183 changes: 127 additions & 56 deletions wirepas_mqtt_library/wirepas_network_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ class WirepasNetworkInterface:

# Inner class definition to define a gateway
class _Gateway:
def __init__(self, id, online=False, sinks=None):
def __init__(self, id, online=False, sinks=None, model=None, version=None, max_scratchpad_size=None):
self.id = id
self.online = online
self.sinks = sinks
self.model = model
self.version = version
self.max_scratchpad_size = max_scratchpad_size
self.config_received_event = Event()
if sinks is not None:
self.config_received_event.set()
Expand Down Expand Up @@ -122,7 +125,7 @@ def from_broker_connack(cls, rc):
return error_code

def __init__(self, host, port, username, password,
insecure=False, num_worker_thread=1, strict_mode=True,
insecure=False, num_worker_thread=1, strict_mode=False,
connection_cb=None, client_id="", clean_session=None,
transport="tcp", gw_timeout_s=2):
"""Constructor
Expand Down Expand Up @@ -215,7 +218,7 @@ def _execute_connection(self, host, port):
error = WirepasNetworkInterface.ConnectionErrorCode.CONNECTION_REFUSED
except Exception as e:
error = WirepasNetworkInterface.ConnectionErrorCode.UNKNOWN_ERROR
logging.error("Unknow exception in client connection %s", e)
logging.error("Unknown exception in client connection %s", e)

if self._connection_cb is not None and error is not None:
self._connection_cb(False, error)
Expand Down Expand Up @@ -286,6 +289,10 @@ def _on_status_gateway_received(self, client, userdata, message):
try:
gw = self._gateways[status.gw_id]
gw.online = (status.state == wmm.GatewayState.ONLINE)
gw.model = status.gateway_model
gw.version = status.gateway_version
gw.max_scratchpad_size=status.max_scratchpad_size

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also typo just for missing spaces


if gw.update_all_sink_configs(status.sink_configs):
# Config has changed, notify any subscriber
if self._on_config_changed_cb is not None:
Expand All @@ -295,7 +302,10 @@ def _on_status_gateway_received(self, client, userdata, message):
# Create Gateway
self._gateways[status.gw_id] = self._Gateway(status.gw_id,
status.state == wmm.GatewayState.ONLINE,
status.sink_configs)
status.sink_configs,
model=status.gateway_model,
version=status.gateway_version,
max_scratchpad_size=status.max_scratchpad_size)



Expand Down Expand Up @@ -467,59 +477,45 @@ def _ask_gateway_config(self, gw_id):
# Call update gateway config when receiving it
self._wait_for_response(self._update_gateway_configs, request.req_id)

def _wait_for_configs(fn):
# Decorator to handle waiting for enough time for the setup
# of the network interface
def wrapper(*args, **kwargs):
# args[0] is self

gateways_subset = None
# Check for missing gateway config
if ("gateway" in kwargs):
gateways_subset = kwargs['gateway']
def _wait_for_configs(self, gateways=None):
gateways_to_wait_config = []
for gw in self._gateways.copy().values():
if gateways is not None and gw.id not in gateways:
# Not interested by this gateway, so no need for the config
continue

gateways_to_wait_config = []
for gw in args[0]._gateways.copy().values():
if gateways_subset is not None and gw.id not in gateways_subset:
# Not interested by this gateway, so no need for the config
if gw.online:
if gw.config_received_event.is_set():
# We have already received the config
continue

if gw.online:
if gw.config_received_event.is_set():
# We have already received the config
continue

# Time to ask the gateway config
args[0]._ask_gateway_config(gw.id)

gateways_to_wait_config.append(gw)
# Time to ask the gateway config
self._ask_gateway_config(gw.id)
gateways_to_wait_config.append(gw)

timeout_ts = time() + args[0]._gw_timeout_s

# We have asked config for gateway we never received it
# Check if we received it for all gateways we asked before timeout
for gw in gateways_to_wait_config:
timeout = timeout_ts - time()
if timeout <= 0:
timeout = None
timeout_ts = time() + self._gw_timeout_s

# We have asked config for gateway we never received it
# Check if we received it for all gateways we asked before timeout
for gw in gateways_to_wait_config:
timeout = timeout_ts - time()
if timeout > 0:
# We can still wait
gw.config_received_event.wait(timeout)
if not gw.config_received_event.is_set():
logging.error("Config timeout for gw %s" % gw.id)
logging.error("Is the gateway really online? If not, its status can be cleared by "
"calling clear_gateway_status(\"%s\")", gw.id)
# Mark the initial config as empty list to avoid waiting for it next time
# It may still come later
gw.update_all_sink_configs([])

if args[0]._strict_mode:
logging.error("This Timeout will generate an exception but you can "
"avoid it by starting WirepasNetworkInteface with strict_mode=False")
raise TimeoutError("Cannot get config from online GW %s", gw.id)

return fn(*args, **kwargs)
wrapper.__doc__ = fn.__doc__
return wrapper
if not gw.config_received_event.is_set():
logging.error("Config timeout for gw %s" % gw.id)
logging.error("Is the gateway really online? If not, its status can be cleared by "
"calling clear_gateway_status(\"%s\")", gw.id)
# Mark the initial config as empty list to avoid waiting for it next time
# It may still come later
gw.update_all_sink_configs([])

if self._strict_mode:
logging.error("This Timeout will generate an exception but you can "
"avoid it by starting WirepasNetworkInteface with strict_mode=False")
raise TimeoutError("Cannot get config from online GW %s", gw.id)

def close(self):
"""Explicitly close this network interface as well as the worker threads
Expand All @@ -542,7 +538,6 @@ def close(self):


@_wait_for_connection
@_wait_for_configs
def get_sinks(self, network_address=None, gateway=None):
"""
get_sinks(self, network_address=None, gateway=None)
Expand Down Expand Up @@ -587,6 +582,8 @@ def get_sinks(self, network_address=None, gateway=None):
:raises TimeoutError: If a gateway doesn't send its initial config fast enough

"""
# Wait for subset of gateways
self._wait_for_configs(gateways=gateway)
sinks = list()
for gw in self._gateways.copy().values():
if not gw.online:
Expand Down Expand Up @@ -729,6 +726,54 @@ def send_message(self, gw_id, sink_id, dest, src_ep, dst_ep, payload, qos=0, csm

return self._wait_for_response(cb, request.req_id, param=param)

def _upload_scratchpad_as_chunks(self, topic, sink_id, seq, scratchpad, max_chunk_size, cb, param=None, timeout=60):
end_event = Event()
final_res = None

# Definition of the intermediate callback
# to publish next block if previous one was successfully sent
def next_chunk(response, param):
total_size = scratchpad.__len__()
# Parse the param we set in last call
full_scratchpad, sent_bytes, user_cb, user_param = param

# Is it end of transfer? (last chunk or error)
if (response is not None and response != wmm.GatewayResultCode.GW_RES_OK) \
or sent_bytes == total_size:
# Time to call initial callback of unlock our initial requester
if user_cb is not None:
user_cb(response, user_param)
else:
nonlocal final_res
final_res = response
end_event.set()
return

chunk_size = min(total_size - sent_bytes, max_chunk_size)
request = wmm.UploadScratchpadRequest(seq,
sink_id,
scratchpad=scratchpad[sent_bytes:sent_bytes+chunk_size],
chunk_info={"total_size": total_size,
"offset": sent_bytes})

logging.info("Sending chunk from %d -> %d (%d)", sent_bytes, sent_bytes + chunk_size, total_size)

self._publish(topic, request.payload, 1)
sent_bytes += chunk_size
self._wait_for_response(next_chunk,
request.req_id,
extra_timeout=timeout,
param=(full_scratchpad, sent_bytes, user_cb, user_param))

# Initiate the transfer by calling the next_chunk cb a fisrt time
next_chunk(None, (scratchpad, 0, cb, param))

if cb is None:
# There is no user callback so lock caller until end of transfer
end_event.wait(timeout)

return final_res

@_wait_for_connection
def upload_scratchpad(self, gw_id, sink_id, seq, scratchpad=None, cb=None, param=None, timeout=60):
"""
Expand Down Expand Up @@ -766,12 +811,38 @@ def upload_scratchpad(self, gw_id, sink_id, seq, scratchpad=None, cb=None, param

:raises TimeoutError: Raised if cb is None and response is not received within the specified timeout
"""
request = wmm.UploadScratchpadRequest(seq, sink_id, scratchpad=scratchpad)
try:
# Check what is the max transfert size supported by the gateway
max_size = self._gateways[gw_id].max_scratchpad_size
if max_size is not None:
logging.info("Max scratchpad size is %d for %s" % (scratchpad.__len__(), gw_id))
except KeyError:
logging.error("Unknown gateway in upload_scratchpad %s", gw_id)
return wmm.GatewayResultCode.GW_RES_INVALID_PARAM

topic = TopicGenerator.make_otap_load_scratchpad_request_topic(gw_id, sink_id)

# Check if scratchpad must be sent as chunk
if scratchpad is not None and max_size is not None and scratchpad.__len__() > max_size:
# Scratchpad must be devided in chunk
logging.info("Loading scratchpad of size: %d in chunk of %d bytes" % (scratchpad.__len__(), max_size))
return self._upload_scratchpad_as_chunks(
topic,
sink_id,
seq,
scratchpad,
max_size,
cb,
param,
timeout)
else:
# A single request is enough to upload (or clear) scratchpad
request = wmm.UploadScratchpadRequest(seq, sink_id, scratchpad=scratchpad)

self._publish(TopicGenerator.make_otap_load_scratchpad_request_topic(gw_id, sink_id),
request.payload,
1)
return self._wait_for_response(cb, request.req_id, extra_timeout=timeout, param=param)
self._publish(topic,
request.payload,
1)
return self._wait_for_response(cb, request.req_id, extra_timeout=timeout, param=param)

@_wait_for_connection
def process_scratchpad(self, gw_id, sink_id, cb=None, param=None, timeout=120):
Expand Down Expand Up @@ -1042,7 +1113,6 @@ def set_sink_config(self, gw_id, sink_id, new_config, cb=None, param=None):
return self._wait_for_response(cb, request.req_id, extra_timeout=3, param=param)

@_wait_for_connection
@_wait_for_configs
def set_config_changed_cb(self, cb):
"""
set_config_changed_cb(self, cb)
Expand All @@ -1054,6 +1124,7 @@ def set_config_changed_cb(self, cb):
and :meth:`~wirepas_mqtt_library.wirepas_network_interface.WirepasNetworkInterface.get_sinks`
can be used to discover what has changed
"""
self._wait_for_configs()
self._on_config_changed_cb = cb

# Call the cb a first time as an initial info
Expand Down
Loading