mirror of
https://github.com/opencv/opencv.git
synced 2025-07-26 07:07:37 +08:00
Implemented basic frame drop functionality
This commit is contained in:
parent
b19f860384
commit
716bdd5ee5
30
modules/gapi/include/opencv2/gapi/streaming/sync.hpp
Normal file
30
modules/gapi/include/opencv2/gapi/streaming/sync.hpp
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
// This file is part of OpenCV project.
|
||||||
|
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||||
|
// of this distribution and at http://opencv.org/license.html.
|
||||||
|
//
|
||||||
|
// Copyright (C) 2021 Intel Corporation
|
||||||
|
|
||||||
|
#ifndef OPENCV_GAPI_STREAMING_SYNC_HPP
|
||||||
|
#define OPENCV_GAPI_STREAMING_SYNC_HPP
|
||||||
|
|
||||||
|
namespace cv {
|
||||||
|
namespace gapi {
|
||||||
|
namespace streaming {
|
||||||
|
|
||||||
|
enum class sync_policy {
|
||||||
|
dont_sync,
|
||||||
|
drop
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace streaming
|
||||||
|
} // namespace gapi
|
||||||
|
|
||||||
|
namespace detail {
|
||||||
|
template<> struct CompileArgTag<gapi::streaming::sync_policy> {
|
||||||
|
static const char* tag() { return "gapi.streaming.sync_policy"; }
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace detail
|
||||||
|
} // namespace cv
|
||||||
|
|
||||||
|
#endif // OPENCV_GAPI_STREAMING_SYNC_HPP
|
@ -24,6 +24,9 @@
|
|||||||
|
|
||||||
#include "executor/gstreamingexecutor.hpp"
|
#include "executor/gstreamingexecutor.hpp"
|
||||||
|
|
||||||
|
#include <opencv2/gapi/streaming/meta.hpp>
|
||||||
|
#include <opencv2/gapi/streaming/sync.hpp>
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
using namespace cv::gimpl::stream;
|
using namespace cv::gimpl::stream;
|
||||||
@ -312,12 +315,8 @@ public:
|
|||||||
cv::GRunArgs &out_results);
|
cv::GRunArgs &out_results);
|
||||||
};
|
};
|
||||||
|
|
||||||
// This method handles a stop sign got from some input
|
void rewindToStop(std::vector<Q*> &in_queues,
|
||||||
// island. Reiterate through all _remaining valid_ queues (some of
|
const std::size_t this_id)
|
||||||
// them can be set to nullptr already -- see handling in
|
|
||||||
// getInputVector) and rewind data to every Stop sign per queue.
|
|
||||||
void QueueReader::rewindToStop(std::vector<Q*> &in_queues,
|
|
||||||
const std::size_t this_id)
|
|
||||||
{
|
{
|
||||||
for (auto &&qit : ade::util::indexed(in_queues))
|
for (auto &&qit : ade::util::indexed(in_queues))
|
||||||
{
|
{
|
||||||
@ -331,6 +330,16 @@ void QueueReader::rewindToStop(std::vector<Q*> &in_queues,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This method handles a stop sign got from some input
|
||||||
|
// island. Reiterate through all _remaining valid_ queues (some of
|
||||||
|
// them can be set to nullptr already -- see handling in
|
||||||
|
// getInputVector) and rewind data to every Stop sign per queue.
|
||||||
|
void QueueReader::rewindToStop(std::vector<Q*> &in_queues,
|
||||||
|
const std::size_t this_id)
|
||||||
|
{
|
||||||
|
::rewindToStop(in_queues, this_id);
|
||||||
|
}
|
||||||
|
|
||||||
bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
|
bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
|
||||||
cv::GRunArgs &in_constants,
|
cv::GRunArgs &in_constants,
|
||||||
cv::GRunArgs &isl_inputs)
|
cv::GRunArgs &isl_inputs)
|
||||||
@ -496,7 +505,7 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to obrain next data chunk from the source
|
// Try to obtain next data chunk from the source
|
||||||
cv::GRunArg data;
|
cv::GRunArg data;
|
||||||
if (emitter->pull(data))
|
if (emitter->pull(data))
|
||||||
{
|
{
|
||||||
@ -521,6 +530,87 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This thread pulls data from the assigned input queues and makes sure that
|
||||||
|
// all input args are in sync (timestamps are equal), dropping some inputs if required.
|
||||||
|
// After getting synchronized inputs from all input queues, the thread pushes them to out queues
|
||||||
|
void syncActorThread(std::vector<Q*> in_queues,
|
||||||
|
std::vector<std::vector<Q*>> out_queues) {
|
||||||
|
using timestamp_t = int64_t;
|
||||||
|
std::vector<bool> pop_nexts(in_queues.size());
|
||||||
|
std::vector<Cmd> cmds(in_queues.size());
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// pop_nexts indicates which queue still contains earlier timestamps and
|
||||||
|
// needs to be popped at least one more time.
|
||||||
|
// For each iteration (frame) we need to pull from each input queue at least once,
|
||||||
|
// so switch all to true when start processing new frame
|
||||||
|
for (auto&& p : pop_nexts) {
|
||||||
|
p = true;
|
||||||
|
}
|
||||||
|
timestamp_t max_ts = 0u;
|
||||||
|
// Iterate through all input queues, pop GRunArg's and compare timestamps.
|
||||||
|
// Continue pulling from queues whose timestamps are smaller.
|
||||||
|
// Finish when all timestamps are equal.
|
||||||
|
do {
|
||||||
|
for (auto&& it : ade::util::indexed(
|
||||||
|
ade::util::zip(pop_nexts, in_queues, cmds))) {
|
||||||
|
auto& val = ade::util::value(it);
|
||||||
|
auto& pop_next = std::get<0>(val);
|
||||||
|
if (!pop_next) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto& q = std::get<1>(val);
|
||||||
|
auto& cmd = std::get<2>(val);
|
||||||
|
|
||||||
|
q->pop(cmd);
|
||||||
|
if (cv::util::holds_alternative<Stop>(cmd)) {
|
||||||
|
// We got a stop command from one of the input queues.
|
||||||
|
// Rewind all input queues till Stop command,
|
||||||
|
// Push Stop command down the graph, finish the thread
|
||||||
|
rewindToStop(in_queues, ade::util::index(it));
|
||||||
|
for (auto &&oqs : out_queues) {
|
||||||
|
for (auto &&oq : oqs) {
|
||||||
|
oq->push(Cmd{Stop{}});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the timestamp
|
||||||
|
auto& arg = cv::util::get<cv::GRunArg>(cmd);
|
||||||
|
auto ts = cv::util::any_cast<int64_t>(arg.meta[cv::gapi::streaming::meta_tag::timestamp]);
|
||||||
|
GAPI_Assert(ts >= 0u);
|
||||||
|
|
||||||
|
// TODO: this whole drop logic can be imported via compile args
|
||||||
|
// to give a user a way to customize it
|
||||||
|
if (ts < max_ts) {
|
||||||
|
// Continue popping from this queue
|
||||||
|
pop_next = true;
|
||||||
|
} else if (ts == max_ts) {
|
||||||
|
// Stop popping from this queue
|
||||||
|
pop_next = false;
|
||||||
|
} else if (ts > max_ts) {
|
||||||
|
// We got a timestamp which is greater than timestamps from other queues.
|
||||||
|
// It means that we need to reiterate through all the queues one more time
|
||||||
|
// (except the current one)
|
||||||
|
max_ts = ts;
|
||||||
|
for (auto&& p : pop_nexts) {
|
||||||
|
p = true;
|
||||||
|
}
|
||||||
|
pop_next = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (ade::util::any_of(pop_nexts, [](bool v){ return v; }));
|
||||||
|
|
||||||
|
// Finally we got all our inputs synchronized, push them further down the graph
|
||||||
|
for (auto &&it : ade::util::zip(out_queues, cmds)) {
|
||||||
|
for (auto &&q : std::get<0>(it)) {
|
||||||
|
q->push(std::get<1>(it));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
|
class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
|
||||||
{
|
{
|
||||||
QueueReader &qr;
|
QueueReader &qr;
|
||||||
@ -874,6 +964,85 @@ void check_DesyncObjectConsumedByMultipleIslands(const cv::gimpl::GIslandModel::
|
|||||||
|
|
||||||
} // anonymous namespace
|
} // anonymous namespace
|
||||||
|
|
||||||
|
class cv::gimpl::GStreamingExecutor::Synchronizer final {
|
||||||
|
gapi::streaming::sync_policy m_sync_policy = gapi::streaming::sync_policy::dont_sync;
|
||||||
|
ade::Graph& m_island_graph;
|
||||||
|
cv::gimpl::GIslandModel::Graph m_gim;
|
||||||
|
std::size_t m_queue_capacity = 0u;
|
||||||
|
std::thread m_thread;
|
||||||
|
|
||||||
|
std::vector<ade::NodeHandle> m_synchronized_emitters;
|
||||||
|
std::vector<stream::SyncQueue> m_sync_queues;
|
||||||
|
|
||||||
|
std::vector<stream::Q*> newSyncQueue() {
|
||||||
|
m_sync_queues.emplace_back(SyncQueue{});
|
||||||
|
m_sync_queues.back().set_capacity(m_queue_capacity);
|
||||||
|
return std::vector<Q*>{&m_sync_queues.back()};
|
||||||
|
}
|
||||||
|
public:
|
||||||
|
Synchronizer(gapi::streaming::sync_policy sync_policy,
|
||||||
|
ade::Graph& island_graph,
|
||||||
|
std::size_t queue_capacity)
|
||||||
|
: m_sync_policy(sync_policy)
|
||||||
|
, m_island_graph(island_graph)
|
||||||
|
, m_gim(m_island_graph)
|
||||||
|
, m_queue_capacity(queue_capacity) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerVideoEmitters(std::vector<ade::NodeHandle>&& emitters) {
|
||||||
|
// There is no point to make synchronization for the one video input
|
||||||
|
// so do nothing in this case
|
||||||
|
if ( m_sync_policy == cv::gapi::streaming::sync_policy::drop
|
||||||
|
&& emitters.size() > 1u) {
|
||||||
|
m_synchronized_emitters = std::move(emitters);
|
||||||
|
m_sync_queues.reserve(m_synchronized_emitters.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<stream::Q*> outQueues(const ade::NodeHandle& emitter) {
|
||||||
|
// If the emitter was registered previously (which means it needs to be synchronized),
|
||||||
|
// create a new queue for this emitter to push the data to. Sync thread will
|
||||||
|
// pop from this queue and push data to emitter's readers.
|
||||||
|
// If the emitter was not registered, direct emitter output to its immediate readers right away
|
||||||
|
return m_synchronized_emitters.end() != std::find(m_synchronized_emitters.begin(),
|
||||||
|
m_synchronized_emitters.end(),
|
||||||
|
emitter)
|
||||||
|
? newSyncQueue()
|
||||||
|
: reader_queues(m_island_graph, emitter->outNodes().front());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a thread which will handle the synchronization.
|
||||||
|
// Do nothing if synchronization is not needed
|
||||||
|
void start() {
|
||||||
|
if (m_synchronized_emitters.size() != 0) {
|
||||||
|
GAPI_Assert(m_synchronized_emitters.size() > 1u);
|
||||||
|
std::vector<Q*> sync_in_queues(m_synchronized_emitters.size());
|
||||||
|
std::vector<std::vector<Q*>> sync_out_queues(m_synchronized_emitters.size());
|
||||||
|
for (auto it : ade::util::indexed(m_synchronized_emitters)) {
|
||||||
|
const auto id = ade::util::index(it);
|
||||||
|
const auto eh = ade::util::value(it);
|
||||||
|
sync_in_queues[id] = &m_sync_queues[id];
|
||||||
|
sync_out_queues[id] = reader_queues(m_island_graph, eh->outNodes().front());
|
||||||
|
}
|
||||||
|
m_thread = std::thread(syncActorThread,
|
||||||
|
std::move(sync_in_queues),
|
||||||
|
std::move(sync_out_queues));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void join() {
|
||||||
|
if (m_synchronized_emitters.size() != 0) {
|
||||||
|
m_thread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void clear() {
|
||||||
|
for (auto &q : m_sync_queues) q.clear();
|
||||||
|
m_sync_queues.clear();
|
||||||
|
m_synchronized_emitters.clear();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// GStreamingExecutor expects compile arguments as input to have possibility to do
|
// GStreamingExecutor expects compile arguments as input to have possibility to do
|
||||||
// proper graph reshape and islands recompilation
|
// proper graph reshape and islands recompilation
|
||||||
cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model,
|
cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model,
|
||||||
@ -911,6 +1080,10 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&
|
|||||||
return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
|
return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
auto sync_policy = cv::gimpl::getCompileArg<cv::gapi::streaming::sync_policy>(m_comp_args)
|
||||||
|
.value_or(cv::gapi::streaming::sync_policy::dont_sync);
|
||||||
|
m_sync.reset(new Synchronizer(sync_policy, *m_island_graph, queue_capacity));
|
||||||
|
|
||||||
// If metadata was not passed to compileStreaming, Islands are not compiled at this point.
|
// If metadata was not passed to compileStreaming, Islands are not compiled at this point.
|
||||||
// It is fine -- Islands are then compiled in setSource (at the first valid call).
|
// It is fine -- Islands are then compiled in setSource (at the first valid call).
|
||||||
const bool islands_compiled = m_gim.metadata().contains<IslandsCompiled>();
|
const bool islands_compiled = m_gim.metadata().contains<IslandsCompiled>();
|
||||||
@ -934,7 +1107,7 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&
|
|||||||
std::unordered_set<ade::NodeHandle, ade::HandleHasher<ade::Node> > const_ins;
|
std::unordered_set<ade::NodeHandle, ade::HandleHasher<ade::Node> > const_ins;
|
||||||
|
|
||||||
// FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER!
|
// FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER!
|
||||||
// FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!!
|
// FIXME: SAME APPLIES TO THE REGULAR GEXECUTOR!!
|
||||||
auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec)
|
auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec)
|
||||||
{
|
{
|
||||||
const auto orig_data_nh
|
const auto orig_data_nh
|
||||||
@ -1101,19 +1274,6 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
|||||||
{
|
{
|
||||||
GAPI_Assert(state == State::READY || state == State::STOPPED);
|
GAPI_Assert(state == State::READY || state == State::STOPPED);
|
||||||
|
|
||||||
const auto is_video = [](const GRunArg &arg)
|
|
||||||
{
|
|
||||||
return util::holds_alternative<cv::gapi::wip::IStreamSource::Ptr>(arg);
|
|
||||||
};
|
|
||||||
const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video);
|
|
||||||
if (num_videos > 1)
|
|
||||||
{
|
|
||||||
// See below why (another reason - no documented behavior
|
|
||||||
// on handling videos streams of different length)
|
|
||||||
util::throw_error(std::logic_error("Only one video source is"
|
|
||||||
" currently supported!"));
|
|
||||||
}
|
|
||||||
|
|
||||||
GModel::ConstGraph gm(*m_orig_graph);
|
GModel::ConstGraph gm(*m_orig_graph);
|
||||||
// Now the tricky-part: completing Islands compilation if compileStreaming
|
// Now the tricky-part: completing Islands compilation if compileStreaming
|
||||||
// has been called without meta arguments.
|
// has been called without meta arguments.
|
||||||
@ -1180,6 +1340,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
|||||||
|
|
||||||
// Walk through the protocol, set-up emitters appropriately
|
// Walk through the protocol, set-up emitters appropriately
|
||||||
// There's a 1:1 mapping between emitters and corresponding data inputs.
|
// There's a 1:1 mapping between emitters and corresponding data inputs.
|
||||||
|
// Also collect video emitter nodes to use them later in synchronization
|
||||||
|
std::vector<ade::NodeHandle> video_emitters;
|
||||||
for (auto it : ade::util::zip(ade::util::toRange(m_emitters),
|
for (auto it : ade::util::zip(ade::util::toRange(m_emitters),
|
||||||
ade::util::toRange(ins),
|
ade::util::toRange(ins),
|
||||||
ade::util::iota(m_emitters.size())))
|
ade::util::iota(m_emitters.size())))
|
||||||
@ -1197,6 +1359,9 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
|||||||
case T::index_of<cv::gapi::wip::IStreamSource::Ptr>():
|
case T::index_of<cv::gapi::wip::IStreamSource::Ptr>():
|
||||||
#if !defined(GAPI_STANDALONE)
|
#if !defined(GAPI_STANDALONE)
|
||||||
emitter.reset(new VideoEmitter{emit_arg});
|
emitter.reset(new VideoEmitter{emit_arg});
|
||||||
|
// Currently all video inputs are syncronized if sync policy is to drop,
|
||||||
|
// there is no different fps branches etc, so all video emitters are registered
|
||||||
|
video_emitters.emplace_back(emit_nh);
|
||||||
#else
|
#else
|
||||||
util::throw_error(std::logic_error("Video is not supported in the "
|
util::throw_error(std::logic_error("Video is not supported in the "
|
||||||
"standalone mode"));
|
"standalone mode"));
|
||||||
@ -1212,6 +1377,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_sync->registerVideoEmitters(std::move(video_emitters));
|
||||||
|
|
||||||
// FIXME: The below code assumes our graph may have only one
|
// FIXME: The below code assumes our graph may have only one
|
||||||
// real video source (and so, only one stream which may really end)
|
// real video source (and so, only one stream which may really end)
|
||||||
// all other inputs are "constant" generators.
|
// all other inputs are "constant" generators.
|
||||||
@ -1249,7 +1416,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
|||||||
auto emitter = m_gim.metadata(eh).get<Emitter>().object;
|
auto emitter = m_gim.metadata(eh).get<Emitter>().object;
|
||||||
|
|
||||||
// Collect all reader queues from the emitter's the only output object
|
// Collect all reader queues from the emitter's the only output object
|
||||||
auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front());
|
auto out_queues = m_sync->outQueues(eh);
|
||||||
|
|
||||||
m_threads.emplace_back(emitterActorThread,
|
m_threads.emplace_back(emitterActorThread,
|
||||||
emitter,
|
emitter,
|
||||||
@ -1258,6 +1425,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
|||||||
real_video_completion_cb);
|
real_video_completion_cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_sync->start();
|
||||||
|
|
||||||
// Now do this for every island (in a topological order)
|
// Now do this for every island (in a topological order)
|
||||||
for (auto &&op : m_ops)
|
for (auto &&op : m_ops)
|
||||||
{
|
{
|
||||||
@ -1341,6 +1510,7 @@ void cv::gimpl::GStreamingExecutor::wait_shutdown()
|
|||||||
// FIXME: Of course it can be designed much better
|
// FIXME: Of course it can be designed much better
|
||||||
for (auto &t : m_threads) t.join();
|
for (auto &t : m_threads) t.join();
|
||||||
m_threads.clear();
|
m_threads.clear();
|
||||||
|
m_sync->join();
|
||||||
|
|
||||||
// Clear all queues
|
// Clear all queues
|
||||||
// If there are constant emitters, internal queues
|
// If there are constant emitters, internal queues
|
||||||
@ -1352,7 +1522,10 @@ void cv::gimpl::GStreamingExecutor::wait_shutdown()
|
|||||||
for (auto &q : m_emitter_queues) q.clear();
|
for (auto &q : m_emitter_queues) q.clear();
|
||||||
for (auto &q : m_sink_queues) q->clear();
|
for (auto &q : m_sink_queues) q->clear();
|
||||||
for (auto &q : m_internal_queues) q->clear();
|
for (auto &q : m_internal_queues) q->clear();
|
||||||
|
m_const_emitter_queues.clear();
|
||||||
|
m_const_vals.clear();
|
||||||
m_out_queue.clear();
|
m_out_queue.clear();
|
||||||
|
m_sync->clear();
|
||||||
|
|
||||||
for (auto &&op : m_ops) {
|
for (auto &&op : m_ops) {
|
||||||
op.isl_exec->handleStopStream();
|
op.isl_exec->handleStopStream();
|
||||||
|
@ -167,6 +167,9 @@ protected:
|
|||||||
std::vector<ade::NodeHandle> m_emitters;
|
std::vector<ade::NodeHandle> m_emitters;
|
||||||
std::vector<ade::NodeHandle> m_sinks;
|
std::vector<ade::NodeHandle> m_sinks;
|
||||||
|
|
||||||
|
class Synchronizer;
|
||||||
|
std::unique_ptr<Synchronizer> m_sync;
|
||||||
|
|
||||||
std::vector<std::thread> m_threads;
|
std::vector<std::thread> m_threads;
|
||||||
std::vector<stream::SyncQueue> m_emitter_queues;
|
std::vector<stream::SyncQueue> m_emitter_queues;
|
||||||
|
|
||||||
|
220
modules/gapi/test/streaming/gapi_streaming_sync_tests.cpp
Normal file
220
modules/gapi/test/streaming/gapi_streaming_sync_tests.cpp
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
// This file is part of OpenCV project.
|
||||||
|
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||||
|
// of this distribution and at http://opencv.org/license.html.
|
||||||
|
//
|
||||||
|
// Copyright (C) 2021 Intel Corporation
|
||||||
|
|
||||||
|
#include "../test_precomp.hpp"
|
||||||
|
|
||||||
|
#include <opencv2/gapi/streaming/cap.hpp>
|
||||||
|
#include <opencv2/gapi/core.hpp>
|
||||||
|
#include <opencv2/gapi/fluid/imgproc.hpp>
|
||||||
|
#include <opencv2/gapi/streaming/cap.hpp>
|
||||||
|
#include <opencv2/gapi/streaming/sync.hpp>
|
||||||
|
|
||||||
|
namespace opencv_test {
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
using ts_t = int64_t;
|
||||||
|
using ts_vec = std::vector<ts_t>;
|
||||||
|
using cv::gapi::streaming::sync_policy;
|
||||||
|
|
||||||
|
ts_t calcLeastCommonMultiple(const ts_vec& values) {
|
||||||
|
ts_t res = *std::max_element(values.begin(), values.end());
|
||||||
|
auto isDivisor = [&](ts_t v) { return res % v == 0; };
|
||||||
|
while(!std::all_of(values.begin(), values.end(), isDivisor)) {
|
||||||
|
res++;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TimestampGenerationParams {
|
||||||
|
const ts_vec frame_times;
|
||||||
|
sync_policy policy;
|
||||||
|
ts_t end_time;
|
||||||
|
TimestampGenerationParams(const ts_vec& ft, sync_policy sp, ts_t et = 25)
|
||||||
|
: frame_times(ft), policy(sp), end_time(et) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class MultiFrameSource {
|
||||||
|
class SingleSource : public cv::gapi::wip::IStreamSource {
|
||||||
|
MultiFrameSource& m_source;
|
||||||
|
std::size_t m_idx;
|
||||||
|
public:
|
||||||
|
SingleSource(MultiFrameSource& s, std::size_t idx)
|
||||||
|
: m_source(s)
|
||||||
|
, m_idx(idx)
|
||||||
|
{}
|
||||||
|
virtual bool pull(cv::gapi::wip::Data& data) {
|
||||||
|
return m_source.pull(data, m_idx);
|
||||||
|
}
|
||||||
|
virtual GMetaArg descr_of() const { return GMetaArg{m_source.desc()}; }
|
||||||
|
};
|
||||||
|
|
||||||
|
TimestampGenerationParams p;
|
||||||
|
ts_vec m_current_times;
|
||||||
|
cv::Mat m_mat;
|
||||||
|
|
||||||
|
public:
|
||||||
|
MultiFrameSource(const TimestampGenerationParams& params)
|
||||||
|
: p(params)
|
||||||
|
, m_current_times(p.frame_times.size(), 0u)
|
||||||
|
, m_mat(8, 8, CV_8UC1) {
|
||||||
|
}
|
||||||
|
|
||||||
|
bool pull(cv::gapi::wip::Data& data, std::size_t idx) {
|
||||||
|
cv::randn(m_mat, 127, 32);
|
||||||
|
GAPI_Assert(idx < p.frame_times.size());
|
||||||
|
m_current_times[idx] += p.frame_times[idx];
|
||||||
|
if (m_current_times[idx] >= p.end_time) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
data = m_mat.clone();
|
||||||
|
data.meta[cv::gapi::streaming::meta_tag::timestamp] = m_current_times[idx];
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
cv::gapi::wip::IStreamSource::Ptr getSource(std::size_t idx) {
|
||||||
|
return cv::gapi::wip::IStreamSource::Ptr{new SingleSource(*this, idx)};
|
||||||
|
}
|
||||||
|
|
||||||
|
GMatDesc desc() const { return cv::descr_of(m_mat); }
|
||||||
|
};
|
||||||
|
|
||||||
|
class TimestampChecker {
|
||||||
|
TimestampGenerationParams p;
|
||||||
|
ts_t m_synced_time = 0u;
|
||||||
|
ts_t m_synced_frame_time = 0u;
|
||||||
|
public:
|
||||||
|
TimestampChecker(const TimestampGenerationParams& params)
|
||||||
|
: p(params)
|
||||||
|
, m_synced_frame_time(calcLeastCommonMultiple(p.frame_times)) {
|
||||||
|
}
|
||||||
|
|
||||||
|
void checkNext(const ts_vec& timestamps) {
|
||||||
|
if (p.policy == sync_policy::dont_sync) {
|
||||||
|
// don't check timestamps if the policy is dont_sync
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
m_synced_time += m_synced_frame_time;
|
||||||
|
for (const auto& ts : timestamps) {
|
||||||
|
EXPECT_EQ(m_synced_time, ts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t nFrames() const {
|
||||||
|
auto frame_time = p.policy == sync_policy::dont_sync
|
||||||
|
? *std::max_element(p.frame_times.begin(), p.frame_times.end())
|
||||||
|
: m_synced_frame_time;
|
||||||
|
auto n_frames = p.end_time / frame_time;
|
||||||
|
GAPI_Assert(n_frames > 0u);
|
||||||
|
return n_frames;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TimestampSyncTest : public ::testing::TestWithParam<sync_policy> {
|
||||||
|
void run(cv::GProtoInputArgs&& ins, cv::GProtoOutputArgs&& outs,
|
||||||
|
const ts_vec& frame_times) {
|
||||||
|
auto video_in_n = frame_times.size();
|
||||||
|
GAPI_Assert(video_in_n <= ins.m_args.size());
|
||||||
|
// Assume that all remaining inputs are const
|
||||||
|
auto const_in_n = ins.m_args.size() - video_in_n;
|
||||||
|
auto out_n = outs.m_args.size();
|
||||||
|
auto policy = GetParam();
|
||||||
|
TimestampGenerationParams ts_params(frame_times, policy);
|
||||||
|
MultiFrameSource source(ts_params);
|
||||||
|
|
||||||
|
GRunArgs gins;
|
||||||
|
for (std::size_t i = 0; i < video_in_n; i++) {
|
||||||
|
gins += cv::gin(source.getSource(i));
|
||||||
|
}
|
||||||
|
auto desc = source.desc();
|
||||||
|
cv::Mat const_mat = cv::Mat::eye(desc.size.height,
|
||||||
|
desc.size.width,
|
||||||
|
CV_MAKE_TYPE(desc.depth, desc.chan));
|
||||||
|
for (std::size_t i = 0; i < const_in_n; i++) {
|
||||||
|
gins += cv::gin(const_mat);
|
||||||
|
}
|
||||||
|
ts_vec out_timestamps(out_n);
|
||||||
|
cv::GRunArgsP gouts{};
|
||||||
|
for (auto& t : out_timestamps) {
|
||||||
|
gouts += cv::gout(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto pipe = cv::GComputation(std::move(ins), std::move(outs))
|
||||||
|
.compileStreaming(cv::compile_args(policy));
|
||||||
|
|
||||||
|
pipe.setSource(std::move(gins));
|
||||||
|
pipe.start();
|
||||||
|
|
||||||
|
std::size_t frames = 0u;
|
||||||
|
TimestampChecker checker(ts_params);
|
||||||
|
while(pipe.pull(std::move(gouts))) {
|
||||||
|
checker.checkNext(out_timestamps);
|
||||||
|
frames++;
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_EQ(checker.nFrames(), frames);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // anonymous namespace
|
||||||
|
|
||||||
|
TEST_P(TimestampSyncTest, Basic)
|
||||||
|
{
|
||||||
|
cv::GMat in1, in2;
|
||||||
|
auto out = cv::gapi::add(in1, in2);
|
||||||
|
auto ts = cv::gapi::streaming::timestamp(out);
|
||||||
|
|
||||||
|
run(cv::GIn(in1, in2), cv::GOut(ts), {1,2});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(TimestampSyncTest, ThreeInputs)
|
||||||
|
{
|
||||||
|
cv::GMat in1, in2, in3;
|
||||||
|
auto tmp = cv::gapi::add(in1, in2);
|
||||||
|
auto out = cv::gapi::add(tmp, in3);
|
||||||
|
auto ts = cv::gapi::streaming::timestamp(out);
|
||||||
|
|
||||||
|
run(cv::GIn(in1, in2, in3), cv::GOut(ts), {2,4,3});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(TimestampSyncTest, TwoOutputs)
|
||||||
|
{
|
||||||
|
cv::GMat in1, in2, in3;
|
||||||
|
auto out1 = cv::gapi::add(in1, in3);
|
||||||
|
auto out2 = cv::gapi::add(in2, in3);
|
||||||
|
auto ts1 = cv::gapi::streaming::timestamp(out1);
|
||||||
|
auto ts2 = cv::gapi::streaming::timestamp(out2);
|
||||||
|
|
||||||
|
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,4,2});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(TimestampSyncTest, ConstInput)
|
||||||
|
{
|
||||||
|
cv::GMat in1, in2, in3;
|
||||||
|
auto out1 = cv::gapi::add(in1, in3);
|
||||||
|
auto out2 = cv::gapi::add(in2, in3);
|
||||||
|
auto ts1 = cv::gapi::streaming::timestamp(out1);
|
||||||
|
auto ts2 = cv::gapi::streaming::timestamp(out2);
|
||||||
|
|
||||||
|
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(TimestampSyncTest, ChangeSource)
|
||||||
|
{
|
||||||
|
cv::GMat in1, in2, in3;
|
||||||
|
auto out1 = cv::gapi::add(in1, in3);
|
||||||
|
auto out2 = cv::gapi::add(in2, in3);
|
||||||
|
auto ts1 = cv::gapi::streaming::timestamp(out1);
|
||||||
|
auto ts2 = cv::gapi::streaming::timestamp(out2);
|
||||||
|
|
||||||
|
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2});
|
||||||
|
run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2});
|
||||||
|
}
|
||||||
|
|
||||||
|
INSTANTIATE_TEST_CASE_P(InputSynchronization, TimestampSyncTest,
|
||||||
|
Values(sync_policy::dont_sync,
|
||||||
|
sync_policy::drop));
|
||||||
|
} // namespace opencv_test
|
@ -1102,25 +1102,37 @@ struct GAPI_Streaming_Unit: public ::testing::Test {
|
|||||||
// FIXME: (GAPI_Streaming_Types, XChangeOpaque) test is missing here!
|
// FIXME: (GAPI_Streaming_Types, XChangeOpaque) test is missing here!
|
||||||
// FIXME: (GAPI_Streaming_Types, OutputOpaque) test is missing here!
|
// FIXME: (GAPI_Streaming_Types, OutputOpaque) test is missing here!
|
||||||
|
|
||||||
TEST_F(GAPI_Streaming_Unit, TestTwoVideoSourcesFail)
|
TEST(GAPI_Streaming, TestTwoVideosDifferentLength)
|
||||||
{
|
{
|
||||||
auto c_desc = cv::GMatDesc{CV_8U,3,{768,576}};
|
initTestDataPath();
|
||||||
auto m_desc = cv::descr_of(m);
|
auto desc = cv::GMatDesc{CV_8U,3,{768,576}};
|
||||||
auto path = findDataFile("cv/video/768x576.avi");
|
std::string path1, path2;
|
||||||
try {
|
try {
|
||||||
sc = cc.compileStreaming(c_desc, m_desc);
|
path1 = findDataFile("cv/video/768x576.avi");
|
||||||
// FIXME: it should be EXPECT_NO_THROW()
|
path2 = findDataFile("highgui/video/big_buck_bunny.avi");
|
||||||
sc.setSource(cv::gin(gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path), m));
|
|
||||||
sc = cc.compileStreaming(m_desc, c_desc);
|
|
||||||
// FIXME: it should be EXPECT_NO_THROW()
|
|
||||||
sc.setSource(cv::gin(m, gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path)));
|
|
||||||
} catch(...) {
|
} catch(...) {
|
||||||
throw SkipTestException("Video file can not be opened");
|
throw SkipTestException("Video file can not be found");
|
||||||
}
|
}
|
||||||
|
|
||||||
sc = cc.compileStreaming(c_desc, c_desc);
|
cv::GMat in1, in2;
|
||||||
auto c_ptr = gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path);
|
auto out = in1 + cv::gapi::resize(in2, desc.size);
|
||||||
EXPECT_ANY_THROW(sc.setSource(cv::gin(c_ptr, c_ptr)));
|
|
||||||
|
cv::GComputation cc(cv::GIn(in1, in2), cv::GOut(out));
|
||||||
|
auto sc = cc.compileStreaming();
|
||||||
|
|
||||||
|
sc.setSource(cv::gin(gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path1),
|
||||||
|
gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path2)));
|
||||||
|
sc.start();
|
||||||
|
|
||||||
|
cv::Mat out_mat;
|
||||||
|
std::size_t frames = 0u;
|
||||||
|
while(sc.pull(cv::gout(out_mat))) {
|
||||||
|
frames++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// big_buck_bunny.avi has 125 frames, 768x576.avi - 100 frames,
|
||||||
|
// expect framework to stop after 100 frames
|
||||||
|
EXPECT_EQ(100u, frames);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(GAPI_Streaming_Unit, TestStartWithoutnSetSource)
|
TEST_F(GAPI_Streaming_Unit, TestStartWithoutnSetSource)
|
||||||
|
Loading…
Reference in New Issue
Block a user