Skip to content
Snippets Groups Projects
Commit 77ca98b6 authored by Julian Tusch's avatar Julian Tusch :no_entry_sign:
Browse files

fixed more errors

parent 19de5282
No related branches found
No related tags found
2 merge requests!459fluxio/dev Pointer Shenanigans,!449Fluxio preliminary release
Pipeline #19673 passed
This commit is part of merge request !449. Comments created here will be created in the context of that merge request.
......@@ -286,6 +286,12 @@ namespace armarx
std::vector<std::string> nodeIdsToDelete = {};
for (const auto& [nodeId, nodePtr] : nodes)
{
if (nodePtr == nullptr)
{
ARMARX_WARNING << "Unexpected nullptr!";
continue;
}
if (nodePtr->nodeType != FluxioNodeType::SUBSKILL)
{
continue;
......@@ -299,7 +305,7 @@ namespace armarx
continue;
}
if (subSkillNodePtr->skillPtr != nullptr)
if (subSkillNodePtr->skillPtr == nullptr)
{
ARMARX_WARNING << "SkillPtr for SubSkillNode is not set";
continue;
......
......@@ -279,7 +279,7 @@ namespace armarx::skills
statusUpdate = statusUpdateIt.value();
// did the status change? update statusUpdates list
std::shared_lock statusMapLock_shared(statusUpdatesMutex);
std::unique_lock statusMapLock(statusUpdatesMutex);
const auto& lastUpdate =
std::find_if(statusUpdates.begin(),
statusUpdates.end(),
......@@ -288,19 +288,14 @@ namespace armarx::skills
if (lastUpdate == statusUpdates.end() || lastUpdate->status != statusUpdate.status)
{
statusMapLock_shared.unlock();
std::unique_lock statusMapLock_unique(statusUpdatesMutex);
statusUpdates.push_front({armarx::DateTime::Now(),
executorPtr->id,
subSkillNode->nodeId,
statusUpdate.status});
statusMapLock_unique.unlock();
}
else
{
statusMapLock_shared.unlock();
}
statusMapLock.unlock();
// check subskill is finished
if (statusUpdate.status == skills::SkillStatus::Succeeded ||
statusUpdate.status == skills::SkillStatus::Failed ||
......@@ -552,7 +547,7 @@ namespace armarx::skills
void
FluxioCompositeExecutor::pollSubStatuses()
{
std::shared_lock l(subExecutionsMapMutex);
std::scoped_lock l(subExecutionsMapMutex, statusUpdatesMutex);
for (const auto& [nodeId, executorPtr] : subExecutionsMap)
{
executorPtr->getStatusUpdate();
......@@ -562,23 +557,19 @@ namespace armarx::skills
continue;
}
std::shared_lock statusMapLock_shared(statusUpdatesMutex);
const auto& lastStatus =
find_if(statusUpdates.begin(),
statusUpdates.end(),
[&](const skills::FluxioSkillStatusUpdate& statusUpdate)
{ return statusUpdate.subSkillNodeId == nodeId; })
->status;
statusMapLock_shared.unlock();
if (lastStatus != s->status)
{
std::unique_lock statusMapLock_unique(statusUpdatesMutex);
statusUpdates.push_front(
{armarx::DateTime::Now(), executorPtr->id, nodeId, s->status});
statusMapLock_unique.unlock();
}
}
l.unlock();
}
bool
......
......@@ -2,7 +2,6 @@
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <thread>
#include <vector>
......@@ -72,21 +71,20 @@ namespace armarx::skills
void
FluxioMergerExecutor::checkInToken(const std::string& parameterId)
{
std::shared_lock l_shared(tokenHasArrivedMapMutex);
std::unique_lock l(tokenHasArrivedMapMutex);
const auto it = tokenHasArrivedMap.find(parameterId);
if (it == tokenHasArrivedMap.end())
{
l_shared.unlock();
ARMARX_WARNING << "Unexpected parameterId: " << parameterId;
}
else
{
l_shared.unlock();
ARMARX_WARNING << "Fluxio Merger received token for parameterId: " << parameterId;
std::unique_lock l_unique(tokenHasArrivedMapMutex);
tokenHasArrivedMap[parameterId] = true;
l_unique.unlock();
}
l.unlock();
}
std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment