Skip to content

Commit

Permalink
Fix packet stats
Browse files Browse the repository at this point in the history
  • Loading branch information
zhgzhg committed Jan 3, 2024
1 parent 3cf17c2 commit 25a2f6d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 23 deletions.
23 changes: 10 additions & 13 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,19 @@ void appIntubator(char* const argv[]) { // {{{
}
} // }}}

void networkPacketExhangeWorker(LoRaPacketTrafficStats_t *loraPacketStats,
void networkPacketExchangeWorker(LoRaPacketTrafficStats_t *loraPacketStats,
std::vector<Server_t> *servers) { // {{{

static std::function<bool(char*, int, char*, int)> isValidUplinkAck =
[](char* origMsg, int origMsgSz, char* respMsg, int respMsgSz) {
[](char* origMsg, int origMsgSz, char* respMsg, int respMsgSz) { // {{{
if (origMsg != nullptr && origMsgSz > 4 && respMsg != nullptr && respMsgSz >= 4) {
return origMsg[0] == respMsg[0] && origMsg[1] == respMsg[1] &&
origMsg[2] == respMsg[2] && (respMsg[3] == PKT_PUSH_ACK || respMsg[3] == PKT_PULL_ACK);
}
return false;
};
}; // }}}
static std::function<bool(char*, int, char*, int*)> isValidDownlinkPkt =
[](char *origMsg, int origMsgSz, char* respErrorJsonMsg, int *maxRespErrorJsonMsgSz) {
[](char *origMsg, int origMsgSz, char* respErrorJsonMsg, int *maxRespErrorJsonMsgSz) { // {{{
if (origMsg != nullptr && origMsgSz > 15 && origMsg[0] == PROTOCOL_VERSION &&
origMsg[3] == PKT_PULL_RESP)
{
Expand All @@ -96,15 +96,15 @@ void networkPacketExhangeWorker(LoRaPacketTrafficStats_t *loraPacketStats,
}

return false;
};
}; // }}}

struct TxPacket {
struct TxPacket { // {{{
PackagedDataToSend_t packet;
Direction direction;

TxPacket(PackagedDataToSend_t&& pkt, Direction directn) : packet(std::move(pkt)), direction(directn)
{ }
};
}; // }}}

char downlinkMsg[RX_BUFF_DOWN_SIZE];
bool iterateImmediately;
Expand Down Expand Up @@ -145,7 +145,7 @@ void networkPacketExhangeWorker(LoRaPacketTrafficStats_t *loraPacketStats,
{ printf("(%s) Requeued the %s packet.\n", asciiTime, (direction == UP_TX ? "uplink" : "downlink fetch request")); }
fflush(stdout);
}
else
else if (direction == UP_TX && packet.data_type == UPLINK_PUSH)
{ ++(loraPacketStats->acked_forw_packets); }
}
}
Expand Down Expand Up @@ -272,7 +272,7 @@ int main(int argc, char **argv) {
schedPrio.sched_priority = sched_get_priority_max(SCHED_RR) - 10;
sched_setscheduler(0, SCHED_RR, (const sched_param*) &schedPrio);

std::thread packetExchanger{networkPacketExhangeWorker, &loraPacketStats, &cfg.servers};
std::thread packetExchanger{networkPacketExchangeWorker, &loraPacketStats, &cfg.servers};
if (useIntubator) {
std::thread intubator{appIntubator, argv};
intubator.detach();
Expand All @@ -287,12 +287,8 @@ int main(int argc, char **argv) {
nextStatUpdateTime = currTime + sendStatPktIntervalSeconds;

PublishStatProtocolPacket(cfg, loraPacketStats);
++loraPacketStats.forw_packets_crc_good;
++loraPacketStats.forw_packets;

PublishLoRaDownlinkProtocolPacket(cfg);
++loraPacketStats.forw_packets_crc_good;
++loraPacketStats.forw_packets;
}

if (!keepRunning) break;
Expand All @@ -301,6 +297,7 @@ int main(int argc, char **argv) {
lora, cfg, loraDataPacket, msg, loraPacketStats);

if (lastRecvResult == LoRaRecvStat::DATARECV) {
++loraPacketStats.forw_packets;
PublishLoRaUplinkProtocolPacket(cfg, loraDataPacket);
lastRFInteractionTime = std::time(nullptr);
} else if (keepRunning && lastRecvResult == LoRaRecvStat::NODATA) {
Expand Down
16 changes: 9 additions & 7 deletions smtUdpPacketForwarder/UdpUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ static const std::map<Direction, std::timed_mutex&> direction_to_mutex = {
};

static Server_t NO_SERVER;
PackagedDataToSend_t NO_PACKAGED_DATA{0UL, 0UL, {}, NO_SERVER};
PackagedDataToSend_t NO_PACKAGED_DATA{0UL, STAT_PUSH, 0UL, {}, NO_SERVER};

static std::map<std::string, std::pair<time_t, struct in_addr> > hostname_cache;

Expand Down Expand Up @@ -123,7 +123,7 @@ bool RecvUdp(Server_t &server, char *msg, int size,
uint8_t *packet = new uint8_t[j - 3];
memcpy(packet, msg + 4, j - 4);
packet[j - 4] = '\0';
EnqueuePacket(packet, j - 4, server, DOWN_RX);
EnqueuePacket(packet, j - 4, DOWNLINK_TRANSMIT, server, DOWN_RX);

return true;
}
Expand Down Expand Up @@ -252,7 +252,7 @@ bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, Directio
return true;
}

void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, Direction direction) // {{{
void EnqueuePacket(uint8_t *data, uint32_t data_length, PackagedDataContentType_t data_type, Server_t& dest, Direction direction) // {{{
{
if (data == nullptr) return;

Expand All @@ -267,7 +267,7 @@ void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, Directio
return;
}

PackagedDataToSend_t packaged_data{ 0UL, data_length, data, dest };
PackagedDataToSend_t packaged_data{ 0UL, data_type, data_length, data, dest };
direction_to_queue.at(direction).push(std::move(packaged_data));
} // }}}

Expand Down Expand Up @@ -295,6 +295,7 @@ PackagedDataToSend_t DequeuePacket(Direction direction) // {{{
void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pktStats) // {{{
{
// see https://github.com/Lora-net/packet_forwarder/blob/master/PROTOCOL.TXT
// also see document ANNWS.01.2.1.W.SYS

char status_report[STATUS_MSG_SIZE]; /* status report as a JSON object */
char stat_timestamp[24];
Expand Down Expand Up @@ -375,14 +376,15 @@ void PublishStatProtocolPacket(PlatformInfo_t &cfg, LoRaPacketTrafficStats_t &pk
size_t packet_sz = stat_index + json.size();
uint8_t *packet = new uint8_t[packet_sz];
memcpy(packet, status_report, packet_sz);
EnqueuePacket(packet, packet_sz, serv, UP_TX);
EnqueuePacket(packet, packet_sz, STAT_PUSH, serv, UP_TX);
}

} // }}}

void PublishLoRaUplinkProtocolPacket(PlatformInfo_t &cfg, LoRaDataPkt_t &loraPacket) // {{{
{
// see https://github.com/Lora-net/packet_forwarder/blob/master/PROTOCOL.TXT
// also see document ANNWS.01.2.1.W.SYS

char buff_up[TX_BUFF_UP_SIZE]; /* buffer to compose the upstream packet */
int buff_index = 0;
Expand Down Expand Up @@ -480,7 +482,7 @@ void PublishLoRaUplinkProtocolPacket(PlatformInfo_t &cfg, LoRaDataPkt_t &loraPac
size_t packet_sz = buff_index + json.size();
uint8_t *packet = new uint8_t[packet_sz];
memcpy(packet, buff_up, packet_sz);
EnqueuePacket(packet, packet_sz, serv, UP_TX);
EnqueuePacket(packet, packet_sz, UPLINK_PUSH, serv, UP_TX);
}

} // }}}
Expand Down Expand Up @@ -511,6 +513,6 @@ void PublishLoRaDownlinkProtocolPacket(PlatformInfo_t &cfg) // {{{

uint8_t *packet = new uint8_t[sizeof(buff_up)];
memcpy(packet, buff_up, sizeof(buff_up));
EnqueuePacket(packet, sizeof(buff_up), serv, DOWN_TX);
EnqueuePacket(packet, sizeof(buff_up), DOWNLINK_REQ, serv, DOWN_TX);
}
} // }}}
11 changes: 9 additions & 2 deletions smtUdpPacketForwarder/UdpUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,27 @@

#define BASE64_MAX_LENGTH 341

typedef enum PackagedDataContentType : char
{
STAT_PUSH = 0, UPLINK_PUSH, DOWNLINK_REQ, DOWNLINK_TRANSMIT
} PackagedDataContentType_t;

typedef struct PackagedDataToSend
{
uint32_t curr_attempt;
PackagedDataContentType_t data_type;
uint32_t data_len;
std::unique_ptr<uint8_t> data;
Server_t destination;
bool logged;
std::time_t schedule;

PackagedDataToSend(uint32_t curr_attempt, uint32_t data_len, uint8_t *data_content, Server_t& destination)
PackagedDataToSend(uint32_t curr_attempt, PackagedDataContentType_t data_type, uint32_t data_len, uint8_t *data_content, Server_t& destination)
{
this->logged = false;
this->schedule = 0;
this->curr_attempt = curr_attempt;
this->data_type = data_type;
this->data_len = data_len;
this->data = std::unique_ptr<uint8_t>(data_content);
this->destination = destination;
Expand All @@ -75,6 +81,7 @@ typedef struct PackagedDataToSend
logged = origin.logged;
schedule = origin.schedule;
curr_attempt = origin.curr_attempt;
data_type = origin.data_type;
data_len = origin.data_len;
data = std::move(origin.data);
destination = origin.destination;
Expand All @@ -92,7 +99,7 @@ bool RecvUdp(Server_t &server, char *msg, int size,
std::function<bool(char*, int, char*, int*)> &validator);
NetworkConf_t PrepareNetworking(const char* networkInterfaceName, suseconds_t dataRecvTimeout, char gatewayId[25]);

void EnqueuePacket(uint8_t *data, uint32_t data_length, Server_t& dest, Direction direction);
void EnqueuePacket(uint8_t *data, uint32_t data_length, PackagedDataContentType_t data_type, Server_t& dest, Direction direction);
bool RequeuePacket(PackagedDataToSend_t &&packet, uint32_t maxAttempts, Direction direction);
PackagedDataToSend_t DequeuePacket(Direction direction);

Expand Down
1 change: 0 additions & 1 deletion smtUdpPacketForwarder/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ typedef struct LoRaPacketTrafficStats {
uint32_t recv_packets;
uint32_t recv_packets_crc_good;
uint32_t forw_packets;
uint32_t forw_packets_crc_good;
volatile uint32_t acked_forw_packets;
volatile uint32_t downlink_recv_packets;
uint32_t downlink_tx_packets;
Expand Down

0 comments on commit 25a2f6d

Please sign in to comment.