数据同步工具 DataX 的使用

2019-03-27 01:06|来源: 网路

架构设计

 

 

特点:

  • 支持sql-server / oracle / mysql 等jdbc支持的数据库之间互导
  • 支持数据库与solr搜索引擎之间互导
  • 采用http协议传送数据,在网络环境复杂和连接不稳定的情况下能正常工作,也可以扩展成集群、转发、负载均衡等
  • 网络不稳定、数据库连接不稳定的情况下,有重连、重试机制
  • 复杂的数据处理和异构,自定义Query-SQL和Insert/Delete/Update-SQL
  • 分布式事务、数据一致性保护。导入错误的情况下,两边数据都不会发生更改
  • 在工作异常的情况下,可以发送短信或邮件通知
  • 可以通过http网页形式随时查看工作状态和cpu 内存使用情况,方便监控

下面假设一个应用场景:

  1. 在db1上有商品TB_DEMO2_PROD、价格TB_DEMO2_PRICE、库存TB_DEMO2_STORAGE。总共3张表格
  2. 在db2上有商品及价格表TB_MY_DEMO2_PROD,库存TB_MY_DEMO2_STORAGE表2张表格
  3. 有一个solr服务器,集中了商品、价格、库存等所有信息
  4. 当db1中有数据更改时,同步到db2的表中,并从db2同步到solr搜索服务器
  5. db1到solr的同步延迟控制在5秒以内
  6. 当同步过程中有任何异常时,即可发送短信

配置步骤

db1上建立测试表格

create table TB_DEMO2_PROD  --商品表
(
  prod_id   VARCHAR2(200) not null, --商品ID
  prod_code VARCHAR2(200), --商品编号
  branchid  VARCHAR2(3),   --分公司编号
  prod_name VARCHAR2(200), --商品名称
  prod_unit VARCHAR2(50)   --计量单位
);

alter table TB_DEMO2_PROD
  add constraint PK_TB_DEMO2_PROD primary key (PROD_ID);

---------------
create table TB_DEMO2_PRICE      --价格表
(
  prod_id   VARCHAR2(200) not null, --商品ID
  price1 NUMBER(20,5),    --价格1
  price2 NUMBER(20,5),     --价格2
  price3 NUMBER(20,5)     --价格3
);

alter table TB_DEMO2_PRICE
  add constraint PK_TB_DEMO2_PRICE primary key (PROD_ID);

---------------  
create table TB_DEMO2_STORAGE      --库存表
(
  prod_id   VARCHAR2(200) not null, --商品ID
  amount NUMBER(18)    --库存量
);

alter table TB_DEMO2_STORAGE
  add constraint PK_TB_DEMO2_STORAGE primary key (PROD_ID);
  

 

db2上建立测试表格

create table TB_MY_DEMO2_PROD  --商品表
(
  prod_id   VARCHAR2(200) not null, --商品ID
  prod_code VARCHAR2(200), --商品编号
  branchid  VARCHAR2(3),   --分公司编号
  prod_name VARCHAR2(200), --商品名称
  prod_unit VARCHAR2(50),   --计量单位
  price1 NUMBER(20,5),    --价格1
  price2 NUMBER(20,5)     --价格2
);

alter table TB_MY_DEMO2_PROD
  add constraint PK_TB_MY_DEMO2_PROD primary key (PROD_ID);

---------------  
create table TB_MY_DEMO2_STORAGE      --库存表
(
  prod_id   VARCHAR2(200) not null, --商品ID
  amount NUMBER(18)    --库存量
);

alter table TB_MY_DEMO2_STORAGE
  add constraint PK_TB_MY_DEMO2_STORAGE primary key (PROD_ID);
  

 

 

建立DataX的系统事件表

如果db1上还没有DX_DATA_EVENT和DX_DATA_EVENT_STAGE表,就用下面的语句来执行建表操作

create table DX_DATA_EVENT_STAGE
(
  SYNC_NAME VARCHAR2(50) not null,  --同步方案名
  EVENT_ID    NUMBER(22) not null  --事件ID
);

alter table DX_DATA_EVENT_STAGE
  add constraint PK_DX_DATA_EVENT_STAGE primary key (SYNC_NAME);

create table DX_DATA_EVENT
(
  EVENT_ID NUMBER(22) not null,  --事件ID
  SYNC_NAME VARCHAR2(50) not null,  --同步方案名
  ROW_ID VARCHAR2(128),        --数据主键值
  OPT_TYPE VARCHAR2(1) not null,  --操作类型(U;D;I;)
  CREATE_TIME DATE not null    --更新时间
);

alter table DX_DATA_EVENT
  add constraint PK_DX_DATA_EVENT primary key (EVENT_ID);

create bitmap index IDX_DX_DATA_EVENT_SYNC_NAME on DX_DATA_EVENT (SYNC_NAME);
  
create sequence SEQ_DX_DATA_EVENT
minvalue 1
maxvalue 999999999999999999999999999
start with 1
increment by 1
cache 20;

 

编写同步方案的SQL语句

现在我们要开始做同步了,首先明确同步的方法,规定一个同步方案名(SyncName)

这是按照目标服务器的表格数来定义的,比如:J44_demo2Prod, J44_demo2Storage

 

编写同步源(source)的查询语句

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="demoOrder1_source" >

  <select id="fullQuery" resultType="java.util.HashMap"><![CDATA[
    select t.order_code, c.danw_bh, t.modify_time, t.create_time 
      from TB_ORDER_MAIN_PARTITION t
     inner join TB_CUST_MAIN c
        on t.cust_id = c.cust_id
        where t.branch_id = 'J44'
  ]]></select>

  <select id="deltaQuery" resultType="java.util.HashMap">
    select t.order_id, t.order_code, c.danw_bh, t.modify_time, t.create_time 
      from TB_ORDER_MAIN_PARTITION t
     inner join TB_CUST_MAIN c
        on t.cust_id = c.cust_id
        where t.order_id in 
    <foreach item="item" index="index" collection="list" open="(" separator="," close=")">
        #{item}
    </foreach>
  </select>
  
</mapper>

 

编写同步目标(target)的插入语句

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="demoOrder1_target" >
  <insert id="insertSum">
      <selectKey resultType="java.lang.Long" keyProperty="T1_ID" order="BEFORE" >
      SELECT SEQ_DX_TABLE1_SUM.NEXTVAL AS id FROM DUAL
    </selectKey>
    insert into dx_table1_sum
      (
      t1_id
      , billid
      , cust_code
      , last_modify
      , create_time
      )
    values
      (
      #{T1_ID, jdbcType=DECIMAL}
      , #{ORDER_CODE, jdbcType=VARCHAR}
      , #{DANW_BH, jdbcType=VARCHAR}
      , #{MODIFY_TIME, jdbcType=DATE}
      , #{CREATE_TIME, jdbcType=DATE}
      )
  </insert>
  
  <update id="updateSum">
    update dx_table1_sum
       set billid = #{BILLID, jdbcType=VARCHAR},
           cust_code = #{CUST_CODE, jdbcType=VARCHAR},
           last_modify = #{LAST_MODIFY, jdbcType=DATE},
           create_time = #{CREATE_TIME, jdbcType=DATE}
     where t1_id = #{entry.rowId, jdbcType=VARCHAR}
  </update>
  
  <delete id="deleteSum">
      delete from dx_table1_sum where t1_id = #{entry.rowId, jdbcType=VARCHAR}
  </delete>
  
  <delete id="clearSum">
      delete from dx_table1_sum
  </delete>
  
  <insert id="insertDet">
      <selectKey resultType="java.lang.Long" keyProperty="custId" order="BEFORE" >
      SELECT SEQ_DX_TABLE1_DET.NEXTVAL AS id FROM DUAL
    </selectKey>
    ...
  </insert>
</mapper>

 

在db1上编写触发器

----------- 表格 TB_DEMO2_PROD 对应同步方案是 sync_demo2Prod
create or replace trigger TRG_DX_TB_DEMO2_PROD
  after insert or update or delete on DX_TB_DEMO2_PROD for each row 
begin
  if inserting then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Prod', :new.prod_id, 'I',  sysdate); 
  elsif updating then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Prod', :old.prod_id, 'U',  sysdate);   
  elsif deleting then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Prod', :old.prod_id, 'D',  sysdate);   
  end if;
end TRG_DX_TB_DEMO2_PROD;
----------- 表格 TB_DEMO2_PRICE 对应同步方案是 sync_demo2Prod
create or replace trigger TRG_DX_TB_DEMO2_PRICE
  after insert or update or delete on TB_DEMO2_PRICE for each row 
begin
  if inserting then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Prod', :new.prod_id, 'I',  sysdate); 
  elsif updating then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Prod', :old.prod_id, 'U',  sysdate);   
  elsif deleting then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Prod', :old.prod_id, 'D',  sysdate);   
  end if;
end TRG_DX_TB_DEMO2_PRICE;
----------- 表格 DX_TB_DEMO2_STORAGE 对应同步方案是 sync_demo2Price
create or replace trigger TRG_DX_TB_DEMO2_STORAGE
  after insert or update or delete on TB_DEMO2_STORAGE for each row 
begin
  if inserting then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Price', :new.prod_id, 'I',  sysdate); 
  elsif updating then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Price', :old.prod_id, 'U',  sysdate);   
  elsif deleting then 
      insert into DX_DATA_EVENT 
      values(SEQ_DX_DATA_EVENT.NEXTVAL,
              'sync_demo2Price', :old.prod_id, 'D',  sysdate);   
  end if;
end TRG_DX_TB_DEMO2_STORAGE;

 

编写spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd 
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd 
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd 
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"
    default-autowire="byName" default-lazy-init="false">
    
    <bean id="syncTarget_J44_demoOrder1" class="com.jzt.datax.core.SyncTargetServiceImpl">
        <property name="targetConfig" ref="syncTarget_J44_demoOrder1_config" />
    </bean>
    
    <bean id="syncSource_J44_demoOrder1" class="com.jzt.datax.core.SyncSourceServiceImpl">
        <property name="sourceConfig" ref="syncSource_J44_demoOrder1_config" />
    </bean>
    
    <bean id="syncTarget_J44_demoOrder1_config" class="com.jzt.datax.core.SyncTargetConfigration">
        <property name="syncName" value="J44_demoOrder1" />
        <property name="ibatisInsertData" value="demoOrder1_target.insertSum" />
        <property name="ibatisUpdateData" value="demoOrder1_target.updateSum" />
        <property name="ibatisDeleteData" value="demoOrder1_target.deleteSum" />
        <property name="ibatisBeforeFullSyncData" value="demoOrder1_target.clearSum" />
    </bean>
    
    <bean id="syncSource_J44_demoOrder1_config" class="com.jzt.datax.core.SyncSourceConfigration" >
        <!-- 名称(必须唯一) -->
        <property name="syncName" value="J44_demoOrder1" />
        <!-- 调度频率(cron表达式) -->
        <property name="tiggerCron" value="0/3 * * * * ?" />
        <!-- 事件检查动作 -->
        <property name="eventLookup" ref="defaultEventCheck" />
        <!-- 全量查询动作 -->
        <property name="ibatisFullQuery" value="demoOrder1_source.fullQuery" />
        <!-- 增量查询动作 -->
        <property name="ibatisDeltaQuery" value="demoOrder1_source.deltaQuery" />
        <!-- 查询结果中的主键字段名 -->
        <property name="identityField" value="ORDER_ID" />
        <!-- 同步管道 -->
        <property name="channel" ref="syncSource_J44_demoOrder1_channel" />
    </bean>
    
    <!-- 这里定义了一个同步管道,用http协议传输数据 -->
    <bean id="syncSource_J44_demoOrder1_channel" class="com.jzt.datax.core.channel.HttpPostChannel">
        <!-- 当上传数据达到某个阀值时开启压缩 -1代表永不压缩 0代表总是压缩 -->
        <property name="zipSize" value="-1" />
        <property name="dataTarget" value="http://127.0.0.1:9280/sync/J44_demoOrder1.json" />
    </bean>

</beans>

 

测试和调试项目

 

 

 


转自:http://www.cnblogs.com/luoyifan/articles/2953798

相关问答

更多
  • logstash的jdbc插件不过不支持删除、更新 所以最好的办法是自己写脚本按照mysql的日志来CRUD 如果数据量小,在数据写入时就直接同步写入mysql和es就行
  • 1.如何实现mysql与elasticsearch的数据同步? 逐条转换为json显然不合适,需要借助第三方工具或者自己实现。核心功能点:同步增、删、改、查同步。 2、mysql与elasticsearch同步的方法有哪些?优缺点对比? 目前该领域比较牛的插件有: 1)、elasticsearch-jdbc,严格意义上它已经不是第三方插件。已经成为独立的第三方工具。https://github.com/jprante/elasticsearch-jdbc 2)、elasticsearch-river-MyS ...
  • 你一定要看看phinx 。 以下是我使用MySQL和PHP执行的复杂数据库迁移步骤的示例: https : //coderwall.com/p/v3qg2q You should definitely have a look at phinx. Here's an example of a complex database migration step I performed using MySQL and PHP: https://coderwall.com/p/v3qg2q
  • matplot应该强制一次绘制绘图,这应该减少闪烁 matplot(dataX, cbind(dataY, dataY2), type="l", lwd=2, ylim = yLimits, col = c("black", "red"), lty = 1) matplot should force the plot to be drawn all at once which should reduce flickering matplot(dataX, cbind(dataY, dataY2), typ ...
  • + (NSMutableArray *)allDataInManagedObjectContext:(NSManagedObjectContext *)context { NSMutableArray *allData = [NSMutableArray array]; NSMutableArray *returnableArray=[NSMutableArray array]; NSFetchRequest *request = [[NSFetchRequest alloc] i ...
  • 警告告诉您predict.gam无法识别您传递给type参数的值。 由于它不理解,它决定使用type的默认值,即"terms" 。 请注意,带有type="terms" predict.gam返回有关模型术语的信息, 而不是概率。 因此输出值不在0和1之间。 有关mgcv::predict.gam更多信息,请查看此处 。 The warning is telling you that predict.gam doesn't recognize the value you passed to the type ...
  • 进行异步调用时无法返回。 而只是将foo称为回调: var foo = function (text) {     $('body').text(text); }; var bar = function () {     var parseData = function (dataX) {         // do something to data         foo(dataX.someText);     };     if (localStorage.hasOwnProperty(' ...
  • 这里的问题是d3.csv是异步的,因此数据的填充时间与访问时间不同。 如果你想运行d3.csv一次,获取数据并将它们保存在别处,你可以尝试这样或者这样或者这样的东西 一般来说: // define your module here var Module_BarDataDaily = function(data) { this.dataX = data; //this.dataX will store the data this.process = function(){// do wha ...
  • Java已经在base64中编码了原始图像数据,所以在PHP中你不需要使用imagecreatefromstring (除非你想进一步操作它),因为它已经是一个图像了。 这段代码可能足够了: $datax = base64_decode($data['image']); file_put_contents('upload/abc.png', $datax); Java already encoded the raw image data in base64 so in PHP you don't need ...
  • 您可以通过两种方式修复脚本,两者都涉及更改update功能: 我认为在更新函数中使用分散调用更清晰 在更新中调用set_offsets之前转置数据数组 使用分散调用是最明确的修复,您可以在运行期间增加代理: def update(self, i): """Update the scatter plot.""" dataX, dataY = next(self.stream) self.scat = self.ax.scatter(dataX, dataY, c="tomato", s ...