一般用于有条件的数据导出。在此模式下,DataX不会按照指定的column、table参数进行SQL的拼接,而是会略过这些配置(如果有)直接执行querySQL语句。task数量固定为1,因此在此模式下channel的配置无多线程效果。
querySQL模式的数据导出示例如下:
{
"job": {
"content": [
"reader": {
"name": "mysqlreader", #指定使用mysqlreader读取数据。
"parameter": {
"username": "username",#MySQL用户名。
"password": "password",#MySQL密码。
"connection": [
"querySql": [ #指定执行的SQL语句。
"select bucket_name, delta , timestamp ,cdn_in, cdn_out ,total_request from vip_quota where bucket_name='xxx' "
"jdbcUrl": ["jdbc:mysql://192.168.0.8:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" #jdbc连接串
"writer": {
"name": "otswriter",#指定使用otswriter进行数据写入。
"parameter": {#数据源配置。
"endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#表格存储实例的服务地址(Endpoint)。
"accessId":"xxxx",
"accessKey":"xxxx",
"instanceName":"smoke-test",#实例名。
"table":"vip_quota",#写入数据的目标table名称。
#以下为otswriter的限制项配置,默认可以不填,如果数据不符合以下规则,则数据会被当成脏数据过滤掉。
"requestTotalSizeLimitation": 1048576, #单行数据大小限制,默认配置为1 MB,可不配置。
"attributeColumnSizeLimitation": 2097152, #单个属性列大小限制,默认配置为2 MB,可不配置。
"primaryKeyColumnSizeLimitation": 1024, #单个主键列大小限制,默认配置为1 KB,可不配置。
"attributeColumnMaxCount": 1024, #属性列个数限制,默认配置为1024,可不配置。
"primaryKey":[# 主键名称和类型。
{"name":"bucket_name", "type":"string"},
{"name":"delta", "type":"int"},
{"name":"timestamp", "type":"int"}
"column":[#其它column的名称和类型。
{"name":"cdn_in","type":"int"},
{"name":"cdn_out","type":"int"},
{"name":"total_request","type":"int"}
"writeMode":"UpdateRow" #写入模式。
table模式(多task)
在此模式下无需手动编写select语句,而是由DataX根据JSON中的column、table、splitPk配置项自行拼接SQL语句。观察执行日志如下:
table模式的数据导出示例代码如下:
{
"job": {
"setting": {
"speed": {
"channel": 3 #指定channel个数,该参数与并发数密切相关。
"errorLimit": {#容错限制。
"record": 0,
"percentage": 0.02
"content": [
"reader": {
"name": "mysqlreader",#指定使用mysqlreader读取。
"parameter": {
"username": "username",#MySQL用户名。
"password": "password",#MySQL密码。
"column": [ #table模式下可以指定需要查询的列。
"bucket_name",
"timestamp" ,
"delta" ,
"cdn_in",
"cdn_out" ,
"total_request"
"splitPk": "timestamp",#指定split字段。
"connection": [
"table": [#导出的表名。
"vip_quota"
"jdbcUrl": ["jdbc:mysql://192.168.1.7:3306/db1"#jdbc连接串。
"writer": {
"name": "otswriter",#指定使用otswriter进行写入。
"parameter": {#数据源配置。
"endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#表格存储实例的Endpoint。
"accessId":"xxx",
"accessKey":"xxx",
"instanceName":"smoke-test",#实例名。
"table":"vip_quota",#写入目标的table名称。
"primaryKey":[#主键名称和类型。
{"name":"bucket_name", "type":"string"},
{"name":"delta", "type":"int"},
{"name":"timestamp", "type":"int"}
"column":[#其它column的名称和类型。
{"name":"haha","type":"int"},
{"name":"hahah","type":"int"},
{"name":"kengdie","type":"int"}
"writeMode":"UpdateRow"#写入模式。
上述JSON文件中定义了一次数据导出导入的数据源信息和部分系统配置。配置主要包括如下两部分:
setting:主要是speed(与速率、并发相关)和errorLimit(容错限制)。
channel:个数决定了reader和writer的个数上限。
splitPk:指定了splitPk字段,DataX会将MySQL表中数据按照splitPk切分成n段。splitPk的字段必须是整型或者字符串类型。
由于DataX的实现方式是按照splitPk字段分段查询数据库表,那么splitPk字段的选取应该尽可能选择分布均匀且有索引的字段,例如主键ID、唯一键等字段。如果不指定splitPk字段,则DataX将不会进行数据的切分,并行度会变为1。
JVM的内存
发送给MySQL数据库SQL语句后会得到查询的数据集,并缓存在DataX的buffer中。除此之外,每个channel也维护了自己的record队列。如果存在并发,则channel的个数越多,也会需要更多的内存。因此您可以考虑指定JVM的内存大小参数,即在步骤三:执行同步命令中通过-j
参数来指定JVM的内存大小。
channel的个数和流控参数
在conf/core.json中,控制channel的关键参数如下图所示。
一般情况下,channel队列本身配置的调整并不常见,但是在使用DataX时应该注意byte和record流控参数。这两个参数都是在flowControlInterval间隔中采样后根据采样值来决定是否进行流控。
为了提升同步效率,您可以适当提高channel的个数来提高并发数,以及调高每个channel的byte和record限制来提高DataX的吞吐量。
请综合考虑channel的个数和流控参数,保证理论峰值不会对服务器产生过高的压力。
详细参数说明请参见下表。
capacity和byteCapacity两个参数决定了每个channel能缓存的记录数量和内存占用情况。如果要调整相应参数,您需要按照DataX实际的运行环境进行配置。例如MySQL中每个record都比较大,那么可以考虑适当调高byteCapacity,调整该参数时还请同时考虑机器的内存情况。
{
"core": { #定义了全局的系统参数,如果不指定会使用默认值。
"transport": {
"channel": {
"speed": {
"record": 5000,
"byte": 102400
"job": {
"setting": {
"speed": { #定义了单个channel的控制参数。
"record": 10000,
"errorLimit": {
"record": 0,
"percentage": 0.02
"content": [
"reader": {
.....#省略
"writer": {
.....#省略
Tablestore是基于LSM设计的高性能高吞吐的分布式数据库产品,每一张表都会被切分为很多数据分区,数据分区分布在不同的服务器上,拥有极强的吞吐能力。如果写入能够分散在所有服务器上,则能够利用所有服务器的服务能力,更高速地写入数据,即表分区数量和吞吐能力正相关。
新建的表默认分区数量都是1,分区数量会随着表不断写入数据而自动分裂不断增长,但是自动分裂的周期较长。对于新建表,如果需要马上进行数据导入,由于单分区很可能不够用导致数据导入不够顺畅,因此建议在新建表时对表进行预分区,在开始导入数据时即可获得极好的性能,而不用等待自动分裂。
{
"job": {
"setting": {
....#省略
"content": [
"reader": {
.....#省略
"writer": {
"name": "otswriter",
"parameter": {
.......
"writeMode":"UpdateRow",
"batchWriteCount":100