Source code for camera_vis_node

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Bool, String
from cv_bridge import CvBridge
import threading
import time
from rubu import temas
from rcl_interfaces.msg import SetParametersResult

[docs] class TemasCameraVisNode(Node): """ ROS2 node for streaming a TEMAS RGB camera. This node connects to a TEMAS VIS camera, continuously publishes frames as ROS2 images, and provides camera status and info topics. Camera parameters (exposure, brightness, contrast, lens position, saturation, gain) can be set at startup or dynamically via ROS2 parameters. **Parameters:** - ``ip_address`` (str): IP address of the TEMAS device (default: '192.168.4.4')\n **Example:**\n ``ros2 run temas_ros2 camera_vis --ros-args -p ip_address:=192.168.4.4`` - ``set_exposure_time`` (int): Exposure time in microseconds (default: 17000) - ``set_brightness`` (int): Brightness (default: 50) - ``set_contrast`` (int): Contrast (default: 3) - ``set_lens_position`` (int): Lens focus position (default: 25) - ``set_saturation`` (int): Saturation (default: 3) - ``set_gain`` (float): Gain (default: 2.0) **Dynamic parameter change example:**\n ``ros2 param set /temas_camera_vis set_exposure_time 20000``\n ``ros2 param set /temas_camera_vis set_brightness 60`` **Published Topics:** - ``temas/camera_vis/image_raw`` (:class:`sensor_msgs.msg.Image`) – live camera frames\n **Example:**\n ``ros2 topic echo /temas/camera_vis/image_raw`` - ``temas/camera_vis/status`` (:class:`std_msgs.msg.Bool`) – frame status (True if frame received)\n ``ros2 topic echo /temas/camera_vis/status`` - ``temas/camera_vis/info`` (:class:`std_msgs.msg.String`) – connection info messages\n ``ros2 topic echo /temas/camera_vis/info`` **Usage Example:** Start the node with default parameters:\n ``ros2 run temas_ros2 camera_vis`` Change camera exposure at runtime:\n ``ros2 param set /temas_camera_vis set_exposure_time 25000`` View live images:\n ``ros2 run rqt_image_view rqt_image_view``\n Then select topic `/temas/camera_vis/image_raw`. **Notes:** - The node automatically starts a background thread to continuously fetch frames. - Status topic indicates whether frames are successfully received. - All camera settings can be updated dynamically via ROS2 parameters. """ def __init__(self): """Initialize the TEMAS Camera Visualization Node.""" super().__init__('temas_camera_vis') # Declare parameters self.declare_parameter('ip_address', '192.168.4.4') self.declare_parameter('port', 8081) self.declare_parameter('set_exposure_time', 17000) self.declare_parameter('set_brightness', 50) self.declare_parameter('set_contrast', 3) self.declare_parameter('set_lens_position', 25) self.declare_parameter('set_saturation', 3) self.declare_parameter('set_gain', 2.0) # Register parameter change callback self.add_on_set_parameters_callback(self.parameter_callback) # Read initial parameters self.ip_address = self.get_parameter('ip_address').value self.port = self.get_parameter('port').value self.exposure_time = self.get_parameter('set_exposure_time').value self.brightness = self.get_parameter('set_brightness').value self.contrast = self.get_parameter('set_contrast').value self.set_lens_position = self.get_parameter('set_lens_position').value self.set_saturation = self.get_parameter('set_saturation').value self.set_gain = self.get_parameter('set_gain').value # Publishers self.image_pub = self.create_publisher(Image, 'temas/camera_vis/image_raw', 10) self.status_pub = self.create_publisher(Bool, 'temas/camera_vis/status', 10) self.info_pub = self.create_publisher(String, 'temas/camera_vis/info', 10) self.bridge = CvBridge() self.camera = None self.running = False self.thread = None self.get_logger().info(f"🎥 Starting TEMAS Camera Node on {self.ip_address}:{self.port}") self.try_connect()
[docs] def try_connect(self): """ Attempt to connect to the TEMAS camera. This method establishes connection, starts the camera streaming thread, and applies the initial camera settings. """ try: temas.Connect(ip_address=self.ip_address) self.camera = temas.Camera(port=self.port) self.camera.start_thread() self.apply_camera_settings() self.running = True self.thread = threading.Thread(target=self.stream_camera, daemon=True) self.thread.start() msg = String() msg.data = f"Camera connected at {self.ip_address}:{self.port}" self.info_pub.publish(msg) self.get_logger().info(msg.data) except Exception as e: self.get_logger().error(f"Failed to connect to camera: {e}") self.running = False status_msg = Bool() status_msg.data = False self.status_pub.publish(status_msg)
[docs] def apply_camera_settings(self): """ Apply the current parameters to the camera. Methods from `temas.Camera` used: - set_exposure_time(microseconds: 50–33000) -> str: sets exposure - set_brightness(0–100) -> str: sets brightness - set_contrast(0–100) -> str: sets contrast - set_lens_position(0–100) -> str: sets lens position - set_saturation(0–100) -> str: sets saturation - set_gain(1.123–16.0) -> str: sets gain """ if self.camera: self.camera.set_exposure_time(self.exposure_time) self.camera.set_brightness(self.brightness) self.camera.set_contrast(self.contrast) self.camera.set_lens_position(self.set_lens_position) self.camera.set_saturation(self.set_saturation) self.camera.set_gain(self.set_gain) self.get_logger().info( f"🎛 Applied settings: exposure={self.exposure_time}, " f"brightness={self.brightness}, contrast={self.contrast}, " f"lens={self.set_lens_position}, " f"saturation={self.set_saturation}, gain={self.set_gain}" )
[docs] def parameter_callback(self, params): """ Handle runtime parameter changes. Updates camera settings immediately when parameters are changed. :param params: List of parameters being updated. :type params: list[rclpy.Parameter] :returns: Result of parameter update :rtype: SetParametersResult """ for param in params: if param.name == 'set_exposure_time' and param.type_ == param.Type.INTEGER: self.exposure_time = param.value elif param.name == 'set_brightness' and param.type_ == param.Type.INTEGER: self.brightness = param.value elif param.name == 'set_contrast' and param.type_ == param.Type.INTEGER: self.contrast = param.value elif param.name == 'set_lens_position' and param.type_ == param.Type.INTEGER: self.set_lens_position = param.value elif param.name == 'set_saturation' and param.type_ == param.Type.INTEGER: self.set_saturation = param.value elif param.name == 'set_gain' and param.type_ == param.Type.DOUBLE: self.set_gain = param.value self.apply_camera_settings() return SetParametersResult(successful=True)
[docs] def stream_camera(self): """ Continuously fetch frames from the TEMAS camera and publish them as ROS2 Images. Also publishes a status boolean indicating frame availability. """ self.get_logger().info("📡 Camera streaming thread started.") rate = self.create_rate(30) while self.running: try: frame = self.camera.get_frame() if frame is not None: ros_image = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8') self.image_pub.publish(ros_image) status = Bool() status.data = True self.status_pub.publish(status) else: self.get_logger().warn("⚠️ No frame received.") status = Bool() status.data = False self.status_pub.publish(status) rate.sleep() except Exception as e: self.get_logger().error(f"Error reading frame: {e}") time.sleep(1) self.get_logger().info("🛑 Camera streaming stopped.")
[docs] def destroy_node(self): """ Gracefully stop the camera thread and shut down the node. """ self.get_logger().info("Shutting down TEMAS camera node...") self.running = False if self.thread: self.thread.join() if self.camera: try: self.camera.stop_thread() except Exception as e: self.get_logger().warn(f"Error stopping camera: {e}") super().destroy_node()
[docs] def main(args=None): """ Main function to initialize ROS2 and run the TEMAS Camera Node. :param args: Optional command line arguments """ rclpy.init(args=args) node = TemasCameraVisNode() try: rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info("Camera node interrupted by user.") finally: node.destroy_node() rclpy.shutdown()
if __name__ == '__main__': main()