Skip to content

Commit

Permalink
Handle fragment only in dispatch task
Browse files Browse the repository at this point in the history
remove garbage collect from polling task
call periodically garbage collect
garbage collect still called on frag received for compatibility
  • Loading branch information
StephaneTriomphe committed Jan 10, 2025
1 parent 052ea55 commit 66dee86
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 59 deletions.
1 change: 1 addition & 0 deletions example/linux/gw-example/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ int main(int argc, char * argv[])
{
LOGE("MQTT failed to disconnect\n");
}
LOGI("MQTT disconnected\n");
MQTTClient_destroy(&m_client);
pthread_mutex_destroy(&m_pub_queue_mutex);
LOGI("Clean exit completed\n");
Expand Down
16 changes: 11 additions & 5 deletions lib/platform/linux/platform.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
// Polling interval to check for indication
#define POLLING_INTERVAL_MS 20

// Wakeup timeout for dispatch thread, mainly for garbage collection of fragments
#define DISPATCH_WAKEUP_TIMEOUT_MS 5000

// Mutex for sending, ie serial access
static pthread_mutex_t sending_mutex;

Expand Down Expand Up @@ -94,13 +97,20 @@ static pthread_cond_t m_queue_not_empty_cond = PTHREAD_COND_INITIALIZER;
*/
static void * dispatch_indication(void * unused)
{
struct timespec ts;

pthread_mutex_lock(&m_queue_mutex);
while (m_dispatch_thread_running)
{
if (m_queue_empty)
{
// Queue is empty, wait
pthread_cond_wait(&m_queue_not_empty_cond, &m_queue_mutex);
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += DISPATCH_WAKEUP_TIMEOUT_MS ; // 5 second timeout
pthread_cond_timedwait(&m_queue_not_empty_cond, &m_queue_mutex, &ts);

// Force a garbage collect (to be sure it's called even if no frag are received)
reassembly_garbage_collect();

// Check if we wake up but nothing in queue
if (m_queue_empty)
Expand Down Expand Up @@ -200,10 +210,6 @@ static void * poll_for_indication(void * unused)
m_polling_thread_state_request = POLLING_THREAD_STOP;
break;
}
else
{
reassembly_garbage_collect(FRAGMENT_MAX_DURATION_S);
}
}

// Get the number of free buffers in the indication queue
Expand Down
22 changes: 3 additions & 19 deletions lib/wpc/dsap.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ typedef struct
bool busy;
} packet_with_indication_t;

// Minimum period between two consecutive garbage collects of
// uncomplete fragments
// GC is anyway synchronous with received fragment so period between 2 GC
// could be much bigger if not fragments are received for a while
#define MIN_GARBAGE_COLLECT_PERIOD_S 5

// Max timeout in seconds for uncomplete fragmented packet to be discarded
// from rx queue.
static uint32_t m_fragment_max_duration_s = 0;

// Static buffer used to reassemble messages. Allocated statically
// to not have it allocated on stack dynamically. Could also be allocated
// dynamically with platform malloc, but as there is only one needed, static
Expand Down Expand Up @@ -318,7 +308,6 @@ void dsap_data_tx_indication_handler(dsap_data_tx_ind_pl_t * payload)
void dsap_data_rx_frag_indication_handler(dsap_data_rx_frag_ind_pl_t * payload,
unsigned long long timestamp_ms_epoch)
{
static unsigned long long last_gc_ts_ms = 0;
reassembly_fragment_t frag;
size_t full_size;
app_qos_e qos;
Expand Down Expand Up @@ -394,13 +383,8 @@ void dsap_data_rx_frag_indication_handler(dsap_data_rx_frag_ind_pl_t * payload,

// Do GC synchronously to avoid races as all fragment related actions happens on same thread
// and no need for an another scheduling method to add in Platform
if (m_fragment_max_duration_s > 0 &&
Platform_get_timestamp_ms_monotonic() - last_gc_ts_ms > (MIN_GARBAGE_COLLECT_PERIOD_S * 1000))
{
// Time for a new GC
reassembly_garbage_collect(m_fragment_max_duration_s);
last_gc_ts_ms = Platform_get_timestamp_ms_monotonic();
}
reassembly_garbage_collect();

}

void dsap_data_rx_indication_handler(dsap_data_rx_ind_pl_t * payload,
Expand Down Expand Up @@ -500,7 +484,7 @@ bool dsap_unregister_for_data()

bool dsap_set_max_fragment_duration(unsigned int fragment_max_duration_s)
{
m_fragment_max_duration_s = fragment_max_duration_s;
reassembly_set_max_fragment_duration(fragment_max_duration_s);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/wpc/include/dsap.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ bool dsap_unregister_for_data();
#endif

/**
* \brief Set maximum duration to keep fragment in our buffer until packet is ful
* \brief Set maximum duration to keep fragment in our buffer until packet is full
* \param fragment_max_duration_s
* Maximum time in s to keep fragments from incomplete packets inside our buffers
*/
Expand Down
18 changes: 13 additions & 5 deletions lib/wpc/include/reassembly.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@ typedef struct {
unsigned long long timestamp; //< When was the fragment received
} reassembly_fragment_t;

/**
* \brief Set maximum duration for fragment
* \param duration_s
* the maximum duration in seconds to keep fragment from incomplete packets.
* Zero equals forever
* \return Return code of the operation
*/
void reassembly_set_max_fragment_duration(unsigned int duration_s);

/**
* \brief Initialize reassembly module
*/
void reassembly_init();

/**
* \brief Check queue emptyness
* \return true if empty, false otherwise
* \return True means that queue is empty, false that queue is most probably not empty
* \note This function can be called from any task
*/
bool reassembly_is_queue_empty(void);
bool reassembly_is_queue_empty();

/**
* \brief Add fragment to an existing full message
Expand Down Expand Up @@ -65,9 +75,7 @@ bool reassembly_get_full_message(uint32_t src_addr, uint16_t packet_id, uint8_t
/**
* \brief Clear all the uncomplete fragmented message that have no activity for \ref timeout_s
*
* \param timeout_s
* Limit in second for inactivity before message being deleted
*/
void reassembly_garbage_collect(uint32_t timeout_s);
void reassembly_garbage_collect();

#endif //REASSEMBLY_H__
89 changes: 60 additions & 29 deletions lib/wpc/reassembly/reassembly.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#include "platform.h"

// Minimum period between two consecutive garbage collects of uncomplete fragments
#define MIN_GARBAGE_COLLECT_PERIOD_S 5

/* undefine the defaults */
#undef uthash_malloc
#undef uthash_free
Expand Down Expand Up @@ -52,11 +55,25 @@ typedef struct
UT_hash_handle hh;
} full_packet_t;

/**
* Hash containing all the fragmented packet under construction
*/
// Max timeout in seconds for uncomplete fragmented packet to be discarded
// from rx queue.
static uint32_t m_fragment_max_duration_s = 0;

// Hash containing all the fragmented packet under construction
static full_packet_t * m_packets = NULL;

// Keep track of the queue emptyness, to get info from other tasks
// True indicate that queue is empty, false indicate that queue is most probably not empty
static bool m_is_queue_empty;

// Timestamp of last garbage collect
static unsigned long long m_last_gc_ts_ms;

void reassembly_set_max_fragment_duration(unsigned int fragment_max_duration_s)
{
m_fragment_max_duration_s = fragment_max_duration_s;
}

static full_packet_t * get_packet_from_hash(uint32_t src_add, uint16_t packet_id)
{
full_packet_t * p;
Expand Down Expand Up @@ -221,6 +238,8 @@ static bool reassemble_full_packet(full_packet_t * full_packet_p, uint8_t * buff
// release also full packet struct form hash
HASH_DEL(m_packets, full_packet_p);
Platform_free(full_packet_p, sizeof(full_packet_t));

m_is_queue_empty = (HASH_COUNT(m_packets) == 0);
return true;
}

Expand All @@ -229,16 +248,19 @@ void reassembly_init()
// Nothing to do at the moment
}

bool reassembly_is_queue_empty(void)
bool reassembly_is_queue_empty()
{
return HASH_COUNT(m_packets) == 0;
return m_is_queue_empty;
}

bool reassembly_add_fragment(reassembly_fragment_t * frag, size_t * full_size_p)
{
full_packet_t *full_packet_p;

*full_size_p = 0;

// set the queue empty flag in advance, even if we fail later (corrected by garbage collection)
m_is_queue_empty = false;

// Get packet or create it
full_packet_p = get_packet_from_hash(frag->src_add, frag->packet_id);
Expand Down Expand Up @@ -288,32 +310,41 @@ bool reassembly_get_full_message(uint32_t src_add, uint16_t packet_id, uint8_t *

}

void reassembly_garbage_collect(uint32_t timeout_s)
void reassembly_garbage_collect()
{
full_packet_t *fp, *tmp;
uint32_t messages_removed = 0;
HASH_ITER(hh, m_packets, fp, tmp) {
uint32_t last_activity =
(Platform_get_timestamp_ms_monotonic() - fp->timestamp_ms_epoch_last) / 1000;

/* Check if message is not getting too old */
if (last_activity > timeout_s)
{
LOGW("Fragmented message from src %u with id %u has no activity for more than %u s => delete it\n",
fp->key.src_add, fp->key.packet_id, timeout_s);

internal_fragment_t *f, *tmp;
LL_FOREACH_SAFE(fp->head, f, tmp) {
Platform_free(f->bytes, f->size);
LL_DELETE(fp->head, f);
Platform_free(f, sizeof(internal_fragment_t));
if (m_fragment_max_duration_s > 0 &&
Platform_get_timestamp_ms_monotonic() - m_last_gc_ts_ms > (MIN_GARBAGE_COLLECT_PERIOD_S * 1000))
{
// Time for a new GC
m_last_gc_ts_ms = Platform_get_timestamp_ms_monotonic();

full_packet_t *fp, *tmp;
uint32_t messages_removed = 0;
HASH_ITER(hh, m_packets, fp, tmp) {
uint32_t last_activity =
(Platform_get_timestamp_ms_monotonic() - fp->timestamp_ms_epoch_last) / 1000;

/* Check if message is not getting too old */
if (last_activity > m_fragment_max_duration_s)
{
LOGW("Fragmented message from src %u with id %u has no activity for more than %u s => delete it\n",
fp->key.src_add, fp->key.packet_id, m_fragment_max_duration_s);

internal_fragment_t *f, *tmp;
LL_FOREACH_SAFE(fp->head, f, tmp) {
Platform_free(f->bytes, f->size);
LL_DELETE(fp->head, f);
Platform_free(f, sizeof(internal_fragment_t));
}

// release also full packet struct from hash
HASH_DEL(m_packets, fp);
Platform_free(fp, sizeof(full_packet_t));
messages_removed ++;
}

// release also full packet struct from hash
HASH_DEL(m_packets, fp);
Platform_free(fp, sizeof(full_packet_t));
messages_removed ++;
}
LOGD("GC: %d message removed\n", messages_removed);

m_is_queue_empty = (HASH_COUNT(m_packets) == 0);
}
LOGD("GC: %d message removed\n", messages_removed);
}

0 comments on commit 66dee86

Please sign in to comment.