diff --git a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp index e3f006eaf4a554d13705868eaf812955b2fcfb12..eab8bf5e6dd721395647519d3e04fecc62ffbb7f 100644 --- a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp +++ b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.cpp @@ -160,15 +160,12 @@ namespace armarx::armem::server::robot_state robotUnit.reader.task = new SimpleRunningTask<>([this]() { - robotUnit.reader.run( - robotUnit.pollFrequency, robotUnit.dataQueue, robotUnit.dataMutex - ); + robotUnit.reader.run(robotUnit.pollFrequency, robotUnit.dataBuffer); }, "Robot Unit Reader"); robotUnit.writer.task = new SimpleRunningTask<>([this]() { robotUnit.writer.run( - robotUnit.pollFrequency, robotUnit.dataQueue, robotUnit.dataMutex, - iceAdapter(), localizationSegment + robotUnit.pollFrequency, robotUnit.dataBuffer, iceAdapter(), localizationSegment ); }, "Robot State Writer"); diff --git a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h index fb0329dc65699624bfd53236783846cdb3592876..09280c4691a479a3bbd612131565775669bf21d1 100644 --- a/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h +++ b/source/RobotAPI/components/armem/server/RobotStateMemory/RobotStateMemory.h @@ -129,8 +129,7 @@ namespace armarx::armem::server::robot_state proprioception::RobotStateWriter writer; // queue - std::queue<proprioception::RobotUnitData> dataQueue; - std::mutex dataMutex; + TripleBuffer<std::vector<proprioception::RobotUnitData>> dataBuffer; bool waitForRobotUnit = false; }; diff --git a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp index 92f65d1941cd1c88dde3ace948d0c37a85d4ab60..0f45fe1f7bbd22ff33afc7859f595ecdc952ab60 100644 --- a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp +++ b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp @@ -389,6 +389,8 @@ namespace armarx::RobotUnitModule DataStreamingEntry& streamingEntry = rtDataStreamingEntry[receiver]; getProperty(streamingEntry.rtStreamMaxClientErrors, "RTLogging_StreamingDataMaxClientConnectionFailures"); + + getProperty(logAllEntries, "RTLogging_LogAllMessages"); ARMARX_INFO << "start data streaming to " << receiver->ice_getIdentity().name << ". Values: " << config.loggingNames; @@ -575,14 +577,29 @@ namespace armarx::RobotUnitModule << '\n' << "Number of streams " << rtDataStreamingEntry.size(); } - _module<ControlThreadDataBuffer>() - .getControlThreadOutputBuffer() - .foreachNewLoggingEntry( - [&](const auto& data, auto i, auto num) - { - ARMARX_TRACE; - doLogging(dlogdurs, now, data, i, num); - }); + + + if(logAllEntries) + { + _module<ControlThreadDataBuffer>() + .getControlThreadOutputBuffer() + .foreachNewLoggingEntry( + [&](const auto& data, auto i, auto num) + { + ARMARX_TRACE; + doLogging(dlogdurs, now, data, i, num); + }); + }else { // only log newest entry + _module<ControlThreadDataBuffer>() + .getControlThreadOutputBuffer() + .forLatestLoggingEntry( + [&](const auto& data, auto i, auto num) + { + ARMARX_TRACE; + doLogging(dlogdurs, now, data, i, num); + }); + } + } ARMARX_DEBUG << ::deactivateSpam() << "the last " << backlog.size() << " iterations are stored"; @@ -863,7 +880,7 @@ namespace armarx::RobotUnitModule rtLoggingBacklogRetentionTime = IceUtil::Time::milliSeconds(getProperty<std::size_t>("RTLogging_KeepIterationsForMs")); - + ARMARX_CHECK_GREATER(getControlThreadTargetPeriod().toMicroSeconds(), 0); } @@ -993,9 +1010,9 @@ namespace armarx::RobotUnitModule { return; } - result.emplace_back(getResultElement()); + + auto& data = result.emplace_back(getResultElement()); - auto& data = result.back(); data.iterationId = e.iteration; data.timestampUSec = e.sensorValuesTimestamp.toMicroSeconds(); data.timesSinceLastIterationUSec = e.timeSinceLastIteration.toMicroSeconds(); diff --git a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h index 732b3a193d6da5831634a1c6caa0206e9c391140..bb6a625e3484fc843f4d27224a5801d416d91858 100644 --- a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h +++ b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h @@ -79,6 +79,9 @@ namespace armarx::RobotUnitModule defineOptionalProperty<std::size_t>( "RTLogging_StreamingDataMaxClientConnectionFailures", 10, "If sending data to a client fails this often, the client is removed from the list"); + + defineOptionalProperty<bool>("RTLogging_LogAllMessages", true, + "If true, logs all messages from the control thread. If false, only the newest message will be logged"); } }; @@ -301,6 +304,9 @@ namespace armarx::RobotUnitModule /// @brief The time an entry shold remain in the backlog. IceUtil::Time rtLoggingBacklogRetentionTime; + /// @brief + bool logAllEntries = true; + friend void WriteTo(const auto& dentr, const Logging::DataStreamingEntry::OutVal& out, const auto& val, diff --git a/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.cpp b/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.cpp index 8e5aa6c3238a3149180217c31bfff947adc27bff..ed230762d821de4fef3e400bb9280066dbeec877 100644 --- a/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.cpp +++ b/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.cpp @@ -21,6 +21,7 @@ */ #include "ControlThreadOutputBuffer.h" #include <memory> +#include <utility> namespace armarx { @@ -124,6 +125,29 @@ namespace armarx } + + void ControlThreadOutputBuffer::forLatestLoggingEntry(ConsumerFunctor consumer) + { + ARMARX_TRACE; + ARMARX_CHECK_EXPRESSION(isInitialized); + if (writePosition - onePastLoggingReadPosition >= numEntries) + { + ARMARX_ERROR << "There are " << writePosition - onePastLoggingReadPosition + << " unlogged entries, but only the last " << numEntries << " are saved! " + "There seems to be something wrong (e.g. the rt logging threw an exception, " + "the system load is too high or the logging takes to long). " + "The log position will be reset to the newest entry!"; + resetLoggingPosition(); + } + + // skip all messages except the latest + onePastLoggingReadPosition = writePosition.load() - 1; + + // use already existing impl. + foreachNewLoggingEntry(std::move(consumer)); + + } + std::size_t ControlThreadOutputBuffer::initialize(std::size_t numEntries, const KeyValueVector<std::string, ControlDevicePtr>& controlDevices, const KeyValueVector<std::string, SensorDevicePtr>& sensorDevices, diff --git a/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.h b/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.h index 40d70b0d55cc93a372a058f0a850aa344a426bcd..4cbdf1e4b7fcde010ea07db88f4d8585163cf2d9 100644 --- a/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.h +++ b/source/RobotAPI/components/units/RobotUnit/util/ControlThreadOutputBuffer.h @@ -228,6 +228,7 @@ namespace armarx //logging read void resetLoggingPosition() const; void foreachNewLoggingEntry(ConsumerFunctor consumer); + void forLatestLoggingEntry(ConsumerFunctor consumer); std::size_t getNumberOfBytes() const; diff --git a/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp b/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp index ac67b3a3b5ebfa719a6af1cca86032cd81238f25..c895e131e75cc63287f43dc323c600e825734550 100644 --- a/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp +++ b/source/RobotAPI/gui-plugins/DebugRobotUnitDataStreaming/DebugRobotUnitDataStreamingWidgetController.cpp @@ -89,7 +89,7 @@ namespace armarx while (rec.size() < n) { rec.emplace_back(getRobotUnitComponentPlugin() - .startDataSatreming(cfg)); + .startDataStreaming(cfg)); ARMARX_INFO << rec.back()->getDataDescriptionString(); } for (auto& r : rec) diff --git a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp index 9c0c21175f5926a8db9676bc06e25d5b00dd7b84..8222d062291e990872750187ff6dc1d943dc5079 100644 --- a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp +++ b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.cpp @@ -9,7 +9,7 @@ namespace armarx::plugins { RobotUnitDataStreamingReceiverPtr - RobotUnitComponentPlugin::startDataSatreming( + RobotUnitComponentPlugin::startDataStreaming( const RobotUnitDataStreaming::Config& cfg) { //ok to create smart ptr from parent, since ice handels this @@ -194,5 +194,3 @@ namespace armarx ARMARX_INFO << "Robot unit is up and running."; } } - - diff --git a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h index 6edd2d83691362e3dfdebb372984ca05f497e586..ec497f88670d49e3e780114b2fcc3a9e78e3cc9e 100644 --- a/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h +++ b/source/RobotAPI/libraries/RobotAPIComponentPlugins/RobotUnitComponentPlugin.h @@ -74,7 +74,7 @@ namespace armarx //datastreaming public: - RobotUnitDataStreamingReceiverPtr startDataSatreming(const RobotUnitDataStreaming::Config& cfg); + RobotUnitDataStreamingReceiverPtr startDataStreaming(const RobotUnitDataStreaming::Config& cfg); private: static constexpr const char* PROPERTY_NAME = "RobotUnitName"; diff --git a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp index 10a524cb71f64e12add7d65870139b1c44707ea5..b7d6343d447fd95a6031b2ab5b5c88201a540069 100644 --- a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp +++ b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp @@ -173,8 +173,9 @@ namespace armarx _last_iteration_id + 1 != step.iterationId ) { - ARMARX_ERROR << "Missing Iterations or iterations out of order!" - " This should not happen"; + ARMARX_ERROR << "Missing Iterations or iterations out of order! " + << "This should not happen. " + << VAROUT(_last_iteration_id) << ", " << VAROUT(step.iterationId); } _last_iteration_id = step.iterationId; _data_buffer.emplace_back(std::move(step)); diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp index 82ba252a400bf57e561dd2ba45a063a49182dbd1..87df7cb013d99543485fdc8868aa83254bf52ffe 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.cpp @@ -29,6 +29,7 @@ #include <SimoxUtility/math/convert/rpy_to_mat3f.h> // ArmarX +#include "ArmarXCore/core/time/Metronome.h" #include <ArmarXCore/core/exceptions/local/ExpressionException.h> #include <ArmarXCore/core/time/CycleUtil.h> #include <ArmarXCore/libraries/ArmarXCoreComponentPlugins/DebugObserverComponentPlugin.h> @@ -66,28 +67,21 @@ namespace armarx::armem::server::robot_state::proprioception void RobotStateWriter::run( float pollFrequency, - std::queue<RobotUnitData>& dataQueue, - std::mutex& dataMutex, + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer, MemoryToIceAdapter& memory, localization::Segment& localizationSegment) { - CycleUtil cycle(static_cast<int>(1000.f / pollFrequency)); + Metronome metronome(Frequency::HertzDouble(pollFrequency)); + while (task and not task->isStopped()) { - size_t queueSize = 0; - std::queue<RobotUnitData> batch; - { - std::lock_guard lock{dataMutex}; - queueSize = dataQueue.size(); - if (!dataQueue.empty()) - { - std::swap(batch, dataQueue); - } - } + const std::vector<RobotUnitData>& batch = dataBuffer.getUpToDateReadBuffer(); + if (debugObserver) { - debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", queueSize); + debugObserver->setDebugObserverDatafield("RobotStateWriter | Queue Size", batch.size()); } + if (not batch.empty()) { auto start = std::chrono::high_resolution_clock::now(); @@ -131,23 +125,21 @@ namespace armarx::armem::server::robot_state::proprioception { debugObserver->sendDebugObserverBatch(); } - cycle.waitForCycleDuration(); + metronome.waitForNextTick(); } } - RobotStateWriter::Update RobotStateWriter::buildUpdate(std::queue<RobotUnitData>& dataQueue) + RobotStateWriter::Update RobotStateWriter::buildUpdate(const std::vector<RobotUnitData>& dataQueue) { - ARMARX_CHECK_GREATER(dataQueue.size(), 0); + ARMARX_CHECK_NOT_EMPTY(dataQueue); ARMARX_DEBUG << "RobotStateWriter: Commit batch of " << dataQueue.size() << " timesteps to memory..."; // Send batch to memory Update update; - while (dataQueue.size() > 0) + for (const RobotUnitData& data: dataQueue) { - const RobotUnitData& data = dataQueue.front(); - { armem::EntityUpdate& up = update.proprioception.add(); up.entityID = properties.robotUnitProviderID.withEntityName(properties.robotUnitProviderID.providerSegmentName); @@ -173,8 +165,6 @@ namespace armarx::armem::server::robot_state::proprioception ; noOdometryDataLogged = true; } - - dataQueue.pop(); } return update; diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h index 4a4f14672b038132243bdbd5fc5a5617f3f5d668..d70566ebf5c3aa6808d0e14af9c1af0471ac31c7 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotStateWriter.h @@ -25,6 +25,7 @@ #include <optional> +#include "ArmarXCore/util/CPPUtility/TripleBuffer.h" #include <ArmarXCore/core/logging/Logging.h> #include <ArmarXCore/core/services/tasks/TaskUtil.h> #include <ArmarXCore/libraries/DebugObserverHelper/DebugObserverHelper.h> @@ -58,7 +59,7 @@ namespace armarx::armem::server::robot_state::proprioception /// Reads data from `dataQueue` and commits to the memory. void run(float pollFrequency, - std::queue<RobotUnitData>& dataQueue, std::mutex& dataMutex, + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer, MemoryToIceAdapter& memory, localization::Segment& localizationSegment ); @@ -69,7 +70,7 @@ namespace armarx::armem::server::robot_state::proprioception armem::Commit proprioception; std::vector<armem::robot_state::Transform> localization; }; - Update buildUpdate(std::queue<RobotUnitData>& dataQueue); + Update buildUpdate(const std::vector<RobotUnitData>& dataQueue); private: diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp index f50c4e5619d70b7910d6d3ef544c2ca4f9d7fdc9..5794287c7231e07f9ffa0db967090f6d415c8f29 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.cpp @@ -10,6 +10,10 @@ #include <RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.h> +#include "ArmarXCore/core/time/Frequency.h" +#include "ArmarXCore/core/time/Metronome.h" +#include "ArmarXCore/core/time/forward_declarations.h" +#include "ArmarXCore/util/CPPUtility/TripleBuffer.h" #include <ArmarXCore/core/time/CycleUtil.h> #include <ArmarXCore/libraries/ArmarXCoreComponentPlugins/DebugObserverComponentPlugin.h> @@ -23,9 +27,7 @@ namespace armarx::armem::server::robot_state::proprioception { - RobotUnitReader::RobotUnitReader() - { - } + RobotUnitReader::RobotUnitReader() = default; void RobotUnitReader::connect( @@ -40,7 +42,7 @@ namespace armarx::armem::server::robot_state::proprioception << "Known are: " << converterRegistry.getKeys(); config.loggingNames.push_back(properties.sensorPrefix); - receiver = robotUnitPlugin.startDataSatreming(config); + receiver = robotUnitPlugin.startDataStreaming(config); description = receiver->getDataDescription(); } { @@ -53,23 +55,24 @@ namespace armarx::armem::server::robot_state::proprioception void RobotUnitReader::run( float pollFrequency, - std::queue<RobotUnitData>& dataQueue, - std::mutex& dataMutex) + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer) { - CycleUtil cycle(static_cast<int>(1000.f / pollFrequency)); + Metronome metronome(Frequency::HertzDouble(pollFrequency)); + while (task and not task->isStopped()) { if (std::optional<RobotUnitData> commit = fetchAndConvertLatestRobotUnitData()) { - std::lock_guard g{dataMutex}; - dataQueue.push(std::move(commit.value())); + dataBuffer.getWriteBuffer().push_back(std::move(commit.value())); + dataBuffer.commitWrite(); } if (debugObserver) { debugObserver->sendDebugObserverBatch(); } - cycle.waitForCycleDuration(); + + metronome.waitForNextTick(); } } @@ -125,4 +128,3 @@ namespace armarx::armem::server::robot_state::proprioception } - diff --git a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h index efb7a0d8175de36ebf963fd035d176442df822f2..5abf7378378828005795d10401a2ee7cc1ea5aa4 100644 --- a/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h +++ b/source/RobotAPI/libraries/armem_robot_state/server/proprioception/RobotUnitReader.h @@ -6,6 +6,7 @@ #include <optional> #include <string> +#include "ArmarXCore/util/CPPUtility/TripleBuffer.h" #include <ArmarXCore/core/logging/Logging.h> #include <ArmarXCore/core/services/tasks/TaskUtil.h> #include <ArmarXCore/libraries/DebugObserverHelper/DebugObserverHelper.h> @@ -46,8 +47,7 @@ namespace armarx::armem::server::robot_state::proprioception /// Reads data from `handler` and fills `dataQueue`. void run(float pollFrequency, - std::queue<RobotUnitData>& dataQueue, - std::mutex& dataMutex); + TripleBuffer<std::vector<RobotUnitData>>& dataBuffer); std::optional<RobotUnitData> fetchAndConvertLatestRobotUnitData(); @@ -81,4 +81,3 @@ namespace armarx::armem::server::robot_state::proprioception }; } -