Skip to content
Snippets Groups Projects
Commit 0168652f authored by Fabian Tërnava's avatar Fabian Tërnava
Browse files

moved ltm pipeline in seperate struct

parent b103038c
No related branches found
No related tags found
No related merge requests found
Showing
with 144 additions and 79 deletions
......@@ -9,10 +9,6 @@
#include <RobotAPI/libraries/armem/core/wm/memory_definitions.h>
#include <RobotAPI/libraries/armem/core/wm/aron_conversions.h>
#include "../extractor/Extractor.h"
#include "../converter/dict/Converter.h"
#include "../converter/image/Converter.h"
namespace armarx::armem::server::ltm
{
/// @brief Interface functions for the longterm memory classes
......@@ -50,9 +46,5 @@ namespace armarx::armem::server::ltm
protected:
mutable std::recursive_mutex ltm_mutex;
DictConverterPtr dictConverter;
ImageConverterPtr imgConverter;
ExtractorPtr extractor;
};
} // namespace armarx::armem::server::ltm
......@@ -7,7 +7,14 @@
namespace armarx::armem::server::ltm
{
MemoryItem::MemoryItem(const MemoryID& id) : _id(id)
MemoryItem::MemoryItem(const MemoryID& id) :
_id(id)
{
}
MemoryItem::MemoryItem(const MemoryID& id, const LongTermMemoryPipeline& p) :
pipeline(p),
_id(id)
{
}
......
......@@ -5,16 +5,31 @@
#include <optional>
#include <string>
#include "../filter/Filter.h"
#include "../extractor/Extractor.h"
#include "../converter/dict/Converter.h"
#include "../converter/image/Converter.h"
#include <RobotAPI/libraries/armem/core/MemoryID.h>
namespace armarx::armem::server::ltm
{
struct LongTermMemoryPipeline
{
MemoryFilterPtr memFilter;
SnapshotFilterPtr snapFilter;
DictConverterPtr dictConverter;
ImageConverterPtr imgConverter;
ExtractorPtr extractor;
};
/// @brief Interface functions for the longterm memory classes
class MemoryItem
{
public:
MemoryItem() = default;
MemoryItem(const MemoryID&);
MemoryItem(const MemoryID&, const LongTermMemoryPipeline&);
virtual ~MemoryItem() = default;
MemoryID id() const;
......@@ -22,6 +37,9 @@ namespace armarx::armem::server::ltm
virtual void setMemoryID(const MemoryID&);
protected:
LongTermMemoryPipeline pipeline;
private:
MemoryID _id;
};
......
......@@ -9,15 +9,27 @@
namespace armarx::armem::server::ltm
{
class Filter;
typedef std::shared_ptr<Filter> FilterPtr;
class SnapshotFilter;
typedef std::shared_ptr<SnapshotFilter> SnapshotFilterPtr;
class Filter
class MemoryFilter;
typedef std::shared_ptr<MemoryFilter> MemoryFilterPtr;
class MemoryFilter
{
public:
MemoryFilter() = default;
virtual ~MemoryFilter() = default;
virtual bool accept(const armem::wm::Memory& e) = 0;
};
class SnapshotFilter
{
public:
Filter() = default;
virtual ~Filter() = default;
SnapshotFilter() = default;
virtual ~SnapshotFilter() = default;
virtual bool accept(const armem::wm::Memory& memory) = 0;
virtual bool accept(const armem::wm::EntitySnapshot& e) = 0;
};
}
......@@ -4,9 +4,9 @@
namespace armarx::armem::server::ltm::filter
{
bool FrequencyFilter::accept(const armem::wm::Memory&)
bool MemoryFrequencyFilter::accept(const armem::wm::Memory& e)
{
auto now = IceUtil::Time::now().toMilliSeconds();
auto now = armem::Time::now().toMilliSeconds();
if (waitingTimeInMs < 0 || (now - timestampLastCommitInMs) > waitingTimeInMs)
{
timestampLastCommitInMs = now;
......@@ -14,4 +14,26 @@ namespace armarx::armem::server::ltm::filter
}
return false;
}
bool SnapshotFrequencyFilter::accept(const armem::wm::EntitySnapshot& e)
{
auto entityID = e.id().getEntityID();
auto genMs = e.time().toMilliSeconds();
long lastMs = 0;
if (timestampLastCommitInMs.count(entityID) > 0)
{
lastMs = timestampLastCommitInMs.at(entityID);
}
if (waitingTimeInMs < 0 || (genMs - lastMs) > waitingTimeInMs)
{
/*std::cout << "diff: " << (dataGeneratedInMs - timestampLastCommitInMs) << std::endl;
std::cout << "gen: " << (dataGeneratedInMs) << std::endl;
std::cout << "last: " << (timestampLastCommitInMs) << std::endl;*/
timestampLastCommitInMs[entityID] = genMs;
return true;
}
return false;
}
}
......@@ -5,19 +5,35 @@
namespace armarx::armem::server::ltm::filter
{
class FrequencyFilter;
typedef std::shared_ptr<FrequencyFilter> FrequencyFilterPtr;
class SnapshotFrequencyFilter;
typedef std::shared_ptr<SnapshotFrequencyFilter> SnapshotFrequencyFilterPtr;
class FrequencyFilter :
public Filter
class MemoryFrequencyFilter;
typedef std::shared_ptr<MemoryFrequencyFilter> MemoryFrequencyFilterPtr;
class MemoryFrequencyFilter :
public MemoryFilter
{
public:
FrequencyFilter() = default;
MemoryFrequencyFilter() = default;
virtual bool accept(const armem::wm::Memory& memory) override;
virtual bool accept(const armem::wm::Memory& e) override;
private:
int waitingTimeInMs = 10;
int waitingTimeInMs = -1;
long timestampLastCommitInMs = 0;
};
class SnapshotFrequencyFilter :
public SnapshotFilter
{
public:
SnapshotFrequencyFilter() = default;
virtual bool accept(const armem::wm::EntitySnapshot& e) override;
private:
int waitingTimeInMs = 1000;
std::unordered_map<MemoryID, long> timestampLastCommitInMs;
};
}
......@@ -21,9 +21,9 @@ namespace armarx::armem::server::ltm::disk
}
}
CoreSegment::CoreSegment(const std::filesystem::path& p) :
CoreSegmentBase(getMemoryIDFromPath(p)),
DiskStorage(p)
CoreSegment::CoreSegment(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
CoreSegmentBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p)
{
}
......@@ -40,7 +40,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
ProviderSegment c(subdirPath);
ProviderSegment c(subdirPath, pipeline);
func(c);
}
return true;
......@@ -58,7 +58,7 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / n;
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<ProviderSegment>(subpath);
auto c = std::make_shared<ProviderSegment>(subpath, pipeline);
return c;
}
......@@ -83,7 +83,7 @@ namespace armarx::armem::server::ltm::disk
c.forEachProviderSegment([this](armem::wm::ProviderSegment& e)
{
util::ensureFolderExists(std::filesystem::path(path) / e.id().providerSegmentName, false);
ProviderSegment c(std::filesystem::path(path) / e.id().providerSegmentName);
ProviderSegment c((std::filesystem::path(path) / e.id().providerSegmentName), pipeline);
c.load(e);
});
}
......@@ -93,7 +93,7 @@ namespace armarx::armem::server::ltm::disk
c.forEachProviderSegment([this](const auto& provSegment)
{
util::ensureFolderExists(std::filesystem::path(path) / provSegment.id().providerSegmentName);
ProviderSegment c(std::filesystem::path(path) / provSegment.id().providerSegmentName);
ProviderSegment c((std::filesystem::path(path) / provSegment.id().providerSegmentName), pipeline);
c.store(provSegment);
});
}
......
......@@ -12,11 +12,11 @@ namespace armarx::armem::server::ltm::disk
{
class CoreSegment :
public CoreSegmentBase<ProviderSegment>,
public DiskStorage
public DiskMemoryItem
{
public:
CoreSegment(const std::filesystem::path&);
CoreSegment(const std::filesystem::path&, const LongTermMemoryPipeline& p);
bool forEachProviderSegment(std::function<void(ProviderSegment&)>&& func) const override;
......
......@@ -7,6 +7,8 @@
#include <RobotAPI/libraries/armem/server/wm/memory_definitions.h>
#include "../base/filter/frequencyFilter/FrequencyFilter.h"
namespace armarx::armem::server::ltm::disk
{
......@@ -26,9 +28,9 @@ namespace armarx::armem::server::ltm::disk
}
}
Entity::Entity(const std::filesystem::path& p) :
EntityBase(getMemoryIDFromPath(p)),
DiskStorage(p)
Entity::Entity(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
EntityBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p)
{
}
......@@ -50,7 +52,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
EntitySnapshot c(subdirPath);
EntitySnapshot c(subdirPath, pipeline);
func(c);
}
return true;
......@@ -88,7 +90,7 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / std::to_string(n.toMicroSeconds());
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<EntitySnapshot>(subpath);
auto c = std::make_shared<EntitySnapshot>(subpath, pipeline);
return c;
}
......@@ -133,7 +135,7 @@ namespace armarx::armem::server::ltm::disk
p.forEachSnapshot([this](armem::wm::EntitySnapshot& e)
{
util::ensureFolderExists(std::filesystem::path(path) / std::to_string(e.id().timestamp.toMicroSeconds()), false);
EntitySnapshot c(std::filesystem::path(path) / std::to_string(e.id().timestamp.toMicroSeconds()));
EntitySnapshot c((std::filesystem::path(path) / std::to_string(e.id().timestamp.toMicroSeconds())), pipeline);
c.load(e);
});
}
......@@ -142,8 +144,14 @@ namespace armarx::armem::server::ltm::disk
{
entity.forEachSnapshot([this](armem::wm::EntitySnapshot& e)
{
if (!pipeline.snapFilter->accept(e))
{
ARMARX_WARNING << deactivateSpam() << "Ignoring to put an EntitiySnapshot into the LTM because it got filtered.";
return;
}
util::ensureFolderExists(std::filesystem::path(path) / std::to_string(e.id().timestamp.toMicroSeconds()));
EntitySnapshot c(std::filesystem::path(path) / std::to_string(e.id().timestamp.toMicroSeconds()));
EntitySnapshot c((std::filesystem::path(path) / std::to_string(e.id().timestamp.toMicroSeconds())), pipeline);
c.store(e);
});
}
......
......@@ -13,10 +13,10 @@ namespace armarx::armem::server::ltm::disk
/// @brief A memory storing data in mongodb (needs 'armarx memory start' to start the mongod instance)
class Entity :
public EntityBase<EntitySnapshot>,
public DiskStorage
public DiskMemoryItem
{
public:
Entity(const std::filesystem::path&);
Entity(const std::filesystem::path&, const LongTermMemoryPipeline& p);
bool forEachSnapshot(std::function<void(EntitySnapshot&)>&& func) const override;
bool forEachSnapshotInIndexRange(long first, long last, std::function<void(EntitySnapshot&)>&& func) const override;
......
......@@ -10,13 +10,6 @@
#include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/aron/core/data/variant/container/Dict.h>
// ArmarX
#include "../base/extractor/noExtractor/NoExtractor.h"
#include "../base/extractor/imageExtractor/ImageExtractor.h"
#include "../base/converter/dict/json/JsonConverter.h"
#include "../base/converter/dict/bson/BsonConverter.h"
#include "../base/converter/image/png/PngConverter.h"
namespace armarx::armem::server::ltm::disk
{
......@@ -44,13 +37,11 @@ namespace armarx::armem::server::ltm::disk
}
}
EntitySnapshot::EntitySnapshot(const std::filesystem::path& p) :
EntitySnapshotBase(getMemoryIDFromPath(p)),
DiskStorage(p)
EntitySnapshot::EntitySnapshot(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
EntitySnapshotBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p)
{
extractor = std::make_shared<extractor::ImageExtractor>();
dictConverter = std::make_shared<converter::dict::JsonConverter>();
imgConverter = std::make_shared<converter::image::PngConverter>();
}
std::string EntitySnapshot::getExpectedFolderName() const
......@@ -87,8 +78,8 @@ namespace armarx::armem::server::ltm::disk
{
util::ensureFolderExists(p / std::to_string(i), false);
std::filesystem::path data = p / std::to_string(i) / (constantes::DATA_FILENAME + dictConverter->suffix);
std::filesystem::path metadata = p / std::to_string(i) / (constantes::METADATA_FILENAME + dictConverter->suffix);
std::filesystem::path data = p / std::to_string(i) / (constantes::DATA_FILENAME + pipeline.dictConverter->suffix);
std::filesystem::path metadata = p / std::to_string(i) / (constantes::METADATA_FILENAME + pipeline.dictConverter->suffix);
util::ensureFileExists(data);
......@@ -96,11 +87,11 @@ namespace armarx::armem::server::ltm::disk
std::ifstream dataifs(data);
std::vector<unsigned char> datafilecontent((std::istreambuf_iterator<char>(dataifs)), (std::istreambuf_iterator<char>()));
auto dataaron = dictConverter->convert(datafilecontent);
auto dataaron = pipeline.dictConverter->convert(datafilecontent);
std::ifstream metadataifs(metadata);
std::vector<unsigned char> metadatafilecontent((std::istreambuf_iterator<char>(metadataifs)), (std::istreambuf_iterator<char>()));
auto metadataaron = dictConverter->convert(metadatafilecontent);
auto metadataaron = pipeline.dictConverter->convert(metadatafilecontent);
from_aron(metadataaron, dataaron, ins);
}
......@@ -116,8 +107,8 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path instancePath = p / std::to_string(i);
util::ensureFolderExists(instancePath);
std::filesystem::path dataPath = instancePath / (constantes::DATA_FILENAME + dictConverter->suffix);
std::filesystem::path metadataPath = instancePath / (constantes::METADATA_FILENAME + dictConverter->suffix);
std::filesystem::path dataPath = instancePath / (constantes::DATA_FILENAME + pipeline.dictConverter->suffix);
std::filesystem::path metadataPath = instancePath / (constantes::METADATA_FILENAME + pipeline.dictConverter->suffix);
if (util::checkIfFileExists(dataPath) or util::checkIfFileExists(metadataPath))
{
......@@ -132,21 +123,21 @@ namespace armarx::armem::server::ltm::disk
to_aron(metadataAron, dataAron, ins);
// extract
auto dataExt = extractor->extract(dataAron);
auto metadataExt = extractor->extract(metadataAron);
auto dataExt = pipeline.extractor->extract(dataAron);
auto metadataExt = pipeline.extractor->extract(metadataAron);
// convert images
for (const auto& [memberName, img] : dataExt.images)
{
ARMARX_CHECK_NOT_NULL(img);
std::filesystem::path imgPath = instancePath / (memberName + imgConverter->suffix);
auto imgVec = imgConverter->convert(img);
std::filesystem::path imgPath = instancePath / (memberName + pipeline.imgConverter->suffix);
auto imgVec = pipeline.imgConverter->convert(img);
writeDataToFile(imgPath, imgVec);
}
// convert dict and metadata
auto dataVec = dictConverter->convert(dataExt.data);
auto metadataVec = dictConverter->convert(metadataExt.data);
auto dataVec = pipeline.dictConverter->convert(dataExt.data);
auto metadataVec = pipeline.dictConverter->convert(metadataExt.data);
writeDataToFile(dataPath, dataVec);
writeDataToFile(metadataPath, metadataVec);
}
......
......@@ -11,10 +11,10 @@ namespace armarx::armem::server::ltm::disk
class EntitySnapshot :
public EntitySnapshotBase,
public DiskStorage
public DiskMemoryItem
{
public:
EntitySnapshot(const std::filesystem::path&);
EntitySnapshot(const std::filesystem::path&, const LongTermMemoryPipeline& p);
protected:
void _loadAll(armem::wm::EntitySnapshot&) const override;
......
......@@ -24,9 +24,9 @@ namespace armarx::armem::server::ltm::disk
}
}
ProviderSegment::ProviderSegment(const std::filesystem::path& p) :
ProviderSegmentBase(getMemoryIDFromPath(p)),
DiskStorage(p)
ProviderSegment::ProviderSegment(const std::filesystem::path& p, const LongTermMemoryPipeline& pipe) :
ProviderSegmentBase(getMemoryIDFromPath(p), pipe),
DiskMemoryItem(p)
{
}
......@@ -43,7 +43,7 @@ namespace armarx::armem::server::ltm::disk
for (const auto& subdir : std::filesystem::directory_iterator(p))
{
std::filesystem::path subdirPath = subdir.path();
Entity c(subdirPath);
Entity c(subdirPath, pipeline);
func(c);
}
return true;
......@@ -61,7 +61,7 @@ namespace armarx::armem::server::ltm::disk
std::filesystem::path subpath = p / n;
util::ensureFolderExists(subpath, false);
auto c = std::make_shared<Entity>(subpath);
auto c = std::make_shared<Entity>(subpath, pipeline);
return c;
}
......@@ -86,7 +86,7 @@ namespace armarx::armem::server::ltm::disk
p.forEachEntity([this](armem::wm::Entity& e)
{
util::ensureFolderExists(std::filesystem::path(path) / e.id().entityName, false);
Entity c(std::filesystem::path(path) / e.id().entityName);
Entity c((std::filesystem::path(path) / e.id().entityName), pipeline);
c.load(e);
});
}
......@@ -96,9 +96,8 @@ namespace armarx::armem::server::ltm::disk
providerSegment.forEachEntity([this](const auto& entity)
{
util::ensureFolderExists(std::filesystem::path(path) / entity.id().entityName);
Entity c(std::filesystem::path(path) / entity.id().entityName);
Entity c((std::filesystem::path(path) / entity.id().entityName), pipeline);
c.store(entity);
});
}
}
......@@ -12,10 +12,10 @@ namespace armarx::armem::server::ltm::disk
{
class ProviderSegment :
public ProviderSegmentBase<Entity>,
public DiskStorage
public DiskMemoryItem
{
public:
ProviderSegment(const std::filesystem::path&);
ProviderSegment(const std::filesystem::path&, const LongTermMemoryPipeline& p);
bool forEachEntity(std::function<void(Entity&)>&& func) const override;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment