基于FlinkHive构建流批一体准实

基于Hive的离线数仓往往是企业大数据生产系统中不可缺少的一环。Hive数仓有很高的成熟度和稳定性,但由于它是离线的,延时很大。在一些对延时要求比较高的场景,需要另外搭建基于Flink的实时数仓,将链路延时降低到秒级。但是一套离线数仓加一套实时数仓的架构会带来超过两倍的资源消耗,甚至导致重复开发。想要搭建流式链路就必须得抛弃现有的Hive数仓吗?并不是,借助Flink可以实现已有的Hive离线数仓准实时化。本文整理自ApacheFlinkCommitter、阿里巴巴技术专家李劲松在InfoQ技术公开课的分享,文章将分析当前离线数仓实时化的难点,详解Flink如何解决Hive流批一体准实时数仓的难题,实现更高效、合理的资源配置。文章大纲如下:1.离线数仓实时化的难点2.Flink在流批一体的探索3.构建流批一体准实时数仓应用实践

离线数仓实时化的难点

离线数仓

上图是一个典型的离线数仓,假设现在公司有一个需求,目前公司的数据量很大,需要每天出一个报表且输出到业务数据库中。首先是刚入库的业务数据,大致分为两种,一种是MySQL的binlog,另外一种是业务系统中的业务打点,这个日志打点信息可以通过Flume等工具去采集,再离线入库到数仓中。然后随着业务越来越多,业务中的各个表可以做一些抽象,抽象的好处是更好的管理和更高效的数据复用和计算复用。所以数仓就分成了多层(明细层、中间层、服务层等等),每一层存的是数据表,数据表之间通过HiveSQL的计算来实现ETL转换。

不止是HiveSQL,Hive只是静态的批计算,而业务每天都要出报表,这意味着每天都要进行计算,这种情况下会依赖于调度工具和血缘管理:

调度工具:按照某个策略把批计算调度起来。血缘管理:一个任务是由许多个作业组合而成,可能有非常复杂的表结构层次,整个计算是一个非常复杂的拓扑,作业间的依赖关系非常复杂(减少冗余存储和计算,也可以有较好的容错),只有当一级结束后才能进行下一级的计算。当任务十分庞大的时候,我们得出结果往往需要很长的一段时间,也就是我们常说的T+1,H+1,这就是离线数仓的问题。

第三方工具

上面说过,离线数仓不仅仅是简单的Hive计算,它还依赖了其它的第三方工具,比如:

使用Flume来入库,但存在一定的问题,首先,它的容错可能无法保证Exactly-Once效果,需要下游再次进行去重操作。其次,自定义逻辑需要通过一些手段,比如脚本来控制。第三,离线数仓并不具备良好的扩展能力,当数据剧增时,增加原本的并发数就比较困难了。基于调度工具的作业调度会带来级联的计算延迟,比如凌晨1点开始计算昨天的数据,可能需要到早上6、7点才能做完,并且无法保证在设置的调度时间内数据可以完全ready。此外,级联的计算还会带来复杂的血缘管理问题,大任务的Batch计算可能会突然打满集群的资源,所以也要求我们对于负载管理进行考量,这些都会给业务增加负担。无论是离线数仓还是第三方工具,其实主要的问题还是“慢”,如何解决慢的问题,此时就该实时数仓出场了。

实时数仓

实时数仓其实是从Hive+HDFS的组合换成了Kafka,ETL的功能通过Flink的流式处理解决。此时就不存在调度和血缘管理的问题了,通过实时不断的增量更新,最终输出到业务的DB中。

虽然延时降低了,但此时我们会面临另外一些问题:

历史数据丢失,因为Kafka只是临时的存储介质,数据会有一个超时的时间(比如只保存7天的数据),这会导致我们的历史数据丢失。成本相对较高,实时计算的成本要大于离线计算。Lambda架构

所以此时很多人就会选择一套实时一套离线的做法,互不干扰,根据任务是否需要走实时的需求来对需求进行分离。

这套架构看似解决了所有问题,但实际带来的问题也是非常多。首先,Lambda架构造成了离线和实时的割裂问题,它们解决的业务问题都是一样的,但是两套方案让同样的数据源产生了不同的计算结果。不同层级的表结构可能不一致,并且当数据产生不一致的问题时,还需要去进行比对排查。

随着这套Lambda架构越走越远,开发团队、表结构表依赖、计算模型等都可能会被割裂开,越到后面越会发现,成本越来越高,而统一的代价越来越大。

那么问题来了,实时数仓会耗费如此大的资源,且还不能保留历史数据,Lambda架构存在如此多的问题,有什么方案可以解决呢?

数据湖

数据湖拥有不少的优点,原子性可以让我们做到准实时的批流一体,并且支持已有数据的修改操作。但是毕竟数据湖是新一代数仓存储架构,各方面都还不是很完美,目前已有的数据湖都强依赖于Spark(当然Flink也正在拥抱数据湖),将数据迁移到数据湖需要团队对迁移成本和人员学习成本进行考量。

如果没有这么大的决心迁移数据湖,那有没有一个稍微缓和一些的方案加速已有的离线数仓呢?

Flink在批流一体上的探索

统一元数据

Flink一直持续致力于离线和实时的统一,首先是统一元数据。简单来说就是把Kafka表的元数据信息存储到HiveMetaStore中,做到离线和实时的表Meta的统一。

(目前开源的实时计算并没有一个较为完善的持久化MetaStore,HiveMetaStore不仅能保存离线表,也可以承担实时计算的MetaStore能力)。

统一计算引擎

同样的元数据之后,实时和离线的表结构和层次可以设计成一样,接下来就是可以共用:

同一套SQL,Flink自身提供批流一体的ANSI-SQL语法,可以大大减小用户SQL开发者和运维者的负担,让用户专注于业务逻辑。同一个引擎,Flink的流和批复用一套优化和Runtime框架,现阶段的大数据引擎还远远达不到完全稳定的情况,所以仍然有很多时候需要我们去深入的分析和优化,一套引擎可以让开发者专注单个技术栈,避免需要接触多个技术栈,而只有技术广度,没有技术深度。统一数据

分析了元数据和计算引擎的统一,更进一步,是否能统一实时和离线的数据,避免数据的不一致,避免数据的重复存储和重复计算。ETL计算是否能统一呢?既然实时表设计上可以和离线表一模一样,是否可以干脆只有实时表的ETL计算,离线表从实时表里获取数据?

并且,通过实时链路可以加速离线链路的数据准备,批计算可以把调度换成流输入。

FlinkHive/FileStreamingSink即为解决这个问题,实时Kafka表可以实时的同步到对于的离线表中:

离线表作为实时的历史数据,填补了实时数仓不存在历史数据的空缺。数据批量准实时摄入为Ad-hoc查询离线表提供了准实时输入。此时离线的批计算也可以交由实时调度,在实时任务处理中某个契机(PartitionCommit见后续)自行调度离线那块的任务进行数据同步操作。

此时实时和离线的表已经基本统一,那么问题来了,Kafka中的表和Hive中的表能否就共用一张表呢?我的想法是之后可能会出现以下情况,在数仓中定义一张表,分别对应着Kafka和Hive+HDFS两种物理存储:

用户在进行insert操作时,就自然插入到了Kafka的实时table当中,同时生成另外一条链路,自动同步到HiveTable当中。这样这一张表就非常的完整,不仅满足实时的需求,而且拥有历史的数据。一个SQL读取这样的一个HybridSource,根据你的查询语句后面的where条件,自动路由到Hive的历史数据,或者是Kafka的实时数据。根据一定的规则先读Hive历史数据,再读Kafka实时数据,当然这里有一个问题,它们之间通过什么标识来切换呢?一个想法是数据中或者Kafka的Timestamp。HiveStreamingSink的实现

Flink1.11前已经有了StreamingFileSink,在1.11中不但把它集成到SQL中,让这个HiveStreamingSink可以像离线的HiveSQL那样,所有的业务逻辑都由SQL去处理,而且带来了进一步的增量。

接下来介绍下Hive/FileStreamingSink,分为两个组件,FileWriter和PartitionCommitter:

FileWriter组件可以做到分区感知,通过checkpoint机制可以保证Exactly-Once(分布式场景是不可靠的,需要通过两阶段提交+文件Rename的幂等性),FileWriter也提供了Rolling相关的参数,这个Rolling指的是我们的流式处理过程,它可以通过两个参数来控制执行频率,file-size就是每个数据流的大小,rollover-interval就是时长间隔。但是需要注意,checkpoint不宜设置太频繁,以免产生过多的小文件。PartitionCommitter,通过一系列的业务逻辑处理后得到的FinishedFlies就直接可用了吗?因为我们典型的Hive表都是分区表,当一个分区就绪后,还需要通知下游,Partition已经处理完成,可以同步到Hivemetastore中了。我们需要在合适的时机来有效的trigger特定的Partition


转载请注明:http://www.aierlanlan.com/cyrz/3357.html