public boolean processRow(StepMetaInterfacesmi, StepDataInterface sdi) throws KettleException

Object[]r = getRow();

if(r == null) {

setOutputDone();

returnfalse;

if (first){

first = false;

r= createOutputRow(r, data.outputRowMeta.size());

// Get the value from an input field

Long numerator = get(Fields.In, "numerator").getInteger(r);

Long denominator = get(Fields.In,"denominator").getInteger(r);

//avoid dividing by 0

if(denominator == 0){

//putErro is declared as follows:

//public void putError(RowMetaInterface rowMeta, Object[] row, long nrErrors,String errorDescriptions, String fieldNames, String errorCodes)

putError(data.outputRowMeta,r, 1, "Denominator must be different from 0","denominator", "DIV_0");

//get on with the next line

returntrue;

longinteger_division = numerator / denominator;

longremainder = numerator % denominator;

//write output fields

get(Fields.Out, "integer_division").setValue(r,Long.valueOf(integer_division));

get(Fields.Out, "remainder").setValue(r,Long.valueOf(remainder));

//Send the row on to the next step.

putRow(data.outputRowMeta, r);

returntrue;

访问数据库连接

如果 udjc 步骤需要实现一些和数据库相关的功能,那么可以使用 kettle 功能获取其数据库连接。下面示例中使用了 kettle 中定义的“ TestDB ”数据库连接。输入行有一个“ table_name ”字段,该步骤检查输入的表是否存在,并把结果写入的输出结果中。

如果需要在 udjc 步骤中实现一些和数据库相关的重要工作,最好对源码中的 org.pentaho.di.core.database 包内容比较熟悉,也可以查看和 DB 相关的步骤和示例代码,了解如何使用 database 包相关类的使用。

importorg.pentaho.di.core.database.Database;

importjava.util.List;

importjava.util.Arrays;

privateDatabase db = null;

privateFieldHelper outputField = null;

private FieldHelpertableField = null;

privateList existingTables = null;

publicboolean processRow(StepMetaInterface smi, StepDataInterface sdi) throwsKettleException

Object[] r = getRow();

if (r == null) {

setOutputDone();

return false;

if (first){

first = false;

existingTables =Arrays.asList(db.getTablenames());

tableField = get(Fields.In,"table_name");

outputField = get(Fields.Out,"table_exists");

r = createOutputRow(r,data.outputRowMeta.size());

if (existingTables.contains(tableField.getString(r))){

outputField.setValue(r, Long.valueOf(1));

else{

outputField.setValue(r,Long.valueOf(0));

// Send the row on to the next step.

putRow(data.outputRowMeta, r);

return true;

public booleaninit(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)

if (parent.initImpl(stepMetaInterface,stepDataInterface)){

db = newDatabase(this.parent, getTransMeta().findDatabase("TestDB"));

db.shareVariablesWith(this.parent);

db.connect();

return true;

catch(KettleDatabaseException e){

logError("Errorconnecting to TestDB: "+ e.getMessage());

setErrors(1);

stopAll();

return false;

publicvoid dispose(StepMetaInterface smi, StepDataInterface sdi)

if (db != null) {

db.disconnect();

parent.disposeImpl(smi, sdi);

示例 udjc 步骤中的重写了 init ()和 dispose ()方法,分别实现创建数据库连接和完成后断开连接。在转换初始化的时候,第一次执行 processRow ()之前调用 init ()方法。转换执行完成之后调用 dispose ()方法。如果有首先要初始化的工作以及一些清理资源代码,就考虑分别放在 init dispose 方法中。示例转换的名称: db_access.ktr

实现输入步骤

有时 udjc 步骤本身就是输入步骤,这时其自己生成输入行,而无需其他的输入行步骤。下面示例展示生成 java 的系统属性列表作为输入行。

代码如下:

import java.util.*;

private ArrayList keys = null;

private int idx = 0;

public boolean processRow(StepMetaInterfacesmi, StepDataInterface sdi) throws KettleException

if(first){

first= false;

//get the system property names, output is done one at a time later

keys= Collections.list(System.getProperties().propertyNames());

idx= 0;

if(idx >= keys.size()) {

setOutputDone();

returnfalse;

//create a row

Object[]r = RowDataUtil.allocateRowData(data.outputRowMeta.size());

// Set key and value in a new output row

get(Fields.Out, "key").setValue(r, keys.get(idx));

get(Fields.Out,"value").setValue(r,System.getProperties().get(keys.get(idx)));

idx++;

//Send the row on to the next step.

putRow(data.outputRowMeta, r);

returntrue;

在代码中没有调用 getRow 方法获取输入行,而是第一次调用 processRow 方法是初始化 java 系统属性列表。这些属性被逐个写入到输出流中。因为没有输入行,代码通过 RowDataUtil.allocateRowData() 方法创建,然后设置字段值并传输到下一步骤中。示例转换的名称 input_step.ktr

本文详细说明了 udjc 步骤在不同场景的使用方式。如果你需要自定义处理功能,但是 javascript 步骤实现不灵活或性能不够,这时可以考虑使用 udjc 步骤代替。为了学习更多的内容,我们也可以查看 sample 目录下的关于 udjc 的示例。

在使用 Kettle 的过程中,有可能遇到现有 步骤 无法满足需求的情况。解决此类问题,有诸如购买第 方插件、开发插件、自定义 Java 类等办法。最后一种办法因其代价小且门槛较低而成为最为常用的定制方法。本文将解释 Java 代码 步骤 的原理,并通过一个实际案例,快速掌握相关入门知识。 Java 代码 步骤 ,位于 Kettle 转换的核心对象/脚本类别中,属于典型的需要编程基础才能掌控的 步骤 类型。而 Java 代码 步骤 ,适用于熟悉 Java 语言的开发人员,用好这个 步骤 ,需要对类、接口、多线程等语言相关知识有所掌握,并且需要对 Kettle 的基础框架有所理解。 Java 语言的基础知识不在本文讨论范围,下面将着重对 Kettle 框架的
在【核心对象】-【脚本】下有个【 User Defined Java Class 】插件,功能很强大,允许自己在这里写 java 代码,其中用到了一个jar包,janino,有兴趣的自己上网去搜。janino只支持jdk 1.4,所以只能在【 User Defined Java Class 】里写jdk 1.4的代码,泛型这些是不支持的了。 把【 User Defined Java Class 】拖出来后双...
RowMetaInterface inputRowMeta = getInputRowMeta();  inputRowMeta对象包含了输入行的元数据,包括域、数据类型、长度、名字、格式等等。例如,查找名字为"customer"的域,可以采用如下方式: ValueMetaInterface customer = inputRowMeta.searchV...
import org.pentaho.di.core. Kettle Environment; import org.pentaho.di.core.RowMetaAndData; import org.pentaho.di.core.database.DatabaseMeta; import ...
import java .io.FileInputStream; import java .io.FileOutputStream; import java .io.InputStream; import java .io.OutputStream; import org.pentaho.di.core. Kettle Environment; import org.pentaho.di.core.exception. Kettle Exception; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; public class Kettle Excel { public static void main(String[] args) { try { Kettle Environment.init(); //初始化 Kettle 环境 //创建TransMeta实例 TransMeta transMeta = new TransMeta(" kettle /table2excel.ktr"); //创建Trans实例 Trans trans = new Trans(transMeta); //执行转换 trans.execute(null); //等待转换执行结束 trans.waitUntilFinished(); //把结果输出到文件 if (trans.getErrors() > 0) { System.out.println("转换出错!"); } else { String excelName = "test.xls"; File file = new File(excelName); OutputStream os = new FileOutputStream(file); InputStream is = trans.getTrans().getTransOutputExcelResultFiles().get(0).getContent(); byte[] b = new byte[1024]; int len; while ((len = is.read(b)) != -1) { os.write(b, 0, len); is.close(); os.close(); System.out.println("转换完成!"); } catch ( Kettle Exception e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace();