策划编辑|Natalie
作者|RezaShiftehfar
译者|陈亮芬
编辑|Debra
第一代:Uber大数据的开端
在年之前,我们有限的数据量可以塞进一些传统的联机事务处理(OLTP)数据库中(例如MySQL和PostgreSQL)。为了利用这些数据,我们的工程师必须单独访问每个数据库或表,如果用户想将不同数据库的数据组合起来,需要自己编写代码。当时,我们还没有对所有存储的数据进行全局访问的需求,也没有这些数据的全局视图。事实上,我们的数据分散在不同的OLTP数据库中,总数据大小约为几TB,访问这些数据的延迟非常短(通常不到一分钟)。图1是我们在年之前的数据架构概览:
随着Uber业务量呈指数级增长(业务量包括Uber运营的城市/国家数量和每个城市使用该服务的乘客/司机数量),传入数据量跟着也增加了,我们需要建立第一代分析数据仓库,以满足访问和分析一个地方所有数据的需求。为了让Uber尽可能接近数据驱动,我们需要确保分析师可以在同一个地方访问分析数据。为实现这一目标,我们首先将数据用户分为三大类:
城市运营团队(数千人):这些现场工作人员负责管理和扩展每个市场的运输网络。随着业务扩展到新的城市,就有成千上万个城市运营团队需要定期访问这些数据,以便对司机与乘客的问题做出响应。
数据科学家和分析师(数百人):这些分析师和科学家分布在不同的功能组中,他们需要利用数据来帮助我们为用户提供最佳的运输和交付体验,例如通过预测乘客需求来提供及时的服务。
工程师团队(数百人):整个公司的工程师都专注于构建自动数据应用程序,比如欺诈检测和新司机准入平台。
我们的第一代分析数据仓库专注于将Uber的所有数据聚合在一个地方以及简化数据访问。对于前者,我们决定使用Vertica作为我们的数据仓库软件,因为它速度快、可扩展,而且是面向列的。我们还开发了多个临时ETL作业,这些作业将来自不同数据源(即AWSS3、OLTP数据库、服务日志等)的数据复制到Vertica中。为了实现后者,我们将SQL作为我的解决方案接口,同时构建了一个联机查询服务来处理用户查询,并将这些查询需求提交给底层查询引擎。图2描述了我们的分析数据仓库:
第一个数据仓库服务的发布是一个巨大的成功。这个服务使得数据用户第一次拥有了全局视图,可以从一个地方访问所有数据。这一新的突破使得大量新团队使用数据分析作为其技术和产品决策的基础。在几个月内,我们的分析数据量增长到数十TB,用户数量增加到数百个。
SQL作为简单的标准接口来使用,使城市运营者们能够轻松地实现数据交互,无需了解底层技术。此外,不同的工程师团队开始构建针对用户个性化需求定制的服务和产品,这些服务和产品通过这些数据(即uberPool、前期定价等)得到所需信息。与此同时,陆续组建新的团队以便更好地使用和提供这些数据(即我们的机器学习和实验团队)。
限制
另一方面,随着数据仓库和传入数据的广泛使用,我们也逐渐看到了一些限制。由于数据是通过临时ETL作业摄取的,而我们缺乏正式的模式通信机制,这导致数据可靠性成为一个问题。我们的大多数源数据都是JSON格式,并且摄取作业无法弹性应对生产者的代码变更。
随着公司的发展,数据仓库扩展变得越来越昂贵。为了降低成本,我们开始删除过时的数据,以释放出空间留给新数据。除此之外,我们大数据平台的很大一部分不能水平扩展,因为我们之前的主要目标是满足对集中数据的访问,根本没有足够的时间来确保所有部分都是水平可扩展的。我们的数据仓库实际上被用作数据湖,堆积所有原始数据,执行数据建模和提供数据。
此外,由于生成数据的服务与下游数据消费者之间缺乏正式合约,将数据摄入至数据仓库的ETL作业非常不稳定(使用灵活的JSON格式导致源数据缺乏模式约束)。如果不同的用户在摄取期间执行不同的转换,可能会多次摄取同一批数据。这对上游数据源(即联机存储数据)造成了额外的压力,也影响了它们的服务质量。另外,这导致我们的仓库中存储了相同数据的几个副本,进一步增加了存储成本。至于数据质量,由于ETL作业是临时的,并且依赖数据源,而且数据投射和转换是在摄取期间进行的,所以回填非常耗费时间和精力。由于摄取工作缺乏标准化,我们也很难摄取任何新的数据集和类型。
第二代:Hadoop的到来
为了解决这些限制,我们围绕Hadoop生态系统重新构建了大数据平台。具体来说,我们引入了一个Hadoop数据湖,所有的原始数据仅需要从不同的联机数据存储中摄取一次,并且在摄取期间没有进行转换。这种设计转变明显降低了联机数据存储的压力,让我们从临时摄取作业转变为可扩展的摄取平台。为了让用户能够访问Hadoop中的数据,我们引入了Presto来实现交互式临时用户查询,引入ApacheSpark来促进对原始数据(包括SQL和非SQL两种格式)的编程访问,并将ApacheHive作为主力来应对非常大的查询。这些不同的查询引擎让用户可以获得最能满足其需求的工具,这也使得我们的平台变得更加灵活和易于访问。
为了保持平台的可扩展性,我们确保所有数据建模和转换仅在Hadoop中进行,从而在出现问题时能够进行快速的回填和恢复。只有最关键的建模表(城市运营者们实时利用这些表来实现快速的SQL查询)才被转移到我们的数据仓库中。这大大降低了数据仓库的运营成本,同时还将用户引导到基于Hadoop的查询引擎。
我们还利用了ApacheParquet这个标准的列式文件格式,提高了压缩率,从而节省了存储空间,同时因为分析查询采用了列式访问,从而获得了计算资源收益。此外,Parquet与ApacheSpark的无缝集成使得该解决方案成为访问Hadoop数据的流行选择。图3总结了我们的第二代大数据平台的架构:
除了整合Hadoop数据湖之外,我们还让生态系统中的所有数据服务都可以水平扩展,从而提高了大数据平台的效率和稳定性。这种通用的水平可扩展性可以满足直接的业务需求,让我们能够集中精力构建下一代数据平台,而不是解决临时问题。
与第一代平台不同的是,数据管道易受上游数据格式变更影响的问题将不复存在,我们的第二次迭代使得我们可以对所有数据进行模式化,从JSON转换到Parquet,将模式和数据存储在一起。为此,我们构建了一个集中模式服务来收集、存储和提供模式,还提供了不同的客户端库,用于将不同的服务与这个集中式服务集成在一起。不稳定的临时数据摄取作业被标准平台替换,以便将原始嵌套格式的源数据传输到Hadoop数据湖中。
随着Uber的业务继续以闪电般的速度扩展,我们很快就拥有了数十PB的数据。而且每天都有数十TB的新数据被添加到数据湖中,我们的大数据平台增长到超过个vcore,任何一天都有超过0个批处理作业。这也使得我们的Hadoop数据湖成为所有分析Uber数据的事实来源。
限制
随着公司的不断扩展,我们的生态系统中存储了数十PB的数据,我们不得不面临一系列新的挑战。
首先,由于需要摄取更多数据以及有更多用户编写临时批处理作业(生成更多输出数据),存储在HDFS中的大量小文件开始给HDFSNameNode带来额外的压力。最重要的是,数据延迟仍然远远满足不了我们的业务需求。用户24小时内只能访问一次新数据,这对于需要做出实时决策的用户来说太慢了。虽然将ETL和建模迁移到Hadoop使得处理过程更具可扩展性,但因为这些ETL作业每次运行时都必须重新创建整个建模表,处理过程仍然存在瓶颈。除此之外,新数据的摄取和相关派生表的建模都基于整个数据集的快照,然后通过交换旧表和新表让用户能够访问新数据。摄取作业必须返回源数据存储,创建新快照,然后在每次运行期间将整个数据集摄取或转换为可消费的Parquet文件。随着我们的数据存储量的增长,运行个多Spark作业可能需要20多个小时。
每个作业的很大一部分都涉及将最新的快照转换为历史数据和新数据。虽然每个表每天只添加多GB新数据,但摄取作业每次在运行时都必须转换多TB的数据集。对于每次运行时都会重新创建新派生表的ETL和建模作业来说也是如此。由于历史数据的更新比例很高,这些作业必须依赖基于快照的源数据摄取。从本质上讲,我们的数据包含许多更新操作(例如乘客和司机评级、支持在行程结束后的几个小时甚至几天内的票价调整)。由于HDFS和Parquet不支持数据更新,所有摄取作业都需要从更新的源数据创建新快照,再将新快照摄取到Hadoop中,并转换为Parquet格式,然后交换输出表以查看新数据。图4总结了这些基于快照的摄取在我们的大数据平台中是如何流转的。
第三代:长期重建我们的大数据平台
到年初,整个公司的工程和运营团队都在使用我们的大数据平台,让他们能够从一个地方访问新数据或历史数据。用户可以通过UI门户(根据他们的个性化需求进行定制)轻松访问Hive、Presto、Spark、Vertica、Notebook和其他数据仓库中的数据。我们的HDFS中有多PB的数据,计算集群中有0多vcore,每天有0个Presto查询,个Spark作业,以及个Hive查询,我们的Hadoop架构因此遇到了扩展瓶颈,很多服务受到了数据延迟的影响。
幸运的是,由于我们的底层基础设施可以水平扩展,以满足当前的业务需求,因此我们有足够的时间来研究数据内容、数据访问模式和用户特定需求,以便在构建下一代数据平台之前找出亟待解决的问题。我们发现了以下四个主要的痛点:
HDFS可扩展性限制:很多依赖HDFS扩展其大数据基础设施的公司都面临这个问题。从设计上来看,HDFS的瓶颈是NameNode容量,因此存储大量小文件会对性能产生显著影响。当数据大小超过10PB时就会遇到扩展瓶颈,当数据大小超过50-PB以后这个限制将会成为一个真正的问题。幸运的是,有一些相对简单的解决方案可以将HDFS从几十个PB扩展到几百个PB,例如利用ViewFS和HDFSNameNodeFederation。通过控制小文件的数量,并将数据的不同部分移动到单独的集群(例如将HBase和Yarn的应用程序日志移动到一个单独的HDFS集群中),HDFS的可扩展性问题得到了一定缓解。
Hadoop中更快的数据:Uber的业务是实时运营的,所以我们的服务需要访问尽可能新鲜的数据。因此,在很多实际运营场景中,24小时的数据延迟太慢了,这些实际场景对更快的数据传输存在巨大需求。我们的第二代大数据平台所采用的基于快照的摄取方法效率低下,导致我们不能以较低的延迟摄取数据。为了加快数据交付速度,我们不得不重新设计管道,以便只进行更新数据和新数据的增量摄取。
支持Hadoop和Parquet的更新和删除:Uber的数据包含很多更新,变化范围从过去几天(例如乘客或者合作司机调整最近一次行程的价格)到过去几周(例如乘客开启新一次行程前对上一次行程进行打分)甚至过去几个月不等(例如由于业务需要回填或调整过去的数据)。通过基于快照的数据摄取,我们每24小时摄取一次源数据的新副本。换句话说,我们一次摄取所有更新,每天摄取一次。但是,由于需要摄取更新的数据和增量,我们的解决方案必须能够支持现有数据的更新和删除操作。然而,由于我们的数据存储在HDFS和Parquet中,无法直接支持对现有数据的更新操作。另一方面,我们的数据包含非常宽的表(每个表大约有个列),这些表有五个及五个以上级别的嵌套,而用户查询通常只需接触其中一些列,这导致我们不能以高效的方式使用非列式格式。为了使我们的大数据平台能够实现长期增长,我们必须找到一种方法来解决HDFS文件系统的这种限制,让它也可以支持更新/删除操作。
更快的ETL和建模:与原始数据摄取类似,ETL和建模作业也基于快照,需要我们的平台在每次运行时重建派生表。为减少建模表的数据延迟,ETL作业也需要利用增量。这要求ETL作业只需从原始源表中逐步地提取已更改的数据,并更新先前派生的输出表,而不是每隔几小时重建整个输出表。
引入Hudi
考虑到上述要求,我们构建了HadoopUpsertsanDIncremental(Hudi),这是一个开源的Spark库,在HDFS和Parquet之上提供抽象层,以支持所需的更新和删除操作。Hudi可以被用在任意的Spark作业中,可以水平扩展,并且只依赖HDFS。因此,任何需要支持历史数据更新/删除操作的大数据平台都可以使用Hudi。
Hudi使我们能够在Hadoop中更新、插入和删除现有的Parquet数据。此外,Hudi使得数据用户可以只需逐步提取更改的数据,显著提升了查询效率,实现派生建模表的增量更新。
我们Hadoop生态系统中的原始数据是根据时间划分的,任何旧日期分区的数据都可能在之后接收到更新。因此,对于依赖这些原始源数据表的数据用户或ETL作业,了解哪个日期分区包含更新数据的唯一方法是扫描整个源表,并根据一些已知的时间概念来过滤不需要的记录。这涉及了计算成本非常高昂的查询,需要完全扫描源表,而且不能频繁地运行ETL作业。
使用Hudi,用户很容易知道他们的最后一个检查点时间戳,并获取所有已更新的记录,无论这些更新是添加到最近日期分区的新记录,还是对旧数据的更新,都无需运行昂贵的查询来扫描整个源表。
使用Hudi库,我们能够从基于快照的原始数据摄取转换成增量摄取建模,这一转换使我们能够将数据延迟从24小时减少到一小时之内。图5描述了整合Hudi后的大数据平台:
通用数据摄取
Hudi并不是第三代大数据平台的唯一变化。我们还通过ApacheKafka实现了存储和大数据团队之间的上游数据存储变更的交接。上游数据存储事件(以及来自不同应用程序和服务的日志消息)使用统一的Avro编码流入Kafka,包括附加的标准全局元数据头部信息(比如时间戳、rowkey、版本、数据中心信息和源主机)。数据流团队和大数据团队都使用这些存储变更日志事件作为其源输入数据以进行进一步处理。
我们的数据摄取平台Marmaray以小批量的方式运行,并从Kafka获取上游的存储变更日志,通过Hudi库将这些日志应用在Hadoop的现有数据上。如之前所述,Hudi支持更新插入操作,允许用户添加新记录和更新或删除历史数据。Spark摄取作业每10-15分钟运行一次,在Hadoop中提供30分钟的原始数据延迟(具有1-2个摄取作业失败或重试的余量)。为避免因多次将相同的源数据摄取到Hadoop而导致效率低下,我们不允许在原始数据摄取期间进行任何转换,所以我们决定将原始数据提取框架变成EL平台,而不是传统的ETL平台。这个模型鼓励用户在上游数据以原始嵌套格式流入后,在Hadoop中以批处理模式执行所需的转换操作。
自从对我们的大数据平台实施这些更改以来,我们通过避免不必要或低效的摄取操作有效地节省了大量的计算资源。由于我们现在可以避免在摄取过程中进行容易出错的转换,原始数据的可靠性也得到了显著提升。现在,用户可以使用任何大数据处理引擎在原始源数据上运行转换。此外,一旦出现任何问题,用户可以再次重新运行转换,并通过使用更多计算资源和更高程度的并行性来更快地完成批转换作业,从而保证SLA。
增量数据建模
考虑到需要将大量上游数据存储摄入到Hadoop中(截至年有超过个原始Hadoop表),我们还构建了一个通用的摄取平台,以便以统一和可配置的方式将原始数据摄入到Hadoop中。现在,我们的大数据平台以增量的方式逐步更新原始Hadoop表,数据延迟为10-15分钟,实现了对源数据的快速访问。但是,为了确保建模表也具有低延迟,我们必须避免建模ETL作业中的低效率操作(比如避免完全重新创建派生表或进行完整的源原始表扫描)。实际上,Hudi允许ETL作业仅从源表中获取已更改的数据。建模作业只需要在每次迭代运行期间告知Hudi阅读器检查点时间戳,就可以从原始源表(不管实际记录存储在哪个日期分区)接收到一组新的或更新过的记录。
在ETL作业期间使用HudiWriter使我们能够更新派生建模表中的旧日期分区,而无需重新创建整个分区和表。因此,我们的建模ETL作业使用HudiReader逐步从源表中获取已更改的数据,并使用HudiWriter逐步更新派生的输出表。现在,ETL作业也能在不到30分钟内完成,Hadoop的所有派生表的端到端延迟减少到一个小时以内。
为了向Hadoop表的用户提供访问所有数据或仅访问新数据或更新数据的不同选项,使用Hudi存储格式的Hadoop原始表提供了两种不同的读取模式:
最新模式视图:在当前时间点提供整个Hadoop表的整体视图。这个视图包括所有记录的最新合并值以及表中的所有现有记录。
增量模式视图:仅根据给定时间戳从特定Hadoop表中获取新记录和更新记录。这个视图仅返回自最近检查点以来最近插入的以及已更新的行。此外,如果特定行自上一个检查点以来被多次更新,则将返回更新过程中所有更改的值(而不是仅返回最新的合并行)
图6描述了所有Hadoop表(以Hudi文件格式存储)的这两个读取视图:用户通常根据需要在这两个视图之间切换。当他们运行临时查询以便基于最新状态分析数据时,他们使用表的最新模式视图(比如获取美国每个城市的每周总行程次数)。另一方面,当用户需要完成迭代作业,或者仅需查询自最近的迭代执行以来已更改的记录或新记录,他们使用增量模式视图。两个视图始终可用于所有Hadoop表,用户可以根据需要在不同模式之间切换。
标准化数据模型
除了提供同一个表的不同视图外,我们还标准化了数据模型,为所有原始Hadoop数据提供两种类型的表:
变更日志历史记录表:包含特定上游表的所有变更日志历史记录。这个表让用户能够扫描给定表的变更历史记录,并且可以按照key进行合并,以提供每一行的最新值。
快照合并表:包含最新的上游表的合并视图。这个表包含所有历史变更日志的压缩合并视图。
图7描述了如何使用给定变更日志流为特定上游源数据存储生成不同的Hive原始表:
但是,变更日志流不一定包含给定key的整个行(所有列)。虽然合并的快照表始终提供特定key的所有列,但如果变更日志的上游流仅提供部分行变更日志,变更日志历史表可能就比较稀疏。如果用户希望从变更日志历史记录表中获取更改的值,并将其与合并的快照表相连以创建完整的数据行,我们还会在变更日志历史记录表的合并快照表中包含相同key的日期分区。通过避免合并快照表的完整表扫描,使得两个表能够更有效地进行跨分区连接。
图8总结了我们大数据平台的不同组件之间的关系:
第四代:下一步该怎样优化?
自年推出第三代大数据平台以来,整个公司的用户已经可以快速可靠地访问Hadoop中的数据,但仍然有提升的空间。我们正在努力增强我们的大数据平台,以改善数据质量、数据延迟,提升效率、可扩展性和可靠性等。
数据质量
为了提高数据质量,我们确定了两个需要改进的关键领域。首先,当某些上游数据存储没有强制执行或检查数据模式时(比如值为JSONblob的键值对),我们希望能够避免出现非相容模式的数据。不然不良数据会进入到我们的Hadoop生态系统,从而影响所有依赖这些数据的下游用户。为了防止不良数据的流入,我们正在转换所有上游数据存储,对数据内容执行强制性模式检查,并在数据出现任何问题时拒绝数据进入(比如未经模式的确认)。
我们发现的第二个问题是实际数据内容的质量。虽然使用模式可以确保数据包含正确的数据类型,但它们不会检查实际的数据值(比如一个整数,但不是0到之间的正数)。为了提高数据质量,我们正在扩展模式服务以支持语义检查。这些语义检查(也就是指Uber特定数据类型)使得我们能够在基本结构类型检查之外对实际数据内容添加额外的限制。
数据延迟
我们的目标是将Hadoop中的原始数据延迟减少到五分钟,将建模表的数据延迟减少到十分钟。这将使得更多用例从流式处理转向使用Hudi增量数据摄取的更有效的小批量处理。
同时,我们还在扩展我们的Hudi项目,以支持其他视图模式,其中包括现有的读取优化视图,以及显示数据延迟仅几分钟的实时视图。这个实时视图依赖于我们称之为Merge-On-Read或Hudi2.0的开源解决方案(也是Hudi的一部分)。
数据效率
为了提高数据效率,我们不再依赖专用硬件来实现任何服务,转向了服务容器化。此外,我们统一了所有资源调度程序和Hadoop生态系统,在整个公司内部搭建Hadoop和非数据服务之间的桥梁。这样我们就可以统一调度所有作业和服务,不用管它将运行在什么样的介质中。随着Uber的发展,数据位置将成为Hadoop应用程序的一大