From 5268b82eba5d5a21b2e7af0fdf2ec4a54873ea89 Mon Sep 17 00:00:00 2001 From: Patrick Dormanns <uyrdb@student.kit.edu> Date: Tue, 31 Oct 2023 11:42:04 +0100 Subject: [PATCH] (Scoped)SubscriptionHandle to unsubscribe from memory updates --- .../RobotAPI/libraries/armem/CMakeLists.txt | 2 + .../armem/client/util/MemoryListener.cpp | 102 ++++++++++++------ .../armem/client/util/MemoryListener.h | 64 ++++++----- .../armem/client/util/SubscriptionHandle.cpp | 73 +++++++++++++ .../armem/client/util/SubscriptionHandle.h | 62 +++++++++++ 5 files changed, 243 insertions(+), 60 deletions(-) create mode 100644 source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.cpp create mode 100644 source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.h diff --git a/source/RobotAPI/libraries/armem/CMakeLists.txt b/source/RobotAPI/libraries/armem/CMakeLists.txt index 18c6e22cb..9030bd175 100644 --- a/source/RobotAPI/libraries/armem/CMakeLists.txt +++ b/source/RobotAPI/libraries/armem/CMakeLists.txt @@ -64,6 +64,7 @@ set(LIB_FILES client/plugins/PluginUser.cpp client/plugins/Plugin.cpp + client/util/SubscriptionHandle.cpp client/util/MemoryListener.cpp client/util/SimpleReaderBase.cpp client/util/SimpleWriterBase.cpp @@ -156,6 +157,7 @@ set(LIB_HEADERS client/query/detail/NameSelectorOps.h client/query/detail/SelectorOps.h + client/util/SubscriptionHandle.h client/util/MemoryListener.h client/util/SimpleReaderBase.h client/util/SimpleWriterBase.h diff --git a/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp b/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp index c8a7c6c68..9aa23153c 100644 --- a/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp +++ b/source/RobotAPI/libraries/armem/client/util/MemoryListener.cpp @@ -2,36 +2,33 @@ #include <sstream> +#include <ArmarXCore/core/ManagedIceObject.h> #include <ArmarXCore/core/exceptions/LocalException.h> -#include <ArmarXCore/core/logging/Logging.h> #include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h> -#include <ArmarXCore/core/ManagedIceObject.h> +#include <ArmarXCore/core/logging/Logging.h> -#include <RobotAPI/libraries/armem/core/ice_conversions.h> #include <RobotAPI/libraries/armem/core/error.h> - +#include <RobotAPI/libraries/armem/core/ice_conversions.h> namespace armarx::armem::client::util { - std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID) + std::string + MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID) { return "MemoryUpdates." + memoryID.memoryName; } - - MemoryListener::MemoryListener(ManagedIceObject* component) : - component(component) + MemoryListener::MemoryListener(ManagedIceObject* component) : component(component) { } - - void MemoryListener::setComponent(ManagedIceObject* component) + void + MemoryListener::setComponent(ManagedIceObject* component) { this->component = component; } - void MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const { @@ -40,7 +37,6 @@ namespace armarx::armem::client::util updated(bos); } - void MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const { @@ -76,68 +72,104 @@ namespace armarx::armem::client::util if (not matchingSnapshotIDs.empty()) { ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks" - << " subscribing " << subscription - << " with " << matchingSnapshotIDs.size() << " snapshot IDs ..."; - for (auto& callback : subCallbacks) + << " subscribing " << subscription << " with " + << matchingSnapshotIDs.size() << " snapshot IDs ..."; + for (auto& managedCallback : subCallbacks) { try { - callback(subscription, matchingSnapshotIDs); + managedCallback.callback(subscription, matchingSnapshotIDs); } catch (const armarx::LocalException& e) { error << "Calling callback subscribing " << subscription << " failed." << "\nCaught armarx::LocalException:" - << "\n" << e.getReason() - << "\n Stacktrace: \n" << e.generateBacktrace() << "\n" - ; + << e.getReason() << "\n Stacktrace: \n" + << e.generateBacktrace() << "\n"; } catch (const std::exception& e) { error << "Calling callback subscribing " << subscription << " failed." << "\nCaught armarx::Exception:" - << "\n" << e.what() << "\n" - ; + << e.what() << "\n"; } catch (...) { error << "Calling callback subscribing " << subscription << " failed." << "\nCaught unknown exception." - << "\n" - ; + << "\n"; } } } } if (error.str().size() > 0) { - ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n" + ARMARX_WARNING << "The following issues were encountered during MemoryListener::" + << __FUNCTION__ << "(): \n\n" << error.str(); } } - - void - MemoryListener::subscribe(const MemoryID& id, Callback callback) + SubscriptionHandle + MemoryListener::subscribe(const MemoryID& memoryID, Callback callback) { - callbacks[id].push_back(callback); - if (component) + ARMARX_CHECK_NOT_EMPTY(memoryID.memoryName) + << "The memoryName must be specified to subscribe"; + + if (component and memoryRefCount[memoryID.memoryName] == 0) { - component->usingTopic(MakeMemoryTopicName(id)); + component->usingTopic(MakeMemoryTopicName(memoryID)); } + + auto id = nextId++; + callbacks[memoryID].push_back({id, callback}); + + memoryRefCount[memoryID.memoryName]++; + + return SubscriptionHandle(this, memoryID, id); } + SubscriptionHandle + MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback) + { + return subscribe( + subscriptionID, + [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) + { callback(updatedSnapshotIDs); }); + } void - MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback) + MemoryListener::unsubscribe(SubscriptionHandle& handle) { - subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) + if (not handle.valid) + { + return; + } + handle.valid = false; + + // Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID] + auto it = std::find_if(callbacks[handle.memoryID].begin(), + callbacks[handle.memoryID].end(), + [&handle](ManagedCallback& mCb) { return mCb.id == handle.id; }); + + std::iter_swap(it, callbacks[handle.memoryID].end() - 1); + callbacks[handle.memoryID].pop_back(); + + memoryRefCount[handle.memoryID.memoryName]--; + + if (callbacks[handle.memoryID].empty()) { - callback(updatedSnapshotIDs); - }); + callbacks.erase(handle.memoryID); + + // unsubscribe from memory topic if no remainig callback needs it + if (component and memoryRefCount[handle.memoryID.memoryName] == 0) + { + component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID)); + } + } } -} +} // namespace armarx::armem::client::util diff --git a/source/RobotAPI/libraries/armem/client/util/MemoryListener.h b/source/RobotAPI/libraries/armem/client/util/MemoryListener.h index 521d9ce69..80d8eb331 100644 --- a/source/RobotAPI/libraries/armem/client/util/MemoryListener.h +++ b/source/RobotAPI/libraries/armem/client/util/MemoryListener.h @@ -1,6 +1,5 @@ #pragma once - // STD/STL #include <functional> #include <unordered_map> @@ -10,7 +9,7 @@ #include <RobotAPI/interface/armem/client/MemoryListenerInterface.h> #include <RobotAPI/libraries/armem/core/MemoryID.h> - +#include "SubscriptionHandle.h" namespace armarx { @@ -19,34 +18,36 @@ namespace armarx namespace armarx::armem::client::util { - /** * @brief Handles update signals from the memory system and distributes it * to its subsribers. */ class MemoryListener { - public: - using Callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>; - using CallbackUpdatedOnly = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>; + public: + using Callback = std::function<void(const MemoryID& subscriptionID, + const std::vector<MemoryID>& updatedSnapshotIDs)>; + using CallbackUpdatedOnly = + std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>; template <class CalleeT> - using MemberCallback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs); + using MemberCallback = void (CalleeT::*)(const MemoryID& subscriptionID, + const std::vector<MemoryID>& updatedSnapshotIDs); template <class CalleeT> - using MemberCallbackUpdatedOnly = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs); + using MemberCallbackUpdatedOnly = + void (CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs); static std::string MakeMemoryTopicName(const MemoryID& memoryID); public: - MemoryListener(ManagedIceObject* component = nullptr); void setComponent(ManagedIceObject* component); - void subscribe(const MemoryID& subscriptionID, Callback Callback); - void subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback); + SubscriptionHandle subscribe(const MemoryID& subscriptionID, Callback Callback); + SubscriptionHandle subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback); /** * Subscribe with a class member function: @@ -55,28 +56,33 @@ namespace armarx::armem::client::util * @endcode */ template <class CalleeT> - void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback) + SubscriptionHandle + subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback) { - auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs) - { - (callee->*callback)(subscriptionID, updatedSnapshotIDs); - }; - subscribe(subscriptionID, cb); + auto cb = [callee, callback](const MemoryID& subscriptionID, + const std::vector<MemoryID>& updatedSnapshotIDs) + { (callee->*callback)(subscriptionID, updatedSnapshotIDs); }; + return subscribe(subscriptionID, cb); } template <class CalleeT> - void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallbackUpdatedOnly<CalleeT> callback) + SubscriptionHandle + subscribe(const MemoryID& subscriptionID, + CalleeT* callee, + MemberCallbackUpdatedOnly<CalleeT> callback) { - auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) + auto cb = + [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) { - if(callee) + if (callee) { (callee->*callback)(updatedSnapshotIDs); } }; - subscribe(subscriptionID, cb); + return subscribe(subscriptionID, cb); } + void unsubscribe(SubscriptionHandle& subscriptionHandle); /// Function handling updates from the MemoryListener ice topic. void updated(const std::vector<MemoryID>& updatedIDs) const; @@ -84,13 +90,21 @@ namespace armarx::armem::client::util protected: + long nextId = 0; - std::unordered_map<MemoryID, std::vector<Callback>> callbacks; + struct ManagedCallback + { + long id = 0; + Callback callback; + }; - private: + std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks; - armarx::ManagedIceObject* component; + /// memoryName -> #callbacks needing memory topic + std::unordered_map<std::string, int> memoryRefCount; + private: + armarx::ManagedIceObject* component; }; -} +} // namespace armarx::armem::client::util diff --git a/source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.cpp b/source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.cpp new file mode 100644 index 000000000..114146c91 --- /dev/null +++ b/source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.cpp @@ -0,0 +1,73 @@ +#include "SubscriptionHandle.h" + +#include "MemoryListener.h" + +namespace armarx::armem::client::util +{ + SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener, + const MemoryID& memoryID, + long id) : + valid{true}, memoryListener{memoryListener}, memoryID(memoryID), id{id} + { + } + + SubscriptionHandle::SubscriptionHandle() : valid{false} + { + } + + SubscriptionHandle::SubscriptionHandle(SubscriptionHandle&& other) : + valid{other.valid}, + memoryListener{other.memoryListener}, + memoryID(std::move(other.memoryID)), + id{other.id} + { + other.valid = false; + } + + SubscriptionHandle& + SubscriptionHandle::operator=(SubscriptionHandle other) + { + swap(*this, other); + return *this; + } + + void + SubscriptionHandle::release() + { + memoryListener->unsubscribe(*this); + } + + ScopedSubscriptionHandle::ScopedSubscriptionHandle() + { + } + + ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle&& handle) : + handle(std::move(handle)) + { + } + + ScopedSubscriptionHandle& + ScopedSubscriptionHandle::operator=(SubscriptionHandle handle) + { + std::swap(this->handle, handle); + return *this; + } + + ScopedSubscriptionHandle::~ScopedSubscriptionHandle() + { + handle.release(); + } + +} // namespace armarx::armem::client::util + +namespace armarx::armem::client +{ + void + util::swap(util::SubscriptionHandle& first, util::SubscriptionHandle& second) + { + std::swap(first.valid, second.valid); + std::swap(first.memoryListener, second.memoryListener); + std::swap(first.memoryID, second.memoryID); + std::swap(first.id, second.id); + } +} // namespace armarx::armem::client diff --git a/source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.h b/source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.h new file mode 100644 index 000000000..6847ac9a7 --- /dev/null +++ b/source/RobotAPI/libraries/armem/client/util/SubscriptionHandle.h @@ -0,0 +1,62 @@ +#pragma once + +#include <RobotAPI/libraries/armem/core/MemoryID.h> + +namespace armarx::armem::client::util +{ + + class MemoryListener; + + class SubscriptionHandle + { + friend class MemoryListener; + + public: + SubscriptionHandle(); + SubscriptionHandle(SubscriptionHandle&& other); + + /** + * @brief Assignment operator. + * + * @note Intentional call by value, since this leverages the move constructor. See + * https://stackoverflow.com/a/11540204 (section "Move assignment operators"). + */ + SubscriptionHandle& operator=(SubscriptionHandle other); + + friend void swap(SubscriptionHandle& first, SubscriptionHandle& second); + + void release(); + + private: + SubscriptionHandle(MemoryListener* memoryListener, const MemoryID& memoryID, long id); + + private: + bool valid = false; + MemoryListener* memoryListener = nullptr; + MemoryID memoryID; + long id = 0; + }; + + class ScopedSubscriptionHandle + { + public: + ScopedSubscriptionHandle(); + ScopedSubscriptionHandle(SubscriptionHandle&& handle); + + /** + * @brief Assignment operator. + * + * @note Intentional call by value, since this leverages the move constructor. See + * https://stackoverflow.com/a/11540204 (section "Move assignment operators"). + */ + ScopedSubscriptionHandle& operator=(SubscriptionHandle handle); + + ~ScopedSubscriptionHandle(); + + private: + SubscriptionHandle handle; + }; + + void swap(SubscriptionHandle& first, SubscriptionHandle& second); + +} // namespace armarx::armem::client::util -- GitLab