Merge pull request #20739 from sivanov-work:merge_base_decode

G-API: oneVPL (simplification) Add simple decode pipeline

* Add simple decode pipeline & add onevpl namespace

* Address some review comments

* Add compilation guard
This commit is contained in:
Sergey Ivanov 2021-09-28 18:02:21 +03:00 committed by GitHub
parent 280dc77f8b
commit c1148c4ea6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 992 additions and 15 deletions

View File

@ -174,6 +174,10 @@ set(gapi_srcs
src/streaming/onevpl/accelerators/surface/surface_pool.cpp
src/streaming/onevpl/accelerators/accel_policy_cpu.cpp
src/streaming/onevpl/accelerators/accel_policy_dx11.cpp
src/streaming/onevpl/engine/engine_session.cpp
src/streaming/onevpl/engine/processing_engine_base.cpp
src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp
src/streaming/onevpl/engine/decode/decode_session.cpp
# Utils (ITT tracing)
src/utils/itt.cpp

View File

@ -20,6 +20,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
VPLCPUAccelerationPolicy::VPLCPUAccelerationPolicy() {
GAPI_LOG_INFO(nullptr, "created");
@ -219,6 +220,7 @@ cv::MediaFrame::AdapterPtr VPLCPUAccelerationPolicy::create_frame_adapter(pool_k
return cv::MediaFrame::AdapterPtr{new VPLMediaFrameCPUAdapter(*it)};
#endif // TEST_PERF
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -22,6 +22,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
// GAPI_EXPORTS for tests
struct GAPI_EXPORTS VPLCPUAccelerationPolicy final : public VPLAccelerationPolicy
@ -47,6 +48,7 @@ struct GAPI_EXPORTS VPLCPUAccelerationPolicy final : public VPLAccelerationPolic
private:
std::map<pool_key_t, pool_t> pool_table;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -8,6 +8,7 @@
#include "streaming/onevpl/accelerators/accel_policy_dx11.hpp"
#include "streaming/onevpl/accelerators/surface/cpu_frame_adapter.hpp"
#include "streaming/onevpl/accelerators/surface/surface.hpp"
#include "streaming/onevpl/utils.hpp"
#include "logger.hpp"
#ifdef HAVE_DIRECTX
@ -27,6 +28,8 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
VPLDX11AccelerationPolicy::VPLDX11AccelerationPolicy()
{
#ifdef CPU_ACCEL_ADAPTER
@ -47,7 +50,8 @@ void VPLDX11AccelerationPolicy::init(session_t session) {
mfxStatus sts = MFXVideoCORE_GetHandle(session, MFX_HANDLE_D3D11_DEVICE, reinterpret_cast<mfxHDL*>(&hw_handle));
if (sts != MFX_ERR_NONE)
{
throw std::logic_error("Cannot create VPLDX11AccelerationPolicy, MFXVideoCORE_GetHandle error");
throw std::logic_error("Cannot create VPLDX11AccelerationPolicy, MFXVideoCORE_GetHandle error: " +
mfxstatus_to_string(sts));
}
GAPI_LOG_INFO(nullptr, "VPLDX11AccelerationPolicy initialized, session: " << session);
@ -106,6 +110,7 @@ cv::MediaFrame::AdapterPtr VPLDX11AccelerationPolicy::create_frame_adapter(pool_
(void)surface;
throw std::runtime_error("VPLDX11AccelerationPolicy::create_frame_adapter() is not implemented");
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -32,6 +32,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
// GAPI_EXPORTS for tests
struct GAPI_EXPORTS VPLDX11AccelerationPolicy final: public VPLAccelerationPolicy
@ -57,6 +58,7 @@ private:
std::unique_ptr<VPLCPUAccelerationPolicy> adapter;
#endif
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -19,6 +19,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
class Surface;
struct VPLAccelerationPolicy
@ -51,6 +52,7 @@ struct VPLAccelerationPolicy
virtual cv::MediaFrame::AdapterPtr create_frame_adapter(pool_key_t key,
mfxFrameSurface1* surface) = 0;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -19,6 +19,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
VPLMediaFrameCPUAdapter::VPLMediaFrameCPUAdapter(std::shared_ptr<Surface> surface):
parent_surface_ptr(surface) {
@ -111,6 +112,7 @@ void VPLMediaFrameCPUAdapter::serialize(cv::gapi::s11n::IOStream&) {
void VPLMediaFrameCPUAdapter::deserialize(cv::gapi::s11n::IIStream&) {
GAPI_Assert("VPLMediaFrameCPUAdapter::deserialize() is not implemented");
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -16,6 +16,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
class Surface;
class VPLMediaFrameCPUAdapter : public cv::MediaFrame::IAdapter {
@ -33,6 +34,7 @@ public:
private:
std::shared_ptr<Surface> parent_surface_ptr;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -12,6 +12,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
Surface::Surface(std::unique_ptr<handle_t>&& surf, std::shared_ptr<void> associated_memory) :
workspace_memory_ptr(associated_memory),
@ -69,6 +70,7 @@ size_t Surface::release_lock() {
", locked times: " << locked_count - 1);
return locked_count; // return preceding value
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -23,6 +23,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
/**
* @brief Inner class for managing oneVPL surface through interface `mfxFrameSurface1`.
@ -95,6 +96,7 @@ private:
using surface_ptr_t = std::shared_ptr<Surface>;
using surface_weak_ptr_t = std::weak_ptr<Surface>;
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -7,6 +7,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
void CachedPool::reserve(size_t size) {
surfaces.reserve(size);
@ -63,6 +64,7 @@ CachedPool::surface_ptr_t CachedPool::find_by_handle(mfxFrameSurface1* handle) {
GAPI_Assert(it != cache.end() && "Cannot find cached surface from pool. Data corruption is possible");
return it->second;
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -17,6 +17,7 @@
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
class Surface;
// GAPI_EXPORTS for tests
@ -38,6 +39,7 @@ private:
free_surface_iterator_t next_free_it;
cached_surface_container_t cache;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv

View File

@ -0,0 +1,310 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include <algorithm>
#include <exception>
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include "streaming/onevpl/engine/decode/decode_engine_legacy.hpp"
#include "streaming/onevpl/engine/decode/decode_session.hpp"
#include "streaming/onevpl/accelerators/accel_policy_interface.hpp"
#include "streaming/onevpl/accelerators/surface/surface.hpp"
#include "streaming/onevpl/utils.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
/* UTILS */
mfxU32 GetSurfaceSize(mfxU32 FourCC, mfxU32 width, mfxU32 height) {
mfxU32 nbytes = 0;
mfxU32 half_width = width / 2;
mfxU32 half_height = height / 2;
switch (FourCC) {
case MFX_FOURCC_I420:
case MFX_FOURCC_NV12:
nbytes = width * height + 2 * half_width * half_height;
break;
case MFX_FOURCC_I010:
case MFX_FOURCC_P010:
nbytes = width * height + 2 * half_width * half_height;
nbytes *= 2;
break;
case MFX_FOURCC_RGB4:
nbytes = width * height * 4;
break;
default:
GAPI_LOG_WARNING(nullptr, "Unsupported FourCC requested: " << FourCC);
GAPI_Assert(false && "Unsupported FourCC requested");
break;
}
return nbytes;
}
surface_ptr_t create_surface_RGB4(mfxFrameInfo frameInfo,
std::shared_ptr<void> out_buf_ptr,
size_t out_buf_ptr_offset,
size_t out_buf_size)
{
mfxU8* buf = reinterpret_cast<mfxU8*>(out_buf_ptr.get());
mfxU16 surfW = frameInfo.Width * 4;
mfxU16 surfH = frameInfo.Height;
(void)surfH;
// TODO more intelligent check
if (out_buf_size <= out_buf_ptr_offset) {
throw std::runtime_error(std::string("Insufficient buffer size: ") +
std::to_string(out_buf_size) + ", buffer offset: " +
std::to_string(out_buf_ptr_offset) +
", expected surface width: " + std::to_string(surfW) +
", height: " + std::to_string(surfH));
}
std::unique_ptr<mfxFrameSurface1> handle(new mfxFrameSurface1);
memset(handle.get(), 0, sizeof(mfxFrameSurface1));
handle->Info = frameInfo;
handle->Data.B = buf + out_buf_ptr_offset;
handle->Data.G = handle->Data.B + 1;
handle->Data.R = handle->Data.B + 2;
handle->Data.A = handle->Data.B + 3;
handle->Data.Pitch = surfW;
return Surface::create_surface(std::move(handle), out_buf_ptr);
}
surface_ptr_t create_surface_other(mfxFrameInfo frameInfo,
std::shared_ptr<void> out_buf_ptr,
size_t out_buf_ptr_offset,
size_t out_buf_size)
{
mfxU8* buf = reinterpret_cast<mfxU8*>(out_buf_ptr.get());
mfxU16 surfH = frameInfo.Height;
mfxU16 surfW = (frameInfo.FourCC == MFX_FOURCC_P010) ? frameInfo.Width * 2 : frameInfo.Width;
// TODO more intelligent check
if (out_buf_size <=
out_buf_ptr_offset + (surfW * surfH) + ((surfW / 2) * (surfH / 2))) {
throw std::runtime_error(std::string("Insufficient buffer size: ") +
std::to_string(out_buf_size) + ", buffer offset: " +
std::to_string(out_buf_ptr_offset) +
", expected surface width: " + std::to_string(surfW) +
", height: " + std::to_string(surfH));
}
std::unique_ptr<mfxFrameSurface1> handle(new mfxFrameSurface1);
memset(handle.get(), 0, sizeof(mfxFrameSurface1));
handle->Info = frameInfo;
handle->Data.Y = buf + out_buf_ptr_offset;
handle->Data.U = buf + out_buf_ptr_offset + (surfW * surfH);
handle->Data.V = handle->Data.U + ((surfW / 2) * (surfH / 2));
handle->Data.Pitch = surfW;
return Surface::create_surface(std::move(handle), out_buf_ptr);
}
VPLLegacyDecodeEngine::VPLLegacyDecodeEngine(std::unique_ptr<VPLAccelerationPolicy>&& accel)
: ProcessingEngineBase(std::move(accel)) {
GAPI_LOG_INFO(nullptr, "Create Legacy Decode Engine");
create_pipeline(
// 1) Read File
[this] (EngineSession& sess) -> ExecutionStatus
{
LegacyDecodeSession &my_sess = static_cast<LegacyDecodeSession&>(sess);
my_sess.last_status = ReadEncodedStream(my_sess.stream, my_sess.data_provider);
if (my_sess.last_status != MFX_ERR_NONE) {
my_sess.data_provider.reset(); //close source
}
return ExecutionStatus::Continue;
},
// 2) enqueue ASYNC decode
[this] (EngineSession& sess) -> ExecutionStatus
{
LegacyDecodeSession &my_sess = static_cast<LegacyDecodeSession&>(sess);
my_sess.last_status =
MFXVideoDECODE_DecodeFrameAsync(my_sess.session,
my_sess.last_status == MFX_ERR_NONE
? &my_sess.stream
: nullptr, /* No more data to read, start decode draining mode*/
my_sess.procesing_surface_ptr.lock()->get_handle(),
&my_sess.output_surface_ptr,
&my_sess.sync);
return ExecutionStatus::Continue;
},
// 3) Wait for ASYNC decode result
[this] (EngineSession& sess) -> ExecutionStatus
{
if (sess.last_status == MFX_ERR_NONE) // Got 1 decoded frame
{
do {
//TODO try to extract TIMESTAMP
sess.last_status = MFXVideoCORE_SyncOperation(sess.session, sess.sync, 100);
if (MFX_ERR_NONE == sess.last_status) {
LegacyDecodeSession& my_sess = static_cast<LegacyDecodeSession&>(sess);
on_frame_ready(my_sess);
}
} while (sess.last_status == MFX_WRN_IN_EXECUTION);
}
return ExecutionStatus::Continue;
},
// 4) Falls back on generic status procesing
[this] (EngineSession& sess) -> ExecutionStatus
{
return this->process_error(sess.last_status, static_cast<LegacyDecodeSession&>(sess));
}
);
}
void VPLLegacyDecodeEngine::initialize_session(mfxSession mfx_session,
DecoderParams&& decoder_param,
std::shared_ptr<onevpl::IDataProvider> provider)
{
mfxFrameAllocRequest decRequest = {};
// Query number required surfaces for decoder
MFXVideoDECODE_QueryIOSurf(mfx_session, &decoder_param.param, &decRequest);
// External (application) allocation of decode surfaces
GAPI_LOG_DEBUG(nullptr, "Query IOSurf for session: " << mfx_session <<
", mfxFrameAllocRequest.NumFrameSuggested: " << decRequest.NumFrameSuggested <<
", mfxFrameAllocRequest.Type: " << decRequest.Type);
mfxU32 singleSurfaceSize = GetSurfaceSize(decoder_param.param.mfx.FrameInfo.FourCC,
decoder_param.param.mfx.FrameInfo.Width,
decoder_param.param.mfx.FrameInfo.Height);
if (!singleSurfaceSize) {
throw std::runtime_error("Cannot determine surface size for: fourCC" +
std::to_string(decoder_param.param.mfx.FrameInfo.FourCC) +
", width: " + std::to_string(decoder_param.param.mfx.FrameInfo.Width) +
", height: " + std::to_string(decoder_param.param.mfx.FrameInfo.Height));
}
const auto &frameInfo = decoder_param.param.mfx.FrameInfo;
auto surface_creator =
[&frameInfo] (std::shared_ptr<void> out_buf_ptr, size_t out_buf_ptr_offset,
size_t out_buf_size) -> surface_ptr_t {
return (frameInfo.FourCC == MFX_FOURCC_RGB4) ?
create_surface_RGB4(frameInfo, out_buf_ptr, out_buf_ptr_offset,
out_buf_size) :
create_surface_other(frameInfo, out_buf_ptr, out_buf_ptr_offset,
out_buf_size);};
//TODO Configure preallocation size (how many frames we can hold)
const size_t preallocated_frames_count = 30;
VPLAccelerationPolicy::pool_key_t decode_pool_key =
acceleration_policy->create_surface_pool(decRequest.NumFrameSuggested * preallocated_frames_count,
singleSurfaceSize,
surface_creator);
// create session
std::shared_ptr<LegacyDecodeSession> sess_ptr =
register_session<LegacyDecodeSession>(mfx_session,
std::move(decoder_param),
provider);
sess_ptr->init_surface_pool(decode_pool_key);
// prepare working decode surface
sess_ptr->swap_surface(*this);
}
ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::execute_op(operation_t& op, EngineSession& sess) {
return op(sess);
}
void VPLLegacyDecodeEngine::on_frame_ready(LegacyDecodeSession& sess)
{
GAPI_LOG_DEBUG(nullptr, "[" << sess.session << "], frame ready");
// manage memory ownership rely on acceleration policy
auto frame_adapter = acceleration_policy->create_frame_adapter(sess.decoder_pool_id,
sess.output_surface_ptr);
ready_frames.emplace(cv::MediaFrame(std::move(frame_adapter)), sess.generate_frame_meta());
}
ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::process_error(mfxStatus status, LegacyDecodeSession& sess)
{
GAPI_LOG_DEBUG(nullptr, "status: " << mfxstatus_to_string(status));
switch (status) {
case MFX_ERR_NONE:
return ExecutionStatus::Continue;
case MFX_ERR_MORE_DATA: // The function requires more bitstream at input before decoding can proceed
if (!sess.data_provider || sess.data_provider->empty()) {
// No more data to drain from decoder, start encode draining mode
return ExecutionStatus::Processed;
}
else
return ExecutionStatus::Continue; // read more data
break;
case MFX_ERR_MORE_SURFACE:
{
// The function requires more frame surface at output before decoding can proceed.
// This applies to external memory allocations and should not be expected for
// a simple internal allocation case like this
try {
sess.swap_surface(*this);
return ExecutionStatus::Continue;
} catch (const std::exception& ex) {
GAPI_LOG_WARNING(nullptr, "[" << sess.session << "] error: " << ex.what());
}
break;
}
case MFX_ERR_DEVICE_LOST:
// For non-CPU implementations,
// Cleanup if device is lost
GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
"MFX_ERR_DEVICE_LOST is not processed");
break;
case MFX_WRN_DEVICE_BUSY:
// For non-CPU implementations,
// Wait a few milliseconds then try again
GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
"MFX_WRN_DEVICE_BUSY is not processed");
break;
case MFX_WRN_VIDEO_PARAM_CHANGED:
// The decoder detected a new sequence header in the bitstream.
// Video parameters may have changed.
// In external memory allocation case, might need to reallocate the output surface
GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
"MFX_WRN_VIDEO_PARAM_CHANGED is not processed");
break;
case MFX_ERR_INCOMPATIBLE_VIDEO_PARAM:
// The function detected that video parameters provided by the application
// are incompatible with initialization parameters.
// The application should close the component and then reinitialize it
GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
"MFX_ERR_INCOMPATIBLE_VIDEO_PARAM is not processed");
break;
case MFX_ERR_REALLOC_SURFACE:
// Bigger surface_work required. May be returned only if
// mfxInfoMFX::EnableReallocRequest was set to ON during initialization.
// This applies to external memory allocations and should not be expected for
// a simple internal allocation case like this
GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
"MFX_ERR_REALLOC_SURFACE is not processed");
break;
default:
GAPI_LOG_WARNING(nullptr, "Unknown status code: " << mfxstatus_to_string(status));
break;
}
return ExecutionStatus::Failed;
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL

View File

@ -0,0 +1,48 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef GAPI_STREAMING_ONVPL_DECODE_ENGINE_LEGACY_HPP
#define GAPI_STREAMING_ONVPL_DECODE_ENGINE_LEGACY_HPP
#include <stdio.h>
#include <memory>
#include "streaming/onevpl/engine/processing_engine_base.hpp"
#ifdef HAVE_ONEVPL
#if (MFX_VERSION >= 2000)
#include <vpl/mfxdispatcher.h>
#endif
#include <vpl/mfx.h>
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
class LegacyDecodeSession;
struct DecoderParams;
struct IDataProvider;
struct VPLAccelerationPolicy;
class VPLLegacyDecodeEngine : public ProcessingEngineBase {
public:
VPLLegacyDecodeEngine(std::unique_ptr<VPLAccelerationPolicy>&& accel);
void initialize_session(mfxSession mfx_session, DecoderParams&& decoder_param,
std::shared_ptr<IDataProvider> provider) override;
private:
ExecutionStatus execute_op(operation_t& op, EngineSession& sess) override;
ExecutionStatus process_error(mfxStatus status, LegacyDecodeSession& sess);
void on_frame_ready(LegacyDecodeSession& sess);
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL
#endif // GAPI_STREAMING_ONVPL_DECODE_ENGINE_LEGACY_HPP

View File

@ -0,0 +1,78 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include <chrono>
#include <exception>
#include "streaming/onevpl/engine/decode/decode_session.hpp"
#include "streaming/onevpl/engine/decode/decode_engine_legacy.hpp"
#include "streaming/onevpl/accelerators/accel_policy_interface.hpp"
#include "streaming/onevpl/accelerators/surface/surface.hpp"
#include "streaming/onevpl/utils.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
LegacyDecodeSession::LegacyDecodeSession(mfxSession sess,
DecoderParams&& decoder_param,
std::shared_ptr<IDataProvider> provider) :
EngineSession(sess, std::move(decoder_param.stream)),
mfx_decoder_param(std::move(decoder_param.param)),
data_provider(std::move(provider)),
procesing_surface_ptr(),
output_surface_ptr(),
decoded_frames_count()
{
}
LegacyDecodeSession::~LegacyDecodeSession()
{
GAPI_LOG_INFO(nullptr, "Close Decode for session: " << session);
MFXVideoDECODE_Close(session);
}
void LegacyDecodeSession::swap_surface(VPLLegacyDecodeEngine& engine) {
VPLAccelerationPolicy* acceleration_policy = engine.get_accel();
GAPI_Assert(acceleration_policy && "Empty acceleration_policy");
auto old_locked = procesing_surface_ptr.lock();
try {
auto cand = acceleration_policy->get_free_surface(decoder_pool_id).lock();
GAPI_LOG_DEBUG(nullptr, "[" << session << "] swap surface"
", old: " << (old_locked ? old_locked->get_handle() : nullptr) <<
", new: "<< cand->get_handle());
procesing_surface_ptr = cand;
} catch (const std::exception& ex) {
GAPI_LOG_WARNING(nullptr, "[" << session << "] error: " << ex.what() <<
"Abort");
}
}
void LegacyDecodeSession::init_surface_pool(VPLAccelerationPolicy::pool_key_t key) {
GAPI_Assert(key && "Init decode pull with empty key");
decoder_pool_id = key;
}
Data::Meta LegacyDecodeSession::generate_frame_meta() {
const auto now = std::chrono::system_clock::now();
const auto dur = std::chrono::duration_cast<std::chrono::microseconds>
(now.time_since_epoch());
Data::Meta meta {
{cv::gapi::streaming::meta_tag::timestamp, int64_t{dur.count()} },
{cv::gapi::streaming::meta_tag::seq_id, int64_t{decoded_frames_count++}}
};
return meta;
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL

View File

@ -0,0 +1,60 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef GAPI_STREAMING_ONVPL_ENGINE_DECODE_DECODE_SESSION_HPP
#define GAPI_STREAMING_ONVPL_ENGINE_DECODE_DECODE_SESSION_HPP
#include <stdio.h>
#include <memory>
#include <opencv2/gapi/streaming/meta.hpp>
#include "streaming/onevpl/engine/engine_session.hpp"
#include "streaming/onevpl/accelerators/accel_policy_interface.hpp"
#ifdef HAVE_ONEVPL
#if (MFX_VERSION >= 2000)
#include <vpl/mfxdispatcher.h>
#endif
#include <vpl/mfx.h>
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
struct IDataProvider;
class Surface;
struct VPLAccelerationPolicy;
class LegacyDecodeSession : public EngineSession {
public:
friend class VPLLegacyDecodeEngine;
LegacyDecodeSession(mfxSession sess, DecoderParams&& decoder_param, std::shared_ptr<IDataProvider> provider);
~LegacyDecodeSession();
using EngineSession::EngineSession;
void swap_surface(VPLLegacyDecodeEngine& engine);
void init_surface_pool(VPLAccelerationPolicy::pool_key_t key);
mfxVideoParam mfx_decoder_param;
std::shared_ptr<IDataProvider> data_provider;
Data::Meta generate_frame_meta();
private:
VPLAccelerationPolicy::pool_key_t decoder_pool_id;
mfxFrameAllocRequest request;
std::weak_ptr<Surface> procesing_surface_ptr;
mfxFrameSurface1* output_surface_ptr;
int64_t decoded_frames_count;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL
#endif // GAPI_STREAMING_ONVPL_ENGINE_DECODE_DECODE_SESSION_HPP

View File

@ -0,0 +1,33 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include "streaming/onevpl/engine/engine_session.hpp"
#include "streaming/onevpl/utils.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
EngineSession::EngineSession(mfxSession sess, mfxBitstream&& str) :
session(sess), stream(std::move(str)) {}
EngineSession::~EngineSession()
{
GAPI_LOG_INFO(nullptr, "Close session: " << session);
MFXClose(session);
}
std::string EngineSession::error_code_to_str() const
{
return mfxstatus_to_string(last_status);
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL

View File

@ -0,0 +1,49 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef GAPI_STREAMING_ONEVPL_ENGINE_ENGINE_SESSION_HPP
#define GAPI_STREAMING_ONEVPL_ENGINE_ENGINE_SESSION_HPP
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "opencv2/gapi/own/exports.hpp" // GAPI_EXPORTS
#ifdef HAVE_ONEVPL
#include <vpl/mfxvideo.h>
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
// GAPI_EXPORTS for tests
struct GAPI_EXPORTS DecoderParams {
mfxBitstream stream;
mfxVideoParam param;
};
struct GAPI_EXPORTS EngineSession {
mfxSession session;
mfxBitstream stream;
mfxSyncPoint sync;
mfxStatus last_status;
EngineSession(mfxSession sess, mfxBitstream&& str);
std::string error_code_to_str() const;
virtual ~EngineSession();
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL
#endif // GAPI_STREAMING_ONEVPL_ENGINE_ENGINE_SESSION_HPP

View File

@ -0,0 +1,134 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include <algorithm>
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include "streaming/onevpl/engine/processing_engine_base.hpp"
#include "streaming/onevpl/accelerators/accel_policy_interface.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
ProcessingEngineBase::ProcessingEngineBase(std::unique_ptr<VPLAccelerationPolicy>&& accel) :
acceleration_policy(std::move(accel)) {
}
ProcessingEngineBase::~ProcessingEngineBase() {
GAPI_LOG_INFO(nullptr, "destroyed");
}
ProcessingEngineBase::ExecutionStatus ProcessingEngineBase::process(mfxSession session) {
auto sess_it = sessions.find(session);
if (sess_it == sessions.end()) {
return ExecutionStatus::SessionNotFound;
}
session_ptr processing_session = sess_it->second;
ExecutionData& exec_data = execution_table[session];
GAPI_LOG_DEBUG(nullptr, "[" << session <<"] start op id: " << exec_data.op_id);
ExecutionStatus status = execute_op(pipeline.at(exec_data.op_id), *processing_session);
size_t old_op_id = exec_data.op_id++;
if (exec_data.op_id == pipeline.size())
{
exec_data.op_id = 0;
}
GAPI_LOG_DEBUG(nullptr, "[" << session <<"] finish op id: " << old_op_id <<
", " << processing_session->error_code_to_str() <<
", " << ProcessingEngineBase::status_to_string(status) <<
", next op id: " << exec_data.op_id);
if (status == ExecutionStatus::Failed) {
GAPI_LOG_WARNING(nullptr, "Operation for session: " << session <<
", " << ProcessingEngineBase::status_to_string(status) <<
" - remove it");
sessions.erase(sess_it);
execution_table.erase(session);
}
if (status == ExecutionStatus::Processed) {
sessions.erase(sess_it);
execution_table.erase(session);
}
return status;
}
const char* ProcessingEngineBase::status_to_string(ExecutionStatus status)
{
switch(status) {
case ExecutionStatus::Continue: return "CONTINUE";
case ExecutionStatus::Processed: return "PROCESSED";
case ExecutionStatus::SessionNotFound: return "NOT_FOUND_SESSION";
case ExecutionStatus::Failed: return "FAILED";
default:
return "UNKNOWN";
}
}
ProcessingEngineBase::ExecutionStatus ProcessingEngineBase::execute_op(operation_t& op, EngineSession& sess)
{
return op(sess);
}
size_t ProcessingEngineBase::get_ready_frames_count() const
{
return ready_frames.size();
}
void ProcessingEngineBase::get_frame(Data &data)
{
data = ready_frames.front();
ready_frames.pop();
}
const VPLAccelerationPolicy* ProcessingEngineBase::get_accel() const {
return acceleration_policy.get();
}
VPLAccelerationPolicy* ProcessingEngineBase::get_accel() {
return const_cast<VPLAccelerationPolicy*>(static_cast<const ProcessingEngineBase*>(this)->get_accel());
}
// Read encoded stream from file
mfxStatus ReadEncodedStream(mfxBitstream &bs, std::shared_ptr<IDataProvider>& data_provider) {
if (!data_provider) {
return MFX_ERR_MORE_DATA;
}
mfxU8 *p0 = bs.Data;
mfxU8 *p1 = bs.Data + bs.DataOffset;
if (bs.DataOffset > bs.MaxLength - 1) {
return MFX_ERR_NOT_ENOUGH_BUFFER;
}
if (bs.DataLength + bs.DataOffset > bs.MaxLength) {
return MFX_ERR_NOT_ENOUGH_BUFFER;
}
std::copy_n(p0, bs.DataLength, p1);
bs.DataOffset = 0;
bs.DataLength += static_cast<mfxU32>(data_provider->fetch_data(bs.MaxLength - bs.DataLength,
bs.Data + bs.DataLength));
if (bs.DataLength == 0)
return MFX_ERR_MORE_DATA;
return MFX_ERR_NONE;
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL

View File

@ -0,0 +1,96 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef GAPI_STREAMING_ONEVPL_ENGINE_PROCESSING_ENGINE_BASE_HPP
#define GAPI_STREAMING_ONEVPL_ENGINE_PROCESSING_ENGINE_BASE_HPP
#include <queue>
#include "streaming/onevpl/engine/engine_session.hpp"
#include "opencv2/gapi/own/exports.hpp" // GAPI_EXPORTS
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
struct VPLAccelerationPolicy;
struct IDataProvider;
// GAPI_EXPORTS for tests
class GAPI_EXPORTS ProcessingEngineBase {
public:
enum class ExecutionStatus {
Continue,
Processed,
SessionNotFound,
Failed
};
struct ExecutionData {
size_t op_id = 0;
};
using file_ptr = std::unique_ptr<FILE, decltype(&fclose)>;
using session_ptr = std::shared_ptr<EngineSession>;
using SessionsTable = std::map<mfxSession, session_ptr>;
using ExecutionDataTable = std::map<mfxSession, ExecutionData>;
using frame_t = cv::gapi::wip::Data;
using frames_container_t = std::queue<frame_t>;
using operation_t = std::function<ExecutionStatus(EngineSession&)>;
static const char * status_to_string(ExecutionStatus);
ProcessingEngineBase(std::unique_ptr<VPLAccelerationPolicy>&& accel);
virtual ~ProcessingEngineBase();
virtual void initialize_session(mfxSession mfx_session,
DecoderParams&& decoder_param,
std::shared_ptr<IDataProvider> provider) = 0;
ExecutionStatus process(mfxSession session);
size_t get_ready_frames_count() const;
void get_frame(Data &data);
const VPLAccelerationPolicy* get_accel() const;
VPLAccelerationPolicy* get_accel();
protected:
SessionsTable sessions;
frames_container_t ready_frames;
ExecutionDataTable execution_table;
std::vector<operation_t> pipeline;
std::unique_ptr<VPLAccelerationPolicy> acceleration_policy;
virtual ExecutionStatus execute_op(operation_t& op, EngineSession& sess);
template<class ...Ops>
void create_pipeline(Ops&&...ops)
{
GAPI_DbgAssert(pipeline.empty() && "Pipeline must be empty");
std::vector<operation_t>({std::forward<Ops>(ops)...}).swap(pipeline);
}
template<class SpecificSession, class ...SessionArgs>
std::shared_ptr<SpecificSession> register_session(mfxSession key,
SessionArgs&& ...args)
{
auto sess_impl = std::make_shared<SpecificSession>(key,
std::forward<SessionArgs>(args)...);
sessions.emplace(key, sess_impl);
execution_table.emplace(key, ExecutionData{});
return sess_impl;
}
};
mfxStatus ReadEncodedStream(mfxBitstream &bs, std::shared_ptr<IDataProvider>& data_provider);
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // GAPI_STREAMING_ONEVPL_ENGINE_PROCESSING_ENGINE_BASE_HPP

View File

@ -0,0 +1,38 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef GAPI_STREAMING_ONEVPL_ONEVPL_UTILS_HPP
#define GAPI_STREAMING_ONEVPL_ONEVPL_UTILS_HPP
#ifdef HAVE_ONEVPL
#if (MFX_VERSION >= 2000)
#include <vpl/mfxdispatcher.h>
#endif // MFX_VERSION
#include <vpl/mfx.h>
#include <vpl/mfxvideo.h>
#include <map>
#include <string>
#include <opencv2/gapi/streaming/onevpl/cfg_params.hpp>
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
inline std::string mfxstatus_to_string(mfxStatus) {
return "UNKNOWN";
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL
#endif // GAPI_STREAMING_ONEVPL_ONEVPL_UTILS_HPP

View File

@ -12,6 +12,7 @@
#include <chrono>
#include <future>
#include <opencv2/gapi/media.hpp>
#include <opencv2/gapi/cpu/core.hpp>
#include <opencv2/gapi/cpu/imgproc.hpp>
@ -29,23 +30,88 @@
#include <opencv2/gapi/streaming/onevpl/source.hpp>
#ifdef HAVE_ONEVPL
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include "streaming/onevpl/accelerators/surface/surface.hpp"
#include "streaming/onevpl/accelerators/surface/cpu_frame_adapter.hpp"
#include "streaming/onevpl/accelerators/accel_policy_cpu.hpp"
#include "streaming/onevpl/engine/processing_engine_base.hpp"
#include "streaming/onevpl/engine/engine_session.hpp"
namespace opencv_test
{
namespace
{
cv::gapi::wip::surface_ptr_t create_test_surface(std::shared_ptr<void> out_buf_ptr,
struct EmptyDataProvider : public cv::gapi::wip::onevpl::IDataProvider {
size_t fetch_data(size_t, void*) override {
return 0;
}
bool empty() const override {
return true;
}
};
struct TestProcessingSession : public cv::gapi::wip::onevpl::EngineSession {
TestProcessingSession(mfxSession mfx_session) :
EngineSession(mfx_session, {}) {
}
};
struct TestProcessingEngine: public cv::gapi::wip::onevpl::ProcessingEngineBase {
size_t pipeline_stage_num = 0;
TestProcessingEngine(std::unique_ptr<cv::gapi::wip::onevpl::VPLAccelerationPolicy>&& accel) :
cv::gapi::wip::onevpl::ProcessingEngineBase(std::move(accel)) {
using cv::gapi::wip::onevpl::EngineSession;
create_pipeline(
// 0)
[this] (EngineSession&) -> ExecutionStatus
{
pipeline_stage_num = 0;
return ExecutionStatus::Continue;
},
// 1)
[this] (EngineSession&) -> ExecutionStatus
{
pipeline_stage_num = 1;
return ExecutionStatus::Continue;
},
// 2)
[this] (EngineSession&) -> ExecutionStatus
{
pipeline_stage_num = 2;
return ExecutionStatus::Continue;
},
// 3)
[this] (EngineSession&) -> ExecutionStatus
{
pipeline_stage_num = 3;
ready_frames.emplace(cv::MediaFrame());
return ExecutionStatus::Processed;
}
);
}
void initialize_session(mfxSession mfx_session,
cv::gapi::wip::onevpl::DecoderParams&&,
std::shared_ptr<cv::gapi::wip::onevpl::IDataProvider>) override {
register_session<TestProcessingSession>(mfx_session);
}
};
cv::gapi::wip::onevpl::surface_ptr_t create_test_surface(std::shared_ptr<void> out_buf_ptr,
size_t, size_t) {
std::unique_ptr<mfxFrameSurface1> handle(new mfxFrameSurface1{});
return cv::gapi::wip::Surface::create_surface(std::move(handle), out_buf_ptr);
return cv::gapi::wip::onevpl::Surface::create_surface(std::move(handle), out_buf_ptr);
}
TEST(OneVPL_Source_Surface, InitSurface)
{
using namespace cv::gapi::wip;
using namespace cv::gapi::wip::onevpl;
// create raw MFX handle
std::unique_ptr<mfxFrameSurface1> handle(new mfxFrameSurface1{});
@ -67,7 +133,7 @@ TEST(OneVPL_Source_Surface, InitSurface)
TEST(OneVPL_Source_Surface, ConcurrentLock)
{
using namespace cv::gapi::wip;
using namespace cv::gapi::wip::onevpl;
// create raw MFX handle
std::unique_ptr<mfxFrameSurface1> handle(new mfxFrameSurface1{});
@ -107,7 +173,7 @@ TEST(OneVPL_Source_Surface, ConcurrentLock)
TEST(OneVPL_Source_Surface, MemoryLifeTime)
{
using namespace cv::gapi::wip;
using namespace cv::gapi::wip::onevpl;
// create preallocate surface memory
std::unique_ptr<char> preallocated_memory_ptr(new char);
@ -170,7 +236,7 @@ TEST(OneVPL_Source_Surface, MemoryLifeTime)
TEST(OneVPL_Source_CPU_FrameAdapter, InitFrameAdapter)
{
using namespace cv::gapi::wip;
using namespace cv::gapi::wip::onevpl;
// create raw MFX handle
std::unique_ptr<mfxFrameSurface1> handle(new mfxFrameSurface1{});
@ -191,8 +257,8 @@ TEST(OneVPL_Source_CPU_FrameAdapter, InitFrameAdapter)
TEST(OneVPL_Source_CPU_Accelerator, InitDestroy)
{
using cv::gapi::wip::VPLCPUAccelerationPolicy;
using cv::gapi::wip::VPLAccelerationPolicy;
using cv::gapi::wip::onevpl::VPLCPUAccelerationPolicy;
using cv::gapi::wip::onevpl::VPLAccelerationPolicy;
auto acceleration_policy = std::make_shared<VPLCPUAccelerationPolicy>();
@ -221,9 +287,9 @@ TEST(OneVPL_Source_CPU_Accelerator, InitDestroy)
TEST(OneVPL_Source_CPU_Accelerator, PoolProduceConsume)
{
using cv::gapi::wip::VPLCPUAccelerationPolicy;
using cv::gapi::wip::VPLAccelerationPolicy;
using cv::gapi::wip::Surface;
using cv::gapi::wip::onevpl::VPLCPUAccelerationPolicy;
using cv::gapi::wip::onevpl::VPLAccelerationPolicy;
using cv::gapi::wip::onevpl::Surface;
auto acceleration_policy = std::make_shared<VPLCPUAccelerationPolicy>();
@ -277,9 +343,9 @@ TEST(OneVPL_Source_CPU_Accelerator, PoolProduceConsume)
TEST(OneVPL_Source_CPU_Accelerator, PoolProduceConcurrentConsume)
{
using cv::gapi::wip::VPLCPUAccelerationPolicy;
using cv::gapi::wip::VPLAccelerationPolicy;
using cv::gapi::wip::Surface;
using cv::gapi::wip::onevpl::VPLCPUAccelerationPolicy;
using cv::gapi::wip::onevpl::VPLAccelerationPolicy;
using cv::gapi::wip::onevpl::Surface;
auto acceleration_policy = std::make_shared<VPLCPUAccelerationPolicy>();
@ -339,6 +405,42 @@ TEST(OneVPL_Source_CPU_Accelerator, PoolProduceConcurrentConsume)
worker_thread.join();
EXPECT_TRUE(free_surface_count >= free_surface_count_prev);
}
TEST(OneVPL_Source_ProcessingEngine, Init)
{
using namespace cv::gapi::wip::onevpl;
std::unique_ptr<VPLAccelerationPolicy> accel;
TestProcessingEngine engine(std::move(accel));
mfxSession mfx_session{};
engine.initialize_session(mfx_session, DecoderParams{}, std::shared_ptr<IDataProvider>{});
EXPECT_EQ(engine.get_ready_frames_count(), 0);
ProcessingEngineBase::ExecutionStatus ret = engine.process(mfx_session);
EXPECT_EQ(ret, ProcessingEngineBase::ExecutionStatus::Continue);
EXPECT_EQ(engine.pipeline_stage_num, 0);
ret = engine.process(mfx_session);
EXPECT_EQ(ret, ProcessingEngineBase::ExecutionStatus::Continue);
EXPECT_EQ(engine.pipeline_stage_num, 1);
ret = engine.process(mfx_session);
EXPECT_EQ(ret, ProcessingEngineBase::ExecutionStatus::Continue);
EXPECT_EQ(engine.pipeline_stage_num, 2);
ret = engine.process(mfx_session);
EXPECT_EQ(ret, ProcessingEngineBase::ExecutionStatus::Processed);
EXPECT_EQ(engine.pipeline_stage_num, 3);
EXPECT_EQ(engine.get_ready_frames_count(), 1);
ret = engine.process(mfx_session);
EXPECT_EQ(ret, ProcessingEngineBase::ExecutionStatus::SessionNotFound);
EXPECT_EQ(engine.pipeline_stage_num, 3);
EXPECT_EQ(engine.get_ready_frames_count(), 1);
cv::gapi::wip::Data frame;
engine.get_frame(frame);
}
}
} // namespace opencv_test
#endif // HAVE_ONEVPL