Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • sw/armarx/robot-api
  • uwkce_singer/robot-api
  • untcg_hofmann/robot-api
  • ulqba_korosakov/RobotAPI
4 results
Show changes
Commits on Source (2)
......@@ -64,6 +64,7 @@ set(LIB_FILES
client/plugins/PluginUser.cpp
client/plugins/Plugin.cpp
client/util/SubscriptionHandle.cpp
client/util/MemoryListener.cpp
client/util/SimpleReaderBase.cpp
client/util/SimpleWriterBase.cpp
......@@ -156,6 +157,7 @@ set(LIB_HEADERS
client/query/detail/NameSelectorOps.h
client/query/detail/SelectorOps.h
client/util/SubscriptionHandle.h
client/util/MemoryListener.h
client/util/SimpleReaderBase.h
client/util/SimpleWriterBase.h
......
......@@ -2,36 +2,33 @@
#include <sstream>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/exceptions/LocalException.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <ArmarXCore/core/ice_conversions/ice_conversions_templates.h>
#include <ArmarXCore/core/ManagedIceObject.h>
#include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
#include <RobotAPI/libraries/armem/core/error.h>
#include <RobotAPI/libraries/armem/core/ice_conversions.h>
namespace armarx::armem::client::util
{
std::string MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
std::string
MemoryListener::MakeMemoryTopicName(const MemoryID& memoryID)
{
return "MemoryUpdates." + memoryID.memoryName;
}
MemoryListener::MemoryListener(ManagedIceObject* component) :
component(component)
MemoryListener::MemoryListener(ManagedIceObject* component) : component(component)
{
}
void MemoryListener::setComponent(ManagedIceObject* component)
void
MemoryListener::setComponent(ManagedIceObject* component)
{
this->component = component;
}
void
MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
{
......@@ -40,7 +37,6 @@ namespace armarx::armem::client::util
updated(bos);
}
void
MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{
......@@ -76,68 +72,104 @@ namespace armarx::armem::client::util
if (not matchingSnapshotIDs.empty())
{
ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
<< " subscribing " << subscription
<< " with " << matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& callback : subCallbacks)
<< " subscribing " << subscription << " with "
<< matchingSnapshotIDs.size() << " snapshot IDs ...";
for (auto& managedCallback : subCallbacks)
{
try
{
callback(subscription, matchingSnapshotIDs);
managedCallback.callback(subscription, matchingSnapshotIDs);
}
catch (const armarx::LocalException& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::LocalException:"
<< "\n" << e.getReason()
<< "\n Stacktrace: \n" << e.generateBacktrace()
<< "\n"
;
<< e.getReason() << "\n Stacktrace: \n"
<< e.generateBacktrace() << "\n";
}
catch (const std::exception& e)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught armarx::Exception:"
<< "\n" << e.what()
<< "\n"
;
<< e.what() << "\n";
}
catch (...)
{
error << "Calling callback subscribing " << subscription << " failed."
<< "\nCaught unknown exception."
<< "\n"
;
<< "\n";
}
}
}
}
if (error.str().size() > 0)
{
ARMARX_WARNING << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n"
ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
<< __FUNCTION__ << "(): \n\n"
<< error.str();
}
}
void
MemoryListener::subscribe(const MemoryID& id, Callback callback)
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
{
callbacks[id].push_back(callback);
if (component)
ARMARX_CHECK_NOT_EMPTY(memoryID.memoryName)
<< "The memoryName must be specified to subscribe";
if (component and memoryRefCount[memoryID.memoryName] == 0)
{
component->usingTopic(MakeMemoryTopicName(id));
component->usingTopic(MakeMemoryTopicName(memoryID));
}
auto id = nextId++;
callbacks[memoryID].push_back({id, callback});
memoryRefCount[memoryID.memoryName]++;
return SubscriptionHandle(this, memoryID, id);
}
SubscriptionHandle
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
{
return subscribe(
subscriptionID,
[callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{ callback(updatedSnapshotIDs); });
}
void
MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
MemoryListener::unsubscribe(SubscriptionHandle& handle)
{
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
if (not handle.valid)
{
return;
}
handle.valid = false;
// Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
auto it = std::find_if(callbacks[handle.memoryID].begin(),
callbacks[handle.memoryID].end(),
[&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
callbacks[handle.memoryID].pop_back();
memoryRefCount[handle.memoryID.memoryName]--;
if (callbacks[handle.memoryID].empty())
{
callback(updatedSnapshotIDs);
});
callbacks.erase(handle.memoryID);
// unsubscribe from memory topic if no remainig callback needs it
if (component and memoryRefCount[handle.memoryID.memoryName] == 0)
{
component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
}
}
}
}
} // namespace armarx::armem::client::util
#pragma once
// STD/STL
#include <functional>
#include <unordered_map>
......@@ -10,7 +9,7 @@
#include <RobotAPI/interface/armem/client/MemoryListenerInterface.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h>
#include "SubscriptionHandle.h"
namespace armarx
{
......@@ -19,34 +18,36 @@ namespace armarx
namespace armarx::armem::client::util
{
/**
* @brief Handles update signals from the memory system and distributes it
* to its subsribers.
*/
class MemoryListener
{
public:
using Callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
public:
using Callback = std::function<void(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)>;
using CallbackUpdatedOnly =
std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
template <class CalleeT>
using MemberCallback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs);
using MemberCallback = void (CalleeT::*)(const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs);
template <class CalleeT>
using MemberCallbackUpdatedOnly = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
using MemberCallbackUpdatedOnly =
void (CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
static std::string MakeMemoryTopicName(const MemoryID& memoryID);
public:
MemoryListener(ManagedIceObject* component = nullptr);
void setComponent(ManagedIceObject* component);
void subscribe(const MemoryID& subscriptionID, Callback Callback);
void subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
SubscriptionHandle subscribe(const MemoryID& subscriptionID, Callback Callback);
SubscriptionHandle subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly Callback);
/**
* Subscribe with a class member function:
......@@ -55,28 +56,33 @@ namespace armarx::armem::client::util
* @endcode
*/
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
SubscriptionHandle
subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallback<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)
{
(callee->*callback)(subscriptionID, updatedSnapshotIDs);
};
subscribe(subscriptionID, cb);
auto cb = [callee, callback](const MemoryID& subscriptionID,
const std::vector<MemoryID>& updatedSnapshotIDs)
{ (callee->*callback)(subscriptionID, updatedSnapshotIDs); };
return subscribe(subscriptionID, cb);
}
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, MemberCallbackUpdatedOnly<CalleeT> callback)
SubscriptionHandle
subscribe(const MemoryID& subscriptionID,
CalleeT* callee,
MemberCallbackUpdatedOnly<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
auto cb =
[callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{
if(callee)
if (callee)
{
(callee->*callback)(updatedSnapshotIDs);
}
};
subscribe(subscriptionID, cb);
return subscribe(subscriptionID, cb);
}
void unsubscribe(SubscriptionHandle& subscriptionHandle);
/// Function handling updates from the MemoryListener ice topic.
void updated(const std::vector<MemoryID>& updatedIDs) const;
......@@ -84,13 +90,21 @@ namespace armarx::armem::client::util
protected:
long nextId = 0;
std::unordered_map<MemoryID, std::vector<Callback>> callbacks;
struct ManagedCallback
{
long id = 0;
Callback callback;
};
private:
std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks;
armarx::ManagedIceObject* component;
/// memoryName -> #callbacks needing memory topic
std::unordered_map<std::string, int> memoryRefCount;
private:
armarx::ManagedIceObject* component;
};
}
} // namespace armarx::armem::client::util
#include "SubscriptionHandle.h"
#include "MemoryListener.h"
namespace armarx::armem::client::util
{
SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener,
const MemoryID& memoryID,
long id) :
valid{true}, memoryListener{memoryListener}, memoryID(memoryID), id{id}
{
}
SubscriptionHandle::SubscriptionHandle() : valid{false}
{
}
SubscriptionHandle::SubscriptionHandle(SubscriptionHandle&& other) :
valid{other.valid},
memoryListener{other.memoryListener},
memoryID(std::move(other.memoryID)),
id{other.id}
{
other.valid = false;
}
SubscriptionHandle&
SubscriptionHandle::operator=(SubscriptionHandle other)
{
swap(*this, other);
return *this;
}
void
SubscriptionHandle::release()
{
memoryListener->unsubscribe(*this);
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle()
{
}
ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle&& handle) :
handle(std::move(handle))
{
}
ScopedSubscriptionHandle&
ScopedSubscriptionHandle::operator=(SubscriptionHandle handle)
{
std::swap(this->handle, handle);
return *this;
}
ScopedSubscriptionHandle::~ScopedSubscriptionHandle()
{
handle.release();
}
} // namespace armarx::armem::client::util
namespace armarx::armem::client
{
void
util::swap(util::SubscriptionHandle& first, util::SubscriptionHandle& second)
{
std::swap(first.valid, second.valid);
std::swap(first.memoryListener, second.memoryListener);
std::swap(first.memoryID, second.memoryID);
std::swap(first.id, second.id);
}
} // namespace armarx::armem::client
#pragma once
#include <RobotAPI/libraries/armem/core/MemoryID.h>
namespace armarx::armem::client::util
{
class MemoryListener;
class SubscriptionHandle
{
friend class MemoryListener;
public:
SubscriptionHandle();
SubscriptionHandle(SubscriptionHandle&& other);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
SubscriptionHandle& operator=(SubscriptionHandle other);
friend void swap(SubscriptionHandle& first, SubscriptionHandle& second);
void release();
private:
SubscriptionHandle(MemoryListener* memoryListener, const MemoryID& memoryID, long id);
private:
bool valid = false;
MemoryListener* memoryListener = nullptr;
MemoryID memoryID;
long id = 0;
};
class ScopedSubscriptionHandle
{
public:
ScopedSubscriptionHandle();
ScopedSubscriptionHandle(SubscriptionHandle&& handle);
/**
* @brief Assignment operator.
*
* @note Intentional call by value, since this leverages the move constructor. See
* https://stackoverflow.com/a/11540204 (section "Move assignment operators").
*/
ScopedSubscriptionHandle& operator=(SubscriptionHandle handle);
~ScopedSubscriptionHandle();
private:
SubscriptionHandle handle;
};
void swap(SubscriptionHandle& first, SubscriptionHandle& second);
} // namespace armarx::armem::client::util