From 6415bc0b55f8fd0836c491c9be23ef6f45ab0c10 Mon Sep 17 00:00:00 2001 From: Patrick Dormanns <patrick.dormanns@student.kit.edu> Date: Thu, 12 Oct 2023 22:42:14 +0200 Subject: [PATCH] implemented MemoryListener::SubscriptionHandle to unsubscirbe from memory --- .../armem/client/util/MemoryListener.cpp | 116 +++++++++++++----- .../armem/client/util/MemoryListener.h | 89 ++++++++++---- 2 files changed, 148 insertions(+), 57 deletions(-) diff --git a/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp b/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp index c8a7c6c68..669be5db6 100644 --- a/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp +++ b/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp @@ -2,35 +2,68 @@ #include <sstream> +#include <ArmarXCore/core/ManagedIceObject.h> #include <ArmarXCore/core/exceptions/LocalException.h> -#include <ArmarXCore/core/logging/Logging.h> #include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h> -#include <ArmarXCore/core/ManagedIceObject.h> +#include <ArmarXCore/core/logging/Logging.h> -#include <RobotAPI/libraries/armem/core/ice_conversions.h> #include <RobotAPI/libraries/armem/core/error.h> - +#include <RobotAPI/libraries/armem/core/ice_conversions.h> namespace armarx::armem::client::util { - std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID) + + MemoryListener::SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener, + MemoryID memoryID, + long id) { - return "MemoryUpdates." + memoryID.memoryName; + this->memoryListener = memoryListener; + this->memoryID = memoryID; + this->id = id; } + void + MemoryListener::SubscriptionHandle::release() + { + memoryListener->unsubscribe(*this); + } - MemoryListener::MemoryListener(ManagedIceObject* component) : - component(component) + MemoryListener::ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle handle) : + handle(handle) { } + MemoryListener::ScopedSubscriptionHandle* + MemoryListener::ScopedSubscriptionHandle::operator=(const SubscriptionHandle& other) + { + handle.emplace(other); + return this; + } - void MemoryListener::setComponent(ManagedIceObject* component) + MemoryListener::ScopedSubscriptionHandle::~ScopedSubscriptionHandle() { - this->component = component; + if (handle) + { + handle->release(); + } } + std::string + MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID) + { + return "MemoryUpdates." + memoryID.memoryName; + } + + MemoryListener::MemoryListener(ManagedIceObject* component) : component(component) + { + } + + void + MemoryListener::setComponent(ManagedIceObject* component) + { + this->component = component; + } void MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const @@ -40,7 +73,6 @@ namespace armarx::armem::client::util updated(bos); } - void MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const { @@ -76,68 +108,84 @@ namespace armarx::armem::client::util if (not matchingSnapshotIDs.empty()) { ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks" - << " subscribing " << subscription - << " with " << matchingSnapshotIDs.size() << " snapshot IDs ..."; - for (auto& callback : subCallbacks) + << " subscribing " << subscription << " with " + << matchingSnapshotIDs.size() << " snapshot IDs ..."; + for (auto& managedCallback : subCallbacks) { try { - callback(subscription, matchingSnapshotIDs); + managedCallback.callback(subscription, matchingSnapshotIDs); } catch (const armarx::LocalException& e) { error << "Calling callback subscribing " << subscription << " failed." << "\nCaught armarx::LocalException:" - << "\n" << e.getReason() - << "\n Stacktrace: \n" << e.generateBacktrace() << "\n" - ; + << e.getReason() << "\n Stacktrace: \n" + << e.generateBacktrace() << "\n"; } catch (const std::exception& e) { error << "Calling callback subscribing " << subscription << " failed." << "\nCaught armarx::Exception:" - << "\n" << e.what() << "\n" - ; + << e.what() << "\n"; } catch (...) { error << "Calling callback subscribing " << subscription << " failed." << "\nCaught unknown exception." - << "\n" - ; + << "\n"; } } } } + if (error.str().size() > 0) { - ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n" + ARMARX_WARNING << "The following issues were encountered during MemoryListener::" + << __FUNCTION__ << "(): \n\n" << error.str(); } } - - void - MemoryListener::subscribe(const MemoryID& id, Callback callback) + MemoryListener::SubscriptionHandle + MemoryListener::subscribe(const MemoryID& memoryID, Callback callback) { - callbacks[id].push_back(callback); - if (component) + if (component && callbacks.count(memoryID) == 0) { - component->usingTopic(MakeMemoryTopicName(id)); + component->usingTopic(MakeMemoryTopicName(memoryID)); } + + long id = next_id++; + callbacks[memoryID].push_back({id, callback}); + + return SubscriptionHandle(this, memoryID, id); } + MemoryListener::SubscriptionHandle + MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback) + { + return subscribe( + subscriptionID, + [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) + { callback(updatedSnapshotIDs); }); + } void - MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback) + MemoryListener::unsubscribe(SubscriptionHandle& handle) { - subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) + // Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID] + auto it = std::find_if(callbacks[handle.memoryID].begin(), + callbacks[handle.memoryID].end(), + [handle](ManagedCallback mCb) { return mCb.id == handle.id; }); + + if (it->id == handle.id) { - callback(updatedSnapshotIDs); - }); + std::iter_swap(it, callbacks[handle.memoryID].end()); + callbacks[handle.memoryID].pop_back(); + } } -} +} // namespace armarx::armem::client::util diff --git a/source/RobotAPI/libraries/armem/client/util/MemoryListener.h b/source/RobotAPI/libraries/armem/client/util/MemoryListener.h index 521d9ce69..7c6bb2ef9 100644 --- a/source/RobotAPI/libraries/armem/client/util/MemoryListener.h +++ b/source/RobotAPI/libraries/armem/client/util/MemoryListener.h @@ -3,6 +3,7 @@ // STD/STL #include <functional> +#include <list> #include <unordered_map> #include <vector> @@ -10,8 +11,6 @@ #include <RobotAPI/interface/armem/client/MemoryListenerInterface.h> #include <RobotAPI/libraries/armem/core/MemoryID.h> - - namespace armarx { class ManagedIceObject; @@ -26,27 +25,61 @@ namespace armarx::armem::client::util */ class MemoryListener { - public: - using Callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>; - using CallbackUpdatedOnly = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>; + public: + using Callback = std::function<void(const MemoryID& subscriptionID, + const std::vector<MemoryID>& updatedSnapshotIDs)>; + using CallbackUpdatedOnly = + std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>; template <class CalleeT> - using MemberCallback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs); + using MemberCallback = void (CalleeT::*)(const MemoryID& subscriptionID, + const std::vector<MemoryID>& updatedSnapshotIDs); template <class CalleeT> - using MemberCallbackUpdatedOnly = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs); + using MemberCallbackUpdatedOnly = + void (CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs); static std::string MakeMemoryTopicName(const MemoryID& memoryID); public: + class SubscriptionHandle + { + friend class MemoryListener; + + public: + //SubscriptionHandle() = delete; + void release(); + + private: + SubscriptionHandle(MemoryListener* memoryListener, MemoryID memoryID, long id); + + private: + MemoryListener* memoryListener; + MemoryID memoryID; + long id; + }; + + class ScopedSubscriptionHandle + { + public: + ScopedSubscriptionHandle(SubscriptionHandle handle); + ScopedSubscriptionHandle* operator=(SubscriptionHandle const& other); + + ~ScopedSubscriptionHandle(); + private: + std::optional<SubscriptionHandle> handle; + }; + + + public: MemoryListener(ManagedIceObject* component = nullptr); void setComponent(ManagedIceObject* component); - void subscribe(const MemoryID& subscriptionID, Callback Callback); - void subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback); + SubscriptionHandle subscribe(const MemoryID& subscriptionID, Callback Callback); + SubscriptionHandle subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback); /** * Subscribe with a class member function: @@ -55,28 +88,33 @@ namespace armarx::armem::client::util * @endcode */ template <class CalleeT> - void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback) + MemoryListener::SubscriptionHandle + subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback) { - auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs) - { - (callee->*callback)(subscriptionID, updatedSnapshotIDs); - }; - subscribe(subscriptionID, cb); + auto cb = [callee, callback](const MemoryID& subscriptionID, + const std::vector<MemoryID>& updatedSnapshotIDs) + { (callee->*callback)(subscriptionID, updatedSnapshotIDs); }; + return subscribe(subscriptionID, cb); } template <class CalleeT> - void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallbackUpdatedOnly<CalleeT> callback) + MemoryListener::SubscriptionHandle + subscribe(const MemoryID& subscriptionID, + CalleeT* callee, + MemberCallbackUpdatedOnly<CalleeT> callback) { - auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) + auto cb = + [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) { - if(callee) + if (callee) { (callee->*callback)(updatedSnapshotIDs); } }; - subscribe(subscriptionID, cb); + return subscribe(subscriptionID, cb); } + void unsubscribe(SubscriptionHandle& subscriptionHandle); /// Function handling updates from the MemoryListener ice topic. void updated(const std::vector<MemoryID>& updatedIDs) const; @@ -84,13 +122,18 @@ namespace armarx::armem::client::util protected: + long next_id = 0; - std::unordered_map<MemoryID, std::vector<Callback>> callbacks; + typedef struct + { + long id; + Callback callback; + } ManagedCallback; - private: + std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks; + private: armarx::ManagedIceObject* component; - }; -} +} // namespace armarx::armem::client::util -- GitLab