博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【转】Kettle API - Java调用示例
阅读量:5919 次
发布时间:2019-06-19

本文共 5737 字,大约阅读时间需要 19 分钟。

hot3.png

向前兼容性的推荐 :如果想要动态地创造Transformation (例如:从元数据),使用XML文件方法(KTR)而不是使用API XML文件兼容Kettle所有版本,同样对job有效的。 

    下面的例子进行以下操作:

    1. 创建Transformation

    2. 保存Transformation信息到XML文件

    3. 获取在目标表操作的SQL

    4. 执行Transformation

       5. drop目标表,使这个示例可以重复。 

// 创建“复制表”的 transformation元数据.TransMeta transMeta = TransBuilder.buildCopyTable(   transformationName,   sourceDatabaseName,   sourceTableName,   sourceFields,   targetDatabaseName,   targetTableName,   targetFields);

// 保存为文件:String xml = transMeta.getXML();DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));dos.write(xml.getBytes("UTF-8"));dos.close();System.out.println("Saved transformation to file: "+fileName);

//获取在目标表操作的SQLString sql = transMeta.getSQLStatementsString();

// 在目标表执行sql:Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));targetDatabase.connect();targetDatabase.execStatements(sql);

//现在执行 transformationTrans trans = new Trans(transMeta);trans.execute(null);trans.waitUntilFinished();

//为了重复测试,再次drop目标表targetDatabase.execStatement("drop table "+targetTableName);targetDatabase.disconnect();

下面是创建Transformation的方法的代码:

 

/*** Creates a new Transformation using input parameters such as the tablename to read from.* @param transformationName transformation的名称* @param sourceDatabaseName 输入的 database 名称* @param sourceTableName 要读取的表名* @param sourceFields 要读取的列名* @param targetDatabaseName 目标database名 * @param targetTableName要写入的表名* @param targetFields要写入的列名(要跟读取的列长度相同)* @return A new transformation metadata object* @throws KettleException In the rare case something goes wrong*/public static final TransMeta buildCopyTable(String transformationName,		String sourceDatabaseName, String sourceTableName,		String[] sourceFields, String targetDatabaseName,		String targetTableName, String[] targetFields)		throws KettleException {	EnvUtil.environmentInit();	try	{		// Create a new transformation...		//		TransMeta transMeta = new TransMeta();		transMeta.setName(transformationName);		// 添加数据库连接		for (int i = 0; i < databasesXML.length; i++) {			DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);			transMeta.addDatabase(databaseMeta);		}		DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);		DatabaseMeta targetDBInfo  = transMeta.findDatabase(targetDatabaseName);		//		//添加注释		//		String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;		note += "After that, it writes the information to table ["+ targetTableName + "] on database [" + targetDBInfo + "]";		NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);		transMeta.addNote(ni);		//		// 创建读取数据源的 step...		//		String fromstepname = "read from [" + sourceTableName + "]";		TableInputMeta tii = new TableInputMeta();		tii.setDatabaseMeta(sourceDBInfo);		String selectSQL = "SELECT " + Const.CR;		for (int i = 0; i < sourceFields.length; i++) {			if (i > 0) selectSQL += ", "; else selectSQL += " ";			selectSQL += sourceFields[i] + Const.CR;		}		selectSQL += "FROM " + sourceTableName;		tii.setSQL(selectSQL);		StepLoader steploader = StepLoader.getInstance();		String fromstepid = steploader.getStepPluginID(tii);		StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname,(StepMetaInterface) tii);		fromstep.setLocation(150, 100);		fromstep.setDraw(true);		fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");		transMeta.addStep(fromstep);		//		// 添加 重命名 fields的逻辑		// Use metadata logic in SelectValues, use SelectValueInfo...		//		SelectValuesMeta svi = new SelectValuesMeta();		svi.allocate(0, 0, sourceFields.length);		for (int i = 0; i < sourceFields.length; i++) {			svi.getMetaName()[i] = sourceFields[i];			svi.getMetaRename()[i] = targetFields[i];		}		String selstepname = "Rename field names";		String selstepid = steploader.getStepPluginID(svi);		StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);		selstep.setLocation(350, 100);		selstep.setDraw(true);		selstep.setDescription("Rename field names");		transMeta.addStep(selstep);		TransHopMeta shi = new TransHopMeta(fromstep, selstep);		transMeta.addTransHop(shi);		fromstep = selstep;		//		// 创建 写数据的 step...		//		//		// 添加 输出表 step...		//		String tostepname = "write to [" + targetTableName + "]";		TableOutputMeta toi = new TableOutputMeta();		toi.setDatabase(targetDBInfo);		toi.setTablename(targetTableName);		toi.setCommitSize(200);		toi.setTruncateTable(true);		String tostepid = steploader.getStepPluginID(toi);		StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);		tostep.setLocation(550, 100);		tostep.setDraw(true);		tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");		transMeta.addStep(tostep);		//		// 添加连线...		//		TransHopMeta hi = new TransHopMeta(fromstep, tostep);		transMeta.addTransHop(hi);		// The transformation is complete, return it...		return transMeta;	} catch (Exception e) {		throw new KettleException("An unexpected error occurred creating the new transformation", e);	}}

2.        运行现有的Transformation

如果已经创建了Transformation,并且保存它在.ktr.xml文件,能使用象下面的代码。  

public static void runTransformation(String filename) {  try {    StepLoader.init();    EnvUtil.environmentInit();    TransMeta transMeta = new TransMeta(filename);    Trans trans = new Trans(transMeta);    trans.execute(null); // You can pass arguments instead of null.    trans.waitUntilFinished();    if ( trans.getErrors() > 0 )    {      throw new RuntimeException( "There were errors during transformation execution." );    }  }  catch ( KettleException e ) {      System.out.println(e);  }}

转载于:https://my.oschina.net/aiguozhe/blog/39085

你可能感兴趣的文章
Linux:awk命令
查看>>
组策略禁止部门员工上网
查看>>
桶排序
查看>>
PPC软件字体太小的调整
查看>>
ubuntu 13.14 下 tomcat8 启动报错解决办法
查看>>
hibernate API详解
查看>>
DIY强大的虚拟化环境-组装于测试部分1.1存储主机之硬盘
查看>>
MUI框架学习(2)-页面间传值
查看>>
java多线程——锁
查看>>
Windows Batchfile Directory
查看>>
linux添加硬盘
查看>>
docker 构建magent的memcached 集群
查看>>
SaltStack runners模块分析之pillar模块
查看>>
SOSP 文档 - Windows Azure 存储:具有强一致性的高可用性云存储服务
查看>>
Java自动处理文档
查看>>
Zabbix+shell监控报警任意web
查看>>
MySQL查看和修改字符编码的实现方法
查看>>
小结asp.net中get、post用法区别
查看>>
关于在grep命令中使用{}的有趣现象
查看>>
linux网络命令(十一)之glances
查看>>