一、前言
这篇主要由五个部分来组成:
首先是有赞的实时平台架构。
其次是在调研阶段我们为什么选择了Flink。在这个部分,主要是Flink与Spark的structuredstreaming的一些对比和选择Flink的原因。
第三个就是比较重点的内容,Flink在有赞的实践。这其中包括了我们在使用Flink的过程中碰到的一些坑,也有一些具体的经验。
第四部分是将实时计算SQL化,界面化的一些实践。
最后的话就是对Flink未来的一些展望。这块可以分为两个部分,一部分是我们公司接下来会怎么去更深入的使用Flink,另一部分就是Flink以后可能会有的的一些新的特性。
二、有赞实时平台架构
有赞的实时平台架构呢有几个主要的组成部分。
首先,对于实时数据来说,一个消息中间件肯定是必不可少的。在有赞呢,除了业界常用的Kafka以外,还有NSQ。与Kafka有别的是,NSQ是使用Go开发的,所以公司封了一层Java的客户端是通过push和ack的模式去保证消息至少投递一次,所以Connector也会有比较大的差距,尤其是实现容错的部分。在实现的过程中呢,参考了Flink官方提供的RabbitMQ的连接器,结合NSQclient的特性做了一些改造。
接下来就是计算引擎了,最古老的就是Storm了,现在依然还有一些任务在Storm上面跑,至于新的任务基本已经不会基于它来开发了,因为除了开发成本高以外,语义的支持,SQL的支持包括状态管理的支持都做得不太好,吞吐量还比较低,将Storm的任务迁移到Flink上也是我们接下来的任务之一。还有呢就是SparkStreaming了,相对来说Spark有一个比较好的生态,但是SparkStreaming是微批处理的,这给它带来了很多限制,除了延迟高以外还会比较依赖外部存储来保存中间状态。Flink在有赞是比较新的引擎,为什么在有了Spark和Storm的情况下我们还要引入Flink呢,下一个部分我会提到。
存储引擎,除了传统的MySQL以外,我们还使用HBase,ES和ZanKV。ZanKV是我们公司开发的一个兼容Redis协议的分布式KV数据库,所以姑且就把它当成Redis来理解好了。
实时OLAP引擎的话基于Druid,在多维的统计上面有非常好的应用。
最后是我们的实时平台。实时平台提供了集群管理,项目管理,任务管理和报警监控的功能。。
关于实时平台的架构就简单介绍到这里,接下来是Flink在有赞的探索阶段。在这个部分,我主要会对比的SparkStructuredStreaming。
三、为什么选择引入Flink
至于为什么和SparkStructuredStreaming(SSS)进行对比呢?因为这是实时SQL化这个大背景下比较有代表性的两个引擎。
首先是性能上,从几个角度来比较一下。首先是延迟,毫无疑问,Flink作为一个流式引擎是优于SSS的微批引擎的。虽然说Spark也引入了一个连续的计算引擎,但是不管从语义的保证上,还是从成熟度上,都是不如Flink的。据我所知,他们是通过将rdd长期分配到一个结点上来实现的。
其次比较直观的指标就是吞吐了,这一点在某些场景下Flink略逊于Spark。但是当涉及到中间状态比较大的任务呢,Flink基于RocksDB的状态管理就显示出了它的优势。
Flink在中间状态的管理上可以使用纯内存,也可以使用RocksDB。至于RocksDB,简单点理解的话就是一个带缓存的嵌入式数据库。借助持久化到磁盘的能力,Flink相比SSS来说可以保存的状态量大得多,并且不容易OOM。并且在做checkpoint中选用了增量模式,应该是只需要备份与上一次checkpoint时不同的sst文件。使用过程中,发现RocksDB作为状态管理性能也是可以满足我们需求的。
聊完性能,接下来就说一说SQL化,这也是现在的一个大方向吧。我在开始尝试SSS的时候,尝试了一个SQL语句中有多个聚合操作,但是却抛了异常。后面仔细看了文档,发现确实这在SSS中是不支持的。第二个是distinct也是不支持的。这两点Flink是远优于SSS的。所以从实时SQL的角度,Flink又为自己赢得了一票。除此之外,Flink有更灵活的窗口。还有输出的话,同样参考的是DataFlow模型,Flink实现支持删除并更新的操作,SSS仅支持更新的操作。(这边SSS是基于Spark的2.3版本)
API的灵活性。在SSS中,诚然table带来了比较大的方便,但是对于有一些操作依然会想通过DStream或者rdd的形式来操作,但是SSS并没有提供这样的转换,只能编写一些UDF。但是在Flink中,Table和DataStream可以灵活地互相转换,以应对更复杂的场景。
四、Flink在有赞的实践
在真正开始使用Flink之前呢,第一个要考虑的就是部署的问题。因为现有的技术栈,所以选择了部署在Yarn上,并且使用的是SingleJob的模式,虽然会有更多的ApplicationMaster,但无疑是增加了隔离性的。
4.1问题一:FLINK-
在开始部署的时候我遇到了一个比较奇怪的问题。先讲一下背景吧,因为还处于调研阶段,所以使用的是Yarn的默认队列,优先级比较低,在资源紧张的时候也容易被抢占。有一个上午,我起了一个任务,申请了5个Container来运行TaskExecutor,一个比较简单地带状态的流式任务,想多跑一段时间看看稳定不稳定。这个Flink任务最后占了多个container,还在不停增加,但是只有五个Container在工作,其他的container都注册了slot,并且slot都处于闲置的状态。以下两张图分别代表正常状态下的任务,和出问题的任务。
出错后
在涉及到这个问题细节之前,我先介绍一下Flink是如何和Yarn整合到一块的。根据下图,我们从下往上一个一个介绍这些组件是做什么的。
TaskExecutor是实际任务的执行者,它可能有多个槽位,每个槽位执行一个具体的子任务。每个TaskExecutor会将自己的槽位注册到SlotManager上,并汇报自己的状态,是忙碌状态,还是处于一个闲置的状态。
SlotManager既是Slot的管理者,也负责给正在运行的任务提供符合需求的槽位。还记录了当前积压的槽位申请。当槽位不够的时候向Flink的ResourceManager申请容器。
Pendingslots积压的Slot申请及计数器
Flink的ResourceManager则负责了与Yarn的ResourceManager进行交互,进行一系列例如申请容器,启动容器,处理容器的退出等等操作。因为采用的是异步申请的方式,所以还需要记录当前积压的容器申请,防止接收过多容器。
Pendingcontainerrequest积压容器的计数器
AMRMClient是异步申请的执行者,CallbackHandler则在接收到容器和容器退出的时候通知Flink的ResourceManager。
Yarn的ResourceManager则像是一个资源的分发器,负责接收容器请求,并为Client准备好容器。
这边一下子引入的概念有点多,下面我用一个简单地例子来描述一下这些组件在运行中起到的角色。
首先,我们的配置是3个TaskManager,每个TaskManager有两个Slot,也就是总共需要6个槽位。当前已经拥有了4个槽位,任务的调度器向Slot申请还需要两个槽位来运行子任务。
这时SlotManager发现所有的槽位都已经被占用了,所以它将这个slot的request放入了pendingslots当中。所以可以看到pendingslots的那个计数器从刚才的0跳转到了现在的2.之后SlotManager就向Flink的ResourceManager申请一个新的TaskExecutor,正好就可以满足这两个槽位的需求。于是Flink的ResourceManager将pendingcontainerrequest加1,并通过AMRMClient去向Yarn申请资源。
当Yarn将相应的Container准备好以后,通过CallbackHandler去通知Flink的ResourceManager。Flink就会根据在每一个收到的container中启动一个TaskExecutor,并且将pendingcontainerrequest减1,当pendingcontainerrequest变为0之后,即使收到新的container也会马上退回。
当TaskExecutor启动之后,会向SlotManager注册自己的两个Slot可用,SlotManager便会将两个积压的SlotRequest完成,通知调度器这两个子任务可以到这个新的TaskExecutor上执行,并且pendingrequests也被置为0.到这儿一切都符合预期。
那这个超发的问题又是如何出现的呢?首先我们看一看这就是刚刚那个正常运行的任务。它占用了6个Slot。
如果在这个时候,出现了一些原因导致了TaskExecutor非正常退出,比如说Yarn将资源给抢占了。这时Yarn就会通知Flink的ResourceManager这三个Container已经异常退出。所以Flink的ResourceManager会立即申请三个新的container。在这儿会讨论的是一个worstcase,因为这个问题其实也不是稳定复现的。
CallbackHandler两次接收到回调发现Container是异常退出,所以立即申请新的Container,pendingcontainerrequests也被置为了3.
如果在这时,任务重启,调度器会向SlotManager申请6个Slot,SlotManager中也没有可用Slot,就会向Flink的ResourceManager申请3个Container,这时pendingcontainerrequests变为了6.
最后呢结果就如图所示,起了6个TaskExecutor,总共12个Slot,但是只有6个是被正常使用的,还有6个一直处于闲置的状态。
在修复这个问题的过程中,我有两次尝试。第一次尝试,在Container异常退出以后,我不去立即申请新的container。但是问题在于,如果Container在启动TaskExecutor的过程中出错,那么失去了这种补偿的机制,有些SlotRequest会被一直积压,因为SlotManager已经为它们申请了Container。
第二次尝试是在Flink的ResourceManager申请新的container之前先去检查pendingslots,如果当前的积压slots已经可以被积压的container给满足,那就没有必要申请新的container了。
4.2问题二:监控
我们使用过程中踩到的第二个坑,其实是跟延迟监控相关的。例子是一个很简单的任务,两个source,两个除了source之外的operator,并行度都是2.每个source和operator它都有两个子任务。
任务的逻辑是很简单,但是呢当我们打开延时监控。即使是这么简单的一个任务,它会记录每一个source的子任务到每一个算子的子任务的延迟数据。这个延迟数据里还包含了平均延迟,最大延迟,百分之99的延迟等等等等。那我们可以得出一个公式,延迟数据的数量是source的子任务数量乘以的source的数量乘以算子的并行度乘以算子的数量。N=n(subtaskspersource)*n(sources)*n(subtasksperoperator)*n(operator)
这边我做一个比较简单地假设,那就是source的子任务数量和算则的子任务数量都是p-并行度。从下面这个公式我们可以看出,监控的数量随着并行度的上升呈平方增长。N=p^2*n(sources)*n(operator)
如果我们把上个任务提升到10个并行度,那么就会收到份的延迟数据。这可能看起来还没有太大的问题,这貌似并不影响组件的正常运行。
但是,在Flink的devmailinglist当中,有一个用户反馈在开启了延迟监控之后,JobMaster很快就会挂掉。他收到了20+的监控数据,并且包含这些数据的ConcurrentHashMap在内存中占用了1.6G的内存。常规情况Flink的JobMaster时会给到多少内存,我一般会配1-2g,最后会导致长期FullGC和OOM的情况。
那怎么去解决这个问题呢?当延迟监控已经开始影响到系统的正常工作的时候,最简单的办法就是把它给关掉。可是把延时监控关掉,一方面我们无法得知当前任务的延时,另一方面,又没有办法去针对延时做一些报警的功能。
所以另一个解决方案就如下。首先是Flink-,它提供了更多的延迟监控粒度的选项,从源头上减少数量。比如说我们使用了Single模式去采集这些数据,那它只会记录每个operator的子任务的延迟,忽略是从哪个source或是source的子任务中来。这样就可以得出这样一个公式,也能将之前我们提到的十个并行度的任务产生的个延时监控降低到了40个。这个功能发布在了1.7.0中,并且backport回了1.5.5和1.6.2.
此外,Flink-提出了改进MetricQueryService。它包含了几个子任务,前三个子任务为监控服务建立了一个专有的低优先级的ActorSystem,在这里可以简单的理解为一个独立的线程池提供低优先级的线程去处理相关任务。它的目的也是为了防止监控任务影响到主要的组件。这个功能发布在了1.7.0中。
还有一个就是Flink-,它还依旧处于review和改进当中,目的是为了控制监控消息的大小。
4.3具体实践一
接下来会谈一下Flink在有赞的一些具体应用。
首先是Flink结合Spring。为什么要将这两者做结合呢,首先在有赞有很多服务都只暴露了Dubbo的接口,而用户往往都是通过Spring去获取这个服务的client,在实时计算的一些应用中也是如此。
另外,有不少数据应用的开发也是Java工程师,他们希望能在Flink中使用Spring以及生态中的一些组件去简化他们的开发。用户的需求肯定得得到满足。接下来我会讲一些错误的典型,以及最后是怎么去使用的。
第一个错误的典型就是在Flink的用户代码中启动一个Spring环境,然后在算子中取调用相关的bean。但是事实上,最后这个SpringContext是启动在client端的,也就是提交任务的这一端,在图中有一个红色的方框中间写着SpringContext表示了它启动的位置。可是用户在实际调用时确实在TaskManager的TaskSlot中,它们都处在不同的jvm,这明显是不合理的。所以呢我们又遇到了第二个错误。
第二个错误比第一个错误看起来要好多了,我们在算子中使用了RichFunction,并且在open方法中通过配置文件获取了一个SpringContext。但是先不说一个TaskManager中启动几个SpringContext是不是浪费,一个Jvm中启动两个SpringContext就会出问题。可能有用户就觉得,那还不简单,把TaskSlot设为1不就行了。可是还有OperatorChain这个机制将几个窄依赖的算子绑定到一块运行在一个TaskSlot中。那我们关闭OperatorChain不就行了?还是不行,Flink可能会做基于CoLocationGroup的优化,将多个subtask放到一个TaskSlot中轮番执行。
但其实最后的解决方案还是比较容易的,无非是使用单例模式来封装SpringContext,确保每个jvm中只有一个,在算子函数的open方法中通过这个单例来获取相应的Bean。
可是在调用Dubbo服务的时候,一次响应往往最少也要在10ms以上。一个TaskSlot最大的吞吐也就在一千,可以说对性能是大大的浪费。那么解决这个问题的话可以通过异步和缓存,对于多次返回同一个值的调用可以使用缓存,提升吞吐我们可以使用异步。
4.4具体实践二
可是如果想同时使用异步和缓存呢?刚开始我觉得这是一个挺容易实现的功能,但在实际写RichAsyncFunction的时候我发现并没有办法使用Flink托管的KeyedState。所以最初想到的方法就是做一个类似LRU的Cache去缓存数据。但是这完全不能借助到Flink的状态管理的优势。所以我研究了一下实现。
为什么不支持呢?
当一条记录进入算子的时候,Flink会先将key提取出来并将KeyedState指向与这个key关联的存储空间,图上就指向了key4相关的存储空间。但是如果此时key1关联的异步操作完成了,希望把内容缓存起来,会将内容写入到key4绑定的存储空间。当下一次key1相关的记录进入算子时,回去key1关联的存储空间查找,可是根本找不到数据,只好再次请求。
所以解决的方法是定制一个算子,每条记录进入系统,都让它指向同一个公用key的存储空间。在这个空间使用MapState来做缓存。最后算子运行的function继承AbstractRichFunction在open方法中来获取KeyedState,实现AsyncFunction接口来做异步操作。
五、实时计算SQL化与界面化
最早我们使用SDK的方式来简化SQL实时任务的开发,但是这对用户来说也不算非常友好,所以现在讲SQL实时任务界面化,用Flink作为底层引擎去执行这些任务。
在做SQL实时任务时,首先是外部系统的抽象,将数据源和数据池抽象为流资源,用户将它们数据的Schema信息和元信息注册到平台中,平台根据用户所在的项目组管理读写的权限。在这里消息源的格式如果能做到统一能降低很多复杂度。比如在有赞,想要接入的用户必须保证是Json格式的消息,通过一条样例消息可以直接生成Schema信息。
接下来是根据用户选择的数据源和数据池,获取相应的Schema信息和元信息,在Flink任务中注册相应的外部系统Table连接器,再执行相应的SQL语句。
在SQL语义不支持的功能上尽量使用UDF的方式来拓展。
有数据源和数据池之间的元信息,还可以获取实时任务之间可能存在的依赖关系,并且能做到整个链路的监控
六、未来与展望
Flink的批处理和ML模块的尝试,会跟Spark进行对比,分析优劣势。目前还处于调研阶段,目前比较