简介:数据湖的架构中,CDC数据实时读写的方案和原理
本文由李劲松、胡争分享,社区志愿者杨伟海、李培殿整理。主要介绍在数据湖的架构中,CDC数据实时读写的方案和原理。文章主要分为4个部分内容:
常见的CDC分析方案为何选择Flink+Iceberg如何实时写入读取未来规划一、常见的CDC分析方案
我们先看一下今天的topic需要设计的是什么?输入是一个CDC或者upsert的数据,输出是Database或者是用于大数据OLAP分析的存储。
我们常见的输入主要有两种数据,第一种数据是数据库的CDC数据,不断的产生changeLog;另一种场景是流计算产生的upsert数据,在最新的Flink1.12版本已经支持了upsert数据。
1.1离线HBase集群分析CDC数据
我们通常想到的第一个方案,就是把CDCupsert的数据通过Flink进行一些处理之后,实时的写到HBase当中。HBase是一个在线的、能提供在线点查能力的一种数据库,具有非常高的实时性,对写入操作是非常友好的,也可以支持一些小范围的查询,而且集群可扩展。
这种方案其实跟普通的点查实时链路是同一套,那么用HBase来做大数据的OLAP的查询分析有什么问题呢?
首先,HBase是一个面向点查设计的一种数据库,是一种在线服务,它的行存的索引不适合分析任务。典型的数仓设计肯定是要列存的,这样压缩效率和查询效率才会高。第二,HBase的集群维护成本比较高。最后,HBase的数据是HFile,不方便与大数据里数仓当中典型的Parquet、Avro、Orc等结合。
1.2ApacheKudu维护CDC数据集
针对HBase分析能力比较弱的情况,社区前几年出现了一个新的项目,这就是ApacheKudu项目。Kudu项目拥有HBase的点查能力的同时,采用列存,这样列存加速非常适合OLAP分析。
这种方案会有什么问题呢?
首先Kudu是比较小众的、独立的集群,维护成本也比较高,跟HDFS、S3、OSS比较割裂。其次由于Kudu在设计上保留了点查能力,所以它的批量扫描性能不如parquet,另外Kudu对于delete的支持也比较弱,最后它也不支持增量拉取。
1.3直接导入CDC到Hive分析
第三种方案,也是大家在数仓中比较常用的方案,就是把MySQL的数据写到Hive,流程是:维护一个全量的分区,然后每天做一个增量的分区,最后把增量分区写好之后进行一次Merge,写入一个新的分区,流程上这样是走得通的。Hive之前的全量分区是不受增量的影响的,只有当增量Merge成功之后,分区才可查,才是一个全新的数据。这种纯列存的append的数据对于分析是非常友好的。
这种方案会有什么问题呢?
增量数据和全量数据的Merge是有延时的,数据不是实时写入的,典型的是一天进行一次Merge,这就是T+1的数据了。所以,时效性很差,不支持实时upsert。每次Merge都需要把所有数据全部重读重写一遍,效率比较差、比较浪费资源。
1.4Spark+Delta分析CDC数据
针对这个问题,Spark+Delta在分析CDC数据的时候提供了MERGEINTO的语法。这并不仅仅是对Hive数仓的语法简化,Spark+Delta作为新型数据湖的架构(例如Iceberg、Hudi),它对数据的管理不是分区,而是文件,因此Delta优化MERGEINTO语法,仅扫描和重写发生变化的文件即可,因此高效很多。
我们评估一下这个方案,他的优点是仅依赖Spark+Delta架构简洁、没有在线服务、列存,分析速度非常快。优化之后的MERGEINTO语法速度也够快。
这个方案,业务上是一个CopyOnWrite的一个方案,它只需要copy少量的文件,可以让延迟做的相对低。理论上,在更新的数据跟现有的存量没有很大重叠的话,可以把天级别的延迟做到小时级别的延迟,性能也是可以跟得上的。
这个方案在Hive仓库处理upsert数据的路上已经前进了一小步了。但小时级别的延迟毕竟不如实时更有效,因此这个方案最大的缺点在CopyOnWrite的Merge有一定的开销,延迟不能做的太低。
第一部分大概现有的方案就是这么多,同时还需要再强调一下,upsert之所以如此重要,是因为在数据湖的方案中,upsert是实现数据库准实时、实时入湖的一个关键技术点。
二、为何选择Flink+Iceberg
2.1Flink对CDC数据消费的支持
第一,Flink原生支持CDC数据消费。在前文Spark+Delta的方案中,MARGEINTO的语法,用户需要感知CDC的属性概念,然后写到merge的语法上来。但是Flink是原生支持CDC数据的。用户只要声明一个Debezium或者其他CDC的format,Flink上面的SQL是不需要感知任何CDC或者upsert的属性的。Flink中内置了hiddencolumn来标识它CDC的类型数据,所以对用户而言比较简洁。
如下图示例,在CDC的处理当中,Flink在只用声明一个MySQLBinlog的DDL语句,后面的select都不用感知CDC属性。
2.2Flink对ChangeLogStream的支持
下图介绍的是Flink原生支持ChangeLogStream,Flink在接入一个ChangeLogStream之后,拓扑是不用关心ChangeLogflag的SQL。拓扑完全是按照自己业务逻辑来定义,并且一直到最后写入Iceberg,中间不用感知ChangeLog的flag。
2.3Flink+IcebergCDC导入方案评估
最后,Flink+Iceberg的CDC导入方案的优点是什么?
对比之前的方案,CopyOnWrite跟MergeOnRead都有适用的场景,侧重点不同。CopyOnWrite在更新部分文件的场景中,当只需要重写其中的一部分文件时是很高效的,产生的数据是纯append的全量数据集,在用于数据分析的时候也是最快的,这是CopyOnWrite的优势。
另外一个是MergeOnRead,即将数据连同CDCflag直接append到Iceberg当中,在merge的时候,把这些增量的数据按照一定的组织格式、一定高效的计算方式与全量的上一次数据进行一次merge。这样的好处是支持近实时的导入和实时数据读取;这套计算方案的FlinkSQL原生支持CDC的摄入,不需要额外的业务字段设计。
Iceberg是统一的数据湖存储,支持多样化的计算模型,也支持各种引擎(包括Spark、Presto、hive)来进行分析;产生的file都是纯列存的,对于后面的分析是非常快的;Iceberg作为数据湖基于snapshot的设计,支持增量读取;Iceberg架构足够简洁,没有在线服务节点,纯tableformat的,这给了上游平台方足够的能力来定制自己的逻辑和服务化。
三、如何实时写入读取
3.1批量更新场景和CDC写入场景
首先我们来了解一下在整个数据湖里面批量更新的两个场景。
第一批量更新的这种场景,在这个场景中我们使用一个SQL更新了成千上万行的数据,比如欧洲的GDPR策略,当一个用户注销掉自己的账户之后,后台的系统是必须将这个用户所有相关的数据全部物理删除。第二个场景是我们需要将datelake中一些拥有共同特性的数据删除掉,这个场景也是属于批量更新的一个场景,在这个场景中删除的条件可能是任意的条件,跟主键(Primarykey)没有任何关系,同时这个待更新的数据集是非常大,这种作业是一个长耗时低频次的作业。另外是CDC写入的场景,对于对Flink来说,一般常用的有两种场景,第一种场景是上游的Binlog能够很快速的写到datalake中,然后供不同的分析引擎做分析使用;第二种场景是使用Flink做一些聚合操作,输出的流是upsert类型的数据流,也需要能够实时的写到数据湖或者是下游系统中去做分析。如下图示例中CDC写入场景中的SQL语句,我们使用单条SQL更新一行数据,这种计算模式是一种流式增量的导入,而且属于高频的更新。
3.2ApacheIceberg设计CDC写入方案需要考虑的问题
接下来我们看下iceberg对于CDC写入这种场景在方案设计时需要考虑哪些问题。
第一是正确性,即需要保证语义及数据的正确性,如上游数据upsert到iceberg中,当上游upsert停止后,iceberg中的数据需要和上游系统中的数据保持一致。第二是高效写入,由于upsert的写入频率非常高,我们需要保持高吞吐、高并发的写入。第三是快速读取,当数据写入后我们需要对数据进行分析,这其中涉及到两个问题,第一个问题是需要支持细粒度的并发,当作业使用多个task来读取时可以保证为各个task进行均衡的分配以此来加速数据的计算;第二个问题是我们要充分发挥列式存储的优势来加速读取。第四是支持增量读,例如一些传统数仓中的ETL,通过增量读取来进行进一步数据转换。
3.3ApacheIcebergBasic
在介绍具体的方案细节之前,我们先了解一下Iceberg在文件系统中的布局,总体来讲Iceberg分为两部分数据,第一部分是数据文件,如下图中的parquet文件,每个数据文件对应一个校验文件(.crc文件)。第二部分是表元数据文件(Metadata文件),包含Snapshot文件(snap-.avro)、Manifest文件(.avro)、TableMetadata文件(*.json)等。
下图展示了在iceberg中snapshot、manifest及partition中的文件的对应关系。下图中包含了三个partition,第一个partition中有两个文件f1、f3,第二个partition有两个文件f4、f5,第三个partition有一个文件f2。对于每一次写入都会生成一个manifest文件,该文件记录本次写入的文件与partition的对应关系。再向上层有snapshot的概念,snapshot能够帮助快速访问到整张表的全量数据,snapshot记录多个manifest,如第二个snapshot包含manifest2和manifest3。
3.4INSERT、UPDATE、DELETE写入
在了解了基本的概念,下面介绍iceberg中insert、update、delete操作的设计。
下图示例的SQL中展示的表包含两个字段即id、data,两个字段都是int类型。在一个transaction中我们进行了图示中的数据流操作,首先插入了(1,2)一条记录,接下来将这条记录更新为(1,3),在iceberg中update操作将会拆为delete和insert两个操作。
这么做的原因是考虑到iceberg作为流批统一的存储层,将update操作拆解为delete和insert操作可以保证流批场景做更新时读取路径的统一,如在批量删除的场景下以Hive为例,Hive会将待删除的行的文件offset写入到delta文件中,然后做一次mergeonread,因为这样会比较快,在merge时通过position将原文件和delta进行映射,将会很快得到所有未删除的记录。
接下来又插入记录(3,5),删除了记录(1,3),插入记录(2,5),最终查询是我们得到记录(3,5)(2,5)。
上面操作看上去非常简单,但在实现中是存在一些语义上的问题。如下图中,在一个transaction中首先执行插入记录(1,2)的操作,该操作会在datafile1文件中写入INSERT(1,2),然后执行删除记录(1,2)操作,该操作会在equalifydeletefile1中写入DELETE(1,2),接着又执行插入记录(1,2)操作,该操作会在datafile1文件中再写入INSERT(1,2),然后执行查询操作。
在正常情况下查询结果应该返回记录INSERT(1,2),但在实现中,DELETE(1,2)操作无法得知删除的是datafile1文件中的哪一行,因此两行INSERT(1,2)记录都将被删除。
那么如何来解决这个问题呢,社区当前的方式是采用了Mixedposition-deleteandequality-delete。Equality-delete即通过指定一列或多列来进行删除操作,position-delete是根据文件路径和行号来进行删除操作,通过将这两种方法结合起来以保证删除操作的正确性。
如下图我们在第一个transaction中插入了三行记录,即INSERT(1,2)、INSERT(1,3)、INSERT(1,4),然后执行