Skip to content
Snippets Groups Projects
Commit b103038c authored by Fabian Tërnava's avatar Fabian Tërnava
Browse files

Buffered LTM now mixin

parent 810eb237
No related branches found
No related tags found
No related merge requests found
......@@ -4,23 +4,26 @@
namespace armarx::armem::server::ltm
{
template <class _CoreSegmentT>
class BufferedMemoryBase : virtual public MemoryBase<_CoreSegmentT>
class BufferedMemoryItem
{
public:
using MemoryBase<_CoreSegmentT>::MemoryBase;
BufferedMemoryBase(const MemoryID&) = delete; // remove non-default constructor
BufferedMemoryItem() = default;
BufferedMemoryItem(const MemoryID& id) :
buffer(id)
{
}
virtual ~BufferedMemoryItem() = default;
armem::wm::Memory getBuffer() const
{
std::lock_guard l(this->ltm_mutex);
return buffer;
}
void directlyStore(const armem::wm::Memory& memory)
{
TIMING_START(LTM_Memory_DirectlyStore);
_directlyStore(memory);
_directlyStore(memory);
TIMING_END(LTM_Memory_DirectlyStore);
}
......@@ -28,31 +31,22 @@ namespace armarx::armem::server::ltm
{
if (buffer.empty())
{
ARMARX_INFO << "Cannot store an empty buffer. Ignoring.";
ARMARX_INFO << deactivateSpam() << "Cannot store an empty buffer. Ignoring.";
return;
}
this->directlyStore(buffer);
buffer.clear();
}
void setMemoryID(const MemoryID& id) override
{
MemoryBase<_CoreSegmentT>::setMemoryID(id);
buffer.name() = this->name();
}
protected:
virtual void _directlyStore(const armem::wm::Memory& memory) = 0;
void _store(const armem::wm::Memory& memory) override
{
std::lock_guard l(this->ltm_mutex);
buffer.append(memory);
}
protected:
/// Internal memory for data consolidated from wm to ltm (buffer)
/// The to-put-to-ltm buffer (contains data in plain text)
/// This buffer may still be filtered.
/// This means that it is not guaranteed that all data in the buffer will be stored in the ltm
armem::wm::Memory buffer;
};
......
......@@ -4,6 +4,7 @@
namespace armarx::armem::server::ltm
{
// TODO refactor to mixin (see buffered)
template <class _CoreSegmentT>
class CachedMemoryBase : virtual public MemoryBase<_CoreSegmentT>
{
......
......@@ -5,6 +5,13 @@
#include <RobotAPI/libraries/armem/server/wm/memory_definitions.h>
// ArmarX
#include "../base/filter/frequencyFilter/FrequencyFilter.h"
#include "../base/extractor/noExtractor/NoExtractor.h"
#include "../base/extractor/imageExtractor/ImageExtractor.h"
#include "../base/converter/dict/json/JsonConverter.h"
#include "../base/converter/dict/bson/BsonConverter.h"
#include "../base/converter/image/png/PngConverter.h"
namespace armarx::armem::server::ltm::disk
{
......@@ -12,44 +19,57 @@ namespace armarx::armem::server::ltm::disk
{
MemoryID getMemoryIDFromPath(const std::filesystem::path& p)
{
ARMARX_CHECK(!p.empty());
util::ensureFolderExists(p);
MemoryID m;
m.memoryName = p.filename();
return m;
}
std::filesystem::path getDefaultStoragePath()
{
/*std::string armarx_home = std::string(getenv("HOME")) + "/.armarx";
if (getenv("ARMARX_DEFAULTS_DIR"))
{
armarx_home = getenv("ARMARX_DEFAULTS_DIR");
}
path = armarx_home + "/armem/disk/data/db";*/
return "/tmp/MemoryExport/Test";
}
}
Memory::Memory() :
BufferedMemoryBase(),
DiskStorage()
void Memory::createPropertyDefinitions(PropertyDefinitionsPtr& properties, const std::string& prefix)
{
filter = std::make_shared<filter::FrequencyFilter>();
setMemoryID(MemoryID("Test", ""));
properties->optional(path, prefix + "storagepath", "The path to the memory storage.");
}
/*std::string armarx_home = std::string(getenv("HOME")) + "/.armarx";
if (getenv("ARMARX_DEFAULTS_DIR"))
{
armarx_home = getenv("ARMARX_DEFAULTS_DIR");
}
path = armarx_home + "/armem/disk/data/db";*/
path = "/tmp/MemoryExport/" + getExpectedFolderName();
Memory::Memory() :
Memory(getDefaultStoragePath())
{
}
Memory::Memory(const std::filesystem::path& p) :
BufferedMemoryBase(),
DiskStorage(p)
Base(getMemoryIDFromPath(p)),
BufferedMemoryItem(getMemoryIDFromPath(p)),
DiskMemoryItem(p)
{
filter = std::make_shared<filter::FrequencyFilter>();
setPath(p);
setMemoryID(getMemoryIDFromPath(p));
pipeline.memFilter = std::make_shared<filter::MemoryFrequencyFilter>();
pipeline.snapFilter = std::make_shared<filter::SnapshotFrequencyFilter>();
pipeline.extractor = std::make_shared<extractor::ImageExtractor>();
pipeline.dictConverter = std::make_shared<converter::dict::JsonConverter>();
pipeline.imgConverter = std::make_shared<converter::image::PngConverter>();
}
void Memory::setMemoryID(const MemoryID& id)
{
BufferedMemoryBase::setMemoryID(id);
ARMARX_CHECK_NOT_EMPTY(id.memoryName);
Base::setMemoryID(id.getMemoryID());
std::filesystem::path p(path);
setPath(p.parent_path() / id.memoryName);
path = (p.parent_path() / id.memoryName);
buffer.id() = id.getMemoryID();
}
std::string Memory::getExpectedFolderName() const
......@@ -70,7 +90,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
CoreSegment c(subdirPath);
CoreSegment c(subdirPath, pipeline);
func(c);
}
return true;
......@@ -88,7 +108,7 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / n;
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<CoreSegment>(subpath);
auto c = std::make_shared<CoreSegment>(subpath, pipeline);
return c;
}
......@@ -114,13 +134,25 @@ namespace armarx::armem::server::ltm::disk
m.forEachCoreSegment([this](armem::wm::CoreSegment& e)
{
util::ensureFolderExists(std::filesystem::path(path) / e.id().coreSegmentName, false);
CoreSegment c(std::filesystem::path(path) / e.id().coreSegmentName);
CoreSegment c((std::filesystem::path(path) / e.id().coreSegmentName), pipeline);
c.load(e);
});
}
void Memory::_store(const armem::wm::Memory& memory)
{
std::lock_guard l(this->ltm_mutex);
buffer.append(memory);
}
void Memory::_directlyStore(const armem::wm::Memory& memory)
{
if (!this->pipeline.memFilter->accept(memory))
{
ARMARX_WARNING << deactivateSpam() << "Ignoring to put a Memory into the LTM because it got filtered.";
return;
}
if (!checkPath())
{
return;
......@@ -131,7 +163,7 @@ namespace armarx::armem::server::ltm::disk
memory.forEachCoreSegment([this](const auto& core)
{
util::ensureFolderExists(std::filesystem::path(path) / core.id().coreSegmentName);
CoreSegment c(std::filesystem::path(path) / core.id().coreSegmentName);
CoreSegment c((std::filesystem::path(path) / core.id().coreSegmentName), pipeline);
c.store(core);
});
}
......
......@@ -9,23 +9,25 @@
// Segmnet Type
#include "CoreSegment.h"
// Config
#include <RobotAPI/libraries/armem/server/ltm/base/filter/frequencyFilter/FrequencyFilter.h>
namespace armarx::armem::server::ltm::disk
{
/// @brief A memory storing data in mongodb (needs 'armarx memory start' to start the mongod instance)
class Memory :
public BufferedMemoryBase<CoreSegment>,
public DiskStorage
public MemoryBase<CoreSegment>,
public BufferedMemoryItem,
public DiskMemoryItem
{
public:
using Base = MemoryBase<CoreSegment>;
Memory();
Memory(const std::filesystem::path&);
void setMemoryID(const MemoryID& id) override;
void createPropertyDefinitions(PropertyDefinitionsPtr& properties, const std::string& prefix) override;
bool forEachCoreSegment(std::function<void(CoreSegment&)>&& func) const override;
std::shared_ptr<CoreSegment> findCoreSegment(const std::string&) const override;
......@@ -33,6 +35,7 @@ namespace armarx::armem::server::ltm::disk
protected:
void _loadAll(armem::wm::Memory&) override;
void _load(armem::wm::Memory&) override;
void _store(const armem::wm::Memory&) override;
void _directlyStore(const armem::wm::Memory&) override;
std::string getExpectedFolderName() const override;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment