Skip to content
Snippets Groups Projects
RobotUnitDataStreamingReceiver.cpp 7.27 KiB
Newer Older
/*
 * This file is part of ArmarX.
 *
 * ArmarX is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 *
 * ArmarX is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * @package    RobotAPI::ArmarXObjects::RobotUnitDataStreamingReceiver
 * @author     Raphael Grimm ( raphael dot grimm at kit dot edu )
 * @date       2020
 * @copyright  http://www.gnu.org/licenses/gpl-2.0.txt
 *             GNU General Public License
 */

armar-user's avatar
armar-user committed
#include "RobotUnitDataStreamingReceiver.h"

#include <Ice/ObjectAdapter.h>

#include "ArmarXCore/core/logging/Logging.h"
#include <ArmarXCore/core/ArmarXManager.h>

namespace armarx::detail::RobotUnitDataStreamingReceiver
{
    class Receiver :
        virtual public ManagedIceObject,
        virtual public RobotUnitDataStreaming::Receiver
    {
    public:
armar-user's avatar
armar-user committed
        std::string
        getDefaultName() const override
        {
            return "RobotUnitDataStreamingReceiver";
        }
armar-user's avatar
armar-user committed

Pascal Weiner's avatar
Pascal Weiner committed
        ~Receiver()
        {
            std::lock_guard g{_data_mutex};
        }
armar-user's avatar
armar-user committed

        void
        onInitComponent() override
        {
        }

        void
        onConnectComponent() override
        {
        }

        void
        onExitComponent() override
        {
        }

        void
        update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
                     const RobotUnitDataStreaming::TimeStepSeq& data,
                     Ice::Long msgSequenceNbr,
                     const Ice::Current&) override
            ptr->ice_response();
            if (_discard_data)
            {
                return;
            }
            static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr));
            const auto seq = static_cast<std::uint64_t>(msgSequenceNbr);
Raphael Grimm's avatar
Raphael Grimm committed
            std::lock_guard g{_data_mutex};
            ARMARX_VERBOSE << deactivateSpam()
                           << "received " << data.size() << " timesteps";
            _data[seq] = data;
armar-user's avatar
armar-user committed
        std::atomic_bool _discard_data = false;
        std::mutex _data_mutex;
        std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data;
        Ice::Identity _identity;
armar-user's avatar
armar-user committed
} // namespace armarx::detail::RobotUnitDataStreamingReceiver

namespace armarx
{
    RobotUnitDataStreamingReceiver::RobotUnitDataStreamingReceiver(
        const ManagedIceObjectPtr& obj,
        const RobotUnitInterfacePrx& ru,
armar-user's avatar
armar-user committed
        const RobotUnitDataStreaming::Config& cfg) :
        _obj{obj}, _ru{ru}
    {
        ARMARX_CHECK_NOT_NULL(_obj);
        ARMARX_CHECK_NOT_NULL(_ru);
        _receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();

armar-user's avatar
armar-user committed
        _receiver->_identity.name = _obj->getName() + "_RobotUnitDataStreamingReceiver_" +
                                    std::to_string(clock_t::now().time_since_epoch().count());

        auto adapter = _obj->getArmarXManager()->getAdapter();
        _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
armar-user's avatar
armar-user committed
            adapter->add(_receiver, _receiver->_identity));

        _description = _ru->startDataStreaming(_proxy, cfg);
    }

    RobotUnitDataStreamingReceiver::~RobotUnitDataStreamingReceiver()
    {
        if (_proxy)
        {
            _receiver->_discard_data = true;
            if (!_description.entries.empty())
            {
                try
                {
                    _ru->stopDataStreaming(_proxy);
                }
                catch (...)
                {
                    ARMARX_WARNING << "did not stop streaming since the network call failed";
                }
            auto icemanager = _obj->getArmarXManager()->getIceManager();
            auto adapter = _obj->getArmarXManager()->getAdapter();
            adapter->remove(_receiver->_identity);

            while (icemanager->isObjectReachable(_receiver->_identity.name))
            {
                ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice";
            }
armar-user's avatar
armar-user committed
        _proxy = nullptr;
        _receiver = nullptr;
    }

armar-user's avatar
armar-user committed
    std::deque<RobotUnitDataStreaming::TimeStep>&
    RobotUnitDataStreamingReceiver::getDataBuffer()
    {
        ARMARX_CHECK_NOT_NULL(_receiver);
armar-user's avatar
armar-user committed
        std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data;
Raphael Grimm's avatar
Raphael Grimm committed
            std::lock_guard g{_receiver->_data_mutex};
            std::swap(data, _receiver->_data);
        }
        _tmp_data_buffer.merge(data);
        if (!data.empty())
        {
            ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n"
                         << ARMARX_STREAM_PRINTER
            {
                for (const auto& [key, _] : data)
                {
                    out << "    " << key << "\n";
                }
            };
        }

        auto it = _tmp_data_buffer.begin();
        for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
            if (_last_iteration_id == -1)
            {
                _tmp_data_buffer_seq_id = it->first - 1;
            }
            if (_tmp_data_buffer_seq_id + 1 != it->first)
            {
                if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
                {
                    //there is a lot more data (10 updates) in the buffer!
                    //-> some message calls went missing!
                    ARMARX_ERROR << "some update messages went missing!";
                }
                else
                {
                    //maybe one or two frames are missing (due to async calls) -> wait
                    break;
                }
            }
            _tmp_data_buffer_seq_id = it->first;
            for (auto& step : it->second)
armar-user's avatar
armar-user committed
                if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
                    ARMARX_INFO << deactivateSpam(60)
armar-user's avatar
armar-user committed
                                << "Missing Iterations or iterations out of order! "
                                << "This should not happen. " << VAROUT(_last_iteration_id) << ", "
                                << VAROUT(step.iterationId);
                }
                _last_iteration_id = step.iterationId;
                _data_buffer.emplace_back(std::move(step));
            }
        }
        _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
        return _data_buffer;
    }

armar-user's avatar
armar-user committed
    const RobotUnitDataStreaming::DataStreamingDescription&
    RobotUnitDataStreamingReceiver::getDataDescription() const
    {
        return _description;
    }
armar-user's avatar
armar-user committed
    std::string
    RobotUnitDataStreamingReceiver::getDataDescriptionString() const
    {
        std::stringstream str;
        const auto& entr = getDataDescription().entries;
        str << "Received data (" << entr.size() << " entries):\n";
        for (const auto& [k, v] : entr)
        {
            str << "    " << k << ": type " << v.type << " index " << v.index << "\n";
        }
        return str.str();
    }
armar-user's avatar
armar-user committed
} // namespace armarx