Skip to content
Snippets Groups Projects
Commit 1fb6a6ae authored by Fabian Tërnava's avatar Fabian Tërnava
Browse files

Allow Disk LTM to either load multiple or a single stored memory

parent 4a4a9e57
No related branches found
No related tags found
No related merge requests found
Showing
with 215 additions and 153 deletions
......@@ -280,7 +280,7 @@ namespace armarx::armem::server
{
ARMARX_INFO << "The LTM returned data after query";
longtermMemory->load(ltmResult); // convert memory ==> meaning resolving references
longtermMemory->resolve(ltmResult); // convert memory ==> meaning resolving references
wmResult.append(ltmResult);
if (wmResult.empty())
......
......@@ -5,9 +5,6 @@
namespace armarx::armem::server::ltm::converter::image
{
class PngConverter;
using PngConverterPtr = std::shared_ptr<PngConverter>;
class PngConverter : public ImageConverter
{
public:
......
......@@ -35,9 +35,9 @@ namespace armarx::armem::server::ltm
void directlyStore(const armem::wm::Memory& memory)
{
TIMING_START(LTM_Memory_DirectlyStore);
for (auto& f : this->memoryFilters)
for (auto& f : this->filters->memFilters)
{
if (!f.accept(memory))
if (!f->accept(memory))
{
ARMARX_WARNING << deactivateSpam() << "Ignoring to put a Memory into the LTM because it got filtered.";
return;
......
......@@ -24,15 +24,15 @@ namespace armarx::armem::server::ltm
/// return the full sub-ltm as a wm::CoreSegment with only references
/// the ltm may be huge, use with caution
void loadAll(armem::wm::CoreSegment& coreSeg)
void loadAllReferences(armem::wm::CoreSegment& coreSeg)
{
_loadAll(coreSeg);
_loadAllReferences(coreSeg);
}
/// convert the references of the input into a wm::Memory
void load(armem::wm::CoreSegment& coreSeg)
void resolve(armem::wm::CoreSegment& coreSeg)
{
_load(coreSeg);
_resolve(coreSeg);
}
/// encode the content of a wm::Memory and store
......@@ -60,8 +60,8 @@ namespace armarx::armem::server::ltm
}
protected:
virtual void _loadAll(armem::wm::CoreSegment&) = 0;
virtual void _load(armem::wm::CoreSegment&) = 0;
virtual void _loadAllReferences(armem::wm::CoreSegment&) = 0;
virtual void _resolve(armem::wm::CoreSegment&) = 0;
virtual void _store(const armem::wm::CoreSegment&) = 0;
protected:
......
......@@ -24,15 +24,15 @@ namespace armarx::armem::server::ltm
/// return the full sub-ltm as a wm::Entity with only references
/// the ltm may be huge, use with caution
void loadAll(armem::wm::Entity& e)
void loadAllReferences(armem::wm::Entity& e)
{
_loadAll(e);
_loadAllReferences(e);
}
/// convert the references of the input into a wm::Memory
void load(armem::wm::Entity& e)
void resolve(armem::wm::Entity& e)
{
_load(e);
_resolve(e);
}
/// encode the content of a wm::Memory and store
......@@ -62,8 +62,8 @@ namespace armarx::armem::server::ltm
}
protected:
virtual void _loadAll(armem::wm::Entity&) = 0;
virtual void _load(armem::wm::Entity&) = 0;
virtual void _loadAllReferences(armem::wm::Entity&) = 0;
virtual void _resolve(armem::wm::Entity&) = 0;
virtual void _store(const armem::wm::Entity&) = 0;
protected:
......
......@@ -20,15 +20,15 @@ namespace armarx::armem::server::ltm
/// return the full sub-ltm as a wm::EntitySnapshot with only references
/// the ltm may be huge, use with caution
void loadAll(armem::wm::EntitySnapshot& e) const
void loadAllReferences(armem::wm::EntitySnapshot& e) const
{
_loadAll(e);
_loadAllReferences(e);
}
/// convert the references of the input into a wm::Memory
void load(armem::wm::EntitySnapshot& e) const
void resolve(armem::wm::EntitySnapshot& e) const
{
_load(e);
_resolve(e);
}
/// encode the content of a wm::Memory and store
......@@ -40,8 +40,8 @@ namespace armarx::armem::server::ltm
static std::string getLevelName();
protected:
virtual void _loadAll(armem::wm::EntitySnapshot&) const = 0;
virtual void _load(armem::wm::EntitySnapshot&) const = 0;
virtual void _loadAllReferences(armem::wm::EntitySnapshot&) const = 0;
virtual void _resolve(armem::wm::EntitySnapshot&) const = 0;
virtual void _store(const armem::wm::EntitySnapshot&) const = 0;
protected:
......
......@@ -19,6 +19,8 @@
#include <RobotAPI/libraries/armem/core/operations.h>
#include <RobotAPI/libraries/armem/server/wm/memory_definitions.h>
// The default filter
#include "../converter/dict/Converter.h"
namespace armarx::armem::server::ltm
{
......@@ -37,27 +39,45 @@ namespace armarx::armem::server::ltm
MemoryBase(const MemoryID& id) :
MemoryItem(id)
{
pipeline.memFreqFilter = std::make_shared<filter::MemoryFrequencyFilter>();
pipeline.snapFreqFilter = std::make_shared<filter::SnapshotFrequencyFilter>();
pipeline.imageExtractor = std::make_shared<extractor::ImageExtractor>();
pipeline.dictConverter = std::make_shared<converter::dict::JsonConverter>();
pipeline.imgConverter = std::make_shared<converter::image::PngConverter>();
}
/// return the full ltm as a wm::Memory with only references
/// the ltm may be huge, use with caution
void loadAll(armem::wm::Memory& memory)
armem::wm::Memory loadAllReferences()
{
armem::wm::Memory ret;
loadAllReferences(ret);
return ret;
}
void loadAllReferences(armem::wm::Memory& memory)
{
TIMING_START(LTM_Memory_LoadAll);
_loadAll(memory);
_loadAllReferences(memory);
TIMING_END(LTM_Memory_LoadAll);
}
/// return the full ltm as a wm::Memory and resolves the references
/// the ltm may be huge, use with caution
armem::wm::Memory loadAllAndResolve()
{
armem::wm::Memory ret;
loadAllAndResolve(ret);
return ret;
}
void loadAllAndResolve(armem::wm::Memory& memory)
{
TIMING_START(LTM_Memory_LoadAllAndResolve);
_loadAllReferences(memory);
_resolve(memory);
TIMING_END(LTM_Memory_LoadAllAndResolve);
}
/// convert the references of the input into a wm::Memory
void load(armem::wm::Memory& memory)
void resolve(armem::wm::Memory& memory)
{
TIMING_START(LTM_Memory_Load);
_load(memory);
_resolve(memory);
TIMING_END(LTM_Memory_Load);
}
......@@ -65,14 +85,15 @@ namespace armarx::armem::server::ltm
void store(const armem::wm::Memory& memory)
{
TIMING_START(LTM_Memory_Append);
if (pipeline.memFreqFilter->accept(memory))
{
_store(memory);
}
else
for (auto& f : filters->memFilters)
{
ARMARX_WARNING << deactivateSpam() << "Ignoring to put a Memory into the LTM because it got filtered.";
if (!f->accept(memory))
{
ARMARX_WARNING << deactivateSpam() << "Ignoring to put a Memory into the LTM because it got filtered.";
return;
}
}
_store(memory);
TIMING_END(LTM_Memory_Append);
}
......@@ -84,6 +105,27 @@ namespace armarx::armem::server::ltm
this->store(memory);
}
void init()
{
// setup the filters
if (mFreqFilterEnabled)
{
auto mFreqFilter = std::make_unique<filter::MemoryFrequencyFilter>();
mFreqFilter->waitingTimeInMs = mFreqFilterWaitingTime;
filters->memFilters.push_back(std::move(mFreqFilter));
}
if (snapFreqFilterEnabled)
{
auto snapFreqFilter = std::make_unique<filter::SnapshotFrequencyFilter>();
snapFreqFilter->waitingTimeInMs = snapFreqFilterEnabled;
filters->snapFilters.push_back(std::move(snapFreqFilter));
}
// setup converters
filters->converters[aron::type::Descriptor::eObject] = std::make_unique<converter::dict::JsonConverter>();
}
/// iterate over all core segments of this ltm
virtual bool forEachCoreSegment(std::function<void(CoreSegmentT&)>&& func) const = 0;
......@@ -93,9 +135,10 @@ namespace armarx::armem::server::ltm
/// parameters
virtual void createPropertyDefinitions(PropertyDefinitionsPtr& defs, const std::string& prefix)
{
defs->optional(pipeline.memFreqFilter->waitingTimeInMs, prefix + "memFreqFilter.WaitingTime", "Withdraw time in MS after each LTM update.");
defs->optional(pipeline.snapFreqFilter->waitingTimeInMs, prefix + "memSnapFilter.WaitingTime", "Withdraw time in MS after each Entity update.");
defs->optional(pipeline.snapFreqFilter->waitingTimeInMs, prefix + "memSnapFilter.WaitingTime", "Withdraw time in MS after each Entity update.");
defs->optional(mFreqFilterEnabled, prefix + "memFreqFilter.Enabled");
defs->optional(mFreqFilterWaitingTime, prefix + "memFreqFilter.WaitingTime", "Waiting time in MS after each LTM update.");
defs->optional(snapFreqFilterEnabled, prefix + "memFreqFilter.Enabled");
defs->optional(mFreqFilterWaitingTime, prefix + "memSnapFilter.WaitingTime", "Waiting time in MS after each Entity update.");
}
/// get level name
......@@ -105,8 +148,8 @@ namespace armarx::armem::server::ltm
}
protected:
virtual void _loadAll(armem::wm::Memory& memory) = 0;
virtual void _load(armem::wm::Memory& memory) = 0;
virtual void _loadAllReferences(armem::wm::Memory& memory) = 0;
virtual void _resolve(armem::wm::Memory& memory) = 0;
virtual void _store(const armem::wm::Memory& memory) = 0;
public:
......@@ -114,5 +157,11 @@ namespace armarx::armem::server::ltm
protected:
mutable std::recursive_mutex ltm_mutex;
private:
bool mFreqFilterEnabled = false;
long mFreqFilterWaitingTime = -1;
bool snapFreqFilterEnabled = false;
long snapFreqFilterWaitingTime = -1;
};
} // namespace armarx::armem::server::ltm
......@@ -8,14 +8,15 @@
namespace armarx::armem::server::ltm
{
MemoryItem::MemoryItem(const MemoryID& id) :
_id(id)
MemoryItem(id, std::make_shared<FilterCollection>())
{
}
MemoryItem::MemoryItem(const MemoryID& id, const LongTermMemoryPipeline& p) :
pipeline(p),
MemoryItem::MemoryItem(const MemoryID& id, const std::shared_ptr<FilterCollection>& p) :
filters(p),
_id(id)
{
ARMARX_CHECK_NOT_NULL(p); // There must be a filter stack.
}
void MemoryItem::setMemoryID(const MemoryID& id)
......
......@@ -14,36 +14,28 @@
namespace armarx::armem::server::ltm
{
/// all necessary classes to convert an entry of the ltm to some other format(s)
struct LongTermMemoryPipeline
/// all necessary classes to filter and convert an entry of the ltm to some other format(s)
struct FilterCollection
{
// Memory Filters
/// filter based on update frequency (removes full updates)
filter::MemoryFrequencyFilterPtr memFreqFilter;
std::vector<std::unique_ptr<MemoryFilter>> memFilters;
// Snapshot filters
/// filter based on latest snapshot stored (per entity)
filter::SnapshotFrequencyFilterPtr snapFreqFilter;
std::vector<std::unique_ptr<SnapshotFilter>> snapFilters;
// Extractors
/// Extract NDArrays with 2 dims and at least 10 values (Images)
extractor::ImageExtractorPtr imageExtractor;
std::map<aron::data::Descriptor, std::unique_ptr<Extractor>> extractors;
// Converters
/// Convert dicts (everything that has not been extracted) to JSON
converter::dict::JsonConverterPtr dictConverter;
/// Convert images to png
converter::image::PngConverterPtr imgConverter;
std::map<aron::type::Descriptor, std::unique_ptr<Converter>> converters;
};
/// @brief Interface functions for the longterm memory classes
class MemoryItem
{
public:
MemoryItem() = default;
MemoryItem(const MemoryID&);
MemoryItem(const MemoryID&, const LongTermMemoryPipeline&);
MemoryItem(const MemoryID&, const std::shared_ptr<FilterCollection>&);
virtual ~MemoryItem() = default;
MemoryID id() const;
......@@ -52,7 +44,7 @@ namespace armarx::armem::server::ltm
virtual void setMemoryID(const MemoryID&);
protected:
LongTermMemoryPipeline pipeline;
std::shared_ptr<FilterCollection> filters;
private:
MemoryID _id;
......
......@@ -24,15 +24,15 @@ namespace armarx::armem::server::ltm
/// return the full sub-ltm as a wm::ProviderSegment with only references
/// the ltm may be huge, use with caution
void loadAll(armem::wm::ProviderSegment& provSeg)
void loadAllReferences(armem::wm::ProviderSegment& provSeg)
{
_loadAll(provSeg);
_loadAllReferences(provSeg);
}
/// convert the references of the input into a wm::Memory
void load(armem::wm::ProviderSegment& provSeg)
void resolve(armem::wm::ProviderSegment& provSeg)
{
_load(provSeg);
_resolve(provSeg);
}
/// encode the content of a wm::Memory and store
......@@ -58,8 +58,8 @@ namespace armarx::armem::server::ltm
}
protected:
virtual void _loadAll(armem::wm::ProviderSegment&) = 0;
virtual void _load(armem::wm::ProviderSegment&) = 0;
virtual void _loadAllReferences(armem::wm::ProviderSegment&) = 0;
virtual void _resolve(armem::wm::ProviderSegment&) = 0;
virtual void _store(const armem::wm::ProviderSegment&) = 0;
protected:
......
......@@ -21,7 +21,7 @@ namespace armarx::armem::server::ltm::disk
}
}
CoreSegment::CoreSegment(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
CoreSegment::CoreSegment(const std::filesystem::path& p, const std::shared_ptr<FilterCollection>& pipe) :
CoreSegmentBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p.parent_path())
{
......@@ -41,7 +41,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
ProviderSegment c(subdirPath, pipeline);
ProviderSegment c(subdirPath, filters);
func(c);
}
return true;
......@@ -60,7 +60,7 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / n;
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<ProviderSegment>(subpath, pipeline);
auto c = std::make_shared<ProviderSegment>(subpath, filters);
return c;
}
......@@ -69,24 +69,24 @@ namespace armarx::armem::server::ltm::disk
return name();
}
void CoreSegment::_loadAll(armem::wm::CoreSegment& e)
void CoreSegment::_loadAllReferences(armem::wm::CoreSegment& e)
{
e.id() = id();
forEachProviderSegment([&e](ProviderSegment& x) {
armem::wm::ProviderSegment s;
x.loadAll(s);
x.loadAllReferences(s);
e.addProviderSegment(s);
});
}
void CoreSegment::_load(armem::wm::CoreSegment& c)
void CoreSegment::_resolve(armem::wm::CoreSegment& c)
{
c.forEachProviderSegment([this](armem::wm::ProviderSegment& e)
{
util::ensureFolderExists(std::filesystem::path(path) / id().coreSegmentName / e.id().providerSegmentName, false);
ProviderSegment c((std::filesystem::path(path) / id().coreSegmentName / e.id().providerSegmentName), pipeline);
c.load(e);
ProviderSegment c((std::filesystem::path(path) / id().coreSegmentName / e.id().providerSegmentName), filters);
c.resolve(e);
});
}
......@@ -95,7 +95,7 @@ namespace armarx::armem::server::ltm::disk
c.forEachProviderSegment([this](const auto& provSegment)
{
util::ensureFolderExists(std::filesystem::path(path) / id().coreSegmentName / provSegment.id().providerSegmentName);
ProviderSegment c((std::filesystem::path(path) / id().coreSegmentName / provSegment.id().providerSegmentName), pipeline);
ProviderSegment c((std::filesystem::path(path) / id().coreSegmentName / provSegment.id().providerSegmentName), filters);
c.store(provSegment);
});
}
......
......@@ -16,15 +16,15 @@ namespace armarx::armem::server::ltm::disk
{
public:
CoreSegment(const std::filesystem::path&, const LongTermMemoryPipeline& p);
CoreSegment(const std::filesystem::path&, const std::shared_ptr<FilterCollection>& p);
bool forEachProviderSegment(std::function<void(ProviderSegment&)>&& func) const override;
std::shared_ptr<ProviderSegment> findProviderSegment(const std::string&) const override;
protected:
void _loadAll(armem::wm::CoreSegment&) override;
void _load(armem::wm::CoreSegment&) override;
void _loadAllReferences(armem::wm::CoreSegment&) override;
void _resolve(armem::wm::CoreSegment&) override;
void _store(const armem::wm::CoreSegment&) override;
std::string getExpectedFolderName() const override;
......
......@@ -28,7 +28,7 @@ namespace armarx::armem::server::ltm::disk
}
}
Entity::Entity(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
Entity::Entity(const std::filesystem::path& p, const std::shared_ptr<FilterCollection>& pipe) :
EntityBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p.parent_path())
{
......@@ -53,7 +53,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
EntitySnapshot c(subdirPath, pipeline);
EntitySnapshot c(subdirPath, filters);
func(c);
}
return true;
......@@ -92,7 +92,7 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / std::to_string(n.toMicroSeconds());
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<EntitySnapshot>(subpath, pipeline);
auto c = std::make_shared<EntitySnapshot>(subpath, filters);
return c;
}
......@@ -121,24 +121,24 @@ namespace armarx::armem::server::ltm::disk
return {};
}
void Entity::_loadAll(armem::wm::Entity& e)
void Entity::_loadAllReferences(armem::wm::Entity& e)
{
e.id() = id();
forEachSnapshot([&e](EntitySnapshotBase& x) {
armem::wm::EntitySnapshot s;
x.loadAll(s);
x.loadAllReferences(s);
e.addSnapshot(s);
});
}
void Entity::_load(armem::wm::Entity& p)
void Entity::_resolve(armem::wm::Entity& p)
{
p.forEachSnapshot([this](armem::wm::EntitySnapshot& e)
{
util::ensureFolderExists(std::filesystem::path(path) / id().entityName / std::to_string(e.id().timestamp.toMicroSeconds()), false);
EntitySnapshot c((std::filesystem::path(path) / id().entityName / std::to_string(e.id().timestamp.toMicroSeconds())), pipeline);
c.load(e);
EntitySnapshot c((std::filesystem::path(path) / id().entityName / std::to_string(e.id().timestamp.toMicroSeconds())), filters);
c.resolve(e);
});
}
......@@ -146,14 +146,17 @@ namespace armarx::armem::server::ltm::disk
{
entity.forEachSnapshot([this](armem::wm::EntitySnapshot& e)
{
if (!pipeline.snapFreqFilter->accept(e))
for (auto& f : filters->snapFilters)
{
ARMARX_WARNING << deactivateSpam() << "Ignoring to put an EntitiySnapshot into the LTM because it got filtered.";
return;
if (!f->accept(e))
{
ARMARX_WARNING << deactivateSpam() << "Ignoring to put an EntitiySnapshot into the LTM because it got filtered.";
return;
}
}
util::ensureFolderExists(std::filesystem::path(path) / id().entityName / std::to_string(e.id().timestamp.toMicroSeconds()));
EntitySnapshot c((std::filesystem::path(path) / id().entityName / std::to_string(e.id().timestamp.toMicroSeconds())), pipeline);
EntitySnapshot c((std::filesystem::path(path) / id().entityName / std::to_string(e.id().timestamp.toMicroSeconds())), filters);
c.store(e);
});
}
......
......@@ -16,7 +16,7 @@ namespace armarx::armem::server::ltm::disk
public DiskMemoryItem
{
public:
Entity(const std::filesystem::path&, const LongTermMemoryPipeline& p);
Entity(const std::filesystem::path&, const std::shared_ptr<FilterCollection>& p);
bool forEachSnapshot(std::function<void(EntitySnapshot&)>&& func) const override;
bool forEachSnapshotInIndexRange(long first, long last, std::function<void(EntitySnapshot&)>&& func) const override;
......@@ -32,8 +32,8 @@ namespace armarx::armem::server::ltm::disk
std::shared_ptr<EntitySnapshot> findFirstSnapshotAfterOrAt(const Time& time) const override;
protected:
void _loadAll(armem::wm::Entity&) override;
void _load(armem::wm::Entity&) override;
void _loadAllReferences(armem::wm::Entity&) override;
void _resolve(armem::wm::Entity&) override;
void _store(const armem::wm::Entity&) override;
std::string getExpectedFolderName() const override;
......
......@@ -44,7 +44,7 @@ namespace armarx::armem::server::ltm::disk
}
}
EntitySnapshot::EntitySnapshot(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
EntitySnapshot::EntitySnapshot(const std::filesystem::path& p, const std::shared_ptr<FilterCollection>& pipe) :
EntitySnapshotBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p.parent_path())
{
......@@ -56,7 +56,7 @@ namespace armarx::armem::server::ltm::disk
return name();
}
void EntitySnapshot::_loadAll(armem::wm::EntitySnapshot& e) const
void EntitySnapshot::_loadAllReferences(armem::wm::EntitySnapshot& e) const
{
std::filesystem::path p = path;
p = p / id().timestampStr();
......@@ -76,8 +76,11 @@ namespace armarx::armem::server::ltm::disk
}
}
void EntitySnapshot::_load(armem::wm::EntitySnapshot& e) const
void EntitySnapshot::_resolve(armem::wm::EntitySnapshot& e) const
{
ARMARX_IMPORTANT << __PRETTY_FUNCTION__;
auto& dictConverter = filters->converters.at(aron::type::Descriptor::eObject);
// Get data from disk
std::filesystem::path p = path;
p = p / id().timestampStr();
......@@ -87,41 +90,51 @@ namespace armarx::armem::server::ltm::disk
{
util::ensureFolderExists(p / std::to_string(i), false);
std::filesystem::path data = p / std::to_string(i) / (constantes::DATA_FILENAME + pipeline.dictConverter->suffix);
std::filesystem::path metadata = p / std::to_string(i) / (constantes::METADATA_FILENAME + pipeline.dictConverter->suffix);
std::filesystem::path data = p / std::to_string(i) / (constantes::DATA_FILENAME + dictConverter->suffix);
std::filesystem::path metadata = p / std::to_string(i) / (constantes::METADATA_FILENAME + dictConverter->suffix);
util::ensureFileExists(data);
auto& ins = e.getInstance(i);
auto datafilecontent = readDataFromFile(data);
auto dataaron = pipeline.dictConverter->convert(datafilecontent);
auto dataaron = dictConverter->convert(datafilecontent);
auto datadict = aron::data::Dict::DynamicCastAndCheck(dataaron);
// check for special members
for (const auto& [key, m] : dataaron->getElements())
for (const auto& [key, m] : datadict->getElements())
{
if (!m)
for (auto& [t, f] : filters->converters)
{
// check for image
std::filesystem::path member = p / std::to_string(i) / (key + pipeline.imgConverter->suffix);
if (t == aron::type::Descriptor::eObject)
{
continue;
}
std::filesystem::path member = p / std::to_string(i) / (key + f->suffix);
if (std::filesystem::exists(member) && std::filesystem::is_regular_file(member))
{
auto memberfilecontent = readDataFromFile(member);
auto memberaron = pipeline.imgConverter->convert(memberfilecontent);
dataaron->setElement(key, memberaron);
auto memberaron = f->convert(memberfilecontent);
datadict->setElement(key, memberaron);
}
}
}
auto metadatafilecontent = readDataFromFile(metadata);
auto metadataaron = pipeline.dictConverter->convert(metadatafilecontent);
auto metadataaron = dictConverter->convert(metadatafilecontent);
auto metadatadict = aron::data::Dict::DynamicCastAndCheck(metadataaron);
from_aron(metadataaron, dataaron, ins);
from_aron(metadatadict, datadict, ins);
}
}
void EntitySnapshot::_store(const armem::wm::EntitySnapshot& e) const
{
ARMARX_IMPORTANT << __PRETTY_FUNCTION__;
auto& dictConverter = filters->converters.at(aron::type::Descriptor::eObject);
std::filesystem::path p = path;
p = p / id().timestampStr();
util::ensureFolderExists(p);
......@@ -131,12 +144,12 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path instancePath = p / std::to_string(i);
util::ensureFolderExists(instancePath);
std::filesystem::path dataPath = instancePath / (constantes::DATA_FILENAME + pipeline.dictConverter->suffix);
std::filesystem::path metadataPath = instancePath / (constantes::METADATA_FILENAME + pipeline.dictConverter->suffix);
std::filesystem::path dataPath = instancePath / (constantes::DATA_FILENAME + dictConverter->suffix);
std::filesystem::path metadataPath = instancePath / (constantes::METADATA_FILENAME + dictConverter->suffix);
if (util::checkIfFileExists(dataPath) or util::checkIfFileExists(metadataPath))
{
continue;
continue; // we ignore if file already exists
}
auto& ins = e.getInstance(i);
......@@ -146,23 +159,30 @@ namespace armarx::armem::server::ltm::disk
auto metadataAron = std::make_shared<aron::data::Dict>();
to_aron(metadataAron, dataAron, ins);
// extract
auto dataExt = pipeline.imageExtractor->extract(dataAron);
auto metadataExt = pipeline.imageExtractor->extract(metadataAron); // useless
// convert images
for (const auto& [memberName, var] : dataExt.extraction)
// check special members for special extractions
for (auto& [t, x] : filters->extractors)
{
ARMARX_CHECK_NOT_NULL(var);
auto img = aron::data::NDArray::DynamicCastAndCheck(var);
std::filesystem::path imgPath = instancePath / (memberName + pipeline.imgConverter->suffix);
auto imgVec = pipeline.imgConverter->convert(img);
writeDataToFile(imgPath, imgVec);
if (auto it = filters->converters.find(aron::data::defaultconversion::Data2TypeDescriptor.at(t)); it != filters->converters.end())
{
auto& conv = it->second;
auto dataExt = x->extract(dataAron);
for (const auto& [memberName, var] : dataExt.extraction)
{
ARMARX_CHECK_NOT_NULL(var);
std::filesystem::path extPath = instancePath / (memberName + conv->suffix);
auto extVec = conv->convert(var);
writeDataToFile(extPath, extVec);
}
dataAron = dataExt.dataWithoutExtraction;
}
// else we could not convert the extracted data so it makes no sense to extract it at all...
}
// convert dict and metadata
auto dataVec = pipeline.dictConverter->convert(dataExt.dataWithoutExtraction);
auto metadataVec = pipeline.dictConverter->convert(metadataExt.dataWithoutExtraction);
auto dataVec = dictConverter->convert(dataAron);
auto metadataVec = dictConverter->convert(metadataAron);
writeDataToFile(dataPath, dataVec);
writeDataToFile(metadataPath, metadataVec);
}
......
......@@ -14,11 +14,11 @@ namespace armarx::armem::server::ltm::disk
public DiskMemoryItem
{
public:
EntitySnapshot(const std::filesystem::path&, const LongTermMemoryPipeline& p);
EntitySnapshot(const std::filesystem::path&, const std::shared_ptr<FilterCollection>& p);
protected:
void _loadAll(armem::wm::EntitySnapshot&) const override;
void _load(armem::wm::EntitySnapshot&) const override;
void _loadAllReferences(armem::wm::EntitySnapshot&) const override;
void _resolve(armem::wm::EntitySnapshot&) const override;
void _store(const armem::wm::EntitySnapshot&) const override;
std::string getExpectedFolderName() const override;
......
......@@ -7,7 +7,6 @@
// ArmarX
#include "../base/filter/frequencyFilter/FrequencyFilter.h"
#include "../base/extractor/noExtractor/NoExtractor.h"
#include "../base/extractor/imageExtractor/ImageExtractor.h"
#include "../base/converter/dict/json/JsonConverter.h"
#include "../base/converter/dict/bson/BsonConverter.h"
......@@ -60,7 +59,8 @@ namespace armarx::armem::server::ltm::disk
ARMARX_CHECK_NOT_EMPTY(id.memoryName);
Base::setMemoryID(id.getMemoryID());
buffer.id() = id.getMemoryID();
buffer->id() = id.getMemoryID();
to_store->id() = id.getMemoryID();
}
std::string Memory::getExpectedFolderName() const
......@@ -82,7 +82,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
CoreSegment c(subdirPath, pipeline);
CoreSegment c(subdirPath, filters);
func(c);
}
return true;
......@@ -101,22 +101,22 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / n;
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<CoreSegment>(subpath, pipeline);
auto c = std::make_shared<CoreSegment>(subpath, filters);
return c;
}
void Memory::_loadAll(armem::wm::Memory& m)
void Memory::_loadAllReferences(armem::wm::Memory& m)
{
m.id() = id();
forEachCoreSegment([&m](CoreSegment& x) {
armem::wm::CoreSegment s;
x.loadAll(s);
x.loadAllReferences(s);
m.addCoreSegment(s);
});
}
void Memory::_load(armem::wm::Memory& m)
void Memory::_resolve(armem::wm::Memory& m)
{
if (!checkPath(id().memoryName))
{
......@@ -127,8 +127,8 @@ namespace armarx::armem::server::ltm::disk
m.forEachCoreSegment([this](armem::wm::CoreSegment& e)
{
util::ensureFolderExists(std::filesystem::path(path) / id().memoryName / e.id().coreSegmentName, false);
CoreSegment c((std::filesystem::path(path) / id().memoryName / e.id().coreSegmentName), pipeline);
c.load(e);
CoreSegment c((std::filesystem::path(path) / id().memoryName / e.id().coreSegmentName), filters);
c.resolve(e);
});
}
......@@ -144,7 +144,7 @@ namespace armarx::armem::server::ltm::disk
memory.forEachCoreSegment([this](const auto& core)
{
util::ensureFolderExists(std::filesystem::path(path) / id().memoryName / core.id().coreSegmentName);
CoreSegment c((std::filesystem::path(path) / id().memoryName / core.id().coreSegmentName), pipeline);
CoreSegment c((std::filesystem::path(path) / id().memoryName / core.id().coreSegmentName), filters);
c.store(core);
});
}
......
......@@ -33,8 +33,8 @@ namespace armarx::armem::server::ltm::disk
void setPath(const std::string&) override;
protected:
void _loadAll(armem::wm::Memory&) override;
void _load(armem::wm::Memory&) override;
void _loadAllReferences(armem::wm::Memory&) override;
void _resolve(armem::wm::Memory&) override;
void _directlyStore(const armem::wm::Memory&) override;
std::string getExpectedFolderName() const override;
......
......@@ -24,7 +24,7 @@ namespace armarx::armem::server::ltm::disk
}
}
ProviderSegment::ProviderSegment(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
ProviderSegment::ProviderSegment(const std::filesystem::path& p, const std::shared_ptr<FilterCollection>& pipe) :
ProviderSegmentBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p.parent_path())
{
......@@ -44,7 +44,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
Entity c(subdirPath, pipeline);
Entity c(subdirPath, filters);
func(c);
}
return true;
......@@ -63,7 +63,7 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / n;
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<Entity>(subpath, pipeline);
auto c = std::make_shared<Entity>(subpath, filters);
return c;
}
......@@ -72,24 +72,24 @@ namespace armarx::armem::server::ltm::disk
return name();
}
void ProviderSegment::_loadAll(armem::wm::ProviderSegment& e)
void ProviderSegment::_loadAllReferences(armem::wm::ProviderSegment& e)
{
e.id() = id();
forEachEntity([&e](Entity& x) {
armem::wm::Entity s;
x.loadAll(s);
x.loadAllReferences(s);
e.addEntity(s);
});
}
void ProviderSegment::_load(armem::wm::ProviderSegment& p)
void ProviderSegment::_resolve(armem::wm::ProviderSegment& p)
{
p.forEachEntity([this](armem::wm::Entity& e)
{
util::ensureFolderExists(std::filesystem::path(path) / id().providerSegmentName / e.id().entityName, false);
Entity c((std::filesystem::path(path) / id().providerSegmentName / e.id().entityName), pipeline);
c.load(e);
Entity c((std::filesystem::path(path) / id().providerSegmentName / e.id().entityName), filters);
c.resolve(e);
});
}
......@@ -98,7 +98,7 @@ namespace armarx::armem::server::ltm::disk
providerSegment.forEachEntity([this](const auto& entity)
{
util::ensureFolderExists(std::filesystem::path(path) / id().providerSegmentName / entity.id().entityName);
Entity c((std::filesystem::path(path) / id().providerSegmentName / entity.id().entityName), pipeline);
Entity c((std::filesystem::path(path) / id().providerSegmentName / entity.id().entityName), filters);
c.store(entity);
});
}
......
......@@ -15,15 +15,15 @@ namespace armarx::armem::server::ltm::disk
public DiskMemoryItem
{
public:
ProviderSegment(const std::filesystem::path&, const LongTermMemoryPipeline& p);
ProviderSegment(const std::filesystem::path&, const std::shared_ptr<FilterCollection>& p);
bool forEachEntity(std::function<void(Entity&)>&& func) const override;
std::shared_ptr<Entity> findEntity(const std::string&) const override;
protected:
void _loadAll(armem::wm::ProviderSegment&) override;
void _load(armem::wm::ProviderSegment&) override;
void _loadAllReferences(armem::wm::ProviderSegment&) override;
void _resolve(armem::wm::ProviderSegment&) override;
void _store(const armem::wm::ProviderSegment&) override;
std::string getExpectedFolderName() const override;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment