Asynchronous C++ sample

This commit is contained in:
Dmitry Kurtaev 2019-05-14 17:43:48 +03:00
parent c3b0a68a2b
commit 63bb97cc19

View File

@ -5,6 +5,11 @@
#include <opencv2/imgproc.hpp>
#include <opencv2/highgui.hpp>
#ifdef CV_CXX11
#include <thread>
#include <queue>
#endif
#include "common.hpp"
std::string keys =
@ -26,8 +31,9 @@ std::string keys =
"0: CPU target (by default), "
"1: OpenCL, "
"2: OpenCL fp16 (half-float precision), "
"3: VPU }";
"3: VPU }"
"{ async | 0 | Number of asynchronous forwards at the same time. "
"Choose 0 for synchronous mode }";
using namespace cv;
using namespace dnn;
@ -35,13 +41,66 @@ using namespace dnn;
float confThreshold, nmsThreshold;
std::vector<std::string> classes;
inline void preprocess(const Mat& frame, Net& net, Size inpSize, float scale,
const Scalar& mean, bool swapRB);
void postprocess(Mat& frame, const std::vector<Mat>& out, Net& net);
void drawPred(int classId, float conf, int left, int top, int right, int bottom, Mat& frame);
void callback(int pos, void* userdata);
std::vector<String> getOutputsNames(const Net& net);
#ifdef CV_CXX11
template <typename T>
class QueueFPS : public std::queue<T>
{
public:
QueueFPS() : counter(0) {}
void push(const T& entry)
{
std::lock_guard<std::mutex> lock(mutex);
std::queue<T>::push(entry);
counter += 1;
if (counter == 1)
{
// Start counting from a second frame (warmup).
tm.reset();
tm.start();
}
}
T get()
{
std::lock_guard<std::mutex> lock(mutex);
T entry = this->front();
this->pop();
return entry;
}
float getFPS()
{
tm.stop();
double fps = counter / tm.getTimeSec();
tm.start();
return static_cast<float>(fps);
}
void clear()
{
std::lock_guard<std::mutex> lock(mutex);
while (!this->empty())
this->pop();
}
unsigned int counter;
private:
TickMeter tm;
std::mutex mutex;
};
#endif // CV_CXX11
int main(int argc, char** argv)
{
@ -67,6 +126,7 @@ int main(int argc, char** argv)
bool swapRB = parser.get<bool>("rgb");
int inpWidth = parser.get<int>("width");
int inpHeight = parser.get<int>("height");
size_t async = parser.get<int>("async");
CV_Assert(parser.has("model"));
std::string modelPath = findFile(parser.get<String>("model"));
std::string configPath = findFile(parser.get<String>("config"));
@ -104,6 +164,108 @@ int main(int argc, char** argv)
else
cap.open(parser.get<int>("device"));
#ifdef CV_CXX11
bool process = true;
// Frames capturing thread
QueueFPS<Mat> framesQueue;
std::thread framesThread([&](){
Mat frame;
while (process)
{
cap >> frame;
if (!frame.empty())
framesQueue.push(frame.clone());
else
break;
}
});
// Frames processing thread
QueueFPS<Mat> processedFramesQueue;
QueueFPS<std::vector<Mat> > predictionsQueue;
std::thread processingThread([&](){
std::queue<std::future<Mat> > futureOutputs;
Mat blob;
while (process)
{
// Get a next frame
Mat frame;
{
if (!framesQueue.empty())
{
frame = framesQueue.get();
if (async)
{
if (futureOutputs.size() == async)
frame = Mat();
}
else
framesQueue.clear(); // Skip the rest of frames
}
}
// Process the frame
if (!frame.empty())
{
preprocess(frame, net, Size(inpWidth, inpHeight), scale, mean, swapRB);
processedFramesQueue.push(frame);
if (async)
{
futureOutputs.push(net.forwardAsync());
}
else
{
std::vector<Mat> outs;
net.forward(outs, outNames);
predictionsQueue.push(outs);
}
}
while (!futureOutputs.empty() &&
futureOutputs.front().wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
Mat out = futureOutputs.front().get();
predictionsQueue.push({out});
futureOutputs.pop();
}
}
});
// Postprocessing and rendering loop
while (waitKey(1) < 0)
{
if (predictionsQueue.empty())
continue;
std::vector<Mat> outs = predictionsQueue.get();
Mat frame = processedFramesQueue.get();
postprocess(frame, outs, net);
if (predictionsQueue.counter > 1)
{
std::string label = format("Camera: %.2f FPS", framesQueue.getFPS());
putText(frame, label, Point(0, 15), FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
label = format("Network: %.2f FPS", predictionsQueue.getFPS());
putText(frame, label, Point(0, 30), FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
label = format("Skipped frames: %d", framesQueue.counter - predictionsQueue.counter);
putText(frame, label, Point(0, 45), FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
}
imshow(kWinName, frame);
}
process = false;
framesThread.join();
processingThread.join();
#else // CV_CXX11
if (async)
CV_Error(Error::StsNotImplemented, "Asynchronous forward is supported only with Inference Engine backend.");
// Process frames.
Mat frame, blob;
while (waitKey(1) < 0)
@ -115,19 +277,8 @@ int main(int argc, char** argv)
break;
}
// Create a 4D blob from a frame.
Size inpSize(inpWidth > 0 ? inpWidth : frame.cols,
inpHeight > 0 ? inpHeight : frame.rows);
blobFromImage(frame, blob, scale, inpSize, mean, swapRB, false);
preprocess(frame, net, Size(inpWidth, inpHeight), scale, mean, swapRB);
// Run a model.
net.setInput(blob);
if (net.getLayer(0)->outputNameToIndex("im_info") != -1) // Faster-RCNN or R-FCN
{
resize(frame, frame, inpSize);
Mat imInfo = (Mat_<float>(1, 3) << inpSize.height, inpSize.width, 1.6f);
net.setInput(imInfo, "im_info");
}
std::vector<Mat> outs;
net.forward(outs, outNames);
@ -142,9 +293,29 @@ int main(int argc, char** argv)
imshow(kWinName, frame);
}
#endif // CV_CXX11
return 0;
}
inline void preprocess(const Mat& frame, Net& net, Size inpSize, float scale,
const Scalar& mean, bool swapRB)
{
static Mat blob;
// Create a 4D blob from a frame.
if (inpSize.width <= 0) inpSize.width = frame.cols;
if (inpSize.height <= 0) inpSize.height = frame.rows;
blobFromImage(frame, blob, 1.0, inpSize, Scalar(), swapRB, false, CV_8U);
// Run a model.
net.setInput(blob, "", scale, mean);
if (net.getLayer(0)->outputNameToIndex("im_info") != -1) // Faster-RCNN or R-FCN
{
resize(frame, frame, inpSize);
Mat imInfo = (Mat_<float>(1, 3) << inpSize.height, inpSize.width, 1.6f);
net.setInput(imInfo, "im_info");
}
}
void postprocess(Mat& frame, const std::vector<Mat>& outs, Net& net)
{
static std::vector<int> outLayers = net.getUnconnectedOutLayers();