From a415ebd1905a03a4bc429c0cb0d251f1ec5954cb Mon Sep 17 00:00:00 2001 From: Fabian Paus <fabian.paus@kit.edu> Date: Wed, 16 Oct 2019 13:48:27 +0200 Subject: [PATCH] Change storage behavior --- .../components/ArViz/ArVizStorage.cpp | 151 ++---- .../RobotAPI/components/ArViz/ArVizStorage.h | 11 +- .../components/ArViz/Coin/Visualizer.cpp | 456 +++++++++--------- source/RobotAPI/interface/ArViz/Component.ice | 7 + 4 files changed, 293 insertions(+), 332 deletions(-) diff --git a/source/RobotAPI/components/ArViz/ArVizStorage.cpp b/source/RobotAPI/components/ArViz/ArVizStorage.cpp index da2708ee2..d4791fc8e 100644 --- a/source/RobotAPI/components/ArViz/ArVizStorage.cpp +++ b/source/RobotAPI/components/ArViz/ArVizStorage.cpp @@ -24,6 +24,8 @@ #include <ArmarXCore/core/exceptions/local/ExpressionException.h> +#include <ArmarXCore/core/time/TimeUtil.h> + using namespace armarx; @@ -39,8 +41,9 @@ void ArVizStorage::onInitComponent() void ArVizStorage::onConnectComponent() { - updateCounterBase = 0; - updateHistory.clear(); + updateCounter = 0; + currentState.clear(); + history.clear(); } @@ -65,12 +68,41 @@ void ArVizStorage::updateLayers(viz::LayerUpdateSeq const& updates, const Ice::C { std::unique_lock<std::mutex> lock(historyMutex); - updateHistory.insert(updateHistory.end(), updates.begin(), updates.end()); + updateCounter += 1; + IceUtil::Time now = TimeUtil::GetTime(); + long nowInMicroSeconds = now.toMicroSeconds(); + + for (auto& update : updates) + { + auto& historyEntry = history.emplace_back(); + historyEntry.updateCounter = updateCounter; + historyEntry.timestampInMicroseconds = nowInMicroSeconds; + historyEntry.update = update; + + // Insert or create the layer + bool found = false; + for (auto& layer : currentState) + { + if (layer.update.component == update.component + && layer.update.name == update.name) + { + layer = historyEntry; + found = true; + continue; + } + } + if (!found) + { + currentState.push_back(historyEntry); + } + } - long currentHistorySize = updateHistory.size(); + long currentHistorySize = history.size(); if (currentHistorySize >= maxHistorySize) { - compressHistory(); + // TODO: Implement serialization + // Probably also async serialization not blocking this method call + history.clear(); } } @@ -80,115 +112,16 @@ viz::LayerUpdates armarx::ArVizStorage::pullUpdatesSince(Ice::Long updateCounter { viz::LayerUpdates result; - long updateIndex = updateCounter - updateCounterBase; - - if (updateIndex < 0) - { - updateIndex = 0; - } - std::unique_lock<std::mutex> lock(historyMutex); - long nextUpdateIndex = (long)updateHistory.size(); - result.updateCounter = updateCounterBase + nextUpdateIndex; - if (updateIndex >= nextUpdateIndex) - { - // No new updates since last pull - return result; - } - - // Some new updates since last pull - result.updates.reserve(nextUpdateIndex - updateIndex); - for (long index = updateIndex; index < nextUpdateIndex; ++index) + result.updates.reserve(currentState.size()); + for (auto& layer : currentState) { - // TODO: Optimize since we now only support full layer updates - // => Only return the latest update per layer - result.updates.push_back(updateHistory[index]); - } - - return result; -} - -void ArVizStorage::compressHistory() -{ - long currentHistorySize = updateHistory.size(); - long updateCounterBeforeCompression = updateCounterBase + currentHistorySize; - // Compress history to one element - // TODO (Later): Save this history (to the disk?) - - using LayerID = std::pair<std::string, std::string>; - std::map<LayerID, viz::LayerUpdate> compressed; - - for (viz::LayerUpdate& update : updateHistory) - { - LayerID layerID(update.component, update.name); - auto iter = compressed.find(layerID); - if (update.action == viz::Layer_CREATE_OR_UPDATE) - { - if (iter == compressed.end()) - { - // Layer does not exist, so we create it - compressed.emplace(layerID, update); - continue; - } - - // Layer does exist, so we need to merge the update - viz::LayerUpdate& existing = iter->second; - // Do not merge but simply assign => We only do full layer updates - existing = update; -#if 0 - for (viz::ElementPtr const& updateElement : update.elements) - { - std::string const& updateID = updateElement->id; - auto matchingIter = std::find_if(existing.elements.begin(), existing.elements.end(), - [updateID](viz::ElementPtr const & element) - { - return element->id == updateID; - }); - - if (matchingIter == existing.elements.end()) - { - // Element does not exist, so we create it - existing.elements.push_back(updateElement); - continue; - } - - // Update existing element - switch (updateElement->action) - { - case viz::Element_ADD: - *matchingIter = updateElement; - break; - - case viz::Element_REMOVE: - existing.elements.erase(matchingIter); - break; - } - } -#endif - } - else if (update.action == viz::Layer_DELETE) + if (updateCounter < layer.updateCounter) { - compressed.erase(iter); + result.updates.push_back(layer.update); } } - // Newest update counter should point to the same index - long newHistorySize = (long)compressed.size(); - updateCounterBase += currentHistorySize - newHistorySize; - // Add a single merged update for each layer - updateHistory.clear(); - updateHistory.reserve(newHistorySize); - for (auto& [id, layer] : compressed) - { - updateHistory.push_back(std::move(layer)); - } - - ARMARX_VERBOSE << "Compressed history from " << currentHistorySize - << " to " << newHistorySize; - - long updateCounterAfterCompression = updateCounterBase + newHistorySize; - - ARMARX_CHECK_EQUAL(updateCounterAfterCompression, updateCounterBeforeCompression) - << "The update counters before and after history compression should not change"; + return result; } diff --git a/source/RobotAPI/components/ArViz/ArVizStorage.h b/source/RobotAPI/components/ArViz/ArVizStorage.h index 34139fa8f..506d4950e 100644 --- a/source/RobotAPI/components/ArViz/ArVizStorage.h +++ b/source/RobotAPI/components/ArViz/ArVizStorage.h @@ -27,6 +27,8 @@ #include <RobotAPI/interface/ArViz.h> #include <ArmarXCore/core/Component.h> +#include <boost/circular_buffer.hpp> + #include <mutex> #include <atomic> @@ -105,16 +107,15 @@ namespace armarx // StorageInterface interface viz::LayerUpdates pullUpdatesSince(Ice::Long updateCounter, const Ice::Current&) override; - private: - void compressHistory(); - private: std::string topicName; int maxHistorySize = 100; std::mutex historyMutex; - long updateCounterBase = 0; - std::vector<viz::LayerUpdate> updateHistory; + std::vector<viz::TimestampedLayerUpdate> currentState; + // We ignore the history for now + std::vector<viz::TimestampedLayerUpdate> history; + long updateCounter = 0; }; } diff --git a/source/RobotAPI/components/ArViz/Coin/Visualizer.cpp b/source/RobotAPI/components/ArViz/Coin/Visualizer.cpp index 90cd15487..92135134d 100644 --- a/source/RobotAPI/components/ArViz/Coin/Visualizer.cpp +++ b/source/RobotAPI/components/ArViz/Coin/Visualizer.cpp @@ -9,300 +9,320 @@ #include "VisualizationRobot.h" -namespace armarx +namespace armarx::viz { - namespace viz + static const char* toString(CoinVisualizerState state) { - - static const char* toString(CoinVisualizerState state) + switch (state) { - switch (state) - { - case CoinVisualizerState::STOPPED: - return "STOPPED"; - case CoinVisualizerState::STARTING: - return "STARTING"; - case CoinVisualizerState::RUNNING: - return "RUNNING"; - case CoinVisualizerState::STOPPING: - return "STOPPING"; - } - return "UNKNOWN"; + case CoinVisualizerState::STOPPED: + return "STOPPED"; + case CoinVisualizerState::STARTING: + return "STARTING"; + case CoinVisualizerState::RUNNING: + return "RUNNING"; + case CoinVisualizerState::STOPPING: + return "STOPPING"; } + return "UNKNOWN"; + } - static void updateVisualizationCB(void* data, SoSensor* sensor) + static void updateVisualizationCB(void* data, SoSensor* sensor) + { + auto* visu = static_cast<CoinVisualizer*>(data); + visu->update(); + } + + struct TimedBlock + { + TimedBlock(const char* function) + : start(IceUtil::Time::now()) + , function(function) { - auto* visu = static_cast<CoinVisualizer*>(data); - visu->update(); } - CoinVisualizer::CoinVisualizer() + ~TimedBlock() { - registerVisualizationTypes(); + IceUtil::Time diff = IceUtil::Time::now() - start; + // Horrible: We need to plot this actually + ARMARX_INFO << "Time '" << function << "': " + << diff.toMilliSecondsDouble() << " ms"; + } - root = new SoSeparator; + private: + IceUtil::Time start; + const char* function; + }; - timerSensor = new SoTimerSensor(updateVisualizationCB, this); + CoinVisualizer::CoinVisualizer() + { + registerVisualizationTypes(); - float cycleTimeMS = 33.0f; - timerSensor->setInterval(SbTime(cycleTimeMS / 1000.0f)); - } + root = new SoSeparator; - CoinVisualizer::~CoinVisualizer() + timerSensor = new SoTimerSensor(updateVisualizationCB, this); + + float cycleTimeMS = 33.0f; + timerSensor->setInterval(SbTime(cycleTimeMS / 1000.0f)); + } + + CoinVisualizer::~CoinVisualizer() + { + // We need to clear the caches while Coin is still initialized + coin::clearRobotCache(); + } + + + void CoinVisualizer::startAsync(StorageInterfacePrx const& storage) + { + std::unique_lock<std::mutex> lock(stateMutex); + if (state != CoinVisualizerState::STOPPED) { - // We need to clear the caches while Coin is still initialized - coin::clearRobotCache(); + ARMARX_WARNING << "Unexpected state of visualizer\n" + << "Expected: STOPPED\n" + << "But got: " << toString(state); + return; } + state = CoinVisualizerState::STARTING; + stateStorage = storage; + SoSensorManager* sensor_mgr = SoDB::getSensorManager(); + sensor_mgr->insertTimerSensor(timerSensor); + } - void CoinVisualizer::startAsync(StorageInterfacePrx const& storage) + void CoinVisualizer::stop() + { + if (state == CoinVisualizerState::STOPPED) { - std::unique_lock<std::mutex> lock(stateMutex); - if (state != CoinVisualizerState::STOPPED) - { - ARMARX_WARNING << "Unexpected state of visualizer\n" - << "Expected: STOPPED\n" - << "But got: " << toString(state); - return; - } - state = CoinVisualizerState::STARTING; - stateStorage = storage; - - SoSensorManager* sensor_mgr = SoDB::getSensorManager(); - sensor_mgr->insertTimerSensor(timerSensor); + return; } - void CoinVisualizer::stop() + SoSensorManager* sensor_mgr = SoDB::getSensorManager(); + state = CoinVisualizerState::STOPPING; + while (state != CoinVisualizerState::STOPPED) { - if (state == CoinVisualizerState::STOPPED) - { - return; - } + sensor_mgr->processTimerQueue(); + usleep(1000); + } + sensor_mgr->removeTimerSensor(timerSensor); - SoSensorManager* sensor_mgr = SoDB::getSensorManager(); - state = CoinVisualizerState::STOPPING; - while (state != CoinVisualizerState::STOPPED) - { - sensor_mgr->processTimerQueue(); - usleep(1000); - } - sensor_mgr->removeTimerSensor(timerSensor); + } - } + void CoinVisualizer::apply(LayerUpdate const& update) + { + auto layerID = std::make_pair(update.component, update.name); + auto layerIt = layers.find(layerID); - void CoinVisualizer::apply(LayerUpdate const& update) + if (layerIt == layers.end()) { - auto layerID = std::make_pair(update.component, update.name); - auto layerIt = layers.find(layerID); + // Create a new layer + SoSeparator* coinNode = new SoSeparator; + coinNode->ref(); + root->addChild(coinNode); - if (layerIt == layers.end()) - { - // Create a new layer - SoSeparator* coinNode = new SoSeparator; - coinNode->ref(); - root->addChild(coinNode); - - layerIt = layers.emplace(layerID, CoinLayer(coinNode)).first; - } + layerIt = layers.emplace(layerID, CoinLayer(coinNode)).first; + } - std::set<std::string> updatedIDs; + std::set<std::string> updatedIDs; - // Add or update the elements in the update - CoinLayer& layer = layerIt->second; - for (auto& updatedElementPtr : update.elements) - { - Element const& updatedElement = *updatedElementPtr; + // Add or update the elements in the update + CoinLayer& layer = layerIt->second; + for (auto& updatedElementPtr : update.elements) + { + Element const& updatedElement = *updatedElementPtr; - updatedIDs.insert(updatedElement.id); + updatedIDs.insert(updatedElement.id); - std::type_index elementType = typeid(updatedElement); - coin::ElementVisualizer* visualizer = nullptr; - for (auto& entry : elementVisualizers) - { - if (entry.first == elementType) - { - visualizer = entry.second.get(); - break; - } - } - if (visualizer == nullptr) + std::type_index elementType = typeid(updatedElement); + coin::ElementVisualizer* visualizer = nullptr; + for (auto& entry : elementVisualizers) + { + if (entry.first == elementType) { - ARMARX_WARNING << "No visualizer for element type found: " - << boost::core::demangle(elementType.name()); - return; + visualizer = entry.second.get(); + break; } + } + if (visualizer == nullptr) + { + ARMARX_WARNING << "No visualizer for element type found: " + << boost::core::demangle(elementType.name()); + return; + } - auto oldElementIter = layer.elements.find(updatedElement.id); - if (oldElementIter != layer.elements.end()) - { - // Element already exists - coin::ElementVisualization& oldElement = *oldElementIter->second; - - bool updated = visualizer->update(updatedElement, &oldElement); - if (updated) - { - continue; - } - else - { - // Types are different, so we delete the old element and create a new one - layer.node->removeChild(oldElement.separator); - } - } + auto oldElementIter = layer.elements.find(updatedElement.id); + if (oldElementIter != layer.elements.end()) + { + // Element already exists + coin::ElementVisualization& oldElement = *oldElementIter->second; - auto elementVisu = visualizer->create(updatedElement); - if (elementVisu->separator) + bool updated = visualizer->update(updatedElement, &oldElement); + if (updated) { - layer.node->addChild(elementVisu->separator); - layer.elements[updatedElement.id] = std::move(elementVisu); + continue; } else { - std::string typeName = boost::core::demangle(elementType.name()); - ARMARX_WARNING << deactivateSpam(typeName, 1) - << "CoinElementVisualizer returned null for type: " << typeName << "\n" - << "You need to register a visualizer for each type in ArViz/Coin/Visualizer.cpp"; + // Types are different, so we delete the old element and create a new one + layer.node->removeChild(oldElement.separator); } } - // Remove the elements which were not contained in the update - for (auto iter = layer.elements.begin(); iter != layer.elements.end();) + auto elementVisu = visualizer->create(updatedElement); + if (elementVisu->separator) { - std::string const& elementID = iter->first; - bool wasUpdated = (updatedIDs.count(elementID) > 0); - if (wasUpdated) - { - ++iter; - } - else - { - auto& elementVisu = *iter->second; - layer.node->removeChild(elementVisu.separator); - iter = layer.elements.erase(iter); - } + layer.node->addChild(elementVisu->separator); + layer.elements[updatedElement.id] = std::move(elementVisu); + } + else + { + std::string typeName = boost::core::demangle(elementType.name()); + ARMARX_WARNING << deactivateSpam(typeName, 1) + << "CoinElementVisualizer returned null for type: " << typeName << "\n" + << "You need to register a visualizer for each type in ArViz/Coin/Visualizer.cpp"; } } - void CoinVisualizer::update() + // Remove the elements which were not contained in the update + for (auto iter = layer.elements.begin(); iter != layer.elements.end();) { + std::string const& elementID = iter->first; + bool wasUpdated = (updatedIDs.count(elementID) > 0); + if (wasUpdated) { - switch (state) - { - case CoinVisualizerState::STARTING: - { - std::unique_lock<std::mutex> lock(stateMutex); - storage = stateStorage; - root->removeAllChildren(); - layers.clear(); - updateCounter = 0; - state = CoinVisualizerState::RUNNING; - } - break; + ++iter; + } + else + { + auto& elementVisu = *iter->second; + layer.node->removeChild(elementVisu.separator); + iter = layer.elements.erase(iter); + } + } + } - case CoinVisualizerState::RUNNING: - break; + void CoinVisualizer::update() + { + { + switch (state) + { + case CoinVisualizerState::STARTING: + { + std::unique_lock<std::mutex> lock(stateMutex); + storage = stateStorage; + root->removeAllChildren(); + layers.clear(); + updateCounter = 0; + state = CoinVisualizerState::RUNNING; + } + break; - case CoinVisualizerState::STOPPING: - { - // After we have reached the STOPPED state, we know that updates are no longer running - state = CoinVisualizerState::STOPPED; - return; - } + case CoinVisualizerState::RUNNING: break; - case CoinVisualizerState::STOPPED: - return; + case CoinVisualizerState::STOPPING: + { + // After we have reached the STOPPED state, we know that updates are no longer running + state = CoinVisualizerState::STOPPED; + return; } + break; + + case CoinVisualizerState::STOPPED: + return; } + } - try + try + { + if (pullUpdateResult) { - if (pullUpdateResult) + // We never block the GUI thread with Ice calls: Only get result when call is completed + if (pullUpdateResult->isCompleted()) { - // We never block the GUI thread with Ice calls: Only get result when call is completed - if (pullUpdateResult->isCompleted()) + TimedBlock block("apply-updates"); + + auto layerIDsBefore = getLayerIDs(); + + LayerUpdates pulledUpdates = storage->end_pullUpdatesSince(pullUpdateResult); + pullUpdateResult = nullptr; + for (LayerUpdate const& update : pulledUpdates.updates) { - auto layerIDsBefore = getLayerIDs(); - - LayerUpdates pulledUpdates = storage->end_pullUpdatesSince(pullUpdateResult); - pullUpdateResult = nullptr; - for (LayerUpdate const& update : pulledUpdates.updates) - { - apply(update); - } - updateCounter = pulledUpdates.updateCounter; - - auto layerIDsAfter = getLayerIDs(); - if (layerIDsAfter != layerIDsBefore) - { - emitLayersChanged(layerIDsAfter); - } + apply(update); } - else + updateCounter = pulledUpdates.updateCounter; + + auto layerIDsAfter = getLayerIDs(); + if (layerIDsAfter != layerIDsBefore) { - return; + emitLayersChanged(layerIDsAfter); } + + ARMARX_INFO << "Updates: " << pulledUpdates.updates.size(); + } + else + { + return; } - pullUpdateResult = storage->begin_pullUpdatesSince(updateCounter); - } - catch (Ice::LocalException const& ex) - { - ARMARX_WARNING << "Lost connection to ArVizStorage\n" - << ex.what(); - std::unique_lock<std::mutex> lock(stateMutex); - storage = nullptr; - updateCounter = 0; - state = CoinVisualizerState::STOPPED; - pullUpdateResult = nullptr; - return; } + pullUpdateResult = storage->begin_pullUpdatesSince(updateCounter); } + catch (Ice::LocalException const& ex) + { + ARMARX_WARNING << "Lost connection to ArVizStorage\n" + << ex.what(); + std::unique_lock<std::mutex> lock(stateMutex); + storage = nullptr; + updateCounter = 0; + state = CoinVisualizerState::STOPPED; + pullUpdateResult = nullptr; + return; + } + } - void CoinVisualizer::showLayer(CoinLayerID const& id, bool visible) + void CoinVisualizer::showLayer(CoinLayerID const& id, bool visible) + { + auto iter = layers.find(id); + if (iter == layers.end()) { - auto iter = layers.find(id); - if (iter == layers.end()) - { - return; - } + return; + } - viz::CoinLayer& layer = iter->second; - int childIndex = root->findChild(layer.node); - if (childIndex < 0) - { - // Layer is currently not visible - if (visible) - { - root->addChild(layer.node); - } - } - else + viz::CoinLayer& layer = iter->second; + int childIndex = root->findChild(layer.node); + if (childIndex < 0) + { + // Layer is currently not visible + if (visible) { - // Layer is currently visible - if (!visible) - { - root->removeChild(childIndex); - } + root->addChild(layer.node); } } - - std::vector<CoinLayerID> CoinVisualizer::getLayerIDs() + else { - std::vector<CoinLayerID> result; - for (auto& entry : layers) + // Layer is currently visible + if (!visible) { - result.push_back(entry.first); + root->removeChild(childIndex); } - return result; } + } - void CoinVisualizer::emitLayersChanged(std::vector<CoinLayerID> const& layerIDs) + std::vector<CoinLayerID> CoinVisualizer::getLayerIDs() + { + std::vector<CoinLayerID> result; + for (auto& entry : layers) { - if (layersChangedCallback) - { - layersChangedCallback(layerIDs); - } + result.push_back(entry.first); } + return result; + } + void CoinVisualizer::emitLayersChanged(std::vector<CoinLayerID> const& layerIDs) + { + if (layersChangedCallback) + { + layersChangedCallback(layerIDs); + } } } diff --git a/source/RobotAPI/interface/ArViz/Component.ice b/source/RobotAPI/interface/ArViz/Component.ice index 8f08bb5b8..8b9ceba89 100644 --- a/source/RobotAPI/interface/ArViz/Component.ice +++ b/source/RobotAPI/interface/ArViz/Component.ice @@ -25,6 +25,13 @@ struct LayerUpdate sequence <LayerUpdate> LayerUpdateSeq; +struct TimestampedLayerUpdate +{ + LayerUpdate update; + long updateCounter = 0; + long timestampInMicroseconds = 0; +}; + struct LayerUpdates { LayerUpdateSeq updates; -- GitLab