diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..70df8a3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +**/__pycache__ + +COLCON_IGNORE +.colcon_root diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..fc45ba6 --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +BSD 3-Clause License + +Copyright (c) 2024, Martin Klomp, MIRTE-team + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/mirte_robot/__pycache__/linetrace.cpython-36.pyc b/mirte_robot/__pycache__/linetrace.cpython-36.pyc deleted file mode 100644 index 9fc2b08..0000000 Binary files a/mirte_robot/__pycache__/linetrace.cpython-36.pyc and /dev/null differ diff --git a/mirte_robot/__pycache__/robot.cpython-36.pyc b/mirte_robot/__pycache__/robot.cpython-36.pyc deleted file mode 100644 index 8bb10d6..0000000 Binary files a/mirte_robot/__pycache__/robot.cpython-36.pyc and /dev/null differ diff --git a/mirte_robot/linetrace.py b/mirte_robot/linetrace.py index f7e4c66..2315fec 100644 --- a/mirte_robot/linetrace.py +++ b/mirte_robot/linetrace.py @@ -9,8 +9,8 @@ import multiprocessing from websocket_server import WebsocketServer -# Already load rospy (whicht takes long) and robot, so mirte.py does not need to do this anymore -import rospy +# Already load rclpy (whicht takes long) and robot, so mirte.py does not need to do this anymore +import rclpy from mirte_robot import robot # Global shared memory objects (TODO: check if we need shared memory, why is server working?) @@ -22,9 +22,9 @@ # 1) the code finishes # 2) the user stopped the process # 3) the websocket connection is closed -def stop_mirte(terminate = True): +def stop_mirte(): global running - if terminate: + if running.value: process.terminate() running.value = False @@ -68,8 +68,8 @@ def traceit(frame, event, arg): # Sending the linetrace 0 to the client server.send_message_to_all("0") - - stop_mirte(False) + running.value = False + stop_mirte() process = multiprocessing.Process(target = load_mirte_module, args=(stepper, do_step)) diff --git a/mirte_robot/robot.py b/mirte_robot/robot.py index 5ff597c..37723e1 100644 --- a/mirte_robot/robot.py +++ b/mirte_robot/robot.py @@ -1,41 +1,98 @@ #!/usr/bin/env python -import time -import rospy -import rosservice -import signal -import sys import math -import atexit - -# TODO: check if we need the telemetrix version of this? -#sys.path.append('/usr/local/lib/python2.7/dist-packages/PyMata-2.20-py2.7.egg') # Needed for jupyter notebooks -#sys.path.append('/usr/local/lib/python2.7/dist-packages/pyserial-3.4-py2.7.egg') +import os +import platform +import time +import weakref +from typing import TYPE_CHECKING, Literal, Optional, overload + +import rclpy +import rclpy.node +from rclpy.validate_namespace import validate_namespace + +if TYPE_CHECKING: + import rclpy.client + +from controller_manager_msgs.srv import SwitchController +from mirte_msgs.srv import ( + GetAnalogPinValue, + GetBoardCharacteristics, + GetColorHSL, + GetColorRGBW, + GetDigitalPinValue, + GetEncoder, + GetIntensity, + GetIntensityDigital, + GetKeypad, + GetRange, + SetDigitalPinValue, + SetMotorSpeed, + SetOLEDFile, + SetOLEDText, + SetPWMPinValue, + SetServoAngle, +) +from rcl_interfaces.srv import ListParameters -import message_filters -from geometry_msgs.msg import Twist -from std_msgs.msg import Int32 -from std_msgs.msg import String -from std_msgs.msg import Empty -from mirte_msgs.msg import * +mirte = {} -from mirte_msgs.srv import * -from std_srvs.srv import * +# No QoS Profiles are set, but this might not be required, since they might already behave like ROS 1 persistant. -mirte = {} -class Robot(): - """ Robot API +class Robot: + """Robot API - This class allows you to control the robot from Python. The getters and setters - are just wrappers calling ROS topics or services. + This class allows you to control the robot from Python. The getters and setters + are just wrappers calling ROS topics or services. """ + # Implementation Notes: + # This class creates a hidden ROS Node for the communication. + # This should only be run once, however this can not be prevented from the web interface. + # Therefore the node is also anonymized with the current time. + + def __init__( + self, machine_namespace: Optional[str] = None, hardware_namespace: str = "io" + ): + """Intialize the Mirte Robot API + + Parameters: + machine_namespace (Optional[str], optional): The Namespace from '/' to the ROS namespace for the specific Mirte. Defaults to "/{HOSTNAME}". (This only has to be changed when running the Robot API from a different machine directly. It is configured correctly for the Web interface) + hardware_namespace (str, optional): The namespace for the hardware peripherals. Defaults to "io". + """ + self._machine_namespace = ( + machine_namespace + if machine_namespace and validate_namespace(machine_namespace) + else "/" + platform.node().replace("-", "_").lower() + ) + self._hardware_namespace = ( + hardware_namespace + if validate_namespace( + hardware_namespace + if hardware_namespace.startswith("/") + else (self._machine_namespace + "/" + hardware_namespace) + ) + else "io" + ) + + ROS_DISTRO = os.getenv("ROS_DISTRO") + + rclpy.init() + + # This node should be only ran once. + # No 'anonymous' flag available, so use unix epoch nano seconds to pad name + self._node = rclpy.node.Node( + "_mirte_python_api_" + str(time.time_ns()), + namespace=self._machine_namespace, + start_parameter_services=True, + ) - def __init__(self): # Stop robot when exited - atexit.register(self.stop) + rclpy.get_default_context().on_shutdown(self._at_exit) + + self.CONTROLLER = "diffbot_base_controller" - self.PWM = 3 #PrivateConstants.PWM when moving to Python3 + self.PWM = 3 # PrivateConstants.PWM when moving to Python3 self.INPUT = 0 self.OUTPUT = 1 self.PULLUP = 11 @@ -45,28 +102,103 @@ def __init__(self): self.begin_time = time.time() self.last_call = 0 - # Call /stop and /start service to disable/enable the ROS diff_drive_controller - # By default this class will control the rbot though PWM (controller stopped). Only in case + # Call controller_manager/switch_controller service to disable/enable the ROS diff_drive_controller + # By default this class will control the robot though PWM (controller stopped). Only in case # the controller is needed, it will be enabled. - self.stop_controller_service = rospy.ServiceProxy('stop', Empty, persistent=True) - self.start_controller_service = rospy.ServiceProxy('start', Empty, persistent=True) + self._switch_controller_service = self._node.create_client( + SwitchController, "controller_manager/switch_controller" + ) + + if ROS_DISTRO[0] >= "i": # Check if the ROS Distro is IRON or newer + # Not available untill ROS Iron + if not self._node.wait_for_node( + self._hardware_namespace + "telemetrix", 10 + ): + self._node.get_logger().fatal( + f"Telemetrix node at '{self._node.get_namespace() + '/' + self._hardware_namespace + '/telemetrix'}' was not found! Aborting" + ) + exit(-1) + list_parameters: rclpy.client.Client = self._node.create_client( + ListParameters, self._hardware_namespace + "/telemetrix/list_parameters" + ) + + # # Get the Board Characteristics + get_board_characteristics = self._node.create_client( + GetBoardCharacteristics, + self._hardware_namespace + "/get_board_characteristics", + ) + + # Wait for the get_board_characteristics to prevent weird errors. + if not get_board_characteristics.wait_for_service(1): + self._node.get_logger().fatal( + f"Telemetrix node at '{self._node.get_namespace()+ '/'+ self._hardware_namespace + '/telemetrix'}' does not provide a '{self._node.get_namespace() + '/' + self._hardware_namespace + '/get_board_characteristics'}' service! Aborting" + ) + exit(-1) + + board_future: rclpy.Future = get_board_characteristics.call_async( + GetBoardCharacteristics.Request() + ) + rclpy.spin_until_future_complete(self._node, board_future) + + board_characteristics: GetBoardCharacteristics.Response = board_future.result() + + self._max_adc: int = board_characteristics.max_adc + self._max_pwm: int = board_characteristics.max_pwm + self._max_voltage: float = board_characteristics.max_voltage + + self._node.destroy_client(get_board_characteristics) # Service for motor speed - self.motors = {} - if rospy.has_param("/mirte/motor"): - self.motors = rospy.get_param("/mirte/motor") + self.motors = {} # FIXME: Is self.motors used? + motors_future: rclpy.Future = list_parameters.call_async( + ListParameters.Request(prefixes=["motor"], depth=3) + ) + rclpy.spin_until_future_complete(self._node, motors_future) + + motor_prefixes: list[str] = motors_future.result().result.prefixes + if len(motor_prefixes) > 0: + self.motors = [ + motor_prefix.split(".")[-1] for motor_prefix in motor_prefixes + ] self.motor_services = {} for motor in self.motors: - self.motor_services[motor] = rospy.ServiceProxy('/mirte/set_' + self.motors[motor]["name"] + '_speed', SetMotorSpeed, persistent=True) - - # Service for motor speed - if rospy.has_param("/mirte/servo"): - servos = rospy.get_param("/mirte/servo") - self.servo_services = {} + self._node.get_logger().info( + f"Created service client for motor [{motor}]" + ) + self.motor_services[motor] = self._node.create_client( + SetMotorSpeed, + self._hardware_namespace + "/motor/" + motor + "/set_speed", + ) + + # Service for motor speed + + def finalize(node: rclpy.node.Node, motors: dict[str, rclpy.client.Client]): + for motor in motors.values(): + future = motor.call_async(SetMotorSpeed.Request(speed=0)) + rclpy.spin_until_future_complete(node, future) + + self._finalizer = weakref.finalize( + self, finalize, self._node, self.motor_services + ) + + servo_future = list_parameters.call_async( + ListParameters.Request(prefixes=["servo"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, servo_future) + + servo_prefixes = servo_future.result().result.prefixes + if len(servo_prefixes) > 0: + servos = [servo_prefix.split(".")[-1] for servo_prefix in servo_prefixes] + self.servo_services: dict[str, rclpy.client.Client] = {} for servo in servos: - self.servo_services[servo] = rospy.ServiceProxy('/mirte/set_' + servos[servo]["name"] + '_servo_angle', SetServoAngle, persistent=True) - - rospy.init_node('mirte_python_api', anonymous=False) + self._node.get_logger().info( + f"Created service client for servo [{servo}]" + ) + self.servo_services[servo] = self._node.create_client( + SetServoAngle, + self._hardware_namespace + "/servo/" + servo + "/set_angle", + ) ## Sensors ## The sensors are now just using a blocking service call. This is intentionally @@ -76,64 +208,206 @@ def __init__(self): ## maybe even to blockly. # Services for distance sensors - if rospy.has_param("/mirte/distance"): - distance_sensors = rospy.get_param("/mirte/distance") + distance_future = list_parameters.call_async( + ListParameters.Request(prefixes=["distance"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, distance_future) + + distance_prefixes = distance_future.result().result.prefixes + if len(distance_prefixes) > 0: + distance_sensors = [sensor.split(".")[-1] for sensor in distance_prefixes] self.distance_services = {} for sensor in distance_sensors: - self.distance_services[sensor] = rospy.ServiceProxy('/mirte/get_distance_' + distance_sensors[sensor]["name"], GetDistance, persistent=True) - - if rospy.has_param("/mirte/oled"): - oleds = rospy.get_param("/mirte/oled") + self._node.get_logger().info( + f"Created service client for distance sensor [{sensor}]" + ) + self.distance_services[sensor] = self._node.create_client( + GetRange, + self._hardware_namespace + "/distance/" + sensor + "/get_range", + ) + + # Services for oled + oled_future = list_parameters.call_async( + ListParameters.Request(prefixes=["oled"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, oled_future) + + oled_prefixes = oled_future.result().result.prefixes + if len(oled_prefixes) > 0: + oleds = [oled.split(".")[-1] for oled in oled_prefixes] self.oled_services = {} for oled in oleds: - self.oled_services[oled] = rospy.ServiceProxy('/mirte/set_' + oleds[oled]["name"] + '_image', SetOLEDImage, persistent=True) + self._node.get_logger().info( + f"Created service client for oled [{oled}]" + ) + self.oled_services[oled] = { + "text": self._node.create_client( + SetOLEDText, + self._hardware_namespace + "/oled/" + oled + "/set_text", + ), + "file": self._node.create_client( + SetOLEDFile, + self._hardware_namespace + "/oled/" + oled + "/set_file", + ), + } # Services for intensity sensors (TODO: how to expose the digital version?) - if rospy.has_param("/mirte/intensity"): - intensity_sensors = rospy.get_param("/mirte/intensity") + intensity_future = list_parameters.call_async( + ListParameters.Request(prefixes=["intensity"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, intensity_future) + + intensity_prefixes = intensity_future.result().result.prefixes + if len(intensity_prefixes) > 0: + intensity_sensors = [sensor.split(".")[-1] for sensor in intensity_prefixes] self.intensity_services = {} # We can not get the types (analog and/or digital) of the intensity sensor # straight from the parameter server (it might be just set as the PCB without # explicit values. We can however deduct what is there by checking the # services. - service_list = rosservice.get_service_list() + service_list = set( + [ + service + for service, service_type in self._node.get_service_names_and_types() + ] + ) for sensor in intensity_sensors: - if "/mirte/get_intensity_" + intensity_sensors[sensor]["name"] in service_list: - self.intensity_services[sensor] = rospy.ServiceProxy('/mirte/get_intensity_' + intensity_sensors[sensor]["name"], GetIntensity, persistent=True) - if "/mirte/get_intensity_" + intensity_sensors[sensor]["name"] + "_digital" in service_list: - self.intensity_services[sensor + "_digital"] = rospy.ServiceProxy('/mirte/get_intensity_' + intensity_sensors[sensor]["name"] + "_digital", GetIntensityDigital, persistent=True) - + if ( + self._hardware_namespace + "/intensity/" + sensor + "/get_analog" + in service_list + ): + self._node.get_logger().info( + f"Created service client for intensity [{sensor}]" + ) + self.intensity_services[sensor] = self._node.create_client( + GetIntensity, + self._hardware_namespace + + "/intensity/" + + sensor + + "/get_analog", + ) + if ( + self._hardware_namespace + "/intensity/" + sensor + "/get_digital" + in service_list + ): + self._node.get_logger().info( + f"Created service client for digital intensity [{sensor}]" + ) + self.intensity_services[sensor + "_digital"] = ( + self._node.create_client( + GetIntensityDigital, + self._hardware_namespace + + "/intensity/" + + sensor + + "/get_digital", + ) + ) # Services for encoder sensors - if rospy.has_param("/mirte/encoder"): - encoder_sensors = rospy.get_param("/mirte/encoder") + encoder_future = list_parameters.call_async( + ListParameters.Request(prefixes=["encoder"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, encoder_future) + + encoder_prefixes = encoder_future.result().result.prefixes + if len(encoder_prefixes) > 0: + encoder_sensors = [sensor.split(".")[-1] for sensor in encoder_prefixes] self.encoder_services = {} for sensor in encoder_sensors: - self.encoder_services[sensor] = rospy.ServiceProxy('/mirte/get_encoder_' + encoder_sensors[sensor]["name"], GetEncoder, persistent=True) + self._node.get_logger().info( + f"Created service client for encoder [{sensor}]" + ) + self.encoder_services[sensor] = self._node.create_client( + GetEncoder, + self._hardware_namespace + "/encoder/" + sensor + "/get_encoder", + ) # Services for keypad sensors - if rospy.has_param("/mirte/keypad"): - keypad_sensors = rospy.get_param("/mirte/keypad") + keypad_future = list_parameters.call_async( + ListParameters.Request(prefixes=["keypad"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, keypad_future) + + keypad_prefixes = keypad_future.result().result.prefixes + if len(keypad_prefixes) > 0: + keypad_sensors = [sensor.split(".")[-1] for sensor in keypad_prefixes] self.keypad_services = {} for sensor in keypad_sensors: - self.keypad_services[sensor] = rospy.ServiceProxy('/mirte/get_keypad_' + keypad_sensors[sensor]["name"], GetKeypad, persistent=True) + self._node.get_logger().info( + f"Created service client for keypad [{sensor}]" + ) + self.keypad_services[sensor] = self._node.create_client( + GetKeypad, + self._hardware_namespace + "/keypad/" + sensor + "/get_key", + ) # Services for color sensors - if rospy.has_param("/mirte/color"): - color_sensors = rospy.get_param("/mirte/color") - self.color_services = {} - for sensor in color_sensors: - self.color_services[sensor] = rospy.ServiceProxy('/mirte/get_color_' + color_sensors[sensor]["name"], GetColor, persistent=True) - - self.get_pin_value_service = rospy.ServiceProxy('/mirte/get_pin_value', GetPinValue, persistent=True) - self.set_pin_value_service = rospy.ServiceProxy('/mirte/set_pin_value', SetPinValue, persistent=True) - + color_future = list_parameters.call_async( + ListParameters.Request(prefixes=["color"], depth=3) + ) - signal.signal(signal.SIGINT, self._signal_handler) - signal.signal(signal.SIGTERM, self._signal_handler) + rclpy.spin_until_future_complete(self._node, color_future) - def getTimestamp(self): + color_prefixes = color_future.result().result.prefixes + if len(color_prefixes) > 0: + color_sensors = [sensor.split(".")[-1] for sensor in color_prefixes] + self.color_services: dict[str, dict[str, "rclpy.client.Client"]] = {} + for sensor in color_sensors: + self._node.get_logger().info( + f"Created service client for color sensor [{sensor}]" + ) + self.color_services[sensor] = { + "RGBW": self._node.create_client( + GetColorRGBW, + self._hardware_namespace + "/color/" + sensor + "/get_rgbw", + ), + "HSL": self._node.create_client( + GetColorHSL, + self._hardware_namespace + "/color/" + sensor + "/get_hsl", + ), + } + + self._node.destroy_client(list_parameters) + + self._get_digital_pin_value_service = self._node.create_client( + GetDigitalPinValue, self._hardware_namespace + "/get_digital_pin_value" + ) + + self._get_analog_pin_value_service = self._node.create_client( + GetAnalogPinValue, self._hardware_namespace + "/get_analog_pin_value" + ) + + self._set_digital_pin_value_service = self._node.create_client( + SetDigitalPinValue, self._hardware_namespace + "/set_digital_pin_value" + ) + + self._set_pwm_pin_value_service = self._node.create_client( + SetPWMPinValue, self._hardware_namespace + "/set_pwm_pin_value" + ) + + def _call_service( + self, client: rclpy.client.Client, request: rclpy.client.SrvTypeRequest + ) -> rclpy.client.SrvTypeResponse: + client.wait_for_service() + + future_response = client.call_async(request) + rclpy.spin_until_future_complete(self._node, future_response) + return future_response.result() + + # FIXME: Check if services are available, if not don't hard error on: + # AttributeError: 'Robot' object has no attribute 'oled_services'. Did you mean: '_services'? + def _check_available(self, services: dict[str] | None, id: str) -> bool: + if services is None: + return False + return id in services + + def getTimestamp(self) -> float: """Gets the elapsed time in seconds since the initialization fo the Robot. Returns: @@ -142,7 +416,7 @@ def getTimestamp(self): return time.time() - self.begin_time - def getTimeSinceLastCall(self): + def getTimeSinceLastCall(self) -> float: """Gets the elapsed time in seconds since the last call to this function. Returns: @@ -152,28 +426,45 @@ def getTimeSinceLastCall(self): last_call = self.last_call self.last_call = time.time() if last_call == 0: - return 0 + return 0 else: - return time.time() - last_call + return time.time() - last_call - def getDistance(self, sensor): + def getDistance(self, sensor: str) -> float: """Gets data from a HC-SR04 distance sensor: calculated distance in meters. Parameters: sensor (str): The name of the sensor as defined in the configuration. Returns: - int: Range in meters measured by the HC-SR04 sensor. + float: Range in meters measured by the HC-SR04 sensor. (The distance gets clamped to minimum and maximum range of the HC-SR04 sensor) Warning: A maximum of 6 distance sensors is supported. """ + value = self._call_service(self.distance_services[sensor], GetRange.Request()) + range = value.range + + distance: float = range.range + + # FIXME: What to do about nan? + if distance == math.inf: + distance = range.max_range + elif distance == -math.inf: + distance = range.min_range - dist = self.distance_services[sensor]() - return dist.data + return distance - def getIntensity(self, sensor, type="analog"): + # TODO: Maybe change digital return type to bool + @overload + def getIntensity(self, sensor: str, type: Literal["analog"]) -> float: ... + @overload + def getIntensity(self, sensor: str, type: Literal["digital"]) -> int: ... + + def getIntensity( + self, sensor: str, type: Literal["analog", "digital"] = "analog" + ) -> int | float: """Gets data from an intensity sensor. Parameters: @@ -181,15 +472,21 @@ def getIntensity(self, sensor, type="analog"): type (str): The type of the sensor (either 'analog' or 'digital'). Returns: - int: Value of the sensor (0-255 when analog, 0-1 when digital). + int | float: Value of the sensor (0.0-1.0 when analog, 0-1 when digital). """ + # FIXME: IMPROVE ERROR for type if type == "analog": - value = self.intensity_services[sensor]() + value = self._call_service( + self.intensity_services[sensor], GetIntensity.Request() + ) if type == "digital": - value = self.intensity_services[sensor + "_digital"]() + value = self._call_service( + self.intensity_services[sensor + "_digital"], + GetIntensityDigital.Request(), + ) return value.data - def getEncoder(self, sensor): + def getEncoder(self, sensor: str) -> int: """Gets data from an encoder: every encoder pulse increments the counter. Parameters: @@ -199,10 +496,12 @@ def getEncoder(self, sensor): int: Number of encoder pulses since boot of the robot. """ - value = self.encoder_services[sensor]() + value = self._call_service(self.encoder_services[sensor], GetEncoder.Request()) return value.data - def getKeypad(self, keypad): + def getKeypad( + self, keypad: str + ) -> Literal["", "up", "down", "left", "right", "enter"]: """Gets the value of the keypad: the button that is pressed. Parameters: @@ -212,37 +511,111 @@ def getKeypad(self, keypad): str: The name of the button ('up', 'down', 'left', 'right', 'enter'). """ - - value = self.keypad_services[keypad]() + value = self._call_service(self.keypad_services[keypad], GetKeypad.Request()) return value.data - def getColor(self, sensor): + def getColorRGBW(self, sensor: str) -> dict[str, float]: + """Gets the value of the color sensor. + + Parameters: + sensor (str): The name of the sensor as defined in the configuration. + + Returns: + {r, g, b, w}: Scaled (0-1) values per R(ed), G(reen), B(lue), and W(hite). + """ + + value = self._call_service( + self.color_services[sensor]["RGBW"], GetColorRGBW.Request() + ) + return { + "r": value.color.r, + "g": value.color.g, + "b": value.color.b, + "w": value.color.w, + } + + def getColor(self, sensor: str) -> dict[str, float]: """Gets the value of the color sensor. Parameters: sensor (str): The name of the sensor as defined in the configuration. Returns: - {r, g, b, w}: Raw (0-65536) values per R(ed), G(reen), B(lue), and W(hite). + {h, s, l}: Hue (0-360), Saturation (0-1), Lightness. """ - value = self.color_services[sensor]() - return {'r': value.color.color.r, 'g': value.color.color.g, 'b': value.color.color.b, 'w': value.color.color.w } + value = self._call_service( + self.color_services[sensor]["HSL"], GetColorHSL.Request() + ) + return { + "h": value.color.h, + "s": value.color.s, + "l": value.color.l, + } + + @overload # FIXME: Should this return int of float + def getAnalogPinValue(self, pin: str, mode: Literal["percentage"]) -> int: ... - def getAnalogPinValue(self, pin): + @overload + def getAnalogPinValue(self, pin: str, mode: Literal["raw"]) -> int: ... + + @overload + def getAnalogPinValue(self, pin: str, mode: Literal["voltage"]) -> float: ... + + # TODO: What to do with max? + def getAnalogPinValue( + self, pin: str, mode: Literal["percentage", "raw", "voltage"] = "percentage" + ) -> float: """Gets the input value of an analog pin. Parameters: pin (str): The pin number of an analog pin as printed on the microcontroller. + mode (str, optional): The units of the value, can be "percentage", "raw" or "voltage". Defaults to "percentage". + # FIXME: HOW TO DO DOCS Returns: - int: Value between 0-255. + int: RAW + float: Value between 0-5V (Arduino) or 0-3.3V (Pico). """ - value = self.get_pin_value_service(str(pin), "analog") - return value.data - - def setAnalogPinValue(self, pin, value): + response: GetAnalogPinValue.Response = self._call_service( + self._get_analog_pin_value_service, + GetAnalogPinValue.Request(pin=str(pin)), + ) + + # FIXME: TMP CHECK + assert response.status, response.message + match mode: + case "raw": + return response.value + case "percentage": + return response.value / self._max_adc * 100 + case "voltage": + return response.value / self._max_adc * self._max_voltage + case _: + assert False, "FIXME: UNKNOWN MODE" + + # TODO: Input int or float? Why not both + @overload + def setAnalogPinValue( + self, pin: str, value: int | float, mode: Literal["percentage"] + ) -> bool: ... + + @overload + def setAnalogPinValue(self, pin: str, value: int, mode: Literal["raw"]) -> bool: ... + + @overload + def setAnalogPinValue( + self, pin: str, value: float, mode: Literal["voltage"] + ) -> bool: ... + + # FIXME: UPDATE DOCS + def setAnalogPinValue( + self, + pin: str, + value: int | float, + mode: Literal["percentage", "raw", "voltage"] = "percentage", + ) -> bool: """Sets the output value of an analog pin (PWM). Parameters: @@ -250,61 +623,101 @@ def setAnalogPinValue(self, pin, value): value (int): Value between 0-255. """ - value = self.set_pin_value_service(str(pin), "analog", value) - return value.status - - def setOLEDText(self, oled, text): + raw_value = None + + match mode: + # TODO: Maybe add u16 bounds checking, since message type is picky + case "raw": + # Bounds checking happens in telemetrix node + raw_value = int(value) + case "percentage": + # TODO: Maybe check percentage bounds here for better error. + raw_value = int((value / 100) * self._max_pwm) + case "voltage": + # TODO: Maybe check voltage bounds here for better error. + raw_value = int((value / self._max_voltage) * self._max_pwm) + case _: + assert False, "NO VALID MODE" + + response: SetPWMPinValue.Response = self._call_service( + self._set_pwm_pin_value_service, + SetPWMPinValue.Request(pin=str(pin), value=raw_value), + ) + + assert response.status, response.message + return response.status + + def setOLEDText(self, oled: str, text: str) -> bool: """Shows text on the OLED. Parameters: oled (str): The name of the sensor as defined in the configuration. text (str): String to be shown on the 128x64 OLED. """ - value = self.oled_services[oled]('text', str(text)) + value = self._call_service( + self.oled_services[oled]["text"], SetOLEDText.Request(text=str(text)) + ) return value.status - def setOLEDImage(self, oled, image): + def setOLEDImage(self, oled: str, image: str) -> bool: """Shows image on the OLED. Parameters: oled (str): The name of the sensor as defined in the configuration. - image (str): Image name as defined in the images folder of the mirte-oled-images repository (excl file extension). + image (str): Image name/path either an absolute path, a path + relative to the folder of the mirte-oled-images + repository or and package relative path (pkg://PACKAGE_NAME/REST/OF/PATH). + The image extension can be omitted if its png. """ - value = self.oled_services[oled]('image', image) + value = self._call_service( + self.oled_services[oled]["file"], + SetOLEDFile.Request( + path=(image if ("." in image.split("/")[-1]) else (image + ".png")) + ), + ) return value.status - def setOLEDAnimation(self, oled, animation): + def setOLEDAnimation(self, oled: str, animation: str) -> bool: """Shows animation on the OLED. Parameters: oled (str): The name of the sensor as defined in the configuration. - animation (str): Animation (directory) name as defined in the animations folder of the mirte-oled-images repository. + animation (str): Animation (directory) name/path either an absolute + path, a path relative to the folder of the + mirte-oled-images repository or and package + relative path (pkg://PACKAGE_NAME/REST/OF/PATH). """ - value = self.oled_services[oled]('animation', animation) + value = self._call_service( + self.oled_services[oled]["file"], SetOLEDFile.Request(path=animation) + ) return value.status - def getDigitalPinValue(self, pin): + def getDigitalPinValue(self, pin: str) -> bool: """Gets the input value of a digital pin. Parameters: - pin (str): The pin number of an analog pin as printed on the microcontroller. + pin (str): The pin number of a digital pin as printed on the microcontroller. Returns: bool: The input value. """ - value = self.get_pin_value_service(str(pin), "digital") - return value.data + response: GetDigitalPinValue.Response = self._call_service( + self._get_digital_pin_value_service, + GetDigitalPinValue.Request(pin=str(pin)), + ) + assert response.status, response.message + return bool(response.value) - def setServoAngle(self, servo, angle): + def setServoAngle(self, servo: str, angle: float) -> bool: """Sets the angle of a servo. Parameters: servo (str): The name of the sensor as defined in the configuration. - angle (int): The angle of the servo (range [0-360], but some servos - might be hysically limited to [0-180]. + angle (float): The angle of the servo (range [0-360], but some servos + might be physically limited to [0-180]. Returns: bool: True if set successfully. @@ -320,36 +733,48 @@ def setServoAngle(self, servo, angle): Warning: A maximum of 12 servos is supported. """ - - value = self.servo_services[servo](angle) + value = self._call_service( + self.servo_services[servo], + SetServoAngle.Request( + angle=float(angle), degrees=SetServoAngle.Request.DEGREES + ), + ) return value.status - def setDigitalPinValue(self, pin, value): + def setDigitalPinValue(self, pin: str, value: bool) -> bool: """Sets the output value of a digital pin. Parameters: pin (str): The pin number of an analog pin as printed on the microcontroller. value (bool): Value to set. """ - value = self.set_pin_value_service(str(pin), "digital", value) - return value.status - def setMotorSpeed(self, motor, value): + response: SetDigitalPinValue.Response = self._call_service( + self._set_digital_pin_value_service, + SetDigitalPinValue.Request(pin=str(pin), value=value), + ) + assert response.status, response.message # FIXME: TMP ERROR + + return response.status + + def setMotorSpeed(self, motor: str, value: int) -> bool: """Sets the speed of the motor. Parameters: motor (str): The name of the sensor as defined in the configuration. - value (int): The 'directional duty cycle' (range [-100, 100]) of the PWM + value (int): The 'directional duty cycle' (range [-100, 100]) of the PWM signal (-100: full backward, 0: stand still, 100: full forward). Returns: bool: True if set successfully. """ - motor = self.motor_services[motor](value) + motor = self._call_service( + self.motor_services[motor], SetMotorSpeed.Request(speed=int(value)) + ) return motor.status - def setMotorControl(self, status): + def setMotorControl(self, status: bool) -> bool: """Enables/disables the motor controller. This is enabled on boot, but can be disabled/enabled at runtime. This makes the ROS control node pause, so it will not respond to Twist messages anymore when disabled. @@ -358,16 +783,22 @@ def setMotorControl(self, status): status (bool): To which status the motor controller should be set. Returns: - none + bool: True if succes (ok) """ - - if (status): - self.start_controller_service() + request = None + if status: + request = SwitchController.Request( + activate_controllers=[self.CONTROLLER], activate_asap=True + ) else: - self.stop_controller_service() - return + request = SwitchController.Request(deactivate_controllers=[self.CONTROLLER]) + + response: SwitchController.Response = self._call_service( + self._switch_controller_service, request + ) + return response.ok - def stop(self): + def stop(self) -> None: """Stops all DC motors defined in the configuration Note: @@ -375,23 +806,28 @@ def stop(self): or when it finished. """ - for motor in self.motors: - self.setMotorSpeed(self.motors[motor]["name"], 0) + self.setMotorSpeed(motor, 0) - def _signal_handler(self, sig, frame): + def _at_exit(self) -> None: self.stop() - sys.exit() + # We need a special function to initiate the Robot() because the main.py need to call the # init_node() (see: https://answers.ros.org/question/266612/rospy-init_node-inside-imported-file/) -def createRobot(): +def createRobot( + machine_namespace: Optional[str] = None, hardware_namespace: str = "io" +) -> Robot: """Creates and return instance of the robot class. + Parameters: + machine_namespace (Optional[str], optional): The Namespace from '/' to the ROS namespace for the specific Mirte. Defaults to "/{HOSTNAME}". (This only has to be changed when running the Robot API from a different machine directly. It is configured correctly for the Web interface) + hardware_namespace (str, optional): The namespace for the hardware peripherals. Defaults to "io". + Returns: Robot: The initialize Robot class. """ global mirte - mirte = Robot() + mirte = Robot(machine_namespace, hardware_namespace) return mirte diff --git a/setup.py b/setup.py index 9c932a8..3745ab0 100644 --- a/setup.py +++ b/setup.py @@ -5,18 +5,19 @@ setuptools.setup( name="mirte_robot", - install_requires=["websocket_server", "rospkg"], + install_requires=["websocket_server"], # TODO: Require mirte_msgs? rcl_interfaces? controller_manager_msgs? + # Also requires "rclpy" version="0.1.0", author="Martin Klomp", author_email="m.klomp@tudelft.nl", description="Python API for the Mirte Robot", long_description=long_description, long_description_content_type="text/markdown", - url="https://github.com/zoef-robot/zoef_python", + url="https://github.com/mirte-robot/mirte-python", packages=['mirte_robot'], classifiers=[ "Programming Language :: Python :: 3", - "License :: OSI Approved :: Apache Software License", + "License :: OSI Approved :: BSD License", "Operating System :: OS Independent", ], python_requires='>=3.6',