mirror of
https://github.com/opencv/opencv.git
synced 2024-12-04 00:39:11 +08:00
e823493af1
Merged yolo_detector and object detection sample #25710 Relates to #25006 This pull request merges the yolo_detector.cpp sample with the object_detector.cpp sample. It also beautifies the bounding box display on the output images ### Pull Request Readiness Checklist See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request - [x] I agree to contribute to the project under Apache 2 License. - [x] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV - [x] The PR is proposed to the proper branch - [x] There is a reference to the original bug report and related work - [ ] There is accuracy test, performance test and test data in opencv_extra repository, if applicable Patch to opencv_extra has the same branch name. - [x] The feature is well documented and sample code can be built with the project CMake
396 lines
16 KiB
Python
396 lines
16 KiB
Python
import cv2 as cv
|
|
import argparse
|
|
import numpy as np
|
|
import sys
|
|
import copy
|
|
import time
|
|
from threading import Thread
|
|
import queue
|
|
|
|
from common import *
|
|
from tf_text_graph_common import readTextMessage
|
|
from tf_text_graph_ssd import createSSDGraph
|
|
from tf_text_graph_faster_rcnn import createFasterRCNNGraph
|
|
|
|
def help():
|
|
print(
|
|
'''
|
|
Firstly, download required models using `download_models.py` (if not already done). Set environment variable OPENCV_DOWNLOAD_CACHE_DIR to specify where models should be downloaded. Also, point OPENCV_SAMPLES_DATA_PATH to opencv/samples/data.\n"\n
|
|
|
|
To run:
|
|
python object_detection.py model_name(e.g yolov8) --input=path/to/your/input/image/or/video (don't pass --input to use device camera)
|
|
|
|
Sample command:
|
|
python object_detection.py yolov8 --input=path/to/image
|
|
Model path can also be specified using --model argument
|
|
'''
|
|
)
|
|
|
|
backends = ("default", "openvino", "opencv", "vkcom", "cuda")
|
|
targets = ("cpu", "opencl", "opencl_fp16", "ncs2_vpu", "hddl_vpu", "vulkan", "cuda", "cuda_fp16")
|
|
|
|
parser = argparse.ArgumentParser(add_help=False)
|
|
parser.add_argument('--zoo', default=os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models.yml'),
|
|
help='An optional path to file with preprocessing parameters.')
|
|
parser.add_argument('--input', help='Path to input image or video file. Skip this argument to capture frames from a camera.')
|
|
parser.add_argument('--out_tf_graph', default='graph.pbtxt',
|
|
help='For models from TensorFlow Object Detection API, you may '
|
|
'pass a .config file which was used for training through --config '
|
|
'argument. This way an additional .pbtxt file with TensorFlow graph will be created.')
|
|
parser.add_argument('--framework', choices=['caffe', 'tensorflow', 'darknet', 'dldt', 'onnx'],
|
|
help='Optional name of an origin framework of the model. '
|
|
'Detect it automatically if it does not set.')
|
|
parser.add_argument('--thr', type=float, default=0.5, help='Confidence threshold')
|
|
parser.add_argument('--nms', type=float, default=0.4, help='Non-maximum suppression threshold')
|
|
parser.add_argument('--backend', default="default", type=str, choices=backends,
|
|
help="Choose one of computation backends: "
|
|
"default: automatically (by default), "
|
|
"openvino: Intel's Deep Learning Inference Engine (https://software.intel.com/openvino-toolkit), "
|
|
"opencv: OpenCV implementation, "
|
|
"vkcom: VKCOM, "
|
|
"cuda: CUDA, "
|
|
"webnn: WebNN")
|
|
parser.add_argument('--target', default="cpu", type=str, choices=targets,
|
|
help="Choose one of target computation devices: "
|
|
"cpu: CPU target (by default), "
|
|
"opencl: OpenCL, "
|
|
"opencl_fp16: OpenCL fp16 (half-float precision), "
|
|
"ncs2_vpu: NCS2 VPU, "
|
|
"hddl_vpu: HDDL VPU, "
|
|
"vulkan: Vulkan, "
|
|
"cuda: CUDA, "
|
|
"cuda_fp16: CUDA fp16 (half-float preprocess)")
|
|
parser.add_argument('--async', type=int, default=0,
|
|
dest='use_threads',
|
|
help='Choose 0 for synchronous mode and 1 for asynchronous mode')
|
|
args, _ = parser.parse_known_args()
|
|
add_preproc_args(args.zoo, parser, 'object_detection')
|
|
parser = argparse.ArgumentParser(parents=[parser],
|
|
description='Use this script to run object detection deep learning networks using OpenCV.',
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
|
args = parser.parse_args()
|
|
|
|
if args.alias is None or hasattr(args, 'help'):
|
|
help()
|
|
exit(1)
|
|
|
|
args.model = findModel(args.model, args.sha1)
|
|
if args.config is not None:
|
|
args.config = findFile(args.config)
|
|
args.labels = findFile(args.labels)
|
|
|
|
# If config specified, try to load it as TensorFlow Object Detection API's pipeline.
|
|
config = readTextMessage(args.config)
|
|
if 'model' in config:
|
|
print('TensorFlow Object Detection API config detected')
|
|
if 'ssd' in config['model'][0]:
|
|
print('Preparing text graph representation for SSD model: ' + args.out_tf_graph)
|
|
createSSDGraph(args.model, args.config, args.out_tf_graph)
|
|
args.config = args.out_tf_graph
|
|
elif 'faster_rcnn' in config['model'][0]:
|
|
print('Preparing text graph representation for Faster-RCNN model: ' + args.out_tf_graph)
|
|
createFasterRCNNGraph(args.model, args.config, args.out_tf_graph)
|
|
args.config = args.out_tf_graph
|
|
|
|
|
|
# Load names of classes
|
|
labels = None
|
|
if args.labels:
|
|
with open(args.labels, 'rt') as f:
|
|
labels = f.read().rstrip('\n').split('\n')
|
|
|
|
# Load a network
|
|
net = cv.dnn.readNet(args.model, args.config, args.framework)
|
|
net.setPreferableBackend(get_backend_id(args.backend))
|
|
net.setPreferableTarget(get_target_id(args.target))
|
|
outNames = net.getUnconnectedOutLayersNames()
|
|
|
|
confThreshold = args.thr
|
|
nmsThreshold = args.nms
|
|
stdSize = 0.8
|
|
stdWeight = 2
|
|
stdImgSize = 512
|
|
asyncN = 0
|
|
|
|
def get_color(class_id):
|
|
r = min((class_id >> 0 & 1) * 128 + (class_id >> 3 & 1) * 64 + (class_id >> 6 & 1) * 32 + 80, 255)
|
|
g = min((class_id >> 1 & 1) * 128 + (class_id >> 4 & 1) * 64 + (class_id >> 7 & 1) * 32 + 40, 255)
|
|
b = min((class_id >> 2 & 1) * 128 + (class_id >> 5 & 1) * 64 + (class_id >> 8 & 1) * 32 + 40, 255)
|
|
return (int(b), int(g), int(r))
|
|
|
|
def get_text_color(bg_color):
|
|
luminance = 0.299 * bg_color[2] + 0.587 * bg_color[1] + 0.114 * bg_color[0]
|
|
return (0, 0, 0) if luminance > 128 else (255, 255, 255)
|
|
|
|
def postprocess(frame, outs):
|
|
frameHeight = frame.shape[0]
|
|
frameWidth = frame.shape[1]
|
|
|
|
layerNames = net.getLayerNames()
|
|
lastLayerId = net.getLayerId(layerNames[-1])
|
|
lastLayer = net.getLayer(lastLayerId)
|
|
|
|
classIds = []
|
|
confidences = []
|
|
boxes = []
|
|
if lastLayer.type == 'DetectionOutput':
|
|
# Network produces output blob with a shape 1x1xNx7 where N is a number of
|
|
# detections and an every detection is a vector of values
|
|
# [batchId, classId, confidence, left, top, right, bottom]
|
|
for out in outs:
|
|
for detection in out[0, 0]:
|
|
confidence = detection[2]
|
|
if confidence > confThreshold:
|
|
left = int(detection[3])
|
|
top = int(detection[4])
|
|
right = int(detection[5])
|
|
bottom = int(detection[6])
|
|
width = right - left + 1
|
|
height = bottom - top + 1
|
|
if width <= 2 or height <= 2:
|
|
left = int(detection[3] * frameWidth)
|
|
top = int(detection[4] * frameHeight)
|
|
right = int(detection[5] * frameWidth)
|
|
bottom = int(detection[6] * frameHeight)
|
|
width = right - left + 1
|
|
height = bottom - top + 1
|
|
classIds.append(int(detection[1]) - 1) # Skip background label
|
|
confidences.append(float(confidence))
|
|
boxes.append([left, top, width, height])
|
|
elif lastLayer.type == 'Region' or args.postprocessing == 'yolov8':
|
|
# Network produces output blob with a shape NxC where N is a number of
|
|
# detected objects and C is a number of classes + 4 where the first 4
|
|
# numbers are [center_x, center_y, width, height]
|
|
if args.postprocessing == 'yolov8':
|
|
box_scale_w = frameWidth / args.width
|
|
box_scale_h = frameHeight / args.height
|
|
else:
|
|
box_scale_w = frameWidth
|
|
box_scale_h = frameHeight
|
|
|
|
for out in outs:
|
|
if args.postprocessing == 'yolov8':
|
|
out = out[0].transpose(1, 0)
|
|
|
|
for detection in out:
|
|
scores = detection[4:]
|
|
if args.background_label_id >= 0:
|
|
scores = np.delete(scores, args.background_label_id)
|
|
classId = np.argmax(scores)
|
|
confidence = scores[classId]
|
|
if confidence > confThreshold:
|
|
center_x = int(detection[0] * box_scale_w)
|
|
center_y = int(detection[1] * box_scale_h)
|
|
width = int(detection[2] * box_scale_w)
|
|
height = int(detection[3] * box_scale_h)
|
|
left = int(center_x - width / 2)
|
|
top = int(center_y - height / 2)
|
|
classIds.append(classId)
|
|
confidences.append(float(confidence))
|
|
boxes.append([left, top, width, height])
|
|
else:
|
|
print('Unknown output layer type: ' + lastLayer.type)
|
|
exit()
|
|
|
|
# NMS is used inside Region layer only on DNN_BACKEND_OPENCV for another backends we need NMS in sample
|
|
# or NMS is required if number of outputs > 1
|
|
if len(outNames) > 1 or (lastLayer.type == 'Region' or args.postprocessing == 'yolov8') and args.backend != cv.dnn.DNN_BACKEND_OPENCV:
|
|
indices = []
|
|
classIds = np.array(classIds)
|
|
boxes = np.array(boxes)
|
|
confidences = np.array(confidences)
|
|
unique_classes = set(classIds)
|
|
for cl in unique_classes:
|
|
class_indices = np.where(classIds == cl)[0]
|
|
conf = confidences[class_indices]
|
|
box = boxes[class_indices].tolist()
|
|
nms_indices = cv.dnn.NMSBoxes(box, conf, confThreshold, nmsThreshold)
|
|
indices.extend(class_indices[nms_indices])
|
|
else:
|
|
indices = np.arange(0, len(classIds))
|
|
|
|
return boxes, classIds, confidences, indices
|
|
|
|
def drawPred(classIds, confidences, boxes, indices, fontSize, fontThickness):
|
|
for i in indices:
|
|
box = boxes[i]
|
|
left = box[0]
|
|
top = box[1]
|
|
right = box[0] + box[2]
|
|
bottom = box[1] + box[3]
|
|
bg_color = get_color(classIds[i])
|
|
cv.rectangle(frame, (left, top), (right, bottom), bg_color, fontThickness)
|
|
|
|
label = '%.2f' % confidences[i]
|
|
|
|
# Print a label of class.
|
|
if labels:
|
|
assert(classIds[i] < len(labels))
|
|
label = '%s: %s' % (labels[classIds[i]], label)
|
|
|
|
labelSize, baseLine = cv.getTextSize(label, cv.FONT_HERSHEY_SIMPLEX, fontSize, fontThickness)
|
|
top = max(top, labelSize[1])
|
|
cv.rectangle(frame, (int(left-fontThickness/2), top - labelSize[1]), (left + labelSize[0], top + baseLine), bg_color, cv.FILLED)
|
|
cv.putText(frame, label, (left, top-fontThickness), cv.FONT_HERSHEY_SIMPLEX, fontSize, get_text_color(bg_color), fontThickness)
|
|
|
|
# Process inputs
|
|
winName = 'Deep learning object detection in OpenCV'
|
|
cv.namedWindow(winName, cv.WINDOW_AUTOSIZE)
|
|
|
|
def callback(pos):
|
|
global confThreshold
|
|
confThreshold = pos / 100.0
|
|
|
|
cv.createTrackbar('Confidence threshold, %', winName, int(confThreshold * 100), 99, callback)
|
|
|
|
cap = cv.VideoCapture(cv.samples.findFileOrKeep(args.input) if args.input else 0)
|
|
|
|
class QueueFPS(queue.Queue):
|
|
def __init__(self):
|
|
queue.Queue.__init__(self)
|
|
self.startTime = 0
|
|
self.counter = 0
|
|
|
|
def put(self, v):
|
|
queue.Queue.put(self, v)
|
|
self.counter += 1
|
|
if self.counter == 1:
|
|
self.startTime = time.time()
|
|
|
|
def getFPS(self):
|
|
return self.counter / (time.time() - self.startTime)
|
|
|
|
|
|
process = True
|
|
|
|
#
|
|
# Frames capturing thread
|
|
#
|
|
framesQueue = QueueFPS()
|
|
def framesThreadBody():
|
|
global framesQueue, process
|
|
|
|
while process:
|
|
hasFrame, frame = cap.read()
|
|
if not hasFrame:
|
|
break
|
|
framesQueue.put(frame)
|
|
|
|
|
|
#
|
|
# Frames processing thread
|
|
#
|
|
processedFramesQueue = queue.Queue()
|
|
predictionsQueue = QueueFPS()
|
|
def processingThreadBody():
|
|
global processedFramesQueue, predictionsQueue, args, process, asyncN
|
|
|
|
futureOutputs = []
|
|
while process:
|
|
# Get a next frame
|
|
frame = None
|
|
try:
|
|
frame = framesQueue.get_nowait()
|
|
|
|
if asyncN:
|
|
if len(futureOutputs) == asyncN:
|
|
frame = None # Skip the frame
|
|
else:
|
|
framesQueue.queue.clear() # Skip the rest of frames
|
|
except queue.Empty:
|
|
pass
|
|
|
|
|
|
if not frame is None:
|
|
frameHeight = frame.shape[0]
|
|
frameWidth = frame.shape[1]
|
|
|
|
# Create a 4D blob from a frame.
|
|
inpWidth = args.width if args.width else frameWidth
|
|
inpHeight = args.height if args.height else frameHeight
|
|
blob = cv.dnn.blobFromImage(frame, size=(inpWidth, inpHeight), swapRB=args.rgb, ddepth=cv.CV_32F)
|
|
processedFramesQueue.put(frame)
|
|
|
|
# Run a model
|
|
net.setInput(blob, scalefactor=args.scale, mean=args.mean)
|
|
if net.getLayer(0).outputNameToIndex('im_info') != -1: # Faster-RCNN or R-FCN
|
|
frame = cv.resize(frame, (inpWidth, inpHeight))
|
|
net.setInput(np.array([[inpHeight, inpWidth, 1.6]], dtype=np.float32), 'im_info')
|
|
|
|
if asyncN:
|
|
futureOutputs.append(net.forwardAsync())
|
|
else:
|
|
outs = net.forward(outNames)
|
|
predictionsQueue.put(copy.deepcopy(outs))
|
|
|
|
while futureOutputs and futureOutputs[0].wait_for(0):
|
|
out = futureOutputs[0].get()
|
|
predictionsQueue.put(copy.deepcopy([out]))
|
|
|
|
del futureOutputs[0]
|
|
|
|
if args.use_threads:
|
|
framesThread = Thread(target=framesThreadBody)
|
|
framesThread.start()
|
|
|
|
processingThread = Thread(target=processingThreadBody)
|
|
processingThread.start()
|
|
|
|
#
|
|
# Postprocessing and rendering loop
|
|
#
|
|
while cv.waitKey(1) < 0:
|
|
try:
|
|
# Request prediction first because they put after frames
|
|
outs = predictionsQueue.get_nowait()
|
|
frame = processedFramesQueue.get_nowait()
|
|
imgWidth = max(frame.shape[:2])
|
|
fontSize = (stdSize*imgWidth)/stdImgSize
|
|
fontThickness = max(1,(stdWeight*imgWidth)//stdImgSize)
|
|
|
|
boxes, classIds, confidences, indices = postprocess(frame, outs)
|
|
drawPred(classIds, confidences, boxes, indices, fontSize, fontThickness)
|
|
fontSize = fontSize/2
|
|
# Put efficiency information.
|
|
if predictionsQueue.counter > 1:
|
|
label = 'Camera: %.2f FPS' % (framesQueue.getFPS())
|
|
cv.rectangle(frame, (0, 0), (int(260*fontSize), int(80*fontSize)), (255,255,255), cv.FILLED)
|
|
cv.putText(frame, label, (0, int(25*fontSize)), cv.FONT_HERSHEY_SIMPLEX, fontSize, (0, 0, 0), fontThickness)
|
|
|
|
label = 'Network: %.2f FPS' % (predictionsQueue.getFPS())
|
|
cv.putText(frame, label, (0, int(2*25*fontSize)), cv.FONT_HERSHEY_SIMPLEX, fontSize, (0, 0, 0), fontThickness)
|
|
|
|
label = 'Skipped frames: %d' % (framesQueue.counter - predictionsQueue.counter)
|
|
cv.putText(frame, label, (0, int(3*25*fontSize)), cv.FONT_HERSHEY_SIMPLEX, fontSize, (0, 0, 0), fontThickness)
|
|
|
|
cv.imshow(winName, frame)
|
|
except queue.Empty:
|
|
pass
|
|
|
|
|
|
process = False
|
|
framesThread.join()
|
|
processingThread.join()
|
|
|
|
else:
|
|
# Non-threaded processing if --async is 0
|
|
while cv.waitKey(1) < 0:
|
|
hasFrame, frame = cap.read()
|
|
if not hasFrame:
|
|
cv.waitKey()
|
|
break
|
|
|
|
frameHeight = frame.shape[0]
|
|
frameWidth = frame.shape[1]
|
|
|
|
inpWidth = args.width if args.width else frameWidth
|
|
inpHeight = args.height if args.height else frameHeight
|
|
blob = cv.dnn.blobFromImage(frame, size=(inpWidth, inpHeight), swapRB=args.rgb, ddepth=cv.CV_32F)
|
|
|
|
net.setInput(blob, scalefactor=args.scale, mean=args.mean)
|
|
outs = net.forward(outNames)
|
|
|
|
boxes, classIds, confidences, indices = postprocess(frame, outs)
|
|
drawPred(classIds, confidences, boxes, indices, (stdSize*max(frame.shape[:2]))/stdImgSize, (stdWeight*max(frame.shape[:2]))//stdImgSize)
|
|
|
|
cv.imshow(winName, frame) |