diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt deleted file mode 100644 index e7c16117d55773772389038f54366f01a164195b..0000000000000000000000000000000000000000 --- a/python/CMakeLists.txt +++ /dev/null @@ -1 +0,0 @@ -add_subdirectory(armarx_face_recognition) diff --git a/python/armarx_face_recognition/CMakeLists.txt b/python/armarx_face_recognition/CMakeLists.txt deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/python/armarx_face_recognition/README.rst b/python/armarx_face_recognition/README.rst deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/python/armarx_face_recognition/armarx_face_recognition/__init__.py b/python/armarx_face_recognition/armarx_face_recognition/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/python/armarx_face_recognition/armarx_face_recognition/actions.py b/python/armarx_face_recognition/armarx_face_recognition/actions.py deleted file mode 100644 index 7fd8e3d8234500b6acd0f42a0a02e07e7af60ce0..0000000000000000000000000000000000000000 --- a/python/armarx_face_recognition/armarx_face_recognition/actions.py +++ /dev/null @@ -1,112 +0,0 @@ -import logging -import datetime -import sys -import select - -from armarx.robots import A6 -from armarx import FramedPositionBase - -from .db_model import Person - -robot = A6() -logger = logging.getLogger(__name__) - - -# ToDo: Update to new data types / move data types to armarx-dev. - - -def report_to_memory(p: Person): - import numpy as np - from armarx_memory.client import MemoryNameSystem - from armarx_memory.segments.PersonInstance import PersonInstanceWriter - from armarx_memory.segments.Person import PersonWriter - - mns = MemoryNameSystem.get_mns(logger=logger) - - person_writer = PersonWriter.from_mns(mns) - person_id = person_writer.core_segment_id.with_provider_segment_name('FaceRecognition') - person_id = person_id.with_entity_name(str(p)) - family_name = p.family_name or '' - person_writer.commit(entity_id=person_id, given_name=p.given_name, family_name=family_name, roles=p.roles.split(',')) - - writer = PersonInstanceWriter.from_mns(mns) - entity_id = writer.core_segment_id.with_provider_segment_name('FaceRecognition') - entity_id = entity_id.with_entity_name(str(p)) - pose = np.identity(4) - pose[0, 3] = p.x - pose[1, 3] = p.y - pose[2, 3] = p.z - pose = pose.astype(np.float32) - writer.commit(entity_id=entity_id, personID=person_id, pose=pose) - - -def on_detected_faces(detected_faces, time): - logger.info('on_detect (%r)', detected_faces) - now = datetime.datetime.now() - for f in detected_faces: - if f.person.given_name == 'Unknown': - identify_face(f) - - report_to_memory(f.person) - else: - - report_to_memory(f.person) - check_birthday(f.person) - greet_persons(f.person) - # look_at_person(f.person) - f.person.update(last_seen=now) - - -def look_at_person(p: Person): - f = FramedPositionBase(p.x, p.y, p.z, frame='Global') - robot.gaze.fixate(f, 1000) - - -def check_birthday(p: Person): - if not p.birthday: - return - now = datetime.datetime.now() - birthday_in_current_year = p.birthday(year=now.date().year) - if birthday_in_current_year != now.date(): - return - if p.birthday_year_congratulated >= now.date().year: - return - robot.say(f'Happy birthday dear {p.given_name}.') - p.birthday_year_congratulated = now.date().year - p.save() - - -def greet_persons(p: Person): - if not p.given_name: - logger.error('no name set for person %s', p) - return - if not p.last_greeted: - robot.say(f'Hi {p.given_name}!') - p.last_greeted = datetime.datetime.now() - p.save() - elif (datetime.datetime.now() - p.last_greeted).total_seconds() > 120: - robot.say(f'Hi {p.given_name}!') - p.last_greeted = datetime.datetime.now() - p.save() - - -def identify_face(f): - robot.say("What is your name?") - - name = label_face() - if not name: - return - person = Person.select().where(Person.given_name==name).limit(1) - if not person: - person = Person.create(given_name=name) - f.person = person - f.save() - - -def label_face() -> str: - robot.say("What is your name?") - i, o, e = select.select([sys.stdin], [], [], 5) - if i: - name = sys.stdin.readline().strip() - return name - return None diff --git a/python/armarx_face_recognition/armarx_face_recognition/app/__init__.py b/python/armarx_face_recognition/armarx_face_recognition/app/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/python/armarx_face_recognition/armarx_face_recognition/app/memory_face_recognition.py b/python/armarx_face_recognition/armarx_face_recognition/app/memory_face_recognition.py deleted file mode 100755 index daf1ae0d1b18d5c1006fd9a5a36c4f5c235a186d..0000000000000000000000000000000000000000 --- a/python/armarx_face_recognition/armarx_face_recognition/app/memory_face_recognition.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python3 - -import logging -import typing as ty - -import numpy as np - -from armarx_core import ice_manager -from armarx.parser import ArmarXArgumentParser as ArgumentParser -from armarx_memory.client import MemoryNameSystem - -from armarx_face_recognition.face_detection import FaceDetection, Person - - -logger = logging.getLogger(__name__) - - -def main(): - parser = ArgumentParser("Example Image Provider") - parser.add_argument("-i", "--input-provider", default="OpenNIPointCloudProvider") - parser.add_argument("--depth", action="store_true", help="Use a depth image.") - parser.add_argument("--enable-result-image", action="store_true", help="Enable drawing and providing a result image.") - parser.add_argument("--create-tables", action="store_true", help="Initializes the database.") - parser.add_argument("--drop-tables", action="store_true", help="Drop tables if they already exist.") - parser.add_argument("--images", default="/tmp/*.jpg", help="Known faces.") - args = parser.parse_args() - - np.set_printoptions(suppress=True, precision=2) - - mns = MemoryNameSystem.wait_for_mns() - - persons: ty.List[Person] = FaceDetection.query_human_profiles(mns=mns, logger=logger) - if persons: - logger.info(f"Found {len(persons)} persons with images.") - logger.debug(f"Images: {persons}") - else: - logger.error("No persons with images found. Aborting.") - return - - logger.info("Starting image processor") - image_processor = FaceDetection( - args.input_provider, - persons=persons, - has_depth=args.depth, - enable_result_image=args.enable_result_image, - ) - image_processor.on_connect() - image_processor.update_calibration() - - ice_manager.wait_for_shutdown() - - -if __name__ == "__main__": - main() diff --git a/python/armarx_face_recognition/armarx_face_recognition/datatypes.py b/python/armarx_face_recognition/armarx_face_recognition/datatypes.py deleted file mode 100644 index 303b0f075f119776a9b3bd067a20dd65bd4031f9..0000000000000000000000000000000000000000 --- a/python/armarx_face_recognition/armarx_face_recognition/datatypes.py +++ /dev/null @@ -1,47 +0,0 @@ -import dataclasses as dc -import numpy as np -import typing as ty - -from armarx_memory import client as mem -from armarx_memory.aron.aron_dataclass import AronDataclass -from armarx_memory.aron.conversion import to_aron -from armarx_memory.segments.human.profile import Profile - - -@dc.dataclass -class Person: - - face_encodings: ty.List[np.ndarray] = dc.field(default_factory=list) - - profile_id: ty.Optional[mem.MemoryID] = None - profile: ty.Optional[Profile] = None - - def name(self) -> str: - try: - return self.profile.names.spoken[0] - except (KeyError, AttributeError): - if self.profile_id is not None: - return self.profile_id.entity_name - else: - return "<unknown>" - - -@dc.dataclass -class FaceRecognition(AronDataclass): - - position_3d: np.ndarray - position_2d: np.ndarray - extents_2d: np.ndarray - - profile_id: mem.MemoryID - - last_seen_usec: int = -1 - last_greeted_usec: int = -1 - - def to_aron_ice(self): - return to_aron({ - "position3D": self.position_3d.astype(np.float32).reshape(3, 1), - "position2D": self.position_2d.astype(np.int32), - "extents2D": self.extents_2d.astype(np.int32), - "profileID": self.profile_id.to_aron() if self.profile_id else None, - }) diff --git a/python/armarx_face_recognition/armarx_face_recognition/face_detection.py b/python/armarx_face_recognition/armarx_face_recognition/face_detection.py deleted file mode 100755 index 431a0b3ac807a5e6a6c2786ba8cea6c0eb001b3f..0000000000000000000000000000000000000000 --- a/python/armarx_face_recognition/armarx_face_recognition/face_detection.py +++ /dev/null @@ -1,249 +0,0 @@ -#!/usr/bin/env python3 - -import os -import logging -import math -import typing as ty - -import Ice -import cv2 -import numpy as np -import face_recognition - -from armarx_core import slice_loader - -from armarx import arviz as viz -from armarx_memory import client as mem - -from armarx.pose_helper import convert_position_to_global -from armarx import FramedPositionBase - -slice_loader.load_armarx_slice("VisionX", "core/ImageProcessorInterface.ice") -slice_loader.load_armarx_slice("VisionX", "components/Calibration.ice") -from armarx_vision.image_processor import ImageProcessor -from visionx import StereoCalibrationInterfacePrx - -from armarx_face_recognition.datatypes import Person, FaceRecognition - - -class FaceDetection(ImageProcessor): - - def __init__( - self, - provider_name: str, - persons: ty.List[Person], - num_result_images=None, - enable_result_image=True, - has_depth=False, - face_recognition_segment_id=mem.MemoryID("Human", "FaceRecognition"), - ): - super().__init__(provider_name=provider_name, num_result_images=num_result_images) - - self.persons = persons - self.unknown_person = Person() - - self.has_depth = has_depth - self.calibration = { - "fx": 500, "fy": 500, - "width": 640, "height": 480, - "horizontal_fov": 0.8, "vertical_fov": 0.9 - } - - self.enable_result_image = enable_result_image - - self.agent_name = "Armar6" - self.camera_frame_name = "AzureKinectCamera" - - self.logger = logging.getLogger(__name__) - self.arviz = viz.Client(__name__) - - self.mns = mem.MemoryNameSystem.wait_for_mns() - self.face_recognition_segment_id = face_recognition_segment_id - self.face_reco_writer = self.mns.wait_for_writer(face_recognition_segment_id) - - - @classmethod - def query_human_profiles( - cls, - mns: mem.MemoryNameSystem, - logger=None - ): - from armarx_memory.segments.human.profile import ProfileReader - - profile_reader = ProfileReader.from_mns(mns) - id_to_profile_dict = profile_reader.query_latest() - - persons = [] - for id, profile in id_to_profile_dict.items(): - if logger: - logger.info(f"Gathering face images of {id} ...") - - face_encodings = [] - for face_image_path in profile.face_image_paths: - abs_path = face_image_path.get_system_path() - if os.path.isfile(abs_path): - name = profile.names.spoken[0] or id.entity_name - image = face_recognition.load_image_file(abs_path) - encodings_of_faces = face_recognition.face_encodings(image) - if len(encodings_of_faces) > 0: - if logger: - logger.info(f"Loaded image of person '{name}' from '{abs_path}.") - encoding = encodings_of_faces[0] - else: - if logger: - logger.info(f"Found no faces in image '{abs_path}' ...") - - face_encodings.append(encoding) - - if face_encodings: - persons.append(Person(face_encodings=face_encodings, profile_id=id, profile=profile)) - - return persons - - def update_calibration(self): - image_format = self.image_source.getImageFormat() - - width = image_format.dimension.width - height = image_format.dimension.height - - proxy = StereoCalibrationInterfacePrx.get_proxy(self.provider_name) - if proxy is not None: - stereo_calibration = proxy.getStereoCalibration() - fx = stereo_calibration.calibrationRight.cameraParam.focalLength[0] - fy = stereo_calibration.calibrationRight.cameraParam.focalLength[1] - calibration = {"fx": fx, "fy": fy, "width": width, "height": height, - "vertical_fov": 2.0 * math.atan(height / (2.0 * fy)), - "horizontal_fov": 2.0 * math.atan(width / (2.0 * fx))} - - self.calibration.update(calibration) - - def process_images(self, images: np.ndarray, info): - image_rgb = images[0] - - face_locations = face_recognition.face_locations(image_rgb) - face_encodings = face_recognition.face_encodings(image_rgb, face_locations) - # self.logger.info("Found %s faces.", len(face_locations)) - - detected_persons = self.detect_persons(face_encodings) - self.logger.info(f"Detected {len(detected_persons)} persons " - f"({', '.join(p.name() for p in detected_persons)}).") - - face_recognitions = self.make_face_recognitions(images, face_locations, detected_persons) - - self.commit_recognitions(face_recognitions, info.timeProvided) - - if self.enable_result_image: - self.draw_result_image(image_rgb, face_locations, detected_persons) - self.result_image_provider.update_image(images, info.timeProvided) - - # It seems we must return images to avoid errors ... - return images, info - - def detect_persons(self, face_encodings) -> ty.List[Person]: - face_encoding_index: ty.Dict[int, Person] = {} - known_face_encodings: ty.List[np.ndarray] = [] - for person in self.persons: - for encoding in person.face_encodings: - face_encoding_index[len(known_face_encodings)] = person - known_face_encodings.append(encoding) - - detected_persons = [] - for face_encoding in face_encodings: - matches = face_recognition.compare_faces(face_encoding, known_face_encodings) - face_distances = face_recognition.face_distance(face_encoding, known_face_encodings) - best_match_index = np.argmin(face_distances) - - if matches[best_match_index] and face_distances[best_match_index] < 0.5: - person = face_encoding_index[best_match_index] - else: - person = self.unknown_person - detected_persons.append(person) - - return detected_persons - - def make_face_recognitions( - self, - images, - face_locations, - detected_persons: ty.List[Person], - ) -> ty.List[FaceRecognition]: - calibration = self.calibration - scale_x = math.tan(calibration["horizontal_fov"] / 2.0) * 2.0 - scale_y = math.tan(calibration["vertical_fov"] / 2.0) * 2.0 - - face_recognitions = [] - for (top, right, bottom, left), person in zip(face_locations, detected_persons): - i = int(left + 0.5 * (right - left)) - j = int(top + 0.5 * (bottom - top)) - if self.has_depth: - z = float(images[1][j][i][0] + (images[1][j][i][1] << 8) + (images[1][j][i][2] << 16)) - else: - z = 1000.0 - x = -1.0 * (i - calibration["width"] / 2.0) / calibration["width"] * z * scale_x - y = (calibration["height"] / 2.0 - j) / calibration["height"] * z * scale_y - - f = FramedPositionBase(-x, -y, z, self.camera_frame_name, self.agent_name) - try: - global_transform_mat = convert_position_to_global(f) - except AttributeError: - # AttributeError: 'NoneType' object has no attribute 'getSynchronizedRobot' - position_3d = np.array([-x, -y, z]) - except Ice.UnknownUserException: - position_3d = np.array([-x, -y, z]) - else: - position_3d = global_transform_mat[:3, 3] - - position_2d = 0.5 * np.array([left + right, bottom + top]) - extents_2d = np.array([right - left, top - bottom]) - - recognition = FaceRecognition( - position_3d=position_3d, - position_2d=position_2d, - extents_2d=extents_2d, - profile_id=person.profile_id, - ) - face_recognitions.append(recognition) - - self.visu_pose(person.name(), position_3d) - - return face_recognitions - - def commit_recognitions( - self, - recognitions: ty.List[FaceRecognition], - time_usec: int - ): - commit = mem.Commit() - for reco in recognitions: - update = commit.add() - entity_name = reco.profile_id.entity_name if reco.profile_id else "<unknown>" - update.entity_id = (self.face_recognition_segment_id - .with_provider_segment_name(__name__) - .with_entity_name(entity_name)) - update.time_created_usec = time_usec - update.instances_data = [reco.to_aron_ice()] - - self.face_reco_writer.commit(commit) - - def visu_pose( - self, - person_name, - position: np.ndarray, - ): - with self.arviz.begin_stage(commit_on_exit=True) as stage: - layer = stage.layer("Location") - layer.add(viz.Sphere(person_name, position=position, radius=100, color=(255, 0, 255))) - - @classmethod - def draw_result_image( - cls, - image_rgb, - face_locations, - detected_persons, - ): - for (top, right, bottom, left), person in zip(face_locations, detected_persons): - cv2.rectangle(image_rgb, (left, top), (right, bottom), (0, 0, 255), 2) - cv2.rectangle(image_rgb, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED) - - cv2.putText(image_rgb, person.name(), (left + 6, bottom - 6), - cv2.FONT_HERSHEY_DUPLEX, 1.0, (255, 255, 255), 1) diff --git a/python/armarx_face_recognition/armarx_face_recognition/helper.py b/python/armarx_face_recognition/armarx_face_recognition/helper.py deleted file mode 100644 index d2c55e163f3174efb042015313695d51b6a9c3c5..0000000000000000000000000000000000000000 --- a/python/armarx_face_recognition/armarx_face_recognition/helper.py +++ /dev/null @@ -1,50 +0,0 @@ -import numpy as np -import transforms3d as tf3d - -from armarx import RobotStateComponentInterfacePrx -from armarx import FramedPositionBase -from armarx import FramedPoseBase -from armarx import FramedOrientationBase - - -def pose2mat(pose: FramedPoseBase) -> np.ndarray: - """ - Converts a FramedPoseBase to a homogeneous matrix - - :param pose: FramedPoseBase - :return: numpy.ndarry - """ - qw = pose.orientation.qw - qx = pose.orientation.qx - qy = pose.orientation.qy - qz = pose.orientation.qz - rot_mat = tf3d.quaternions.quat2mat([qw, qx, qy, qz]) - transform_mat = np.identity(4) - transform_mat[0:3, 0:3] = rot_mat - position = pose.position - transform_mat[0, 3] = position.x - transform_mat[1, 3] = position.y - transform_mat[2, 3] = position.z - - return transform_mat - - -def convert_position_to_global(f: FramedPositionBase) -> np.ndarray: - pose = FramedPoseBase(position=f, orientation=FramedOrientationBase(), frame=f.frame, agent=f.agent) - print(pose) - return convert_pose_to_global(pose) - - -def convert_pose_to_global(f: FramedPoseBase) -> np.ndarray: - robot_state = RobotStateComponentInterfacePrx.get_proxy() - current_robot_state = robot_state.getSynchronizedRobot() - robot_pose = current_robot_state.getGlobalPose() - robot_node = current_robot_state.getRobotNode(f.frame).getPoseInRootFrame() - - transform = pose2mat(f) - transform_robot_node_to_root = pose2mat(robot_node) - - transform_root_to_global = pose2mat(robot_pose) - - return np.dot(transform_root_to_global, np.dot(transform_robot_node_to_root, transform)) - diff --git a/python/armarx_face_recognition/pyproject.toml b/python/armarx_face_recognition/pyproject.toml deleted file mode 100644 index 28f388f58ae6d32007da43441bd5db6727869952..0000000000000000000000000000000000000000 --- a/python/armarx_face_recognition/pyproject.toml +++ /dev/null @@ -1,36 +0,0 @@ -[tool.poetry] -name = "armarx_face_recognition" -version = "0.1.0" -description = "" -authors = ["Markus Grotz <markus dot grotz at kit dot edu>", "Rainer Kartmann <rainer dot kartmann at kit dot edu>"] - - -[tool.poetry.dependencies] -python = "^3.6.9" -# armarx-dev = "^0.16.2" -# armarx-dev = { path="../../../python3-armarx/", develop=true } # "^0.16.2" -ipython = "5.5" -numpy = "*" -face-recognition = "^1.3.0" -opencv-python = "4.3.0.36" -peewee = "^3.14.4" -transforms3d = "*" -pyzbar = "^0.1.8" -cbor2 = "^5.4.2" -base45 = "^0.4.3" -matplotlib = "2.1" -dataclasses = { version = "^0.8", python = "~3.6" } - - -[tool.poetry.dev-dependencies] -pytest = "^5.2" - -[build-system] -# requires = ["setuptools", "wheel"] -# build-backend = "setuptools.build_meta" -requires = ["poetry-core@https://github.com/python-poetry/poetry-core/archive/325312c016d69189ac93c945ba0c1b69296c5e54.zip"] -build-backend = "poetry.core.masonry.api" - -[[tool.poetry.source]] -name = "h2t" -url = "https://pypi.humanoids.kit.edu/simple/" diff --git a/python/armarx_face_recognition/tests/__init__.py b/python/armarx_face_recognition/tests/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000