Skip to content
Snippets Groups Projects
Commit 7e250b03 authored by Rainer Kartmann's avatar Rainer Kartmann
Browse files

Merge branch 'master' into aron-remote-gui-group-box-for-listz

parents 1c76f910 0ec0be24
No related branches found
No related tags found
No related merge requests found
...@@ -64,6 +64,7 @@ set(LIB_FILES ...@@ -64,6 +64,7 @@ set(LIB_FILES
client/plugins/PluginUser.cpp client/plugins/PluginUser.cpp
client/plugins/Plugin.cpp client/plugins/Plugin.cpp
client/util/SubscriptionHandle.cpp
client/util/MemoryListener.cpp client/util/MemoryListener.cpp
client/util/SimpleReaderBase.cpp client/util/SimpleReaderBase.cpp
client/util/SimpleWriterBase.cpp client/util/SimpleWriterBase.cpp
...@@ -156,6 +157,7 @@ set(LIB_HEADERS ...@@ -156,6 +157,7 @@ set(LIB_HEADERS
client/query/detail/NameSelectorOps.h client/query/detail/NameSelectorOps.h
client/query/detail/SelectorOps.h client/query/detail/SelectorOps.h
client/util/SubscriptionHandle.h
client/util/MemoryListener.h client/util/MemoryListener.h
client/util/SimpleReaderBase.h client/util/SimpleReaderBase.h
client/util/SimpleWriterBase.h client/util/SimpleWriterBase.h
......
...@@ -2,36 +2,33 @@ ...@@ -2,36 +2,33 @@
#include <sstream> #include <sstream>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/exceptions/LocalException.h> #include <ArmarXCore/core/exceptions/LocalException.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h> #include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h>
#include <ArmarXCore/core/ManagedIceObject.h> #include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
#include <RobotAPI/libraries/armem/core/error.h> #include <RobotAPI/libraries/armem/core/error.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
namespace armarx::armem::client::util namespace armarx::armem::client::util
{ {
std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID) std::string
MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
{ {
return "MemoryUpdates." + memoryID.memoryName; return "MemoryUpdates." + memoryID.memoryName;
} }
MemoryListener::MemoryListener(ManagedIceObject* component) : component(component)
MemoryListener::MemoryListener(ManagedIceObject* component) :
component(component)
{ {
} }
void
void MemoryListener::setComponent(ManagedIceObject* component) MemoryListener::setComponent(ManagedIceObject* component)
{ {
this->component = component; this->component = component;
} }
void void
MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
{ {
...@@ -40,7 +37,6 @@ namespace armarx::armem::client::util ...@@ -40,7 +37,6 @@ namespace armarx::armem::client::util
updated(bos); updated(bos);
} }
void void
MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{ {
...@@ -76,68 +72,104 @@ namespace armarx::armem::client::util ...@@ -76,68 +72,104 @@ namespace armarx::armem::client::util
if (not matchingSnapshotIDs.empty()) if (not matchingSnapshotIDs.empty())
{ {
ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks" ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
<< " subscribing " << subscription << " subscribing " << subscription << " with "
<< " with " << matchingSnapshotIDs.size() << " snapshot IDs ..."; << matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& callback : subCallbacks) for (auto& managedCallback : subCallbacks)
{ {
try try
{ {
callback(subscription, matchingSnapshotIDs); managedCallback.callback(subscription, matchingSnapshotIDs);
} }
catch (const armarx::LocalException& e) catch (const armarx::LocalException& e)
{ {
error << "Calling callback subscribing " << subscription << " failed." error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::LocalException:" << "\nCaught armarx::LocalException:"
<< "\n" << e.getReason()
<< "\n Stacktrace: \n" << e.generateBacktrace()
<< "\n" << "\n"
; << e.getReason() << "\n Stacktrace: \n"
<< e.generateBacktrace() << "\n";
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
error << "Calling callback subscribing " << subscription << " failed." error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::Exception:" << "\nCaught armarx::Exception:"
<< "\n" << e.what()
<< "\n" << "\n"
; << e.what() << "\n";
} }
catch (...) catch (...)
{ {
error << "Calling callback subscribing " << subscription << " failed." error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught unknown exception." << "\nCaught unknown exception."
<< "\n" << "\n";
;
} }
} }
} }
} }
if (error.str().size() > 0) if (error.str().size() > 0)
{ {
ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n" ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
<< __FUNCTION__ << "(): \n\n"
<< error.str(); << error.str();
} }
} }
SubscriptionHandle
void MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
MemoryListener::subscribe(const MemoryID& id, Callback callback)
{ {
callbacks[id].push_back(callback); ARMARX_CHECK_NOT_EMPTY(memoryID.memoryName)
if (component) << "The memoryName must be specified to subscribe";
if (component and memoryRefCount[memoryID.memoryName] == 0)
{ {
component->usingTopic(MakeMemoryTopicName(id)); component->usingTopic(MakeMemoryTopicName(memoryID));
} }
auto id = nextId++;
callbacks[memoryID].push_back({id, callback});
memoryRefCount[memoryID.memoryName]++;
return SubscriptionHandle(this, memoryID, id);
} }
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
{
return subscribe(
subscriptionID,
[callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ callback(updatedSnapshotIDs); });
}
void void
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback) MemoryListener::unsubscribe(SubscriptionHandle& handle)
{ {
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) if (not handle.valid)
{
return;
}
handle.valid = false;
// Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
auto it = std::find_if(callbacks[handle.memoryID].begin(),
callbacks[handle.memoryID].end(),
[&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
callbacks[handle.memoryID].pop_back();
memoryRefCount[handle.memoryID.memoryName]--;
if (callbacks[handle.memoryID].empty())
{ {
callback(updatedSnapshotIDs); callbacks.erase(handle.memoryID);
});
// unsubscribe from memory topic if no remainig callback needs it
if (component and memoryRefCount[handle.memoryID.memoryName] == 0)
{
component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
}
}
} }
} } // namespace armarx::armem::client::util
#pragma once #pragma once
// STD/STL // STD/STL
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
...@@ -10,7 +9,7 @@ ...@@ -10,7 +9,7 @@
#include <RobotAPI/interface/armem/client/MemoryListenerInterface.h> #include <RobotAPI/interface/armem/client/MemoryListenerInterface.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h> #include <RobotAPI/libraries/armem/core/MemoryID.h>
#include "SubscriptionHandle.h"
namespace armarx namespace armarx
{ {
...@@ -19,34 +18,36 @@ namespace armarx ...@@ -19,34 +18,36 @@ namespace armarx
namespace armarx::armem::client::util namespace armarx::armem::client::util
{ {
/** /**
* @brief Handles update signals from the memory system and distributes it * @brief Handles update signals from the memory system and distributes it
* to its subsribers. * to its subsribers.
*/ */
class MemoryListener class MemoryListener
{ {
public:
using Callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>; public:
using CallbackUpdatedOnly = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>; using Callback = std::function<void(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly =
std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
template <class CalleeT> template <class CalleeT>
using MemberCallback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs); using MemberCallback = void (CalleeT::*)(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs);
template <class CalleeT> template <class CalleeT>
using MemberCallbackUpdatedOnly = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs); using MemberCallbackUpdatedOnly =
void (CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
static std::string MakeMemoryTopicName(const MemoryID& memoryID); static std::string MakeMemoryTopicName(const MemoryID& memoryID);
public: public:
MemoryListener(ManagedIceObject* component = nullptr); MemoryListener(ManagedIceObject* component = nullptr);
void setComponent(ManagedIceObject* component); void setComponent(ManagedIceObject* component);
void subscribe(const MemoryID& subscriptionID, Callback Callback); SubscriptionHandle subscribe(const MemoryID& subscriptionID, Callback Callback);
void subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback); SubscriptionHandle subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
/** /**
* Subscribe with a class member function: * Subscribe with a class member function:
...@@ -55,28 +56,33 @@ namespace armarx::armem::client::util ...@@ -55,28 +56,33 @@ namespace armarx::armem::client::util
* @endcode * @endcode
*/ */
template <class CalleeT> template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback) SubscriptionHandle
subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
{ {
auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs) auto cb = [callee, callback](const MemoryID& subscriptionID,
{ const std::vector<MemoryID>& updatedSnapshotIDs)
(callee->*callback)(subscriptionID, updatedSnapshotIDs); { (callee->*callback)(subscriptionID, updatedSnapshotIDs); };
}; return subscribe(subscriptionID, cb);
subscribe(subscriptionID, cb);
} }
template <class CalleeT> template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallbackUpdatedOnly<CalleeT> callback) SubscriptionHandle
subscribe(const MemoryID& subscriptionID,
CalleeT* callee,
MemberCallbackUpdatedOnly<CalleeT> callback)
{ {
auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) auto cb =
[callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ {
if(callee) if (callee)
{ {
(callee->*callback)(updatedSnapshotIDs); (callee->*callback)(updatedSnapshotIDs);
} }
}; };
subscribe(subscriptionID, cb); return subscribe(subscriptionID, cb);
} }
void unsubscribe(SubscriptionHandle& subscriptionHandle);
/// Function handling updates from the MemoryListener ice topic. /// Function handling updates from the MemoryListener ice topic.
void updated(const std::vector<MemoryID>& updatedIDs) const; void updated(const std::vector<MemoryID>& updatedIDs) const;
...@@ -84,13 +90,21 @@ namespace armarx::armem::client::util ...@@ -84,13 +90,21 @@ namespace armarx::armem::client::util
protected: protected:
long nextId = 0;
std::unordered_map<MemoryID, std::vector<Callback>> callbacks; struct ManagedCallback
{
long id = 0;
Callback callback;
};
private: std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks;
armarx::ManagedIceObject* component; /// memoryName -> #callbacks needing memory topic
std::unordered_map<std::string, int> memoryRefCount;
private:
armarx::ManagedIceObject* component;
}; };
} } // namespace armarx::armem::client::util
#include "SubscriptionHandle.h"
#include "MemoryListener.h"
namespace armarx::armem::client::util
{
SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener,
const MemoryID& memoryID,
long id) :
valid{true}, memoryListener{memoryListener}, memoryID(memoryID), id{id}
{
}
SubscriptionHandle::SubscriptionHandle() : valid{false}
{
}
SubscriptionHandle::SubscriptionHandle(SubscriptionHandle&& other) :
valid{other.valid},
memoryListener{other.memoryListener},
memoryID(std::move(other.memoryID)),
id{other.id}
{
other.valid = false;
}
SubscriptionHandle&
SubscriptionHandle::operator=(SubscriptionHandle other)
{
swap(*this, other);
return *this;
}
void
SubscriptionHandle::release()
{
memoryListener->unsubscribe(*this);
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle()
{
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle&& handle) :
handle(std::move(handle))
{
}
ScopedSubscriptionHandle&
ScopedSubscriptionHandle::operator=(SubscriptionHandle handle)
{
std::swap(this->handle, handle);
return *this;
}
ScopedSubscriptionHandle::~ScopedSubscriptionHandle()
{
handle.release();
}
} // namespace armarx::armem::client::util
namespace armarx::armem::client
{
void
util::swap(util::SubscriptionHandle& first, util::SubscriptionHandle& second)
{
std::swap(first.valid, second.valid);
std::swap(first.memoryListener, second.memoryListener);
std::swap(first.memoryID, second.memoryID);
std::swap(first.id, second.id);
}
} // namespace armarx::armem::client
#pragma once
#include <RobotAPI/libraries/armem/core/MemoryID.h>
namespace armarx::armem::client::util
{
class MemoryListener;
class SubscriptionHandle
{
friend class MemoryListener;
public:
SubscriptionHandle();
SubscriptionHandle(SubscriptionHandle&& other);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
SubscriptionHandle& operator=(SubscriptionHandle other);
friend void swap(SubscriptionHandle& first, SubscriptionHandle& second);
void release();
private:
SubscriptionHandle(MemoryListener* memoryListener, const MemoryID& memoryID, long id);
private:
bool valid = false;
MemoryListener* memoryListener = nullptr;
MemoryID memoryID;
long id = 0;
};
class ScopedSubscriptionHandle
{
public:
ScopedSubscriptionHandle();
ScopedSubscriptionHandle(SubscriptionHandle&& handle);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
ScopedSubscriptionHandle& operator=(SubscriptionHandle handle);
~ScopedSubscriptionHandle();
private:
SubscriptionHandle handle;
};
void swap(SubscriptionHandle& first, SubscriptionHandle& second);
} // namespace armarx::armem::client::util
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment