diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt
index 0067cfa389..c1f58ee22b 100644
--- a/modules/gapi/CMakeLists.txt
+++ b/modules/gapi/CMakeLists.txt
@@ -107,6 +107,7 @@ set(gapi_srcs
     # Executor
     src/executor/gexecutor.cpp
+    src/executor/gtbbexecutor.cpp
     src/executor/gstreamingexecutor.cpp
     src/executor/gasync.cpp
@@ -196,6 +197,10 @@ if(TARGET opencv_test_gapi)
   target_link_libraries(opencv_test_gapi PRIVATE ade)
 endif()
 
+if(HAVE_TBB AND TARGET opencv_test_gapi)
+  ocv_target_link_libraries(opencv_test_gapi PRIVATE tbb)
+endif()
+
 if(HAVE_FREETYPE)
   ocv_target_compile_definitions(${the_module} PRIVATE -DHAVE_FREETYPE)
   if(TARGET opencv_test_gapi)
diff --git a/modules/gapi/include/opencv2/gapi/own/assert.hpp b/modules/gapi/include/opencv2/gapi/own/assert.hpp
index d0e0f1c3ff..d50543fdac 100644
--- a/modules/gapi/include/opencv2/gapi/own/assert.hpp
+++ b/modules/gapi/include/opencv2/gapi/own/assert.hpp
@@ -2,16 +2,28 @@
 // It is subject to the license terms in the LICENSE file found in the top-level directory
 // of this distribution and at http://opencv.org/license.html.
 //
-// Copyright (C) 2018 Intel Corporation
+// Copyright (C) 2018-2020 Intel Corporation
 
 #ifndef OPENCV_GAPI_OWN_ASSERT_HPP
 #define OPENCV_GAPI_OWN_ASSERT_HPP
 
+#include <opencv2/gapi/util/compiler_hints.hpp>
+
+// No-op assert: the expression is still compiled (so it keeps building), but is never evaluated at run time
+#define GAPI_DbgAssertNoOp(expr) { \
+    constexpr bool _assert_tmp = false && (expr); \
+    cv::util::suppress_unused_warning(_assert_tmp); \
+}
+
 #if !defined(GAPI_STANDALONE)
 #include <opencv2/core/base.hpp>
 #define GAPI_Assert CV_Assert
-#define GAPI_DbgAssert CV_DbgAssert
+
+#if defined _DEBUG || defined CV_STATIC_ANALYSIS
+# define GAPI_DbgAssert CV_DbgAssert
+#else
+# define GAPI_DbgAssert(expr) GAPI_DbgAssertNoOp(expr)
+#endif
 
 #else
 #include <stdexcept>
@@ -33,7 +45,7 @@ namespace detail
 
 #ifdef NDEBUG
-# define GAPI_DbgAssert(expr)
+# define GAPI_DbgAssert(expr) GAPI_DbgAssertNoOp(expr)
 #else
 # define GAPI_DbgAssert(expr) GAPI_Assert(expr)
 #endif
diff --git a/modules/gapi/include/opencv2/gapi/util/copy_through_move.hpp b/modules/gapi/include/opencv2/gapi/util/copy_through_move.hpp
new file mode 100644
index 0000000000..1a1121eb21
--- /dev/null
+++ b/modules/gapi/include/opencv2/gapi/util/copy_through_move.hpp
@@ -0,0 +1,34 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#ifndef OPENCV_GAPI_UTIL_COPY_THROUGH_MOVE_HPP
+#define OPENCV_GAPI_UTIL_COPY_THROUGH_MOVE_HPP
+
+#include <opencv2/gapi/util/type_traits.hpp> //decay_t
+
+namespace cv
+{
+namespace util
+{
+    // This is a tool to move-initialize captures of a lambda in C++11:
+    // the copy constructor is deliberately implemented through a move.
+    template<typename T>
+    struct copy_through_move_t{
+        T value;
+        const T& get() const {return value;}
+        T&       get()       {return value;}
+        copy_through_move_t(T&& g) : value(std::move(g)) {}
+        copy_through_move_t(copy_through_move_t&&) = default;
+        copy_through_move_t(copy_through_move_t const& lhs) : copy_through_move_t(std::move(const_cast<copy_through_move_t&>(lhs))) {}
+    };
+
+    template<typename T>
+    copy_through_move_t<util::decay_t<T>> copy_through_move(T&& t){
+        return std::forward<T>(t);
+    }
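+
+    // Illustrative sketch (names below are hypothetical, not part of G-API):
+    // moving a move-only object into a C++11 lambda. Copying the capture
+    // actually moves the wrapped value, so only the last copy of such a
+    // lambda owns it:
+    //
+    //   auto v    = copy_through_move(std::unique_ptr<int>{new int(42)});
+    //   auto task = [v]() { /* access the pointer via v.get() */ };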
+} // namespace util
+} // namespace cv
+
+#endif /* OPENCV_GAPI_UTIL_COPY_THROUGH_MOVE_HPP */
diff --git a/modules/gapi/src/executor/gapi_itt.hpp b/modules/gapi/src/executor/gapi_itt.hpp
new file mode 100644
index 0000000000..2ab3237e7f
--- /dev/null
+++ b/modules/gapi/src/executor/gapi_itt.hpp
@@ -0,0 +1,59 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#ifndef OPENCV_GAPI_GAPI_ITT_HPP
+#define OPENCV_GAPI_GAPI_ITT_HPP
+
+// for ITT_NAMED_TRACE_GUARD
+#include <memory>
+#include <type_traits>
+
+// FIXME: It seems that this macro is not propagated here by the OpenCV CMake (as this is not a core module).
+// (Consider using OpenCV's trace.hpp)
+#ifdef OPENCV_WITH_ITT
+#include <ittnotify.h>
+#endif
+
+#include <opencv2/gapi/util/compiler_hints.hpp>
+namespace cv {
+namespace util {
+    template< class T >
+    using remove_reference_t = typename std::remove_reference<T>::type;
+
+    // Home-brew ScopeGuard:
+    // d is called automatically, with p as its argument, when the guard goes out of scope.
+    // Call release() on the returned object to revoke the guard action.
+    template<typename T, typename D>
+    auto make_ptr_guard(T* p, D&& d) -> std::unique_ptr<T, util::remove_reference_t<D>> {
+        return {p, std::forward<D>(d)};
+    }
+} // namespace util
+
+// FIXME: make it more reusable (and move it to another place and namespace)
+namespace gimpl { namespace parallel {
+    #ifdef OPENCV_WITH_ITT
+    extern const __itt_domain* gapi_itt_domain;
+
+    namespace {
+        auto make_itt_guard = [](__itt_string_handle* h) {
+            __itt_task_begin(gapi_itt_domain, __itt_null, __itt_null, (h));
+            return util::make_ptr_guard(reinterpret_cast<int*>(1), [](int* ) { __itt_task_end(gapi_itt_domain); });
+        };
+    } // namespace
+
+    #define GAPI_ITT_NAMED_TRACE_GUARD(name, h) auto name = cv::gimpl::parallel::make_itt_guard(h); cv::util::suppress_unused_warning(name)
+    #else
+    struct dumb_guard {void reset(){}};
+    #define GAPI_ITT_NAMED_TRACE_GUARD(name, h) cv::gimpl::parallel::dumb_guard name; cv::util::suppress_unused_warning(name)
+    #endif
+
+    #define GAPI_ITT_AUTO_TRACE_GUARD_IMPL_(LINE, h) GAPI_ITT_NAMED_TRACE_GUARD(itt_trace_guard_##LINE, h)
+    #define GAPI_ITT_AUTO_TRACE_GUARD_IMPL(LINE, h)  GAPI_ITT_AUTO_TRACE_GUARD_IMPL_(LINE, h)
+    #define GAPI_ITT_AUTO_TRACE_GUARD(h)             GAPI_ITT_AUTO_TRACE_GUARD_IMPL(__LINE__, h)
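+
+    // Illustrative usage (the handle name below is hypothetical):
+    //
+    //   static __itt_string_handle* hRun = __itt_string_handle_create("run");
+    //   void f() {
+    //       GAPI_ITT_AUTO_TRACE_GUARD(hRun);  // emits __itt_task_begin()
+    //       // ... traced region ...
+    //   }                                     // guard dies -> __itt_task_end()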
+}} //gimpl::parallel
+} //namespace cv
+
+#endif /* OPENCV_GAPI_GAPI_ITT_HPP */
diff --git a/modules/gapi/src/executor/gtbbexecutor.cpp b/modules/gapi/src/executor/gtbbexecutor.cpp
new file mode 100644
index 0000000000..03c6757dc6
--- /dev/null
+++ b/modules/gapi/src/executor/gtbbexecutor.cpp
@@ -0,0 +1,445 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#include "gtbbexecutor.hpp"
+
+#if defined(HAVE_TBB)
+#include "gapi_itt.hpp"
+
+#include <opencv2/gapi/own/assert.hpp>
+#include <opencv2/gapi/util/copy_through_move.hpp>
+#include "logger.hpp" // GAPI_LOG
+
+#include <tbb/task.h>
+#include <memory> // unique_ptr
+
+#include <atomic>
+#include <condition_variable>
+
+#include <chrono>
+
+#define ASSERT(expr)          GAPI_DbgAssert(expr)
+
+#define LOG_INFO(tag, ...)    GAPI_LOG_INFO(tag, __VA_ARGS__)
+#define LOG_WARNING(tag, ...) GAPI_LOG_WARNING(tag, __VA_ARGS__)
+#define LOG_DEBUG(tag, ...)   GAPI_LOG_DEBUG(tag, __VA_ARGS__)
+
+
+#ifdef OPENCV_WITH_ITT
+const __itt_domain* cv::gimpl::parallel::gapi_itt_domain = __itt_domain_create("GAPI Context");
+#endif
+
+namespace cv { namespace gimpl { namespace parallel {
+
+namespace detail {
+// some helper stuff to deal with tbb::task related entities
+namespace tasking {
+
+enum class use_tbb_scheduler_bypass {
+    NO,
+    YES
+};
+
+inline void assert_graph_is_running(tbb::task* root) {
+    // tbb::task::wait_for_all() blocks the calling thread until the task ref_count drops to 1.
+    // So if the root task ref_count is greater than 1, the graph still has work to do and
+    // the corresponding wait_for_all() has not yet returned.
+    ASSERT(root->ref_count() > 1);
+}
+
+// made a template to break circular dependencies
+template<typename body_t>
+struct functor_task : tbb::task {
+    body_t body;
+
+    template<typename arg_t>
+    functor_task(arg_t&& a) : body(std::forward<arg_t>(a)) {}
+
+    tbb::task * execute() override {
+        assert_graph_is_running(parent());
+
+        auto reuse_current_task = body();
+        // if requested, tell TBB to execute the current task once again
+        return (use_tbb_scheduler_bypass::YES == reuse_current_task) ? (recycle_as_continuation(), this) : nullptr;
+    }
+    ~functor_task() {
+        assert_graph_is_running(parent());
+    }
+};
+
+template<typename body_t>
+auto allocate_task(tbb::task* root, body_t const& body) -> functor_task<body_t>* {
+    return new(tbb::task::allocate_additional_child_of(*root)) functor_task<body_t>{body};
+}
+
+template<typename body_t>
+void spawn_no_assert(tbb::task* root, body_t const& body) {
+    tbb::task::spawn(* allocate_task(root, body));
+}
+
+#ifdef OPENCV_WITH_ITT
+namespace {
+    static __itt_string_handle* ittTbbAddReadyBlocksToQueue   = __itt_string_handle_create("add ready blocks to queue");
+    static __itt_string_handle* ittTbbSpawnReadyBlocks        = __itt_string_handle_create("spawn ready blocks");
+    static __itt_string_handle* ittTbbEnqueueSpawnReadyBlocks = __itt_string_handle_create("enqueueing a spawn of ready blocks");
+    static __itt_string_handle* ittTbbUnlockMasterThread      = __itt_string_handle_create("Unlocking master thread");
+}
+#endif // OPENCV_WITH_ITT
+
+template<typename body_t>
+void batch_spawn(size_t count, tbb::task* root, body_t const& body, bool do_assert_graph_is_running = true) {
+    GAPI_ITT_AUTO_TRACE_GUARD(ittTbbSpawnReadyBlocks);
+    if (do_assert_graph_is_running) {
+        assert_graph_is_running(root);
+    }
+
+    for (size_t i=0; i<count; i++) {
+        spawn_no_assert(root, body);
+    }
+}
+
+struct destroy_tbb_task {
+    void operator()(tbb::task* t) const { tbb::task::destroy(*t); }
+};
+
+using root_t = std::unique_ptr<tbb::task, destroy_tbb_task>;
+
+root_t inline create_root(tbb::task_group_context& ctx) {
+    root_t root{new (tbb::task::allocate_root(ctx)) tbb::empty_task};
+    root->set_ref_count(1); // required by wait_for_all, as it waits until the counter drops to 1
+    return root;
+}
+
+std::size_t inline tg_context_traits() {
+    // Specify tbb::task_group_context::concurrent_wait in the traits to ask the TBB scheduler not to
+    // change the ref_count of the task we wait on (root) when the wait is complete.
+    return tbb::task_group_context::default_traits | tbb::task_group_context::concurrent_wait;
+}
+
+} // namespace tasking
+
+namespace async {
+struct async_tasks_t {
+    std::atomic<size_t>     count {0};
+    std::condition_variable cv;
+    std::mutex              mtx;
+};
+
+enum class wake_tbb_master {
+    NO,
+    YES
+};
+
+void inline wake_master(async_tasks_t& async_tasks, wake_tbb_master wake_master) {
+    // TODO: seems that this can be relaxed
+    auto active_async_tasks = --async_tasks.count;
+
+    if ((active_async_tasks == 0) || (wake_master == wake_tbb_master::YES)) {
+        // This was the last async task, or we were asked to wake the TBB master up
+        // (e.g. there are new TBB tasks to execute)
+        GAPI_ITT_AUTO_TRACE_GUARD(ittTbbUnlockMasterThread);
+        // While the decrement of async_tasks_t::count above is atomic, it might occur after the waiting
+        // thread has read its value but _before_ it actually starts waiting on the condition variable.
+        // So, acquiring the lock is needed to guarantee that the current condition check (if any) in the
+        // waiting thread (possibly running in parallel with the decrement above) completes _before_ the
+        // signal is issued. Hence, when notify_one is called, the waiting thread is either sleeping on
+        // the condition variable or running a new check, which is guaranteed to pick up the new value
+        // and return from wait().
+        //
+        // There is no need to _hold_ the lock while signaling, only to acquire it.
+        std::unique_lock<std::mutex> {async_tasks.mtx}; // Acquire and release the lock.
+        async_tasks.cv.notify_one();
+    }
+}
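+
+// For illustration, the waiting side this pairs with (see execute() further below;
+// the predicate names here are placeholders):
+//
+//   std::unique_lock<std::mutex> lk(async_tasks.mtx);
+//   async_tasks.cv.wait(lk, [&]{ return async_done() || new_tbb_work(); });
+//
+// Acquiring async_tasks.mtx in wake_master() orders the count decrement with the
+// predicate check above, so the notify_one() cannot be lost between them.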
+
+struct master_thread_sleep_lock_t
+{
+    struct sleep_unlock {
+        void operator()(async_tasks_t* t) const {
+            ASSERT(t);
+            wake_master(*t, wake_tbb_master::NO);
+        }
+    };
+
+    std::unique_ptr<async_tasks_t, sleep_unlock> guard;
+
+    master_thread_sleep_lock_t() = default;
+    master_thread_sleep_lock_t(async_tasks_t* async_tasks_ptr) : guard(async_tasks_ptr) {
+        // TODO: seems that this can be relaxed
+        ++(guard->count);
+    }
+
+    void unlock(wake_tbb_master wake) {
+        if (auto* p = guard.release()) {
+            wake_master(*p, wake);
+        }
+    }
+};
+
+master_thread_sleep_lock_t inline lock_sleep_master(async_tasks_t& async_tasks) {
+    return {&async_tasks};
+}
+
+enum class is_tbb_work_present {
+    NO,
+    YES
+};
+
+// RAII object to block the TBB master thread (the one that calls wait_for_all()).
+// N.B.: wait_for_all() returns control when the root ref_count drops to 1.
+struct root_wait_lock_t {
+    struct root_decrement_ref_count{
+        void operator()(tbb::task* t) const {
+            ASSERT(t);
+            auto result = t->decrement_ref_count();
+            ASSERT(result >= 1);
+        }
+    };
+
+    std::unique_ptr<tbb::task, root_decrement_ref_count> guard;
+
+    root_wait_lock_t() = default;
+    root_wait_lock_t(tasking::root_t& root, is_tbb_work_present& previous_state) : guard{root.get()} {
+        // Block the master thread while *this object is alive.
+        auto new_root_ref_count = root->add_ref_count(1);
+        previous_state = (new_root_ref_count == 2) ? is_tbb_work_present::NO : is_tbb_work_present::YES;
+    }
+};
+
+root_wait_lock_t inline lock_wait_master(tasking::root_t& root, is_tbb_work_present& previous_state) {
+    return root_wait_lock_t{root, previous_state};
+}
+
+} // namespace async
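+
+// For illustration, an assumed ref_count trace of the blocking protocol above:
+//   create_root()          -> ref_count == 1 (wait_for_all() would return immediately)
+//   batch_spawn(1, ...)    -> ref_count == 2 (master blocks inside wait_for_all())
+//   root_wait_lock_t ctor  -> ref_count == 3 (master cannot leave wait_for_all()
+//                                             while dependants of an async task are being spawned)
+//   guard destruction      -> ref_count == 2, and back to 1 once the spawned task finishes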
+
+inline tile_node* pop(prio_items_queue_t& q) {
+    tile_node* node = nullptr;
+    bool popped = q.try_pop(node);
+    ASSERT(popped && "queue should be non empty as we push items to it before we spawn");
+    return node;
+}
+
+namespace graph {
+    // Returns: the number of items actually pushed into the queue
+    std::size_t inline push_ready_dependants(prio_items_queue_t& q, tile_node* node) {
+        GAPI_ITT_AUTO_TRACE_GUARD(ittTbbAddReadyBlocksToQueue);
+        std::size_t ready_items = 0;
+        // enable dependent tasks
+        for (auto* dependant : node->dependants) {
+            // fetch_sub returns the previous value
+            if (1 == dependant->dependency_count.fetch_sub(1)) {
+                // the tile node is ready for execution, add it to the queue
+                q.push(dependant);
+                ++ready_items;
+            }
+        }
+        return ready_items;
+    }
+
+    struct exec_ctx {
+        tbb::task_arena&             arena;
+        prio_items_queue_t&          q;
+        tbb::task_group_context      tg_ctx;
+        tasking::root_t              root;
+        detail::async::async_tasks_t async_tasks;
+        std::atomic<size_t>          executed {0};
+
+        exec_ctx(tbb::task_arena& arena_, prio_items_queue_t& q_)
+            : arena(arena_), q(q_),
+              // As the traits are the last argument, explicitly specify the (default) value for the first one
+              tg_ctx{tbb::task_group_context::bound, tasking::tg_context_traits()},
+              root(tasking::create_root(tg_ctx))
+        {}
+    };
+
+    // At the moment there are no suitable tools to manage TBB priorities on a task-by-task basis,
+    // so a priority queue is used instead to respect tile_node priorities.
+    // Also, a TBB task is not bound to any particular tile_node until it is actually executed.
+
+    // Strictly speaking, there are two graphs here:
+    // - the G-API one, described by the connected tile_node instances.
+    //   This graph is:
+    //    - known beforehand, and does not change during the execution (i.e. it is static);
+    //    - contains both TBB and non-TBB parts;
+    //    - prioritized (i.e. all nodes have an assigned priority of execution);
+    //
+    // - the TBB task tree, which is:
+    //    - flat (it has only two levels: root and leaves);
+    //    - dynamic, i.e. new leaves are added on demand when new TBB tasks are spawned;
+    //    - describes only the TBB/CPU part of the whole graph;
+    //    - non-prioritized (i.e. all tasks are created equal).
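+
+    // For illustration, on an assumed diamond A -> {B, C} with total_order_index
+    // A:0, B:1, C:2: the queue starts as [A]; completing A drops the
+    // dependency_count of B and C to zero, so both are pushed and ordered [B, C]
+    // by index; the current task then recycles itself for B (scheduler bypass)
+    // while one sibling task is spawned for C.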
"); + + tile_node* node = detail::pop(ctx.q); + + auto result = tasking::use_tbb_scheduler_bypass::NO; + // execute the task + + if (auto p = util::get_if(&(node->task_body))) { + // synchronous task + p->body(); + + std::size_t ready_items = push_ready_dependants(node); + + if (ready_items > 0) { + // spawn one less tasks and say TBB to reuse(recycle) current task + spawn_clones(ready_items - 1); + result = tasking::use_tbb_scheduler_bypass::YES; + } + } + else { + LOG_DEBUG(NULL, "Async task"); + using namespace detail::async; + using util::copy_through_move; + + auto block_master = copy_through_move(lock_sleep_master(ctx.async_tasks)); + + auto self_copy = *this; + auto callback = [node, block_master, self_copy] () mutable /*due to block_master.get().unlock()*/ { + LOG_DEBUG(NULL, "Async task callback is called"); + // Implicitly unlock master right in the end of callback + auto master_sleep_lock = std::move(block_master); + std::size_t ready_items = self_copy.push_ready_dependants(node); + if (ready_items > 0) { + auto master_was_active = is_tbb_work_present::NO; + { + GAPI_ITT_AUTO_TRACE_GUARD(ittTbbEnqueueSpawnReadyBlocks); + // Force master thread (one that does wait_for_all()) to (actively) wait for enqueued tasks + // and unlock it right after all dependent tasks are spawned. + + auto root_wait_lock = copy_through_move(lock_wait_master(self_copy.ctx.root, master_was_active)); + + // TODO: add test to cover proper holding of root_wait_lock + // As the calling thread most likely is not TBB one, instead of spawning TBB tasks directly we + // enqueue a task which will spawn them. + // For master thread to not leave wait_for_all() prematurely, + // hold the root_wait_lock until need tasks are actually spawned. + self_copy.ctx.arena.enqueue([ready_items, self_copy, root_wait_lock]() { + self_copy.spawn_clones(ready_items); + // TODO: why we need this? Either write a descriptive comment or remove it + volatile auto unused = root_wait_lock.get().guard.get(); + util::suppress_unused_warning(unused); + }); + } + // Wake master thread (if any) to pick up the enqueued tasks iff: + // 1. there is new TBB work to do, and + // 2. Master thread was sleeping on condition variable waiting for async tasks to complete + // (There was no active work before (i.e. root->ref_count() was == 1)) + auto wake_master = (master_was_active == is_tbb_work_present::NO) ? + wake_tbb_master::YES : wake_tbb_master::NO; + master_sleep_lock.get().unlock(wake_master); + } + }; + + auto& body = util::get(node->task_body).body; + body(std::move(callback), node->total_order_index); + } + + ctx.executed++; + // reset dependecy_count to initial state to simplify re-execution of the same graph + node->dependency_count = node->dependencies; + + return result; + } + }; +} +} // namespace detail +}}} // namespace cv::gimpl::parallel + +void cv::gimpl::parallel::execute(prio_items_queue_t& q) { + // get the reference to current task_arena (i.e. one we are running in) +#if TBB_INTERFACE_VERSION > 9002 + using attach_t = tbb::task_arena::attach; +#else + using attach_t = tbb::internal::attach; +#endif + + tbb::task_arena arena{attach_t{}}; + execute(q, arena); +} + +void cv::gimpl::parallel::execute(prio_items_queue_t& q, tbb::task_arena& arena) { + using namespace detail; + graph::exec_ctx ctx{arena, q}; + + arena.execute( + [&]() { + // Passed in queue is assumed to contain starting tasks, i.e. 
+
+void cv::gimpl::parallel::execute(prio_items_queue_t& q, tbb::task_arena& arena) {
+    using namespace detail;
+    graph::exec_ctx ctx{arena, q};
+
+    arena.execute(
+        [&]() {
+            // The passed-in queue is assumed to contain the starting tasks,
+            // i.e. the ones with no (or resolved) dependencies
+            auto num_start_tasks = q.size();
+
+            // TODO: use recursive spawning and task soft affinity for faster task distribution
+            // As the graph is just starting and no task has been spawned yet,
+            // assert_graph_is_running(root) will not hold, so spawn without the assert
+            tasking::batch_spawn(num_start_tasks, ctx.root.get(), graph::task_body{ctx}, /* assert_graph_is_running */ false);
+
+            using namespace std::chrono;
+            high_resolution_clock timer;
+
+            auto tbb_work_done   = [&ctx]() { return 1 == ctx.root->ref_count(); };
+            auto async_work_done = [&ctx]() { return 0 == ctx.async_tasks.count; };
+            do {
+                // First, participate in the execution of the TBB graph until there are no more ready tasks.
+                ctx.root->wait_for_all();
+
+                if (!async_work_done()) { // Wait on the condition variable iff there is active async work
+                    auto start = timer.now();
+                    std::unique_lock<std::mutex> lk(ctx.async_tasks.mtx);
+                    // Wait (probably by sleeping) until all async tasks are completed or new TBB tasks are created.
+                    // FIXME: use TBB resumable tasks here to avoid blocking a TBB thread
+                    ctx.async_tasks.cv.wait(lk, [&]{ return async_work_done() || !tbb_work_done(); });
+
+                    LOG_INFO(NULL, "Slept for " << duration_cast<milliseconds>(timer.now() - start).count() << " ms \n");
+                }
+            }
+            while (!tbb_work_done() || !async_work_done());
+
+            ASSERT(tbb_work_done() && async_work_done() && "Graph is still running?");
+        }
+    );
+
+    LOG_INFO(NULL, "Done. Executed " << ctx.executed << " tasks");
+}
+
+std::ostream& cv::gimpl::parallel::operator<<(std::ostream& o, tile_node const& n) {
+    o << "("
+      << " at:"    << &n << ","
+      << "indx: "  << n.total_order_index << ","
+      << "deps #:" << n.dependency_count.value << ", "
+      << "prods:"  << n.dependants.size();
+
+    o << "[";
+    for (auto* d : n.dependants) {
+        o << d << ",";
+    }
+    o << "]";
+
+    o << ")";
+    return o;
+}
+
+#endif // HAVE_TBB
diff --git a/modules/gapi/src/executor/gtbbexecutor.hpp b/modules/gapi/src/executor/gtbbexecutor.hpp
new file mode 100644
index 0000000000..8a62266f66
--- /dev/null
+++ b/modules/gapi/src/executor/gtbbexecutor.hpp
@@ -0,0 +1,103 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#ifndef OPENCV_GAPI_TBB_EXECUTOR_HPP
+#define OPENCV_GAPI_TBB_EXECUTOR_HPP
+
+#if !defined(GAPI_STANDALONE)
+#include <opencv2/cvconfig.h>
+#endif
+
+#if defined(HAVE_TBB)
+
+#include <atomic>
+#include <vector>
+#include <functional>
+#include <iosfwd>
+
+#include <tbb/task_arena.h>
+#include <tbb/concurrent_priority_queue.h>
+
+#include <opencv2/gapi/util/variant.hpp>
+
+namespace cv { namespace gimpl { namespace parallel {
+
+// simple wrapper to allow copies of std::atomic
+template<typename count_t>
+struct atomic_copyable_wrapper {
+    std::atomic<count_t> value;
+
+    atomic_copyable_wrapper(count_t val) : value(val) {}
+    atomic_copyable_wrapper(atomic_copyable_wrapper const& lhs) : value(lhs.value.load(std::memory_order_relaxed)) {}
+
+    atomic_copyable_wrapper& operator=(count_t val) {
+        value.store(val, std::memory_order_relaxed);
+        return *this;
+    }
+
+    count_t fetch_sub(count_t val) {
+        return value.fetch_sub(val);
+    }
+
+    count_t fetch_add(count_t val) {
+        return value.fetch_add(val);
+    }
+};
+
+struct async_tag {};
+constexpr async_tag async;
+
+// Class describing a piece of work for a node in the task graph.
+// Most of the fields are set only once during graph compilation and never change.
+// (However, at the moment they cannot be made const due to the two-phase initialization
+// of tile_node objects.)
+// FIXME: refactor the code to make them const?
+struct tile_node {
+    // Place in the totally ordered queue of tasks to execute. Inverse to priority, i.e.
+    // a lower index means a higher priority.
+    size_t total_order_index = 0;
+
+    // FIXME: use templates here instead of std::function
+    struct sync_task_body {
+        std::function<void()> body;
+    };
+    struct async_task_body {
+        std::function<void(std::function<void()>&& callback, size_t total_order_index)> body;
+    };
+
+    util::variant<sync_task_body, async_task_body> task_body;
+
+    // Number of dependencies according to the dependency graph (i.e. the number of "input" edges).
+    size_t dependencies = 0;
+
+    // Number of unsatisfied dependencies. When it drops to zero, the task is ready for execution.
+    // Initially equal to "dependencies".
+    atomic_copyable_wrapper<size_t> dependency_count = 0;
+
+    std::vector<tile_node*> dependants;
+
+    tile_node(decltype(sync_task_body::body)&& f) : task_body(sync_task_body{std::move(f)}) {};
+    tile_node(async_tag, decltype(async_task_body::body)&& f) : task_body(async_task_body{std::move(f)}) {};
+};
+
+std::ostream& operator<<(std::ostream& o, tile_node const& n);
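+
+// For illustration, wiring an assumed two-node chain A -> B by hand (this mirrors
+// the add_dependency_to() helper in the executor tests):
+//
+//   tile_node a([]{ /* work A */ });
+//   tile_node b([]{ /* work B */ });
+//   a.dependants.push_back(&b);
+//   b.dependencies++;
+//   b.dependency_count.fetch_add(1);
+//   // only &a goes into the initial queue; b becomes ready once a completes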
+
+struct tile_node_indirect_priority_comparator {
+    bool operator()(tile_node const * lhs, tile_node const * rhs) const {
+        return lhs->total_order_index > rhs->total_order_index;
+    }
+};
+
+using prio_items_queue_t = tbb::concurrent_priority_queue<tile_node*, tile_node_indirect_priority_comparator>;
+
+void execute(prio_items_queue_t& q);
+void execute(prio_items_queue_t& q, tbb::task_arena& arena);
+
+}}} // namespace cv::gimpl::parallel
+
+#endif // HAVE_TBB
+
+#endif // OPENCV_GAPI_TBB_EXECUTOR_HPP
diff --git a/modules/gapi/test/executor/gtbbexecutor_internal_tests.cpp b/modules/gapi/test/executor/gtbbexecutor_internal_tests.cpp
new file mode 100644
index 0000000000..d793683f94
--- /dev/null
+++ b/modules/gapi/test/executor/gtbbexecutor_internal_tests.cpp
@@ -0,0 +1,172 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+// Deliberately include the .cpp file instead of the header, as we use the non-exported function (execute)
+#include <executor/gtbbexecutor.cpp>
+
+#if defined(HAVE_TBB)
+
+#include "../test_precomp.hpp"
+#include <thread>
+#include <algorithm>
+
+namespace {
+    tbb::task_arena create_task_arena(int max_concurrency = tbb::task_arena::automatic /* set to 1 for single thread */) {
+        unsigned int reserved_for_master_threads = 1;
+        if (max_concurrency == 1) {
+            // Leave no room for TBB worker threads by reserving all slots for masters.
+            // The TBB runtime guarantees that no worker threads will join the arena
+            // if max_concurrency is equal to reserved_for_master_threads,
+            // except for the 1:1 case (plus the use of enqueued tasks), as a safety guarantee.
+            // So deliberately make it 2:2 to force TBB not to create an extra thread.
+            //
+            // N.B. one slot will be left empty, as only one master thread (the one that
+            // calls root->wait_for_all()) will join the arena.
+
+            // FIXME: strictly speaking, the master can take any free slot, not necessarily the first one.
+            // However, at the moment the master seems to pick slot 0 all the time.
+            max_concurrency = 2;
+            reserved_for_master_threads = 2;
+        }
+        return tbb::task_arena{max_concurrency, reserved_for_master_threads};
+    }
+}
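+
+// For illustration (an assumed usage pattern): create_task_arena(1) yields an arena
+// where only the calling (master) thread runs tasks, which the tests below rely on
+// to obtain a deterministic, serial execution order:
+//
+//   auto arena = create_task_arena(1); // strictly serial
+//   execute(q, arena);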
+
+namespace opencv_test {
+
+TEST(TBBExecutor, Basic) {
+    using namespace cv::gimpl::parallel;
+    bool executed = false;
+    prio_items_queue_t q;
+    tile_node n([&]() {
+        executed = true;
+    });
+    q.push(&n);
+    execute(q);
+    EXPECT_EQ(true, executed);
+}
+
+TEST(TBBExecutor, SerialExecution) {
+    using namespace cv::gimpl::parallel;
+    const int n = 10;
+    prio_items_queue_t q;
+    std::vector<tile_node> nodes; nodes.reserve(n+1);
+    std::vector<std::thread::id> thread_id(n);
+    for (int i=0; i<n; i++) {
+        nodes.emplace_back([&thread_id, i]() {
+            thread_id[i] = std::this_thread::get_id();
+        });
+        q.push(&nodes.back());
+    }
+
+    auto serial_arena = create_task_arena(1);
+    execute(q, serial_arena);
+
+    auto print_thread_ids = [&] {
+        std::stringstream str;
+        for (auto& i : thread_id) { str << i << " "; }
+        return str.str();
+    };
+    EXPECT_EQ(n, static_cast<int>(std::count(thread_id.begin(), thread_id.end(), thread_id[0])))
+            << print_thread_ids();
+}
+
+TEST(TBBExecutor, AsyncBasic) {
+    using namespace cv::gimpl::parallel;
+
+    std::atomic<bool> callback_ready {false};
+    std::function<void()> callback;
+
+    std::atomic<bool> callback_called {false};
+    std::atomic<bool> master_is_waiting {true};
+    std::atomic<bool> master_was_blocked_until_callback_called {false};
+
+    auto async_thread = std::thread([&] {
+        bool slept = false;
+        while (!callback_ready) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+            slept = true;
+        }
+        if (!slept) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+        callback();
+        callback_called = true;
+        master_was_blocked_until_callback_called = (master_is_waiting == true);
+    });
+
+    auto async_task_body = [&](std::function<void()>&& cb, size_t /*total_order_index*/) {
+        callback = std::move(cb);
+        callback_ready = true;
+    };
+    tile_node n(async, std::move(async_task_body));
+
+    prio_items_queue_t q;
+    q.push(&n);
+    execute(q);
+    master_is_waiting = false;
+
+    async_thread.join();
+
+    EXPECT_EQ(true, callback_called);
+    EXPECT_EQ(true, master_was_blocked_until_callback_called);
+}
+
+TEST(TBBExecutor, Dependencies) {
+    using namespace cv::gimpl::parallel;
+    const int n = 10;
+    bool serial = true;
+    std::atomic<int> counter {0};
+    prio_items_queue_t q;
+    std::vector<tile_node> nodes; nodes.reserve(n+1);
+    const int invalid_order = -10;
+    std::vector<int> tiles_exec_order(n, invalid_order);
+
+    auto add_dependency_to = [](tile_node& node, tile_node& dependency) {
+        dependency.dependants.push_back(&node);
+        node.dependencies++;
+        node.dependency_count.fetch_add(1);
+    };
+
+    for (int i=0; i<n; i++) {
+        nodes.emplace_back([&tiles_exec_order, i, &counter]() {
+            tiles_exec_order[i] = counter++;
+        });
+        if (i > 0) {
+            auto last_node = nodes.end() - 1;
+            add_dependency_to(*last_node, *(last_node - 1));
+        }
+    }
+
+    q.push(&nodes.front());
+
+    auto arena = serial ? create_task_arena(1) : create_task_arena();
+    execute(q, arena);
+
+    auto print_execution_order = [&] {
+        std::stringstream str;
+        for (auto& i : tiles_exec_order) { str << i << " "; }
+        return str.str();
+    };
+
+    ASSERT_EQ(0, std::count(tiles_exec_order.begin(), tiles_exec_order.end(), invalid_order))
+            << "Not all " << n << " tasks executed?\n"
+            << " execution order: " << print_execution_order();
+
+    for (size_t i = 0; i < tiles_exec_order.size(); i++) {
+        auto expected_order_index = static_cast<int>(i);
+        EXPECT_EQ(expected_order_index, tiles_exec_order[i])
+                << " execution order: " << print_execution_order();
+    }
+}
+
+} // namespace opencv_test
+
+#endif // HAVE_TBB