本文介绍自定义算子管理的主要功能。

1 什么是自定义算子

自定义算子是用户自己开发的算法,例如某个预标注算法,通过打包成代码镜像的方式注册到数据管理平台上,可以在数据处理任务中使用。

说明

此处定义的算子可在后续数据处理任务时调用运行,包括“数据标准化算子”、“数据预处理算子”和“数据预标注算子”。

数据标准化算子:对接入平台的非标准格式数据进行标准化,支持RGB图像、视频、传感器内外参、点云、激光雷达原始扫描、毫米波雷达原始扫描、毫米波雷达目标追踪、感知输出障碍物、静态地图、GPS输出、IMU输出、里程计输出、融合定位输出结果、算法输出轨迹、地盘输出、导航输出、标定等数据的标准化。 平台内置ROS2 STD、Autowave、Appllo Cyber等数据格式的标准化程序支持您自定义所需算子。

数据预处理算子:对数据进行智能预处理平台内置目标视觉增强、视觉图片视角偏转、各类点云滤波、图像分辨率调整、图像颜色通道调整等预处理算法支持您自定义所需算子。

数据预标注算子:对数据进行智能预标注平台内置2D图片/3D点云BondingBox与Tracking标注算法同时支持您自定义所需算子。

2 自定义算子的开发

用户的自定义算子需要集成平台提供的开发框架,以便能够在平台中被正常调用。

开发框架为您提供以下功能:

1、并行计算能力:您可以通过调用开发框架中提供的方法,使您的自定义算子能够支持并发计算。

2、本地联调的能力:开发框架模拟了产品线上环境,您可以在本地联调成功后上传到云上。

2.1 安装SDK

1、项目根目录创建SDK目录,下载SDK文件并放置在该目录下:ali_autodrive-0.0.1.tar.gz

2、安装模块:pip install sdk/ali_autodrive-0.0.1.tar.gz

2.2 实现抽象类

1、项目根目录创建模块(例如example),在example模块中创建数据处理类(如DataProcessor)。

2、DataProcessor实现抽象方法DataProcessTaskAbstract,重写拆分方法和处理方法。

import json
from abc import ABC
from PIL import Image as pilImage
from ali_autodrive.parallel_compute.executor_agent.LogUtil import *
from ali_autodrive.parallel_compute.model.FileContent import FileContent
from ali_autodrive.parallel_compute.utils.tree_util import *
from ali_autodrive.parallel_compute.DataProcessTaskAbstract import DataProcessTaskAbstract
from ali_autodrive.parallel_compute.biz_enum.DataTypeEnum import DataTypeEnum
# 节点参数
NODE_DATA_TRANSFORM_PARAMS = "transformParams"
# 车辆ID
VEHICLE_ID = "vehicleId"
class DataProcessor(DataProcessTaskAbstract, ABC):
    def __init__(self):
        super(DataProcessor, self).__init__()
    # 拆分方法
    def data_partition(self, context):
        self.get_logger().info("Data partition start.")
        # 数据分片,每个文件拆分成一个分片的场景
        file_list = get_sub_node_file_list(self.get_file_tree())
        for file in file_list:
            content = FileContent()
            content.file_path = file
            # 每次保存生成一个分片
            self.save_data_partition([json.dumps(content.__dict__)])
        self.get_logger().info("Data partition end.")
    # 处理方法,图片格式转换为JPEG
    def data_process(self, context):
        self.get_logger().info("Data process start.")
        # 获取工作空间
        workspace = self.get_user_workspace()
        self.get_logger().info("Workspace is " + workspace)
        # 获取节点配置参数
        params = json.loads(self.get_parameters()[NODE_DATA_TRANSFORM_PARAMS])
        vehicle_id = params[VEHICLE_ID]
        self.get_logger().info("VehicleId is " + vehicle_id)
        # 获取当前分片文件本地路径列表
        local_file_list = get_sub_node_local_file_list(self.get_file_tree())
        # 转换图片格式,获取新文件列表
        file_contents = self.__convert_image_format(workspace, local_file_list)
        # 存储新文件
        self.save_data_partition(file_contents)
        self.get_logger().info("Data process end.")
    def __convert_image_format(self, workspace, local_file_list):
        # 工作空间创建临时图片目录
        image_path = os.path.join(workspace, "image")
        if not os.path.exists(image_path):
            os.makedirs(image_path)
        # 逐个转换图片格式,临时目录创建新文件保存
        image_file_list = []
        for file in local_file_list:
            new_file_name = os.path.basename(file).split(".")[-2] + ".JPEG"
            new_file_path = os.path.join(image_path, new_file_name)
            im = pilImage.open(file)
            im.save(new_file_path, format='JPEG')
            im.close()
            image_file_list.append(self.__get_file_content(new_file_path, im.height, im.width))
        return image_file_list
    # 获取文件描述信息,json结构, 会自动创建数据集,可以在自动驾驶平台数据检索和回放
    @staticmethod
    def __get_file_content(file_path, height, width):
        # 文件打标
        file_tag = {"header": {}}
        file_tag["header"]["data_type"] = DataTypeEnum.camera_img.name
        file_tag["height"] = height
        file_tag["width"] = width
        file_tag["format"] = 'JPEG'
        # 待保存文件描述信息
        content = FileContent()
        content.file_tag = file_tag
        content.file_path = file_path
        return json.dumps(content.__dict__)

3 本地测试

3.1 创建配置文件

项目根目录创建配置文件(如config.ini)

[init]
#表示工作目录,替换为自己机器工作空间
workspace = /Users/icyore/workspace
#模拟OSS地址,替换为自己的测试文件目录
ossInput = /Users/icyore/oss/input 
#模拟OSS地址,替换为测试输出目录
ossOutput = /Users/icyore/oss/output
#【可空】算子初始化参数,可以在算子中获取,同数据管理平台标准化转换节点的“转换程序参数”,JSON字符串类型。
transformParams ={\"vehicleId\":\"并行计算测试车\"}

3.2 创建启动脚本

项目根目录创建启动脚本(如test_start.py)

from ali_autodrive.parallel_compute.service_startup import *
# ./config.ini   配置文件路径,可以是相对当前目录路径,也可以是绝对路径
# example.DataProcessor  算子所在模块
# DataProcessor  算子实现类
test("./config.ini", "example.DataProcessor", "DataProcessor")

3.3 运行启动脚本

运行启动脚本

python test_start.py

4 创建镜像

4.1 打包模块

项目根目录创建setup.py文件,执行打包命令: python setup.py sdist

注意:打包生成的文件存储在dist目录下

# -*- coding:utf-8 -*-
from setuptools import (setup, find_packages)
setup(
    name="example",
    version="0.0.1",
    # 需要包含的子包列表
    packages=find_packages(),
    # 添加依赖
    install_requires=[
        #'python-lzf==0.2.4',
)

4.2 创建Dockerfile

项目根目录创建Dockerfile文件

FROM python:3.8
COPY . /app
WORKDIR /app
RUN pip install sdk/ali_autodrive-0.0.1.tar.gz
ADD   sdk/ali_autodrive-0.0.1.tar.gz  ali_autodrive
RUN pip install dist/example-0.0.1.tar.gz
WORKDIR /app/ali_autodrive/ali_autodrive-0.0.1/ali_autodrive/parallel_compute
EXPOSE 5000
#example.DataProcessor  算子所在模块 
#DataProcessor  算子实现类
CMD ["python","service_startup.py" ,"example.DataProcessor","DataProcessor"]

提示:为了提升镜像打包速度,可以将上一个版本当作基础镜像,基于基础镜像打包,节省模块安装耗时。

4.3 上传镜像

制作镜像并上传到ACR

docker login --username=jieran.gjj@city-brain-pro auto-driver-registry.cn-hangzhou.cr.aliyuncs.com
docker build -t parallel-compute-example:0.0.1 .
docker tag parallel-compute-example:0.0.1 auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
docker push auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1

其中auto-driver-registry.cn-hangzhou.cr.aliyuncs.com为ACR仓库地址,parallel_compute为ACR命名空间,需要替换为自己的地址,并使用自己的账号进行登录。

其中parallel-compute-example:0.0.1为镜像名:版本号,可以自定义名称和版本号。