Skip to content
Snippets Groups Projects
Commit 7cd7a211 authored by Rainer Kartmann's avatar Rainer Kartmann
Browse files

Add locking and non-locking commit

parent 69b81532
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,18 @@ namespace armarx::armem::wm
std::vector<Memory::Base::UpdateResult>
Memory::update(const Commit& commit)
{
std::vector<Memory::Base::UpdateResult> result;
for (const EntityUpdate& update : commit.updates)
{
result.push_back(this->update(update));
}
return result;
}
std::vector<Memory::Base::UpdateResult>
Memory::updateLocking(const Commit& commit)
{
// Group updates by core segment, then update each core segment in a batch to only lock it once.
std::map<std::string, std::vector<const EntityUpdate*>> updatesPerCoreSegment;
......@@ -26,6 +38,7 @@ namespace armarx::armem::wm
if (it != this->_container.end())
{
CoreSegment& coreSegment = it->second;
// Lock the core segment for the whole batch.
std::scoped_lock lock(coreSegment.mutex());
......@@ -53,25 +66,31 @@ namespace armarx::armem::wm
}
Memory::Base::UpdateResult Memory::update(const EntityUpdate& update)
Memory::Base::UpdateResult
Memory::update(const EntityUpdate& update)
{
this->_checkContainerName(update.entityID.memoryName, this->name());
auto it = this->_container.find(update.entityID.coreSegmentName);
if (it != this->_container.end())
{
Base::UpdateResult ret;
{
std::scoped_lock lock(it->second.mutex());
ret = it->second.update(update);
}
ret.memoryUpdateType = UpdateType::UpdatedExisting;
return ret;
}
else
CoreSegment& segment = getCoreSegment(update.entityID.coreSegmentName);
Base::UpdateResult result = segment.update(update);
result.memoryUpdateType = UpdateType::UpdatedExisting;
return result;
}
Memory::Base::UpdateResult
Memory::updateLocking(const EntityUpdate& update)
{
this->_checkContainerName(update.entityID.memoryName, this->name());
CoreSegment& segment = getCoreSegment(update.entityID.coreSegmentName);
Base::UpdateResult result;
{
throw armem::error::MissingEntry::create<CoreSegment>(update.entityID.coreSegmentName, *this);
std::scoped_lock lock(segment.mutex());
result = segment.update(update);
}
result.memoryUpdateType = UpdateType::UpdatedExisting;
return result;
}
......
#pragma once
#include "../base/MemoryBase.h"
#include <RobotAPI/libraries/armem/core/base/MemoryBase.h>
#include "CoreSegment.h"
#include "detail/CopyWithoutData.h"
......@@ -28,17 +28,20 @@ namespace armarx::armem::wm
Memory& operator=(Memory&& other) = default;
virtual std::vector<Base::UpdateResult> update(const Commit& commit) override;
/**
* @brief Perform the commit, locking the core segments.
*
* Groups the commits by core segment, and updates each core segment
* in a batch, locking the core segment.
*/
virtual std::vector<Base::UpdateResult> update(const Commit& commit) override;
std::vector<Base::UpdateResult> updateLocking(const Commit& commit);
virtual Base::UpdateResult update(const EntityUpdate& update) override;
/**
* @brief Update the memory, locking the updated core segment.
*/
virtual Base::UpdateResult update(const EntityUpdate& update) override;
Base::UpdateResult updateLocking(const EntityUpdate& update);
/**
......@@ -47,6 +50,7 @@ namespace armarx::armem::wm
*/
Commit toCommit() const;
protected:
virtual void _copySelfWithoutData(Memory& other) const override;
......
......@@ -142,6 +142,18 @@ namespace armarx::armem::server
armem::CommitResult
MemoryToIceAdapter::commit(const armem::Commit& commit)
{
return this->_commit(commit, false);
}
armem::CommitResult MemoryToIceAdapter::commitLocking(const armem::Commit& commit)
{
return this->_commit(commit, true);
}
armem::CommitResult MemoryToIceAdapter::_commit(const armem::Commit& commit, bool locking)
{
std::vector<data::MemoryID> updatedIDs;
const bool publishUpdates = bool(memoryListenerTopic);
......@@ -152,8 +164,9 @@ namespace armarx::armem::server
EntityUpdateResult& result = commitResult.results.emplace_back();
try
{
// update() will lock core segment mutexes
auto updateResult = workingMemory->update(update);
auto updateResult = locking
? workingMemory->updateLocking(update)
: workingMemory->update(update);
result.success = true;
result.snapshotID = updateResult.id;
......@@ -293,4 +306,6 @@ namespace armarx::armem::server
}
......@@ -39,6 +39,8 @@ namespace armarx::armem::server
data::CommitResult commit(const data::Commit& commitIce, Time timeArrived);
data::CommitResult commit(const data::Commit& commitIce);
armem::CommitResult commit(const armem::Commit& commit);
armem::CommitResult commitLocking(const armem::Commit& commit);
// READING
query::data::Result query(const armem::query::data::Input& input);
......@@ -57,6 +59,11 @@ namespace armarx::armem::server
client::MemoryListenerInterfacePrx memoryListenerTopic;
private:
armem::CommitResult _commit(const armem::Commit& commit, bool locking);
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment