Skip to content

Commit

Permalink
cleaning funnel
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Dec 8, 2022
1 parent 5913ca4 commit 0c41de4
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 138 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(THREADS_PREFER_PTHREAD_FLAG ON)

set(CMAKE_CXX_FLAGS "-O3 -Wall -Wextra")

include(FindPkgConfig)

find_package (Threads REQUIRED)
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Service which filters and sends out data to all listening clients over websocket

- **GRPC_PORT**: mendatory under which the grpc server runs
- **WEBSOCKET_PORT**: mendatory under which port the websocket port runs

- **API_DOMAIN**: http domain of an running [dvb-api](https://github.com/dump-dvb/dvb-api)

### Filter

Expand All @@ -19,7 +19,8 @@ Service which filters and sends out data to all listening clients over websocket
{
"lines": [9, 11],
"positions": [],
"regions": [0]
"regions": [0],
"enrich": true
}
```

Expand Down
1 change: 0 additions & 1 deletion default.nix

This file was deleted.

27 changes: 2 additions & 25 deletions derivation.nix
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,6 @@
json-structs-src
}:
let
/*websocketpp = stdenv.mkDerivation rec {
pname = "websocket++";
version = "0.8.2";
src = fetchFromGitHub {
owner = "zaphoyd";
repo = "websocketpp";
rev = version;
sha256 = "sha256-9fIwouthv2GcmBe/UPvV7Xn9P2o0Kmn2hCI4jCh0hPM=";
};
nativeBuildInputs = [ cmake ];
meta = with lib; {
homepage = "https://www.zaphoyd.com/websocketpp/";
description = "C++/Boost Asio based websocket client/server library";
license = licenses.bsd3;
platforms = platforms.unix;
maintainers = with maintainers; [ revol-xut ];
};
};*/

json_struct = stdenv.mkDerivation rec {
pname = "json_struct";
version = "0.0.1";
Expand All @@ -50,9 +28,8 @@ let
nativeBuildInputs = [ cmake ];

meta = with lib; {
homepage = "https://www.zaphoyd.com/websocketpp/";
description = "C++/Boost Asio based websocket client/server library";
license = licenses.bsd3;
homepage = "https://github.com/jorgen/json_structs/";
description = "json_struct is a header only library that parses JSON to C++ structs and serializing structs to JSON.";
platforms = platforms.unix;
maintainers = with maintainers; [ revol-xut ];
};
Expand Down
6 changes: 3 additions & 3 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 2 additions & 31 deletions src/api_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,17 @@

#include "protobuf/telegram.pb.h"

#include <algorithm>
#include <vector>
#include <map>
#include <cinttypes>
#include <unordered_map>

#include <json_struct/json_struct.h>

struct RequestInterpolated {
unsigned int line;
unsigned int run;

RequestInterpolated(unsigned int input_line, unsigned int input_run): line(input_line), run(input_run) {};
RequestInterpolated(unsigned int input_line, unsigned int input_run) : line(input_line), run(input_run) {};

RequestInterpolated() = default;

JS_OBJ(line, run);
};

struct Epsg3857 {
float x;
float y;

JS_OBJ(x, y);
};

struct Position {
float lat;
float lon;
std::unordered_map<std::string, Epsg3857> properties;

JS_OBJ(lat, lon, properties);
};

struct ResponseInterpolated {
unsigned int historical_time;
unsigned int next_reporting_point;
std::unordered_map<std::string, Position> positions;

JS_OBJ(historical_time, next_reporting_point, positions);
};


#endif //FUNNEL_API_REQUESTS_HPP
80 changes: 47 additions & 33 deletions src/broadcast_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
#include <curlpp/Options.hpp>
#include <curlpp/Infos.hpp>

#include <google/protobuf/util/type_resolver.h>
#include <google/protobuf/util/type_resolver_util.h>
#include <google/protobuf/util/json_util.h>
#include <google/protobuf/message.h>
//#include <google/protobuf/json/json.h>

#include <utility>
#include <vector>
Expand All @@ -29,12 +27,20 @@ BroadcastServer::BroadcastServer() noexcept {
server_.init_asio();

// Register handler callbacks
server_.set_open_handler([this](auto && PH1) { on_open(std::forward<decltype(PH1)>(PH1)); });
server_.set_close_handler([this](auto && PH1) { on_close(std::forward<decltype(PH1)>(PH1)); });
server_.set_message_handler([this](auto && PH1, auto && PH2) { on_message(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2)); });
server_.set_open_handler([this](auto &&PH1) { on_open(std::forward<decltype(PH1)>(PH1)); });
server_.set_close_handler([this](auto &&PH1) { on_close(std::forward<decltype(PH1)>(PH1)); });
server_.set_message_handler([this](auto &&PH1, auto &&PH2) {
on_message(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2));
});


api_url_ = std::getenv("API_DOMAIN");
if (std::getenv("API_DOMAIN") == nullptr) {
api_url_ = std::getenv("API_DOMAIN");
std::cout << "API_DOMAIN FOUND:" << api_url_.value() << std::endl;
} else {
api_url_ = std::nullopt;
std::cout << "NO API_DOMAIN FOUND" << std::endl;
};
}

BroadcastServer::~BroadcastServer() noexcept {
Expand All @@ -47,23 +53,23 @@ void BroadcastServer::run(uint16_t port) noexcept {

try {
server_.run();
} catch (const std::exception & e) {
} catch (const std::exception &e) {
std::cout << e.what() << std::endl;
}
}

void BroadcastServer::on_open(connection_hdl hdl) noexcept {
{
lock_guard<mutex> guard(action_lock_);
actions_.push(action(SUBSCRIBE,std::move(hdl)));
actions_.push(action(SUBSCRIBE, std::move(hdl)));
}
action_condition_.notify_one();
}

void BroadcastServer::on_close(connection_hdl hdl) noexcept {
{
lock_guard<mutex> guard(action_lock_);
actions_.push(action(UNSUBSCRIBE,std::move(hdl)));
actions_.push(action(UNSUBSCRIBE, std::move(hdl)));
}
action_condition_.notify_one();
}
Expand All @@ -72,24 +78,24 @@ void BroadcastServer::on_message(connection_hdl hdl, server::message_ptr msg) no
// queue message up for sending by processing thread
{
lock_guard<mutex> guard(action_lock_);
actions_.push(action(MESSAGE,std::move(hdl),std::move(msg)));
actions_.push(action(MESSAGE, std::move(hdl), std::move(msg)));
}
action_condition_.notify_one();
}

void BroadcastServer::process_messages() noexcept {
// gets prometheus counter
auto& opened_connections = exporter_.get_opened_connections();
auto& closed_connections = exporter_.get_closed_connections();
auto &opened_connections = exporter_.get_opened_connections();
auto &closed_connections = exporter_.get_closed_connections();

// initializes the two labels
opened_connections.Add({{"count", "accumulative"}});
closed_connections.Add({{"count", "accumulative"}});

while(not kill_) {
while (not kill_) {
std::unique_lock<mutex> lock(action_lock_);

while(actions_.empty()) {
while (actions_.empty()) {
action_condition_.wait(lock);
}

Expand All @@ -109,8 +115,8 @@ void BroadcastServer::process_messages() noexcept {
} else if (a.type == UNSUBSCRIBE) {
std::lock_guard<mutex> guard(connection_lock_);

const auto pos = std::find_if(connections_.begin(), connections_.end(), [&a](const connection_hdl& ptr1) {
return ptr1.lock().get() == ((const std::weak_ptr<void>&)a.hdl).lock().get();
const auto pos = std::find_if(connections_.begin(), connections_.end(), [&a](const connection_hdl &ptr1) {
return ptr1.lock().get() == ((const std::weak_ptr<void> &) a.hdl).lock().get();
});

auto index = pos - std::begin(connections_);
Expand All @@ -127,16 +133,19 @@ void BroadcastServer::process_messages() noexcept {
// parses string to filter struct
JS::ParseContext context(message);
Filter filter;
context.parseTo(filter);
JS::Error return_value = context.parseTo(filter);

{
// no error while decoding payload from user
if (return_value == JS::Error::NoError) {
// we sadly need a lock here to make sure that nobody deletes the connection while writing
std::lock_guard<mutex> guard(connection_lock_);

// searches for connection
const auto pos = std::find_if(connections_.begin(), connections_.end(), [&a](const connection_hdl& ptr1) {
return ptr1.lock().get() == ((const std::weak_ptr<void>&)a.hdl).lock().get();
});
const auto pos = std::find_if(connections_.begin(), connections_.end(),
[&a](const connection_hdl &ptr1) {
return ptr1.lock().get() ==
((const std::weak_ptr<void> &) a.hdl).lock().get();
});

auto index = pos - std::begin(connections_);

Expand All @@ -149,20 +158,25 @@ void BroadcastServer::process_messages() noexcept {
}
}

auto BroadcastServer::fetch_api(unsigned int line, unsigned int run, unsigned int region) noexcept -> dvbdump::Edge* {
auto BroadcastServer::fetch_api(unsigned int line, unsigned int run, unsigned int region) const noexcept
-> dvbdump::Edge * {
if (!api_url_.has_value()) {
return nullptr;
}

RequestInterpolated request{line, run};

std::string body = JS::serializeStruct(request);

std::list<std::string> header;
header.push_back("Content-Type: application/json");
header.emplace_back("Content-Type: application/json");

curlpp::Cleanup clean;
curlpp::Easy r;
r.setOpt(new curlpp::options::Url(api_url_ + "/vehicles/" +std::to_string(region) + "/position"));
r.setOpt(new curlpp::options::Url(api_url_.value() + "/vehicles/" + std::to_string(region) + "/position"));
r.setOpt(new curlpp::options::HttpHeader(header));
r.setOpt(new curlpp::options::PostFields(body));
r.setOpt(new curlpp::options::PostFieldSize(body.length()));
r.setOpt(new curlpp::options::PostFieldSize(static_cast<long>(body.length())));

std::stringstream response;
r.setOpt(new curlpp::options::WriteStream(&response));
Expand All @@ -177,8 +191,8 @@ auto BroadcastServer::fetch_api(unsigned int line, unsigned int run, unsigned
google::protobuf::util::JsonParseOptions parse_options;
parse_options.case_insensitive_enum_parsing = false;
parse_options.ignore_unknown_fields = true;
auto* interpolation_struct = new dvbdump::Edge();
auto *interpolation_struct = new dvbdump::Edge();

auto status = google::protobuf::util::JsonStringToMessage(protobuf_string, interpolation_struct, parse_options);

if (status.ok()) {
Expand All @@ -192,7 +206,7 @@ auto BroadcastServer::fetch_api(unsigned int line, unsigned int run, unsigned
}
}

void BroadcastServer::queue_telegram(const dvbdump::R09GrpcTelegram* telegram) noexcept {
void BroadcastServer::queue_telegram(const dvbdump::R09GrpcTelegram *telegram) noexcept {
// serialize the protobuf struct into json string
std::string plain_serialized;
plain_serialized.reserve(400);
Expand All @@ -206,8 +220,8 @@ void BroadcastServer::queue_telegram(const dvbdump::R09GrpcTelegram* telegram) n
options.always_print_enums_as_ints = true;

google::protobuf::util::MessageToJsonString(*telegram, &plain_serialized, options);

auto interpolation_data = fetch_api(telegram->line(), telegram->run_number(), telegram->region());

bool enrichment_possible = interpolation_data != nullptr;
dvbdump::R09GrpcTelegramEnriched enriched_telegram;
if (enrichment_possible) {
Expand Down Expand Up @@ -238,12 +252,12 @@ void BroadcastServer::queue_telegram(const dvbdump::R09GrpcTelegram* telegram) n
std::lock_guard<std::mutex> guard(connection_lock_);
//connection_list::iterator it;
auto filter_iterator = filters_.begin();
for (auto & connection : connections_) {
for (auto &connection: connections_) {
if (filter_iterator->match(telegram->line(), telegram->reporting_point(), telegram->region())) {
if(filter_iterator->enrich && enrichment_possible) {
server_.send(connection,enriched_serialized, websocketpp::frame::opcode::TEXT);
if (filter_iterator->enrich && enrichment_possible) {
server_.send(connection, enriched_serialized, websocketpp::frame::opcode::TEXT);
} else {
server_.send(connection,plain_serialized, websocketpp::frame::opcode::TEXT);
server_.send(connection, plain_serialized, websocketpp::frame::opcode::TEXT);
}
}
filter_iterator++;
Expand Down
6 changes: 3 additions & 3 deletions src/broadcast_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "protobuf/telegram.pb.h"
#include "filter.hpp"
#include "prometheus.hpp"
#include "./api_request.hpp"
#include "api_request.hpp"

#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/server.hpp>
Expand Down Expand Up @@ -57,7 +57,7 @@ class BroadcastServer {

PrometheusExporter exporter_;

std::string api_url_;
std::optional<std::string> api_url_;
public:
BroadcastServer() noexcept;
~BroadcastServer() noexcept;
Expand All @@ -69,7 +69,7 @@ class BroadcastServer {
void process_messages() noexcept;
void queue_telegram(const dvbdump::R09GrpcTelegram* telegram) noexcept;
void kill() noexcept;
auto fetch_api(unsigned int line, unsigned int run, unsigned int region) noexcept -> dvbdump::Edge*;
auto fetch_api(unsigned int line, unsigned int run, unsigned int region) const noexcept -> dvbdump::Edge*;
};

#endif //FUNNEL_BROADCAST_SERVER_HPP
Expand Down
Loading

0 comments on commit 0c41de4

Please sign in to comment.