diff --git a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp index eaa0b6accedf3a3d0de7b7e0e6f45a450cdafb5b..26fe846c4c2b6165d0a4f5f57d95f47dd874f6ab 100644 --- a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp +++ b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.cpp @@ -593,9 +593,10 @@ namespace armarx::RobotUnitModule } else { - data.send(prx); + data.send(prx, rtDataStreamingMsgID); } } + ++rtDataStreamingMsgID; for (const auto& prx : toRemove) { rtDataStreamingEntry.erase(prx); @@ -688,7 +689,7 @@ namespace armarx::RobotUnitModule } durations.sens_csv.stop(); } - if (!rtLoggingEntries.empty()) + if (!rtDataStreamingEntry.empty()) { durations.sens_stream.start(); for (auto& [_, data] : rtDataStreamingEntry) @@ -1027,31 +1028,46 @@ namespace armarx::RobotUnitModule WriteTo(*this, o, val, fidx, data); } - void Logging::DataStreamingEntry::send(const RobotUnitDataStreaming::ReceiverPrx& r) + void Logging::DataStreamingEntry::send(const RobotUnitDataStreaming::ReceiverPrx& r, + std::uint64_t msgId) { ARMARX_TRACE; - try - { - const auto start_send = IceUtil::Time::now(); - r->update(result); - connectionFailures = 0; - const auto start_clear = IceUtil::Time::now(); - clearResult(); - const auto end = IceUtil::Time::now(); - ARMARX_DEBUG_S << "Logging::DataStreamingEntry::send" - << "\n update " << (start_clear - start_send).toMilliSecondsDouble() << "ms (" << result.size() << " timesteps)" - << "\n clear " << (end - start_clear).toMilliSecondsDouble() << "ms"; - } - catch (...) + const auto start_send = IceUtil::Time::now(); + updateCalls.emplace_back(r->begin_update(result, static_cast<Ice::Long>(msgId))); + const auto start_clear = IceUtil::Time::now(); + clearResult(); + const auto end = IceUtil::Time::now(); + ARMARX_DEBUG_S << "Logging::DataStreamingEntry::send" + << "\n update " << (start_clear - start_send).toMilliSecondsDouble() << "ms (" << result.size() << " timesteps)" + << "\n clear " << (end - start_clear).toMilliSecondsDouble() << "ms"; + + //now execute all ready callbacks + std::size_t i = 0; + for (; i < updateCalls.size(); ++i) { - ARMARX_TRACE; - ++connectionFailures; - if (connectionFailures > rtStreamMaxClientErrors) + try { - stopStreaming = true; - ARMARX_WARNING_S << "DataStreaming Receiver was not reachable for " - << connectionFailures << " iterations! Removing receiver"; + if (!updateCalls.at(i)->isCompleted()) + { + break; + } + r->end_update(updateCalls.at(i)); + connectionFailures = 0; + } + catch (...) + { + ARMARX_TRACE; + ++connectionFailures; + if (connectionFailures > rtStreamMaxClientErrors) + { + stopStreaming = true; + ARMARX_WARNING_S << "DataStreaming Receiver was not reachable for " + << connectionFailures << " iterations! Removing receiver"; + updateCalls.clear(); + break; + } } } + updateCalls.erase(updateCalls.begin(), updateCalls.begin() + i); } } diff --git a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h index dc88451c7db6bee7ad4c953bc4de5b0b8d7d0905..975979177d45d71530317ea6f282929d72f6924c 100644 --- a/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h +++ b/source/RobotAPI/components/units/RobotUnit/RobotUnitModules/RobotUnitModuleLogging.h @@ -239,6 +239,7 @@ namespace armarx::RobotUnitModule RobotUnitDataStreaming::TimeStepSeq result; std::deque<RobotUnitDataStreaming::TimeStep> entryBuffer; + std::deque<Ice::AsyncResultPtr> updateCalls; void clearResult(); RobotUnitDataStreaming::TimeStep getResultElement(); RobotUnitDataStreaming::TimeStep allocateResultElement() const; @@ -253,9 +254,10 @@ namespace armarx::RobotUnitModule void processSens(const SensorValueBase& val, std::size_t didx, std::size_t fidx); - void send(const RobotUnitDataStreaming::ReceiverPrx& r); + void send(const RobotUnitDataStreaming::ReceiverPrx& r, uint64_t msgId); }; std::map<RobotUnitDataStreaming::ReceiverPrx, DataStreamingEntry> rtDataStreamingEntry; + std::uint64_t rtDataStreamingMsgID = 0; /// @brief The thread executing the logging functions. std::thread rtLoggingTask; diff --git a/source/RobotAPI/interface/units/RobotUnit/RobotUnitInterface.ice b/source/RobotAPI/interface/units/RobotUnit/RobotUnitInterface.ice index c7871f0860fbe343a1704b2022f54a8ed6cd8674..dac07929c87809b5b51552cf79ab07ffe0b51e0d 100644 --- a/source/RobotAPI/interface/units/RobotUnit/RobotUnitInterface.ice +++ b/source/RobotAPI/interface/units/RobotUnit/RobotUnitInterface.ice @@ -222,7 +222,7 @@ module armarx interface Receiver { - void update(TimeStepSeq data); + ["amd"] void update(TimeStepSeq data, long msgSequenceNbr); }; struct Config diff --git a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp index 45329ad72ffa5d694091bec3b81cc11cfbd72ab1..10a524cb71f64e12add7d65870139b1c44707ea5 100644 --- a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp +++ b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.cpp @@ -45,22 +45,29 @@ namespace armarx::detail::RobotUnitDataStreamingReceiver void onConnectComponent() override {} void onExitComponent() override {} - void update(const RobotUnitDataStreaming::TimeStepSeq& data, const Ice::Current&) override + void update_async( + const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr, + const RobotUnitDataStreaming::TimeStepSeq& data, + Ice::Long msgSequenceNbr, + const Ice::Current&) override { + ptr->ice_response(); if (_discard_data) { return; } + static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr)); + const auto seq = static_cast<std::uint64_t>(msgSequenceNbr); std::lock_guard g{_data_mutex}; ARMARX_INFO << deactivateSpam() << "received " << data.size() << " timesteps"; - _data.emplace_back(data); + _data[seq] = data; } - std::atomic_bool _discard_data = false; - std::mutex _data_mutex; - std::deque<RobotUnitDataStreaming::TimeStepSeq> _data; - Ice::Identity _identity; + std::atomic_bool _discard_data = false; + std::mutex _data_mutex; + std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data; + Ice::Identity _identity; }; } @@ -94,7 +101,14 @@ namespace armarx _receiver->_discard_data = true; if (!_description.entries.empty()) { - _ru->stopDataStreaming(_proxy); + try + { + _ru->stopDataStreaming(_proxy); + } + catch (...) + { + ARMARX_WARNING << "did not stop streaming since the network call failed"; + } } auto icemanager = _obj->getArmarXManager()->getIceManager(); auto adapter = _obj->getArmarXManager()->getAdapter(); @@ -112,14 +126,47 @@ namespace armarx std::deque<RobotUnitDataStreaming::TimeStep>& RobotUnitDataStreamingReceiver::getDataBuffer() { ARMARX_CHECK_NOT_NULL(_receiver); - std::deque<RobotUnitDataStreaming::TimeStepSeq> data; + std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data; { std::lock_guard g{_receiver->_data_mutex}; std::swap(data, _receiver->_data); } - for (auto& chunk : data) + _tmp_data_buffer.merge(data); + if (!data.empty()) + { + ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n" + << ARMARX_STREAM_PRINTER + { + for (const auto& [key, _] : data) + { + out << " " << key << "\n"; + } + }; + } + + auto it = _tmp_data_buffer.begin(); + for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx) { - for (auto& step : chunk) + if (_last_iteration_id == -1) + { + _tmp_data_buffer_seq_id = it->first - 1; + } + if (_tmp_data_buffer_seq_id + 1 != it->first) + { + if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10) + { + //there is a lot more data (10 updates) in the buffer! + //-> some message calls went missing! + ARMARX_ERROR << "some update messages went missing!"; + } + else + { + //maybe one or two frames are missing (due to async calls) -> wait + break; + } + } + _tmp_data_buffer_seq_id = it->first; + for (auto& step : it->second) { if ( _last_iteration_id != -1 && @@ -133,6 +180,7 @@ namespace armarx _data_buffer.emplace_back(std::move(step)); } } + _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it); return _data_buffer; } @@ -140,4 +188,16 @@ namespace armarx { return _description; } + + std::string RobotUnitDataStreamingReceiver::getDataDescriptionString() const + { + std::stringstream str; + const auto& entr = getDataDescription().entries; + str << "Received data (" << entr.size() << " entries):\n"; + for (const auto& [k, v] : entr) + { + str << " " << k << ": type " << v.type << " index " << v.index << "\n"; + } + return str.str(); + } } diff --git a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.h b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.h index 741937eb749f7800eec9f5665a8872e8fd69d4f9..60a03b5c98109b657adddcd48c55731c84cb2f63 100644 --- a/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.h +++ b/source/RobotAPI/libraries/RobotUnitDataStreamingReceiver/RobotUnitDataStreamingReceiver.h @@ -82,6 +82,7 @@ namespace armarx std::deque<timestep_t>& getDataBuffer(); const RobotUnitDataStreaming::DataStreamingDescription& getDataDescription() const; + std::string getDataDescriptionString() const; template<class T> DataEntryReader<T> getDataEntryReader(const std::string& name) const; @@ -97,13 +98,16 @@ namespace armarx static RobotUnitDataStreaming::DataEntryType ExpectedDataEntryType(); static void VisitEntries(auto&& f, const timestep_t& st, const auto& cont); private: - ManagedIceObjectPtr _obj; - RobotUnitInterfacePrx _ru; - detail::RobotUnitDataStreamingReceiver::ReceiverPtr _receiver; - RobotUnitDataStreaming::ReceiverPrx _proxy; - RobotUnitDataStreaming::DataStreamingDescription _description; - std::deque<RobotUnitDataStreaming::TimeStep> _data_buffer; - long _last_iteration_id = -1; + ManagedIceObjectPtr _obj; + RobotUnitInterfacePrx _ru; + detail::RobotUnitDataStreamingReceiver::ReceiverPtr _receiver; + RobotUnitDataStreaming::ReceiverPrx _proxy; + RobotUnitDataStreaming::DataStreamingDescription _description; + std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _tmp_data_buffer; + std::uint64_t _tmp_data_buffer_seq_id = 0; + std::deque<RobotUnitDataStreaming::TimeStep> _data_buffer; + long _last_iteration_id = -1; + }; } //impl