public boolean processRow(StepMetaInterfacesmi, StepDataInterface sdi) throws KettleException
Object[]r = getRow();
if(r == null) {
setOutputDone();
returnfalse;
if (first){
first = false;
r= createOutputRow(r, data.outputRowMeta.size());
// Get the value from an input field
Long numerator = get(Fields.In, "numerator").getInteger(r);
Long denominator = get(Fields.In,"denominator").getInteger(r);
//avoid dividing by 0
if(denominator == 0){
//putErro is declared as follows:
//public void putError(RowMetaInterface rowMeta, Object[] row, long nrErrors,String errorDescriptions, String fieldNames, String errorCodes)
putError(data.outputRowMeta,r, 1, "Denominator must be different from 0","denominator", "DIV_0");
//get on with the next line
returntrue;
longinteger_division = numerator / denominator;
longremainder = numerator % denominator;
//write output fields
get(Fields.Out, "integer_division").setValue(r,Long.valueOf(integer_division));
get(Fields.Out, "remainder").setValue(r,Long.valueOf(remainder));
//Send the row on to the next step.
putRow(data.outputRowMeta, r);
returntrue;
访问数据库连接
如果
udjc
步骤需要实现一些和数据库相关的功能,那么可以使用
kettle
功能获取其数据库连接。下面示例中使用了
kettle
中定义的“
TestDB
”数据库连接。输入行有一个“
table_name
”字段,该步骤检查输入的表是否存在,并把结果写入的输出结果中。
如果需要在
udjc
步骤中实现一些和数据库相关的重要工作,最好对源码中的
org.pentaho.di.core.database
包内容比较熟悉,也可以查看和
DB
相关的步骤和示例代码,了解如何使用
database
包相关类的使用。
importorg.pentaho.di.core.database.Database;
importjava.util.List;
importjava.util.Arrays;
privateDatabase db = null;
privateFieldHelper outputField = null;
private FieldHelpertableField = null;
privateList existingTables = null;
publicboolean processRow(StepMetaInterface smi, StepDataInterface sdi) throwsKettleException
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
if (first){
first = false;
existingTables =Arrays.asList(db.getTablenames());
tableField = get(Fields.In,"table_name");
outputField = get(Fields.Out,"table_exists");
r = createOutputRow(r,data.outputRowMeta.size());
if (existingTables.contains(tableField.getString(r))){
outputField.setValue(r, Long.valueOf(1));
else{
outputField.setValue(r,Long.valueOf(0));
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
return true;
public booleaninit(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
if (parent.initImpl(stepMetaInterface,stepDataInterface)){
db = newDatabase(this.parent, getTransMeta().findDatabase("TestDB"));
db.shareVariablesWith(this.parent);
db.connect();
return true;
catch(KettleDatabaseException e){
logError("Errorconnecting to TestDB: "+ e.getMessage());
setErrors(1);
stopAll();
return false;
publicvoid dispose(StepMetaInterface smi, StepDataInterface sdi)
if (db != null) {
db.disconnect();
parent.disposeImpl(smi, sdi);
示例
udjc
步骤中的重写了
init
()和
dispose
()方法,分别实现创建数据库连接和完成后断开连接。在转换初始化的时候,第一次执行
processRow
()之前调用
init
()方法。转换执行完成之后调用
dispose
()方法。如果有首先要初始化的工作以及一些清理资源代码,就考虑分别放在
init
和
dispose
方法中。示例转换的名称:
db_access.ktr
。
实现输入步骤
有时
udjc
步骤本身就是输入步骤,这时其自己生成输入行,而无需其他的输入行步骤。下面示例展示生成
java
的系统属性列表作为输入行。
代码如下:
import java.util.*;
private ArrayList keys = null;
private int idx = 0;
public boolean processRow(StepMetaInterfacesmi, StepDataInterface sdi) throws KettleException
if(first){
first= false;
//get the system property names, output is done one at a time later
keys= Collections.list(System.getProperties().propertyNames());
idx= 0;
if(idx >= keys.size()) {
setOutputDone();
returnfalse;
//create a row
Object[]r = RowDataUtil.allocateRowData(data.outputRowMeta.size());
// Set key and value in a new output row
get(Fields.Out, "key").setValue(r, keys.get(idx));
get(Fields.Out,"value").setValue(r,System.getProperties().get(keys.get(idx)));
idx++;
//Send the row on to the next step.
putRow(data.outputRowMeta, r);
returntrue;
在代码中没有调用
getRow
方法获取输入行,而是第一次调用
processRow
方法是初始化
java
系统属性列表。这些属性被逐个写入到输出流中。因为没有输入行,代码通过
RowDataUtil.allocateRowData()
方法创建,然后设置字段值并传输到下一步骤中。示例转换的名称
input_step.ktr
。
本文详细说明了
udjc
步骤在不同场景的使用方式。如果你需要自定义处理功能,但是
javascript
步骤实现不灵活或性能不够,这时可以考虑使用
udjc
步骤代替。为了学习更多的内容,我们也可以查看
sample
目录下的关于
udjc
的示例。
在使用
Kettle
的过程中,有可能遇到现有
步骤
无法满足需求的情况。解决此类问题,有诸如购买第
三
方插件、开发插件、自定义
Java
类等办法。最后一种办法因其代价小且门槛较低而成为最为常用的定制方法。本文将解释
Java
代码
步骤
的原理,并通过一个实际案例,快速掌握相关入门知识。
Java
代码
步骤
,位于
Kettle
转换的核心对象/脚本类别中,属于典型的需要编程基础才能掌控的
步骤
类型。而
Java
代码
步骤
,适用于熟悉
Java
语言的开发人员,用好这个
步骤
,需要对类、接口、多线程等语言相关知识有所掌握,并且需要对
Kettle
的基础框架有所理解。
Java
语言的基础知识不在本文讨论范围,下面将着重对
Kettle
框架的
在【核心对象】-【脚本】下有个【
User
Defined
Java
Class
】插件,功能很强大,允许自己在这里写
java
代码,其中用到了一个jar包,janino,有兴趣的自己上网去搜。janino只支持jdk 1.4,所以只能在【
User
Defined
Java
Class
】里写jdk 1.4的代码,泛型这些是不支持的了。
把【
User
Defined
Java
Class
】拖出来后双...
RowMetaInterface inputRowMeta = getInputRowMeta();
inputRowMeta对象包含了输入行的元数据,包括域、数据类型、长度、名字、格式等等。例如,查找名字为"customer"的域,可以采用如下方式:
ValueMetaInterface customer = inputRowMeta.searchV...
import org.pentaho.di.core.
Kettle
Environment;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.database.DatabaseMeta;
import ...
import
java
.io.FileInputStream;
import
java
.io.FileOutputStream;
import
java
.io.InputStream;
import
java
.io.OutputStream;
import org.pentaho.di.core.
Kettle
Environment;
import org.pentaho.di.core.exception.
Kettle
Exception;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
public
class
Kettle
Excel {
public static void main(String[] args) {
try {
Kettle
Environment.init(); //初始化
Kettle
环境
//创建TransMeta实例
TransMeta transMeta = new TransMeta("
kettle
/table2excel.ktr");
//创建Trans实例
Trans trans = new Trans(transMeta);
//执行转换
trans.execute(null);
//等待转换执行结束
trans.waitUntilFinished();
//把结果输出到文件
if (trans.getErrors() > 0) {
System.out.println("转换出错!");
} else {
String excelName = "test.xls";
File file = new File(excelName);
OutputStream os = new FileOutputStream(file);
InputStream is = trans.getTrans().getTransOutputExcelResultFiles().get(0).getContent();
byte[] b = new byte[1024];
int len;
while ((len = is.read(b)) != -1) {
os.write(b, 0, len);
is.close();
os.close();
System.out.println("转换完成!");
} catch (
Kettle
Exception e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();