Merge pull request #21660 from TolyaTalamanov:at/handle-exception-in-streamingexecutor

[G-API] Handle exceptions in streaming executor

* Handle exceptions in streaming executor

* Rethrow exception in non-streaming executor

* Clean up

* Put more tests

* Handle exceptions in IE backend

* Handle exception in IE callbacks

* Handle exception in GExecutor

* Handle all exceptions in IE backend

* Not only (std::exception& e)

* Fix comments to review

* Handle input exception in generic way

* Fix comment

* Clean up

* Apply review comments

* Put more comments
* Fix alignment
* Move test outside of HAVE_NGRAPH

* Fix compilation
This commit is contained in:
Anatoliy Talamanov 2022-03-25 11:19:53 +03:00 committed by GitHub
parent 78bc11465b
commit d98e07c3d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 617 additions and 213 deletions

View File

@ -2,12 +2,17 @@
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2018-2020 Intel Corporation
// Copyright (C) 2018-2022 Intel Corporation
#ifndef OPENCV_GAPI_GCPUKERNEL_HPP
#define OPENCV_GAPI_GCPUKERNEL_HPP
#ifdef _MSC_VER
#pragma warning(disable: 4702) // "Unreachable code"
// on postprocess(...) call inside OCVCallHelper
#endif
#include <functional>
#include <unordered_map>
#include <utility>

View File

@ -389,10 +389,13 @@ public:
const IEUnit &uu;
cv::gimpl::GIslandExecutable::IOutput &out;
// NB: Need to gurantee that MediaFrame::View don't die until request is over.
// NB: Need to gurantee that MediaFrame::View doesn't die until request is over.
using Views = std::vector<std::unique_ptr<cv::MediaFrame::View>>;
Views views;
// To store exception appeared in callback.
std::exception_ptr eptr;
private:
cv::detail::VectorRef& outVecRef(std::size_t idx);
@ -656,7 +659,7 @@ std::vector<InferenceEngine::InferRequest> cv::gimpl::ie::IECompiled::createInfe
class cv::gimpl::ie::RequestPool {
public:
using RunF = std::function<void(InferenceEngine::InferRequest&)>;
using CallbackF = std::function<void(InferenceEngine::InferRequest&)>;
using CallbackF = std::function<void(InferenceEngine::InferRequest&, InferenceEngine::StatusCode)>;
// NB: The task is represented by:
// RunF - function which is set blobs and run async inference.
@ -675,7 +678,7 @@ private:
void callback(Task task,
size_t id,
IE::InferRequest request,
IE::StatusCode code);
IE::StatusCode code) noexcept;
void setup();
QueueClass<size_t> m_idle_ids;
@ -706,32 +709,28 @@ void cv::gimpl::ie::RequestPool::execute(cv::gimpl::ie::RequestPool::Task&& t) {
static_cast<callback_t>(
std::bind(&cv::gimpl::ie::RequestPool::callback, this,
t, id, _1, _2)));
t.run(request);
// NB: InferRequest is already marked as busy
// in case of exception need to return it back to the idle.
try {
t.run(request);
} catch (...) {
request.SetCompletionCallback([](){});
m_idle_ids.push(id);
throw;
}
}
void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task,
size_t id,
IE::InferRequest request,
IE::StatusCode code) {
// FIXME: Any exception which is arrised here must not leave this callback,
// because it won't be handled.
try {
if (code != IE::StatusCode::OK) {
throw std::logic_error("IE::InferRequest finished with not OK status");
}
task.callback(request);
// NB: IE::InferRequest keeps the callback until the new one is set.
// Since user's callback might keep resources that should be released,
// need to destroy its after execution.
// Let's set the empty one to cause the destruction of a callback.
request.SetCompletionCallback([](){});
m_idle_ids.push(id);
} catch (const std::exception& e) {
GAPI_LOG_FATAL(NULL, "Callback failed with error: " << e.what());
//FIXME: Exception CAN't be rethrown here, since this callback works
// in separate IE thread and such scenarios aren't handled properly in
// G-API so far.
}
IE::StatusCode code) noexcept {
// NB: Inference is over.
// 1. Run callback
// 2. Destroy callback to free resources.
// 3. Mark InferRequest as idle.
task.callback(request, code);
request.SetCompletionCallback([](){});
m_idle_ids.push(id);
}
// NB: Not thread-safe.
@ -786,18 +785,19 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in
// 1. Collect island inputs/outputs.
// 2. Create kernel context. (Every kernel has his own context).
// 3. If the EndOfStream message is recieved, wait until all passed task are done.
// 4.
// 4. If the Exception message is revieved, propagate it further.
// 5.
// 5.1 Run the kernel.
// 5.2 Kernel wait for all nececcary infer requests and start asynchronous execution.
// 5.3 After the kernel is finished continue processing next frame.
//
// 5. If graph is compiled in non-streaming mode, wait until all tasks are done.
// 6. If graph is compiled in non-streaming mode, wait until all tasks are done.
std::vector<InObj> input_objs;
std::vector<OutObj> output_objs;
const auto &in_desc = in.desc();
const auto in_msg = in.get();
const auto &in_desc = in.desc();
auto in_msg = in.get();
if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))
{
@ -835,10 +835,20 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in
const auto &kk = giem.metadata(this_nh).get<IECallable>();
// (4) Run the kernel.
kk.run(ctx, *m_reqPool);
// (5) Run the kernel.
try {
kk.run(ctx, *m_reqPool);
} catch (...) {
auto eptr = std::current_exception();
for (auto i : ade::util::iota(ctx->uu.params.num_out))
{
auto output = ctx->output(i);
ctx->out.post(std::move(output), eptr);
}
return;
}
// (5) In non-streaming mode need to wait until the all tasks are done
// (6) In non-streaming mode need to wait until the all tasks are done
// FIXME: Is there more graceful way to handle this case ?
if (!m_gm.metadata().contains<Streaming>()) {
m_reqPool->waitAll();
@ -944,19 +954,26 @@ static IE::PreProcessInfo configurePreProcInfo(const IE::InputInfo::CPtr& ii,
// NB: This is a callback used by async infer
// to post outputs blobs (cv::GMat's).
static void PostOutputs(InferenceEngine::InferRequest &request,
std::shared_ptr<IECallContext> ctx) {
static void PostOutputs(InferenceEngine::InferRequest &request,
InferenceEngine::StatusCode code,
std::shared_ptr<IECallContext> ctx) {
GAPI_ITT_STATIC_LOCAL_HANDLE(ie_cb_post_outputs_hndl, "IE_async_callback_PostOutputs");
GAPI_ITT_AUTO_TRACE_GUARD(ie_cb_post_outputs_hndl);
for (auto i : ade::util::iota(ctx->uu.params.num_out))
{
if (code != IE::StatusCode::OK) {
std::stringstream ss;
ss << "InferRequest for model: " << ctx->uu.params.model_path
<< " finished with InferenceEngine::StatusCode: " << static_cast<int>(code);
ctx->eptr = std::make_exception_ptr(std::logic_error(ss.str()));
}
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
auto& out_mat = ctx->outMatR(i);
IE::Blob::Ptr this_blob = request.GetBlob(ctx->uu.params.output_names[i]);
copyFromIE(this_blob, out_mat);
auto output = ctx->output(i);
ctx->out.meta(output, ctx->input(0).meta);
ctx->out.post(std::move(output));
ctx->out.post(std::move(output), ctx->eptr);
}
}
@ -966,7 +983,9 @@ public:
std::shared_ptr<IECallContext> ctx,
std::vector<std::vector<int>>&& cached_dims);
void operator()(InferenceEngine::InferRequest &request, size_t pos) const;
void operator()(InferenceEngine::InferRequest &request,
InferenceEngine::StatusCode code,
size_t pos) const;
private:
struct Priv {
@ -987,20 +1006,30 @@ PostOutputsList::PostOutputsList(size_t size,
m_priv->cached_dims = std::move(cached_dims);
}
void PostOutputsList::operator()(InferenceEngine::InferRequest &req, size_t pos) const {
void PostOutputsList::operator()(InferenceEngine::InferRequest &req,
InferenceEngine::StatusCode code,
size_t pos) const {
auto&& ctx = m_priv->ctx;
auto&& cached_dims = m_priv->cached_dims;
auto&& finished = m_priv->finished;
auto&& size = m_priv->size;
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
GAPI_Assert(out_blob);
if (code != IE::StatusCode::OK) {
ctx->eptr = std::make_exception_ptr(
std::logic_error("IE::InferRequest finished with not OK status"));
}
// FIXME: Avoid data copy. Not sure if it is possible though
out_vec[pos].create(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
copyFromIE(out_blob, out_vec[pos]);
if (!ctx->eptr) {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
GAPI_Assert(out_blob);
// FIXME: Avoid data copy. Not sure if it is possible though
out_vec[pos].create(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
copyFromIE(out_blob, out_vec[pos]);
}
}
++finished;
@ -1008,7 +1037,7 @@ void PostOutputsList::operator()(InferenceEngine::InferRequest &req, size_t pos)
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
auto output = ctx->output(i);
ctx->out.meta(output, ctx->input(0).meta);
ctx->out.post(std::move(output));
ctx->out.post(std::move(output), ctx->eptr);
}
}
}
@ -1123,7 +1152,7 @@ struct Infer: public cv::detail::KernelTag {
// What about to do that in RequestPool ?
req.StartAsync();
},
std::bind(PostOutputs, _1, ctx)
std::bind(PostOutputs, _1, _2, ctx)
}
);
}
@ -1218,7 +1247,7 @@ struct InferROI: public cv::detail::KernelTag {
// What about to do that in RequestPool ?
req.StartAsync();
},
std::bind(PostOutputs, _1, ctx)
std::bind(PostOutputs, _1, _2, ctx)
}
);
}
@ -1294,7 +1323,6 @@ struct InferList: public cv::detail::KernelTag {
static void run(std::shared_ptr<IECallContext> ctx,
cv::gimpl::ie::RequestPool &reqPool) {
const auto& in_roi_vec = ctx->inArg<cv::detail::VectorRef>(0u).rref<cv::Rect>();
// NB: In case there is no input data need to post output anyway
if (in_roi_vec.empty()) {
@ -1335,7 +1363,7 @@ struct InferList: public cv::detail::KernelTag {
setROIBlob(req, ctx->uu.params.input_names[0u], this_blob, rc, *ctx);
req.StartAsync();
},
std::bind(callback, std::placeholders::_1, pos)
std::bind(callback, std::placeholders::_1, std::placeholders::_2, pos)
}
);
}
@ -1506,7 +1534,7 @@ struct InferList2: public cv::detail::KernelTag {
}
req.StartAsync();
},
std::bind(callback, std::placeholders::_1, list_idx)
std::bind(callback, std::placeholders::_1, std::placeholders::_2, list_idx)
} // task
);
} // for

View File

@ -172,6 +172,7 @@ void Copy::Actor::run(cv::gimpl::GIslandExecutable::IInput &in,
return;
}
GAPI_DbgAssert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
const cv::GRunArgs &in_args = cv::util::get<cv::GRunArgs>(in_msg);
GAPI_Assert(in_args.size() == 1u);
@ -212,6 +213,7 @@ public:
return;
}
GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
const cv::GRunArgs &in_args = cv::util::get<cv::GRunArgs>(in_msg);
GAPI_Assert(in_args.size() == 1u);
auto frame = cv::util::get<cv::MediaFrame>(in_args[0]);

View File

@ -412,7 +412,17 @@ void GIslandExecutable::run(GIslandExecutable::IInput &in, GIslandExecutable::IO
out_objs.emplace_back(ade::util::value(it),
out.get(ade::util::checked_cast<int>(ade::util::index(it))));
}
run(std::move(in_objs), std::move(out_objs));
try {
run(std::move(in_objs), std::move(out_objs));
} catch (...) {
auto eptr = std::current_exception();
for (auto &&it: out_objs)
{
out.post(std::move(it.second), eptr);
}
return;
}
// Propagate in-graph meta down to the graph
// Note: this is not a complete implementation! Mainly this is a stub

View File

@ -161,7 +161,12 @@ public:
const std::vector<cv::gimpl::RcDesc> &desc() const { return d; }
};
struct EndOfStream {};
using StreamMsg = cv::util::variant<EndOfStream, cv::GRunArgs>;
struct Exception {
std::exception_ptr eptr;
};
using StreamMsg = cv::util::variant<EndOfStream, cv::GRunArgs, Exception>;
struct GIslandExecutable::IInput: public GIslandExecutable::IODesc {
virtual ~IInput() = default;
virtual StreamMsg get() = 0; // Get a new input vector (blocking)
@ -169,9 +174,11 @@ struct GIslandExecutable::IInput: public GIslandExecutable::IODesc {
};
struct GIslandExecutable::IOutput: public GIslandExecutable::IODesc {
virtual ~IOutput() = default;
virtual GRunArgP get(int idx) = 0; // Allocate (wrap) a new data object for output idx
virtual void post(GRunArgP&&) = 0; // Release the object back to the framework (mark available)
virtual void post(EndOfStream&&) = 0; // Post end-of-stream marker back to the framework
virtual GRunArgP get(int idx) = 0; // Allocate (wrap) a new data object for output idx
virtual void post(GRunArgP&&, const std::exception_ptr& = {}) = 0; // Release the object back to the framework (mark available)
virtual void post(EndOfStream&&) = 0; // Post end-of-stream marker back to the framework
virtual void post(Exception&&) = 0;
// Assign accumulated metadata to the given output object.
// This method can only be called after get() and before post().

View File

@ -270,6 +270,7 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I
{
cv::gimpl::Mag &mag;
std::unordered_map<const void*, int> out_idx;
std::exception_ptr eptr;
GRunArgP get(int idx) override
{
@ -278,8 +279,18 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I
out_idx[cv::gimpl::proto::ptr(r)] = idx;
return r;
}
void post(GRunArgP&&) override { } // Do nothing here
void post(GRunArgP&&, const std::exception_ptr& e) override
{
if (e)
{
eptr = e;
}
}
void post(EndOfStream&&) override {} // Do nothing here too
void post(Exception&& ex) override
{
eptr = std::move(ex.eptr);
}
void meta(const GRunArgP &out, const GRunArg::Meta &m) override
{
const auto idx = out_idx.at(cv::gimpl::proto::ptr(out));
@ -291,6 +302,14 @@ public:
{
set(rcs);
}
void verify()
{
if (eptr)
{
std::rethrow_exception(eptr);
}
}
};
void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
@ -389,6 +408,8 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
Input i{m_res, op.in_objects};
Output o{m_res, op.out_objects};
op.isl_exec->run(i, o);
// NB: Check if execution finished without exception.
o.verify();
}
// (7)

View File

@ -31,6 +31,8 @@
#include <opencv2/gapi/streaming/meta.hpp>
#include <opencv2/gapi/streaming/sync.hpp>
#include <opencv2/gapi/util/variant.hpp>
namespace
{
using namespace cv::gimpl::stream;
@ -310,14 +312,13 @@ class QueueReader
const std::size_t this_id);
public:
bool getInputVector (std::vector<Q*> &in_queues,
cv::GRunArgs &in_constants,
cv::GRunArgs &isl_inputs);
cv::gimpl::StreamMsg getInputVector (std::vector<Q*> &in_queues,
cv::GRunArgs &in_constants);
bool getResultsVector(std::vector<Q*> &in_queues,
const std::vector<int> &in_mapping,
const std::size_t out_size,
cv::GRunArgs &out_results);
using V = cv::util::variant<cv::GRunArgs, Stop, cv::gimpl::Exception>;
V getResultsVector(std::vector<Q*> &in_queues,
const std::vector<int> &in_mapping,
const std::size_t out_size);
};
void rewindToStop(std::vector<Q*> &in_queues,
@ -369,9 +370,8 @@ void QueueReader::rewindToStop(std::vector<Q*> &in_queues,
::rewindToStop(in_queues, this_id);
}
bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
cv::GRunArgs &in_constants,
cv::GRunArgs &isl_inputs)
cv::gimpl::StreamMsg QueueReader::getInputVector(std::vector<Q*> &in_queues,
cv::GRunArgs &in_constants)
{
// NB: Need to release resources from the previous step, to fetch new ones.
// On some systems it might be impossible to allocate new memory
@ -381,72 +381,98 @@ bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
// lifetime, keep the whole cmd vector (of size == # of inputs)
// in memory.
m_cmd.resize(in_queues.size());
isl_inputs.resize(in_queues.size());
cv::GRunArgs isl_inputs(in_queues.size());
cv::optional<cv::gimpl::Exception> exception;
for (auto &&it : ade::util::indexed(in_queues))
{
auto id = ade::util::index(it);
auto &q = ade::util::value(it);
auto id = ade::util::index(it);
auto &q = ade::util::value(it);
if (q == nullptr)
{
GAPI_Assert(!in_constants.empty());
// NULL queue means a graph-constant value (like a
// value-initialized scalar)
// It can also hold a constant value received with
// Stop::Kind::CNST message (see above).
isl_inputs[id] = in_constants[id];
continue;
}
if (q == nullptr)
{
GAPI_Assert(!in_constants.empty());
// NULL queue means a graph-constant value (like a
// value-initialized scalar)
// It can also hold a constant value received with
// Stop::Kind::CNST message (see above).
isl_inputs[id] = in_constants[id];
continue;
}
q->pop(m_cmd[id]);
if (!cv::util::holds_alternative<Stop>(m_cmd[id]))
{
isl_inputs[id] = cv::util::get<cv::GRunArg>(m_cmd[id]);
}
else // A Stop sign
{
const auto &stop = cv::util::get<Stop>(m_cmd[id]);
if (stop.kind == Stop::Kind::CNST)
{
// We've got a Stop signal from a const source,
// propagated as a result of real stream reaching its
// end. Sometimes these signals come earlier than
// real EOS Stops so are deprioritized -- just
// remember the Const value here and continue
// processing other queues. Set queue pointer to
// nullptr and update the const_val vector
// appropriately
m_finishing = true;
in_queues[id] = nullptr;
in_constants.resize(in_queues.size());
in_constants[id] = std::move(stop.cdata);
q->pop(m_cmd[id]);
switch (m_cmd[id].index())
{
case Cmd::index_of<cv::GRunArg>():
isl_inputs[id] = cv::util::get<cv::GRunArg>(m_cmd[id]);
break;
case Cmd::index_of<Stop>():
{
const auto &stop = cv::util::get<Stop>(m_cmd[id]);
if (stop.kind == Stop::Kind::CNST)
{
// We've got a Stop signal from a const source,
// propagated as a result of real stream reaching its
// end. Sometimes these signals come earlier than
// real EOS Stops so are deprioritized -- just
// remember the Const value here and continue
// processing other queues. Set queue pointer to
// nullptr and update the const_val vector
// appropriately
m_finishing = true;
in_queues[id] = nullptr;
in_constants.resize(in_queues.size());
in_constants[id] = std::move(stop.cdata);
// NEXT time (on a next call to getInputVector()), the
// "q==nullptr" check above will be triggered, but now
// we need to make it manually:
isl_inputs[id] = in_constants[id];
}
else
{
GAPI_Assert(stop.kind == Stop::Kind::HARD);
rewindToStop(in_queues, id);
// After queues are read to the proper indicator,
// indicate end-of-stream
return false;
} // if(Cnst)
} // if(Stop)
// NEXT time (on a next call to getInputVector()), the
// "q==nullptr" check above will be triggered, but now
// we need to make it manually:
isl_inputs[id] = in_constants[id];
}
else
{
GAPI_Assert(stop.kind == Stop::Kind::HARD);
rewindToStop(in_queues, id);
// After queues are read to the proper indicator,
// indicate end-of-stream
return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
} // if(Cnst)
break;
}
case Cmd::index_of<cv::gimpl::Exception>():
{
exception =
cv::util::make_optional(cv::util::get<cv::gimpl::Exception>(m_cmd[id]));
break;
}
default:
GAPI_Assert(false && "Unsupported cmd type in getInputVector()");
}
} // for(in_queues)
if (exception.has_value()) {
return cv::gimpl::StreamMsg{exception.value()};
}
if (m_finishing)
{
// If the process is about to end (a soft Stop was received
// already) and an island has no other inputs than constant
// inputs, its queues may all become nullptrs. Indicate it as
// "no data".
return !ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;});
if (ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;})) {
return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
}
}
return true; // A regular case - there is data to process.
// A regular case - there is data to process
for (auto& arg : isl_inputs) {
if (arg.index() == cv::GRunArg::index_of<cv::Mat>()) {
arg = cv::GRunArg{ cv::make_rmat<cv::gimpl::RMatOnMat>(cv::util::get<cv::Mat>(arg))
, arg.meta
};
}
}
return cv::gimpl::StreamMsg{std::move(isl_inputs)};
}
// This is a special method to obtain a result vector
@ -474,33 +500,47 @@ bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
// (_may be_ partially filled) to the same final output queue.
// The receiver part at the GStreamingExecutor level won't change
// because of that.
bool QueueReader::getResultsVector(std::vector<Q*> &in_queues,
const std::vector<int> &in_mapping,
const std::size_t out_size,
cv::GRunArgs &out_results)
QueueReader::V QueueReader::getResultsVector(std::vector<Q*> &in_queues,
const std::vector<int> &in_mapping,
const std::size_t out_size)
{
cv::GRunArgs out_results(out_size);
m_cmd.resize(out_size);
cv::optional<cv::gimpl::Exception> exception;
for (auto &&it : ade::util::indexed(in_queues))
{
auto ii = ade::util::index(it);
auto oi = in_mapping[ii];
auto &q = ade::util::value(it);
q->pop(m_cmd[oi]);
if (!cv::util::holds_alternative<Stop>(m_cmd[oi]))
{
out_results[oi] = std::move(cv::util::get<cv::GRunArg>(m_cmd[oi]));
}
else // A Stop sign
{
// In theory, the CNST should never reach here.
// Collector thread never handles the inputs directly
// (collector's input queues are always produced by
// islands in the graph).
rewindToStop(in_queues, ii);
return false;
} // if(Stop)
switch (m_cmd[oi].index()) {
case Cmd::index_of<cv::GRunArg>():
out_results[oi] = std::move(cv::util::get<cv::GRunArg>(m_cmd[oi]));
break;
case Cmd::index_of<Stop>():
// In theory, the CNST should never reach here.
// Collector thread never handles the inputs directly
// (collector's input queues are always produced by
// islands in the graph).
rewindToStop(in_queues, ii);
return QueueReader::V(Stop{});
case Cmd::index_of<cv::gimpl::Exception>():
exception =
cv::util::make_optional(cv::util::get<cv::gimpl::Exception>(m_cmd[oi]));
break;
default:
cv::util::throw_error(
std::logic_error("Unexpected cmd kind in getResultsVector"));
} // switch
} // for(in_queues)
return true;
if (exception.has_value()) {
return QueueReader::V(exception.value());
}
return QueueReader::V(out_results);
}
@ -521,7 +561,9 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
|| cv::util::holds_alternative<Stop>(cmd));
if (cv::util::holds_alternative<Stop>(cmd))
{
for (auto &&oq : out_queues) oq->push(cmd);
for (auto &&oq : out_queues) {
oq->push(cmd);
}
return;
}
@ -547,10 +589,21 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
// Try to obtain next data chunk from the source
cv::GRunArg data;
const bool result = [&](){
GAPI_ITT_AUTO_TRACE_GUARD(emitter_pull_hndl);
return emitter->pull(data);
}();
bool result = false;
try {
result = [&](){
GAPI_ITT_AUTO_TRACE_GUARD(emitter_pull_hndl);
return emitter->pull(data);
}();
} catch (...) {
auto eptr = std::current_exception();
for (auto &&oq : out_queues)
{
oq->push(Cmd{cv::gimpl::Exception{eptr}});
}
// NB: Go to the next iteration.
continue;
}
if (result)
{
@ -673,28 +726,8 @@ class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
std::vector<Q*> &in_queues; // FIXME: This can be part of QueueReader
cv::GRunArgs &in_constants; // FIXME: This can be part of QueueReader
virtual cv::gimpl::StreamMsg get() override
{
GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::get");
GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl);
cv::optional<cv::gimpl::StreamMsg> last_read_msg;
cv::GRunArgs isl_input_args;
if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
{
// Stop case
return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
}
// Wrap all input cv::Mats with RMats
for (auto& arg : isl_input_args) {
if (arg.index() == cv::GRunArg::index_of<cv::Mat>()) {
arg = cv::GRunArg{ cv::make_rmat<cv::gimpl::RMatOnMat>(cv::util::get<cv::Mat>(arg))
, arg.meta
};
}
}
return cv::gimpl::StreamMsg{std::move(isl_input_args)};
}
virtual cv::gimpl::StreamMsg try_get() override
{
// FIXME: This is not very usable at the moment!
@ -709,17 +742,43 @@ class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
{
set(in_descs);
}
const cv::gimpl::StreamMsg& read()
{
GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::read");
GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl);
last_read_msg =
cv::optional<cv::gimpl::StreamMsg>(
qr.getInputVector(in_queues, in_constants));
return last_read_msg.value();
}
virtual cv::gimpl::StreamMsg get() override
{
GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::get");
GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl);
if (!last_read_msg.has_value()) {
(void)read();
}
auto msg = std::move(last_read_msg.value());
last_read_msg = cv::optional<cv::gimpl::StreamMsg>();
return msg;
}
};
class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
{
// These objects form an internal state of the StreamingOutput
struct Posting
{
using V = cv::util::variant<cv::GRunArg, cv::gimpl::EndOfStream>;
V data;
bool ready = false;
};
{
using V = cv::util::variant<cv::GRunArg,
cv::gimpl::EndOfStream,
cv::gimpl::Exception>;
V data;
bool ready = false;
};
using PostingList = std::list<Posting>;
std::vector<PostingList> m_postings;
std::unordered_map< const void*
@ -820,7 +879,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
return ret_val;
}
virtual void post(cv::GRunArgP&& argp) override
virtual void post(cv::GRunArgP&& argp, const std::exception_ptr& exptr) override
{
GAPI_ITT_STATIC_LOCAL_HANDLE(outputs_post_hndl, "StreamingOutput::post");
GAPI_ITT_AUTO_TRACE_GUARD(outputs_post_hndl);
@ -834,6 +893,9 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
const int out_idx = it->second.first;
const auto out_iter = it->second.second;
out_iter->ready = true;
if (exptr) {
out_iter->data = cv::gimpl::Exception{exptr};
}
m_postIdx.erase(it); // Drop the link from the cache anyway
if (out_iter != m_postings[out_idx].begin())
{
@ -845,16 +907,22 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
while (post_iter != m_postings[out_idx].end() && post_iter->ready == true)
{
Cmd cmd;
if (cv::util::holds_alternative<cv::GRunArg>(post_iter->data))
switch (post_iter->data.index())
{
cmd = Cmd{cv::util::get<cv::GRunArg>(post_iter->data)};
}
else
{
GAPI_Assert(cv::util::holds_alternative<cv::gimpl::EndOfStream>(post_iter->data));
cmd = Cmd{Stop{}};
m_stops_sent++;
case Posting::V::index_of<cv::GRunArg>():
cmd = Cmd{cv::util::get<cv::GRunArg>(post_iter->data)};
break;
case Posting::V::index_of<cv::gimpl::Exception>():
cmd = Cmd{cv::util::get<cv::gimpl::Exception>(post_iter->data)};
break;
case Posting::V::index_of<cv::gimpl::EndOfStream>():
cmd = Cmd{Stop{}};
m_stops_sent++;
break;
default:
GAPI_Assert(false && "Unreachable code");
}
for (auto &&q : m_out_queues[out_idx])
{
q->push(cmd);
@ -889,6 +957,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
}
}
}
void meta(const cv::GRunArgP &out, const cv::GRunArg::Meta &m) override
{
std::lock_guard<std::mutex> lock{m_mutex};
@ -919,6 +988,32 @@ public:
// when it posted/resent all STOP messages to all its outputs.
return m_stops_sent == desc().size();
}
virtual void post(cv::gimpl::Exception&& error) override
{
std::lock_guard<std::mutex> lock{m_mutex};
// If the posting list is empty, just broadcast the stop message.
// If it is not, enqueue the Stop message in the postings list.
for (auto &&it : ade::util::indexed(m_postings))
{
const auto idx = ade::util::index(it);
auto &lst = ade::util::value(it);
if (lst.empty())
{
for (auto &&q : m_out_queues[idx])
{
q->push(Cmd(std::move(error)));
}
}
else
{
Posting p;
p.data = Posting::V{std::move(error)};
p.ready = true;
lst.push_back(std::move(p)); // FIXME: For some reason {}-ctor didn't work here
}
}
}
};
// This thread is a plain dumb processing actor. What it do is just:
@ -947,7 +1042,17 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs,
while (!output.done())
{
GAPI_ITT_AUTO_TRACE_GUARD(island_hndl);
island_exec->run(input, output);
// NB: In case the input message is an cv::gimpl::Exception
// handle it in a general way.
if (cv::util::holds_alternative<cv::gimpl::Exception>(input.read()))
{
auto in_msg = input.get();
output.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg)));
}
else
{
island_exec->run(input, output);
}
}
}
@ -984,26 +1089,33 @@ void collectorThread(std::vector<Q*> in_queues,
while (true)
{
GAPI_ITT_AUTO_TRACE_GUARD(collector_hndl);
cv::GRunArgs this_result(out_size);
const bool ok = [&](){
const auto result = [&](){
GAPI_ITT_AUTO_TRACE_GUARD(collector_get_results_hndl);
return qr.getResultsVector(in_queues, in_mapping, out_size, this_result);
return qr.getResultsVector(in_queues, in_mapping, out_size);
}();
if (!ok)
switch (result.index())
{
if (handle_stop)
case QueueReader::V::index_of<cv::GRunArgs>():
{
out_queue.push(Cmd{Stop{}});
GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl);
auto this_result = cv::util::get<cv::GRunArgs>(result);
out_queue.push(Cmd{Result{std::move(this_result), flags}});
break;
}
// Terminate the thread anyway
return;
}
{
GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl);
out_queue.push(Cmd{Result{std::move(this_result), flags}});
case QueueReader::V::index_of<Stop>():
if (handle_stop)
{
out_queue.push(Cmd{Stop{}});
}
// Terminate the thread anyway
return;
case QueueReader::V::index_of<cv::gimpl::Exception>():
out_queue.push(Cmd{cv::util::get<cv::gimpl::Exception>(result)});
break;
default:
GAPI_Assert(false && "Unreachable code");
}
}
}
@ -1707,16 +1819,24 @@ bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs)
Cmd cmd;
m_out_queue.pop(cmd);
if (cv::util::holds_alternative<Stop>(cmd))
{
wait_shutdown();
return false;
switch (cmd.index()) {
case Cmd::index_of<Stop>():
wait_shutdown();
return false;
case Cmd::index_of<Result>(): {
GAPI_Assert(cv::util::holds_alternative<Result>(cmd));
cv::GRunArgs &this_result = cv::util::get<Result>(cmd).args;
sync_data(this_result, outs);
return true;
}
case Cmd::index_of<Exception>(): {
std::rethrow_exception(cv::util::get<Exception>(cmd).eptr);
return true;
}
default:
GAPI_Assert(false && "Unsupported cmd type in pull");
}
GAPI_Assert(cv::util::holds_alternative<Result>(cmd));
cv::GRunArgs &this_result = cv::util::get<Result>(cmd).args;
sync_data(this_result, outs);
return true;
GAPI_Assert(false && "Unreachable code");
}
bool cv::gimpl::GStreamingExecutor::pull(cv::GOptRunArgsP &&outs)
@ -1734,15 +1854,20 @@ bool cv::gimpl::GStreamingExecutor::pull(cv::GOptRunArgsP &&outs)
Cmd cmd;
m_out_queue.pop(cmd);
if (cv::util::holds_alternative<Stop>(cmd))
{
wait_shutdown();
return false;
switch (cmd.index()) {
case Cmd::index_of<Stop>():
wait_shutdown();
return false;
case Cmd::index_of<Result>(): {
sync_data(cv::util::get<Result>(cmd), outs);
return true;
}
case Cmd::index_of<Exception>(): {
std::rethrow_exception(cv::util::get<Exception>(cmd).eptr);
return true;
}
}
GAPI_Assert(cv::util::holds_alternative<Result>(cmd));
sync_data(cv::util::get<Result>(cmd), outs);
return true;
GAPI_Assert(false && "Unreachable code");
}
std::tuple<bool, cv::util::variant<cv::GRunArgs, cv::GOptRunArgs>> cv::gimpl::GStreamingExecutor::pull()

View File

@ -50,11 +50,12 @@ struct Result {
using Cmd = cv::util::variant
< cv::util::monostate
, Start // Tells emitters to start working. Not broadcasted to workers.
, Stop // Tells emitters to stop working. Broadcasted to workers.
, cv::GRunArg // Workers data payload to process.
, Result // Pipeline's data for gout()
>;
, Start // Tells emitters to start working. Not broadcasted to workers.
, Stop // Tells emitters to stop working. Broadcasted to workers.
, cv::GRunArg // Workers data payload to process.
, Result // Pipeline's data for gout()
, cv::gimpl::Exception // Exception which is thrown while execution.
>;
// Interface over a queue. The underlying queue implementation may be
// different. This class is mainly introduced to bring some

View File

@ -2915,6 +2915,47 @@ TEST(Infer, ModelWith2DInputs)
#endif // HAVE_NGRAPH
TEST(TestAgeGender, ThrowBlobAndInputPrecisionMismatchStreaming)
{
const std::string device = "MYRIAD";
skipIfDeviceNotAvailable(device);
initDLDTDataPath();
cv::gapi::ie::detail::ParamDesc params;
// NB: Precision for inputs is U8.
params.model_path = compileAgeGenderBlob(device);
params.device_id = device;
// Configure & run G-API
using AGInfo = std::tuple<cv::GMat, cv::GMat>;
G_API_NET(AgeGender, <AGInfo(cv::GMat)>, "test-age-gender");
auto pp = cv::gapi::ie::Params<AgeGender> {
params.model_path, params.device_id
}.cfgOutputLayers({ "age_conv3", "prob" });
cv::GMat in, age, gender;
std::tie(age, gender) = cv::gapi::infer<AgeGender>(in);
auto pipeline = cv::GComputation(cv::GIn(in), cv::GOut(age, gender))
.compileStreaming(cv::compile_args(cv::gapi::networks(pp)));
cv::Mat in_mat(320, 240, CV_32FC3);
cv::randu(in_mat, 0, 1);
cv::Mat gapi_age, gapi_gender;
pipeline.setSource(cv::gin(in_mat));
pipeline.start();
// NB: Blob precision is U8, but user pass FP32 data, so exception will be thrown.
// Now exception comes directly from IE, but since G-API has information
// about data precision at the compile stage, consider the possibility of
// throwing exception from there.
for (int i = 0; i < 10; ++i) {
EXPECT_ANY_THROW(pipeline.pull(cv::gout(gapi_age, gapi_gender)));
}
}
} // namespace opencv_test
#endif // HAVE_INF_ENGINE

View File

@ -304,6 +304,66 @@ void checkPullOverload(const cv::Mat& ref,
EXPECT_EQ(0., cv::norm(ref, out_mat, cv::NORM_INF));
}
class InvalidSource : public cv::gapi::wip::IStreamSource {
public:
InvalidSource(const size_t throw_every_nth_frame,
const size_t num_frames)
: m_throw_every_nth_frame(throw_every_nth_frame),
m_curr_frame_id(0u),
m_num_frames(num_frames),
m_mat(1, 1, CV_8U) {
}
static std::string exception_msg()
{
return "InvalidSource sucessfuly failed!";
}
bool pull(cv::gapi::wip::Data& d) {
++m_curr_frame_id;
if (m_curr_frame_id > m_num_frames) {
return false;
}
if (m_curr_frame_id % m_throw_every_nth_frame == 0) {
throw std::logic_error(InvalidSource::exception_msg());
return true;
} else {
d = cv::Mat(m_mat);
}
return true;
}
cv::GMetaArg descr_of() const override {
return cv::GMetaArg{cv::descr_of(m_mat)};
}
private:
size_t m_throw_every_nth_frame;
size_t m_curr_frame_id;
size_t m_num_frames;
cv::Mat m_mat;
};
G_TYPED_KERNEL(GThrowExceptionOp, <GMat(GMat)>, "org.opencv.test.throw_error_op")
{
static GMatDesc outMeta(GMatDesc in) { return in; }
};
GAPI_OCV_KERNEL(GThrowExceptionKernel, GThrowExceptionOp)
{
static std::string exception_msg()
{
return "GThrowExceptionKernel sucessfuly failed";
}
static void run(const cv::Mat&, cv::Mat&)
{
throw std::logic_error(GThrowExceptionKernel::exception_msg());
}
};
} // anonymous namespace
TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat)
@ -2512,5 +2572,109 @@ TEST(GAPI_Streaming, TestDesyncMediaFrameGray) {
}
}
TEST(GAPI_Streaming_Exception, SingleKernelThrow) {
cv::GMat in;
auto pipeline = cv::GComputation(in, GThrowExceptionOp::on(in))
.compileStreaming(cv::compile_args(cv::gapi::kernels<GThrowExceptionKernel>()));
cv::Mat in_mat(cv::Size(300, 300), CV_8UC3);
cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255));
pipeline.setSource(cv::gin(in_mat));
pipeline.start();
EXPECT_THROW(
try {
cv::Mat out_mat;
pipeline.pull(cv::gout(out_mat));
} catch (const std::logic_error& e) {
EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what());
throw;
}, std::logic_error);
}
TEST(GAPI_Streaming_Exception, StreamingBackendExceptionAsInput) {
cv::GMat in;
auto pipeline = cv::GComputation(in,
cv::gapi::copy(GThrowExceptionOp::on(in)))
.compileStreaming(cv::compile_args(cv::gapi::kernels<GThrowExceptionKernel>()));
cv::Mat in_mat(cv::Size(300, 300), CV_8UC3);
cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255));
pipeline.setSource(cv::gin(in_mat));
pipeline.start();
EXPECT_THROW(
try {
cv::Mat out_mat;
pipeline.pull(cv::gout(out_mat));
} catch (const std::logic_error& e) {
EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what());
throw;
}, std::logic_error);
}
TEST(GAPI_Streaming_Exception, RegularBacckendsExceptionAsInput) {
cv::GMat in;
auto pipeline = cv::GComputation(in,
cv::gapi::add(GThrowExceptionOp::on(in), GThrowExceptionOp::on(in)))
.compileStreaming(cv::compile_args(cv::gapi::kernels<GThrowExceptionKernel>()));
cv::Mat in_mat(cv::Size(300, 300), CV_8UC3);
cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255));
pipeline.setSource(cv::gin(in_mat));
pipeline.start();
EXPECT_THROW(
try {
cv::Mat out_mat;
pipeline.pull(cv::gout(out_mat));
} catch (const std::logic_error& e) {
EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what());
throw;
}, std::logic_error);
}
TEST(GAPI_Streaming_Exception, SourceThrow) {
cv::GMat in;
auto pipeline = cv::GComputation(in, cv::gapi::copy(in)).compileStreaming();
pipeline.setSource(std::make_shared<InvalidSource>(1u, 1u));
pipeline.start();
EXPECT_THROW(
try {
cv::Mat out_mat;
pipeline.pull(cv::gout(out_mat));
} catch (const std::logic_error& e) {
EXPECT_EQ(InvalidSource::exception_msg(), e.what());
throw;
}, std::logic_error);
}
TEST(GAPI_Streaming_Exception, SourceThrowEverySecondFrame) {
constexpr size_t throw_every_nth_frame = 2u;
constexpr size_t num_frames = 10u;
size_t curr_frame = 0;
bool has_frame = true;
cv::Mat out_mat;
cv::GMat in;
auto pipeline = cv::GComputation(in, cv::gapi::copy(in)).compileStreaming();
pipeline.setSource(std::make_shared<InvalidSource>(throw_every_nth_frame, num_frames));
pipeline.start();
while (has_frame) {
++curr_frame;
try {
has_frame = pipeline.pull(cv::gout(out_mat));
} catch (const std::exception& e) {
EXPECT_TRUE(curr_frame % throw_every_nth_frame == 0);
EXPECT_EQ(InvalidSource::exception_msg(), e.what());
}
}
// NB: Pull was called num_frames + 1(stop).
EXPECT_EQ(num_frames, curr_frame - 1);
}
} // namespace opencv_test