Skip to content
Snippets Groups Projects
RobotUnitDataStreamingReceiver.cpp 7.17 KiB
/*
 * This file is part of ArmarX.
 *
 * ArmarX is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 *
 * ArmarX is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * @package    RobotAPI::ArmarXObjects::RobotUnitDataStreamingReceiver
 * @author     Raphael Grimm ( raphael dot grimm at kit dot edu )
 * @date       2020
 * @copyright  http://www.gnu.org/licenses/gpl-2.0.txt
 *             GNU General Public License
 */

#include <Ice/ObjectAdapter.h>

#include <ArmarXCore/core/ArmarXManager.h>

#include "RobotUnitDataStreamingReceiver.h"

namespace armarx::detail::RobotUnitDataStreamingReceiver
{
    class Receiver :
        virtual public ManagedIceObject,
        virtual public RobotUnitDataStreaming::Receiver
    {
    public:
        std::string getDefaultName() const override
        {
            return "RobotUnitDataStreamingReceiver";
        }
        ~Receiver()
        {
            std::lock_guard g{_data_mutex};
        }
        void onInitComponent() override {}
        void onConnectComponent() override {}
        void onExitComponent() override {}

        void update_async(
            const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
            const RobotUnitDataStreaming::TimeStepSeq& data,
            Ice::Long msgSequenceNbr,
            const Ice::Current&) override
        {
            ptr->ice_response();
            if (_discard_data)
            {
                return;
            }
            static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr));
            const auto seq = static_cast<std::uint64_t>(msgSequenceNbr);
            std::lock_guard g{_data_mutex};
            ARMARX_INFO << deactivateSpam()
                        << "received " << data.size() << " timesteps";
            _data[seq] = data;
        }

        std::atomic_bool                                              _discard_data = false;
        std::mutex                                                    _data_mutex;
        std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>  _data;
        Ice::Identity                                                 _identity;
    };
}

namespace armarx
{
    RobotUnitDataStreamingReceiver::RobotUnitDataStreamingReceiver(
        const ManagedIceObjectPtr& obj,
        const RobotUnitInterfacePrx& ru,
        const RobotUnitDataStreaming::Config& cfg
    ) : _obj{obj}, _ru{ru}
    {
        ARMARX_CHECK_NOT_NULL(_obj);
        ARMARX_CHECK_NOT_NULL(_ru);
        _receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();

        _receiver->_identity.name =
            _obj->getName() + "_RobotUnitDataStreamingReceiver_" +
            std::to_string(clock_t::now().time_since_epoch().count());

        auto adapter = _obj->getArmarXManager()->getAdapter();
        _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
                     adapter->add(_receiver,  _receiver->_identity));

        _description = _ru->startDataStreaming(_proxy, cfg);
    }

    RobotUnitDataStreamingReceiver::~RobotUnitDataStreamingReceiver()
    {
        if (_proxy)
        {
            _receiver->_discard_data = true;
            if (!_description.entries.empty())
            {
                try
                {
                    _ru->stopDataStreaming(_proxy);
                }
                catch (...)
                {
                    ARMARX_WARNING << "did not stop streaming since the network call failed";
                }
            }
            auto icemanager = _obj->getArmarXManager()->getIceManager();
            auto adapter = _obj->getArmarXManager()->getAdapter();
            adapter->remove(_receiver->_identity);

            while (icemanager->isObjectReachable(_receiver->_identity.name))
            {
                ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice";
            }
        }
        _proxy    = nullptr;
        _receiver = nullptr;
    }

    std::deque<RobotUnitDataStreaming::TimeStep>& RobotUnitDataStreamingReceiver::getDataBuffer()
    {
        ARMARX_CHECK_NOT_NULL(_receiver);
        std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>  data;
        {
            std::lock_guard g{_receiver->_data_mutex};
            std::swap(data, _receiver->_data);
        }
        _tmp_data_buffer.merge(data);
        if (!data.empty())
        {
            ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n"
                         << ARMARX_STREAM_PRINTER
            {
                for (const auto& [key, _] : data)
                {
                    out << "    " << key << "\n";
                }
            };
        }

        auto it = _tmp_data_buffer.begin();
        for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
        {
            if (_last_iteration_id == -1)
            {
                _tmp_data_buffer_seq_id = it->first - 1;
            }
            if (_tmp_data_buffer_seq_id + 1 != it->first)
            {
                if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
                {
                    //there is a lot more data (10 updates) in the buffer!
                    //-> some message calls went missing!
                    ARMARX_ERROR << "some update messages went missing!";
                }
                else
                {
                    //maybe one or two frames are missing (due to async calls) -> wait
                    break;
                }
            }
            _tmp_data_buffer_seq_id = it->first;
            for (auto& step : it->second)
            {
                if (
                    _last_iteration_id != -1 &&
                    _last_iteration_id + 1 != step.iterationId
                )
                {
                    ARMARX_ERROR << "Missing Iterations or iterations out of order! "
                                 << "This should not happen. " 
                                 << VAROUT(_last_iteration_id) << ", " << VAROUT(step.iterationId);
                }
                _last_iteration_id = step.iterationId;
                _data_buffer.emplace_back(std::move(step));
            }
        }
        _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
        return _data_buffer;
    }

    const RobotUnitDataStreaming::DataStreamingDescription& RobotUnitDataStreamingReceiver::getDataDescription() const
    {
        return _description;
    }

    std::string RobotUnitDataStreamingReceiver::getDataDescriptionString() const
    {
        std::stringstream str;
        const auto& entr = getDataDescription().entries;
        str << "Received data (" << entr.size() << " entries):\n";
        for (const auto& [k, v] : entr)
        {
            str << "    " << k << ": type " << v.type << " index " << v.index << "\n";
        }
        return str.str();
    }
}