import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Date;
import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;
/**
*
* <p>Title:
* 本文描述了以下操作:
1) 建立一个新的转换(transformation)
2) 把转换(transformation)存储为XML文件
3) 生成需要在目标表运行的SQL语句
4) 执行转换(transformation)
5) 删除目标表,可以使测试程序可以反复执行(这一点可根据需要修改)。
</p>
* <p>Description: TODO 类的功能描述</p>
* <p>Copyright: Copyright (c) 2003</p>
* @author <a href="mailto: hongliangpan@gmail.com">洪亮</a>
* @version 1.0
* <p>------------------------------------------------------------</p>
* <p> 修改历史 </p>
* <p> 序号 日期 时间 修 改 人 修 改 原 因</p>
* <p> 1 2006-9-20 下午05:59:06 洪亮 创建 </p>
*
*/
public class TransBuilderME
{
public static final String[] databasesXML = {
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>target</name>" +
"<server>192.168.169.220</server>" +
"<type>ORACLE</type>" +
"<access>Native</access>" +
"<database>NMSDB</database>" +
"<port>1521</port>" +
"<username>UCP</username>" +
"<password>UCP</password>" +
"</connection>",
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>source</name>" +
"<server>192.168.169.220</server>" +
"<type>ORACLE</type>" +
"<access>Native</access>" +
"<database>NMSDB</database>" +
"<port>1521</port>" +
"<username>UCP</username>" +
"<password>UCP</password>" +
"</connection>"
};
/**
* Creates a new Transformation using input parameters such as the tablename to read from.
* @param transformationName The name of the transformation
* @param sourceDatabaseName The name of the database to read from
* @param sourceTableName The name of the table to read from
* @param sourceFields The field names we want to read from the source table
* @param targetDatabaseName The name of the target database
* @param targetTableName The name of the target table we want to write to
* @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
* @return A new transformation
* @throws KettleException In the rare case something goes wrong
*/
public static final TransMeta buildCopyTable(String transformationName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException
{
LogWriter log = LogWriter.getInstance();
try
{
//
// Create a new transformation...
//传输元信息
TransMeta transMeta = new TransMeta();
transMeta.setName(transformationName);//传输名称
// Add the database connections
for (int i=0;i<databasesXML.length;i++)//数据库配置信息数组
{
DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);//数据库元信息
transMeta.addDatabase(databaseMeta);//传输元 中加入数据库元信息
}
DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);//查找源数据库元信息
DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);//查找目标数据库元信息
//
// Add a note
//
String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;
note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);//注释信息
transMeta.addNote(ni);
//
// create the source step...
//
String fromstepname = "read from [" + sourceTableName + "]";//from步骤名称
TableInputMeta tii = new TableInputMeta();//表输入元数据信息
tii.setDatabaseMeta(sourceDBInfo);//为表输入 指定 数据库
String selectSQL = "SELECT "+Const.CR;//拼接查询sql语句
for (int i=0;i<sourceFields.length;i++)
{
if (i>0) selectSQL+=", "; else selectSQL+=" ";
selectSQL+=sourceFields[i]+Const.CR;
}
selectSQL+="FROM "+sourceTableName;
tii.setSQL(selectSQL);//设置查询sql语句
StepLoader steploader = StepLoader.getInstance();//???
String fromstepid = steploader.getStepPluginID(tii);
//步骤元数据信息
StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
fromstep.setLocation(150, 100);
fromstep.setDraw(true);
fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
//传输中 添加步骤
transMeta.addStep(fromstep);
//
// add logic to rename fields
// Use metadata logic in SelectValues, use SelectValueInfo...
//选择字段(重命名)
SelectValuesMeta svi = new SelectValuesMeta();
svi.allocate(0, 0, sourceFields.length);
for (int i = 0; i < sourceFields.length; i++)
{
//设置源字段和目标字段
svi.getMetaName()[i] = sourceFields[i];
svi.getMetaRename()[i] = targetFields[i];
}
String selstepname = "Rename field names";
//获取步骤插件ID
String selstepid = steploader.getStepPluginID(svi);
//创建步骤元数据信息
StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
selstep.setLocation(350, 100);
selstep.setDraw(true);
selstep.setDescription("Rename field names");
//添加步骤
transMeta.addStep(selstep);
//传输连接元数据信息(连接from和select)
TransHopMeta shi = new TransHopMeta(fromstep, selstep);
transMeta.addTransHop(shi);//添加到传输元对象
fromstep = selstep;//然后设置from步骤为select步骤
//
// Create the target step...
//
//
// Add the TableOutputMeta step...
//设置目标步骤名称
String tostepname = "write to [" + targetTableName + "]";
//表输出元对象
TableOutputMeta toi = new TableOutputMeta();
toi.setDatabase(targetDBInfo);//设置数据库
toi.setTablename(targetTableName);//设置表名
toi.setCommitSize(3000);//设置批量提交数
toi.setTruncateTable(true);//是否清除原有数据
//获取步骤ID
String tostepid = steploader.getStepPluginID(toi);
//创建to步骤
StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
tostep.setLocation(550, 100);
tostep.setDraw(true);
tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
transMeta.addStep(tostep);//添加步骤
//
// Add a hop between the two steps...
//
//创建连接 from--to
TransHopMeta hi = new TransHopMeta(fromstep, tostep);
transMeta.addTransHop(hi);
// OK, if we're still here: overwrite the current transformation...
return transMeta;
}
catch (Exception e)
{
throw new KettleException("An unexpected error occurred creating the new transformation", e);
}
}
/**
* 1) create a new transformation
* 2) save the transformation as XML file
* 3) generate the SQL for the target table
* 4) Execute the transformation
* 5) drop the target table to make this program repeatable
*
* @param args
*/
public static void main(String[] args) throws Exception
{
long start = new Date().getTime();
// Init the logging...
LogWriter log = LogWriter.getInstance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);
// Load the Kettle steps & plugins
StepLoader stloader = StepLoader.getInstance();
if (!stloader.read())
{
log.logError("TransBuilder", "Error loading Kettle steps & plugins... stopping now!");
return;
}
// The parameters we want, optionally this can be
String fileName = "./NewTrans.xml";
String transformationName = "Test Transformation";
String sourceDatabaseName = "source";
String sourceTableName = "emp_collect";
String sourceFields[] = {
"empno",
"ename",
"job",
"mgr",
"comm",
"sal",
"deptno",
"birthday"
};
String targetDatabaseName = "target";
String targetTableName = "emp_kettle01";
String targetFields[] = {
"empno01",
"ename01",
"job01",
"mgr01",
"comm",
"sal",
"deptno",
"birthday"
};
// Generate the transformation.
//创建转换元对象
TransMeta transMeta = TransBuilderME.buildCopyTable(
transformationName,
sourceDatabaseName,
sourceTableName,
sourceFields,
targetDatabaseName,
targetTableName,
targetFields
);
// transMeta = new TransMeta();
// Save it as a file:
//传输元对象 中获得XML,并输出
String xml = transMeta.getXML();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
dos.write(xml.getBytes("UTF-8"));
dos.close();
System.out.println("Saved transformation to file: "+fileName);
// OK, What's the SQL we need to execute to generate the target table?
//获得sql语句,创建表语句
String sql = transMeta.getSQLStatementsString();
// Execute the SQL on the target table:
//创建表
Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
targetDatabase.connect();//连接数据库
targetDatabase.execStatements(sql);//执行sql
// Now execute the transformation...
//执行传输任务
Trans trans = new Trans(log, transMeta);
trans.execute(null);
trans.waitUntilFinished();//等待执行完毕
// For testing/repeatability, we drop the target table again
// targetDatabase.execStatement("drop table "+targetTableName);
targetDatabase.disconnect();//断开数据库连接
long end = new Date().getTime();
System.out.println("运行时间:" + (end - start) / 1000 + "秒");
long min = (end - start) / 1000 / 60;
long second = (end - start) / 1000 % 60;
System.out.println("运行时间:" + min + "分钟" + second + "秒");
}
}
分享到:
相关推荐
kettle执行转换每一行数据,数据库连接JDBC,oracle数据库,scott用户,密码oracle
利用kettle的api运用Java代码完成数据转换到excel和数据库源码,自己看了2天kettle的源码,写的2个简单小例子,供大家参考,网上搜这个资料太少了!希望其他人也能上传些相关资料!
Kettle java API Kettle java API
KETTLE JAVA API学习KETTLE JAVA API学习KETTLE JAVA API学习KETTLE JAVA API学习KETTLE JAVA API学习
ET工具KETTLE API、HTML格式
利用kettle工具进行原始字典和标准字典的映射,将原始数据进行标准化处理。
kettle的简单数据转换demokettle的简单数据转换demokettle的简单数据转换demo
java中调用kettle中的job与转换源码,其中kettle用的是5.2.0.0的版本。已经测试过可以调用访问,并且可以传入参数调用。有需要的可直接拿去使用。
此处主要以两个例子说明KETTLE可支持的数据转换操作,一个说明不同数据库表间的数据提取和加载,一个说明数据文件与数据库表间的数据提取与加载。
开源ETL工具-kettle API 使用手册 下载,帮助您了解kettle
这个是3.2版本的Kettle API,CHM格式的
对向前兼容性的推荐 :如果想要动态地创造Transformation (例如:从元数据),使用XML文件方法(KTR)而不是使用API。 XML文件兼容Kettle所有版本,同样对job有效的。 下面的例子进行以下操作: 1创建Transformation ...
Kettle JOB执行出错,发送带附件的邮件
kettle实现数据转换的完整例子,代数据库脚本。kettle源码文件
Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。 作为Pentaho的一个重要组成部分,现在在国内项目应用上逐渐增多。这是其API文档的CHM格式,方便...
通过调用kettle的API接口,实现将一个库的数据转移到另一个数据库中。附件中同时提供了抽取需要的jar包
kettle使用事务的转换
ETL工具(kettle)使用系列(五)-kettle调用restApi接口获取数据插入数据库-真实案例脱密处理-kettle脚本
kettle3.2.0 java API kettle3.2.0 java API
KETTLE表输入变量执行每一行,ORACLE数据库,scott用户,密码oracle