import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.filerep.KettleFileRepository;
import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.util.List;
public class Main {
public static void main(String[] args) throws KettleException {
String path = "转换路径";
String path1 = "作业路径";
KettleFileRepository rep = (KettleFileRepository) RepositoryCon();
//runTransfer(params, path);
runJob(rep);
public static void runTransfer(String[] params, String ktrPath) {
Trans trans = null;
try {
// 转换元对象
KettleEnvironment.init();// 初始化
EnvUtil.environmentInit();
TransMeta transMeta = new TransMeta(ktrPath);
List<DatabaseMeta> dmlist = transMeta.getDatabases();
initDatabase(dmlist);
// 转换
trans = new Trans(transMeta);
trans.setVariable("tablename","表名");
// 执行转换
trans.execute(null);
// 等待转换执行结束
trans.waitUntilFinished();
// 抛出异常
if (trans.getErrors() > 0) {
throw new Exception(
"There are errors during transformation exception!(传输过程中发生异常)");
} catch (Exception e) {
e.printStackTrace();
public static void runJob(KettleFileRepository rep) {
try {
// jobname 是Job脚本的路径及名称
RepositoryDirectoryInterface dir = rep.findDirectory("/");//根据指定的字符串路径 找到目录
JobMeta jobMeta = rep.loadJob("work01",dir, null,null);
System.out.println(jobMeta.getName());
System.out.println(jobMeta.getObjectId());
System.out.println(jobMeta.getFilename());
List<DatabaseMeta> dmlist = jobMeta.getDatabases();
initDatabase(dmlist);
Job job = new Job(rep, jobMeta);
job.setLogLevel(LogLevel.ERROR);//日志输出级别
job.start();
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception(
"There are errors during job exception!(执行job发生异常)");
} catch (Exception e) {
e.printStackTrace();
public static void initDatabase(List<DatabaseMeta> dmlist){
for(DatabaseMeta dm : dmlist){
dm.setHostname("地址ip"); //连接地址
dm.setDBName("数据库名"); //数据库名称
dm.setDBPort("端口号"); //端口
dm.setUsername("username"); //用户
dm.setPassword("password"); //密码
public static Object RepositoryCon() throws KettleException {
// 初始化
KettleEnvironment.init();
// 资源库元对象
KettleFileRepositoryMeta repinfo = new KettleFileRepositoryMeta("","仓库名称","Kettle ETL File repository","仓库路径");
// 文件形式的资源库
KettleFileRepository rep = new KettleFileRepository();
rep.init(repinfo);
// 转换元对象
if(rep.isConnected()){
return rep;
}else{
return null;
参考地址:https://blog.csdn.net/hubeilihao/article/details/28647721
import org.pentaho.di.core.KettleEnvironment;import org.pentaho.di.core.database.DatabaseMeta;import org.pentaho.di.core.exception.KettleException;import org.pentaho.di.core.logging.LogLevel;impo...
在之前我写了如何通过java 脚本来修改数据,从而确定有一个processRow()方法,该方法中能获取到数据信息等,那么接下来就是需要运行一个简单的表交换来看数据是怎么流的。
首先写一个简单的tableInput->tableOutput的交换,这里代码就贴在最后,毕竟只是用mysql表交换,数据也不多,太简单了。
定义一个源表,放两条数据,然后新建一个u...
1. 下载源码 https://github.com/pentaho/pentaho-kettle/
2. 下载kettle发行版本 http://community.pentaho.com/projects/data-integration/ (主要是为了获取依赖的jar包)
以上两者版本请尽量保持一致。
源码的readme文件中描述了源码编译方法,你可以照着步骤作,此方法需要联
自己在做集成时,网上信息都是零散的,在这里汇总一下,加一些自己的想法。
这里要实现一个简单的对库对表数据交换,并实现灵活可配置。
根据Kettle转化的特性,仅需要配置以下几个属性:
1.输入输出数据库属性
2.输入输出表名,输出数据表数据池,可设计灵活配置SQL限制范围。
3.字符串数组形式的输入输出表字段。
一,引入jar包
Maven仓库:kettle中央仓库里没有,需要单独...
1、脚本的方式,windows下以bat脚本调用,Linux下以sh脚本调用。
2、http请求carte服务的方式进行集成调用。java端可以采用httpclient api去调用carte。
3、以java工程引入kettle依赖,采用api集成的方式调用。
二、为什么要以api的方式集成kettle
因为第一点提到的1、2两点,都需要在服务节点端安装kettle,且脚本的方式调用要维护多套系统执行脚本。而以c
后来发现是build文档未指定目录
Ant编译找不到包
Buildfile: C:\Users\db2admin\Desktop\TemplateStepPlugin (1)\TemplateStepPlugin - 副本\build.xml
init:
[echo] Init...
package java2cettle;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.exception.KettleExcep
存储过程pro_create:实现在tb_teacher表中插入一条数据
1.调用不带参数的存储过程
create or replace procedure PRO_create is
begin
insert into tb_teacher(id,name) values(4,'郑老师');
commit;
end PRO_create;
-------------
在kettle使用java代码模块需要注意的一些事项,以防忘记。
首先需要继承processRow方法,跟java的main方法类似,代码从这里开始执行,并且在里边读取跟保存变量。
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws Exception
Object[] r = g
Filed to convert property value of type ‘java.lang.String‘ to required type ‘java.util.Date‘ for XX
整型包装类之间的比较使用equlas