diff --git a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp index e3f006eaf4a554d13705868eaf812955b2fcfb12..eab8bf5e6dd721395647519d3e04fecc62ffbb7f 100644 --- a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp +++ b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp @@ -160,15 +160,12 @@ namespace armarx::armem::server::robot_state robotUnit.reader.task = new SimpleRunningTask<>([this]() { - robotUnit.reader.run( - robotUnit.pollFrequency, robotUnit.dataQueue, robotUnit.dataMutex - ); + robotUnit.reader.run(robotUnit.pollFrequency, robotUnit.dataBuffer); }, "Robot Unit Reader"); robotUnit.writer.task = new SimpleRunningTask<>([this]() { robotUnit.writer.run( - robotUnit.pollFrequency, robotUnit.dataQueue, robotUnit.dataMutex, - iceAdapter(), localizationSegment + robotUnit.pollFrequency, robotUnit.dataBuffer, iceAdapter(), localizationSegment ); }, "Robot State Writer"); diff --git a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h index fb0329dc65699624bfd53236783846cdb3592876..09280c4691a479a3bbd612131565775669bf21d1 100644 --- a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h +++ b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h @@ -129,8 +129,7 @@ namespace armarx::armem::server::robot_state proprioception::RobotStateWriter writer; // queue - std::queue<proprioception::RobotUnitData> dataQueue; - std::mutex dataMutex; + TripleBuffer<std::vector<proprioception::RobotUnitData>> dataBuffer; bool waitForRobotUnit = false; }; diff --git a/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp b/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp index ac67b3a3b5ebfa719a6af1cca86032cd81238f25..c895e131e75cc63287f43dc323c600e825734550 100644 --- a/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp +++ b/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp @@ -89,7 +89,7 @@ namespace armarx while (rec.size() < n) { rec.emplace_back(getRobotUnitComponentPlugin() - .startDataSatreming(cfg)); + .startDataStreaming(cfg)); ARMARX_INFO << rec.back()->getDataDescriptionString(); } for (auto& r : rec) diff --git a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp index 9c0c21175f5926a8db9676bc06e25d5b00dd7b84..8222d062291e990872750187ff6dc1d943dc5079 100644 --- a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp +++ b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp @@ -9,7 +9,7 @@ namespace armarx::plugins { RobotUnitDataStreamingReceiverPtr - RobotUnitComponentPlugin::startDataSatreming( + RobotUnitComponentPlugin::startDataStreaming( const RobotUnitDataStreaming::Config& cfg) { //ok to create smart ptr from parent, since ice handels this @@ -194,5 +194,3 @@ namespace armarx ARMARX_INFO << "Robot unit is up and running."; } } - - diff --git a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h index 6edd2d83691362e3dfdebb372984ca05f497e586..ec497f88670d49e3e780114b2fcc3a9e78e3cc9e 100644 --- a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h +++ b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h @@ -74,7 +74,7 @@ namespace armarx //datastreaming public: - RobotUnitDataStreamingReceiverPtr startDataSatreming(const RobotUnitDataStreaming::Config& cfg); + RobotUnitDataStreamingReceiverPtr startDataStreaming(const RobotUnitDataStreaming::Config& cfg); private: static constexpr const char* PROPERTY_NAME = "RobotUnitName"; diff --git a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp index 10a524cb71f64e12add7d65870139b1c44707ea5..b7d6343d447fd95a6031b2ab5b5c88201a540069 100644 --- a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp +++ b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp @@ -173,8 +173,9 @@ namespace armarx _last_iteration_id + 1 != step.iterationId ) { - ARMARX_ERROR << "Missing Iterations or iterations out of order!" - " This should not happen"; + ARMARX_ERROR << "Missing Iterations or iterations out of order! " + << "This should not happen. " + << VAROUT(_last_iteration_id) << ", " << VAROUT(step.iterationId); } _last_iteration_id = step.iterationId; _data_buffer.emplace_back(std::move(step)); diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp index 82ba252a400bf57e561dd2ba45a063a49182dbd1..87df7cb013d99543485fdc8868aa83254bf52ffe 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp @@ -29,6 +29,7 @@ #include <SimoxUtility/math/convert/rpy_to_mat3f.h> // ArmarX +#include "ArmarXCore/core/time/Metronome.h" #include <ArmarXCore/core/exceptions/local/ExpressionException.h> #include <ArmarXCore/core/time/CycleUtil.h> #include <ArmarXCore/libraries/ArmarXCoreComponentPlugins/DebugObserverComponentPlugin.h> @@ -66,28 +67,21 @@ namespace armarx::armem::server::robot_state::proprioception void RobotStateWriter::run( float pollFrequency, - std::queue<RobotUnitData>& dataQueue, - std::mutex& dataMutex, + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer, MemoryToIceAdapter& memory, localization::Segment& localizationSegment) { - CycleUtil cycle(static_cast<int>(1000.f / pollFrequency)); + Metronome metronome(Frequency::HertzDouble(pollFrequency)); + while (task and not task->isStopped()) { - size_t queueSize = 0; - std::queue<RobotUnitData> batch; - { - std::lock_guard lock{dataMutex}; - queueSize = dataQueue.size(); - if (!dataQueue.empty()) - { - std::swap(batch, dataQueue); - } - } + const std::vector<RobotUnitData>& batch = dataBuffer.getUpToDateReadBuffer(); + if (debugObserver) { - debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", queueSize); + debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", batch.size()); } + if (not batch.empty()) { auto start = std::chrono::high_resolution_clock::now(); @@ -131,23 +125,21 @@ namespace armarx::armem::server::robot_state::proprioception { debugObserver->sendDebugObserverBatch(); } - cycle.waitForCycleDuration(); + metronome.waitForNextTick(); } } - RobotStateWriter::Update RobotStateWriter::buildUpdate(std::queue<RobotUnitData>& dataQueue) + RobotStateWriter::Update RobotStateWriter::buildUpdate(const std::vector<RobotUnitData>& dataQueue) { - ARMARX_CHECK_GREATER(dataQueue.size(), 0); + ARMARX_CHECK_NOT_EMPTY(dataQueue); ARMARX_DEBUG << "RobotStateWriter: Commit batch of " << dataQueue.size() << " timesteps to memory..."; // Send batch to memory Update update; - while (dataQueue.size() > 0) + for (const RobotUnitData& data: dataQueue) { - const RobotUnitData& data = dataQueue.front(); - { armem::EntityUpdate& up = update.proprioception.add(); up.entityID = properties.robotUnitProviderID.withEntityName(properties.robotUnitProviderID.providerSegmentName); @@ -173,8 +165,6 @@ namespace armarx::armem::server::robot_state::proprioception ; noOdometryDataLogged = true; } - - dataQueue.pop(); } return update; diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h index 4a4f14672b038132243bdbd5fc5a5617f3f5d668..d70566ebf5c3aa6808d0e14af9c1af0471ac31c7 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h @@ -25,6 +25,7 @@ #include <optional> +#include "ArmarXCore/util/CPPUtility/TripleBuffer.h" #include <ArmarXCore/core/logging/Logging.h> #include <ArmarXCore/core/services/tasks/TaskUtil.h> #include <ArmarXCore/libraries/DebugObserverHelper/DebugObserverHelper.h> @@ -58,7 +59,7 @@ namespace armarx::armem::server::robot_state::proprioception /// Reads data from `dataQueue` and commits to the memory. void run(float pollFrequency, - std::queue<RobotUnitData>& dataQueue, std::mutex& dataMutex, + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer, MemoryToIceAdapter& memory, localization::Segment& localizationSegment ); @@ -69,7 +70,7 @@ namespace armarx::armem::server::robot_state::proprioception armem::Commit proprioception; std::vector<armem::robot_state::Transform> localization; }; - Update buildUpdate(std::queue<RobotUnitData>& dataQueue); + Update buildUpdate(const std::vector<RobotUnitData>& dataQueue); private: diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp index f50c4e5619d70b7910d6d3ef544c2ca4f9d7fdc9..5794287c7231e07f9ffa0db967090f6d415c8f29 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp @@ -10,6 +10,10 @@ #include <RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.h> +#include "ArmarXCore/core/time/Frequency.h" +#include "ArmarXCore/core/time/Metronome.h" +#include "ArmarXCore/core/time/forward_declarations.h" +#include "ArmarXCore/util/CPPUtility/TripleBuffer.h" #include <ArmarXCore/core/time/CycleUtil.h> #include <ArmarXCore/libraries/ArmarXCoreComponentPlugins/DebugObserverComponentPlugin.h> @@ -23,9 +27,7 @@ namespace armarx::armem::server::robot_state::proprioception { - RobotUnitReader::RobotUnitReader() - { - } + RobotUnitReader::RobotUnitReader() = default; void RobotUnitReader::connect( @@ -40,7 +42,7 @@ namespace armarx::armem::server::robot_state::proprioception << "Known are: " << converterRegistry.getKeys(); config.loggingNames.push_back(properties.sensorPrefix); - receiver = robotUnitPlugin.startDataSatreming(config); + receiver = robotUnitPlugin.startDataStreaming(config); description = receiver->getDataDescription(); } { @@ -53,23 +55,24 @@ namespace armarx::armem::server::robot_state::proprioception void RobotUnitReader::run( float pollFrequency, - std::queue<RobotUnitData>& dataQueue, - std::mutex& dataMutex) + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer) { - CycleUtil cycle(static_cast<int>(1000.f / pollFrequency)); + Metronome metronome(Frequency::HertzDouble(pollFrequency)); + while (task and not task->isStopped()) { if (std::optional<RobotUnitData> commit = fetchAndConvertLatestRobotUnitData()) { - std::lock_guard g{dataMutex}; - dataQueue.push(std::move(commit.value())); + dataBuffer.getWriteBuffer().push_back(std::move(commit.value())); + dataBuffer.commitWrite(); } if (debugObserver) { debugObserver->sendDebugObserverBatch(); } - cycle.waitForCycleDuration(); + + metronome.waitForNextTick(); } } @@ -125,4 +128,3 @@ namespace armarx::armem::server::robot_state::proprioception } - diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h index efb7a0d8175de36ebf963fd035d176442df822f2..5abf7378378828005795d10401a2ee7cc1ea5aa4 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h @@ -6,6 +6,7 @@ #include <optional> #include <string> +#include "ArmarXCore/util/CPPUtility/TripleBuffer.h" #include <ArmarXCore/core/logging/Logging.h> #include <ArmarXCore/core/services/tasks/TaskUtil.h> #include <ArmarXCore/libraries/DebugObserverHelper/DebugObserverHelper.h> @@ -46,8 +47,7 @@ namespace armarx::armem::server::robot_state::proprioception /// Reads data from `handler` and fills `dataQueue`. void run(float pollFrequency, - std::queue<RobotUnitData>& dataQueue, - std::mutex& dataMutex); + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer); std::optional<RobotUnitData> fetchAndConvertLatestRobotUnitData(); @@ -81,4 +81,3 @@ namespace armarx::armem::server::robot_state::proprioception }; } -