1.背景介绍
年B站基于Hadoop开始搭建离线计算服务,计算集群规模从最初的两百台到发展到目前近万台,从单机房发展到多机房。我们先后在生产上大规模的使用了Hive、Spark、Presto作为离线计算引擎,其中Hive和Spark部署在Yarn上,具体的架构如下,目前每天有约20w的离线批作业运行在Spark和Hive上,下面介绍下我们做了哪些工作来确保这些作业的高效与稳定。
2.从Hive到Spark
21年初的时候Hive还是B站主要的离线计算引擎,80%以上的离线作业使用Hive执行,Spark2.4作业占比接近20%,集群资源的使用率长期维持在80%以上。21年3月Spark3.1发布,相较于Spark2.4性能有了较大的提升,我们开始推动Spark3.1在B站的落地,同时将Hive-SQL整体迁移至Spark-SQL。
在B站,离线计算的调度已经完成了收口,80%以上的作业来自于自建的BSK调度平台,其余的作业基本也都是airflow提交上来的,只有少量的任务来自散落的开发机。在推动Hive升级Spark时只要将调度平台的作业完成迁移就可以覆盖90%以上的作业。起步阶段我们进行了少量的人工迁移,对用户SQL进行了简单改写,修改了输入输出表后由两个引擎执行,开发了一个结果对比的工具,通过对双跑结果分析保障迁移效果。基于这个操作链路我们自研了一个自动迁移工具,减少人工失误和人力压力。
2.1语句转换
我们重写了SparkSqlParser,将从调度系统中收集到的SQL进行输入输出表的替换,避免对生产环境的影响。调度平台进行作业调度时以DAG为单位,一个调度任务里面可能存在多条SQL,这些SQL的输入输出表间存在依赖关系,为了保证双跑尽可能的模拟生产表现,对一个DAG里面的多个调度作业进行输入输出表替换时进行整体替换,保证了相互间依赖。对于Select语句因为本身没有输出表,需要将Select语句转换为CTAS语句,这样就能将执行结果落地进行对比,需要注意的是转换过程中要将列名进行编码防止中文列导致的建表失败。当迁移工具识别出SQL语句为DDL语句,如果不是CTAS这种需要消耗计算资源的就直接跳过对比,同时对该语句进行标记,保证交由Hive执行,防止意外的元信息修改。
2.2结果对比
双跑输出结果的对比是保证数据准确性的关键。首先对两个结果表的Schema进行对比,这个通过调用DESC语法返回结果对照就可以完成。对于Schema一致的两个表则进行下一步操作,两表全量数据对比,我们设计了一个SQL对数据按行进行整体对比,具体的对比思路如图:
第一步将两表按所有列(这里是name和num字段)进行GROUPBY,
第二步UNIONALL两表数据,第三步再按所有列(这里是name,num和cnt字段)GROUPBY一次产生最终表,在最终表中cnts值为2的行表示这行数据在两表中都有且重复值一致,对于值非2的数据就是差异行了。
从上图的例子来说差异行Jack
1
2
1表示Jack
1这行数据数据在一个表中存在两行,结合差异行Jack
1
1
1来看其实就是Jack
1这行数据一个表有一行另一个表有两行。通过这个方式就可以对双跑产出的结果表进行一个全量的对比。
通过这种结果对比方法可以完成大部分双跑任务的结果对比,但是对于结果表中存在LIST、SET、MAP这种容器类型的,因为在toString时顺序是无法保证的,所以会被识别为不一致,此外对于非稳定性的SQL如某列数据是random产生,因为每次执行产出的结果不一致,也会识别为对比失败,这两种情况下就需用人工的介入来分析了。
资源利用率的提升是做引擎升级的出发点,除了结果对比来保证数据准确性,我们还做了资源消耗对比来保证迁移的收益。对比系统收集了每个作业的执行时间以及消耗的资源,从执行时间、CPU和内存的资源消耗进行两个引擎执行性能的对比,在执行最终迁移前依据收集的数据为用户提供了迁移的预期收益,提高了用户迁移任务的积极性。从迁移中收集的数据来看hive切到spark可以减少40%以上的执行时间,同时整体资源消耗降低30%以上。
2.3迁移回滚
迁移系统对每个任务都执行了至少3次的双跑对比,但依然不能完全消除执行迁移的风险,在实际迁移过程中的几次问题都是迁移后稳定性不符合预期导致的,因此迁移系统对于迁移后的任务增加了监控,在一个任务迁移后,该任务的前3次调度执行消耗的时间、CPU和内存资源将被用来和迁移前的七次平均执行数据对比,如果存在负优化的情况则会将这个任务执行引擎进行回滚并通知我们介入进行进一步分析。
3.Spark在B站的实践
3.1稳定性改进
3.1.1小文件问题
随着B站业务高速发展,数据量和作业数增长越来越快,伴随而来的小文件数也快速增长,小文件太多会增加HDFS元数据的压力,在计算引擎读取时也大大增加了读请求的数量降低了读取效率。为了解决小文件的问题,在写表场景下对Spark做了如下两种改造。
兜底小文件合并:我们修改了数据的写出目录,引擎计算先写到一个中间目录,在FileFormatWriter.write结束后refreshUpdatedPartitions前,插入了一个文件合并逻辑,从中间目录中获取分区下文件的平均大小,对于不存在小文件情况的目录直接MV到最终目录,对于存在小文件的目录新增一个读RDDcoalesce到一个合适值写出后MV到最终目录。
基于reparation的小文件合并:可以看到兜底小文件合并方式需要先将数据落地到HDFS,重新读取后再写出,这样做放大了HDFS写操作(三副本),降低了计算引擎的执行性能。而Spark3的AQE特性可以在有shuffle的场景下有效解决小文件的问题,很多情况下对于没有shuffle的场景新增一个reparation操作就可以借助AQE的能力解决小文件的问题。社区AQE对于reparation这个hint是不会调整目标分区数的,我们新增了一个rebalancehint,本质上和reparation一样只是将AQE的特性应用在了这个操作上,同时将AQE目标size相关的属性和rebalance设置属性做了隔离方便更好的设置文件大小而不影响计算的并行度。rebalance操作会在最终写出前增加一个shufflestage,有些情况下没有这个stage上游输出就已经没有小文件了,为此作业是否增加rebalance操作依赖于我们对任务的画像通过HBO系统开启。
3.1.2shuffle稳定性问题
Shuffle稳定性直接影响了Spark作业的SLA,在B站推动Spark升级过程中成为用户顾虑的点。
shuffle磁盘分级:B站Yarn主集群采用DataNode和NodeManage混部模式,节点配置了多块HDD盘和少量SSD盘,NM以HDD盘作为计算盘,由于和DN没有做到IO隔离,DN和shuffleservice经常互相影响,因此我们对DiskBlockManager进行了改造,优先使用SSD盘下的目录作为工作目录,当SSD盘存储空间或者inode紧张时则降级到Yarn配置的计算目录,借助SSD优异的随机IO能力,有效的提高的了shuffle稳定性。
remoteshuffleservice:pushbasedshuffle方案可以大量降低磁盘随机IO读请求,如下图:
通过中间服务将同属一个分区的数据进行归并,后续reduce操作就不需要从上游所有的Map节点拉取数据,在shuffle上下游Task数量多的情况下会对磁盘IO压力指数放大,生产上shuffleheavy的任务表现很不稳定,经常出现FetchFailedException。
B站在推动RSS落地时选择了社区3.2Pushbasedshuffle的方案,这个方案主要的优点是对AQE支持比较好,缺点是因为本地也要写一份数据放大了写。将数据先写本地后异步的发送到driver维护的executor节点的externalshuffle节点上,后续生产实践中该方案有个问题,就是当作业启动时通常driver维护的executor数不足以满足远程节点的选择,而SQL作业参与计算的数据量通常是随着过滤条件层层递减的
通常shuffle数据量大的时候因为没有足够的节点会fallback到原先的shuffle方式,为了解决这个问题,我们新增了shuffleservicemaster节点,具体调用流程如下图,所有的externalshuffle节点启动时都会注册到shufflemaster节点上,后续节点本身也会周期性的上报心跳和节点繁忙程度
DAGScheduler后续请求远程节点都从shufflemaster申请,这样不仅解决了冷启动节点不足的问题,在节点选择上也考虑了节点的健康程度。因为是先落盘后发送,在stage执行结束后会有一个等待时间,这里面会有个性能回退的问题,对小任务不友好,所以在生产应用中我们基于任务画像系统HBO自动决定任务是否启用RSS服务,目前生产大约7%的大任务在使用RSS服务,这些任务平均执行时间缩短了25%,稳定性有了显著提升。
目前B站生产中使用该方案基本解决了shuffle稳定性的问题,不过这套方案依旧需要计算节点配置本地shuffle盘,在本地落shuffle数据,无法支持存算分离的架构。后续我们在k8s上会大规模上线混部集群,需要尽量不依赖本地磁盘,避免对在线应用的影响,我们也