#include "Memory.h"

#include <chrono>
#include <ctime>
#include <filesystem>
#include <functional>
#include <iomanip>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>

#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/time/TimeUtil.h>

#include <RobotAPI/libraries/armem/server/wm/memory_definitions.h>

namespace armarx::armem::server::ltm
{
    /// Forwards the JSON configuration to each mixin (buffer, cache, disk, MongoDB)
    /// while holding ltm_mutex.
    void
    Memory::_configure(const nlohmann::json& json)
    {
        std::lock_guard l(ltm_mutex);

        BufferedBase::configureMixin(json);
        CachedBase::configureMixin(json);
        DiskMemoryBase::configureMixin(json);
        MongoDBStorageMixin::configureMixin(json);
    }

    /// Default constructor: delegates to the full constructor with the export path
    /// "/tmp/ARMARX/LTM_Exports", default MongoDB settings, export name "MemoryExport"
    /// and memory name "Test".
    Memory::Memory() :
        Memory(std::filesystem::path("/tmp/ARMARX/LTM_Exports"), {}, "MemoryExport", "Test")
    {
    }

    /// Convenience constructor without explicit MongoDB settings; delegates to the
    /// full constructor with default-constructed settings.
    Memory::Memory(const detail::mixin::Path& p,
                   const std::string& exportName,
                   const std::string& memoryName /* UNESCAPED */) :
        Memory(p, {}, exportName, memoryName)
    {
    }

    /// Full constructor.
    ///
    /// Initializes all bases/mixins with a MemoryID built from @p memoryName and then
    /// appends a directory named after the current local date ("%Y_%m_%d") to the
    /// memory base path.
    ///
    /// @param p          Base path handed to the disk mixin.
    /// @param s          Settings handed to the MongoDB mixin.
    /// @param exportName Export name handed to the base class and the storage mixins.
    /// @param memoryName Unescaped memory name used to build the MemoryID.
    Memory::Memory(const detail::mixin::Path& p,
                   const detail::mixin::MongoDBSettings& s,
                   const std::string& exportName,
                   const std::string& memoryName /* UNESCAPED */) :
        MemoryBase(exportName, MemoryID(memoryName, "")),
        BufferedBase(MemoryID(memoryName, "")),
        CachedBase(MemoryID(memoryName, "")),
        DiskMemoryItemMixin(p, exportName, MemoryID(memoryName, "")),
        MongoDBStorageMixin(s, exportName, MemoryID(memoryName, ""))
    {
        // Set path to include date of creation as prefix.
        this->current_date =
            std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

        // std::localtime() hands out a pointer to shared static storage and may return
        // nullptr. Convert into a local buffer instead and check the result, so that
        // memories constructed from different threads cannot clobber each other's date.
        std::tm localTime{};
        if (::localtime_r(&this->current_date, &localTime) != nullptr)
        {
            // Convert current time into string.
            std::stringstream ss;
            ss << std::put_time(&localTime, "%Y_%m_%d");
            const std::string dateString = ss.str();

            // Change memory base path to include current date (date of memory creation).
            std::filesystem::path current_base_path = this->getMemoryBasePath();
            this->setMemoryBasePath(current_base_path.append(dateString));

            // Inform user about change.
            ARMARX_DEBUG << "Changed memory base path to include current date of " << dateString
                         << " to " << this->getMemoryBasePath();
        }
        else
        {
            ARMARX_WARNING << "Could not convert the creation time to local time; "
                           << "the memory base path is left without a date directory.";
        }

        ARMARX_INFO << "Creating a new memory at " << p.string() << " and " << exportName;
    }

    /// Propagates a new export name to the disk and MongoDB mixins.
    void
    Memory::_setExportName(const std::string& n)
    {
        DiskMemoryBase::setMixinExportName(n);
        MongoDBStorageMixin::setMixinExportName(n);
    }

    /// Starts the buffer mixin. The MongoDB mixin is currently not started here.
    void
    Memory::_enable()
    {
        BufferedBase::start();
        //MongoDBStorageMixin::start();
    }

    /// Stops the buffer mixin and the MongoDB mixin.
    // NOTE(review): _enable() does not start the MongoDB mixin but it is stopped here;
    // confirm that stop() is safe on a mixin that was never started.
    void
    Memory::_disable()
    {
        BufferedBase::stop();
        MongoDBStorageMixin::stop();
    }

    /// Propagates a new memory ID to every mixin while holding ltm_mutex.
    void
    Memory::_setMemoryID(const MemoryID& id)
    {
        std::lock_guard l(ltm_mutex);

        BufferedBase::setMixinMemoryID(id);
        CachedBase::setMixinMemoryID(id);
        DiskMemoryBase::setMixinMemoryID(id);
        MongoDBStorageMixin::setMixinMemoryID(id);
    }

    /// Calls @p func once for every core segment directory found below this memory's
    /// path. If the path does not exist, @p func is never called.
    ///
    /// @param func Callback receiving a freshly constructed CoreSegment per directory.
    /// @return Always true.
    bool
    Memory::forEachCoreSegment(std::function<void(CoreSegment&)> func) const
    {
        std::lock_guard l(ltm_mutex);

        // legacy: check fs
        if (fullPathExists())
        {
            for (const auto& subdirName : getAllDirectories())
            {
                // Directory names are stored escaped; recover the segment name.
                std::string segmentName = util::fs::detail::unescapeName(subdirName);
                ARMARX_INFO << "Name of segment " << segmentName;
                ARMARX_INFO << "After getting segment name";
                ARMARX_INFO << VAROUT(id().withCoreSegmentName(segmentName));
                //ARMARX_INFO << VAROUT(id().getCoreSegmentID());

                CoreSegment c(getMemoryBasePath(),
                              getSettings(),
                              getExportName(),
                              id().withCoreSegmentName(segmentName),
                              processors);
                func(c);
            }
        }

        ARMARX_INFO << "All CoreSegments handeled";
        return true;
    }

    /// Checks whether a core segment with the given name is known.
    ///
    /// The cache is consulted first; otherwise, if this memory's path exists, the
    /// result is whether the corresponding core segment path exists.
    ///
    /// @param name Name of the core segment.
    /// @return true if the segment is cached or present on disk, false otherwise.
    bool
    Memory::hasCoreSegment(const std::string& name) const
    {
        std::lock_guard l(ltm_mutex);

        if (cacheHasCoreSegment(name))
        {
            return true;
        }

        // check if collection exists
        /*if (connected() && collectionExists())
        {
            auto c = CoreSegment(getMemoryBasePath(),
                                 getSettings(),
                                 getExportName(),
                                 id().withCoreSegmentName(name),
                                 processors);
            return (bool)c.collectionExists();
        }*/

        // legacy: check if segment is stored on hard drive without db
        if (fullPathExists())
        {
            //ARMARX_INFO << VAROUT(id());
            //ARMARX_INFO << VAROUT(id().getCoreSegmentID());
            auto c = CoreSegment(getMemoryBasePath(),
                                 getSettings(),
                                 getExportName(),
                                 id().withCoreSegmentName(name),
                                 processors);
            return c.fullPathExists();
        }

        return false;
    }

    /// Creates a handle for the core segment with the given name.
    ///
    /// @param coreSegmentName Name of the core segment.
    /// @return A new CoreSegment, or nullptr if hasCoreSegment() reports it as unknown.
    // NOTE(review): ltm_mutex is locked here and again inside hasCoreSegment(); this
    // relies on ltm_mutex being a recursive mutex -- confirm in the header.
    std::unique_ptr<CoreSegment>
    Memory::findCoreSegment(const std::string& coreSegmentName) const
    {
        std::lock_guard l(ltm_mutex);

        if (!hasCoreSegment(coreSegmentName))
        {
            return nullptr;
        }

        return std::make_unique<CoreSegment>(getMemoryBasePath(),
                                             getSettings(),
                                             getExportName(),
                                             id().withCoreSegmentName(coreSegmentName),
                                             processors);
    }

    /// Sets the ID of @p m to this memory's ID and adds one working-memory core segment
    /// per stored core segment, each filled via CoreSegment::loadAllReferences().
    // NOTE(review): forEachCoreSegment() locks ltm_mutex again while it is held here;
    // this relies on ltm_mutex being a recursive mutex -- confirm in the header.
    void
    Memory::_loadAllReferences(armem::wm::Memory& m)
    {
        std::lock_guard l(ltm_mutex);

        m.id() = id().getMemoryID();
        ARMARX_INFO << VAROUT(id());

        forEachCoreSegment(
            [&m](auto& x)
            {
                armem::wm::CoreSegment s;
                x.loadAllReferences(s);
                //ARMARX_INFO << VAROUT(s.id());
                m.addCoreSegment(s);
            });
    }

    /// Resolves every core segment of @p m against the stored data. Nothing happens if
    /// this memory's path does not exist.
    void
    Memory::_resolve(armem::wm::Memory& m)
    {
        std::lock_guard l(ltm_mutex); // we cannot load a memory multiple times simultaneously

        if (/*(connected() && collectionExists()) ||*/ fullPathExists())
        {
            m.forEachCoreSegment(
                [&](auto& e)
                {
                    //ARMARX_INFO << "resolve for CoreSegment " << e.id().str();
                    CoreSegment c(getMemoryBasePath(),
                                  getSettings(),
                                  getExportName(),
                                  id().withCoreSegmentName(e.id().coreSegmentName),
                                  processors);
                    c.resolve(e);
                });
        }
    }

    /// Hands the memory to the buffer mixin instead of storing it immediately.
    void
    Memory::_store(const armem::wm::Memory& m)
    {
        BufferedBase::addToBuffer(m);
    }

    /// Stores every core segment of @p memory right away, bypassing the buffer, and
    /// increments the recorded-core-segment counter once per stored segment.
    ///
    /// If this LTM has no memory name yet, it adopts the ID of @p memory (with a
    /// warning) before storing.
    void
    Memory::_directlyStore(const armem::wm::Memory& memory)
    {
        std::lock_guard l(ltm_mutex); // we cannot store a memory multiple times simultaneously

        if (id().memoryName.empty())
        {
            ARMARX_WARNING
                << "During storage of memory '" << memory.id().str()
                << "' I noticed that the corresponding LTM has no id set. "
                << "I set the id of the LTM to the same name, however this should not happen!";
            setMemoryID(memory.id());
        }

        /*if (!connected())
        {
            ARMARX_WARNING << "LTM NOT CONNECTED ALTHOUGH ENABLED " << id().str();
            return;
        }*/

        memory.forEachCoreSegment(
            [&](const auto& core)
            {
                CoreSegment c(getMemoryBasePath(),
                              getSettings(),
                              getExportName(),
                              id().withCoreSegmentName(core.id().coreSegmentName),
                              processors);

                // 2. store data
                c.store(core);

                // 3. update statistics
                statistics.recordedCoreSegments++;
            });

        // 4. update cache
        //CachedBase::addToCache(memory);
    }
} // namespace armarx::armem::server::ltm