Skip to content

Commit

Permalink
Merge pull request #46 from Auterion/intelligent-callbacks
Browse files Browse the repository at this point in the history
connection: added a more intelligent callback API
  • Loading branch information
ThomasDebrunner authored May 21, 2024
2 parents 141060b + 501d97d commit e7544fb
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
sonarcloud-build-wrapper : build-wrapper-linux-x86-64
- runner: ubuntu-20.04
sonarcloud-build-wrapper : build-wrapper-linux-x86-64
- runner: macos-latest
- runner: macos-13
sonarcloud-build-wrapper : build-wrapper-macosx-x86
runs-on: ${{ matrix.platform.runner }}
steps:
Expand Down
33 changes: 28 additions & 5 deletions include/mav/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,41 @@ namespace mav {
return !_underlying_network_fault && (millis() - _last_received_ms < CONNECTION_TIMEOUT);
}

template<typename T, typename E>
CallbackHandle addMessageCallback(const T &on_message, const E &on_error) {
CallbackHandle addMessageCallback(const std::function<void(const mav::Message&)> &on_message,
const std::function<void(const std::exception_ptr&)> &on_error) {
std::scoped_lock<std::mutex> lock(_message_callback_mtx);
CallbackHandle handle = _next_handle;
_message_callbacks[handle] = FunctionCallback{on_message, on_error};
_next_handle++;
return handle;
}

template<typename T>
CallbackHandle addMessageCallback(const T &on_message) {
return addMessageCallback(on_message, nullptr);
CallbackHandle addMessageCallback(const std::function<void(const mav::Message&)> &on_message) {
return addMessageCallback(on_message, std::function<void(const std::exception_ptr&)>{});
}

CallbackHandle addMessageCallback(const std::function<bool(const mav::Message&)> &selector,
const std::function<void(const mav::Message&)> &on_message,
const std::function<void(const std::exception_ptr&)> &on_error) {
return addMessageCallback([selector, on_message](const Message &message) {
if (selector(message)) {
on_message(message);
}
}, on_error);
}

CallbackHandle addMessageCallback(int message_id, const std::function<void(const mav::Message&)> &on_message,
int source_id=mav::ANY_ID, int component_id=mav::ANY_ID) {
return addMessageCallback([message_id, source_id, component_id](const Message &message) {
return message.id() == message_id &&
(source_id == mav::ANY_ID || message.header().systemId() == source_id) &&
(component_id == mav::ANY_ID || message.header().componentId() == component_id);
}, on_message, std::function<void(const std::exception_ptr&)>{});
}

CallbackHandle addMessageCallback(const std::string &message_name, const std::function<void(const mav::Message&)> &on_message,
int source_id=mav::ANY_ID, int component_id=mav::ANY_ID) {
return addMessageCallback(_message_set.idForMessage(message_name), on_message, source_id, component_id);
}

void removeMessageCallback(CallbackHandle handle) {
Expand Down
74 changes: 73 additions & 1 deletion tests/Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ uint64_t getTimestamp() {
return 770479200;
}

TEST_CASE("Create network runtime") {
TEST_CASE("Network runtime") {

MessageSet message_set;
message_set.addFromXMLString(R"(
Expand Down Expand Up @@ -155,6 +155,24 @@ TEST_CASE("Create network runtime") {
CHECK_EQ(message.get<std::string>("text"), "Hello World!");
}

SUBCASE("Selects correct message for specific message id, system id, component id") {
interface.reset();
auto expectation = connection->expect("TEST_MESSAGE", 1, 1);
// message with wrong system id
interface.addToReceiveQueue("\xfd\x10\x00\x00\x01\x02\x01\xbc\x26\x00\x2a\x00\x00\x00\x48\x65\x6c\x6c\x6f\x20\x57\x6f\x72\x6c\x64\x21\xa0\xcb"s, interface_partner);
// message with wrong component id
interface.addToReceiveQueue("\xfd\x10\x00\x00\x01\x01\x02\xbc\x26\x00\x2a\x00\x00\x00\x48\x65\x6c\x6c\x6f\x20\x57\x6f\x72\x6c\x64\x21\xe2\x61"s, interface_partner);
// message with wrong message id
interface.addToReceiveQueue("\xfd\x09\x00\x00\x00\xfd\x01\x00\x00\x00\x04\x00\x00\x00\x01\x02\x03\x05\x06\x77\x53"s, interface_partner);
// message with correct system id and component id and message id
interface.addToReceiveQueue("\xfd\x10\x00\x00\x01\x01\x01\xbc\x26\x00\x2a\x00\x00\x00\x48\x65\x6c\x6c\x6f\x20\x57\x6f\x72\x6c\x64\x21\x56\x38"s, interface_partner);
auto message = connection->receive(expectation);
// we should only have received the last message
CHECK_EQ(message.name(), "TEST_MESSAGE");
CHECK_EQ(message.header().systemId(), 1);
CHECK_EQ(message.header().componentId(), 1);
}


SUBCASE("Message sent twice before receive") {
interface.reset();
Expand Down Expand Up @@ -308,4 +326,58 @@ TEST_CASE("Create network runtime") {
connection->receive("HEARTBEAT");
CHECK_EQ(connection->callbackCount(), 0);
}

SUBCASE("Message callback for specific message is called when message arrives") {
interface.reset();
std::promise<void> callback_called_promise;
auto callback_called_future = callback_called_promise.get_future();

connection->addMessageCallback("TEST_MESSAGE", [&callback_called_promise](const Message &message) {
if (message.name() == "TEST_MESSAGE") {
callback_called_promise.set_value();
}
});

interface.addToReceiveQueue("\xfd\x10\x00\x00\x01\x61\x61\xbc\x26\x00\x2a\x00\x00\x00\x48\x65\x6c\x6c\x6f\x20\x57\x6f\x72\x6c\x64\x21\x53\xd9"s, interface_partner);
CHECK((callback_called_future.wait_for(std::chrono::seconds(2)) != std::future_status::timeout));
connection->removeAllCallbacks();
}

SUBCASE("Can specify message callbacks for message id, source system id and source component id") {
interface.reset();

// these should not get called
connection->addMessageCallback(9916, [](const Message &message) {
FAIL("This callback should not be called");
}, 1, 2);

connection->addMessageCallback(9916, [](const Message &message) {
FAIL("This callback should not be called");
}, 2, 1);

connection->addMessageCallback(9917, [](const Message &message) {
FAIL("This callback should not be called");
}, 1, 1);

// run a send-receive twice - if we succeed the second time around, we know for sure that the FAIL were not
// called from the first time around, since there is only a single receive thread.
for (int i=0; i<2; i++) {
std::promise<void> callback_called_promise;
auto callback_called_future = callback_called_promise.get_future();


// this should get called
auto cb = connection->addMessageCallback(9916, [&callback_called_promise](const Message &message) {
if (message.name() == "TEST_MESSAGE") {
callback_called_promise.set_value();
}
}, 1, 1);

// message id is 9916, source system id is 1, source component id is 1
interface.addToReceiveQueue("\xfd\x10\x00\x00\x01\x01\x01\xbc\x26\x00\x2a\x00\x00\x00\x48\x65\x6c\x6c\x6f\x20\x57\x6f\x72\x6c\x64\x21\x56\x38"s, interface_partner);
CHECK((callback_called_future.wait_for(std::chrono::seconds(2)) != std::future_status::timeout));
connection->removeMessageCallback(cb);
}
connection->removeAllCallbacks();
}
}

0 comments on commit e7544fb

Please sign in to comment.