Skip to content
Snippets Groups Projects
Commit bd9c4717 authored by Rainer Kartmann's avatar Rainer Kartmann
Browse files

Move armarx_face_recognition to...

parent e4fb0aaf
No related branches found
No related tags found
1 merge request!151Move armarx_face_recognition to own package
Showing
with 0 additions and 549 deletions
add_subdirectory(armarx_face_recognition)
import logging
import datetime
import sys
import select
from armarx.robots import A6
from armarx import FramedPositionBase
from .db_model import Person
robot = A6()
logger = logging.getLogger(__name__)
# ToDo: Update to new data types / move data types to armarx-dev.
def report_to_memory(p: Person):
import numpy as np
from armarx_memory.client import MemoryNameSystem
from armarx_memory.segments.PersonInstance import PersonInstanceWriter
from armarx_memory.segments.Person import PersonWriter
mns = MemoryNameSystem.get_mns(logger=logger)
person_writer = PersonWriter.from_mns(mns)
person_id = person_writer.core_segment_id.with_provider_segment_name('FaceRecognition')
person_id = person_id.with_entity_name(str(p))
family_name = p.family_name or ''
person_writer.commit(entity_id=person_id, given_name=p.given_name, family_name=family_name, roles=p.roles.split(','))
writer = PersonInstanceWriter.from_mns(mns)
entity_id = writer.core_segment_id.with_provider_segment_name('FaceRecognition')
entity_id = entity_id.with_entity_name(str(p))
pose = np.identity(4)
pose[0, 3] = p.x
pose[1, 3] = p.y
pose[2, 3] = p.z
pose = pose.astype(np.float32)
writer.commit(entity_id=entity_id, personID=person_id, pose=pose)
def on_detected_faces(detected_faces, time):
logger.info('on_detect (%r)', detected_faces)
now = datetime.datetime.now()
for f in detected_faces:
if f.person.given_name == 'Unknown':
identify_face(f)
report_to_memory(f.person)
else:
report_to_memory(f.person)
check_birthday(f.person)
greet_persons(f.person)
# look_at_person(f.person)
f.person.update(last_seen=now)
def look_at_person(p: Person):
f = FramedPositionBase(p.x, p.y, p.z, frame='Global')
robot.gaze.fixate(f, 1000)
def check_birthday(p: Person):
if not p.birthday:
return
now = datetime.datetime.now()
birthday_in_current_year = p.birthday(year=now.date().year)
if birthday_in_current_year != now.date():
return
if p.birthday_year_congratulated >= now.date().year:
return
robot.say(f'Happy birthday dear {p.given_name}.')
p.birthday_year_congratulated = now.date().year
p.save()
def greet_persons(p: Person):
if not p.given_name:
logger.error('no name set for person %s', p)
return
if not p.last_greeted:
robot.say(f'Hi {p.given_name}!')
p.last_greeted = datetime.datetime.now()
p.save()
elif (datetime.datetime.now() - p.last_greeted).total_seconds() > 120:
robot.say(f'Hi {p.given_name}!')
p.last_greeted = datetime.datetime.now()
p.save()
def identify_face(f):
robot.say("What is your name?")
name = label_face()
if not name:
return
person = Person.select().where(Person.given_name==name).limit(1)
if not person:
person = Person.create(given_name=name)
f.person = person
f.save()
def label_face() -> str:
robot.say("What is your name?")
i, o, e = select.select([sys.stdin], [], [], 5)
if i:
name = sys.stdin.readline().strip()
return name
return None
#!/usr/bin/env python3
import logging
import typing as ty
import numpy as np
from armarx_core import ice_manager
from armarx.parser import ArmarXArgumentParser as ArgumentParser
from armarx_memory.client import MemoryNameSystem
from armarx_face_recognition.face_detection import FaceDetection, Person
logger = logging.getLogger(__name__)
def main():
parser = ArgumentParser("Example Image Provider")
parser.add_argument("-i", "--input-provider", default="OpenNIPointCloudProvider")
parser.add_argument("--depth", action="store_true", help="Use a depth image.")
parser.add_argument("--enable-result-image", action="store_true", help="Enable drawing and providing a result image.")
parser.add_argument("--create-tables", action="store_true", help="Initializes the database.")
parser.add_argument("--drop-tables", action="store_true", help="Drop tables if they already exist.")
parser.add_argument("--images", default="/tmp/*.jpg", help="Known faces.")
args = parser.parse_args()
np.set_printoptions(suppress=True, precision=2)
mns = MemoryNameSystem.wait_for_mns()
persons: ty.List[Person] = FaceDetection.query_human_profiles(mns=mns, logger=logger)
if persons:
logger.info(f"Found {len(persons)} persons with images.")
logger.debug(f"Images: {persons}")
else:
logger.error("No persons with images found. Aborting.")
return
logger.info("Starting image processor")
image_processor = FaceDetection(
args.input_provider,
persons=persons,
has_depth=args.depth,
enable_result_image=args.enable_result_image,
)
image_processor.on_connect()
image_processor.update_calibration()
ice_manager.wait_for_shutdown()
if __name__ == "__main__":
main()
import dataclasses as dc
import numpy as np
import typing as ty
from armarx_memory import client as mem
from armarx_memory.aron.aron_dataclass import AronDataclass
from armarx_memory.aron.conversion import to_aron
from armarx_memory.segments.human.profile import Profile
@dc.dataclass
class Person:
face_encodings: ty.List[np.ndarray] = dc.field(default_factory=list)
profile_id: ty.Optional[mem.MemoryID] = None
profile: ty.Optional[Profile] = None
def name(self) -> str:
try:
return self.profile.names.spoken[0]
except (KeyError, AttributeError):
if self.profile_id is not None:
return self.profile_id.entity_name
else:
return "<unknown>"
@dc.dataclass
class FaceRecognition(AronDataclass):
position_3d: np.ndarray
position_2d: np.ndarray
extents_2d: np.ndarray
profile_id: mem.MemoryID
last_seen_usec: int = -1
last_greeted_usec: int = -1
def to_aron_ice(self):
return to_aron({
"position3D": self.position_3d.astype(np.float32).reshape(3, 1),
"position2D": self.position_2d.astype(np.int32),
"extents2D": self.extents_2d.astype(np.int32),
"profileID": self.profile_id.to_aron() if self.profile_id else None,
})
#!/usr/bin/env python3
import os
import logging
import math
import typing as ty
import Ice
import cv2
import numpy as np
import face_recognition
from armarx_core import slice_loader
from armarx import arviz as viz
from armarx_memory import client as mem
from armarx.pose_helper import convert_position_to_global
from armarx import FramedPositionBase
slice_loader.load_armarx_slice("VisionX", "core/ImageProcessorInterface.ice")
slice_loader.load_armarx_slice("VisionX", "components/Calibration.ice")
from armarx_vision.image_processor import ImageProcessor
from visionx import StereoCalibrationInterfacePrx
from armarx_face_recognition.datatypes import Person, FaceRecognition
class FaceDetection(ImageProcessor):
def __init__(
self,
provider_name: str,
persons: ty.List[Person],
num_result_images=None,
enable_result_image=True,
has_depth=False,
face_recognition_segment_id=mem.MemoryID("Human", "FaceRecognition"),
):
super().__init__(provider_name=provider_name, num_result_images=num_result_images)
self.persons = persons
self.unknown_person = Person()
self.has_depth = has_depth
self.calibration = {
"fx": 500, "fy": 500,
"width": 640, "height": 480,
"horizontal_fov": 0.8, "vertical_fov": 0.9
}
self.enable_result_image = enable_result_image
self.agent_name = "Armar6"
self.camera_frame_name = "AzureKinectCamera"
self.logger = logging.getLogger(__name__)
self.arviz = viz.Client(__name__)
self.mns = mem.MemoryNameSystem.wait_for_mns()
self.face_recognition_segment_id = face_recognition_segment_id
self.face_reco_writer = self.mns.wait_for_writer(face_recognition_segment_id)
@classmethod
def query_human_profiles(
cls,
mns: mem.MemoryNameSystem,
logger=None
):
from armarx_memory.segments.human.profile import ProfileReader
profile_reader = ProfileReader.from_mns(mns)
id_to_profile_dict = profile_reader.query_latest()
persons = []
for id, profile in id_to_profile_dict.items():
if logger:
logger.info(f"Gathering face images of {id} ...")
face_encodings = []
for face_image_path in profile.face_image_paths:
abs_path = face_image_path.get_system_path()
if os.path.isfile(abs_path):
name = profile.names.spoken[0] or id.entity_name
image = face_recognition.load_image_file(abs_path)
encodings_of_faces = face_recognition.face_encodings(image)
if len(encodings_of_faces) > 0:
if logger:
logger.info(f"Loaded image of person '{name}' from '{abs_path}.")
encoding = encodings_of_faces[0]
else:
if logger:
logger.info(f"Found no faces in image '{abs_path}' ...")
face_encodings.append(encoding)
if face_encodings:
persons.append(Person(face_encodings=face_encodings, profile_id=id, profile=profile))
return persons
def update_calibration(self):
image_format = self.image_source.getImageFormat()
width = image_format.dimension.width
height = image_format.dimension.height
proxy = StereoCalibrationInterfacePrx.get_proxy(self.provider_name)
if proxy is not None:
stereo_calibration = proxy.getStereoCalibration()
fx = stereo_calibration.calibrationRight.cameraParam.focalLength[0]
fy = stereo_calibration.calibrationRight.cameraParam.focalLength[1]
calibration = {"fx": fx, "fy": fy, "width": width, "height": height,
"vertical_fov": 2.0 * math.atan(height / (2.0 * fy)),
"horizontal_fov": 2.0 * math.atan(width / (2.0 * fx))}
self.calibration.update(calibration)
def process_images(self, images: np.ndarray, info):
image_rgb = images[0]
face_locations = face_recognition.face_locations(image_rgb)
face_encodings = face_recognition.face_encodings(image_rgb, face_locations)
# self.logger.info("Found %s faces.", len(face_locations))
detected_persons = self.detect_persons(face_encodings)
self.logger.info(f"Detected {len(detected_persons)} persons "
f"({', '.join(p.name() for p in detected_persons)}).")
face_recognitions = self.make_face_recognitions(images, face_locations, detected_persons)
self.commit_recognitions(face_recognitions, info.timeProvided)
if self.enable_result_image:
self.draw_result_image(image_rgb, face_locations, detected_persons)
self.result_image_provider.update_image(images, info.timeProvided)
# It seems we must return images to avoid errors ...
return images, info
def detect_persons(self, face_encodings) -> ty.List[Person]:
face_encoding_index: ty.Dict[int, Person] = {}
known_face_encodings: ty.List[np.ndarray] = []
for person in self.persons:
for encoding in person.face_encodings:
face_encoding_index[len(known_face_encodings)] = person
known_face_encodings.append(encoding)
detected_persons = []
for face_encoding in face_encodings:
matches = face_recognition.compare_faces(face_encoding, known_face_encodings)
face_distances = face_recognition.face_distance(face_encoding, known_face_encodings)
best_match_index = np.argmin(face_distances)
if matches[best_match_index] and face_distances[best_match_index] < 0.5:
person = face_encoding_index[best_match_index]
else:
person = self.unknown_person
detected_persons.append(person)
return detected_persons
def make_face_recognitions(
self,
images,
face_locations,
detected_persons: ty.List[Person],
) -> ty.List[FaceRecognition]:
calibration = self.calibration
scale_x = math.tan(calibration["horizontal_fov"] / 2.0) * 2.0
scale_y = math.tan(calibration["vertical_fov"] / 2.0) * 2.0
face_recognitions = []
for (top, right, bottom, left), person in zip(face_locations, detected_persons):
i = int(left + 0.5 * (right - left))
j = int(top + 0.5 * (bottom - top))
if self.has_depth:
z = float(images[1][j][i][0] + (images[1][j][i][1] << 8) + (images[1][j][i][2] << 16))
else:
z = 1000.0
x = -1.0 * (i - calibration["width"] / 2.0) / calibration["width"] * z * scale_x
y = (calibration["height"] / 2.0 - j) / calibration["height"] * z * scale_y
f = FramedPositionBase(-x, -y, z, self.camera_frame_name, self.agent_name)
try:
global_transform_mat = convert_position_to_global(f)
except AttributeError:
# AttributeError: 'NoneType' object has no attribute 'getSynchronizedRobot'
position_3d = np.array([-x, -y, z])
except Ice.UnknownUserException:
position_3d = np.array([-x, -y, z])
else:
position_3d = global_transform_mat[:3, 3]
position_2d = 0.5 * np.array([left + right, bottom + top])
extents_2d = np.array([right - left, top - bottom])
recognition = FaceRecognition(
position_3d=position_3d,
position_2d=position_2d,
extents_2d=extents_2d,
profile_id=person.profile_id,
)
face_recognitions.append(recognition)
self.visu_pose(person.name(), position_3d)
return face_recognitions
def commit_recognitions(
self,
recognitions: ty.List[FaceRecognition],
time_usec: int
):
commit = mem.Commit()
for reco in recognitions:
update = commit.add()
entity_name = reco.profile_id.entity_name if reco.profile_id else "<unknown>"
update.entity_id = (self.face_recognition_segment_id
.with_provider_segment_name(__name__)
.with_entity_name(entity_name))
update.time_created_usec = time_usec
update.instances_data = [reco.to_aron_ice()]
self.face_reco_writer.commit(commit)
def visu_pose(
self,
person_name,
position: np.ndarray,
):
with self.arviz.begin_stage(commit_on_exit=True) as stage:
layer = stage.layer("Location")
layer.add(viz.Sphere(person_name, position=position, radius=100, color=(255, 0, 255)))
@classmethod
def draw_result_image(
cls,
image_rgb,
face_locations,
detected_persons,
):
for (top, right, bottom, left), person in zip(face_locations, detected_persons):
cv2.rectangle(image_rgb, (left, top), (right, bottom), (0, 0, 255), 2)
cv2.rectangle(image_rgb, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
cv2.putText(image_rgb, person.name(), (left + 6, bottom - 6),
cv2.FONT_HERSHEY_DUPLEX, 1.0, (255, 255, 255), 1)
import numpy as np
import transforms3d as tf3d
from armarx import RobotStateComponentInterfacePrx
from armarx import FramedPositionBase
from armarx import FramedPoseBase
from armarx import FramedOrientationBase
def pose2mat(pose: FramedPoseBase) -> np.ndarray:
"""
Converts a FramedPoseBase to a homogeneous matrix
:param pose: FramedPoseBase
:return: numpy.ndarry
"""
qw = pose.orientation.qw
qx = pose.orientation.qx
qy = pose.orientation.qy
qz = pose.orientation.qz
rot_mat = tf3d.quaternions.quat2mat([qw, qx, qy, qz])
transform_mat = np.identity(4)
transform_mat[0:3, 0:3] = rot_mat
position = pose.position
transform_mat[0, 3] = position.x
transform_mat[1, 3] = position.y
transform_mat[2, 3] = position.z
return transform_mat
def convert_position_to_global(f: FramedPositionBase) -> np.ndarray:
pose = FramedPoseBase(position=f, orientation=FramedOrientationBase(), frame=f.frame, agent=f.agent)
print(pose)
return convert_pose_to_global(pose)
def convert_pose_to_global(f: FramedPoseBase) -> np.ndarray:
robot_state = RobotStateComponentInterfacePrx.get_proxy()
current_robot_state = robot_state.getSynchronizedRobot()
robot_pose = current_robot_state.getGlobalPose()
robot_node = current_robot_state.getRobotNode(f.frame).getPoseInRootFrame()
transform = pose2mat(f)
transform_robot_node_to_root = pose2mat(robot_node)
transform_root_to_global = pose2mat(robot_pose)
return np.dot(transform_root_to_global, np.dot(transform_robot_node_to_root, transform))
[tool.poetry]
name = "armarx_face_recognition"
version = "0.1.0"
description = ""
authors = ["Markus Grotz <markus dot grotz at kit dot edu>", "Rainer Kartmann <rainer dot kartmann at kit dot edu>"]
[tool.poetry.dependencies]
python = "^3.6.9"
# armarx-dev = "^0.16.2"
# armarx-dev = { path="../../../python3-armarx/", develop=true } # "^0.16.2"
ipython = "5.5"
numpy = "*"
face-recognition = "^1.3.0"
opencv-python = "4.3.0.36"
peewee = "^3.14.4"
transforms3d = "*"
pyzbar = "^0.1.8"
cbor2 = "^5.4.2"
base45 = "^0.4.3"
matplotlib = "2.1"
dataclasses = { version = "^0.8", python = "~3.6" }
[tool.poetry.dev-dependencies]
pytest = "^5.2"
[build-system]
# requires = ["setuptools", "wheel"]
# build-backend = "setuptools.build_meta"
requires = ["poetry-core@https://github.com/python-poetry/poetry-core/archive/325312c016d69189ac93c945ba0c1b69296c5e54.zip"]
build-backend = "poetry.core.masonry.api"
[[tool.poetry.source]]
name = "h2t"
url = "https://pypi.humanoids.kit.edu/simple/"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment