diff --git a/armarx_control/robots/common.py b/armarx_control/robots/common.py index b858f2008d7969c83802c5330687960ed32ea40b..c60fcc80e88b8fdc113dd0f390bc8101e021d1c9 100644 --- a/armarx_control/robots/common.py +++ b/armarx_control/robots/common.py @@ -398,7 +398,7 @@ class Robot: ) return controller_name, ctrl, ctrl_cfg - def teach(self, robot_node_set: cfg.NodeSet, side: str, filename: str): + def teach(self, robot_node_set: cfg.NodeSet, side: str, filename: str, name_suffix: str = ""): """ Enable kinethestic teaching for the given robot node set, and filename as prefix Args: @@ -411,7 +411,7 @@ class Robot: e.g. 'kinesthetic_teaching-2023-06-18-16-21-15', you will have four files recorded, including file_path-ts-forward.csv, file_path-ts-backward.csv, file_path-js-forward.csv, file_path-js-backward.csv """ - return self.ctrl.kinestheticTeaching(robot_node_set.value, side, filename) + return self.ctrl.kinestheticTeaching(robot_node_set.value, side, filename, name_suffix) def update_controller_config( self, diff --git a/armarx_control/run/adaptRecordingJSTrajectories.py b/armarx_control/run/adaptRecordingJSTrajectories.py index 035d1bff5c5607c293a0c0e060e9a9857062c417..b1688dbff7e04832f4159d05125451cd0e316d2e 100644 --- a/armarx_control/run/adaptRecordingJSTrajectories.py +++ b/armarx_control/run/adaptRecordingJSTrajectories.py @@ -27,8 +27,8 @@ def convert_file(data_path, filename): def main(): - data_path = "/common/homes/staff/krebs/repos/armarx/skills/control/data/armarx_control/kinesthetic_teaching"#"/home/armar-user/code/armarx_integration/robots/armar7/motions/data/armar7_motions/kinesthetic_teaching" - filename = "move_to_dishwasher_handle_5-2024-05-29-16-22-00" + data_path = "/home/armar-user/code_unstable/armarx/skills/control/data/armarx_control/kinesthetic_teaching"#"/home/armar-user/code/armarx_integration/robots/armar7/motions/data/armar7_motions/kinesthetic_teaching" + filename = "grasp_fridge_handle-2024-10-28-19-43-05" convert_file(data_path, filename) return directory = os.fsencode(data_path) diff --git a/armarx_control/run/kinesthetic_teaching.py b/armarx_control/run/kinesthetic_teaching.py index 6ec8607cb1a45ebb73abf39e9c1811d28795cee8..a16c5434341e46a2dfc3407a5a3bbff656d01f9e 100644 --- a/armarx_control/run/kinesthetic_teaching.py +++ b/armarx_control/run/kinesthetic_teaching.py @@ -9,11 +9,11 @@ from armarx_control.config.type import ControllerType from armarx_control.config.utils import get_default_ctrl_cfg -def teach_motion(robot, name: str, node_set: NodeSet): +def teach_motion(robot, name: str, node_set: NodeSet, name_suffix: str = ""): console.log(f"[bold cyan]Kinesthetic teaching with name {name}") side = node_set.get_side(node_set) console.log(f"[bold yellow]you can press the FT sensor of {node_set.value} (on {side} side) to start") - traj_file_stem = robot.teach(node_set, side, name) + traj_file_stem = robot.teach(node_set, side, name, name_suffix) for motion_dir in ["backward", "forward"]: motion_file_ts = Path(f"{traj_file_stem}-ts-{motion_dir}.csv") motion_file_js = Path(f"{traj_file_stem}-js-{motion_dir}.csv") @@ -32,11 +32,13 @@ def teach_motion(robot, name: str, node_set: NodeSet): @click.option("--time_sec", "-t", type=float, default=5.0, help="time duration in seconds") @click.option("--left_arm", "-l", is_flag=True, help="set to use left arm, otherwise right arm") @click.option("--series", "-s", type=int, default=6, help="which robot to use") +@click.option("--name_suffix", "-ns", type=str, default="", help="any name suffix to avoid name collision") def main( name: str = "", time_sec: float = 5.0, left_arm: bool = False, - series: int = 6 + series: int = 6, + name_suffix: str = "", ): np.set_printoptions(suppress=True, precision=3) robot = get_control_robot(RobotName.from_series(series), with_ctrl_creator=True) @@ -50,7 +52,7 @@ def main( raise NotImplementedError(f"{robot.c.name} is not implemented yet") tcp = robot.arm_tcp.RightArm if right_arm else robot.arm_tcp.LeftArm - filename = teach_motion(robot, name, nodeset) + filename = teach_motion(robot, name, nodeset, name_suffix) control_type = ControllerType.TSMPImp appendix = "" diff --git a/armarx_control/run/ts_mp_imp_forward_backward_with_traj.py b/armarx_control/run/ts_mp_imp_forward_backward_with_traj.py index 2f716d6dc4f261b3c3ffc8a8a622b723e0547647..d5347b83d1b1aaf9d53426b827591cad90c77042 100644 --- a/armarx_control/run/ts_mp_imp_forward_backward_with_traj.py +++ b/armarx_control/run/ts_mp_imp_forward_backward_with_traj.py @@ -30,7 +30,7 @@ def run_ctrl( target_pose = [] controller_name, ctrl, _cfg = robot.create_controller( - "python", control_type, ctrl_config, True, True + "python2", control_type, ctrl_config, True, True ) try: @@ -190,4 +190,4 @@ if __name__ == "__main__": # Note: # python ts_mp_imp_forward_backward_with_traj.py -f raise_up_right_hand -t 5.0 -a -r -s 6 # python ts_mp_imp_forward_backward_with_traj.py -f raise_up_right_hand -t 5.0 -a -r -s 6 -js -# python ts_mp_imp_forward_backward_with_traj.py -f raise_up_right_hand -t 5.0 -a -r -s 6 -js -hs \ No newline at end of file +# python ts_mp_imp_forward_backward_with_traj.py -f raise_up_right_hand -t 5.0 -a -r -s 6 -js -hs diff --git a/armarx_control/utils/mirror_trajectory.py b/armarx_control/utils/mirror_trajectory.py new file mode 100644 index 0000000000000000000000000000000000000000..01213eb70f06768e671b8695886c6bd063ab0900 --- /dev/null +++ b/armarx_control/utils/mirror_trajectory.py @@ -0,0 +1,197 @@ +import csv +import os.path +import sys + +from enum import Enum, auto +from typing import List, Tuple +import numpy as np + +from scipy.spatial.transform import Rotation + + +class ConversionDirection(Enum): + """ + From which arm to which arm to convert + """ + LEFT_TO_RIGHT = auto() + RIGHT_TO_LEFT = auto() + + +class Space(Enum): + """ + Space in which the trajectory is represented (joint space or task space, i.e., end effector pose) + """ + JOINT_SPACE = auto() + TASK_SPACE = auto() + + +def resolve_path( + source_path_string: str, conversion_direction: ConversionDirection or None = None +) -> Tuple[str, str, ConversionDirection]: + """ + Resolve path without having this script depending on ArmarX (which would offer the ArmarX package path); + e.g., use $ARMARX_WORKSPACE or even the package dir variable. + The function also detects in which direction to convert, and determines the target path. + + :param source_path_string: Path to the csv file to be converted, + may contain environment variables like $ARMARX_WORKSPACE + :param conversion_direction: LEFT_TO_RIGHT or RIGHT_TO_LEFT, or None for auto-detection based on 'LeftArm' and + 'RightArm' in the source_path + :return: [source path, target path, conversion direction] + """ + source_path = os.path.expandvars(source_path_string) + + if 'LeftArm' in source_path: + if conversion_direction is None: + conversion_direction = ConversionDirection.LEFT_TO_RIGHT + target_path = source_path.replace('LeftArm', 'RightArm') + elif 'RightArm' in source_path: + if conversion_direction is None: + conversion_direction = ConversionDirection.RIGHT_TO_LEFT + target_path = source_path.replace('RightArm', 'LeftArm') + else: + assert conversion_direction is not None, 'Could not detect whether to convert from left to right, or vice '\ + 'versa; please specify manually' + assert source_path.endswith('.csv') + if conversion_direction is ConversionDirection.LEFT_TO_RIGHT: + target_path = source_path.replace('.csv', '-right.csv') + elif conversion_direction is ConversionDirection.RIGHT_TO_LEFT: + target_path = source_path.replace('.csv', '-left.csv') + else: + raise KeyError + return source_path, target_path, conversion_direction + + +def detect_space(header: List[str]): + """ + Evaluate the header of the csv file to detect the space + :param header: column labels + :return: space in which the poses are represented + """ + if header == ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']: + space = Space.TASK_SPACE + # TODO: potentially extend the detection code for joint space + elif header[1] == 'ArmL1_Cla1' or header[1] == 'ArmR1_Cla1': + space = Space.JOINT_SPACE + else: + raise RuntimeError('Could not detect the space') + return space + + +def pose7_to_matrix(pose7: np.ndarray) -> np.ndarray: + """ + convert the x/y/z/qw/qx/qy/qz task space pose to a homogeneous matrix + :param pose7: pose in task space as x/y/z/qw/qx/qy/qz + :return: Pose in task space as homogeneous matrix + """ + position = np.array(pose7[:3]) + orientation = Rotation.from_quat(pose7[3:7], scalar_first=True) + pose = np.identity(4) + pose[:3, 3] = position + pose[:3, :3] = orientation.as_matrix() + return pose + + +def matrix_to_pose7(matrix: np.ndarray) -> np.ndarray: + """ + convert the task space pose provided as a homogeneous matrix to a x/y/z/qw/qx/qy/qz task space pose + :param matrix: Pose in task space as homogeneous matrix + :return: pose in task space as x/y/z/qw/qx/qy/qz + """ + assert matrix.shape == (4, 4) + pose7 = np.empty(7) + pose7[:3] = matrix[:3, 3] + orientation = Rotation.from_matrix(matrix[:3, :3]) + pose7[3:] = orientation.as_quat(scalar_first=True) + return pose7 + + +def mirror_pose(line: List[float], space: Space, conversion_direction: ConversionDirection): + """ + Convert an entry of the trajectory csv file, corresponding to a pose + :param line: entries of the csv line + :param space: space in which the poses are represented + :param conversion_direction: from which arm to which one to convert + :return: modified entries of the csv line, corresponding to the mirrored pose + """ + converted_line = [] + # timestamp remains unchanged + converted_line.append(line[0]) + if space == Space.TASK_SPACE: + pose = pose7_to_matrix(line[1:]) + coeff_wise_factors = np.array([ + [1, -1, -1, -1], + [-1, 1, 1, 1], + [-1, 1, 1, 1], + [1, 1, 1, 1] + ], dtype=float) + # on purpose coefficient-wise, and not a matrix multiplication + converted_pose = coeff_wise_factors * pose + converted_line += list(matrix_to_pose7(converted_pose)) + elif space == Space.JOINT_SPACE: + # for ARMAR-6, determined using RobotViewer + coeff_wise_factors = np.array([ + -1, # Cla + -1, # Sho1 + +1, # Sho2 + -1, # Sho3 + +1, # Elb1 + -1, # Elb2 + -1, # Wri1 + +1 # Wri2 + ], dtype=float) + converted_line += list(coeff_wise_factors * np.array(line[1:], dtype=float)) + else: + raise TypeError + return converted_line + + +def mirror_header(header: List[str], conversion_direction: ConversionDirection) -> List[str]: + """ + Adapt the header of the csv file. For task space trajectories, nothing changes. For joint space trajectories, the + joint names might need to be adapted. Here, it is assumed that all "L" and "R" characters are to be swapped in these + joint names, which is a hard-coded solution for the given robots. + TODO: extend if needed + :param header: column labels + :param conversion_direction: from which arm to which one to convert + :return: column labels for mirrored motion + """ + if conversion_direction is conversion_direction.LEFT_TO_RIGHT: + converted_header = [entry.replace('L', 'R') for entry in header] + elif conversion_direction is conversion_direction.RIGHT_TO_LEFT: + converted_header = [entry.replace('R', 'L') for entry in header] + else: + KeyError + return converted_header + + +def main(path: str, conversion_direction: ConversionDirection or None = None): + source_path, target_path, conversion_direction = resolve_path(path, conversion_direction) + print(f'converting from\n\t{source_path}\nto\n\t{target_path}') + converted_lines = [] + assert os.path.isfile(source_path), f'Input file does not exist: {source_path}' + with open(source_path, 'r') as f: + reader = csv.reader(f) + header = next(reader) + space = detect_space(header) + for line in reader: + converted_lines.append(mirror_pose(line, space, conversion_direction)) + + converted_header = mirror_header(header, conversion_direction) + with open(target_path, 'w') as f: + writer = csv.writer(f) + f.write(','.join(converted_header) + '\n') + writer.writerows(converted_lines) + + +if __name__ == '__main__': + if len(sys.argv) > 1: + # use command line parameter + path = sys.argv[1] + else: + dir_path = '$ARMARX_WORKSPACE/armarx_integration/robots/armar7/motions/data/armar7_motions/mobile_kitchen/RightArm/' + file_name_base = 'through_door_pose_arm_primary_lower_01' + for suffix in ['-ts-forward', '-ts-backward', '-update-js-forward', '-update-js-backward']: + file_name = file_name_base + suffix + '.csv' + path = os.path.join(dir_path, file_name) + main(path, None) diff --git a/armarx_memory/aron/conversion/ndarray/common.py b/armarx_memory/aron/conversion/ndarray/common.py index a0437312715175c980681d2d309ad6ecd86ecb54..af198576e4131d912353ee708321daddce277003 100644 --- a/armarx_memory/aron/conversion/ndarray/common.py +++ b/armarx_memory/aron/conversion/ndarray/common.py @@ -52,7 +52,7 @@ def ndarray_to_aron( return PointCloudConversions.pcl_point_cloud_from_py_point_cloud(value) # General case. - shape = (*value.shape, value.itemsize) + shape = (*value.shape, 1) # value.itemsize) return AronIceTypes.NDArray( shape=shape, type=dtypes_dict_to_aron[str(value.dtype)], diff --git a/armarx_robots/armar_6.py b/armarx_robots/armar_6.py index e4bef00fcdd2b20c6855f8ed9ca67cabc977644a..d3aaf9c364a565454de58fd65bf23b14cc466f37 100644 --- a/armarx_robots/armar_6.py +++ b/armarx_robots/armar_6.py @@ -89,16 +89,6 @@ class A6(Robot): } self.move_joints(joint_angles) - def save_pose(self, pose_name: str): - """ - ..todo:: retrieve current pose and store under name - """ - pass - - def set_pose(self, pose_name: str): - """ """ - pass - def both_arms_zero_torque(self, joint_names=None): """ Sets zero torque mode for both arms diff --git a/armarx_robots/armar_7.py b/armarx_robots/armar_7.py new file mode 100644 index 0000000000000000000000000000000000000000..5dcc6f809c8e958eac9dbfb6140b3f557d36c939 --- /dev/null +++ b/armarx_robots/armar_7.py @@ -0,0 +1,193 @@ +import typing +from functools import lru_cache + +import numpy as np + +from armarx_core import pose_helper +from armarx_robots.basic_robot import Robot + +from armarx import KinematicUnitInterfacePrx + + +from armarx_robots.statechart import StatechartExecutor +from armarx_robots.sensors import Camera +from armarx_skills.core import errors +from armarx_skills.manager.skill_id import SkillID, ProviderID +from armarx_skills.manager.skill_manager import SkillManager, ReconnectingSkillManager + + +class A7(Robot): + """ + ARMAR-7 + + .. highlight:: python + .. code-block:: python + + from armarx.robots import A7 + robot = A7() + robot.say('hello world') + """ + + profile_name = "Armar7Real" + + def __init__(self): + super().__init__() + self.both_arms_joint_names = [ + "ArmL1_Cla1", + "ArmL2_Sho1", + "ArmL3_Sho2", + "ArmL4_Sho3", + "ArmL5_Elb1", + "ArmL6_Elb2", + "ArmL7_Wrist_Hemisphere_A", + "ArmL8_Wrist_Hemisphere_B", + "ArmR1_Cla1", + "ArmR2_Sho1", + "ArmR3_Sho2", + "ArmR4_Sho3", + "ArmR5_Elb1", + "ArmR6_Elb2", + "ArmR7_Wrist_Hemisphere_A", + "ArmR8_Wrist_Hemisphere_B", + ] + self.on_connect() + self.rgbd_camera = Camera("AzureKinectPointCloudProvider") + self.stereo_camera = Camera("ECAM24PointCloudProvider") + self.reconnecting_skill_manager = ReconnectingSkillManager() + self.navigation_skill_provider_id = ProviderID("navigation_skill_provider") + self.move_relative_planar_skill_id = SkillID( + provider_id=self.navigation_skill_provider_id, + skill_name="MoveRelativePlanar", + ) + self.navigate_to_named_location_skill_id = SkillID( + provider_id=self.navigation_skill_provider_id, + skill_name="NavigateToNamedLocation", + ) + self.navigate_to_skill_id = SkillID( + provider_id=self.navigation_skill_provider_id, + skill_name="NavigateTo", + ) + + # self.poses = + + def on_connect(self): + super().on_connect() + # self.kinematic_observer = KinematicUnitObserverInterfacePrx.get_proxy('Armar6KinematicUnitObserver') + + @property + @lru_cache(1) + def kinematic_unit(self): + return KinematicUnitInterfacePrx.get_proxy("Armar7KinematicUnit") + + @property + def skill_manager(self) -> typing.Optional[SkillManager]: + return self.reconnecting_skill_manager.get(wait=False) + + @property + def executed_skill_kwargs(self): + return dict( + executor_name=self.profile_name, + ) + + def execute_skill( + self, + skill_id: SkillID, + parameters_function: typing.Callable[[typing.Dict], None], + asynchronous=False + ): + skill_manager = self.skill_manager + + if skill_manager is not None: + try: + return skill_manager.execute_skill_based_on_default_parameters( + skill_id=skill_id, + parameters_function=parameters_function, + asynchronous=asynchronous, + **self.executed_skill_kwargs, + ) + except errors.SkillNotFoundError: + print("Skill not found") + + else: + print("Failed to connect to skill manager.") + + def init_pose(self): + """ + Sets the joint to a default pose + """ + joint_angles = { + "ArmL1_Cla1" : 0.0, + "ArmL2_Sho1" : 0.30000001192092896, + "ArmL3_Sho2" : 0.25, + "ArmL4_Sho3" : 0.20000000298023224, + "ArmL5_Elb1" : 1.0, + "ArmL6_Elb2" : 1.5, + "ArmL7_Wrist_Hemisphere_A" : 0.0, + "ArmL8_Wrist_Hemisphere_B" : 0.0, + "ArmR1_Cla1" : 0.0, + "ArmR2_Sho1" : -0.30000001192092896, + "ArmR3_Sho2" : 0.25, + "ArmR4_Sho3" : -0.20000000298023224, + "ArmR5_Elb1" : 1.0, + "ArmR6_Elb2" : -1.5, + "ArmR7_Wrist_Hemisphere_A" : 0.0, + "ArmR8_Wrist_Hemisphere_B" : 0.0, + "Hip" : 0.8399999737739563, + "Knee" : 1.0499999523162842, + "LeftHandFingers" : 0.0, + "LeftHandIndex" : 0.0, + "LeftHandThumbCircumduction" : 0.0, + "LeftHandThumbFlexion" : 0.0, + "Neck_1_Yaw" : 0.0, + "Neck_2_Pitch" : 0.0, + "RightHandFingers" : 0.0, + "RightHandIndex" : 0.0, + "RightHandThumbCircumduction" : 0.0, + "RightHandThumbFlexion" : 0.0 + } + + self.move_joints(joint_angles) + + def what_can_you_see_now(self, state_parameters=None): + statechart = StatechartExecutor( + self.profile_name, "ScanLocationGroup", "WhatCanYouSeeNow" + ) + return statechart.run(state_parameters, True) + + def handover(self, state_parameters=None): + statechart = StatechartExecutor( + self.profile_name, "HandOverGroup", "ReceiveFromRobot" + ) + return statechart.run(state_parameters, True) + + def navigate_to_location(self, location_name: str, state_parameters=None): + + def parameter_fn(parameters:dict): + parameters["locationName"] = location_name + + self.execute_skill(self.navigate_to_named_location_skill_id, parameter_fn) + + def navigate_to(self, x, y, yaw, state_parameters=None): + + cos_yaw = np.cos(yaw) + sin_yaw = np.sin(yaw) + + # Define the 4x4 homogeneous transformation matrix + transformation_matrix = np.array([ + [cos_yaw, -sin_yaw, 0, x], + [sin_yaw, cos_yaw, 0, y], + [0, 0, 1, 0], + [0, 0, 0, 1] + ]) + + def parameter_fn(parameters: dict): + parameters["targetPose"] = transformation_matrix + + self.execute_skill(self.navigate_to_skill_id, parameter_fn) + + @property + @lru_cache(1) + def gaze(self): + from armarx import GazeControlInterfacePrx + + return GazeControlInterfacePrx.get_proxy() diff --git a/armarx_robots/basic_robot.py b/armarx_robots/basic_robot.py index c69dd7f1fdf5af83b0aa4b140b336d43ec2d2ca6..5cca815f25814263790319716dafd78e9ed8adb2 100644 --- a/armarx_robots/basic_robot.py +++ b/armarx_robots/basic_robot.py @@ -22,6 +22,11 @@ from armarx import EmergencyStopState from armarx import KinematicUnitInterfacePrx from armarx import HandUnitInterfacePrx + +from armarx_core import pose_helper +import numpy as np +from transforms3d.euler import mat2euler + from armarx_robots.speech import TextStateListener from armarx_robots.statechart import StatechartExecutor from armarx_robots.arms import Bimanual @@ -111,6 +116,17 @@ class Robot(ABC, Bimanual): ) return statechart.run(state_parameters, True) + def save_pose(self, pose_name: str): + """ + ..todo:: retrieve current pose and store under name + """ + pass + + def set_pose(self, pose_name: str): + """ """ + pass + + def say(self, text): """ Verbalizes the given text. SSML markup is supported @@ -144,6 +160,34 @@ class Robot(ABC, Bimanual): EmergencyStopState.eEmergencyStopActive ) + + def wait_for_eef_pose(self, target_pose: np.ndarray, eef_name: str, eps_r:float = 0.1, eps_t:float = 100.9, timeout:float = 5): + """ + Waits until the robot has reached a hand pose or the timeout has been reached + + :param target_pose: the target pose 4x4 + :param eef_name: the name of the robot node, e.g. Hand R TCP + :param eps_r: rotational accuracy + :param eps_t: translational accuracy + :param timeout: the time in seconds to waiy + + """ + target_r = np.array(mat2euler(target_pose[:3, :3])) + target_t = target_pose[:3, 3] + start_time = time.time() + while (time.time() - start_time) < timeout: + actual_pose = pose_helper.robot_state().getRobotNode(eef_name).getPoseInRootFrame() + actual_pose = pose_helper.pose2mat(actual_pose) + if np.linalg.norm(actual_pose[:3, 3] - target_t) < eps_t: + actual_r = np.array(mat2euler(actual_pose[:3, :3])) + if np.linalg.norm(actual_r - target_r) < eps_r: + return True + else: + time.sleep(0.05) + return False + + + def wait_for_joints(self, joint_angles: Dict[str, float], eps=0.1, timeout=5): """ Waits until the robot has reached a pose diff --git a/armarx_skills/manager/skill_manager.py b/armarx_skills/manager/skill_manager.py index 3be311d81a692f9c60a7202298a06a752c9cd006..0471cff2f84cfd4b201c7bf09153b7d94b0754ab 100644 --- a/armarx_skills/manager/skill_manager.py +++ b/armarx_skills/manager/skill_manager.py @@ -313,24 +313,7 @@ class SkillManager: blocking=True, log=None, ) -> ty.Dict[sei.SkillExecutionID, asr.AbortSkillResult]: - execution_ids_to_abort = list() - - execution_status_map = self.get_skill_execution_statuses() - for execution_id, execution_status in execution_status_map.items(): - if execution_status.status in abort_statuses: - log.info(f"{execution_id} RUNNING") - execution_ids_to_abort.append(execution_id) - else: - log.info(f"{execution_id} not running") - - results = dict() - for execution_id in execution_ids_to_abort: - if log is not None: - log.info(f"Aborting skill execution {execution_id} ...") - result = self.abort_skill(skill_execution_id=execution_id, blocking=blocking) - results[execution_id] = result - - return results + return self.proxy.abortAllSkills() @staticmethod def _ice_unset_to_none(optional):