select transform 语法允许您启动一个指定的子进程,将输入数据按照一定的格式通过标准输入至子进程,并且通过解析子进程的标准输出获取输出数据。 select transform 让您无需编写UDF,即可实现MaxCompute SQL对其他脚本语言的支持。
select transform
select transform 与UDTF在不同场景下的性能不同。经过多种场景对比测试,数据量较小时,大多数场景下 select transform 有优势,而数据量大时UDTF有优势。 select transform 的开发更加简便,更适合于Ad Hoc(即席查询)。
select transform 不仅仅是语言支持的扩展。一些简单的功能,例如AWK、Python、Perl、Shell都支持直接在命令中写脚本,而不需要专门编写脚本文件、上传资源等,开发过程更简单。对于复杂的功能,您可以上传脚本文件来执行,请参见 调用Python脚本使用示例 、 调用Java脚本使用示例 。
UDTF与 select transform 的优势对比如下。
类型
UDTF
数据类型
子进程基于标准输入和标准输出传输数据,所有数据都当做STRING类型处理,因此 select transform 比UDTF多了一步类型转换。
UDTF的输出结果和输入参数支持多种数据类型。
数据传输
数据传输依赖于操作系统的管道,而管道的缓存仅有4 KB且不能设置。 select transform 读空或管道写满会导致进程被挂起。
数据的传输通过更底层的系统调用来读写,效率比Java高。
无管道缓存限制。
常量参数传输
常量参数需要传输。
常量参数可以不用传输。
线程
子进程和父进程是两个进程,如果计算占比较高,数据吞吐量较小, select transform 可以利用服务器的多核特性。
单线程。
性能
select transform 支持的部分工具,例如AWK是Native代码实现的。理论上,与Java相比,使用 select transform 会更有性能优势。
性能不高。
由于MaxCompute计算集群上未部署PHP和Ruby,所以不支持调用这两种脚本。
select transform(<arg1>, <arg2> ...) [(row format delimited (fields terminated by <field_delimiter> (escaped by <character_escape>)) (null defined as <null_value>))] using '<unix_command_line>' (resources '<res_name>' (',' '<res_name>')*) [(as <col1>, <col2> ...)] (row format delimited (fields terminated by <field_delimiter> (escaped by <character_escape>)) (null defined as <null_value>))
select transform 关键字:必填。可以用 map 或 reduce 关键字替换,语义是完全一样的。为使语法更清晰,推荐您使用 select transform 。
map
reduce
arg1,arg2... :必填。指定输入数据。其格式和 select 语句类似。默认格式下,参数的各个表达式结果在隐式转换成STRING类型后,用 \t 拼接,输入到子进程中。
select
\t
row format 子句:可选。允许自定义输入输出的格式。
语法中有两个 row format 子句,第一个子句指定输入数据的格式,第二个子句指定输出数据的格式。默认情况下使用 \t 作为列分隔符, \n 作为行分隔符,使用 \N 表示NULL。
\n
\N
field_delimiter 、 character_escape 只接受一个字符。如果指定的是字符串,则以第一个字符为准。
MaxCompute支持Hive指定格式的语法,例如 inputRecordReader 、 outputRecordReader 、 SerDe 等,但您需要打开Hive兼容模式才能使用。打开方式为在SQL语句前加set语句 set odps.sql.hive.compatible=true; 。Hive支持的语法详情请参见 Hive文档 。
inputRecordReader
outputRecordReader
SerDe
set odps.sql.hive.compatible=true;
如果使用Hive的 inputRecordReader 、 outputRecordReader 等自定义类,可能会降低执行性能。
using 子句:必填。指定要启动的子进程的命令。
大多数的MaxCompute SQL命令中 using 子句指定的是资源(Resources),但此处使用 using 子句指定启动子进程的命令。使用 using 子句是为了和Hive的语法兼容。
using 子句的格式和Shell语法类似,但并非真的启动Shell来执行,而是直接根据命令的内容创建子进程。因此,很多Shell的功能不能使用,例如输入输出重定向、管道、循环等。如果有需要,Shell本身也可以作为子进程命令来使用。
resources 子句:可选。允许指定子进程能够访问的资源,支持以下两种方式指定资源:
使用 resources 子句指定资源。例如 using 'sh foo.sh bar.txt' resources 'foo.sh','bar.txt' 。
using 'sh foo.sh bar.txt' resources 'foo.sh','bar.txt'
使用MaxCompute属性指定资源。在SQL语句前使用 set odps.sql.session.resources=foo.sh,bar.txt; 来指定资源。
set odps.sql.session.resources=foo.sh,bar.txt;
此配置是全局配置,即整个SQL中所有的 select transform 都可以访问此资源。多个资源文件之间使用英文逗号(,)分隔。
as 子句:可选。指定输出列。例如 as(col1 bigint, col2 boolean) 。
as(col1 bigint, col2 boolean)
输出列可以不指定数据类型,默认为STRING类型。例如 as(col1, col2) 。
as(col1, col2)
由于输出数据实际是解析子进程标准输出获取的,如果指定的数据不是STRING类型,系统会隐式调用 cast 函数进行转换,转换过程有可能出现运行异常。
cast
输出列的数据类型不支持部分指定,例如 as(col1, col2 bigint) 。
as(col1, col2 bigint)
关键字 as 可以省略,此时默认标准输出数据中第一个 \t 之前的字段为Key,后面的部分全部为Value,相当于 as(key, value) 。
as
as(key, value)
假设通过Shell命令生成50行数据,值是从1到50,输出为 data 字段。直接将Shell命令作为 transform 数据输入。命令示例如下:
data
transform
select transform(script) using 'sh' as (data) from ( select 'for i in `seq 1 50`; do echo $i; done' as script --等效于如下语句。 select transform('for i in `seq 1 50`; do echo $i; done') using 'sh' as (data);
返回结果如下:
+------------+ | data | +------------+ | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 | | 8 | | 9 | | 10 | | 11 | | 12 | | 13 | | 14 | | 15 | | 16 | | 17 | | 18 | | 19 | | 20 | | 21 | | 22 | | 23 | | 24 | | 25 | | 26 | | 27 | | 28 | | 29 | | 30 | | 31 | | 32 | | 33 | | 34 | | 35 | | 36 | | 37 | | 38 | | 39 | | 40 | | 41 | | 42 | | 43 | | 44 | | 45 | | 46 | | 47 | | 48 | | 49 | | 50 | +------------+
假设通过Python命令生成50行数据,值是从1到50,输出为 data 字段。直接将Python命令作为 transform 数据输入。命令示例如下:
select transform(script) using 'python' as (data) from ( select 'for i in xrange(1, 51): print i;' as script --等效于如下语句。 select transform('for i in xrange(1, 51): print i;') using 'python' as (data);
创建一张测试表,假设通过AWK命令将测试表的第二列原样输出,输出为 data 字段。直接将AWK命令作为 transform 数据输入。命令示例如下:
--创建测试表。 create table testdata(c1 bigint,c2 bigint); --测试表中插入测试数据。 insert into table testdata values (1,4),(2,5),(3,6); --执行select transform语句。 select transform(*) using "awk '//{print $2}'" as (data) from testdata;
+------------+ | data | +------------+ | 4 | | 5 | | 6 | +------------+
创建一张测试表,假设通过Perl命令将测试表的数据原样输出,输出为 data 字段。直接将Perl命令作为 transform 数据输入。命令示例如下:
--创建测试表。 create table testdata(c1 bigint,c2 bigint); --测试表中插入测试数据。 insert into table testdata values (1,4),(2,5),(3,6); --执行select transform语句。 select transform(testdata.c1, testdata.c2) using "perl -e 'while($input = <STDIN>){print $input;}'" from testdata;
+------------+------------+ | key | value | +------------+------------+ | 1 | 4 | | 2 | 5 | | 3 | 6 | +------------+------------+
准备Python文件,脚本文件名为 myplus.py ,命令示例如下。
#!/usr/bin/env python import sys line = sys.stdin.readline() while line: token = line.split('\t') if (token[0] == '\\N') or (token[1] == '\\N'): print '\\N' else: print str(token[0]) +'\t' + str(token[1]) line = sys.stdin.readline()
将该Python脚本文件添加为MaxCompute资源(Resource)。
add py ./myplus.py -f;
您也可通过DataWorks控制台进行新增资源操作,请参见 创建并使用MaxCompute资源 。
使用 select transform 语法调用资源。
--创建测试表。 create table testdata(c1 bigint,c2 bigint); --测试表中插入测试数据。 insert into table testdata values (1,4),(2,5),(3,6); --执行select transform语句。 select transform (testdata.c1, testdata.c2) using 'python myplus.py' resources 'myplus.py' as (result1,result2) from testdata; --等效于如下语句。 set odps.sql.session.resources=myplus.py; select transform (testdata.c1, testdata.c2) using 'python myplus.py' as (result1,result2) from testdata;
+------------+------------+ | result1 | result2 | +------------+------------+ | 1 | 4 | | | NULL | | 2 | 5 | | | NULL | | 3 | 6 | | | NULL | +------------+------------+
准备好JAR文件,脚本文件名为 Sum.jar ,Java代码示例如下。
package com.aliyun.odps.test; import java.util.Scanner; public class Sum { public static void main(String[] args) { Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String s = sc.nextLine(); String[] tokens = s.split("\t"); if (tokens.length < 2) { throw new RuntimeException("illegal input"); if (tokens[0].equals("\\N") || tokens[1].equals("\\N")) { System.out.println("\\N"); System.out.println(Long.parseLong(tokens[0]) + Long.parseLong(tokens[1])); }
将JAR文件添加为MaxCompute的资源。
add jar ./Sum.jar -f;
--创建测试表。 create table testdata(c1 bigint,c2 bigint); --测试表中插入测试数据。 insert into table testdata values (1,4),(2,5),(3,6); --执行select transform语句。 select transform(testdata.c1, testdata.c2) using 'java -cp Sum.jar com.aliyun.odps.test.Sum' resources 'Sum.jar' as cnt from testdata; --等效于如下语句。 set odps.sql.session.resources=Sum.jar; select transform(testdata.c1, testdata.c2) using 'java -cp Sum.jar com.aliyun.odps.test.Sum' as cnt from testdata;
+-----+ | cnt | +-----+ | 5 | | 7 | | 9 | +-----+
Java和Python虽然有现成的UDTF框架,但是用 select transform 编写更简单,不需要额外依赖以及没有格式要求,甚至可以实现直接使用离线脚本。Java和Python离线脚本的实际路径,可以从 JAVA_HOME 和 PYTHON_HOME 环境变量中得到。
JAVA_HOME
PYTHON_HOME
select transform 还可以串联使用。例如使用 distribute by 和 sort by 对输入数据做预处理。命令示例如下:
distribute by
sort by
select transform(key, value) using '<cmd2>' from select transform(*) using '<cmd1>' from select * from testdata distribute by c2 sort by c1 ) t distribute by key sort by value ) t2;
cmd1 、 cmd2 为要启动的子进程的命令。
cmd1
cmd2
或使用 map 、 reduce 关键字。
@a := select * from data distribute by col2 sort by col1; @b := map * using 'cmd1' distribute by col1 sort by col2 from @a; reduce * using 'cmd2' from @b;