#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rubu import temas
from std_srvs.srv import Trigger
from std_msgs.msg import String
[docs]
class TemasCommonNode(Node):
"""
ROS2 node for interacting with TEMAS Common functionalities.
Provides services for actions, data queries, and configuration using Trigger services.
Parameters
----------
ip_address : str
IP address of TEMAS device (default: '192.168.4.4')\n
Example:\n
``ros2 run temas_ros2 common --ros-args -p ip_address:=192.168.4.4``
Published Topics
----------------
temas/common/info : std_msgs.msg.String
Status or info messages from the node.\n
Example:\n
``ros2 topic echo /temas/common/info``
Services
--------
Actions
-------
shutdown : Trigger
Shutdown TEMAS device\n
``ros2 service call /temas/common/shutdown std_srvs/srv/Trigger``
restart : Trigger
Restart TEMAS device\n
``ros2 service call /temas/common/restart std_srvs/srv/Trigger``
near_mode_on : Trigger
Enable near mode\n
``ros2 service call /temas/common/near_mode_on std_srvs/srv/Trigger``
near_mode_off : Trigger
Disable near mode\n
``ros2 service call /temas/common/near_mode_off std_srvs/srv/Trigger``
Data Queries
------------
get_fw_version : Trigger
Get firmware version\n
``ros2 service call /temas/common/get_fw_version std_srvs/srv/Trigger``
get_hostname : Trigger
Get hostname\n
``ros2 service call /temas/common/get_hostname std_srvs/srv/Trigger``
get_ip : Trigger
Get IP address\n
``ros2 service call /temas/common/get_ip std_srvs/srv/Trigger``
get_mac : Trigger
Get MAC address\n
``ros2 service call /temas/common/get_mac std_srvs/srv/Trigger``
get_sn : Trigger
Get serial number\n
``ros2 service call /temas/common/get_sn std_srvs/srv/Trigger``
get_temperature : Trigger
Get internal temperature\n
``ros2 service call /temas/common/get_temperature std_srvs/srv/Trigger``
get_ports : Trigger
Get ports of TEMAS subsystems\n
``ros2 service call /temas/common/get_ports std_srvs/srv/Trigger``
get_laser_x_los : Trigger
Get laser X line-of-sight position\n
``ros2 service call /temas/common/get_laser_x_los std_srvs/srv/Trigger``
get_laser_y_los : Trigger
Get laser Y line-of-sight position\n
``ros2 service call /temas/common/get_laser_y_los std_srvs/srv/Trigger``
"""
def __init__(self):
"""
Initialize the TEMAS Common Node and connect to the device.
"""
super().__init__('temas_common')
# -------- Parameters --------
self.declare_parameter('ip_address', '192.168.4.4')
self.declare_parameter('port', 8083)
self.ip_address = self.get_parameter('ip_address').value
self.port = self.get_parameter('port').value
# -------- Connect to TEMAS --------
self.device = temas.Connect(ip_address=self.ip_address)
try:
self.common = temas.Common()
hostname = self.common.get_hostname()
self.get_logger().info(f"Connecting to TEMAS with hostname {hostname}")
except Exception as e:
self.get_logger().error(f"❌ Connecting to TEMAS failed: {e}")
return
# Publisher for status info
self.info_pub = self.create_publisher(String, 'temas/common/info', 10)
# -------- Action Services --------
self.shutdown_srv = self.create_service(Trigger, 'temas/common/shutdown', self.handle_shutdown)
self.restart_srv = self.create_service(Trigger, 'temas/common/restart', self.handle_restart)
self.near_on_srv = self.create_service(Trigger, 'temas/common/near_mode_on', self.handle_near_on)
self.near_off_srv = self.create_service(Trigger, 'temas/common/near_mode_off', self.handle_near_off)
# -------- Data Services --------
self.get_fw_srv = self.create_service(Trigger, 'temas/common/get_fw_version', self.handle_get_fw)
self.get_hostname_srv = self.create_service(Trigger, 'temas/common/get_hostname', self.handle_get_hostname)
self.get_ip_srv = self.create_service(Trigger, 'temas/common/get_ip', self.handle_get_ip)
self.get_mac_srv = self.create_service(Trigger, 'temas/common/get_mac', self.handle_get_mac)
self.get_sn_srv = self.create_service(Trigger, 'temas/common/get_sn', self.handle_get_sn)
self.get_temp_srv = self.create_service(Trigger, 'temas/common/get_temperature', self.handle_get_temp)
self.get_ports_srv = self.create_service(Trigger, 'temas/common/get_ports', self.handle_get_ports)
self.get_laser_x_srv = self.create_service(Trigger, 'temas/common/get_laser_x_los', self.handle_get_laser_x)
self.get_laser_y_srv = self.create_service(Trigger, 'temas/common/get_laser_y_los', self.handle_get_laser_y)
self.get_logger().info("✅ TEMAS Common Node started")
# ---------------- Action Callbacks ----------------
[docs]
def handle_shutdown(self, request, response):
"""
Shutdown the TEMAS device.
Parameters
----------
request : Trigger.Request
response : Trigger.Response
Returns
-------
Trigger.Response
"""
try:
self.common.shutdown()
response.success = True
response.message = "TEMAS shutdown successful"
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_restart(self, request, response):
"""
Restart the TEMAS device.
"""
try:
self.common.restart()
response.success = True
response.message = "TEMAS restarted"
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_near_on(self, request, response):
"""
Enable near mode on the TEMAS device.
"""
try:
self.common.near_mode_on()
response.success = True
response.message = "Near mode ON"
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_near_off(self, request, response):
"""
Disable near mode on the TEMAS device.
"""
try:
self.common.near_mode_off()
response.success = True
response.message = "Near mode OFF"
except Exception as e:
response.success = False
response.message = str(e)
return response
# ---------------- Data Callbacks ----------------
[docs]
def handle_get_fw(self, request, response):
"""Get firmware version."""
try:
fw = self.common.get_fw_version()
response.success = True
response.message = fw
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_hostname(self, request, response):
"""Get hostname."""
try:
hostname = self.common.get_hostname()
response.success = True
response.message = hostname
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_ip(self, request, response):
"""Get IP address."""
try:
ip = self.common.get_ip()
response.success = True
response.message = ip
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_mac(self, request, response):
"""Get MAC address."""
try:
mac = self.common.get_mac()
response.success = True
response.message = mac
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_sn(self, request, response):
"""Get serial number."""
try:
sn = self.common.get_sn()
response.success = True
response.message = sn
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_temp(self, request, response):
"""Get internal temperature."""
try:
temp = self.common.get_temperature()
response.success = True
response.message = str(temp)
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_ports(self, request, response):
"""Get ports of TEMAS subsystems."""
try:
ports = {
"camera": self.common.get_port_camera(),
"tof": self.common.get_port_tof_camera(),
"control": self.common.get_port_control(),
"common": self.common.get_port_common()
}
response.success = True
response.message = str(ports)
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_laser_x(self, request, response):
"""Get laser X line-of-sight position."""
try:
x = self.common.get_laser_x_los()
response.success = True
response.message = str(x)
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def handle_get_laser_y(self, request, response):
"""Get laser Y line-of-sight position."""
try:
y = self.common.get_laser_y_los()
response.success = True
response.message = str(y)
except Exception as e:
response.success = False
response.message = str(e)
return response
[docs]
def main(args=None):
"""Run the TEMAS Common Node."""
rclpy.init(args=args)
node = TemasCommonNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("Shutting down TEMAS Common Node...")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()