diff --git a/config/client.yaml b/config/client.yaml new file mode 100644 index 0000000..0339509 --- /dev/null +++ b/config/client.yaml @@ -0,0 +1,7 @@ +mqtt_ros_bridge: + ros__parameters: + srv_client: + topic: "service_topic" + type: "example_interfaces.srv:AddTwoInts" + interaction_type: "ros_client" + use_ros_serializer: False \ No newline at end of file diff --git a/launch/client_demo_launch.py b/launch/client_demo_launch.py new file mode 100644 index 0000000..b8b33f9 --- /dev/null +++ b/launch/client_demo_launch.py @@ -0,0 +1,27 @@ +import os + +from ament_index_python.packages import get_package_share_directory +from launch.actions import SetEnvironmentVariable +from launch.launch_description import LaunchDescription +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + config = os.path.join( + get_package_share_directory('mqtt_ros_bridge'), + 'config', + 'client.yaml' + ) + + run_bridge_node = Node( + package='mqtt_ros_bridge', + executable='bridge_node', + emulate_tty=True, + output='screen', + arguments=[config] + ) + + return LaunchDescription([ + SetEnvironmentVariable("ROS_DOMAIN_ID", "1"), + run_bridge_node + ]) diff --git a/mqtt_ros_bridge/bridge_node.py b/mqtt_ros_bridge/bridge_node.py index 04c813b..1785cc8 100644 --- a/mqtt_ros_bridge/bridge_node.py +++ b/mqtt_ros_bridge/bridge_node.py @@ -57,8 +57,8 @@ def __str__(self) -> str: PARAMETER_INTERACTION_TYPE = "interaction_type" PARAMETER_USE_ROS_SERIALIZER = "use_ros_serializer" -SERVICE_REQUEST_POSTFIX = "_mqtt_bridge_request" -SERVICE_RESPONSE_POSTFIX = "_mqtt_bridge_response" +SRV_REQUEST_POSTFIX = "_mqtt_bridge_request" +SRV_RESPONSE_POSTFIX = "_mqtt_bridge_response" class BridgeNode(Node): @@ -105,7 +105,7 @@ def __init__(self, args: list[str]) -> None: self.create_subscription(topic_info.msg_type, topic_info.topic, callback, 10) case InteractionType.ROS_CLIENT: self.ros_clients[topic_info.topic] = \ - self.create_client(topic_info.topic, topic_info.topic) + self.create_client(topic_info.msg_type, topic_info.topic) case InteractionType.ROS_SERVER: pass # TODO case _: @@ -127,7 +127,7 @@ def topic_info_from_parameters(self, config: str) -> dict[str, TopicMsgInfo]: params[key] = Parameter.from_parameter_msg(parameter_msg) unique_names: set[str] = set() - for names in params.keys(): + for names in params: # TODO Check that right half matches PARAMETER_* unique_names.add(names.split(".")[0]) @@ -138,7 +138,7 @@ def topic_info_from_parameters(self, config: str) -> dict[str, TopicMsgInfo]: ros_serialiser = False interaction_type_str = params[f"{name}.{PARAMETER_INTERACTION_TYPE}"].value - if interaction_type_str not in INTERACTION_TYPES.keys(): + if interaction_type_str not in INTERACTION_TYPES: msg = f'Interaction types must be one of {INTERACTION_TYPES.keys()}' self.get_logger().error(msg) raise ValueError(msg) @@ -183,32 +183,43 @@ def mqtt_callback(self, _client: MQTT.Client, the message received over MQTT """ - if mqtt_msg.topic in self.topics: + if mqtt_msg.topic in self.topics and \ + self.topics[mqtt_msg.topic].interaction_type == InteractionType.ROS_PUBLISHER: topic_info = self.topics[mqtt_msg.topic] - elif mqtt_msg.topic + SERVICE_REQUEST_POSTFIX in self.topics: - topic_info = self.topics[mqtt_msg.topic + SERVICE_REQUEST_POSTFIX] - elif mqtt_msg.topic + SERVICE_RESPONSE_POSTFIX in self.topics: - topic_info = self.topics[mqtt_msg.topic + SERVICE_RESPONSE_POSTFIX] + ros_msg = self.mqtt_to_ros_message(mqtt_msg, topic_info) + self.ros_publishers[topic_info.topic].publish(ros_msg) + + elif mqtt_msg.topic + SRV_REQUEST_POSTFIX in self.topics and \ + self.topics[mqtt_msg.topic].interaction_type == InteractionType.ROS_CLIENT: + topic_info = self.topics[mqtt_msg.topic + SRV_REQUEST_POSTFIX] + ros_msg = self.mqtt_to_ros_message(mqtt_msg, topic_info) + request = topic_info.msg_type.Request() + future = self.ros_clients[topic_info.topic].call_async(request) + rclpy.spin_until_future_complete(self, future) + self.mqtt_client.publish( + topic_info.topic + SRV_RESPONSE_POSTFIX, + topic_info.serializer.serialize(future.result()) + ) + + elif mqtt_msg.topic + SRV_RESPONSE_POSTFIX in self.topics and \ + self.topics[mqtt_msg.topic].interaction_type == InteractionType.ROS_SERVER: + topic_info = self.topics[mqtt_msg.topic + SRV_RESPONSE_POSTFIX] + # TODO + else: return # Ignore MQTT messages on topics that weren't registered with us + def mqtt_to_ros_message(self, mqtt_msg: MQTT.MQTTMessage, + topic_info: TopicMsgInfo[MsgLikeT]) -> MsgLikeT: self.get_logger().info( f'MQTT RECEIVED: Topic: "{topic_info.topic}" Payload: "{mqtt_msg.payload!r}"') try: - ros_msg = topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type) - except RMWError: + return topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type) + except RMWError as e: self.get_logger().info('Dropping message with bad serialization received from' + f'MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"') - return - - if topic_info.interaction_type == InteractionType.ROS_PUBLISHER: - self.ros_publishers[topic_info.topic].publish(ros_msg) - elif topic_info.interaction_type == InteractionType.ROS_CLIENT: - request = topic_info.msg_type.Request() - future = self.ros_clients[topic_info.topic].call_async(request) - # TODO: send a message on topic + RESPONSE POSTFIX when this future returns - # TODO: server + raise e def main(args=None): diff --git a/package.xml b/package.xml index d4b0840..477069d 100644 --- a/package.xml +++ b/package.xml @@ -13,6 +13,7 @@ ros2launch std_msgs + example_interfaces ament_flake8 ament_pep257