Skip to content
Snippets Groups Projects

(Scoped)SubscriptionHandle to unsubscribe from memory updates

Merged Patrick Dormanns requested to merge MemoryListener-unsubscribe into master
Files
5
@@ -2,36 +2,33 @@
#include <sstream>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/exceptions/LocalException.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
#include <RobotAPI/libraries/armem/core/error.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
namespace armarx::armem::client::util
{
std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
std::string
MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
{
return "MemoryUpdates." + memoryID.memoryName;
}
MemoryListener::MemoryListener(ManagedIceObject* component) :
component(component)
MemoryListener::MemoryListener(ManagedIceObject* component) : component(component)
{
}
void MemoryListener::setComponent(ManagedIceObject* component)
void
MemoryListener::setComponent(ManagedIceObject* component)
{
this->component = component;
}
void
MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
{
@@ -40,7 +37,6 @@ namespace armarx::armem::client::util
updated(bos);
}
void
MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{
@@ -76,68 +72,104 @@ namespace armarx::armem::client::util
if (not matchingSnapshotIDs.empty())
{
ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
<< " subscribing " << subscription
<< " with " << matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& callback : subCallbacks)
<< " subscribing " << subscription << " with "
<< matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& managedCallback : subCallbacks)
{
try
{
callback(subscription, matchingSnapshotIDs);
managedCallback.callback(subscription, matchingSnapshotIDs);
}
catch (const armarx::LocalException& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::LocalException:"
<< "\n" << e.getReason()
<< "\n Stacktrace: \n" << e.generateBacktrace()
<< "\n"
;
<< e.getReason() << "\n Stacktrace: \n"
<< e.generateBacktrace() << "\n";
}
catch (const std::exception& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::Exception:"
<< "\n" << e.what()
<< "\n"
;
<< e.what() << "\n";
}
catch (...)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught unknown exception."
<< "\n"
;
<< "\n";
}
}
}
}
if (error.str().size() > 0)
{
ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n"
ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
<< __FUNCTION__ << "(): \n\n"
<< error.str();
}
}
void
MemoryListener::subscribe(const MemoryID& id, Callback callback)
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
{
callbacks[id].push_back(callback);
if (component)
ARMARX_CHECK_NOT_EMPTY(memoryID.memoryName)
<< "The memoryName must be specified to subscribe";
if (component and memoryRefCount[memoryID.memoryName] == 0)
{
component->usingTopic(MakeMemoryTopicName(id));
component->usingTopic(MakeMemoryTopicName(memoryID));
}
auto id = nextId++;
callbacks[memoryID].push_back({id, callback});
memoryRefCount[memoryID.memoryName]++;
return SubscriptionHandle(this, memoryID, id);
}
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
{
return subscribe(
subscriptionID,
[callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ callback(updatedSnapshotIDs); });
}
void
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
MemoryListener::unsubscribe(SubscriptionHandle& handle)
{
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
if (not handle.valid)
{
return;
}
handle.valid = false;
// Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
auto it = std::find_if(callbacks[handle.memoryID].begin(),
callbacks[handle.memoryID].end(),
[&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
callbacks[handle.memoryID].pop_back();
memoryRefCount[handle.memoryID.memoryName]--;
if (callbacks[handle.memoryID].empty())
{
callback(updatedSnapshotIDs);
});
callbacks.erase(handle.memoryID);
// unsubscribe from memory topic if no remainig callback needs it
if (component and memoryRefCount[handle.memoryID.memoryName] == 0)
{
component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
}
}
}
}
} // namespace armarx::armem::client::util
Loading