#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rubu import temas
from std_msgs.msg import Bool, String, Float32
from example_interfaces.msg import Float32MultiArray
from threading import Thread
[docs]
class TemasControlNode(Node):
"""
ROS2 node for controlling the TEMAS device.
This node allows moving the device, starting/stopping point cloud scans,
and publishing control-related data such as distance, position, and scan progress.
**Parameters:**
- ``ip_address`` (str): IP address of TEMAS device (default: '192.168.4.4')\n
``ros2 run temas_ros2 control --ros-args -p ip_address:=192.168.4.4``
**Published Topics:**
- ``temas/control/distance`` (:class:`std_msgs.msg.Float32`) – current distance\n
``ros2 topic echo /temas/control/distance --once``
- ``temas/control/mean_distance`` (:class:`std_msgs.msg.Float32`) – mean distance\n
``ros2 topic echo /temas/control/mean_distance --once``
- ``temas/control/position`` (:class:`std_msgs.msg.String`) – current position\n
``ros2 topic echo /temas/control/position --once``
- ``temas/control/scan_progress`` (:class:`std_msgs.msg.Float32`) – point cloud scan progress [%]\n
``ros2 topic echo /temas/control/scan_progress --once``
**Subscribed Topics (Commands):**
- ``temas/control/move_pos`` (:class:`example_interfaces.msg.Float32MultiArray`) – move to [phi, theta]\n
``ros2 topic pub /temas/control/move_pos example_interfaces/msg/Float32MultiArray "{data: [30.0, 45.0]}" --once``
- ``temas/control/move_home`` (:class:`std_msgs.msg.Bool`) – move to home position\n
``ros2 topic pub /temas/control/move_home std_msgs/msg/Bool "{data: true}" --once``
- ``distance_request`` (:class:`std_msgs.msg.Float32`) – current distance\n
``ros2 topic pub /distance_request``
- ``mean_distance_request`` (:class:`std_msgs.msg.Float32`) – mean distance\n
``ros2 topic pub /mean_distance_request``
- ``temas/control/start_scan`` (:class:`example_interfaces.msg.Float32MultiArray`) – start point cloud scan with optional args [min_theta, max_theta, min_phi, max_phi, color_mode]\n
``ros2 topic pub /temas/control/start_scan example_interfaces/msg/Float32MultiArray "{data: [-30.0, 90.0, -60.0, 60.0, 0]}" --once``
- ``temas/control/stop_scan`` (:class:`std_msgs.msg.Bool`) – stop point cloud scan\n
``ros2 topic pub /temas/control/stop_scan std_msgs/msg/Bool "{data: true}" --once``
- ``temas/control/reset_commands`` (:class:`std_msgs.msg.Bool`) – reset start scan flag\n
``ros2 topic pub /temas/control/reset_commands std_msgs/msg/Bool "{data: true}" --once``
- ``temas/control/save_pcl`` (:class:`std_msgs.msg.String`) – save point cloud to specified path\n
``ros2 topic pub /temas/control/save_pcl std_msgs/msg/String "{data: '/home/user/'}" --once``
**Notes:**
- Start scan can only be executed once until reset.
- All commands are executed in background threads to avoid blocking.
- Node continuously publishes sensor/control data at ~10 Hz.
"""
def __init__(self):
"""Initialize the TEMAS Control Node and connect to the device."""
super().__init__('temas_control_full')
# -------- Parameters --------
self.declare_parameter('ip_address', '192.168.4.4')
self.declare_parameter('port', 8082)
self.ip_address = self.get_parameter('ip_address').value
self.port = self.get_parameter('port').value
# -------- Connect to TEMAS --------
self.device = temas.Connect(ip_address=self.ip_address)
try:
self.common = temas.Common()
hostname = self.common.get_hostname()
self.get_logger().info(f"Connecting to TEMAS with hostname {hostname}")
except Exception as e:
self.get_logger().error(f"❌ Connecting to TEMAS failed: {e}")
return
self.control = temas.Control()
# -------- Publishers --------
self.distance_pub = self.create_publisher(Float32, 'temas/control/distance', 10)
self.mean_distance_pub = self.create_publisher(Float32, 'temas/control/mean_distance', 10)
self.pos_pub = self.create_publisher(String, 'temas/control/position', 10)
self.scan_progress_pub = self.create_publisher(Float32, 'temas/control/scan_progress', 10)
# -------- Subscribers --------
self.create_subscription(Float32MultiArray, 'temas/control/move_pos', self.move_pos_cb, 10)
self.create_subscription(Bool, 'temas/control/move_home', self.move_home_cb, 10)
self.create_subscription(Float32MultiArray, 'temas/control/start_scan', self.start_scan_cb, 10)
self.create_subscription(Bool, 'temas/control/reset_commands', self.reset_commands_cb, 10)
self.create_subscription(Bool, 'temas/control/stop_scan', self.stop_scan_cb, 10)
self.create_subscription(String, 'temas/control/save_pcl', self.save_pcl_cb, 10)
# Flag: Start scan can only be executed once until reset
self._start_scan_executed = False
# -------- Background Publisher Thread --------
self.running = True
self.pub_thread = Thread(target=self.publish_loop, daemon=True)
self.pub_thread.start()
self.get_logger().info(f"✅ TEMAS Control Node (full) started on {self.ip_address}:{self.port}")
# ==============================================================
# Publisher Loop
# ==============================================================
[docs]
def publish_loop(self):
"""
Continuously publishes TEMAS data.
This method runs in a background thread and publishes:
- Distance
- Mean distance
- Pan/Tilt position
- Scan progress
at a fixed rate (10 Hz).
"""
rate = self.create_rate(10)
while self.running:
try:
# ---- Distance ----
dist_str = self.control.distance()
try:
dist = float(dist_str)
except (TypeError, ValueError):
dist = 0.0
self.distance_pub.publish(Float32(data=dist))
# ---- Mean Distance ----
try:
mean_dist = float(self.control.mean_distance())
except (TypeError, ValueError):
mean_dist = 0.0
self.mean_distance_pub.publish(Float32(data=mean_dist))
# ---- Position ----
pos_raw = self.control.get_pos()
self.pos_pub.publish(String(data=str(pos_raw).strip()))
# ---- Scan Progress ----
try:
progress = float(self.control.get_point_cloud_scan_percent())
except (TypeError, ValueError):
progress = 0.0
self.scan_progress_pub.publish(Float32(data=progress))
except Exception as e:
self.get_logger().debug(f"Publisher error: {e}")
rate.sleep()
# ==============================================================
# Command Callbacks
# ==============================================================
[docs]
def move_pos_cb(self, msg: Float32MultiArray):
"""
Move the TEMAS device to a specified position.
:param msg: Array containing [phi, theta] angles.
:type msg: Float32MultiArray
"""
if len(msg.data) != 2:
self.get_logger().warn("move_pos requires 2 values: [phi, theta]")
return
phi, theta = msg.data
Thread(target=self.control.move_pos, args=(phi, theta)).start()
self.get_logger().info(f"Move to phi={phi}, theta={theta} executed")
[docs]
def move_home_cb(self, msg: Bool):
"""
Move the TEMAS device to the home position.
:param msg: Boolean trigger
:type msg: Bool
"""
if msg.data:
Thread(target=self.control.move_home).start()
self.get_logger().info("Move home executed")
[docs]
def start_scan_cb(self, msg: Float32MultiArray):
"""
Start a point cloud scan (only once until reset).
:param msg: Optional array [theta1, theta2, phi1, phi2, color]
- theta1: min elevation (-30 to 90°, step 0.5°)
- theta2: max elevation (-30 to 90°, step 0.5°)
- phi1: min azimuth (-60 to 60°, step 0.5°)
- phi2: max azimuth (-60 to 60°, step 0.5°)
- color: 0=black-white, 1=color (default=0)
:type msg: Float32MultiArray
:returns: Status message indicating success. Returns 'True' if successful.
:rtype: str
"""
if self._start_scan_executed:
self.get_logger().info("Start scan already executed — waiting for reset")
return
args = [-30.0, 90.0, -60.0, 60.0, 0]
if msg.data and len(msg.data) == 5:
args = list(msg.data)
elif msg.data:
self.get_logger().warn("Start scan requires 5 parameters: [theta1, theta2, phi1, phi2, color]")
return
try:
args[-1] = int(args[-1])
if args[-1] not in (0, 1):
self.get_logger().warn(f"Invalid color mode {args[-1]} — using default 1.")
args[-1] = 1
except (ValueError, TypeError):
self.get_logger().warn("Color mode could not be converted to int — using default 1.")
args[-1] = 1
Thread(target=self.control.start_point_cloud_scan, args=tuple(args)).start()
self._start_scan_executed = True
self.get_logger().info(f"Start scan executed once with args: {args}")
[docs]
def stop_scan_cb(self, msg: Bool):
"""
Stop an ongoing point cloud scan.
:param msg: Boolean trigger
:type msg: Bool
:returns: Status message. Returns 'True' if successful.
:rtype: str
"""
if msg.data:
Thread(target=self.control.stop_point_cloud_scan).start()
self.get_logger().info("Scan stopped")
[docs]
def save_pcl_cb(self, msg: String):
"""
Save the latest point cloud to the specified path.
:param msg: File path
:type msg: String
:returns: Status message. Returns 'True' if successful.
:rtype: str
"""
path = msg.data.strip()
if path:
Thread(target=self.control.get_pcl, args=(path,)).start()
self.get_logger().info(f"PCL saved to {path}")
[docs]
def reset_commands_cb(self, msg: Bool):
"""
Reset internal command flags to allow starting a new scan.
:param msg: Boolean trigger
:type msg: Bool
"""
if msg.data:
self._start_scan_executed = False
self.get_logger().info("✅ Start scan command flag reset")
# ==============================================================
# Shutdown
# ==============================================================
[docs]
def destroy_node(self):
"""
Cleanly stop background threads and destroy the node.
"""
self.running = False
if self.pub_thread.is_alive():
self.pub_thread.join(timeout=2.0)
super().destroy_node()
# ==============================================================
# Main Entry Point
# ==============================================================
[docs]
def main(args=None):
"""
Main function to initialize ROS2 and spin the TEMAS control node.
:param args: Optional command line arguments
"""
rclpy.init(args=args)
node = TemasControlNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("Shutting down TEMAS Control Node...")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()