数据接入概述
在大数据平台落地的过程中,数据接入是必不可少的一个关键环节。面对各种来源、各种类型的数据,需要通过数据接入就是将这些零散的数据整合在一起,纳入统一的大数据平台。从数据类型的角度,数据接入主要包括结构化数据(数据库)的接入、日志数据的接入、IoT数据的接入和文件的接入。
a.结构化数据接入:结构化数据的接入主要是通过ETL工具来实现的,包括Hadoop平台提供的Sqoop,开源的kettle、talend,以及商业化产品Informatica、DataStage等;
b.日志数据接入:日志的接入工具通常有Flume、FileBeats、Logstash等;
c.IoT数据接入:IoT数据通常是以流式数据的方式,通过Kafka接入大数据平台;
d.文件接入:通过FTP或者WebDAV协议接入。
结构化数据接入ETL数据抽取
结构化数据主要指的是关系型数据库的数据,如Oracle、MySQL、DB2等,也包括MongoDB等NoSQL数据库。
结构化数据接入大数据平台是通过ETL工具实现的。ETL是英文Extract-Transform-Load的缩写,用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)至目标端的过程。
ETL传统上用于构建数据仓库,用户从数据源抽取出所需的数据,经过数据清洗转换,最终按照预先定义好的数据仓库模型,将数据加载到数据仓库中。而在大数据时代,ETL过程逐渐被ELT,即提取-加载-转换代替,其中数据转换根据下游使用的需要在大数据平台中进行,而不是在加载阶段期间。
数据库数据的抽取又可以分为全量抽取和增量抽取:
全量抽取:全量抽取类似于数据迁移或数据复制,它将数据源中的表或视图的数据从数据库中抽取出来,并通过ETL工具转换并存储到大数据平台。
增量抽取:增量抽取只抽取自上次抽取以来数据库中增删改的数据。在ETL使用过程中,增量抽取较全量抽取应用更广。如何捕获变化的数据是增量抽取的关键。
据库增量抽取全量抽取通常用于对数据库进行初始化同步。在完成初始化同步后,后续的数据接入主要采用增量抽取的方式。
数据增量抽取的主要方式有以下几种:
时间戳方式:时间戳方式是指增量抽取时,抽取进程通过比较系统时间与抽取源表的时间戳字段的值来决定抽取哪些数据。这种方式需要在源表上增加一个时间戳字段,系统中更新修改表数据的时候,同时修改时间戳字段的值。
触发器方式:触发器方式是普遍采取的一种增量抽取机制。该方式一般要在被抽取的源表上建立插入、修改、删除3个触发器,每当源表中的数据发生变化,就被相应的触发器将变化的数据写入一个临时表,ETL的增量抽取则是从临时表中而不是直接在源表中抽取数据,同时临时表中抽取过的数据被标记或删除。
全表比对方式:全表比对方式是在增量抽取时,ETL进程逐条比较源表和目标表的记录,将新增和修改的记录读取出来。如有源表和目标表的记录不同,则进行Update操作,如目标表没有存在该主键值,表示该记录还没有,则进行Insert操作。
日志表方式:在业务系统中添加系统日志表,当业务数据发生变化时,更新维护日志表内容,增量抽取数据时,通过读日志表数据决定抽取那些数据。
系统日志分析方式:关系型数据库系统都会将所有的DML操作存储在日志文件中,ETL增量抽取进程通过对数据库的日志进行分析,提取对相关源表在特定时间后发生的DML操作信息,就可以得知自上次抽取时刻以来该表的数据变化情况,从而指导增量抽取动作。Oracle的LogMiner就是一个数据库日志的分析工具。
CDC方式:CDC指的是ChangeDataCapture。CDC特性是在Oracle9i数据库中引入的。CDC能够帮助识别从上次提取之后发生变化的数据。利用CDC,在对源表进行INSERT、UPDATE或DELETE等操作的同时就可以提取数据,并且变化的数据被保存在数据库的变化表中。这样就可以捕获发生变化的数据。
数据库实时接入
在数据库数据接入Hadoop大数据平台的时候,大多数企业采用的是使用Sqoop、Kettle等ETL工具进行批量接入的方式,这种接入方式通常会有小时级的数据延迟。而对于越来越多的实时性要求更高的数据分析应用,这个延迟就无法满足要求了,这种情况就需要对数据库数据能够实现秒级的数据接入。
明析数据实时同步工具(InfoDTDataIngestion以下简称IDI)提供了一个简便、实时和通用的解决方案,将用户生产环境中关系型数据库或其他各种结构化和非结构化数据源同步到Hadoop大数据平台或数据湖存储;数据接入到Hadoop后,用户可以通过Hive和Spark接口访问和数据源端一致的数据。
IDI的实时数据同步机制实现各类数据源到数据湖的实时数据同步,以下描述了IDI相关模块在数据实时同步过程中的实现机制:
首先,在实时同步前,对应数据源相关表中当前的数据需要保存一份快照并传输到数据湖中存储,这个过程由IDI的Fulldump模块完成,该模块获取指定表的当前数据并存储到数据湖的HDFS分布式文件系统,每个表会对应相关的存储路径。
然后,部署在源数据端的IDIExtractor模块将数据库交易日志进行解析,过滤其中已请求同步到数据湖的表的相关交易日志,然后通过IDISender模块发送到接收端IDIReceiver模块;IDIExtractor支持不同类型数据库的交易日志解析,例如OracleRedoLog,MySQLbinlog,或DB2、PostgreSQL的transactionlog。
IDIReceiver模块将接收到的交易日志通过增量添加方式写入到HDFS,每个实时同步表的增量日志数据增量写入到HDFS的指定增量文件中。
交易日志读取、发送和接收组件IDIExtractor,Sender和Receiver的设计实现满足以下需求:
a.支持各种类型关系型数据库和NoSQL交易日志的读取和解析;
b.支持接收端多实例、冗余部署;
c.实现可监控可管理的数据同步组件;
d.数据传输加密;
e.数据一致性,完整性保证;
f.数据库主机低I/O,CPU消耗;
g.支持基于HTTPS的跨机房,广域网数据实时同步。
数据同步相关组件架构图其实现流程描述如下:
1.IDIExtractor组件读取数据库日志文件,将增量部分的数据写入日志解析文件,该模块读取文件使用INotify获取文件实时变化事件,通过AIO的方式将文件增量部分读出,支持断点续传的功能。
2.IDISender实时读取日志解析文件的增量部分内容,通过HTTPS协议发送到IDIReceiver,期间如果没有数据,则保持30秒发送一次心跳包。IDISender可以从日志解析文件的任意位置重发数据。IDISender支持断点续传的功能。IDIExtractor模块和IDISender都是无状态的,在极端情况下保证数据不丢失。
3.负载均衡组件LoadBalancer分发从IDISender传输过来的报文,根据报文内容按照路由规则分发到相应的IDIReceiver,路由规则支持failover和loadbalance。
4.LoadBalancer的负载均衡分发机制会将同一个数据源的日志解析报文发送到两台或多台IDIReceiver,两台或多台IDIReceiver写入HDFS的内容保持日志解析文件中的顺序。此处使用ZK来记录最后一条成功写入日志记录序列号,IDIReceiver检查每个报文的序列号,如果连续,则写入HDFS,不连续则返回最后一条成功写入日志记录序列号,要求IDISender重新发送,该机制保证报文发送不丢失,不重复。
IDIReceiver向IDISender和Reader发送指令是通过将指令嵌入在IDISender请求的响应报文中,然后IDISender在下一次heartbeat或其他任意请求中在将IDIReceiver请求报文的处理结果返回。
大数据分析应用的快速发展对于数据的接入提出了越来越高的实时性要求。
本文简述了大数据平台数据接入的主要方式,并介绍了明析数据IDI数据库数据实时接入的方案,能够满足客户在大数据平台建设过程中对于数据实时性和数据接入统一管理的需求。