From d98e07c3d37a30b077784f5b5806c302e18c7534 Mon Sep 17 00:00:00 2001 From: Anatoliy Talamanov Date: Fri, 25 Mar 2022 11:19:53 +0300 Subject: [PATCH] Merge pull request #21660 from TolyaTalamanov:at/handle-exception-in-streamingexecutor [G-API] Handle exceptions in streaming executor * Handle exceptions in streaming executor * Rethrow exception in non-streaming executor * Clean up * Put more tests * Handle exceptions in IE backend * Handle exception in IE callbacks * Handle exception in GExecutor * Handle all exceptions in IE backend * Not only (std::exception& e) * Fix comments to review * Handle input exception in generic way * Fix comment * Clean up * Apply review comments * Put more comments * Fix alignment * Move test outside of HAVE_NGRAPH * Fix compilation --- .../include/opencv2/gapi/cpu/gcpukernel.hpp | 7 +- modules/gapi/src/backends/ie/giebackend.cpp | 130 +++--- .../backends/streaming/gstreamingbackend.cpp | 2 + modules/gapi/src/compiler/gislandmodel.cpp | 12 +- modules/gapi/src/compiler/gislandmodel.hpp | 15 +- modules/gapi/src/executor/gexecutor.cpp | 23 +- .../gapi/src/executor/gstreamingexecutor.cpp | 425 +++++++++++------- .../gapi/src/executor/gstreamingexecutor.hpp | 11 +- .../gapi/test/infer/gapi_infer_ie_test.cpp | 41 ++ .../test/streaming/gapi_streaming_tests.cpp | 164 +++++++ 10 files changed, 617 insertions(+), 213 deletions(-) diff --git a/modules/gapi/include/opencv2/gapi/cpu/gcpukernel.hpp b/modules/gapi/include/opencv2/gapi/cpu/gcpukernel.hpp index 48909a84fc..ff3ee45ed3 100644 --- a/modules/gapi/include/opencv2/gapi/cpu/gcpukernel.hpp +++ b/modules/gapi/include/opencv2/gapi/cpu/gcpukernel.hpp @@ -2,12 +2,17 @@ // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // -// Copyright (C) 2018-2020 Intel Corporation +// Copyright (C) 2018-2022 Intel Corporation #ifndef OPENCV_GAPI_GCPUKERNEL_HPP #define OPENCV_GAPI_GCPUKERNEL_HPP +#ifdef _MSC_VER +#pragma warning(disable: 4702) // "Unreachable code" +// on postprocess(...) call inside OCVCallHelper +#endif + #include #include #include diff --git a/modules/gapi/src/backends/ie/giebackend.cpp b/modules/gapi/src/backends/ie/giebackend.cpp index 711827d574..52c60c1f0b 100644 --- a/modules/gapi/src/backends/ie/giebackend.cpp +++ b/modules/gapi/src/backends/ie/giebackend.cpp @@ -389,10 +389,13 @@ public: const IEUnit &uu; cv::gimpl::GIslandExecutable::IOutput &out; - // NB: Need to gurantee that MediaFrame::View don't die until request is over. + // NB: Need to gurantee that MediaFrame::View doesn't die until request is over. using Views = std::vector>; Views views; + // To store exception appeared in callback. + std::exception_ptr eptr; + private: cv::detail::VectorRef& outVecRef(std::size_t idx); @@ -656,7 +659,7 @@ std::vector cv::gimpl::ie::IECompiled::createInfe class cv::gimpl::ie::RequestPool { public: using RunF = std::function; - using CallbackF = std::function; + using CallbackF = std::function; // NB: The task is represented by: // RunF - function which is set blobs and run async inference. @@ -675,7 +678,7 @@ private: void callback(Task task, size_t id, IE::InferRequest request, - IE::StatusCode code); + IE::StatusCode code) noexcept; void setup(); QueueClass m_idle_ids; @@ -706,32 +709,28 @@ void cv::gimpl::ie::RequestPool::execute(cv::gimpl::ie::RequestPool::Task&& t) { static_cast( std::bind(&cv::gimpl::ie::RequestPool::callback, this, t, id, _1, _2))); - t.run(request); + // NB: InferRequest is already marked as busy + // in case of exception need to return it back to the idle. + try { + t.run(request); + } catch (...) { + request.SetCompletionCallback([](){}); + m_idle_ids.push(id); + throw; + } } void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task, size_t id, IE::InferRequest request, - IE::StatusCode code) { - // FIXME: Any exception which is arrised here must not leave this callback, - // because it won't be handled. - try { - if (code != IE::StatusCode::OK) { - throw std::logic_error("IE::InferRequest finished with not OK status"); - } - task.callback(request); - // NB: IE::InferRequest keeps the callback until the new one is set. - // Since user's callback might keep resources that should be released, - // need to destroy its after execution. - // Let's set the empty one to cause the destruction of a callback. - request.SetCompletionCallback([](){}); - m_idle_ids.push(id); - } catch (const std::exception& e) { - GAPI_LOG_FATAL(NULL, "Callback failed with error: " << e.what()); - //FIXME: Exception CAN't be rethrown here, since this callback works - // in separate IE thread and such scenarios aren't handled properly in - // G-API so far. - } + IE::StatusCode code) noexcept { + // NB: Inference is over. + // 1. Run callback + // 2. Destroy callback to free resources. + // 3. Mark InferRequest as idle. + task.callback(request, code); + request.SetCompletionCallback([](){}); + m_idle_ids.push(id); } // NB: Not thread-safe. @@ -786,18 +785,19 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in // 1. Collect island inputs/outputs. // 2. Create kernel context. (Every kernel has his own context). // 3. If the EndOfStream message is recieved, wait until all passed task are done. - // 4. + // 4. If the Exception message is revieved, propagate it further. + // 5. // 5.1 Run the kernel. // 5.2 Kernel wait for all nececcary infer requests and start asynchronous execution. // 5.3 After the kernel is finished continue processing next frame. // - // 5. If graph is compiled in non-streaming mode, wait until all tasks are done. + // 6. If graph is compiled in non-streaming mode, wait until all tasks are done. std::vector input_objs; std::vector output_objs; - const auto &in_desc = in.desc(); - const auto in_msg = in.get(); + const auto &in_desc = in.desc(); + auto in_msg = in.get(); if (cv::util::holds_alternative(in_msg)) { @@ -835,10 +835,20 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in const auto &kk = giem.metadata(this_nh).get(); - // (4) Run the kernel. - kk.run(ctx, *m_reqPool); + // (5) Run the kernel. + try { + kk.run(ctx, *m_reqPool); + } catch (...) { + auto eptr = std::current_exception(); + for (auto i : ade::util::iota(ctx->uu.params.num_out)) + { + auto output = ctx->output(i); + ctx->out.post(std::move(output), eptr); + } + return; + } - // (5) In non-streaming mode need to wait until the all tasks are done + // (6) In non-streaming mode need to wait until the all tasks are done // FIXME: Is there more graceful way to handle this case ? if (!m_gm.metadata().contains()) { m_reqPool->waitAll(); @@ -944,19 +954,26 @@ static IE::PreProcessInfo configurePreProcInfo(const IE::InputInfo::CPtr& ii, // NB: This is a callback used by async infer // to post outputs blobs (cv::GMat's). -static void PostOutputs(InferenceEngine::InferRequest &request, - std::shared_ptr ctx) { +static void PostOutputs(InferenceEngine::InferRequest &request, + InferenceEngine::StatusCode code, + std::shared_ptr ctx) { GAPI_ITT_STATIC_LOCAL_HANDLE(ie_cb_post_outputs_hndl, "IE_async_callback_PostOutputs"); GAPI_ITT_AUTO_TRACE_GUARD(ie_cb_post_outputs_hndl); - for (auto i : ade::util::iota(ctx->uu.params.num_out)) - { + if (code != IE::StatusCode::OK) { + std::stringstream ss; + ss << "InferRequest for model: " << ctx->uu.params.model_path + << " finished with InferenceEngine::StatusCode: " << static_cast(code); + ctx->eptr = std::make_exception_ptr(std::logic_error(ss.str())); + } + + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { auto& out_mat = ctx->outMatR(i); IE::Blob::Ptr this_blob = request.GetBlob(ctx->uu.params.output_names[i]); copyFromIE(this_blob, out_mat); auto output = ctx->output(i); ctx->out.meta(output, ctx->input(0).meta); - ctx->out.post(std::move(output)); + ctx->out.post(std::move(output), ctx->eptr); } } @@ -966,7 +983,9 @@ public: std::shared_ptr ctx, std::vector>&& cached_dims); - void operator()(InferenceEngine::InferRequest &request, size_t pos) const; + void operator()(InferenceEngine::InferRequest &request, + InferenceEngine::StatusCode code, + size_t pos) const; private: struct Priv { @@ -987,20 +1006,30 @@ PostOutputsList::PostOutputsList(size_t size, m_priv->cached_dims = std::move(cached_dims); } -void PostOutputsList::operator()(InferenceEngine::InferRequest &req, size_t pos) const { +void PostOutputsList::operator()(InferenceEngine::InferRequest &req, + InferenceEngine::StatusCode code, + size_t pos) const { auto&& ctx = m_priv->ctx; auto&& cached_dims = m_priv->cached_dims; auto&& finished = m_priv->finished; auto&& size = m_priv->size; - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - std::vector &out_vec = ctx->outVecR(i); - IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]); - GAPI_Assert(out_blob); + if (code != IE::StatusCode::OK) { + ctx->eptr = std::make_exception_ptr( + std::logic_error("IE::InferRequest finished with not OK status")); + } - // FIXME: Avoid data copy. Not sure if it is possible though - out_vec[pos].create(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision())); - copyFromIE(out_blob, out_vec[pos]); + if (!ctx->eptr) { + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + std::vector &out_vec = ctx->outVecR(i); + + IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]); + GAPI_Assert(out_blob); + + // FIXME: Avoid data copy. Not sure if it is possible though + out_vec[pos].create(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision())); + copyFromIE(out_blob, out_vec[pos]); + } } ++finished; @@ -1008,7 +1037,7 @@ void PostOutputsList::operator()(InferenceEngine::InferRequest &req, size_t pos) for (auto i : ade::util::iota(ctx->uu.params.num_out)) { auto output = ctx->output(i); ctx->out.meta(output, ctx->input(0).meta); - ctx->out.post(std::move(output)); + ctx->out.post(std::move(output), ctx->eptr); } } } @@ -1123,7 +1152,7 @@ struct Infer: public cv::detail::KernelTag { // What about to do that in RequestPool ? req.StartAsync(); }, - std::bind(PostOutputs, _1, ctx) + std::bind(PostOutputs, _1, _2, ctx) } ); } @@ -1218,7 +1247,7 @@ struct InferROI: public cv::detail::KernelTag { // What about to do that in RequestPool ? req.StartAsync(); }, - std::bind(PostOutputs, _1, ctx) + std::bind(PostOutputs, _1, _2, ctx) } ); } @@ -1294,7 +1323,6 @@ struct InferList: public cv::detail::KernelTag { static void run(std::shared_ptr ctx, cv::gimpl::ie::RequestPool &reqPool) { - const auto& in_roi_vec = ctx->inArg(0u).rref(); // NB: In case there is no input data need to post output anyway if (in_roi_vec.empty()) { @@ -1335,7 +1363,7 @@ struct InferList: public cv::detail::KernelTag { setROIBlob(req, ctx->uu.params.input_names[0u], this_blob, rc, *ctx); req.StartAsync(); }, - std::bind(callback, std::placeholders::_1, pos) + std::bind(callback, std::placeholders::_1, std::placeholders::_2, pos) } ); } @@ -1506,7 +1534,7 @@ struct InferList2: public cv::detail::KernelTag { } req.StartAsync(); }, - std::bind(callback, std::placeholders::_1, list_idx) + std::bind(callback, std::placeholders::_1, std::placeholders::_2, list_idx) } // task ); } // for diff --git a/modules/gapi/src/backends/streaming/gstreamingbackend.cpp b/modules/gapi/src/backends/streaming/gstreamingbackend.cpp index 4bd2a10ea5..69b5f6c72b 100644 --- a/modules/gapi/src/backends/streaming/gstreamingbackend.cpp +++ b/modules/gapi/src/backends/streaming/gstreamingbackend.cpp @@ -172,6 +172,7 @@ void Copy::Actor::run(cv::gimpl::GIslandExecutable::IInput &in, return; } + GAPI_DbgAssert(cv::util::holds_alternative(in_msg)); const cv::GRunArgs &in_args = cv::util::get(in_msg); GAPI_Assert(in_args.size() == 1u); @@ -212,6 +213,7 @@ public: return; } + GAPI_Assert(cv::util::holds_alternative(in_msg)); const cv::GRunArgs &in_args = cv::util::get(in_msg); GAPI_Assert(in_args.size() == 1u); auto frame = cv::util::get(in_args[0]); diff --git a/modules/gapi/src/compiler/gislandmodel.cpp b/modules/gapi/src/compiler/gislandmodel.cpp index 1a8e0939e2..920fd700fc 100644 --- a/modules/gapi/src/compiler/gislandmodel.cpp +++ b/modules/gapi/src/compiler/gislandmodel.cpp @@ -412,7 +412,17 @@ void GIslandExecutable::run(GIslandExecutable::IInput &in, GIslandExecutable::IO out_objs.emplace_back(ade::util::value(it), out.get(ade::util::checked_cast(ade::util::index(it)))); } - run(std::move(in_objs), std::move(out_objs)); + + try { + run(std::move(in_objs), std::move(out_objs)); + } catch (...) { + auto eptr = std::current_exception(); + for (auto &&it: out_objs) + { + out.post(std::move(it.second), eptr); + } + return; + } // Propagate in-graph meta down to the graph // Note: this is not a complete implementation! Mainly this is a stub diff --git a/modules/gapi/src/compiler/gislandmodel.hpp b/modules/gapi/src/compiler/gislandmodel.hpp index 063504a922..565b3c4f21 100644 --- a/modules/gapi/src/compiler/gislandmodel.hpp +++ b/modules/gapi/src/compiler/gislandmodel.hpp @@ -161,7 +161,12 @@ public: const std::vector &desc() const { return d; } }; struct EndOfStream {}; -using StreamMsg = cv::util::variant; + +struct Exception { + std::exception_ptr eptr; +}; + +using StreamMsg = cv::util::variant; struct GIslandExecutable::IInput: public GIslandExecutable::IODesc { virtual ~IInput() = default; virtual StreamMsg get() = 0; // Get a new input vector (blocking) @@ -169,9 +174,11 @@ struct GIslandExecutable::IInput: public GIslandExecutable::IODesc { }; struct GIslandExecutable::IOutput: public GIslandExecutable::IODesc { virtual ~IOutput() = default; - virtual GRunArgP get(int idx) = 0; // Allocate (wrap) a new data object for output idx - virtual void post(GRunArgP&&) = 0; // Release the object back to the framework (mark available) - virtual void post(EndOfStream&&) = 0; // Post end-of-stream marker back to the framework + virtual GRunArgP get(int idx) = 0; // Allocate (wrap) a new data object for output idx + virtual void post(GRunArgP&&, const std::exception_ptr& = {}) = 0; // Release the object back to the framework (mark available) + virtual void post(EndOfStream&&) = 0; // Post end-of-stream marker back to the framework + virtual void post(Exception&&) = 0; + // Assign accumulated metadata to the given output object. // This method can only be called after get() and before post(). diff --git a/modules/gapi/src/executor/gexecutor.cpp b/modules/gapi/src/executor/gexecutor.cpp index 6c15d1dfc9..b7b0b5c2d0 100644 --- a/modules/gapi/src/executor/gexecutor.cpp +++ b/modules/gapi/src/executor/gexecutor.cpp @@ -270,6 +270,7 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I { cv::gimpl::Mag &mag; std::unordered_map out_idx; + std::exception_ptr eptr; GRunArgP get(int idx) override { @@ -278,8 +279,18 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I out_idx[cv::gimpl::proto::ptr(r)] = idx; return r; } - void post(GRunArgP&&) override { } // Do nothing here + void post(GRunArgP&&, const std::exception_ptr& e) override + { + if (e) + { + eptr = e; + } + } void post(EndOfStream&&) override {} // Do nothing here too + void post(Exception&& ex) override + { + eptr = std::move(ex.eptr); + } void meta(const GRunArgP &out, const GRunArg::Meta &m) override { const auto idx = out_idx.at(cv::gimpl::proto::ptr(out)); @@ -291,6 +302,14 @@ public: { set(rcs); } + + void verify() + { + if (eptr) + { + std::rethrow_exception(eptr); + } + } }; void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args) @@ -389,6 +408,8 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args) Input i{m_res, op.in_objects}; Output o{m_res, op.out_objects}; op.isl_exec->run(i, o); + // NB: Check if execution finished without exception. + o.verify(); } // (7) diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp b/modules/gapi/src/executor/gstreamingexecutor.cpp index a3a2746acc..34424cb94b 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.cpp +++ b/modules/gapi/src/executor/gstreamingexecutor.cpp @@ -31,6 +31,8 @@ #include #include +#include + namespace { using namespace cv::gimpl::stream; @@ -310,14 +312,13 @@ class QueueReader const std::size_t this_id); public: - bool getInputVector (std::vector &in_queues, - cv::GRunArgs &in_constants, - cv::GRunArgs &isl_inputs); + cv::gimpl::StreamMsg getInputVector (std::vector &in_queues, + cv::GRunArgs &in_constants); - bool getResultsVector(std::vector &in_queues, - const std::vector &in_mapping, - const std::size_t out_size, - cv::GRunArgs &out_results); + using V = cv::util::variant; + V getResultsVector(std::vector &in_queues, + const std::vector &in_mapping, + const std::size_t out_size); }; void rewindToStop(std::vector &in_queues, @@ -369,9 +370,8 @@ void QueueReader::rewindToStop(std::vector &in_queues, ::rewindToStop(in_queues, this_id); } -bool QueueReader::getInputVector(std::vector &in_queues, - cv::GRunArgs &in_constants, - cv::GRunArgs &isl_inputs) +cv::gimpl::StreamMsg QueueReader::getInputVector(std::vector &in_queues, + cv::GRunArgs &in_constants) { // NB: Need to release resources from the previous step, to fetch new ones. // On some systems it might be impossible to allocate new memory @@ -381,72 +381,98 @@ bool QueueReader::getInputVector(std::vector &in_queues, // lifetime, keep the whole cmd vector (of size == # of inputs) // in memory. m_cmd.resize(in_queues.size()); - isl_inputs.resize(in_queues.size()); + cv::GRunArgs isl_inputs(in_queues.size()); + cv::optional exception; for (auto &&it : ade::util::indexed(in_queues)) { - auto id = ade::util::index(it); - auto &q = ade::util::value(it); + auto id = ade::util::index(it); + auto &q = ade::util::value(it); - if (q == nullptr) - { - GAPI_Assert(!in_constants.empty()); - // NULL queue means a graph-constant value (like a - // value-initialized scalar) - // It can also hold a constant value received with - // Stop::Kind::CNST message (see above). - isl_inputs[id] = in_constants[id]; - continue; - } + if (q == nullptr) + { + GAPI_Assert(!in_constants.empty()); + // NULL queue means a graph-constant value (like a + // value-initialized scalar) + // It can also hold a constant value received with + // Stop::Kind::CNST message (see above). + isl_inputs[id] = in_constants[id]; + continue; + } - q->pop(m_cmd[id]); - if (!cv::util::holds_alternative(m_cmd[id])) - { - isl_inputs[id] = cv::util::get(m_cmd[id]); - } - else // A Stop sign - { - const auto &stop = cv::util::get(m_cmd[id]); - if (stop.kind == Stop::Kind::CNST) - { - // We've got a Stop signal from a const source, - // propagated as a result of real stream reaching its - // end. Sometimes these signals come earlier than - // real EOS Stops so are deprioritized -- just - // remember the Const value here and continue - // processing other queues. Set queue pointer to - // nullptr and update the const_val vector - // appropriately - m_finishing = true; - in_queues[id] = nullptr; - in_constants.resize(in_queues.size()); - in_constants[id] = std::move(stop.cdata); + q->pop(m_cmd[id]); + switch (m_cmd[id].index()) + { + case Cmd::index_of(): + isl_inputs[id] = cv::util::get(m_cmd[id]); + break; + case Cmd::index_of(): + { + const auto &stop = cv::util::get(m_cmd[id]); + if (stop.kind == Stop::Kind::CNST) + { + // We've got a Stop signal from a const source, + // propagated as a result of real stream reaching its + // end. Sometimes these signals come earlier than + // real EOS Stops so are deprioritized -- just + // remember the Const value here and continue + // processing other queues. Set queue pointer to + // nullptr and update the const_val vector + // appropriately + m_finishing = true; + in_queues[id] = nullptr; + in_constants.resize(in_queues.size()); + in_constants[id] = std::move(stop.cdata); - // NEXT time (on a next call to getInputVector()), the - // "q==nullptr" check above will be triggered, but now - // we need to make it manually: - isl_inputs[id] = in_constants[id]; - } - else - { - GAPI_Assert(stop.kind == Stop::Kind::HARD); - rewindToStop(in_queues, id); - // After queues are read to the proper indicator, - // indicate end-of-stream - return false; - } // if(Cnst) - } // if(Stop) + // NEXT time (on a next call to getInputVector()), the + // "q==nullptr" check above will be triggered, but now + // we need to make it manually: + isl_inputs[id] = in_constants[id]; + } + else + { + GAPI_Assert(stop.kind == Stop::Kind::HARD); + rewindToStop(in_queues, id); + // After queues are read to the proper indicator, + // indicate end-of-stream + return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}}; + } // if(Cnst) + break; + } + case Cmd::index_of(): + { + exception = + cv::util::make_optional(cv::util::get(m_cmd[id])); + break; + } + default: + GAPI_Assert(false && "Unsupported cmd type in getInputVector()"); + } } // for(in_queues) + if (exception.has_value()) { + return cv::gimpl::StreamMsg{exception.value()}; + } + if (m_finishing) { // If the process is about to end (a soft Stop was received // already) and an island has no other inputs than constant // inputs, its queues may all become nullptrs. Indicate it as // "no data". - return !ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;}); + if (ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;})) { + return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}}; + } } - return true; // A regular case - there is data to process. + // A regular case - there is data to process + for (auto& arg : isl_inputs) { + if (arg.index() == cv::GRunArg::index_of()) { + arg = cv::GRunArg{ cv::make_rmat(cv::util::get(arg)) + , arg.meta + }; + } + } + return cv::gimpl::StreamMsg{std::move(isl_inputs)}; } // This is a special method to obtain a result vector @@ -474,33 +500,47 @@ bool QueueReader::getInputVector(std::vector &in_queues, // (_may be_ partially filled) to the same final output queue. // The receiver part at the GStreamingExecutor level won't change // because of that. -bool QueueReader::getResultsVector(std::vector &in_queues, - const std::vector &in_mapping, - const std::size_t out_size, - cv::GRunArgs &out_results) + +QueueReader::V QueueReader::getResultsVector(std::vector &in_queues, + const std::vector &in_mapping, + const std::size_t out_size) { + cv::GRunArgs out_results(out_size); m_cmd.resize(out_size); + cv::optional exception; for (auto &&it : ade::util::indexed(in_queues)) { auto ii = ade::util::index(it); auto oi = in_mapping[ii]; auto &q = ade::util::value(it); q->pop(m_cmd[oi]); - if (!cv::util::holds_alternative(m_cmd[oi])) - { - out_results[oi] = std::move(cv::util::get(m_cmd[oi])); - } - else // A Stop sign - { - // In theory, the CNST should never reach here. - // Collector thread never handles the inputs directly - // (collector's input queues are always produced by - // islands in the graph). - rewindToStop(in_queues, ii); - return false; - } // if(Stop) + + switch (m_cmd[oi].index()) { + case Cmd::index_of(): + out_results[oi] = std::move(cv::util::get(m_cmd[oi])); + break; + case Cmd::index_of(): + // In theory, the CNST should never reach here. + // Collector thread never handles the inputs directly + // (collector's input queues are always produced by + // islands in the graph). + rewindToStop(in_queues, ii); + return QueueReader::V(Stop{}); + case Cmd::index_of(): + exception = + cv::util::make_optional(cv::util::get(m_cmd[oi])); + break; + default: + cv::util::throw_error( + std::logic_error("Unexpected cmd kind in getResultsVector")); + } // switch } // for(in_queues) - return true; + + if (exception.has_value()) { + return QueueReader::V(exception.value()); + } + + return QueueReader::V(out_results); } @@ -521,7 +561,9 @@ void emitterActorThread(std::shared_ptr emitter, || cv::util::holds_alternative(cmd)); if (cv::util::holds_alternative(cmd)) { - for (auto &&oq : out_queues) oq->push(cmd); + for (auto &&oq : out_queues) { + oq->push(cmd); + } return; } @@ -547,10 +589,21 @@ void emitterActorThread(std::shared_ptr emitter, // Try to obtain next data chunk from the source cv::GRunArg data; - const bool result = [&](){ - GAPI_ITT_AUTO_TRACE_GUARD(emitter_pull_hndl); - return emitter->pull(data); - }(); + bool result = false; + try { + result = [&](){ + GAPI_ITT_AUTO_TRACE_GUARD(emitter_pull_hndl); + return emitter->pull(data); + }(); + } catch (...) { + auto eptr = std::current_exception(); + for (auto &&oq : out_queues) + { + oq->push(Cmd{cv::gimpl::Exception{eptr}}); + } + // NB: Go to the next iteration. + continue; + } if (result) { @@ -673,28 +726,8 @@ class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput std::vector &in_queues; // FIXME: This can be part of QueueReader cv::GRunArgs &in_constants; // FIXME: This can be part of QueueReader - virtual cv::gimpl::StreamMsg get() override - { - GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::get"); - GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl); + cv::optional last_read_msg; - cv::GRunArgs isl_input_args; - - if (!qr.getInputVector(in_queues, in_constants, isl_input_args)) - { - // Stop case - return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}}; - } - // Wrap all input cv::Mats with RMats - for (auto& arg : isl_input_args) { - if (arg.index() == cv::GRunArg::index_of()) { - arg = cv::GRunArg{ cv::make_rmat(cv::util::get(arg)) - , arg.meta - }; - } - } - return cv::gimpl::StreamMsg{std::move(isl_input_args)}; - } virtual cv::gimpl::StreamMsg try_get() override { // FIXME: This is not very usable at the moment! @@ -709,17 +742,43 @@ class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput { set(in_descs); } + + const cv::gimpl::StreamMsg& read() + { + GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::read"); + GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl); + + last_read_msg = + cv::optional( + qr.getInputVector(in_queues, in_constants)); + return last_read_msg.value(); + } + + virtual cv::gimpl::StreamMsg get() override + { + GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::get"); + GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl); + + if (!last_read_msg.has_value()) { + (void)read(); + } + auto msg = std::move(last_read_msg.value()); + last_read_msg = cv::optional(); + return msg; + } }; class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput { // These objects form an internal state of the StreamingOutput struct Posting - { - using V = cv::util::variant; - V data; - bool ready = false; - }; + { + using V = cv::util::variant; + V data; + bool ready = false; + }; using PostingList = std::list; std::vector m_postings; std::unordered_map< const void* @@ -820,7 +879,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput return ret_val; } - virtual void post(cv::GRunArgP&& argp) override + virtual void post(cv::GRunArgP&& argp, const std::exception_ptr& exptr) override { GAPI_ITT_STATIC_LOCAL_HANDLE(outputs_post_hndl, "StreamingOutput::post"); GAPI_ITT_AUTO_TRACE_GUARD(outputs_post_hndl); @@ -834,6 +893,9 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput const int out_idx = it->second.first; const auto out_iter = it->second.second; out_iter->ready = true; + if (exptr) { + out_iter->data = cv::gimpl::Exception{exptr}; + } m_postIdx.erase(it); // Drop the link from the cache anyway if (out_iter != m_postings[out_idx].begin()) { @@ -845,16 +907,22 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput while (post_iter != m_postings[out_idx].end() && post_iter->ready == true) { Cmd cmd; - if (cv::util::holds_alternative(post_iter->data)) + switch (post_iter->data.index()) { - cmd = Cmd{cv::util::get(post_iter->data)}; - } - else - { - GAPI_Assert(cv::util::holds_alternative(post_iter->data)); - cmd = Cmd{Stop{}}; - m_stops_sent++; + case Posting::V::index_of(): + cmd = Cmd{cv::util::get(post_iter->data)}; + break; + case Posting::V::index_of(): + cmd = Cmd{cv::util::get(post_iter->data)}; + break; + case Posting::V::index_of(): + cmd = Cmd{Stop{}}; + m_stops_sent++; + break; + default: + GAPI_Assert(false && "Unreachable code"); } + for (auto &&q : m_out_queues[out_idx]) { q->push(cmd); @@ -889,6 +957,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput } } } + void meta(const cv::GRunArgP &out, const cv::GRunArg::Meta &m) override { std::lock_guard lock{m_mutex}; @@ -919,6 +988,32 @@ public: // when it posted/resent all STOP messages to all its outputs. return m_stops_sent == desc().size(); } + + virtual void post(cv::gimpl::Exception&& error) override + { + std::lock_guard lock{m_mutex}; + // If the posting list is empty, just broadcast the stop message. + // If it is not, enqueue the Stop message in the postings list. + for (auto &&it : ade::util::indexed(m_postings)) + { + const auto idx = ade::util::index(it); + auto &lst = ade::util::value(it); + if (lst.empty()) + { + for (auto &&q : m_out_queues[idx]) + { + q->push(Cmd(std::move(error))); + } + } + else + { + Posting p; + p.data = Posting::V{std::move(error)}; + p.ready = true; + lst.push_back(std::move(p)); // FIXME: For some reason {}-ctor didn't work here + } + } + } }; // This thread is a plain dumb processing actor. What it do is just: @@ -947,7 +1042,17 @@ void islandActorThread(std::vector in_rcs, while (!output.done()) { GAPI_ITT_AUTO_TRACE_GUARD(island_hndl); - island_exec->run(input, output); + // NB: In case the input message is an cv::gimpl::Exception + // handle it in a general way. + if (cv::util::holds_alternative(input.read())) + { + auto in_msg = input.get(); + output.post(std::move(cv::util::get(in_msg))); + } + else + { + island_exec->run(input, output); + } } } @@ -984,26 +1089,33 @@ void collectorThread(std::vector in_queues, while (true) { GAPI_ITT_AUTO_TRACE_GUARD(collector_hndl); - cv::GRunArgs this_result(out_size); - const bool ok = [&](){ + const auto result = [&](){ GAPI_ITT_AUTO_TRACE_GUARD(collector_get_results_hndl); - return qr.getResultsVector(in_queues, in_mapping, out_size, this_result); + return qr.getResultsVector(in_queues, in_mapping, out_size); }(); - if (!ok) + switch (result.index()) { - if (handle_stop) + case QueueReader::V::index_of(): { - out_queue.push(Cmd{Stop{}}); + GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl); + auto this_result = cv::util::get(result); + out_queue.push(Cmd{Result{std::move(this_result), flags}}); + break; } - // Terminate the thread anyway - return; - } - - { - GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl); - out_queue.push(Cmd{Result{std::move(this_result), flags}}); + case QueueReader::V::index_of(): + if (handle_stop) + { + out_queue.push(Cmd{Stop{}}); + } + // Terminate the thread anyway + return; + case QueueReader::V::index_of(): + out_queue.push(Cmd{cv::util::get(result)}); + break; + default: + GAPI_Assert(false && "Unreachable code"); } } } @@ -1707,16 +1819,24 @@ bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs) Cmd cmd; m_out_queue.pop(cmd); - if (cv::util::holds_alternative(cmd)) - { - wait_shutdown(); - return false; + switch (cmd.index()) { + case Cmd::index_of(): + wait_shutdown(); + return false; + case Cmd::index_of(): { + GAPI_Assert(cv::util::holds_alternative(cmd)); + cv::GRunArgs &this_result = cv::util::get(cmd).args; + sync_data(this_result, outs); + return true; + } + case Cmd::index_of(): { + std::rethrow_exception(cv::util::get(cmd).eptr); + return true; + } + default: + GAPI_Assert(false && "Unsupported cmd type in pull"); } - - GAPI_Assert(cv::util::holds_alternative(cmd)); - cv::GRunArgs &this_result = cv::util::get(cmd).args; - sync_data(this_result, outs); - return true; + GAPI_Assert(false && "Unreachable code"); } bool cv::gimpl::GStreamingExecutor::pull(cv::GOptRunArgsP &&outs) @@ -1734,15 +1854,20 @@ bool cv::gimpl::GStreamingExecutor::pull(cv::GOptRunArgsP &&outs) Cmd cmd; m_out_queue.pop(cmd); - if (cv::util::holds_alternative(cmd)) - { - wait_shutdown(); - return false; + switch (cmd.index()) { + case Cmd::index_of(): + wait_shutdown(); + return false; + case Cmd::index_of(): { + sync_data(cv::util::get(cmd), outs); + return true; + } + case Cmd::index_of(): { + std::rethrow_exception(cv::util::get(cmd).eptr); + return true; + } } - - GAPI_Assert(cv::util::holds_alternative(cmd)); - sync_data(cv::util::get(cmd), outs); - return true; + GAPI_Assert(false && "Unreachable code"); } std::tuple> cv::gimpl::GStreamingExecutor::pull() diff --git a/modules/gapi/src/executor/gstreamingexecutor.hpp b/modules/gapi/src/executor/gstreamingexecutor.hpp index b4aadcbbaf..da27f6a646 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.hpp +++ b/modules/gapi/src/executor/gstreamingexecutor.hpp @@ -50,11 +50,12 @@ struct Result { using Cmd = cv::util::variant < cv::util::monostate - , Start // Tells emitters to start working. Not broadcasted to workers. - , Stop // Tells emitters to stop working. Broadcasted to workers. - , cv::GRunArg // Workers data payload to process. - , Result // Pipeline's data for gout() - >; + , Start // Tells emitters to start working. Not broadcasted to workers. + , Stop // Tells emitters to stop working. Broadcasted to workers. + , cv::GRunArg // Workers data payload to process. + , Result // Pipeline's data for gout() + , cv::gimpl::Exception // Exception which is thrown while execution. + >; // Interface over a queue. The underlying queue implementation may be // different. This class is mainly introduced to bring some diff --git a/modules/gapi/test/infer/gapi_infer_ie_test.cpp b/modules/gapi/test/infer/gapi_infer_ie_test.cpp index 8dc23a3880..3741438373 100644 --- a/modules/gapi/test/infer/gapi_infer_ie_test.cpp +++ b/modules/gapi/test/infer/gapi_infer_ie_test.cpp @@ -2915,6 +2915,47 @@ TEST(Infer, ModelWith2DInputs) #endif // HAVE_NGRAPH +TEST(TestAgeGender, ThrowBlobAndInputPrecisionMismatchStreaming) +{ + const std::string device = "MYRIAD"; + skipIfDeviceNotAvailable(device); + + initDLDTDataPath(); + + cv::gapi::ie::detail::ParamDesc params; + // NB: Precision for inputs is U8. + params.model_path = compileAgeGenderBlob(device); + params.device_id = device; + + // Configure & run G-API + using AGInfo = std::tuple; + G_API_NET(AgeGender, , "test-age-gender"); + + auto pp = cv::gapi::ie::Params { + params.model_path, params.device_id + }.cfgOutputLayers({ "age_conv3", "prob" }); + + cv::GMat in, age, gender; + std::tie(age, gender) = cv::gapi::infer(in); + auto pipeline = cv::GComputation(cv::GIn(in), cv::GOut(age, gender)) + .compileStreaming(cv::compile_args(cv::gapi::networks(pp))); + + cv::Mat in_mat(320, 240, CV_32FC3); + cv::randu(in_mat, 0, 1); + cv::Mat gapi_age, gapi_gender; + + pipeline.setSource(cv::gin(in_mat)); + pipeline.start(); + + // NB: Blob precision is U8, but user pass FP32 data, so exception will be thrown. + // Now exception comes directly from IE, but since G-API has information + // about data precision at the compile stage, consider the possibility of + // throwing exception from there. + for (int i = 0; i < 10; ++i) { + EXPECT_ANY_THROW(pipeline.pull(cv::gout(gapi_age, gapi_gender))); + } +} + } // namespace opencv_test #endif // HAVE_INF_ENGINE diff --git a/modules/gapi/test/streaming/gapi_streaming_tests.cpp b/modules/gapi/test/streaming/gapi_streaming_tests.cpp index 4d33d4b0c5..ffa1d452c1 100644 --- a/modules/gapi/test/streaming/gapi_streaming_tests.cpp +++ b/modules/gapi/test/streaming/gapi_streaming_tests.cpp @@ -304,6 +304,66 @@ void checkPullOverload(const cv::Mat& ref, EXPECT_EQ(0., cv::norm(ref, out_mat, cv::NORM_INF)); } +class InvalidSource : public cv::gapi::wip::IStreamSource { +public: + InvalidSource(const size_t throw_every_nth_frame, + const size_t num_frames) + : m_throw_every_nth_frame(throw_every_nth_frame), + m_curr_frame_id(0u), + m_num_frames(num_frames), + m_mat(1, 1, CV_8U) { + } + + static std::string exception_msg() + { + return "InvalidSource sucessfuly failed!"; + } + + bool pull(cv::gapi::wip::Data& d) { + ++m_curr_frame_id; + if (m_curr_frame_id > m_num_frames) { + return false; + } + + if (m_curr_frame_id % m_throw_every_nth_frame == 0) { + throw std::logic_error(InvalidSource::exception_msg()); + return true; + } else { + d = cv::Mat(m_mat); + } + + return true; + } + + cv::GMetaArg descr_of() const override { + return cv::GMetaArg{cv::descr_of(m_mat)}; + } + +private: + size_t m_throw_every_nth_frame; + size_t m_curr_frame_id; + size_t m_num_frames; + cv::Mat m_mat; +}; + +G_TYPED_KERNEL(GThrowExceptionOp, , "org.opencv.test.throw_error_op") +{ + static GMatDesc outMeta(GMatDesc in) { return in; } +}; + +GAPI_OCV_KERNEL(GThrowExceptionKernel, GThrowExceptionOp) +{ + static std::string exception_msg() + { + return "GThrowExceptionKernel sucessfuly failed"; + } + + static void run(const cv::Mat&, cv::Mat&) + { + throw std::logic_error(GThrowExceptionKernel::exception_msg()); + } +}; + } // anonymous namespace TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat) @@ -2512,5 +2572,109 @@ TEST(GAPI_Streaming, TestDesyncMediaFrameGray) { } } +TEST(GAPI_Streaming_Exception, SingleKernelThrow) { + cv::GMat in; + auto pipeline = cv::GComputation(in, GThrowExceptionOp::on(in)) + .compileStreaming(cv::compile_args(cv::gapi::kernels())); + + cv::Mat in_mat(cv::Size(300, 300), CV_8UC3); + cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255)); + pipeline.setSource(cv::gin(in_mat)); + pipeline.start(); + + EXPECT_THROW( + try { + cv::Mat out_mat; + pipeline.pull(cv::gout(out_mat)); + } catch (const std::logic_error& e) { + EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what()); + throw; + }, std::logic_error); +} + +TEST(GAPI_Streaming_Exception, StreamingBackendExceptionAsInput) { + cv::GMat in; + auto pipeline = cv::GComputation(in, + cv::gapi::copy(GThrowExceptionOp::on(in))) + .compileStreaming(cv::compile_args(cv::gapi::kernels())); + + cv::Mat in_mat(cv::Size(300, 300), CV_8UC3); + cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255)); + pipeline.setSource(cv::gin(in_mat)); + pipeline.start(); + + EXPECT_THROW( + try { + cv::Mat out_mat; + pipeline.pull(cv::gout(out_mat)); + } catch (const std::logic_error& e) { + EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what()); + throw; + }, std::logic_error); +} + +TEST(GAPI_Streaming_Exception, RegularBacckendsExceptionAsInput) { + cv::GMat in; + auto pipeline = cv::GComputation(in, + cv::gapi::add(GThrowExceptionOp::on(in), GThrowExceptionOp::on(in))) + .compileStreaming(cv::compile_args(cv::gapi::kernels())); + + cv::Mat in_mat(cv::Size(300, 300), CV_8UC3); + cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255)); + pipeline.setSource(cv::gin(in_mat)); + pipeline.start(); + + EXPECT_THROW( + try { + cv::Mat out_mat; + pipeline.pull(cv::gout(out_mat)); + } catch (const std::logic_error& e) { + EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what()); + throw; + }, std::logic_error); +} + +TEST(GAPI_Streaming_Exception, SourceThrow) { + cv::GMat in; + auto pipeline = cv::GComputation(in, cv::gapi::copy(in)).compileStreaming(); + + pipeline.setSource(std::make_shared(1u, 1u)); + pipeline.start(); + + EXPECT_THROW( + try { + cv::Mat out_mat; + pipeline.pull(cv::gout(out_mat)); + } catch (const std::logic_error& e) { + EXPECT_EQ(InvalidSource::exception_msg(), e.what()); + throw; + }, std::logic_error); +} + +TEST(GAPI_Streaming_Exception, SourceThrowEverySecondFrame) { + constexpr size_t throw_every_nth_frame = 2u; + constexpr size_t num_frames = 10u; + size_t curr_frame = 0; + bool has_frame = true; + cv::Mat out_mat; + + cv::GMat in; + auto pipeline = cv::GComputation(in, cv::gapi::copy(in)).compileStreaming(); + + pipeline.setSource(std::make_shared(throw_every_nth_frame, num_frames)); + pipeline.start(); + while (has_frame) { + ++curr_frame; + try { + has_frame = pipeline.pull(cv::gout(out_mat)); + } catch (const std::exception& e) { + EXPECT_TRUE(curr_frame % throw_every_nth_frame == 0); + EXPECT_EQ(InvalidSource::exception_msg(), e.what()); + } + } + + // NB: Pull was called num_frames + 1(stop). + EXPECT_EQ(num_frames, curr_frame - 1); +} } // namespace opencv_test