Skip to content
Snippets Groups Projects
Commit 5268b82e authored by Patrick Dormanns's avatar Patrick Dormanns Committed by Christian Dreher
Browse files

(Scoped)SubscriptionHandle to unsubscribe from memory updates

parent 064c9202
No related branches found
No related tags found
No related merge requests found
......@@ -64,6 +64,7 @@ set(LIB_FILES
client/plugins/PluginUser.cpp
client/plugins/Plugin.cpp
client/util/SubscriptionHandle.cpp
client/util/MemoryListener.cpp
client/util/SimpleReaderBase.cpp
client/util/SimpleWriterBase.cpp
......@@ -156,6 +157,7 @@ set(LIB_HEADERS
client/query/detail/NameSelectorOps.h
client/query/detail/SelectorOps.h
client/util/SubscriptionHandle.h
client/util/MemoryListener.h
client/util/SimpleReaderBase.h
client/util/SimpleWriterBase.h
......
......@@ -2,36 +2,33 @@
#include <sstream>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/exceptions/LocalException.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
#include <RobotAPI/libraries/armem/core/error.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
namespace armarx::armem::client::util
{
std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
std::string
MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
{
return "MemoryUpdates." + memoryID.memoryName;
}
MemoryListener::MemoryListener(ManagedIceObject* component) :
component(component)
MemoryListener::MemoryListener(ManagedIceObject* component) : component(component)
{
}
void MemoryListener::setComponent(ManagedIceObject* component)
void
MemoryListener::setComponent(ManagedIceObject* component)
{
this->component = component;
}
void
MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
{
......@@ -40,7 +37,6 @@ namespace armarx::armem::client::util
updated(bos);
}
void
MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{
......@@ -76,68 +72,104 @@ namespace armarx::armem::client::util
if (not matchingSnapshotIDs.empty())
{
ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
<< " subscribing " << subscription
<< " with " << matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& callback : subCallbacks)
<< " subscribing " << subscription << " with "
<< matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& managedCallback : subCallbacks)
{
try
{
callback(subscription, matchingSnapshotIDs);
managedCallback.callback(subscription, matchingSnapshotIDs);
}
catch (const armarx::LocalException& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::LocalException:"
<< "\n" << e.getReason()
<< "\n Stacktrace: \n" << e.generateBacktrace()
<< "\n"
;
<< e.getReason() << "\n Stacktrace: \n"
<< e.generateBacktrace() << "\n";
}
catch (const std::exception& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::Exception:"
<< "\n" << e.what()
<< "\n"
;
<< e.what() << "\n";
}
catch (...)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught unknown exception."
<< "\n"
;
<< "\n";
}
}
}
}
if (error.str().size() > 0)
{
ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n"
ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
<< __FUNCTION__ << "(): \n\n"
<< error.str();
}
}
void
MemoryListener::subscribe(const MemoryID& id, Callback callback)
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
{
callbacks[id].push_back(callback);
if (component)
ARMARX_CHECK_NOT_EMPTY(memoryID.memoryName)
<< "The memoryName must be specified to subscribe";
if (component and memoryRefCount[memoryID.memoryName] == 0)
{
component->usingTopic(MakeMemoryTopicName(id));
component->usingTopic(MakeMemoryTopicName(memoryID));
}
auto id = nextId++;
callbacks[memoryID].push_back({id, callback});
memoryRefCount[memoryID.memoryName]++;
return SubscriptionHandle(this, memoryID, id);
}
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
{
return subscribe(
subscriptionID,
[callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ callback(updatedSnapshotIDs); });
}
void
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
MemoryListener::unsubscribe(SubscriptionHandle& handle)
{
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
if (not handle.valid)
{
return;
}
handle.valid = false;
// Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
auto it = std::find_if(callbacks[handle.memoryID].begin(),
callbacks[handle.memoryID].end(),
[&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
callbacks[handle.memoryID].pop_back();
memoryRefCount[handle.memoryID.memoryName]--;
if (callbacks[handle.memoryID].empty())
{
callback(updatedSnapshotIDs);
});
callbacks.erase(handle.memoryID);
// unsubscribe from memory topic if no remainig callback needs it
if (component and memoryRefCount[handle.memoryID.memoryName] == 0)
{
component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
}
}
}
}
} // namespace armarx::armem::client::util
#pragma once
// STD/STL
#include <functional>
#include <unordered_map>
......@@ -10,7 +9,7 @@
#include <RobotAPI/interface/armem/client/MemoryListenerInterface.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h>
#include "SubscriptionHandle.h"
namespace armarx
{
......@@ -19,34 +18,36 @@ namespace armarx
namespace armarx::armem::client::util
{
/**
* @brief Handles update signals from the memory system and distributes it
* to its subsribers.
*/
class MemoryListener
{
public:
using Callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
public:
using Callback = std::function<void(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly =
std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
template <class CalleeT>
using MemberCallback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs);
using MemberCallback = void (CalleeT::*)(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs);
template <class CalleeT>
using MemberCallbackUpdatedOnly = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
using MemberCallbackUpdatedOnly =
void (CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
static std::string MakeMemoryTopicName(const MemoryID& memoryID);
public:
MemoryListener(ManagedIceObject* component = nullptr);
void setComponent(ManagedIceObject* component);
void subscribe(const MemoryID& subscriptionID, Callback Callback);
void subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
SubscriptionHandle subscribe(const MemoryID& subscriptionID, Callback Callback);
SubscriptionHandle subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
/**
* Subscribe with a class member function:
......@@ -55,28 +56,33 @@ namespace armarx::armem::client::util
* @endcode
*/
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
SubscriptionHandle
subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)
{
(callee->*callback)(subscriptionID, updatedSnapshotIDs);
};
subscribe(subscriptionID, cb);
auto cb = [callee, callback](const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)
{ (callee->*callback)(subscriptionID, updatedSnapshotIDs); };
return subscribe(subscriptionID, cb);
}
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallbackUpdatedOnly<CalleeT> callback)
SubscriptionHandle
subscribe(const MemoryID& subscriptionID,
CalleeT* callee,
MemberCallbackUpdatedOnly<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
auto cb =
[callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{
if(callee)
if (callee)
{
(callee->*callback)(updatedSnapshotIDs);
}
};
subscribe(subscriptionID, cb);
return subscribe(subscriptionID, cb);
}
void unsubscribe(SubscriptionHandle& subscriptionHandle);
/// Function handling updates from the MemoryListener ice topic.
void updated(const std::vector<MemoryID>& updatedIDs) const;
......@@ -84,13 +90,21 @@ namespace armarx::armem::client::util
protected:
long nextId = 0;
std::unordered_map<MemoryID, std::vector<Callback>> callbacks;
struct ManagedCallback
{
long id = 0;
Callback callback;
};
private:
std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks;
armarx::ManagedIceObject* component;
/// memoryName -> #callbacks needing memory topic
std::unordered_map<std::string, int> memoryRefCount;
private:
armarx::ManagedIceObject* component;
};
}
} // namespace armarx::armem::client::util
#include "SubscriptionHandle.h"
#include "MemoryListener.h"
namespace armarx::armem::client::util
{
SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener,
const MemoryID& memoryID,
long id) :
valid{true}, memoryListener{memoryListener}, memoryID(memoryID), id{id}
{
}
SubscriptionHandle::SubscriptionHandle() : valid{false}
{
}
SubscriptionHandle::SubscriptionHandle(SubscriptionHandle&& other) :
valid{other.valid},
memoryListener{other.memoryListener},
memoryID(std::move(other.memoryID)),
id{other.id}
{
other.valid = false;
}
SubscriptionHandle&
SubscriptionHandle::operator=(SubscriptionHandle other)
{
swap(*this, other);
return *this;
}
void
SubscriptionHandle::release()
{
memoryListener->unsubscribe(*this);
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle()
{
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle&& handle) :
handle(std::move(handle))
{
}
ScopedSubscriptionHandle&
ScopedSubscriptionHandle::operator=(SubscriptionHandle handle)
{
std::swap(this->handle, handle);
return *this;
}
ScopedSubscriptionHandle::~ScopedSubscriptionHandle()
{
handle.release();
}
} // namespace armarx::armem::client::util
namespace armarx::armem::client
{
void
util::swap(util::SubscriptionHandle& first, util::SubscriptionHandle& second)
{
std::swap(first.valid, second.valid);
std::swap(first.memoryListener, second.memoryListener);
std::swap(first.memoryID, second.memoryID);
std::swap(first.id, second.id);
}
} // namespace armarx::armem::client
#pragma once
#include <RobotAPI/libraries/armem/core/MemoryID.h>
namespace armarx::armem::client::util
{
class MemoryListener;
class SubscriptionHandle
{
friend class MemoryListener;
public:
SubscriptionHandle();
SubscriptionHandle(SubscriptionHandle&& other);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
SubscriptionHandle& operator=(SubscriptionHandle other);
friend void swap(SubscriptionHandle& first, SubscriptionHandle& second);
void release();
private:
SubscriptionHandle(MemoryListener* memoryListener, const MemoryID& memoryID, long id);
private:
bool valid = false;
MemoryListener* memoryListener = nullptr;
MemoryID memoryID;
long id = 0;
};
class ScopedSubscriptionHandle
{
public:
ScopedSubscriptionHandle();
ScopedSubscriptionHandle(SubscriptionHandle&& handle);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
ScopedSubscriptionHandle& operator=(SubscriptionHandle handle);
~ScopedSubscriptionHandle();
private:
SubscriptionHandle handle;
};
void swap(SubscriptionHandle& first, SubscriptionHandle& second);
} // namespace armarx::armem::client::util
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment