大多数企业的关键数据存在于OLTP数据库中,存储在这些数据库中的数据包含有关用户,产品和其他有用信息。如果要分析此数据,传统方法是定期将该数据复制到OLAP数据仓库中。Hadoop已经出现在这个领域并扮演了两个角色:数据仓库的替代品;结构化、非结构化数据和数据仓库之间的桥梁。图5.8显示了第一个角色,其中Hadoop在将数据导到OLAP系统(BI应用程序的常用平台)之前用作大规模加入和聚合工具。
图5.8使用Hadoop进行OLAP数据输入输出和处理
以Facebook为例,该企业已成功利用Hadoop和Hive作为OLAP平台来处理数PB数据。图5.9显示了类似于Facebook的架构。该体系结构还包括OLTP系统的反馈循环,可用于推送在Hadoop中发现的洞察,例如为用户提供建议。
在任一使用模型中,我们都需要一种将关系数据引入Hadoop的方法,还需要将其输出到关系数据库中。本节,我们将使用Sqoop简化将关系数据输出到Hadoop的过程。
实践:使用Sqoop从MySQL导入数据
Sqoop是一个可用于将关系数据输入和输出Hadoop的项目。它是一个很好的高级工具,封装了与关系数据移动到Hadoop相关的逻辑,我们需要做的就是为Sqoop提供确定输出哪些数据的SQL查询。该技术提供了有关如何使用Sqoop将MySQL中的某些数据移动到HDFS的详细信息。
图5.9使用Hadoop进行OLAP并反馈到OLTP系统
本节使用Sqoop1.4.4版本,此技术中使用的代码和脚本可能无法与其他版本的Sqoop一起使用,尤其是Sqoop2,它是作为Web应用程序实现的。
问题
将关系数据加载到集群中,并确保写入有效且幂等。
解决方案
在这种技术中,我们将看到如何使用Sqoop作为将关系数据引入Hadoop集群的简单机制。我们会介绍将数据从MySQL导入Sqoop的过程,还将介绍使用快速连接器的批量导入(连接器是提供数据库读写访问的特定于数据库的组件)。
讨论
Sqoop是一个关系数据库输入和输出系统,由Cloudera创建,目前是Apache项目。
执行导入时,Sqoop可以写入HDFS、Hive和HBase,对于输出,它可以执行相反操作。导入分为两部分:连接到数据源以收集统计信息,然后触发执行实际导入的MapReduce作业。图5.10显示了这些步骤。
图5.10Sqoop导入:连接到数据源并使用MapReduce
Sqoop有连接器的概念,它包含读写外部系统所需的专用逻辑。Sqoop提供两类连接器:用于常规读取和写入的通用连接器,以及使用数据库专有批处理机制进行高效导入的快速连接器。图5.11显示了这两类连接器及其支持的数据库。
图5.11用于读写外部系统的Sqoop连接器
在继续之前,我们需要访问MySQL数据库,并且MySQLJDBCJAR需要可用。以下脚本将创建必要的MySQL用户和模式并加载数据。该脚本创建了一个hip_sqoop_userMySQL用户,并创建了包含三个表的sqoop_test数据库:stocks,stocks_export和stocks_staging。然后,它将stock样本数据加载到表中。所有这些步骤都通过运行以下命令来执行:
这是快速浏览脚本功能:
第一个Sqoop命令是基本导入,在其中指定MySQL数据库和要导出的表连接信息:
MySQL表名称
Linux中的MySQL表名称区分大小写,确保在Sqoop命令中提供的表名使用正确的大小写。
默认情况下,Sqoop使用表名作为HDFS中的目标目录,用于执行导入的MapReduce作业。如果再次运行相同的命令,MapReduce作业将失败,因为该目录已存在。
我们来看看HDFS中的stocks目录:
导入数据格式
Sqoop已将数据导入为逗号分隔的文本文件。它支持许多其他文件格式,可以使用表5.6中列出的参数激活它们。
表5.6控制导入文件格式的Sqoop参数
如果要导入大量数据,则可能需要使用Avro等文件格式,这是一种紧凑的数据格式,并将其与压缩结合使用。以下示例将Snappy压缩编解码器与Avro文件结合使用。它还使用--target-dir选项将输出写入表名的不同目录,并指定应使用--where选项导入行的子集。可以使用--columns指定要提取的特定列:
请注意,必须在io.
不要忘记锁定文件以避免用户窥探:
然后,我们可以通过--options-file选项将此文件名提供给Sqoop作业,Sqoop将读取文件中指定的选项,这意味着无需在命令行上提供它们:
数据拆分
Sqoop如何在多个mapper之间并行化导入?在图5.10中,我展示了Sqoop的第一步是如何从数据库中提取元数据。它检查导入的表以确定主键,并运行查询以确定表中数据的下限和上限(见图5.12)。Sqoop假设在最小和最大键内的数据接近均匀分布,因为它将delta(最小和最大键之间的范围)按照mapper数量拆分。然后,为每个mapper提供包含一系列主键的唯一查询。
图5.12确定查询拆分的Sqoop预处理步骤
我们可以将Sqoop配置为使用带有--split-by参数的非主键,这在最小值和最大值之间没有均匀分布的情况下非常有用。但是,对于大型表,需要注意--split-by中指定的列已编制索引以确保最佳导入时间,可以使用--boundary-query参数构造备用查询以确定最小值和最大值。
增量导入
Sqoop支持两种导入类型:追加用于随时间递增的数值数据,例如自动增量键;lastmodified适用于带时间戳的数据。在这两种情况下,都需要使用--check-column指定列,通过--incremental参数指定模式(值必须是append或lastmodified),以及用于通过--last-value确定增量更改的实际值。
例如,如果要导入年1月1日更新的stock数据,则执行以下操作:
假设还有另一个系统继续写入该表,可以使用此作业的--last-value输出作为后续Sqoop作业的输入,这样只会导入比该日期更新的行。
Sqoop作业和Metastore
可以在命令输出中看到增量列的最后一个值。如何才能最好地自动化可以重用该值的流程?Sqoop有一个作业的概念,可以保存这些信息并在后续执行中重复使用:
执行上述命令会在SqoopMetastore中创建一个命名作业,该作业会跟踪所有作业。默认情况下,Metastore包含在.sqoop下的主目录中,仅用于自己的作业。如果要在用户和团队之间共享作业,则需要为Sqoop的Metastore安装符合JDBC的数据库,并在发出作业命令时使用--meta-connect参数指定其位置。
在上一个示例中执行的作业创建命令除了将作业添加到Metastore之外没有做任何其他操作。要运行作业,需要显式执行,如下所示:
--show参数显示的元数据包括增量列的最后一个值。这实际上是执行命令的时间,而不是表中的最后一个值。如果正在使用此功能,请确保数据库服务器和与服务器(包括Sqoop客户端)交互的任何客户端的时钟与网络时间协议(NTP)同步。
Sqoop将在运行作业时提示输入密码。要使其在自动脚本中运行,需要使用Expect(一种Linux自动化工具)在检测到Sqoop提示输入密码时从本地文件提供密码,可以在GitHub上找到与Sqoop一起使用的Expect脚本,网址为:
第一个Sqoop命令是基本导入,在其中指定MySQL数据库和要导出的表连接信息:
MySQL表名称
Linux中的MySQL表名称区分大小写,确保在Sqoop命令中提供的表名使用正确的大小写。
默认情况下,Sqoop使用表名作为HDFS中的目标目录,用于执行导入的MapReduce作业。如果再次运行相同的命令,MapReduce作业将失败,因为该目录已存在。
我们来看看HDFS中的stocks目录:
导入数据格式
Sqoop已将数据导入为逗号分隔的文本文件。它支持许多其他文件格式,可以使用表5.6中列出的参数激活它们。
表5.6控制导入文件格式的Sqoop参数
如果要导入大量数据,则可能需要使用Avro等文件格式,这是一种紧凑的数据格式,并将其与压缩结合使用。以下示例将Snappy压缩编解码器与Avro文件结合使用。它还使用--target-dir选项将输出写入表名的不同目录,并指定应使用--where选项导入行的子集。可以使用--columns指定要提取的特定列:
请注意,必须在io.
不要忘记锁定文件以避免用户窥探:
然后,我们可以通过--options-file选项将此文件名提供给Sqoop作业,Sqoop将读取文件中指定的选项,这意味着无需在命令行上提供它们:
数据拆分
Sqoop如何在多个mapper之间并行化导入?在图5.10中,我展示了Sqoop的第一步是如何从数据库中提取元数据。它检查导入的表以确定主键,并运行查询以确定表中数据的下限和上限(见图5.12)。Sqoop假设在最小和最大键内的数据接近均匀分布,因为它将delta(最小和最大键之间的范围)按照mapper数量拆分。然后,为每个mapper提供包含一系列主键的唯一查询。
图5.12确定查询拆分的Sqoop预处理步骤
我们可以将Sqoop配置为使用带有--split-by参数的非主键,这在最小值和最大值之间没有均匀分布的情况下非常有用。但是,对于大型表,需要注意--split-by中指定的列已编制索引以确保最佳导入时间,可以使用--boundary-query参数构造备用查询以确定最小值和最大值。
增量导入
Sqoop支持两种导入类型:追加用于随时间递增的数值数据,例如自动增量键;lastmodified适用于带时间戳的数据。在这两种情况下,都需要使用--check-column指定列,通过--incremental参数指定模式(值必须是append或lastmodified),以及用于通过--last-value确定增量更改的实际值。
例如,如果要导入年1月1日更新的stock数据,则执行以下操作:
假设还有另一个系统继续写入该表,可以使用此作业的--last-value输出作为后续Sqoop作业的输入,这样只会导入比该日期更新的行。