Skip to content
Snippets Groups Projects
Commit 53d0ba50 authored by Julian Tusch's avatar Julian Tusch :no_entry_sign:
Browse files

draft: abort fluxio skill

parent 71ce3d66
No related branches found
No related tags found
4 merge requests!460Draft: fluxio/dev-skill-timeout,!449Fluxio preliminary release,!448"fluxio composite executor",!446Draft: Fluxio related changes
Showing with 106 additions and 2 deletions
......@@ -28,7 +28,7 @@ namespace armarx::plugins
FluxioCompositeExecutor::run(std::string executorName, armarx::aron::data::DictPtr parameters)
{
// std::string NewExecutorName = skill->name;
std::vector<std::string> subExecutions;
subExecutions.clear();
if (skill == nullptr)
{
......@@ -263,6 +263,24 @@ namespace armarx::plugins
statusUpdates.push_front(update);
}
void
FluxioCompositeExecutor::abort()
{
ARMARX_WARNING << "Aborting skill " << skill->name;
if(plugin == nullptr)
{
ARMARX_WARNING << "Plugin is not set, cannot abort subskills.";
return;
}
// abort all subskills
for (const auto& eid : subExecutions)
{
plugin->abortFluxioSkill(eid);
}
}
std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
FluxioCompositeExecutor::getStatusUpdate()
{
......
......@@ -18,8 +18,10 @@ namespace armarx::plugins
{
public:
FluxioCompositeExecutor(std::string& id,
SkillManagerComponentPlugin* plugin, skills::FluxioSkill* skill);
SkillManagerComponentPlugin* plugin,
skills::FluxioSkill* skill);
void run(std::string executorName, armarx::aron::data::DictPtr parameters) override;
void abort() override;
std::optional<std::vector<skills::FluxioSkillStatusUpdate>> getStatusUpdate() override;
// TODO: abort etc.
......@@ -27,5 +29,6 @@ namespace armarx::plugins
private:
std::optional<skills::SkillExecutionID> executionId = std::nullopt;
skills::FluxioSkill* skill;
std::vector<std::string> subExecutions = {};
};
} // namespace armarx::plugins
......@@ -19,6 +19,7 @@ namespace armarx::plugins
FluxioExecutor(std::string& id, SkillManagerComponentPlugin* plugin, bool native) :
native(native), id(id), plugin(plugin){};
virtual void run(std::string executorName, armarx::aron::data::DictPtr parameters){};
virtual void abort(){};
virtual std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
getStatusUpdate()
......
......@@ -31,6 +31,19 @@ namespace armarx::plugins
this->executionId = eid;
}
void
FluxioNativeExecutor::abort()
{
if (!this->executionId.has_value() || this->plugin == nullptr)
{
// error
ARMARX_WARNING << "Execution ID or plugin is not set";
return;
}
this->plugin->abortSkill(this->executionId.value());
}
std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
FluxioNativeExecutor::getStatusUpdate()
{
......
......@@ -20,6 +20,7 @@ namespace armarx::plugins
SkillManagerComponentPlugin* plugin,
skills::SkillID& skillId);
void run(std::string executorName, armarx::aron::data::DictPtr parameters) override;
void abort() override;
std::optional<std::vector<skills::FluxioSkillStatusUpdate>> getStatusUpdate() override;
// TODO: abort etc.
......
......@@ -658,6 +658,63 @@ namespace armarx::plugins
return executionId;
}
void
SkillManagerComponentPlugin::abortFluxioSkill(const std::string& executionId)
{
const auto& executorIt = fluxioDC.fluxioExecutors.find(executionId);
if (executorIt == fluxioDC.fluxioExecutors.end())
{
ARMARX_WARNING << "Execution with id '" << executionId << "' not found.";
return;
}
FluxioExecutor* executorPtr = executorIt->second;
if (executorPtr->native)
{
FluxioNativeExecutor* castedExecutor = nullptr;
try
{
castedExecutor = dynamic_cast<FluxioNativeExecutor*>(executorPtr);
if (castedExecutor == nullptr)
{
ARMARX_WARNING << "Executor with id '" << executionId
<< "' is not a FluxioNativeExecutor.";
return;
}
}
catch (const std::bad_cast& e)
{
ARMARX_WARNING << "Error while getting execution runner for fluxio skill with id "
<< executionId << ": " << e.what();
return;
}
castedExecutor->abort();
}
else
{
FluxioCompositeExecutor* castedExecutor = nullptr;
try
{
castedExecutor = dynamic_cast<FluxioCompositeExecutor*>(executorPtr);
if (castedExecutor == nullptr)
{
ARMARX_WARNING << "Executor with id '" << executionId
<< "' is not a FluxioCompositeExecutor.";
return;
}
}
catch (const std::bad_cast& e)
{
ARMARX_WARNING << "Error while getting execution runner for fluxio skill with id "
<< executionId << ": " << e.what();
return;
}
castedExecutor->abort();
}
}
std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
SkillManagerComponentPlugin::getFluxioSkillExecutionStatus(const std::string& executionId)
{
......
......@@ -77,6 +77,8 @@ namespace armarx::plugins
std::optional<std::string> executeFluxioSkill(std::string skillId,
std::string executorName = "Fluxio");
void abortFluxioSkill(const std::string& executionId);
std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
getFluxioSkillExecutionStatus(const std::string& executionId);
......
......@@ -169,6 +169,12 @@ namespace armarx
return {};
}
void
SkillManagerComponentPluginUser::abortFluxioSkill(const std::string& executionId, const Ice::Current& current)
{
this->plugin->abortFluxioSkill(executionId);
}
IceUtil::Optional<skills::manager::dto::FluxioSkillStatusUpdateList>
SkillManagerComponentPluginUser::getFluxioSkillExecutionStatus(const std::string& executionId,
const Ice::Current& current)
......
......@@ -71,6 +71,9 @@ namespace armarx
IceUtil::Optional<std::string> executeFluxioSkill(const std::string& skillId,
const Ice::Current& current) override;
void
abortFluxioSkill(const std::string& executionId, const Ice::Current& current) override;
IceUtil::Optional<skills::manager::dto::FluxioSkillStatusUpdateList> getFluxioSkillExecutionStatus(const std::string& executionId, const Ice::Current& current) override;
skills::manager::dto::FluxioSkillList getSkillList(const Ice::Current& current) override;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment