Skip to content
Snippets Groups Projects
Commit c11724aa authored by Julian Tusch's avatar Julian Tusch :no_entry_sign:
Browse files

Merge branch 'master' into dev/fluxio

parents 191cba42 04827d40
No related branches found
No related tags found
1 merge request!68Fluxio preliminary release
Pipeline #22335 failed
Showing
with 642 additions and 92 deletions
......@@ -9,3 +9,8 @@ class MPType(Enum):
hand_ts = "hand_ts"
head = "head"
platform = "platform"
class MPStyle(Enum):
Periodic = "Periodic"
Discrete = "Discrete"
......@@ -22,7 +22,7 @@ def get_control_config_classname(controller_type: ControllerType) -> CommonContr
return ControllerType.TSImpCol.value
elif controller_type == ControllerType.TSMixImpVel or controller_type == ControllerType.TSMPMixImpVel:
return ControllerType.TSMixImpVel.value
elif controller_type == ControllerType.TSVel or controller_type == ControllerType.TSMPVel:
elif controller_type == ControllerType.TSVel or controller_type == ControllerType.TSMPVel or controller_type == ControllerType.TSMPVelWiping:
return ControllerType.TSVel.value
elif controller_type == ControllerType.TSMixImpVelCol or controller_type == ControllerType.TSMPMixImpVelCol:
return ControllerType.TSMixImpVelCol.value
......@@ -44,7 +44,7 @@ def get_controller_config_from_json(controller_type: ControllerType, cfg_dir: Pa
return TSCollisionAvoidanceImpedanceControllerConfig.from_json(cfg_dir.as_posix())
elif controller_type == ControllerType.TSMixImpVel or controller_type == ControllerType.TSMPMixImpVel:
return TaskspaceMixedImpedanceVelocityControllerConfig.from_json(cfg_dir.as_posix())
elif controller_type == ControllerType.TSVel or controller_type == ControllerType.TSMPVel:
elif controller_type == ControllerType.TSVel or controller_type == ControllerType.TSMPVel or controller_type == ControllerType.TSMPVelWiping:
return TaskspaceVelocityControllerConfig.from_json(cfg_dir.as_posix())
elif controller_type == ControllerType.TSMixImpVelCol or controller_type == ControllerType.TSMPMixImpVelCol:
return TSCollisionAvoidanceMixedImpedanceVelocityControllerConfig.from_json(cfg_dir.as_posix())
......@@ -67,7 +67,7 @@ def get_controller_config_from_aron(controller_type: ControllerType, dto: Any) -
return TSCollisionAvoidanceImpedanceControllerConfig.from_aron_ice(dto)
elif controller_type == ControllerType.TSMixImpVel or controller_type == ControllerType.TSMPMixImpVel:
return TaskspaceMixedImpedanceVelocityControllerConfig.from_aron_ice(dto)
elif controller_type == ControllerType.TSVel or controller_type == ControllerType.TSMPVel:
elif controller_type == ControllerType.TSVel or controller_type == ControllerType.TSMPVel or controller_type == ControllerType.TSMPVelWiping:
return TaskspaceVelocityControllerConfig.from_aron_ice(dto)
elif controller_type == ControllerType.TSMixImpVelCol or controller_type == ControllerType.TSMPMixImpVelCol:
return TSCollisionAvoidanceMixedImpedanceVelocityControllerConfig.from_aron_ice(dto)
......
import numpy as np
from typing import Dict, Optional
from dataclasses import dataclass
from armarx_control.config.common import CommonControlConfig, default_field
@dataclass
class WipingAdaptionConfig(CommonControlConfig):
original_kp_imp: Optional[np.ndarray] = default_field(np.zeros((6, 1), dtype=np.float32))
original_kp_vel: Optional[np.ndarray] = default_field(np.zeros((6, 1), dtype=np.float32))
kpf: float = 0.0
target_force: float = 5.0
dead_zone_force: float = 2.0
adapt_force_coeff: float = 0.0
adapt_coeff: float = 0.0
change_position_tolerance: float = 0.0
change_timer_threshold: float = 0.0
def __post_init__(self):
if self.original_kp_imp is not None:
self.original_kp_imp = np.array(self.original_kp_imp, dtype=np.float32).reshape((-1, 1))
if self.original_kp_vel is not None:
self.original_kp_vel = np.array(self.original_kp_vel, dtype=np.float32).reshape((-1, 1))
@dataclass
class WipingAdaptionConfigs(CommonControlConfig):
cfg: Dict[str, WipingAdaptionConfig] = default_field({})
if __name__ == "__main__":
from armarx_control.utils.pkg import get_armarx_package_data_dir
p = get_armarx_package_data_dir("armarx_control")
# p = p / "controller_config/NJointTaskspaceImpedanceController/default.json"
p = p / "controller_config/NJointWipingVelMPController/default_adaptive_cfg_right.json"
if not p.is_file():
raise FileExistsError(f"{p} does not exist")
c = WipingAdaptionConfigs().from_json(str(p))
ic(c)
d = WipingAdaptionConfigs.from_aron_ice(c.to_aron_ice())
ic(d)
......@@ -13,7 +13,7 @@ class ControllerType(Enum):
# KptAdm = "NJointKeypointsAdmittanceController"
# KptImp = "KeypointImpedance"
# Note: taskspace torque Movement Primitive controllers
# Note: taskspace Movement Primitive controllers
TSMPAdm = "NJointTSAdmittanceMPController"
TSMPImp = "NJointTSImpedanceMPController"
TSMPImpSafe = "NJointTSSafetyImpedanceMPController"
......@@ -21,6 +21,8 @@ class ControllerType(Enum):
TSMPMixImpVel = "NJointTSMixedImpedanceVelocityMPController"
TSMPMixImpVelCol = "NJointTSCollisionAvoidanceMixedImpedanceVelocityMPController"
TSMPVel = "NJointTSVelocityMPController"
TSMPMixImpVelColWiping = "NJointTSVelocityMPController"
TSMPVelWiping = "NJointWipingVelMPController"
# KptMPAdm = "KptMPAdm"
# KptMPImp = "KptMPImp"
# KVILMPImp = "KVILMPImp"
......
......@@ -3,7 +3,7 @@ from typing import List, Union
from pathlib import Path
from armarx_control.config.type import ControllerType
from armarx_control.config.common import RobotName
from armarx_control.config.mp import MPType
from armarx_control.config.mp import MPType, MPStyle
from armarx_control.config.mp.mp_config import MPConfig, MPTraj, MPListConfig, ListViaPoint
from armarx_control.utils.pkg import get_armarx_package_data_dir
......@@ -16,7 +16,8 @@ def get_default_mp_cfg(
trajs: Union[List[np.ndarray], List[Path]] = None,
flag_read_traj: bool = True,
via_canonical_value: List[np.float64] = None,
via_point_value: List[np.ndarray] = None
via_point_value: List[np.ndarray] = None,
mp_style: MPStyle = None,
) -> MPConfig:
"""
This interface allow you to set the mostly used parameters, for advanced user, you can modify other params after
......@@ -31,6 +32,7 @@ def get_default_mp_cfg(
flag_read_traj:
via_canonical_value:
via_point_value:
mp_style: either Discrete or Periodic
Returns: MPConfig
......@@ -46,6 +48,8 @@ def get_default_mp_cfg(
cfg.add_trajs(trajs, flag_read_traj)
if via_canonical_value is not None and via_point_value is not None:
cfg.set_via_points(via_canonical_value, via_point_value)
if mp_style is not None:
cfg.mp_style = mp_style.value
cfg.duration_sec = np.float64(duration_sec)
return cfg
......
......@@ -116,6 +116,12 @@ def cast_controller(controller_ptr, controller_type: ControllerType):
from armarx.control import NJointTSCollisionAvoidanceMixedImpedanceVelocityMPControllerInterfacePrx
return NJointTSCollisionAvoidanceMixedImpedanceVelocityMPControllerInterfacePrx.checkedCast(controller_ptr)
elif controller_type == ControllerType.TSMPVelWiping:
load_ts_njoint_mp_controller_interface()
from armarx.control import NJointWipingVelMPControllerInterfacePrx
return NJointWipingVelMPControllerInterfacePrx.checkedCast(controller_ptr)
# elif controller_type == ControllerType.BiKAC:
# load_ts_njoint_mp_controller_interface()
# from armarx.control import NJointBiKACInterfacePrx
......
import numpy as np
from pathlib import Path
right_hand_init_pose = np.array([170.003, 293.222, 977.006, 0.68272, -0.7170, -0.1315, 0.04856])
right_arm_init_angle = np.array([0.000, -0.30, 0.250, -0.20, 1.829, -1.50, 0.000, 0.000])
radius = 100.0
center = right_hand_init_pose[:3] + np.array([0, radius, 0.])
n_samples = 500
traj = np.zeros((n_samples, 8))
traj[:, 0] = np.linspace(0, 2, n_samples)
alpha = np.linspace(0, np.pi * 2, n_samples)
traj[:, 1] = np.cos(alpha) * radius + center[0]
traj[:, 2] = np.sin(alpha) * radius + center[1]
traj[:, 3] = center[2]
traj[:, 4:] = right_hand_init_pose[3:]
joint_traj = np.zeros((n_samples, 9))
joint_traj[:, 0] = np.linspace(0, 2, n_samples)
joint_traj[:, 1:] = right_arm_init_angle
path = Path("/common/homes/staff/gao/code/armarx/skills/control/data/armarx_control/common_motions/testing")
np.savetxt((path / "wiping_circle_ts.csv").as_posix(), traj, delimiter=",")
np.savetxt((path / "wiping_circle_js.csv").as_posix(), joint_traj, delimiter=",")
import cv2
from armarx_control.robots.common import Robot, cfg
from armarx_control.config.common import get_armar7_default_config_for_control
cam_mode = "mono"
robot_cfg = get_armar7_default_config_for_control(with_mono=True)
robot = Robot(robot_cfg)
while True:
if cam_mode == "stereo":
images, info = robot.get_stereo_images()
img = images[0]
depth = None
else:
img, depth, info = robot.get_mono_images(True)
cv2.imshow('Video Stream (RGB)', img[..., ::-1])
if cam_mode == "mono":
cv2.imshow('Video Stream (Depth)', depth)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
\ No newline at end of file
......@@ -54,6 +54,7 @@ def run_ctrl(
# ctrl.start("hand", [], [], duration)
_start = time.time()
_start_loop = _start
ready_to_stop = False
while True:
time.sleep(0.001)
......@@ -67,6 +68,8 @@ def run_ctrl(
console.log("[bold green]stop impedance controller")
break
duration = time.time() - _start_loop
console.log(f"[bold green]controller finished in {duration} seconds")
console.log(f"current position right: "
f"\n{robot.get_pose(tcp_name)[:3, 3]}")
console.log(f"target pose right: \n"
......@@ -96,19 +99,19 @@ def get_mp_config(
mp_cfg = MPListConfig()
pkg_path = get_armarx_package_data_dir("armarx_control")
ts_traj = pkg_path / "kinesthetic_teaching" / f"{traj_file_stem}-ts-{dir_str}.csv"
ts_traj = pkg_path / "common_motions/testing" / f"{traj_file_stem}-ts-{dir_str}.csv"
mp_cfg.mp_list.append(get_default_mp_cfg(
mp_type=MPType.taskspace, duration_sec=np.float64(time_sec),
name="taskspace", node_set_name=nodeset.value,
trajs=[ts_traj], flag_read_traj=True
trajs=[ts_traj, ts_traj, ts_traj, ts_traj, ts_traj, ts_traj, ts_traj, ts_traj, ts_traj], flag_read_traj=True
))
if nullspace:
js_traj = pkg_path / "kinesthetic_teaching" / f"{traj_file_stem}-js-{dir_str}.csv"
js_traj = pkg_path / "common_motions/testing" / f"{traj_file_stem}-js-{dir_str}.csv"
mp_cfg.mp_list.append(get_default_mp_cfg(
mp_type=MPType.nullspace, duration_sec=np.float64(time_sec),
name="nullspace", node_set_name=nodeset.value,
trajs=[js_traj], flag_read_traj=True
trajs=[js_traj, js_traj, js_traj, js_traj, js_traj, js_traj, js_traj, js_traj, js_traj], flag_read_traj=True
))
if hands:
......@@ -123,7 +126,7 @@ def get_mp_config(
mp_cfg.mp_list.append(get_default_mp_cfg(
mp_type=MPType.hand, duration_sec=np.float64(time_sec),
name="hand", node_set_name="RightHand",
trajs=[hand_traj], flag_read_traj=True,
trajs=[hand_traj, hand_traj, hand_traj, hand_traj, hand_traj, hand_traj, hand_traj, hand_traj, hand_traj], flag_read_traj=True,
via_canonical_value=[np.float64(1.0), np.float64(0.5), np.float64(0.0)],
via_point_value=[np.zeros(hand_dim, dtype=np.float64), np.ones(hand_dim, dtype=np.float64), np.zeros(hand_dim, dtype=np.float64)],
))
......
import time
import click
import numpy as np
from typing import Union
from armarx_control import console
from armarx_control.config.mp import MPStyle
from armarx_control.utils.pkg import get_armarx_package_data_dir
from armarx_control.utils.interact import ask_binary
from armarx_control.config.type import ControllerType
from armarx_control.robots import get_control_robot, RobotName, Robot
from armarx_control.config.njoint_controllers import CommonControlConfig
from armarx_control.config.utils import get_default_ctrl_cfg, get_default_mp_cfg, MPType
from armarx_control.config.mp.mp_config import MPListConfig
from armarx_control.config.njoint_controllers.taskspace_wiping import WipingAdaptionConfigs, WipingAdaptionConfig
def run_ctrl(
robot: Robot,
control_type: ControllerType,
ctrl_config: Union[str, CommonControlConfig],
mp_config: MPListConfig,
nodeset,
tcp,
duration: float = 5.0,
flag_delete_controller: bool = True,
nullspace: bool = False,
hands: bool = False,
adaptive_config: WipingAdaptionConfigs = None,
):
tcp_name = tcp.value
node_set_name = nodeset.value
target_pose = []
controller_name, ctrl, _cfg = robot.create_controller(
"python", control_type, ctrl_config, True, True
)
ctrl.updateAdaptiveConfig(adaptive_config.to_aron_ice())
try:
current_pose = robot.get_controller_config(controller_name).limbs.get(node_set_name).get_desired_pose_vector()
ctrl.updateMPConfig(mp_config.to_aron_ice())
ctrl.trainMP()
console.log(f"[bold cyan]Start with current position: \n{current_pose[:3]}, "
f"target pose: \n{target_pose}")
input("Press `Enter` to continue ... ")
console.rule("[bold blue]Start movement primitive controller")
# 1) method 1: start all at once, all MPs must be configured properly with all viapoint setup, trajectories
ctrl.startAll()
# 2) Method 2: explicitly call start with "all" string, all other arguments are ignored
# ctrl.start("all", [], [], duration)
# 3) Method 3: start individual MPs
# ctrl.start("taskspace", current_pose, target_pose, duration)
# if nullspace:
# ctrl.start("nullspace", [], target_pose, duration)
# if hands:
# ctrl.start("hand", [], [], duration)
_start = time.time()
ready_to_stop = False
last_can_value = 1.0
start_time = time.time()
while True:
time.sleep(0.001)
# ic(ctrl.getCanVal("taskspace"), ctrl.getCanVal("nullspace"), ctrl.getCanVal("hand"))
can_value = ctrl.getCanVal("taskspace")
if can_value > 0.99 and last_can_value < 0.01:
console.log(f"[blue]last period took {time.time() - start_time:>2.3f} seconds")
start_time = time.time()
last_can_value = can_value
if ctrl.isFinishedAll() and not ready_to_stop:
_start = time.time()
console.log("[bold green]VMP controller finished")
ready_to_stop = True
if ready_to_stop and (time.time() - _start) > 1.5:
console.log("[bold green]stop impedance controller")
break
console.log(f"current position right: "
f"\n{robot.get_pose(tcp_name)[:3, 3]}")
console.log(f"target pose right: \n"
f"{robot.get_controller_config(controller_name).limbs.get(node_set_name).get_desired_pose()[:3, 3]}")
except RuntimeError as e:
console.log(f"error: {e}")
robot.delete_controller(controller_name)
finally:
if flag_delete_controller:
console.log(f"[bold green]deleting controller")
robot.delete_controller(controller_name)
console.rule("done")
def get_mp_config(
traj_file_stem: str,
robot: Robot,
time_sec: float,
dir_str: str,
nodeset,
nullspace: bool = False,
hands: bool = False,
mp_style: MPStyle = MPStyle.Periodic
):
console.rule(f"[bold cyan]running {dir_str} MP ...")
mp_cfg = MPListConfig()
pkg_path = get_armarx_package_data_dir("armarx_control")
ts_traj = pkg_path / "common_motions/testing" / f"{traj_file_stem}-ts-{dir_str}.csv"
mp_cfg.mp_list.append(get_default_mp_cfg(
mp_type=MPType.taskspace, duration_sec=np.float64(time_sec),
name="taskspace", node_set_name=nodeset.value,
trajs=[ts_traj], flag_read_traj=True, mp_style=mp_style
))
if nullspace:
js_traj = pkg_path / "common_motions/testing" / f"{traj_file_stem}-js-{dir_str}.csv"
mp_cfg.mp_list.append(get_default_mp_cfg(
mp_type=MPType.nullspace, duration_sec=np.float64(time_sec),
name="nullspace", node_set_name=nodeset.value,
trajs=[js_traj], flag_read_traj=True, mp_style=mp_style
))
if hands:
if robot.c.name.is_a6():
hand_dim = 2
elif robot.c.name.is_a7():
hand_dim = 4
else:
raise NotImplementedError
hand_traj = np.zeros((100, hand_dim+1), dtype=np.float64)
hand_traj[:, 0] = np.linspace(0, 1, 100, dtype=np.float64)
mp_cfg.mp_list.append(get_default_mp_cfg(
mp_type=MPType.hand, duration_sec=np.float64(time_sec),
name="hand", node_set_name="RightHand",
trajs=[hand_traj], flag_read_traj=True,
via_canonical_value=[np.float64(1.0), np.float64(0.5), np.float64(0.0)],
via_point_value=[np.zeros(hand_dim, dtype=np.float64), np.ones(hand_dim, dtype=np.float64), np.zeros(hand_dim, dtype=np.float64)],
mp_style=mp_style
))
return mp_cfg
def get_default_adaption_cfg():
from armarx_control.utils.pkg import get_armarx_package_data_dir
p = get_armarx_package_data_dir("armarx_control")
p = p / "controller_config/NJointWipingVelMPController/default_adaptive_cfg_right.json"
if not p.is_file():
raise FileExistsError(f"{p} does not exist")
c = WipingAdaptionConfigs().from_json(p.as_posix())
return c
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option("--filename", "-f", type=str, help="abs path to the recorded traj (end before -ts-forward/-ts-backward")
@click.option("--time_sec", "-t", type=float, default=5.0, help="time duration in seconds")
@click.option("--from_aron", "-a", is_flag=True, help="to create controller from aron config")
@click.option("--right_arm", "-r", is_flag=True, help="set to use right arm, otherwise left arm")
@click.option("--joint_space", "-js", is_flag=True, help="whether to enable joint space mp along with task space")
@click.option("--hands", "-hs", is_flag=True, help="whether to enable hands mp along with task space")
@click.option("--series", "-s", type=int, default=6, help="which robot to use")
def reproduction(
filename: str,
time_sec: float = 5.0,
from_aron: bool = True,
right_arm: bool = False,
joint_space: bool = False,
hands: bool = False,
series: int = 6,
):
np.set_printoptions(suppress=True, precision=3)
robot = get_control_robot(RobotName.from_series(series), with_ctrl_creator=True)
control_type = ControllerType.TSMPVelWiping
appendix = "right_with_hands_sim"
ctrl_cfg = get_default_ctrl_cfg(control_type, robot.c.name, appendix=appendix, as_file=not from_aron)
nodeset = robot.node_set.RightArm if right_arm else robot.node_set.LeftArm
tcp = robot.arm_tcp.RightArm if right_arm else robot.arm_tcp.LeftArm
move_dir = "forward"
mp_cfg = get_mp_config(filename, robot, time_sec, move_dir, nodeset, nullspace=joint_space, hands=hands)
adapt_cfg = get_default_adaption_cfg()
ic(mp_cfg)
if ask_binary(f"Move in a {move_dir} way? "):
run_ctrl(robot, control_type, ctrl_cfg, mp_cfg, nodeset, tcp,
duration=time_sec, nullspace=joint_space, hands=hands, flag_delete_controller=False,
adaptive_config=adapt_cfg)
if __name__ == "__main__":
reproduction()
# Note:
# python ts_mp_vel_wiping.py -f raise_up_right_hand -t 5.0 -a -r -s 6
# python ts_mp_vel_wiping.py -f raise_up_right_hand -t 5.0 -a -r -s 6 -js
# python ts_mp_vel_wiping.py -f raise_up_right_hand -t 5.0 -a -r -s 6 -js -hs
\ No newline at end of file
from typing import Union, Dict, Any
from pathlib import Path
from paramiko import SSHClient, SSHException
from scp import SCPClient
from armarx_control import console
# from typing import Union, Dict, Any
# from pathlib import Path
# from paramiko import SSHClient, SSHException
# from scp import SCPClient
# from armarx_control import console
armar6_ssh_config = {
'hostname': '10.6.2.100',
'port': 22,
'username': "armar-user",
'timeout': 5
}
armar7_ssh_config = {
'hostname': '10.6.71.100',
'port': 22,
'username': "armar-user",
'timeout': 5
}
def copy_file_to_robot(
filename: Path,
target_folder_on_robot: str = "",
ssh_config: Dict[str, Any] = None,
):
if not target_folder_on_robot:
target_folder_on_robot = "/home/armar-user/armar6_motion/kinesthetic_teaching"
if ssh_config is None:
ssh_config = armar6_ssh_config
with SSHClient() as ssh:
ssh.load_system_host_keys()
try:
ssh.connect(**ssh_config)
console.log(f"Connected to {ssh_config['hostname']}")
except SSHException as ex:
console.print(f"[bold red]{ex}")
console.log(f"[bold red]Cannot connect to {ssh_config}")
return None
try:
with SCPClient(ssh.get_transport()) as scp:
if filename.is_file():
console.log(f"[bold green]source file: {filename}")
scp.put(str(filename), target_folder_on_robot, recursive=True)
console.log(f"[yellow]file {Path(target_folder_on_robot) / filename.name} sent to robot")
return Path(target_folder_on_robot) / filename.name
else:
console.log(f"[bold red]{filename} doesn't exist")
return None
except RuntimeError:
console.log(f"ssh copy failed")
#
# armar6_ssh_config = {
# 'hostname': '10.6.2.100',
# 'port': 22,
# 'username': "armar-user",
# 'timeout': 5
# }
#
# armar7_ssh_config = {
# 'hostname': '10.6.71.100',
# 'port': 22,
# 'username': "armar-user",
# 'timeout': 5
# }
#
#
# def copy_file_to_robot(
# filename: Path,
# target_folder_on_robot: str = "",
# ssh_config: Dict[str, Any] = None,
# ):
# if not target_folder_on_robot:
# target_folder_on_robot = "/home/armar-user/armar6_motion/kinesthetic_teaching"
# if ssh_config is None:
# ssh_config = armar6_ssh_config
#
# with SSHClient() as ssh:
# ssh.load_system_host_keys()
# try:
# ssh.connect(**ssh_config)
# console.log(f"Connected to {ssh_config['hostname']}")
# except SSHException as ex:
# console.print(f"[bold red]{ex}")
# console.log(f"[bold red]Cannot connect to {ssh_config}")
# return None
# try:
# with SCPClient(ssh.get_transport()) as scp:
# if filename.is_file():
# console.log(f"[bold green]source file: {filename}")
# scp.put(str(filename), target_folder_on_robot, recursive=True)
# console.log(f"[yellow]file {Path(target_folder_on_robot) / filename.name} sent to robot")
# return Path(target_folder_on_robot) / filename.name
# else:
# console.log(f"[bold red]{filename} doesn't exist")
# return None
# except RuntimeError:
# console.log(f"ssh copy failed")
import csv
import os.path
import sys
from enum import Enum, auto
from typing import List, Tuple
import numpy as np
from scipy.spatial.transform import Rotation
class ConversionDirection(Enum):
"""
From which arm to which arm to convert
"""
LEFT_TO_RIGHT = auto()
RIGHT_TO_LEFT = auto()
class Space(Enum):
"""
Space in which the trajectory is represented (joint space or task space, i.e., end effector pose)
"""
JOINT_SPACE = auto()
TASK_SPACE = auto()
def resolve_path(
source_path_string: str, conversion_direction: ConversionDirection or None = None
) -> Tuple[str, str, ConversionDirection]:
"""
Resolve path without having this script depending on ArmarX (which would offer the ArmarX package path);
e.g., use $ARMARX_WORKSPACE or even the package dir variable.
The function also detects in which direction to convert, and determines the target path.
:param source_path_string: Path to the csv file to be converted,
may contain environment variables like $ARMARX_WORKSPACE
:param conversion_direction: LEFT_TO_RIGHT or RIGHT_TO_LEFT, or None for auto-detection based on 'LeftArm' and
'RightArm' in the source_path
:return: [source path, target path, conversion direction]
"""
source_path = os.path.expandvars(source_path_string)
if 'LeftArm' in source_path:
if conversion_direction is None:
conversion_direction = ConversionDirection.LEFT_TO_RIGHT
target_path = source_path.replace('LeftArm', 'RightArm')
elif 'RightArm' in source_path:
if conversion_direction is None:
conversion_direction = ConversionDirection.RIGHT_TO_LEFT
target_path = source_path.replace('RightArm', 'LeftArm')
else:
assert conversion_direction is not None, 'Could not detect whether to convert from left to right, or vice '\
'versa; please specify manually'
assert source_path.endswith('.csv')
if conversion_direction is ConversionDirection.LEFT_TO_RIGHT:
target_path = source_path.replace('.csv', '-right.csv')
elif conversion_direction is ConversionDirection.RIGHT_TO_LEFT:
target_path = source_path.replace('.csv', '-left.csv')
else:
raise KeyError
return source_path, target_path, conversion_direction
def detect_space(header: List[str]):
"""
Evaluate the header of the csv file to detect the space
:param header: column labels
:return: space in which the poses are represented
"""
if header == ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']:
space = Space.TASK_SPACE
# TODO: potentially extend the detection code for joint space
elif header[1] == 'ArmL1_Cla1' or header[1] == 'ArmR1_Cla1':
space = Space.JOINT_SPACE
else:
raise RuntimeError('Could not detect the space')
return space
def pose7_to_matrix(pose7: np.ndarray) -> np.ndarray:
"""
convert the x/y/z/qw/qx/qy/qz task space pose to a homogeneous matrix
:param pose7: pose in task space as x/y/z/qw/qx/qy/qz
:return: Pose in task space as homogeneous matrix
"""
position = np.array(pose7[:3])
orientation = Rotation.from_quat(pose7[3:7], scalar_first=True)
pose = np.identity(4)
pose[:3, 3] = position
pose[:3, :3] = orientation.as_matrix()
return pose
def matrix_to_pose7(matrix: np.ndarray) -> np.ndarray:
"""
convert the task space pose provided as a homogeneous matrix to a x/y/z/qw/qx/qy/qz task space pose
:param matrix: Pose in task space as homogeneous matrix
:return: pose in task space as x/y/z/qw/qx/qy/qz
"""
assert matrix.shape == (4, 4)
pose7 = np.empty(7)
pose7[:3] = matrix[:3, 3]
orientation = Rotation.from_matrix(matrix[:3, :3])
pose7[3:] = orientation.as_quat(scalar_first=True)
return pose7
def mirror_pose(line: List[float], space: Space, conversion_direction: ConversionDirection):
"""
Convert an entry of the trajectory csv file, corresponding to a pose
:param line: entries of the csv line
:param space: space in which the poses are represented
:param conversion_direction: from which arm to which one to convert
:return: modified entries of the csv line, corresponding to the mirrored pose
"""
converted_line = []
# timestamp remains unchanged
converted_line.append(line[0])
if space == Space.TASK_SPACE:
pose = pose7_to_matrix(line[1:])
coeff_wise_factors = np.array([
[1, -1, -1, -1],
[-1, 1, 1, 1],
[-1, 1, 1, 1],
[1, 1, 1, 1]
], dtype=float)
# on purpose coefficient-wise, and not a matrix multiplication
converted_pose = coeff_wise_factors * pose
converted_line += list(matrix_to_pose7(converted_pose))
elif space == Space.JOINT_SPACE:
# for ARMAR-6, determined using RobotViewer
coeff_wise_factors = np.array([
-1, # Cla
-1, # Sho1
+1, # Sho2
-1, # Sho3
+1, # Elb1
-1, # Elb2
-1, # Wri1
+1 # Wri2
], dtype=float)
converted_line += list(coeff_wise_factors * np.array(line[1:], dtype=float))
else:
raise TypeError
return converted_line
def mirror_header(header: List[str], conversion_direction: ConversionDirection) -> List[str]:
"""
Adapt the header of the csv file. For task space trajectories, nothing changes. For joint space trajectories, the
joint names might need to be adapted. Here, it is assumed that all "L" and "R" characters are to be swapped in these
joint names, which is a hard-coded solution for the given robots.
TODO: extend if needed
:param header: column labels
:param conversion_direction: from which arm to which one to convert
:return: column labels for mirrored motion
"""
if conversion_direction is conversion_direction.LEFT_TO_RIGHT:
converted_header = [entry.replace('L', 'R') for entry in header]
elif conversion_direction is conversion_direction.RIGHT_TO_LEFT:
converted_header = [entry.replace('R', 'L') for entry in header]
else:
KeyError
return converted_header
def main(path: str, conversion_direction: ConversionDirection or None = None):
source_path, target_path, conversion_direction = resolve_path(path, conversion_direction)
print(f'converting from\n\t{source_path}\nto\n\t{target_path}')
converted_lines = []
assert os.path.isfile(source_path), f'Input file does not exist: {source_path}'
with open(source_path, 'r') as f:
reader = csv.reader(f)
header = next(reader)
space = detect_space(header)
for line in reader:
converted_lines.append(mirror_pose(line, space, conversion_direction))
converted_header = mirror_header(header, conversion_direction)
with open(target_path, 'w') as f:
writer = csv.writer(f)
f.write(','.join(converted_header) + '\n')
writer.writerows(converted_lines)
if __name__ == '__main__':
if len(sys.argv) > 1:
# use command line parameter
path = sys.argv[1]
else:
dir_path = '$ARMARX_WORKSPACE/armarx_integration/robots/armar7/motions/data/armar7_motions/mobile_kitchen/RightArm/'
file_name_base = 'through_door_pose_arm_primary_lower_01'
for suffix in ['-ts-forward', '-ts-backward', '-update-js-forward', '-update-js-backward']:
file_name = file_name_base + suffix + '.csv'
path = os.path.join(dir_path, file_name)
main(path, None)
......@@ -3,7 +3,7 @@ import time
import numpy as np
import dataclasses
from enum import Enum, auto
from enum import IntEnum
import armarx_core
from armarx_core.ice_conversion.ice_converter import IceConverter
......@@ -14,14 +14,16 @@ slice_loader.load_armarx_slice("ArmarXCore", "core/time.ice")
from armarx.core.time.dto import DateTime as DateTimeIce
from armarx.core.time.dto.ClockType import ClockTypeEnum
from armarx_core.time.duration import Duration, MIN_LONG_CPP
MIN_LONG_CPP = -9223372036854775808
INVALID_TIME_USEC = MIN_LONG_CPP
# auxiliary function
def time_usec() -> int:
return int(time.time() * 1e6)
# Converter between dto and a plain int value
class DateTimeIceConverter(IceConverter):
@classmethod
......@@ -41,19 +43,11 @@ class DateTimeIceConverter(IceConverter):
# DateTime type corresponding to C++ type (preferred)
@dataclasses.dataclass
class ClockType(Enum):
Realtime = auto()
Monotonic = auto()
Virtual = auto()
Unknown = auto()
def to_aron(self):
return pythonic_to_aron_ice(self.value)
@dataclasses.dataclass
class Duration(AronDataclass):
microSeconds: np.int64 = MIN_LONG_CPP
class ClockType(IntEnum):
Realtime = 0
Monotonic = 1
Virtual = 2
Unknown = 3
@dataclasses.dataclass
......@@ -70,7 +64,7 @@ class DateTime(AronDataclass):
@staticmethod
def from_seconds(time_in_sec: float):
dto = DateTime()
dto.timeSinceEpoch.microSeconds = int(round(time_in_sec * 1e6))
dto.timeSinceEpoch.microSeconds = np.int64(round(time_in_sec * 1e6))
return dto
......
import dataclasses
import numpy as np
import armarx_core
from armarx_core.ice_conversion.ice_converter import IceConverter
from armarx_core import slice_loader
from armarx_core.time.date_time import MIN_LONG_CPP
from armarx_memory.aron.aron_dataclass import AronDataclass
slice_loader.load_armarx_slice("ArmarXCore", "core/time.ice")
from armarx.core.time import dto
MIN_LONG_CPP = -9223372036854775808
@dataclasses.dataclass
class Duration(AronDataclass):
microSeconds: np.int64 = MIN_LONG_CPP, # std::numeric_limits<long>::min()
class Duration:
def __init__(
self,
microseconds = MIN_LONG_CPP, # std::numeric_limits<long>::min()
):
self.microseconds = microseconds
@staticmethod
def from_seconds(seconds: float) -> "Duration":
return Duration(np.int64(round(seconds * 1e6)))
def as_seconds(self) -> float:
print(self.microSeconds)
return self.microSeconds * 1e-6
def __repr__(self) -> str:
return f"<Duration {self.microseconds} µs>"
......@@ -23,7 +31,7 @@ class Duration:
def __str__(self) -> str:
unit = "µs"
time_count = self.microseconds
time_count = self.microSeconds
unit_ratios = [
("ms", 1000),
......@@ -49,12 +57,12 @@ class DurationIceConv(IceConverter):
def _from_ice(self, dto: dto.Duration) -> Duration:
return Duration(
microseconds=dto.microSeconds,
microSeconds=dto.microSeconds,
)
def _to_ice(self, bo: Duration) -> dto.Duration:
return dto.Duration(
microSeconds=bo.microseconds,
microSeconds=bo.microSeconds,
)
......@@ -67,3 +75,9 @@ if __name__ == '__main__':
dt2 = conv.to_ice(bo)
print(dt2)
assert dt2 == dt
# without converter, using standard aron conversions:
dto = Duration.from_seconds(1.2)
ice = dto.to_aron_ice()
dto2 = Duration.from_aron_ice(ice)
assert dto.as_seconds() == 1.2
from __future__ import annotations
import dataclasses as dc
import numpy as np
......@@ -42,3 +44,14 @@ class FramedPose(AronDataclass):
"""
header: FrameID = dc.field(default_factory=FrameID)
pose: np.ndarray = dc.field(default_factory=lambda: np.eye(4, dtype=np.float32))
@classmethod
def with_type_check(cls, header: FrameID, pose: np.ndarray) -> FramedPose:
"""
Ensures that the data type of the pose array is correct; otherwise, there might be problems when reading
the memory from C++
"""
assert isinstance(header, FrameID)
assert isinstance(pose, np.ndarray)
assert pose.shape == (4, 4)
return cls(header, pose.astype(np.float32))
......@@ -296,7 +296,7 @@ class DataclassFromToDict:
union_types = type_.__args__
assert type(None) in union_types, (type_, origin, union_types)
else:
assert type_ == type(None), (type_, origin) # Note, If this fails, comment out
assert type_ == type(None), (type_, origin) # Note, If this fails, comment out; beforehand, make sure you have properly annotated optionals as Union[TheType, None]
return None
......@@ -308,6 +308,9 @@ class DataclassFromToDict:
elif isinstance(value, dict):
return {k: self.dataclass_to_dict(v, depth=depth + 1) for k, v in value.items()}
elif isinstance(value, enum.IntEnum) or isinstance(value, enum.Enum):
return value.value
else:
if self.logger is not None:
self.logger.debug(f"{pre}> Process other type: {type_}")
......
......@@ -27,6 +27,7 @@ dtypes_dict_to_aron = {
str(np.dtype(np.float32)): "float",
str(np.dtype(np.float64)): "double", # alternative: "float64"
str(np.dtype(dtype_rgb)): "16",
str(np.dtype(np.int64)): "int64",
}
......
......@@ -29,7 +29,11 @@ def pythonic_to_aron_ice(
elif isinstance(value, np.int64):
return AronDataIceTypes.long(int(value))
elif isinstance(value, int) or isinstance(value, np.int32):
return AronDataIceTypes.int(int(value))
assert(isinstance(value, int)), value
assert 'invalid value - expected int' not in str(AronIceTypes.int(int(value))), \
f'Casting {value} to int failed. Did you intend to use np.int64 instead, ' \
'but assigned a plain int value, or converted the value somewhere in the meantime?'
return AronIceTypes.int(int(value))
elif isinstance(value, np.float64):
return AronDataIceTypes.double(float(value))
elif isinstance(value, float) or isinstance(value, np.float32):
......@@ -37,7 +41,15 @@ def pythonic_to_aron_ice(
elif isinstance(value, list):
return AronDataIceTypes.list(list(map(pythonic_to_aron_ice, value)))
elif isinstance(value, enum.IntEnum):
assert(isinstance(value.value, int)), f'Value of IntEnum is not an int: {value.value}'
return pythonic_to_aron_ice(value.value) # int
elif isinstance(value, enum.Enum):
if not isinstance(value.value, int):
raise NotImplementedError(
f'enum.Enum is only supported as long as it uses int values, but got value {value.value}. '
'Consider directly using enum.IntEnum.'
)
return pythonic_to_aron_ice(value.value)
elif isinstance(value, dict):
a = AronDataIceTypes.dict({k: pythonic_to_aron_ice(v) for k, v in value.items()})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment