opencv/modules/gapi/test/own/thread_pool_tests.cpp
Anatoliy Talamanov 8e43c8f200
Merge pull request #24845 from TolyaTalamanov:at/concurrent-executor
G-API: Implement concurrent executor #24845

## Overview
This PR introduces the new G-API executor called `GThreadedExecutor` which can be selected when the `GComputation` is compiled in `serial` mode (a.k.a `GComputation::compile(...)`)

### ThreadPool
`cv::gapi::own::ThreadPool` has been introduced in order to abstract usage of threads in `GThreadedExecutor`.
`ThreadPool` is implemented by using  `own::concurrent_bounded_queue`

`ThreadPool` has only a single method, `schedule`, which pushes a task into the queue for later execution.
**Important:** if a `Task` executed in the `ThreadPool` throws an exception, the behavior is undefined (`UB`).

### GThreadedExecutor
The `GThreadedExecutor` is mostly a copy-paste of `GExecutor`; should we extend `GExecutor` instead?

#### Implementation details
1. Build the dependency graph for `Island` nodes.
2. Store the tasks that don't have dependencies in a separate `vector` in order to run them first.
3. In `GThreadedExecutor::run()`, schedule the tasks that don't have dependencies (they will in turn schedule their dependents) and wait for completion.


### Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

- [ ] I agree to contribute to the project under Apache 2 License.
- [ ] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV
- [ ] The PR is proposed to the proper branch
- [ ] There is a reference to the original bug report and related work
- [ ] There is accuracy test, performance test and test data in opencv_extra repository, if applicable
      Patch to opencv_extra has the same branch name.
- [ ] The feature is well documented and sample code can be built with the project CMake
2024-01-30 17:01:50 +03:00

125 lines
3.1 KiB
C++

// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#include "../test_precomp.hpp"
#include <chrono>
#include <thread>
#include "executor/thread_pool.hpp"
namespace opencv_test
{
using namespace cv::gapi;
// Checks that scheduling a task does not wait for that task to finish:
// the task sleeps before incrementing the counter, so the counter is expected
// to still be zero right after schedule() and one after the latch is released.
TEST(ThreadPool, ScheduleNotBlock)
{
    own::Latch latch(1u);
    std::atomic<uint32_t> counter{0u};

    own::ThreadPool tp(4u);
    tp.schedule([&](){
        // NB: Delay the increment so the first check below runs before it
        std::this_thread::sleep_for(std::chrono::milliseconds{500u});
        counter++;
        latch.count_down();
    });

    // NB: Compare the loaded value rather than the atomic object itself,
    // the same way the other tests in this file do
    EXPECT_EQ(0u, counter.load());
    latch.wait();
    EXPECT_EQ(1u, counter.load());
}
// Pushes a batch of trivial tasks to a four-thread pool and verifies that
// every one of them has bumped the counter once the latch is released.
TEST(ThreadPool, MultipleTasks)
{
    const uint32_t kNumTasks = 100u;

    own::Latch latch(kNumTasks);
    std::atomic<uint32_t> completed{0u};
    own::ThreadPool tp(4u);

    // NB: One task per latch count; each task reports itself exactly once
    for (uint32_t remaining = kNumTasks; remaining != 0u; --remaining)
    {
        tp.schedule([&]() {
            completed.fetch_add(1u);
            latch.count_down();
        });
    }

    latch.wait();

    const uint32_t finished = completed.load();
    EXPECT_EQ(kNumTasks, finished);
}
// Bundle of everything the recursive scheduling test shares between tasks.
struct ExecutionState {
    // num_threads sizes both the pool and the latch,
    // num_tasks becomes the upper bound stored in `limit`.
    ExecutionState(const uint32_t num_threads,
                   const uint32_t num_tasks)
        : limit(num_tasks)
        , latch(num_threads)
        , tp(num_threads)
    {
    }

    // NB: Number of times doRecursive() has been entered
    std::atomic<uint32_t> guard{0u};
    // NB: Number of times the simulated critical section has been executed
    std::atomic<uint32_t> critical{0u};
    // NB: Entries beyond this number only count the latch down
    const uint32_t        limit;
    own::Latch            latch;
    own::ThreadPool       tp;
};
// One step of a self-rescheduling task chain.
// Every call takes a ticket from state.guard; while the ticket is below
// state.limit the step does its work and schedules the next step on the pool,
// otherwise it counts the latch down and the chain stops.
static void doRecursive(ExecutionState& state) {
    const uint32_t ticket = state.guard.fetch_add(1u);
    const bool     budget_left = ticket < state.limit;

    if (budget_left) {
        // NB: Stand-in for real work done inside a critical section
        std::this_thread::sleep_for(std::chrono::milliseconds{50});
        state.critical.fetch_add(1u);
        // NB: Hand the next step over to the pool
        state.tp.schedule([&state](){ doRecursive(state); });
        return;
    }

    // NB: Budget is exhausted, report this chain as finished
    state.latch.count_down();
}
// Starts kNumThreads task chains that keep re-scheduling themselves through
// doRecursive() and checks that, once every chain has counted the latch down,
// the simulated critical section has run exactly kNumTasks times.
TEST(ThreadPool, ScheduleRecursively)
{
    // NB: Unsigned, to match the loop index below and the uint32_t
    // constructor parameters of ExecutionState
    const uint32_t kNumThreads = 5u;
    const uint32_t kNumTasks   = 100u;

    ExecutionState state(kNumThreads, kNumTasks);
    for (uint32_t i = 0; i < kNumThreads; ++i) {
        state.tp.schedule([&](){
            doRecursive(state);
        });
    }
    // NB: The latch was created with kNumThreads counts, one per chain
    state.latch.wait();

    EXPECT_EQ(kNumTasks, state.critical.load());
}
// Schedules one 800 ms sleeping task per pool thread and checks that all of
// them complete within 1000 ms. Any two tasks executed back to back would
// already take at least 1600 ms, so the bound only holds if they overlap.
TEST(ThreadPool, ExecutionIsParallel)
{
    const uint32_t kNumThreads = 4u;
    std::atomic<uint32_t> counter{0};
    own::Latch latch{kNumThreads};

    own::ThreadPool tp(kNumThreads);

    // NB: steady_clock is monotonic, so the measured interval cannot be
    // distorted by wall-clock adjustments made while the tasks are sleeping
    const auto start = std::chrono::steady_clock::now();
    for (uint32_t i = 0; i < kNumThreads; ++i) {
        tp.schedule([&]() {
            std::this_thread::sleep_for(std::chrono::milliseconds{800u});
            ++counter;
            latch.count_down();
        });
    }
    latch.wait();
    const auto end = std::chrono::steady_clock::now();

    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();

    EXPECT_GE(1000u, elapsed);
    EXPECT_EQ(kNumThreads, counter.load());
}
} // namespace opencv_test