Skip to content
Snippets Groups Projects
Commit bc61fcd7 authored by Rainer Kartmann's avatar Rainer Kartmann
Browse files

Integrate armarx_face_recognition to FaceRecognition segment

parent f445cd29
No related branches found
No related tags found
1 merge request: !129 Resolve "Import/Export of human profiles to PriorKnowledgeData"
add_subdirectory(face_recognition)
\ No newline at end of file
add_subdirectory(armarx_face_recognition)
......@@ -12,6 +12,9 @@ robot = A6()
logger = logging.getLogger(__name__)
# ToDo: Update to new data types / move data types to armarx-dev.
def report_to_memory(p: Person):
import numpy as np
from armarx_memory.client import MemoryNameSystem
......@@ -50,7 +53,7 @@ def on_detected_faces(detected_faces, time):
report_to_memory(f.person)
check_birthday(f.person)
greet_persons(f.person)
#look_at_person(f.person)
# look_at_person(f.person)
f.person.update(last_seen=now)
......@@ -60,7 +63,7 @@ def look_at_person(p: Person):
def check_birthday(p: Person):
if not p.birthday:
if not p.birthday:
return
now = datetime.datetime.now()
birthday_in_current_year = p.birthday(year=now.date().year)
......@@ -107,4 +110,3 @@ def label_face() -> str:
name = sys.stdin.readline().strip()
return name
return None
#!/usr/bin/env python3
import os
import glob
import numpy as np
import logging
from armarx.parser import ArmarXArgumentParser as ArgumentParser
from armarx_face_recognition.face_detection import FaceDetection
from armarx_face_recognition.db_model import Person
from armarx_face_recognition.db_model import Face
from armarx_face_recognition.db_model import db
import armarx_face_recognition
logger = logging.getLogger(__name__)
def main():
    """Populate the face database from image files and start the face detector.

    Each image file's basename (without extension) becomes a person's given
    name; the first face encoding found in the image is stored for them.
    """
    parser = ArgumentParser('Example Image Provider')
    parser.add_argument('-i', '--input-provider', default='OpenNIPointCloudProvider')
    parser.add_argument('--create-tables', action='store_true', help='Initializes the database.')
    parser.add_argument('--drop-tables', action='store_true', help='Drop tables if they already exist.')
    parser.add_argument('--images', default='/tmp/*.jpg', help='Known faces.')
    args = parser.parse_args()

    np.set_printoptions(suppress=True, precision=1)

    logger.debug('Setting up database')
    db.connect()
    if args.create_tables:
        if args.drop_tables:
            db.drop_tables([Person, Face])
        db.create_tables([Person, Face])
        # Sentinel entry for faces that cannot be matched to a known person.
        Person.create(given_name='Unknown')

    # glob.glob already returns a list; no need to copy it element-wise.
    image_files = glob.glob(args.images)
    if not image_files:
        # BUG FIX: previously this only logged "Aborting" and then fell
        # through to register the image processor anyway; now really abort.
        logger.error('No images found. Aborting')
        return
    logger.debug('Found %d images ', len(image_files))
    for f in image_files:
        name = os.path.splitext(os.path.basename(f))[0]
        logger.info('Loading person %s', name)
        logger.debug('Creating face encoding for person %s', name)
        image = armarx_face_recognition.load_image_file(f)
        encodings = armarx_face_recognition.face_encodings(image)
        if not encodings:
            # BUG FIX: indexing [0] on an empty result raised IndexError and
            # left an orphan Person row; skip images without a detectable face.
            logger.warning('No face found in %s; skipping', f)
            continue
        person = Person.create(given_name=name)
        Face.create(person=person, face_encoding=memoryview(encodings[0]))

    logger.debug('Starting example image processor')
    image_processor = FaceDetection(args.input_provider)
    image_processor.register()
    image_processor.update_calibration()


if __name__ == '__main__':
    main()
import dataclasses as dc
import numpy as np
import typing as ty
from armarx_memory import client as mem
@dc.dataclass
class Person:
    """A person known from memory profiles, identified by face encodings."""

    # All known face encodings of this person (one per reference image).
    face_encodings: ty.List[np.ndarray] = dc.field(default_factory=list)
    # Memory ID of the person's profile entity, if any.
    profile_id: ty.Optional[mem.MemoryID] = None
    # Profile data; may contain a "spokenNames" list.
    profile: ty.Dict = dc.field(default_factory=dict)

    def name(self) -> str:
        """Return a display name.

        Prefer the first spoken name from the profile, then the profile ID's
        entity name, then the literal "<unknown>".
        """
        try:
            spoken = self.profile["spokenNames"][0]
        except (KeyError, IndexError):
            # BUG FIX: an existing but empty "spokenNames" list used to raise
            # an uncaught IndexError; treat it like a missing entry.
            spoken = None
        if spoken:
            # Empty-string names also fall through, matching the
            # `spokenNames[0] or entity_name` fallback used elsewhere.
            return spoken
        if self.profile_id is not None:
            return self.profile_id.entity_name
        return "<unknown>"
@dc.dataclass
class FaceRecognition:
    """One detected face: its 2D/3D location plus the matched profile ID."""

    position_3d: np.ndarray
    position_2d: np.ndarray
    extents_2d: np.ndarray
    profile_id: mem.MemoryID
    # Timestamps in microseconds; -1 means "never".
    last_seen_usec: int = -1
    last_greeted_usec: int = -1

    def to_aron_data(self):
        """Build the plain-dict representation used for the ARON conversion."""
        # Fall back to a default MemoryID when no profile was matched.
        profile_id = self.profile_id if self.profile_id else mem.MemoryID()
        return {
            "position3D": self.position_3d.astype(np.float32).reshape(3, 1),
            "position2D": self.position_2d.astype(np.int32),
            "extents2D": self.extents_2d.astype(np.int32),
            "profileID": profile_id.to_aron(),
        }

    def to_aron(self):
        """Convert this recognition to an ARON object."""
        from armarx_memory.aron.conversion import to_aron
        return to_aron(self.to_aron_data())
import datetime
from peewee import *
# Shared SQLite database handle; all models bind to it via BaseModel.Meta.
db = SqliteDatabase('/tmp/people.db')
class BaseModel(Model):
    """Common base for all ORM models; binds every subclass to the shared database."""
    class Meta:
        database = db
class Person(BaseModel):
    """Database record of a known person.

    `last_seen` / `last_greeted` default to year 1 as a "never happened"
    sentinel. `birthday_year_congratulated` presumably stores the last year a
    birthday greeting was given (0 = never) -- TODO confirm against caller.
    """
    given_name = CharField()
    family_name = CharField(null=True)  # optional; may be unset
    birthday = DateField(null=True)
    roles = CharField(default='user')
    last_seen = DateTimeField(default=datetime.datetime(1, 1, 1))
    last_greeted = DateTimeField(default=datetime.datetime(1, 1, 1))
    birthday_year_congratulated = IntegerField(default=0)
    created_date = DateTimeField(default=datetime.datetime.now)
    # Last known position; presumably a 3D point in the robot's frame -- TODO confirm.
    x = FloatField(default=0)
    y = FloatField(default=0)
    z = FloatField(default=0)

    def __str__(self):
        # BUG FIX: family_name is nullable; omit it instead of rendering
        # the literal string "None" (previously "Alice None").
        if self.family_name:
            return f'{self.given_name} {self.family_name}'
        return self.given_name
class Face(BaseModel):
    """One face encoding belonging to a person; a person may have several faces."""
    person = ForeignKeyField(Person, backref='faces')
    # Raw encoding bytes (written as memoryview of a numpy encoding elsewhere).
    face_encoding = BlobField(null=True)
......@@ -3,57 +3,23 @@
import os
import logging
import math
import dataclasses as dc
import typing as ty
import numpy as np
import cv2
import numpy as np
import face_recognition
from armarx import slice_loader
from armarx import cmake_helper
from armarx import arviz as viz
from armarx_memory import client as mem
slice_loader.load_armarx_slice("RobotAPI", "core/FramedPoseBase.ice")
from armarx import FramedPositionBase
from armarx.pose_helper import convert_position_to_global
from armarx import FramedPositionBase
from visionx import StereoCalibrationInterfacePrx
from visionx.image_processor import ImageProcessor
from visionx import StereoCalibrationInterfacePrx
# from .actions import on_detected_faces
@dc.dataclass
class Person:
face_encoding: ty.Optional[np.ndarray]
profile_id: ty.Optional[mem.MemoryID]
profile: ty.Dict = dc.field(default_factory=dict)
def name(self) -> str:
try:
return self.profile["spokenNames"][0]
except KeyError:
if self.profile_id is not None:
return self.profile_id.entity_name
else:
return "<unknown>"
@dc.dataclass
class FaceRecognition:
position_3d: np.ndarray
position_2d: np.ndarray
extents_2d: np.ndarray
profile_id: mem.MemoryID
last_seen_usec: int = -1
last_greeted_usec: int = -1
from armarx_face_recognition.datatypes import Person, FaceRecognition
class FaceDetection(ImageProcessor):
......@@ -63,15 +29,14 @@ class FaceDetection(ImageProcessor):
provider_name: str,
persons: ty.List[Person],
num_result_images=None,
enable_result_image=True,
has_depth=False,
face_recognition_segment_id=mem.MemoryID("Human", "FaceRecognition"),
):
super().__init__(provider_name, num_result_images)
self.persons = persons
self.unknown_person = Person(face_encoding=None, profile_id=None, profile={})
self.logger = logging.getLogger(__name__)
self.arviz = viz.Client(__name__)
self.unknown_person = Person()
self.has_depth = has_depth
self.calibration = {
......@@ -80,15 +45,24 @@ class FaceDetection(ImageProcessor):
"horizontal_fov": 0.8, "vertical_fov": 0.9
}
self.enable_result_image = enable_result_image
self.agent_name = "Armar6"
self.camera_frame_name = "AzureKinectCamera"
self.logger = logging.getLogger(__name__)
self.arviz = viz.Client(__name__)
self.mns = mem.MemoryNameSystem.wait_for_mns()
self.face_recognition_segment_id = face_recognition_segment_id
self.face_reco_writer = self.mns.wait_for_writer(face_recognition_segment_id)
@classmethod
def query_human_profiles(
cls,
mns: mem.MemoryNameSystem,
logger = None
logger=None
):
human_reader = mns.wait_for_reader(mem.MemoryID("Human"))
result = human_reader.query_core_segment("Profile")
......@@ -97,29 +71,29 @@ class FaceDetection(ImageProcessor):
if logger:
logger.info(f"Gathering face images of {id} ...")
face_image_paths = profile["faceImagePaths"]
for face_image_path in face_image_paths:
from armarx import cmake_helper
face_encodings = []
for face_image_path in profile["faceImagePaths"]:
package: str = face_image_path["package"]
path: str = face_image_path["path"]
rel_path: str = face_image_path["path"]
[data_path] = cmake_helper.get_data_path(package)
abs_path = os.path.join(data_path, package, path)
abs_path = os.path.join(data_path, package, rel_path)
if os.path.isfile(abs_path):
name = profile["spokenNames"][0] or id.entity_name
if logger is not None:
logger.info("Loading person '%s'", name)
logger.debug("Creating face encoding for person %s", name)
logger.info("Loading person '%s'.", name)
logger.debug("Creating face encoding for person '%s'.", name)
image = face_recognition.load_image_file(abs_path)
encoding = face_recognition.face_encodings(image)[0]
return Person(face_encoding=encoding, profile_id=id, profile=profile)
face_encodings.append(encoding)
return None
if face_encodings:
return Person(face_encodings=face_encodings, profile_id=id, profile=profile)
else:
return None
persons: ty.List[Person] = list(filter(bool, human_reader.for_each_instance_data(gather_persons, result)))
persons: ty.List[Person] = human_reader.for_each_instance_data(gather_persons, result, discard_none=True)
return persons
def update_calibration(self):
......@@ -139,35 +113,53 @@ class FaceDetection(ImageProcessor):
self.calibration.update(calibration)
def process_images(self, images, info):
image_rgb = images[0]
def visu_pose(self, person_name, x, y, z):
with self.arviz.begin_stage(commit_on_exit=True) as stage:
layer = stage.layer("Location")
layer.add(viz.Sphere(person_name, position=(x, y, z), radius=100, color=(255, 0, 255)))
face_locations = face_recognition.face_locations(image_rgb)
face_encodings = face_recognition.face_encodings(image_rgb, face_locations)
# self.logger.info("Found %s faces.", len(face_locations))
detected_persons = self.detect_persons(face_encodings)
self.logger.info("Found %s persons.", len(detected_persons))
def process_images(self, images, info):
image = images[0]
face_recognitions = self.make_face_recognitions(images, face_locations, detected_persons)
face_locations = face_recognition.face_locations(image)
face_encodings = face_recognition.face_encodings(image, face_locations)
self.commit_recognitions(face_recognitions, info.timeProvided)
self.logger.info("Found %s faces.", len(face_locations))
if self.enable_result_image:
self.draw_result_image(image_rgb, face_locations, detected_persons)
self.result_image_provider.update_image(images, info.timeProvided)
return images, info
known_persons = self.persons
known_face_encodings = [p.face_encoding for p in known_persons]
def detect_persons(self, face_encodings) -> ty.List[Person]:
face_encoding_index: ty.Dict[int, Person] = {}
known_face_encodings: ty.List[np.ndarray] = []
for person in self.persons:
for encoding in person.face_encodings:
face_encoding_index[len(known_face_encodings)] = person
known_face_encodings.append(encoding)
detected_persons = []
for face_encoding in face_encodings:
matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
matches = face_recognition.compare_faces(face_encoding, known_face_encodings)
face_distances = face_recognition.face_distance(face_encoding, known_face_encodings)
best_match_index = np.argmin(face_distances)
if matches[best_match_index] and face_distances[best_match_index] < 0.5:
person = known_persons[best_match_index]
person = face_encoding_index[best_match_index]
else:
person = self.unknown_person
detected_persons.append(person)
return detected_persons
def make_face_recognitions(
self,
images,
face_locations,
detected_persons: ty.List[Person],
) -> ty.List[FaceRecognition]:
calibration = self.calibration
scale_x = math.tan(calibration["horizontal_fov"] / 2.0) * 2.0
scale_y = math.tan(calibration["vertical_fov"] / 2.0) * 2.0
......@@ -198,21 +190,46 @@ class FaceDetection(ImageProcessor):
)
face_recognitions.append(recognition)
self.visu_pose(person.name(), *position_3d)
self.logger.info("Found %s faces.", detected_persons)
for (top, right, bottom, left), person in zip(face_locations, detected_persons):
cv2.rectangle(images[0], (left, top), (right, bottom), (0, 0, 255), 2)
cv2.rectangle(images[0], (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
cv2.putText(images[0], person.name(), (left + 6, bottom - 6), cv2.FONT_HERSHEY_DUPLEX, 1.0, (255, 255, 255), 1)
self.visu_pose(person.name(), position_3d)
self.result_image_provider.update_image(images, info.timeProvided)
return face_recognitions
# on_detected_faces(detected_faces, info.timeProvided)
self.commit_recognitions(face_recognitions)
    def commit_recognitions(
        self,
        recognitions: ty.List[FaceRecognition],
        time_usec: int
    ):
        """Commit the given face recognitions to the face-recognition memory segment.

        One entity update per recognition, named after the matched profile
        (or "<unknown>" when no profile was matched), stamped with `time_usec`.
        """
        commit = mem.Commit()
        for reco in recognitions:
            update = commit.add()
            entity_name = reco.profile_id.entity_name if reco.profile_id else "<unknown>"
            # Entity ID = segment ID + this module as provider segment + entity name.
            update.entity_id = (self.face_recognition_segment_id
                                .with_provider_segment_name(__name__)
                                .with_entity_name(entity_name))
            update.time_created_usec = time_usec
            update.instances_data = [reco.to_aron()]
        self.face_reco_writer.commit(commit)
def visu_pose(
self,
person_name,
position: np.ndarray,
):
with self.arviz.begin_stage(commit_on_exit=True) as stage:
layer = stage.layer("Location")
layer.add(viz.Sphere(person_name, position=position, radius=100, color=(255, 0, 255)))
return images, info
@classmethod
def draw_result_image(
cls,
image_rgb,
face_locations,
detected_persons,
):
for (top, right, bottom, left), person in zip(face_locations, detected_persons):
cv2.rectangle(image_rgb, (left, top), (right, bottom), (0, 0, 255), 2)
cv2.rectangle(image_rgb, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
def commit_recognitions(self, recognitions: ty.List[FaceRecognition]):
pass
cv2.putText(image_rgb, person.name(), (left + 6, bottom - 6),
cv2.FONT_HERSHEY_DUPLEX, 1.0, (255, 255, 255), 1)
......@@ -5,7 +5,6 @@ from armarx import RobotStateComponentInterfacePrx
from armarx import FramedPositionBase
from armarx import FramedPoseBase
from armarx import FramedOrientationBase
from armarx.robots import A6
def pose2mat(pose: FramedPoseBase) -> np.ndarray:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.