mirror of
https://github.com/opencv/opencv.git
synced 2025-01-18 22:44:02 +08:00
fc5d412ba7
G-API: Integration branch for ONNX & Python-related changes #23597 # Changes overview ## 1. Expose ONNX backend's Normalization and Mean-value parameters in Python * Since Python G-API bindings rely on `Generic` infer to express Inference, the `Generic` specialization of `onnx::Params` was extended with new methods to control normalization (`/255`) and mean-value; these methods were exposed in the Python bindings * Found some questionable parts in the existing API which I'd like to review/discuss (see comments) UPD: 1. Thanks to @TolyaTalamanov normalization inconsistencies have been identified with `squeezenet1.0-9` ONNX model itself; tests using this model were updated to DISABLE normalization and NOT using mean/value. 2. Questionable parts were removed and tests still pass. ### Details (taken from @TolyaTalamanov's comment): `squeezenet1.0.*onnx` - doesn't require scaling to [0,1] and mean/std because the weights of the first convolution are already scaled. ONNX documentation is broken. So the correct approach to use these models is: 1. ONNX: apply preprocessing from the documentation: https://github.com/onnx/models/blob/main/vision/classification/imagenet_preprocess.py#L8-L44 but without normalization step: ``` # DON'T DO IT: # mean_vec = np.array([0.485, 0.456, 0.406]) # stddev_vec = np.array([0.229, 0.224, 0.225]) # norm_img_data = np.zeros(img_data.shape).astype('float32') # for i in range(img_data.shape[0]): # norm_img_data[i,:,:] = (img_data[i,:,:]/255 - mean_vec[i]) / stddev_vec[i] # # add batch channel # norm_img_data = norm_img_data.reshape(1, 3, 224, 224).astype('float32') # return norm_img_data # INSTEAD return img_data.reshape(1, 3, 224, 224) ``` 2. 
G-API: Convert image from BGR to RGB and then pass to `apply` as-is with configuring parameters: ``` net = cv.gapi.onnx.params('squeezenet', model_filename) net.cfgNormalize('data_0', False) ``` **Note**: Results might be different because `G-API` doesn't apply a central crop but just resizes to the model resolution. --- `squeezenet1.1.*onnx` - requires scaling to [0,1] and mean/std - ONNX documentation is correct. 1. ONNX: apply preprocessing from the documentation: https://github.com/onnx/models/blob/main/vision/classification/imagenet_preprocess.py#L8-L44 2. G-API: Convert image from BGR to RGB and then pass to `apply` as-is with configuring parameters: ``` net = cv.gapi.onnx.params('squeezenet', model_filename) net.cfgNormalize('data_0', True) // default net.cfgMeanStd('data_0', [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ``` **Note**: Results might be different because `G-API` doesn't apply a central crop but just resizes to the model resolution. ## 2. Expose Fluid & kernel package-related functionality in Python * `cv::gapi::combine()` * `cv::GKernelPackage::size()` (mainly for testing purposes) * `cv::gapi::imgproc::fluid::kernels()` Added a test for the above. ## 3. Fixed issues with Python stateful kernel handling Fixed error message when `outMeta()` of custom python operation fails. ## 4. Fixed various issues in Python tests 1. `test_gapi_streaming.py` - fixed behavior of Desync test to avoid sporadic issues 2. `test_gapi_infer_onnx.py` - fixed model lookup (it was still using the ONNX Zoo layout but was NOT using the proper env var we use to point to one). ### Pull Request Readiness Checklist See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request - [x] I agree to contribute to the project under Apache 2 License. 
- [x] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV - [x] The PR is proposed to the proper branch - [x] There is a reference to the original bug report and related work - [x] There is accuracy test, performance test and test data in opencv_extra repository, if applicable Patch to opencv_extra has the same branch name. - [x] The feature is well documented and sample code can be built with the project CMake
560 lines
19 KiB
Python
560 lines
19 KiB
Python
#!/usr/bin/env python
|
|
|
|
import numpy as np
|
|
import cv2 as cv
|
|
import os
|
|
import sys
|
|
import unittest
|
|
import time
|
|
|
|
from tests_common import NewOpenCVTests
|
|
|
|
|
|
try:
|
|
# The tests below are Python 3 only; under Python 2 raise SkipTest so the
# enclosing handler can report the whole module as skipped.
if sys.version_info[0] < 3:
    raise unittest.SkipTest('Python 2.x is not supported')
|
|
|
|
|
|
@cv.gapi.op('custom.delay', in_types=[cv.GMat], out_types=[cv.GMat])
class GDelay:
    """Custom G-API operation 'custom.delay': one GMat in, one GMat out.

    Only the interface is declared here; the kernel registered for it in
    this file sleeps for 50 ms and returns its input.
    """

    @staticmethod
    def outMeta(desc):
        """Return the input descriptor unchanged as the output descriptor."""
        return desc
|
|
|
|
|
|
@cv.gapi.kernel(GDelay)
class GDelayImpl:
    """Kernel for GDelay: block for 50 ms, then pass the image through."""

    @staticmethod
    def run(img):
        """Sleep for 0.05 s and return ``img`` untouched."""
        delay_sec = 0.05
        time.sleep(delay_sec)
        return img
|
|
|
|
|
|
def convertNV12p2BGR(in_nv12):
    """Convert a single-array NV12 frame to BGR via cv.cvtColorTwoPlane.

    The first two thirds of the rows are passed as the first plane; the
    remaining rows are reshaped to (rows // 3, cols // 2, 2) and passed
    as the second plane, with the COLOR_YUV2BGR_NV12 conversion code.
    """
    rows, cols = in_nv12.shape[0], in_nv12.shape[1]
    third = rows // 3
    y_rows = third * 2

    y_plane = in_nv12[:y_rows, :]
    uv_plane = in_nv12[y_rows:, :].reshape((third, cols // 2, 2))
    return cv.cvtColorTwoPlane(y_plane, uv_plane, cv.COLOR_YUV2BGR_NV12)
|
|
|
|
|
|
class test_gapi_streaming(NewOpenCVTests):
|
|
|
|
def test_image_input(self):
    """A streaming medianBlur over a single image must match cv.medianBlur."""
    ksize = 3
    image = np.random.randint(0, 100, (1280, 720)).astype(np.uint8)

    # Reference result from plain OpenCV.
    reference = cv.medianBlur(image, ksize)

    # Same filter as a G-API graph, compiled in streaming mode for this input.
    graph_in = cv.GMat()
    graph_out = cv.gapi.medianBlur(graph_in, ksize)
    pipeline = cv.GComputation(graph_in, graph_out).compileStreaming(cv.gapi.descr_of(image))
    pipeline.setSource(cv.gin(image))
    pipeline.start()

    _, result = pipeline.pull()

    # Results must be identical.
    self.assertEqual(0.0, cv.norm(reference, result, cv.NORM_INF))
|
|
|
|
|
|
def test_video_input(self):
    """Streaming medianBlur over a video must match OpenCV frame by frame."""
    ksize = 3
    path = self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])

    # Reference frames are read with a plain VideoCapture.
    cap = cv.VideoCapture(path)

    # G-API graph fed from a capture source over the same file.
    graph_in = cv.GMat()
    graph_out = cv.gapi.medianBlur(graph_in, ksize)
    pipeline = cv.GComputation(graph_in, graph_out).compileStreaming()
    pipeline.setSource(cv.gin(cv.gapi.wip.make_capture_src(path)))
    pipeline.start()

    # Compare at most 10 frames.
    frames_left = 10
    while frames_left > 0 and cap.isOpened():
        has_expected, frame = cap.read()
        has_actual, actual = pipeline.pull()

        # Both streams must end on the same frame.
        self.assertEqual(has_expected, has_actual)
        if not has_actual:
            break

        reference = cv.medianBlur(frame, ksize)
        self.assertEqual(0.0, cv.norm(reference, actual, cv.NORM_INF))

        frames_left -= 1
|
|
|
|
|
|
def test_video_split3(self):
    """Streaming split3 over a video must match cv.split on every plane."""
    path = self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])

    # Reference frames are read with a plain VideoCapture.
    cap = cv.VideoCapture(path)

    # G-API graph with one input and three outputs.
    graph_in = cv.GMat()
    b, g, r = cv.gapi.split3(graph_in)
    comp = cv.GComputation(cv.GIn(graph_in), cv.GOut(b, g, r))

    pipeline = comp.compileStreaming()
    pipeline.setSource(cv.gin(cv.gapi.wip.make_capture_src(path)))
    pipeline.start()

    # Compare at most 10 frames.
    frames_left = 10
    while frames_left > 0 and cap.isOpened():
        has_expected, frame = cap.read()
        has_actual, planes = pipeline.pull()

        # Both streams must end on the same frame.
        self.assertEqual(has_expected, has_actual)
        if not has_actual:
            break

        for ref_plane, plane in zip(cv.split(frame), planes):
            self.assertEqual(0.0, cv.norm(ref_plane, plane, cv.NORM_INF))

        frames_left -= 1
|
|
|
|
|
|
def test_video_add(self):
    """Streaming add of a video and a constant image must match cv.add."""
    addend = np.random.randint(0, 100, (576, 768, 3)).astype(np.uint8)

    path = self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])

    # Reference frames are read with a plain VideoCapture.
    cap = cv.VideoCapture(path)

    # G-API graph with two inputs: the video stream and the constant image.
    lhs = cv.GMat()
    rhs = cv.GMat()
    total = cv.gapi.add(lhs, rhs)
    comp = cv.GComputation(cv.GIn(lhs, rhs), cv.GOut(total))

    pipeline = comp.compileStreaming()
    video_src = cv.gapi.wip.make_capture_src(path)
    pipeline.setSource(cv.gin(video_src, addend))
    pipeline.start()

    # Compare at most 10 frames.
    frames_left = 10
    while frames_left > 0 and cap.isOpened():
        has_expected, frame = cap.read()
        has_actual, actual = pipeline.pull()

        # Both streams must end on the same frame.
        self.assertEqual(has_expected, has_actual)
        if not has_actual:
            break

        reference = cv.add(frame, addend)
        self.assertEqual(0.0, cv.norm(reference, actual, cv.NORM_INF))

        frames_left -= 1
|
|
|
|
|
|
def test_video_good_features_to_track(self):
    """Streaming goodFeaturesToTrack must return the same corners as OpenCV.

    Each of the first (at most) 10 frames is converted to gray and run
    through goodFeaturesToTrack both in a G-API streaming pipeline and
    in plain OpenCV; the number of points and every point must match.
    """
    path = self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])

    # NB: goodFeaturesToTrack configuration
    max_corners = 50
    quality_lvl = 0.01
    min_distance = 10
    block_sz = 3
    use_harris_detector = True
    k = 0.04
    mask = None

    # OpenCV
    cap = cv.VideoCapture(path)

    # G-API
    g_in = cv.GMat()
    g_gray = cv.gapi.RGB2Gray(g_in)
    g_out = cv.gapi.goodFeaturesToTrack(g_gray, max_corners, quality_lvl,
                                        min_distance, mask, block_sz, use_harris_detector, k)

    c = cv.GComputation(cv.GIn(g_in), cv.GOut(g_out))

    ccomp = c.compileStreaming()
    source = cv.gapi.wip.make_capture_src(path)
    ccomp.setSource(cv.gin(source))
    ccomp.start()

    # Assert
    max_num_frames = 10
    proc_num_frames = 0
    while cap.isOpened():
        has_expected, frame = cap.read()
        has_actual, actual = ccomp.pull()

        self.assertEqual(has_expected, has_actual)

        if not has_actual:
            break

        # OpenCV
        frame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
        expected = cv.goodFeaturesToTrack(frame, max_corners, quality_lvl,
                                          min_distance, mask=mask,
                                          blockSize=block_sz, useHarrisDetector=use_harris_detector, k=k)

        # zip() stops at the shorter sequence, so a differing number of
        # detected points would otherwise go unnoticed.
        self.assertEqual(len(expected), len(actual))

        for e, a in zip(expected, actual):
            # NB: OpenCV & G-API have different output shapes:
            # OpenCV - (num_points, 1, 2)
            # G-API  - (num_points, 2)
            self.assertEqual(0.0, cv.norm(e.flatten(),
                                          np.array(a, np.float32).flatten(),
                                          cv.NORM_INF))

        proc_num_frames += 1
        if proc_num_frames == max_num_frames:
            break
|
|
|
|
|
|
def test_gapi_streaming_meta(self):
    """seqNo and seq_id of each pulled frame must equal its running index.

    The timestamp output is part of the graph and is pulled, but its
    value is not checked.
    """
    path = self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])

    # Graph exposing only the stream meta of the input.
    graph_in = cv.GMat()
    out_ts = cv.gapi.streaming.timestamp(graph_in)
    out_seqno = cv.gapi.streaming.seqNo(graph_in)
    out_seqid = cv.gapi.streaming.seq_id(graph_in)

    comp = cv.GComputation(cv.GIn(graph_in), cv.GOut(out_ts, out_seqno, out_seqid))

    pipeline = comp.compileStreaming()
    pipeline.setSource(cv.gin(cv.gapi.wip.make_capture_src(path)))
    pipeline.start()

    # Check at most 10 frames.
    for frame_index in range(10):
        has_frame, (ts, seqno, seqid) = pipeline.pull()
        if not has_frame:
            break

        self.assertEqual(frame_index, seqno)
        self.assertEqual(frame_index, seqid)
|
|
|
|
|
|
def test_desync(self):
    """The desynchronized (delayed) branch must yield fewer results than the main one.

    The graph copies the input on the main path and runs GDelay on a
    desync'ed path. Pulling continues until the main path has produced
    50 outputs (or the stream ends); the counters are checked afterwards.
    """
    path = self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])

    # Graph: main copy path plus a desynchronized delayed path.
    graph_in = cv.GMat()
    main_path = cv.gapi.copy(graph_in)
    desynced = cv.gapi.streaming.desync(graph_in)
    slow_path = GDelay.on(desynced)

    comp = cv.GComputation(cv.GIn(graph_in), cv.GOut(main_path, slow_path))

    pkg = cv.gapi.kernels(GDelayImpl)
    pipeline = comp.compileStreaming(args=cv.gapi.compile_args(pkg))
    pipeline.setSource(cv.gin(cv.gapi.wip.make_capture_src(path)))
    pipeline.start()

    frame_limit = 50

    main_count = 0
    slow_count = 0
    missing_count = 0
    while True:
        has_frame, (main_out, slow_out) = pipeline.pull()
        if not has_frame:
            break

        if main_out is not None:
            main_count += 1

        if slow_out is None:
            missing_count += 1
        else:
            slow_count += 1

        if main_count == frame_limit:
            pipeline.stop()
            break

    self.assertLess(0, main_count)
    self.assertLess(slow_count, main_count)
    self.assertLess(0, missing_count)
|
|
|
|
|
|
def test_compile_streaming_empty(self):
    """compileStreaming() must accept a call without any arguments."""
    graph_in = cv.GMat()
    graph_out = cv.gapi.medianBlur(graph_in, 3)
    cv.GComputation(graph_in, graph_out).compileStreaming()
|
|
|
|
|
|
def test_compile_streaming_args(self):
    """compileStreaming() must accept compile arguments alone."""
    graph_in = cv.GMat()
    graph_out = cv.gapi.medianBlur(graph_in, 3)
    comp = cv.GComputation(graph_in, graph_out)

    compile_args = cv.gapi.compile_args(cv.gapi.streaming.queue_capacity(1))
    comp.compileStreaming(compile_args)
|
|
|
|
|
|
def test_compile_streaming_descr_of(self):
    """compileStreaming() must accept the result of cv.gapi.descr_of()."""
    graph_in = cv.GMat()
    graph_out = cv.gapi.medianBlur(graph_in, 3)
    comp = cv.GComputation(graph_in, graph_out)

    sample = np.zeros((3,300,300), dtype=np.float32)
    meta = cv.gapi.descr_of(sample)
    comp.compileStreaming(meta)
|
|
|
|
|
|
def test_compile_streaming_descr_of_and_args(self):
    """compileStreaming() must accept descr_of() meta together with compile args."""
    graph_in = cv.GMat()
    graph_out = cv.gapi.medianBlur(graph_in, 3)
    comp = cv.GComputation(graph_in, graph_out)

    sample = np.zeros((3,300,300), dtype=np.float32)
    meta = cv.gapi.descr_of(sample)
    compile_args = cv.gapi.compile_args(cv.gapi.streaming.queue_capacity(1))
    comp.compileStreaming(meta, compile_args)
|
|
|
|
|
|
def test_compile_streaming_meta(self):
    """compileStreaming() must accept an explicit list of GMatDesc."""
    g_in = cv.GMat()
    comp = cv.GComputation(g_in, cv.gapi.medianBlur(g_in, 3))
    # The meta is given explicitly, so no sample image is needed here.
    comp.compileStreaming([cv.GMatDesc(cv.CV_8U, 3, (300, 300))])
|
|
|
|
|
|
def test_compile_streaming_meta_and_args(self):
    """compileStreaming() must accept an explicit GMatDesc list plus compile args."""
    g_in = cv.GMat()
    comp = cv.GComputation(g_in, cv.gapi.medianBlur(g_in, 3))
    # The meta is given explicitly, so no sample image is needed here.
    comp.compileStreaming([cv.GMatDesc(cv.CV_8U, 3, (300, 300))],
                          cv.gapi.compile_args(cv.gapi.streaming.queue_capacity(1)))
|
|
|
|
|
|
def get_gst_source(self, gstpipeline):
    """Create a G-API GStreamer source for ``gstpipeline``.

    A cv.error whose text contains 'Built without GStreamer support!'
    is turned into unittest.SkipTest; any other cv.error is re-raised.
    """
    try:
        return cv.gapi.wip.make_gst_src(gstpipeline)
    except cv.error as e:
        reason = str(e)
        # NB: Skip test in case gstreamer isn't available.
        if 'Built without GStreamer support!' in reason:
            raise unittest.SkipTest(reason)
        raise e
|
|
|
|
|
|
def test_gst_source(self):
    """Every frame pulled from a GStreamer test source must be non-empty."""
    if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
        raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")

    gstpipeline = """videotestsrc is-live=true pattern=colors num-buffers=10 !
                     videorate ! videoscale ! video/x-raw,width=1920,height=1080,
                     framerate=30/1 ! appsink"""

    # Pass-through graph.
    graph_in = cv.GMat()
    graph_out = cv.gapi.copy(graph_in)
    comp = cv.GComputation(cv.GIn(graph_in), cv.GOut(graph_out))

    pipeline = comp.compileStreaming()

    gst_src = self.get_gst_source(gstpipeline)

    pipeline.setSource(cv.gin(gst_src))
    pipeline.start()

    # Drain the stream, checking each frame as it arrives.
    while True:
        has_frame, output = pipeline.pull()
        if not has_frame:
            break
        self.assertTrue(output.size != 0)
|
|
|
|
|
|
def open_VideoCapture_gstreamer(self, gstpipeline):
    """Open ``gstpipeline`` with cv.VideoCapture using the GStreamer backend.

    Returns the opened capture. Raises unittest.SkipTest if constructing
    the capture raises or if the capture reports it is not opened.
    """
    try:
        capture = cv.VideoCapture(gstpipeline, cv.CAP_GSTREAMER)
    except Exception as e:
        reason = "Backend GSTREAMER can't open the video; " + "cause: " + str(e)
        raise unittest.SkipTest(reason)

    if capture.isOpened():
        return capture
    raise unittest.SkipTest("Backend GSTREAMER can't open the video")
|
|
|
|
|
|
def test_gst_source_accuracy(self):
    """Frames from the G-API GStreamer source must match VideoCapture output.

    The same NV12 pipeline is opened through both APIs; VideoCapture
    frames are converted with convertNV12p2BGR before comparison.
    """
    if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
        raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")

    path = self.find_file('highgui/video/big_buck_bunny.avi',
                          [os.environ['OPENCV_TEST_DATA_PATH']])
    gstpipeline = """filesrc location=""" + path + """ ! decodebin ! videoconvert !
                     videoscale ! video/x-raw,format=NV12 ! appsink"""

    # Pass-through G-API graph.
    graph_in = cv.GMat()
    graph_out = cv.gapi.copy(graph_in)
    comp = cv.GComputation(cv.GIn(graph_in), cv.GOut(graph_out))

    pipeline = comp.compileStreaming()

    # G-API side is started first, then the reference capture is opened.
    gst_src = self.get_gst_source(gstpipeline)
    pipeline.setSource(cv.gin(gst_src))
    pipeline.start()

    cap = self.open_VideoCapture_gstreamer(gstpipeline)

    # Compare at most 10 frames.
    frame_limit = 10
    for _ in range(frame_limit):
        has_expected, nv12_frame = cap.read()
        has_actual, actual = pipeline.pull()

        # Both streams must end on the same frame.
        self.assertEqual(has_expected, has_actual)
        if not has_expected:
            break

        reference = convertNV12p2BGR(nv12_frame)
        self.assertEqual(0.0, cv.norm(reference, actual, cv.NORM_INF))
|
|
|
|
|
|
def get_gst_pipeline(self, gstpipeline):
    """Create a cv.gapi.wip.GStreamerPipeline for ``gstpipeline``.

    A cv.error whose text contains 'Built without GStreamer support!'
    and any SystemError are turned into unittest.SkipTest; any other
    cv.error is re-raised.
    """
    try:
        return cv.gapi.wip.GStreamerPipeline(gstpipeline)
    except cv.error as e:
        reason = str(e)
        # NB: Skip test in case gstreamer isn't available.
        if 'Built without GStreamer support!' in reason:
            raise unittest.SkipTest(reason)
        raise e
    except SystemError as e:
        reason = str(e) + ", caused by " + str(e.__cause__)
        raise unittest.SkipTest(reason)
|
|
|
|
|
|
def test_gst_multiple_sources(self):
    """Two appsinks of one GStreamer pipeline must feed a two-input graph."""
    if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
        raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")

    gstpipeline = """videotestsrc is-live=true pattern=colors num-buffers=10 !
                     videorate ! videoscale !
                     video/x-raw,width=1920,height=1080,framerate=30/1 !
                     appsink name=sink1
                     videotestsrc is-live=true pattern=colors num-buffers=10 !
                     videorate ! videoscale !
                     video/x-raw,width=1920,height=1080,framerate=30/1 !
                     appsink name=sink2"""

    # Graph adding the two streams together.
    lhs = cv.GMat()
    rhs = cv.GMat()
    total = cv.gapi.add(lhs, rhs)
    comp = cv.GComputation(cv.GIn(lhs, rhs), cv.GOut(total))

    pipeline = comp.compileStreaming()

    # One shared GStreamer pipeline, one streaming source per named sink.
    gst = self.get_gst_pipeline(gstpipeline)
    first_src = cv.gapi.wip.get_streaming_source(gst, "sink1")
    second_src = cv.gapi.wip.get_streaming_source(gst, "sink2")

    pipeline.setSource(cv.gin(first_src, second_src))
    pipeline.start()

    # Drain the stream, checking each result as it arrives.
    while True:
        has_frame, out = pipeline.pull()
        if not has_frame:
            break
        self.assertTrue(out.size != 0)
|
|
|
|
|
|
def test_gst_multiple_sources_accuracy(self):
    """Both sinks of a combined GStreamer pipeline must match VideoCapture output.

    Two NV12 pipelines over the same file (the second one with
    videoflip) are joined into one G-API GStreamer pipeline with named
    sinks and also opened separately with VideoCapture as references.
    """
    if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
        raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")

    path = self.find_file('highgui/video/big_buck_bunny.avi',
                          [os.environ['OPENCV_TEST_DATA_PATH']])
    gstpipeline1 = """filesrc location=""" + path + """ ! decodebin ! videoconvert !
                      videoscale ! video/x-raw,format=NV12 ! appsink"""
    gstpipeline2 = """filesrc location=""" + path + """ ! decodebin !
                      videoflip method=clockwise ! videoconvert ! videoscale !
                      video/x-raw,format=NV12 ! appsink"""
    gstpipeline_gapi = gstpipeline1 + ' name=sink1 ' + gstpipeline2 + ' name=sink2'

    # Graph passing both inputs through unchanged.
    first_in = cv.GMat()
    second_in = cv.GMat()
    first_out = cv.gapi.copy(first_in)
    second_out = cv.gapi.copy(second_in)
    comp = cv.GComputation(cv.GIn(first_in, second_in), cv.GOut(first_out, second_out))

    pipeline = comp.compileStreaming()

    # G-API side: one shared pipeline, one streaming source per named sink.
    gst = self.get_gst_pipeline(gstpipeline_gapi)

    first_src = cv.gapi.wip.get_streaming_source(gst, "sink1")
    second_src = cv.gapi.wip.get_streaming_source(gst, "sink2")
    pipeline.setSource(cv.gin(first_src, second_src))
    pipeline.start()

    # Reference side: each sub-pipeline opened on its own.
    cap1 = self.open_VideoCapture_gstreamer(gstpipeline1)
    cap2 = self.open_VideoCapture_gstreamer(gstpipeline2)

    # Compare at most 10 frame pairs.
    frame_limit = 10
    for _ in range(frame_limit):
        has_expected1, nv12_first = cap1.read()
        has_expected2, nv12_second = cap2.read()
        has_actual, (actual1, actual2) = pipeline.pull()

        # Both references and the G-API stream must end together.
        self.assertEqual(has_expected1, has_expected2)
        has_expected = has_expected1 and has_expected2
        self.assertEqual(has_expected, has_actual)

        if not has_expected:
            break

        reference1 = convertNV12p2BGR(nv12_first)
        self.assertEqual(0.0, cv.norm(reference1, actual1, cv.NORM_INF))
        reference2 = convertNV12p2BGR(nv12_second)
        self.assertEqual(0.0, cv.norm(reference2, actual2, cv.NORM_INF))
|
|
|
|
|
|
|
|
except unittest.SkipTest as e:
|
|
|
|
message = str(e)
|
|
|
|
class TestSkip(unittest.TestCase):
    """Placeholder test case that is reported as skipped.

    It is defined only when setting up this module's tests raised
    unittest.SkipTest; the skip reason is taken from ``message``.
    """

    def setUp(self):
        # Skipping in setUp marks the test as skipped before its body runs.
        self.skipTest('Skip tests: ' + message)

    def test_skip(self):
        # Test methods are called on an instance, so they must accept `self`.
        pass
|
|
|
|
pass
|
|
|
|
|
|
# Script entry point: hand control to the shared OpenCV test bootstrap.
if '__main__' == __name__:
    NewOpenCVTests.bootstrap()
|