Skip to content

Commit

Permalink
Services WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwp18 committed May 2, 2024
1 parent 7910bf2 commit 2324a96
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 21 deletions.
7 changes: 7 additions & 0 deletions config/client.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mqtt_ros_bridge:
ros__parameters:
srv_client:
topic: "service_topic"
type: "example_interfaces.srv:AddTwoInts"
interaction_type: "ros_client"
use_ros_serializer: False
27 changes: 27 additions & 0 deletions launch/client_demo_launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os

from ament_index_python.packages import get_package_share_directory
from launch.actions import SetEnvironmentVariable
from launch.launch_description import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description() -> LaunchDescription:
config = os.path.join(
get_package_share_directory('mqtt_ros_bridge'),
'config',
'client.yaml'
)

run_bridge_node = Node(
package='mqtt_ros_bridge',
executable='bridge_node',
emulate_tty=True,
output='screen',
arguments=[config]
)

return LaunchDescription([
SetEnvironmentVariable("ROS_DOMAIN_ID", "1"),
run_bridge_node
])
53 changes: 32 additions & 21 deletions mqtt_ros_bridge/bridge_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ def __str__(self) -> str:
PARAMETER_INTERACTION_TYPE = "interaction_type"
PARAMETER_USE_ROS_SERIALIZER = "use_ros_serializer"

SERVICE_REQUEST_POSTFIX = "_mqtt_bridge_request"
SERVICE_RESPONSE_POSTFIX = "_mqtt_bridge_response"
SRV_REQUEST_POSTFIX = "_mqtt_bridge_request"
SRV_RESPONSE_POSTFIX = "_mqtt_bridge_response"


class BridgeNode(Node):
Expand Down Expand Up @@ -105,7 +105,7 @@ def __init__(self, args: list[str]) -> None:
self.create_subscription(topic_info.msg_type, topic_info.topic, callback, 10)
case InteractionType.ROS_CLIENT:
self.ros_clients[topic_info.topic] = \
self.create_client(topic_info.topic, topic_info.topic)
self.create_client(topic_info.msg_type, topic_info.topic)
case InteractionType.ROS_SERVER:
pass # TODO
case _:
Expand All @@ -127,7 +127,7 @@ def topic_info_from_parameters(self, config: str) -> dict[str, TopicMsgInfo]:
params[key] = Parameter.from_parameter_msg(parameter_msg)

unique_names: set[str] = set()
for names in params.keys():
for names in params:
# TODO Check that right half matches PARAMETER_*
unique_names.add(names.split(".")[0])

Expand All @@ -138,7 +138,7 @@ def topic_info_from_parameters(self, config: str) -> dict[str, TopicMsgInfo]:
ros_serialiser = False

interaction_type_str = params[f"{name}.{PARAMETER_INTERACTION_TYPE}"].value
if interaction_type_str not in INTERACTION_TYPES.keys():
if interaction_type_str not in INTERACTION_TYPES:
msg = f'Interaction types must be one of {INTERACTION_TYPES.keys()}'
self.get_logger().error(msg)
raise ValueError(msg)
Expand Down Expand Up @@ -183,32 +183,43 @@ def mqtt_callback(self, _client: MQTT.Client,
the message received over MQTT
"""
if mqtt_msg.topic in self.topics:
if mqtt_msg.topic in self.topics and \
self.topics[mqtt_msg.topic].interaction_type == InteractionType.ROS_PUBLISHER:
topic_info = self.topics[mqtt_msg.topic]
elif mqtt_msg.topic + SERVICE_REQUEST_POSTFIX in self.topics:
topic_info = self.topics[mqtt_msg.topic + SERVICE_REQUEST_POSTFIX]
elif mqtt_msg.topic + SERVICE_RESPONSE_POSTFIX in self.topics:
topic_info = self.topics[mqtt_msg.topic + SERVICE_RESPONSE_POSTFIX]
ros_msg = self.mqtt_to_ros_message(mqtt_msg, topic_info)
self.ros_publishers[topic_info.topic].publish(ros_msg)

elif mqtt_msg.topic + SRV_REQUEST_POSTFIX in self.topics and \
self.topics[mqtt_msg.topic].interaction_type == InteractionType.ROS_CLIENT:
topic_info = self.topics[mqtt_msg.topic + SRV_REQUEST_POSTFIX]
ros_msg = self.mqtt_to_ros_message(mqtt_msg, topic_info)
request = topic_info.msg_type.Request()
future = self.ros_clients[topic_info.topic].call_async(request)
rclpy.spin_until_future_complete(self, future)
self.mqtt_client.publish(
topic_info.topic + SRV_RESPONSE_POSTFIX,
topic_info.serializer.serialize(future.result())
)

elif mqtt_msg.topic + SRV_RESPONSE_POSTFIX in self.topics and \
self.topics[mqtt_msg.topic].interaction_type == InteractionType.ROS_SERVER:
topic_info = self.topics[mqtt_msg.topic + SRV_RESPONSE_POSTFIX]
# TODO

else:
return # Ignore MQTT messages on topics that weren't registered with us

def mqtt_to_ros_message(self, mqtt_msg: MQTT.MQTTMessage,
topic_info: TopicMsgInfo[MsgLikeT]) -> MsgLikeT:
self.get_logger().info(
f'MQTT RECEIVED: Topic: "{topic_info.topic}" Payload: "{mqtt_msg.payload!r}"')

try:
ros_msg = topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type)
except RMWError:
return topic_info.serializer.deserialize(mqtt_msg.payload, topic_info.msg_type)
except RMWError as e:
self.get_logger().info('Dropping message with bad serialization received from' +
f'MQTT on topic "{mqtt_msg.topic}": "{mqtt_msg.payload!r}"')
return

if topic_info.interaction_type == InteractionType.ROS_PUBLISHER:
self.ros_publishers[topic_info.topic].publish(ros_msg)
elif topic_info.interaction_type == InteractionType.ROS_CLIENT:
request = topic_info.msg_type.Request()
future = self.ros_clients[topic_info.topic].call_async(request)
# TODO: send a message on topic + RESPONSE POSTFIX when this future returns
# TODO: server
raise e


def main(args=None):
Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<exec_depend>ros2launch</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>example_interfaces</exec_depend>

<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
Expand Down

0 comments on commit 2324a96

Please sign in to comment.