From 56dbe431be8ae3c652138a9520103125c52ebcbf Mon Sep 17 00:00:00 2001
From: Fabian Reister <fabian.reister@kit.edu>
Date: Mon, 28 Nov 2022 13:28:33 +0100
Subject: [PATCH] draft; using triple buffer

---
 .../RobotStateMemory/RobotStateMemory.cpp     |  7 ++--
 .../RobotStateMemory/RobotStateMemory.h       |  3 +-
 ...RobotUnitDataStreamingWidgetController.cpp |  2 +-
 .../RobotUnitComponentPlugin.cpp              |  4 +--
 .../RobotUnitComponentPlugin.h                |  2 +-
 .../RobotUnitDataStreamingReceiver.cpp        |  5 +--
 .../proprioception/RobotStateWriter.cpp       | 34 +++++++------------
 .../server/proprioception/RobotStateWriter.h  |  5 +--
 .../server/proprioception/RobotUnitReader.cpp | 24 +++++++------
 .../server/proprioception/RobotUnitReader.h   |  5 ++-
 10 files changed, 39 insertions(+), 52 deletions(-)

diff --git a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp
index e3f006eaf..eab8bf5e6 100644
--- a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp
+++ b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp
@@ -160,15 +160,12 @@ namespace armarx::armem::server::robot_state
 
             robotUnit.reader.task = new SimpleRunningTask<>([this]()
             {
-                robotUnit.reader.run(
-                    robotUnit.pollFrequency, robotUnit.dataQueue, robotUnit.dataMutex
-                );
+                robotUnit.reader.run(robotUnit.pollFrequency, robotUnit.dataBuffer);
             }, "Robot Unit Reader");
             robotUnit.writer.task = new SimpleRunningTask<>([this]()
             {
                 robotUnit.writer.run(
-                    robotUnit.pollFrequency, robotUnit.dataQueue, robotUnit.dataMutex,
-                    iceAdapter(), localizationSegment
+                    robotUnit.pollFrequency, robotUnit.dataBuffer, iceAdapter(), localizationSegment
                 );
             }, "Robot State Writer");
 
diff --git a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h
index fb0329dc6..09280c469 100644
--- a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h
+++ b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h
@@ -129,8 +129,7 @@ namespace armarx::armem::server::robot_state
             proprioception::RobotStateWriter writer;
 
             // queue
-            std::queue<proprioception::RobotUnitData> dataQueue;
-            std::mutex dataMutex;
+            TripleBuffer<std::vector<proprioception::RobotUnitData>> dataBuffer;
 
             bool waitForRobotUnit = false;
         };
diff --git a/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp b/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp
index ac67b3a3b..c895e131e 100644
--- a/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp
+++ b/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp
@@ -89,7 +89,7 @@ namespace armarx
         while (rec.size() < n)
         {
             rec.emplace_back(getRobotUnitComponentPlugin()
-                             .startDataSatreming(cfg));
+                             .startDataStreaming(cfg));
             ARMARX_INFO << rec.back()->getDataDescriptionString();
         }
         for (auto& r : rec)
diff --git a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp
index 9c0c21175..8222d0622 100644
--- a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp
+++ b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp
@@ -9,7 +9,7 @@
 namespace armarx::plugins
 {
     RobotUnitDataStreamingReceiverPtr
-    RobotUnitComponentPlugin::startDataSatreming(
+    RobotUnitComponentPlugin::startDataStreaming(
         const RobotUnitDataStreaming::Config& cfg)
     {
         //ok to create smart ptr from parent, since ice handels this
@@ -194,5 +194,3 @@ namespace armarx
         ARMARX_INFO << "Robot unit is up and running.";
     }
 }
-
-
diff --git a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h
index 6edd2d836..ec497f886 100644
--- a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h
+++ b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h
@@ -74,7 +74,7 @@ namespace armarx
 
             //datastreaming
         public:
-            RobotUnitDataStreamingReceiverPtr startDataSatreming(const RobotUnitDataStreaming::Config& cfg);
+            RobotUnitDataStreamingReceiverPtr startDataStreaming(const RobotUnitDataStreaming::Config& cfg);
 
         private:
             static constexpr const char* PROPERTY_NAME = "RobotUnitName";
diff --git a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp
index 10a524cb7..b7d6343d4 100644
--- a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp
+++ b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp
@@ -173,8 +173,9 @@ namespace armarx
                     _last_iteration_id + 1 != step.iterationId
                 )
                 {
-                    ARMARX_ERROR << "Missing Iterations or iterations out of order!"
-                                 " This should not happen";
+                    ARMARX_ERROR << "Missing Iterations or iterations out of order! "
+                                 << "This should not happen. " 
+                                 << VAROUT(_last_iteration_id) << ", " << VAROUT(step.iterationId);
                 }
                 _last_iteration_id = step.iterationId;
                 _data_buffer.emplace_back(std::move(step));
diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp
index 82ba252a4..87df7cb01 100644
--- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp
+++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp
@@ -29,6 +29,7 @@
 #include <SimoxUtility/math/convert/rpy_to_mat3f.h>
 
 // ArmarX
+#include "ArmarXCore/core/time/Metronome.h"
 #include <ArmarXCore/core/exceptions/local/ExpressionException.h>
 #include <ArmarXCore/core/time/CycleUtil.h>
 #include <ArmarXCore/libraries/ArmarXCoreComponentPlugins/DebugObserverComponentPlugin.h>
@@ -66,28 +67,21 @@ namespace armarx::armem::server::robot_state::proprioception
 
     void RobotStateWriter::run(
         float pollFrequency,
-        std::queue<RobotUnitData>& dataQueue,
-        std::mutex& dataMutex,
+        TripleBuffer<std::vector<RobotUnitData>>& dataBuffer,
         MemoryToIceAdapter& memory,
         localization::Segment& localizationSegment)
     {
-        CycleUtil cycle(static_cast<int>(1000.f / pollFrequency));
+        Metronome metronome(Frequency::HertzDouble(pollFrequency));
+
         while (task and not task->isStopped())
         {
-            size_t queueSize = 0;
-            std::queue<RobotUnitData> batch;
-            {
-                std::lock_guard lock{dataMutex};
-                queueSize = dataQueue.size();
-                if (!dataQueue.empty())
-                {
-                    std::swap(batch, dataQueue);
-                }
-            }
+            const std::vector<RobotUnitData>& batch = dataBuffer.getUpToDateReadBuffer();
+
             if (debugObserver)
             {
-                debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", queueSize);
+                debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", batch.size());
             }
+
             if (not batch.empty())
             {
                 auto start = std::chrono::high_resolution_clock::now();
@@ -131,23 +125,21 @@ namespace armarx::armem::server::robot_state::proprioception
             {
                 debugObserver->sendDebugObserverBatch();
             }
-            cycle.waitForCycleDuration();
+            metronome.waitForNextTick();
         }
     }
 
 
-    RobotStateWriter::Update RobotStateWriter::buildUpdate(std::queue<RobotUnitData>& dataQueue)
+    RobotStateWriter::Update RobotStateWriter::buildUpdate(const std::vector<RobotUnitData>& dataQueue)
     {
-        ARMARX_CHECK_GREATER(dataQueue.size(), 0);
+        ARMARX_CHECK_NOT_EMPTY(dataQueue);
         ARMARX_DEBUG << "RobotStateWriter: Commit batch of " << dataQueue.size() << " timesteps to memory...";
 
         // Send batch to memory
         Update update;
 
-        while (dataQueue.size() > 0)
+        for (const RobotUnitData& data: dataQueue)
         {
-            const RobotUnitData& data = dataQueue.front();
-
             {
                 armem::EntityUpdate& up = update.proprioception.add();
                 up.entityID = properties.robotUnitProviderID.withEntityName(properties.robotUnitProviderID.providerSegmentName);
@@ -173,8 +165,6 @@ namespace armarx::armem::server::robot_state::proprioception
                             ;
                 noOdometryDataLogged = true;
             }
-
-            dataQueue.pop();
         }
 
         return update;
diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h
index 4a4f14672..d70566ebf 100644
--- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h
+++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h
@@ -25,6 +25,7 @@
 
 #include <optional>
 
+#include "ArmarXCore/util/CPPUtility/TripleBuffer.h"
 #include <ArmarXCore/core/logging/Logging.h>
 #include <ArmarXCore/core/services/tasks/TaskUtil.h>
 #include <ArmarXCore/libraries/DebugObserverHelper/DebugObserverHelper.h>
@@ -58,7 +59,7 @@ namespace armarx::armem::server::robot_state::proprioception
 
         /// Reads data from `dataQueue` and commits to the memory.
         void run(float pollFrequency,
-                 std::queue<RobotUnitData>& dataQueue, std::mutex& dataMutex,
+                 TripleBuffer<std::vector<RobotUnitData>>& dataBuffer,
                  MemoryToIceAdapter& memory,
                  localization::Segment& localizationSegment
                 );
@@ -69,7 +70,7 @@ namespace armarx::armem::server::robot_state::proprioception
             armem::Commit proprioception;
             std::vector<armem::robot_state::Transform> localization;
         };
-        Update buildUpdate(std::queue<RobotUnitData>& dataQueue);
+        Update buildUpdate(const std::vector<RobotUnitData>& dataQueue);
 
 
     private:
diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp
index f50c4e561..5794287c7 100644
--- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp
+++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp
@@ -10,6 +10,10 @@
 
 #include <RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.h>
 
+#include "ArmarXCore/core/time/Frequency.h"
+#include "ArmarXCore/core/time/Metronome.h"
+#include "ArmarXCore/core/time/forward_declarations.h"
+#include "ArmarXCore/util/CPPUtility/TripleBuffer.h"
 #include <ArmarXCore/core/time/CycleUtil.h>
 #include <ArmarXCore/libraries/ArmarXCoreComponentPlugins/DebugObserverComponentPlugin.h>
 
@@ -23,9 +27,7 @@
 namespace armarx::armem::server::robot_state::proprioception
 {
 
-    RobotUnitReader::RobotUnitReader()
-    {
-    }
+    RobotUnitReader::RobotUnitReader() = default;
 
 
     void RobotUnitReader::connect(
@@ -40,7 +42,7 @@ namespace armarx::armem::server::robot_state::proprioception
                     << "Known are: " << converterRegistry.getKeys();
 
             config.loggingNames.push_back(properties.sensorPrefix);
-            receiver = robotUnitPlugin.startDataSatreming(config);
+            receiver = robotUnitPlugin.startDataStreaming(config);
             description = receiver->getDataDescription();
         }
         {
@@ -53,23 +55,24 @@ namespace armarx::armem::server::robot_state::proprioception
 
     void RobotUnitReader::run(
         float pollFrequency,
-        std::queue<RobotUnitData>& dataQueue,
-        std::mutex& dataMutex)
+        TripleBuffer<std::vector<RobotUnitData>>& dataBuffer)
     {
-        CycleUtil cycle(static_cast<int>(1000.f / pollFrequency));
+        Metronome metronome(Frequency::HertzDouble(pollFrequency));
+
         while (task and not task->isStopped())
         {
             if (std::optional<RobotUnitData> commit = fetchAndConvertLatestRobotUnitData())
             {
-                std::lock_guard g{dataMutex};
-                dataQueue.push(std::move(commit.value()));
+                dataBuffer.getWriteBuffer().push_back(std::move(commit.value()));
+                dataBuffer.commitWrite();
             }
 
             if (debugObserver)
             {
                 debugObserver->sendDebugObserverBatch();
             }
-            cycle.waitForCycleDuration();
+
+            metronome.waitForNextTick();
         }
     }
 
@@ -125,4 +128,3 @@ namespace armarx::armem::server::robot_state::proprioception
 
 
 }
-
diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h
index efb7a0d81..5abf73783 100644
--- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h
+++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h
@@ -6,6 +6,7 @@
 #include <optional>
 #include <string>
 
+#include "ArmarXCore/util/CPPUtility/TripleBuffer.h"
 #include <ArmarXCore/core/logging/Logging.h>
 #include <ArmarXCore/core/services/tasks/TaskUtil.h>
 #include <ArmarXCore/libraries/DebugObserverHelper/DebugObserverHelper.h>
@@ -46,8 +47,7 @@ namespace armarx::armem::server::robot_state::proprioception
 
         /// Reads data from `handler` and fills `dataQueue`.
         void run(float pollFrequency,
-                 std::queue<RobotUnitData>& dataQueue,
-                 std::mutex& dataMutex);
+                 TripleBuffer<std::vector<RobotUnitData>>& dataBuffer);
 
         std::optional<RobotUnitData> fetchAndConvertLatestRobotUnitData();
 
@@ -81,4 +81,3 @@ namespace armarx::armem::server::robot_state::proprioception
     };
 
 }
-
-- 
GitLab