// Skip to content
// Snippets Groups Projects
// Memory.cpp 8.29 KiB
#include "Memory.h"

#include <chrono>
#include <ctime>
#include <iomanip>
#include <sstream>

#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/time/TimeUtil.h>

#include <RobotAPI/libraries/armem/server/wm/memory_definitions.h>

namespace armarx::armem::server::ltm
{
    /**
     * @brief Apply a JSON configuration to every storage mixin of this memory.
     *
     * The same JSON object is handed, in order, to the buffered, cached, disk and
     * MongoDB mixins. ltm_mutex is held for the whole call.
     *
     * @param json Configuration object forwarded unchanged to each mixin.
     */
    void
    Memory::_configure(const nlohmann::json& json)
    {
        std::lock_guard guard{ltm_mutex};

        this->BufferedBase::configureMixin(json);
        this->CachedBase::configureMixin(json);
        this->DiskMemoryBase::configureMixin(json);
        this->MongoDBStorageMixin::configureMixin(json);
    }

    Memory::Memory() :
        Memory(std::filesystem::path("/tmp/ARMARX/LTM_Exports"), {}, "MemoryExport", "Test")
    {
    }

    /**
     * @brief Construct an LTM without explicit MongoDB settings.
     *
     * Delegates to the four-argument constructor, passing value-initialised
     * MongoDB settings.
     *
     * @param p          Base path forwarded to the delegated constructor.
     * @param exportName Export name forwarded to the delegated constructor.
     * @param memoryName Memory name (unescaped) forwarded to the delegated constructor.
     */
    Memory::Memory(const detail::mixin::Path& p,
                   const std::string& exportName,
                   const std::string& memoryName /* UNESCAPED */) :
        Memory(p, detail::mixin::MongoDBSettings{}, exportName, memoryName)
    {
    }

    /**
     * @brief Construct an LTM whose base path gets the creation date appended as a directory.
     *
     * The base class and all mixins are initialised with a MemoryID built from
     * @p memoryName and an empty second argument. Afterwards the current local date,
     * formatted as "%Y_%m_%d", is appended to the memory base path. If the local date
     * cannot be determined, a warning is logged and the base path is left unchanged.
     *
     * @param p          Base path handed to the disk mixin.
     * @param s          Settings handed to the MongoDB mixin.
     * @param exportName Export name handed to the base class and the storage mixins.
     * @param memoryName Memory name (unescaped) used to build the MemoryID.
     */
    Memory::Memory(const detail::mixin::Path& p,
                   const detail::mixin::MongoDBSettings& s,

                   const std::string& exportName,
                   const std::string& memoryName /* UNESCAPED */) :
        MemoryBase(exportName, MemoryID(memoryName, "")),
        BufferedBase(MemoryID(memoryName, "")),
        CachedBase(MemoryID(memoryName, "")),
        DiskMemoryItemMixin(p, exportName, MemoryID(memoryName, "")),
        MongoDBStorageMixin(s, exportName, MemoryID(memoryName, ""))
    {
        // Remember the creation time; its local calendar date becomes a path component.
        this->current_date = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

        // Use the re-entrant localtime_r with a caller-owned buffer: std::localtime hands out
        // a pointer to shared static storage that another thread may overwrite before it is
        // read here, and it may return nullptr on failure.
        std::tm localTime{};
        if (::localtime_r(&this->current_date, &localTime) != nullptr)
        {
            // convert current date into string:
            std::stringstream ss;
            ss << std::put_time(&localTime, "%Y_%m_%d");
            const std::string dateString = ss.str();

            // change memory base path to include current date (date of memory creation)
            std::filesystem::path current_base_path = this->getMemoryBasePath();
            this->setMemoryBasePath(current_base_path.append(dateString));

            //inform user about change:
            ARMARX_DEBUG << "Changed memory base path to include current date of "
                         << dateString
                         << " to "
                         << this->getMemoryBasePath();
        }
        else
        {
            ARMARX_WARNING << "Could not convert the creation time to a local date; "
                           << "memory base path " << this->getMemoryBasePath()
                           << " is left without a date suffix.";
        }

        ARMARX_INFO << "Creating a new memory at " << p.string() << " and " << exportName;
    }

    void
    Memory::_setExportName(const std::string& n)
    {
        DiskMemoryBase::setMixinExportName(n);
        MongoDBStorageMixin::setMixinExportName(n);
    }

    void
    Memory::_enable()
    {
        BufferedBase::start();
        //MongoDBStorageMixin::start();
    }

    void
    Memory::_disable()
    {
        BufferedBase::stop();
        MongoDBStorageMixin::stop();
    }

    void
    Memory::_setMemoryID(const MemoryID& id)
    {
        std::lock_guard l(ltm_mutex);

        BufferedBase::setMixinMemoryID(id);
        CachedBase::setMixinMemoryID(id);
        DiskMemoryBase::setMixinMemoryID(id);
        MongoDBStorageMixin::setMixinMemoryID(id);
    }

    /**
     * @brief Invoke a callback for every core segment found on disk.
     *
     * If this memory's full path exists, each directory returned by getAllDirectories()
     * is treated as an escaped core segment name. The name is unescaped, a CoreSegment
     * handle is built for it, and @p func is called with that handle. The handle only
     * lives for the duration of the callback. ltm_mutex is held for the whole iteration.
     *
     * @param func Callback invoked once per core segment.
     * @return Always true.
     */
    bool
    Memory::forEachCoreSegment(std::function<void(CoreSegment&)> func) const
    {
        std::lock_guard l(ltm_mutex);

        // legacy: check fs
        if (fullPathExists())
        {
            for (const auto& subdirName : getAllDirectories())
            {
                const std::string segmentName = util::fs::detail::unescapeName(subdirName);

                // Build the segment ID once; it is used for both logging and construction.
                const auto segmentID = id().withCoreSegmentName(segmentName);

                // Per-segment tracing is diagnostic output only, so it is logged at debug level.
                ARMARX_DEBUG << "Name of segment " << segmentName;
                ARMARX_DEBUG << VAROUT(segmentID);

                CoreSegment c(getMemoryBasePath(),
                              getSettings(),
                              getExportName(),
                              segmentID,
                              processors);
                func(c);
            }
        }

        ARMARX_DEBUG << "All CoreSegments handled";

        return true;
    }

    bool
    Memory::hasCoreSegment(const std::string& name) const
    {
        std::lock_guard l(ltm_mutex);

        if (cacheHasCoreSegment(name))
        {
            return true;
        }
        // check if collection exists
        /*if (connected() && collectionExists())
        {
            auto c = CoreSegment(getMemoryBasePath(),
                                 getSettings(),
                                 getExportName(),
                                 id().withCoreSegmentName(name),
                                 processors);

            return (bool)c.collectionExists();
        }*/

        // legacy: check if segment is stored on hard drive without db
        if (fullPathExists())
        {
            //ARMARX_INFO << VAROUT(id());
            //ARMARX_INFO << VAROUT(id().getCoreSegmentID());
            auto c = CoreSegment(getMemoryBasePath(),
                                 getSettings(),
                                 getExportName(),
                                 id().withCoreSegmentName(name),
                                 processors);

            return c.fullPathExists();
        }

        return false;
    }

    /**
     * @brief Create a handle for a core segment if it exists.
     *
     * @param coreSegmentName Name of the core segment to look up.
     * @return A newly allocated CoreSegment handle when hasCoreSegment() reports the
     *         segment, nullptr otherwise.
     *
     * NOTE(review): hasCoreSegment() locks ltm_mutex again while it is already held
     * here, so ltm_mutex is presumably a recursive mutex — confirm in the header.
     */
    std::unique_ptr<CoreSegment>
    Memory::findCoreSegment(const std::string& coreSegmentName) const
    {
        std::lock_guard guard{ltm_mutex};

        if (hasCoreSegment(coreSegmentName))
        {
            return std::make_unique<CoreSegment>(getMemoryBasePath(),
                                                 getSettings(),
                                                 getExportName(),
                                                 id().withCoreSegmentName(coreSegmentName),
                                                 processors);
        }

        return nullptr;
    }

    /**
     * @brief Fill a working memory with the references of all stored core segments.
     *
     * The working memory's ID is overwritten with this LTM's memory ID. Then, for every
     * core segment reported by forEachCoreSegment(), a fresh working-memory core segment
     * is loaded via loadAllReferences() and added to @p m.
     *
     * @param m Working memory that receives the ID and the loaded core segments.
     */
    void
    Memory::_loadAllReferences(armem::wm::Memory& m)
    {
        std::lock_guard guard{ltm_mutex};

        m.id() = id().getMemoryID();

        ARMARX_INFO << VAROUT(id());

        // Loads one LTM core segment into a new working-memory segment and appends it to m.
        const auto appendSegment = [&m](CoreSegment& ltmSegment)
        {
            armem::wm::CoreSegment wmSegment;
            ltmSegment.loadAllReferences(wmSegment);
            m.addCoreSegment(wmSegment);
        };

        forEachCoreSegment(appendSegment);
    }

    /**
     * @brief Resolve every core segment of a working memory against the stored data.
     *
     * Nothing happens unless this memory's full path exists. Otherwise, for each core
     * segment in @p m, a CoreSegment handle with the same core segment name is built
     * and its resolve() is called on that working-memory segment. The MongoDB
     * precondition (connected() && collectionExists()) is disabled.
     *
     * @param m Working memory whose core segments are resolved in place.
     */
    void
    Memory::_resolve(armem::wm::Memory& m)
    {
        std::lock_guard guard{ltm_mutex}; // we cannot load a memory multiple times simultaneously

        if (!fullPathExists())
        {
            return;
        }

        m.forEachCoreSegment(
            [this](auto& wmSegment)
            {
                CoreSegment ltmSegment(getMemoryBasePath(),
                                       getSettings(),
                                       getExportName(),
                                       id().withCoreSegmentName(wmSegment.id().coreSegmentName),
                                       processors);
                ltmSegment.resolve(wmSegment);
            });
    }

    /**
     * @brief Queue a working memory for storage by adding it to the buffer.
     *
     * @param m Working memory handed to BufferedBase::addToBuffer().
     */
    void
    Memory::_store(const armem::wm::Memory& m)
    {
        this->BufferedBase::addToBuffer(m);
    }

    /**
     * @brief Store a working memory immediately, core segment by core segment.
     *
     * If this LTM has an empty memory name, a warning is logged and the LTM adopts the
     * ID of @p memory. Then, for each core segment of @p memory, a CoreSegment handle
     * with the same core segment name is built, the segment is stored through it, and
     * statistics.recordedCoreSegments is incremented. The connection check and the
     * cache update are disabled.
     *
     * @param memory Working memory to persist.
     */
    void
    Memory::_directlyStore(const armem::wm::Memory& memory)
    {
        std::lock_guard guard{ltm_mutex}; // we cannot store a memory multiple times simultaneously

        const bool ltmHasNoName = id().memoryName.empty();
        if (ltmHasNoName)
        {
            ARMARX_WARNING
                << "During storage of memory '" << memory.id().str()
                << "' I noticed that the corresponding LTM has no id set. "
                << "I set the id of the LTM to the same name, however this should not happen!";
            setMemoryID(memory.id());
        }

        memory.forEachCoreSegment(
            [this](const auto& wmSegment)
            {
                CoreSegment ltmSegment(getMemoryBasePath(),
                                       getSettings(),
                                       getExportName(),
                                       id().withCoreSegmentName(wmSegment.id().coreSegmentName),
                                       processors);

                // Persist the segment, then count it in the statistics.
                ltmSegment.store(wmSegment);
                ++statistics.recordedCoreSegments;
            });
    }
} // namespace armarx::armem::server::ltm