#!/usr/bin/env python
"""Tests for the streaming mode of the G-API Python bindings.

Every test builds a G-API graph, compiles it with ``compileStreaming()``,
pulls results from it and compares them with what the equivalent plain
OpenCV calls produce for the same input.
"""

import os

import numpy as np
import cv2 as cv

from tests_common import NewOpenCVTests


class test_gapi_streaming(NewOpenCVTests):
    """Compare G-API streaming pipelines against reference OpenCV calls."""

    def _find_video(self):
        """Return the path of the test video shared by the video tests."""
        return self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])

    def _check_video_stream(self, path, ccomp, check_frame, max_num_frames=10):
        """Pull frames from a started pipeline and verify them one by one.

        The video at ``path`` is decoded with ``cv.VideoCapture`` in lockstep
        with ``ccomp.pull()``. Both sides must agree on whether a frame is
        available; every available pair is handed to ``check_frame``.

        Args:
            path: video file that is also the source of ``ccomp``.
            ccomp: streaming computation with its source set and started.
            check_frame: callable ``(frame, actual)`` that asserts the
                pipeline output ``actual`` matches the reference computed
                from the decoded ``frame``.
            max_num_frames: stop after this many frames have been verified.
        """
        cap = cv.VideoCapture(path)
        try:
            proc_num_frames = 0
            while cap.isOpened():
                has_expected, frame = cap.read()
                has_actual, actual = ccomp.pull()

                # Both readers must run out of frames at the same time.
                self.assertEqual(has_expected, has_actual)
                if not has_actual:
                    break

                check_frame(frame, actual)

                proc_num_frames += 1
                if proc_num_frames == max_num_frames:
                    break
        finally:
            # Release the reference capture even when an assertion fails.
            cap.release()

    def test_image_input(self):
        # A single matrix used as the stream source must be blurred exactly
        # like cv.medianBlur blurs it.
        sz = (1280, 720)
        in_mat = np.random.randint(0, 100, sz).astype(np.uint8)

        # OpenCV
        expected = cv.medianBlur(in_mat, 3)

        # G-API
        g_in = cv.GMat()
        g_out = cv.gapi.medianBlur(g_in, 3)
        c = cv.GComputation(g_in, g_out)
        ccomp = c.compileStreaming(cv.descr_of(cv.gin(in_mat)))
        ccomp.setSource(cv.gin(in_mat))
        ccomp.start()

        _, actual = ccomp.pull()

        # Assert
        self.assertEqual(0.0, cv.norm(expected, actual, cv.NORM_INF))

    def test_video_input(self):
        # Median blur applied to a video stream, frame by frame.
        ksize = 3
        path = self._find_video()

        # G-API
        g_in = cv.GMat()
        g_out = cv.gapi.medianBlur(g_in, ksize)
        c = cv.GComputation(g_in, g_out)
        ccomp = c.compileStreaming()
        source = cv.gapi.wip.make_capture_src(path)
        ccomp.setSource(source)
        ccomp.start()

        # Assert
        def check_frame(frame, actual):
            expected = cv.medianBlur(frame, ksize)
            self.assertEqual(0.0, cv.norm(expected, actual, cv.NORM_INF))

        self._check_video_stream(path, ccomp, check_frame)

    def test_video_split3(self):
        # A graph with three outputs: the channels of every video frame.
        path = self._find_video()

        # G-API
        g_in = cv.GMat()
        b, g, r = cv.gapi.split3(g_in)
        c = cv.GComputation(cv.GIn(g_in), cv.GOut(b, g, r))
        ccomp = c.compileStreaming()
        source = cv.gapi.wip.make_capture_src(path)
        ccomp.setSource(source)
        ccomp.start()

        # Assert
        def check_frame(frame, actual):
            expected = cv.split(frame)
            for e, a in zip(expected, actual):
                self.assertEqual(0.0, cv.norm(e, a, cv.NORM_INF))

        self._check_video_stream(path, ccomp, check_frame)

    def test_video_add(self):
        # A graph with two inputs: the video stream plus one constant matrix.
        sz = (576, 768, 3)
        in_mat = np.random.randint(0, 100, sz).astype(np.uint8)
        path = self._find_video()

        # G-API
        g_in1 = cv.GMat()
        g_in2 = cv.GMat()
        out = cv.gapi.add(g_in1, g_in2)
        c = cv.GComputation(cv.GIn(g_in1, g_in2), cv.GOut(out))
        ccomp = c.compileStreaming()
        source = cv.gapi.wip.make_capture_src(path)
        ccomp.setSource(cv.gin(source, in_mat))
        ccomp.start()

        # Assert
        def check_frame(frame, actual):
            expected = cv.add(frame, in_mat)
            self.assertEqual(0.0, cv.norm(expected, actual, cv.NORM_INF))

        self._check_video_stream(path, ccomp, check_frame)

    def test_video_good_features_to_track(self):
        # Corner detection on the grayscale version of every video frame.
        path = self._find_video()

        # NB: goodFeaturesToTrack configuration
        max_corners = 50
        quality_lvl = 0.01
        min_distance = 10
        block_sz = 3
        use_harris_detector = True
        k = 0.04
        mask = None

        # G-API
        g_in = cv.GMat()
        g_gray = cv.gapi.RGB2Gray(g_in)
        g_out = cv.gapi.goodFeaturesToTrack(g_gray, max_corners, quality_lvl,
                                            min_distance, mask, block_sz,
                                            use_harris_detector, k)
        c = cv.GComputation(cv.GIn(g_in), cv.GOut(g_out))
        ccomp = c.compileStreaming()
        source = cv.gapi.wip.make_capture_src(path)
        ccomp.setSource(source)
        ccomp.start()

        # Assert
        def check_frame(frame, actual):
            # OpenCV
            gray = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
            expected = cv.goodFeaturesToTrack(gray, max_corners, quality_lvl,
                                              min_distance, mask=mask,
                                              blockSize=block_sz,
                                              useHarrisDetector=use_harris_detector,
                                              k=k)
            for e, a in zip(expected, actual):
                # NB: OpenCV & G-API have different output shapes:
                # OpenCV - (num_points, 1, 2)
                # G-API  - (num_points, 2)
                self.assertEqual(0.0, cv.norm(e.flatten(), a.flatten(), cv.NORM_INF))

        self._check_video_stream(path, ccomp, check_frame)


if __name__ == '__main__':
    NewOpenCVTests.bootstrap()