程序员的资源宝库

网站首页 > gitee 正文

kettle java 代码调用 kettle调用java代码处理数据

sanyeah 2024-03-29 17:14:26 gitee 6 ℃ 0 评论

使用 Java 程序调用 Kettle

pom.xml

点击查看代码
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>fucking-great-kettle</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>fucking-great-kettle</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.2.6.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kettle (PDI) 9.3 jars are not on Maven Central, so they are referenced from
             src/main/resources/lib with system scope. See includeSystemScope on the
             spring-boot-maven-plugin below: without it they are left out of the repackaged jar. -->
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>kettle-core</artifactId>
            <version>9.3.0.0-428</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/kettle-core-9.3.0.0-428.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>kettle-engine</artifactId>
            <version>9.3.0.0-428</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/kettle-engine-9.3.0.0-428.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>metastore</artifactId>
            <version>9.3.0.0-428</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/metastore-9.3.0.0-428.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>pentaho-encryption-support</artifactId>
            <version>9.3.0.0-428</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/pentaho-encryption-support-9.3.0.0-428.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>kettle-dbdialog</artifactId>
            <version>9.3.0.0-428</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/kettle-dbdialog-9.3.0.0-428.jar</systemPath>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-vfs2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-vfs2</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.20</version>
        </dependency>

        <dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.54</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>org.apache.jackrabbit</groupId>-->
        <!--            <artifactId>jackrabbit-jcr2dav</artifactId>-->
        <!--            <version>2.4.1</version>-->
        <!--        </dependency>-->


        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.0-android</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.7</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <!--dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency-->

        <!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.14</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.20</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Pinned instead of the RELEASE meta-version, which is non-reproducible and would
             resolve to TestNG 7.6+ (requires JDK 11) while this project targets Java 1.8.
             7.5.x is the last TestNG line that supports Java 8. -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>7.5.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.pentaho/pentaho-parent-pom -->
            <!-- NOTE(review): without <scope>import</scope> this entry does not contribute any
                 managed versions; it is inert. Importing it would change resolved versions,
                 so it is left as-is. -->
            <dependency>
                <groupId>org.pentaho</groupId>
                <artifactId>pentaho-parent-pom</artifactId>
                <version>8.1.0.0-365</version>
                <type>pom</type>
            </dependency>

            <!-- connector: kept in line with the version declared in <dependencies> (8.0.20)
                 so the managed and effective driver versions no longer contradict each other. -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.20</version>
            </dependency>

        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <!-- The Kettle jars above are system-scoped; repackage skips them unless
                         this is enabled, leaving an executable jar that cannot load Kettle. -->
                    <includeSystemScope>true</includeSystemScope>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

完整代码
点击查看代码
package com.example.fg.kettle;

import lombok.extern.slf4j.Slf4j;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;

import javax.servlet.http.HttpServletRequest;
import java.io.*;

@Slf4j
public class KettleClient {

    public static void main(String[] args) {
        try {
            KettleClient client = new KettleClient();
            client.initKettleEnvironment(null);

            TransMeta meta = client.buildTransMeta("kettle");
            PluginRegistry registry = client.getRegistry();

            StepMeta step1 = client.setTableInputStep(meta, registry,
                    "test-data", "select id,name from stu1", "table input");

            StepMeta step2 = client.setTableOutput(meta, registry,
                    "test-data", "stu2", "table insert");
            client.addTransHop(meta, step1, step2);

            client.executeTrans(meta, "test");
        } catch (KettleException e) {
            e.printStackTrace();
        }
    }

    /**
     * 初始化环境
     */
    public void initKettleEnvironment(HttpServletRequest request) throws KettleException {
        if (KettleEnvironment.isInitialized()) {
            return;
        }

        if (request == null) {
            // 运行环境初始化
            KettleEnvironment.init();
        } else {
            String userDir = System.getProperty("user.dir");
            String kettleHome = request.getSession().getServletContext().getRealPath(File.separator + "WEB-INF");
            // 设置用户路径和系统环境,包括用户路径和主目录
            System.setProperty("user.dir", kettleHome);
            System.setProperty("KETTLE_HOME", kettleHome);
            // 运行环境初始化
            KettleEnvironment.init();
            // 避免造成影响其他程序的运行,还原用户路径
            System.setProperty("user.dir", userDir);
        }
    }

    /**
     * 创建转化元
     */
    public TransMeta buildTransMeta(String metaName, String... transXML) throws KettleXMLException {
        TransMeta transMeta = new TransMeta();
        // 设置转化元的名称
        transMeta.setName(metaName);
        // 添加转换的数据库连接
        transMeta.addDatabase(getDatabaseMeta());
        return transMeta;
    }

    /**
     * 设置表输入步骤
     */
    public StepMeta setTableInputStep(TransMeta transMeta, PluginRegistry registry, String sourceDbName, String sql, String stepName) {
        // 创建表输入
        TableInputMeta tableInputMeta = new TableInputMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, tableInputMeta);
        // 指定数据源数据库配置名
        DatabaseMeta source = transMeta.findDatabase(sourceDbName);
        tableInputMeta.setDatabaseMeta(source);
        tableInputMeta.setSQL(sql);
        // 将表输入添加到转换中
        StepMeta stepMeta = new StepMeta(pluginId, stepName, tableInputMeta);
        // 给步骤添加在spoon工具中的显示位置
        stepMeta.setDraw(true);
        stepMeta.setLocation(100, 100);
        // 将表输入添加到步骤中
        transMeta.addStep(stepMeta);
        return stepMeta;
    }

    /**
     * 设置表输出步骤,用于整表抽取
     */
    public StepMeta setTableOutput(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String stepName) {
        // 创建表输出
        TableOutputMeta tableOutputMeta = new TableOutputMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, tableOutputMeta);
        // 配置表输出的目标数据库配置名

        DatabaseMeta targetDb = transMeta.findDatabase(targetDbName);
        tableOutputMeta.setDatabaseMeta(targetDb);
        tableOutputMeta.setTableName(targetTableName);
        // 将表输出添加到转换中
        StepMeta stepMeta = new StepMeta(pluginId, stepName, tableOutputMeta);
        transMeta.addStep(stepMeta);
        return stepMeta;
    }

    /**
     * 设置表插入与更新步骤,用于表中部分字段更新
     */
    public StepMeta setInsertUpdateMeta(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String[] updatelookup, String[] updateStream, String[] updateStream2, String[] conditions, Boolean[] updateOrNot, String stepName) {
        // 创建插入与更新
        InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
        // 配置目标数据库配置名
        DatabaseMeta database_target = transMeta.findDatabase(targetDbName);
        insertUpdateMeta.setDatabaseMeta(database_target);
        // 设置目标表名
        insertUpdateMeta.setTableName(targetTableName);
        // 设置用来查询的关键字
        insertUpdateMeta.setKeyLookup(updatelookup);
        insertUpdateMeta.setKeyStream(updateStream);
        insertUpdateMeta.setKeyStream2(updateStream2);// 这一步不能省略
        insertUpdateMeta.setKeyCondition(conditions);
        // 设置要更新的字段
        insertUpdateMeta.setUpdateLookup(updatelookup);
        insertUpdateMeta.setUpdateStream(updateStream);
        insertUpdateMeta.setUpdate(updateOrNot);
        // 添加步骤到转换中
        StepMeta stepMeta = new StepMeta(pluginId, stepName, insertUpdateMeta);
        stepMeta.setDraw(true);
        stepMeta.setLocation(250, 100);
        transMeta.addStep(stepMeta);
        return stepMeta;
    }

    /**
     * 用于将表输入步骤与第二步骤绑定 绑定关联步骤
     */
    public void addTransHop(TransMeta transMeta, StepMeta from, StepMeta to) {
        transMeta.addTransHop(new TransHopMeta(from, to));
    }

    /**
     * 执行抽取
     */
    @SuppressWarnings("resource")
    public void executeTrans(TransMeta transMeta, String targetDbName) {
        try {
            KettleDatabaseRepository repository = RepositoryCon();
            transMeta.setRepository(repository);
            Trans trans = new Trans(transMeta);
            trans.execute(new String[]{"start..."});
            trans.waitUntilFinished();
            // 关闭数据库连接
            if (trans.getErrors() > 0) {
                throw new RuntimeException("There were errors during transformation execution.");
            }
        } catch (KettleException e) {
            e.printStackTrace();
        }
    }

    private static DatabaseMeta getDatabaseMeta() {
        DatabaseMeta databaseMeta = new DatabaseMeta("test-data", "MYSQL", "Native", "localhost",
                "test", "3306", "root", "toor@1234");
        // 关闭mysql推荐SSL连接提示
        databaseMeta.addExtraOption("MYSQL", "useSSL", "false");
        return databaseMeta;
    }

    /**
     * * 连接到资源库
     */
    private static KettleDatabaseRepository RepositoryCon() throws KettleException {

        // 初始化环境
        if (!KettleEnvironment.isInitialized()) {
            try {
                KettleEnvironment.init();
            } catch (KettleException e) {
                e.printStackTrace();
            }
        }
        // 数据库形式的资源库元对象
        KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
        kettleDatabaseRepositoryMeta.setConnection(getDatabaseMeta());
        // 数据库形式的资源库对象
        KettleDatabaseRepository kettleDatabaseRepository = new KettleDatabaseRepository();
        // 用资源库元对象初始化资源库对象
        kettleDatabaseRepository.init(kettleDatabaseRepositoryMeta);
        // 连接到资源库 , 默认的连接资源库的用户名和密码
        kettleDatabaseRepository.connect("admin", "admin");
        if (kettleDatabaseRepository.isConnected()) {
            System.out.println("连接成功");
            return kettleDatabaseRepository;
        } else {
            System.out.println("连接失败");
            return null;
        }
    }


    public PluginRegistry getRegistry() {
        // 插件注册,用于注册转换中需要用到的插件
        return PluginRegistry.getInstance();
    }


}

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表