Skip to content

Commit

Permalink
funnel now handles waypoints
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Mar 29, 2023
1 parent b176874 commit cbcdd74
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 148 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pkg_check_modules(CURLPP REQUIRED curlpp)
add_executable(funnel
src/main.cpp
src/broadcast_server.cpp
src/receives_telegrams.cpp
src/receives_waypoints.cpp
src/protobuf/telegram.pb.cc
src/protobuf/telegram.grpc.pb.cc
src/prometheus.cpp
Expand Down
2 changes: 1 addition & 1 deletion derivation.nix
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ let
in
stdenv.mkDerivation {
pname = "funnel";
version = "0.1.0";
version = "0.2.0";

src = ./.;

Expand Down
18 changes: 9 additions & 9 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 6 additions & 87 deletions src/broadcast_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,107 +158,26 @@ void BroadcastServer::process_messages() noexcept {
}
}

auto BroadcastServer::fetch_api(unsigned int line, unsigned int run, unsigned int region) const noexcept
-> tlms::Edge * {
if (!api_url_.has_value()) {
return nullptr;
}

RequestInterpolated request{line, run};

std::string body = JS::serializeStruct(request);

std::list<std::string> header;
header.emplace_back("Content-Type: application/json");

curlpp::Cleanup clean;
curlpp::Easy r;
r.setOpt(new curlpp::options::Url(api_url_.value() + "/vehicles/" + std::to_string(region) + "/position"));
r.setOpt(new curlpp::options::HttpHeader(header));
r.setOpt(new curlpp::options::PostFields(body));
r.setOpt(new curlpp::options::PostFieldSize(static_cast<long>(body.length())));

std::stringstream response;
r.setOpt(new curlpp::options::WriteStream(&response));

r.perform();

auto http_code = curlpp::infos::ResponseCode::get(r);

if (http_code == 200) {
std::string json_string = response.str();
auto protobuf_string = google::protobuf::StringPiece(json_string);
google::protobuf::util::JsonParseOptions parse_options;
parse_options.case_insensitive_enum_parsing = false;
parse_options.ignore_unknown_fields = true;
auto *interpolation_struct = new tlms::Edge();

auto status = google::protobuf::util::JsonStringToMessage(protobuf_string, interpolation_struct, parse_options);

if (status.ok()) {
return interpolation_struct;
} else {
std::cout << "couldn't decode:" << json_string << " error:" << status.message() << std::endl;
return nullptr;
}
} else {
return nullptr;
}
}

void BroadcastServer::queue_telegram(const tlms::R09GrpcTelegram *telegram) noexcept {
void BroadcastServer::queue_waypoint(const tlms::GrpcWaypoint *waypoint) noexcept {
// serialize the protobuf struct into json string
std::string plain_serialized;
plain_serialized.reserve(400);

std::string enriched_serialized;
enriched_serialized.reserve(600);

google::protobuf::util::JsonPrintOptions options;
options.always_print_primitive_fields = true;
options.preserve_proto_field_names = true;
options.always_print_enums_as_ints = true;

google::protobuf::util::MessageToJsonString(*telegram, &plain_serialized, options);
auto interpolation_data = fetch_api(telegram->line(), telegram->run_number(), telegram->region());

bool enrichment_possible = interpolation_data != nullptr;
tlms::R09GrpcTelegramEnriched enriched_telegram;
if (enrichment_possible) {
enriched_telegram.set_time(telegram->time());
enriched_telegram.set_station(telegram->station());
enriched_telegram.set_region(telegram->region());
enriched_telegram.set_telegram_type(telegram->telegram_type());
enriched_telegram.set_delay(telegram->delay());
enriched_telegram.set_reporting_point(telegram->reporting_point());
enriched_telegram.set_junction(telegram->junction());
enriched_telegram.set_direction(telegram->direction());
enriched_telegram.set_request_status(telegram->request_status());
enriched_telegram.set_priority(telegram->priority());
enriched_telegram.set_direction_request(telegram->direction_request());
enriched_telegram.set_line(telegram->line());
enriched_telegram.set_run_number(telegram->run_number());
enriched_telegram.set_destination_number(telegram->destination_number());
enriched_telegram.set_train_length(telegram->train_length());
enriched_telegram.set_vehicle_number(telegram->vehicle_number());
enriched_telegram.set_operator_(telegram->operator_());
enriched_telegram.set_allocated_enriched(interpolation_data);

google::protobuf::util::MessageToJsonString(enriched_telegram, &enriched_serialized, options);
}

// lock connection list and yeet the telegram to all peers
google::protobuf::util::MessageToJsonString(*waypoint, &plain_serialized, options);

// lock connection list and yeet the waypoint to all peers
{
std::lock_guard<std::mutex> guard(connection_lock_);
//connection_list::iterator it;
auto filter_iterator = filters_.begin();
for (auto &connection: connections_) {
if (filter_iterator->match(telegram->line(), telegram->reporting_point(), telegram->region())) {
if (filter_iterator->enrich && enrichment_possible) {
server_.send(connection, enriched_serialized, websocketpp::frame::opcode::TEXT);
} else {
server_.send(connection, plain_serialized, websocketpp::frame::opcode::TEXT);
}
if (filter_iterator->match(waypoint->line(), waypoint->region())) {
server_.send(connection, plain_serialized, websocketpp::frame::opcode::TEXT);
}
filter_iterator++;
}
Expand Down
3 changes: 1 addition & 2 deletions src/broadcast_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ class BroadcastServer {
void on_close(connection_hdl hdl) noexcept;
void on_message(connection_hdl hdl, server::message_ptr msg) noexcept;
void process_messages() noexcept;
void queue_telegram(const tlms::R09GrpcTelegram* telegram) noexcept;
void queue_waypoint(const tlms::GrpcWaypoint* waypoint) noexcept;
void kill() noexcept;
auto fetch_api(unsigned int line, unsigned int run, unsigned int region) const noexcept -> tlms::Edge*;
};

#endif //FUNNEL_BROADCAST_SERVER_HPP
Expand Down
17 changes: 5 additions & 12 deletions src/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,23 @@

struct Filter {
std::vector<unsigned int> lines;
std::vector<unsigned int> positions;
std::vector<unsigned int> regions;
bool enrich = false;

JS_OBJ(lines, positions, regions, enrich);
JS_OBJ(lines, regions);

Filter(
std::vector<unsigned int> other_line,
std::vector<unsigned int> other_position,
std::vector<unsigned int> other_region,
bool enrich
) : lines(std::move(other_line)), positions(std::move(other_position)), regions(std::move(other_region)), enrich(enrich) {}
std::vector<unsigned int> other_region)
: lines(std::move(other_line)), regions(std::move(other_region)) {}

Filter() = default;

[[nodiscard]] auto match(unsigned int other_line, unsigned int other_position, unsigned int other_region) const noexcept -> bool {
[[nodiscard]] auto match(unsigned int other_line, unsigned int other_region) const noexcept -> bool {
bool matches_line =
(std::find(std::begin(lines), std::end(lines), other_line) != std::end(lines)) || lines.empty();
bool matches_position =
(std::find(std::begin(positions), std::end(positions), other_position) != std::end(positions)) ||
positions.empty();
bool matches_region = (std::find(std::begin(regions), std::end(regions), other_region) != std::end(regions)) ||
regions.empty();
return matches_line && matches_region && matches_position;
return matches_line && matches_region;
}
};

Expand Down
4 changes: 2 additions & 2 deletions src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//#define ASIO_STANDALONE
//#define _WEBSOCKETPP_CPP11_TYPE_TRAITS_

#include "receives_telegrams.hpp"
#include "receives_waypoints.hpp"

#include <cstdlib>
#include <string>
Expand All @@ -16,7 +16,7 @@ int main() {
unsigned short websocket_port = static_cast<unsigned short>(std::stoi(std::getenv("WEBSOCKET_PORT")));

std::string server_address("127.0.0.1:" + std::to_string(grpc_port));
ReceivesTelegramsImpl service(websocket_port);
ReceivesWaypointsImpl service(websocket_port);

grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
Expand Down
27 changes: 0 additions & 27 deletions src/receives_telegrams.cpp

This file was deleted.

26 changes: 26 additions & 0 deletions src/receives_waypoints.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#ifndef FUNNEL_RECEIVES_WAYPOINTS_CPP
#define FUNNEL_RECEIVES_WAYPOINTS_CPP

#include "./receives_waypoints.hpp"

ReceivesWaypointsImpl::ReceivesWaypointsImpl(unsigned short websocket_port) noexcept {
active_listener_ = std::thread([ObjectPtr = &websocket_server_] { ObjectPtr->process_messages(); });
message_processor_ = std::thread([&]() { websocket_server_.run(websocket_port); });
}

ReceivesWaypointsImpl::~ReceivesWaypointsImpl() noexcept {
websocket_server_.kill();
active_listener_.join();
message_processor_.join();
}

auto ReceivesWaypointsImpl::receive_waypoint([[maybe_unused]]grpc::ServerContext *context,
const tlms::GrpcWaypoint* waypoint,
tlms::ReturnCode *return_code) noexcept -> grpc::Status {
this->websocket_server_.queue_waypoint(waypoint);
return_code->set_status(0);
return grpc::Status::OK;
}

#endif //FUNNEL_RECEIVES_WAYPOINTS_CPP

8 changes: 4 additions & 4 deletions src/receives_telegrams.hpp → src/receives_waypoints.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

class ReceivesTelegramsImpl final : public tlms::ReceivesTelegrams::Service {
class ReceivesWaypointsImpl final : public tlms::ReceiveWaypoint::Service {
private:
BroadcastServer websocket_server_;
std::thread message_processor_;
std::thread active_listener_;
public:
explicit ReceivesTelegramsImpl(unsigned short websocket_port) noexcept;
~ReceivesTelegramsImpl() noexcept override;
explicit ReceivesWaypointsImpl(unsigned short websocket_port) noexcept;
~ReceivesWaypointsImpl() noexcept override;

auto receive_r09(grpc::ServerContext *context, const tlms::R09GrpcTelegram *telegram,
auto receive_waypoint(grpc::ServerContext *context, const tlms::GrpcWaypoint* waypoint,
tlms::ReturnCode *return_code) noexcept -> grpc::Status override;
};

Expand Down
5 changes: 2 additions & 3 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from websockets import connect

config = {
"regions": [0, 1],
"enrich": True
"regions": [1],
}

raw_config = json.dumps(config);
Expand All @@ -20,5 +19,5 @@ async def hello(uri):
print(data)

#TODO: change this to new domain
asyncio.run(hello("wss://socket.dvb.solutions"))
asyncio.run(hello("wss://socket.staging.dvb.solutions"))
#asyncio.run(hello("ws://127.0.0.1:9001"))

0 comments on commit cbcdd74

Please sign in to comment.