mirror of
https://github.com/opencv/opencv.git
synced 2025-01-06 02:08:12 +08:00
8e43c8f200
G-API: Implement concurrent executor #24845 ## Overview This PR introduces a new G-API executor called `GThreadedExecutor`, which can be selected when the `GComputation` is compiled in `serial` mode (a.k.a. `GComputation::compile(...)`). ### ThreadPool `cv::gapi::own::ThreadPool` has been introduced in order to abstract the usage of threads in `GThreadedExecutor`. `ThreadPool` is implemented using `own::concurrent_bounded_queue`. `ThreadPool` has only a single method, `schedule`, which pushes a task into the queue for further execution. **Important**: if a `Task` executed in `ThreadPool` throws an exception, the behavior is undefined (`UB`). ### GThreadedExecutor The `GThreadedExecutor` is mostly a copy-paste of `GExecutor`; should we extend `GExecutor` instead? #### Implementation details 1. Build the dependency graph for `Island` nodes. 2. Store the tasks that don't have dependencies in a separate `vector` in order to run them first. 3. In `GThreadedExecutor::run()`, schedule the tasks that don't have dependencies (they will in turn schedule their dependents) and wait for completion. ### Pull Request Readiness Checklist See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request - [ ] I agree to contribute to the project under Apache 2 License. - [ ] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV - [ ] The PR is proposed to the proper branch - [ ] There is a reference to the original bug report and related work - [ ] There is accuracy test, performance test and test data in opencv_extra repository, if applicable Patch to opencv_extra has the same branch name. - [ ] The feature is well documented and sample code can be built with the project CMake
125 lines
3.1 KiB
C++
125 lines
3.1 KiB
C++
// This file is part of OpenCV project.
|
|
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
|
// of this distribution and at http://opencv.org/license.html.
|
|
//
|
|
// Copyright (C) 2024 Intel Corporation
|
|
|
|
#include "../test_precomp.hpp"
|
|
|
|
#include <chrono>
|
|
#include <thread>
|
|
|
|
#include "executor/thread_pool.hpp"
|
|
|
|
namespace opencv_test
|
|
{
|
|
|
|
using namespace cv::gapi;
|
|
|
|
// Checks that schedule() hands the task off instead of running it inline:
// the task sleeps for 500 ms before touching the counter, so the counter is
// expected to still be zero immediately after schedule() returns.
TEST(ThreadPool, ScheduleNotBlock)
{
    own::Latch done(1u);
    std::atomic<uint32_t> executed{0u};

    own::ThreadPool pool(4u);
    pool.schedule([&]() {
        // NB: Delay the side effect so the caller can observe the "not yet run" state
        std::this_thread::sleep_for(std::chrono::milliseconds{500u});
        executed.fetch_add(1u);
        done.count_down();
    });

    // The task has only been queued (or is still sleeping) at this point
    EXPECT_EQ(0u, executed.load());
    // Block until the task signals completion, then verify it ran exactly once
    done.wait();
    EXPECT_EQ(1u, executed.load());
}
|
|
|
|
// Schedules kNumTasks independent tasks on a 4-worker pool and checks that
// every one of them has executed once the latch has been released.
TEST(ThreadPool, MultipleTasks)
{
    const uint32_t kNumTasks = 100u;
    own::Latch all_done(kNumTasks);
    std::atomic<uint32_t> finished{0u};

    own::ThreadPool pool(4u);
    for (uint32_t remaining = kNumTasks; remaining != 0u; --remaining) {
        pool.schedule([&]() {
            // Each task bumps the counter and reports itself to the latch
            finished.fetch_add(1u);
            all_done.count_down();
        });
    }
    // Returns only after the latch has been counted down kNumTasks times
    all_done.wait();

    EXPECT_EQ(kNumTasks, finished.load());
}
|
|
|
|
struct ExecutionState {
|
|
ExecutionState(const uint32_t num_threads,
|
|
const uint32_t num_tasks)
|
|
: guard(0u),
|
|
critical(0u),
|
|
limit(num_tasks),
|
|
latch(num_threads),
|
|
tp(num_threads) {
|
|
}
|
|
|
|
std::atomic<uint32_t> guard;
|
|
std::atomic<uint32_t> critical;
|
|
const uint32_t limit;
|
|
own::Latch latch;
|
|
own::ThreadPool tp;
|
|
};
|
|
|
|
// One step of a self-rescheduling task chain.
// Takes a ticket from state.guard; while tickets are below state.limit it
// performs the simulated work, increments state.critical and schedules the
// next step on state.tp. Once the limit is reached the chain ends and
// reports completion through state.latch.
static void doRecursive(ExecutionState& state) {
    // NB: fetch_add returns the previous value, so tickets 0..limit-1 pass
    // and the work below is executed no more than limit times in total
    const uint32_t ticket = state.guard.fetch_add(1u);
    if (ticket >= state.limit) {
        state.latch.count_down();
        return;
    }

    // NB: Stand-in for the critical section
    std::this_thread::sleep_for(std::chrono::milliseconds{50});
    state.critical.fetch_add(1u);

    // NB: Continue the chain by scheduling the next step from within the pool
    state.tp.schedule([&state]() { doRecursive(state); });
}
|
|
|
|
// Checks that tasks may schedule further tasks from inside the pool.
// kNumThreads chains are started; each chain keeps rescheduling doRecursive()
// until the shared guard reaches kNumTasks, then counts the latch down.
// After all chains finish, the guarded section must have run exactly
// kNumTasks times.
TEST(ThreadPool, ScheduleRecursively)
{
    // NB: unsigned on purpose - it is compared with the uint32_t loop index and
    // passed to ExecutionState's uint32_t constructor parameters
    const uint32_t kNumThreads = 5u;
    const uint32_t kNumTasks   = 100u;

    ExecutionState state(kNumThreads, kNumTasks);
    for (uint32_t i = 0; i < kNumThreads; ++i) {
        state.tp.schedule([&](){
            doRecursive(state);
        });
    }
    // The latch was created with kNumThreads, one count per chain
    state.latch.wait();

    EXPECT_EQ(kNumTasks, state.critical.load());
}
|
|
|
|
// Checks that the pool runs tasks concurrently.
// kNumThreads tasks, each sleeping 800 ms, are scheduled on a pool with
// kNumThreads workers. Run one after another they would need about
// kNumThreads * 800 ms; the test expects the whole batch to finish within
// 1000 ms, which is only possible if the sleeps overlap.
TEST(ThreadPool, ExecutionIsParallel)
{
    const uint32_t kNumThreads = 4u;
    std::atomic<uint32_t> counter{0};
    own::Latch latch{kNumThreads};

    own::ThreadPool tp(kNumThreads);
    // NB: steady_clock is monotonic, so the measured interval cannot be
    // distorted by wall-clock adjustments (high_resolution_clock may be an
    // alias of system_clock on some standard libraries)
    auto start = std::chrono::steady_clock::now();
    for (uint32_t i = 0; i < kNumThreads; ++i) {
        tp.schedule([&]() {
            std::this_thread::sleep_for(std::chrono::milliseconds{800u});
            ++counter;
            latch.count_down();
        });
    }
    // Wait until every task has reported completion
    latch.wait();

    auto end = std::chrono::steady_clock::now();
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();

    // 1000 ms >= elapsed: leaves ~200 ms of slack over a single 800 ms sleep
    EXPECT_GE(1000u, elapsed);
    EXPECT_EQ(kNumThreads, counter.load());
}
|
|
|
|
} // namespace opencv_test
|