diff --git a/main.cpp b/main.cpp index c2b2c16..813a909 100644 --- a/main.cpp +++ b/main.cpp @@ -67,19 +67,19 @@ void appIntubator(char* const argv[]) { // {{{ } } // }}} -void networkPacketExhangeWorker(LoRaPacketTrafficStats_t *loraPacketStats, +void networkPacketExchangeWorker(LoRaPacketTrafficStats_t *loraPacketStats, std::vector *servers) { // {{{ static std::function isValidUplinkAck = - [](char* origMsg, int origMsgSz, char* respMsg, int respMsgSz) { + [](char* origMsg, int origMsgSz, char* respMsg, int respMsgSz) { // {{{ if (origMsg != nullptr && origMsgSz > 4 && respMsg != nullptr && respMsgSz >= 4) { return origMsg[0] == respMsg[0] && origMsg[1] == respMsg[1] && origMsg[2] == respMsg[2] && (respMsg[3] == PKT_PUSH_ACK || respMsg[3] == PKT_PULL_ACK); } return false; - }; + }; // }}} static std::function isValidDownlinkPkt = - [](char *origMsg, int origMsgSz, char* respErrorJsonMsg, int *maxRespErrorJsonMsgSz) { + [](char *origMsg, int origMsgSz, char* respErrorJsonMsg, int *maxRespErrorJsonMsgSz) { // {{{ if (origMsg != nullptr && origMsgSz > 15 && origMsg[0] == PROTOCOL_VERSION && origMsg[3] == PKT_PULL_RESP) { @@ -96,15 +96,15 @@ void networkPacketExhangeWorker(LoRaPacketTrafficStats_t *loraPacketStats, } return false; - }; + }; // }}} - struct TxPacket { + struct TxPacket { // {{{ PackagedDataToSend_t packet; Direction direction; TxPacket(PackagedDataToSend_t&& pkt, Direction directn) : packet(std::move(pkt)), direction(directn) { } - }; + }; // }}} char downlinkMsg[RX_BUFF_DOWN_SIZE]; bool iterateImmediately; @@ -145,7 +145,7 @@ void networkPacketExhangeWorker(LoRaPacketTrafficStats_t *loraPacketStats, { printf("(%s) Requeued the %s packet.\n", asciiTime, (direction == UP_TX ? "uplink" : "downlink fetch request")); } fflush(stdout); } - else + else if (direction == UP_TX && packet.data_type == UPLINK_PUSH) { ++(loraPacketStats->acked_forw_packets); } } } @@ -272,7 +272,7 @@ int main(int argc, char **argv) { schedPrio.sched_priority = sched_get_priority_max(SCHED_RR) - 10; sched_setscheduler(0, SCHED_RR, (const sched_param*) &schedPrio); - std::thread packetExchanger{networkPacketExhangeWorker, &loraPacketStats, &cfg.servers}; + std::thread packetExchanger{networkPacketExchangeWorker, &loraPacketStats, &cfg.servers}; if (useIntubator) { std::thread intubator{appIntubator, argv}; intubator.detach(); @@ -287,12 +287,8 @@ int main(int argc, char **argv) { nextStatUpdateTime = currTime + sendStatPktIntervalSeconds; PublishStatProtocolPacket(cfg, loraPacketStats); - ++loraPacketStats.forw_packets_crc_good; - ++loraPacketStats.forw_packets; PublishLoRaDownlinkProtocolPacket(cfg); - ++loraPacketStats.forw_packets_crc_good; - ++loraPacketStats.forw_packets; } if (!keepRunning) break; @@ -301,6 +297,7 @@ int main(int argc, char **argv) { lora, cfg, loraDataPacket, msg, loraPacketStats); if (lastRecvResult == LoRaRecvStat::DATARECV) { + ++loraPacketStats.forw_packets; PublishLoRaUplinkProtocolPacket(cfg, loraDataPacket); lastRFInteractionTime = std::time(nullptr); } else if (keepRunning && lastRecvResult == LoRaRecvStat::NODATA) { diff --git a/smtUdpPacketForwarder/UdpUtils.cpp b/smtUdpPacketForwarder/UdpUtils.cpp index be68ff1..39766c2 100644 --- a/smtUdpPacketForwarder/UdpUtils.cpp +++ b/smtUdpPacketForwarder/UdpUtils.cpp @@ -18,7 +18,7 @@ static const std::map direction_to_mutex = { }; static Server_t NO_SERVER; -PackagedDataToSend_t NO_PACKAGED_DATA{0UL, 0UL, {}, NO_SERVER}; +PackagedDataToSend_t NO_PACKAGED_DATA{0UL, STAT_PUSH, 0UL, {}, NO_SERVER}; static std::map > hostname_cache; @@ -123,7 +123,7 @@ bool RecvUdp(Server_t &server, char *msg, int size, uint8_t *packet = new uint8_t[j - 3]; memcpy(packet, msg + 4, j - 4); packet[j - 4] = '\0'; - EnqueuePacket(packet, j - 4, server, DOWN_RX); + EnqueuePacket(packet, j - 4, DOWNLINK_TRANSMIT, server, DOWN_RX); return true; } @@ -252,7 +252,7 @@ bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, Directio return true; } -void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, Direction direction) // {{{ +void EnqueuePacket(uint8_t *data, uint32_t data_length, PackagedDataContentType_t data_type, Server_t& dest, Direction direction) // {{{ { if (data == nullptr) return; @@ -267,7 +267,7 @@ void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, Directio return; } - PackagedDataToSend_t packaged_data{ 0UL, data_length, data, dest }; + PackagedDataToSend_t packaged_data{ 0UL, data_type, data_length, data, dest }; direction_to_queue.at(direction).push(std::move(packaged_data)); } // }}} @@ -295,6 +295,7 @@ PackagedDataToSend_t DequeuePacket(Direction direction) // {{{ void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pktStats) // {{{ { // see https://github.com/Lora-net/packet_forwarder/blob/master/PROTOCOL.TXT + // also see document ANNWS.01.2.1.W.SYS char status_report[STATUS_MSG_SIZE]; /* status report as a JSON object */ char stat_timestamp[24]; @@ -375,7 +376,7 @@ void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pk size_t packet_sz = stat_index + json.size(); uint8_t *packet = new uint8_t[packet_sz]; memcpy(packet, status_report, packet_sz); - EnqueuePacket(packet, packet_sz, serv, UP_TX); + EnqueuePacket(packet, packet_sz, STAT_PUSH, serv, UP_TX); } } // }}} @@ -383,6 +384,7 @@ void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pk void PublishLoRaUplinkProtocolPacket(PlatformInfo_t &cfg, LoRaDataPkt_t &loraPacket) // {{{ { // see https://github.com/Lora-net/packet_forwarder/blob/master/PROTOCOL.TXT + // also see document ANNWS.01.2.1.W.SYS char buff_up[TX_BUFF_UP_SIZE]; /* buffer to compose the upstream packet */ int buff_index = 0; @@ -480,7 +482,7 @@ void PublishLoRaUplinkProtocolPacket(PlatformInfo_t &cfg, LoRaDataPkt_t &loraPac size_t packet_sz = buff_index + json.size(); uint8_t *packet = new uint8_t[packet_sz]; memcpy(packet, buff_up, packet_sz); - EnqueuePacket(packet, packet_sz, serv, UP_TX); + EnqueuePacket(packet, packet_sz, UPLINK_PUSH, serv, UP_TX); } } // }}} @@ -511,6 +513,6 @@ void PublishLoRaDownlinkProtocolPacket(PlatformInfo_t &cfg) // {{{ uint8_t *packet = new uint8_t[sizeof(buff_up)]; memcpy(packet, buff_up, sizeof(buff_up)); - EnqueuePacket(packet, sizeof(buff_up), serv, DOWN_TX); + EnqueuePacket(packet, sizeof(buff_up), DOWNLINK_REQ, serv, DOWN_TX); } } // }}} diff --git a/smtUdpPacketForwarder/UdpUtils.h b/smtUdpPacketForwarder/UdpUtils.h index 678c3c3..312a045 100644 --- a/smtUdpPacketForwarder/UdpUtils.h +++ b/smtUdpPacketForwarder/UdpUtils.h @@ -50,21 +50,27 @@ #define BASE64_MAX_LENGTH 341 +typedef enum PackagedDataContentType : char +{ + STAT_PUSH = 0, UPLINK_PUSH, DOWNLINK_REQ, DOWNLINK_TRANSMIT +} PackagedDataContentType_t; typedef struct PackagedDataToSend { uint32_t curr_attempt; + PackagedDataContentType_t data_type; uint32_t data_len; std::unique_ptr data; Server_t destination; bool logged; std::time_t schedule; - PackagedDataToSend(uint32_t curr_attempt, uint32_t data_len, uint8_t *data_content, Server_t& destination) + PackagedDataToSend(uint32_t curr_attempt, PackagedDataContentType_t data_type, uint32_t data_len, uint8_t *data_content, Server_t& destination) { this->logged = false; this->schedule = 0; this->curr_attempt = curr_attempt; + this->data_type = data_type; this->data_len = data_len; this->data = std::unique_ptr(data_content); this->destination = destination; @@ -75,6 +81,7 @@ typedef struct PackagedDataToSend logged = origin.logged; schedule = origin.schedule; curr_attempt = origin.curr_attempt; + data_type = origin.data_type; data_len = origin.data_len; data = std::move(origin.data); destination = origin.destination; @@ -92,7 +99,7 @@ bool RecvUdp(Server_t &server, char *msg, int size, std::function &validator); NetworkConf_t PrepareNetworking(const char* networkInterfaceName, suseconds_t dataRecvTimeout, char gatewayId[25]); -void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, Direction direction); +void EnqueuePacket(uint8_t *data, uint32_t data_length, PackagedDataContentType_t data_type, Server_t& dest, Direction direction); bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, Direction direction); PackagedDataToSend_t DequeuePacket(Direction direction); diff --git a/smtUdpPacketForwarder/config.h b/smtUdpPacketForwarder/config.h index 460b0e5..0a4a104 100644 --- a/smtUdpPacketForwarder/config.h +++ b/smtUdpPacketForwarder/config.h @@ -98,7 +98,6 @@ typedef struct LoRaPacketTrafficStats { uint32_t recv_packets; uint32_t recv_packets_crc_good; uint32_t forw_packets; - uint32_t forw_packets_crc_good; volatile uint32_t acked_forw_packets; volatile uint32_t downlink_recv_packets; uint32_t downlink_tx_packets;