Skip to content

Commit

Permalink
Check that all are reassembled before closing
Browse files Browse the repository at this point in the history
change WPC close order, add a check to fragment list.
Adapt gw-example to do clean exit
  • Loading branch information
StephaneTriomphe committed Jan 6, 2025
1 parent 7fddd04 commit 0dc596e
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 28 deletions.
41 changes: 33 additions & 8 deletions example/linux/gw-example/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <getopt.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down Expand Up @@ -43,6 +44,10 @@ static pthread_t m_thread_publish;
static pthread_mutex_t m_pub_queue_mutex;
static pthread_cond_t m_pub_queue_not_empty_cond = PTHREAD_COND_INITIALIZER;

char topic_all_requests[16 + sizeof(m_gateway_id)]; //"gw-request/+/<gateway_id/#"

static volatile bool running = true;

// Statically allocated but could be mallocated
typedef struct
{
Expand All @@ -69,6 +74,11 @@ static bool m_pub_queue_empty = true;

static MQTTClient m_client = NULL;

static void signal_handler(int signum)
{
running = false;
}

static bool MQTT_publish(char * topic, uint8_t * payload, size_t payload_size, bool retained)
{
message_to_publish_t * message_p;
Expand Down Expand Up @@ -242,7 +252,6 @@ static bool reconnect(uint32_t timeout_s)
int rc;
size_t proto_size;
char topic_status[sizeof(TOPIC_EVENT_PREFIX) + sizeof(m_gateway_id) + 1];
char topic_all_requests[16 + sizeof(m_gateway_id)]; //"gw-request/+/<gateway_id/#"
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions will_options = MQTTClient_willOptions_initializer;
MQTTClient_SSLOptions ssl_options = MQTTClient_SSLOptions_initializer;
Expand Down Expand Up @@ -289,11 +298,6 @@ static bool reconnect(uint32_t timeout_s)
TOPIC_EVENT_PREFIX,
m_gateway_id);

snprintf(topic_all_requests,
sizeof(topic_all_requests),
"gw-request/+/%s/#",
m_gateway_id);

while (timeout_s > 0)
{
if ((rc = MQTTClient_connect(m_client, &conn_opts)) == MQTTCLIENT_SUCCESS)
Expand Down Expand Up @@ -331,6 +335,17 @@ static bool reconnect(uint32_t timeout_s)
return true;
}

static bool mqtt_unsubscribe_topics(void)
{
if (MQTTClient_unsubscribe(m_client, topic_all_requests) != MQTTCLIENT_SUCCESS)
{
LOGE("Failed to unsubscribe from topic %s\n", topic_all_requests);
return false;
}
LOGI("Successfully unsubscribed from topic %s\n", topic_all_requests);
return true;
}

static void on_mqtt_connection_lost(void *context, char *cause)
{
LOGE("Connection lost\n");
Expand All @@ -345,6 +360,8 @@ static bool MQTT_connect(uint32_t timeout_s,

MQTTClient_init_options global_init_options = MQTTClient_init_options_initializer;
global_init_options.do_openssl_init = true;

snprintf(topic_all_requests, sizeof(topic_all_requests), "gw-request/+/%s/#", m_gateway_id);

MQTTClient_global_init(&global_init_options);

Expand Down Expand Up @@ -434,6 +451,9 @@ int main(int argc, char * argv[])
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);

signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);

// Parse the arguments
static struct option long_options[]
= { { "baudrate", required_argument, 0, 'b' },
Expand Down Expand Up @@ -540,10 +560,15 @@ int main(int argc, char * argv[])

LOGI("Starting gw with id %s on host %s\n", gateway_id, mqtt_host);

for (;;)
while (running)
{
sleep(2);
}

LOGE("End of program\n");
LOGI("Clean exit requested\n");
mqtt_unsubscribe_topics();
WPC_Proto_close();
MQTTClient_destroy(&m_client);
pthread_mutex_destroy(&m_pub_queue_mutex);
LOGI("Clean exit completed\n");
}
7 changes: 6 additions & 1 deletion lib/api/wpc_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@
#include <stddef.h>
#include <stdint.h>

/**
* Max default delay to keep incomplete fragmented packet inside our buffers
*/
#define FRAGMENT_MAX_DURATION_S 45

/**
* Max possible overhead estimation for wp_GenericMessage
* compared to specific single message. Should be added to
* size of single message to estimate the max sized occupied
* by the full proto encoded message */
* by the full proto encoded message
*/
#define WPC_PROTO_GENERIC_MESSAGE_OVERHEAD 20

/*
Expand Down
83 changes: 67 additions & 16 deletions lib/platform/linux/platform.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* See file LICENSE for full license details.
*
*/
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
#include <pthread.h>
Expand All @@ -14,6 +15,8 @@
#define MAX_LOG_LEVEL INFO_LOG_LEVEL
#include "logger.h"
#include "platform.h"
#include "reassembly.h"
#include "wpc_proto.h"

// Maximum number of indication to be retrieved from a single poll
#define MAX_NUMBER_INDICATION 30
Expand All @@ -27,8 +30,14 @@ static pthread_mutex_t sending_mutex;
// This thread is used to poll for indication
static pthread_t thread_polling;

// Set to false to stop polling thread execution
static bool m_polling_thread_running;
typedef enum {
POLLING_THREAD_RUN,
POLLING_THREAD_STOP,
POLLING_THREAD_STOP_REQUESTED
} polling_thread_state_t;

// Request to handle polling thread state
static polling_thread_state_t m_polling_thread_state_request = POLLING_THREAD_STOP;

// This thread is used to dispatch indication
static pthread_t thread_dispatch;
Expand Down Expand Up @@ -170,10 +179,33 @@ static void * poll_for_indication(void * unused)
// Initially wait for 500ms before any polling
uint32_t wait_before_next_polling_ms = 500;

while (m_polling_thread_running)
m_polling_thread_state_request = POLLING_THREAD_RUN;

while (m_polling_thread_state_request != POLLING_THREAD_STOP)
{
usleep(wait_before_next_polling_ms * 1000);

if(m_polling_thread_state_request == POLLING_THREAD_STOP_REQUESTED)
{
if (!m_queue_empty)
{
// Dispatch did not process all indications, just wait for it to
wait_before_next_polling_ms = POLLING_INTERVAL_MS;
continue;
}

if (reassembly_is_queue_empty())
{
LOGI("Reassembly queue is empty, exiting polling thread\n");
m_polling_thread_state_request = POLLING_THREAD_STOP;
break;
}
else
{
reassembly_garbage_collect(FRAGMENT_MAX_DURATION_S);
}
}

// Get the number of free buffers in the indication queue
// Note: No need to lock the queue as only m_ind_queue_read can be updated
// and could still be modified when we release the lock after computing
Expand Down Expand Up @@ -207,16 +239,26 @@ static void * poll_for_indication(void * unused)
free_buffer_room = m_ind_queue_read - m_ind_queue_write;
}
}

max_num_indication = free_buffer_room > MAX_NUMBER_INDICATION ?
MAX_NUMBER_INDICATION :
free_buffer_room;

if (m_polling_thread_state_request == POLLING_THREAD_STOP_REQUESTED)
{
// In case we are about to stop, let's poll only one by one to have more chance to
// finish uncomplete fragmented packet and not start to receive a new one
max_num_indication = 1;
LOGD("Poll for one more fragment to empty reassembly queue\n");
}
else
{
// Let's read max indications that can fit in the queue
max_num_indication = MIN(MAX_NUMBER_INDICATION, free_buffer_room);
}

LOGD("Poll for %d indications\n", max_num_indication);

get_ind_res = m_get_indication_f(max_num_indication, onIndicationReceivedLocked);

if (get_ind_res == 1)
if ((get_ind_res == 1)
&& (m_polling_thread_state_request != POLLING_THREAD_STOP_REQUESTED))
{
// Still pending indication, only wait 1 ms to give a chance
// to other threads but not more to have better throughput
Expand All @@ -226,6 +268,7 @@ static void * poll_for_indication(void * unused)
{
// In case of error or if no more indication, just wait
// the POLLING INTERVAL to avoid polling all the time
// In case of stop request, wait for to give time to push data received
wait_before_next_polling_ms = POLLING_INTERVAL_MS;
}
}
Expand All @@ -242,7 +285,14 @@ bool Platform_lock_request()
{
// It must never happen but add a check and
// return to avoid a deadlock
LOGE("Mutex already locked %d\n", res);
if (res == EINVAL)
{
LOGW("Mutex no longer exists (destroyed)\n");
}
else
{
LOGE("Mutex lock failed %d\n", res);
}
return false;
}
return true;
Expand Down Expand Up @@ -320,7 +370,6 @@ bool Platform_init(Platform_get_indication_f get_indication_f,
goto error2;
}

m_polling_thread_running = true;
// Start a thread to poll for indication
if (pthread_create(&thread_polling, NULL, poll_for_indication, NULL) != 0)
{
Expand Down Expand Up @@ -352,21 +401,23 @@ void Platform_close()
{
void * res;
pthread_t cur_thread = pthread_self();
// Signal our dispatch thread to stop
m_dispatch_thread_running = false;
// Signal condition to wakeup thread
pthread_cond_signal(&m_queue_not_empty_cond);

// Signal our polling thread to stop
// No need to signal it as it will wakeup periodically
m_polling_thread_running = false;
m_polling_thread_state_request = POLLING_THREAD_STOP_REQUESTED;

// Wait for both tread to finish
// Wait for polling tread to finish
if (cur_thread != thread_polling)
{
pthread_join(thread_polling, &res);
}

// Signal our dispatch thread to stop
m_dispatch_thread_running = false;
// Signal condition to wakeup thread
pthread_cond_signal(&m_queue_not_empty_cond);

// Wait for dispatch tread to finish
if (cur_thread != thread_dispatch)
{
pthread_join(thread_dispatch, &res);
Expand Down
6 changes: 6 additions & 0 deletions lib/wpc/include/reassembly.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ typedef struct {
*/
void reassembly_init();

/**
* \brief Check queue emptyness
* \return true if empty, false otherwise
*/
bool reassembly_is_queue_empty(void);

/**
* \brief Add fragment to an existing full message
* Full message holder will be created if first fragment
Expand Down
26 changes: 26 additions & 0 deletions lib/wpc/include/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,32 @@
#ifndef UTIL_H__
#define UTIL_H__

/**
* \brief Get the maximum value between two parameters
* \param a First value to compare
* \param b Second value to compare
* \return The highest value between a and b
*/
#define MAX(a, b) \
({ \
__typeof__(a) _a = (a); \
__typeof__(b) _b = (b); \
_a > _b ? _a : _b; \
})

/**
* \brief Get the minimum value between two parameters
* \param a First value to compare
* \param b Second value to compare
* \return The lowest value between a and b
*/
#define MIN(a, b) \
({ \
__typeof__(a) _a = (a); \
__typeof__(b) _b = (b); \
_a < _b ? _a : _b; \
})

/**
* \brief Function for encoding a uint16 value in Little endian.
* \param value
Expand Down
5 changes: 5 additions & 0 deletions lib/wpc/reassembly/reassembly.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ void reassembly_init()
// Nothing to do at the moment
}

bool reassembly_is_queue_empty(void)
{
return HASH_COUNT(m_packets) == 0;
}

bool reassembly_add_fragment(reassembly_fragment_t * frag, size_t * full_size_p)
{
full_packet_t *full_packet_p;
Expand Down
4 changes: 1 addition & 3 deletions lib/wpc_proto/wpc_proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
// Max possible size of encoded message
#define MAX_PROTOBUF_SIZE WP_CONFIG_MESSAGE_PB_H_MAX_SIZE

/* max default delay to keep incomplete fragmented packet inside our buffers */
#define FRAGMENT_MAX_DURATION_S 45
/* max default delay for poll fail duration */
/* 120s should cover most scratchpad exchanges and image processing. Sink is
not answearing during that time */
Expand Down Expand Up @@ -113,11 +111,11 @@ app_proto_res_e WPC_Proto_initialize(const char * port_name,

void WPC_Proto_close()
{
WPC_close();
WPC_unregister_from_stack_status();
Proto_otap_close();
Proto_config_close();
Proto_data_close();
WPC_close();
}

app_proto_res_e WPC_Proto_register_for_data_rx_event(onDataRxEvent_cb_f onDataRxEvent_cb)
Expand Down

0 comments on commit 0dc596e

Please sign in to comment.