Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • sw/armarx/robot-api
  • uwkce_singer/robot-api
  • untcg_hofmann/robot-api
  • ulqba_korosakov/RobotAPI
4 results
Show changes
Commits on Source (2)
...@@ -64,6 +64,7 @@ set(LIB_FILES ...@@ -64,6 +64,7 @@ set(LIB_FILES
client/plugins/PluginUser.cpp client/plugins/PluginUser.cpp
client/plugins/Plugin.cpp client/plugins/Plugin.cpp
client/util/SubscriptionHandle.cpp
client/util/MemoryListener.cpp client/util/MemoryListener.cpp
client/util/SimpleReaderBase.cpp client/util/SimpleReaderBase.cpp
client/util/SimpleWriterBase.cpp client/util/SimpleWriterBase.cpp
...@@ -156,6 +157,7 @@ set(LIB_HEADERS ...@@ -156,6 +157,7 @@ set(LIB_HEADERS
client/query/detail/NameSelectorOps.h client/query/detail/NameSelectorOps.h
client/query/detail/SelectorOps.h client/query/detail/SelectorOps.h
client/util/SubscriptionHandle.h
client/util/MemoryListener.h client/util/MemoryListener.h
client/util/SimpleReaderBase.h client/util/SimpleReaderBase.h
client/util/SimpleWriterBase.h client/util/SimpleWriterBase.h
......
...@@ -2,36 +2,33 @@ ...@@ -2,36 +2,33 @@
#include <sstream> #include <sstream>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/exceptions/LocalException.h> #include <ArmarXCore/core/exceptions/LocalException.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h> #include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h>
#include <ArmarXCore/core/ManagedIceObject.h> #include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
#include <RobotAPI/libraries/armem/core/error.h> #include <RobotAPI/libraries/armem/core/error.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
namespace armarx::armem::client::util namespace armarx::armem::client::util
{ {
std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID) std::string
MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
{ {
return "MemoryUpdates." + memoryID.memoryName; return "MemoryUpdates." + memoryID.memoryName;
} }
MemoryListener::MemoryListener(ManagedIceObject* component) : component(component)
MemoryListener::MemoryListener(ManagedIceObject* component) :
component(component)
{ {
} }
void
void MemoryListener::setComponent(ManagedIceObject* component) MemoryListener::setComponent(ManagedIceObject* component)
{ {
this->component = component; this->component = component;
} }
void void
MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
{ {
...@@ -40,7 +37,6 @@ namespace armarx::armem::client::util ...@@ -40,7 +37,6 @@ namespace armarx::armem::client::util
updated(bos); updated(bos);
} }
void void
MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{ {
...@@ -76,68 +72,104 @@ namespace armarx::armem::client::util ...@@ -76,68 +72,104 @@ namespace armarx::armem::client::util
if (not matchingSnapshotIDs.empty()) if (not matchingSnapshotIDs.empty())
{ {
ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks" ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
<< " subscribing " << subscription << " subscribing " << subscription << " with "
<< " with " << matchingSnapshotIDs.size() << " snapshot IDs ..."; << matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& callback : subCallbacks) for (auto& managedCallback : subCallbacks)
{ {
try try
{ {
callback(subscription, matchingSnapshotIDs); managedCallback.callback(subscription, matchingSnapshotIDs);
} }
catch (const armarx::LocalException& e) catch (const armarx::LocalException& e)
{ {
error << "Calling callback subscribing " << subscription << " failed." error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::LocalException:" << "\nCaught armarx::LocalException:"
<< "\n" << e.getReason()
<< "\n Stacktrace: \n" << e.generateBacktrace()
<< "\n" << "\n"
; << e.getReason() << "\n Stacktrace: \n"
<< e.generateBacktrace() << "\n";
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
error << "Calling callback subscribing " << subscription << " failed." error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::Exception:" << "\nCaught armarx::Exception:"
<< "\n" << e.what()
<< "\n" << "\n"
; << e.what() << "\n";
} }
catch (...) catch (...)
{ {
error << "Calling callback subscribing " << subscription << " failed." error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught unknown exception." << "\nCaught unknown exception."
<< "\n" << "\n";
;
} }
} }
} }
} }
if (error.str().size() > 0) if (error.str().size() > 0)
{ {
ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n" ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
<< __FUNCTION__ << "(): \n\n"
<< error.str(); << error.str();
} }
} }
SubscriptionHandle
void MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
MemoryListener::subscribe(const MemoryID& id, Callback callback)
{ {
callbacks[id].push_back(callback); ARMARX_CHECK_NOT_EMPTY(memoryID.memoryName)
if (component) << "The memoryName must be specified to subscribe";
if (component and memoryRefCount[memoryID.memoryName] == 0)
{ {
component->usingTopic(MakeMemoryTopicName(id)); component->usingTopic(MakeMemoryTopicName(memoryID));
} }
auto id = nextId++;
callbacks[memoryID].push_back({id, callback});
memoryRefCount[memoryID.memoryName]++;
return SubscriptionHandle(this, memoryID, id);
} }
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
{
return subscribe(
subscriptionID,
[callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ callback(updatedSnapshotIDs); });
}
void void
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback) MemoryListener::unsubscribe(SubscriptionHandle& handle)
{ {
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) if (not handle.valid)
{
return;
}
handle.valid = false;
// Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
auto it = std::find_if(callbacks[handle.memoryID].begin(),
callbacks[handle.memoryID].end(),
[&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
callbacks[handle.memoryID].pop_back();
memoryRefCount[handle.memoryID.memoryName]--;
if (callbacks[handle.memoryID].empty())
{ {
callback(updatedSnapshotIDs); callbacks.erase(handle.memoryID);
});
// unsubscribe from memory topic if no remainig callback needs it
if (component and memoryRefCount[handle.memoryID.memoryName] == 0)
{
component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
}
}
} }
} } // namespace armarx::armem::client::util
#pragma once #pragma once
// STD/STL // STD/STL
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
...@@ -10,7 +9,7 @@ ...@@ -10,7 +9,7 @@
#include <RobotAPI/interface/armem/client/MemoryListenerInterface.h> #include <RobotAPI/interface/armem/client/MemoryListenerInterface.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h> #include <RobotAPI/libraries/armem/core/MemoryID.h>
#include "SubscriptionHandle.h"
namespace armarx namespace armarx
{ {
...@@ -19,34 +18,36 @@ namespace armarx ...@@ -19,34 +18,36 @@ namespace armarx
namespace armarx::armem::client::util namespace armarx::armem::client::util
{ {
/** /**
* @brief Handles update signals from the memory system and distributes it * @brief Handles update signals from the memory system and distributes it
* to its subsribers. * to its subsribers.
*/ */
class MemoryListener class MemoryListener
{ {
public:
using Callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>; public:
using CallbackUpdatedOnly = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>; using Callback = std::function<void(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly =
std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
template <class CalleeT> template <class CalleeT>
using MemberCallback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs); using MemberCallback = void (CalleeT::*)(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs);
template <class CalleeT> template <class CalleeT>
using MemberCallbackUpdatedOnly = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs); using MemberCallbackUpdatedOnly =
void (CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
static std::string MakeMemoryTopicName(const MemoryID& memoryID); static std::string MakeMemoryTopicName(const MemoryID& memoryID);
public: public:
MemoryListener(ManagedIceObject* component = nullptr); MemoryListener(ManagedIceObject* component = nullptr);
void setComponent(ManagedIceObject* component); void setComponent(ManagedIceObject* component);
void subscribe(const MemoryID& subscriptionID, Callback Callback); SubscriptionHandle subscribe(const MemoryID& subscriptionID, Callback Callback);
void subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback); SubscriptionHandle subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
/** /**
* Subscribe with a class member function: * Subscribe with a class member function:
...@@ -55,28 +56,33 @@ namespace armarx::armem::client::util ...@@ -55,28 +56,33 @@ namespace armarx::armem::client::util
* @endcode * @endcode
*/ */
template <class CalleeT> template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback) SubscriptionHandle
subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
{ {
auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs) auto cb = [callee, callback](const MemoryID& subscriptionID,
{ const std::vector<MemoryID>& updatedSnapshotIDs)
(callee->*callback)(subscriptionID, updatedSnapshotIDs); { (callee->*callback)(subscriptionID, updatedSnapshotIDs); };
}; return subscribe(subscriptionID, cb);
subscribe(subscriptionID, cb);
} }
template <class CalleeT> template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallbackUpdatedOnly<CalleeT> callback) SubscriptionHandle
subscribe(const MemoryID& subscriptionID,
CalleeT* callee,
MemberCallbackUpdatedOnly<CalleeT> callback)
{ {
auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs) auto cb =
[callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ {
if(callee) if (callee)
{ {
(callee->*callback)(updatedSnapshotIDs); (callee->*callback)(updatedSnapshotIDs);
} }
}; };
subscribe(subscriptionID, cb); return subscribe(subscriptionID, cb);
} }
void unsubscribe(SubscriptionHandle& subscriptionHandle);
/// Function handling updates from the MemoryListener ice topic. /// Function handling updates from the MemoryListener ice topic.
void updated(const std::vector<MemoryID>& updatedIDs) const; void updated(const std::vector<MemoryID>& updatedIDs) const;
...@@ -84,13 +90,21 @@ namespace armarx::armem::client::util ...@@ -84,13 +90,21 @@ namespace armarx::armem::client::util
protected: protected:
long nextId = 0;
std::unordered_map<MemoryID, std::vector<Callback>> callbacks; struct ManagedCallback
{
long id = 0;
Callback callback;
};
private: std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks;
armarx::ManagedIceObject* component; /// memoryName -> #callbacks needing memory topic
std::unordered_map<std::string, int> memoryRefCount;
private:
armarx::ManagedIceObject* component;
}; };
} } // namespace armarx::armem::client::util
#include "SubscriptionHandle.h"
#include "MemoryListener.h"
namespace armarx::armem::client::util
{
SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener,
const MemoryID& memoryID,
long id) :
valid{true}, memoryListener{memoryListener}, memoryID(memoryID), id{id}
{
}
SubscriptionHandle::SubscriptionHandle() : valid{false}
{
}
SubscriptionHandle::SubscriptionHandle(SubscriptionHandle&& other) :
valid{other.valid},
memoryListener{other.memoryListener},
memoryID(std::move(other.memoryID)),
id{other.id}
{
other.valid = false;
}
SubscriptionHandle&
SubscriptionHandle::operator=(SubscriptionHandle other)
{
swap(*this, other);
return *this;
}
void
SubscriptionHandle::release()
{
memoryListener->unsubscribe(*this);
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle()
{
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle&& handle) :
handle(std::move(handle))
{
}
ScopedSubscriptionHandle&
ScopedSubscriptionHandle::operator=(SubscriptionHandle handle)
{
std::swap(this->handle, handle);
return *this;
}
ScopedSubscriptionHandle::~ScopedSubscriptionHandle()
{
handle.release();
}
} // namespace armarx::armem::client::util
namespace armarx::armem::client
{
void
util::swap(util::SubscriptionHandle& first, util::SubscriptionHandle& second)
{
std::swap(first.valid, second.valid);
std::swap(first.memoryListener, second.memoryListener);
std::swap(first.memoryID, second.memoryID);
std::swap(first.id, second.id);
}
} // namespace armarx::armem::client
#pragma once
#include <RobotAPI/libraries/armem/core/MemoryID.h>
namespace armarx::armem::client::util
{
class MemoryListener;
class SubscriptionHandle
{
friend class MemoryListener;
public:
SubscriptionHandle();
SubscriptionHandle(SubscriptionHandle&& other);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
SubscriptionHandle& operator=(SubscriptionHandle other);
friend void swap(SubscriptionHandle& first, SubscriptionHandle& second);
void release();
private:
SubscriptionHandle(MemoryListener* memoryListener, const MemoryID& memoryID, long id);
private:
bool valid = false;
MemoryListener* memoryListener = nullptr;
MemoryID memoryID;
long id = 0;
};
class ScopedSubscriptionHandle
{
public:
ScopedSubscriptionHandle();
ScopedSubscriptionHandle(SubscriptionHandle&& handle);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
ScopedSubscriptionHandle& operator=(SubscriptionHandle handle);
~ScopedSubscriptionHandle();
private:
SubscriptionHandle handle;
};
void swap(SubscriptionHandle& first, SubscriptionHandle& second);
} // namespace armarx::armem::client::util