diff --git a/source/RobotAPI/libraries/armem/CMakeLists.txt b/source/RobotAPI/libraries/armem/CMakeLists.txt index 676c1fd0752baf0c2995e479dfad2cca4f3d9960..18c6e22cb4b1e18717cf20cd77cbd94731129987 100644 --- a/source/RobotAPI/libraries/armem/CMakeLists.txt +++ b/source/RobotAPI/libraries/armem/CMakeLists.txt @@ -57,6 +57,7 @@ set(LIB_FILES client/MemoryNameSystem.cpp client/Reader.cpp client/Writer.cpp + client/ReadStream.cpp client/plugins/ListeningPluginUser.cpp client/plugins/ListeningPlugin.cpp @@ -138,6 +139,7 @@ set(LIB_HEADERS client/MemoryNameSystem.h client/Reader.h client/Writer.h + client/ReadStream.h client/plugins.h client/plugins/ListeningPluginUser.h diff --git a/source/RobotAPI/libraries/armem/client/ReadStream.cpp b/source/RobotAPI/libraries/armem/client/ReadStream.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0c508a0306403797b3f8eabbf513733928ab9edf --- /dev/null +++ b/source/RobotAPI/libraries/armem/client/ReadStream.cpp @@ -0,0 +1,141 @@ +#include "ReadStream.h" + +namespace armarx::armem::client +{ + std::optional<wm::EntitySnapshot> + ReadStream::stream(const Reader& reader, + const MemoryID& queriedId, + CallbackT& callback, + const armarx::core::time::Frequency& maxFreq) + { + // noone else is allowed to open a stream while this stream is running + std::scoped_lock l(runningMutex); + + + armarx::core::time::Metronome metronome(maxFreq); + auto timeStart = armarx::core::time::Clock::Now(); + + bool callbackReturnedFalse = false; + while (not streamStoppedExternally && not callbackReturnedFalse) + { + // make sure to not busy wait. Also wait until probably data is available in first iteration + metronome.waitForNextTick(); + + auto timeEnd = armarx::core::time::Clock::Now(); + auto makeQuery = [&timeStart, &timeEnd](const MemoryID& id) + { + query::Builder qb; + query::CoreSegmentSelector& core = id.hasCoreSegmentName() + ? qb.coreSegments().withID(id) + : qb.coreSegments().all(); + query::ProviderSegmentSelector& prov = id.hasProviderSegmentName() + ? core.providerSegments().withID(id) + : core.providerSegments().all(); + query::EntitySelector& entity = + id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all(); + entity.snapshots().timeRange(timeStart, timeEnd); + + return qb.buildQueryInput(); + }; + + auto query = makeQuery(queriedId); + + auto result = reader.query(query); + + if (result.success) + { + using EntitySnapshotReference = + std::reference_wrapper<armarx::armem::wm::EntitySnapshot>; + // copy references of snapshots into vector to sort them + std::vector<EntitySnapshotReference> snapshots; + + result.memory.forEachSnapshot( + [&snapshots](armarx::armem::wm::EntitySnapshot& snapshot) + { snapshots.push_back(snapshot); }); + + // sort correctly + std::sort(snapshots.begin(), + snapshots.end(), + [](const EntitySnapshotReference& a, const EntitySnapshotReference& b) + { return a.get().id().timestamp < b.get().id().timestamp; }); + + for (const auto& snapshot : snapshots) + { + // assert times in correct interval + ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp); + ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp); + + if (!callback(snapshot.get())) + { + return snapshot; + } + } + + timeStart = timeEnd + armarx::core::time::Duration::MicroSeconds(1); + } + else + { + ARMARX_ERROR + << deactivateSpam() + << "Received an error in ReadStream when querying data from a " + "memory. The error was '" + << result.errorMessage + << "'. Continue with stream, perhaps the memory was not yet initialized."; + } + } + return std::nullopt; + } + + void + ReadStream::openInBackground(const Reader& reader, + const MemoryID& queriedId, + CallbackT& continueIf, + const armarx::core::time::Frequency& maxFreq) + { + std::scoped_lock r(runningMutex); + std::scoped_lock l(threadMutex); + + streaming = true; + streamStoppedExternally = true; + + // thread is part of this streamreader. Do not detach + this->runningThread = + std::thread([&]() { this->stream(reader, queriedId, continueIf, maxFreq); }); + } + + std::optional<wm::EntitySnapshot> + ReadStream::open(const Reader& reader, + const MemoryID& queriedId, + CallbackT& continueIf, + const armarx::core::time::Frequency& maxFreq) + { + std::unique_lock r(runningMutex); + std::scoped_lock l(threadMutex); + + streaming = true; + streamStoppedExternally = true; + + std::optional<wm::EntitySnapshot> ret = std::nullopt; + + // thread is part of this streamreader. Do not detach + this->runningThread = + std::thread([&]() { ret = this->stream(reader, queriedId, continueIf, maxFreq); }); + + r.unlock(); + // we keep the threadMutex to make sure that noone is replacing the thread member + + if (this->runningThread.joinable()) + { + // wait until finished + this->runningThread.join(); + } + + return ret; + } + + void + ReadStream::close() + { + this->streamStoppedExternally = true; + } +} // namespace armarx::armem::client diff --git a/source/RobotAPI/libraries/armem/client/ReadStream.h b/source/RobotAPI/libraries/armem/client/ReadStream.h new file mode 100644 index 0000000000000000000000000000000000000000..14047a44d3631862920a450da44d90526facdea3 --- /dev/null +++ b/source/RobotAPI/libraries/armem/client/ReadStream.h @@ -0,0 +1,60 @@ +#pragma once + +#include <functional> +#include <thread> + +#include <ArmarXCore/core/time/Clock.h> +#include <ArmarXCore/core/time/DateTime.h> +#include <ArmarXCore/core/time/Duration.h> +#include <ArmarXCore/core/time/Frequency.h> +#include <ArmarXCore/core/time/Metronome.h> + +#include <RobotAPI/libraries/armem/client/query/Builder.h> +#include <RobotAPI/libraries/armem/core/MemoryID.h> +#include <RobotAPI/libraries/armem/core/wm.h> + +#include "Reader.h" + +namespace armarx::armem::client +{ + /** + * @brief A readstream from the memory. + */ + class ReadStream + { + public: + using CallbackT = std::function<bool(const wm::EntitySnapshot&)>; + + /// Opens a stream and asyncronouely queries the memory and calls the callback function until the callback returns False. + void openInBackground(const Reader& reader, + const MemoryID& queriedId, + CallbackT& callback, + const armarx::core::time::Frequency& maxFreq = + armarx::core::time::Frequency::Hertz(10)); + + /// close an opened stream + void close(); + + /// open a stream and block until the stop function returns True. Return the object that yielded True. + std::optional<wm::EntitySnapshot> open(const Reader& reader, + const MemoryID& queriedId, + CallbackT& stopIf, + const armarx::core::time::Frequency& maxFreq = + armarx::core::time::Frequency::Hertz(10)); + + protected: + std::optional<wm::EntitySnapshot> stream(const Reader& reader, + const MemoryID& queriedId, + CallbackT& callback, + const armarx::core::time::Frequency& maxFreq); + + private: + mutable std::mutex runningMutex; + mutable std::mutex threadMutex; + std::thread runningThread; + + std::atomic_bool streaming = false; + std::atomic_bool streamStoppedExternally = false; + }; + +} // namespace armarx::armem::client