mirror of
https://github.com/opencv/opencv.git
synced 2025-01-11 15:08:08 +08:00
7b399c4248
Optimization for parallelization when the core number is large #24280 **Problem description:** When the number of cores is large, OpenCV’s thread library may reduce performance when processing parallel jobs. **The reason for this problem:** When the number of cores is large (the thread pool initializes as many threads as there are cores), the main thread spends too much time waking up unnecessary threads. When a parallel job needs to be executed, the main thread wakes up all threads in sequence, and only then waits for the job-completion signal. When the number of threads is larger than the number of parallel slices of a job, the following situation occurs: the main thread wakes up the threads in sequence and the already awakened threads have completed the job, but the main thread is still waking up the remaining threads. The threads woken up after this point have nothing to do, and the broadcasts used to wake them take a lot of time, which reduces performance. **Solution:** Reduce the time the main thread spends waking up the worker threads through the following two methods: • The number of threads awakened by the main thread is adjusted according to the number of parallel slices of the job. If the number of threads is greater than the number of parallel job slices, the total number of threads awakened is reduced. • While waking up threads in sequence, if the main thread finds that all parallel job slices have already been allocated, it leaves the loop immediately and waits for the job-completion signal. **Performance Test:** The tests were run in the manner described by https://github.com/opencv/opencv/wiki/HowToUsePerfTests. At core number = 160, there is a big performance gain in some cases.
Take the following cases in the video module as examples:

- OpticalFlowPyrLK_self::Path_Idx_Cn_NPoints_WSize_Deriv::("cv/optflow/frames/VGA_%02d.png", 2, 1, (9, 9), 11, true): performance improves by 191% (0.185405 ms -> 0.0636496 ms)
- perf::DenseOpticalFlow_VariationalRefinement::(320x240, 10, 10): performance improves by 112% (23.88938 ms -> 11.2562 ms)

Among all the modules, the performance improvement is greatest in the video module, and there are also certain improvements in other modules. At core number = 160, the times listed below are the geometric mean of the average time of all cases for one module. The optimization is effective for each module.

Overall time (ms):

| module name | gapi | dnn | features2d | objdetect | core | imgproc | stitching | video |
| -- | -- | -- | -- | -- | -- | -- | -- | -- |
| original | 0.185 | 1.586 | 9.998 | 11.846 | 0.205 | 0.215 | 164.409 | 0.803 |
| optimized | 0.174 | 1.353 | 9.535 | 11.105 | 0.199 | 0.185 | 153.972 | 0.489 |
| Performance improves | 6% | 17% | 5% | 7% | 3% | 16% | 7% | 64% |

Meanwhile, it was found that changing the order of the test cases has an impact on some of them. For example, when the option --gtest-shuffle was used to run opencv_perf_gapi, the performance of the TestPerformance::CmpWithScalarPerfTestFluid/CmpWithScalarPerfTest::(compare_f, CMP_GE, 1920x1080, 32FC1, { gapi.kernel_package }) case changed by 30% compared to the run without shuffle. I would like to ask whether you have also encountered such a situation, and could you share your experience?

### Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

- [x] I agree to contribute to the project under Apache 2 License.
- [x] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV - [x] The PR is proposed to the proper branch - [ ] There is a reference to the original bug report and related work - [ ] There is accuracy test, performance test and test data in opencv_extra repository, if applicable Patch to opencv_extra has the same branch name. - [ ] The feature is well documented and sample code can be built with the project CMake
759 lines
27 KiB
C++
759 lines
27 KiB
C++
// This file is part of OpenCV project.
|
|
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
|
// of this distribution and at http://opencv.org/license.html.
|
|
|
|
#include "precomp.hpp"
|
|
|
|
#include "parallel_impl.hpp"
|
|
|
|
#ifdef HAVE_PTHREADS_PF
|
|
#include <pthread.h>
|
|
|
|
#include <opencv2/core/utils/configuration.private.hpp>
|
|
|
|
#include <opencv2/core/utils/logger.defines.hpp>
|
|
//#undef CV_LOG_STRIP_LEVEL
|
|
//#define CV_LOG_STRIP_LEVEL CV_LOG_LEVEL_VERBOSE + 1
|
|
#include <opencv2/core/utils/logger.hpp>
|
|
|
|
#include <opencv2/core/utils/trace.private.hpp>
|
|
|
|
//#define CV_PROFILE_THREADS 64
|
|
//#define getTickCount getCPUTickCount // use this if getTickCount() calls are expensive (and getCPUTickCount() is accurate)
|
|
|
|
//#define CV_USE_GLOBAL_WORKERS_COND_VAR // not effective on many-core systems (10+)
|
|
|
|
#include <atomic>
|
|
|
|
// Spin lock's OS-level yield
|
|
#ifdef DECLARE_CV_YIELD
|
|
DECLARE_CV_YIELD
|
|
#endif
|
|
#ifndef CV_YIELD
|
|
# include <thread>
|
|
# define CV_YIELD() std::this_thread::yield()
|
|
#endif // CV_YIELD
|
|
|
|
// Spin lock's CPU-level yield (required for Hyper-Threading)
|
|
#ifdef DECLARE_CV_PAUSE
|
|
DECLARE_CV_PAUSE
|
|
#endif
|
|
#ifndef CV_PAUSE
|
|
# if defined __GNUC__ && (defined __i386__ || defined __x86_64__)
|
|
# include <x86intrin.h> /* for __rdtsc */
|
|
# if !defined(__SSE2__)
|
|
static inline void cv_non_sse_mm_pause() { __asm__ __volatile__ ("rep; nop"); }
|
|
# define _mm_pause cv_non_sse_mm_pause
|
|
# endif
|
|
// With Skylake CPUs and above, _mm_pause takes 140 cycles so no need for a loop.
|
|
# define CV_PAUSE(v) do { (void)v; _mm_pause(); } while (0)
|
|
# elif defined __GNUC__ && defined __aarch64__
|
|
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("yield" ::: "memory"); } } while (0)
|
|
# elif defined __GNUC__ && defined __arm__
|
|
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("" ::: "memory"); } } while (0)
|
|
# elif defined __GNUC__ && defined __mips__ && __mips_isa_rev >= 2
|
|
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("pause" ::: "memory"); } } while (0)
|
|
# elif defined __GNUC__ && defined __PPC64__
|
|
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("or 27,27,27" ::: "memory"); } } while (0)
|
|
# elif defined __GNUC__ && defined __riscv
|
|
// PAUSE HINT is not part of RISC-V ISA yet, but is under discussion now. For details see:
|
|
// https://github.com/riscv/riscv-isa-manual/pull/398
|
|
// https://github.com/riscv/riscv-isa-manual/issues/43
|
|
// # define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("pause"); } } while (0)
|
|
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("nop"); } } while (0)
|
|
# elif defined __GNUC__ && defined __loongarch__
|
|
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("nop"); } } while (0)
|
|
# else
|
|
# warning "Can't detect 'pause' (CPU-yield) instruction on the target platform. Specify CV_PAUSE() definition via compiler flags."
|
|
# define CV_PAUSE(...) do { /* no-op: works, but not effective */ } while (0)
|
|
# endif
|
|
#endif // CV_PAUSE
|
|
|
|
|
|
namespace cv
|
|
{
|
|
|
|
// Tuning knobs of the pool. Each value is fetched once through
// utils::getConfigurationParameterSizeT() using the parameter name and default shown below.

// Spin iterations that always use CV_PAUSE() before CV_PAUSE()/CV_YIELD() start alternating.
static int CV_ACTIVE_WAIT_PAUSE_LIMIT = static_cast<int>(
        utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_PAUSE_LIMIT", 16)); // iterations

// Upper bound of a worker's spin loop before it falls back to sleeping on a condition variable.
static int CV_WORKER_ACTIVE_WAIT = static_cast<int>(
        utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_WORKER", 2000)); // iterations

// Upper bound of the main thread's spin loop while it waits for job completion.
static int CV_MAIN_THREAD_ACTIVE_WAIT = static_cast<int>(
        utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_MAIN", 10000)); // iterations

// 0 lets every worker spin after a wake-up; a positive value is compared against the number of
// threads active on a job to switch spinning off for even-numbered workers.
static int CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT = static_cast<int>(
        utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_THREADS_LIMIT", 0)); // number of real cores

class WorkerThread;
class ParallelJob;
|
|
|
|
class ThreadPool
|
|
{
|
|
public:
|
|
static ThreadPool& instance()
|
|
{
|
|
CV_SINGLETON_LAZY_INIT_REF(ThreadPool, new ThreadPool())
|
|
}
|
|
|
|
static void stop()
|
|
{
|
|
ThreadPool& manager = instance();
|
|
manager.reconfigure(0);
|
|
}
|
|
|
|
void reconfigure(unsigned new_threads_count)
|
|
{
|
|
if (new_threads_count == threads.size())
|
|
return;
|
|
pthread_mutex_lock(&mutex);
|
|
reconfigure_(new_threads_count);
|
|
pthread_mutex_unlock(&mutex);
|
|
}
|
|
bool reconfigure_(unsigned new_threads_count); // internal implementation
|
|
|
|
void run(const Range& range, const ParallelLoopBody& body, double nstripes);
|
|
|
|
size_t getNumOfThreads();
|
|
|
|
void setNumOfThreads(unsigned n);
|
|
|
|
ThreadPool();
|
|
|
|
~ThreadPool();
|
|
|
|
unsigned num_threads;
|
|
|
|
pthread_mutex_t mutex; // guards fields (job/threads) from non-worker threads (concurrent parallel_for calls)
|
|
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
pthread_cond_t cond_thread_wake;
|
|
#endif
|
|
|
|
pthread_mutex_t mutex_notify;
|
|
pthread_cond_t cond_thread_task_complete;
|
|
|
|
std::vector< Ptr<WorkerThread> > threads;
|
|
|
|
Ptr<ParallelJob> job;
|
|
|
|
#ifdef CV_PROFILE_THREADS
|
|
double tickFreq;
|
|
int64 jobSubmitTime;
|
|
struct ThreadStatistics
|
|
{
|
|
ThreadStatistics() : threadWait(0)
|
|
{
|
|
reset();
|
|
}
|
|
void reset()
|
|
{
|
|
threadWake = 0;
|
|
threadExecuteStart = 0;
|
|
threadExecuteStop = 0;
|
|
executedTasks = 0;
|
|
keepActive = false;
|
|
threadPing = getTickCount();
|
|
}
|
|
int64 threadWait; // don't reset by default
|
|
int64 threadPing; // don't reset by default
|
|
int64 threadWake;
|
|
int64 threadExecuteStart;
|
|
int64 threadExecuteStop;
|
|
int64 threadFree;
|
|
unsigned executedTasks;
|
|
bool keepActive;
|
|
|
|
int64 dummy_[8]; // separate cache lines
|
|
|
|
void dump(int id, int64 baseTime, double tickFreq)
|
|
{
|
|
if (id < 0)
|
|
std::cout << "Main: ";
|
|
else
|
|
printf("T%03d: ", id + 2);
|
|
printf("wait=% 10.1f ping=% 6.1f",
|
|
threadWait > 0 ? (threadWait - baseTime) / tickFreq * 1e6 : -0.0,
|
|
threadPing > 0 ? (threadPing - baseTime) / tickFreq * 1e6 : -0.0);
|
|
if (threadWake > 0)
|
|
printf(" wake=% 6.1f",
|
|
(threadWake > 0 ? (threadWake - baseTime) / tickFreq * 1e6 : -0.0));
|
|
if (threadExecuteStart > 0)
|
|
{
|
|
printf(" exec=% 6.1f - % 6.1f tasksDone=%5u free=% 6.1f",
|
|
(threadExecuteStart > 0 ? (threadExecuteStart - baseTime) / tickFreq * 1e6 : -0.0),
|
|
(threadExecuteStop > 0 ? (threadExecuteStop - baseTime) / tickFreq * 1e6 : -0.0),
|
|
executedTasks,
|
|
(threadFree > 0 ? (threadFree - baseTime) / tickFreq * 1e6 : -0.0));
|
|
if (id >= 0)
|
|
printf(" active=%s\n", keepActive ? "true" : "false");
|
|
else
|
|
printf("\n");
|
|
}
|
|
else
|
|
printf(" ------------------------------------------------------------------------------\n");
|
|
}
|
|
};
|
|
ThreadStatistics threads_stat[CV_PROFILE_THREADS]; // 0 - main thread, 1..N - worker threads
|
|
#endif
|
|
|
|
};
|
|
|
|
class WorkerThread
|
|
{
|
|
public:
|
|
ThreadPool& thread_pool;
|
|
const unsigned id;
|
|
pthread_t posix_thread;
|
|
bool is_created;
|
|
|
|
std::atomic<bool> stop_thread;
|
|
|
|
std::atomic<bool> has_wake_signal;
|
|
|
|
Ptr<ParallelJob> job;
|
|
|
|
pthread_mutex_t mutex;
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
volatile bool isActive;
|
|
pthread_cond_t cond_thread_wake;
|
|
#endif
|
|
|
|
WorkerThread(ThreadPool& thread_pool_, unsigned id_) :
|
|
thread_pool(thread_pool_),
|
|
id(id_),
|
|
posix_thread(0),
|
|
is_created(false),
|
|
stop_thread(false),
|
|
has_wake_signal(false)
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
, isActive(true)
|
|
#endif
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 1, "MainThread: initializing new worker: " << id);
|
|
int res = pthread_mutex_init(&mutex, NULL);
|
|
if (res != 0)
|
|
{
|
|
CV_LOG_ERROR(NULL, id << ": Can't create thread mutex: res = " << res);
|
|
return;
|
|
}
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
res = pthread_cond_init(&cond_thread_wake, NULL);
|
|
if (res != 0)
|
|
{
|
|
CV_LOG_ERROR(NULL, id << ": Can't create thread condition variable: res = " << res);
|
|
return;
|
|
}
|
|
#endif
|
|
res = pthread_create(&posix_thread, NULL, thread_loop_wrapper, (void*)this);
|
|
if (res != 0)
|
|
{
|
|
CV_LOG_ERROR(NULL, id << ": Can't spawn new thread: res = " << res);
|
|
}
|
|
else
|
|
{
|
|
is_created = true;
|
|
}
|
|
}
|
|
|
|
~WorkerThread()
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 1, "MainThread: destroy worker thread: " << id);
|
|
if (is_created)
|
|
{
|
|
if (!stop_thread)
|
|
{
|
|
pthread_mutex_lock(&mutex); // to avoid signal miss due pre-check
|
|
stop_thread = true;
|
|
pthread_mutex_unlock(&mutex);
|
|
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
pthread_cond_broadcast(&thread_pool.cond_thread_wake);
|
|
#else
|
|
pthread_cond_signal(&cond_thread_wake);
|
|
#endif
|
|
}
|
|
pthread_join(posix_thread, NULL);
|
|
}
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
pthread_cond_destroy(&cond_thread_wake);
|
|
#endif
|
|
pthread_mutex_destroy(&mutex);
|
|
}
|
|
|
|
void thread_body();
|
|
static void* thread_loop_wrapper(void* thread_object)
|
|
{
|
|
#ifdef OPENCV_WITH_ITT
|
|
__itt_thread_set_name(cv::format("OpenCVThread-%03d", cv::utils::getThreadID()).c_str());
|
|
#endif
|
|
((WorkerThread*)thread_object)->thread_body();
|
|
return 0;
|
|
}
|
|
};
|
|
|
|
class ParallelJob
|
|
{
|
|
public:
|
|
ParallelJob(const ThreadPool& thread_pool_, const Range& range_, const ParallelLoopBody& body_, int nstripes_) :
|
|
thread_pool(thread_pool_),
|
|
body(body_),
|
|
range(range_),
|
|
nstripes((unsigned)nstripes_),
|
|
is_completed(false)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "ParallelJob::ParallelJob(" << (void*)this << ")");
|
|
current_task.store(0, std::memory_order_relaxed);
|
|
active_thread_count.store(0, std::memory_order_relaxed);
|
|
completed_thread_count.store(0, std::memory_order_relaxed);
|
|
dummy0_[0] = 0, dummy1_[0] = 0, dummy2_[0] = 0; // compiler warning
|
|
}
|
|
|
|
~ParallelJob()
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "ParallelJob::~ParallelJob(" << (void*)this << ")");
|
|
}
|
|
|
|
unsigned execute(bool is_worker_thread)
|
|
{
|
|
unsigned executed_tasks = 0;
|
|
const int task_count = range.size();
|
|
const int remaining_multiplier = std::min(nstripes,
|
|
std::max(
|
|
std::min(100u, thread_pool.num_threads * 4),
|
|
thread_pool.num_threads * 2
|
|
)); // experimental value
|
|
for (;;)
|
|
{
|
|
int chunk_size = std::max(1, (task_count - current_task) / remaining_multiplier);
|
|
int id = current_task.fetch_add(chunk_size, std::memory_order_seq_cst);
|
|
if (id >= task_count)
|
|
break; // no more free tasks
|
|
|
|
executed_tasks += chunk_size;
|
|
int start_id = id;
|
|
int end_id = std::min(task_count, id + chunk_size);
|
|
CV_LOG_VERBOSE(NULL, 9, "Thread: job " << start_id << "-" << end_id);
|
|
|
|
//TODO: if (not pending exception)
|
|
{
|
|
body.operator()(Range(range.start + start_id, range.start + end_id));
|
|
}
|
|
if (is_worker_thread && is_completed)
|
|
{
|
|
CV_LOG_ERROR(NULL, "\t\t\t\tBUG! Job: " << (void*)this << " " << id << " " << active_thread_count << " " << completed_thread_count);
|
|
CV_Assert(!is_completed); // TODO Dbg this
|
|
}
|
|
}
|
|
return executed_tasks;
|
|
}
|
|
|
|
const ThreadPool& thread_pool;
|
|
const ParallelLoopBody& body;
|
|
const Range range;
|
|
const unsigned nstripes;
|
|
|
|
std::atomic<int> current_task; // next free part of job
|
|
int64 dummy0_[8]; // avoid cache-line reusing for the same atomics
|
|
|
|
std::atomic<int> active_thread_count; // number of threads worked on this job
|
|
int64 dummy1_[8]; // avoid cache-line reusing for the same atomics
|
|
|
|
std::atomic<int> completed_thread_count; // number of threads completed any activities on this job
|
|
int64 dummy2_[8]; // avoid cache-line reusing for the same atomics
|
|
|
|
std::atomic<bool> is_completed;
|
|
|
|
// TODO exception handling
|
|
};
|
|
|
|
|
|
// Disable thread sanitization check when CV_USE_GLOBAL_WORKERS_COND_VAR is not
|
|
// set because it triggers as the main thread reads isActive while the children
|
|
// thread writes it (but it all works out because a mutex is locked in the main
|
|
// thread and isActive re-checked).
|
|
// This is to solve issue #19463.
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) && defined(__clang__) && defined(__has_feature)
|
|
#if __has_feature(thread_sanitizer)
|
|
__attribute__((no_sanitize("thread")))
|
|
#endif
|
|
#endif
|
|
void WorkerThread::thread_body()
|
|
{
|
|
(void)cv::utils::getThreadID(); // notify OpenCV about new thread
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: new thread: " << id);
|
|
|
|
bool allow_active_wait = true;
|
|
|
|
#ifdef CV_PROFILE_THREADS
|
|
ThreadPool::ThreadStatistics& stat = thread_pool.threads_stat[id + 1];
|
|
#endif
|
|
|
|
while (!stop_thread)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: ... loop iteration: allow_active_wait=" << allow_active_wait << " has_wake_signal=" << has_wake_signal);
|
|
if (allow_active_wait && CV_WORKER_ACTIVE_WAIT > 0)
|
|
{
|
|
allow_active_wait = false;
|
|
for (int i = 0; i < CV_WORKER_ACTIVE_WAIT; i++)
|
|
{
|
|
if (has_wake_signal)
|
|
break;
|
|
if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1)))
|
|
CV_PAUSE(16);
|
|
else
|
|
CV_YIELD();
|
|
}
|
|
}
|
|
pthread_mutex_lock(&mutex);
|
|
#ifdef CV_PROFILE_THREADS
|
|
stat.threadWait = getTickCount();
|
|
#endif
|
|
while (!has_wake_signal) // to handle spurious wakeups
|
|
{
|
|
//CV_LOG_VERBOSE(NULL, 5, "Thread: wait (sleep) ...");
|
|
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
pthread_cond_wait(&thread_pool.cond_thread_wake, &mutex);
|
|
#else
|
|
isActive = false;
|
|
pthread_cond_wait(&cond_thread_wake, &mutex);
|
|
isActive = true;
|
|
#endif
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: wake ... (has_wake_signal=" << has_wake_signal << " stop_thread=" << stop_thread << ")")
|
|
}
|
|
#ifdef CV_PROFILE_THREADS
|
|
stat.threadWake = getTickCount();
|
|
#endif
|
|
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job");
|
|
if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0)
|
|
allow_active_wait = true;
|
|
Ptr<ParallelJob> j_ptr; swap(j_ptr, job);
|
|
has_wake_signal = false;
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
if (!stop_thread)
|
|
{
|
|
ParallelJob* j = j_ptr;
|
|
if (j)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: job size=" << j->range.size() << " done=" << j->current_task);
|
|
if (j->current_task < j->range.size())
|
|
{
|
|
int other = j->active_thread_count.fetch_add(1, std::memory_order_seq_cst);
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: processing new job (with " << other << " other threads)"); CV_UNUSED(other);
|
|
#ifdef CV_PROFILE_THREADS
|
|
stat.threadExecuteStart = getTickCount();
|
|
stat.executedTasks = j->execute(true);
|
|
stat.threadExecuteStop = getTickCount();
|
|
#else
|
|
j->execute(true);
|
|
#endif
|
|
int completed = j->completed_thread_count.fetch_add(1, std::memory_order_seq_cst) + 1;
|
|
int active = j->active_thread_count.load(std::memory_order_acquire);
|
|
if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT > 0)
|
|
{
|
|
allow_active_wait = true;
|
|
if (active >= CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT && (id & 1) == 0) // turn off a half of threads
|
|
allow_active_wait = false;
|
|
}
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: completed job processing: " << active << " " << completed);
|
|
if (active == completed)
|
|
{
|
|
bool need_signal = !j->is_completed;
|
|
j->is_completed = true;
|
|
j = NULL; j_ptr.release();
|
|
if (need_signal)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: job finished => notifying the main thread");
|
|
pthread_mutex_lock(&thread_pool.mutex_notify); // to avoid signal miss due pre-check condition
|
|
// empty
|
|
pthread_mutex_unlock(&thread_pool.mutex_notify);
|
|
pthread_cond_broadcast/*pthread_cond_signal*/(&thread_pool.cond_thread_task_complete);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "Thread: no free job tasks");
|
|
}
|
|
}
|
|
}
|
|
#ifdef CV_PROFILE_THREADS
|
|
stat.threadFree = getTickCount();
|
|
stat.keepActive = allow_active_wait;
|
|
#endif
|
|
}
|
|
}
|
|
|
|
/** Initializes the pool's mutexes / condition variables and picks the default thread count. */
ThreadPool::ThreadPool()
{
#ifdef CV_PROFILE_THREADS
    tickFreq = getTickFrequency();
#endif

    // The return codes are OR-ed together: a non-zero value means at least one init call failed.
    int init_status = pthread_mutex_init(&mutex, NULL);
    init_status |= pthread_mutex_init(&mutex_notify, NULL);
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
    init_status |= pthread_cond_init(&cond_thread_wake, NULL);
#endif
    init_status |= pthread_cond_init(&cond_thread_task_complete, NULL);

    if (init_status != 0)
    {
        CV_LOG_FATAL(NULL, "Failed to initialize ThreadPool (pthreads)");
    }

    num_threads = defaultNumberOfThreads();
}
|
|
|
|
bool ThreadPool::reconfigure_(unsigned new_threads_count)
|
|
{
|
|
if (new_threads_count == threads.size())
|
|
return false;
|
|
|
|
if (new_threads_count < threads.size())
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 1, "MainThread: reduce worker pool: " << threads.size() << " => " << new_threads_count);
|
|
std::vector< Ptr<WorkerThread> > release_threads(threads.size() - new_threads_count);
|
|
for (size_t i = new_threads_count; i < threads.size(); ++i)
|
|
{
|
|
pthread_mutex_lock(&threads[i]->mutex); // to avoid signal miss due pre-check
|
|
threads[i]->stop_thread = true;
|
|
threads[i]->has_wake_signal = true;
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
pthread_mutex_unlock(&threads[i]->mutex);
|
|
pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread
|
|
#else
|
|
pthread_mutex_unlock(&threads[i]->mutex);
|
|
#endif
|
|
std::swap(threads[i], release_threads[i - new_threads_count]);
|
|
}
|
|
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
CV_LOG_VERBOSE(NULL, 1, "MainThread: notify worker threads about termination...");
|
|
pthread_cond_broadcast(&cond_thread_wake); // wake all threads
|
|
#endif
|
|
threads.resize(new_threads_count);
|
|
release_threads.clear(); // calls thread_join which want to lock mutex
|
|
return false;
|
|
}
|
|
else
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 1, "MainThread: upgrade worker pool: " << threads.size() << " => " << new_threads_count);
|
|
for (size_t i = threads.size(); i < new_threads_count; ++i)
|
|
{
|
|
threads.push_back(Ptr<WorkerThread>(new WorkerThread(*this, (unsigned)i))); // spawn more threads
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/** Releases all workers first, then destroys the pool's condition variables and mutexes. */
ThreadPool::~ThreadPool()
{
    // No worker may outlive the primitives destroyed below.
    reconfigure(0);

    pthread_cond_destroy(&cond_thread_task_complete);
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
    pthread_cond_destroy(&cond_thread_wake);
#endif

    pthread_mutex_destroy(&mutex);
    pthread_mutex_destroy(&mutex_notify);
}
|
|
|
|
void ThreadPool::run(const Range& range, const ParallelLoopBody& body, double nstripes)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 1, "MainThread: new parallel job: num_threads=" << num_threads << " range=" << range.size() << " nstripes=" << nstripes << " job=" << (void*)job);
|
|
#ifdef CV_PROFILE_THREADS
|
|
jobSubmitTime = getTickCount();
|
|
threads_stat[0].reset();
|
|
threads_stat[0].threadWait = jobSubmitTime;
|
|
threads_stat[0].threadWake = jobSubmitTime;
|
|
#endif
|
|
if (getNumOfThreads() > 1 &&
|
|
job == NULL &&
|
|
(range.size() * nstripes >= 2 || (range.size() > 1 && nstripes <= 0))
|
|
)
|
|
{
|
|
pthread_mutex_lock(&mutex);
|
|
if (job != NULL)
|
|
{
|
|
pthread_mutex_unlock(&mutex);
|
|
body(range);
|
|
return;
|
|
}
|
|
reconfigure_(num_threads - 1);
|
|
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 1, "MainThread: initialize parallel job: " << range.size());
|
|
job = Ptr<ParallelJob>(new ParallelJob(*this, range, body, nstripes));
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads...");
|
|
size_t num_threads_to_wake = std::min(static_cast<size_t>(range.size()), threads.size());
|
|
for (size_t i = 0; i < num_threads_to_wake; ++i)
|
|
{
|
|
if (job->current_task >= job->range.size())
|
|
break;
|
|
WorkerThread& thread = *(threads[i].get());
|
|
if (
|
|
#if defined(__clang__) && defined(__has_feature)
|
|
#if __has_feature(thread_sanitizer)
|
|
1 || // Robust workaround to avoid data race warning of `thread.job`
|
|
#endif
|
|
#endif
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
thread.isActive ||
|
|
#endif
|
|
thread.has_wake_signal
|
|
|| !thread.job.empty() // #10881
|
|
)
|
|
{
|
|
pthread_mutex_lock(&thread.mutex);
|
|
thread.job = job;
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
bool isActive = thread.isActive;
|
|
#endif
|
|
thread.has_wake_signal = true;
|
|
#ifdef CV_PROFILE_THREADS
|
|
threads_stat[i + 1].reset();
|
|
#endif
|
|
pthread_mutex_unlock(&thread.mutex);
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
if (!isActive)
|
|
{
|
|
pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread
|
|
}
|
|
#endif
|
|
}
|
|
else
|
|
{
|
|
CV_Assert(thread.job.empty());
|
|
thread.job = job;
|
|
thread.has_wake_signal = true;
|
|
#ifdef CV_PROFILE_THREADS
|
|
threads_stat[i + 1].reset();
|
|
#endif
|
|
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread
|
|
#endif
|
|
}
|
|
}
|
|
#ifdef CV_PROFILE_THREADS
|
|
threads_stat[0].threadPing = getTickCount();
|
|
#endif
|
|
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
|
|
pthread_cond_broadcast(&cond_thread_wake); // wake all threads
|
|
#endif
|
|
#ifdef CV_PROFILE_THREADS
|
|
threads_stat[0].threadWake = getTickCount();
|
|
#endif
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads... (done)");
|
|
|
|
{
|
|
ParallelJob& j = *(this->job);
|
|
#ifdef CV_PROFILE_THREADS
|
|
threads_stat[0].threadExecuteStart = getTickCount();
|
|
threads_stat[0].executedTasks = j.execute(false);
|
|
threads_stat[0].threadExecuteStop = getTickCount();
|
|
#else
|
|
j.execute(false);
|
|
#endif
|
|
CV_Assert(j.current_task >= j.range.size());
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: complete self-tasks: " << j.active_thread_count << " " << j.completed_thread_count);
|
|
if (job->is_completed || j.active_thread_count == 0)
|
|
{
|
|
job->is_completed = true;
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: no WIP worker threads");
|
|
}
|
|
else
|
|
{
|
|
if (CV_MAIN_THREAD_ACTIVE_WAIT > 0)
|
|
{
|
|
for (int i = 0; i < CV_MAIN_THREAD_ACTIVE_WAIT; i++) // don't spin too much in any case (inaccurate getTickCount())
|
|
{
|
|
if (job->is_completed)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (active wait) " << j.active_thread_count << " " << j.completed_thread_count);
|
|
break;
|
|
}
|
|
if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1)))
|
|
CV_PAUSE(16);
|
|
else
|
|
CV_YIELD();
|
|
}
|
|
}
|
|
if (!job->is_completed)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: prepare wait " << j.active_thread_count << " " << j.completed_thread_count);
|
|
pthread_mutex_lock(&mutex_notify);
|
|
for (;;)
|
|
{
|
|
if (job->is_completed)
|
|
{
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (wait) " << j.active_thread_count << " " << j.completed_thread_count);
|
|
break;
|
|
}
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: wait completion (sleep) ...");
|
|
pthread_cond_wait(&cond_thread_task_complete, &mutex_notify);
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: wake");
|
|
}
|
|
pthread_mutex_unlock(&mutex_notify);
|
|
}
|
|
}
|
|
}
|
|
#ifdef CV_PROFILE_THREADS
|
|
threads_stat[0].threadFree = getTickCount();
|
|
std::cout << "Job: sz=" << range.size() << " nstripes=" << nstripes << " Time: " << (threads_stat[0].threadFree - jobSubmitTime) / tickFreq * 1e6 << " usec" << std::endl;
|
|
for (int i = 0; i < (int)threads.size() + 1; i++)
|
|
{
|
|
threads_stat[i].dump(i - 1, jobSubmitTime, tickFreq);
|
|
}
|
|
#endif
|
|
if (job)
|
|
{
|
|
pthread_mutex_lock(&mutex);
|
|
CV_LOG_VERBOSE(NULL, 5, "MainThread: job release");
|
|
CV_Assert(job->is_completed);
|
|
job.release();
|
|
pthread_mutex_unlock(&mutex);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
body(range);
|
|
}
|
|
}
|
|
|
|
size_t ThreadPool::getNumOfThreads()
|
|
{
|
|
return num_threads;
|
|
}
|
|
|
|
void ThreadPool::setNumOfThreads(unsigned n)
|
|
{
|
|
if (n != num_threads)
|
|
{
|
|
num_threads = n;
|
|
if (n == 1)
|
|
if (job == NULL) reconfigure(0); // stop worker threads immediately
|
|
}
|
|
}
|
|
|
|
size_t parallel_pthreads_get_threads_num()
|
|
{
|
|
return ThreadPool::instance().getNumOfThreads();
|
|
}
|
|
|
|
void parallel_pthreads_set_threads_num(int num)
|
|
{
|
|
if(num < 0)
|
|
{
|
|
ThreadPool::instance().setNumOfThreads(0);
|
|
}
|
|
else
|
|
{
|
|
ThreadPool::instance().setNumOfThreads(unsigned(num));
|
|
}
|
|
}
|
|
|
|
void parallel_for_pthreads(const Range& range, const ParallelLoopBody& body, double nstripes)
|
|
{
|
|
ThreadPool::instance().run(range, body, nstripes);
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|