diff --git a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h index 09280c4691a479a3bbd612131565775669bf21d1..45814a74f861e11701cee7db3463e62ce43bae27 100644 --- a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h +++ b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h @@ -129,7 +129,8 @@ namespace armarx::armem::server::robot_state proprioception::RobotStateWriter writer; // queue - TripleBuffer<std::vector<proprioception::RobotUnitData>> dataBuffer; + using Queue = boost::lockfree::spsc_queue<proprioception::RobotUnitData, boost::lockfree::capacity<1024>>; + Queue dataBuffer; bool waitForRobotUnit = false; }; diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp index 87df7cb013d99543485fdc8868aa83254bf52ffe..1e7e3d533c0251d3cbf1fb84962b4e11e182c35b 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp @@ -67,7 +67,7 @@ namespace armarx::armem::server::robot_state::proprioception void RobotStateWriter::run( float pollFrequency, - TripleBuffer<std::vector<RobotUnitData>>& dataBuffer, + Queue& dataBuffer, MemoryToIceAdapter& memory, localization::Segment& localizationSegment) { @@ -75,19 +75,19 @@ namespace armarx::armem::server::robot_state::proprioception while (task and not task->isStopped()) { - const std::vector<RobotUnitData>& batch = dataBuffer.getUpToDateReadBuffer(); - if (debugObserver) - { - debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", batch.size()); - } + // if (debugObserver) + // { + // debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", dataBuffer.size()); + // } - if (not batch.empty()) + RobotUnitData robotUnitData; + if (dataBuffer.pop(robotUnitData)) { auto start = std::chrono::high_resolution_clock::now(); auto endBuildUpdate = start, endProprioception = start, endLocalization = start; - Update update = buildUpdate(batch); + Update update = buildUpdate(robotUnitData); endBuildUpdate = std::chrono::high_resolution_clock::now(); // Commits lock the core segments. @@ -130,41 +130,35 @@ namespace armarx::armem::server::robot_state::proprioception } - RobotStateWriter::Update RobotStateWriter::buildUpdate(const std::vector<RobotUnitData>& dataQueue) + RobotStateWriter::Update RobotStateWriter::buildUpdate(const RobotUnitData& data) { - ARMARX_CHECK_NOT_EMPTY(dataQueue); - ARMARX_DEBUG << "RobotStateWriter: Commit batch of " << dataQueue.size() << " timesteps to memory..."; - // Send batch to memory Update update; - for (const RobotUnitData& data: dataQueue) { - { - armem::EntityUpdate& up = update.proprioception.add(); - up.entityID = properties.robotUnitProviderID.withEntityName(properties.robotUnitProviderID.providerSegmentName); - up.timeCreated = data.timestamp; - up.instancesData = { data.proprioception }; - } + armem::EntityUpdate& up = update.proprioception.add(); + up.entityID = properties.robotUnitProviderID.withEntityName(properties.robotUnitProviderID.providerSegmentName); + up.timeCreated = data.timestamp; + up.instancesData = { data.proprioception }; + } - // Extract odometry data. - const std::string platformKey = "platform"; - if (data.proprioception->hasElement(platformKey)) - { - ARMARX_DEBUG << "Found odometry data."; - auto platformData = aron::data::Dict::DynamicCastAndCheck(data.proprioception->getElement(platformKey)); - update.localization.emplace_back(getTransform(platformData, data.timestamp)); - } - else - { - ARMARX_INFO << "No odometry data! " - << "(No element '" << platformKey << "' in proprioception data.)" - << "\nIf you are using a mobile platform this should not have happened." - << "\nThis error is only logged once." - << "\nThese keys exist: " << data.proprioception->getAllKeys() - ; - noOdometryDataLogged = true; - } + // Extract odometry data. + const std::string platformKey = "platform"; + if (data.proprioception->hasElement(platformKey)) + { + ARMARX_DEBUG << "Found odometry data."; + auto platformData = aron::data::Dict::DynamicCastAndCheck(data.proprioception->getElement(platformKey)); + update.localization.emplace_back(getTransform(platformData, data.timestamp)); + } + else + { + ARMARX_INFO << "No odometry data! " + << "(No element '" << platformKey << "' in proprioception data.)" + << "\nIf you are using a mobile platform this should not have happened." + << "\nThis error is only logged once." + << "\nThese keys exist: " << data.proprioception->getAllKeys() + ; + noOdometryDataLogged = true; } return update; diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h index d70566ebf5c3aa6808d0e14af9c1af0471ac31c7..362bde7193fe40c9572bd4064437526bf4f0c7cb 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h @@ -23,6 +23,7 @@ #pragma once +#include <boost/lockfree/spsc_queue.hpp> #include <optional> #include "ArmarXCore/util/CPPUtility/TripleBuffer.h" @@ -57,9 +58,12 @@ namespace armarx::armem::server::robot_state::proprioception void connect(armarx::plugins::DebugObserverComponentPlugin& debugObserver); + using Queue = boost::lockfree::spsc_queue<RobotUnitData, boost::lockfree::capacity<1024>>; + + /// Reads data from `dataQueue` and commits to the memory. void run(float pollFrequency, - TripleBuffer<std::vector<RobotUnitData>>& dataBuffer, + Queue& dataBuffer, MemoryToIceAdapter& memory, localization::Segment& localizationSegment ); @@ -70,7 +74,7 @@ namespace armarx::armem::server::robot_state::proprioception armem::Commit proprioception; std::vector<armem::robot_state::Transform> localization; }; - Update buildUpdate(const std::vector<RobotUnitData>& dataQueue); + Update buildUpdate(const RobotUnitData& dataQueue); private: diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp index 5794287c7231e07f9ffa0db967090f6d415c8f29..ae44141606fb73011263bfe08fa474e7918ff023 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp @@ -55,7 +55,7 @@ namespace armarx::armem::server::robot_state::proprioception void RobotUnitReader::run( float pollFrequency, - TripleBuffer<std::vector<RobotUnitData>>& dataBuffer) + Queue& dataBuffer) { Metronome metronome(Frequency::HertzDouble(pollFrequency)); @@ -63,8 +63,7 @@ namespace armarx::armem::server::robot_state::proprioception { if (std::optional<RobotUnitData> commit = fetchAndConvertLatestRobotUnitData()) { - dataBuffer.getWriteBuffer().push_back(std::move(commit.value())); - dataBuffer.commitWrite(); + dataBuffer.push(std::move(commit.value())); } if (debugObserver) diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h index 5abf7378378828005795d10401a2ee7cc1ea5aa4..9b1ea8861768f2a9bcd227c7be4e22d11ccef38c 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h @@ -6,7 +6,8 @@ #include <optional> #include <string> -#include "ArmarXCore/util/CPPUtility/TripleBuffer.h" +#include <boost/lockfree/spsc_queue.hpp> + #include <ArmarXCore/core/logging/Logging.h> #include <ArmarXCore/core/services/tasks/TaskUtil.h> #include <ArmarXCore/libraries/DebugObserverHelper/DebugObserverHelper.h> @@ -37,6 +38,8 @@ namespace armarx::armem::server::robot_state::proprioception RobotUnitReader(); + using Queue = boost::lockfree::spsc_queue<RobotUnitData, boost::lockfree::capacity<1024>>; + void connect( armarx::plugins::RobotUnitComponentPlugin& robotUnitPlugin, @@ -47,7 +50,7 @@ namespace armarx::armem::server::robot_state::proprioception /// Reads data from `handler` and fills `dataQueue`. void run(float pollFrequency, - TripleBuffer<std::vector<RobotUnitData>>& dataBuffer); + Queue& dataBuffer); std::optional<RobotUnitData> fetchAndConvertLatestRobotUnitData();