From cbcdd74d25f8f907a47fd0400a36c58f16b0e35d Mon Sep 17 00:00:00 2001 From: revol-xut Date: Wed, 29 Mar 2023 15:17:15 +0200 Subject: [PATCH] funnel now handles waypoints --- CMakeLists.txt | 2 +- derivation.nix | 2 +- flake.lock | 18 ++-- src/broadcast_server.cpp | 93 ++----------------- src/broadcast_server.hpp | 3 +- src/filter.hpp | 17 +--- src/main.cpp | 4 +- src/receives_telegrams.cpp | 27 ------ src/receives_waypoints.cpp | 26 ++++++ ...s_telegrams.hpp => receives_waypoints.hpp} | 8 +- test.py | 5 +- 11 files changed, 57 insertions(+), 148 deletions(-) delete mode 100644 src/receives_telegrams.cpp create mode 100644 src/receives_waypoints.cpp rename src/{receives_telegrams.hpp => receives_waypoints.hpp} (72%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 60b9612..fec3fc4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,7 @@ pkg_check_modules(CURLPP REQUIRED curlpp) add_executable(funnel src/main.cpp src/broadcast_server.cpp - src/receives_telegrams.cpp + src/receives_waypoints.cpp src/protobuf/telegram.pb.cc src/protobuf/telegram.grpc.pb.cc src/prometheus.cpp diff --git a/derivation.nix b/derivation.nix index 92fc43d..0e75a37 100644 --- a/derivation.nix +++ b/derivation.nix @@ -37,7 +37,7 @@ let in stdenv.mkDerivation { pname = "funnel"; - version = "0.1.0"; + version = "0.2.0"; src = ./.; diff --git a/flake.lock b/flake.lock index 10665a0..5dcf5d1 100644 --- a/flake.lock +++ b/flake.lock @@ -18,11 +18,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1677779205, - "narHash": "sha256-6DBjL9wjq86p2GczmwnHtFRnWPBPItc67gapWENBgX8=", + "lastModified": 1679966490, + "narHash": "sha256-k0jV+y1jawE6w4ZvKgXDNg4+O9NNtcaWwzw8gufv0b4=", "owner": "nixos", "repo": "nixpkgs", - "rev": "96e18717904dfedcd884541e5a92bf9ff632cf39", + "rev": "5b7cd5c39befee629be284970415b6eb3b0ff000", "type": "github" }, "original": { @@ -43,11 +43,11 @@ "tlms-rust": { "flake": false, "locked": { - "lastModified": 1677980059, - "narHash": "sha256-bPTKW80qgJLv14Th5USYM+S3pt7irsfhnJMxZhQp9eo=", + "lastModified": 1680095449, + "narHash": "sha256-cnM1QfANsg4d+hvNpmihFKwAn+R4VNtLvElSQtI3EVw=", "owner": "tlm-solutions", "repo": "tlms.rs", - "rev": "99be81df55035ff8645a38983911bd862a8aa281", + "rev": "75bcddabdb8df9e04a0849b174d06409ac40715d", "type": "github" }, "original": { @@ -58,11 +58,11 @@ }, "utils": { "locked": { - "lastModified": 1676283394, - "narHash": "sha256-XX2f9c3iySLCw54rJ/CZs+ZK6IQy7GXNY4nSOyu2QG4=", + "lastModified": 1678901627, + "narHash": "sha256-U02riOqrKKzwjsxc/400XnElV+UtPUQWpANPlyazjH0=", "owner": "numtide", "repo": "flake-utils", - "rev": "3db36a8b464d0c4532ba1c7dda728f4576d6d073", + "rev": "93a2b84fc4b70d9e089d029deacc3583435c2ed6", "type": "github" }, "original": { diff --git a/src/broadcast_server.cpp b/src/broadcast_server.cpp index 3d257db..60674a8 100644 --- a/src/broadcast_server.cpp +++ b/src/broadcast_server.cpp @@ -158,107 +158,26 @@ void BroadcastServer::process_messages() noexcept { } } -auto BroadcastServer::fetch_api(unsigned int line, unsigned int run, unsigned int region) const noexcept --> tlms::Edge * { - if (!api_url_.has_value()) { - return nullptr; - } - - RequestInterpolated request{line, run}; - - std::string body = JS::serializeStruct(request); - - std::list header; - header.emplace_back("Content-Type: application/json"); - - curlpp::Cleanup clean; - curlpp::Easy r; - r.setOpt(new curlpp::options::Url(api_url_.value() + "/vehicles/" + std::to_string(region) + "/position")); - r.setOpt(new curlpp::options::HttpHeader(header)); - r.setOpt(new curlpp::options::PostFields(body)); - r.setOpt(new curlpp::options::PostFieldSize(static_cast(body.length()))); - - std::stringstream response; - r.setOpt(new curlpp::options::WriteStream(&response)); - - r.perform(); - - auto http_code = curlpp::infos::ResponseCode::get(r); - - if (http_code == 200) { - std::string json_string = response.str(); - auto protobuf_string = google::protobuf::StringPiece(json_string); - google::protobuf::util::JsonParseOptions parse_options; - parse_options.case_insensitive_enum_parsing = false; - parse_options.ignore_unknown_fields = true; - auto *interpolation_struct = new tlms::Edge(); - - auto status = google::protobuf::util::JsonStringToMessage(protobuf_string, interpolation_struct, parse_options); - - if (status.ok()) { - return interpolation_struct; - } else { - std::cout << "couldn't decode:" << json_string << " error:" << status.message() << std::endl; - return nullptr; - } - } else { - return nullptr; - } -} - -void BroadcastServer::queue_telegram(const tlms::R09GrpcTelegram *telegram) noexcept { +void BroadcastServer::queue_waypoint(const tlms::GrpcWaypoint *waypoint) noexcept { // serialize the protobuf struct into json string std::string plain_serialized; plain_serialized.reserve(400); - std::string enriched_serialized; - enriched_serialized.reserve(600); - google::protobuf::util::JsonPrintOptions options; options.always_print_primitive_fields = true; options.preserve_proto_field_names = true; options.always_print_enums_as_ints = true; - google::protobuf::util::MessageToJsonString(*telegram, &plain_serialized, options); - auto interpolation_data = fetch_api(telegram->line(), telegram->run_number(), telegram->region()); - - bool enrichment_possible = interpolation_data != nullptr; - tlms::R09GrpcTelegramEnriched enriched_telegram; - if (enrichment_possible) { - enriched_telegram.set_time(telegram->time()); - enriched_telegram.set_station(telegram->station()); - enriched_telegram.set_region(telegram->region()); - enriched_telegram.set_telegram_type(telegram->telegram_type()); - enriched_telegram.set_delay(telegram->delay()); - enriched_telegram.set_reporting_point(telegram->reporting_point()); - enriched_telegram.set_junction(telegram->junction()); - enriched_telegram.set_direction(telegram->direction()); - enriched_telegram.set_request_status(telegram->request_status()); - enriched_telegram.set_priority(telegram->priority()); - enriched_telegram.set_direction_request(telegram->direction_request()); - enriched_telegram.set_line(telegram->line()); - enriched_telegram.set_run_number(telegram->run_number()); - enriched_telegram.set_destination_number(telegram->destination_number()); - enriched_telegram.set_train_length(telegram->train_length()); - enriched_telegram.set_vehicle_number(telegram->vehicle_number()); - enriched_telegram.set_operator_(telegram->operator_()); - enriched_telegram.set_allocated_enriched(interpolation_data); - - google::protobuf::util::MessageToJsonString(enriched_telegram, &enriched_serialized, options); - } - - // lock connection list and yeet the telegram to all peers + google::protobuf::util::MessageToJsonString(*waypoint, &plain_serialized, options); + + // lock connection list and yeet the waypoint to all peers { std::lock_guard guard(connection_lock_); //connection_list::iterator it; auto filter_iterator = filters_.begin(); for (auto &connection: connections_) { - if (filter_iterator->match(telegram->line(), telegram->reporting_point(), telegram->region())) { - if (filter_iterator->enrich && enrichment_possible) { - server_.send(connection, enriched_serialized, websocketpp::frame::opcode::TEXT); - } else { - server_.send(connection, plain_serialized, websocketpp::frame::opcode::TEXT); - } + if (filter_iterator->match(waypoint->line(), waypoint->region())) { + server_.send(connection, plain_serialized, websocketpp::frame::opcode::TEXT); } filter_iterator++; } diff --git a/src/broadcast_server.hpp b/src/broadcast_server.hpp index 5f4b131..f6f4ce8 100644 --- a/src/broadcast_server.hpp +++ b/src/broadcast_server.hpp @@ -67,9 +67,8 @@ class BroadcastServer { void on_close(connection_hdl hdl) noexcept; void on_message(connection_hdl hdl, server::message_ptr msg) noexcept; void process_messages() noexcept; - void queue_telegram(const tlms::R09GrpcTelegram* telegram) noexcept; + void queue_waypoint(const tlms::GrpcWaypoint* waypoint) noexcept; void kill() noexcept; - auto fetch_api(unsigned int line, unsigned int run, unsigned int region) const noexcept -> tlms::Edge*; }; #endif //FUNNEL_BROADCAST_SERVER_HPP diff --git a/src/filter.hpp b/src/filter.hpp index c0ff2d4..88796af 100644 --- a/src/filter.hpp +++ b/src/filter.hpp @@ -9,30 +9,23 @@ struct Filter { std::vector lines; - std::vector positions; std::vector regions; - bool enrich = false; - JS_OBJ(lines, positions, regions, enrich); + JS_OBJ(lines, regions); Filter( std::vector other_line, - std::vector other_position, - std::vector other_region, - bool enrich - ) : lines(std::move(other_line)), positions(std::move(other_position)), regions(std::move(other_region)), enrich(enrich) {} + std::vector other_region) + : lines(std::move(other_line)), regions(std::move(other_region)) {} Filter() = default; - [[nodiscard]] auto match(unsigned int other_line, unsigned int other_position, unsigned int other_region) const noexcept -> bool { + [[nodiscard]] auto match(unsigned int other_line, unsigned int other_region) const noexcept -> bool { bool matches_line = (std::find(std::begin(lines), std::end(lines), other_line) != std::end(lines)) || lines.empty(); - bool matches_position = - (std::find(std::begin(positions), std::end(positions), other_position) != std::end(positions)) || - positions.empty(); bool matches_region = (std::find(std::begin(regions), std::end(regions), other_region) != std::end(regions)) || regions.empty(); - return matches_line && matches_region && matches_position; + return matches_line && matches_region; } }; diff --git a/src/main.cpp b/src/main.cpp index 83d46ac..3238ba9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,7 +1,7 @@ //#define ASIO_STANDALONE //#define _WEBSOCKETPP_CPP11_TYPE_TRAITS_ -#include "receives_telegrams.hpp" +#include "receives_waypoints.hpp" #include #include @@ -16,7 +16,7 @@ int main() { unsigned short websocket_port = static_cast(std::stoi(std::getenv("WEBSOCKET_PORT"))); std::string server_address("127.0.0.1:" + std::to_string(grpc_port)); - ReceivesTelegramsImpl service(websocket_port); + ReceivesWaypointsImpl service(websocket_port); grpc::ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); diff --git a/src/receives_telegrams.cpp b/src/receives_telegrams.cpp deleted file mode 100644 index c6a837a..0000000 --- a/src/receives_telegrams.cpp +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef FUNNEL_RECEIVES_TELEGRAMS_CPP -#define FUNNEL_RECEIVES_TELEGRAMS_CPP - -#include "./receives_telegrams.hpp" - -ReceivesTelegramsImpl::~ReceivesTelegramsImpl() noexcept { - websocket_server_.kill(); - active_listener_.join(); - message_processor_.join(); -} - -auto ReceivesTelegramsImpl::receive_r09([[maybe_unused]]grpc::ServerContext *context, - const tlms::R09GrpcTelegram *telegram, - tlms::ReturnCode *return_code) noexcept -> grpc::Status { - this->websocket_server_.queue_telegram(telegram); - return_code->set_status(0); - return grpc::Status::OK; -} - -ReceivesTelegramsImpl::ReceivesTelegramsImpl(unsigned short websocket_port) noexcept { - active_listener_ = std::thread([ObjectPtr = &websocket_server_] { ObjectPtr->process_messages(); }); - message_processor_ = std::thread([&]() { websocket_server_.run(websocket_port); }); -} - - -#endif //FUNNEL_RECEIVES_TELEGRAMS_CPP - diff --git a/src/receives_waypoints.cpp b/src/receives_waypoints.cpp new file mode 100644 index 0000000..646ce90 --- /dev/null +++ b/src/receives_waypoints.cpp @@ -0,0 +1,26 @@ +#ifndef FUNNEL_RECEIVES_WAYPOINTS_CPP +#define FUNNEL_RECEIVES_WAYPOINTS_CPP + +#include "./receives_waypoints.hpp" + +ReceivesWaypointsImpl::ReceivesWaypointsImpl(unsigned short websocket_port) noexcept { + active_listener_ = std::thread([ObjectPtr = &websocket_server_] { ObjectPtr->process_messages(); }); + message_processor_ = std::thread([&]() { websocket_server_.run(websocket_port); }); +} + +ReceivesWaypointsImpl::~ReceivesWaypointsImpl() noexcept { + websocket_server_.kill(); + active_listener_.join(); + message_processor_.join(); +} + +auto ReceivesWaypointsImpl::receive_waypoint([[maybe_unused]]grpc::ServerContext *context, + const tlms::GrpcWaypoint* waypoint, + tlms::ReturnCode *return_code) noexcept -> grpc::Status { + this->websocket_server_.queue_waypoint(waypoint); + return_code->set_status(0); + return grpc::Status::OK; +} + +#endif //FUNNEL_RECEIVES_WAYPOINTS_CPP + diff --git a/src/receives_telegrams.hpp b/src/receives_waypoints.hpp similarity index 72% rename from src/receives_telegrams.hpp rename to src/receives_waypoints.hpp index 0d1bb0b..4e8f329 100644 --- a/src/receives_telegrams.hpp +++ b/src/receives_waypoints.hpp @@ -17,16 +17,16 @@ #include #include -class ReceivesTelegramsImpl final : public tlms::ReceivesTelegrams::Service { +class ReceivesWaypointsImpl final : public tlms::ReceiveWaypoint::Service { private: BroadcastServer websocket_server_; std::thread message_processor_; std::thread active_listener_; public: - explicit ReceivesTelegramsImpl(unsigned short websocket_port) noexcept; - ~ReceivesTelegramsImpl() noexcept override; + explicit ReceivesWaypointsImpl(unsigned short websocket_port) noexcept; + ~ReceivesWaypointsImpl() noexcept override; - auto receive_r09(grpc::ServerContext *context, const tlms::R09GrpcTelegram *telegram, + auto receive_waypoint(grpc::ServerContext *context, const tlms::GrpcWaypoint* waypoint, tlms::ReturnCode *return_code) noexcept -> grpc::Status override; }; diff --git a/test.py b/test.py index 8ee60c8..eb1f224 100755 --- a/test.py +++ b/test.py @@ -6,8 +6,7 @@ from websockets import connect config = { - "regions": [0, 1], - "enrich": True + "regions": [1], } raw_config = json.dumps(config); @@ -20,5 +19,5 @@ async def hello(uri): print(data) #TODO: change this to new domain -asyncio.run(hello("wss://socket.dvb.solutions")) +asyncio.run(hello("wss://socket.staging.dvb.solutions")) #asyncio.run(hello("ws://127.0.0.1:9001"))