1.发展概览
平台支持公司内部所有部门的实时计算应用。主要的业务包括实时大屏、推荐、实验平台、实时监控和实时数据清洗等。
1.1集群规模
平台现有异地双机房双集群,具有多的物理机节点,利用k8s的namespaces,labels和taints等,实现业务隔离以及初步的计算负载隔离。目前线上实时应用有大概个,平台最近主要支持FlinkSQL任务的上线。
1.2平台架构
上图是唯品会实时计算平台的整体架构。
最底层是计算任务节点的资源调度层,实际是以deployment的模式运行在k8s上,平台虽然支持yarn调度,但是yarn调度是与批任务共享资源,所以主流任务还是运行在k8s上。
存储层这一层,支持公司内部基于kafka实时数据vms,基于binlog的vdp数据和原生kafka作为消息总线,状态存储在hdfs上,数据主要存入redis,mysql,hbase,kudu,clickhouse等。
计算引擎层,平台支持Flink,Spark,Storm主流框架容器化,提供了一些框架的封装和组件等。每个框架会都会支持几个版本的镜像满足不同的业务需求。
平台层提供作业配置、调度、版本管理、容器监控、job监控、告警、日志等功能,提供多租户的资源管理(quota,label管理),提供kafka监控。在Flink1.11版本之前,平台自建元数据管理系统为FlinkSQL管理schema,1.11版本开始,通过hivemetastore与公司元数据管理系统融合。
最上层就是各个业务的应用层。
2.Flink容器化实践
2.1容器化实践
上图是实时平台Flink容器化的架构。Flink容器化是基于standalone模式部署的。
部署模式共有client,jobmanager和taskmanager三个角色,每一个角色都由一个deployment控制。
用户通过平台上传任务jar包,配置等,存储于hdfs上。同时由平台维护的配置,依赖等也存储在hdfs上,当pod启动时,会进行拉取等初始化操作。
client中主进程是一个由go开发的agent,当client启动时,会首先检查集群状态,当集群ready后,从hdfs上拉取jar包向Flink集群提交任务。同时,client的主要功能还有监控任务状态,做savepoint等操作。
通过部署在每台物理机上的smart-agent采集容器的指标写入m3,以及通过Flink暴漏的接口将metrics写入prometheus,结合grafana展示。同样通过部署在每台物理机上的vfilebeat采集挂载出来的相关日志写入es,在dragonfly可以实现日志检索。
2.1.1Flink平台化
在实践过程中,结合具体场景以及易用性考虑,做了平台化工作。
平台的任务配置与镜像,Flink配置,自定义组件等解耦合,现阶段平台支持1.7、1.9、1.11、1.12等版本。
平台支持流水线编译或上传jar、作业配置、告警配置、生命周期管理等,从而减少用户的开发成本。
平台开发了容器级别的如火焰图等调优诊断的页面化功能,以及登陆容器的功能,支持用户进行作业诊断。
2.1.2Flink稳定性
在应用部署和运行过程中,不可避免的会出现异常。以下是平台保证任务在出现异常状况后的稳定性做的策略。
pod的健康和可用,由livenessProbe和readinessProbe检测,同时指定pod的重启策略。
Flink任务异常时:
Flink原生的restart策略和failover机制,作为第一层的保证。
在client中会定时监控Flink状态,同时将最新的checkpoint地址更新到自己的缓存中,并汇报到平台,固化到MySQL中。当Flink无法再重启时,由client重新从最新的成功checkpoint提交任务。作为第二层保证。这一层将checkpoint固化到MySQL中后,就不再使用FlinkHA机制了,少了zk的组件依赖。
当前两层无法重启时或集群出现异常时,由平台自动从固化到MySQL中的最新chekcpoint重新拉起一个集群,提交任务,作为第三层保证。
机房容灾:
用户的jar包,checkpoint都做了异地双HDFS存储
异地双机房双集群
2.2kafka监控方案
kafka监控是我们的任务监控里相对重要的一部分,整体监控流程如下所示。
平台提供监控kafka堆积,消费message等配置信息,从MySQL中将用户kafka监控配置提取后,通过jmx监控kafka,写入下游kafka,再通过另一个Flink任务实时监控,同时将这些数据写入ck,从而展示给用户。
3.FlinkSQL平台化建设
基于k8s的Flink容器化实现以后,方便了Flinkapi应用的发布,但是对于FlinkSQL的任务仍然不够便捷。于是平台提供了更加方便的在线编辑发布、SQL管理等一栈式开发平台。
3.1FlinkSQL方案
平台的FlinkSQL方案如上图所示,任务发布系统与元数据管理系统完全解耦。
3.1.1FlinkSQL任务发布平台化
在实践过程中,结合易用性考虑,做了平台化工作,主操作界面如下图所示:
FlinkSQL的版本管理,语法校验,拓扑图管理等;
UDF通用和任务级别的管理,支持用户自定义UDF;
提供参数化的配置界面,方便用户上线任务。
图片
3.1.2元数据管理
平台在1.11之前通过构建自己的元数据管理系统UDM,MySQL存储kafka,redis等schema,通过自定义catalog打通Flink与UDM,从而实现元数据管理。1.11之后,Flink集成hive逐渐完善,平台重构了FlinkSQL框架,通过部署一个SQL-gatewayservice服务,中间调用自己维护的SQL-clientjar包,从而与离线元数据打通,实现了实时离线元数据统一,为之后的流批一体做好工作。在元数据管理系统创建的Flink表操作界面如下所示,创建Flink表的元数据,持久化到hive里,FlinkSQL启动时从hive里读取对应表的tableschema信息。
3.2FlinkSQL相关实践
平台对于官方原生支持或者不支持的connector进行整合和开发,镜像和connector,format等相关依赖进行解耦,可以快捷的进行更新与迭代。
3.2.1FLINKSQL相关实践
connector层,现阶段平台支持官方支持的connector,并且构建了redis,kudu,clickhouse,vms,vdp等平台内部的connector。平台构建了内部的pbformat,支持protobuf实时清洗数据的读取。平台构建了kudu,vdp等内部catalog,支持直接读取相关的schema,不用再创建ddl。
平台层主要是在UDF、常用运行参数调整、以及升级hadoop3。
runntime层主要是支持拓扑图执行计划修改、维表关联keyBycache优化等
3.2.2拓扑图执行计划修改
针对现阶段SQL生成的streamgraph并行度无法修改等问题,平台提供可修改的拓扑预览修改相关参数。平台会将解析后的FlinkSQL的excutionplanjson提供给用户,利用uid保证算子的唯一性,修改每个算子的并行度,chain策略等,也为用户解决反压问题提供方法。例如针对clickhousesink小并发大批次的场景,我们支持修改clickhousesink并行度,source并行度=72,sink并行度=24,提高clickhousesinktps。
3.2.3维表关联keyBy优化cache
针对维表关联的情况,为了降低IO请求次数,降低维表数据库读压力,从而降低延迟,提高吞吐,有以下几种措施:
当维表数据量不大时,通过全量维表数据缓存在本地,同时ttl控制缓存刷新的时候,这可以极大的降低IO请求次数,但会要求更多但内存空间。
当维表数据量很大时,通过async和LRUcache策略,同时ttl和size来控制缓存数据的失效时间和缓存大小,可以提高吞吐率并降低数据库的读压力。
当维表数据量很大同时主流qps很高时,可以开启把维表join的key作为hash的条件,将数据进行分区,即在calc节点的分区策略是hash,这样下游算子的subtask的维表数据是独立的,不仅可以提高命中率,也可降低内存使用空间。
优化之前维表关联LookupJoin算子和正常算子chain在一起。
优化之间维表关联LookupJoin算子和正常算子不chain在一起,将joinkey作为hash策略的key。采用这种方式优化之后,例如原先W数据量的维表,10个TM节点,每个节点都要缓存W的数据,总共需要缓存W*10=3亿的量。而经过keyBy优化之后,每个TM节点只需要缓存W/10=W的数据量,总共缓存的数据量只有W,大大减少缓存数据量。
3.2.4维表关联延迟join
维表关联中,有很多业务场景,在维表数据新增数据之前,主流数据已经发生join操作,会出现关联不上的情况。因此,为了保证数据的正确,将关联不上的数据进行缓存,进行延迟join。
最简单的做法是,在维表关联的function里设置重试次数和重试间隔,这个方法会增大整个流的延迟,但主流qps不高的情况下,可以解决问题。
增加延迟join的算子,当join维表未关联时,先缓存起来,根据设置重试次数和重试间隔从而进行延迟的join。
4.应用案例
4.1.实时数仓
4.1.1实时数据入仓
流量数据一级kafka通过实时清洗之后,写到二级清洗kafka,主要是protobuf格式,再通过FlinkSQL写入hive5min表,以便做后续的准实时ETL,加速ods层数据源的准备时间。
MySQL业务库的数据,通过VDP解析形成binlogcdc消息流,再通过FlinkSQL写入hive5min表。
业务系统通过VMSAPI产生业务kafka消息流,通过FlinkSQL解析之后写入hive5min表。支持string、json、csv等消息格式。
使用FlinkSQL做流式数据入仓,非常的方便,而且1.12版本已经支持了小文件的自动合并,解决了小文件的痛点。
我们自定义分区提交策略,当前分区ready时候会调一下实时平台的分区提交api,在离线调度定时调度通过这个api检查分区是否ready。
采用FlinkSQL统一入仓方案以后,我们可以获得的收益:可解决以前Flume方案不稳定的问题,而且用户可自助入仓,大大降低入仓任务的维护成本。提升了离线数仓的时效性,从小时级降低至5min粒度入仓。
4.1.2实时指标计算
实时应用消费清洗后kafka,通过redis维表、api等方式关联,再通过Flinkwindow增量计算UV,持久化写到Hbase里。
实时应用消费VDP消息流之后,通过redis维表、api等方式关联,再通过FlinkSQL计算出销售额等相关指标,增量upsert到kudu里,方便根据range分区批量查询,最终通过数据服务对实时大屏提供最终服务。
以往指标计算通常采用Storm方式,需要通过api定制化开发,采用这样Flink方案以后,我们可以获得的收益:将计算逻辑切到FlinkSQL上,降低计算任务口径变化快,修改上线周期慢等问题。切换至FlinkSQL可以做到快速修改,快速上线,降低维护成本。
4.1.3实时离线一体化ETL数据集成
FlinkSQL在最近的版本中持续强化了维表join的能力,不仅可以实时关联数据库中的维表数据,现在还能关联Hive和Kafka中的维表数据,能灵活满足不同工作负载和时效性的需求。
基于Flink强大的流式ETL的能力,我们可以统一在实时层做数据接入和数据转换,然后将明细层的数据回流到离线数仓中。
我们通过将presto内部使用的HyperLogLog(后面简称HLL)实现引入到SparkUDAF函数里,打通HLL对象在SparkSQL与presto引擎之间的互通,如SparkSQL通过prepare函数生成的HLL对象,不仅可以在SparkSQL里merge查询而且可以在presto里进行merge查询。具体流程如下:
UV近似计算示例:
Step1:SparkSQL生成HLL对象
insertoverwritedws_goods_uvpartition(dt=")ASselectgoods_id,estimate_prepare(mid)aspre_hllfromdwd_table_goodsgroupbygoods_idwheredt=
Step2:SparkSQL通过goods_id维度的HLL对象merge成品牌维度
insertoverwritedws_brand_uvpartition(dt=")ASselectb.brand_id,estimate_merge(pre_hll)asmerge_hllfromdws_table_brandAleftjoindim_table_brand_goodsBonA.goods_id=B.goods_idwheredt=
Step3:SparkSQL查询品牌维度的UV
selectbrand_id,estimate_