基于大数据开发套件的增量同步策略

今天我们来讨论如何使用大数据开发套件进行增量同步。

我们把需要同步的数据,根据数据写入后是否会发生变化,分为会变化的数据(人员表比如说,人员的状态会发生变化)和不会发生变化的数据(一般是日志数据)。针对这两种场景,我们需要设计不同的同步策略。这里以把业务RDS数据库的数据同步到MaxCompute为例做一些说明,其他的数据源的道理是一样的。根据等幂性原则(也就是说一个任务,多次运行的结果是一样的,这样才能支持重跑调度。如果任务出现错误,也比较容易清理脏数据),我每次导入数据都是导入到一张单独的表/分区里,或者覆盖里面的历史记录。

本文的测试时间是2016-11-14,全量同步是在14号做的,同步历史数据到ds=20161113这个分区里。至于本文涉及的增量同步的场景,配置了自动调度,把增量数据在15号凌晨同步到ds=20161114的分区里。数据里有一个时间字段optime,用来表示这条数据的修改时间,从而判断这条数据是否是增量数据。

不变的数据

对应这种场景,因为数据生成后就不会发生变化,我们可以很方便地根据数据的生成规律进行分区,比较常见的是根据日期进行分区,比如每天一个分区。以下是测试数据:

drop table if exists oplog;
create table if not exists oplog(
 optime DATETIME,
 uname varchar(50),
 action varchar(50),
 status varchar(10)
 );

Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%m'),'LiLei','SELECT','SUCCESS');
Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%m'),'HanMM','DESC','SUCCESS');

这里有2条数据,当成历史数据。我先做一次全量数据同步,到昨天的分区里。配置方法如下:
先在MaxCompute创建好表:

--创建好MaxCompute表,按天进行分区
create table if not exists ods_oplog(
 optime datetime,
 uname string,
 action string,
 status string
) partitioned by (ds string);

然后配置了历史数据数据同步:

因为只需要跑一次,做以下测试就可以了。测试后到数据开发里把任务的状态改成暂停(最右边的调度配置了)并重新发布,免得明天他继续跑了。之后到MaxCompute里看一下结果:

测试通过后。往Mysql里多写一些数据作为增量数据:

 insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
 insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed');
 insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');

然后配置同步任务如下。需要特别注意的是数据过滤这的配置,通过这个配置,可以在15号的凌晨的同步的时候,把14号全天新增的数据查询出来,然后同步到增量分区里。

这个任务需要发布,设置调度周期为每天调度,第二天过来一看,MaxCompute里的数据变成了:

会变的数据

如人员表、订单表一类的会发生变化的数据,根据数据仓库的4个特点里的反映历史变化的这个特点的要求,我们建议每天对数据进行全量同步。也就是说每天保存的都是数据的全量数据,这样历史的数据和当前的数据都可以很方便地获得。不过如果真实的场景下因为某些特殊情况,需要每天也只做增量同步,因为MaxCompute不支持Update语句来修改数据,只能用别的一些方法来实现。两种同步策略的具体方法如下:

首先我们需要造一些数据:

drop table if exists user ;
create table if not exists user(
    uid int,
    uname varchar(50),
    deptno int,
    gender VARCHAR(1),
    optime DATETIME
    );
--历史数据
insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
--增量数据
update user set deptno=101,optime=CURRENT_TIME  where uid = 2; --null改成非null
update user set deptno=104,optime=CURRENT_TIME  where uid = 3; --非null改成非null
update user set deptno=null,optime=CURRENT_TIME  where uid = 4; --非null改成null
delete from user where uid = 5;
insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);

每天全量同步

每天全量同步同步比较简单:

--全量同步
create table ods_user_full(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME
) partitioned by (ds string);

然后配置同步为:

测试后结果为

因为每天都是全量同步,没有全量和增量的区别,所以第二天就能看到数据结果为

如果需要查询的话,就用where ds =‘20161114’来取全量数据即可了。

每天增量

非常不推荐用这种方法,只有在极特殊的场景下才考虑。首先这种场景不支持delete语句,因为被删除的数据无法通过SQL语句的过滤条件查到。当然实际上公司里的代码很少直接有直接删除数据的,都是使用逻辑删除,那delete就转化成update来处理了。但是这里毕竟限制了一些特殊的业务场景不能做了,当出现特殊情况可能导致数据不一致。另外还有一个缺点就是同步后要对新增的数据和历史数据做合并。具体的做法如下:
首先需要创建2张表,一张写当前的最新数据,一张写增量数据:

--结果表
create table dw_user_inc(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME
);
--增量记录表
create table ods_user_inc(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME
)

然后全量数据可以直接写入结果表:

结果如下:

这个只要跑一次的,记得跑好后要暂停掉。
然后把增量数据写入到增量表里:

结果如下

然后做一次合并

insert overwrite table dw_user_inc
select
--所有select操作,如果ODS表有数据,说明发生了变动,以ODS表为准
case when b.uid is not null then b.uid else a.uid end as uid,
case when b.uid is not null then b.uname else a.uname end as uname,
case when b.uid is not null then b.deptno else a.deptno end as deptno,
case when b.uid is not null then b.gender else a.gender end as gender,
case when b.uid is not null then b.optime else a.optime end as optime
from
dw_user_inc a
full outer join ods_user_inc b
on a.uid  = b.uid ;

最终结果是:

可以看到,delete的那条记录没有同步成功。

对比以上两种同步方式,可以很清楚看到两种同步方法的区别和优劣。第二种方法的优点是同步的增量数据量比较小,但是带来的缺点有可能有数据不一致的风险,而且还需要用额外的计算进行数据合并。如无必要,会变化的数据就使用方法一即可。如果对历史数据希望只保留一定的时间,超出时间的做自动删除,可以设置Lifecycle。

时间: 2017-05-11

基于大数据开发套件的增量同步策略的相关文章

云享团——基于大数据开发套件的增量同步策略

免费开通大数据服务:https://www.aliyun.com/product/odps 转载自云享团 因为近期遇到用户在做ETL操作导入数据到MaxCompute的时候,对如何设置数据同步策略有疑惑,所以今天第一波我们来聊一下数据的同步策略,根据数据的特性,看看哪些数据适合增量同步,哪些适合全量同步,又是如何实现的?请认真看完下面的介绍,这些问题都不是事儿. 我们把需要同步的数据,根据数据写入后是否会发生变化分为:会变化的数据(人员表比如说,人员的状态会发生变化)和不会发生变化的数据(一般是

大数据开发套件-数据集成-云mongo跨区域如何同步到Maxcompute

在大数据开发套件中是可以实现mongo同步到Maxcompute. 数据集成文档:https://help.aliyun.com/document_detail/47677.html?spm=5176.7750354.6.599.jGn50I后端是通过华东1区的调度资源进行数据的调度传输.但是如果阿里云mongo不在华东1在其他区域,使用默认资源组就不能正常同步了.那么就需要用户通过自己添加调度机器进行同步.1,准备一台调度服务器,要求必须和云mongo相同网络类型相同的区域.官方文档:http

大数据开发套件—数据集成常见问题

我们在进行大数据开发过程中,会遇到各种问题,本文将定期收集整理一些在使用阿里云数加 大数据开发套件 过程中遇到的常见问题,供大家参考~ Q: 配置数据同步任务,在选择数据源时,出现下图中的错误,该怎么办? A: 建议您刷新页面,清空缓存,重新登录. Q:数据同步时,如何进行增量同步? A: 具体操作可参考 数据增量同步 文档 . Q:新增数据源时,RDS 数据源测试连通性不通怎么办? A:当 RDS 数据源测试连通性不通时,需要到自己的 RDS 上添加数据同步机器 IP 白名单: 10.152.

【大数据开发套件调度配置实践】——不同周期任务依赖配置

大数据开发过程中常遇到不同运行周期的任务进行依赖,常见 天任务依赖小时任务. 小时任务依赖分钟任务 .那么如何通过大数据开发套件开发这两种场景呢? 本文将从这两个场景出发,结合调度依赖/参数/调度执行等,介绍不同周期调度依赖的最佳操作实践. 再此之前,我们先明确几个概念: 业务日期:业务数据产生的日期,这里指完整一天的业务数据.在大数据开发套件里任务每天能处理的最近的完整一天业务数据是昨天的数据,所以业务日期=日常调度日期-1天. 依赖关系:依赖关系是描述两个或多个节点/工作流之间的语义连接关系

【大数据新手上路】“零基础”系列课程--如何通过大数据开发套件Data IDE玩转大数据

免费开通大数据服务:https://www.aliyun.com/product/odps 老板每天都要出这些业务数据(销售总额.总交易量.总点击次数.总加入购物车次数.总加入收藏夹次数...),我得想个一劳永逸的方法了- 幸好,我有数加神器大数据开发套件Data IDE,搞定业务工作流调度,每日定时自动执行任务,分分钟输出计算结果. 妈妈再也不用担心我焦头奋战了-- 本教程是一个大数据开发套件Data IDE零基础教程,通过Data IDE将多源异构的数据集导入云端MaxCompute,进行计

【大数据开发套件调度配置实践】——调度任务各种周期配置和调度形态

数加·大数据开发套件目前支持任务调度周期有五种:天.周.月.分钟.小时.本文将介绍这五种周期的配置和调度形态. 调度规则--调度任务是否能运行起来要满足的条件: 上游任务实例是否都运行成功.若所有上游任务实例都运行成功则触发任务进入等待时间状态. 任务实例定时时间是否已经到.任务实例进入等待时间状态后会check本身定时时间是否到,如果时间到了则进入等待资源状态: 当前调度资源是否充足.任务实例进入等待资源状态后,check当前本项目调度资源是否充足,若充足则可以运行起来. 天调度任务 天调度任

大数据开发套件—调度运维常见问题

我们在进行大数据开发过程中,会遇到各种问题,本文将定期收集整理一些在使用阿里云数加 大数据开发套件 时遇到的常见问题,供大家参考~ Q. 如果之前提交的任务修改后再次提交,是否会影响当天的任务调度? A. 根据修改的内容来确定是否会影响:如果修改的只是 sql 语句,则不会影 响:如果修改自定义参数和调度配置以后重新提交的,都会影响当天的任 务调度 . Q. 创建一个新的工作流任务,如果保存后没有提交任务,是否可以进行测试? A. 仅保存后没有提交,sql 任务可以在本地运行,但不可以提交测试

大数据开发套件中数据同步-日志报错回滚信息的一些问题总结

在使用大数据开发套件时最常用的就是数据同步模块,工单里最常见的问题就是其中数据同步的问题,这里总结一些常见一些从MaxCompute(原名ODPS)到其他数据源的同步任务报错案例,主要是日志中出现数据回滚写入的问题. 那首先看下日志中数据回滚的原因,当数据写入rds或者hybridDB等一些支持事务的数据库中,数据批量写入,一旦由于各种原因没有写入成功,这个批次的数据会回滚重新写入,如果再次写入失败,就会报脏数据的错误导致任务失败.数据写入失败可能是以下原因导致回滚.1,脏数据(数据值超过数据类

阿里云大学精品课程:深入理解阿里云数加大数据开发套件Data IDE-基本知识

阿里云大学精品课程:深入理解阿里云数加大数据开发套件Data IDE-基本知识 写在最前面 >>>进入了解更多>>>阿里云数加·MaxCompute大数据计算服务. 基于阿里云数加·MaxCompute构建大数据仓库的开发工具利器Data IDE<MaxCompute(原ODPS)开发入门指南--数据开发工具篇>,那么基于Data IDE进行数据开发想必也遇到一些不少的困惑,就自己在培训过程中的一些经验或者说阿里集团内的踩坑之路与大家在此分享,也欢迎拍砖.