Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
R
RobotAPI
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Container Registry
Model registry
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Software
ArmarX
RobotAPI
Merge requests
!395
Skills: Optionals in manager gui
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Skills: Optionals in manager gui
skills/optionals-in-manager-gui
into
master
Overview
1
Commits
2
Pipelines
1
Changes
4
Merged
Rainer Kartmann
requested to merge
skills/optionals-in-manager-gui
into
master
1 year ago
Overview
1
Commits
2
Pipelines
1
Changes
4
Expand
0
0
Merge request reports
Viewing commit
058bee70
Prev
Next
Show latest version
4 files
+
272
−
137
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
4
Search (e.g. *.vue) (Ctrl+P)
058bee70
Merge remote-tracking branch 'origin/master' into skills/optionals-in-manager-gui
· 058bee70
Fabian Tërnava
authored
1 year ago
source/RobotAPI/libraries/armem/client/ReadStream.cpp
+
134
−
103
Options
#include
"ReadStream.h"
#include
<ArmarXCore/core/time/Clock.h>
#include
<RobotAPI/libraries/armem/client/query/Builder.h>
namespace
armarx
::
armem
::
client
{
std
::
optional
<
wm
::
EntitySnapshot
>
ReadStream
::
stream
(
const
Reader
&
reader
,
const
MemoryID
&
queriedId
,
CallbackT
&
callback
,
const
armarx
::
core
::
time
::
Frequency
&
maxFreq
)
ReadStream
::
ReadStream
()
:
metronome
{
armarx
::
core
::
time
::
Frequency
::
Hertz
(
10
)}
{
// noone else is allowed to open a stream while this stream is running
std
::
scoped_lock
l
(
runningMutex
);
}
armarx
::
core
::
time
::
Metronome
metronome
(
maxFreq
);
auto
timeStart
=
armarx
::
core
::
time
::
Clock
::
Now
();
ReadStream
::
ReadStream
(
const
Reader
&
reader
,
const
MemoryID
&
queriedId
,
const
core
::
time
::
Frequency
&
maxPollFrequency
)
:
reader
{
reader
},
queriedId
{
queriedId
},
metronome
{
maxPollFrequency
}
{
timeStart
=
armarx
::
core
::
time
::
Clock
::
Now
();
}
bool
callbackReturnedFalse
=
false
;
while
(
not
streamStoppedExternally
&&
not
callbackReturnedFalse
)
std
::
optional
<
wm
::
EntitySnapshot
>
ReadStream
::
pollBlocking
(
const
SnapshotCallbackT
&
callback
)
{
if
(
isPolling
.
exchange
(
true
))
{
// make sure to not busy wait. Also wait until probably data is available in first iteration
metronome
.
waitForNextTick
();
auto
timeEnd
=
armarx
::
core
::
time
::
Clock
::
Now
();
auto
makeQuery
=
[
&
timeStart
,
&
timeEnd
](
const
MemoryID
&
id
)
{
query
::
Builder
qb
;
query
::
CoreSegmentSelector
&
core
=
id
.
hasCoreSegmentName
()
?
qb
.
coreSegments
().
withID
(
id
)
:
qb
.
coreSegments
().
all
();
query
::
ProviderSegmentSelector
&
prov
=
id
.
hasProviderSegmentName
()
?
core
.
providerSegments
().
withID
(
id
)
:
core
.
providerSegments
().
all
();
query
::
EntitySelector
&
entity
=
id
.
hasEntityName
()
?
prov
.
entities
().
withID
(
id
)
:
prov
.
entities
().
all
();
entity
.
snapshots
().
timeRange
(
timeStart
,
timeEnd
);
throw
armarx
::
armem
::
error
::
ReadStreamAlreadyPolling
(
queriedId
,
__PRETTY_FUNCTION__
);
}
pollingStoppedExternally
=
false
;
return
qb
.
buildQueryInput
();
};
auto
result
=
_pollBlocking
(
callback
);
auto
query
=
makeQuery
(
queriedId
);
isPolling
=
false
;
return
result
;
}
auto
result
=
reader
.
query
(
query
);
void
ReadStream
::
pollAsync
(
const
SnapshotCallbackT
&
callback
)
{
if
(
isPolling
.
exchange
(
true
))
{
throw
armarx
::
armem
::
error
::
ReadStreamAlreadyPolling
(
queriedId
,
__PRETTY_FUNCTION__
);
}
pollingStoppedExternally
=
false
;
if
(
result
.
success
)
{
using
EntitySnapshotReference
=
std
::
reference_wrapper
<
armarx
::
armem
::
wm
::
EntitySnapshot
>
;
// copy references of snapshots into vector to sort them
std
::
vector
<
EntitySnapshotReference
>
snapshots
;
result
.
memory
.
forEachSnapshot
(
[
&
snapshots
](
armarx
::
armem
::
wm
::
EntitySnapshot
&
snapshot
)
{
snapshots
.
push_back
(
snapshot
);
});
// sort correctly
std
::
sort
(
snapshots
.
begin
(),
snapshots
.
end
(),
[](
const
EntitySnapshotReference
&
a
,
const
EntitySnapshotReference
&
b
)
{
return
a
.
get
().
id
().
timestamp
<
b
.
get
().
id
().
timestamp
;
});
for
(
const
auto
&
snapshot
:
snapshots
)
{
// assert times in correct interval
ARMARX_CHECK_LESS_EQUAL
(
timeStart
,
snapshot
.
get
().
id
().
timestamp
);
ARMARX_CHECK_GREATER_EQUAL
(
timeEnd
,
snapshot
.
get
().
id
().
timestamp
);
if
(
!
callback
(
snapshot
.
get
()))
{
return
snapshot
;
}
}
this
->
pollingThread
=
std
::
thread
([
&
]()
{
this
->
_pollBlocking
(
callback
);
});
}
timeStart
=
timeEnd
+
armarx
::
core
::
time
::
Duration
::
MicroSeconds
(
1
);
}
else
std
::
optional
<
wm
::
EntitySnapshot
>
ReadStream
::
_pollBlocking
(
const
SnapshotCallbackT
&
callback
)
{
while
(
not
pollingStoppedExternally
)
{
auto
snapshot
=
pollOnce
(
callback
);
if
(
snapshot
.
has_value
())
{
ARMARX_ERROR
<<
deactivateSpam
()
<<
"Received an error in ReadStream when querying data from a "
"memory. The error was '"
<<
result
.
errorMessage
<<
"'. Continue with stream, perhaps the memory was not yet initialized."
;
return
snapshot
;
}
}
return
std
::
nullopt
;
}
void
ReadStream
::
openInBackground
(
const
Reader
&
reader
,
const
MemoryID
&
queriedId
,
CallbackT
&
continueIf
,
const
armarx
::
core
::
time
::
Frequency
&
maxFreq
)
ReadStream
::
stop
()
{
pollingStoppedExternally
=
true
;
if
(
pollingThread
.
joinable
())
{
pollingThread
.
join
();
isPolling
=
false
;
}
}
std
::
optional
<
wm
::
EntitySnapshot
>
ReadStream
::
pollOnce
(
const
SnapshotCallbackT
&
callback
)
{
std
::
scoped_lock
r
(
runningMutex
);
std
::
scoped_lock
l
(
threadMutex
);
if
(
isPolling
.
exchange
(
true
))
{
throw
armarx
::
armem
::
error
::
ReadStreamAlreadyPolling
(
queriedId
,
__PRETTY_FUNCTION__
);
}
streaming
=
true
;
streamStoppedExternally
=
true
;
auto
snapshot
=
_pollOnce
(
callback
);
// thread is part of this streamreader. Do not detach
this
->
runningThread
=
std
::
thread
([
&
]()
{
this
->
stream
(
reader
,
queriedId
,
continueIf
,
maxFreq
);
});
isPolling
=
false
;
return
snapshot
;
}
std
::
optional
<
wm
::
EntitySnapshot
>
ReadStream
::
open
(
const
Reader
&
reader
,
const
MemoryID
&
queriedId
,
CallbackT
&
continueIf
,
const
armarx
::
core
::
time
::
Frequency
&
maxFreq
)
ReadStream
::
_pollOnce
(
const
SnapshotCallbackT
&
callback
)
{
std
::
unique_lock
r
(
runningMutex
);
std
::
scoped_lock
l
(
threadMutex
);
// Make sure to not busy wait. Also wait until probably data is available in first iteration.
metronome
.
waitForNextTick
(
);
streaming
=
true
;
streamStoppedExternally
=
true
;
auto
timeEnd
=
armarx
::
core
::
time
::
Clock
::
Now
();
auto
makeQuery
=
[
this
,
&
timeEnd
](
const
MemoryID
&
id
)
{
query
::
Builder
qb
;
query
::
CoreSegmentSelector
&
core
=
id
.
hasCoreSegmentName
()
?
qb
.
coreSegments
().
withID
(
id
)
:
qb
.
coreSegments
().
all
();
query
::
ProviderSegmentSelector
&
prov
=
id
.
hasProviderSegmentName
()
?
core
.
providerSegments
().
withID
(
id
)
:
core
.
providerSegments
().
all
();
query
::
EntitySelector
&
entity
=
id
.
hasEntityName
()
?
prov
.
entities
().
withID
(
id
)
:
prov
.
entities
().
all
();
entity
.
snapshots
().
timeRange
(
timeStart
,
timeEnd
);
std
::
optional
<
wm
::
EntitySnapshot
>
ret
=
std
::
nullopt
;
return
qb
.
buildQueryInput
();
};
// thread is part of this streamreader. Do not detach
this
->
runningThread
=
std
::
thread
([
&
]()
{
ret
=
this
->
stream
(
reader
,
queriedId
,
continueIf
,
maxFreq
);
});
auto
query
=
makeQuery
(
queriedId
);
r
.
unlock
();
// we keep the threadMutex to make sure that noone is replacing the thread member
auto
result
=
reader
.
query
(
query
);
if
(
this
->
runningThread
.
joinable
()
)
if
(
result
.
success
)
{
// wait until finished
this
->
runningThread
.
join
();
using
EntitySnapshotReference
=
std
::
reference_wrapper
<
armarx
::
armem
::
wm
::
EntitySnapshot
>
;
// Copy references of snapshots into vector to sort them.
std
::
vector
<
EntitySnapshotReference
>
snapshots
;
result
.
memory
.
forEachSnapshot
([
&
snapshots
](
armarx
::
armem
::
wm
::
EntitySnapshot
&
snapshot
)
{
snapshots
.
push_back
(
snapshot
);
});
// Sort correctly.
std
::
sort
(
snapshots
.
begin
(),
snapshots
.
end
(),
[](
const
EntitySnapshotReference
&
a
,
const
EntitySnapshotReference
&
b
)
{
return
a
.
get
().
id
().
timestamp
<
b
.
get
().
id
().
timestamp
;
});
// Determine the next start time.
DateTime
nextStart
;
if
(
snapshots
.
size
()
>
0
)
{
// Because they are sorted, back() has the highest time stamp.
nextStart
=
snapshots
.
back
().
get
().
id
().
timestamp
+
armarx
::
core
::
time
::
Duration
::
MicroSeconds
(
1
);
}
else
{
nextStart
=
timeStart
;
}
// Call the callback on all snapshots.
for
(
const
auto
&
snapshot
:
snapshots
)
{
// Assert times in correct interval.
ARMARX_CHECK_LESS_EQUAL
(
timeStart
,
snapshot
.
get
().
id
().
timestamp
);
ARMARX_CHECK_GREATER_EQUAL
(
timeEnd
,
snapshot
.
get
().
id
().
timestamp
);
const
bool
continue_
=
callback
(
snapshot
.
get
());
if
(
not
continue_
)
{
return
snapshot
;
}
}
timeStart
=
nextStart
;
}
else
{
ARMARX_WARNING
<<
deactivateSpam
()
<<
"Received an error in ReadStream when querying data from a "
"memory. The error was '"
<<
result
.
errorMessage
<<
"'. Continue with stream, perhaps the memory was not yet initialized."
;
}
return
re
t
;
return
std
::
nullop
t
;
}
void
ReadStream
::
close
()
{
this
->
streamStoppedExternally
=
true
;
}
}
// namespace armarx::armem::client
Loading