Skip to content
Snippets Groups Projects

Updates/a6u1/2023 07 26

Merged Fabian Tërnava requested to merge updates/a6u1/2023-07-26 into master
2 files
+ 83
47
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -20,13 +20,13 @@
* GNU General Public License
*/
#include "RobotUnitDataStreamingReceiver.h"
#include <Ice/ObjectAdapter.h>
#include "ArmarXCore/core/logging/Logging.h"
#include <ArmarXCore/core/ArmarXManager.h>
#include "RobotUnitDataStreamingReceiver.h"
namespace armarx::detail::RobotUnitDataStreamingReceiver
{
class Receiver :
@@ -34,23 +34,37 @@ namespace armarx::detail::RobotUnitDataStreamingReceiver
virtual public RobotUnitDataStreaming::Receiver
{
public:
std::string getDefaultName() const override
std::string
getDefaultName() const override
{
return "RobotUnitDataStreamingReceiver";
}
~Receiver()
{
std::lock_guard g{_data_mutex};
}
void onInitComponent() override {}
void onConnectComponent() override {}
void onExitComponent() override {}
void update_async(
const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
const RobotUnitDataStreaming::TimeStepSeq& data,
Ice::Long msgSequenceNbr,
const Ice::Current&) override
void
onInitComponent() override
{
}
void
onConnectComponent() override
{
}
void
onExitComponent() override
{
}
void
update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
const RobotUnitDataStreaming::TimeStepSeq& data,
Ice::Long msgSequenceNbr,
const Ice::Current&) override
{
ptr->ice_response();
if (_discard_data)
@@ -65,32 +79,31 @@ namespace armarx::detail::RobotUnitDataStreamingReceiver
_data[seq] = data;
}
std::atomic_bool _discard_data = false;
std::mutex _data_mutex;
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data;
Ice::Identity _identity;
std::atomic_bool _discard_data = false;
std::mutex _data_mutex;
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data;
Ice::Identity _identity;
};
}
} // namespace armarx::detail::RobotUnitDataStreamingReceiver
namespace armarx
{
RobotUnitDataStreamingReceiver::RobotUnitDataStreamingReceiver(
const ManagedIceObjectPtr& obj,
const RobotUnitInterfacePrx& ru,
const RobotUnitDataStreaming::Config& cfg
) : _obj{obj}, _ru{ru}
const RobotUnitDataStreaming::Config& cfg) :
_obj{obj}, _ru{ru}
{
ARMARX_CHECK_NOT_NULL(_obj);
ARMARX_CHECK_NOT_NULL(_ru);
_receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();
_receiver->_identity.name =
_obj->getName() + "_RobotUnitDataStreamingReceiver_" +
std::to_string(clock_t::now().time_since_epoch().count());
_receiver->_identity.name = _obj->getName() + "_RobotUnitDataStreamingReceiver_" +
std::to_string(clock_t::now().time_since_epoch().count());
auto adapter = _obj->getArmarXManager()->getAdapter();
_proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
adapter->add(_receiver, _receiver->_identity));
adapter->add(_receiver, _receiver->_identity));
_description = _ru->startDataStreaming(_proxy, cfg);
}
@@ -120,14 +133,15 @@ namespace armarx
ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice";
}
}
_proxy = nullptr;
_proxy = nullptr;
_receiver = nullptr;
}
std::deque<RobotUnitDataStreaming::TimeStep>& RobotUnitDataStreamingReceiver::getDataBuffer()
std::deque<RobotUnitDataStreaming::TimeStep>&
RobotUnitDataStreamingReceiver::getDataBuffer()
{
ARMARX_CHECK_NOT_NULL(_receiver);
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data;
std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data;
{
std::lock_guard g{_receiver->_data_mutex};
std::swap(data, _receiver->_data);
@@ -169,14 +183,12 @@ namespace armarx
_tmp_data_buffer_seq_id = it->first;
for (auto& step : it->second)
{
if (
_last_iteration_id != -1 &&
_last_iteration_id + 1 != step.iterationId
)
if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
{
ARMARX_VERBOSE << "Missing Iterations or iterations out of order! "
<< "This should not happen. "
<< VAROUT(_last_iteration_id) << ", " << VAROUT(step.iterationId);
ARMARX_INFO << deactivateSpam(10)
<< "Missing Iterations or iterations out of order! "
<< "This should not happen. " << VAROUT(_last_iteration_id) << ", "
<< VAROUT(step.iterationId);
}
_last_iteration_id = step.iterationId;
_data_buffer.emplace_back(std::move(step));
@@ -186,12 +198,14 @@ namespace armarx
return _data_buffer;
}
const RobotUnitDataStreaming::DataStreamingDescription& RobotUnitDataStreamingReceiver::getDataDescription() const
const RobotUnitDataStreaming::DataStreamingDescription&
RobotUnitDataStreamingReceiver::getDataDescription() const
{
return _description;
}
std::string RobotUnitDataStreamingReceiver::getDataDescriptionString() const
std::string
RobotUnitDataStreamingReceiver::getDataDescriptionString() const
{
std::stringstream str;
const auto& entr = getDataDescription().entries;
@@ -202,4 +216,4 @@ namespace armarx
}
return str.str();
}
}
} // namespace armarx
Loading