-
Fabian Reister authoredFabian Reister authored
RobotUnitDataStreamingReceiver.cpp 7.17 KiB
/*
* This file is part of ArmarX.
*
* ArmarX is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* ArmarX is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package RobotAPI::ArmarXObjects::RobotUnitDataStreamingReceiver
* @author Raphael Grimm ( raphael dot grimm at kit dot edu )
* @date 2020
* @copyright http://www.gnu.org/licenses/gpl-2.0.txt
* GNU General Public License
*/
#include <Ice/ObjectAdapter.h>
#include <ArmarXCore/core/ArmarXManager.h>
#include "RobotUnitDataStreamingReceiver.h"
namespace armarx::detail::RobotUnitDataStreamingReceiver
{
class Receiver :
virtual public ManagedIceObject,
virtual public RobotUnitDataStreaming::Receiver
{
public:
std::string getDefaultName() const override
{
return "RobotUnitDataStreamingReceiver";
}
~Receiver()
{
std::lock_guard g{_data_mutex};
}
void onInitComponent() override {}
void onConnectComponent() override {}
void onExitComponent() override {}
void update_async(
const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
const RobotUnitDataStreaming::TimeStepSeq& data,
Ice::Long msgSequenceNbr,
const Ice::Current&) override
{
ptr->ice_response();
if (_discard_data)
{
return;
}
static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr));
const auto seq = static_cast<std::uint64_t>(msgSequenceNbr);
std::lock_guard g{_data_mutex};
ARMARX_INFO << deactivateSpam()
<< "received " << data.size() << " timesteps";
_data[seq] = data;
}
std::atomic_bool _discard_data = false;
std::mutex _data_mutex;
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data;
Ice::Identity _identity;
};
}
namespace armarx
{
RobotUnitDataStreamingReceiver::RobotUnitDataStreamingReceiver(
const ManagedIceObjectPtr& obj,
const RobotUnitInterfacePrx& ru,
const RobotUnitDataStreaming::Config& cfg
) : _obj{obj}, _ru{ru}
{
ARMARX_CHECK_NOT_NULL(_obj);
ARMARX_CHECK_NOT_NULL(_ru);
_receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();
_receiver->_identity.name =
_obj->getName() + "_RobotUnitDataStreamingReceiver_" +
std::to_string(clock_t::now().time_since_epoch().count());
auto adapter = _obj->getArmarXManager()->getAdapter();
_proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
adapter->add(_receiver, _receiver->_identity));
_description = _ru->startDataStreaming(_proxy, cfg);
}
RobotUnitDataStreamingReceiver::~RobotUnitDataStreamingReceiver()
{
if (_proxy)
{
_receiver->_discard_data = true;
if (!_description.entries.empty())
{
try
{
_ru->stopDataStreaming(_proxy);
}
catch (...)
{
ARMARX_WARNING << "did not stop streaming since the network call failed";
}
}
auto icemanager = _obj->getArmarXManager()->getIceManager();
auto adapter = _obj->getArmarXManager()->getAdapter();
adapter->remove(_receiver->_identity);
while (icemanager->isObjectReachable(_receiver->_identity.name))
{
ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice";
}
}
_proxy = nullptr;
_receiver = nullptr;
}
std::deque<RobotUnitDataStreaming::TimeStep>& RobotUnitDataStreamingReceiver::getDataBuffer()
{
ARMARX_CHECK_NOT_NULL(_receiver);
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data;
{
std::lock_guard g{_receiver->_data_mutex};
std::swap(data, _receiver->_data);
}
_tmp_data_buffer.merge(data);
if (!data.empty())
{
ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n"
<< ARMARX_STREAM_PRINTER
{
for (const auto& [key, _] : data)
{
out << " " << key << "\n";
}
};
}
auto it = _tmp_data_buffer.begin();
for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
{
if (_last_iteration_id == -1)
{
_tmp_data_buffer_seq_id = it->first - 1;
}
if (_tmp_data_buffer_seq_id + 1 != it->first)
{
if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
{
//there is a lot more data (10 updates) in the buffer!
//-> some message calls went missing!
ARMARX_ERROR << "some update messages went missing!";
}
else
{
//maybe one or two frames are missing (due to async calls) -> wait
break;
}
}
_tmp_data_buffer_seq_id = it->first;
for (auto& step : it->second)
{
if (
_last_iteration_id != -1 &&
_last_iteration_id + 1 != step.iterationId
)
{
ARMARX_ERROR << "Missing Iterations or iterations out of order! "
<< "This should not happen. "
<< VAROUT(_last_iteration_id) << ", " << VAROUT(step.iterationId);
}
_last_iteration_id = step.iterationId;
_data_buffer.emplace_back(std::move(step));
}
}
_tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
return _data_buffer;
}
const RobotUnitDataStreaming::DataStreamingDescription& RobotUnitDataStreamingReceiver::getDataDescription() const
{
return _description;
}
std::string RobotUnitDataStreamingReceiver::getDataDescriptionString() const
{
std::stringstream str;
const auto& entr = getDataDescription().entries;
str << "Received data (" << entr.size() << " entries):\n";
for (const auto& [k, v] : entr)
{
str << " " << k << ": type " << v.type << " index " << v.index << "\n";
}
return str.str();
}
}