本文介绍自定义算子管理的主要功能。
1 什么是自定义算子
自定义算子是用户自己开发的算法,例如某个预标注算法,通过打包成代码镜像的方式注册到数据管理平台上,可以在数据处理任务中使用。
此处定义的算子可在后续数据处理任务时调用运行,包括“数据标准化算子”、“数据预处理算子”和“数据预标注算子”。
数据标准化算子:对接入平台的非标准格式数据进行标准化,支持RGB图像、视频、传感器内外参、点云、激光雷达原始扫描、毫米波雷达原始扫描、毫米波雷达目标追踪、感知输出障碍物、静态地图、GPS输出、IMU输出、里程计输出、融合定位输出结果、算法输出轨迹、地盘输出、导航输出、标定等数据的标准化。 平台内置ROS2 STD、Autowave、Appllo Cyber等数据格式的标准化程序支持您自定义所需算子。
数据预处理算子:对数据进行智能预处理平台内置目标视觉增强、视觉图片视角偏转、各类点云滤波、图像分辨率调整、图像颜色通道调整等预处理算法支持您自定义所需算子。
数据预标注算子:对数据进行智能预标注平台内置2D图片/3D点云BondingBox与Tracking标注算法同时支持您自定义所需算子。
2 自定义算子的开发
用户的自定义算子需要集成平台提供的开发框架,以便能够在平台中被正常调用。
开发框架为您提供以下功能:
1、并行计算能力:您可以通过调用开发框架中提供的方法,使您的自定义算子能够支持并发计算。
2、本地联调的能力:开发框架模拟了产品线上环境,您可以在本地联调成功后上传到云上。
2.1 安装SDK
1、项目根目录创建SDK目录,下载SDK文件并放置在该目录下:ali_autodrive-0.0.1.tar.gz
2、安装模块:pip install sdk/ali_autodrive-0.0.1.tar.gz
2.2 实现抽象类
1、项目根目录创建模块(例如example),在example模块中创建数据处理类(如DataProcessor)。
2、DataProcessor实现抽象方法DataProcessTaskAbstract,重写拆分方法和处理方法。
import json
from abc import ABC
from PIL import Image as pilImage
from ali_autodrive.parallel_compute.executor_agent.LogUtil import *
from ali_autodrive.parallel_compute.model.FileContent import FileContent
from ali_autodrive.parallel_compute.utils.tree_util import *
from ali_autodrive.parallel_compute.DataProcessTaskAbstract import DataProcessTaskAbstract
from ali_autodrive.parallel_compute.biz_enum.DataTypeEnum import DataTypeEnum
# 节点参数
NODE_DATA_TRANSFORM_PARAMS = "transformParams"
# 车辆ID
VEHICLE_ID = "vehicleId"
class DataProcessor(DataProcessTaskAbstract, ABC):
def __init__(self):
super(DataProcessor, self).__init__()
# 拆分方法
def data_partition(self, context):
self.get_logger().info("Data partition start.")
# 数据分片,每个文件拆分成一个分片的场景
file_list = get_sub_node_file_list(self.get_file_tree())
for file in file_list:
content = FileContent()
content.file_path = file
# 每次保存生成一个分片
self.save_data_partition([json.dumps(content.__dict__)])
self.get_logger().info("Data partition end.")
# 处理方法,图片格式转换为JPEG
def data_process(self, context):
self.get_logger().info("Data process start.")
# 获取工作空间
workspace = self.get_user_workspace()
self.get_logger().info("Workspace is " + workspace)
# 获取节点配置参数
params = json.loads(self.get_parameters()[NODE_DATA_TRANSFORM_PARAMS])
vehicle_id = params[VEHICLE_ID]
self.get_logger().info("VehicleId is " + vehicle_id)
# 获取当前分片文件本地路径列表
local_file_list = get_sub_node_local_file_list(self.get_file_tree())
# 转换图片格式,获取新文件列表
file_contents = self.__convert_image_format(workspace, local_file_list)
# 存储新文件
self.save_data_partition(file_contents)
self.get_logger().info("Data process end.")
def __convert_image_format(self, workspace, local_file_list):
# 工作空间创建临时图片目录
image_path = os.path.join(workspace, "image")
if not os.path.exists(image_path):
os.makedirs(image_path)
# 逐个转换图片格式,临时目录创建新文件保存
image_file_list = []
for file in local_file_list:
new_file_name = os.path.basename(file).split(".")[-2] + ".JPEG"
new_file_path = os.path.join(image_path, new_file_name)
im = pilImage.open(file)
im.save(new_file_path, format='JPEG')
im.close()
image_file_list.append(self.__get_file_content(new_file_path, im.height, im.width))
return image_file_list
# 获取文件描述信息,json结构, 会自动创建数据集,可以在自动驾驶平台数据检索和回放
@staticmethod
def __get_file_content(file_path, height, width):
# 文件打标
file_tag = {"header": {}}
file_tag["header"]["data_type"] = DataTypeEnum.camera_img.name
file_tag["height"] = height
file_tag["width"] = width
file_tag["format"] = 'JPEG'
# 待保存文件描述信息
content = FileContent()
content.file_tag = file_tag
content.file_path = file_path
return json.dumps(content.__dict__)
3 本地测试
3.1 创建配置文件
项目根目录创建配置文件(如config.ini)
[init]
#表示工作目录,替换为自己机器工作空间
workspace = /Users/icyore/workspace
#模拟OSS地址,替换为自己的测试文件目录
ossInput = /Users/icyore/oss/input
#模拟OSS地址,替换为测试输出目录
ossOutput = /Users/icyore/oss/output
#【可空】算子初始化参数,可以在算子中获取,同数据管理平台标准化转换节点的“转换程序参数”,JSON字符串类型。
transformParams ={\"vehicleId\":\"并行计算测试车\"}
3.2 创建启动脚本
项目根目录创建启动脚本(如test_start.py)
from ali_autodrive.parallel_compute.service_startup import *
# ./config.ini 配置文件路径,可以是相对当前目录路径,也可以是绝对路径
# example.DataProcessor 算子所在模块
# DataProcessor 算子实现类
test("./config.ini", "example.DataProcessor", "DataProcessor")
3.3 运行启动脚本
运行启动脚本
python test_start.py
4 创建镜像
4.1 打包模块
项目根目录创建setup.py文件,执行打包命令: python setup.py sdist
注意:打包生成的文件存储在dist目录下
# -*- coding:utf-8 -*-
from setuptools import (setup, find_packages)
setup(
name="example",
version="0.0.1",
# 需要包含的子包列表
packages=find_packages(),
# 添加依赖
install_requires=[
#'python-lzf==0.2.4',
)
4.2 创建Dockerfile
项目根目录创建Dockerfile文件
FROM python:3.8
COPY . /app
WORKDIR /app
RUN pip install sdk/ali_autodrive-0.0.1.tar.gz
ADD sdk/ali_autodrive-0.0.1.tar.gz ali_autodrive
RUN pip install dist/example-0.0.1.tar.gz
WORKDIR /app/ali_autodrive/ali_autodrive-0.0.1/ali_autodrive/parallel_compute
EXPOSE 5000
#example.DataProcessor 算子所在模块
#DataProcessor 算子实现类
CMD ["python","service_startup.py" ,"example.DataProcessor","DataProcessor"]
提示:为了提升镜像打包速度,可以将上一个版本当作基础镜像,基于基础镜像打包,节省模块安装耗时。
4.3 上传镜像
制作镜像并上传到ACR
docker login --username=jieran.gjj@city-brain-pro auto-driver-registry.cn-hangzhou.cr.aliyuncs.com
docker build -t parallel-compute-example:0.0.1 .
docker tag parallel-compute-example:0.0.1 auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
docker push auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
其中auto-driver-registry.cn-hangzhou.cr.aliyuncs.com为ACR仓库地址,parallel_compute为ACR命名空间,需要替换为自己的地址,并使用自己的账号进行登录。
其中parallel-compute-example:0.0.1为镜像名:版本号,可以自定义名称和版本号。