Source code for control_node

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rubu import temas
from std_msgs.msg import Bool, String, Float32
from example_interfaces.msg import Float32MultiArray
from threading import Thread


[docs] class TemasControlNode(Node): """ ROS2 node for controlling the TEMAS device. This node allows moving the device, starting/stopping point cloud scans, and publishing control-related data such as distance, position, and scan progress. **Parameters:** - ``ip_address`` (str): IP address of TEMAS device (default: '192.168.4.4')\n ``ros2 run temas_ros2 control --ros-args -p ip_address:=192.168.4.4`` **Published Topics:** - ``temas/control/distance`` (:class:`std_msgs.msg.Float32`) – current distance\n ``ros2 topic echo /temas/control/distance --once`` - ``temas/control/mean_distance`` (:class:`std_msgs.msg.Float32`) – mean distance\n ``ros2 topic echo /temas/control/mean_distance --once`` - ``temas/control/position`` (:class:`std_msgs.msg.String`) – current position\n ``ros2 topic echo /temas/control/position --once`` - ``temas/control/scan_progress`` (:class:`std_msgs.msg.Float32`) – point cloud scan progress [%]\n ``ros2 topic echo /temas/control/scan_progress --once`` **Subscribed Topics (Commands):** - ``temas/control/move_pos`` (:class:`example_interfaces.msg.Float32MultiArray`) – move to [phi, theta]\n ``ros2 topic pub /temas/control/move_pos example_interfaces/msg/Float32MultiArray "{data: [30.0, 45.0]}" --once`` - ``temas/control/move_home`` (:class:`std_msgs.msg.Bool`) – move to home position\n ``ros2 topic pub /temas/control/move_home std_msgs/msg/Bool "{data: true}" --once`` - ``distance_request`` (:class:`std_msgs.msg.Float32`) – current distance\n ``ros2 topic pub /distance_request`` - ``mean_distance_request`` (:class:`std_msgs.msg.Float32`) – mean distance\n ``ros2 topic pub /mean_distance_request`` - ``temas/control/start_scan`` (:class:`example_interfaces.msg.Float32MultiArray`) – start point cloud scan with optional args [min_theta, max_theta, min_phi, max_phi, color_mode]\n ``ros2 topic pub /temas/control/start_scan example_interfaces/msg/Float32MultiArray "{data: [-30.0, 90.0, -60.0, 60.0, 0]}" --once`` - ``temas/control/stop_scan`` (:class:`std_msgs.msg.Bool`) – stop point cloud scan\n ``ros2 topic pub /temas/control/stop_scan std_msgs/msg/Bool "{data: true}" --once`` - ``temas/control/reset_commands`` (:class:`std_msgs.msg.Bool`) – reset start scan flag\n ``ros2 topic pub /temas/control/reset_commands std_msgs/msg/Bool "{data: true}" --once`` - ``temas/control/save_pcl`` (:class:`std_msgs.msg.String`) – save point cloud to specified path\n ``ros2 topic pub /temas/control/save_pcl std_msgs/msg/String "{data: '/home/user/'}" --once`` **Notes:** - Start scan can only be executed once until reset. - All commands are executed in background threads to avoid blocking. - Node continuously publishes sensor/control data at ~10 Hz. """ def __init__(self): """Initialize the TEMAS Control Node and connect to the device.""" super().__init__('temas_control_full') # -------- Parameters -------- self.declare_parameter('ip_address', '192.168.4.4') self.declare_parameter('port', 8082) self.ip_address = self.get_parameter('ip_address').value self.port = self.get_parameter('port').value # -------- Connect to TEMAS -------- self.device = temas.Connect(ip_address=self.ip_address) try: self.common = temas.Common() hostname = self.common.get_hostname() self.get_logger().info(f"Connecting to TEMAS with hostname {hostname}") except Exception as e: self.get_logger().error(f"❌ Connecting to TEMAS failed: {e}") return self.control = temas.Control() # -------- Publishers -------- self.distance_pub = self.create_publisher(Float32, 'temas/control/distance', 10) self.mean_distance_pub = self.create_publisher(Float32, 'temas/control/mean_distance', 10) self.pos_pub = self.create_publisher(String, 'temas/control/position', 10) self.scan_progress_pub = self.create_publisher(Float32, 'temas/control/scan_progress', 10) # -------- Subscribers -------- self.create_subscription(Float32MultiArray, 'temas/control/move_pos', self.move_pos_cb, 10) self.create_subscription(Bool, 'temas/control/move_home', self.move_home_cb, 10) self.create_subscription(Float32MultiArray, 'temas/control/start_scan', self.start_scan_cb, 10) self.create_subscription(Bool, 'temas/control/reset_commands', self.reset_commands_cb, 10) self.create_subscription(Bool, 'temas/control/stop_scan', self.stop_scan_cb, 10) self.create_subscription(String, 'temas/control/save_pcl', self.save_pcl_cb, 10) # Flag: Start scan can only be executed once until reset self._start_scan_executed = False # -------- Background Publisher Thread -------- self.running = True self.pub_thread = Thread(target=self.publish_loop, daemon=True) self.pub_thread.start() self.get_logger().info(f"✅ TEMAS Control Node (full) started on {self.ip_address}:{self.port}") # ============================================================== # Publisher Loop # ==============================================================
[docs] def publish_loop(self): """ Continuously publishes TEMAS data. This method runs in a background thread and publishes: - Distance - Mean distance - Pan/Tilt position - Scan progress at a fixed rate (10 Hz). """ rate = self.create_rate(10) while self.running: try: # ---- Distance ---- dist_str = self.control.distance() try: dist = float(dist_str) except (TypeError, ValueError): dist = 0.0 self.distance_pub.publish(Float32(data=dist)) # ---- Mean Distance ---- try: mean_dist = float(self.control.mean_distance()) except (TypeError, ValueError): mean_dist = 0.0 self.mean_distance_pub.publish(Float32(data=mean_dist)) # ---- Position ---- pos_raw = self.control.get_pos() self.pos_pub.publish(String(data=str(pos_raw).strip())) # ---- Scan Progress ---- try: progress = float(self.control.get_point_cloud_scan_percent()) except (TypeError, ValueError): progress = 0.0 self.scan_progress_pub.publish(Float32(data=progress)) except Exception as e: self.get_logger().debug(f"Publisher error: {e}") rate.sleep()
# ============================================================== # Command Callbacks # ==============================================================
[docs] def move_pos_cb(self, msg: Float32MultiArray): """ Move the TEMAS device to a specified position. :param msg: Array containing [phi, theta] angles. :type msg: Float32MultiArray """ if len(msg.data) != 2: self.get_logger().warn("move_pos requires 2 values: [phi, theta]") return phi, theta = msg.data Thread(target=self.control.move_pos, args=(phi, theta)).start() self.get_logger().info(f"Move to phi={phi}, theta={theta} executed")
[docs] def move_home_cb(self, msg: Bool): """ Move the TEMAS device to the home position. :param msg: Boolean trigger :type msg: Bool """ if msg.data: Thread(target=self.control.move_home).start() self.get_logger().info("Move home executed")
[docs] def start_scan_cb(self, msg: Float32MultiArray): """ Start a point cloud scan (only once until reset). :param msg: Optional array [theta1, theta2, phi1, phi2, color] - theta1: min elevation (-30 to 90°, step 0.5°) - theta2: max elevation (-30 to 90°, step 0.5°) - phi1: min azimuth (-60 to 60°, step 0.5°) - phi2: max azimuth (-60 to 60°, step 0.5°) - color: 0=black-white, 1=color (default=0) :type msg: Float32MultiArray :returns: Status message indicating success. Returns 'True' if successful. :rtype: str """ if self._start_scan_executed: self.get_logger().info("Start scan already executed — waiting for reset") return args = [-30.0, 90.0, -60.0, 60.0, 0] if msg.data and len(msg.data) == 5: args = list(msg.data) elif msg.data: self.get_logger().warn("Start scan requires 5 parameters: [theta1, theta2, phi1, phi2, color]") return try: args[-1] = int(args[-1]) if args[-1] not in (0, 1): self.get_logger().warn(f"Invalid color mode {args[-1]} — using default 1.") args[-1] = 1 except (ValueError, TypeError): self.get_logger().warn("Color mode could not be converted to int — using default 1.") args[-1] = 1 Thread(target=self.control.start_point_cloud_scan, args=tuple(args)).start() self._start_scan_executed = True self.get_logger().info(f"Start scan executed once with args: {args}")
[docs] def stop_scan_cb(self, msg: Bool): """ Stop an ongoing point cloud scan. :param msg: Boolean trigger :type msg: Bool :returns: Status message. Returns 'True' if successful. :rtype: str """ if msg.data: Thread(target=self.control.stop_point_cloud_scan).start() self.get_logger().info("Scan stopped")
[docs] def save_pcl_cb(self, msg: String): """ Save the latest point cloud to the specified path. :param msg: File path :type msg: String :returns: Status message. Returns 'True' if successful. :rtype: str """ path = msg.data.strip() if path: Thread(target=self.control.get_pcl, args=(path,)).start() self.get_logger().info(f"PCL saved to {path}")
[docs] def reset_commands_cb(self, msg: Bool): """ Reset internal command flags to allow starting a new scan. :param msg: Boolean trigger :type msg: Bool """ if msg.data: self._start_scan_executed = False self.get_logger().info("✅ Start scan command flag reset")
# ============================================================== # Shutdown # ==============================================================
[docs] def destroy_node(self): """ Cleanly stop background threads and destroy the node. """ self.running = False if self.pub_thread.is_alive(): self.pub_thread.join(timeout=2.0) super().destroy_node()
# ============================================================== # Main Entry Point # ==============================================================
[docs] def main(args=None): """ Main function to initialize ROS2 and spin the TEMAS control node. :param args: Optional command line arguments """ rclpy.init(args=args) node = TemasControlNode() try: rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info("Shutting down TEMAS Control Node...") finally: node.destroy_node() rclpy.shutdown()
if __name__ == '__main__': main()