Source code for camera_tof_node
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Bool, String
from cv_bridge import CvBridge
import cv2
from rubu import temas
import threading
import time
[docs]
class TemasCameraTofNode(Node):
"""
ROS2 node for streaming a TEMAS Time-of-Flight (ToF) camera.
This node connects to a TEMAS ToF camera, continuously publishes frames
as ROS2 images, and provides camera status and info topics.
**Parameters:**
- ``ip_address`` (str): IP address of the TEMAS device (default: '192.168.4.4')\n
**Example:**\n
``ros2 run temas_ros2 camera_tof --ros-args -p ip_address:=192.168.4.4``
**Published Topics:**
- ``temas/camera_tof/image_raw`` (:class:`sensor_msgs.msg.Image`) – live camera frames\n
**Example:**\n
``ros2 topic echo /temas/camera_tof/image_raw``
- ``temas/camera_tof/status`` (:class:`std_msgs.msg.Bool`) – frame status (True if frame received)\n
``ros2 topic echo /temas/camera_tof/status``
- ``temas/camera_tof/info`` (:class:`std_msgs.msg.String`) – connection info messages\n
``ros2 topic echo /temas/camera_tof/info``
**Usage Example:**
Start the node with default parameters:\n
``ros2 run temas_ros2 camera_tof``
Check if the camera is streaming:\n
``ros2 topic echo /temas/camera_tof/status``
View live images:\n
``ros2 run rqt_image_view rqt_image_view``\n
Then select topic `/temas/camera_tof/image_raw`.
**Notes:**
- The camera automatically starts a streaming thread.
- Node publishes `status` True when frames are received; False if no frame is available.
"""
def __init__(self):
"""Initialize the TEMAS ToF Camera Node."""
super().__init__('temas_camera_tof')
# Declare parameters
self.declare_parameter('ip_address', '192.168.4.4')
self.declare_parameter('port', 8084)
# Read initial parameters
self.ip_address = self.get_parameter('ip_address').value
self.port = self.get_parameter('port').value
# Publishers
self.image_pub = self.create_publisher(Image, 'temas/camera_tof/image_raw', 10)
self.status_pub = self.create_publisher(Bool, 'temas/camera_tof/status', 10)
self.info_pub = self.create_publisher(String, 'temas/camera_tof/info', 10)
self.bridge = CvBridge()
self.camera = None
self.running = False
self.thread = None
self.get_logger().info(f"🎥 Starting TEMAS ToF Camera Node on {self.ip_address}:{self.port}")
self.try_connect()
[docs]
def try_connect(self):
"""
Attempt to connect to the TEMAS ToF camera.
This method establishes the connection, starts the camera streaming thread,
and publishes initial info and status messages.
"""
try:
temas.Connect(ip_address=self.ip_address)
self.camera = temas.Camera(port=self.port)
self.camera.start_thread()
self.running = True
self.thread = threading.Thread(target=self.stream_camera, daemon=True)
self.thread.start()
msg = String()
msg.data = f"Camera connected at {self.ip_address}:{self.port}"
self.info_pub.publish(msg)
self.get_logger().info(msg.data)
except Exception as e:
self.get_logger().error(f"Failed to connect to camera: {e}")
self.running = False
status_msg = Bool()
status_msg.data = False
self.status_pub.publish(status_msg)
[docs]
def stream_camera(self):
"""
Continuously read frames from the camera and publish them as ROS2 Image messages.
Also publishes a status boolean indicating whether a frame was successfully received.
"""
self.get_logger().info("📡 Camera streaming thread started.")
rate = self.create_rate(30)
while self.running:
try:
frame = self.camera.get_frame()
if frame is not None:
ros_image = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
self.image_pub.publish(ros_image)
status = Bool()
status.data = True
self.status_pub.publish(status)
else:
self.get_logger().warn("⚠️ No frame received.")
status = Bool()
status.data = False
self.status_pub.publish(status)
rate.sleep()
except Exception as e:
self.get_logger().error(f"Error reading frame: {e}")
time.sleep(1)
self.get_logger().info("🛑 Camera streaming stopped.")
[docs]
def destroy_node(self):
"""
Gracefully stop the camera thread and shut down the node.
Stops the camera thread if active and ensures proper ROS2 shutdown.
"""
self.get_logger().info("Shutting down TEMAS ToF camera node...")
self.running = False
if self.thread:
self.thread.join()
if self.camera:
try:
self.camera.stop_thread()
except Exception as e:
self.get_logger().warn(f"Error stopping camera: {e}")
super().destroy_node()
[docs]
def main(args=None):
"""
Main entry point for the TEMAS ToF Camera Node.
Initializes ROS2, spins the node, and handles shutdown on keyboard interrupt.
"""
rclpy.init(args=args)
node = TemasCameraTofNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("Camera node interrupted by user.")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()