千杯不醉的骆驼 · Nginx配置HTTPS跳转到非443端口的 ...· 1 年前 · |
纯真的仙人掌 · 怎样把chat GPT接入苹果手机 - 抖音· 1 年前 · |
腼腆的烈马 · 深度学习环境用linux还是windows? ...· 1 年前 · |
活泼的酱肘子 · Pandas读取或存储数据至Mysql - ...· 1 年前 · |
2)相机视频流停止预览;
3)相机拍照功能。
需求背景 :客户需要对生产的产品进行定期抽样质检,其中涉及到外观检测,比如,样品的表面清洁度、外观等指标。所以,需要先通过管理平台点击相机的“预览”按钮,进行预览相机拍摄的实时效果,当客户认为清晰度和角度满足条件时,才会点击“拍照”按钮进行拍摄,从而保证质检的照片是有意义和有实用价值的。
由于项目团队同事之前没有做过工业相机视频和拍照的相关开发,于是乎,就开启了“漫长”而“煎熬”的调研之路(断断续续持续了1个多月)。
最终于2022年12月6日,通过 Python “完美”实现了上述的三个功能。
特地写下此篇博客,供需要的网友参考,避免少走很多弯路。
https://www.hikrobotics.com/cn/machinevision/productdetail?id=8518&pageNumber=13&pageSize=20
可以在海康机器人官网提供的 客户端工具 MVS,“ 帮助 ”--> " Development ",点击“Development”会跳转到安装目录,从“Samples”中获取官方提供的一些简单示例。
如果已经安装了 MVS,直接进入 C:\Program Files (x86)\MVS\Development\Samples 目录即可看到,目前支持 C#、C++、 Java 、OpenCV、 Python 、VB等语言。
本人主要使用Java和Python,所以,本篇博文主要从Java和Python两种语言调研了实现方案。
( 很遗憾, 此路不通!!! )
因为我们用的 海康机器人工业相机 MV-CU060-10GM 这款相机 , 不支持 RTSP 协议 。
如下博客适用于 海康威视摄像头 , 并不适用于 海康工业相机 ,如果是使用海康威视摄像头的小伙伴可以参考下。
参考博客: 海康威视摄像头对接SDK实时预览功能和抓拍功能,懒癌福利,可直接CV
( 不能完全满足客户需求, 此路不全通 )
使用Java目前参考官网的示例,实现了图片抓取,并上传的功能,但是没有实现视频流的实时获取和显示的功能。
如果需求只是获取图片,不要求视频流实时显示,可以通过Java就可以实现。
( 目前,网上没有直接能完全满足上述三个需求的,本人通过借鉴、整合,结合Flask框架实现的 )
参考博客如下:
python语言下使用opencv接口cv2.VideoCapture()接口调用海康机器人工业相机 ( 此篇博文可以重点看!!! )
通过python调用海康威视工业摄像头并进行图像存储,同时使用opencv实时图像显示(数据流问题已解决)
python调用海康工业相机并用opencv显示(整体实现) ( 此篇博文可以重点看!!! )
pyQT5 学习使用 笔记 六 pyQt5+opencv 显示海康GIGE相机动态视频流 ( 该方式虽然实现了视频的实时显示,但是,无法被给前端直接调用 )
web实时显示摄像头图像(python) ( 此篇博文可以重点看!!! )
https://www.w3cschool.cn/flask/
https://dormousehole.readthedocs.io/en/latest/
https://blog.csdn.net/weixin_44239541/article/details/89390139
https://zhuanlan.zhihu.com/p/104273184
https://blog.csdn.net/tulan_xiaoxin/article/details/79132214
个人使用的是 Python 3.8.5 版本( 建议使用该版本或者更高版本 )
Python 官网地址: https://www.python.org/
安装和配置可以参考: https://www.runoob.com/python3/python3-install.html
2)Python 虚拟环境
个人建议给该工程单独创建一个Python虚拟环境(如:hikrobotEnv),后续现场如果出现无法连接外网的情况下,可以直接Copy虚拟环境部署,比较方便。
当然,如果不是特殊情况,多个项目工程可以共用一个虚拟环境。
可以使用 virtualenv、anaconda、PyCharm 等创建虚拟环境。
以 virtualenv 为例,创建 hikrobotEnv 虚拟环境的命令如下:
virtualenv hikrobotEnv --python=python3.8.5
如果没有安装 virtualenv,可以通过如下命令安装:
pip install virtualenv
关于 virtualenv 的相关介绍和使用,可以参考博客: Python虚拟环境Virtualenv详解
前提条件 :将 C:\Program Files (x86)\MVS\Development\Samples\Python\ 目录下的 MvImport 目录,copy到自己的工程目录下,创建测试文件 TestGrabImage.py
代码如下:
# -- coding: utf-8 --
import cv2
import sys
import copy
import msvcrt
import numpy as np
from ctypes import *
sys.path.append("./MvImport")
from MvCameraControl_class import *
if __name__ == "__main__":
deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
# ch:枚举设备 | en:Enum device
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
print ("enum devices fail! ret[0x%x]" % ret)
sys.exit()
if deviceList.nDeviceNum == 0:
print ("find no device!")
sys.exit()
print ("find %d devices!" % deviceList.nDeviceNum)
for i in range(0, deviceList.nDeviceNum):
mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
print ("\ngige device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
strModeName = strModeName + chr(per)
print ("device model name: %s" % strModeName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
print ("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
print ("\nu3v device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
if per == 0:
break
strModeName = strModeName + chr(per)
print ("device model name: %s" % strModeName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
print ("user serial number: %s" % strSerialNumber)
nConnectionNum = 0
if int(nConnectionNum) >= deviceList.nDeviceNum:
print ("intput error!")
sys.exit()
# ch:创建相机实例 | en:Creat Camera Object
cam = MvCamera()
# ch:选择设备并创建句柄 | en:Select device and create handle
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
print ("create handle fail! ret[0x%x]" % ret)
sys.exit()
# ch:打开设备 | en:Open device
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
print ("open device fail! ret[0x%x]" % ret)
sys.exit()
# ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera)
if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
nPacketSize = cam.MV_CC_GetOptimalPacketSize()
if int(nPacketSize) > 0:
ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
if ret != 0:
print ("Warning: Set Packet Size fail! ret[0x%x]" % ret)
else:
print ("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)
# ch:设置触发模式为off | en:Set trigger mode as off
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
print ("set trigger mode fail! ret[0x%x]" % ret)
sys.exit()
# ch:获取数据包大小 | en:Get payload size
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
print ("get payload size fail! ret[0x%x]" % ret)
sys.exit()
nPayloadSize = stParam.nCurValue
# ch:开始取流 | en:Start grab image
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
print ("start grabbing fail! ret[0x%x]" % ret)
sys.exit()
stDeviceList = MV_FRAME_OUT_INFO_EX()
memset(byref(stDeviceList), 0, sizeof(stDeviceList))
data_buf = (c_ubyte * nPayloadSize)()
ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000)
if ret == 0:
print ("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum))
nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3
stConvertParam=MV_SAVE_IMAGE_PARAM_EX()
stConvertParam.nWidth = stDeviceList.nWidth
stConvertParam.nHeight = stDeviceList.nHeight
stConvertParam.pData = data_buf
stConvertParam.nDataLen = stDeviceList.nFrameLen
stConvertParam.enPixelType = stDeviceList.enPixelType
stConvertParam.nImageLen = stConvertParam.nDataLen
stConvertParam.nJpgQuality = 70
stConvertParam.enImageType = MV_Image_Jpeg
stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)()
stConvertParam.nBufferSize = nRGBSize
# ret = cam.MV_CC_ConvertPixelType(stConvertParam)
print(stConvertParam.nImageLen)
ret = cam.MV_CC_SaveImageEx2(stConvertParam)
if ret != 0:
print ("convert pixel fail ! ret[0x%x]" % ret)
del data_buf
sys.exit()
file_path = "AfterConvert_RGB2.jpg"
file_open = open(file_path.encode('ascii'), 'wb+')
img_buff = (c_ubyte * stConvertParam.nImageLen)()
cdll.msvcrt.memcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen)
file_open.write(img_buff)
print ("Save Image succeed!")
# ch:停止取流 | en:Stop grab image
ret = cam.MV_CC_StopGrabbing()
if ret != 0:
print ("stop grabbing fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
# ch:关闭设备 | Close device
ret = cam.MV_CC_CloseDevice()
if ret != 0:
print ("close deivce fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
# ch:销毁句柄 | Destroy handle
ret = cam.MV_CC_DestroyHandle()
if ret != 0:
print ("destroy handle fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
del data_buf
在python 虚拟环境中,执行 python TestGrabImage.py 运行后,会在当前目录下生成一个名为 AfterConvert_RGB2.jpg 的图片文件。
如果运行过程中提示模块不存在,可以通过 pip install 命令安装相应的模块,建议使用 清华源 ,如 安装 opencv-python ,命令如下:
pip install opencv-python==4.1.2.30 -i https://pypi.tuna.tsinghua.edu.cn/simple
前提条件 :将 C:\Program Files (x86)\MVS\Development\Samples\Python\ 目录下的 MvImport 目录,copy到自己的工程目录下,创建测试文件 TestVideoStream.py
代码如下:
# -- coding: utf-8 -- import cv2 from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5.QtCore import pyqtSignal from PyQt5.QtGui import * import numpy as np #from CameraControl_header import MV_CC_DEVICE_INFO_LIST #from mainWindow import Ui_MainWindow # 导入创建的GUI类 import sys import threading import msvcrt from ctypes import * sys.path.append("./MvImport") from MvCameraControl_class import * from Ui_MainWindow import * from CameraParams_header import * class mywindow(QtWidgets.QMainWindow, Ui_MainWindow): sendAddDeviceName = pyqtSignal() #定义一个添加设备列表的信号。 deviceList = MV_CC_DEVICE_INFO_LIST() g_bExit = False # ch:创建相机实例 | en:Creat Camera Object cam = MvCamera() def connect_and_emit_sendAddDeviceName(self): # Connect the sendAddDeviceName signal to a slot. self.sendAddDeviceName.connect(self.SelectDevice) # Emit the signal. self.sendAddDeviceName.emit() def __init__(self): super(mywindow, self).__init__() self.setupUi(self) self.connect_and_emit_sendAddDeviceName() self.butopenCam.clicked.connect(lambda:self.openCam(self.camSelect.currentData())) self.butcloseCam.clicked.connect(self.closeCam) # setting main window geometry desktop_geometry = QtWidgets.QApplication.desktop() # 获取屏幕大小 main_window_width = desktop_geometry.width() # 屏幕的宽 main_window_height = desktop_geometry.height() # 屏幕的高 rect = self.geometry() # 获取窗口界面大小 window_width = rect.width() # 窗口界面的宽 window_height = rect.height() # 窗口界面的高 x = (main_window_width - window_width) // 2 # 计算窗口左上角点横坐标 y = (main_window_height - window_height) // 2 # 计算窗口左上角点纵坐标 self.setGeometry(x, y, window_width, window_height) # 设置窗口界面在屏幕上的位置 # 无边框以及背景透明一般不会在主窗口中用到,一般使用在子窗口中,例如在子窗口中显示gif提示载入信息等等 # self.setWindowFlags(Qt.FramelessWindowHint) # self.setAttribute(Qt.WA_TranslucentBackground) #打开摄像头。 def openCam(self,camid): self.g_bExit = False # ch:选择设备并创建句柄 | en:Select device and create handle stDeviceList = cast(self.deviceList.pDeviceInfo[int(camid)], POINTER(MV_CC_DEVICE_INFO)).contents ret = self.cam.MV_CC_CreateHandle(stDeviceList) if ret != 0: print("create handle fail! ret[0x%x]" % ret) sys.exit() # ch:打开设备 | en:Open device ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0) if ret != 0: print("open device fail! ret[0x%x]" % ret) sys.exit() # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera) if stDeviceList.nTLayerType == MV_GIGE_DEVICE: nPacketSize = self.cam.MV_CC_GetOptimalPacketSize() if int(nPacketSize) > 0: ret = self.cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize) if ret != 0: print("Warning: Set Packet Size fail! ret[0x%x]" % ret) else: print("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize) # ch:设置触发模式为off | en:Set trigger mode as off ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF) if ret != 0: print("set trigger mode fail! ret[0x%x]" % ret) sys.exit() # ch:获取数据包大小 | en:Get payload size stParam = MVCC_INTVALUE() memset(byref(stParam), 0, sizeof(MVCC_INTVALUE)) ret = self.cam.MV_CC_GetIntValue("PayloadSize", stParam) if ret != 0: print("get payload size fail! ret[0x%x]" % ret) sys.exit() nPayloadSize = stParam.nCurValue # ch:开始取流 | en:Start grab image ret = self.cam.MV_CC_StartGrabbing() if ret != 0: print("start grabbing fail! ret[0x%x]" % ret) sys.exit() data_buf = (c_ubyte * nPayloadSize)() try: hThreadHandle = threading.Thread(target=self.work_thread, args=(self.cam, data_buf, nPayloadSize)) hThreadHandle.start() except: print("error: unable to start thread") #关闭相机 def closeCam(self): self.g_bExit=True # ch:停止取流 | en:Stop grab image ret = self.cam.MV_CC_StopGrabbing() if ret != 0: print("stop grabbing fail! ret[0x%x]" % ret) sys.exit() # ch:关闭设备 | Close device ret = self.cam.MV_CC_CloseDevice() if ret != 0: print("close deivce fail! ret[0x%x]" % ret) # ch:销毁句柄 | Destroy handle ret = self.cam.MV_CC_DestroyHandle() if ret != 0: print("destroy handle fail! ret[0x%x]" % ret) def work_thread(self,cam=0, pData=0, nDataSize=0): stFrameInfo = MV_FRAME_OUT_INFO_EX() memset(byref(stFrameInfo), 0, sizeof(stFrameInfo)) while True: QIm = np.asarray(pData) # 将c_ubyte_Array转化成ndarray得到(3686400,) QIm = QIm.reshape((2048, 3072, 1)) # 根据自己分辨率进行转化 # print(temp) # print(temp.shape) QIm = cv2.cvtColor(QIm, cv2.COLOR_BGR2RGB) # 这一步获取到的颜色不对,因为默认是BRG,要转化成RGB,颜色才正常 pyrD1=cv2.pyrDown(QIm) #向下取样 pyrD2 = cv2.pyrDown(pyrD1) # 向下取样 image_height, image_width, image_depth = pyrD2.shape # 读取图像高宽深度 pyrD3 = QImage(pyrD2, image_width, image_height, image_width * image_depth,QImage.Format_RGB888) self.label.setPixmap(QPixmap.fromImage(pyrD3)) #cv2.namedWindow("result", cv2.WINDOW_AUTOSIZE) #cv2.imshow("result", temp) #if cv2.waitKey(1) & 0xFF == ord('q'): # break ret = cam.MV_CC_GetOneFrameTimeout(pData, nDataSize, stFrameInfo, 1000) if ret == 0: print("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % ( stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.nFrameNum)) else: print("no data[0x%x]" % ret) if self.g_bExit == True: del pData break #获得所有相机的列表存入cmbSelectDevice中 def SelectDevice(self): '''选择所有能用的相机到列表中, gige相机需要配合 sdk 得到。 #得到相机列表 tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE # ch:枚举设备 | en:Enum device ret = MvCamera.MV_CC_EnumDevices(tlayerType, self.deviceList) if ret != 0: print("enum devices fail! ret[0x%x]" % ret) sys.exit() if self.deviceList.nDeviceNum == 0: print("find no device!") sys.exit() print("Find %d devices!" % self.deviceList.nDeviceNum) for i in range(0, self.deviceList.nDeviceNum): mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: print("\ngige device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName: strModeName = strModeName + chr(per) print("device model name: %s" % strModeName) self.camSelect.addItem(strModeName,i) #写入设备列表。 def pushbutton_function(self): #do some things Img=cv2.imread('JP1.JPG') #通过opencv读入一张图片 image_height, image_width, image_depth=Img.shape #读取图像高宽深度 QIm=cv2.cvtColor(Img,cv2.COLOR_BGR2RGB) QIm=QImage(QIm.data, image_width, image_height, image_width * image_depth,QImage.Format_RGB888) self.label.setPixmap(QPixmap.fromImage(QIm)) if __name__ == '__main__': app = QtWidgets.QApplication(sys.argv) window = mywindow() window.show() sys.exit(app.exec_())
上述代码中用到的 Ui_MainWindow.py ,放到 MvImport 目录下。
代码如下:
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'mainWindow.ui'
# Created by: PyQt5 UI code generator 5.15.0
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(589, 530)
self.centralwidget = QtWidgets.QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
self.verticalLayout = QtWidgets.QVBoxLayout(self.centralwidget)
self.verticalLayout.setObjectName("verticalLayout")
self.camSelect = QtWidgets.QComboBox(self.centralwidget)
self.camSelect.setObjectName("camSelect")
self.verticalLayout.addWidget(self.camSelect)
self.label = QtWidgets.QLabel(self.centralwidget)
self.label.setObjectName("label")
self.verticalLayout.addWidget(self.label)
self.butopenCam = QtWidgets.QPushButton(self.centralwidget)
self.butopenCam.setObjectName("butopenCam")
self.verticalLayout.addWidget(self.butopenCam)
self.butcloseCam = QtWidgets.QPushButton(self.centralwidget)
self.butcloseCam.setObjectName("butcloseCam")
self.verticalLayout.addWidget(self.butcloseCam)
MainWindow.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(MainWindow)
self.menubar.setGeometry(QtCore.QRect(0, 0, 589, 23))
self.menubar.setObjectName("menubar")
MainWindow.setMenuBar(self.menubar)
self.statusbar = QtWidgets.QStatusBar(MainWindow)
self.statusbar.setObjectName("statusbar")
MainWindow.setStatusBar(self.statusbar)
self.retranslateUi(MainWindow)
QtCore.QMetaObject.connectSlotsByName(MainWindow)
def retranslateUi(self, MainWindow):
_translate = QtCore.QCoreApplication.translate
MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
self.label.setText(_translate("MainWindow", "TextLabel"))
self.butopenCam.setText(_translate("MainWindow", "打开相机"))
self.butcloseCam.setText(_translate("MainWindow", "关闭相机"))
在python 虚拟环境中,执行 python TestVideoStream.py 运行,可以看到如下视频中的效果:
如果运行过程中提示模块不存在,可以通过 pip install 命令安装相应的模块,建议使用 清华源 ,如 安装 opencv-python ,命令如下:
pip install opencv-python==4.1.2.30 -i https://pypi.tuna.tsinghua.edu.cn/simple
通过视频中的效果可以看出:虽然实现了视频流的实时获取和显示,但是无法直接对接前端展示,不够友好。
之前的两个测试代码,仅作为测试参考,并非最终代码!!!
此段落中的代码,才是最终代码,可以重点参考!!!
使用 Python + OpenCV + Flask 实现,可以满足视频实时获取,并返回通过GET请求返回给前端进行实时显示,也可以抓取图片,保存上传。
将 C:\Program Files (x86)\MVS\Development\Samples\Python\ 目录下的 MvImport 目录,copy到自己的工程目录下。
cd C:\Program Files (x86)\MVS\Development\ThirdPartyPlatformAdapter\DirectShow\x64\MvDSS2
使用 管理员权限 运行 InstallDSSvc_x64.bat
(本人使用的是64位Windows操作系统,运行代码也是基于64位运行,故而使用该版本;32位系统在上一目录中也存在,大家可以根据实际情况进行安装)
pip install opencv-python==4.1.2.30 -i https://pypi.tuna.tsinghua.edu.cn/simple
个人使用的是 Python 3.8.5 , opencv-python 对应使用 4.1.2.30 版本即可。
如果使用的 opencv-python 的版本过高,可能报如下错误:
cv2.error: Unknown C++ exception from OpenCV code.
pip install flask -i https://pypi.tuna.tsinghua.edu.cn/simple
也可以指定 Flask 的版本,如:
pip install flask=2.2.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
工程代码结构如下:
在工程中创建 templates 目录,用于存放 index.hml
</ head > < h1 > 拍照预览 </ h1 > < img src ="{{ url_for('startPreview') }}" width ="50%" > </ body > </ html >用于规范返回给前端的数据类型,代码如下:
# -- coding: utf-8 --
class JsonResponse(object):
统一的json返回格式
def __init__(self, code, msg, data):
self.code = code
self.msg = msg
self.data = data
@classmethod
def success(cls, code=0, msg='success', data=None):
return cls(code, msg, data)
@classmethod
def error(cls, code=-1, msg='error', data=None):
return cls(code, msg, data)
def to_dict(self):
return {
"code": self.code,
"msg": self.msg,
"data": self.data
2)JsonFlask.py
用于重定义数据返回格式,代码如下:
# -- coding: utf-8 --
from flask import Flask, jsonify
from JsonResponse import *
class JsonFlask(Flask):
def make_response(self, rv):
"""视图函数可以直接返回: list、dict、None"""
if rv is None or isinstance(rv, (list, dict)):
rv = JsonResponse.success(rv)
if isinstance(rv, JsonResponse):
rv = jsonify(rv.to_dict())
return super().make_response(rv)
3)HikRobotCamera.py
核心代码文件,实现如下功能:
开始预览视频流、停止预览视频流、获取图片、记录日志等功能。
代码如下:
# -- coding: utf-8 --
import cv2
from flask import Flask, render_template, Response
import sys
import msvcrt
import base64
import datetime
import logging
sys.path.append("./MvImport")
from MvCameraControl_class import *
from JsonResponse import *
from JsonFlask import *
logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别
filename='hikrobot.log',
filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
#a是追加模式,默认如果不写的话,就是追加模式
format=
'%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
#日志格式
# 这里配置一下 template_folder为当前目录,不然可以找不到 index.html
app = JsonFlask(__name__, template_folder='.')
# index
@app.route('/')
def index():
return render_template('./templates/index.html')
# 获取码流
def generate(cap):
# 捕获异常信息
try:
while True:
# 如果是关闭相机,先退出取视频流的循环
global open
if (not open):
break;
retgrab = cap.grab()
if retgrab == True:
logging.debug("Grab true")
ret1, frame = cap.retrieve()
# print(type(frame))
if frame is None:
logging.error("frame is None")
continue
ret1, jpeg = cv2.imencode('.jpg', frame)
jpg_frame = jpeg.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n')
except Exception as e:
logging.error("generate error: %s" % str(e))
# 开始预览
@app.route('/startPreview')
def startPreview():
logging.info("======================================")
logging.info("start to preview video stream, current_time: " + str(datetime.datetime.now()))
# 全局变量,用于控制获取视频流的开关状态
global open
open = True
# 全局变量,获取视频连接
global cap
cap = cv2.VideoCapture(1)
if False == cap.isOpened():
logging.error("can't open camera")
quit()
else:
logging.info("start to open camera")
logging.info("open camera ok")
# 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
# 帧率配置
cap.set(cv2.CAP_PROP_FPS, 15)
return Response(generate(cap), mimetype='multipart/x-mixed-replace;boundary=frame')
# 停止预览
@app.route('/stopPreview')
def stopPreview():
logging.info("======================================")
logging.info("stop to preview video stream, current_time: " + str(datetime.datetime.now()))
logging.info("start to close camera")
# 全局变量,用于停止循环
global open
open = False
logging.info("release resources start")
# 全局变量,用于释放相机资源
try:
global cap
cap.release()
cv2.destroyAllWindows()
except Exception as e:
logging.error("stopPreview error: %s" % str(e))
logging.info("release resources end")
logging.info("camera closed successfully, current_time: " + str(datetime.datetime.now()))
logging.info("======================================")
return "stop to preview"
@app.route('/openAndSave')
def openAndSave():
logging.info("======================================")
logging.info("start to grab image, current_time: " + str(datetime.datetime.now()))
code = 100000
msg = "连接相机时发生错误"
# img_base64 = None
try:
deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
# ch:枚举设备 | en:Enum device
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
logging.error("enum devices fail! ret[0x%x]" % ret)
sys.exit()
if deviceList.nDeviceNum == 0:
logging.error("find no device!")
sys.exit()
logging.info("find %d devices!" % deviceList.nDeviceNum)
for i in range(0, deviceList.nDeviceNum):
mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
logging.info("\ngige device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
strModeName = strModeName + chr(per)
logging.info("device model name: %s" % strModeName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
logging.info("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
logging.info("\nu3v device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
if per == 0:
break
strModeName = strModeName + chr(per)
logging.info("device model name: %s" % strModeName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
logging.info("user serial number: %s" % strSerialNumber)
nConnectionNum = 0
if int(nConnectionNum) >= deviceList.nDeviceNum:
logging.error("intput error!")
sys.exit()
# ch:创建相机实例 | en:Creat Camera Object
cam = MvCamera()
# ch:选择设备并创建句柄 | en:Select device and create handle
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
logging.error("create handle fail! ret[0x%x]" % ret)
sys.exit()
# ch:打开设备 | en:Open device
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
logging.error("open device fail! ret[0x%x]" % ret)
sys.exit()
# ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera)
if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
nPacketSize = cam.MV_CC_GetOptimalPacketSize()
if int(nPacketSize) > 0:
ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
if ret != 0:
logging.warn("Warning: Set Packet Size fail! ret[0x%x]" % ret)
else:
logging.warn("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)
# ch:设置触发模式为off | en:Set trigger mode as off
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
logging.error("set trigger mode fail! ret[0x%x]" % ret)
sys.exit()
# ch:获取数据包大小 | en:Get payload size
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
logging.error("get payload size fail! ret[0x%x]" % ret)
sys.exit()
nPayloadSize = stParam.nCurValue
# ch:开始取流 | en:Start grab image
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
logging.error("start grabbing fail! ret[0x%x]" % ret)
sys.exit()
stDeviceList = MV_FRAME_OUT_INFO_EX()
memset(byref(stDeviceList), 0, sizeof(stDeviceList))
data_buf = (c_ubyte * nPayloadSize)()
ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000)
if ret == 0:
logging.info("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum))
nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3
stConvertParam=MV_SAVE_IMAGE_PARAM_EX()
stConvertParam.nWidth = stDeviceList.nWidth
stConvertParam.nHeight = stDeviceList.nHeight
stConvertParam.pData = data_buf
stConvertParam.nDataLen = stDeviceList.nFrameLen
stConvertParam.enPixelType = stDeviceList.enPixelType
stConvertParam.nImageLen = stConvertParam.nDataLen
stConvertParam.nJpgQuality = 70
stConvertParam.enImageType = MV_Image_Jpeg
stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)()
stConvertParam.nBufferSize = nRGBSize
# ret = cam.MV_CC_ConvertPixelType(stConvertParam)
logging.info("nImageLen: %d" % stConvertParam.nImageLen)
ret = cam.MV_CC_SaveImageEx2(stConvertParam)
if ret != 0:
logging.error("convert pixel fail ! ret[0x%x]" % ret)
del data_buf
sys.exit()
#file_path = "AfterConvert_RGB2.jpg"
#file_open = open(file_path, 'wb+')
#file_open = open(file_path.encode('utf8'), 'wb')
img_buff = (c_ubyte * stConvertParam.nImageLen)()
cdll.msvcrt.memcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen)
#file_open.write(img_buff)
# 对返回的图片进行 base64 格式转换
img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
code = 200
msg = "success"
logging.info("Save Image succeed!")
# ch:停止取流 | en:Stop grab image
ret = cam.MV_CC_StopGrabbing()
if ret != 0:
logging.error("stop grabbing fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
# ch:关闭设备 | Close device
ret = cam.MV_CC_CloseDevice()
if ret != 0:
logging.error("close deivce fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
# ch:销毁句柄 | Destroy handle
ret = cam.MV_CC_DestroyHandle()
if ret != 0:
logging.error("destroy handle fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
del data_buf
except Exception as e:
logging.error("openAndSave error: %s" % str(e))
# print("openAndSave finished, current_time: " + str(datetime.datetime.now()))
return JsonResponse(code, msg, img_base64)
# 执行web服务, 端口号可自行修订
logging.info("start to run camera app, current_time: " + str(datetime.datetime.now()))
app.run(host='0.0.0.0', port=65432, debug=True, threaded=True)
5、运行代码
进入Python虚拟环境,执行如下命令:
python HikRobotCamera.py
6、功能验证
1)开始预览
可以将 http://127.0.0.1:65432/startPreview 在前端用 img 标签 显示视频的实时预览效果。
http://127.0.0.1:65432/startPreview
2)停止预览
由于使用的这款海康机器人工业相机(MV-CU060-10GM)只能创建一个连接,所以,当预览完实时视频,需要调用该接口释放相机资源,避免资源被长期占用。
http://127.0.0.1:65432/stopPreview
3)获取图片
返回 base64 格式 的图片,前端可以直接接收显示,调用上传接口保存。
http://127.0.0.1:65432/openAndSave
7、运行效果
由于网速问题和相机没有光圈,相机的拍摄效果有点不清晰。运行效果如下:
8、日志查看
工程目录下回生产日志文件 hikrobot.log,内容如下:
HikRobotCamera.py 的改良优化版,对获取图片方法做了简化,更为简洁,代码如下:
# -- coding: utf-8 --
import cv2
from flask import Flask, render_template, Response
import sys
import msvcrt
import base64
import datetime
import logging
sys.path.append("./MvImport")
from MvCameraControl_class import *
from JsonResponse import *
from JsonFlask import *
logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别
filename='hikrobot.log',
filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
#a是追加模式,默认如果不写的话,就是追加模式
format=
'%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
#日志格式
# 这里配置一下 template_folder为当前目录,不然可以找不到 index.html
app = JsonFlask(__name__, template_folder='.')
# index
@app.route('/')
def index():
return render_template('./templates/index.html')
# 获取码流
def generate(cap):
# 捕获异常信息
try:
while True:
# 如果是关闭相机,先退出取视频流的循环
global open
if (not open):
break;
retgrab = cap.grab()
if retgrab == True:
logging.debug("Grab true")
ret1, frame = cap.retrieve()
# print(type(frame))
if frame is None:
logging.error("frame is None")
continue
ret1, jpeg = cv2.imencode('.jpg', frame)
jpg_frame = jpeg.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n')
except Exception as e:
logging.error("generate error: %s" % str(e))
# 开始预览
@app.route('/startPreview')
def startPreview():
logging.info("================== startPreview start ====================")
logging.info("start to preview video stream, current_time: " + str(datetime.datetime.now()))
# 全局变量,用于控制获取视频流的开关状态
try:
global open
open = True
# 全局变量,获取视频连接
global cap
cap = cv2.VideoCapture(1)
if False == cap.isOpened():
logging.error("startPreview -- can't open camera")
quit()
else:
logging.info("startPreview -- start to open camera")
logging.info("startPreview -- open camera ok")
# 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
# 帧率配置
cap.set(cv2.CAP_PROP_FPS, 15)
response = Response(generate(cap), mimetype='multipart/x-mixed-replace;boundary=frame')
except Exception as e:
logging.error("startPreview error: %s" % str(e))
return response
# 停止预览
@app.route('/stopPreview')
def stopPreview():
logging.info("================== stopPreview start ====================")
logging.info("stop to preview video stream, current_time: " + str(datetime.datetime.now()))
logging.info("start to close camera")
# 全局变量,用于停止循环
global open
open = False
logging.info("release resources start")
# 全局变量,用于释放相机资源
try:
global cap
cap.release()
cv2.destroyAllWindows()
except Exception as e:
logging.error("stopPreview error: %s" % str(e))
logging.info("release resources end")
logging.info("camera closed successfully, current_time: " + str(datetime.datetime.now()))
logging.info("=================== stopPreview end ===================")
return "stop to preview"
# 获取base64图片
@app.route('/grabImage')
def grabImage():
code = 100000
msg = "连接相机时发生错误"
# 捕获异常信息
try:
print("grabImage -- stopPreview start")
stopPreview()
print("grabImage -- stopPreview end")
# 全局变量,获取视频连接
global cap
cap = cv2.VideoCapture(1)
if False == cap.isOpened():
logging.error("can't open camera")
quit()
else:
logging.info("grabImage -- start to open camera")
logging.info("grabImage -- open camera ok")
# 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
# 帧率配置
cap.set(cv2.CAP_PROP_FPS, 15)
retgrab1 = cap.grab()
if retgrab1 == True:
logging.debug("grabImage -- Grab true")
ret1, frame = cap.retrieve()
ret1, jpeg = cv2.imencode('.jpg', frame)
img_buff = jpeg.tobytes()
# 对返回的图片进行 base64 格式转换
img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
code = 200
msg = "success"
stopPreview()
except Exception as e:
logging.error("generate error: %s" % str(e))
return JsonResponse(code, msg, img_base64)
# 获取base64图片
@app.route('/hik/openAndSave')
def openAndSave():
code = 100000
msg = "连接相机时发生错误"
# 捕获异常信息
try:
logging.info("================= openAndSave start =====================")
# 拍照之前,先停止预览
stopPreview()
logging.info("start to grab image, current_time: " + str(datetime.datetime.now()))
# 全局变量,获取视频连接
global cap
cap = cv2.VideoCapture(1)
if False == cap.isOpened():
logging.error("openAndSave -- can't open camera")
quit()
else:
logging.info("openAndSave -- start to open camera")
logging.info("openAndSave -- open camera ok")
# 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
# 帧率配置
cap.set(cv2.CAP_PROP_FPS, 15)
retgrab1 = cap.grab()
if retgrab1 == True:
logging.debug("openAndSave -- Grab true")
ret1, frame = cap.retrieve()
ret1, jpeg = cv2.imencode('.jpg', frame)
img_buff = jpeg.tobytes()
# 对返回的图片进行 base64 格式转换
img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
code = 200
msg = "success"
stopPreview()
logging.info("================= openAndSave end =====================")
except Exception as e:
logging.error("openAndSave error: %s" % str(e))
return JsonResponse(code, msg, img_base64)
@app.route('/hik/openAndSave2')
def openAndSave2():
# 拍照之前,先停止预览
stopPreview()
logging.info("======================================")
logging.info("start to grab image, current_time: " + str(datetime.datetime.now()))
code = 100000
msg = "连接相机时发生错误"
# img_base64 = None
try:
deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
# ch:枚举设备 | en:Enum device
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
logging.error("enum devices fail! ret[0x%x]" % ret)
sys.exit()
if deviceList.nDeviceNum == 0:
logging.error("find no device!")
sys.exit()
logging.info("find %d devices!" % deviceList.nDeviceNum)
for i in range(0, deviceList.nDeviceNum):
mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
logging.info("\ngige device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
strModeName = strModeName + chr(per)
logging.info("device model name: %s" % strModeName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
logging.info("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
logging.info("\nu3v device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
if per == 0:
break
strModeName = strModeName + chr(per)
logging.info("device model name: %s" % strModeName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
logging.info("user serial number: %s" % strSerialNumber)
nConnectionNum = 0
if int(nConnectionNum) >= deviceList.nDeviceNum:
logging.error("intput error!")
sys.exit()
# ch:创建相机实例 | en:Creat Camera Object
cam = MvCamera()
# ch:选择设备并创建句柄 | en:Select device and create handle
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
logging.error("create handle fail! ret[0x%x]" % ret)
sys.exit()
# ch:打开设备 | en:Open device
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
logging.error("open device fail! ret[0x%x]" % ret)
sys.exit()
# ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera)
if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
nPacketSize = cam.MV_CC_GetOptimalPacketSize()
if int(nPacketSize) > 0:
ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
if ret != 0:
logging.warn("Warning: Set Packet Size fail! ret[0x%x]" % ret)
else:
logging.warn("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)
# ch:设置触发模式为off | en:Set trigger mode as off
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
logging.error("set trigger mode fail! ret[0x%x]" % ret)
sys.exit()
# ch:获取数据包大小 | en:Get payload size
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
logging.error("get payload size fail! ret[0x%x]" % ret)
sys.exit()
nPayloadSize = stParam.nCurValue
# ch:开始取流 | en:Start grab image
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
logging.error("start grabbing fail! ret[0x%x]" % ret)
sys.exit()
stDeviceList = MV_FRAME_OUT_INFO_EX()
memset(byref(stDeviceList), 0, sizeof(stDeviceList))
data_buf = (c_ubyte * nPayloadSize)()
ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000)
if ret == 0:
logging.info("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum))
nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3
stConvertParam=MV_SAVE_IMAGE_PARAM_EX()
stConvertParam.nWidth = stDeviceList.nWidth
stConvertParam.nHeight = stDeviceList.nHeight
stConvertParam.pData = data_buf
stConvertParam.nDataLen = stDeviceList.nFrameLen
stConvertParam.enPixelType = stDeviceList.enPixelType
stConvertParam.nImageLen = stConvertParam.nDataLen
stConvertParam.nJpgQuality = 70
stConvertParam.enImageType = MV_Image_Jpeg
stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)()
stConvertParam.nBufferSize = nRGBSize
# ret = cam.MV_CC_ConvertPixelType(stConvertParam)
logging.info("nImageLen: %d" % stConvertParam.nImageLen)
ret = cam.MV_CC_SaveImageEx2(stConvertParam)
if ret != 0:
logging.error("convert pixel fail ! ret[0x%x]" % ret)
del data_buf
sys.exit()
#file_path = "AfterConvert_RGB2.jpg"
#file_open = open(file_path, 'wb+')
#file_open = open(file_path.encode('utf8'), 'wb')
img_buff = (c_ubyte * stConvertParam.nImageLen)()
cdll.msvcrt.memcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen)
#file_open.write(img_buff)
# 对返回的图片进行 base64 格式转换
img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
code = 200
msg = "success"
logging.info("Save Image succeed!")
# ch:停止取流 | en:Stop grab image
ret = cam.MV_CC_StopGrabbing()
if ret != 0:
logging.error("stop grabbing fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
# ch:关闭设备 | Close device
ret = cam.MV_CC_CloseDevice()
if ret != 0:
logging.error("close deivce fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
# ch:销毁句柄 | Destroy handle
ret = cam.MV_CC_DestroyHandle()
if ret != 0:
logging.error("destroy handle fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
del data_buf
except Exception as e:
logging.error("openAndSave error: %s" % str(e))
# print("openAndSave finished, current_time: " + str(datetime.datetime.now()))
return JsonResponse(code, msg, img_base64)
# 执行web服务, 端口号可自行修订
logging.info("start to run camera app, current_time: " + str(datetime.datetime.now()))
if __name__ == '__main__':
try:
app.run(host='0.0.0.0', port=65432, debug=True, threaded=True)
except Exception as e:
logging.error("app error: %s" % str(e))