diff --git a/modules/gapi/include/opencv2/gapi/streaming/sync.hpp b/modules/gapi/include/opencv2/gapi/streaming/sync.hpp new file mode 100644 index 0000000000..5801e6f00a --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/streaming/sync.hpp @@ -0,0 +1,30 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_SYNC_HPP +#define OPENCV_GAPI_STREAMING_SYNC_HPP + +namespace cv { +namespace gapi { +namespace streaming { + +enum class sync_policy { + dont_sync, + drop +}; + +} // namespace streaming +} // namespace gapi + +namespace detail { + template<> struct CompileArgTag { + static const char* tag() { return "gapi.streaming.sync_policy"; } + }; + +} // namespace detail +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_SYNC_HPP diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp b/modules/gapi/src/executor/gstreamingexecutor.cpp index 3715f448d0..2a06873fee 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.cpp +++ b/modules/gapi/src/executor/gstreamingexecutor.cpp @@ -24,6 +24,9 @@ #include "executor/gstreamingexecutor.hpp" +#include +#include + namespace { using namespace cv::gimpl::stream; @@ -312,12 +315,8 @@ public: cv::GRunArgs &out_results); }; -// This method handles a stop sign got from some input -// island. Reiterate through all _remaining valid_ queues (some of -// them can be set to nullptr already -- see handling in -// getInputVector) and rewind data to every Stop sign per queue. -void QueueReader::rewindToStop(std::vector &in_queues, - const std::size_t this_id) +void rewindToStop(std::vector &in_queues, + const std::size_t this_id) { for (auto &&qit : ade::util::indexed(in_queues)) { @@ -331,6 +330,16 @@ void QueueReader::rewindToStop(std::vector &in_queues, } } +// This method handles a stop sign got from some input +// island. Reiterate through all _remaining valid_ queues (some of +// them can be set to nullptr already -- see handling in +// getInputVector) and rewind data to every Stop sign per queue. +void QueueReader::rewindToStop(std::vector &in_queues, + const std::size_t this_id) +{ + ::rewindToStop(in_queues, this_id); +} + bool QueueReader::getInputVector(std::vector &in_queues, cv::GRunArgs &in_constants, cv::GRunArgs &isl_inputs) @@ -496,7 +505,7 @@ void emitterActorThread(std::shared_ptr emitter, return; } - // Try to obrain next data chunk from the source + // Try to obtain next data chunk from the source cv::GRunArg data; if (emitter->pull(data)) { @@ -521,6 +530,87 @@ void emitterActorThread(std::shared_ptr emitter, } } +// This thread pulls data from the assigned input queues and makes sure that +// all input args are in sync (timestamps are equal), dropping some inputs if required. +// After getting synchronized inputs from all input queues, the thread pushes them to out queues +void syncActorThread(std::vector in_queues, + std::vector> out_queues) { + using timestamp_t = int64_t; + std::vector pop_nexts(in_queues.size()); + std::vector cmds(in_queues.size()); + + while (true) { + // pop_nexts indicates which queue still contains earlier timestamps and + // needs to be popped at least one more time. + // For each iteration (frame) we need to pull from each input queue at least once, + // so switch all to true when start processing new frame + for (auto&& p : pop_nexts) { + p = true; + } + timestamp_t max_ts = 0u; + // Iterate through all input queues, pop GRunArg's and compare timestamps. + // Continue pulling from queues whose timestamps are smaller. + // Finish when all timestamps are equal. + do { + for (auto&& it : ade::util::indexed( + ade::util::zip(pop_nexts, in_queues, cmds))) { + auto& val = ade::util::value(it); + auto& pop_next = std::get<0>(val); + if (!pop_next) { + continue; + } + auto& q = std::get<1>(val); + auto& cmd = std::get<2>(val); + + q->pop(cmd); + if (cv::util::holds_alternative(cmd)) { + // We got a stop command from one of the input queues. + // Rewind all input queues till Stop command, + // Push Stop command down the graph, finish the thread + rewindToStop(in_queues, ade::util::index(it)); + for (auto &&oqs : out_queues) { + for (auto &&oq : oqs) { + oq->push(Cmd{Stop{}}); + } + } + return; + } + + // Extract the timestamp + auto& arg = cv::util::get(cmd); + auto ts = cv::util::any_cast(arg.meta[cv::gapi::streaming::meta_tag::timestamp]); + GAPI_Assert(ts >= 0u); + + // TODO: this whole drop logic can be imported via compile args + // to give a user a way to customize it + if (ts < max_ts) { + // Continue popping from this queue + pop_next = true; + } else if (ts == max_ts) { + // Stop popping from this queue + pop_next = false; + } else if (ts > max_ts) { + // We got a timestamp which is greater than timestamps from other queues. + // It means that we need to reiterate through all the queues one more time + // (except the current one) + max_ts = ts; + for (auto&& p : pop_nexts) { + p = true; + } + pop_next = false; + } + } + } while (ade::util::any_of(pop_nexts, [](bool v){ return v; })); + + // Finally we got all our inputs synchronized, push them further down the graph + for (auto &&it : ade::util::zip(out_queues, cmds)) { + for (auto &&q : std::get<0>(it)) { + q->push(std::get<1>(it)); + } + } + } +} + class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput { QueueReader &qr; @@ -874,6 +964,85 @@ void check_DesyncObjectConsumedByMultipleIslands(const cv::gimpl::GIslandModel:: } // anonymous namespace +class cv::gimpl::GStreamingExecutor::Synchronizer final { + gapi::streaming::sync_policy m_sync_policy = gapi::streaming::sync_policy::dont_sync; + ade::Graph& m_island_graph; + cv::gimpl::GIslandModel::Graph m_gim; + std::size_t m_queue_capacity = 0u; + std::thread m_thread; + + std::vector m_synchronized_emitters; + std::vector m_sync_queues; + + std::vector newSyncQueue() { + m_sync_queues.emplace_back(SyncQueue{}); + m_sync_queues.back().set_capacity(m_queue_capacity); + return std::vector{&m_sync_queues.back()}; + } +public: + Synchronizer(gapi::streaming::sync_policy sync_policy, + ade::Graph& island_graph, + std::size_t queue_capacity) + : m_sync_policy(sync_policy) + , m_island_graph(island_graph) + , m_gim(m_island_graph) + , m_queue_capacity(queue_capacity) { + } + + void registerVideoEmitters(std::vector&& emitters) { + // There is no point to make synchronization for the one video input + // so do nothing in this case + if ( m_sync_policy == cv::gapi::streaming::sync_policy::drop + && emitters.size() > 1u) { + m_synchronized_emitters = std::move(emitters); + m_sync_queues.reserve(m_synchronized_emitters.size()); + } + } + + std::vector outQueues(const ade::NodeHandle& emitter) { + // If the emitter was registered previously (which means it needs to be synchronized), + // create a new queue for this emitter to push the data to. Sync thread will + // pop from this queue and push data to emitter's readers. + // If the emitter was not registered, direct emitter output to its immediate readers right away + return m_synchronized_emitters.end() != std::find(m_synchronized_emitters.begin(), + m_synchronized_emitters.end(), + emitter) + ? newSyncQueue() + : reader_queues(m_island_graph, emitter->outNodes().front()); + } + + // Start a thread which will handle the synchronization. + // Do nothing if synchronization is not needed + void start() { + if (m_synchronized_emitters.size() != 0) { + GAPI_Assert(m_synchronized_emitters.size() > 1u); + std::vector sync_in_queues(m_synchronized_emitters.size()); + std::vector> sync_out_queues(m_synchronized_emitters.size()); + for (auto it : ade::util::indexed(m_synchronized_emitters)) { + const auto id = ade::util::index(it); + const auto eh = ade::util::value(it); + sync_in_queues[id] = &m_sync_queues[id]; + sync_out_queues[id] = reader_queues(m_island_graph, eh->outNodes().front()); + } + m_thread = std::thread(syncActorThread, + std::move(sync_in_queues), + std::move(sync_out_queues)); + } + } + + void join() { + if (m_synchronized_emitters.size() != 0) { + m_thread.join(); + } + } + + void clear() { + for (auto &q : m_sync_queues) q.clear(); + m_sync_queues.clear(); + m_synchronized_emitters.clear(); + } +}; + // GStreamingExecutor expects compile arguments as input to have possibility to do // proper graph reshape and islands recompilation cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr &&g_model, @@ -911,6 +1080,10 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr && return m_gim.metadata(nh).get().k == NodeKind::ISLAND; }); + auto sync_policy = cv::gimpl::getCompileArg(m_comp_args) + .value_or(cv::gapi::streaming::sync_policy::dont_sync); + m_sync.reset(new Synchronizer(sync_policy, *m_island_graph, queue_capacity)); + // If metadata was not passed to compileStreaming, Islands are not compiled at this point. // It is fine -- Islands are then compiled in setSource (at the first valid call). const bool islands_compiled = m_gim.metadata().contains(); @@ -934,7 +1107,7 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr && std::unordered_set > const_ins; // FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER! - // FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!! + // FIXME: SAME APPLIES TO THE REGULAR GEXECUTOR!! auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector &vec) { const auto orig_data_nh @@ -1101,19 +1274,6 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) { GAPI_Assert(state == State::READY || state == State::STOPPED); - const auto is_video = [](const GRunArg &arg) - { - return util::holds_alternative(arg); - }; - const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video); - if (num_videos > 1) - { - // See below why (another reason - no documented behavior - // on handling videos streams of different length) - util::throw_error(std::logic_error("Only one video source is" - " currently supported!")); - } - GModel::ConstGraph gm(*m_orig_graph); // Now the tricky-part: completing Islands compilation if compileStreaming // has been called without meta arguments. @@ -1180,6 +1340,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) // Walk through the protocol, set-up emitters appropriately // There's a 1:1 mapping between emitters and corresponding data inputs. + // Also collect video emitter nodes to use them later in synchronization + std::vector video_emitters; for (auto it : ade::util::zip(ade::util::toRange(m_emitters), ade::util::toRange(ins), ade::util::iota(m_emitters.size()))) @@ -1197,6 +1359,9 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) case T::index_of(): #if !defined(GAPI_STANDALONE) emitter.reset(new VideoEmitter{emit_arg}); + // Currently all video inputs are syncronized if sync policy is to drop, + // there is no different fps branches etc, so all video emitters are registered + video_emitters.emplace_back(emit_nh); #else util::throw_error(std::logic_error("Video is not supported in the " "standalone mode")); @@ -1212,6 +1377,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) } } + m_sync->registerVideoEmitters(std::move(video_emitters)); + // FIXME: The below code assumes our graph may have only one // real video source (and so, only one stream which may really end) // all other inputs are "constant" generators. @@ -1249,7 +1416,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) auto emitter = m_gim.metadata(eh).get().object; // Collect all reader queues from the emitter's the only output object - auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front()); + auto out_queues = m_sync->outQueues(eh); m_threads.emplace_back(emitterActorThread, emitter, @@ -1258,6 +1425,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) real_video_completion_cb); } + m_sync->start(); + // Now do this for every island (in a topological order) for (auto &&op : m_ops) { @@ -1341,6 +1510,7 @@ void cv::gimpl::GStreamingExecutor::wait_shutdown() // FIXME: Of course it can be designed much better for (auto &t : m_threads) t.join(); m_threads.clear(); + m_sync->join(); // Clear all queues // If there are constant emitters, internal queues @@ -1352,7 +1522,10 @@ void cv::gimpl::GStreamingExecutor::wait_shutdown() for (auto &q : m_emitter_queues) q.clear(); for (auto &q : m_sink_queues) q->clear(); for (auto &q : m_internal_queues) q->clear(); + m_const_emitter_queues.clear(); + m_const_vals.clear(); m_out_queue.clear(); + m_sync->clear(); for (auto &&op : m_ops) { op.isl_exec->handleStopStream(); diff --git a/modules/gapi/src/executor/gstreamingexecutor.hpp b/modules/gapi/src/executor/gstreamingexecutor.hpp index b6093ac1ef..40b7872682 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.hpp +++ b/modules/gapi/src/executor/gstreamingexecutor.hpp @@ -167,6 +167,9 @@ protected: std::vector m_emitters; std::vector m_sinks; + class Synchronizer; + std::unique_ptr m_sync; + std::vector m_threads; std::vector m_emitter_queues; diff --git a/modules/gapi/test/streaming/gapi_streaming_sync_tests.cpp b/modules/gapi/test/streaming/gapi_streaming_sync_tests.cpp new file mode 100644 index 0000000000..af15a0e36a --- /dev/null +++ b/modules/gapi/test/streaming/gapi_streaming_sync_tests.cpp @@ -0,0 +1,220 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "../test_precomp.hpp" + +#include +#include +#include +#include +#include + +namespace opencv_test { +namespace { + +using ts_t = int64_t; +using ts_vec = std::vector; +using cv::gapi::streaming::sync_policy; + +ts_t calcLeastCommonMultiple(const ts_vec& values) { + ts_t res = *std::max_element(values.begin(), values.end()); + auto isDivisor = [&](ts_t v) { return res % v == 0; }; + while(!std::all_of(values.begin(), values.end(), isDivisor)) { + res++; + } + return res; +} + +struct TimestampGenerationParams { + const ts_vec frame_times; + sync_policy policy; + ts_t end_time; + TimestampGenerationParams(const ts_vec& ft, sync_policy sp, ts_t et = 25) + : frame_times(ft), policy(sp), end_time(et) { + } +}; + +class MultiFrameSource { + class SingleSource : public cv::gapi::wip::IStreamSource { + MultiFrameSource& m_source; + std::size_t m_idx; + public: + SingleSource(MultiFrameSource& s, std::size_t idx) + : m_source(s) + , m_idx(idx) + {} + virtual bool pull(cv::gapi::wip::Data& data) { + return m_source.pull(data, m_idx); + } + virtual GMetaArg descr_of() const { return GMetaArg{m_source.desc()}; } + }; + + TimestampGenerationParams p; + ts_vec m_current_times; + cv::Mat m_mat; + +public: + MultiFrameSource(const TimestampGenerationParams& params) + : p(params) + , m_current_times(p.frame_times.size(), 0u) + , m_mat(8, 8, CV_8UC1) { + } + + bool pull(cv::gapi::wip::Data& data, std::size_t idx) { + cv::randn(m_mat, 127, 32); + GAPI_Assert(idx < p.frame_times.size()); + m_current_times[idx] += p.frame_times[idx]; + if (m_current_times[idx] >= p.end_time) { + return false; + } + data = m_mat.clone(); + data.meta[cv::gapi::streaming::meta_tag::timestamp] = m_current_times[idx]; + return true; + } + + cv::gapi::wip::IStreamSource::Ptr getSource(std::size_t idx) { + return cv::gapi::wip::IStreamSource::Ptr{new SingleSource(*this, idx)}; + } + + GMatDesc desc() const { return cv::descr_of(m_mat); } +}; + +class TimestampChecker { + TimestampGenerationParams p; + ts_t m_synced_time = 0u; + ts_t m_synced_frame_time = 0u; +public: + TimestampChecker(const TimestampGenerationParams& params) + : p(params) + , m_synced_frame_time(calcLeastCommonMultiple(p.frame_times)) { + } + + void checkNext(const ts_vec& timestamps) { + if (p.policy == sync_policy::dont_sync) { + // don't check timestamps if the policy is dont_sync + return; + } + m_synced_time += m_synced_frame_time; + for (const auto& ts : timestamps) { + EXPECT_EQ(m_synced_time, ts); + } + } + + std::size_t nFrames() const { + auto frame_time = p.policy == sync_policy::dont_sync + ? *std::max_element(p.frame_times.begin(), p.frame_times.end()) + : m_synced_frame_time; + auto n_frames = p.end_time / frame_time; + GAPI_Assert(n_frames > 0u); + return n_frames; + } +}; + +struct TimestampSyncTest : public ::testing::TestWithParam { + void run(cv::GProtoInputArgs&& ins, cv::GProtoOutputArgs&& outs, + const ts_vec& frame_times) { + auto video_in_n = frame_times.size(); + GAPI_Assert(video_in_n <= ins.m_args.size()); + // Assume that all remaining inputs are const + auto const_in_n = ins.m_args.size() - video_in_n; + auto out_n = outs.m_args.size(); + auto policy = GetParam(); + TimestampGenerationParams ts_params(frame_times, policy); + MultiFrameSource source(ts_params); + + GRunArgs gins; + for (std::size_t i = 0; i < video_in_n; i++) { + gins += cv::gin(source.getSource(i)); + } + auto desc = source.desc(); + cv::Mat const_mat = cv::Mat::eye(desc.size.height, + desc.size.width, + CV_MAKE_TYPE(desc.depth, desc.chan)); + for (std::size_t i = 0; i < const_in_n; i++) { + gins += cv::gin(const_mat); + } + ts_vec out_timestamps(out_n); + cv::GRunArgsP gouts{}; + for (auto& t : out_timestamps) { + gouts += cv::gout(t); + } + + auto pipe = cv::GComputation(std::move(ins), std::move(outs)) + .compileStreaming(cv::compile_args(policy)); + + pipe.setSource(std::move(gins)); + pipe.start(); + + std::size_t frames = 0u; + TimestampChecker checker(ts_params); + while(pipe.pull(std::move(gouts))) { + checker.checkNext(out_timestamps); + frames++; + } + + EXPECT_EQ(checker.nFrames(), frames); + } +}; + +} // anonymous namespace + +TEST_P(TimestampSyncTest, Basic) +{ + cv::GMat in1, in2; + auto out = cv::gapi::add(in1, in2); + auto ts = cv::gapi::streaming::timestamp(out); + + run(cv::GIn(in1, in2), cv::GOut(ts), {1,2}); +} + +TEST_P(TimestampSyncTest, ThreeInputs) +{ + cv::GMat in1, in2, in3; + auto tmp = cv::gapi::add(in1, in2); + auto out = cv::gapi::add(tmp, in3); + auto ts = cv::gapi::streaming::timestamp(out); + + run(cv::GIn(in1, in2, in3), cv::GOut(ts), {2,4,3}); +} + +TEST_P(TimestampSyncTest, TwoOutputs) +{ + cv::GMat in1, in2, in3; + auto out1 = cv::gapi::add(in1, in3); + auto out2 = cv::gapi::add(in2, in3); + auto ts1 = cv::gapi::streaming::timestamp(out1); + auto ts2 = cv::gapi::streaming::timestamp(out2); + + run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,4,2}); +} + +TEST_P(TimestampSyncTest, ConstInput) +{ + cv::GMat in1, in2, in3; + auto out1 = cv::gapi::add(in1, in3); + auto out2 = cv::gapi::add(in2, in3); + auto ts1 = cv::gapi::streaming::timestamp(out1); + auto ts2 = cv::gapi::streaming::timestamp(out2); + + run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2}); +} + +TEST_P(TimestampSyncTest, ChangeSource) +{ + cv::GMat in1, in2, in3; + auto out1 = cv::gapi::add(in1, in3); + auto out2 = cv::gapi::add(in2, in3); + auto ts1 = cv::gapi::streaming::timestamp(out1); + auto ts2 = cv::gapi::streaming::timestamp(out2); + + run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2}); + run(cv::GIn(in1, in2, in3), cv::GOut(ts1, ts2), {1,2}); +} + +INSTANTIATE_TEST_CASE_P(InputSynchronization, TimestampSyncTest, + Values(sync_policy::dont_sync, + sync_policy::drop)); +} // namespace opencv_test diff --git a/modules/gapi/test/streaming/gapi_streaming_tests.cpp b/modules/gapi/test/streaming/gapi_streaming_tests.cpp index 627c1438b3..e7c59afe40 100644 --- a/modules/gapi/test/streaming/gapi_streaming_tests.cpp +++ b/modules/gapi/test/streaming/gapi_streaming_tests.cpp @@ -1104,25 +1104,37 @@ struct GAPI_Streaming_Unit: public ::testing::Test { // FIXME: (GAPI_Streaming_Types, XChangeOpaque) test is missing here! // FIXME: (GAPI_Streaming_Types, OutputOpaque) test is missing here! -TEST_F(GAPI_Streaming_Unit, TestTwoVideoSourcesFail) +TEST(GAPI_Streaming, TestTwoVideosDifferentLength) { - auto c_desc = cv::GMatDesc{CV_8U,3,{768,576}}; - auto m_desc = cv::descr_of(m); - auto path = findDataFile("cv/video/768x576.avi"); + initTestDataPath(); + auto desc = cv::GMatDesc{CV_8U,3,{768,576}}; + std::string path1, path2; try { - sc = cc.compileStreaming(c_desc, m_desc); - // FIXME: it should be EXPECT_NO_THROW() - sc.setSource(cv::gin(gapi::wip::make_src(path), m)); - sc = cc.compileStreaming(m_desc, c_desc); - // FIXME: it should be EXPECT_NO_THROW() - sc.setSource(cv::gin(m, gapi::wip::make_src(path))); + path1 = findDataFile("cv/video/768x576.avi"); + path2 = findDataFile("highgui/video/big_buck_bunny.avi"); } catch(...) { - throw SkipTestException("Video file can not be opened"); + throw SkipTestException("Video file can not be found"); } - sc = cc.compileStreaming(c_desc, c_desc); - auto c_ptr = gapi::wip::make_src(path); - EXPECT_ANY_THROW(sc.setSource(cv::gin(c_ptr, c_ptr))); + cv::GMat in1, in2; + auto out = in1 + cv::gapi::resize(in2, desc.size); + + cv::GComputation cc(cv::GIn(in1, in2), cv::GOut(out)); + auto sc = cc.compileStreaming(); + + sc.setSource(cv::gin(gapi::wip::make_src(path1), + gapi::wip::make_src(path2))); + sc.start(); + + cv::Mat out_mat; + std::size_t frames = 0u; + while(sc.pull(cv::gout(out_mat))) { + frames++; + } + + // big_buck_bunny.avi has 125 frames, 768x576.avi - 100 frames, + // expect framework to stop after 100 frames + EXPECT_EQ(100u, frames); } TEST_F(GAPI_Streaming_Unit, TestStartWithoutnSetSource)