Skip to content
Snippets Groups Projects
Commit 77adbf6a authored by Fabian Tërnava's avatar Fabian Tërnava
Browse files

implemented readStream for clients

parent 103cddbf
No related branches found
No related tags found
No related merge requests found
Pipeline #15739 passed
......@@ -57,6 +57,7 @@ set(LIB_FILES
client/MemoryNameSystem.cpp
client/Reader.cpp
client/Writer.cpp
client/ReadStream.cpp
client/plugins/ListeningPluginUser.cpp
client/plugins/ListeningPlugin.cpp
......@@ -138,6 +139,7 @@ set(LIB_HEADERS
client/MemoryNameSystem.h
client/Reader.h
client/Writer.h
client/ReadStream.h
client/plugins.h
client/plugins/ListeningPluginUser.h
......
#include "ReadStream.h"
namespace armarx::armem::client
{
std::optional<wm::EntitySnapshot>
ReadStream::stream(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq)
{
// noone else is allowed to open a stream while this stream is running
std::scoped_lock l(runningMutex);
armarx::core::time::Metronome metronome(maxFreq);
auto timeStart = armarx::core::time::Clock::Now();
bool callbackReturnedFalse = false;
while (not streamStoppedExternally && not callbackReturnedFalse)
{
// make sure to not busy wait. Also wait until probably data is available in first iteration
metronome.waitForNextTick();
auto timeEnd = armarx::core::time::Clock::Now();
auto makeQuery = [&timeStart, &timeEnd](const MemoryID& id)
{
query::Builder qb;
query::CoreSegmentSelector& core = id.hasCoreSegmentName()
? qb.coreSegments().withID(id)
: qb.coreSegments().all();
query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
? core.providerSegments().withID(id)
: core.providerSegments().all();
query::EntitySelector& entity =
id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
entity.snapshots().timeRange(timeStart, timeEnd);
return qb.buildQueryInput();
};
auto query = makeQuery(queriedId);
auto result = reader.query(query);
if (result.success)
{
using EntitySnapshotReference =
std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
// copy references of snapshots into vector to sort them
std::vector<EntitySnapshotReference> snapshots;
result.memory.forEachSnapshot(
[&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
{ snapshots.push_back(snapshot); });
// sort correctly
std::sort(snapshots.begin(),
snapshots.end(),
[](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
{ return a.get().id().timestamp < b.get().id().timestamp; });
for (const auto& snapshot : snapshots)
{
// assert times in correct interval
ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
if (!callback(snapshot.get()))
{
return snapshot;
}
}
timeStart = timeEnd + armarx::core::time::Duration::MicroSeconds(1);
}
else
{
ARMARX_ERROR
<< deactivateSpam()
<< "Received an error in ReadStream when querying data from a "
"memory. The error was '"
<< result.errorMessage
<< "'. Continue with stream, perhaps the memory was not yet initialized.";
}
}
return std::nullopt;
}
void
ReadStream::openInBackground(const Reader& reader,
const MemoryID& queriedId,
CallbackT& continueIf,
const armarx::core::time::Frequency& maxFreq)
{
std::scoped_lock r(runningMutex);
std::scoped_lock l(threadMutex);
streaming = true;
streamStoppedExternally = true;
// thread is part of this streamreader. Do not detach
this->runningThread =
std::thread([&]() { this->stream(reader, queriedId, continueIf, maxFreq); });
}
std::optional<wm::EntitySnapshot>
ReadStream::open(const Reader& reader,
const MemoryID& queriedId,
CallbackT& continueIf,
const armarx::core::time::Frequency& maxFreq)
{
std::unique_lock r(runningMutex);
std::scoped_lock l(threadMutex);
streaming = true;
streamStoppedExternally = true;
std::optional<wm::EntitySnapshot> ret = std::nullopt;
// thread is part of this streamreader. Do not detach
this->runningThread =
std::thread([&]() { ret = this->stream(reader, queriedId, continueIf, maxFreq); });
r.unlock();
// we keep the threadMutex to make sure that noone is replacing the thread member
if (this->runningThread.joinable())
{
// wait until finished
this->runningThread.join();
}
return ret;
}
void
ReadStream::close()
{
this->streamStoppedExternally = true;
}
} // namespace armarx::armem::client
#pragma once
#include <functional>
#include <thread>
#include <ArmarXCore/core/time/Clock.h>
#include <ArmarXCore/core/time/DateTime.h>
#include <ArmarXCore/core/time/Duration.h>
#include <ArmarXCore/core/time/Frequency.h>
#include <ArmarXCore/core/time/Metronome.h>
#include <RobotAPI/libraries/armem/client/query/Builder.h>
#include <RobotAPI/libraries/armem/core/MemoryID.h>
#include <RobotAPI/libraries/armem/core/wm.h>
#include "Reader.h"
namespace armarx::armem::client
{
/**
* @brief A readstream from the memory.
*/
class ReadStream
{
public:
using CallbackT = std::function<bool(const wm::EntitySnapshot&)>;
/// Opens a stream and asyncronouely queries the memory and calls the callback function until the callback returns False.
void openInBackground(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq =
armarx::core::time::Frequency::Hertz(10));
/// close an opened stream
void close();
/// open a stream and block until the stop function returns True. Return the object that yielded True.
std::optional<wm::EntitySnapshot> open(const Reader& reader,
const MemoryID& queriedId,
CallbackT& stopIf,
const armarx::core::time::Frequency& maxFreq =
armarx::core::time::Frequency::Hertz(10));
protected:
std::optional<wm::EntitySnapshot> stream(const Reader& reader,
const MemoryID& queriedId,
CallbackT& callback,
const armarx::core::time::Frequency& maxFreq);
private:
mutable std::mutex runningMutex;
mutable std::mutex threadMutex;
std::thread runningThread;
std::atomic_bool streaming = false;
std::atomic_bool streamStoppedExternally = false;
};
} // namespace armarx::armem::client
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment