diff --git a/python/skills/skills/query_symbolic_scene_memory.py b/python/skills/skills/query_symbolic_scene_memory.py index 067ff590a0aea0092398d62bc896b795929df345..d2f724fad37115b5a5390d52ae1ae2a9b08c6899 100644 --- a/python/skills/skills/query_symbolic_scene_memory.py +++ b/python/skills/skills/query_symbolic_scene_memory.py @@ -39,23 +39,49 @@ class ObjectLastSeenAtIceClient(dti.skills.object_last_seen_at.core.ObjectLastSe self.ltm_name = "MemoryExport" - # object-id: last_request, timestamp, location - # in the new request we just have to search until the last_request time. If the object was not seen between last - #request and now the newest location is still the same - self.cached_locations = {} + self.request_event = threading.Event() + self.result_event = threading.Event() + self.abort_event = threading.Event() + self.request = None + self.result = None + self.worker_thread = threading.Thread(target=self.run_execution) + self.worker_thread.start() + self.worker_running = False + def queryMemoryForObjectLastSeenAt(self, ltm_base_path: str, object_name: str, ltm_name: str, current=None): - self.ltm_name = ltm_name - wm_result = {} - #wm_result = query_wm_for_object_location(self.mns, self.core_segment_id, object_name) # skipping for demo - if wm_result == {}: + + self.request = (ltm_base_path, object_name, ltm_name) + self.request_event.set() + self.result_event.wait() + return self.result[0], self.result[1], self.result[2] + + + def run_execution(self): + + object_name = self.request[1] + ltm_base_path = self.request[0] + ltm_name = self.request[2] + print("Thread for queryMemoryForObjectLastSeenAt started") + + while True: + print("Started") + self.worker_running = False + self.request_event.wait() + self.worker_running = True + request = self.request + self.abort_event.clear() + print("Received request") + self.request_event.clear() + + return_result = True + print("Could not find", object_name, "in extended WM, searching LTM") - ltm_result, timestamp, mem_id = query_ltm_for_object_location(ltm_base_path, object_name, self.ltm_name) + ltm_result, timestamp, mem_id = query_ltm_for_object_location(ltm_base_path, object_name, ltm_name) print("Done with searching LTM") query_result = dti.skills.object_last_seen_at.core.QueryResult() print("Got query result object") query_result.locationName = ltm_result - # needed: object id and timestamp of symbolic scene memory print("Memory ID for object pose query:", mem_id) if(mem_id == None): query_result.objectInstance = None @@ -76,155 +102,141 @@ class ObjectLastSeenAtIceClient(dti.skills.object_last_seen_at.core.ObjectLastSe print("IndexError, skipping") query_result.objectInstance = None return query_result - else: - print("No object pose found") - query_result = dti.skills.object_last_seen_at.core.QueryResult() - query_result.locationName = make_name_pretty(wm_result, self.object_core_segment_id) - query_result.objectInstance = None - return query_result -def query_ltm_for_object_pose(entity_id, timestamp): - try: - print("Searching for the object pose") - reader = mns.wait_for_reader(entity_id) + def query_ltm_for_object_pose(self, entity_id, timestamp): + try: + print("Searching for the object pose") + reader = mns.wait_for_reader(entity_id) + print("Got reader") + memory = reader.query_core_segment(entity_id.core_segment_name) + print("Got memory") + for prov in memory.coreSegments[entity_id.core_segment_name].providerSegments.values(): + for entity in prov.entities.values(): + for snapshot in reversed(entity.history.values()): + for instance in snapshot.instances: + id = mem.MemoryID.from_ice(instance.id) + if entity_id.get_all_items()[3] == id.entity_name: + #metadata = pythonic_from_to_aron_ice.pythonic_from_aron_ice(instance.metadata) + stored = id.timestamp_usec #metadata.timeStored + if stored <= timestamp: + print("Found closest timestamp for entity", entity_id, "at", stored, "for", timestamp) + print("Returning object pose") + return instance.data + print("Could not find any matching object pose") + return None + except Exception as e: + print("Could not find object pose") + print(e) + return None + + def make_name_pretty(self, location, core_id): + print("Location:", location) + core_id = MemoryID("Object", "Class") + print("Core ID for location query:", core_id) + try: + type_of_position = location.split('/')[-1].split(":")[0].split("-")[0] + name_of_location = query_mem_for_object_class_info(mns, location, core_id) + pretty_name = type_of_position + " the " + name_of_location + return pretty_name + except Exception as e: + print("Using fall back option for making names pretty") + print(e) + name_of_location = location.split('/')[-2] + name_of_location_clean = re.sub(r'\d+', '', name_of_location) + print(name_of_location_clean) + type_of_position = location.split('/')[-1].split(":")[0].split("-")[0] + pretty_name = type_of_position + " the " + name_of_location_clean + print(pretty_name) + return pretty_name + + def query_mem_for_object_class_info(self, mns, object_name, core_id): + location_name = object_name.split('/')[1] + print("Searching for object class information for location", location_name) + reader = mns.wait_for_reader(core_id) print("Got reader") - # TODO: invalid value for element 0 -> something is still wrong with the entity_id and the way I query this reader - memory = reader.query_core_segment(entity_id.core_segment_name) - print("Got memory") - for prov in memory.coreSegments[entity_id.core_segment_name].providerSegments.values(): + memory = reader.query_core_segment(name=core_id.core_segment_name, latest_snapshot=True) + print("Got memory and reader") + # this is Object/Class and inside PriorKnowledgeData/Room, Dataset/name/timestamp: names.spoken[0] + for prov in memory.coreSegments[core_id.core_segment_name].providerSegments.values(): for entity in prov.entities.values(): - for snapshot in reversed(entity.history.values()): + for snapshot in entity.history.values(): for instance in snapshot.instances: - id = mem.MemoryID.from_ice(instance.id) - if entity_id.get_all_items()[3] == id.entity_name: - #metadata = pythonic_from_to_aron_ice.pythonic_from_aron_ice(instance.metadata) - stored = id.timestamp_usec #metadata.timeStored - if stored <= timestamp: - print("Found closest timestamp for entity", entity_id, "at", stored, "for", timestamp) - print("Returning object pose") - return instance.data - print("Could not find any matching object pose") - return None - except Exception as e: - print("Could not find object pose") - print(e) - return None - - - -def make_name_pretty(location, core_id): - print("Location:", location) - core_id = MemoryID("Object", "Class") - print("Core ID for location query:", core_id) - try: - type_of_position = location.split('/')[-1].split(":")[0].split("-")[0] - name_of_location = query_mem_for_object_class_info(mns, location, core_id) - pretty_name = type_of_position + " the " + name_of_location - return pretty_name - except Exception as e: - print("Using fall back option for making names pretty") - print(e) - name_of_location = location.split('/')[-2] - name_of_location_clean = re.sub(r'\d+', '', name_of_location) - print(name_of_location_clean) - type_of_position = location.split('/')[-1].split(":")[0].split("-")[0] - pretty_name = type_of_position + " the " + name_of_location_clean - print(pretty_name) - return pretty_name - - -def query_mem_for_object_class_info(mns, object_name, core_id): - location_name = object_name.split('/')[1] - print("Searching for object class information for location", location_name) - reader = mns.wait_for_reader(core_id) - print("Got reader") - memory = reader.query_core_segment(name=core_id.core_segment_name, latest_snapshot=True) - print("Got memory and reader") - # this is Object/Class and inside PriorKnowledgeData/Room, Dataset/name/timestamp: names.spoken[0] - for prov in memory.coreSegments[core_id.core_segment_name].providerSegments.values(): - for entity in prov.entities.values(): - for snapshot in entity.history.values(): - for instance in snapshot.instances: - data: Dict[str, Any] = pythonic_from_to_aron_ice.pythonic_from_aron_ice(instance.data) - try: - o_id = data['id']['className'] - o_names_spoken = data['names']['spoken'] - - if o_id == location_name: - print("Found object class information for object", object_name) - print("Spoken names are", o_names_spoken) - print("Object id:", o_id) - - return o_names_spoken[0] - - except KeyError: - print("Cannot read className, recognize and spoken names") - - -def query_wm_for_object_location(mns, core_segment_id: mem.MemoryID, object_name): - print("Starting to search extended WM for object location") - # Get Reader. - example_reader = mns.wait_for_reader(core_segment_id) - - # Perform query. - memory = example_reader.query_core_segment( - name=core_segment_id.core_segment_name - ) - - core_segment_name = core_segment_id.core_segment_name - - result_data = {} - - # Process result. - for prov in memory.coreSegments[core_segment_name].providerSegments.values(): - for entity in prov.entities.values(): - for snapshot in reversed(entity.history.values()): # loop over snapshots in reverse - for instance in snapshot.instances: - - data: Dict[str, Any] = pythonic_from_to_aron_ice.pythonic_from_aron_ice(instance.data) - - objects_in_scene = data["objects"] - for object in objects_in_scene: - if (object_name in object) and (objects_in_scene[object]['objectAt'] != '(unknown)'): - print("Object found in scene at", objects_in_scene[object]['objectAt']) - result_data[object_name] = objects_in_scene[object]['objectAt'] - - return result_data[object_name] - print("object not found in WM") - return result_data - -results = [] - -def search_snapshot_for_location(snapshot, object_name): - try: - for i in snapshot.instances: - i.load() - timestamp = i.metadata.timeStored - all_object_info = i.data.to_primitive() - - for object in all_object_info["objects"]: - if object_name in object: - if all_object_info["objects"][object]['objectAt'] != '(unknown)': - print("Found object at", all_object_info["objects"][object]['objectAt']) - return True, all_object_info["objects"][object]['objectAt'], timestamp, all_object_info["objects"][object]['source'] - except Exception as e: - print("Could not find object in LTM") - print(e) - return False, "(unknown)", 0, None - #print("No location found in this chunk") - return False, None, None, None - -def search_through_snapshots(snapshots, object_name): - i = snapshots[0].instances[0] - i.load() - latest_snapshot = i.metadata.timeStored - for s in snapshots: - r = search_snapshot_for_location(s, object_name) - if r[0]: - return r[1], r[2], r[3] - return "(unknown)", 0, None + data: Dict[str, Any] = pythonic_from_to_aron_ice.pythonic_from_aron_ice(instance.data) + try: + o_id = data['id']['className'] + o_names_spoken = data['names']['spoken'] + + if o_id == location_name: + print("Found object class information for object", object_name) + print("Spoken names are", o_names_spoken) + print("Object id:", o_id) + + return o_names_spoken[0] + + except KeyError: + print("Cannot read className, recognize and spoken names") + + def query_wm_for_object_location(self, mns, core_segment_id: mem.MemoryID, object_name): + print("Starting to search extended WM for object location") + # Get Reader. + example_reader = mns.wait_for_reader(core_segment_id) + + # Perform query. + memory = example_reader.query_core_segment( + name=core_segment_id.core_segment_name + ) + + core_segment_name = core_segment_id.core_segment_name + + result_data = {} + + # Process result. + for prov in memory.coreSegments[core_segment_name].providerSegments.values(): + for entity in prov.entities.values(): + for snapshot in reversed(entity.history.values()): # loop over snapshots in reverse + for instance in snapshot.instances: + + data: Dict[str, Any] = pythonic_from_to_aron_ice.pythonic_from_aron_ice(instance.data) + objects_in_scene = data["objects"] + for object in objects_in_scene: + if (object_name in object) and (objects_in_scene[object]['objectAt'] != '(unknown)'): + print("Object found in scene at", objects_in_scene[object]['objectAt']) + result_data[object_name] = objects_in_scene[object]['objectAt'] + + return result_data[object_name] + print("object not found in WM") + return result_data + + def search_snapshot_for_location(self, snapshot, object_name): + try: + for i in snapshot.instances: + i.load() + timestamp = i.metadata.timeStored + all_object_info = i.data.to_primitive() + + for object in all_object_info["objects"]: + if object_name in object: + if all_object_info["objects"][object]['objectAt'] != '(unknown)': + print("Found object at", all_object_info["objects"][object]['objectAt']) + return True, all_object_info["objects"][object]['objectAt'], timestamp, all_object_info["objects"][object]['source'] + except Exception as e: + print("Could not find object in LTM") + print(e) + return False, "(unknown)", 0, None + return False, None, None, None + + def search_through_snapshots(self, snapshots, object_name): + i = snapshots[0].instances[0] + i.load() + latest_snapshot = i.metadata.timeStored + for s in snapshots: + r = search_snapshot_for_location(s, object_name) + if r[0]: + return r[1], r[2], r[3] + return "(unknown)", 0, None +''' def search_through_snapshots_in_parallel(snapshots, object_name, min_snap_length=512, thread_pool_size=16): def split_into_chunks(lst, n): # Determine the size of each chunk @@ -277,86 +289,81 @@ def search_through_snapshots_in_parallel(snapshots, object_name, min_snap_length closest_timestamp = res[1] closest_res = res return closest_res - - -def q( - ltm_base_path, - object_name, - ltm_name -): - beginning_of_ltm_search = time.time() +''' - ent_id = MemoryID("SymbolicScene", "SymbolicSceneDescription", "SymbolicScene", "SceneDescription") - entity_base_path = ltm_base_path + '/' + ltm_name + '/' + 'SymbolicScene/SymbolicSceneDescription/SymbolicScene' - ent = Entity(ent_id, entity_base_path) + def q( + self, + ltm_base_path, + object_name, + ltm_name + ): + beginning_of_ltm_search = time.time() - ent.loadReferences() - loaded_references_time = time.time() - print("Loaded all references for entity", ent.id, "in", "{:.4f}".format(loaded_references_time - beginning_of_ltm_search), "seconds") + ent_id = MemoryID("SymbolicScene", "SymbolicSceneDescription", "SymbolicScene", "SceneDescription") + entity_base_path = ltm_base_path + '/' + ltm_name + '/' + 'SymbolicScene/SymbolicSceneDescription/SymbolicScene' + ent = Entity(ent_id, entity_base_path) - snapshots = list(ent.snapshots.values()) - snapshots.sort(key=lambda x: x.id.timestamp_usec, reverse=True) + ent.loadReferences() + loaded_references_time = time.time() + print("Loaded all references for entity", ent.id, "in", "{:.4f}".format(loaded_references_time - beginning_of_ltm_search), "seconds") - sorted_time = time.time() + snapshots = list(ent.snapshots.values()) + snapshots.sort(key=lambda x: x.id.timestamp_usec, reverse=True) - print("sorted snapshots in", "{:.4f}".format(sorted_time - loaded_references_time), "seconds") + sorted_time = time.time() - type_of_search = 0 - current_time = time.time() * 1000000 + print("sorted snapshots in", "{:.4f}".format(sorted_time - loaded_references_time), "seconds") - #try: - #print("Searching in parallel for", object_name, "in", len(snapshots), "snapshots") - #res = search_through_snapshots_in_parallel(snapshots=snapshots, object_name=object_name, min_snap_length=1024, thread_pool_size=32) - #time_in_minutes = (time.time() * 1000000 - current_time) / 60000000 - #print("Parallel search succeeded in", "{:.4f}".format(time_in_minutes), "minutes, with result", res) - #return res - #except Exception as e: - # print("Parallel search failed") - # print(e) + type_of_search = 0 + current_time = time.time() * 1000000 - current_time = time.time() * 1000000 - - print("Starting linear search") - - try: - for s in snapshots: - r = search_snapshot_for_location(s, object_name) - if r[0]: - found_time = time.time() - time_ago = (current_time - r[2]) - minutes_ago = time_ago / 60000000 - print("Object location was recorded", current_time - r[2], "milliseconds (=", "{:.2f}".format(minutes_ago), "minutes) ago") - print("It took ", "{:.4f}".format(found_time - beginning_of_ltm_search), "seconds to find object") - return r[1], r[2], r[3] - - except Exception as e: - print("Could not find object in LTM") - print(e) - return "(unknown)", 0, None - - - - - -def query_ltm_for_object_location(ltm_base_path, object_name, ltm_name): - print("Starting to search LTM for object") - name_of_location = None - try: - location, timestamp, mem_id = q(ltm_base_path, object_name, ltm_name) - if location: - object_mem_id = mem.MemoryID("Object") - object_core_segment_id = object_mem_id.with_core_segment_name("Class") - return make_name_pretty(location, object_core_segment_id), timestamp, mem_id - else: - print("Could not find object in WM or LTM") - name_of_location = '(unknown)' + print("Starting linear search") + + try: + for s in snapshots: + if self.abort_event.is_set(): + print("Aborting search") + self.abort_event.clear() + print("Old request skipped") + return_result = False + break + r = search_snapshot_for_location(s, object_name) + if r[0]: + found_time = time.time() + time_ago = (current_time - r[2]) + minutes_ago = time_ago / 60000000 + print("Object location was recorded", current_time - r[2], "milliseconds (=", "{:.2f}".format(minutes_ago), "minutes) ago") + print("It took ", "{:.4f}".format(found_time - beginning_of_ltm_search), "seconds to find object") + if return_result: + self.result = (r[1], r[2], r[3]) + else: + self.result = ("(unknown)", 0, None) + self.result_event.set() + except Exception as e: + print("Could not find object in LTM") + print(e) + return "(unknown)", 0, None + + + def query_ltm_for_object_location(self, ltm_base_path, object_name, ltm_name): + print("Starting to search LTM for object") + name_of_location = None + try: + location, timestamp, mem_id = q(ltm_base_path, object_name, ltm_name) + if location: + object_mem_id = mem.MemoryID("Object") + object_core_segment_id = object_mem_id.with_core_segment_name("Class") + return make_name_pretty(location, object_core_segment_id), timestamp, mem_id + else: + print("Could not find object in WM or LTM") + name_of_location = '(unknown)' + return name_of_location, 0, None + except Exception as e: + print("Could not find object in LTM") + print(e) + if name_of_location == None: + name_of_location = '(unknown)' return name_of_location, 0, None - except Exception as e: - print("Could not find object in LTM") - print(e) - if name_of_location == None: - name_of_location = '(unknown)' - return name_of_location, 0, None if __name__ == '__main__':