从 InfluxDB 迁移数据到 DolphinDB
InfluxDB 是一款流行的时序数据处理平台,有着丰富的数据导入方式,完善的集数据处理、预警、可视化等功能为一体的操作界面。其主要是面向物联网场景开发的,同时从 2.0 版本开始,使用了基于 Flux 语言的操作模型来处理数据,对于用户来讲有一定的学习成本。
DolphinDB 是一款国产的高性能分布式时序数据库产品。支持 SQL 和使用类 Python 的语法来处理数据,相比 Flux 语言来讲学习成本较低。同时,DolphinDB 提供了 1400 多个函数,对于复杂的数据处理场景有很强的表达能力,极大的降低了用户开发成本。在数据存储压缩率方面,InfluxDB 和 DolphinDB 的相差不多。而数据查询性能方面,DolphinDB 要比 InfluxDB 高很多,甚至有多个数量级的差异。DolphinDB 也有强大的流式增量计算能力,满足用户实时数据的处理需求。
本文旨在为有从 InfluxDB 迁移至 DolphinDB 需求的用户提供一份简洁明了的参考。
1 实现方法
InfluxDB 支持丰富的数据源接入。在将数据导出时,用户可以通过 API 接口读取数据,并根据实际业务需求转换数据并导出,但这种方式会带来开发上的成本。
这种不同数据源之间的数据同步需求,催生了类似于 DataX 这样的同步工具/平台的诞生。这些工具主要通过插件实现各个数据源的读取和写入,用户只需提供一个配置文件,就可以实现不同数据源之间的同步需求。
DolphinDB 支持用户通过 DataX 插件实现数据的读写,但考虑到,InfluxDB1.0 和 2.0 版本均不支持 DataX 插件,我们基于另一个数据同步平台 Addax,开发了 DolphinDB 写入插件,通过配合 InfluxDB 的读写插件,实现将数据从 InfluxDB 迁移到 DolphinDB 的功能。
对于用户来讲,只需要配置 Addax 的任务文件,即配置 InfluxDB 的读插件参数以及 DolphinDB 的写插件参数,就可以完成从 InfluxDB 迁移数据到 DolphinDB 的功能。
2 迁移案例与操作步骤
在物联网监控场景中,数据有来源多、维度高、数据量大的特点。用户常见的需求是,在采集传感器数据时,实时写入、聚合数据,并做异常检测、预警等分析操作。这对数据库系统的吞吐量和实时分析能力提出了很高的要求。
在这方面,DolphinDB 有着丰富的实践案例,可以参考 DolphinDB 物联网范例 , DolphinDB 在工业物联网的应用 , DolphinDB 流计算在物联网的应用:实时检测传感器状态变化 等。
我们以物联网设备传感器的数据为例,来说明如何将 InfluxDB 的数据迁移到 DolphinDB。
2.1 生成测试数据
在实际迁移过程中,我们需要注意数据完成迁移后所使用的数据类型。具体 DolphinDB 支持的数据类型,可以参考 数据类型
这里以 InfluxDB 官方提供的示例数据里的“设备示例数据”( Sample data ) 为例,说明如何把数据导入到 DolphinDB。
在导入之前,我们创建一个 InfluxDB 的 bucket, 并使用 Flux 脚本导入测试数据到测试的 bucket,代码文件 genMockData.flux 内容如下:
import "influxdata/influxdb/sample"
sample.data(set: "machineProduction")
|> to(bucket: "demo-bucket")
2.2 在 DolphinDB 创建表
针对上面的测试数据,我们需要在 DolphinDB 里创建对应的库表,用于存储迁移过来的数据。对于实际的数据,需要综合考虑被迁移数据的字段,类型,数据量,在 DolphinDB 是否需要分区,分区方案,使用 OLAP 还是 TSDB 引擎等情况,去设计建库建表方案。一些数据存储库表设计实践,可以参考 分区数据库
本例建表文件 createTable.dos 内容如下:
login("admin","123456")
dbName="dfs://demo"
tbName="pt"
colNames=`time`stateionID`grinding_time`oil_temp`pressure`pressure_target`rework_time`state
colTypes=[NANOTIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,SYMBOL]
schemaTb=table(1:0,colNames,colTypes)
db = database(dbName, VALUE,2021.08.01..2022.12.31)
pt = db.createPartitionedTable(schemaTb, "pt", `time)
2.3 部署 Addax
Addax 有2种安装方式,一种是下载已经编译好的二进制包,一种是下载源码,并从源码编译安装。可以参考其官方 快速使用 来部署。
2.4 部署 Addax-DolphinDBWriter 插件
下载已编译好的 Addax 的 DolphinDB 写插件 ,拷贝到 Addax 的插件目录 plugin/writer 目录下。
或者也可以自行编译插件。
2.5 定义迁移任务
在定义迁移任务,主要是按照 Addax 的 任务配置 ,配置 InfluxDB 2.0 读插件 和 DolphinDB 写插件 参数。对于时间类型的数据,也需要注意一下格式转换的问题。
2.6 时间格式的转换
InfluxDB 2.0 中存储时间类型数据,采用的是带有时区信息的 RFC3339 格式。而 DolphinDB 里存储的是本地时间,没有时区的概念。在使用 InfluxDB2.0 插件读取数据时,此时如果直接写入 DolphinDB 表,会造成转换错误。因此需要自定义函数,去处理和转化数据。这部分转换规则可以在 dolphindb 的写入插件中去定义,通过定义参数 saveFunctionName 和 saveFunctionDef 来指定转换函数(具体转换函数的格式参数,可以参考 datax-writer )来实现时间列数据转换(可以参考下面的案例)。
2.7 编写配置文件
编写数据迁移任务的配置文件 influx2ddb.json (具体的每个配置参数的含义,可以参考附录说明),文件内容如下:
{
"job": {
"content": {
"reader": {
"name": "influxdb2reader",
"parameter": {
"column": ["*"],
"connection": [
"endpoint": "http://183.136.170.168:8086",
"bucket": "demo-bucket",
"table": [
"machinery"
"org": "zhiyu"
"token": "GLiPjQFQIxzVO0-atASJHH4b075sTlyEZGrqW20XURkelUT5pOlfhi_Yuo2fjcSKVZvyuO00kdXunWPrpJd_kg==",
"range": [
"2007-08-09"
"writer": {
"name": "dolphindbwriter",
"parameter": {
"userId": "admin",
"pwd": "123456",
"host": "115.239.209.122",
"port": 3134,
"dbPath": "dfs://demo",
"tableName": "pt",
"batchSize": 1000000,
"saveFunctionName": "transData",
"saveFunctionDef": "def parseRFC3339(timeStr) {if(strlen(timeStr) == 20) {return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ssZ'));} else if (strlen(timeStr) == 24) {return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ss.SSSZ'));} else {return timeStr;}};def transData(dbName, tbName, mutable data) {timeCol = exec time from data; timeCol=each(parseRFC3339, timeCol); writeLog(timeCol);replaceColumn!(data,'time', timeCol); loadTable(dbName,tbName).append!(data); }",
"table": [
"type": "DT_STRING",
"name": "time"
"type": "DT_SYMBOL",
"name": "stationID"
"type": "DT_DOUBLE",
"name": "grinding_time"
"type": "DT_DOUBLE",
"name": "oil_temp"
"type": "DT_DOUBLE",
"name": "pressure"
"type": "DT_DOUBLE",
"name": "pressure_target"
"type": "DT_DOUBLE",
"name": "rework_time"
"type": "DT_SYMBOL",
"name": "state"
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
其中 saveFunctionDef 的定义为:
def parseRFC3339(timeStr) {
if(strlen(timeStr) == 20) {
return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ssZ'));
} else if (strlen(timeStr) == 24) {
return localtime(temporalParse(timeStr,'yyyy-MM-ddTHH:mm:ss.SSSZ'));
} else {
return timeStr;
def transData(dbName, tbName, mutable data) {
timeCol = exec time from data; writeLog(timeCol);
timeCol=each(parseRFC3339, timeCol);
replaceColumn!(data, 'time', timeCol);
loadTable(dbName,tbName).append!(data);
}
这里主要定义了2个函数,转换函数入口是 transData。在 writer 的定义里,需要把参数 saveFunctionName 设置为转换入口函数"transData"。上面的函数实现里,主要是先对这个字符串类型的时间列做解析,生成需要的时间列,最后调用 append! 去写入分布式表。
在转换时间列时,通过函数 temporalParse 去分析时间字符串,并通过函数 localtime 去将 UTC 时间转换成本地时间并进行存储。如果业务上有需要,也可以不做时区转换,存储 temporalParse 的结果。关于时间信息处理相关的实践例子,可以参考 DolphinDB 中有关时间信息的最佳实践指南 。
2.8 执行 addax 任务
./bin/addax.sh ~/addax/job/influx2ddb.json
(上面路径根据实际 addax 的位置,以及配置文件路径来修改)
运行结果如下:
2022-12-13 21:17:17.870 [job-0] INFO JobContainer -
任务启动时刻 : 2022-12-13 21:17:14