Skip to content
Snippets Groups Projects
Commit 428d3c18 authored by Rainer Kartmann's avatar Rainer Kartmann
Browse files

Merge branch 'master' into feature/name-phrase-grounding

parents cbe7e561 42a31e19
No related branches found
No related tags found
1 merge request!30Changes for Name phrase grounding
Showing
with 228 additions and 33 deletions
......@@ -12,3 +12,4 @@ __pycache__
.venv
armarx_dev.egg-info/
docs/_build/
......@@ -62,7 +62,7 @@ class Element:
if isinstance(value, Transform):
value = value.transform
value = self._to_array_checked(value, (4, 4), "pose matrix", dtype=np.float)
value = self._to_array_checked(value, (4, 4), "pose matrix", dtype=float)
self._pose = value
@property
......@@ -72,7 +72,7 @@ class Element:
@position.setter
def position(self, value):
value = self._to_array_checked(
value, [(3,), (3, 1)], "position vector", dtype=np.float
value, [(3,), (3, 1)], "position vector", dtype=float
)
self._pose[:3, 3] = value
......@@ -99,7 +99,7 @@ class Element:
def ori_mat(self, value):
"""Set the orientation as 3x3 rotation matrix."""
value = self._to_array_checked(
value, (3, 3), "orientation matrix", dtype=np.float
value, (3, 3), "orientation matrix", dtype=float
)
self._pose[:3, :3] = value
......@@ -291,7 +291,7 @@ class Element:
return np.all(
np.logical_or(
np.array(shape) == shape_pattern,
np.isnan(np.array(shape_pattern, dtype=np.float)),
np.isnan(np.array(shape_pattern, dtype=float)),
)
)
......
......@@ -7,6 +7,7 @@ Please note that the numpy array's data type must be np.float32
import numpy as np
import transforms3d as tf3d
import typing as ty
from armarx import slice_loader
......@@ -58,20 +59,14 @@ def pose2mat(pose: FramedPoseBase) -> np.ndarray:
return transform_mat
def convert_position_to_global(f: FramedPositionBase) -> np.ndarray:
def convert_position_to_global(
f: FramedPositionBase,
robot_state_component: ty.Optional[RobotStateComponentInterfacePrx] = None,
) -> np.ndarray:
pose = FramedPoseBase(
position=f, orientation=FramedOrientationBase(), frame=f.frame, agent=f.agent
)
return convert_pose_to_global(pose)
def convert_mat_to_global(pose: np.ndarray, frame: str) -> np.ndarray:
robot_state = RobotStateComponentInterfacePrx.get_proxy()
current_robot_state = robot_state.getSynchronizedRobot()
robot_pose = current_robot_state.getGlobalPose()
robot_node = current_robot_state.getRobotNode(frame).getPoseInRootFrame()
transform_robot_node_to_root = pose2mat(robot_node)
transform_root_to_global = pose2mat(robot_pose)
return convert_pose_to_global(pose, robot_state_component=robot_state_component)
def inv(pose: np.ndarray) -> np.ndarray:
......@@ -84,15 +79,20 @@ def inv(pose: np.ndarray) -> np.ndarray:
return inv_pose
def robot_state(timestamp: int = None):
def robot_state(
timestamp: int = None,
robot_state_component: ty.Optional[RobotStateComponentInterfacePrx] = None,
):
"""
Convenience method to get the robot state. If not timestamp is given return
the current state
"""
robot_state = RobotStateComponentInterfacePrx.get_proxy()
if robot_state_component is None:
robot_state_component = RobotStateComponentInterfacePrx.get_proxy()
if timestamp:
return robot_state.getRobotSnapshotAtTimestamp(timestamp)
return robot_state.getSynchronizedRobot()
return robot_state_component.getRobotSnapshotAtTimestamp(timestamp)
return robot_state_component.getSynchronizedRobot()
def convert_mat_to_robot_node(
......@@ -110,9 +110,10 @@ def convert_mat_to_robot_node(
def convert_mat_to_global(
pose: np.ndarray, frame: str, timestamp: int = None
pose: np.ndarray, frame: str, timestamp: int = None,
robot_state_component: ty.Optional[RobotStateComponentInterfacePrx] = None,
) -> np.ndarray:
current_robot_state = robot_state(timestamp)
current_robot_state = robot_state(timestamp, robot_state_component=robot_state_component)
robot_node = current_robot_state.getRobotNode(frame)
robot_node_pose = pose2mat(robot_node.getGlobalPose())
return np.dot(robot_node_pose, pose)
......@@ -131,7 +132,10 @@ def convert_mat_to_root(
return np.dot(transform_robot_node_to_root, pose)
def convert_pose_to_global(f: FramedPoseBase) -> np.ndarray:
def convert_pose_to_global(
f: FramedPoseBase,
robot_state_component: ty.Optional[RobotStateComponentInterfacePrx] = None,
) -> np.ndarray:
"""
Converts a armarx.FramedPoseBase to a numpy array in the global coordinate
system
......@@ -140,7 +144,7 @@ def convert_pose_to_global(f: FramedPoseBase) -> np.ndarray:
transform = pose2mat(f)
if f.frame == "Global" or f.frame == "armarx::Global":
return transform
return convert_mat_to_global(transform, f.frame)
return convert_mat_to_global(transform, f.frame, robot_state_component=robot_state_component)
def convert_pose_to_root(f: FramedPoseBase) -> np.ndarray:
......
......@@ -18,7 +18,7 @@ def _load_config() -> configparser.ConfigParser:
"""
config_file = os.path.join(_get_config_dir(), "armarx.ini")
if not os.path.exists(config_file):
raise FileNotFoundError("ArmarX config file does not exists.")
raise FileNotFoundError(f"ArmarX config file '{config_file}' does not exists.")
config_parser = configparser.ConfigParser()
config_parser.read(config_file)
return config_parser
......
import os
import time
import logging
from typing import Any
from typing import TypeVar
import typing as ty
from functools import lru_cache
......@@ -23,7 +21,7 @@ from .name_helper import get_ice_default_name
logger = logging.getLogger(__name__)
T = TypeVar("T")
T = ty.TypeVar("T")
def register_object(
......@@ -165,12 +163,13 @@ def wait_for_proxy(cls, proxy_name: str = None, timeout: int = 0):
time.sleep(0.1)
def get_proxy(cls: T, proxy_name: str = None) -> T:
def get_proxy(cls: T, proxy_name: str = None, not_exist_ok=False) -> ty.Optional[T]:
"""
Connects to a proxy.
:param cls: the class definition of an ArmarXComponent
:param proxy_name: name of the proxy
:param not_exist_ok: If true, do not print an error if the proxy does not exist.
:type proxy_name: str
:returns: the retrieved proxy
:rtype: an instance of cls
......@@ -181,7 +180,10 @@ def get_proxy(cls: T, proxy_name: str = None) -> T:
proxy = freezer().communicator.stringToProxy(proxy_name)
return cls.checkedCast(proxy)
except NotRegisteredException:
logging.exception("Proxy %s does not exist", proxy_name)
if not not_exist_ok:
logging.exception("Proxy %s does not exist", proxy_name)
return None
def get_admin():
......
......@@ -12,7 +12,7 @@ class PackagePath(AronDataclass):
def get_system_path(self) -> str:
import os
from armarx import cmake_helper
from armarx_core import cmake_helper
[data_path] = cmake_helper.get_data_path(self.package)
abs_path = os.path.join(data_path, self.package, self.path)
......
......@@ -65,7 +65,7 @@ class MemoryNameSystem:
)
if logger is not None:
logger.info("Done.")
logger.info("Connected to Memory Name System.")
return MemoryNameSystem(mns_proxy, **kwargs)
......
import socket
import time
import armarx
from armarx.ice_conv.ice_converter import IceConverter
from armarx_core import slice_loader
......
from armarx_skills.slice import load_slice
load_slice()
from armarx.skills.callback.dti import SkillProviderCallbackInterface
from armarx.skills.callback.dti import SkillProviderCallbackInterfacePrx
from armarx_skills.slice import load_slice
load_slice()
from armarx.skills.manager.dti import SkillManagerInterface
from armarx.skills.manager.dti import SkillManagerInterfacePrx
import typing as ty
from armarx_skills.slice import load_slice
from armarx_skills.provider.dto import SkillDescriptionMap
from armarx_skills.provider.dto import SkillStatusUpdateMap
load_slice()
from armarx.skills.manager.dto import SkillExecutionRequest
from armarx.skills.manager.dto import ProviderInfo
SkillDescriptionMapMap = ty.Dict[str, SkillDescriptionMap]
SkillStatusUpdateMapMap = ty.Dict[str, SkillStatusUpdateMap]
import dataclasses as dc
import typing as ty
from armarx_skills.manager import dto
from armarx_memory.ice_conv.ice_converter import IceConverter
from armarx_memory.aron.conversion import pythonic_from_to_aron_ice
from armarx_skills.provider.skill_id import SkillID, SkillIdConv
@dc.dataclass
class SkillExecutionRequest:
executor_name: str
skill_id: SkillID
params: ty.Dict[str, ty.Any]
class SkillExecutionRequestConv(IceConverter):
def __init__(self):
super().__init__()
self.skill_id_conv = SkillIdConv()
@classmethod
def _import_dto(cls):
return dto.SkillExecutionRequest
def _from_ice(self, dt: dto.SkillExecutionRequest) -> SkillExecutionRequest:
return SkillExecutionRequest(
executor_name=dt.executorName,
skill_id=self.skill_id_conv.from_ice(dt.skillId),
params=pythonic_from_to_aron_ice.pythonic_from_aron_ice(dt.params),
)
def _to_ice(self, bo: SkillExecutionRequest) -> dto.SkillExecutionRequest:
return dto.SkillExecutionRequest(
executorName=bo.executor_name,
skillId=self.skill_id_conv.to_ice(bo.skill_id),
params=pythonic_from_to_aron_ice.pythonic_to_aron_ice(bo.params),
)
import typing as ty
from armarx_core import ice_manager
from armarx_skills.manager import dti, dto
from armarx_skills.manager import skill_execution_request
from armarx_skills.provider import dto as provider_dto
from armarx_skills.provider import skill_status_update
class SkillManager:
DEFAULT_ICE_OBJECT_NAME = "SkillMemory"
def __init__(
self,
proxy: dti.SkillManagerInterfacePrx = None,
):
self.proxy = proxy
self.skill_execution_request_conv = skill_execution_request.SkillExecutionRequestConv()
self.skill_status_update_conv = skill_status_update.SkillStatusUpdateConv()
@classmethod
def wait_for_manager(cls, name=DEFAULT_ICE_OBJECT_NAME) -> "SkillManager":
return cls(proxy=ice_manager.wait_for_proxy(dti.SkillManagerInterfacePrx, name))
def add_provider(self, provider_info: dto.ProviderInfo) -> None:
return self.proxy.addProvider(provider_info)
def remove_provider(self, provider_info: dto.ProviderInfo) -> None:
return self.proxy.removeProvider(providerInfo=provider_info)
def get_skill_descriptions(self) -> dto.SkillDescriptionMapMap:
return self.proxy.getSkillDescriptions()
def get_skill_execution_statuses(self) -> dto.SkillStatusUpdateMapMap:
return self.proxy.getSkillExecutionStatuses()
def execute_skill(
self,
request: ty.Union[skill_execution_request.SkillExecutionRequest, dto.SkillExecutionRequest],
) -> skill_status_update.SkillStatusUpdate:
request = self.skill_execution_request_conv.to_ice(request)
update: provider_dto.SkillStatusUpdate = self.proxy.executeSkill(skillExecutionInfo=request)
return self.skill_status_update_conv.from_ice(update)
def abort_skill(self, provider_name: str, skill_name: str) -> None:
return self.proxy.abortSkill(providerName=provider_name, skillName=skill_name)
if __name__ == '__main__':
def test_main():
manager = SkillManager.wait_for_manager()
manager.get_skill_execution_statuses()
test_main()
from armarx_skills.slice import load_slice
load_slice()
from armarx.skills.provider.dti import SkillProviderInterface
from armarx.skills.provider.dti import SkillProviderInterfacePrx
import typing as ty
from armarx_skills.slice import load_slice
load_slice()
from armarx.skills.provider.dto import \
SkillID, SkillDescription, SkillExecutionRequest, SkillStatusUpdateHeader, SkillStatusUpdate
from armarx.skills.provider.dto.Execution import Status
SkillDescriptionMap = ty.Dict[str, SkillDescription]
SkillStatusUpdateMap = ty.Dict[str, SkillStatusUpdate]
import dataclasses as dc
import typing as ty
from armarx_skills.callback import dti as callback_dti
from armarx_skills.provider import dti as provider_dto
from armarx_skills.provider import dto as provider_dto
from armarx_memory.ice_conv.ice_converter import IceConverter
from armarx_memory.aron.conversion import pythonic_from_to_aron_ice
@dc.dataclass
class SkillExecutionRequest:
skill_name: str
executor_name: str
params: ty.Dict[str, ty.Any]
callback_interface: ty.Optional[callback_dti.SkillProviderCallbackInterfacePrx] = None
class SkillExecutionRequestConv(IceConverter):
@classmethod
def _import_dto(cls):
return provider_dto.SkillExecutionRequest
def _from_ice(self, dt: provider_dto.SkillExecutionRequest) -> SkillExecutionRequest:
return SkillExecutionRequest(
skill_name=dt.skillName,
executor_name=dt.executorName,
params=pythonic_from_to_aron_ice.pythonic_from_aron_ice(dt.params),
callback_interface=dt.callbackInterface,
)
def _to_ice(self, bo: SkillExecutionRequest) -> provider_dto.SkillExecutionRequest:
return provider_dto.SkillExecutionRequest(
skillName=bo.skill_name,
executorName=bo.executor_name,
params=pythonic_from_to_aron_ice.pythonic_to_aron_ice(bo.params),
callbackInterface=bo.callback_interface,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment