diff --git a/source/RobotAPI/libraries/armem/client/ReadStream.cpp b/source/RobotAPI/libraries/armem/client/ReadStream.cpp index 0c508a0306403797b3f8eabbf513733928ab9edf..e341e139f66b40168ba4dd9b809d921835953838 100644 --- a/source/RobotAPI/libraries/armem/client/ReadStream.cpp +++ b/source/RobotAPI/libraries/armem/client/ReadStream.cpp @@ -1,141 +1,172 @@ #include "ReadStream.h" +#include <ArmarXCore/core/time/Clock.h> + +#include <RobotAPI/libraries/armem/client/query/Builder.h> + namespace armarx::armem::client { - std::optional<wm::EntitySnapshot> - ReadStream::stream(const Reader& reader, - const MemoryID& queriedId, - CallbackT& callback, - const armarx::core::time::Frequency& maxFreq) + ReadStream::ReadStream() : metronome{armarx::core::time::Frequency::Hertz(10)} { - // noone else is allowed to open a stream while this stream is running - std::scoped_lock l(runningMutex); - + } - armarx::core::time::Metronome metronome(maxFreq); - auto timeStart = armarx::core::time::Clock::Now(); + ReadStream::ReadStream(const Reader& reader, + const MemoryID& queriedId, + const core::time::Frequency& maxPollFrequency) : + reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency} + { + timeStart = armarx::core::time::Clock::Now(); + } - bool callbackReturnedFalse = false; - while (not streamStoppedExternally && not callbackReturnedFalse) + std::optional<wm::EntitySnapshot> + ReadStream::pollBlocking(SnapshotCallbackT& callback) + { + if (isPolling.exchange(true)) { - // make sure to not busy wait. Also wait until probably data is available in first iteration - metronome.waitForNextTick(); - - auto timeEnd = armarx::core::time::Clock::Now(); - auto makeQuery = [&timeStart, &timeEnd](const MemoryID& id) - { - query::Builder qb; - query::CoreSegmentSelector& core = id.hasCoreSegmentName() - ? qb.coreSegments().withID(id) - : qb.coreSegments().all(); - query::ProviderSegmentSelector& prov = id.hasProviderSegmentName() - ? core.providerSegments().withID(id) - : core.providerSegments().all(); - query::EntitySelector& entity = - id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all(); - entity.snapshots().timeRange(timeStart, timeEnd); + throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__); + } + pollingStoppedExternally = false; - return qb.buildQueryInput(); - }; + auto result = _pollBlocking(callback); - auto query = makeQuery(queriedId); + isPolling = false; + return result; + } - auto result = reader.query(query); + void + ReadStream::pollAsync(SnapshotCallbackT& callback) + { + if (isPolling.exchange(true)) + { + throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__); + } + pollingStoppedExternally = false; - if (result.success) - { - using EntitySnapshotReference = - std::reference_wrapper<armarx::armem::wm::EntitySnapshot>; - // copy references of snapshots into vector to sort them - std::vector<EntitySnapshotReference> snapshots; - - result.memory.forEachSnapshot( - [&snapshots](armarx::armem::wm::EntitySnapshot& snapshot) - { snapshots.push_back(snapshot); }); - - // sort correctly - std::sort(snapshots.begin(), - snapshots.end(), - [](const EntitySnapshotReference& a, const EntitySnapshotReference& b) - { return a.get().id().timestamp < b.get().id().timestamp; }); - - for (const auto& snapshot : snapshots) - { - // assert times in correct interval - ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp); - ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp); - - if (!callback(snapshot.get())) - { - return snapshot; - } - } + this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); }); + } - timeStart = timeEnd + armarx::core::time::Duration::MicroSeconds(1); - } - else + std::optional<wm::EntitySnapshot> + ReadStream::_pollBlocking(SnapshotCallbackT& callback) + { + while (not pollingStoppedExternally) + { + auto snapshot = pollOnce(callback); + if (snapshot.has_value()) { - ARMARX_ERROR - << deactivateSpam() - << "Received an error in ReadStream when querying data from a " - "memory. The error was '" - << result.errorMessage - << "'. Continue with stream, perhaps the memory was not yet initialized."; + return snapshot; } } return std::nullopt; } void - ReadStream::openInBackground(const Reader& reader, - const MemoryID& queriedId, - CallbackT& continueIf, - const armarx::core::time::Frequency& maxFreq) + ReadStream::stop() + { + pollingStoppedExternally = true; + if (pollingThread.joinable()) + { + pollingThread.join(); + isPolling = false; + } + } + + std::optional<wm::EntitySnapshot> + ReadStream::pollOnce(SnapshotCallbackT& callback) { - std::scoped_lock r(runningMutex); - std::scoped_lock l(threadMutex); + if (isPolling.exchange(true)) + { + throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__); + } - streaming = true; - streamStoppedExternally = true; + auto snapshot = _pollOnce(callback); - // thread is part of this streamreader. Do not detach - this->runningThread = - std::thread([&]() { this->stream(reader, queriedId, continueIf, maxFreq); }); + isPolling = false; + return snapshot; } std::optional<wm::EntitySnapshot> - ReadStream::open(const Reader& reader, - const MemoryID& queriedId, - CallbackT& continueIf, - const armarx::core::time::Frequency& maxFreq) + ReadStream::_pollOnce(SnapshotCallbackT& callback) { - std::unique_lock r(runningMutex); - std::scoped_lock l(threadMutex); + // Make sure to not busy wait. Also wait until probably data is available in first iteration. + metronome.waitForNextTick(); - streaming = true; - streamStoppedExternally = true; + auto timeEnd = armarx::core::time::Clock::Now(); + auto makeQuery = [this, &timeEnd](const MemoryID& id) + { + query::Builder qb; + query::CoreSegmentSelector& core = + id.hasCoreSegmentName() ? qb.coreSegments().withID(id) : qb.coreSegments().all(); + query::ProviderSegmentSelector& prov = id.hasProviderSegmentName() + ? core.providerSegments().withID(id) + : core.providerSegments().all(); + query::EntitySelector& entity = + id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all(); + entity.snapshots().timeRange(timeStart, timeEnd); - std::optional<wm::EntitySnapshot> ret = std::nullopt; + return qb.buildQueryInput(); + }; - // thread is part of this streamreader. Do not detach - this->runningThread = - std::thread([&]() { ret = this->stream(reader, queriedId, continueIf, maxFreq); }); + auto query = makeQuery(queriedId); - r.unlock(); - // we keep the threadMutex to make sure that noone is replacing the thread member + auto result = reader.query(query); - if (this->runningThread.joinable()) + if (result.success) { - // wait until finished - this->runningThread.join(); + using EntitySnapshotReference = + std::reference_wrapper<armarx::armem::wm::EntitySnapshot>; + // Copy references of snapshots into vector to sort them. + std::vector<EntitySnapshotReference> snapshots; + + result.memory.forEachSnapshot([&snapshots](armarx::armem::wm::EntitySnapshot& snapshot) + { snapshots.push_back(snapshot); }); + + // Sort correctly. + std::sort(snapshots.begin(), + snapshots.end(), + [](const EntitySnapshotReference& a, const EntitySnapshotReference& b) + { return a.get().id().timestamp < b.get().id().timestamp; }); + + // Determine the next start time. + DateTime nextStart; + if (snapshots.size() > 0) + { + // Because they are sorted, back() has the highest time stamp. + nextStart = snapshots.back().get().id().timestamp + + armarx::core::time::Duration::MicroSeconds(1); + } + else + { + nextStart = timeStart; + } + + // Call the callback on all snapshots. + for (const auto& snapshot : snapshots) + { + // Assert times in correct interval. + ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp); + ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp); + + const bool continue_ = callback(snapshot.get()); + if (not continue_) + { + return snapshot; + } + } + + timeStart = nextStart; + } + else + { + ARMARX_WARNING + << deactivateSpam() + << "Received an error in ReadStream when querying data from a " + "memory. The error was '" + << result.errorMessage + << "'. Continue with stream, perhaps the memory was not yet initialized."; } - return ret; + return std::nullopt; } - void - ReadStream::close() - { - this->streamStoppedExternally = true; - } + } // namespace armarx::armem::client diff --git a/source/RobotAPI/libraries/armem/client/ReadStream.h b/source/RobotAPI/libraries/armem/client/ReadStream.h index 14047a44d3631862920a450da44d90526facdea3..bd549770f1831cb849a1b9716609dd7816362c49 100644 --- a/source/RobotAPI/libraries/armem/client/ReadStream.h +++ b/source/RobotAPI/libraries/armem/client/ReadStream.h @@ -1,60 +1,138 @@ #pragma once #include <functional> +#include <optional> #include <thread> -#include <ArmarXCore/core/time/Clock.h> #include <ArmarXCore/core/time/DateTime.h> -#include <ArmarXCore/core/time/Duration.h> #include <ArmarXCore/core/time/Frequency.h> #include <ArmarXCore/core/time/Metronome.h> -#include <RobotAPI/libraries/armem/client/query/Builder.h> #include <RobotAPI/libraries/armem/core/MemoryID.h> -#include <RobotAPI/libraries/armem/core/wm.h> +#include <RobotAPI/libraries/armem/core/wm/memory_definitions.h> #include "Reader.h" namespace armarx::armem::client { /** - * @brief A readstream from the memory. + * @brief A stream reading entity snapshots from the memory. + * + * After constructing a ReadStream, polling can be started in three ways: + * + * 1. Run a polling loop in this thread, blocking execution until terminated. + * See pollBlocking(). + * 2. Run a polling loop in a new, separate thread, until it is stopped via stop(). + * See pollAsync() and stop(). + * 3. Perform a single query and process the result, embedded in your own loop or other control + * flow logic. + * See pollOnce(). */ class ReadStream { public: - using CallbackT = std::function<bool(const wm::EntitySnapshot&)>; - - /// Opens a stream and asyncronouely queries the memory and calls the callback function until the callback returns False. - void openInBackground(const Reader& reader, - const MemoryID& queriedId, - CallbackT& callback, - const armarx::core::time::Frequency& maxFreq = - armarx::core::time::Frequency::Hertz(10)); - - /// close an opened stream - void close(); - - /// open a stream and block until the stop function returns True. Return the object that yielded True. - std::optional<wm::EntitySnapshot> open(const Reader& reader, - const MemoryID& queriedId, - CallbackT& stopIf, - const armarx::core::time::Frequency& maxFreq = - armarx::core::time::Frequency::Hertz(10)); - - protected: - std::optional<wm::EntitySnapshot> stream(const Reader& reader, - const MemoryID& queriedId, - CallbackT& callback, - const armarx::core::time::Frequency& maxFreq); + /** + * @brief Callback called on each entity snapshot in the queried ID. + * + * If it returns false, the stream is stopped. + */ + using SnapshotCallbackT = std::function<bool(const wm::EntitySnapshot&)>; + + /** + * @brief Inizialize a ReadStream which does not represent a stream. + */ + ReadStream(); + + /** + * @brief Initialize a read stream. + * + * @param reader + * The reader to perform the queries. + * @param queriedId + * The memory ID in which all snapshots should be processed by the stream. + * @param maxPollFrequency + * The maximum frequency with which queries are performed. The + * real frequency might be lower. + */ + ReadStream(const Reader& reader, + const MemoryID& queriedId, + const armarx::core::time::Frequency& maxPollFrequency = + armarx::core::time::Frequency::Hertz(10)); + + + /** + * @brief Poll in this thread as long as callback returns true. + * + * @param callback Function to call on each entity snapshot. + * @return The snapshot object that returns false. + * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling. + */ + std::optional<wm::EntitySnapshot> pollBlocking(SnapshotCallbackT& callback); + + /** + * @brief Poll in a new thread as long as callback returns true. + * + * Note that callback will be called in a separate thread, so take care of synchronizing + * access to variables in the callback appropriately. + * + * Roughly equivalent to: + * @code + * std::thread thread([]() { stream.pollBlocking(); }); + * @endcode + * + * @param callback Function to call on each entity snapshot. + * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling. + */ + void pollAsync(SnapshotCallbackT& callback); + /** + * @brief Stop a running polling loop. + * + * If a polling thread has been started by pollAsync() before, joins the thread. + */ + void stop(); + + /** + * @brief Perform one query and call the callbacks on each snapshot. + * + * This allows you to define your own loop, for example: + * + * @code + * bool condition = true; + * while (condition) + * { + * auto snapshot = stream.pollOnce(callback); + * + * ... + * + * if (...) + * { + * condition = false; + * } + * } + * @endcode + * + * @param callback Function to call on each entity snapshot. + * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling. + */ + std::optional<wm::EntitySnapshot> pollOnce(SnapshotCallbackT& callback); + + + private: + std::optional<wm::EntitySnapshot> _pollBlocking(SnapshotCallbackT& callback); + std::optional<wm::EntitySnapshot> _pollOnce(SnapshotCallbackT& callback); + private: - mutable std::mutex runningMutex; - mutable std::mutex threadMutex; - std::thread runningThread; + Reader reader; + MemoryID queriedId; + + armarx::core::time::Metronome metronome; + armarx::DateTime timeStart; + + std::atomic_bool isPolling = false; + std::atomic_bool pollingStoppedExternally = false; - std::atomic_bool streaming = false; - std::atomic_bool streamStoppedExternally = false; + std::thread pollingThread; }; } // namespace armarx::armem::client diff --git a/source/RobotAPI/libraries/armem/core/error/ArMemError.cpp b/source/RobotAPI/libraries/armem/core/error/ArMemError.cpp index 354b95712761f6a14d3fb6deb5baeff6b2a18aba..7abe9891d8a1d4f599ae1e135d0dbe8e0d2a29dd 100644 --- a/source/RobotAPI/libraries/armem/core/error/ArMemError.cpp +++ b/source/RobotAPI/libraries/armem/core/error/ArMemError.cpp @@ -246,4 +246,19 @@ namespace armarx::armem::error return sstream.str(); } + ReadStreamAlreadyPolling::ReadStreamAlreadyPolling(const MemoryID& queriedId, + const std::string& calledFunction) : + ArMemError(makeMsg(queriedId, calledFunction)) + { + } + + std::string + ReadStreamAlreadyPolling::makeMsg(const MemoryID& queriedId, const std::string& calledFunction) + { + std::stringstream ss; + ss << "The ReadStream for " << queriedId << " was already running when " << calledFunction + << " was called."; + return ss.str(); + } + } // namespace armarx::armem::error diff --git a/source/RobotAPI/libraries/armem/core/error/ArMemError.h b/source/RobotAPI/libraries/armem/core/error/ArMemError.h index 69ab7d4ddff8e22791272c520386e2fdff3d6e6f..67435f8d3c4142d3453fc6f6626e4e48ebb33cad 100644 --- a/source/RobotAPI/libraries/armem/core/error/ArMemError.h +++ b/source/RobotAPI/libraries/armem/core/error/ArMemError.h @@ -220,4 +220,15 @@ namespace armarx::armem::error static std::string makeMsg(const std::string& proxyName, const std::string& message = ""); }; + /** + * @brief Indicates that a ReadStream is already polling when a polling method was called. + */ + class ReadStreamAlreadyPolling : public ArMemError + { + public: + ReadStreamAlreadyPolling(const MemoryID& queriedId, const std::string& calledFunction); + + static std::string makeMsg(const MemoryID& queriedId, const std::string& calledFunction); + }; + } // namespace armarx::armem::error