目标检测用到的异步多线程处理(1)-之异步处理   完整工程下载

采用C11以上语法支持

这里为了好说明问题就不用ffmpeg ,保证代码足够简单易懂以下还是用opencv的VideoCapture描述为主.

通常在目标检测分析处理中大致思路是下面这样的:

   如下:

//视频模拟camera
void testCPUvideo(const char* name)
{
    cv::VideoCapture vcap;
    cv::Mat fps_curr;
    vcap.open(name);      
    vcap.set(CAP_PROP_POS_FRAMES, 1);
    if (!vcap.isOpened())
        return ;
    while (true)
    {
        vcap >> fps_curr;
        if (fps_curr.empty())
            return ;
        //DNN(fps_curr, my_objinfo);        //目标检测
        //draw(fps_curr, my_objinfo);       //解析my_objinfo绘制框
        cv::namedWindow("CPU_Video_play", cv::WINDOW_OPENGL);   //gl
        cv::imshow("CPU_Video_play", fps_curr);
        //pushrtsp(fps_curr,1920,1080);     //推出去rtsp
        cv::waitKey(1);
    }
    cv::waitKey(0);
}

这是一个明显的单线程线性的处理方法,很多事情在同步阻塞下是不能操作的,这也导致了中间不能插入太多要求.真正项目上是力不从心的.

下面对上面做一个改进.将解码->处理->渲染(显示)分别拆开到独立线程处理.如下图:

void testMutiThread(const char* name)
{
    //数据结构
    using MAT = cv::Mat;
    using MyFrame = struct 
    {
        std::queue<MAT> image_data_queue;                   //解码数据
        std::mutex decode_mutex;
        std::queue<MAT> process_data_queue;                 //处理数据
        std::mutex process_mutex;
        std::queue<MAT> draw_data_queue;                    //渲染数据
        std::mutex draw_mutex;
    };
    static MyFrame m_Frame;
    std::atomic_bool brun1(true), brun2(true), brun3(true);

    auto CollectTd = [&]
    {
        cv::VideoCapture vcap;
        MAT capture264;
        vcap.open(name);           //打开文件
        vcap.set(cv::CAP_PROP_POS_FRAMES, 1);
        if (!vcap.isOpened())
            return;
        int numflag(0);
        while (true)
        {
            std::string strnum = std::to_string(numflag++);
            vcap >> capture264;     //此处模拟相机采集数据
            if (!capture264.empty())
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.decode_mutex);
                cout << "CollectThread image_data>> " << strnum << endl;
                m_Frame.image_data_queue.push(capture264);
            }
            else {
                brun1 = brun2 = brun3 = false;
                break;
            }
            this_thread::sleep_for(std::chrono::milliseconds(30));
        }
    };

    auto DecodeTd =[&]
    {
        while (brun1)
        {
            if (!m_Frame.image_data_queue.empty())
            {
                MAT image_data;
                {
                    std::lock_guard<std::mutex> lock_guard(m_Frame.decode_mutex);
                    image_data = m_Frame.image_data_queue.front();
                    m_Frame.image_data_queue.pop();
                    cout << "DecodeThread image_data<< " << endl;
                }
                //从收到的数据进行解码,解码后送到处理线程
                MAT process_data = image_data;
                {
                    std::lock_guard<std::mutex> lock_guard(m_Frame.process_mutex);
                    cout << "DecodeThread process_data>> " << endl;
                    m_Frame.process_data_queue.push(process_data);
                }
            }
            else
                this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    };

    auto ProcessTd = [&]
    {
        while (brun2)
        {
            if (!m_Frame.process_data_queue.empty())
            {
                MAT process_data;
                {
                    std::lock_guard<std::mutex> lock_guard(m_Frame.process_mutex);
                    process_data = m_Frame.process_data_queue.front();
                    m_Frame.process_data_queue.pop();
                    cout << "ProcessThread process_data<< " << endl;
                }
                if (!process_data.empty())
                {
                    //处理图像 处理完送到渲染线程
                    //ImageProcess(process_data);
                    MAT draw_data = process_data;
                    std::lock_guard<std::mutex> lock_guard(m_Frame.draw_mutex);
                    cout << "ProcessThread draw_data>> " << endl;
                    m_Frame.draw_data_queue.push(draw_data);
                }
            }
            else
                this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    };

    auto DrawTd = [&]
    {
        cv::namedWindow("DrawThread", cv::WINDOW_OPENGL);   //gl
        cv::resizeWindow("DrawThread", 800, 600);
        while (brun3)
        {
            if (!m_Frame.draw_data_queue.empty())
            {
                MAT draw_data;
                {
                    std::lock_guard<std::mutex> lock_guard(m_Frame.draw_mutex);
                    draw_data = m_Frame.draw_data_queue.front();
                    m_Frame.draw_data_queue.pop();
                    //用opencv渲染
                    cv::imshow("DrawThread", draw_data);
                    cv::waitKey(1);
                    cout << "DrawThread draw_data<< " << endl;
                }
            }
            else
                this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    };

    std::thread td1(CollectTd);
    std::thread td2(DecodeTd);
    std::thread td3(ProcessTd);
    std::thread td4(DrawTd);
    td1.join();
    td2.join();
    td3.join();
    td4.join();
    this_thread::sleep_for(std::chrono::hours(1));
}

当然实际中可能还有更多的处理线程,这里我做个笔记. 所以 还是以类的形式更加直观,这里扩展一下:

采用6个线程来模拟,数据采用5个队列内部各自处理

部分参考示例代码如下:

//**********************************************************************
//                        C11 视频多线程框架
//                                          @2021-01-04阿甘整理
//**********************************************************************
//通用数据结构
using MAT = cv::Mat;

//共享数据结构
struct MyFrame
{
    std::queue<MAT> image_data_queue;                   //解码数据
    std::mutex decode_mutex;
    std::queue<MAT> process_data_queue;                 //处理数据
    std::mutex process_mutex;
    std::queue<MAT> push_data_queue;                    //推流数据
    std::mutex push_mutex;
    std::queue<MAT> draw_data_queue;                    //渲染数据
    std::mutex draw_mutex;
    std::queue<MAT> tracker_data_queue;                 //跟踪数据
    std::mutex tracker_mutex;
    //清空方法
    void clear(queue<MAT>& my_queue)
    {
        queue<MAT> empty_queue;
        std::swap(my_queue, empty_queue);
    }
    void clearall(void)
    {
        clear(image_data_queue);
        clear(process_data_queue);
        clear(push_data_queue);
        clear(draw_data_queue);
        clear(tracker_data_queue);
    }
};

//线程基类
class BaseThread
{
public:
    BaseThread(MyFrame& mFrame);
    virtual ~BaseThread(void);
    MyFrame& m_Frame;
public:
    void begintk(void);
    void endtk(void);
    bool getstatus(void);
    //线程函数用来多态重载
    virtual void run(void) = 0;
private:
    std::atomic_bool m_status;
    std::thread th;
};

//采集线程
class CollectTd :public BaseThread
{
public:
    using BaseThread::BaseThread;
    virtual void run(void);
    void setplayMp4(string url);
private:
    string m_mp4;
    cv::VideoCapture vcap;
    MAT capture264;
};

//解码线程
class DecodeTd :public BaseThread
{
public:
    using BaseThread::BaseThread;
    virtual void run(void);
};

//渲染线程
class DrawTd :public BaseThread
{
public:
    using BaseThread::BaseThread;
    virtual void run(void);
};

//处理线程
class ProcessTd :public BaseThread
{
public:
    using BaseThread::BaseThread;
    virtual void run(void);
};

//推流线程
class PushTd :public BaseThread
{
public:
    using BaseThread::BaseThread;
    virtual void run(void);
};

//跟踪线程
class TrackerTd :public BaseThread
{
public:
    using BaseThread::BaseThread;
    virtual void run(void);
};

//管理器
class C11VideoFrame
{
public:
    static void testrun(void)
    {
        C11VideoFrame mgr;
        mgr.init();
        mgr.beginwork();
        this_thread::sleep_for(std::chrono::hours(10));
    }
public:
    C11VideoFrame(void);
    ~C11VideoFrame(void);
public:
    void init(void);
    void beginwork(void);
    void endwork(void);
private:
    std::unique_ptr<CollectTd> m_collect;
    std::unique_ptr<DecodeTd> m_decode;
    std::unique_ptr<ProcessTd> m_Process;
    std::unique_ptr<PushTd> m_Push;
    std::unique_ptr<DrawTd> m_Draw;
    std::unique_ptr<TrackerTd> m_Tracker;
    std::thread th;
    MyFrame m_Frame;                    //中间数据结构
};

实现部分示例:

//**********************************************************************
//                        C11 视频多线程框架
//                                          @2021-01-04阿甘整理
//**********************************************************************


BaseThread::BaseThread(MyFrame& mFrame) : m_status(false)
, m_Frame(mFrame)
{
}

BaseThread::~BaseThread(void)
{
    endtk();
}

void BaseThread::begintk(void)
{
    if (m_status)
        endtk();
    m_status = true;
    th = thread([&]()
    {
        this->run();
    });
}

void BaseThread::endtk(void)
{
    m_status = false;
    if (th.joinable())
        th.join();
}

bool BaseThread::getstatus(void)
{
    return m_status;
}


void CollectTd::run(void)
{
    vcap.open(m_mp4);           //打开文件
    vcap.set(cv::CAP_PROP_POS_FRAMES, 1);
    if (!vcap.isOpened())
        return;
    int numflag(0);
    while (getstatus())
    {
        std::string strnum = std::to_string(numflag++);
        vcap >> capture264;     //此处模拟相机采集数据
        if (!capture264.empty())
        {
            std::lock_guard<std::mutex> lock_guard(m_Frame.decode_mutex);
            cout << "CollectThread image_data>> " << strnum << endl;
            m_Frame.image_data_queue.push(capture264);
        }
        else
            endtk();
        this_thread::sleep_for(std::chrono::milliseconds(30));
    }
}

void CollectTd::setplayMp4(string url)
{
    m_mp4 = url;
}

void DecodeTd::run(void)
{
    while (getstatus())
    {
        if (!m_Frame.image_data_queue.empty())
        {
            MAT image_data;
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.decode_mutex);
                image_data = m_Frame.image_data_queue.front();
                m_Frame.image_data_queue.pop();
                cout << "DecodeThread image_data<< "  << endl;
            }
            //从收到的数据进行硬解码,解码后送到处理线程
            MAT process_data = image_data;
            {
                //cuvidxx操作
                std::lock_guard<std::mutex> lock_guard(m_Frame.process_mutex);
                cout << "DecodeThread process_data>> " << endl;
                m_Frame.process_data_queue.push(process_data);
            }
        }
        else
            this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

void ProcessTd::run(void)
{
    while (getstatus())
    {
        if (!m_Frame.process_data_queue.empty())
        {
            MAT process_data;
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.process_mutex);
                process_data = m_Frame.process_data_queue.front();
                m_Frame.process_data_queue.pop();
                cout << "ProcessThread process_data<< "  << endl;
            }
            if (!process_data.empty())
            {
                //处理图像 处理完送到渲染线程
                //ImageProcess(process_data);
                MAT draw_data = process_data;
                std::lock_guard<std::mutex> lock_guard(m_Frame.draw_mutex);
                cout << "ProcessThread draw_data>> "  << endl;
                m_Frame.draw_data_queue.push(draw_data);
            }
        }
        else
            this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

void DrawTd::run(void)
{
    cv::namedWindow("DrawThread", cv::WINDOW_OPENGL);   //gl
    cv::resizeWindow("DrawThread", 800,600);
    while (getstatus())
    {
        if (!m_Frame.draw_data_queue.empty())
        {
            MAT draw_data;
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.draw_mutex);
                draw_data = m_Frame.draw_data_queue.front();
                m_Frame.draw_data_queue.pop();
                //用opencv渲染
                cv::imshow("DrawThread", draw_data);
                cv::waitKey(1);
                cout << "DrawThread draw_data<< " << endl;
            }
            //跟踪数据计算
            MAT tracker_data = draw_data;
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.tracker_mutex);
                cout << "DrawThread tracker_data>> " << endl;
                m_Frame.tracker_data_queue.push(tracker_data);
            }
            //处理推流数据
            MAT push_data = draw_data;
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.push_mutex);
                cout << "DrawThread push_data>> " << endl;
                m_Frame.push_data_queue.push(push_data);
            }
        }
        else
            this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

void PushTd::run(void)
{
    while (getstatus())
    {
        if (!m_Frame.push_data_queue.empty())
        {
            MAT push_data;
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.push_mutex);
                push_data = m_Frame.push_data_queue.front();
                m_Frame.push_data_queue.pop();
                cout << "PushThread push_data<< " << endl;
            }
            //调用推送方法
            if (!push_data.empty())
            {
                //RtmpPuller(push_data.data, push_data.cols * push_data.rows * push_data.channels());
            }
        }
        else
            this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

void TrackerTd::run(void)
{
    while (getstatus())
    {
        if (!m_Frame.tracker_data_queue.empty())
        {
            MAT tracker_data;
            {
                std::lock_guard<std::mutex> lock_guard(m_Frame.tracker_mutex);
                tracker_data = m_Frame.tracker_data_queue.front();
                m_Frame.tracker_data_queue.pop();
                cout << "TrackerThread tracker_data<< " << endl;
            }
            //中间跟踪器算法
            //tracker(tracker_data);
        }
        else
            this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}




C11VideoFrame::C11VideoFrame(void)
{
}

C11VideoFrame::~C11VideoFrame(void)
{
}

void C11VideoFrame::init(void)
{
    m_collect = unique_ptr<CollectTd>(new CollectTd(m_Frame));
    m_collect->setplayMp4("cam.mp4");
    m_decode = unique_ptr<DecodeTd>(new DecodeTd(m_Frame));
    m_Process = unique_ptr<ProcessTd>(new ProcessTd(m_Frame));
    m_Push = unique_ptr<PushTd>(new PushTd(m_Frame));
    m_Draw = unique_ptr<DrawTd>(new DrawTd(m_Frame));
    m_Tracker = unique_ptr<TrackerTd>(new TrackerTd(m_Frame));
}

void C11VideoFrame::beginwork(void)
{
    m_decode->begintk();
    m_Process->begintk();
    m_Push->begintk();
    m_Draw->begintk();
    m_Tracker->begintk();
    m_collect->begintk();
    th = thread([&]()
    {
        while (true)
        {
            //采集线程停止 其他可以关闭了
            if (!m_collect->getstatus())
            {
                m_decode->endtk();
                m_Process->endtk();
                m_Push->endtk();
                m_Draw->endtk();
                m_Tracker->endtk();
                break;
            }
            this_thread::sleep_for(std::chrono::milliseconds(800));
        }
    });
}

void C11VideoFrame::endwork(void)
{
    m_collect->endtk();
    m_decode->endtk();
    m_Process->endtk();
    m_Push->endtk();
    m_Draw->endtk();
    m_Tracker->endtk();
}


最后运行效果和中间打印日志:还是以影像修复七里香来弄咯

总结:目前仅仅解决了在多线程中异步处理的思路,这里并没有对线程做干涉同步,而是用队列各自处理.在一定情况下还是有很大的折腾可自己搞的.中间的数据结构都是采用的数据拷贝这个是要根据需要自己折中选择的. 为了简单就这样了,就当是抛砖引玉,高手就见笑了.

视频分析中的多线程笔记记录(1)分享完毕.

完整下载

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐