Newer
Older
/*
* This file is part of ArmarX.
*
* ArmarX is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* ArmarX is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package RobotAPI::ArmarXObjects::RobotUnitDataStreamingReceiver
* @author Raphael Grimm ( raphael dot grimm at kit dot edu )
* @date 2020
* @copyright http://www.gnu.org/licenses/gpl-2.0.txt
* GNU General Public License
*/
#include "ArmarXCore/core/logging/Logging.h"
#include <ArmarXCore/core/ArmarXManager.h>
namespace armarx::detail::RobotUnitDataStreamingReceiver
{
class Receiver :
virtual public ManagedIceObject,
virtual public RobotUnitDataStreaming::Receiver
{
public:
{
return "RobotUnitDataStreamingReceiver";
}
void
onInitComponent() override
{
}
void
onConnectComponent() override
{
}
void
onExitComponent() override
{
}
void
update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
const RobotUnitDataStreaming::TimeStepSeq& data,
Ice::Long msgSequenceNbr,
const Ice::Current&) override
static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr));
const auto seq = static_cast<std::uint64_t>(msgSequenceNbr);
ARMARX_VERBOSE << deactivateSpam()
<< "received " << data.size() << " timesteps";
std::atomic_bool _discard_data = false;
std::mutex _data_mutex;
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data;
Ice::Identity _identity;
} // namespace armarx::detail::RobotUnitDataStreamingReceiver
namespace armarx
{
RobotUnitDataStreamingReceiver::RobotUnitDataStreamingReceiver(
const ManagedIceObjectPtr& obj,
const RobotUnitInterfacePrx& ru,
const RobotUnitDataStreaming::Config& cfg) :
_obj{obj}, _ru{ru}
{
ARMARX_CHECK_NOT_NULL(_obj);
ARMARX_CHECK_NOT_NULL(_ru);
_receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();
_receiver->_identity.name = _obj->getName() + "_RobotUnitDataStreamingReceiver_" +
std::to_string(clock_t::now().time_since_epoch().count());
auto adapter = _obj->getArmarXManager()->getAdapter();
_proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
_description = _ru->startDataStreaming(_proxy, cfg);
}
RobotUnitDataStreamingReceiver::~RobotUnitDataStreamingReceiver()
{
if (_proxy)
{
if (!_description.entries.empty())
{
try
{
_ru->stopDataStreaming(_proxy);
}
catch (...)
{
ARMARX_WARNING << "did not stop streaming since the network call failed";
}
auto icemanager = _obj->getArmarXManager()->getIceManager();
auto adapter = _obj->getArmarXManager()->getAdapter();
adapter->remove(_receiver->_identity);
while (icemanager->isObjectReachable(_receiver->_identity.name))
{
ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice";
}
std::deque<RobotUnitDataStreaming::TimeStep>&
RobotUnitDataStreamingReceiver::getDataBuffer()
{
ARMARX_CHECK_NOT_NULL(_receiver);
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data;
std::swap(data, _receiver->_data);
}
_tmp_data_buffer.merge(data);
if (!data.empty())
{
ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n"
<< ARMARX_STREAM_PRINTER
{
for (const auto& [key, _] : data)
{
out << " " << key << "\n";
}
};
}
auto it = _tmp_data_buffer.begin();
for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
if (_last_iteration_id == -1)
{
_tmp_data_buffer_seq_id = it->first - 1;
}
if (_tmp_data_buffer_seq_id + 1 != it->first)
{
if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
{
//there is a lot more data (10 updates) in the buffer!
//-> some message calls went missing!
ARMARX_ERROR << "some update messages went missing!";
}
else
{
//maybe one or two frames are missing (due to async calls) -> wait
break;
}
}
_tmp_data_buffer_seq_id = it->first;
for (auto& step : it->second)
if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
ARMARX_INFO << deactivateSpam(60)
<< "Missing Iterations or iterations out of order! "
<< "This should not happen. " << VAROUT(_last_iteration_id) << ", "
<< VAROUT(step.iterationId);
}
_last_iteration_id = step.iterationId;
_data_buffer.emplace_back(std::move(step));
}
}
_tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
const RobotUnitDataStreaming::DataStreamingDescription&
RobotUnitDataStreamingReceiver::getDataDescription() const
std::string
RobotUnitDataStreamingReceiver::getDataDescriptionString() const
{
std::stringstream str;
const auto& entr = getDataDescription().entries;
str << "Received data (" << entr.size() << " entries):\n";
for (const auto& [k, v] : entr)
{
str << " " << k << ": type " << v.type << " index " << v.index << "\n";
}
return str.str();
}