From aff12d8dd74f58dfdf2b18fca452bf3741e23cc2 Mon Sep 17 00:00:00 2001 From: Julian Tusch <urhrf@student.kit.edu> Date: Sat, 11 May 2024 20:24:41 +0200 Subject: [PATCH] draft: merger logic --- .../libraries/skills/core/FluxioParameter.h | 2 +- .../libraries/skills/manager/CMakeLists.txt | 2 + .../manager/FluxioCompositeExecutor.cpp | 941 +++++++++++------- .../skills/manager/FluxioCompositeExecutor.h | 69 +- .../libraries/skills/manager/FluxioExecutor.h | 4 +- .../skills/manager/FluxioMergerExecutor.cpp | 96 ++ .../skills/manager/FluxioMergerExecutor.h | 31 + .../skills/manager/FluxioNativeExecutor.cpp | 2 - .../skills/manager/FluxioNativeExecutor.h | 1 - 9 files changed, 729 insertions(+), 419 deletions(-) create mode 100644 source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.cpp create mode 100644 source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.h diff --git a/source/RobotAPI/libraries/skills/core/FluxioParameter.h b/source/RobotAPI/libraries/skills/core/FluxioParameter.h index 124d8690e..d6e1c2d41 100644 --- a/source/RobotAPI/libraries/skills/core/FluxioParameter.h +++ b/source/RobotAPI/libraries/skills/core/FluxioParameter.h @@ -19,7 +19,7 @@ namespace armarx std::string type; bool required = true; bool isInput; - std::list<FluxioValue> values = {}; + std::list<FluxioValue> values; manager::dto::FluxioParameter toManagerIce() const; manager::dto::FluxioIdentificator toFluxioIdentificatorIce() const; diff --git a/source/RobotAPI/libraries/skills/manager/CMakeLists.txt b/source/RobotAPI/libraries/skills/manager/CMakeLists.txt index 14400219e..62c21f380 100644 --- a/source/RobotAPI/libraries/skills/manager/CMakeLists.txt +++ b/source/RobotAPI/libraries/skills/manager/CMakeLists.txt @@ -21,12 +21,14 @@ armarx_add_library( FluxioExecutor.cpp FluxioNativeExecutor.cpp FluxioCompositeExecutor.cpp + FluxioMergerExecutor.cpp HEADERS SkillManagerComponentPlugin.h SkillManagerComponentPluginUser.h FluxioExecutor.h FluxioNativeExecutor.h FluxioCompositeExecutor.h + FluxioMergerExecutor.h ) add_library(RobotAPI::skills::manager ALIAS RobotAPISkillsManager) diff --git a/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.cpp b/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.cpp index c51a8788c..37f5a2c16 100644 --- a/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.cpp +++ b/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.cpp @@ -5,425 +5,592 @@ #include <thread> #include <vector> +#include <IceUtil/UUID.h> + #include <ArmarXCore/core/logging/Logging.h> -#include "RobotAPI/libraries/skills/core/FluxioControlNode.h" -#include "RobotAPI/libraries/skills/core/FluxioNode.h" -#include "RobotAPI/libraries/skills/core/FluxioParameter.h" -#include "RobotAPI/libraries/skills/core/FluxioParameterNode.h" -#include "RobotAPI/libraries/skills/core/FluxioSkill.h" -#include "RobotAPI/libraries/skills/core/FluxioSkillStatusUpdate.h" -#include "RobotAPI/libraries/skills/core/FluxioSubSkillNode.h" -#include "RobotAPI/libraries/skills/core/SkillStatusUpdate.h" -#include "RobotAPI/libraries/skills/manager/SkillManagerComponentPlugin.h" - -namespace armarx::plugins { -FluxioCompositeExecutor::FluxioCompositeExecutor( - std::string &id, SkillManagerComponentPlugin *plugin, - skills::FluxioSkill *skill) - : FluxioExecutor(id, plugin, false), skill(skill) {} - -void FluxioCompositeExecutor::run(const std::string &executorName, - armarx::aron::data::DictPtr parameters) { - - ARMARX_INFO << "Running skill " << skill->name; - setStatus(skills::SkillStatus::Constructing); - - skills::FluxioNode *startNode; - if (!validateSkill(startNode)) { - - ARMARX_WARNING << "Skill execution cancelled."; - return; - } - - setStatus(skills::SkillStatus::Initializing); - subExecutionsMap.clear(); - setStatus(skills::SkillStatus::Preparing); - - std::atomic_bool skillRunning = true; - - // thread for polling status updates - std::thread([this, &skillRunning] { - while (skillRunning) { - this->pollSubStatuses(); - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } - }).detach(); +#include "RobotAPI/libraries/skills/core/FluxioEdge.h" +#include "RobotAPI/libraries/skills/manager/FluxioMergerExecutor.h" - // thread for the sub routines - const std::string newExecutorName = executorName + "/" + skill->name; - std::thread([this, startNode, &skillRunning, &newExecutorName] { - this->startSubRoutine(startNode, skillRunning, newExecutorName); - }).detach(); +namespace armarx::plugins +{ + FluxioCompositeExecutor::FluxioCompositeExecutor(std::string& id, + SkillManagerComponentPlugin* plugin, + skills::FluxioSkill* skill) : + FluxioExecutor(id, plugin, false), skill(skill) + { + } - setStatus(skills::SkillStatus::Running); + void + FluxioCompositeExecutor::run(const std::string& executorName, + armarx::aron::data::DictPtr parameters) + { - // main loop - while (skillRunning) { - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - if (status->status == skills::SkillStatus::Aborted || - status->status == skills::SkillStatus::Failed || - status->status == skills::SkillStatus::Succeeded) { + ARMARX_INFO << "Running skill " << skill->name; + setStatus(skills::SkillStatus::Constructing); - skillRunning = false; + skills::FluxioEdge startEdge; + if (!validateSkill(startEdge)) + { - // the skill was aborted with the abort method,no need to abort the - // SubExecutions twice - if (status->status != skills::SkillStatus::Aborted) { - abortSubExecutions(); - } + ARMARX_WARNING << "Skill execution cancelled."; + return; + } - return; - } - } -} - -void FluxioCompositeExecutor::startSubRoutine( - const skills::FluxioNode *startNode, std::atomic_bool &running, - const std::string &executorName) { - if (!running) { - return; - } - - if (startNode == nullptr) { - ARMARX_WARNING << "Unexpected nullptr"; - setStatus(skills::SkillStatus::Failed); - return; - } - - if (startNode->nodeType == skills::FluxioNodeType::PARAMETER) { - // cast to parameter node - const auto *paramNodePtr = - dynamic_cast<const skills::FluxioParameterNode *>(startNode); - if (paramNodePtr == nullptr) { - ARMARX_WARNING << "Unexpected nullptr"; - setStatus(skills::SkillStatus::Failed); - return; + setStatus(skills::SkillStatus::Initializing); + subExecutionsMap.clear(); + setStatus(skills::SkillStatus::Preparing); + + std::atomic_bool skillRunning = true; + + // thread for polling status updates + std::thread( + [this, &skillRunning] + { + while (skillRunning) + { + this->pollSubStatuses(); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + }) + .detach(); + + // thread for the sub routines + const std::string newExecutorName = executorName + "/" + skill->name; + std::thread( + [this, &startEdge, &skillRunning, &newExecutorName] + { + this->startSubRoutine( + startEdge.toNodePtr, startEdge.toParameterPtr, skillRunning, newExecutorName); + }) + .detach(); + + setStatus(skills::SkillStatus::Running); + + // main loop + while (skillRunning) + { + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + if (status->status == skills::SkillStatus::Aborted || + status->status == skills::SkillStatus::Failed || + status->status == skills::SkillStatus::Succeeded) + { + + skillRunning = false; + + // the skill was aborted with the abort method,no need to abort the + // SubExecutions twice + if (status->status != skills::SkillStatus::Aborted) + { + abortSubExecutions(); + } + + return; + } + } } - // make sure it is an event parameter - if (paramNodePtr->parameterPtr->type != "Event") { - ARMARX_WARNING << "Unexpected parameter type " - << paramNodePtr->parameterPtr->type; - setStatus(skills::SkillStatus::Failed); - return; - } + void + FluxioCompositeExecutor::startSubRoutine(const skills::FluxioNode* startNode, + const skills::FluxioParameter* startParameter, + std::atomic_bool& running, + const std::string& executorName) + { + if (!running) + { + return; + } - // get the event type and set the status accordingly - const std::string &eventType = paramNodePtr->parameterPtr->name; - if (eventType == "Succeeded") { - setStatus(skills::SkillStatus::Succeeded); - } else if (eventType == "Failed") { - setStatus(skills::SkillStatus::Failed); - } else if (eventType == "Aborted") { - setStatus(skills::SkillStatus::Aborted); - } else { - ARMARX_WARNING << "Unexpected event type " << eventType - << " for parameter " << paramNodePtr->parameterPtr->name; - setStatus(skills::SkillStatus::Failed); - } - } else if (startNode->nodeType == skills::FluxioNodeType::CONTROL) { - // cast to control node - const auto *controlNodePtr = - dynamic_cast<const skills::FluxioControlNode *>(startNode); - - if (controlNodePtr == nullptr) { - ARMARX_WARNING << "Unexpected nullptr"; - setStatus(skills::SkillStatus::Failed); - return; - } + if (startNode == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr"; + setStatus(skills::SkillStatus::Failed); + return; + } - // check the controlType - if (controlNodePtr->controlType != "SPLITTER") { - ARMARX_WARNING << "Control node type " << controlNodePtr->controlType - << " is not implemented yet"; - setStatus(skills::SkillStatus::Failed); - return; + if (startNode->nodeType == skills::FluxioNodeType::PARAMETER) + { + // cast to parameter node + const auto* paramNodePtr = dynamic_cast<const skills::FluxioParameterNode*>(startNode); + if (paramNodePtr == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr.Dynamic cast from FluxioNodePtr to " + "FluxioParameterNodePtr failed."; + setStatus(skills::SkillStatus::Failed); + return; + } + + this->handleParameterRoutine(paramNodePtr, running, executorName); + } + else if (startNode->nodeType == skills::FluxioNodeType::CONTROL) + { + // cast to control node + const auto* controlNodePtr = dynamic_cast<const skills::FluxioControlNode*>(startNode); + + if (controlNodePtr == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to " + "FluxioControlNodePtr failed."; + setStatus(skills::SkillStatus::Failed); + return; + } + + this->handleControlRoutine(controlNodePtr, startParameter, running, executorName); + } + else if (startNode->nodeType == skills::FluxioNodeType::SUBSKILL) + { + // cast to subskill node + const auto* subSkillNodePtr = + dynamic_cast<const skills::FluxioSubSkillNode*>(startNode); + if (subSkillNodePtr == nullptr || subSkillNodePtr->skillPtr == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to " + "FluxioSubSkillNodePtr failed."; + setStatus(skills::SkillStatus::Failed); + return; + } + + this->handleSubSkillRoutine(subSkillNodePtr, running, executorName); + } + else + { + ARMARX_WARNING + << "Unexpected node type '" + << skills::FluxioNodeTypeToString(startNode->nodeType).value_or("UNKNOWN") + << "' for Node with id " << startNode->nodeId; + setStatus(skills::SkillStatus::Failed); + return; + } } - // find all Nodes connected to the output parameters - std::vector<const skills::FluxioNode *> toNodes = {}; - for (const auto &[id, param] : controlNodePtr->parametersMap) { - // ignore the input parameters - if (param.isInput) { - continue; - } - - for (const auto &edge : skill->edges) { - if (edge.fromParameterPtr->id == id) { - toNodes.push_back(edge.toNodePtr); + void + FluxioCompositeExecutor::handleParameterRoutine( + const skills::FluxioParameterNode* parameterNode, + std::atomic_bool& running, + const std::string& /*unused*/) + { + if (parameterNode == nullptr || parameterNode->parameterPtr == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr"; + setStatus(skills::SkillStatus::Failed); + return; } - } - } - const std::string newExecutorName = executorName + "/Splitter"; + // make sure it is an event parameter + if (parameterNode->parameterPtr->type != "Event") + { + ARMARX_WARNING << "Unexpected parameter type " << parameterNode->parameterPtr->type; + setStatus(skills::SkillStatus::Failed); + return; + } - // start subroutines in separate threads - const int size = toNodes.size() - 1; - for (int i = 0; i < size; i++) { - const auto &toNode = toNodes.at(i); - std::thread([this, &toNode, &running, &newExecutorName] { - startSubRoutine(toNode, running, newExecutorName); - }).detach(); + // get the event type and set the status accordingly + const std::string& eventType = parameterNode->parameterPtr->name; + if (eventType == "Succeeded") + { + setStatus(skills::SkillStatus::Succeeded); + } + else if (eventType == "Failed") + { + setStatus(skills::SkillStatus::Failed); + } + else if (eventType == "Aborted") + { + setStatus(skills::SkillStatus::Aborted); + } + else + { + ARMARX_WARNING << "Unexpected event type " << eventType << " for parameter " + << parameterNode->parameterPtr->name; + setStatus(skills::SkillStatus::Failed); + } } - // dont start a new thread for the last toNode, instead reuse this one - startSubRoutine(toNodes.at(size), running, newExecutorName); - } else if (startNode->nodeType == skills::FluxioNodeType::SUBSKILL) { - // cast to subskill node - const auto *subSkillNodePtr = - dynamic_cast<const skills::FluxioSubSkillNode *>(startNode); - if (subSkillNodePtr == nullptr || subSkillNodePtr->skillPtr == nullptr) { - ARMARX_WARNING << "Unexpected nullptr"; - setStatus(skills::SkillStatus::Failed); - return; - } + void + FluxioCompositeExecutor::handleSubSkillRoutine(const skills::FluxioSubSkillNode* subSkillNode, + std::atomic_bool& running, + const std::string& executorName) + { + if (subSkillNode == nullptr || subSkillNode->skillPtr == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr"; + setStatus(skills::SkillStatus::Failed); + return; + } - // start skill execution - auto *executorPtr = - plugin->executeFluxioSkill(subSkillNodePtr->skillPtr->id, executorName); - if (executorPtr == nullptr) { - ARMARX_WARNING << "Failed to execute subskill " - << subSkillNodePtr->skillPtr->id; - setStatus(skills::SkillStatus::Failed); - return; - } - subExecutionsMap.insert({startNode->nodeId, executorPtr}); - - // wait until the skill has finished (or the super skill is finished) - skills::FluxioSkillStatusUpdate statusUpdate; - while (running) { - // sleep for a while - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - executorPtr->getStatusUpdate(); // FIXME: bad design - const auto &statusUpdateIt = executorPtr->getStatus(); - - if (!statusUpdateIt.has_value()) { - ARMARX_INFO << "No status update from skill " - << subSkillNodePtr->skillPtr->name << " yet. Waiting..."; - continue; - } - - statusUpdate = statusUpdateIt.value(); - // did the status change? update statusUpdates list - const auto &lastUpdate = std::find_if( - statusUpdates.begin(), statusUpdates.end(), - [&startNode](const skills::FluxioSkillStatusUpdate &statusUpdate) { - return statusUpdate.subSkillNodeId == startNode->nodeId; - }); - - if (lastUpdate == statusUpdates.end() || - lastUpdate->status != statusUpdate.status) { - statusUpdates.push_front({armarx::DateTime::Now(), executorPtr->id, - startNode->nodeId, statusUpdate.status}); - } - - // check subskill is finished - if (statusUpdate.status == skills::SkillStatus::Succeeded || - statusUpdate.status == skills::SkillStatus::Failed || - statusUpdate.status == skills::SkillStatus::Aborted) { - break; - } - } + // start skill execution + auto* executorPtr = plugin->executeFluxioSkill(subSkillNode->skillPtr->id, executorName); + if (executorPtr == nullptr) + { + ARMARX_WARNING << "Failed to execute subskill " << subSkillNode->skillPtr->id; + setStatus(skills::SkillStatus::Failed); + return; + } + subExecutionsMap.insert({subSkillNode->nodeId, executorPtr}); + + // wait until the skill has finished (or the super skill is finished) + skills::FluxioSkillStatusUpdate statusUpdate; + while (running) + { + // sleep for a while + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + executorPtr->getStatusUpdate(); // FIXME: bad design + const auto& statusUpdateIt = executorPtr->getStatus(); + + if (!statusUpdateIt.has_value()) + { + ARMARX_INFO << "No status update from skill " << subSkillNode->skillPtr->name + << " yet. Waiting..."; + continue; + } + + statusUpdate = statusUpdateIt.value(); + // did the status change? update statusUpdates list + const auto& lastUpdate = + std::find_if(statusUpdates.begin(), + statusUpdates.end(), + [&subSkillNode](const skills::FluxioSkillStatusUpdate& statusUpdate) + { return statusUpdate.subSkillNodeId == subSkillNode->nodeId; }); + + if (lastUpdate == statusUpdates.end() || lastUpdate->status != statusUpdate.status) + { + statusUpdates.push_front({armarx::DateTime::Now(), + executorPtr->id, + subSkillNode->nodeId, + statusUpdate.status}); + } + + // check subskill is finished + if (statusUpdate.status == skills::SkillStatus::Succeeded || + statusUpdate.status == skills::SkillStatus::Failed || + statusUpdate.status == skills::SkillStatus::Aborted) + { + break; + } + } - if (!running){ - return; + if (!running) + { + return; + } + + // check the final skill status get the output event parameter + const std::string& outputEventName = + statusUpdate.status == skills::SkillStatus::Succeeded ? "Succeeded" + : statusUpdate.status == skills::SkillStatus::Failed ? "Failed" + : statusUpdate.status == skills::SkillStatus::Aborted ? "Aborted" + : "Undefined"; + const auto& outputParam = std::find_if( + subSkillNode->skillPtr->parameters.begin(), + subSkillNode->skillPtr->parameters.end(), + [&outputEventName](const std::pair<std::string, skills::FluxioParameter>& param) + { + return (param.second.type == "Event" && !param.second.isInput && + param.second.name == outputEventName); + }); + + if (outputParam == subSkillNode->skillPtr->parameters.end()) + { + ARMARX_WARNING << "Skill " << subSkillNode->skillPtr->name + << " is missing the output event parameter " << outputEventName; + setStatus(skills::SkillStatus::Failed); + return; + } + + // find the connected edge + const auto& edge = + std::find_if(skill->edges.begin(), + skill->edges.end(), + [&subSkillNode, &outputParam](const skills::FluxioEdge& edge) + { + return (edge.fromNodePtr->nodeId == subSkillNode->nodeId && + edge.fromParameterPtr->id == outputParam->second.id); + }); + + if (edge == skill->edges.end()) + { + ARMARX_WARNING << "Skill " << skill->name + << " has no edge connected to the output event parameter " + << outputEventName; + setStatus(skills::SkillStatus::Failed); + return; + } + + // start new subroutine + const std::string& nextExecutorName = executorName + "/" + subSkillNode->skillPtr->name; + startSubRoutine(edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName); } - // check the final skill status get the output event parameter - const std::string &outputEventName = - statusUpdate.status == skills::SkillStatus::Succeeded ? "Succeeded" - : statusUpdate.status == skills::SkillStatus::Failed ? "Failed" - : statusUpdate.status == skills::SkillStatus::Aborted ? "Aborted" - : "Undefined"; - const auto &outputParam = std::find_if( - subSkillNodePtr->skillPtr->parameters.begin(), - subSkillNodePtr->skillPtr->parameters.end(), - [&outputEventName]( - const std::pair<std::string, skills::FluxioParameter> ¶m) { - return (param.second.type == "Event" && !param.second.isInput && - param.second.name == outputEventName); - }); - - if (outputParam == subSkillNodePtr->skillPtr->parameters.end()) { - ARMARX_WARNING << "Skill " << subSkillNodePtr->skillPtr->name - << " is missing the output event parameter " - << outputEventName; - setStatus(skills::SkillStatus::Failed); - return; + void + FluxioCompositeExecutor::handleControlRoutine(const skills::FluxioControlNode* controlNode, + const skills::FluxioParameter* startParameter, + std::atomic_bool& running, + const std::string& executorName) + { + if (controlNode == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr"; + setStatus(skills::SkillStatus::Failed); + return; + } + + // check the controlType + if (controlNode->controlType == "SPLITTER") + { + // find connected nodes and store the relevant edges + std::vector<skills::FluxioEdge> toEdges = {}; + for (const auto& [id, param] : controlNode->parametersMap) + { + // ignore the input parameters + if (param.isInput) + { + continue; + } + + for (const auto& edge : skill->edges) + { + if (edge.fromParameterPtr->id == id) + { + toEdges.push_back(edge); + } + } + } + + const std::string newExecutorName = executorName + "/Splitter"; + + // start subroutines in separate threads + for (const skills::FluxioEdge edge : toEdges) + { + std::thread( + [this, &edge, &running, &newExecutorName] { + startSubRoutine( + edge.toNodePtr, edge.toParameterPtr, running, newExecutorName); + }) + .detach(); + } + } + else if (controlNode->controlType == "MERGER") + { + // check the list of subexecutions for the node id + FluxioMergerExecutor* mergerExecutorPtr = nullptr; + const auto executorPtr = subExecutionsMap.find(controlNode->nodeId); + if (executorPtr == subExecutionsMap.end()) + { + // assemble paramId vector + std::vector<std::string> paramIds = {}; + for (const auto& [id, param] : controlNode->parametersMap) + { + if (param.isInput) + { + paramIds.push_back(id); + } + } + + // there is no execution for the merger yet, let´s start one + const std::string executionId = IceUtil::generateUUID(); + subExecutionsMap[executionId] = + new FluxioMergerExecutor(IceUtil::generateUUID(), plugin, paramIds); + + mergerExecutorPtr = + dynamic_cast<FluxioMergerExecutor*>(subExecutionsMap[executionId]); + } + else + { + mergerExecutorPtr = dynamic_cast<FluxioMergerExecutor*>(executorPtr->second); + } + + if (mergerExecutorPtr == nullptr) + { + ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioExecutorPtr to " + "FluxioMergerExecutorPtr failed."; + setStatus(skills::SkillStatus::Failed); + return; + } + + // check in the token + mergerExecutorPtr->checkInToken(startParameter->id); + } + else + { + ARMARX_WARNING << "Unexpected control type " << controlNode->controlType; + setStatus(skills::SkillStatus::Failed); + return; + } } - // find the connected edge - const auto &edge = std::find_if( - skill->edges.begin(), skill->edges.end(), - [&startNode, &outputParam](const skills::FluxioEdge &edge) { - return (edge.fromNodePtr->nodeId == startNode->nodeId && - edge.fromParameterPtr->id == outputParam->second.id); - }); - - if (edge == skill->edges.end()) { - ARMARX_WARNING << "Skill " << skill->name - << " has no edge connected to the output event parameter " - << outputEventName; - setStatus(skills::SkillStatus::Failed); - return; + void + FluxioCompositeExecutor::abort() + { + ARMARX_INFO << "Aborting skill " << skill->name; + setStatus(skills::SkillStatus::Aborted); + abortSubExecutions(); } - // start new subroutine - const std::string &nextExecutorName = - executorName + "/" + subSkillNodePtr->skillPtr->name; - startSubRoutine(edge->toNodePtr, running, nextExecutorName); - } else { - ARMARX_WARNING << "Unexpected node type '" << skills::FluxioNodeTypeToString(startNode->nodeType).value_or("UNKNOWN") - << "' for Node with id " << startNode->nodeId; - setStatus(skills::SkillStatus::Failed); - return; - } -} - -void FluxioCompositeExecutor::abort() { - ARMARX_INFO << "Aborting skill " << skill->name; - setStatus(skills::SkillStatus::Aborted); - abortSubExecutions(); -} - -void FluxioCompositeExecutor::abortSubExecutions() { - for (const auto &[nodeId, executorPtr] : subExecutionsMap) { - auto s = executorPtr->getStatus(); - if (!s.has_value() || s->status == skills::SkillStatus::Succeeded || - s->status == skills::SkillStatus::Failed || - s->status == skills::SkillStatus::Aborted) { - continue; + void + FluxioCompositeExecutor::abortSubExecutions() + { + for (const auto& [nodeId, executorPtr] : subExecutionsMap) + { + auto s = executorPtr->getStatus(); + if (!s.has_value() || s->status == skills::SkillStatus::Succeeded || + s->status == skills::SkillStatus::Failed || + s->status == skills::SkillStatus::Aborted) + { + continue; + } + + executorPtr->abort(); + statusUpdates.push_front( + {armarx::DateTime::Now(), executorPtr->id, nodeId, skills::SkillStatus::Aborted}); + } } - executorPtr->abort(); - statusUpdates.push_front({armarx::DateTime::Now(), executorPtr->id, nodeId, - skills::SkillStatus::Aborted}); - } -} - -std::optional<std::vector<skills::FluxioSkillStatusUpdate>> -FluxioCompositeExecutor::getStatusUpdate() { - ARMARX_INFO << "Getting status updates for skill " << skill->name; - // convert statusupdates list to vector - auto ret = std::vector<skills::FluxioSkillStatusUpdate>(statusUpdates.begin(), - statusUpdates.end()); - - return ret; -} - -void FluxioCompositeExecutor::pollSubStatuses() { - for (const auto &[nodeId, executorPtr] : subExecutionsMap) { - executorPtr->getStatusUpdate(); - auto s = executorPtr->getStatus(); - if (!s.has_value()) { - continue; + std::optional<std::vector<skills::FluxioSkillStatusUpdate>> + FluxioCompositeExecutor::getStatusUpdate() + { + ARMARX_INFO << "Getting status updates for skill " << skill->name; + // convert statusupdates list to vector + auto ret = std::vector<skills::FluxioSkillStatusUpdate>(statusUpdates.begin(), + statusUpdates.end()); + + return ret; } - const auto &lastStatus = - find_if(statusUpdates.begin(), statusUpdates.end(), - [&](const skills::FluxioSkillStatusUpdate &statusUpdate) { - return statusUpdate.subSkillNodeId == nodeId; - }) - ->status; - if (lastStatus != s->status) { - statusUpdates.push_front( - {armarx::DateTime::Now(), executorPtr->id, nodeId, s->status}); + void + FluxioCompositeExecutor::pollSubStatuses() + { + for (const auto& [nodeId, executorPtr] : subExecutionsMap) + { + executorPtr->getStatusUpdate(); + auto s = executorPtr->getStatus(); + if (!s.has_value()) + { + continue; + } + + const auto& lastStatus = + find_if(statusUpdates.begin(), + statusUpdates.end(), + [&](const skills::FluxioSkillStatusUpdate& statusUpdate) + { return statusUpdate.subSkillNodeId == nodeId; }) + ->status; + if (lastStatus != s->status) + { + statusUpdates.push_front( + {armarx::DateTime::Now(), executorPtr->id, nodeId, s->status}); + } + } } - } -} - -bool FluxioCompositeExecutor::validateSkill(skills::FluxioNode *&ret) { - // check if skill is not null - if (skill == nullptr) { - ARMARX_WARNING << "Skill is not set"; - return false; - } - - // get start parameter - const auto &startParam = std::find_if( - skill->parameters.begin(), skill->parameters.end(), - [](const std::pair<std::string, skills::FluxioParameter> ¶m) { - return (param.second.type == "Event" && param.second.isInput && - param.second.name == "Start"); - }); - - if (startParam == skill->parameters.end()) { - ARMARX_WARNING << "Skill has no start parameter"; - return false; - } - - // get all parameter nodes for the start parameter - const auto &startNode = std::find_if( - skill->nodes.begin(), skill->nodes.end(), - [startParam]( - const std::pair<std::string, skills::FluxioNode *> &nodeEntry) { - if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER) { - return false; + + bool + FluxioCompositeExecutor::validateSkill(skills::FluxioEdge& ret) + { + // check if skill is not null + if (skill == nullptr) + { + ARMARX_WARNING << "Skill is not set"; + return false; + } + + // get start parameter + const auto& startParam = + std::find_if(skill->parameters.begin(), + skill->parameters.end(), + [](const std::pair<std::string, skills::FluxioParameter>& param) { + return (param.second.type == "Event" && param.second.isInput && + param.second.name == "Start"); + }); + + if (startParam == skill->parameters.end()) + { + ARMARX_WARNING << "Skill has no start parameter"; + return false; + } + + // get all parameter nodes for the start parameter + const auto& startNode = + std::find_if(skill->nodes.begin(), + skill->nodes.end(), + [startParam](const std::pair<std::string, skills::FluxioNode*>& nodeEntry) + { + if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER) + { + return false; + } + + const auto* paramNode = + dynamic_cast<const skills::FluxioParameterNode*>(nodeEntry.second); + return (paramNode->parameterPtr->id == startParam->second.id); + }); + + // there can only be one + if (startNode == skill->nodes.end()) + { + ARMARX_WARNING << "Skill has no start node"; + return false; + } + + // check if the start node is connected + const auto& startEdge = + std::find_if(skill->edges.begin(), + skill->edges.end(), + [startNode](const skills::FluxioEdge& edge) + { return (edge.fromNodePtr->nodeId == startNode->second->nodeId); }); + + // there can only be one + if (startEdge == skill->edges.end()) + { + ARMARX_WARNING << "Skill has noedge connected to the start node"; + return false; + } + + // get the output event parameters + const auto& outputParamsSuccess = + std::find_if(skill->parameters.begin(), + skill->parameters.end(), + [](const std::pair<std::string, skills::FluxioParameter>& param) + { + return (param.second.type == "Event" && !param.second.isInput && + param.second.name == "Succeeded"); + }); + const auto& outputParamsFailed = + std::find_if(skill->parameters.begin(), + skill->parameters.end(), + [](const std::pair<std::string, skills::FluxioParameter>& param) + { + return (param.second.type == "Event" && !param.second.isInput && + param.second.name == "Failed"); + }); + const auto& outputParamsAborted = + std::find_if(skill->parameters.begin(), + skill->parameters.end(), + [](const std::pair<std::string, skills::FluxioParameter>& param) + { + return (param.second.type == "Event" && !param.second.isInput && + param.second.name == "Aborted"); + }); + + if (outputParamsSuccess == skill->parameters.end() || + outputParamsFailed == skill->parameters.end() || + outputParamsAborted == skill->parameters.end()) + { + ARMARX_WARNING << "Skill is missing one or more output event parameters"; + return false; } - const auto *paramNode = - dynamic_cast<const skills::FluxioParameterNode *>(nodeEntry.second); - return (paramNode->parameterPtr->id == startParam->second.id); - }); - - // there can only be one - if (startNode == skill->nodes.end()) { - ARMARX_WARNING << "Skill has no start node"; - return false; - } - - // check if the start node is connected - const auto &startEdge = std::find_if( - skill->edges.begin(), skill->edges.end(), - [startNode](const skills::FluxioEdge &edge) { - return (edge.fromNodePtr->nodeId == startNode->second->nodeId); - }); - - // there can only be one - if (startEdge == skill->edges.end()) { - ARMARX_WARNING << "Skill has noedge connected to the start node"; - return false; - } - - // get the output event parameters - const auto &outputParamsSuccess = std::find_if( - skill->parameters.begin(), skill->parameters.end(), - [](const std::pair<std::string, skills::FluxioParameter> ¶m) { - return (param.second.type == "Event" && !param.second.isInput && - param.second.name == "Succeeded"); - }); - const auto &outputParamsFailed = std::find_if( - skill->parameters.begin(), skill->parameters.end(), - [](const std::pair<std::string, skills::FluxioParameter> ¶m) { - return (param.second.type == "Event" && !param.second.isInput && - param.second.name == "Failed"); - }); - const auto &outputParamsAborted = std::find_if( - skill->parameters.begin(), skill->parameters.end(), - [](const std::pair<std::string, skills::FluxioParameter> ¶m) { - return (param.second.type == "Event" && !param.second.isInput && - param.second.name == "Aborted"); - }); - - if (outputParamsSuccess == skill->parameters.end() || - outputParamsFailed == skill->parameters.end() || - outputParamsAborted == skill->parameters.end()) { - ARMARX_WARNING << "Skill is missing one or more output event parameters"; - return false; - } - - // TODO: the rest - - ARMARX_INFO << "Skill validation is not fully implemented yet."; - ret = startEdge->toNodePtr; - return true; -} - -void FluxioCompositeExecutor::setStatus(skills::SkillStatus status) { - const skills::FluxioSkillStatusUpdate update = {armarx::DateTime::Now(), id, - skill->id, status}; - this->status = update; - this->statusUpdates.push_front(update); -} + // TODO: the rest + + ARMARX_INFO << "Skill validation is not fully implemented yet."; + ret = *startEdge; + return true; + } + + void + FluxioCompositeExecutor::setStatus(skills::SkillStatus status) + { + const skills::FluxioSkillStatusUpdate update = { + armarx::DateTime::Now(), id, skill->id, status}; + this->status = update; + this->statusUpdates.push_front(update); + } } // namespace armarx::plugins diff --git a/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.h b/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.h index e7a379d60..d5a8f600e 100644 --- a/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.h +++ b/source/RobotAPI/libraries/skills/manager/FluxioCompositeExecutor.h @@ -4,38 +4,55 @@ #include <optional> #include <string> +#include "RobotAPI/libraries/skills/core/FluxioControlNode.h" +#include "RobotAPI/libraries/skills/core/FluxioParameter.h" +#include "RobotAPI/libraries/skills/core/FluxioParameterNode.h" #include "RobotAPI/libraries/skills/core/FluxioSkill.h" #include "RobotAPI/libraries/skills/core/FluxioSkillStatusUpdate.h" +#include "RobotAPI/libraries/skills/core/FluxioSubSkillNode.h" #include "RobotAPI/libraries/skills/core/SkillExecutionID.h" #include "RobotAPI/libraries/skills/core/SkillStatusUpdate.h" #include "FluxioExecutor.h" #include "SkillManagerComponentPlugin.h" -namespace armarx::plugins { -class SkillManagerComponentPlugin; // forward declaration - -class FluxioCompositeExecutor : public FluxioExecutor { -public: - FluxioCompositeExecutor(std::string &id, SkillManagerComponentPlugin *plugin, - skills::FluxioSkill *skill); - void run(const std::string& executorName, - armarx::aron::data::DictPtr parameters) override; - void abort() override; - std::optional<std::vector<skills::FluxioSkillStatusUpdate>> - getStatusUpdate() override; - bool validateSkill(skills::FluxioNode *&startNodeId); - -private: - void startSubRoutine(const skills::FluxioNode *startNode, - std::atomic_bool &running, - const std::string &executorName); - void abortSubExecutions(); - std::optional<skills::SkillExecutionID> executionId = std::nullopt; - skills::FluxioSkill *skill; - void setStatus(skills::SkillStatus status) override; - void pollSubStatuses(); - std::map<std::string, FluxioExecutor *> subExecutionsMap = - {}; // key is subSkillNodeId -}; +namespace armarx::plugins +{ + class SkillManagerComponentPlugin; // forward declaration + + class FluxioCompositeExecutor : public FluxioExecutor + { + public: + FluxioCompositeExecutor(std::string& id, + SkillManagerComponentPlugin* plugin, + skills::FluxioSkill* skill); + void run(const std::string& executorName, armarx::aron::data::DictPtr parameters) override; + void abort() override; + std::optional<std::vector<skills::FluxioSkillStatusUpdate>> getStatusUpdate() override; + bool validateSkill(skills::FluxioEdge& ret); + + private: + void startSubRoutine(const skills::FluxioNode* startNode, + const skills::FluxioParameter* startParameter, + std::atomic_bool& running, + const std::string& executorName); + + void handleParameterRoutine(const skills::FluxioParameterNode* parameterNode, + std::atomic_bool& running, + const std::string& executorName); + void handleSubSkillRoutine(const skills::FluxioSubSkillNode* subSkillNode, + std::atomic_bool& running, + const std::string& executorName); + void handleControlRoutine(const skills::FluxioControlNode* controlNode, + const skills::FluxioParameter* startParameter, + std::atomic_bool& running, + const std::string& executorName); + + void abortSubExecutions(); + std::optional<skills::SkillExecutionID> executionId = std::nullopt; + skills::FluxioSkill* skill; + void setStatus(skills::SkillStatus status) override; + void pollSubStatuses(); + std::map<std::string, FluxioExecutor*> subExecutionsMap; // key is subSkillNodeId + }; } // namespace armarx::plugins diff --git a/source/RobotAPI/libraries/skills/manager/FluxioExecutor.h b/source/RobotAPI/libraries/skills/manager/FluxioExecutor.h index fa1195d3f..3e1f5f231 100644 --- a/source/RobotAPI/libraries/skills/manager/FluxioExecutor.h +++ b/source/RobotAPI/libraries/skills/manager/FluxioExecutor.h @@ -16,7 +16,7 @@ namespace armarx::plugins public: virtual ~FluxioExecutor(){}; - FluxioExecutor(std::string& id, SkillManagerComponentPlugin* plugin, bool native) : + FluxioExecutor(const std::string& id, SkillManagerComponentPlugin* plugin, bool native) : id(id), native(native), plugin(plugin){}; virtual void run(const std::string& executorName, armarx::aron::data::DictPtr parameters){}; virtual void abort(){}; @@ -37,7 +37,7 @@ namespace armarx::plugins const bool native; protected: - std::list<skills::FluxioSkillStatusUpdate> statusUpdates = {}; + std::list<skills::FluxioSkillStatusUpdate> statusUpdates; std::string* executorName = nullptr; SkillManagerComponentPlugin* plugin = nullptr; std::optional<skills::FluxioSkillStatusUpdate> status = std::nullopt; diff --git a/source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.cpp b/source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.cpp new file mode 100644 index 000000000..161f2eca1 --- /dev/null +++ b/source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.cpp @@ -0,0 +1,96 @@ +#include "FluxioMergerExecutor.h" + +#include <optional> +#include <thread> +#include <vector> + +#include <ArmarXCore/core/logging/Logging.h> + +#include "RobotAPI/libraries/skills/core/SkillStatusUpdate.h" + +namespace armarx::plugins +{ + FluxioMergerExecutor::FluxioMergerExecutor(const std::string& id, + SkillManagerComponentPlugin* plugin, + const std::vector<std::string>& parameterIds) : + FluxioExecutor(id, plugin, false) + { + for (const auto& id : parameterIds) + { + tokenHasArrivedMap[id] = false; + } + } + + void + FluxioMergerExecutor::run(const std::string& /*unused*/, + armarx::aron::data::DictPtr /*unused*/) + { + // set status running + setStatus(skills::SkillStatus::Running); + + bool running = true; + while (running) + { + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + + // check if all tokens have arrived + bool allTokensArrived = true; + for (const auto& [id, hasArrived] : tokenHasArrivedMap) + { + if (!hasArrived) + { + allTokensArrived = false; + break; + } + } + + if (status->status == skills::SkillStatus::Aborted || + status->status == skills::SkillStatus::Failed || + status->status == skills::SkillStatus::Succeeded) + { + running = false; + } + else if (allTokensArrived) + { + // set status succeeded + setStatus(skills::SkillStatus::Succeeded); + running = false; + } + } + } + + void + FluxioMergerExecutor::abort() + { + setStatus(skills::SkillStatus::Aborted); + } + + void + FluxioMergerExecutor::checkInToken(const std::string& parameterId) + { + const auto it = tokenHasArrivedMap.find(parameterId); + if (it == tokenHasArrivedMap.end()) + { + ARMARX_WARNING << "Unexpected parameterId: " << parameterId; + } + else + { + tokenHasArrivedMap[parameterId] = true; + } + } + + std::optional<std::vector<skills::FluxioSkillStatusUpdate>> + FluxioMergerExecutor::getStatusUpdate() + { + // unused method + return std::nullopt; + } + + void + FluxioMergerExecutor::setStatus(skills::SkillStatus status) + { + const skills::FluxioSkillStatusUpdate update = { + armarx::DateTime::Now(), this->id, "noIdNeeded", status}; + this->status = update; + } +} // namespace armarx::plugins diff --git a/source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.h b/source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.h new file mode 100644 index 000000000..50f218bae --- /dev/null +++ b/source/RobotAPI/libraries/skills/manager/FluxioMergerExecutor.h @@ -0,0 +1,31 @@ +#pragma once + +#include <optional> +#include <string> + +#include "RobotAPI/libraries/skills/core/FluxioSkillStatusUpdate.h" + +#include "FluxioExecutor.h" +#include "SkillManagerComponentPlugin.h" + +namespace armarx::plugins +{ + class SkillManagerComponentPlugin; // forward declaration + + class FluxioMergerExecutor : public FluxioExecutor + { + public: + FluxioMergerExecutor(const std::string& id, + SkillManagerComponentPlugin* plugin, + const std::vector<std::string>& parameterIds); + + void run(const std::string& executorName, armarx::aron::data::DictPtr parameters) override; + void abort() override; + void checkInToken(const std::string& parameterId); + std::optional<std::vector<skills::FluxioSkillStatusUpdate>> getStatusUpdate() override; + + private: + void setStatus(skills::SkillStatus status) override; + std::map<std::string, bool> tokenHasArrivedMap; // keys are parameterIds + }; +} // namespace armarx::plugins diff --git a/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.cpp b/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.cpp index aaecde0ba..f02a0130a 100644 --- a/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.cpp +++ b/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.cpp @@ -4,9 +4,7 @@ #include <ArmarXCore/core/logging/Logging.h> -#include "RobotAPI/libraries/skills/core/FluxioSkillStatusUpdate.h" #include "RobotAPI/libraries/skills/core/ProviderID.h" -#include "RobotAPI/libraries/skills/core/SkillExecutionID.h" #include "RobotAPI/libraries/skills/core/SkillExecutionRequest.h" #include "RobotAPI/libraries/skills/core/SkillID.h" diff --git a/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.h b/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.h index ef5f363c4..7d56f06cf 100644 --- a/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.h +++ b/source/RobotAPI/libraries/skills/manager/FluxioNativeExecutor.h @@ -1,6 +1,5 @@ #pragma once -#include <functional> #include <optional> #include <string> -- GitLab