Source code for common_node

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rubu import temas
from std_srvs.srv import Trigger
from std_msgs.msg import String


[docs] class TemasCommonNode(Node): """ ROS2 node for interacting with TEMAS Common functionalities. Provides services for actions, data queries, and configuration using Trigger services. Parameters ---------- ip_address : str IP address of TEMAS device (default: '192.168.4.4')\n Example:\n ``ros2 run temas_ros2 common --ros-args -p ip_address:=192.168.4.4`` Published Topics ---------------- temas/common/info : std_msgs.msg.String Status or info messages from the node.\n Example:\n ``ros2 topic echo /temas/common/info`` Services -------- Actions ------- shutdown : Trigger Shutdown TEMAS device\n ``ros2 service call /temas/common/shutdown std_srvs/srv/Trigger`` restart : Trigger Restart TEMAS device\n ``ros2 service call /temas/common/restart std_srvs/srv/Trigger`` near_mode_on : Trigger Enable near mode\n ``ros2 service call /temas/common/near_mode_on std_srvs/srv/Trigger`` near_mode_off : Trigger Disable near mode\n ``ros2 service call /temas/common/near_mode_off std_srvs/srv/Trigger`` Data Queries ------------ get_fw_version : Trigger Get firmware version\n ``ros2 service call /temas/common/get_fw_version std_srvs/srv/Trigger`` get_hostname : Trigger Get hostname\n ``ros2 service call /temas/common/get_hostname std_srvs/srv/Trigger`` get_ip : Trigger Get IP address\n ``ros2 service call /temas/common/get_ip std_srvs/srv/Trigger`` get_mac : Trigger Get MAC address\n ``ros2 service call /temas/common/get_mac std_srvs/srv/Trigger`` get_sn : Trigger Get serial number\n ``ros2 service call /temas/common/get_sn std_srvs/srv/Trigger`` get_temperature : Trigger Get internal temperature\n ``ros2 service call /temas/common/get_temperature std_srvs/srv/Trigger`` get_ports : Trigger Get ports of TEMAS subsystems\n ``ros2 service call /temas/common/get_ports std_srvs/srv/Trigger`` get_laser_x_los : Trigger Get laser X line-of-sight position\n ``ros2 service call /temas/common/get_laser_x_los std_srvs/srv/Trigger`` get_laser_y_los : Trigger Get laser Y line-of-sight position\n ``ros2 service call /temas/common/get_laser_y_los std_srvs/srv/Trigger`` """ def __init__(self): """ Initialize the TEMAS Common Node and connect to the device. """ super().__init__('temas_common') # -------- Parameters -------- self.declare_parameter('ip_address', '192.168.4.4') self.declare_parameter('port', 8083) self.ip_address = self.get_parameter('ip_address').value self.port = self.get_parameter('port').value # -------- Connect to TEMAS -------- self.device = temas.Connect(ip_address=self.ip_address) try: self.common = temas.Common() hostname = self.common.get_hostname() self.get_logger().info(f"Connecting to TEMAS with hostname {hostname}") except Exception as e: self.get_logger().error(f"❌ Connecting to TEMAS failed: {e}") return # Publisher for status info self.info_pub = self.create_publisher(String, 'temas/common/info', 10) # -------- Action Services -------- self.shutdown_srv = self.create_service(Trigger, 'temas/common/shutdown', self.handle_shutdown) self.restart_srv = self.create_service(Trigger, 'temas/common/restart', self.handle_restart) self.near_on_srv = self.create_service(Trigger, 'temas/common/near_mode_on', self.handle_near_on) self.near_off_srv = self.create_service(Trigger, 'temas/common/near_mode_off', self.handle_near_off) # -------- Data Services -------- self.get_fw_srv = self.create_service(Trigger, 'temas/common/get_fw_version', self.handle_get_fw) self.get_hostname_srv = self.create_service(Trigger, 'temas/common/get_hostname', self.handle_get_hostname) self.get_ip_srv = self.create_service(Trigger, 'temas/common/get_ip', self.handle_get_ip) self.get_mac_srv = self.create_service(Trigger, 'temas/common/get_mac', self.handle_get_mac) self.get_sn_srv = self.create_service(Trigger, 'temas/common/get_sn', self.handle_get_sn) self.get_temp_srv = self.create_service(Trigger, 'temas/common/get_temperature', self.handle_get_temp) self.get_ports_srv = self.create_service(Trigger, 'temas/common/get_ports', self.handle_get_ports) self.get_laser_x_srv = self.create_service(Trigger, 'temas/common/get_laser_x_los', self.handle_get_laser_x) self.get_laser_y_srv = self.create_service(Trigger, 'temas/common/get_laser_y_los', self.handle_get_laser_y) self.get_logger().info("✅ TEMAS Common Node started") # ---------------- Action Callbacks ----------------
[docs] def handle_shutdown(self, request, response): """ Shutdown the TEMAS device. Parameters ---------- request : Trigger.Request response : Trigger.Response Returns ------- Trigger.Response """ try: self.common.shutdown() response.success = True response.message = "TEMAS shutdown successful" except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_restart(self, request, response): """ Restart the TEMAS device. """ try: self.common.restart() response.success = True response.message = "TEMAS restarted" except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_near_on(self, request, response): """ Enable near mode on the TEMAS device. """ try: self.common.near_mode_on() response.success = True response.message = "Near mode ON" except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_near_off(self, request, response): """ Disable near mode on the TEMAS device. """ try: self.common.near_mode_off() response.success = True response.message = "Near mode OFF" except Exception as e: response.success = False response.message = str(e) return response
# ---------------- Data Callbacks ----------------
[docs] def handle_get_fw(self, request, response): """Get firmware version.""" try: fw = self.common.get_fw_version() response.success = True response.message = fw except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_hostname(self, request, response): """Get hostname.""" try: hostname = self.common.get_hostname() response.success = True response.message = hostname except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_ip(self, request, response): """Get IP address.""" try: ip = self.common.get_ip() response.success = True response.message = ip except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_mac(self, request, response): """Get MAC address.""" try: mac = self.common.get_mac() response.success = True response.message = mac except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_sn(self, request, response): """Get serial number.""" try: sn = self.common.get_sn() response.success = True response.message = sn except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_temp(self, request, response): """Get internal temperature.""" try: temp = self.common.get_temperature() response.success = True response.message = str(temp) except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_ports(self, request, response): """Get ports of TEMAS subsystems.""" try: ports = { "camera": self.common.get_port_camera(), "tof": self.common.get_port_tof_camera(), "control": self.common.get_port_control(), "common": self.common.get_port_common() } response.success = True response.message = str(ports) except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_laser_x(self, request, response): """Get laser X line-of-sight position.""" try: x = self.common.get_laser_x_los() response.success = True response.message = str(x) except Exception as e: response.success = False response.message = str(e) return response
[docs] def handle_get_laser_y(self, request, response): """Get laser Y line-of-sight position.""" try: y = self.common.get_laser_y_los() response.success = True response.message = str(y) except Exception as e: response.success = False response.message = str(e) return response
[docs] def main(args=None): """Run the TEMAS Common Node.""" rclpy.init(args=args) node = TemasCommonNode() try: rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info("Shutting down TEMAS Common Node...") finally: node.destroy_node() rclpy.shutdown()
if __name__ == '__main__': main()