mirror of
https://github.com/opencv/opencv.git
synced 2025-01-06 02:08:12 +08:00
0b5fd4f6bb
Preventing: gapi_async_test.cpp:448:26: error: ‘sleep_for’ is not a member of ‘std::this_thread’
526 lines
16 KiB
C++
526 lines
16 KiB
C++
// This file is part of OpenCV project.
|
|
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
|
// of this distribution and at http://opencv.org/license.html.
|
|
//
|
|
// Copyright (C) 2019 Intel Corporation
|
|
|
|
|
|
#include "test_precomp.hpp"
|
|
#include <opencv2/gapi/gcomputation_async.hpp>
|
|
#include <opencv2/gapi/gcompiled_async.hpp>
|
|
#include <opencv2/gapi/gasync_context.hpp>
|
|
|
|
|
|
#include <condition_variable>
|
|
#include <stdexcept>
|
|
#include <thread>
|
|
|
|
namespace opencv_test
|
|
{
|
|
//The main idea behind these tests is to have a single test script that is parameterized in order to cover all setups (GCompiled vs apply, callback vs future).
|
|
//These differences are factored into dedicated helper classes (mixins), which are then used by the common test script by means of CRTP.
|
|
//The actual G-API computation, together with the parameters to run it on, is mixed into the test via CRTP as well.
|
|
|
|
struct SumOfSum2x2 {
|
|
cv::GComputation sum_of_sum;
|
|
SumOfSum2x2() : sum_of_sum([]{
|
|
cv::GMat in;
|
|
cv::GScalar out = cv::gapi::sum(in + in);
|
|
return GComputation{in, out};
|
|
})
|
|
{}
|
|
|
|
const cv::Size sz{2, 2};
|
|
cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)};
|
|
cv::Scalar out_sc;
|
|
|
|
cv::GCompiled compile(){
|
|
return sum_of_sum.compile(descr_of(in_mat));
|
|
}
|
|
|
|
cv::GComputation& computation(){
|
|
return sum_of_sum;
|
|
}
|
|
|
|
cv::GCompileArgs compile_args(){
|
|
return {};
|
|
}
|
|
|
|
cv::GRunArgs in_args(){
|
|
return cv::gin(in_mat);
|
|
}
|
|
|
|
cv::GRunArgsP out_args(){
|
|
return cv::gout(out_sc);
|
|
}
|
|
|
|
void verify(){
|
|
EXPECT_EQ(8, out_sc[0]);
|
|
}
|
|
};
|
|
|
|
namespace {
|
|
//Kernel interface (GMat -> GMat) used by the exception-handling tests.
G_TYPED_KERNEL(GThrow, <GMat(GMat)>, "org.opencv.test.throw")
{
    //The output description is the input description, unchanged.
    static GMatDesc outMeta(GMatDesc in_desc)
    {
        return in_desc;
    }
};
|
|
|
|
//Distinct exception type thrown by the GThrow kernel implementation, so that tests
//can tell it apart from any other std::runtime_error.
class gthrow_exception : public std::runtime_error {
public:
    //Reuse all std::runtime_error constructors.
    using std::runtime_error::runtime_error;
};
|
|
|
|
//OCV implementation of GThrow: throws gthrow_exception for any non-empty input.
GAPI_OCV_KERNEL(GThrowImpl, GThrow)
{
    static void run(const cv::Mat& in, cv::Mat&)
    {
        //The emptiness check keeps a non-throwing path in this function; it is needed to
        //avoid an "Unreachable code" warning on windows inside OCVCallHelper.
        if (in.empty())
        {
            return;
        }
        throw gthrow_exception{"test"};
    }
};
|
|
|
|
|
|
//TODO: unify with callback helper code
|
|
struct cancel_struct {
|
|
std::atomic<int> num_tasks_to_spawn;
|
|
|
|
cv::gapi::wip::GAsyncContext ctx;
|
|
|
|
cancel_struct(int tasks_to_spawn) : num_tasks_to_spawn(tasks_to_spawn) {}
|
|
};
|
|
|
|
//Kernel interface (GMat, cancel_struct*) -> GMat used by the cancelation tests.
G_TYPED_KERNEL(GCancelationAdHoc, <GMat(GMat, cancel_struct*)>, "org.opencv.test.cancel_ad_hoc")
{
    //The output description is the input description; the cancel_struct pointer is ignored here.
    static GMatDesc outMeta(GMatDesc in_desc, cancel_struct* /*unused*/)
    {
        return in_desc;
    }
};
|
|
|
|
//OCV implementation of GCancelationAdHoc: every execution decrements the shared task
//counter and cancels the shared async context.
GAPI_OCV_KERNEL(GCancelationAdHocImpl, GCancelationAdHoc)
{
    static void run(const cv::Mat& , cancel_struct* cancel_struct_p, cv::Mat&)
    {
        //Value of the counter after this execution has been accounted for.
        const auto tasks_left = --(cancel_struct_p->num_tasks_to_spawn);

        cancel_struct_p->ctx.cancel();

        //If the counter reached zero, no task was left to observe the cancelation.
        EXPECT_GT(tasks_left, 0)<<"Incorrect Test setup - to small number of tasks to feed the queue \n";
    }
};
|
|
}
|
|
|
|
struct ExceptionOnExecution {
|
|
cv::GComputation throwing_gcomp;
|
|
ExceptionOnExecution() : throwing_gcomp([]{
|
|
cv::GMat in;
|
|
auto gout = GThrow::on(in);
|
|
return GComputation{in, gout};
|
|
})
|
|
{}
|
|
|
|
|
|
const cv::Size sz{2, 2};
|
|
cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)};
|
|
cv::Mat out;
|
|
|
|
cv::GCompiled compile(){
|
|
return throwing_gcomp.compile(descr_of(in_mat), compile_args());
|
|
}
|
|
|
|
cv::GComputation& computation(){
|
|
return throwing_gcomp;
|
|
}
|
|
|
|
cv::GRunArgs in_args(){
|
|
return cv::gin(in_mat);
|
|
}
|
|
|
|
cv::GRunArgsP out_args(){
|
|
return cv::gout(out);
|
|
}
|
|
|
|
cv::GCompileArgs compile_args(){
|
|
auto pkg = cv::gapi::kernels<GThrowImpl>();
|
|
return cv::compile_args(pkg);
|
|
}
|
|
|
|
};
|
|
|
|
struct SelfCanceling {
|
|
cv::GComputation self_cancel;
|
|
SelfCanceling(cancel_struct* cancel_struct_p) : self_cancel([cancel_struct_p]{
|
|
cv::GMat in;
|
|
cv::GMat out = GCancelationAdHoc::on(in, cancel_struct_p);
|
|
return GComputation{in, out};
|
|
})
|
|
{}
|
|
|
|
const cv::Size sz{2, 2};
|
|
cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)};
|
|
cv::Mat out_mat;
|
|
|
|
cv::GCompiled compile(){
|
|
return self_cancel.compile(descr_of(in_mat), compile_args());
|
|
}
|
|
|
|
cv::GComputation& computation(){
|
|
return self_cancel;
|
|
}
|
|
|
|
cv::GRunArgs in_args(){
|
|
return cv::gin(in_mat);
|
|
}
|
|
|
|
cv::GRunArgsP out_args(){
|
|
return cv::gout(out_mat);
|
|
}
|
|
|
|
cv::GCompileArgs compile_args(){
|
|
auto pkg = cv::gapi::kernels<GCancelationAdHocImpl>();
|
|
return cv::compile_args(pkg);
|
|
}
|
|
};
|
|
|
|
//CRTP helper: lets a mixin convert its own `this` pointer into a pointer to the
//final (most derived) test type.
template<typename final_type>
struct crtp_cast {
    //Downcasts a pointer to one of the bases to a pointer to the final type.
    template<typename base_type>
    static final_type* crtp_cast_(base_type* this_)
    {
        final_type* const as_final = static_cast<final_type*>(this_);
        return as_final;
    }
};
|
|
|
|
//Test Mixin, hiding details of callback based notification
|
|
template<typename crtp_final_t>
|
|
struct CallBack: crtp_cast<crtp_final_t> {
|
|
std::atomic<bool> callback_called = {false};
|
|
std::mutex mtx;
|
|
std::exception_ptr ep;
|
|
|
|
std::condition_variable cv;
|
|
|
|
std::function<void(std::exception_ptr)> callback(){
|
|
return [&](std::exception_ptr ep_){
|
|
ep = ep_;
|
|
callback_called = true;
|
|
mtx.lock();
|
|
mtx.unlock();
|
|
cv.notify_one();
|
|
};
|
|
};
|
|
|
|
template<typename... Args >
|
|
void start_async(Args&&... args){
|
|
this->crtp_cast_(this)->async(callback(), std::forward<Args>(args)...);
|
|
}
|
|
|
|
template<typename... Args >
|
|
void start_async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args){
|
|
this->crtp_cast_(this)->async(ctx, callback(), std::forward<Args>(args)...);
|
|
}
|
|
|
|
void wait_for_result()
|
|
{
|
|
std::unique_lock<std::mutex> lck{mtx};
|
|
cv.wait(lck,[&]{return callback_called == true;});
|
|
if (ep)
|
|
{
|
|
std::rethrow_exception(ep);
|
|
}
|
|
}
|
|
};
|
|
|
|
//Test Mixin, hiding details of future based notification
|
|
template<typename crtp_final_t>
|
|
struct Future: crtp_cast<crtp_final_t> {
|
|
std::future<void> f;
|
|
|
|
template<typename... Args >
|
|
void start_async(Args&&... args){
|
|
f = this->crtp_cast_(this)->async(std::forward<Args>(args)...);
|
|
}
|
|
|
|
void wait_for_result()
|
|
{
|
|
f.get();
|
|
}
|
|
};
|
|
|
|
//Test Mixin, hiding details of using compiled GAPI object
|
|
template<typename crtp_final_t>
|
|
struct AsyncCompiled : crtp_cast<crtp_final_t>{
|
|
|
|
template<typename... Args>
|
|
auto async(Args&&... args) -> decltype(cv::gapi::wip::async(std::declval<cv::GCompiled&>(), std::forward<Args>(args)...)){
|
|
auto gcmpld = this->crtp_cast_(this)->compile();
|
|
return cv::gapi::wip::async(gcmpld, std::forward<Args>(args)...);
|
|
}
|
|
|
|
template<typename... Args>
|
|
auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) ->
|
|
decltype(cv::gapi::wip::async(std::declval<cv::GCompiled&>(), std::forward<Args>(args)..., std::declval<cv::gapi::wip::GAsyncContext&>()))
|
|
{
|
|
auto gcmpld = this->crtp_cast_(this)->compile();
|
|
return cv::gapi::wip::async(gcmpld, std::forward<Args>(args)..., ctx);
|
|
}
|
|
};
|
|
|
|
//Test Mixin, hiding details of calling apply (async_apply) on GAPI Computation object
|
|
template<typename crtp_final_t>
|
|
struct AsyncApply : crtp_cast<crtp_final_t> {
|
|
|
|
template<typename... Args>
|
|
auto async(Args&&... args) ->
|
|
decltype(cv::gapi::wip::async_apply(std::declval<cv::GComputation&>(), std::forward<Args>(args)..., std::declval<cv::GCompileArgs>()))
|
|
{
|
|
return cv::gapi::wip::async_apply(
|
|
this->crtp_cast_(this)->computation(), std::forward<Args>(args)..., this->crtp_cast_(this)->compile_args()
|
|
);
|
|
}
|
|
|
|
template<typename... Args>
|
|
auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) ->
|
|
decltype(cv::gapi::wip::async_apply(std::declval<cv::GComputation&>(), std::forward<Args>(args)... , std::declval<cv::GCompileArgs>(), std::declval<cv::gapi::wip::GAsyncContext&>()))
|
|
{
|
|
return cv::gapi::wip::async_apply(
|
|
this->crtp_cast_(this)->computation(), std::forward<Args>(args)..., this->crtp_cast_(this)->compile_args(), ctx
|
|
);
|
|
}
|
|
|
|
};
|
|
|
|
|
|
template<typename case_t>
|
|
struct normal: ::testing::Test, case_t{};
|
|
|
|
TYPED_TEST_CASE_P(normal);
|
|
|
|
TYPED_TEST_P(normal, basic)
{
    //Normal scenario: start function asynchronously and wait for the result, and verify it
    auto& self = *this;

    self.start_async(self.in_args(), self.out_args());
    self.wait_for_result();

    self.verify();
}

REGISTER_TYPED_TEST_CASE_P(normal, basic);
|
|
|
|
template<typename case_t>
|
|
struct exception: ::testing::Test, case_t{};
|
|
TYPED_TEST_CASE_P(exception);
|
|
|
|
TYPED_TEST_P(exception, basic)
{
    //Exceptional scenario: start function asynchronously and make sure exception is passed to the user
    auto& self = *this;

    self.start_async(self.in_args(), self.out_args());
    EXPECT_THROW(self.wait_for_result(), gthrow_exception);
}

REGISTER_TYPED_TEST_CASE_P(exception, basic);
|
|
|
|
template<typename case_t>
|
|
struct stress : ::testing::Test{};
|
|
TYPED_TEST_CASE_P(stress);
|
|
|
|
TYPED_TEST_P(stress, test)
{
    //Some stress testing: use a number of threads to start a bunch of async requests
    const std::size_t request_per_thread = 10;
    const std::size_t number_of_threads = 4;

    //Work done by every thread: start all of its requests first, then collect and check them.
    auto thread_body = [&]()
    {
        std::vector<TypeParam> requests(request_per_thread);

        for (TypeParam& request : requests)
        {
            request.start_async(request.in_args(), request.out_args());
        }

        for (TypeParam& request : requests)
        {
            request.wait_for_result();
            request.verify();
        }
    };

    std::vector<std::thread> workers;
    workers.reserve(number_of_threads);
    for (std::size_t launched = 0; launched < number_of_threads; ++launched)
    {
        workers.emplace_back(thread_body);
    }

    for (std::thread& worker : workers)
    {
        worker.join();
    }
}

REGISTER_TYPED_TEST_CASE_P(stress, test);
|
|
|
|
template<typename case_t>
|
|
struct cancel : ::testing::Test{};
|
|
TYPED_TEST_CASE_P(cancel);
|
|
|
|
//Starts a batch of self-canceling requests on one shared context and expects that
//at least one of them is reported as canceled.
TYPED_TEST_P(cancel, basic)
{
#if defined(__GNUC__) && __GNUC__ >= 11
    // std::vector<TypeParam> requests can't handle type with ctor parameter (SelfCanceling)
    FAIL() << "Test code is not available due to compilation error with GCC 11";
#else
    constexpr int num_tasks = 100;
    cancel_struct cancel_struct_ {num_tasks};

    std::vector<TypeParam> requests;
    requests.reserve(num_tasks);
    for (int created = 0; created < num_tasks; ++created)
    {
        requests.emplace_back(&cancel_struct_);
    }

    //The first request to execute cancels the others during its execution.
    for (auto& request : requests)
    {
        request.start_async(cancel_struct_.ctx, request.in_args(), request.out_args());
    }

    //Count the requests that report cancelation.
    unsigned int canceled = 0;
    for (auto& request : requests)
    {
        try
        {
            request.wait_for_result();
        }
        catch (cv::gapi::wip::GAsyncCanceled&)
        {
            ++canceled;
        }
    }

    ASSERT_GT(canceled, 0u);
#endif
}
|
|
|
|
namespace {
|
|
//Builds owning run arguments from output-argument pointers by copying the objects they
//point to (UMat, Mat, Scalar) or the VectorRef itself. Arguments of any other kind are skipped.
GRunArgs deep_copy_out_args(const GRunArgsP& args ){
    GRunArgs copies;
    copies.reserve(args.size());

    //FIXME: replace this dispatch with use of visit() on variant, when it will be available
    for (const auto& out_ptr : args)
    {
        const auto kind = out_ptr.index();

        if (kind == GRunArgP::index_of<cv::UMat*>())
        {
            copies.emplace_back(*util::get<cv::UMat*>(out_ptr));
        }
        else if (kind == GRunArgP::index_of<cv::Mat*>())
        {
            copies.emplace_back(*util::get<cv::Mat*>(out_ptr));
        }
        else if (kind == GRunArgP::index_of<cv::Scalar*>())
        {
            copies.emplace_back(*util::get<cv::Scalar*>(out_ptr));
        }
        else if (kind == GRunArgP::index_of<cv::detail::VectorRef>())
        {
            copies.emplace_back(util::get<cv::detail::VectorRef>(out_ptr));
        }
        //Any other alternative is intentionally ignored.
    }

    return copies;
}
|
|
|
|
//Builds output-argument pointers referring to the objects stored in args (Mat, UMat, Scalar)
//or holding the VectorRef itself. Arguments of any other kind are skipped.
//The result points into args, so args has to stay alive while the result is in use.
GRunArgsP args_p_from_args(GRunArgs& args){
    GRunArgsP pointers;
    pointers.reserve(args.size());

    for (auto& owned : args)
    {
        const auto kind = owned.index();

        if (kind == GRunArg::index_of<cv::Mat>())
        {
            pointers.emplace_back(&util::get<cv::Mat>(owned));
        }
        else if (kind == GRunArg::index_of<cv::UMat>())
        {
            pointers.emplace_back(&util::get<cv::UMat>(owned));
        }
        else if (kind == GRunArg::index_of<cv::Scalar>())
        {
            pointers.emplace_back(&util::get<cv::Scalar>(owned));
        }
        else if (kind == GRunArg::index_of<cv::detail::VectorRef>())
        {
            pointers.emplace_back(util::get<cv::detail::VectorRef>(owned));
        }
        //Any other alternative is intentionally ignored.
    }

    return pointers;
}
|
|
}
|
|
|
|
//Registers the tests of the "cancel" suite (only "basic") for later instantiation.
REGISTER_TYPED_TEST_CASE_P(cancel, basic);
|
|
|
|
template<typename case_t>
|
|
struct output_args_lifetime : ::testing::Test{
|
|
static constexpr const int num_of_requests = 20;
|
|
};
|
|
TYPED_TEST_CASE_P(output_args_lifetime);
|
|
//There are intentionally no actual checks (asserts and verify) in output_args_lifetime tests.
|
|
//They are more of example use-cases than real tests. (ASAN/valgrind can still catch issues here)
|
|
TYPED_TEST_P(output_args_lifetime, callback){
|
|
|
|
std::atomic<int> active_requests = {0};
|
|
|
|
for (int i=0; i<this->num_of_requests; i++)
|
|
{
|
|
TypeParam r;
|
|
|
|
//As output arguments are __captured by reference__ calling code
|
|
//__must__ ensure they live long enough to complete asynchronous activity.
|
|
//(i.e. live at least until callback is called)
|
|
auto out_args_ptr = std::make_shared<cv::GRunArgs>(deep_copy_out_args(r.out_args()));
|
|
|
|
//Extend lifetime of out_args_ptr content by capturing it into a callback
|
|
auto cb = [&active_requests, out_args_ptr](std::exception_ptr ){
|
|
--active_requests;
|
|
};
|
|
|
|
++active_requests;
|
|
|
|
r.async(cb, r.in_args(), args_p_from_args(*out_args_ptr));
|
|
}
|
|
|
|
|
|
while(active_requests){
|
|
std::this_thread::sleep_for(std::chrono::milliseconds{2});
|
|
}
|
|
}
|
|
|
|
|
|
//Future-based counterpart of the lifetime example: outputs are kept alive in a vector
//until all futures have been waited on.
TYPED_TEST_P(output_args_lifetime, future)
{
    const int total_requests = this->num_of_requests;

    std::vector<std::future<void>> pending(total_requests);
    std::vector<std::shared_ptr<cv::GRunArgs>> kept_outputs(total_requests);

    for (int idx = 0; idx < total_requests; ++idx)
    {
        TypeParam request;

        //Output arguments are __captured by reference__ by the asynchronous call, so the
        //calling code __must__ keep them alive until the asynchronous activity completes
        //(i.e. at least until future.get()/wait() has returned).
        //Storing the pointer in kept_outputs extends the lifetime of its content.
        kept_outputs[idx] = std::make_shared<cv::GRunArgs>(deep_copy_out_args(request.out_args()));

        pending[idx] = request.async(request.in_args(), args_p_from_args(*kept_outputs[idx]));
    }

    for (const std::future<void>& done : pending)
    {
        done.wait();
    }
}

REGISTER_TYPED_TEST_CASE_P(output_args_lifetime, callback, future);
|
|
|
|
//little helpers to match up all combinations of setups
|
|
//Little helper to match up all combinations of setups: a Case is the computation fixture
//plus any number of CRTP mixins, each instantiated with the Case itself as the final type.
template<typename fixture_type, template<typename> class... mixin_types>
struct Case
    : public fixture_type,
      public mixin_types<Case<fixture_type, mixin_types...>>...
{
    Case() = default;
    Case(Case const&) = default;
    Case(Case&&) = default;

    //Any other set of constructor arguments is forwarded to the computation fixture.
    template<typename... CtorArgs>
    Case(CtorArgs&&... ctor_args)
        : fixture_type(std::forward<CtorArgs>(ctor_args)...)
    {
    }
};
|
|
|
|
template<typename computation_t>
|
|
using cases = ::testing::Types<
|
|
Case<computation_t, CallBack, AsyncCompiled>,
|
|
Case<computation_t, CallBack, AsyncApply>,
|
|
Case<computation_t, Future, AsyncCompiled>,
|
|
Case<computation_t, Future, AsyncApply>
|
|
>;
|
|
|
|
//Runs the "normal" suite for every setup listed in cases<> over the SumOfSum2x2 fixture.
INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPINormalFlow_, normal, cases<SumOfSum2x2>);
|
|
//Runs the "exception" suite for every setup listed in cases<> over the ExceptionOnExecution fixture.
INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIExceptionHandling_, exception, cases<ExceptionOnExecution>);
|
|
|
|
//Runs the "stress" suite for every setup listed in cases<> over the SumOfSum2x2 fixture.
INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIStress, stress, cases<SumOfSum2x2>);
|
|
|
|
//Runs the "cancel" suite for every setup listed in cases<> over the SelfCanceling fixture.
INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPICancelation, cancel, cases<SelfCanceling>);
|
|
|
|
template<typename computation_t>
|
|
using explicit_wait_cases = ::testing::Types<
|
|
Case<computation_t, AsyncCompiled>,
|
|
Case<computation_t, AsyncApply>,
|
|
Case<computation_t, AsyncCompiled>,
|
|
Case<computation_t, AsyncApply>
|
|
>;
|
|
|
|
//Runs the "output_args_lifetime" suite for every setup listed in explicit_wait_cases<> over the SumOfSum2x2 fixture.
INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIOutArgsLifetTime, output_args_lifetime, explicit_wait_cases<SumOfSum2x2>);
|
|
|
|
} // namespace opencv_test
|