程序员的资源宝库

网站首页 > gitee 正文

mac M2 apple silicon kettle 在线http 接口执行任务

sanyeah 2024-03-29 17:14:29 gitee 6 ℃ 0 评论

carte 和 kettle

  • Carte作为Kettle服务器的一种实现,可以方便地管理和执行转换和作业,并且可以在分布式环境中扩展
  • 使用Carte,可以通过REST Web服务接口进行调度和监视转换和作业。

配置 carte

  • data-integration/pwd/carte-config-master-8080.xml
<slave_config>
  <slaveserver>
    <name>master1</name>
    <hostname>localhost</hostname>
    <port>8080</port>
    <master>Y</master>
  </slaveserver>
    <repository>
    <name>Test_repo</name>
    <is_default>true</is_default>
    <connection>test_teldb</connection>
    <username>admin</username>
    <password>admin</password>
  </repository>
</slave_config>
  • repository 资源库信息 从当前用户的家目录 ./kettle/repository.xml 读取 必须名字保持一致

配置kettle 链接信息



新建一个简单任务 可以保存为 test.ktr

<?xml version="1.0" encoding="UTF-8"?>
<transformation>
  <info>
    <name>test</name>
    <description/>
    <extended_description/>
    <trans_version/>
    <trans_type>Normal</trans_type>
    <directory>/</directory>
    <parameters>
    </parameters>
    <log>
      <trans-log-table>
        <connection/>
        <schema/>
        <table/>
        <size_limit_lines/>
        <interval/>
        <timeout_days/>
        <field>
          <id>ID_BATCH</id>
          <enabled>Y</enabled>
          <name>ID_BATCH</name>
        </field>
        <field>
          <id>CHANNEL_ID</id>
          <enabled>Y</enabled>
          <name>CHANNEL_ID</name>
        </field>
        <field>
          <id>TRANSNAME</id>
          <enabled>Y</enabled>
          <name>TRANSNAME</name>
        </field>
        <field>
          <id>STATUS</id>
          <enabled>Y</enabled>
          <name>STATUS</name>
        </field>
        <field>
          <id>LINES_READ</id>
          <enabled>Y</enabled>
          <name>LINES_READ</name>
          <subject/>
        </field>
        <field>
          <id>LINES_WRITTEN</id>
          <enabled>Y</enabled>
          <name>LINES_WRITTEN</name>
          <subject/>
        </field>
        <field>
          <id>LINES_UPDATED</id>
          <enabled>Y</enabled>
          <name>LINES_UPDATED</name>
          <subject/>
        </field>
        <field>
          <id>LINES_INPUT</id>
          <enabled>Y</enabled>
          <name>LINES_INPUT</name>
          <subject/>
        </field>
        <field>
          <id>LINES_OUTPUT</id>
          <enabled>Y</enabled>
          <name>LINES_OUTPUT</name>
          <subject/>
        </field>
        <field>
          <id>LINES_REJECTED</id>
          <enabled>Y</enabled>
          <name>LINES_REJECTED</name>
          <subject/>
        </field>
        <field>
          <id>ERRORS</id>
          <enabled>Y</enabled>
          <name>ERRORS</name>
        </field>
        <field>
          <id>STARTDATE</id>
          <enabled>Y</enabled>
          <name>STARTDATE</name>
        </field>
        <field>
          <id>ENDDATE</id>
          <enabled>Y</enabled>
          <name>ENDDATE</name>
        </field>
        <field>
          <id>LOGDATE</id>
          <enabled>Y</enabled>
          <name>LOGDATE</name>
        </field>
        <field>
          <id>DEPDATE</id>
          <enabled>Y</enabled>
          <name>DEPDATE</name>
        </field>
        <field>
          <id>REPLAYDATE</id>
          <enabled>Y</enabled>
          <name>REPLAYDATE</name>
        </field>
        <field>
          <id>LOG_FIELD</id>
          <enabled>Y</enabled>
          <name>LOG_FIELD</name>
        </field>
        <field>
          <id>EXECUTING_SERVER</id>
          <enabled>N</enabled>
          <name>EXECUTING_SERVER</name>
        </field>
        <field>
          <id>EXECUTING_USER</id>
          <enabled>N</enabled>
          <name>EXECUTING_USER</name>
        </field>
        <field>
          <id>CLIENT</id>
          <enabled>N</enabled>
          <name>CLIENT</name>
        </field>
      </trans-log-table>
      <perf-log-table>
        <connection/>
        <schema/>
        <table/>
        <interval/>
        <timeout_days/>
        <field>
          <id>ID_BATCH</id>
          <enabled>Y</enabled>
          <name>ID_BATCH</name>
        </field>
        <field>
          <id>SEQ_NR</id>
          <enabled>Y</enabled>
          <name>SEQ_NR</name>
        </field>
        <field>
          <id>LOGDATE</id>
          <enabled>Y</enabled>
          <name>LOGDATE</name>
        </field>
        <field>
          <id>TRANSNAME</id>
          <enabled>Y</enabled>
          <name>TRANSNAME</name>
        </field>
        <field>
          <id>STEPNAME</id>
          <enabled>Y</enabled>
          <name>STEPNAME</name>
        </field>
        <field>
          <id>STEP_COPY</id>
          <enabled>Y</enabled>
          <name>STEP_COPY</name>
        </field>
        <field>
          <id>LINES_READ</id>
          <enabled>Y</enabled>
          <name>LINES_READ</name>
        </field>
        <field>
          <id>LINES_WRITTEN</id>
          <enabled>Y</enabled>
          <name>LINES_WRITTEN</name>
        </field>
        <field>
          <id>LINES_UPDATED</id>
          <enabled>Y</enabled>
          <name>LINES_UPDATED</name>
        </field>
        <field>
          <id>LINES_INPUT</id>
          <enabled>Y</enabled>
          <name>LINES_INPUT</name>
        </field>
        <field>
          <id>LINES_OUTPUT</id>
          <enabled>Y</enabled>
          <name>LINES_OUTPUT</name>
        </field>
        <field>
          <id>LINES_REJECTED</id>
          <enabled>Y</enabled>
          <name>LINES_REJECTED</name>
        </field>
        <field>
          <id>ERRORS</id>
          <enabled>Y</enabled>
          <name>ERRORS</name>
        </field>
        <field>
          <id>INPUT_BUFFER_ROWS</id>
          <enabled>Y</enabled>
          <name>INPUT_BUFFER_ROWS</name>
        </field>
        <field>
          <id>OUTPUT_BUFFER_ROWS</id>
          <enabled>Y</enabled>
          <name>OUTPUT_BUFFER_ROWS</name>
        </field>
      </perf-log-table>
      <channel-log-table>
        <connection/>
        <schema/>
        <table/>
        <timeout_days/>
        <field>
          <id>ID_BATCH</id>
          <enabled>Y</enabled>
          <name>ID_BATCH</name>
        </field>
        <field>
          <id>CHANNEL_ID</id>
          <enabled>Y</enabled>
          <name>CHANNEL_ID</name>
        </field>
        <field>
          <id>LOG_DATE</id>
          <enabled>Y</enabled>
          <name>LOG_DATE</name>
        </field>
        <field>
          <id>LOGGING_OBJECT_TYPE</id>
          <enabled>Y</enabled>
          <name>LOGGING_OBJECT_TYPE</name>
        </field>
        <field>
          <id>OBJECT_NAME</id>
          <enabled>Y</enabled>
          <name>OBJECT_NAME</name>
        </field>
        <field>
          <id>OBJECT_COPY</id>
          <enabled>Y</enabled>
          <name>OBJECT_COPY</name>
        </field>
        <field>
          <id>REPOSITORY_DIRECTORY</id>
          <enabled>Y</enabled>
          <name>REPOSITORY_DIRECTORY</name>
        </field>
        <field>
          <id>FILENAME</id>
          <enabled>Y</enabled>
          <name>FILENAME</name>
        </field>
        <field>
          <id>OBJECT_ID</id>
          <enabled>Y</enabled>
          <name>OBJECT_ID</name>
        </field>
        <field>
          <id>OBJECT_REVISION</id>
          <enabled>Y</enabled>
          <name>OBJECT_REVISION</name>
        </field>
        <field>
          <id>PARENT_CHANNEL_ID</id>
          <enabled>Y</enabled>
          <name>PARENT_CHANNEL_ID</name>
        </field>
        <field>
          <id>ROOT_CHANNEL_ID</id>
          <enabled>Y</enabled>
          <name>ROOT_CHANNEL_ID</name>
        </field>
      </channel-log-table>
      <step-log-table>
        <connection/>
        <schema/>
        <table/>
        <timeout_days/>
        <field>
          <id>ID_BATCH</id>
          <enabled>Y</enabled>
          <name>ID_BATCH</name>
        </field>
        <field>
          <id>CHANNEL_ID</id>
          <enabled>Y</enabled>
          <name>CHANNEL_ID</name>
        </field>
        <field>
          <id>LOG_DATE</id>
          <enabled>Y</enabled>
          <name>LOG_DATE</name>
        </field>
        <field>
          <id>TRANSNAME</id>
          <enabled>Y</enabled>
          <name>TRANSNAME</name>
        </field>
        <field>
          <id>STEPNAME</id>
          <enabled>Y</enabled>
          <name>STEPNAME</name>
        </field>
        <field>
          <id>STEP_COPY</id>
          <enabled>Y</enabled>
          <name>STEP_COPY</name>
        </field>
        <field>
          <id>LINES_READ</id>
          <enabled>Y</enabled>
          <name>LINES_READ</name>
        </field>
        <field>
          <id>LINES_WRITTEN</id>
          <enabled>Y</enabled>
          <name>LINES_WRITTEN</name>
        </field>
        <field>
          <id>LINES_UPDATED</id>
          <enabled>Y</enabled>
          <name>LINES_UPDATED</name>
        </field>
        <field>
          <id>LINES_INPUT</id>
          <enabled>Y</enabled>
          <name>LINES_INPUT</name>
        </field>
        <field>
          <id>LINES_OUTPUT</id>
          <enabled>Y</enabled>
          <name>LINES_OUTPUT</name>
        </field>
        <field>
          <id>LINES_REJECTED</id>
          <enabled>Y</enabled>
          <name>LINES_REJECTED</name>
        </field>
        <field>
          <id>ERRORS</id>
          <enabled>Y</enabled>
          <name>ERRORS</name>
        </field>
        <field>
          <id>LOG_FIELD</id>
          <enabled>N</enabled>
          <name>LOG_FIELD</name>
        </field>
      </step-log-table>
      <metrics-log-table>
        <connection/>
        <schema/>
        <table/>
        <timeout_days/>
        <field>
          <id>ID_BATCH</id>
          <enabled>Y</enabled>
          <name>ID_BATCH</name>
        </field>
        <field>
          <id>CHANNEL_ID</id>
          <enabled>Y</enabled>
          <name>CHANNEL_ID</name>
        </field>
        <field>
          <id>LOG_DATE</id>
          <enabled>Y</enabled>
          <name>LOG_DATE</name>
        </field>
        <field>
          <id>METRICS_DATE</id>
          <enabled>Y</enabled>
          <name>METRICS_DATE</name>
        </field>
        <field>
          <id>METRICS_CODE</id>
          <enabled>Y</enabled>
          <name>METRICS_CODE</name>
        </field>
        <field>
          <id>METRICS_DESCRIPTION</id>
          <enabled>Y</enabled>
          <name>METRICS_DESCRIPTION</name>
        </field>
        <field>
          <id>METRICS_SUBJECT</id>
          <enabled>Y</enabled>
          <name>METRICS_SUBJECT</name>
        </field>
        <field>
          <id>METRICS_TYPE</id>
          <enabled>Y</enabled>
          <name>METRICS_TYPE</name>
        </field>
        <field>
          <id>METRICS_VALUE</id>
          <enabled>Y</enabled>
          <name>METRICS_VALUE</name>
        </field>
      </metrics-log-table>
    </log>
    <maxdate>
      <connection/>
      <table/>
      <field/>
      <offset>0.0</offset>
      <maxdiff>0.0</maxdiff>
    </maxdate>
    <size_rowset>10000</size_rowset>
    <sleep_time_empty>50</sleep_time_empty>
    <sleep_time_full>50</sleep_time_full>
    <unique_connections>N</unique_connections>
    <feedback_shown>Y</feedback_shown>
    <feedback_size>50000</feedback_size>
    <using_thread_priorities>Y</using_thread_priorities>
    <shared_objects_file/>
    <capture_step_performance>N</capture_step_performance>
    <step_performance_capturing_delay>1000</step_performance_capturing_delay>
    <step_performance_capturing_size_limit>100</step_performance_capturing_size_limit>
    <dependencies>
    </dependencies>
    <partitionschemas>
    </partitionschemas>
    <slaveservers>
    </slaveservers>
    <clusterschemas>
    </clusterschemas>
    <created_user>-</created_user>
    <created_date>2022/12/09 17:08:25.717</created_date>
    <modified_user>-</modified_user>
    <modified_date>2022/12/09 17:08:25.717</modified_date>
    <key_for_session_key/>
    <is_key_private>N</is_key_private>
  </info>
  <notepads>
  </notepads>
  <connection>
    <name>test</name>
    <server>localhost</server>
    <type>MYSQL</type>
    <access>Native</access>
    <database>test</database>
    <port>3306</port>
    <username>root</username>
    <password>Encrypted 2be98afc86aa7f290a416bc508fc0fc8e</password>
    <servername/>
    <data_tablespace/>
    <index_tablespace/>
    <attributes>
      <attribute>
        <code>EXTRA_OPTION_MYSQL.defaultFetchSize</code>
        <attribute>500</attribute>
      </attribute>
      <attribute>
        <code>EXTRA_OPTION_MYSQL.useCursorFetch</code>
        <attribute>true</attribute>
      </attribute>
      <attribute>
        <code>FORCE_IDENTIFIERS_TO_LOWERCASE</code>
        <attribute>N</attribute>
      </attribute>
      <attribute>
        <code>FORCE_IDENTIFIERS_TO_UPPERCASE</code>
        <attribute>N</attribute>
      </attribute>
      <attribute>
        <code>IS_CLUSTERED</code>
        <attribute>N</attribute>
      </attribute>
      <attribute>
        <code>PORT_NUMBER</code>
        <attribute>3306</attribute>
      </attribute>
      <attribute>
        <code>PRESERVE_RESERVED_WORD_CASE</code>
        <attribute>Y</attribute>
      </attribute>
      <attribute>
        <code>QUOTE_ALL_FIELDS</code>
        <attribute>N</attribute>
      </attribute>
      <attribute>
        <code>STREAM_RESULTS</code>
        <attribute>Y</attribute>
      </attribute>
      <attribute>
        <code>SUPPORTS_BOOLEAN_DATA_TYPE</code>
        <attribute>Y</attribute>
      </attribute>
      <attribute>
        <code>SUPPORTS_TIMESTAMP_DATA_TYPE</code>
        <attribute>Y</attribute>
      </attribute>
      <attribute>
        <code>USE_POOLING</code>
        <attribute>N</attribute>
      </attribute>
    </attributes>
  </connection>
  <order>
    <hop>
      <from>表输入</from>
      <to>插入 / 更新</to>
      <enabled>Y</enabled>
    </hop>
  </order>
  <step>
    <name>表输入</name>
    <type>TableInput</type>
    <description/>
    <distribute>Y</distribute>
    <custom_distribution/>
    <copies>1</copies>
    <partitioning>
      <method>none</method>
      <schema_name/>
    </partitioning>
    <connection>test</connection>
    <sql>select * from stu1;</sql>
    <limit>0</limit>
    <lookup/>
    <execute_each_row>N</execute_each_row>
    <variables_active>N</variables_active>
    <lazy_conversion_active>N</lazy_conversion_active>
    <cached_row_meta_active>N</cached_row_meta_active>
    <row-meta>
      <value-meta>
        <type>Integer</type>
        <storagetype>normal</storagetype>
        <name>id</name>
        <length>9</length>
        <precision>0</precision>
        <origin>表输入</origin>
        <comments>id</comments>
        <conversion_Mask>####0;-####0</conversion_Mask>
        <decimal_symbol>.</decimal_symbol>
        <grouping_symbol>,</grouping_symbol>
        <currency_symbol/>
        <trim_type>none</trim_type>
        <case_insensitive>N</case_insensitive>
        <collator_disabled>Y</collator_disabled>
        <collator_strength>0</collator_strength>
        <sort_descending>N</sort_descending>
        <output_padding>N</output_padding>
        <date_format_lenient>N</date_format_lenient>
        <date_format_locale>zh_CN</date_format_locale>
        <date_format_timezone>Asia/Shanghai</date_format_timezone>
        <lenient_string_to_number>N</lenient_string_to_number>
      </value-meta>
      <value-meta>
        <type>String</type>
        <storagetype>normal</storagetype>
        <name>name</name>
        <length>20</length>
        <precision>-1</precision>
        <origin>表输入</origin>
        <comments>name</comments>
        <conversion_Mask/>
        <decimal_symbol>.</decimal_symbol>
        <grouping_symbol>,</grouping_symbol>
        <currency_symbol/>
        <trim_type>none</trim_type>
        <case_insensitive>N</case_insensitive>
        <collator_disabled>Y</collator_disabled>
        <collator_strength>0</collator_strength>
        <sort_descending>N</sort_descending>
        <output_padding>N</output_padding>
        <date_format_lenient>N</date_format_lenient>
        <date_format_locale>zh_CN</date_format_locale>
        <date_format_timezone>Asia/Shanghai</date_format_timezone>
        <lenient_string_to_number>N</lenient_string_to_number>
      </value-meta>
      <value-meta>
        <type>Integer</type>
        <storagetype>normal</storagetype>
        <name>age</name>
        <length>9</length>
        <precision>0</precision>
        <origin>表输入</origin>
        <comments>age</comments>
        <conversion_Mask>####0;-####0</conversion_Mask>
        <decimal_symbol>.</decimal_symbol>
        <grouping_symbol>,</grouping_symbol>
        <currency_symbol/>
        <trim_type>none</trim_type>
        <case_insensitive>N</case_insensitive>
        <collator_disabled>Y</collator_disabled>
        <collator_strength>0</collator_strength>
        <sort_descending>N</sort_descending>
        <output_padding>N</output_padding>
        <date_format_lenient>N</date_format_lenient>
        <date_format_locale>zh_CN</date_format_locale>
        <date_format_timezone>Asia/Shanghai</date_format_timezone>
        <lenient_string_to_number>N</lenient_string_to_number>
      </value-meta>
    </row-meta>
    <attributes/>
    <cluster_schema/>
    <remotesteps>
      <input>
      </input>
      <output>
      </output>
    </remotesteps>
    <GUI>
      <xloc>192</xloc>
      <yloc>96</yloc>
      <draw>Y</draw>
    </GUI>
  </step>
  <step>
    <name>插入 / 更新</name>
    <type>InsertUpdate</type>
    <description/>
    <distribute>Y</distribute>
    <custom_distribution/>
    <copies>1</copies>
    <partitioning>
      <method>none</method>
      <schema_name/>
    </partitioning>
    <connection>test</connection>
    <commit>100</commit>
    <update_bypassed>N</update_bypassed>
    <lookup>
      <schema/>
      <table>stu2</table>
      <key>
        <name>id</name>
        <field>id</field>
        <condition>=</condition>
        <name2/>
      </key>
      <key>
        <name>name</name>
        <field>name</field>
        <condition>=</condition>
        <name2/>
      </key>
      <value>
        <name>id</name>
        <rename>id</rename>
        <update>N</update>
      </value>
      <value>
        <name>name</name>
        <rename>name</rename>
        <update>Y</update>
      </value>
    </lookup>
    <attributes/>
    <cluster_schema/>
    <remotesteps>
      <input>
      </input>
      <output>
      </output>
    </remotesteps>
    <GUI>
      <xloc>392</xloc>
      <yloc>96</yloc>
      <draw>Y</draw>
    </GUI>
  </step>
  <step_error_handling>
  </step_error_handling>
  <slave-step-copy-partition-distribution>
  </slave-step-copy-partition-distribution>
  <slave_transformation>N</slave_transformation>
  <attributes/>
</transformation>

导入 kettle

点击运行 会提示是保存到资源库 运行后可以保证运行

启动carte

data-integration/carte.sh  pwd/carte-config-master-8080.xml

可以看到读取资源库

查看状态 一定资源库连接上了

  • http://localhost:8080/kettle/status/

执行任务 url没有返回 直接转圈

  • http://localhost:8080/kettle/executeTrans/?rep=Test_repo&user=admin&pass=admin&trans=test

  • 查看数据库 R_TRANSFORMATION

返回刷新首页

点击test

python 代码 http

def invoke_task(task_name, data_up_time_key=None, data_up_time_value=None):
    url = settings.CARTE_SERVER + "/kettle/executeTrans/"
    params = {"rep": "SugonEtlRepo",
              "user": "admin",
              "pass": "******",
              "trans": f"/task/trans/{task_name}",
              "data_up_time": datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"),
              "DATA_UP_TIME": datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"),
              "level": 'ERROR',
              "xml": "Y"
              }
    session = requests.Session()
    session.auth = ("admin", "sugon@666#")
    session.post("http://10.132.81.181:9898")
    resp = session.get(url, params=params, timeout=30)
    xml_string = resp.content
    if len(xml_string) > 0:
        xml_dict = xmltodict.parse(xml_string)
        print(xml_dict['webresult']['message'])
        return xml_string
    else:
        return None


def add_trans_task(task_name, data_up_time_key=None, data_up_time_value=None):
    url = settings.CARTE_SERVER + "/kettle/registerTrans/"
    params = {
        "rep": "******",
        "user": "admin",
        "pass": "*****",
        "trans": f"/task/trans/{task_name}?xml=y",
        "xml": "Y",
    }
    if data_up_time_key:
        params.update({data_up_time_key: data_up_time_value})
    try:
        session = requests.Session()
        session.auth = ("admin", "*****")
        print('invoke_task url %s' % url)
        response = session.get(url, params=params)
        response.raise_for_status()
        xml_string = response.content
        if len(xml_string) > 0:
            xml_dict = xmltodict.parse(xml_string)
            print(xml_dict['webresult']['message'])
        else:
            print('ok')

            return None
    except requests.exceptions.RequestException as e:
        print("请求失败:", e)
        return "请求失败:%s" % e


def task_status(task_name: str):
    url = settings.KETTLE_SERVER + "/kettle/transStatus/"
    payload = {
        "name": task_name,
        "xml": 'Y',
    }
    headers = {
        'content-type': "application/x-www-form-urlencoded",
    }
    try:
        session = requests.Session()
        session.auth = ("admin", "*******")
        print('invoke_task url %s' % url)
        response = session.post(url, data=payload, headers=headers)
        response.raise_for_status()
        xml_string = response.content
        if len(xml_string) > 0:
            xml_dict = xmltodict.parse(xml_string)
            if xml_dict.get('webresult'):
                print(xml_dict['webresult']['message'])
            else:
                trans_status = xml_dict['transstatus']
                data_content = trans_status['logging_string']
                encode_string = data_content.replace("<![CDATA[", "").replace("]]>", "")
                logging_string = decode_base64_zipped_string(encode_string)
                print(logging_string)
        else:
            print('ok')

        return None
    except requests.exceptions.RequestException as e:
        print("请求失败:", e)
        return "请求失败:%s" % e

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表