一、CDC背景介绍
首先我们介绍什么是CDC?CDC的全称是ChangedataCapture,即变更数据捕获,它是数据库领域非常常见的技术,主要用于捕获数据库的一些变更,然后可以把变更数据发送到下游。它的应用比较广,可以做一些数据同步、数据分发和数据采集,还可以做ETL,今天主要分享的也是把DB数据通过CDC的方式ETL到数据湖。
对于CDC,业界主要有两种类型:一是基于查询的,客户端会通过SQL方式查询源库表变更数据,然后对外发送。二是基于日志,这也是业界广泛使用的一种方式,一般是通过binlog方式,变更的记录会写入binlog,解析binlog后会写入消息系统,或直接基于FlinkCDC进行处理。
它们两者是有区别的,基于查询比较简单,是入侵性的,而基于日志是非侵入性,对数据源没有影响,但binlog的解析比较复杂一些。
基于查询和基于日志,分别有四种实现技术,有基于时间戳、基于触发器和快照,还有基于日志的,这是实现CDC的技术,下面是几种方式的对比。
通过这个表格对比可以发现基于日志的综合最优,但解析比较复杂,但业界有很多开源的binlog的解析器,比较通用和流行的有Debezium、Canal,以及Maxwell。基于这些binlog解析器就可以构建ETL管道。
下面来看下业界比较流行的一种CDC入仓架构。
整个数据入仓是分实时流是离线流,实时流解析binlog,通过Canal解析binlog,然后写入Kafka,然后每个小时会把Kafka数据同步到Hive中;另外就是离线流,离线流需要对同步到Hive的贴源层的表进行拉取一次全量,如果只有前面的实时流是数据是不全的,必须通过离线流的SQLSelect把全量导入一次数据,对每张ODS表会把存量数据和增量数据做一个Merge。这里可以看到对于ODS层的实时性不够,存在小时、天级别的延迟。而对ODS层这个延时可以通过引入ApacheHudi做到分钟级。
二、CDC数据入湖方法
基于CDC数据的入湖,这个架构非常简单。上游各种各样的数据源,比如DB的变更数据、事件流,以及各种外部数据源,都可以通过变更流的方式写入表中,再进行外部的查询分析,整个架构非常简单。
架构虽然简单,但还是面临很多挑战。以ApacheHudi数据湖为例,数据湖是通过文件存储各种各样的数据,对于CDC的数据处理需要对湖里某部分文件进行可靠地、事务性变更,这样可以保证下游查询不会看到部分结果,另外对CDC数据需要高效的做更新、删除操作,这就需要快速定位到更改的文件,另外是对于每小批量的数据写入,希望能够自动处理小文件,避免繁杂的小文件处理,还有面向查询的布局优化,可以通过一些技术手段如Clustering改造文件布局,对外提供更好的查询性能。
而ApacheHudi是怎么应对这些挑战的呢?首先支持事务性写入,包括读写之间的MVCC机制保证写不影响读,也可以控制事务及并发保证,对于并发写采用OCC乐观锁机制,对更新删除,内置一些索引及自定义保证更新、删除比较高效。另外是面向查询优化,Hudi内部会自动做小文件的管理,文件会自动长到用户指定的文件大小,如M,这对Hudi来说也是比较核心的特性。另外Hudi提供了Clustering来优化文件布局的功能。
下图是典型CDC入湖的链路。上面的链路是大部分公司采取的链路,前面CDC的数据先通过CDC工具导入Kafka或者Pulsar,再通过Flink或者是Spark流式消费写到Hudi里。第二个架构是通过FlinkCDC直联到MySQL上游数据源,直接写到下游Hudi表。
其实,这两条链路各有优缺点。第一个链路统一数据总线,扩展性和容错性都很好。对于第二条链路,扩展性和容错性会稍微差点,但由于组件较少,维护成本相应较低。
这是阿里云数据库OLAP团队的CDC入湖链路,因为我们我们做Spark的团队,所以我们采用的SparkStreaming链路入湖。整个入湖链路也分为两个部分:首先有一个全量同步作业,会通过Spark做一次全量数据拉取,这里如果有从库可以直连从库做一次全量同步,避免对主库的影响,然后写到Hudi。然后会启动一个增量作业,增量作业通过Spark消费阿里云DTS里的binlog数据来将binlog准实时同步至Hudi表。全量和增量作业的编排借助了Lakehouse的作业自动编排能力,协调全量和增量作业,而对于全量和增量衔接时利用Hudi的Upsert语义保证全增量数据的最终的一致性,不会出现数据偏多和偏少的问题。
在Lakehouse的CDC入湖链路中,我们团队也做了一些优化。
第一个是原库的Schema变更处理,我们对接的客户某些列的增加、删除或者修改某些列的场景。在Spark写Hudi之前会做Schema的检验,看这个Schema是不是合法,如果合法就可以正常写入,如果不合法的话,则会写入失败,而删除字段会导致Schema校验不合法,导致作业失败,这样稳定性是没有保证的。因此我们会捕捉SchemaValidation的异常,如果发现是减少了字段,我们会把之前的字段做自动补全,然后做重试,保证链路是稳定的。
第二个有些客户表没有主键或者主键不合理,比如采用更新时间字段作为主键,或者设置会变化的分区字段,这时候就会导致写入Hudi的数据和源库表数据对不上。因此我们做了一些产品层面的优化,允许用户合理设置主键和分区映射,保证同步到Hudi里和源库是数据完全对齐的。
还有一个常见需求是用户在上游库中增加一个表,如果使用表级别同步的话,新增表在整个链路是无法感知的,也就无法同步到Hudi中,而在Lakehouse中,我们可以对整库进行同步,因此在库中新增表时,会自动感知新增表,将新增表数据自动同步到Hudi,做到原库增加表自动感知的能力。
还有一个是对CDC写入时候性能优化,比如拉取包含Insert、Update、Delete等事件的一批数据,是否一直使用Hudi的Upsert方式写入呢?这样控制比较简单,并且Upsert有数据去重能力,但它带来的问题是找索引的效率低,而对于Insert方式而言,不需要找索引,效率比较高。因此对于每一批次数据会判断是否都是Insert事件,如果都是Insert事件就直接Insert方式写入,避免查找文件是否更新的开销,数据显示大概可以提升30%~50%的性能。当然这里也需要考虑到DTS异常,重新消费数据时,恢复期间不能直接使用Insert方式,否则可能会存在数据重复,对于这个问题我们引入了表级别的Watermark,保证即使在DTS异常情况下也不会出现数据重复问题。
三、Hudi核心设计
接着介绍下Hudi的定位,根据社区最新的愿景,Hudi的定义是流式数据湖平台,它支持海量数据更新,内置表格式以及支持事务的储存,一系列列表服务Clean、Archive、
Compaction、Clustering等,以及开箱即用的数据服务,以及本身自带的运维工具和指标监控,提供很好的运维能力。
这是Hudi