相关文章推荐
悲伤的鸭蛋  ·  Android ...·  1 年前    · 

项目中经常需要使用海康的网络摄像头,做视频图像算法经常会用到rtsp流,但是rtsp一般很难保证实时性且解码效率不能保证.通过海康给的python的demo,对模块进行封装以方便python像调用opencv的VideoCaputure一样调用.目录结构如下

├── HKCam.py #自己写的封装

├── HCNetSDK.py # SDK给定
├── PlayCtrl.py #SDK给定
├── lib
│   ├── linux
│   │   ├── HCNetSDKCom
│   │   │   ├── libanalyzedata.so
│   │   │   ├── libAudioIntercom.so
│   │   │   ├── libHCAlarm.so
│   │   │   ├── libHCCoreDevCfg.so
│   │   │   ├── libHCDisplay.so
│   │   │   ├── libHCGeneralCfgMgr.so
│   │   │   ├── libHCIndustry.so
│   │   │   ├── libHCPlayBack.so
│   │   │   ├── libHCPreview.so
│   │   │   ├── libHCVoiceTalk.so
│   │   │   ├── libiconv2.so
│   │   │   ├── libStreamTransClient.so
│   │   │   └── libSystemTransform.so
│   │   ├── HCNetSDK_Log_Switch.xml
│   │   ├── libAudioRender.so
│   │   ├── libcrypto.so.1.1
│   │   ├── libHCCore.so
│   │   ├── libhcnetsdk.so
│   │   ├── libhpr.so
│   │   ├── libNPQos.so
│   │   ├── libopenal.so.1
│   │   ├── libPlayCtrl.so
│   │   ├── libssl.so.1.1
│   │   ├── libSuperRender.so
│   │   ├── libz.so
│   ├── win
│   │   ├── HCCore.dll
│   │   ├── HCNetSDKCom
│   │   │   ├── AnalyzeData.dll
│   │   │   ├── AudioIntercom.dll
│   │   │   ├── AudioRender.dll
│   │   │   ├── HCAlarm.dll
│   │   │   ├── HCAlarm.lib
│   │   │   ├── HCCoreDevCfg.dll
│   │   │   ├── HCDisplay.dll
│   │   │   ├── HCGeneralCfgMgr.dll
│   │   │   ├── HCGeneralCfgMgr.lib
│   │   │   ├── HCIndustry.dll
│   │   │   ├── HCPlayBack.dll
│   │   │   ├── HCPreview.dll
│   │   │   ├── HCPreview.lib
│   │   │   ├── HCVoiceTalk.dll
│   │   │   ├── libiconv2.dll
│   │   │   ├── OpenAL32.dll
│   │   │   ├── StreamTransClient.dll
│   │   │   └── SystemTransform.dll
│   │   ├── HCNetSDK.dll
│   │   ├── hlog.dll
│   │   ├── hpr.dll
│   │   ├── libcrypto-1_1-x64.dll
│   │   ├── libssl-1_1-x64.dll
│   │   ├── PlayCtrl.dll
│   │   └── zlib1.dll

# coding=utf-8
HKCam.py
import os
import platform
from HCNetSDK import *
from PlayCtrl import *
import numpy as np
import time
import cv2
class HKCam(object):
    def __init__(self,camIP,username,password,devport=8000):
        # 登录的设备信息
        self.DEV_IP = create_string_buffer(camIP.encode())
        self.DEV_PORT =devport
        self.DEV_USER_NAME = create_string_buffer(username.encode())
        self.DEV_PASSWORD = create_string_buffer(password.encode())
        self.WINDOWS_FLAG = False if platform.system() != "Windows" else True
        self.funcRealDataCallBack_V30 = None
        self.recent_img = None #最新帧
        self.n_stamp = None #帧时间戳
        self.last_stamp = None #上次时间戳
        # 加载库,先加载依赖库
        if self.WINDOWS_FLAG:
            os.chdir(r'./lib/win')
            self.Objdll = ctypes.CDLL(r'./HCNetSDK.dll')  # 加载网络库
            self.Playctrldll = ctypes.CDLL(r'./PlayCtrl.dll')  # 加载播放库
        else:
            os.chdir(r'./lib/linux')
            self.Objdll = cdll.LoadLibrary(r'./libhcnetsdk.so')
            self.Playctrldll = cdll.LoadLibrary(r'./libPlayCtrl.so')
        # 设置组件库和SSL库加载路径    
        self.SetSDKInitCfg()
        # 初始化DLL
        self.Objdll.NET_DVR_Init()
        # 启用SDK写日志
        self.Objdll.NET_DVR_SetLogToFile(3, bytes('./SdkLog_Python/', encoding="utf-8"), False)
        os.chdir(r'../../') 
        (self.lUserId, self.device_info) = self.LoginDev()
        self.Playctrldll.PlayM4_ResetBuffer(self.lUserId,1)
        print(self.lUserId)
        if self.lUserId < 0:
            err = self.Objdll.NET_DVR_GetLastError()
            print('Login device fail, error code is: %d' % self.Objdll.NET_DVR_GetLastError())
            # 释放资源
            self.Objdll.NET_DVR_Cleanup()
            exit()
        else:
            print(f'摄像头[{camIP}]登录成功!!')
        self.start_play()
        time.sleep(1)
    def start_play(self,):
        #global funcRealDataCallBack_V30
        self.PlayCtrl_Port = c_long(-1)  # 播放句柄
        # 获取一个播放句柄
        if not self.Playctrldll.PlayM4_GetPort(byref(self.PlayCtrl_Port)):
            print(u'获取播放库句柄失败')
        # 定义码流回调函数
        self.funcRealDataCallBack_V30 = REALDATACALLBACK(self.RealDataCallBack_V30)
        # 开启预览
        self.preview_info = NET_DVR_PREVIEWINFO()
        self.preview_info.hPlayWnd = 0
        self.preview_info.lChannel = 1  # 通道号
        self.preview_info.dwStreamType = 0  # 主码流
        self.preview_info.dwLinkMode = 0  # TCP
        self.preview_info.bBlocked = 1  # 阻塞取流
        # 开始预览并且设置回调函数回调获取实时流数据
        self.lRealPlayHandle = self.Objdll.NET_DVR_RealPlay_V40(self.lUserId, byref(self.preview_info), self.funcRealDataCallBack_V30, None)
        if self.lRealPlayHandle < 0:
            print ('Open preview fail, error code is: %d' %self. Objdll.NET_DVR_GetLastError())
            # 登出设备
            self.Objdll.NET_DVR_Logout(self.lUserId)
            # 释放资源
            self.Objdll.NET_DVR_Cleanup()
            exit()
    def SetSDKInitCfg(self,):
        # 设置SDK初始化依赖库路径
        # 设置HCNetSDKCom组件库和SSL库加载路径
        # print(os.getcwd())
        if self.WINDOWS_FLAG:
            strPath = os.getcwd().encode('gbk')
            sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
            sdk_ComPath.sPath = strPath
            self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
            self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'\libcrypto-1_1-x64.dll'))
            self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'\libssl-1_1-x64.dll'))
        else:
            strPath = os.getcwd().encode('utf-8')
            sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
            sdk_ComPath.sPath = strPath
            self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
            self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'/libcrypto.so.1.1'))
            self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'/libssl.so.1.1'))
    def LoginDev(self,):
        # 登录注册设备
        device_info = NET_DVR_DEVICEINFO_V30()
        lUserId = self.Objdll.NET_DVR_Login_V30(self.DEV_IP, self.DEV_PORT, self.DEV_USER_NAME, self.DEV_PASSWORD, byref(device_info))
        return (lUserId, device_info)
    def read(self,):
        while self.n_stamp==self.last_stamp:
            continue
        self.last_stamp=self.n_stamp
        return self.n_stamp,self.recent_img
    def DecCBFun(self,nPort, pBuf, nSize, pFrameInfo, nUser, nReserved2):
            if pFrameInfo.contents.nType == 3:
                t0 = time.time()
                # 解码返回视频YUV数据,将YUV数据转成jpg图片保存到本地
                # 如果有耗时处理,需要将解码数据拷贝到回调函数外面的其他线程里面处理,避免阻塞回调导致解码丢帧
                nWidth = pFrameInfo.contents.nWidth
                nHeight = pFrameInfo.contents.nHeight
                #nType = pFrameInfo.contents.nType
                dwFrameNum = pFrameInfo.contents.dwFrameNum
                nStamp = pFrameInfo.contents.nStamp
                #print(nWidth, nHeight, nType, dwFrameNum, nStamp, sFileName)
                YUV = np.frombuffer(pBuf[:nSize],dtype=np.uint8)
                YUV = np.reshape(YUV,[nHeight+nHeight//2,nWidth])
                img_rgb = cv2.cvtColor(YUV,cv2.COLOR_YUV2BGR_YV12)
                self.recent_img,self.n_stamp = img_rgb,nStamp
    def RealDataCallBack_V30(self,lPlayHandle, dwDataType, pBuffer, dwBufSize, pUser):
        # 码流回调函数
         if dwDataType == NET_DVR_SYSHEAD:
            # 设置流播放模式
            self.Playctrldll.PlayM4_SetStreamOpenMode(self.PlayCtrl_Port, 0)
            # 打开码流,送入40字节系统头数据
            if self.Playctrldll.PlayM4_OpenStream(self.PlayCtrl_Port, pBuffer, dwBufSize, 1024*1024):
                # 设置解码回调,可以返回解码后YUV视频数据
                #global FuncDecCB
                self.FuncDecCB = DECCBFUNWIN(self.DecCBFun)
                self.Playctrldll.PlayM4_SetDecCallBackExMend(self.PlayCtrl_Port, self.FuncDecCB, None, 0, None)
                # 开始解码播放
                if self.Playctrldll.PlayM4_Play(self.PlayCtrl_Port, None):
                    print(u'播放库播放成功')
                else:
                    print(u'播放库播放失败')
            else:
                print(u'播放库打开流失败')
         elif dwDataType == NET_DVR_STREAMDATA:
            self.Playctrldll.PlayM4_InputData(self.PlayCtrl_Port, pBuffer, dwBufSize)
         else:
            print (u'其他数据,长度:', dwBufSize)
    def release(self):
        self.Objdll.NET_DVR_StopRealPlay(self.lRealPlayHandle)
        if self.PlayCtrl_Port.value > -1:
            self.Playctrldll.PlayM4_Stop(self.PlayCtrl_Port)
            self.Playctrldll.PlayM4_CloseStream( self.PlayCtrl_Port)
            self.Playctrldll.PlayM4_FreePort( self.PlayCtrl_Port)
            PlayCtrl_Port = c_long(-1)
            self.Objdll.NET_DVR_Logout(self.lUserId)
            self.Objdll.NET_DVR_Cleanup()
        print('释放资源结束')
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
if __name__=="__main__":
    camIP ='192.168.1.122'
    DEV_PORT = 8000
    username ='admin'
    password = 'admin'
    hkclass = HKCam(camIP,username,password)
    last_stamp = 0
    while True:
        t0 =time.time()
        n_stamp,img = hkclass.read()
        last_stamp=n_stamp
        cv2.imshow('xxxxxx',cv2.resize(img,(800,600)))
        kkk = cv2.waitKey(1)
        if kkk ==ord('q'):
            break
    hkclass.release()

https://gitee.com/MengNiMeia/python_hkcam https://gitee.com/MengNiMeia/python_hkcam

在这个例子中,我们首先创建一个Flask应用程序,并使用 海康 威视 SDK 创建一个客户端。最后,我们定义了一个名为’gen’的生成器函数,该函数不断获取视频帧,并将其转换为JPEG格式。本文将介绍如何使用 Python 海康 威视 SDK 来实现实时预览功能,快速构建高效的视频监控系统。在本文中,我们介绍了如何使用 Python 海康 威视 SDK 来实现实时预览功能。通过使用 Python 海康 威视 SDK ,可以快速构建高效的视频监控系统。在使用 Python 实现 海康 威视 SDK 之前,需要先安装 海康 威视 SDK 。1.安装 海康 威视 SDK 。 最近在新系统的研发中负责了视频监控模块的开发,项目监控设备全部采用 海康 的摄像头,枪机、球机都有,开发的过程中,有个需求是在前端页面上把摄像头画面进行平铺展示,最开始的方案是通过官方API完成,但是后面发现项目上所有的设备都是不联网的,所以只能转由 SDK 进行二次开发,这里使用的语言是JAVA,框架用的是SpringCloud。1.加载dll函数库。 我们知道 python 的执行效率不是很高,而且由于GIL的原因,导致 python 不能充分利用多核CPU。一般的解决方式是使用多进程,但是多进程开销比较大,而且进程之间的通信也会比较麻烦。因此在解决效率问题上,我们会把那些比较耗时的模块使用C或者C++编写,然后编译成动态链接库,Windows上面是dll,linux上面则是so,编译好之后,交给 python 调用 。而且通过扩展模块的方式还可以解决 python 的GIL的问题,因此如果想要利用多核,我们仍然可以通过扩展模块的方式。 python 如何 调用 扩展模块 python 调用 扩展模块的一种比较简单的方式就是使用 ctypes 这个库,这个库是p 这里写自定义目录标题本文参考以下博文加自己的实践,发现有些步骤是不必要的,例如不需要为 opencv 等第三方库指定路径运行环境:准备工作1、 海康 SDK 下载2、下载安装vcpkg3、安装Swig4、下载 OpenCV -swig接口文件 本文参考以下博文加自己的实践,发现有些步骤是不必要的,例如不需要为 opencv 等第三方库指定路径 https://blog.csdn.net/c20081052/article/details/95082377 https://www.ryannn.com/archives/hi 1.  static void ccvt_420p_rgb565(int width, int height, const unsigned char *src, __u16 *dst) 通过 python 调用 海康 威视工业摄像头并进行图像存储问题(数据流获取问题未能解决) 先说情况,本人是做视觉检测的需要高倍率摄像头进行实时检测,也就是需要深度学习进行图片数据处理,但是这个又是 python 来进行分析,而 海康 威视主要程序代码是以C为主的,传过来的数据我也尝试的去解析都是不能转化成 python 的BGR图像。 具体参照了:通过cv2 调用 海康 威视摄像头,但这个不能 调用 工业摄像头,通过官方给一个400什么软件要激活摄像头,可是却并不能检测到工业摄像头,通过mvs软件 调用 到摄像头地址进行测试也无法获取到摄 我使用的是 海康 DS-2CD852MF-E, 200万, 网络 摄像机,已经比较老了,不过 SDK 海康 官网下载的,开发流程都差不多. 海康 摄像机回调解码后的视频数据格式为YV12,顺便说一下YV12的数据格式   YYYY  V U.   我这个是720P,即1280 * 720分辨率. 那么Y分量的数量为 1280 * 720 = 921600 字节, V = U 数量为Y的1/4, 即921600 自己的一些探索发现:直接连摄像头的时候会报错,主要是分辨率不匹配的问题,原因是相机在刚上电以后会保存初始配置,他的分别率设置和代码中不一样,需要打开 SDK 连接相机,基本属性->像素格式->下拉设置为RGB8,在 SDK 中断开相机连接,再run代码就可以辣。最近用到 海康 的工业相机,需要读取它的视频流,听说RTSP协议延迟较高,并且没有尝试成功,用了CSDN中一篇文章分享的方法。除了main.py,还需要五个py文件,在 海康 SDK 中可以找到,把他们拿出来和main.py放一起就可以开run。