Skip to content
Snippets Groups Projects
Commit 6415bc0b authored by Patrick Dormanns's avatar Patrick Dormanns
Browse files

implemented MemoryListener::SubscriptionHandle to unsubscirbe from memory

parent 84426201
No related branches found
No related tags found
1 merge request!391(Scoped)SubscriptionHandle to unsubscribe from memory updates
Pipeline #15632 failed
......@@ -2,35 +2,68 @@
#include <sstream>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/exceptions/LocalException.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
#include <RobotAPI/libraries/armem/core/error.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
namespace armarx::armem::client::util
{
std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
MemoryListener::SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener,
MemoryID memoryID,
long id)
{
return "MemoryUpdates." + memoryID.memoryName;
this->memoryListener = memoryListener;
this->memoryID = memoryID;
this->id = id;
}
void
MemoryListener::SubscriptionHandle::release()
{
memoryListener->unsubscribe(*this);
}
MemoryListener::MemoryListener(ManagedIceObject* component) :
component(component)
MemoryListener::ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle handle) :
handle(handle)
{
}
MemoryListener::ScopedSubscriptionHandle*
MemoryListener::ScopedSubscriptionHandle::operator=(const SubscriptionHandle& other)
{
handle.emplace(other);
return this;
}
void MemoryListener::setComponent(ManagedIceObject* component)
MemoryListener::ScopedSubscriptionHandle::~ScopedSubscriptionHandle()
{
this->component = component;
if (handle)
{
handle->release();
}
}
std::string
MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
{
return "MemoryUpdates." + memoryID.memoryName;
}
MemoryListener::MemoryListener(ManagedIceObject* component) : component(component)
{
}
void
MemoryListener::setComponent(ManagedIceObject* component)
{
this->component = component;
}
void
MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
......@@ -40,7 +73,6 @@ namespace armarx::armem::client::util
updated(bos);
}
void
MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{
......@@ -76,68 +108,84 @@ namespace armarx::armem::client::util
if (not matchingSnapshotIDs.empty())
{
ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
<< " subscribing " << subscription
<< " with " << matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& callback : subCallbacks)
<< " subscribing " << subscription << " with "
<< matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& managedCallback : subCallbacks)
{
try
{
callback(subscription, matchingSnapshotIDs);
managedCallback.callback(subscription, matchingSnapshotIDs);
}
catch (const armarx::LocalException& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::LocalException:"
<< "\n" << e.getReason()
<< "\n Stacktrace: \n" << e.generateBacktrace()
<< "\n"
;
<< e.getReason() << "\n Stacktrace: \n"
<< e.generateBacktrace() << "\n";
}
catch (const std::exception& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::Exception:"
<< "\n" << e.what()
<< "\n"
;
<< e.what() << "\n";
}
catch (...)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught unknown exception."
<< "\n"
;
<< "\n";
}
}
}
}
if (error.str().size() > 0)
{
ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n"
ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
<< __FUNCTION__ << "(): \n\n"
<< error.str();
}
}
void
MemoryListener::subscribe(const MemoryID& id, Callback callback)
MemoryListener::SubscriptionHandle
MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
{
callbacks[id].push_back(callback);
if (component)
if (component && callbacks.count(memoryID) == 0)
{
component->usingTopic(MakeMemoryTopicName(id));
component->usingTopic(MakeMemoryTopicName(memoryID));
}
long id = next_id++;
callbacks[memoryID].push_back({id, callback});
return SubscriptionHandle(this, memoryID, id);
}
MemoryListener::SubscriptionHandle
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
{
return subscribe(
subscriptionID,
[callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ callback(updatedSnapshotIDs); });
}
void
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
MemoryListener::unsubscribe(SubscriptionHandle& handle)
{
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
// Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
auto it = std::find_if(callbacks[handle.memoryID].begin(),
callbacks[handle.memoryID].end(),
[handle](ManagedCallback mCb) { return mCb.id == handle.id; });
if (it->id == handle.id)
{
callback(updatedSnapshotIDs);
});
std::iter_swap(it, callbacks[handle.memoryID].end());
callbacks[handle.memoryID].pop_back();
}
}
}
} // namespace armarx::armem::client::util
......@@ -3,6 +3,7 @@
// STD/STL
#include <functional>
#include <list>
#include <unordered_map>
#include <vector>
......@@ -10,8 +11,6 @@
#include <RobotAPI/interface/armem/client/MemoryListenerInterface.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h>
namespace armarx
{
class ManagedIceObject;
......@@ -26,27 +25,61 @@ namespace armarx::armem::client::util
*/
class MemoryListener
{
public:
using Callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
public:
using Callback = std::function<void(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly =
std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
template <class CalleeT>
using MemberCallback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs);
using MemberCallback = void (CalleeT::*)(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs);
template <class CalleeT>
using MemberCallbackUpdatedOnly = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
using MemberCallbackUpdatedOnly =
void (CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
static std::string MakeMemoryTopicName(const MemoryID& memoryID);
public:
class SubscriptionHandle
{
friend class MemoryListener;
public:
//SubscriptionHandle() = delete;
void release();
private:
SubscriptionHandle(MemoryListener* memoryListener, MemoryID memoryID, long id);
private:
MemoryListener* memoryListener;
MemoryID memoryID;
long id;
};
class ScopedSubscriptionHandle
{
public:
ScopedSubscriptionHandle(SubscriptionHandle handle);
ScopedSubscriptionHandle* operator=(SubscriptionHandle const& other);
~ScopedSubscriptionHandle();
private:
std::optional<SubscriptionHandle> handle;
};
public:
MemoryListener(ManagedIceObject* component = nullptr);
void setComponent(ManagedIceObject* component);
void subscribe(const MemoryID& subscriptionID, Callback Callback);
void subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
SubscriptionHandle subscribe(const MemoryID& subscriptionID, Callback Callback);
SubscriptionHandle subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
/**
* Subscribe with a class member function:
......@@ -55,28 +88,33 @@ namespace armarx::armem::client::util
* @endcode
*/
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
MemoryListener::SubscriptionHandle
subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)
{
(callee->*callback)(subscriptionID, updatedSnapshotIDs);
};
subscribe(subscriptionID, cb);
auto cb = [callee, callback](const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)
{ (callee->*callback)(subscriptionID, updatedSnapshotIDs); };
return subscribe(subscriptionID, cb);
}
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallbackUpdatedOnly<CalleeT> callback)
MemoryListener::SubscriptionHandle
subscribe(const MemoryID& subscriptionID,
CalleeT* callee,
MemberCallbackUpdatedOnly<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
auto cb =
[callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{
if(callee)
if (callee)
{
(callee->*callback)(updatedSnapshotIDs);
}
};
subscribe(subscriptionID, cb);
return subscribe(subscriptionID, cb);
}
void unsubscribe(SubscriptionHandle& subscriptionHandle);
/// Function handling updates from the MemoryListener ice topic.
void updated(const std::vector<MemoryID>& updatedIDs) const;
......@@ -84,13 +122,18 @@ namespace armarx::armem::client::util
protected:
long next_id = 0;
std::unordered_map<MemoryID, std::vector<Callback>> callbacks;
typedef struct
{
long id;
Callback callback;
} ManagedCallback;
private:
std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks;
private:
armarx::ManagedIceObject* component;
};
}
} // namespace armarx::armem::client::util
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment