/* * This file is part of ArmarX. * * ArmarX is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation. * * ArmarX is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * @package RobotAPI::ArmarXObjects::RobotUnitDataStreamingReceiver * @author Raphael Grimm ( raphael dot grimm at kit dot edu ) * @date 2020 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt * GNU General Public License */ #include "RobotUnitDataStreamingReceiver.h" #include <Ice/ObjectAdapter.h> #include "ArmarXCore/core/logging/Logging.h" #include <ArmarXCore/core/ArmarXManager.h> namespace armarx::detail::RobotUnitDataStreamingReceiver { class Receiver : virtual public ManagedIceObject, virtual public RobotUnitDataStreaming::Receiver { public: std::string getDefaultName() const override { return "RobotUnitDataStreamingReceiver"; } ~Receiver() { std::lock_guard g{_data_mutex}; } void onInitComponent() override { } void onConnectComponent() override { } void onExitComponent() override { } void update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr, const RobotUnitDataStreaming::TimeStepSeq& data, Ice::Long msgSequenceNbr, const Ice::Current&) override { ptr->ice_response(); if (_discard_data) { return; } static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr)); const auto seq = static_cast<std::uint64_t>(msgSequenceNbr); std::lock_guard g{_data_mutex}; ARMARX_VERBOSE << deactivateSpam() << "received " << data.size() << " timesteps"; _data[seq] = data; } std::atomic_bool _discard_data = false; std::mutex _data_mutex; std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data; Ice::Identity _identity; }; } // namespace armarx::detail::RobotUnitDataStreamingReceiver namespace armarx { RobotUnitDataStreamingReceiver::RobotUnitDataStreamingReceiver( const ManagedIceObjectPtr& obj, const RobotUnitInterfacePrx& ru, const RobotUnitDataStreaming::Config& cfg) : _obj{obj}, _ru{ru} { ARMARX_CHECK_NOT_NULL(_obj); ARMARX_CHECK_NOT_NULL(_ru); _receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>(); _receiver->_identity.name = _obj->getName() + "_RobotUnitDataStreamingReceiver_" + std::to_string(clock_t::now().time_since_epoch().count()); auto adapter = _obj->getArmarXManager()->getAdapter(); _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast( adapter->add(_receiver, _receiver->_identity)); _description = _ru->startDataStreaming(_proxy, cfg); } RobotUnitDataStreamingReceiver::~RobotUnitDataStreamingReceiver() { if (_proxy) { _receiver->_discard_data = true; if (!_description.entries.empty()) { try { _ru->stopDataStreaming(_proxy); } catch (...) { ARMARX_WARNING << "did not stop streaming since the network call failed"; } } auto icemanager = _obj->getArmarXManager()->getIceManager(); auto adapter = _obj->getArmarXManager()->getAdapter(); adapter->remove(_receiver->_identity); while (icemanager->isObjectReachable(_receiver->_identity.name)) { ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice"; } } _proxy = nullptr; _receiver = nullptr; } std::deque<RobotUnitDataStreaming::TimeStep>& RobotUnitDataStreamingReceiver::getDataBuffer() { ARMARX_CHECK_NOT_NULL(_receiver); std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data; { std::lock_guard g{_receiver->_data_mutex}; std::swap(data, _receiver->_data); } _tmp_data_buffer.merge(data); if (!data.empty()) { ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n" << ARMARX_STREAM_PRINTER { for (const auto& [key, _] : data) { out << " " << key << "\n"; } }; } auto it = _tmp_data_buffer.begin(); for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx) { if (_last_iteration_id == -1) { _tmp_data_buffer_seq_id = it->first - 1; } if (_tmp_data_buffer_seq_id + 1 != it->first) { if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10) { //there is a lot more data (10 updates) in the buffer! //-> some message calls went missing! ARMARX_ERROR << "some update messages went missing!"; } else { //maybe one or two frames are missing (due to async calls) -> wait break; } } _tmp_data_buffer_seq_id = it->first; for (auto& step : it->second) { if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId) { ARMARX_INFO << deactivateSpam(60) << "Missing Iterations or iterations out of order! " << "This should not happen. " << VAROUT(_last_iteration_id) << ", " << VAROUT(step.iterationId); } _last_iteration_id = step.iterationId; _data_buffer.emplace_back(std::move(step)); } } _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it); return _data_buffer; } const RobotUnitDataStreaming::DataStreamingDescription& RobotUnitDataStreamingReceiver::getDataDescription() const { return _description; } std::string RobotUnitDataStreamingReceiver::getDataDescriptionString() const { std::stringstream str; const auto& entr = getDataDescription().entries; str << "Received data (" << entr.size() << " entries):\n"; for (const auto& [k, v] : entr) { str << " " << k << ": type " << v.type << " index " << v.index << "\n"; } return str.str(); } } // namespace armarx