mirror of
https://github.com/opencv/opencv.git
synced 2025-07-26 07:07:37 +08:00
Implement asynchronous execution for islands
This commit is contained in:
parent
cb5921b375
commit
b1f42a6506
@ -29,7 +29,6 @@ namespace cv
|
|||||||
// (user-inaccessible) classes.
|
// (user-inaccessible) classes.
|
||||||
class GNode;
|
class GNode;
|
||||||
struct GOrigin;
|
struct GOrigin;
|
||||||
|
|
||||||
template<typename T> class GArray;
|
template<typename T> class GArray;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -262,6 +261,9 @@ namespace detail
|
|||||||
{
|
{
|
||||||
return m_ref->m_desc;
|
return m_ref->m_desc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// May be used to uniquely identify this object internally
|
||||||
|
const void *ptr() const { return static_cast<const void*>(m_ref.get()); }
|
||||||
};
|
};
|
||||||
|
|
||||||
// Helper (FIXME: work-around?)
|
// Helper (FIXME: work-around?)
|
||||||
|
@ -25,7 +25,6 @@ namespace cv
|
|||||||
// (user-inaccessible) classes.
|
// (user-inaccessible) classes.
|
||||||
class GNode;
|
class GNode;
|
||||||
struct GOrigin;
|
struct GOrigin;
|
||||||
|
|
||||||
template<typename T> class GOpaque;
|
template<typename T> class GOpaque;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -250,6 +249,9 @@ namespace detail
|
|||||||
{
|
{
|
||||||
return m_ref->m_desc;
|
return m_ref->m_desc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// May be used to uniquely identify this object internally
|
||||||
|
const void *ptr() const { return static_cast<const void*>(m_ref.get()); }
|
||||||
};
|
};
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
|
@ -264,4 +264,28 @@ std::ostream& operator<<(std::ostream& os, const cv::GMetaArg &arg)
|
|||||||
|
|
||||||
return os;
|
return os;
|
||||||
}
|
}
|
||||||
|
} // namespace cv
|
||||||
|
|
||||||
|
// TODO: This function requires thorough testing
|
||||||
|
const void* cv::gimpl::proto::ptr(const GRunArgP &arg)
|
||||||
|
{
|
||||||
|
switch (arg.index())
|
||||||
|
{
|
||||||
|
#if !defined(GAPI_STANDALONE)
|
||||||
|
case GRunArgP::index_of<cv::Mat*>():
|
||||||
|
return static_cast<const void*>(cv::util::get<cv::Mat*>(arg));
|
||||||
|
case GRunArgP::index_of<cv::Scalar*>():
|
||||||
|
return static_cast<const void*>(cv::util::get<cv::Scalar*>(arg));
|
||||||
|
case GRunArgP::index_of<cv::UMat*>():
|
||||||
|
return static_cast<const void*>(cv::util::get<cv::UMat*>(arg));
|
||||||
|
#endif
|
||||||
|
case GRunArgP::index_of<cv::gapi::own::Mat*>():
|
||||||
|
return static_cast<const void*>(cv::util::get<cv::gapi::own::Mat*>(arg));
|
||||||
|
case GRunArgP::index_of<cv::detail::VectorRef>():
|
||||||
|
return cv::util::get<cv::detail::VectorRef>(arg).ptr();
|
||||||
|
case GRunArgP::index_of<cv::detail::OpaqueRef>():
|
||||||
|
return cv::util::get<cv::detail::OpaqueRef>(arg).ptr();
|
||||||
|
default:
|
||||||
|
util::throw_error(std::logic_error("Unknown GRunArgP type!"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,8 +28,13 @@ GAPI_EXPORTS const GOrigin& origin_of (const GArg &arg);
|
|||||||
bool is_dynamic(const GArg &arg);
|
bool is_dynamic(const GArg &arg);
|
||||||
GProtoArg rewrap (const GArg &arg);
|
GProtoArg rewrap (const GArg &arg);
|
||||||
|
|
||||||
|
const void* ptr (const GRunArgP &arg);
|
||||||
|
|
||||||
} // proto
|
} // proto
|
||||||
} // gimpl
|
} // gimpl
|
||||||
} // cv
|
} // cv
|
||||||
|
|
||||||
|
// FIXME: the gproto.cpp file has more functions that listed here
|
||||||
|
// where those are declared??
|
||||||
|
|
||||||
#endif // OPENCV_GAPI_GPROTO_PRIV_HPP
|
#endif // OPENCV_GAPI_GPROTO_PRIV_HPP
|
||||||
|
@ -327,13 +327,36 @@ void GIslandExecutable::run(GIslandExecutable::IInput &in, GIslandExecutable::IO
|
|||||||
std::vector<OutObj> out_objs;
|
std::vector<OutObj> out_objs;
|
||||||
const auto &in_desc = in.desc();
|
const auto &in_desc = in.desc();
|
||||||
const auto &out_desc = out.desc();
|
const auto &out_desc = out.desc();
|
||||||
const auto in_vector = in.get(); // FIXME: passing temporary objects to toRange() leads to issues
|
const auto in_msg = in.get();
|
||||||
|
if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))
|
||||||
|
{
|
||||||
|
out.post(cv::gimpl::EndOfStream{});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
|
||||||
|
const auto in_vector = cv::util::get<cv::GRunArgs>(in_msg);
|
||||||
in_objs.reserve(in_desc.size());
|
in_objs.reserve(in_desc.size());
|
||||||
out_objs.reserve(out_desc.size());
|
out_objs.reserve(out_desc.size());
|
||||||
for (auto &&it: ade::util::zip(ade::util::toRange(in_desc),
|
for (auto &&it: ade::util::zip(ade::util::toRange(in_desc),
|
||||||
ade::util::toRange(in_vector)))
|
ade::util::toRange(in_vector)))
|
||||||
{
|
{
|
||||||
in_objs.emplace_back(std::get<0>(it), std::get<1>(it));
|
// FIXME: Not every Island expects a cv::Mat instead of own::Mat on input
|
||||||
|
// This kludge should go as a result of de-ownification
|
||||||
|
const cv::GRunArg& in_data_orig = std::get<1>(it);
|
||||||
|
cv::GRunArg in_data;
|
||||||
|
switch (in_data_orig.index())
|
||||||
|
{
|
||||||
|
case cv::GRunArg::index_of<cv::Mat>():
|
||||||
|
in_data = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_data_orig))};
|
||||||
|
break;
|
||||||
|
case cv::GRunArg::index_of<cv::Scalar>():
|
||||||
|
in_data = cv::GRunArg{(cv::util::get<cv::Scalar>(in_data_orig))};
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
in_data = in_data_orig;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
in_objs.emplace_back(std::get<0>(it), std::move(in_data));
|
||||||
}
|
}
|
||||||
for (auto &&it: ade::util::indexed(ade::util::toRange(out_desc)))
|
for (auto &&it: ade::util::indexed(ade::util::toRange(out_desc)))
|
||||||
{
|
{
|
||||||
|
@ -151,15 +151,18 @@ public:
|
|||||||
void set(const std::vector<cv::gimpl::RcDesc> &newd) { d = newd; }
|
void set(const std::vector<cv::gimpl::RcDesc> &newd) { d = newd; }
|
||||||
const std::vector<cv::gimpl::RcDesc> &desc() const { return d; }
|
const std::vector<cv::gimpl::RcDesc> &desc() const { return d; }
|
||||||
};
|
};
|
||||||
|
struct EndOfStream {};
|
||||||
|
using StreamMsg = cv::util::variant<EndOfStream, cv::GRunArgs>;
|
||||||
struct GIslandExecutable::IInput: public GIslandExecutable::IODesc {
|
struct GIslandExecutable::IInput: public GIslandExecutable::IODesc {
|
||||||
virtual ~IInput() = default;
|
virtual ~IInput() = default;
|
||||||
virtual cv::GRunArgs get() = 0; // Get a new input vector (blocking)
|
virtual StreamMsg get() = 0; // Get a new input vector (blocking)
|
||||||
virtual cv::GRunArgs try_get() = 0; // Get a new input vector (non-blocking)
|
virtual StreamMsg try_get() = 0; // Get a new input vector (non-blocking)
|
||||||
};
|
};
|
||||||
struct GIslandExecutable::IOutput: public GIslandExecutable::IODesc {
|
struct GIslandExecutable::IOutput: public GIslandExecutable::IODesc {
|
||||||
virtual ~IOutput() = default;
|
virtual ~IOutput() = default;
|
||||||
virtual GRunArgP get(int idx) = 0; // Allocate (wrap) a new data object for output idx
|
virtual GRunArgP get(int idx) = 0; // Allocate (wrap) a new data object for output idx
|
||||||
virtual void post(GRunArgP&&) = 0; // Release the object back to the framework (mark available)
|
virtual void post(GRunArgP&&) = 0; // Release the object back to the framework (mark available)
|
||||||
|
virtual void post(EndOfStream&&) = 0; // Post end-of-stream marker back to the framework
|
||||||
};
|
};
|
||||||
|
|
||||||
// GIslandEmitter - a backend-specific thing which feeds data into
|
// GIslandEmitter - a backend-specific thing which feeds data into
|
||||||
|
@ -126,13 +126,13 @@ void cv::gimpl::GExecutor::initResource(const ade::NodeHandle &orig_nh)
|
|||||||
class cv::gimpl::GExecutor::Input final: public cv::gimpl::GIslandExecutable::IInput
|
class cv::gimpl::GExecutor::Input final: public cv::gimpl::GIslandExecutable::IInput
|
||||||
{
|
{
|
||||||
cv::gimpl::Mag &mag;
|
cv::gimpl::Mag &mag;
|
||||||
virtual cv::GRunArgs get() override
|
virtual StreamMsg get() override
|
||||||
{
|
{
|
||||||
cv::GRunArgs res;
|
cv::GRunArgs res;
|
||||||
for (const auto &rc : desc()) { res.emplace_back(magazine::getArg(mag, rc)); }
|
for (const auto &rc : desc()) { res.emplace_back(magazine::getArg(mag, rc)); }
|
||||||
return res;
|
return StreamMsg{std::move(res)};
|
||||||
}
|
}
|
||||||
virtual cv::GRunArgs try_get() override { return get(); }
|
virtual StreamMsg try_get() override { return get(); }
|
||||||
public:
|
public:
|
||||||
Input(cv::gimpl::Mag &m, const std::vector<RcDesc> &rcs) : mag(m) { set(rcs); }
|
Input(cv::gimpl::Mag &m, const std::vector<RcDesc> &rcs) : mag(m) { set(rcs); }
|
||||||
};
|
};
|
||||||
@ -142,6 +142,7 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I
|
|||||||
cv::gimpl::Mag &mag;
|
cv::gimpl::Mag &mag;
|
||||||
virtual GRunArgP get(int idx) override { return magazine::getObjPtr(mag, desc()[idx]); }
|
virtual GRunArgP get(int idx) override { return magazine::getObjPtr(mag, desc()[idx]); }
|
||||||
virtual void post(GRunArgP&&) override { } // Do nothing here
|
virtual void post(GRunArgP&&) override { } // Do nothing here
|
||||||
|
virtual void post(EndOfStream&&) override {} // Do nothing here too
|
||||||
public:
|
public:
|
||||||
Output(cv::gimpl::Mag &m, const std::vector<RcDesc> &rcs) : mag(m) { set(rcs); }
|
Output(cv::gimpl::Mag &m, const std::vector<RcDesc> &rcs) : mag(m) { set(rcs); }
|
||||||
};
|
};
|
||||||
|
@ -12,11 +12,13 @@
|
|||||||
|
|
||||||
#include <opencv2/gapi/opencv_includes.hpp>
|
#include <opencv2/gapi/opencv_includes.hpp>
|
||||||
|
|
||||||
#include "executor/gstreamingexecutor.hpp"
|
#include "api/gproto_priv.hpp" // ptr(GRunArgP)
|
||||||
#include "compiler/passes/passes.hpp"
|
#include "compiler/passes/passes.hpp"
|
||||||
#include "backends/common/gbackend.hpp" // createMat
|
#include "backends/common/gbackend.hpp" // createMat
|
||||||
#include "compiler/gcompiler.hpp" // for compileIslands
|
#include "compiler/gcompiler.hpp" // for compileIslands
|
||||||
|
|
||||||
|
#include "executor/gstreamingexecutor.hpp"
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
using namespace cv::gimpl::stream;
|
using namespace cv::gimpl::stream;
|
||||||
@ -362,6 +364,203 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
|
||||||
|
{
|
||||||
|
QueueReader &qr;
|
||||||
|
std::vector<Q*> &in_queues; // FIXME: This can be part of QueueReader
|
||||||
|
cv::GRunArgs &in_constants; // FIXME: This can be part of QueueReader
|
||||||
|
|
||||||
|
virtual cv::gimpl::StreamMsg get() override
|
||||||
|
{
|
||||||
|
cv::GRunArgs isl_input_args;
|
||||||
|
if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
|
||||||
|
{
|
||||||
|
// Stop case
|
||||||
|
return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
|
||||||
|
}
|
||||||
|
return cv::gimpl::StreamMsg{std::move(isl_input_args)};
|
||||||
|
}
|
||||||
|
virtual cv::gimpl::StreamMsg try_get() override
|
||||||
|
{
|
||||||
|
// FIXME: This is not very usable at the moment!
|
||||||
|
return get();
|
||||||
|
}
|
||||||
|
public:
|
||||||
|
explicit StreamingInput(QueueReader &rdr,
|
||||||
|
std::vector<Q*> &inq,
|
||||||
|
cv::GRunArgs &inc,
|
||||||
|
const std::vector<cv::gimpl::RcDesc> &in_descs)
|
||||||
|
: qr(rdr), in_queues(inq), in_constants(inc)
|
||||||
|
{
|
||||||
|
set(in_descs);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
|
||||||
|
{
|
||||||
|
// These objects form an internal state of the StreamingOutput
|
||||||
|
struct Posting
|
||||||
|
{
|
||||||
|
using V = cv::util::variant<cv::GRunArg, cv::gimpl::EndOfStream>;
|
||||||
|
V data;
|
||||||
|
bool ready = false;
|
||||||
|
};
|
||||||
|
using PostingList = std::list<Posting>;
|
||||||
|
std::vector<PostingList> m_postings;
|
||||||
|
std::unordered_map< const void*
|
||||||
|
, std::pair<int, PostingList::iterator>
|
||||||
|
> m_postIdx;
|
||||||
|
std::size_t m_stops_sent = 0u;
|
||||||
|
|
||||||
|
// These objects are owned externally
|
||||||
|
const cv::GMetaArgs &m_metas;
|
||||||
|
std::vector< std::vector<Q*> > &m_out_queues;
|
||||||
|
|
||||||
|
// Allocate a new data object for output under idx
|
||||||
|
// Prepare this object for posting
|
||||||
|
virtual cv::GRunArgP get(int idx) override
|
||||||
|
{
|
||||||
|
using MatType = cv::Mat;
|
||||||
|
using SclType = cv::Scalar;
|
||||||
|
|
||||||
|
// Allocate a new posting first, then bind this GRunArgP to this item
|
||||||
|
auto iter = m_postings[idx].insert(m_postings[idx].end(), Posting{});
|
||||||
|
const auto r = desc()[idx];
|
||||||
|
cv::GRunArg& out_arg = cv::util::get<cv::GRunArg>(iter->data);
|
||||||
|
cv::GRunArgP ret_val;
|
||||||
|
switch (r.shape) {
|
||||||
|
// Allocate a data object based on its shape & meta, and put it into our vectors.
|
||||||
|
// Yes, first we put a cv::Mat GRunArg, and then specify _THAT_
|
||||||
|
// pointer as an output parameter - to make sure that after island completes,
|
||||||
|
// our GRunArg still has the right (up-to-date) value.
|
||||||
|
// Same applies to other types.
|
||||||
|
// FIXME: This is absolutely ugly but seem to work perfectly for its purpose.
|
||||||
|
case cv::GShape::GMAT:
|
||||||
|
{
|
||||||
|
MatType newMat;
|
||||||
|
cv::gimpl::createMat(cv::util::get<cv::GMatDesc>(m_metas[idx]), newMat);
|
||||||
|
out_arg = cv::GRunArg(std::move(newMat));
|
||||||
|
ret_val = cv::GRunArgP(&cv::util::get<MatType>(out_arg));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case cv::GShape::GSCALAR:
|
||||||
|
{
|
||||||
|
SclType newScl;
|
||||||
|
out_arg = cv::GRunArg(std::move(newScl));
|
||||||
|
ret_val = cv::GRunArgP(&cv::util::get<SclType>(out_arg));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case cv::GShape::GARRAY:
|
||||||
|
{
|
||||||
|
cv::detail::VectorRef newVec;
|
||||||
|
cv::util::get<cv::detail::ConstructVec>(r.ctor)(newVec);
|
||||||
|
out_arg = cv::GRunArg(std::move(newVec));
|
||||||
|
// VectorRef is implicitly shared so no pointer is taken here
|
||||||
|
// FIXME: that variant MOVE problem again
|
||||||
|
const auto &rr = cv::util::get<cv::detail::VectorRef>(out_arg);
|
||||||
|
ret_val = cv::GRunArgP(rr);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case cv::GShape::GOPAQUE:
|
||||||
|
{
|
||||||
|
cv::detail::OpaqueRef newOpaque;
|
||||||
|
cv::util::get<cv::detail::ConstructOpaque>(r.ctor)(newOpaque);
|
||||||
|
out_arg = cv::GRunArg(std::move(newOpaque));
|
||||||
|
// OpaqueRef is implicitly shared so no pointer is taken here
|
||||||
|
// FIXME: that variant MOVE problem again
|
||||||
|
const auto &rr = cv::util::get<cv::detail::OpaqueRef>(out_arg);
|
||||||
|
ret_val = cv::GRunArgP(rr);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
cv::util::throw_error(std::logic_error("Unsupported GShape"));
|
||||||
|
}
|
||||||
|
m_postIdx[cv::gimpl::proto::ptr(ret_val)] = std::make_pair(idx, iter);
|
||||||
|
return ret_val;
|
||||||
|
}
|
||||||
|
virtual void post(cv::GRunArgP&& argp) override
|
||||||
|
{
|
||||||
|
// Mark the output ready for posting. If it is the first in the line,
|
||||||
|
// actually post it and all its successors which are ready for posting too.
|
||||||
|
auto it = m_postIdx.find(cv::gimpl::proto::ptr(argp));
|
||||||
|
GAPI_Assert(it != m_postIdx.end());
|
||||||
|
const int out_idx = it->second.first;
|
||||||
|
const auto out_iter = it->second.second;
|
||||||
|
out_iter->ready = true;
|
||||||
|
m_postIdx.erase(it); // Drop the link from the cache anyway
|
||||||
|
if (out_iter != m_postings[out_idx].begin())
|
||||||
|
{
|
||||||
|
return; // There are some pending postings in the beginning, return
|
||||||
|
}
|
||||||
|
|
||||||
|
GAPI_Assert(out_iter == m_postings[out_idx].begin());
|
||||||
|
auto post_iter = m_postings[out_idx].begin();
|
||||||
|
while (post_iter != m_postings[out_idx].end() && post_iter->ready == true)
|
||||||
|
{
|
||||||
|
Cmd cmd;
|
||||||
|
if (cv::util::holds_alternative<cv::GRunArg>(post_iter->data))
|
||||||
|
{
|
||||||
|
// FIXME: That ugly VARIANT problem
|
||||||
|
cmd = Cmd{const_cast<const cv::GRunArg&>(cv::util::get<cv::GRunArg>(post_iter->data))};
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
GAPI_Assert(cv::util::holds_alternative<cv::gimpl::EndOfStream>(post_iter->data));
|
||||||
|
cmd = Cmd{Stop{}};
|
||||||
|
m_stops_sent++;
|
||||||
|
}
|
||||||
|
for (auto &&q : m_out_queues[out_idx])
|
||||||
|
{
|
||||||
|
// FIXME: This ugly VARIANT problem
|
||||||
|
q->push(const_cast<const Cmd&>(cmd));
|
||||||
|
}
|
||||||
|
post_iter = m_postings[out_idx].erase(post_iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
virtual void post(cv::gimpl::EndOfStream&&) override
|
||||||
|
{
|
||||||
|
// If the posting list is empty, just broadcast the stop message.
|
||||||
|
// If it is not, enqueue the Stop message in the postings list.
|
||||||
|
for (auto &&it : ade::util::indexed(m_postings))
|
||||||
|
{
|
||||||
|
const auto idx = ade::util::index(it);
|
||||||
|
auto &lst = ade::util::value(it);
|
||||||
|
if (lst.empty())
|
||||||
|
{
|
||||||
|
for (auto &&q : m_out_queues[idx])
|
||||||
|
{
|
||||||
|
q->push(Cmd(Stop{}));
|
||||||
|
}
|
||||||
|
m_stops_sent++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Posting p;
|
||||||
|
p.data = Posting::V{cv::gimpl::EndOfStream{}};
|
||||||
|
p.ready = true;
|
||||||
|
lst.push_back(std::move(p)); // FIXME: For some reason {}-ctor didn't work here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public:
|
||||||
|
explicit StreamingOutput(const cv::GMetaArgs &metas,
|
||||||
|
std::vector< std::vector<Q*> > &out_queues,
|
||||||
|
const std::vector<cv::gimpl::RcDesc> &out_descs)
|
||||||
|
: m_metas(metas)
|
||||||
|
, m_out_queues(out_queues)
|
||||||
|
{
|
||||||
|
set(out_descs);
|
||||||
|
m_postings.resize(out_descs.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
bool done() const
|
||||||
|
{
|
||||||
|
// The streaming actor work is considered DONE for this stream
|
||||||
|
// when it posted/resent all STOP messages to all its outputs.
|
||||||
|
return m_stops_sent == desc().size();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// This thread is a plain dumb processing actor. What it do is just:
|
// This thread is a plain dumb processing actor. What it do is just:
|
||||||
// - Reads input from the input queue(s), sleeps if there's nothing to read
|
// - Reads input from the input queue(s), sleeps if there's nothing to read
|
||||||
// - Once a full input vector is obtained, passes it to the underlying island
|
// - Once a full input vector is obtained, passes it to the underlying island
|
||||||
@ -380,133 +579,11 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs, //
|
|||||||
GAPI_Assert(out_queues.size() == out_rcs.size());
|
GAPI_Assert(out_queues.size() == out_rcs.size());
|
||||||
GAPI_Assert(out_queues.size() == out_metas.size());
|
GAPI_Assert(out_queues.size() == out_metas.size());
|
||||||
QueueReader qr;
|
QueueReader qr;
|
||||||
while (true)
|
StreamingInput input(qr, in_queues, in_constants, in_rcs);
|
||||||
|
StreamingOutput output(out_metas, out_queues, out_rcs);
|
||||||
|
while (!output.done())
|
||||||
{
|
{
|
||||||
std::vector<cv::gimpl::GIslandExecutable::InObj> isl_inputs;
|
island->run(input, output);
|
||||||
isl_inputs.resize(in_rcs.size());
|
|
||||||
|
|
||||||
cv::GRunArgs isl_input_args;
|
|
||||||
if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
|
|
||||||
{
|
|
||||||
// Stop received -- broadcast Stop down to the pipeline and quit
|
|
||||||
for (auto &&out_qq : out_queues)
|
|
||||||
{
|
|
||||||
for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
GAPI_Assert(isl_inputs.size() == isl_input_args.size());
|
|
||||||
for (auto &&it : ade::util::indexed(ade::util::zip(ade::util::toRange(in_rcs),
|
|
||||||
ade::util::toRange(isl_inputs),
|
|
||||||
ade::util::toRange(isl_input_args))))
|
|
||||||
{
|
|
||||||
const auto &value = ade::util::value(it);
|
|
||||||
const auto &in_rc = std::get<0>(value);
|
|
||||||
auto &isl_input = std::get<1>(value);
|
|
||||||
const auto &in_arg = std::get<2>(value); // FIXME: MOVE PROBLEM
|
|
||||||
isl_input.first = in_rc;
|
|
||||||
#if defined(GAPI_STANDALONE)
|
|
||||||
// Standalone mode - simply store input argument in the vector as-is
|
|
||||||
auto id = ade::util::index(it);
|
|
||||||
isl_inputs[id].second = in_arg;
|
|
||||||
#else
|
|
||||||
// Make Islands operate on own:: data types (i.e. in the same
|
|
||||||
// environment as GExecutor provides)
|
|
||||||
// This way several backends (e.g. Fluid) remain OpenCV-independent.
|
|
||||||
switch (in_arg.index()) {
|
|
||||||
case cv::GRunArg::index_of<cv::Mat>():
|
|
||||||
isl_input.second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
|
|
||||||
break;
|
|
||||||
case cv::GRunArg::index_of<cv::Scalar>():
|
|
||||||
isl_input.second = cv::GRunArg{cv::util::get<cv::Scalar>(in_arg)};
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
isl_input.second = in_arg;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
#endif // GAPI_STANDALONE
|
|
||||||
}
|
|
||||||
// Once the vector is obtained, prepare data for island execution
|
|
||||||
// Note - we first allocate output vector via GRunArg!
|
|
||||||
// Then it is converted to a GRunArgP.
|
|
||||||
std::vector<cv::gimpl::GIslandExecutable::OutObj> isl_outputs;
|
|
||||||
cv::GRunArgs out_data;
|
|
||||||
isl_outputs.resize(out_rcs.size());
|
|
||||||
out_data.resize(out_rcs.size());
|
|
||||||
for (auto &&it : ade::util::indexed(out_rcs))
|
|
||||||
{
|
|
||||||
auto id = ade::util::index(it);
|
|
||||||
auto &r = ade::util::value(it);
|
|
||||||
|
|
||||||
#if !defined(GAPI_STANDALONE)
|
|
||||||
using MatType = cv::Mat;
|
|
||||||
using SclType = cv::Scalar;
|
|
||||||
#else
|
|
||||||
using MatType = cv::gapi::own::Mat;
|
|
||||||
using SclType = cv::Scalar;
|
|
||||||
#endif // GAPI_STANDALONE
|
|
||||||
|
|
||||||
switch (r.shape) {
|
|
||||||
// Allocate a data object based on its shape & meta, and put it into our vectors.
|
|
||||||
// Yes, first we put a cv::Mat GRunArg, and then specify _THAT_
|
|
||||||
// pointer as an output parameter - to make sure that after island completes,
|
|
||||||
// our GRunArg still has the right (up-to-date) value.
|
|
||||||
// Same applies to other types.
|
|
||||||
// FIXME: This is absolutely ugly but seem to work perfectly for its purpose.
|
|
||||||
case cv::GShape::GMAT:
|
|
||||||
{
|
|
||||||
MatType newMat;
|
|
||||||
cv::gimpl::createMat(cv::util::get<cv::GMatDesc>(out_metas[id]), newMat);
|
|
||||||
out_data[id] = cv::GRunArg(std::move(newMat));
|
|
||||||
isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<MatType>(out_data[id])) };
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case cv::GShape::GSCALAR:
|
|
||||||
{
|
|
||||||
SclType newScl;
|
|
||||||
out_data[id] = cv::GRunArg(std::move(newScl));
|
|
||||||
isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<SclType>(out_data[id])) };
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case cv::GShape::GARRAY:
|
|
||||||
{
|
|
||||||
cv::detail::VectorRef newVec;
|
|
||||||
cv::util::get<cv::detail::ConstructVec>(r.ctor)(newVec);
|
|
||||||
out_data[id] = cv::GRunArg(std::move(newVec));
|
|
||||||
// VectorRef is implicitly shared so no pointer is taken here
|
|
||||||
const auto &rr = cv::util::get<cv::detail::VectorRef>(out_data[id]); // FIXME: that variant MOVE problem again
|
|
||||||
isl_outputs[id] = { r, cv::GRunArgP(rr) };
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case cv::GShape::GOPAQUE:
|
|
||||||
{
|
|
||||||
cv::detail::OpaqueRef newOpaque;
|
|
||||||
cv::util::get<cv::detail::ConstructOpaque>(r.ctor)(newOpaque);
|
|
||||||
out_data[id] = cv::GRunArg(std::move(newOpaque));
|
|
||||||
// OpaqueRef is implicitly shared so no pointer is taken here
|
|
||||||
const auto &rr = cv::util::get<cv::detail::OpaqueRef>(out_data[id]); // FIXME: that variant MOVE problem again
|
|
||||||
isl_outputs[id] = { r, cv::GRunArgP(rr) };
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
cv::util::throw_error(std::logic_error("Unsupported GShape"));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Now ask Island to execute on this data
|
|
||||||
island->run(std::move(isl_inputs), std::move(isl_outputs));
|
|
||||||
|
|
||||||
// Once executed, dispatch our results down to the pipeline.
|
|
||||||
for (auto &&it : ade::util::zip(ade::util::toRange(out_queues),
|
|
||||||
ade::util::toRange(out_data)))
|
|
||||||
{
|
|
||||||
for (auto &&q : std::get<0>(it))
|
|
||||||
{
|
|
||||||
// FIXME: FATAL VARIANT ISSUE!!
|
|
||||||
const auto tmp = std::get<1>(it);
|
|
||||||
q->push(Cmd{tmp});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -815,6 +815,10 @@ struct GAPI_Streaming_Unit: public ::testing::Test {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// FIXME: (GAPI_Streaming_Types, InputOpaque) test is missing here!
|
||||||
|
// FIXME: (GAPI_Streaming_Types, XChangeOpaque) test is missing here!
|
||||||
|
// FIXME: (GAPI_Streaming_Types, OutputOpaque) test is missing here!
|
||||||
|
|
||||||
TEST_F(GAPI_Streaming_Unit, TestTwoVideoSourcesFail)
|
TEST_F(GAPI_Streaming_Unit, TestTwoVideoSourcesFail)
|
||||||
{
|
{
|
||||||
const auto c_ptr = gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(findDataFile("cv/video/768x576.avi"));
|
const auto c_ptr = gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(findDataFile("cv/video/768x576.avi"));
|
||||||
|
Loading…
Reference in New Issue
Block a user