Skip to content
Snippets Groups Projects
Commit f5ff81b6 authored by Rainer Kartmann's avatar Rainer Kartmann
Browse files

Refactor and document ReadStream

parent 77adbf6a
No related branches found
No related tags found
No related merge requests found
Pipeline #15764 passed
#include "ReadStream.h"
#include <ArmarXCore/core/time/Clock.h>
#include <RobotAPI/libraries/armem/client/query/Builder.h>
namespace armarx::armem::client
{
std::optional<wm::EntitySnapshot>
ReadStream::stream(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq)
ReadStream::ReadStream() : metronome{armarx::core::time::Frequency::Hertz(10)}
{
// noone else is allowed to open a stream while this stream is running
std::scoped_lock l(runningMutex);
}
armarx::core::time::Metronome metronome(maxFreq);
auto timeStart = armarx::core::time::Clock::Now();
ReadStream::ReadStream(const Reader& reader,
const MemoryID& queriedId,
const core::time::Frequency& maxPollFrequency) :
reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency}
{
timeStart = armarx::core::time::Clock::Now();
}
bool callbackReturnedFalse = false;
while (not streamStoppedExternally && not callbackReturnedFalse)
std::optional<wm::EntitySnapshot>
ReadStream::pollBlocking(SnapshotCallbackT& callback)
{
if (isPolling.exchange(true))
{
// make sure to not busy wait. Also wait until probably data is available in first iteration
metronome.waitForNextTick();
auto timeEnd = armarx::core::time::Clock::Now();
auto makeQuery = [&timeStart, &timeEnd](const MemoryID& id)
{
query::Builder qb;
query::CoreSegmentSelector& core = id.hasCoreSegmentName()
? qb.coreSegments().withID(id)
: qb.coreSegments().all();
query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
? core.providerSegments().withID(id)
: core.providerSegments().all();
query::EntitySelector& entity =
id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
entity.snapshots().timeRange(timeStart, timeEnd);
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
pollingStoppedExternally = false;
return qb.buildQueryInput();
};
auto result = _pollBlocking(callback);
auto query = makeQuery(queriedId);
isPolling = false;
return result;
}
auto result = reader.query(query);
void
ReadStream::pollAsync(SnapshotCallbackT& callback)
{
if (isPolling.exchange(true))
{
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
pollingStoppedExternally = false;
if (result.success)
{
using EntitySnapshotReference =
std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
// copy references of snapshots into vector to sort them
std::vector<EntitySnapshotReference> snapshots;
result.memory.forEachSnapshot(
[&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
{ snapshots.push_back(snapshot); });
// sort correctly
std::sort(snapshots.begin(),
snapshots.end(),
[](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
{ return a.get().id().timestamp < b.get().id().timestamp; });
for (const auto& snapshot : snapshots)
{
// assert times in correct interval
ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
if (!callback(snapshot.get()))
{
return snapshot;
}
}
this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); });
}
timeStart = timeEnd + armarx::core::time::Duration::MicroSeconds(1);
}
else
std::optional<wm::EntitySnapshot>
ReadStream::_pollBlocking(SnapshotCallbackT& callback)
{
while (not pollingStoppedExternally)
{
auto snapshot = pollOnce(callback);
if (snapshot.has_value())
{
ARMARX_ERROR
<< deactivateSpam()
<< "Received an error in ReadStream when querying data from a "
"memory. The error was '"
<< result.errorMessage
<< "'. Continue with stream, perhaps the memory was not yet initialized.";
return snapshot;
}
}
return std::nullopt;
}
void
ReadStream::openInBackground(const Reader& reader,
const MemoryID& queriedId,
CallbackT& continueIf,
const armarx::core::time::Frequency& maxFreq)
ReadStream::stop()
{
pollingStoppedExternally = true;
if (pollingThread.joinable())
{
pollingThread.join();
isPolling = false;
}
}
std::optional<wm::EntitySnapshot>
ReadStream::pollOnce(SnapshotCallbackT& callback)
{
std::scoped_lock r(runningMutex);
std::scoped_lock l(threadMutex);
if (isPolling.exchange(true))
{
throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
}
streaming = true;
streamStoppedExternally = true;
auto snapshot = _pollOnce(callback);
// thread is part of this streamreader. Do not detach
this->runningThread =
std::thread([&]() { this->stream(reader, queriedId, continueIf, maxFreq); });
isPolling = false;
return snapshot;
}
std::optional<wm::EntitySnapshot>
ReadStream::open(const Reader& reader,
const MemoryID& queriedId,
CallbackT& continueIf,
const armarx::core::time::Frequency& maxFreq)
ReadStream::_pollOnce(SnapshotCallbackT& callback)
{
std::unique_lock r(runningMutex);
std::scoped_lock l(threadMutex);
// Make sure to not busy wait. Also wait until probably data is available in first iteration.
metronome.waitForNextTick();
streaming = true;
streamStoppedExternally = true;
auto timeEnd = armarx::core::time::Clock::Now();
auto makeQuery = [this, &timeEnd](const MemoryID& id)
{
query::Builder qb;
query::CoreSegmentSelector& core =
id.hasCoreSegmentName() ? qb.coreSegments().withID(id) : qb.coreSegments().all();
query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
? core.providerSegments().withID(id)
: core.providerSegments().all();
query::EntitySelector& entity =
id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
entity.snapshots().timeRange(timeStart, timeEnd);
std::optional<wm::EntitySnapshot> ret = std::nullopt;
return qb.buildQueryInput();
};
// thread is part of this streamreader. Do not detach
this->runningThread =
std::thread([&]() { ret = this->stream(reader, queriedId, continueIf, maxFreq); });
auto query = makeQuery(queriedId);
r.unlock();
// we keep the threadMutex to make sure that noone is replacing the thread member
auto result = reader.query(query);
if (this->runningThread.joinable())
if (result.success)
{
// wait until finished
this->runningThread.join();
using EntitySnapshotReference =
std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
// Copy references of snapshots into vector to sort them.
std::vector<EntitySnapshotReference> snapshots;
result.memory.forEachSnapshot([&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
{ snapshots.push_back(snapshot); });
// Sort correctly.
std::sort(snapshots.begin(),
snapshots.end(),
[](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
{ return a.get().id().timestamp < b.get().id().timestamp; });
// Determine the next start time.
DateTime nextStart;
if (snapshots.size() > 0)
{
// Because they are sorted, back() has the highest time stamp.
nextStart = snapshots.back().get().id().timestamp +
armarx::core::time::Duration::MicroSeconds(1);
}
else
{
nextStart = timeStart;
}
// Call the callback on all snapshots.
for (const auto& snapshot : snapshots)
{
// Assert times in correct interval.
ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
const bool continue_ = callback(snapshot.get());
if (not continue_)
{
return snapshot;
}
}
timeStart = nextStart;
}
else
{
ARMARX_WARNING
<< deactivateSpam()
<< "Received an error in ReadStream when querying data from a "
"memory. The error was '"
<< result.errorMessage
<< "'. Continue with stream, perhaps the memory was not yet initialized.";
}
return ret;
return std::nullopt;
}
void
ReadStream::close()
{
this->streamStoppedExternally = true;
}
} // namespace armarx::armem::client
#pragma once
#include <functional>
#include <optional>
#include <thread>
#include <ArmarXCore/core/time/Clock.h>
#include <ArmarXCore/core/time/DateTime.h>
#include <ArmarXCore/core/time/Duration.h>
#include <ArmarXCore/core/time/Frequency.h>
#include <ArmarXCore/core/time/Metronome.h>
#include <RobotAPI/libraries/armem/client/query/Builder.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h>
#include <RobotAPI/libraries/armem/core/wm.h>
#include <RobotAPI/libraries/armem/core/wm/memory_definitions.h>
#include "Reader.h"
namespace armarx::armem::client
{
/**
* @brief A readstream from the memory.
* @brief A stream reading entity snapshots from the memory.
*
* After constructing a ReadStream, polling can be started in three ways:
*
* 1. Run a polling loop in this thread, blocking execution until terminated.
* See pollBlocking().
* 2. Run a polling loop in a new, separate thread, until it is stopped via stop().
* See pollAsync() and stop().
* 3. Perform a single query and process the result, embedded in your own loop or other control
* flow logic.
* See pollOnce().
*/
class ReadStream
{
public:
using CallbackT = std::function<bool(const wm::EntitySnapshot&)>;
/// Opens a stream and asyncronouely queries the memory and calls the callback function until the callback returns False.
void openInBackground(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq =
armarx::core::time::Frequency::Hertz(10));
/// close an opened stream
void close();
/// open a stream and block until the stop function returns True. Return the object that yielded True.
std::optional<wm::EntitySnapshot> open(const Reader& reader,
const MemoryID& queriedId,
CallbackT& stopIf,
const armarx::core::time::Frequency& maxFreq =
armarx::core::time::Frequency::Hertz(10));
protected:
std::optional<wm::EntitySnapshot> stream(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq);
/**
* @brief Callback called on each entity snapshot in the queried ID.
*
* If it returns false, the stream is stopped.
*/
using SnapshotCallbackT = std::function<bool(const wm::EntitySnapshot&)>;
/**
* @brief Inizialize a ReadStream which does not represent a stream.
*/
ReadStream();
/**
* @brief Initialize a read stream.
*
* @param reader
* The reader to perform the queries.
* @param queriedId
* The memory ID in which all snapshots should be processed by the stream.
* @param maxPollFrequency
* The maximum frequency with which queries are performed. The
* real frequency might be lower.
*/
ReadStream(const Reader& reader,
const MemoryID& queriedId,
const armarx::core::time::Frequency& maxPollFrequency =
armarx::core::time::Frequency::Hertz(10));
/**
* @brief Poll in this thread as long as callback returns true.
*
* @param callback Function to call on each entity snapshot.
* @return The snapshot object that returns false.
* @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
*/
std::optional<wm::EntitySnapshot> pollBlocking(SnapshotCallbackT& callback);
/**
* @brief Poll in a new thread as long as callback returns true.
*
* Note that callback will be called in a separate thread, so take care of synchronizing
* access to variables in the callback appropriately.
*
* Roughly equivalent to:
* @code
* std::thread thread([]() { stream.pollBlocking(); });
* @endcode
*
* @param callback Function to call on each entity snapshot.
* @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
*/
void pollAsync(SnapshotCallbackT& callback);
/**
* @brief Stop a running polling loop.
*
* If a polling thread has been started by pollAsync() before, joins the thread.
*/
void stop();
/**
* @brief Perform one query and call the callbacks on each snapshot.
*
* This allows you to define your own loop, for example:
*
* @code
* bool condition = true;
* while (condition)
* {
* auto snapshot = stream.pollOnce(callback);
*
* ...
*
* if (...)
* {
* condition = false;
* }
* }
* @endcode
*
* @param callback Function to call on each entity snapshot.
* @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
*/
std::optional<wm::EntitySnapshot> pollOnce(SnapshotCallbackT& callback);
private:
std::optional<wm::EntitySnapshot> _pollBlocking(SnapshotCallbackT& callback);
std::optional<wm::EntitySnapshot> _pollOnce(SnapshotCallbackT& callback);
private:
mutable std::mutex runningMutex;
mutable std::mutex threadMutex;
std::thread runningThread;
Reader reader;
MemoryID queriedId;
armarx::core::time::Metronome metronome;
armarx::DateTime timeStart;
std::atomic_bool isPolling = false;
std::atomic_bool pollingStoppedExternally = false;
std::atomic_bool streaming = false;
std::atomic_bool streamStoppedExternally = false;
std::thread pollingThread;
};
} // namespace armarx::armem::client
......@@ -246,4 +246,19 @@ namespace armarx::armem::error
return sstream.str();
}
ReadStreamAlreadyPolling::ReadStreamAlreadyPolling(const MemoryID& queriedId,
const std::string& calledFunction) :
ArMemError(makeMsg(queriedId, calledFunction))
{
}
std::string
ReadStreamAlreadyPolling::makeMsg(const MemoryID& queriedId, const std::string& calledFunction)
{
std::stringstream ss;
ss << "The ReadStream for " << queriedId << " was already running when " << calledFunction
<< " was called.";
return ss.str();
}
} // namespace armarx::armem::error
......@@ -220,4 +220,15 @@ namespace armarx::armem::error
static std::string makeMsg(const std::string& proxyName, const std::string& message = "");
};
/**
* @brief Indicates that a ReadStream is already polling when a polling method was called.
*/
class ReadStreamAlreadyPolling : public ArMemError
{
public:
ReadStreamAlreadyPolling(const MemoryID& queriedId, const std::string& calledFunction);
static std::string makeMsg(const MemoryID& queriedId, const std::string& calledFunction);
};
} // namespace armarx::armem::error
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment