Merge pull request #14440 from alalek:async_array

This commit is contained in:
Alexander Alekhin 2019-06-08 20:57:15 +00:00
commit 6d916c5bb4
23 changed files with 944 additions and 145 deletions

View File

@ -68,6 +68,7 @@
@defgroup core_c_glue Connections with C++
@}
@defgroup core_array Operations on arrays
@defgroup core_async Asynchronous API
@defgroup core_xml XML/YAML Persistence
@defgroup core_cluster Clustering
@defgroup core_utils Utility and system functions and macros

View File

@ -0,0 +1,105 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#ifndef OPENCV_CORE_ASYNC_HPP
#define OPENCV_CORE_ASYNC_HPP
#include <opencv2/core/mat.hpp>
#ifdef CV_CXX11
//#include <future>
#include <chrono>
#endif
namespace cv {
/** @addtogroup core_async
@{
*/
/** @brief Returns result of asynchronous operations
Object has attached asynchronous state.
Assignment operator doesn't clone asynchronous state (it is shared between all instances).
Result can be fetched via get() method only once.
*/
class CV_EXPORTS_W AsyncArray
{
public:
~AsyncArray() CV_NOEXCEPT;
CV_WRAP AsyncArray() CV_NOEXCEPT;
AsyncArray(const AsyncArray& o) CV_NOEXCEPT;
AsyncArray& operator=(const AsyncArray& o) CV_NOEXCEPT;
CV_WRAP void release() CV_NOEXCEPT;
/** Fetch the result.
@param[out] dst destination array
Waits for result until container has valid result.
Throws exception if exception was stored as a result.
Throws exception on invalid container state.
@note Result or stored exception can be fetched only once.
*/
CV_WRAP void get(OutputArray dst) const;
/** Retrieving the result with timeout
@param[out] dst destination array
@param[in] timeoutNs timeout in nanoseconds, -1 for infinite wait
@returns true if result is ready, false if the timeout has expired
@note Result or stored exception can be fetched only once.
*/
bool get(OutputArray dst, int64 timeoutNs) const;
CV_WRAP inline
bool get(OutputArray dst, double timeoutNs) const { return get(dst, (int64)timeoutNs); }
bool wait_for(int64 timeoutNs) const;
CV_WRAP inline
bool wait_for(double timeoutNs) const { return wait_for((int64)timeoutNs); }
CV_WRAP bool valid() const CV_NOEXCEPT;
#ifdef CV_CXX11
inline AsyncArray(AsyncArray&& o) { p = o.p; o.p = NULL; }
inline AsyncArray& operator=(AsyncArray&& o) CV_NOEXCEPT { std::swap(p, o.p); return *this; }
template<typename _Rep, typename _Period>
inline bool get(OutputArray dst, const std::chrono::duration<_Rep, _Period>& timeout)
{
return get(dst, (int64)(std::chrono::nanoseconds(timeout).count()));
}
template<typename _Rep, typename _Period>
inline bool wait_for(const std::chrono::duration<_Rep, _Period>& timeout)
{
return wait_for((int64)(std::chrono::nanoseconds(timeout).count()));
}
#if 0
std::future<Mat> getFutureMat() const;
std::future<UMat> getFutureUMat() const;
#endif
#endif
// PImpl
struct Impl; friend struct Impl;
inline void* _getImpl() const CV_NOEXCEPT { return p; }
protected:
Impl* p;
};
//! @}
} // namespace
#endif // OPENCV_CORE_ASYNC_HPP

View File

@ -5,6 +5,9 @@
#ifndef OPENCV_CORE_BINDINGS_UTILS_HPP
#define OPENCV_CORE_BINDINGS_UTILS_HPP
#include <opencv2/core/async.hpp>
#include <opencv2/core/detail/async_promise.hpp>
namespace cv { namespace utils {
//! @addtogroup core_utils
//! @{
@ -17,6 +20,29 @@ CV_EXPORTS_W String dumpInputOutputArray(InputOutputArray argument);
CV_EXPORTS_W String dumpInputOutputArrayOfArrays(InputOutputArrayOfArrays argument);
CV_WRAP static inline
AsyncArray testAsyncArray(InputArray argument)
{
AsyncPromise p;
p.setValue(argument);
return p.getArrayResult();
}
CV_WRAP static inline
AsyncArray testAsyncException()
{
AsyncPromise p;
try
{
CV_Error(Error::StsOk, "Test: Generated async error");
}
catch (const cv::Exception& e)
{
p.setException(e);
}
return p.getArrayResult();
}
//! @}
}} // namespace

View File

@ -622,6 +622,19 @@ Cv64suf;
# define CV_FINAL
#endif
/****************************************************************************************\
* C++11 noexcept *
\****************************************************************************************/
#ifndef CV_NOEXCEPT
# ifdef CV_CXX11
# define CV_NOEXCEPT noexcept
# endif
#endif
#ifndef CV_NOEXCEPT
# define CV_NOEXCEPT
#endif
// Integer types portatibility

View File

@ -0,0 +1,71 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#ifndef OPENCV_CORE_ASYNC_PROMISE_HPP
#define OPENCV_CORE_ASYNC_PROMISE_HPP
#include "../async.hpp"
#include "exception_ptr.hpp"
namespace cv {
/** @addtogroup core_async
@{
*/
/** @brief Provides result of asynchronous operations
*/
class CV_EXPORTS AsyncPromise
{
public:
~AsyncPromise() CV_NOEXCEPT;
AsyncPromise() CV_NOEXCEPT;
explicit AsyncPromise(const AsyncPromise& o) CV_NOEXCEPT;
AsyncPromise& operator=(const AsyncPromise& o) CV_NOEXCEPT;
void release() CV_NOEXCEPT;
/** Returns associated AsyncArray
@note Can be called once
*/
AsyncArray getArrayResult();
/** Stores asynchronous result.
@param[in] value result
*/
void setValue(InputArray value);
// TODO "move" setters
#if CV__EXCEPTION_PTR
/** Stores exception.
@param[in] exception exception to be raised in AsyncArray
*/
void setException(std::exception_ptr exception);
#endif
/** Stores exception.
@param[in] exception exception to be raised in AsyncArray
*/
void setException(const cv::Exception& exception);
#ifdef CV_CXX11
explicit AsyncPromise(AsyncPromise&& o) { p = o.p; o.p = NULL; }
AsyncPromise& operator=(AsyncPromise&& o) CV_NOEXCEPT { std::swap(p, o.p); return *this; }
#endif
// PImpl
typedef struct AsyncArray::Impl Impl; friend struct AsyncArray::Impl;
inline void* _getImpl() const CV_NOEXCEPT { return p; }
protected:
Impl* p;
};
//! @}
} // namespace
#endif // OPENCV_CORE_ASYNC_PROMISE_HPP

View File

@ -0,0 +1,27 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#ifndef OPENCV_CORE_DETAILS_EXCEPTION_PTR_H
#define OPENCV_CORE_DETAILS_EXCEPTION_PTR_H
#ifndef CV__EXCEPTION_PTR
# if defined(__ANDROID__) && defined(ATOMIC_INT_LOCK_FREE) && ATOMIC_INT_LOCK_FREE < 2
# define CV__EXCEPTION_PTR 0 // Not supported, details: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=58938
# elif defined(CV_CXX11)
# define CV__EXCEPTION_PTR 1
# elif defined(_MSC_VER)
# define CV__EXCEPTION_PTR (_MSC_VER >= 1600)
# elif defined(__clang__)
# define CV__EXCEPTION_PTR 0 // C++11 only (see above)
# elif defined(__GNUC__) && defined(__GXX_EXPERIMENTAL_CXX0X__)
# define CV__EXCEPTION_PTR (__GXX_EXPERIMENTAL_CXX0X__ > 0)
# endif
#endif
#ifndef CV__EXCEPTION_PTR
# define CV__EXCEPTION_PTR 0
#elif CV__EXCEPTION_PTR
# include <exception> // std::exception_ptr
#endif
#endif // OPENCV_CORE_DETAILS_EXCEPTION_PTR_H

View File

@ -377,6 +377,9 @@ public:
void assign(const std::vector<UMat>& v) const;
void assign(const std::vector<Mat>& v) const;
void move(UMat& u) const;
void move(Mat& m) const;
};

View File

@ -0,0 +1,8 @@
#ifdef HAVE_OPENCV_CORE
#include "opencv2/core/async.hpp"
CV_PY_TO_CLASS(AsyncArray);
CV_PY_FROM_CLASS(AsyncArray);
#endif

366
modules/core/src/async.cpp Normal file
View File

@ -0,0 +1,366 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#include "precomp.hpp"
//#undef CV_CXX11 // debug non C++11 mode
#include "opencv2/core/async.hpp"
#include "opencv2/core/detail/async_promise.hpp"
#include "opencv2/core/cvstd.hpp"
#include <opencv2/core/utils/logger.defines.hpp>
#undef CV_LOG_STRIP_LEVEL
#define CV_LOG_STRIP_LEVEL CV_LOG_LEVEL_DEBUG + 1
#include <opencv2/core/utils/logger.hpp>
#ifdef CV_CXX11
#include <mutex>
#include <condition_variable>
#include <chrono>
#endif
namespace cv {
/**
Manages shared state of asynchronous result
*/
struct AsyncArray::Impl
{
int refcount;
void addrefFuture() CV_NOEXCEPT { CV_XADD(&refcount_future, 1); CV_XADD(&refcount, 1); } \
void releaseFuture() CV_NOEXCEPT { CV_XADD(&refcount_future, -1); if(1 == CV_XADD(&refcount, -1)) delete this; } \
int refcount_future;
void addrefPromise() CV_NOEXCEPT { CV_XADD(&refcount_promise, 1); CV_XADD(&refcount, 1); } \
void releasePromise() CV_NOEXCEPT { CV_XADD(&refcount_promise, -1); if(1 == CV_XADD(&refcount, -1)) delete this; } \
int refcount_promise;
#ifdef CV_CXX11
mutable std::mutex mtx;
mutable std::condition_variable cond_var;
#else
mutable cv::Mutex mtx;
#endif
mutable bool has_result; // Mat, UMat or exception
mutable cv::Ptr<Mat> result_mat;
mutable cv::Ptr<UMat> result_umat;
bool has_exception;
#if CV__EXCEPTION_PTR
std::exception_ptr exception;
#endif
cv::Exception cv_exception;
mutable bool result_is_fetched;
bool future_is_returned;
Impl()
: refcount(1), refcount_future(0), refcount_promise(1)
, has_result(false)
, has_exception(false)
, result_is_fetched(false)
, future_is_returned(false)
{
// nothing
}
~Impl()
{
if (has_result && !result_is_fetched)
{
CV_LOG_INFO(NULL, "Asynchronous result has not been fetched");
}
}
bool get(OutputArray dst, int64 timeoutNs) const
{
CV_Assert(!result_is_fetched);
if (!has_result)
{
if(refcount_promise == 0)
CV_Error(Error::StsInternal, "Asynchronous result producer has been destroyed");
if (!wait_for(timeoutNs))
return false;
}
#ifdef CV_CXX11
std::unique_lock<std::mutex> lock(mtx);
#else
cv::AutoLock lock(mtx);
#endif
if (has_result)
{
if (!result_mat.empty())
{
dst.move(*result_mat.get());
result_mat.release();
result_is_fetched = true;
return true;
}
if (!result_umat.empty())
{
dst.move(*result_umat.get());
result_umat.release();
result_is_fetched = true;
return true;
}
#if CV__EXCEPTION_PTR
if (has_exception && exception)
{
result_is_fetched = true;
std::rethrow_exception(exception);
}
#endif
if (has_exception)
{
result_is_fetched = true;
throw cv_exception;
}
CV_Error(Error::StsInternal, "AsyncArray: invalid state of 'has_result = true'");
}
CV_Assert(!has_result);
CV_Assert(timeoutNs < 0);
return false;
}
bool valid() const CV_NOEXCEPT
{
if (result_is_fetched)
return false;
if (refcount_promise == 0 && !has_result)
return false;
return true;
}
bool wait_for(int64 timeoutNs) const
{
CV_Assert(valid());
if (has_result)
return has_result;
if (timeoutNs == 0)
return has_result;
CV_LOG_INFO(NULL, "Waiting for async result ...");
#ifdef CV_CXX11
std::unique_lock<std::mutex> lock(mtx);
const auto cond_pred = [&]{ return has_result == true; };
if (timeoutNs > 0)
return cond_var.wait_for(lock, std::chrono::nanoseconds(timeoutNs), cond_pred);
else
{
cond_var.wait(lock, cond_pred);
CV_Assert(has_result);
return true;
}
#else
CV_Error(Error::StsNotImplemented, "OpenCV has been built without async waiting support (C++11 is required)");
#endif
}
AsyncArray getArrayResult()
{
CV_Assert(refcount_future == 0);
AsyncArray result;
addrefFuture();
result.p = this;
future_is_returned = true;
return result;
}
void setValue(InputArray value)
{
if (future_is_returned && refcount_future == 0)
CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
#ifdef CV_CXX11
std::unique_lock<std::mutex> lock(mtx);
#else
cv::AutoLock lock(mtx);
#endif
CV_Assert(!has_result);
int k = value.kind();
if (k == _InputArray::UMAT)
{
result_umat = makePtr<UMat>();
value.copyTo(*result_umat.get());
}
else
{
result_mat = makePtr<Mat>();
value.copyTo(*result_mat.get());
}
has_result = true;
#ifdef CV_CXX11
cond_var.notify_all();
#endif
}
#if CV__EXCEPTION_PTR
void setException(std::exception_ptr e)
{
if (future_is_returned && refcount_future == 0)
CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
#ifdef CV_CXX11
std::unique_lock<std::mutex> lock(mtx);
#else
cv::AutoLock lock(mtx);
#endif
CV_Assert(!has_result);
has_exception = true;
exception = e;
has_result = true;
#ifdef CV_CXX11
cond_var.notify_all();
#endif
}
#endif
void setException(const cv::Exception e)
{
if (future_is_returned && refcount_future == 0)
CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
#ifdef CV_CXX11
std::unique_lock<std::mutex> lock(mtx);
#else
cv::AutoLock lock(mtx);
#endif
CV_Assert(!has_result);
has_exception = true;
cv_exception = e;
has_result = true;
#ifdef CV_CXX11
cond_var.notify_all();
#endif
}
};
AsyncArray::AsyncArray() CV_NOEXCEPT
: p(NULL)
{
}
AsyncArray::~AsyncArray() CV_NOEXCEPT
{
release();
}
AsyncArray::AsyncArray(const AsyncArray& o) CV_NOEXCEPT
: p(o.p)
{
if (p)
p->addrefFuture();
}
AsyncArray& AsyncArray::operator=(const AsyncArray& o) CV_NOEXCEPT
{
Impl* newp = o.p;
if (newp)
newp->addrefFuture();
release();
p = newp;
return *this;
}
void AsyncArray::release() CV_NOEXCEPT
{
Impl* impl = p;
p = NULL;
if (impl)
impl->releaseFuture();
}
bool AsyncArray::get(OutputArray dst, int64 timeoutNs) const
{
CV_Assert(p);
return p->get(dst, timeoutNs);
}
void AsyncArray::get(OutputArray dst) const
{
CV_Assert(p);
bool res = p->get(dst, -1);
CV_Assert(res);
}
bool AsyncArray::wait_for(int64 timeoutNs) const
{
CV_Assert(p);
return p->wait_for(timeoutNs);
}
bool AsyncArray::valid() const CV_NOEXCEPT
{
if (!p) return false;
return p->valid();
}
//
// AsyncPromise
//
AsyncPromise::AsyncPromise() CV_NOEXCEPT
: p(new AsyncArray::Impl())
{
}
AsyncPromise::~AsyncPromise() CV_NOEXCEPT
{
release();
}
AsyncPromise::AsyncPromise(const AsyncPromise& o) CV_NOEXCEPT
: p(o.p)
{
if (p)
p->addrefPromise();
}
AsyncPromise& AsyncPromise::operator=(const AsyncPromise& o) CV_NOEXCEPT
{
Impl* newp = o.p;
if (newp)
newp->addrefPromise();
release();
p = newp;
return *this;
}
void AsyncPromise::release() CV_NOEXCEPT
{
Impl* impl = p;
p = NULL;
if (impl)
impl->releasePromise();
}
AsyncArray AsyncPromise::getArrayResult()
{
CV_Assert(p);
return p->getArrayResult();
}
void AsyncPromise::setValue(InputArray value)
{
CV_Assert(p);
return p->setValue(value);
}
void AsyncPromise::setException(const cv::Exception& exception)
{
CV_Assert(p);
return p->setException(exception);
}
#if CV__EXCEPTION_PTR
void AsyncPromise::setException(std::exception_ptr exception)
{
CV_Assert(p);
return p->setException(exception);
}
#endif
} // namespace

View File

@ -1879,6 +1879,76 @@ void _OutputArray::assign(const Mat& m) const
}
void _OutputArray::move(UMat& u) const
{
if (fixedSize())
{
// TODO Performance warning
assign(u);
return;
}
int k = kind();
if (k == UMAT)
{
#ifdef CV_CXX11
*(UMat*)obj = std::move(u);
#else
*(UMat*)obj = u;
u.release();
#endif
}
else if (k == MAT)
{
u.copyTo(*(Mat*)obj); // TODO check u.getMat()
u.release();
}
else if (k == MATX)
{
u.copyTo(getMat()); // TODO check u.getMat()
u.release();
}
else
{
CV_Error(Error::StsNotImplemented, "");
}
}
void _OutputArray::move(Mat& m) const
{
if (fixedSize())
{
// TODO Performance warning
assign(m);
return;
}
int k = kind();
if (k == UMAT)
{
m.copyTo(*(UMat*)obj); // TODO check m.getUMat()
m.release();
}
else if (k == MAT)
{
#ifdef CV_CXX11
*(Mat*)obj = std::move(m);
#else
*(Mat*)obj = m;
m.release();
#endif
}
else if (k == MATX)
{
m.copyTo(getMat());
m.release();
}
else
{
CV_Error(Error::StsNotImplemented, "");
}
}
void _OutputArray::assign(const std::vector<UMat>& v) const
{
int k = kind();

View File

@ -129,27 +129,7 @@
#include "parallel_impl.hpp"
#ifndef CV__EXCEPTION_PTR
# if defined(__ANDROID__) && defined(ATOMIC_INT_LOCK_FREE) && ATOMIC_INT_LOCK_FREE < 2
# define CV__EXCEPTION_PTR 0 // Not supported, details: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=58938
# elif defined(CV_CXX11)
# define CV__EXCEPTION_PTR 1
# elif defined(_MSC_VER)
# define CV__EXCEPTION_PTR (_MSC_VER >= 1600)
# elif defined(__clang__)
# define CV__EXCEPTION_PTR 0 // C++11 only (see above)
# elif defined(__GNUC__) && defined(__GXX_EXPERIMENTAL_CXX0X__)
# define CV__EXCEPTION_PTR (__GXX_EXPERIMENTAL_CXX0X__ > 0)
# endif
#endif
#ifndef CV__EXCEPTION_PTR
# define CV__EXCEPTION_PTR 0
#elif CV__EXCEPTION_PTR
# include <exception> // std::exception_ptr
#endif
#include "opencv2/core/detail/exception_ptr.hpp" // CV__EXCEPTION_PTR = 1 if std::exception_ptr is available
using namespace cv;

View File

@ -0,0 +1,154 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#include "test_precomp.hpp"
#include <opencv2/core/async.hpp>
#include <opencv2/core/detail/async_promise.hpp>
#include <opencv2/core/bindings_utils.hpp>
#ifdef CV_CXX11
#include <thread>
#include <chrono>
#endif
namespace opencv_test { namespace {
TEST(Core_Async, BasicCheck)
{
Mat m(3, 3, CV_32FC1, Scalar::all(5.0f));
AsyncPromise p;
AsyncArray r = p.getArrayResult();
EXPECT_TRUE(r.valid());
// Follow the limitations of std::promise::get_future
// https://en.cppreference.com/w/cpp/thread/promise/get_future
EXPECT_THROW(AsyncArray r2 = p.getArrayResult(), cv::Exception);
p.setValue(m);
Mat m2;
r.get(m2);
EXPECT_EQ(0, cvtest::norm(m, m2, NORM_INF));
// Follow the limitations of std::future::get
// https://en.cppreference.com/w/cpp/thread/future/get
EXPECT_FALSE(r.valid());
Mat m3;
EXPECT_THROW(r.get(m3), cv::Exception);
}
TEST(Core_Async, ExceptionCheck)
{
Mat m(3, 3, CV_32FC1, Scalar::all(5.0f));
AsyncPromise p;
AsyncArray r = p.getArrayResult();
EXPECT_TRUE(r.valid());
try
{
CV_Error(Error::StsOk, "Test: Generated async error");
}
catch (const cv::Exception& e)
{
p.setException(e);
}
try {
Mat m2;
r.get(m2);
FAIL() << "Exception is expected";
}
catch (const cv::Exception& e)
{
EXPECT_EQ(Error::StsOk, e.code) << e.what();
}
// Follow the limitations of std::future::get
// https://en.cppreference.com/w/cpp/thread/future/get
EXPECT_FALSE(r.valid());
}
TEST(Core_Async, LikePythonTest)
{
Mat m(3, 3, CV_32FC1, Scalar::all(5.0f));
AsyncArray r = cv::utils::testAsyncArray(m);
EXPECT_TRUE(r.valid());
Mat m2;
r.get(m2);
EXPECT_EQ(0, cvtest::norm(m, m2, NORM_INF));
// Follow the limitations of std::future::get
// https://en.cppreference.com/w/cpp/thread/future/get
EXPECT_FALSE(r.valid());
}
#ifdef CV_CXX11
TEST(Core_Async, AsyncThread_Simple)
{
Mat m(3, 3, CV_32FC1, Scalar::all(5.0f));
AsyncPromise p;
AsyncArray r = p.getArrayResult();
std::thread t([&]{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
try {
p.setValue(m);
} catch (const std::exception& e) {
std::cout << e.what() << std::endl;
} catch (...) {
std::cout << "Unknown C++ exception" << std::endl;
}
});
try
{
Mat m2;
r.get(m2);
EXPECT_EQ(0, cvtest::norm(m, m2, NORM_INF));
t.join();
}
catch (...)
{
t.join();
throw;
}
}
TEST(Core_Async, AsyncThread_DetachedResult)
{
Mat m(3, 3, CV_32FC1, Scalar::all(5.0f));
AsyncPromise p;
{
AsyncArray r = p.getArrayResult();
r.release();
}
bool exception_ok = false;
std::thread t([&]{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
try {
p.setValue(m);
} catch (const cv::Exception& e) {
if (e.code == Error::StsError)
exception_ok = true;
else
std::cout << e.what() << std::endl;
} catch (const std::exception& e) {
std::cout << e.what() << std::endl;
} catch (...) {
std::cout << "Unknown C++ exception" << std::endl;
}
});
t.join();
EXPECT_TRUE(exception_ok);
}
#endif
}} // namespace

View File

@ -44,9 +44,7 @@
#include <vector>
#include <opencv2/core.hpp>
#ifdef CV_CXX11
#include <future>
#endif
#include "opencv2/core/async.hpp"
#if !defined CV_DOXYGEN && !defined CV_STATIC_ANALYSIS && !defined CV_DNN_DONT_ADD_EXPERIMENTAL_NS
#define CV__DNN_EXPERIMENTAL_NS_BEGIN namespace experimental_dnn_34_v12 {
@ -67,18 +65,6 @@ CV__DNN_EXPERIMENTAL_NS_BEGIN
typedef std::vector<int> MatShape;
#if defined(CV_CXX11) || defined(CV_DOXYGEN)
typedef std::future<Mat> AsyncMat;
#else
// Just a workaround for bindings.
struct AsyncMat
{
Mat get() { return Mat(); }
void wait() const {}
size_t wait_for(size_t milliseconds) const { CV_UNUSED(milliseconds); return -1; }
};
#endif
/**
* @brief Enum of computation backends supported by layers.
* @see Net::setPreferableBackend
@ -483,7 +469,7 @@ CV__DNN_EXPERIMENTAL_NS_BEGIN
* This is an asynchronous version of forward(const String&).
* dnn::DNN_BACKEND_INFERENCE_ENGINE backend is required.
*/
CV_WRAP AsyncMat forwardAsync(const String& outputName = String());
CV_WRAP AsyncArray forwardAsync(const String& outputName = String());
/** @brief Runs forward pass to compute output of layer with name @p outputName.
* @param outputBlobs contains all output blobs for specified layer.

View File

@ -2,13 +2,6 @@
typedef dnn::DictValue LayerId;
typedef std::vector<dnn::MatShape> vector_MatShape;
typedef std::vector<std::vector<dnn::MatShape> > vector_vector_MatShape;
#ifdef CV_CXX11
typedef std::chrono::milliseconds chrono_milliseconds;
typedef std::future_status AsyncMatStatus;
#else
typedef size_t chrono_milliseconds;
typedef size_t AsyncMatStatus;
#endif
template<>
bool pyopencv_to(PyObject *o, dnn::DictValue &dv, const char *name)
@ -46,46 +39,6 @@ bool pyopencv_to(PyObject *o, std::vector<Mat> &blobs, const char *name) //requi
return pyopencvVecConverter<Mat>::to(o, blobs, ArgInfo(name, false));
}
#ifdef CV_CXX11
template<>
PyObject* pyopencv_from(const std::future<Mat>& f_)
{
std::future<Mat>& f = const_cast<std::future<Mat>&>(f_);
Ptr<cv::dnn::AsyncMat> p(new std::future<Mat>(std::move(f)));
return pyopencv_from(p);
}
template<>
PyObject* pyopencv_from(const std::future_status& status)
{
return pyopencv_from((int)status);
}
template<>
bool pyopencv_to(PyObject* src, std::chrono::milliseconds& dst, const char* name)
{
size_t millis = 0;
if (pyopencv_to(src, millis, name))
{
dst = std::chrono::milliseconds(millis);
return true;
}
else
return false;
}
#else
template<>
PyObject* pyopencv_from(const cv::dnn::AsyncMat&)
{
CV_Error(Error::StsNotImplemented, "C++11 is required.");
return 0;
}
#endif // CV_CXX11
template<typename T>
PyObject* pyopencv_from(const dnn::DictValue &dv)
{

View File

@ -1,22 +0,0 @@
#error This is a shadow header file, which is not intended for processing by any compiler. \
Only bindings parser should handle this file.
namespace cv { namespace dnn {
class CV_EXPORTS_W AsyncMat
{
public:
//! Wait for Mat object readiness and return it.
CV_WRAP Mat get();
//! Wait for Mat object readiness.
CV_WRAP void wait() const;
/** @brief Wait for Mat object readiness specific amount of time.
* @param timeout Timeout in milliseconds
* @returns [std::future_status](https://en.cppreference.com/w/cpp/thread/future_status)
*/
CV_WRAP AsyncMatStatus wait_for(std::chrono::milliseconds timeout) const;
};
}}

View File

@ -69,8 +69,9 @@ def printParams(backend, target):
class dnn_test(NewOpenCVTests):
def __init__(self, *args, **kwargs):
super(dnn_test, self).__init__(*args, **kwargs)
def setUp(self):
super(dnn_test, self).setUp()
self.dnnBackendsAndTargets = [
[cv.dnn.DNN_BACKEND_OPENCV, cv.dnn.DNN_TARGET_CPU],
]
@ -168,7 +169,7 @@ class dnn_test(NewOpenCVTests):
normAssertDetections(self, ref, out, 0.5, scoresDiff, iouDiff)
def test_async(self):
timeout = 5000 # in milliseconds
timeout = 500*10**6 # in nanoseconds (500ms)
testdata_required = bool(os.environ.get('OPENCV_DNN_TEST_REQUIRE_TESTDATA', False))
proto = self.find_dnn_file('dnn/layers/layer_convolution.prototxt', required=testdata_required)
model = self.find_dnn_file('dnn/layers/layer_convolution.caffemodel', required=testdata_required)
@ -209,11 +210,9 @@ class dnn_test(NewOpenCVTests):
outs.insert(0, netAsync.forwardAsync())
for i in reversed(range(numInputs)):
ret = outs[i].wait_for(timeout)
if ret == 1:
self.fail("Timeout")
self.assertEqual(ret, 0) # is ready
normAssert(self, refs[i], outs[i].get(), 'Index: %d' % i, 1e-10)
ret, result = outs[i].get(timeoutNs=float(timeout))
self.assertTrue(ret)
normAssert(self, refs[i], result, 'Index: %d' % i, 1e-10)
if __name__ == '__main__':

View File

@ -2557,7 +2557,7 @@ struct Net::Impl
}
#ifdef CV_CXX11
std::future<Mat> getBlobAsync(const LayerPin& pin)
AsyncArray getBlobAsync(const LayerPin& pin)
{
CV_TRACE_FUNCTION();
#ifdef HAVE_INF_ENGINE
@ -2586,7 +2586,7 @@ struct Net::Impl
#endif
}
std::future<Mat> getBlobAsync(String outputName)
AsyncArray getBlobAsync(String outputName)
{
return getBlobAsync(getPinByAlias(outputName));
}
@ -2714,7 +2714,7 @@ Mat Net::forward(const String& outputName)
return impl->getBlob(layerName);
}
AsyncMat Net::forwardAsync(const String& outputName)
AsyncArray Net::forwardAsync(const String& outputName)
{
CV_TRACE_FUNCTION();
#ifdef CV_CXX11

View File

@ -849,7 +849,7 @@ void InfEngineBackendNet::InfEngineReqWrapper::makePromises(const std::vector<Pt
outsNames.resize(outs.size());
for (int i = 0; i < outs.size(); ++i)
{
outs[i]->futureMat = outProms[i].get_future();
outs[i]->futureMat = outProms[i].getArrayResult();
outsNames[i] = outs[i]->dataPtr->name;
}
}
@ -906,20 +906,38 @@ void InfEngineBackendNet::forward(const std::vector<Ptr<BackendWrapper> >& outBl
{
InfEngineReqWrapper* wrapper;
request->GetUserData((void**)&wrapper, 0);
CV_Assert(wrapper);
CV_Assert(wrapper && "Internal error");
for (int i = 0; i < wrapper->outProms.size(); ++i)
size_t processedOutputs = 0;
try
{
const std::string& name = wrapper->outsNames[i];
Mat m = infEngineBlobToMat(wrapper->req.GetBlob(name));
for (; processedOutputs < wrapper->outProms.size(); ++processedOutputs)
{
const std::string& name = wrapper->outsNames[processedOutputs];
Mat m = infEngineBlobToMat(wrapper->req.GetBlob(name));
if (status == InferenceEngine::StatusCode::OK)
wrapper->outProms[i].set_value(m.clone());
else
try
{
CV_Assert(status == InferenceEngine::StatusCode::OK);
wrapper->outProms[processedOutputs].setValue(m.clone());
}
catch (...)
{
try {
wrapper->outProms[processedOutputs].setException(std::current_exception());
} catch(...) {
CV_LOG_ERROR(NULL, "DNN: Exception occured during async inference exception propagation");
}
}
}
}
catch (...)
{
std::exception_ptr e = std::current_exception();
for (; processedOutputs < wrapper->outProms.size(); ++processedOutputs)
{
try {
std::runtime_error e("Async request failed");
wrapper->outProms[i].set_exception(std::make_exception_ptr(e));
wrapper->outProms[processedOutputs].setException(e);
} catch(...) {
CV_LOG_ERROR(NULL, "DNN: Exception occured during async inference exception propagation");
}

View File

@ -12,6 +12,9 @@
#include "opencv2/core/cvstd.hpp"
#include "opencv2/dnn.hpp"
#include "opencv2/core/async.hpp"
#include "opencv2/core/detail/async_promise.hpp"
#include "opencv2/dnn/utils/inference_engine.hpp"
#ifdef HAVE_INF_ENGINE
@ -208,7 +211,7 @@ private:
void makePromises(const std::vector<Ptr<BackendWrapper> >& outs);
InferenceEngine::InferRequest req;
std::vector<std::promise<Mat> > outProms;
std::vector<cv::AsyncPromise> outProms;
std::vector<std::string> outsNames;
bool isReady;
};
@ -264,7 +267,7 @@ public:
InferenceEngine::DataPtr dataPtr;
InferenceEngine::Blob::Ptr blob;
std::future<Mat> futureMat;
AsyncArray futureMat;
};
InferenceEngine::Blob::Ptr wrapToInfEngineBlob(const Mat& m, InferenceEngine::Layout layout = InferenceEngine::Layout::ANY);

View File

@ -341,12 +341,13 @@ TEST(Net, forwardAndRetrieve)
}
#ifdef HAVE_INF_ENGINE
static const std::chrono::milliseconds async_timeout(500);
// This test runs network in synchronous mode for different inputs and then
// runs the same model asynchronously for the same inputs.
typedef testing::TestWithParam<tuple<int, Target> > Async;
TEST_P(Async, set_and_forward_single)
{
static const int kTimeout = 5000; // in milliseconds.
const int dtype = get<0>(GetParam());
const int target = get<1>(GetParam());
@ -383,16 +384,16 @@ TEST_P(Async, set_and_forward_single)
{
netAsync.setInput(inputs[i]);
std::future<Mat> out = netAsync.forwardAsync();
if (out.wait_for(std::chrono::milliseconds(kTimeout)) == std::future_status::timeout)
CV_Error(Error::StsAssert, "Timeout");
normAssert(refs[i], out.get(), format("Index: %d", i).c_str(), 0, 0);
AsyncArray out = netAsync.forwardAsync();
ASSERT_TRUE(out.valid());
Mat result;
EXPECT_TRUE(out.get(result, async_timeout));
normAssert(refs[i], result, format("Index: %d", i).c_str(), 0, 0);
}
}
TEST_P(Async, set_and_forward_all)
{
static const int kTimeout = 5000; // in milliseconds.
const int dtype = get<0>(GetParam());
const int target = get<1>(GetParam());
@ -426,7 +427,7 @@ TEST_P(Async, set_and_forward_all)
}
// Run asynchronously. To make test more robust, process inputs in the reversed order.
std::vector<std::future<Mat> > outs(numInputs);
std::vector<AsyncArray> outs(numInputs);
for (int i = numInputs - 1; i >= 0; --i)
{
netAsync.setInput(inputs[i]);
@ -435,9 +436,10 @@ TEST_P(Async, set_and_forward_all)
for (int i = numInputs - 1; i >= 0; --i)
{
if (outs[i].wait_for(std::chrono::milliseconds(kTimeout)) == std::future_status::timeout)
CV_Error(Error::StsAssert, "Timeout");
normAssert(refs[i], outs[i].get(), format("Index: %d", i).c_str(), 0, 0);
ASSERT_TRUE(outs[i].valid());
Mat result;
EXPECT_TRUE(outs[i].get(result, async_timeout));
normAssert(refs[i], result, format("Index: %d", i).c_str(), 0, 0);
}
}

View File

@ -0,0 +1,33 @@
#!/usr/bin/env python
from __future__ import print_function
import numpy as np
import cv2 as cv
from tests_common import NewOpenCVTests
class AsyncTest(NewOpenCVTests):
def test_async_simple(self):
m = np.array([[1,2],[3,4],[5,6]])
async_result = cv.utils.testAsyncArray(m)
self.assertTrue(async_result.valid())
ret, result = async_result.get(timeoutNs=10**6) # 1ms
self.assertTrue(ret)
self.assertFalse(async_result.valid())
self.assertEqual(cv.norm(m, result, cv.NORM_INF), 0)
def test_async_exception(self):
async_result = cv.utils.testAsyncException()
self.assertTrue(async_result.valid())
try:
_ret, _result = async_result.get(timeoutNs=10**6) # 1ms
self.fail("Exception expected")
except cv.error as e:
self.assertEqual(cv.Error.StsOk, e.code)
if __name__ == '__main__':
NewOpenCVTests.bootstrap()

View File

@ -6,6 +6,7 @@
#include <opencv2/highgui.hpp>
#ifdef CV_CXX11
#include <mutex>
#include <thread>
#include <queue>
#endif
@ -185,7 +186,7 @@ int main(int argc, char** argv)
QueueFPS<Mat> processedFramesQueue;
QueueFPS<std::vector<Mat> > predictionsQueue;
std::thread processingThread([&](){
std::queue<std::future<Mat> > futureOutputs;
std::queue<AsyncArray> futureOutputs;
Mat blob;
while (process)
{
@ -224,11 +225,13 @@ int main(int argc, char** argv)
}
while (!futureOutputs.empty() &&
futureOutputs.front().wait_for(std::chrono::seconds(0)) == std::future_status::ready)
futureOutputs.front().wait_for(std::chrono::seconds(0)))
{
Mat out = futureOutputs.front().get();
predictionsQueue.push({out});
AsyncArray async_out = futureOutputs.front();
futureOutputs.pop();
Mat out;
async_out.get(out);
predictionsQueue.push({out});
}
}
});

View File

@ -4,7 +4,7 @@ import numpy as np
import sys
import time
from threading import Thread
if sys.version_info[0] == '2':
if sys.version_info[0] == 2:
import Queue as queue
else:
import queue
@ -262,7 +262,7 @@ def processingThreadBody():
outs = net.forward(outNames)
predictionsQueue.put(np.copy(outs))
while futureOutputs and futureOutputs[0].wait_for(0) == 0:
while futureOutputs and futureOutputs[0].wait_for(0):
out = futureOutputs[0].get()
predictionsQueue.put(np.copy([out]))