Skip to content

Commit

Permalink
Adapt gateway example to new event status size definition
Browse files Browse the repository at this point in the history
Update also last will size
Use malloc to get buffer for message to publish, free in delivered callback
handle case when user or pwd is not set (avoid crash)
Connect possible with --noSSL option
  • Loading branch information
StephaneTriomphe committed Sep 24, 2024
1 parent b856b6c commit 9036a47
Showing 1 changed file with 129 additions and 46 deletions.
175 changes: 129 additions & 46 deletions example/linux/gw-example/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
static char m_gateway_id[32] = "\0";
static char m_user[64] = "\0";
static char m_password[128] = "\0";
static bool use_ssl = true;

static uint8_t m_proto_buffer[WPC_PROTO_MAX_RESPONSE_SIZE];
static uint8_t m_proto_response_buffer[WPC_PROTO_MAX_RESPONSE_SIZE];

static pthread_t m_thread_publish;
// Mutex for publishing on MQTT
Expand All @@ -45,14 +46,15 @@ static pthread_cond_t m_pub_queue_not_empty_cond = PTHREAD_COND_INITIALIZER;
typedef struct
{
char topic[64]; //< Topic to publish
uint8_t payload[1024]; //< The payload
uint8_t* payload_p; //< The payload
size_t payload_size;
bool retained;
MQTTClient_deliveryToken token;
} message_to_publish_t;

#define PUBLISH_QUEUE_SIZE 16
#define PUBLISH_QUEUE_SIZE 32
// Publish queue
static message_to_publish_t m_publish_queue[PUBLISH_QUEUE_SIZE];
static message_to_publish_t m_publish_queue[PUBLISH_QUEUE_SIZE] = { 0 };

// Head of the queue for the polling thread to write
static unsigned int m_pub_queue_write = 0;
Expand All @@ -74,23 +76,43 @@ static bool MQTT_publish(char * topic, uint8_t * payload, size_t payload_size, b

if (!m_pub_queue_empty && (m_pub_queue_write == m_pub_queue_read))
{
LOGE("Unable to publish, message queue full pos %d\n", m_pub_queue_write);
ret = false;
}
else
{
// Insert our publish
message_p = &m_publish_queue[m_pub_queue_write];
// TODO add check on size
strcpy(message_p->topic, topic);
memcpy(message_p->payload, payload, payload_size);
message_p->payload_size = payload_size;
message_p->retained = retained;
if(message_p->payload_p != NULL)
{
LOGE("Overwriting message pos %d on topic %s, delivered info not received\n",
m_pub_queue_write,
message_p->topic);
free(message_p->payload_p);
}

m_pub_queue_write = (m_pub_queue_write + 1) % PUBLISH_QUEUE_SIZE;
message_p->payload_p = malloc(payload_size);

pthread_cond_signal(&m_pub_queue_not_empty_cond);
m_pub_queue_empty = false;
ret = true;
if (message_p->payload_p == NULL)
{
LOGE("Unable to publish, not enough memory\n");
ret = false;
}
else
{
strcpy(message_p->topic, topic);
memcpy(message_p->payload_p, payload, payload_size);
message_p->payload_size = payload_size;
message_p->retained = retained;
LOGI("Message pushed in the queue pos %d\n", m_pub_queue_write);

m_pub_queue_write = (m_pub_queue_write + 1) % PUBLISH_QUEUE_SIZE;

pthread_cond_signal(&m_pub_queue_not_empty_cond);
m_pub_queue_empty = false;
ret = true;

}
}

pthread_mutex_unlock(&m_pub_queue_mutex);
Expand Down Expand Up @@ -125,19 +147,30 @@ static void * publish_thread(void * unused)
// Release the lock as we don't need it to publish
pthread_mutex_unlock(&m_pub_queue_mutex);

// Queue our message
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;

pubmsg.payloadlen = message->payload_size;
pubmsg.payload = message->payload;
pubmsg.qos = 1;
pubmsg.retained = message->retained;


LOGD("Publishing on topic %s\n", message->topic);
rc = MQTTClient_publishMessage(m_client, message->topic, &pubmsg, &token);
LOGI("Message with token %d published rc=%d topic=%s\n", token, rc, message->topic);
if (message->payload_p == NULL)
{
LOGE("Unable to publish pos %d, message payload is NULL\n", m_pub_queue_read);
}
else
{
// Queue our message
MQTTClient_message pubmsg = MQTTClient_message_initializer;

pubmsg.payloadlen = message->payload_size;
pubmsg.payload = message->payload_p;
pubmsg.qos = 1;
pubmsg.retained = message->retained;

rc = MQTTClient_publishMessage(m_client,
message->topic,
&pubmsg,
&message->token);
LOGI("Message pos %d with token %d published rc=%d topic=%s\n",
m_pub_queue_read,
message->token,
rc,
message->topic);
}

// Take the lock to update queue indexes and to wait again on condition
pthread_mutex_lock(&m_pub_queue_mutex);
Expand All @@ -155,9 +188,20 @@ static void * publish_thread(void * unused)

static void on_mqtt_message_delivered(void *context, MQTTClient_deliveryToken dt)
{
// For now, only informal but message should be cleared from queue only when
// delivered and not published as of today
// Warning : this is called only if Qos is not 0
LOGI("Message with token %d delivery confirmed\n", dt);
for (unsigned int i = 0; i < PUBLISH_QUEUE_SIZE; i++)
{
if (m_publish_queue[i].token == dt)
{
// Free the payload
free(m_publish_queue[i].payload_p);
m_publish_queue[i].payload_p = NULL;
return;
}
}
// Token not found
LOGE("Message with token %d not found\n", dt);
}

static int on_message_rx_mqtt(void *context, char *topic, int topic_len, MQTTClient_message *message)
Expand All @@ -167,9 +211,9 @@ static int on_message_rx_mqtt(void *context, char *topic, int topic_len, MQTTCli
app_proto_res_e res;

LOGD("Message received on topic %s\n", topic);
response_size = sizeof(m_proto_buffer);
response_size = sizeof(m_proto_response_buffer);

res = WPC_Proto_handle_request(message->payload, message->payloadlen, m_proto_buffer, &response_size);
res = WPC_Proto_handle_request(message->payload, message->payloadlen, m_proto_response_buffer, &response_size);
if (res == APP_RES_PROTO_OK)
{
// response_topic is same as request with substitution of request with response
Expand All @@ -188,7 +232,7 @@ static int on_message_rx_mqtt(void *context, char *topic, int topic_len, MQTTCli
"gw-response/%s",
topic + 11); // Everything after gw-request/
LOGI("Response generated of size = %d for topic: %s\n", response_size, response_topic_p);
MQTT_publish(response_topic_p, m_proto_buffer, response_size, false);
MQTT_publish(response_topic_p, m_proto_response_buffer, response_size, false);
free(response_topic_p);
}
}
Expand All @@ -209,27 +253,42 @@ static bool reconnect(uint32_t timeout_s)
size_t proto_size;
char topic_status[sizeof(TOPIC_EVENT_PREFIX) + sizeof(m_gateway_id) + 1];
char topic_all_requests[16 + sizeof(m_gateway_id)]; //"gw-request/+/<gateway_id/#"
uint8_t last_will_message[128];
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions will_options = MQTTClient_willOptions_initializer;
MQTTClient_SSLOptions ssl_options = MQTTClient_SSLOptions_initializer;

conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.ssl = &ssl_options;
if (!use_ssl)
{
conn_opts.ssl = NULL;
LOGI("Connect without SSL\n");
}
else
{
conn_opts.ssl = &ssl_options;
}

conn_opts.username = m_user;
conn_opts.password = m_password;

// Allocate needed buffer for event status
uint8_t * event_status_p = malloc(WPC_PROTO_MAX_EVENTSTATUS_SIZE);
if (event_status_p == NULL)
{
LOGE("Not enough memory for event status buffer");
return false;
}

// Setup last will
proto_size = sizeof(last_will_message);
if (WPC_Proto_get_current_event_status(false, last_will_message, &proto_size) == APP_RES_PROTO_OK)
proto_size = WPC_PROTO_MAX_EVENTSTATUS_SIZE;
if (WPC_Proto_get_current_event_status(false, event_status_p, &proto_size) == APP_RES_PROTO_OK)
{
will_options.topicName = topic_status;
will_options.qos = 1;
will_options.retained = 1;
will_options.payload.len = proto_size;
will_options.payload.data = last_will_message;
will_options.payload.data = event_status_p;
conn_opts.will = &will_options;
}

Expand Down Expand Up @@ -259,15 +318,18 @@ static bool reconnect(uint32_t timeout_s)
if (!MQTTClient_isConnected(m_client))
{
LOGE("Failed to connect, return code %d\n", rc);
free(event_status_p);
return false;
}


// Set our current status
proto_size = sizeof(m_proto_buffer);
if (WPC_Proto_get_current_event_status(true, m_proto_buffer, &proto_size) == APP_RES_PROTO_OK)
proto_size = WPC_PROTO_MAX_EVENTSTATUS_SIZE;
if (WPC_Proto_get_current_event_status(true, event_status_p, &proto_size) == APP_RES_PROTO_OK)
{
MQTT_publish(topic_status, m_proto_buffer, proto_size, true);
MQTT_publish(topic_status, event_status_p, proto_size, true);
}
free(event_status_p);

// Register for request topic
if ((rc = MQTTClient_subscribe(m_client, topic_all_requests, 1)) != MQTTCLIENT_SUCCESS)
Expand Down Expand Up @@ -356,9 +418,17 @@ static void onEventStatus_cb(uint8_t * event_p, size_t event_size)
LOGI("EventStatus to publish size = %d\n", event_size);
}


void print_usage() {
printf("Usage: gw-example -p <port name> [-b <baudrate>] -g <gateway_id> -H <mqtt hostname> [-U <mqtt user>] [-P <mqtt password>]\n");
printf("Usage: gw-example [OPTIONS]\n");
printf("Options:\n");
printf(" -p, --port <port name> Serial port name\n");
printf(" -b, --baudrate <baudrate> Baudrate (default: %d)\n", DEFAULT_BAUDTRATE);
printf(" -g, --gateway <gateway_id> Gateway ID\n");
printf(" -H, --host <mqtt hostname> MQTT hostname\n");
printf(" -U, --user <mqtt user> MQTT username\n");
printf(" -P, --password <password> MQTT password\n");
printf(" -S, --noSSL Disable SSL\n");
printf(" -h, --help Display this help message\n");
}

int main(int argc, char * argv[])
Expand All @@ -367,15 +437,25 @@ int main(int argc, char * argv[])
int c;
char * port_name = NULL;
char * mqtt_host = NULL;
char * mqtt_user = NULL;
char * mqtt_password = NULL;
char * gateway_id;
char * mqtt_user = "";
char * mqtt_password = "";
char * gateway_id = NULL;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);

// Parse the arguments
while ((c = getopt(argc, argv, "b:p:g:U:P:H:h")) != -1)
static struct option long_options[]
= { { "baudrate", required_argument, 0, 'b' },
{ "port", required_argument, 0, 'p' },
{ "gateway", required_argument, 0, 'g' },
{ "host", required_argument, 0, 'H' },
{ "user", required_argument, 0, 'U' },
{ "password", required_argument, 0, 'P' },
{ "noSSL", no_argument, 0, 'S' },
{ "help", no_argument, 0, 'h' },
{ 0, 0, 0, 0 } };
while ((c = getopt_long(argc, argv, "b:p:g:U:P:SH:h?", long_options, NULL)) != -1)
{
switch (c)
{
Expand All @@ -397,12 +477,15 @@ int main(int argc, char * argv[])
case 'P':
mqtt_password = optarg;
break;
case 'S':
use_ssl = false;
break;
case 'h':
case '?':
print_usage();
default:
return -1;
}
}
}

if (!port_name || !gateway_id ||!mqtt_host)
Expand Down

0 comments on commit 9036a47

Please sign in to comment.