Source code for camera_tof_node

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Bool, String
from cv_bridge import CvBridge
import cv2
from rubu import temas
import threading
import time

[docs] class TemasCameraTofNode(Node): """ ROS2 node for streaming a TEMAS Time-of-Flight (ToF) camera. This node connects to a TEMAS ToF camera, continuously publishes frames as ROS2 images, and provides camera status and info topics. **Parameters:** - ``ip_address`` (str): IP address of the TEMAS device (default: '192.168.4.4')\n **Example:**\n ``ros2 run temas_ros2 camera_tof --ros-args -p ip_address:=192.168.4.4`` **Published Topics:** - ``temas/camera_tof/image_raw`` (:class:`sensor_msgs.msg.Image`) – live camera frames\n **Example:**\n ``ros2 topic echo /temas/camera_tof/image_raw`` - ``temas/camera_tof/status`` (:class:`std_msgs.msg.Bool`) – frame status (True if frame received)\n ``ros2 topic echo /temas/camera_tof/status`` - ``temas/camera_tof/info`` (:class:`std_msgs.msg.String`) – connection info messages\n ``ros2 topic echo /temas/camera_tof/info`` **Usage Example:** Start the node with default parameters:\n ``ros2 run temas_ros2 camera_tof`` Check if the camera is streaming:\n ``ros2 topic echo /temas/camera_tof/status`` View live images:\n ``ros2 run rqt_image_view rqt_image_view``\n Then select topic `/temas/camera_tof/image_raw`. **Notes:** - The camera automatically starts a streaming thread. - Node publishes `status` True when frames are received; False if no frame is available. """ def __init__(self): """Initialize the TEMAS ToF Camera Node.""" super().__init__('temas_camera_tof') # Declare parameters self.declare_parameter('ip_address', '192.168.4.4') self.declare_parameter('port', 8084) # Read initial parameters self.ip_address = self.get_parameter('ip_address').value self.port = self.get_parameter('port').value # Publishers self.image_pub = self.create_publisher(Image, 'temas/camera_tof/image_raw', 10) self.status_pub = self.create_publisher(Bool, 'temas/camera_tof/status', 10) self.info_pub = self.create_publisher(String, 'temas/camera_tof/info', 10) self.bridge = CvBridge() self.camera = None self.running = False self.thread = None self.get_logger().info(f"🎥 Starting TEMAS ToF Camera Node on {self.ip_address}:{self.port}") self.try_connect()
[docs] def try_connect(self): """ Attempt to connect to the TEMAS ToF camera. This method establishes the connection, starts the camera streaming thread, and publishes initial info and status messages. """ try: temas.Connect(ip_address=self.ip_address) self.camera = temas.Camera(port=self.port) self.camera.start_thread() self.running = True self.thread = threading.Thread(target=self.stream_camera, daemon=True) self.thread.start() msg = String() msg.data = f"Camera connected at {self.ip_address}:{self.port}" self.info_pub.publish(msg) self.get_logger().info(msg.data) except Exception as e: self.get_logger().error(f"Failed to connect to camera: {e}") self.running = False status_msg = Bool() status_msg.data = False self.status_pub.publish(status_msg)
[docs] def stream_camera(self): """ Continuously read frames from the camera and publish them as ROS2 Image messages. Also publishes a status boolean indicating whether a frame was successfully received. """ self.get_logger().info("📡 Camera streaming thread started.") rate = self.create_rate(30) while self.running: try: frame = self.camera.get_frame() if frame is not None: ros_image = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8') self.image_pub.publish(ros_image) status = Bool() status.data = True self.status_pub.publish(status) else: self.get_logger().warn("⚠️ No frame received.") status = Bool() status.data = False self.status_pub.publish(status) rate.sleep() except Exception as e: self.get_logger().error(f"Error reading frame: {e}") time.sleep(1) self.get_logger().info("🛑 Camera streaming stopped.")
[docs] def destroy_node(self): """ Gracefully stop the camera thread and shut down the node. Stops the camera thread if active and ensures proper ROS2 shutdown. """ self.get_logger().info("Shutting down TEMAS ToF camera node...") self.running = False if self.thread: self.thread.join() if self.camera: try: self.camera.stop_thread() except Exception as e: self.get_logger().warn(f"Error stopping camera: {e}") super().destroy_node()
[docs] def main(args=None): """ Main entry point for the TEMAS ToF Camera Node. Initializes ROS2, spins the node, and handles shutdown on keyboard interrupt. """ rclpy.init(args=args) node = TemasCameraTofNode() try: rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info("Camera node interrupted by user.") finally: node.destroy_node() rclpy.shutdown()
if __name__ == '__main__': main()