Skip to content
Snippets Groups Projects
Commit 9ae951f4 authored by Patrick Dormanns's avatar Patrick Dormanns
Browse files

implemented MemoryListener::(Scoped)SubscriptionHandle correctly

parent 6415bc0b
No related branches found
No related tags found
1 merge request!391(Scoped)SubscriptionHandle to unsubscribe from memory updates
Pipeline #15740 passed
This commit is part of merge request !391. Comments created here will be created in the context of that merge request.
...@@ -15,12 +15,39 @@ namespace armarx::armem::client::util ...@@ -15,12 +15,39 @@ namespace armarx::armem::client::util
MemoryListener::SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener, MemoryListener::SubscriptionHandle::SubscriptionHandle(MemoryListener* memoryListener,
MemoryID memoryID, const MemoryID& memoryID,
long id) long id) :
valid{true}, memoryListener{memoryListener}, memoryID(memoryID), id{id}
{ {
this->memoryListener = memoryListener; }
this->memoryID = memoryID;
this->id = id; MemoryListener::SubscriptionHandle::SubscriptionHandle() : valid{false}
{
}
MemoryListener::SubscriptionHandle::SubscriptionHandle(SubscriptionHandle&& other) :
valid{other.valid},
memoryListener{other.memoryListener},
memoryID(std::move(other.memoryID)),
id{other.id}
{
other.valid = false;
}
MemoryListener::SubscriptionHandle&
MemoryListener::SubscriptionHandle::operator=(SubscriptionHandle other)
{
swap(*this, other);
return *this;
}
void
swap(MemoryListener::SubscriptionHandle& first, MemoryListener::SubscriptionHandle& second)
{
std::swap(first.valid, second.valid);
std::swap(first.memoryListener, second.memoryListener);
std::swap(first.memoryID, second.memoryID);
std::swap(first.id, second.id);
} }
void void
...@@ -29,24 +56,26 @@ namespace armarx::armem::client::util ...@@ -29,24 +56,26 @@ namespace armarx::armem::client::util
memoryListener->unsubscribe(*this); memoryListener->unsubscribe(*this);
} }
MemoryListener::ScopedSubscriptionHandle::ScopedSubscriptionHandle(SubscriptionHandle handle) : MemoryListener::ScopedSubscriptionHandle::ScopedSubscriptionHandle()
handle(handle) {
}
MemoryListener::ScopedSubscriptionHandle::ScopedSubscriptionHandle(
SubscriptionHandle&& handle) :
handle(std::move(handle))
{ {
} }
MemoryListener::ScopedSubscriptionHandle* MemoryListener::ScopedSubscriptionHandle&
MemoryListener::ScopedSubscriptionHandle::operator=(const SubscriptionHandle& other) MemoryListener::ScopedSubscriptionHandle::operator=(MemoryListener::SubscriptionHandle handle)
{ {
handle.emplace(other); std::swap(this->handle, handle);
return this; return *this;
} }
MemoryListener::ScopedSubscriptionHandle::~ScopedSubscriptionHandle() MemoryListener::ScopedSubscriptionHandle::~ScopedSubscriptionHandle()
{ {
if (handle) handle.release();
{
handle->release();
}
} }
std::string std::string
...@@ -140,7 +169,6 @@ namespace armarx::armem::client::util ...@@ -140,7 +169,6 @@ namespace armarx::armem::client::util
} }
} }
} }
if (error.str().size() > 0) if (error.str().size() > 0)
{ {
ARMARX_WARNING << "The following issues were encountered during MemoryListener::" ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
...@@ -152,14 +180,19 @@ namespace armarx::armem::client::util ...@@ -152,14 +180,19 @@ namespace armarx::armem::client::util
MemoryListener::SubscriptionHandle MemoryListener::SubscriptionHandle
MemoryListener::subscribe(const MemoryID& memoryID, Callback callback) MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
{ {
if (component && callbacks.count(memoryID) == 0) ARMARX_CHECK_NOT_EMPTY(memoryID.memoryName)
<< "The memoryName must be specified to subscribe";
if (component && memoryRefCount[memoryID.memoryName] == 0)
{ {
component->usingTopic(MakeMemoryTopicName(memoryID)); component->usingTopic(MakeMemoryTopicName(memoryID));
} }
long id = next_id++; auto id = next_id++;
callbacks[memoryID].push_back({id, callback}); callbacks[memoryID].push_back({id, callback});
memoryRefCount[memoryID.memoryName]++;
return SubscriptionHandle(this, memoryID, id); return SubscriptionHandle(this, memoryID, id);
} }
...@@ -175,15 +208,36 @@ namespace armarx::armem::client::util ...@@ -175,15 +208,36 @@ namespace armarx::armem::client::util
void void
MemoryListener::unsubscribe(SubscriptionHandle& handle) MemoryListener::unsubscribe(SubscriptionHandle& handle)
{ {
if (not handle.valid)
{
return;
}
handle.valid = false;
// Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID] // Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
auto it = std::find_if(callbacks[handle.memoryID].begin(), auto it = std::find_if(callbacks[handle.memoryID].begin(),
callbacks[handle.memoryID].end(), callbacks[handle.memoryID].end(),
[handle](ManagedCallback mCb) { return mCb.id == handle.id; }); [&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
if (it->id == handle.id) if (it->id != handle.id)
{ {
std::iter_swap(it, callbacks[handle.memoryID].end()); return;
callbacks[handle.memoryID].pop_back(); }
std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
callbacks[handle.memoryID].pop_back();
memoryRefCount[handle.memoryID.memoryName]--;
if (callbacks[handle.memoryID].size() == 0)
{
callbacks.erase(handle.memoryID);
// unsubscribe from memory topic if no remainig callback needs it
if (component && memoryRefCount[handle.memoryID.memoryName] == 0)
{
component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
}
} }
} }
......
#pragma once #pragma once
// STD/STL // STD/STL
#include <functional> #include <functional>
#include <list>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
...@@ -48,13 +46,19 @@ namespace armarx::armem::client::util ...@@ -48,13 +46,19 @@ namespace armarx::armem::client::util
friend class MemoryListener; friend class MemoryListener;
public: public:
//SubscriptionHandle() = delete; SubscriptionHandle();
SubscriptionHandle(SubscriptionHandle&& other);
SubscriptionHandle& operator=(SubscriptionHandle other);
friend void swap(SubscriptionHandle& first, SubscriptionHandle& second);
void release(); void release();
private: private:
SubscriptionHandle(MemoryListener* memoryListener, MemoryID memoryID, long id); SubscriptionHandle(MemoryListener* memoryListener, const MemoryID& memoryID, long id);
private: private:
bool valid;
MemoryListener* memoryListener; MemoryListener* memoryListener;
MemoryID memoryID; MemoryID memoryID;
long id; long id;
...@@ -63,13 +67,14 @@ namespace armarx::armem::client::util ...@@ -63,13 +67,14 @@ namespace armarx::armem::client::util
class ScopedSubscriptionHandle class ScopedSubscriptionHandle
{ {
public: public:
ScopedSubscriptionHandle(SubscriptionHandle handle); ScopedSubscriptionHandle();
ScopedSubscriptionHandle* operator=(SubscriptionHandle const& other); ScopedSubscriptionHandle(SubscriptionHandle&& handle);
ScopedSubscriptionHandle& operator=(SubscriptionHandle handle);
~ScopedSubscriptionHandle(); ~ScopedSubscriptionHandle();
private: private:
std::optional<SubscriptionHandle> handle; SubscriptionHandle handle;
}; };
...@@ -132,6 +137,9 @@ namespace armarx::armem::client::util ...@@ -132,6 +137,9 @@ namespace armarx::armem::client::util
std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks; std::unordered_map<MemoryID, std::vector<ManagedCallback>> callbacks;
/// memoryName -> #callbacks needing memory topic
std::unordered_map<std::string, int> memoryRefCount;
private: private:
armarx::ManagedIceObject* component; armarx::ManagedIceObject* component;
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment