Skip to content
Snippets Groups Projects

Skills: Optionals in manager gui

Merged Rainer Kartmann requested to merge skills/optionals-in-manager-gui into master
4 files
+ 272
137
Compare changes
  • Side-by-side
  • Inline
Files
4
#include "ReadStream.h"
#include <ArmarXCore/core/time/Clock.h>
#include <RobotAPI/libraries/armem/client/query/Builder.h>
namespace armarx::armem::client
{
std::optional<wm::EntitySnapshot>
ReadStream::stream(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq)
ReadStream::ReadStream() : metronome{armarx::core::time::Frequency::Hertz(10)}
{
// noone else is allowed to open a stream while this stream is running
std::scoped_lock l(runningMutex);
}
armarx::core::time::Metronome metronome(maxFreq);
auto timeStart = armarx::core::time::Clock::Now();
ReadStream::ReadStream(const Reader& reader,
const MemoryID& queriedId,
const core::time::Frequency& maxPollFrequency) :
reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency}
{
timeStart = armarx::core::time::Clock::Now();
}
bool callbackReturnedFalse = false;
while (not streamStoppedExternally && not callbackReturnedFalse)
std::optional<wm::EntitySnapshot>
ReadStream::pollBlocking(const SnapshotCallbackT& callback)
{
if (isPolling.exchange(true))
{
// make sure to not busy wait. Also wait until probably data is available in first iteration
metronome.waitForNextTick();
auto timeEnd = armarx::core::time::Clock::Now();
auto makeQuery = [&timeStart, &timeEnd](const MemoryID& id)
{
query::Builder qb;
query::CoreSegmentSelector& core = id.hasCoreSegmentName()
? qb.coreSegments().withID(id)
: qb.coreSegments().all();
query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
? core.providerSegments().withID(id)
: core.providerSegments().all();
query::EntitySelector& entity =
id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
entity.snapshots().timeRange(timeStart, timeEnd);
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
pollingStoppedExternally = false;
return qb.buildQueryInput();
};
auto result = _pollBlocking(callback);
auto query = makeQuery(queriedId);
isPolling = false;
return result;
}
auto result = reader.query(query);
void
ReadStream::pollAsync(const SnapshotCallbackT& callback)
{
if (isPolling.exchange(true))
{
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
pollingStoppedExternally = false;
if (result.success)
{
using EntitySnapshotReference =
std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
// copy references of snapshots into vector to sort them
std::vector<EntitySnapshotReference> snapshots;
result.memory.forEachSnapshot(
[&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
{ snapshots.push_back(snapshot); });
// sort correctly
std::sort(snapshots.begin(),
snapshots.end(),
[](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
{ return a.get().id().timestamp < b.get().id().timestamp; });
for (const auto& snapshot : snapshots)
{
// assert times in correct interval
ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
if (!callback(snapshot.get()))
{
return snapshot;
}
}
this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); });
}
timeStart = timeEnd + armarx::core::time::Duration::MicroSeconds(1);
}
else
std::optional<wm::EntitySnapshot>
ReadStream::_pollBlocking(const SnapshotCallbackT& callback)
{
while (not pollingStoppedExternally)
{
auto snapshot = pollOnce(callback);
if (snapshot.has_value())
{
ARMARX_ERROR
<< deactivateSpam()
<< "Received an error in ReadStream when querying data from a "
"memory. The error was '"
<< result.errorMessage
<< "'. Continue with stream, perhaps the memory was not yet initialized.";
return snapshot;
}
}
return std::nullopt;
}
void
ReadStream::openInBackground(const Reader& reader,
const MemoryID& queriedId,
CallbackT& continueIf,
const armarx::core::time::Frequency& maxFreq)
ReadStream::stop()
{
pollingStoppedExternally = true;
if (pollingThread.joinable())
{
pollingThread.join();
isPolling = false;
}
}
std::optional<wm::EntitySnapshot>
ReadStream::pollOnce(const SnapshotCallbackT& callback)
{
std::scoped_lock r(runningMutex);
std::scoped_lock l(threadMutex);
if (isPolling.exchange(true))
{
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
streaming = true;
streamStoppedExternally = true;
auto snapshot = _pollOnce(callback);
// thread is part of this streamreader. Do not detach
this->runningThread =
std::thread([&]() { this->stream(reader, queriedId, continueIf, maxFreq); });
isPolling = false;
return snapshot;
}
std::optional<wm::EntitySnapshot>
ReadStream::open(const Reader& reader,
const MemoryID& queriedId,
CallbackT& continueIf,
const armarx::core::time::Frequency& maxFreq)
ReadStream::_pollOnce(const SnapshotCallbackT& callback)
{
std::unique_lock r(runningMutex);
std::scoped_lock l(threadMutex);
// Make sure to not busy wait. Also wait until probably data is available in first iteration.
metronome.waitForNextTick();
streaming = true;
streamStoppedExternally = true;
auto timeEnd = armarx::core::time::Clock::Now();
auto makeQuery = [this, &timeEnd](const MemoryID& id)
{
query::Builder qb;
query::CoreSegmentSelector& core =
id.hasCoreSegmentName() ? qb.coreSegments().withID(id) : qb.coreSegments().all();
query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
? core.providerSegments().withID(id)
: core.providerSegments().all();
query::EntitySelector& entity =
id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
entity.snapshots().timeRange(timeStart, timeEnd);
std::optional<wm::EntitySnapshot> ret = std::nullopt;
return qb.buildQueryInput();
};
// thread is part of this streamreader. Do not detach
this->runningThread =
std::thread([&]() { ret = this->stream(reader, queriedId, continueIf, maxFreq); });
auto query = makeQuery(queriedId);
r.unlock();
// we keep the threadMutex to make sure that noone is replacing the thread member
auto result = reader.query(query);
if (this->runningThread.joinable())
if (result.success)
{
// wait until finished
this->runningThread.join();
using EntitySnapshotReference =
std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
// Copy references of snapshots into vector to sort them.
std::vector<EntitySnapshotReference> snapshots;
result.memory.forEachSnapshot([&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
{ snapshots.push_back(snapshot); });
// Sort correctly.
std::sort(snapshots.begin(),
snapshots.end(),
[](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
{ return a.get().id().timestamp < b.get().id().timestamp; });
// Determine the next start time.
DateTime nextStart;
if (snapshots.size() > 0)
{
// Because they are sorted, back() has the highest time stamp.
nextStart = snapshots.back().get().id().timestamp +
armarx::core::time::Duration::MicroSeconds(1);
}
else
{
nextStart = timeStart;
}
// Call the callback on all snapshots.
for (const auto& snapshot : snapshots)
{
// Assert times in correct interval.
ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
const bool continue_ = callback(snapshot.get());
if (not continue_)
{
return snapshot;
}
}
timeStart = nextStart;
}
else
{
ARMARX_WARNING
<< deactivateSpam()
<< "Received an error in ReadStream when querying data from a "
"memory. The error was '"
<< result.errorMessage
<< "'. Continue with stream, perhaps the memory was not yet initialized.";
}
return ret;
return std::nullopt;
}
void
ReadStream::close()
{
this->streamStoppedExternally = true;
}
} // namespace armarx::armem::client
Loading