Skip to content
Snippets Groups Projects
Commit aff12d8d authored by Julian Tusch's avatar Julian Tusch :no_entry_sign:
Browse files

draft: merger logic

parent e3f73701
No related branches found
No related tags found
3 merge requests!460Draft: fluxio/dev-skill-timeout,!456Fluxio/dev control token special nodes,!449Fluxio preliminary release
Pipeline #19148 passed
This commit is part of merge request !449. Comments created here will be created in the context of that merge request.
Showing with 729 additions and 419 deletions
......@@ -19,7 +19,7 @@ namespace armarx
std::string type;
bool required = true;
bool isInput;
std::list<FluxioValue> values = {};
std::list<FluxioValue> values;
manager::dto::FluxioParameter toManagerIce() const;
manager::dto::FluxioIdentificator toFluxioIdentificatorIce() const;
......
......@@ -21,12 +21,14 @@ armarx_add_library(
FluxioExecutor.cpp
FluxioNativeExecutor.cpp
FluxioCompositeExecutor.cpp
FluxioMergerExecutor.cpp
HEADERS
SkillManagerComponentPlugin.h
SkillManagerComponentPluginUser.h
FluxioExecutor.h
FluxioNativeExecutor.h
FluxioCompositeExecutor.h
FluxioMergerExecutor.h
)
add_library(RobotAPI::skills::manager ALIAS RobotAPISkillsManager)
......@@ -4,38 +4,55 @@
#include <optional>
#include <string>
#include "RobotAPI/libraries/skills/core/FluxioControlNode.h"
#include "RobotAPI/libraries/skills/core/FluxioParameter.h"
#include "RobotAPI/libraries/skills/core/FluxioParameterNode.h"
#include "RobotAPI/libraries/skills/core/FluxioSkill.h"
#include "RobotAPI/libraries/skills/core/FluxioSkillStatusUpdate.h"
#include "RobotAPI/libraries/skills/core/FluxioSubSkillNode.h"
#include "RobotAPI/libraries/skills/core/SkillExecutionID.h"
#include "RobotAPI/libraries/skills/core/SkillStatusUpdate.h"
#include "FluxioExecutor.h"
#include "SkillManagerComponentPlugin.h"
namespace armarx::plugins {
class SkillManagerComponentPlugin; // forward declaration
class FluxioCompositeExecutor : public FluxioExecutor {
public:
FluxioCompositeExecutor(std::string &id, SkillManagerComponentPlugin *plugin,
skills::FluxioSkill *skill);
void run(const std::string& executorName,
armarx::aron::data::DictPtr parameters) override;
void abort() override;
std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
getStatusUpdate() override;
bool validateSkill(skills::FluxioNode *&startNodeId);
private:
void startSubRoutine(const skills::FluxioNode *startNode,
std::atomic_bool &running,
const std::string &executorName);
void abortSubExecutions();
std::optional<skills::SkillExecutionID> executionId = std::nullopt;
skills::FluxioSkill *skill;
void setStatus(skills::SkillStatus status) override;
void pollSubStatuses();
std::map<std::string, FluxioExecutor *> subExecutionsMap =
{}; // key is subSkillNodeId
};
namespace armarx::plugins
{
class SkillManagerComponentPlugin; // forward declaration
class FluxioCompositeExecutor : public FluxioExecutor
{
public:
FluxioCompositeExecutor(std::string& id,
SkillManagerComponentPlugin* plugin,
skills::FluxioSkill* skill);
void run(const std::string& executorName, armarx::aron::data::DictPtr parameters) override;
void abort() override;
std::optional<std::vector<skills::FluxioSkillStatusUpdate>> getStatusUpdate() override;
bool validateSkill(skills::FluxioEdge& ret);
private:
void startSubRoutine(const skills::FluxioNode* startNode,
const skills::FluxioParameter* startParameter,
std::atomic_bool& running,
const std::string& executorName);
void handleParameterRoutine(const skills::FluxioParameterNode* parameterNode,
std::atomic_bool& running,
const std::string& executorName);
void handleSubSkillRoutine(const skills::FluxioSubSkillNode* subSkillNode,
std::atomic_bool& running,
const std::string& executorName);
void handleControlRoutine(const skills::FluxioControlNode* controlNode,
const skills::FluxioParameter* startParameter,
std::atomic_bool& running,
const std::string& executorName);
void abortSubExecutions();
std::optional<skills::SkillExecutionID> executionId = std::nullopt;
skills::FluxioSkill* skill;
void setStatus(skills::SkillStatus status) override;
void pollSubStatuses();
std::map<std::string, FluxioExecutor*> subExecutionsMap; // key is subSkillNodeId
};
} // namespace armarx::plugins
......@@ -16,7 +16,7 @@ namespace armarx::plugins
public:
virtual ~FluxioExecutor(){};
FluxioExecutor(std::string& id, SkillManagerComponentPlugin* plugin, bool native) :
FluxioExecutor(const std::string& id, SkillManagerComponentPlugin* plugin, bool native) :
id(id), native(native), plugin(plugin){};
virtual void run(const std::string& executorName, armarx::aron::data::DictPtr parameters){};
virtual void abort(){};
......@@ -37,7 +37,7 @@ namespace armarx::plugins
const bool native;
protected:
std::list<skills::FluxioSkillStatusUpdate> statusUpdates = {};
std::list<skills::FluxioSkillStatusUpdate> statusUpdates;
std::string* executorName = nullptr;
SkillManagerComponentPlugin* plugin = nullptr;
std::optional<skills::FluxioSkillStatusUpdate> status = std::nullopt;
......
#include "FluxioMergerExecutor.h"
#include <optional>
#include <thread>
#include <vector>
#include <ArmarXCore/core/logging/Logging.h>
#include "RobotAPI/libraries/skills/core/SkillStatusUpdate.h"
namespace armarx::plugins
{
FluxioMergerExecutor::FluxioMergerExecutor(const std::string& id,
SkillManagerComponentPlugin* plugin,
const std::vector<std::string>& parameterIds) :
FluxioExecutor(id, plugin, false)
{
for (const auto& id : parameterIds)
{
tokenHasArrivedMap[id] = false;
}
}
void
FluxioMergerExecutor::run(const std::string& /*unused*/,
armarx::aron::data::DictPtr /*unused*/)
{
// set status running
setStatus(skills::SkillStatus::Running);
bool running = true;
while (running)
{
std::this_thread::sleep_for(std::chrono::milliseconds(250));
// check if all tokens have arrived
bool allTokensArrived = true;
for (const auto& [id, hasArrived] : tokenHasArrivedMap)
{
if (!hasArrived)
{
allTokensArrived = false;
break;
}
}
if (status->status == skills::SkillStatus::Aborted ||
status->status == skills::SkillStatus::Failed ||
status->status == skills::SkillStatus::Succeeded)
{
running = false;
}
else if (allTokensArrived)
{
// set status succeeded
setStatus(skills::SkillStatus::Succeeded);
running = false;
}
}
}
void
FluxioMergerExecutor::abort()
{
setStatus(skills::SkillStatus::Aborted);
}
void
FluxioMergerExecutor::checkInToken(const std::string& parameterId)
{
const auto it = tokenHasArrivedMap.find(parameterId);
if (it == tokenHasArrivedMap.end())
{
ARMARX_WARNING << "Unexpected parameterId: " << parameterId;
}
else
{
tokenHasArrivedMap[parameterId] = true;
}
}
std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
FluxioMergerExecutor::getStatusUpdate()
{
// unused method
return std::nullopt;
}
void
FluxioMergerExecutor::setStatus(skills::SkillStatus status)
{
const skills::FluxioSkillStatusUpdate update = {
armarx::DateTime::Now(), this->id, "noIdNeeded", status};
this->status = update;
}
} // namespace armarx::plugins
#pragma once
#include <optional>
#include <string>
#include "RobotAPI/libraries/skills/core/FluxioSkillStatusUpdate.h"
#include "FluxioExecutor.h"
#include "SkillManagerComponentPlugin.h"
namespace armarx::plugins
{
class SkillManagerComponentPlugin; // forward declaration
class FluxioMergerExecutor : public FluxioExecutor
{
public:
FluxioMergerExecutor(const std::string& id,
SkillManagerComponentPlugin* plugin,
const std::vector<std::string>& parameterIds);
void run(const std::string& executorName, armarx::aron::data::DictPtr parameters) override;
void abort() override;
void checkInToken(const std::string& parameterId);
std::optional<std::vector<skills::FluxioSkillStatusUpdate>> getStatusUpdate() override;
private:
void setStatus(skills::SkillStatus status) override;
std::map<std::string, bool> tokenHasArrivedMap; // keys are parameterIds
};
} // namespace armarx::plugins
......@@ -4,9 +4,7 @@
#include <ArmarXCore/core/logging/Logging.h>
#include "RobotAPI/libraries/skills/core/FluxioSkillStatusUpdate.h"
#include "RobotAPI/libraries/skills/core/ProviderID.h"
#include "RobotAPI/libraries/skills/core/SkillExecutionID.h"
#include "RobotAPI/libraries/skills/core/SkillExecutionRequest.h"
#include "RobotAPI/libraries/skills/core/SkillID.h"
......
#pragma once
#include <functional>
#include <optional>
#include <string>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment