关于向前兼容性的建议:如果需要动态地创建Transformation(例如:根据元数据生成),建议使用XML文件(KTR)的方式,而不是直接使用API。XML文件兼容Kettle的所有版本,这一点对job同样适用。
下面的例子进行以下操作:
1. 创建Transformation
2. 保存Transformation信息到XML文件
3. 获取需要在目标表上执行的SQL
4. 执行Transformation
5. drop目标表,使这个示例可以重复运行。
// 创建“复制表”的 transformation元数据.TransMeta transMeta = TransBuilder.buildCopyTable( transformationName, sourceDatabaseName, sourceTableName, sourceFields, targetDatabaseName, targetTableName, targetFields);
// 保存为文件:String xml = transMeta.getXML();DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));dos.write(xml.getBytes("UTF-8"));dos.close();System.out.println("Saved transformation to file: "+fileName);
//获取在目标表操作的SQLString sql = transMeta.getSQLStatementsString();
// 在目标表执行sql:Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));targetDatabase.connect();targetDatabase.execStatements(sql);
//现在执行 transformationTrans trans = new Trans(transMeta);trans.execute(null);trans.waitUntilFinished();
//为了重复测试,再次drop目标表targetDatabase.execStatement("drop table "+targetTableName);targetDatabase.disconnect();
下面是创建Transformation的方法的代码:
/*** Creates a new Transformation using input parameters such as the tablename to read from.* @param transformationName transformation的名称* @param sourceDatabaseName 输入的 database 名称* @param sourceTableName 要读取的表名* @param sourceFields 要读取的列名* @param targetDatabaseName 目标database名 * @param targetTableName要写入的表名* @param targetFields要写入的列名(要跟读取的列长度相同)* @return A new transformation metadata object* @throws KettleException In the rare case something goes wrong*/public static final TransMeta buildCopyTable(String transformationName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException { EnvUtil.environmentInit(); try { // Create a new transformation... // TransMeta transMeta = new TransMeta(); transMeta.setName(transformationName); // 添加数据库连接 for (int i = 0; i < databasesXML.length; i++) { DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]); transMeta.addDatabase(databaseMeta); } DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName); DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName); // //添加注释 // String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR; note += "After that, it writes the information to table ["+ targetTableName + "] on database [" + targetDBInfo + "]"; NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1); transMeta.addNote(ni); // // 创建读取数据源的 step... 
// String fromstepname = "read from [" + sourceTableName + "]"; TableInputMeta tii = new TableInputMeta(); tii.setDatabaseMeta(sourceDBInfo); String selectSQL = "SELECT " + Const.CR; for (int i = 0; i < sourceFields.length; i++) { if (i > 0) selectSQL += ", "; else selectSQL += " "; selectSQL += sourceFields[i] + Const.CR; } selectSQL += "FROM " + sourceTableName; tii.setSQL(selectSQL); StepLoader steploader = StepLoader.getInstance(); String fromstepid = steploader.getStepPluginID(tii); StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname,(StepMetaInterface) tii); fromstep.setLocation(150, 100); fromstep.setDraw(true); fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]"); transMeta.addStep(fromstep); // // 添加 重命名 fields的逻辑 // Use metadata logic in SelectValues, use SelectValueInfo... // SelectValuesMeta svi = new SelectValuesMeta(); svi.allocate(0, 0, sourceFields.length); for (int i = 0; i < sourceFields.length; i++) { svi.getMetaName()[i] = sourceFields[i]; svi.getMetaRename()[i] = targetFields[i]; } String selstepname = "Rename field names"; String selstepid = steploader.getStepPluginID(svi); StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi); selstep.setLocation(350, 100); selstep.setDraw(true); selstep.setDescription("Rename field names"); transMeta.addStep(selstep); TransHopMeta shi = new TransHopMeta(fromstep, selstep); transMeta.addTransHop(shi); fromstep = selstep; // // 创建 写数据的 step... // // // 添加 输出表 step... 
// String tostepname = "write to [" + targetTableName + "]"; TableOutputMeta toi = new TableOutputMeta(); toi.setDatabase(targetDBInfo); toi.setTablename(targetTableName); toi.setCommitSize(200); toi.setTruncateTable(true); String tostepid = steploader.getStepPluginID(toi); StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi); tostep.setLocation(550, 100); tostep.setDraw(true); tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]"); transMeta.addStep(tostep); // // 添加连线... // TransHopMeta hi = new TransHopMeta(fromstep, tostep); transMeta.addTransHop(hi); // The transformation is complete, return it... return transMeta; } catch (Exception e) { throw new KettleException("An unexpected error occurred creating the new transformation", e); }}
2. 运行现有的Transformation
如果已经创建了Transformation,并将其保存在.ktr或.xml文件中,可以使用类似下面的代码来运行它。
public static void runTransformation(String filename) { try { StepLoader.init(); EnvUtil.environmentInit(); TransMeta transMeta = new TransMeta(filename); Trans trans = new Trans(transMeta); trans.execute(null); // You can pass arguments instead of null. trans.waitUntilFinished(); if ( trans.getErrors() > 0 ) { throw new RuntimeException( "There were errors during transformation execution." ); } } catch ( KettleException e ) { System.out.println(e); }}