// This file is part of OpenCV project. // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // // Copyright (C) 2019 Intel Corporation #include "precomp.hpp" #include #include #include #include "executor/gstreamingexecutor.hpp" #include "compiler/passes/passes.hpp" #include "backends/common/gbackend.hpp" // createMat namespace { using namespace cv::gimpl::stream; #if !defined(GAPI_STANDALONE) class VideoEmitter final: public cv::gimpl::GIslandEmitter { cv::gapi::wip::IStreamSource::Ptr src; virtual bool pull(cv::GRunArg &arg) override { // FIXME: probably we can maintain a pool of (then) pre-allocated // buffers to avoid runtime allocations. // Pool size can be determined given the internal queue size. cv::gapi::wip::Data newData; if (!src->pull(newData)) { return false; } arg = std::move(static_cast(newData)); return true; } public: explicit VideoEmitter(const cv::GRunArg &arg) { src = cv::util::get(arg); } }; #endif // GAPI_STANDALONE class ConstEmitter final: public cv::gimpl::GIslandEmitter { cv::GRunArg m_arg; virtual bool pull(cv::GRunArg &arg) override { arg = const_cast(m_arg); // FIXME: variant workaround return true; } public: explicit ConstEmitter(const cv::GRunArg &arg) : m_arg(arg) { } }; struct DataQueue { static const char *name() { return "StreamingDataQueue"; } explicit DataQueue(std::size_t capacity) { if (capacity) { q.set_capacity(capacity); } } cv::gimpl::stream::Q q; }; std::vector reader_queues( ade::Graph &g, const ade::NodeHandle &obj) { ade::TypedGraph qgr(g); std::vector result; for (auto &&out_eh : obj->outEdges()) { result.push_back(&qgr.metadata(out_eh).get().q); } return result; } std::vector input_queues( ade::Graph &g, const ade::NodeHandle &obj) { ade::TypedGraph qgr(g); std::vector result; for (auto &&in_eh : obj->inEdges()) { result.push_back(qgr.metadata(in_eh).contains() ? &qgr.metadata(in_eh).get().q : nullptr); } return result; } void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs) { namespace own = cv::gapi::own; for (auto && it : ade::util::zip(ade::util::toRange(outputs), ade::util::toRange(results))) { auto &out_obj = std::get<0>(it); auto &res_obj = std::get<1>(it); // FIXME: this conversion should be unified using T = cv::GRunArgP; switch (out_obj.index()) { #if !defined(GAPI_STANDALONE) case T::index_of(): *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); break; case T::index_of(): *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); break; #endif // !GAPI_STANDALONE case T::index_of(): *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); break; case T::index_of(): *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); break; case T::index_of(): cv::util::get(out_obj).mov(cv::util::get(res_obj)); break; default: GAPI_Assert(false && "This value type is not supported!"); // ...maybe because of STANDALONE mode. break; } } } // This thread is a plain dump source actor. What it do is just: // - Check input queue (the only one) for a control command // - Depending on the state, obtains next data object and pushes it to the // pipeline. void emitterActorThread(std::shared_ptr emitter, Q& in_queue, std::vector out_queues, std::function cb_completion) { // Wait for the explicit Start command. // ...or Stop command, this also happens. Cmd cmd; in_queue.pop(cmd); GAPI_Assert( cv::util::holds_alternative(cmd) || cv::util::holds_alternative(cmd)); if (cv::util::holds_alternative(cmd)) { for (auto &&oq : out_queues) oq->push(cmd); return; } // Now start emitting the data from the source to the pipeline. while (true) { Cmd cancel; if (in_queue.try_pop(cancel)) { // if we just popped a cancellation command... GAPI_Assert(cv::util::holds_alternative(cancel)); // Broadcast it to the readers and quit. for (auto &&oq : out_queues) oq->push(cancel); return; } // Try to obrain next data chunk from the source cv::GRunArg data; if (emitter->pull(data)) { // // On success, broadcast it to our readers for (auto &&oq : out_queues) { // FIXME: FOR SOME REASON, oq->push(Cmd{data}) doesn't work!! // empty mats are arrived to the receivers! // There may be a fatal bug in our variant! const auto tmp = data; oq->push(Cmd{tmp}); } } else { // Otherwise, broadcast STOP message to our readers and quit. // This usually means end-of-stream, so trigger a callback for (auto &&oq : out_queues) oq->push(Cmd{Stop{}}); if (cb_completion) cb_completion(); return; } } } // This thread is a plain dumb processing actor. What it do is just: // - Reads input from the input queue(s), sleeps if there's nothing to read // - Once a full input vector is obtained, passes it to the underlying island // executable for processing. // - Pushes processing results down to consumers - to the subsequent queues. // Note: Every data object consumer has its own queue. void islandActorThread(std::vector in_rcs, // FIXME: this is... std::vector out_rcs, // FIXME: ...basically just... cv::GMetaArgs out_metas, // ... std::shared_ptr island, // FIXME: ...a copy of OpDesc{}. std::vector in_queues, std::vector in_constants, std::vector< std::vector > out_queues) { GAPI_Assert(in_queues.size() == in_rcs.size()); GAPI_Assert(out_queues.size() == out_rcs.size()); GAPI_Assert(out_queues.size() == out_metas.size()); while (true) { std::vector isl_inputs; isl_inputs.resize(in_rcs.size()); // Try to obtain the full input vector. // Note this may block us. We also may get Stop signal here // and then exit the thread. // NOTE: in order to maintain the GRunArg's underlying object // lifetime, keep the whole cmd vector (of size == # of inputs) // in memory. std::vector cmd(in_queues.size()); for (auto &&it : ade::util::indexed(in_queues)) { auto id = ade::util::index(it); auto &q = ade::util::value(it); isl_inputs[id].first = in_rcs[id]; if (q == nullptr) { // NULL queue means a graph-constant value // (like a value-initialized scalar) // FIXME: Variant move problem isl_inputs[id].second = const_cast(in_constants[id]); } else { q->pop(cmd[id]); if (cv::util::holds_alternative(cmd[id])) { // FIXME: This logic must be unified with what collectorThread is doing! // Just got a stop sign. Reiterate through all queues // and rewind data to every Stop sign per queue for (auto &&qit : ade::util::indexed(in_queues)) { auto id2 = ade::util::index(qit); auto &q2 = ade::util::value(qit); if (id == id2) continue; Cmd cmd2; while (q2 && !cv::util::holds_alternative(cmd2)) q2->pop(cmd2); } // Broadcast Stop down to the pipeline and quit for (auto &&out_qq : out_queues) { for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}}); } return; } // FIXME: MOVE PROBLEM const cv::GRunArg &in_arg = cv::util::get(cmd[id]); #if defined(GAPI_STANDALONE) // Standalone mode - simply store input argument in the vector as-is isl_inputs[id].second = in_arg; #else // Make Islands operate on own:: data types (i.e. in the same // environment as GExecutor provides) // This way several backends (e.g. Fluid) remain OpenCV-independent. switch (in_arg.index()) { case cv::GRunArg::index_of(): isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get(in_arg))}; break; case cv::GRunArg::index_of(): isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get(in_arg))}; break; default: isl_inputs[id].second = in_arg; break; } #endif // GAPI_STANDALONE } } // Once the vector is obtained, prepare data for island execution // Note - we first allocate output vector via GRunArg! // Then it is converted to a GRunArgP. std::vector isl_outputs; std::vector out_data; isl_outputs.resize(out_rcs.size()); out_data.resize(out_rcs.size()); for (auto &&it : ade::util::indexed(out_rcs)) { auto id = ade::util::index(it); auto &r = ade::util::value(it); #if !defined(GAPI_STANDALONE) using MatType = cv::Mat; using SclType = cv::Scalar; #else using MatType = cv::gapi::own::Mat; using SclType = cv::gapi::own::Scalar; #endif // GAPI_STANDALONE switch (r.shape) { // Allocate a data object based on its shape & meta, and put it into our vectors. // Yes, first we put a cv::Mat GRunArg, and then specify _THAT_ // pointer as an output parameter - to make sure that after island completes, // our GRunArg still has the right (up-to-date) value. // Same applies to other types. // FIXME: This is absolutely ugly but seem to work perfectly for its purpose. case cv::GShape::GMAT: { MatType newMat; cv::gimpl::createMat(cv::util::get(out_metas[id]), newMat); out_data[id] = cv::GRunArg(std::move(newMat)); isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get(out_data[id])) }; } break; case cv::GShape::GSCALAR: { SclType newScl; out_data[id] = cv::GRunArg(std::move(newScl)); isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get(out_data[id])) }; } break; case cv::GShape::GARRAY: { cv::detail::VectorRef newVec; cv::util::get(r.ctor)(newVec); out_data[id] = cv::GRunArg(std::move(newVec)); // VectorRef is implicitly shared so no pointer is taken here const auto &rr = cv::util::get(out_data[id]); // FIXME: that variant MOVE problem again isl_outputs[id] = { r, cv::GRunArgP(rr) }; } break; default: cv::util::throw_error(std::logic_error("Unsupported GShape")); break; } } // Now ask Island to execute on this data island->run(std::move(isl_inputs), std::move(isl_outputs)); // Once executed, dispatch our results down to the pipeline. for (auto &&it : ade::util::zip(ade::util::toRange(out_queues), ade::util::toRange(out_data))) { for (auto &&q : std::get<0>(it)) { // FIXME: FATAL VARIANT ISSUE!! const auto tmp = std::get<1>(it); q->push(Cmd{tmp}); } } } } // The idea of collectorThread is easy. If there're multiple outputs // in the graph, we need to pull an object from every associated queue // and then put the resulting vector into one single queue. While it // looks redundant, it simplifies dramatically the way how try_pull() // is implemented - we need to check one queue instead of many. void collectorThread(std::vector in_queues, Q& out_queue) { while (true) { cv::GRunArgs this_result(in_queues.size()); for (auto &&it : ade::util::indexed(in_queues)) { Cmd cmd; ade::util::value(it)->pop(cmd); if (cv::util::holds_alternative(cmd)) { // FIXME: Unify this code with island thread for (auto &&qit : ade::util::indexed(in_queues)) { if (ade::util::index(qit) == ade::util::index(it)) continue; Cmd cmd2; while (!cv::util::holds_alternative(cmd2)) ade::util::value(qit)->pop(cmd2); } out_queue.push(Cmd{Stop{}}); return; } else { // FIXME: MOVE_PROBLEM const cv::GRunArg &in_arg = cv::util::get(cmd); this_result[ade::util::index(it)] = in_arg; // FIXME: Check for other message types. } } out_queue.push(Cmd{this_result}); } } } // anonymous namespace cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr &&g_model) : m_orig_graph(std::move(g_model)) , m_island_graph(GModel::Graph(*m_orig_graph).metadata() .get().model) , m_gim(*m_island_graph) { GModel::Graph gm(*m_orig_graph); // NB: Right now GIslandModel is acyclic, and all the below code assumes that. // NB: This naive execution code is taken from GExecutor nearly "as-is" const auto proto = gm.metadata().get(); m_emitters .resize(proto.in_nhs.size()); m_emitter_queues.resize(proto.in_nhs.size()); m_sinks .resize(proto.out_nhs.size()); m_sink_queues .resize(proto.out_nhs.size()); // Very rough estimation to limit internal queue sizes. // Pipeline depth is equal to number of its (pipeline) steps. const auto queue_capacity = std::count_if (m_gim.nodes().begin(), m_gim.nodes().end(), [&](ade::NodeHandle nh) { return m_gim.metadata(nh).get().k == NodeKind::ISLAND; }); auto sorted = m_gim.metadata().get(); for (auto nh : sorted.nodes()) { switch (m_gim.metadata(nh).get().k) { case NodeKind::ISLAND: { std::vector input_rcs; std::vector output_rcs; std::vector in_constants; cv::GMetaArgs output_metas; input_rcs.reserve(nh->inNodes().size()); in_constants.reserve(nh->inNodes().size()); // FIXME: Ugly output_rcs.reserve(nh->outNodes().size()); output_metas.reserve(nh->outNodes().size()); std::unordered_set > const_ins; // FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER! // FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!! auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector &vec) { const auto orig_data_nh = m_gim.metadata(slot_nh).get().original_data_node; const auto &orig_data_info = gm.metadata(orig_data_nh).get(); if (orig_data_info.storage == Data::Storage::CONST_VAL) { const_ins.insert(slot_nh); // FIXME: Variant move issue in_constants.push_back(const_cast(gm.metadata(orig_data_nh).get().arg)); } else in_constants.push_back(cv::GRunArg{}); // FIXME: Make it in some smarter way pls if (orig_data_info.shape == GShape::GARRAY) { // FIXME: GArray lost host constructor problem GAPI_Assert(!cv::util::holds_alternative(orig_data_info.ctor)); } vec.emplace_back(RcDesc{ orig_data_info.rc , orig_data_info.shape , orig_data_info.ctor}); }; auto xtract_out = [&](ade::NodeHandle slot_nh, std::vector &vec, cv::GMetaArgs &metas) { const auto orig_data_nh = m_gim.metadata(slot_nh).get().original_data_node; const auto &orig_data_info = gm.metadata(orig_data_nh).get(); if (orig_data_info.shape == GShape::GARRAY) { // FIXME: GArray lost host constructor problem GAPI_Assert(!cv::util::holds_alternative(orig_data_info.ctor)); } vec.emplace_back(RcDesc{ orig_data_info.rc , orig_data_info.shape , orig_data_info.ctor}); metas.emplace_back(orig_data_info.meta); }; // FIXME: JEZ IT WAS SO AWFUL!!!! for (auto in_slot_nh : nh->inNodes()) xtract_in(in_slot_nh, input_rcs); for (auto out_slot_nh : nh->outNodes()) xtract_out(out_slot_nh, output_rcs, output_metas); m_ops.emplace_back(OpDesc{ std::move(input_rcs) , std::move(output_rcs) , std::move(output_metas) , nh , in_constants , m_gim.metadata(nh).get().object}); // Initialize queues for every operation's input ade::TypedGraph qgr(*m_island_graph); for (auto eh : nh->inEdges()) { // ...only if the data is not compile-const if (const_ins.count(eh->srcNode()) == 0) { qgr.metadata(eh).set(DataQueue(queue_capacity)); m_internal_queues.insert(&qgr.metadata(eh).get().q); } } } break; case NodeKind::SLOT: { const auto orig_data_nh = m_gim.metadata(nh).get().original_data_node; m_slots.emplace_back(DataDesc{nh, orig_data_nh}); } break; case NodeKind::EMIT: { const auto emitter_idx = m_gim.metadata(nh).get().proto_index; GAPI_Assert(emitter_idx < m_emitters.size()); m_emitters[emitter_idx] = nh; } break; case NodeKind::SINK: { const auto sink_idx = m_gim.metadata(nh).get().proto_index; GAPI_Assert(sink_idx < m_sinks.size()); m_sinks[sink_idx] = nh; // Also initialize Sink's input queue ade::TypedGraph qgr(*m_island_graph); GAPI_Assert(nh->inEdges().size() == 1u); qgr.metadata(nh->inEdges().front()).set(DataQueue(queue_capacity)); m_sink_queues[sink_idx] = &qgr.metadata(nh->inEdges().front()).get().q; } break; default: GAPI_Assert(false); break; } // switch(kind) } // for(gim nodes) m_out_queue.set_capacity(queue_capacity); } cv::gimpl::GStreamingExecutor::~GStreamingExecutor() { if (state == State::READY || state == State::RUNNING) stop(); } void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) { GAPI_Assert(state == State::READY || state == State::STOPPED); const auto is_video = [](const GRunArg &arg) { return util::holds_alternative(arg); }; const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video); if (num_videos > 1) { // See below why (another reason - no documented behavior // on handling videos streams of different length) util::throw_error(std::logic_error("Only one video source is" " currently supported!")); } // Walk through the protocol, set-up emitters appropriately // There's a 1:1 mapping between emitters and corresponding data inputs. for (auto it : ade::util::zip(ade::util::toRange(m_emitters), ade::util::toRange(ins), ade::util::iota(m_emitters.size()))) { auto emit_nh = std::get<0>(it); auto& emit_arg = std::get<1>(it); auto emit_idx = std::get<2>(it); auto& emitter = m_gim.metadata(emit_nh).get().object; using T = GRunArg; switch (emit_arg.index()) { // Create a streaming emitter. // Produces the next video frame when pulled. case T::index_of(): #if !defined(GAPI_STANDALONE) emitter.reset(new VideoEmitter{emit_arg}); #else util::throw_error(std::logic_error("Video is not supported in the " "standalone mode")); #endif break; default: // Create a constant emitter. // Produces always the same ("constant") value when pulled. emitter.reset(new ConstEmitter{emit_arg}); m_const_emitter_queues.push_back(&m_emitter_queues[emit_idx]); break; } } // FIXME: The below code assumes our graph may have only one // real video source (and so, only one stream which may really end) // all other inputs are "constant" generators. // Craft here a completion callback to notify Const emitters that // a video source is over auto real_video_completion_cb = [this]() { for (auto q : m_const_emitter_queues) q->push(Cmd{Stop{}}); }; // FIXME: ONLY now, after all executable objects are created, // we can set up our execution threads. Let's do it. // First create threads for all the emitters. // FIXME: One way to avoid this may be including an Emitter object as a part of // START message. Why not? if (state == State::READY) { stop(); } for (auto it : ade::util::indexed(m_emitters)) { const auto id = ade::util::index(it); // = index in GComputation's protocol const auto eh = ade::util::value(it); // Prepare emitter thread parameters auto emitter = m_gim.metadata(eh).get().object; // Collect all reader queues from the emitter's the only output object auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front()); m_threads.emplace_back(emitterActorThread, emitter, std::ref(m_emitter_queues[id]), out_queues, real_video_completion_cb); } // Now do this for every island (in a topological order) for (auto &&op : m_ops) { // Prepare island thread parameters auto island = m_gim.metadata(op.nh).get().object; // Collect actor's input queues auto in_queues = input_queues(*m_island_graph, op.nh); // Collect actor's output queues. // This may be tricky... std::vector< std::vector > out_queues; for (auto &&out_eh : op.nh->outNodes()) { out_queues.push_back(reader_queues(*m_island_graph, out_eh)); } m_threads.emplace_back(islandActorThread, op.in_objects, op.out_objects, op.out_metas, island, in_queues, op.in_constants, out_queues); } // Finally, start a collector thread. m_threads.emplace_back(collectorThread, m_sink_queues, std::ref(m_out_queue)); state = State::READY; } void cv::gimpl::GStreamingExecutor::start() { if (state == State::STOPPED) { util::throw_error(std::logic_error("Please call setSource() before start() " "if the pipeline has been already stopped")); } GAPI_Assert(state == State::READY); // Currently just trigger our emitters to work state = State::RUNNING; for (auto &q : m_emitter_queues) { q.push(stream::Cmd{stream::Start{}}); } } void cv::gimpl::GStreamingExecutor::wait_shutdown() { // This utility is used by pull/try_pull/stop() to uniformly // shutdown the worker threads. // FIXME: Of course it can be designed much better for (auto &t : m_threads) t.join(); m_threads.clear(); // Clear all queues // If there are constant emitters, internal queues // may be polluted with constant values and have extra // data at the point of shutdown. // It usually happens when there's multiple inputs, // one constant and one is not, and the latter ends (e.g. // with end-of-stream). for (auto &q : m_emitter_queues) q.clear(); for (auto &q : m_sink_queues) q->clear(); for (auto &q : m_internal_queues) q->clear(); m_out_queue.clear(); state = State::STOPPED; } bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs) { if (state == State::STOPPED) return false; GAPI_Assert(state == State::RUNNING); GAPI_Assert(m_sink_queues.size() == outs.size()); Cmd cmd; m_out_queue.pop(cmd); if (cv::util::holds_alternative(cmd)) { wait_shutdown(); return false; } GAPI_Assert(cv::util::holds_alternative(cmd)); cv::GRunArgs &this_result = cv::util::get(cmd); sync_data(this_result, outs); return true; } bool cv::gimpl::GStreamingExecutor::try_pull(cv::GRunArgsP &&outs) { if (state == State::STOPPED) return false; GAPI_Assert(m_sink_queues.size() == outs.size()); Cmd cmd; if (!m_out_queue.try_pop(cmd)) { return false; } if (cv::util::holds_alternative(cmd)) { wait_shutdown(); return false; } GAPI_Assert(cv::util::holds_alternative(cmd)); cv::GRunArgs &this_result = cv::util::get(cmd); sync_data(this_result, outs); return true; } void cv::gimpl::GStreamingExecutor::stop() { if (state == State::STOPPED) return; // FIXME: ...and how to deal with still-unread data then? // Push a Stop message to the every emitter, // wait until it broadcasts within the pipeline, // FIXME: worker threads could stuck on push()! // need to read the output queues until Stop! for (auto &q : m_emitter_queues) { q.push(stream::Cmd{stream::Stop{}}); } // Pull messages from the final queue to ensure completion Cmd cmd; while (!cv::util::holds_alternative(cmd)) { m_out_queue.pop(cmd); } GAPI_Assert(cv::util::holds_alternative(cmd)); wait_shutdown(); } bool cv::gimpl::GStreamingExecutor::running() const { return (state == State::RUNNING); }