Skip to content
Snippets Groups Projects
Commit 058bee70 authored by Fabian Tërnava's avatar Fabian Tërnava
Browse files

Merge remote-tracking branch 'origin/master' into skills/optionals-in-manager-gui

parents 29ec536d 53876d2f
No related branches found
No related tags found
No related merge requests found
#include "ReadStream.h" #include "ReadStream.h"
#include <ArmarXCore/core/time/Clock.h>
#include <RobotAPI/libraries/armem/client/query/Builder.h>
namespace armarx::armem::client namespace armarx::armem::client
{ {
std::optional<wm::EntitySnapshot> ReadStream::ReadStream() : metronome{armarx::core::time::Frequency::Hertz(10)}
ReadStream::stream(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq)
{ {
// noone else is allowed to open a stream while this stream is running }
std::scoped_lock l(runningMutex);
armarx::core::time::Metronome metronome(maxFreq); ReadStream::ReadStream(const Reader& reader,
auto timeStart = armarx::core::time::Clock::Now(); const MemoryID& queriedId,
const core::time::Frequency& maxPollFrequency) :
reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency}
{
timeStart = armarx::core::time::Clock::Now();
}
bool callbackReturnedFalse = false; std::optional<wm::EntitySnapshot>
while (not streamStoppedExternally && not callbackReturnedFalse) ReadStream::pollBlocking(const SnapshotCallbackT& callback)
{
if (isPolling.exchange(true))
{ {
// make sure to not busy wait. Also wait until probably data is available in first iteration throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
metronome.waitForNextTick(); }
pollingStoppedExternally = false;
auto timeEnd = armarx::core::time::Clock::Now();
auto makeQuery = [&timeStart, &timeEnd](const MemoryID& id)
{
query::Builder qb;
query::CoreSegmentSelector& core = id.hasCoreSegmentName()
? qb.coreSegments().withID(id)
: qb.coreSegments().all();
query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
? core.providerSegments().withID(id)
: core.providerSegments().all();
query::EntitySelector& entity =
id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
entity.snapshots().timeRange(timeStart, timeEnd);
return qb.buildQueryInput(); auto result = _pollBlocking(callback);
};
auto query = makeQuery(queriedId); isPolling = false;
return result;
}
auto result = reader.query(query); void
ReadStream::pollAsync(const SnapshotCallbackT& callback)
{
if (isPolling.exchange(true))
{
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
pollingStoppedExternally = false;
if (result.success) this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); });
{ }
using EntitySnapshotReference =
std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
// copy references of snapshots into vector to sort them
std::vector<EntitySnapshotReference> snapshots;
result.memory.forEachSnapshot(
[&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
{ snapshots.push_back(snapshot); });
// sort correctly
std::sort(snapshots.begin(),
snapshots.end(),
[](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
{ return a.get().id().timestamp < b.get().id().timestamp; });
for (const auto& snapshot : snapshots)
{
// assert times in correct interval
ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
if (!callback(snapshot.get()))
{
return snapshot;
}
}
timeStart = timeEnd + armarx::core::time::Duration::MicroSeconds(1); std::optional<wm::EntitySnapshot>
} ReadStream::_pollBlocking(const SnapshotCallbackT& callback)
else {
while (not pollingStoppedExternally)
{
auto snapshot = pollOnce(callback);
if (snapshot.has_value())
{ {
ARMARX_ERROR return snapshot;
<< deactivateSpam()
<< "Received an error in ReadStream when querying data from a "
"memory. The error was '"
<< result.errorMessage
<< "'. Continue with stream, perhaps the memory was not yet initialized.";
} }
} }
return std::nullopt; return std::nullopt;
} }
void void
ReadStream::openInBackground(const Reader& reader, ReadStream::stop()
const MemoryID& queriedId, {
CallbackT& continueIf, pollingStoppedExternally = true;
const armarx::core::time::Frequency& maxFreq) if (pollingThread.joinable())
{
pollingThread.join();
isPolling = false;
}
}
std::optional<wm::EntitySnapshot>
ReadStream::pollOnce(const SnapshotCallbackT& callback)
{ {
std::scoped_lock r(runningMutex); if (isPolling.exchange(true))
std::scoped_lock l(threadMutex); {
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
streaming = true; auto snapshot = _pollOnce(callback);
streamStoppedExternally = true;
// thread is part of this streamreader. Do not detach isPolling = false;
this->runningThread = return snapshot;
std::thread([&]() { this->stream(reader, queriedId, continueIf, maxFreq); });
} }
std::optional<wm::EntitySnapshot> std::optional<wm::EntitySnapshot>
ReadStream::open(const Reader& reader, ReadStream::_pollOnce(const SnapshotCallbackT& callback)
const MemoryID& queriedId,
CallbackT& continueIf,
const armarx::core::time::Frequency& maxFreq)
{ {
std::unique_lock r(runningMutex); // Make sure to not busy wait. Also wait until probably data is available in first iteration.
std::scoped_lock l(threadMutex); metronome.waitForNextTick();
streaming = true; auto timeEnd = armarx::core::time::Clock::Now();
streamStoppedExternally = true; auto makeQuery = [this, &timeEnd](const MemoryID& id)
{
query::Builder qb;
query::CoreSegmentSelector& core =
id.hasCoreSegmentName() ? qb.coreSegments().withID(id) : qb.coreSegments().all();
query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
? core.providerSegments().withID(id)
: core.providerSegments().all();
query::EntitySelector& entity =
id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
entity.snapshots().timeRange(timeStart, timeEnd);
std::optional<wm::EntitySnapshot> ret = std::nullopt; return qb.buildQueryInput();
};
// thread is part of this streamreader. Do not detach auto query = makeQuery(queriedId);
this->runningThread =
std::thread([&]() { ret = this->stream(reader, queriedId, continueIf, maxFreq); });
r.unlock(); auto result = reader.query(query);
// we keep the threadMutex to make sure that noone is replacing the thread member
if (this->runningThread.joinable()) if (result.success)
{ {
// wait until finished using EntitySnapshotReference =
this->runningThread.join(); std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
// Copy references of snapshots into vector to sort them.
std::vector<EntitySnapshotReference> snapshots;
result.memory.forEachSnapshot([&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
{ snapshots.push_back(snapshot); });
// Sort correctly.
std::sort(snapshots.begin(),
snapshots.end(),
[](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
{ return a.get().id().timestamp < b.get().id().timestamp; });
// Determine the next start time.
DateTime nextStart;
if (snapshots.size() > 0)
{
// Because they are sorted, back() has the highest time stamp.
nextStart = snapshots.back().get().id().timestamp +
armarx::core::time::Duration::MicroSeconds(1);
}
else
{
nextStart = timeStart;
}
// Call the callback on all snapshots.
for (const auto& snapshot : snapshots)
{
// Assert times in correct interval.
ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
const bool continue_ = callback(snapshot.get());
if (not continue_)
{
return snapshot;
}
}
timeStart = nextStart;
}
else
{
ARMARX_WARNING
<< deactivateSpam()
<< "Received an error in ReadStream when querying data from a "
"memory. The error was '"
<< result.errorMessage
<< "'. Continue with stream, perhaps the memory was not yet initialized.";
} }
return ret; return std::nullopt;
} }
void
ReadStream::close()
{
this->streamStoppedExternally = true;
}
} // namespace armarx::armem::client } // namespace armarx::armem::client
#pragma once #pragma once
#include <functional> #include <functional>
#include <optional>
#include <thread> #include <thread>
#include <ArmarXCore/core/time/Clock.h>
#include <ArmarXCore/core/time/DateTime.h> #include <ArmarXCore/core/time/DateTime.h>
#include <ArmarXCore/core/time/Duration.h>
#include <ArmarXCore/core/time/Frequency.h> #include <ArmarXCore/core/time/Frequency.h>
#include <ArmarXCore/core/time/Metronome.h> #include <ArmarXCore/core/time/Metronome.h>
#include <RobotAPI/libraries/armem/client/query/Builder.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h> #include <RobotAPI/libraries/armem/core/MemoryID.h>
#include <RobotAPI/libraries/armem/core/wm.h> #include <RobotAPI/libraries/armem/core/wm/memory_definitions.h>
#include "Reader.h" #include "Reader.h"
namespace armarx::armem::client namespace armarx::armem::client
{ {
/** /**
* @brief A readstream from the memory. * @brief A stream reading entity snapshots from the memory.
*
* After constructing a ReadStream, polling can be started in three ways:
*
* 1. Run a polling loop in this thread, blocking execution until terminated.
* See pollBlocking().
* 2. Run a polling loop in a new, separate thread, until it is stopped via stop().
* See pollAsync() and stop().
* 3. Perform a single query and process the result, embedded in your own loop or other control
* flow logic.
* See pollOnce().
*/ */
class ReadStream class ReadStream
{ {
public: public:
using CallbackT = std::function<bool(const wm::EntitySnapshot&)>; /**
* @brief Callback called on each entity snapshot in the queried ID.
/// Opens a stream and asyncronouely queries the memory and calls the callback function until the callback returns False. *
void openInBackground(const Reader& reader, * If it returns false, the stream is stopped.
const MemoryID& queriedId, */
CallbackT& callback, using SnapshotCallbackT = std::function<bool(const wm::EntitySnapshot&)>;
const armarx::core::time::Frequency& maxFreq =
armarx::core::time::Frequency::Hertz(10)); /**
* @brief Inizialize a ReadStream which does not represent a stream.
/// close an opened stream */
void close(); ReadStream();
/// open a stream and block until the stop function returns True. Return the object that yielded True. /**
std::optional<wm::EntitySnapshot> open(const Reader& reader, * @brief Initialize a read stream.
const MemoryID& queriedId, *
CallbackT& stopIf, * @param reader
const armarx::core::time::Frequency& maxFreq = * The reader to perform the queries.
armarx::core::time::Frequency::Hertz(10)); * @param queriedId
* The memory ID in which all snapshots should be processed by the stream.
protected: * @param maxPollFrequency
std::optional<wm::EntitySnapshot> stream(const Reader& reader, * The maximum frequency with which queries are performed. The
const MemoryID& queriedId, * real frequency might be lower.
CallbackT& callback, */
const armarx::core::time::Frequency& maxFreq); ReadStream(const Reader& reader,
const MemoryID& queriedId,
const armarx::core::time::Frequency& maxPollFrequency =
armarx::core::time::Frequency::Hertz(10));
/**
* @brief Poll in this thread as long as callback returns true.
*
* @param callback Function to call on each entity snapshot.
* @return The snapshot object that returns false.
* @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
*/
std::optional<wm::EntitySnapshot> pollBlocking(const SnapshotCallbackT& callback);
/**
* @brief Poll in a new thread as long as callback returns true.
*
* Note that callback will be called in a separate thread, so take care of synchronizing
* access to variables in the callback appropriately.
*
* Roughly equivalent to:
* @code
* std::thread thread([]() { stream.pollBlocking(); });
* @endcode
*
* @param callback Function to call on each entity snapshot.
* @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
*/
void pollAsync(const SnapshotCallbackT& callback);
/**
* @brief Stop a running polling loop.
*
* If a polling thread has been started by pollAsync() before, joins the thread.
*/
void stop();
/**
* @brief Perform one query and call the callbacks on each snapshot.
*
* This allows you to define your own loop, for example:
*
* @code
* bool condition = true;
* while (condition)
* {
* auto snapshot = stream.pollOnce(callback);
*
* ...
*
* if (...)
* {
* condition = false;
* }
* }
* @endcode
*
* @param callback Function to call on each entity snapshot.
* @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
*/
std::optional<wm::EntitySnapshot> pollOnce(const SnapshotCallbackT& callback);
private:
std::optional<wm::EntitySnapshot> _pollBlocking(const SnapshotCallbackT& callback);
std::optional<wm::EntitySnapshot> _pollOnce(const SnapshotCallbackT& callback);
private: private:
mutable std::mutex runningMutex; Reader reader;
mutable std::mutex threadMutex; MemoryID queriedId;
std::thread runningThread;
armarx::core::time::Metronome metronome;
armarx::DateTime timeStart;
std::atomic_bool isPolling = false;
std::atomic_bool pollingStoppedExternally = false;
std::atomic_bool streaming = false; std::thread pollingThread;
std::atomic_bool streamStoppedExternally = false;
}; };
} // namespace armarx::armem::client } // namespace armarx::armem::client
...@@ -246,4 +246,19 @@ namespace armarx::armem::error ...@@ -246,4 +246,19 @@ namespace armarx::armem::error
return sstream.str(); return sstream.str();
} }
ReadStreamAlreadyPolling::ReadStreamAlreadyPolling(const MemoryID& queriedId,
const std::string& calledFunction) :
ArMemError(makeMsg(queriedId, calledFunction))
{
}
std::string
ReadStreamAlreadyPolling::makeMsg(const MemoryID& queriedId, const std::string& calledFunction)
{
std::stringstream ss;
ss << "The ReadStream for " << queriedId << " was already running when " << calledFunction
<< " was called.";
return ss.str();
}
} // namespace armarx::armem::error } // namespace armarx::armem::error
...@@ -220,4 +220,15 @@ namespace armarx::armem::error ...@@ -220,4 +220,15 @@ namespace armarx::armem::error
static std::string makeMsg(const std::string& proxyName, const std::string& message = ""); static std::string makeMsg(const std::string& proxyName, const std::string& message = "");
}; };
/**
* @brief Indicates that a ReadStream is already polling when a polling method was called.
*/
class ReadStreamAlreadyPolling : public ArMemError
{
public:
ReadStreamAlreadyPolling(const MemoryID& queriedId, const std::string& calledFunction);
static std::string makeMsg(const MemoryID& queriedId, const std::string& calledFunction);
};
} // namespace armarx::armem::error } // namespace armarx::armem::error
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment