Skip to content
Snippets Groups Projects
Commit f79c8384 authored by Rainer Kartmann's avatar Rainer Kartmann
Browse files

Move subscription code to new base class

parent ded2a4c6
No related branches found
No related tags found
2 merge requests!171Periodic merge of armem/dev into master,!170ArMem Viewer: Resolve Memory IDs
......@@ -85,6 +85,7 @@ set(LIB_FILES
client/Writer.cpp
client/WriterComponentPlugin.cpp
client/util/MemoryListener.cpp
client/util/SimpleReaderBase.cpp
client/util/SimpleWriterBase.cpp
......@@ -199,6 +200,7 @@ set(LIB_HEADERS
client/query/detail/NameSelectorOps.h
client/query/detail/SelectorOps.h
client/util/MemoryListener.h
client/util/SimpleReaderBase.h
client/util/SimpleWriterBase.h
......
......@@ -108,48 +108,6 @@ namespace armarx::armem::client
}
void
Reader::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{
std::stringstream error;
for (const auto& [subscription, callbacks] : this->callbacks)
{
std::vector<MemoryID> matchingSnapshotIDs;
for (const MemoryID& updatedSnapshotID : updatedSnapshotIDs)
{
try
{
if (contains(subscription, updatedSnapshotID))
{
matchingSnapshotIDs.push_back(updatedSnapshotID);
}
}
catch (const armem::error::InvalidMemoryID& e)
{
// Log to debug, but ignore otherwise
error << "Error when comparing subscribed ID " << subscription
<< " with updated ID " << updatedSnapshotID << ":\n"
<< e.what() << "\n\n";
}
}
if (not matchingSnapshotIDs.empty())
{
for (auto& callback : callbacks)
{
callback(subscription, matchingSnapshotIDs);
}
}
}
if (error.str().size() > 0)
{
ARMARX_VERBOSE << "The following issues were encountered during Reader::" << __FUNCTION__ << "(): \n\n"
<< error.str();
}
}
data::StoreResult
Reader::readAndStore(const data::StoreInput& input) const
{
......@@ -166,21 +124,6 @@ namespace armarx::armem::client
}
void
Reader::subscribe(const MemoryID& id, callback callback)
{
callbacks[id].push_back(callback);
}
void Reader::subscribe(const MemoryID& subscriptionID, callback_updated_only callback)
{
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{
callback(updatedSnapshotIDs);
});
}
void
Reader::setReadingMemory(server::ReadingMemoryInterfacePrx memory)
{
......
......@@ -2,8 +2,6 @@
// STD/STL
#include <functional>
#include <unordered_map>
#include <vector>
// RobotAPI
......@@ -13,6 +11,7 @@
#include <RobotAPI/libraries/armem/core/workingmemory/ice_conversions.h>
#include <RobotAPI/libraries/armem/core/workingmemory/Memory.h>
#include <RobotAPI/libraries/armem/core/longtermmemory/Memory.h>
#include <RobotAPI/libraries/armem/client/util/MemoryListener.h>
#include "Query.h"
......@@ -23,18 +22,9 @@ namespace armarx::armem::client
/**
* @brief Reads data from a memory server.
*/
class Reader
class Reader : public util::MemoryListener
{
using callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>;
using callback_updated_only = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
template <class CalleeT>
using member_callback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs);
template <class CalleeT>
using member_callback_updated_only = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
public:
/**
......@@ -78,36 +68,6 @@ namespace armarx::armem::client
data::StoreResult readAndStore(const data::StoreInput& input) const;
void subscribe(const MemoryID& subscriptionID, callback callback);
void subscribe(const MemoryID& subscriptionID, callback_updated_only callback);
/**
* Subscribe with a class member function:
* @code
* reader.subscribe(entityID, this, &This::myCallback);
* @endcode
*/
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, member_callback<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)
{
(callee->*callback)(subscriptionID, updatedSnapshotIDs);
};
subscribe(subscriptionID, cb);
}
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, member_callback_updated_only<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{
(callee->*callback)(updatedSnapshotIDs);
};
subscribe(subscriptionID, cb);
}
/// Function handling updates from the MemoryListener ice topic.
void updated(const std::vector<MemoryID>& updatedIDs) const;
inline operator bool() const
{
return bool(memoryPrx);
......@@ -117,10 +77,6 @@ namespace armarx::armem::client
server::ReadingMemoryInterfacePrx memoryPrx;
private:
std::unordered_map<MemoryID, std::vector<callback>> callbacks;
};
}
#include "MemoryListener.h"
#include <sstream>
#include <ArmarXCore/core/logging/Logging.h>
#include <RobotAPI/libraries/armem/core/error.h>
namespace armarx::armem::client::util
{
MemoryListener::MemoryListener()
{
}
void
MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
{
std::stringstream error;
for (const auto& [subscription, callbacks] : this->callbacks)
{
std::vector<MemoryID> matchingSnapshotIDs;
for (const MemoryID& updatedSnapshotID : updatedSnapshotIDs)
{
try
{
if (contains(subscription, updatedSnapshotID))
{
matchingSnapshotIDs.push_back(updatedSnapshotID);
}
}
catch (const armem::error::InvalidMemoryID& e)
{
// Log to debug, but ignore otherwise
error << "Error when comparing subscribed ID " << subscription
<< " with updated ID " << updatedSnapshotID << ":\n"
<< e.what() << "\n\n";
}
}
if (not matchingSnapshotIDs.empty())
{
for (auto& callback : callbacks)
{
callback(subscription, matchingSnapshotIDs);
}
}
}
if (error.str().size() > 0)
{
ARMARX_VERBOSE << "The following issues were encountered during MemoryListener::" << __FUNCTION__ << "(): \n\n"
<< error.str();
}
}
void
MemoryListener::subscribe(const MemoryID& id, callback callback)
{
callbacks[id].push_back(callback);
}
void MemoryListener::subscribe(const MemoryID& subscriptionID, callback_updated_only callback)
{
subscribe(subscriptionID, [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{
callback(updatedSnapshotIDs);
});
}
}
#pragma once
// STD/STL
#include <functional>
#include <unordered_map>
#include <vector>
// RobotAPI
#include <RobotAPI/interface/armem/client/MemoryListenerInterface.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h>
namespace armarx::armem::client::util
{
/**
* @brief Handles update signals from the memory system and distributes it
* to its subsribers.
*/
class MemoryListener
{
using callback = std::function<void(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)>;
using callback_updated_only = std::function<void(const std::vector<MemoryID>& updatedSnapshotIDs)>;
template <class CalleeT>
using member_callback = void(CalleeT::*)(const MemoryID& subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs);
template <class CalleeT>
using member_callback_updated_only = void(CalleeT::*)(const std::vector<MemoryID>& updatedSnapshotIDs);
public:
MemoryListener();
void subscribe(const MemoryID& subscriptionID, callback callback);
void subscribe(const MemoryID& subscriptionID, callback_updated_only callback);
/**
* Subscribe with a class member function:
* @code
* reader.subscribe(entityID, this, &This::myCallback);
* @endcode
*/
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, member_callback<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID & subscriptionID, const std::vector<MemoryID>& updatedSnapshotIDs)
{
(callee->*callback)(subscriptionID, updatedSnapshotIDs);
};
subscribe(subscriptionID, cb);
}
template <class CalleeT>
void subscribe(const MemoryID& subscriptionID, CalleeT* callee, member_callback_updated_only<CalleeT> callback)
{
auto cb = [callee, callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
{
(callee->*callback)(updatedSnapshotIDs);
};
subscribe(subscriptionID, cb);
}
/// Function handling updates from the MemoryListener ice topic.
void updated(const std::vector<MemoryID>& updatedIDs) const;
protected:
std::unordered_map<MemoryID, std::vector<callback>> callbacks;
};
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment