mirror of
https://github.com/opencv/opencv.git
synced 2025-06-23 20:21:40 +08:00

Open VideoCapture from data stream #25584 ### Pull Request Readiness Checklist Add VideoCapture option to read raw binary video data from `std::streambuf`. There are multiple motivations: 1. Avoid disk file creation in case of video already in memory (received by network or from database). 2. Streaming mode. Frame decoding starts during sequential file transfer by chunks. Supported backends: * FFmpeg * MSMF (no streaming mode) Supported interfaces: * C++ (std::streambuf) * Python (io.BufferedIOBase) resolves https://github.com/opencv/opencv/issues/24400 - [x] test h264 - [x] test IP camera like approach with no metadata but key frame only? - [x] C API plugin See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request - [x] I agree to contribute to the project under Apache 2 License. - [x] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV - [x] The PR is proposed to the proper branch - [x] There is a reference to the original bug report and related work - [x] There is accuracy test, performance test and test data in opencv_extra repository, if applicable Patch to opencv_extra has the same branch name. - [x] The feature is well documented and sample code can be built with the project CMake
92 lines
3.1 KiB
Python
92 lines
3.1 KiB
Python
#!/usr/bin/env python
|
|
from __future__ import print_function
|
|
|
|
import numpy as np
|
|
import cv2 as cv
|
|
import io
|
|
import sys
|
|
|
|
from tests_common import NewOpenCVTests
|
|
|
|
class Bindings(NewOpenCVTests):
    """Tests for the Python bindings of the videoio registry and of
    stream-based ``cv.VideoCapture`` construction."""

    def check_name(self, name):
        """Assert that *name* is neither ``None`` nor the empty string."""
        self.assertIsNotNone(name)
        self.assertNotEqual(name, "")

    def test_registry(self):
        """Every queried backend must report a non-empty name."""
        registry = cv.videoio_registry
        self.check_name(registry.getBackendName(cv.CAP_ANY))
        self.check_name(registry.getBackendName(cv.CAP_FFMPEG))
        self.check_name(registry.getBackendName(cv.CAP_OPENCV_MJPEG))
        for backend in registry.getBackends():
            self.check_name(registry.getBackendName(backend))

    def _require_stream_backend(self):
        """Return the first backend usable for stream-buffered capture.

        Skips the calling test when running on Python 2 or when no backend
        qualifies. A backend qualifies when ``hasBackend`` reports it and it
        is either built in or is a plugin whose reported version is at least
        ABI 1 / API 2.
        """
        if sys.version_info[0] < 3:
            # skipTest() raises unittest.SkipTest itself; no ``raise`` needed.
            self.skipTest('Python 3.x required')

        registry = cv.videoio_registry
        for backend in registry.getStreamBufferedBackends():
            if not registry.hasBackend(backend):
                continue
            if not registry.isBackendBuiltIn(backend):
                _, abi, api = registry.getStreamBufferedBackendPluginVersion(backend)
                # Plugins reporting a version below ABI 1 / API 2 are ignored.
                if abi < 1 or (abi == 1 and api < 2):
                    continue
            return backend

        self.skipTest("No available backends")

    def _assert_first_frame(self, cap):
        """Assert that *cap* is opened and yields a first frame of shape
        ``(576, 768, 3)``."""
        self.assertTrue(cap.isOpened())
        hasFrame, frame = cap.read()
        self.assertTrue(hasFrame)
        self.assertEqual(frame.shape, (576, 768, 3))

    def test_capture_stream_file(self):
        """Open a capture from a binary file object and read one frame."""
        api_pref = self._require_stream_backend()

        with open(self.find_file("cv/video/768x576.avi"), "rb") as f:
            cap = cv.VideoCapture(f, api_pref, [])
            try:
                self._assert_first_frame(cap)
            finally:
                # Release while the file is still open so the capture never
                # outlives the stream it reads from.
                cap.release()

    def test_capture_stream_buffer(self):
        """Open a capture from a custom ``io.BufferedIOBase`` subclass and
        read one frame."""
        api_pref = self._require_stream_backend()

        class BufferStream(io.BufferedIOBase):
            """Stream that forwards ``read``/``seek`` to a file opened in
            binary mode."""

            def __init__(self, filepath):
                super(BufferStream, self).__init__()
                self.f = open(filepath, "rb")

            def read(self, size=-1):
                return self.f.read(size)

            def seek(self, offset, whence=io.SEEK_SET):
                return self.f.seek(offset, whence)

            def close(self):
                # getattr guards against a failed open() in __init__, in
                # which case the attribute was never set.
                wrapped = getattr(self, "f", None)
                if wrapped is not None:
                    wrapped.close()
                super(BufferStream, self).close()

        stream = BufferStream(self.find_file("cv/video/768x576.avi"))
        try:
            cap = cv.VideoCapture(stream, api_pref, [])
            try:
                self._assert_first_frame(cap)
            finally:
                cap.release()
        finally:
            # Close deterministically instead of relying on finalization.
            stream.close()
|
|
|
|
# Script entry point: hand control to the shared OpenCV test bootstrap.
if "__main__" == __name__:
    NewOpenCVTests.bootstrap()
|