From 491502a349010498571397217e3f7c1f7dbadda1 Mon Sep 17 00:00:00 2001 From: Alexander Alekhin Date: Fri, 16 Feb 2018 14:08:51 +0300 Subject: [PATCH] core: fix parallel_for data race --- modules/core/src/parallel_impl.cpp | 49 +++++++++++++++--------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/modules/core/src/parallel_impl.cpp b/modules/core/src/parallel_impl.cpp index c270b94ba1..78d9eb6369 100644 --- a/modules/core/src/parallel_impl.cpp +++ b/modules/core/src/parallel_impl.cpp @@ -190,7 +190,7 @@ class WorkerThread { public: ThreadPool& thread_pool; - unsigned id; + const unsigned id; pthread_t posix_thread; bool is_created; @@ -418,14 +418,15 @@ void WorkerThread::thread_body() stat.threadWake = getTickCount(); #endif + CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job"); + if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0) + allow_active_wait = true; + Ptr j_ptr; swap(j_ptr, job); + has_wake_signal = false; // TODO .store(false, std::memory_order_release) + pthread_mutex_unlock(&mutex); + if (!stop_thread) { - CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job"); - if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0) - allow_active_wait = true; - Ptr j_ptr; swap(j_ptr, job); - has_wake_signal = false; - pthread_mutex_unlock(&mutex); ParallelJob* j = j_ptr; if (j) { @@ -480,10 +481,6 @@ void WorkerThread::thread_body() } } } - else - { - pthread_mutex_unlock(&mutex); - } #ifdef CV_PROFILE_THREADS stat.threadFree = getTickCount(); stat.keepActive = allow_active_wait; @@ -595,40 +592,42 @@ void ThreadPool::run(const Range& range, const ParallelLoopBody& body, double ns CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads..."); for (size_t i = 0; i < threads.size(); ++i) { + WorkerThread& thread = *(threads[i].get()); + if ( #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) - bool isActive = threads[i]->isActive; - if (isActive || threads[i]->has_wake_signal) -#else - if (threads[i]->has_wake_signal) + thread.isActive || #endif + thread.has_wake_signal + || !thread.job.empty() // #10881 + ) { - pthread_mutex_lock(&threads[i]->mutex); - threads[i]->job = job; + pthread_mutex_lock(&thread.mutex); + thread.job = job; #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) - isActive = threads[i]->isActive; + bool isActive = thread.isActive; #endif - threads[i]->has_wake_signal = true; + thread.has_wake_signal = true; #ifdef CV_PROFILE_THREADS threads_stat[i + 1].reset(); #endif - pthread_mutex_unlock(&threads[i]->mutex); + pthread_mutex_unlock(&thread.mutex); #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) if (!isActive) { - pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread + pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread } #endif } else { - CV_Assert(threads[i]->job.empty()); - threads[i]->job = job; - threads[i]->has_wake_signal = true; + CV_Assert(thread.job.empty()); + thread.job = job; + thread.has_wake_signal = true; #ifdef CV_PROFILE_THREADS threads_stat[i + 1].reset(); #endif #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) - pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread + pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread #endif } }