opencv/modules/gapi/src/executor/gthreadedexecutor.hpp
Anatoliy Talamanov 8e43c8f200
Merge pull request #24845 from TolyaTalamanov:at/concurrent-executor
G-API: Implement concurrent executor #24845

## Overview
This PR introduces a new G-API executor, `GThreadedExecutor`, which can be selected when a `GComputation` is compiled in `serial` mode (i.e. via `GComputation::compile(...)`).

### ThreadPool
`cv::gapi::own::ThreadPool` is introduced to abstract the use of threads in `GThreadedExecutor`.
`ThreadPool` is implemented on top of `own::concurrent_bounded_queue`.
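
To illustrate the kind of primitive `ThreadPool` builds on, here is a minimal sketch of a blocking bounded queue. It is an assumption-laden stand-in: `BoundedQueue`, its `push`/`pop` interface, and `sum_through_queue` are hypothetical names and do not reproduce the actual `own::concurrent_bounded_queue` API.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in, NOT own::concurrent_bounded_queue.
// push() blocks while the queue is full; pop() blocks while it is empty.
template <typename T>
class BoundedQueue {
public:
    // A capacity of 0 means "unbounded" in this sketch.
    explicit BoundedQueue(std::size_t capacity = 0) : m_capacity(capacity) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_full.wait(lock, [this] {
            return m_capacity == 0 || m_items.size() < m_capacity;
        });
        m_items.push(std::move(value));
        lock.unlock();
        m_not_empty.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_empty.wait(lock, [this] { return !m_items.empty(); });
        T value = std::move(m_items.front());
        m_items.pop();
        lock.unlock();
        m_not_full.notify_one();
        return value;
    }

private:
    const std::size_t       m_capacity;
    std::mutex              m_mutex;
    std::condition_variable m_not_full;
    std::condition_variable m_not_empty;
    std::queue<T>           m_items;
};

// Demo: a producer thread pushes 1..n through a capacity-2 queue
// while the calling thread pops and sums the values.
int sum_through_queue(int n) {
    assert(n >= 0);
    BoundedQueue<int> q(2);
    std::thread producer([&q, n] {
        for (int i = 1; i <= n; ++i) q.push(i);
    });
    int sum = 0;
    for (int i = 0; i < n; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

With a small capacity the producer is throttled by the consumer, which is the property a bounded queue adds over a plain locked `std::queue`.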

`ThreadPool` has a single method, `schedule`, which pushes a task into the queue for later execution.
**Important:** if a `Task` executed in `ThreadPool` throws an exception, the behavior is undefined (`UB`).
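
The `schedule` contract and the exception caveat can be sketched as follows. This is a simplified illustration, not the code from this PR: `SimpleThreadPool` and `run_tasks` are hypothetical, and the pool uses a plain `std::queue` under a mutex rather than `own::concurrent_bounded_queue`. Because a task must not let an exception escape, the demo catches inside each task and stores the first failure in a `std::exception_ptr`.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <functional>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical simplified pool, NOT cv::gapi::own::ThreadPool.
class SimpleThreadPool {
public:
    using Task = std::function<void()>;

    explicit SimpleThreadPool(std::uint32_t num_threads) {
        for (std::uint32_t i = 0; i < num_threads; ++i) {
            m_workers.emplace_back([this] { workerLoop(); });
        }
    }

    // Drains the remaining tasks, then joins all workers.
    ~SimpleThreadPool() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cv.notify_all();
        for (auto& w : m_workers) w.join();
    }

    // The only public operation: enqueue a task for later execution.
    void schedule(Task task) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_cv.notify_one();
    }

private:
    void workerLoop() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
                if (m_tasks.empty()) return;  // stop requested, queue drained
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();  // the task itself must not throw
        }
    }

    std::mutex               m_mutex;
    std::condition_variable  m_cv;
    std::queue<Task>         m_tasks;
    bool                     m_stop = false;
    std::vector<std::thread> m_workers;
};

// Demo: run `count` tasks; the task with index `fail_at` throws.
// Returns the number of tasks that succeeded and reports any failure.
int run_tasks(int count, int fail_at, bool& failed) {
    assert(count >= 0);
    std::atomic<int>   succeeded{0};
    std::mutex         error_mutex;
    std::exception_ptr first_error;
    {
        SimpleThreadPool pool(4);
        for (int i = 0; i < count; ++i) {
            pool.schedule([&, i] {
                try {
                    if (i == fail_at) throw std::runtime_error("task failed");
                    ++succeeded;
                } catch (...) {
                    // Keep the exception inside the task; remember the first one.
                    std::lock_guard<std::mutex> lock(error_mutex);
                    if (!first_error) first_error = std::current_exception();
                }
            });
        }
    }  // pool destructor waits for all scheduled tasks here
    failed = static_cast<bool>(first_error);
    return succeeded.load();
}
```

The caller can later pass the stored `std::exception_ptr` to `std::rethrow_exception` on its own thread, which keeps error handling out of the worker loop.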

### GThreadedExecutor
`GThreadedExecutor` is mostly a copy-paste of `GExecutor`. Should we extend `GExecutor` instead?

#### Implementation details
1. Build the dependency graph for `Island` nodes.
2. Store the tasks that have no dependencies in a separate `vector` so that they can be run first.
3. In `GThreadedExecutor::run()`, schedule the tasks that have no dependencies; each task schedules its dependents once it finishes, and `run()` waits for all tasks to complete.
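
The three steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the `TaskManager` from this PR: `Node`, `Graph`, and `run_diamond` are hypothetical names, and for brevity every ready task gets its own `std::thread` instead of being scheduled onto a `ThreadPool`.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical task node: work to run plus dependency bookkeeping.
struct Node {
    std::function<void()> work;
    std::vector<Node*>    dependents;     // tasks that consume our result
    int                   num_producers = 0;
    std::atomic<int>      pending{0};     // producers not finished yet
};

class Graph {
public:
    // Steps 1 and 2: record dependencies and collect dependency-free tasks.
    Node* add(std::function<void()> work, const std::vector<Node*>& producers) {
        m_all.emplace_back(new Node);
        Node* n = m_all.back().get();
        n->work = std::move(work);
        n->num_producers = static_cast<int>(producers.size());
        for (Node* p : producers) p->dependents.push_back(n);
        if (producers.empty()) m_initial.push_back(n);
        return n;
    }

    // Step 3: start the initial tasks, let them start their dependents,
    // and block until every task has finished.
    void scheduleAndWait() {
        for (auto& n : m_all) n->pending = n->num_producers;
        m_remaining = m_all.size();
        for (Node* n : m_initial) launch(n);
        std::unique_lock<std::mutex> lock(m_mutex);
        m_done.wait(lock, [this] { return m_remaining == 0; });
        lock.unlock();
        for (auto& t : m_threads) t.join();
        m_threads.clear();
    }

private:
    void launch(Node* n) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_threads.emplace_back([this, n] { execute(n); });
    }

    void execute(Node* n) {
        n->work();
        // The producer that brings a dependent's counter to zero starts it.
        for (Node* d : n->dependents) {
            if (d->pending.fetch_sub(1) == 1) launch(d);
        }
        std::lock_guard<std::mutex> lock(m_mutex);
        if (--m_remaining == 0) m_done.notify_one();
    }

    std::vector<std::unique_ptr<Node>> m_all;
    std::vector<Node*>                 m_initial;
    std::vector<std::thread>           m_threads;
    std::mutex                         m_mutex;
    std::condition_variable            m_done;
    std::size_t                        m_remaining = 0;
};

// Demo: diamond-shaped graph A -> {B, C} -> D; returns the execution order.
std::string run_diamond() {
    std::mutex  log_mutex;
    std::string log;
    auto record = [&](char c) {
        return [&log_mutex, &log, c] {
            std::lock_guard<std::mutex> lock(log_mutex);
            log.push_back(c);
        };
    };
    Graph g;
    Node* a = g.add(record('A'), {});
    Node* b = g.add(record('B'), {a});
    Node* c = g.add(record('C'), {a});
    g.add(record('D'), {b, c});
    g.scheduleAndWait();
    return log;
}
```

In the diamond example `A` always runs first and `D` always runs last, while `B` and `C` may run in either order or concurrently.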


### Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

- [ ] I agree to contribute to the project under Apache 2 License.
- [ ] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV
- [ ] The PR is proposed to the proper branch
- [ ] There is a reference to the original bug report and related work
- [ ] There is accuracy test, performance test and test data in opencv_extra repository, if applicable
      Patch to opencv_extra has the same branch name.
- [ ] The feature is well documented and sample code can be built with the project CMake
2024-01-30 17:01:50 +03:00


// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#ifndef OPENCV_GAPI_GTHREADEDEXECUTOR_HPP
#define OPENCV_GAPI_GTHREADEDEXECUTOR_HPP
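#include <cstdint>       // uint32_t
#include <exception>     // std::exception_ptr
#include <functional>    // std::function
#include <memory>        // std::shared_ptr, std::unique_ptr
#include <mutex>         // std::mutex
#include <tuple>         // tuple, required by magazine
#include <unordered_map> // required by magazine
#include <utility>       // std::move
#include <vector>        // std::vector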
#include "executor/gabstractexecutor.hpp"
#include "executor/thread_pool.hpp"
namespace cv {
namespace gimpl {
class Task;
class TaskManager {
public:
    using F = std::function<void()>;

    std::shared_ptr<Task> createTask(F &&f, std::vector<std::shared_ptr<Task>> &&producers);
    void scheduleAndWait(cv::gapi::own::ThreadPool& tp);

private:
    std::vector<std::shared_ptr<Task>> m_all_tasks;
    std::vector<std::shared_ptr<Task>> m_initial_tasks;
};
struct GraphState {
    Mag mag;
    std::mutex m;
};
class IslandActor;
class GThreadedExecutor final: public GAbstractExecutor {
public:
    class Input;
    class Output;

    explicit GThreadedExecutor(const uint32_t num_threads,
                               std::unique_ptr<ade::Graph> &&g_model);
    void run(cv::gimpl::GRuntimeArgs &&args) override;

    bool canReshape() const override;
    void reshape(const GMetaArgs& inMetas, const GCompileArgs& args) override;

    void prepareForNewStream() override;

private:
    struct DataDesc
    {
        ade::NodeHandle slot_nh;
        ade::NodeHandle data_nh;
    };

    void initResource(const ade::NodeHandle &nh, const ade::NodeHandle &orig_nh);

    GraphState                                m_state;
    std::vector<DataDesc>                     m_slots;
    cv::gapi::own::ThreadPool                 m_thread_pool;
    TaskManager                               m_task_manager;
    std::vector<std::shared_ptr<IslandActor>> m_actors;
};
class GThreadedExecutor::Input final: public GIslandExecutable::IInput
{
public:
    Input(GraphState& state, const std::vector<RcDesc> &rcs);

private:
    virtual StreamMsg get() override;
    virtual StreamMsg try_get() override { return get(); }

private:
    GraphState& m_state;
};
class GThreadedExecutor::Output final: public GIslandExecutable::IOutput
{
public:
    Output(GraphState &state, const std::vector<RcDesc> &rcs);
    void verify();

private:
    GRunArgP get(int idx) override;
    void post(cv::GRunArgP&&, const std::exception_ptr& e) override;
    void post(Exception&& ex) override;
    void post(EndOfStream&&) override {}
    void meta(const GRunArgP &out, const GRunArg::Meta &m) override;

private:
    GraphState&                          m_state;
    std::unordered_map<const void*, int> m_out_idx;
    std::exception_ptr                   m_eptr;
};
class IslandActor {
public:
    using Ptr = std::shared_ptr<IslandActor>;

    IslandActor(const std::vector<RcDesc>          &in_objects,
                const std::vector<RcDesc>          &out_objects,
                std::shared_ptr<GIslandExecutable> isl_exec,
                GraphState                         &state);

    void run();
    void verify();
    std::shared_ptr<GIslandExecutable> exec() { return m_isl_exec; }

private:
    std::shared_ptr<GIslandExecutable> m_isl_exec;
    GThreadedExecutor::Input           m_inputs;
    GThreadedExecutor::Output          m_outputs;
};
} // namespace gimpl
} // namespace cv
#endif // OPENCV_GAPI_GTHREADEDEXECUTOR_HPP