DataEnrichment
在流式处理作业(特别是实时数仓ETL作业)中,我们的数据流可以视为无界事实表,其中往往缺乏一些维度信息。例如,对于埋点日志流而言,为了减少传输冗余,可能只会带有城市ID、商品ID等,如果要映射到对应的名称,就需要与外部存储中的维度表进行关联。这里的外部存储一般是指适合OLTP场景的数据库,如MySQL、Redis、HBase等。
英文语境里习惯将上述操作称为dataenrichment。下图展示出了Trackunit公司的实时IoT处理架构,比较有代表性。注意图中的"Enrich"字样。
实时关联维度数据的思路主要有如下4种。
全量预加载+定时刷新:适用于规模较小的缓慢变化维度(SCD),思路最简单,可以参见笔者之前写的示例。
实时查询+缓存刷新:适用于规模较大的缓慢变化维度(SCD),在数仓维度建模过程中,这种维度最为常见,本文接下来会详细叙述其实现方式。
纯实时查询:适用于快速变化维度(RCD),或者对关联时效性要求极高的场合,需特别注意频繁请求对外部存储的压力。
流式化维度:比较特殊且灵活,将维度表的changelog转化为流,从而把静态表的关联转化为双流join。从changelog解析出的维度数据可以写入状态存储,起到缓存的作用。之后再提。
上述4种思路并没有绝对的好坏之分,而是需要根据业务特点和需求来取舍。
下面介绍用Flink异步I/O、Vert.xJDBCClient和GuavaCache实现的实时查询+缓存刷新方案。
FlinkAsyncI/O
Flink的异步I/O专门用来解决Flink计算过程中与外部系统的交互问题。在默认情况下,算子向外部系统发出请求后即阻塞,等待结果返回才能发送下一个请求,可能会造成较大的延迟,吞吐量下降。有了异步I/O之后,就可以并发地发出请求和接收响应,延迟大大降低。下图来自官方文档,一看便知。
关于它的细节,看官可以参考之前的《聊聊Flink异步I/O机制的原理》一文,不再废话。
Vert.xJDBCClient
Vert.x是一个由Eclipse基金会开源的跨语言、事件驱动的异步应用程序框架,运行在JVM平台上,底层依赖于Netty。Vert.x的异步应用场景极为广泛,如Web、数据库访问、响应式编程、微服务、MQTT、认证与鉴权、消息队列、事件总线等等,详情可以参见官方文档。
本文采用的维度表数据源是MySQL,而Java原生的JDBC机制是同步的,要与Flink异步I/O一同使用的话,按传统方式需要自己创建连接池、线程池并实现异步化。我们引入Vert.xJDBCClient模块来简化之,先加入依赖项。
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.47/version/dependencydependencygroupIdio.vertx/groupIdartifactIdvertx-jdbc-client/artifactIdversion3.8.5/version/dependency
通过VertxOptions指定事件循环线程池和工作线程池的大小,然后指定JDBC连接的各项参数(注意c3p0的连接池大小max_pool_size),并创建异步的SQL客户端实例。
PropertiesdbProps=ParameterUtil.getFromResourceFile("mysql.properties");Vertxvertx=Vertx.vertx(newVertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(5));JsonObjectconfig=newJsonObject().put("url",dbProps.getProperty("mysql.sht.url")).put("driver_class","