简介
Hive 是基于 Hadoop 的一个数仓工具,基于存储在 HDFS 上的结构化的数据文件提供类似 SQL 的查询功能,本质上是将 SQL 转化为 MapReduce/Spark 任务执行,从而提取到需要的数据。
提出 Hive 的主要原因是:手动编写分布式的查询任务门槛较高,实现复杂,导致基于大数据的业务开发成本高。这一原因源于 MySQL 这类型的单机数据库难以承担大数据时代巨量的数据,所以分布式的系统是必须的。虽然 Hive 进行查询的时延较高,同时难以支持索引,但是其拥有灵活的拓展性,可以很容易地拓展到上千台机器,以此承载的大量数据处理能力是单机数据库难以企及的。
所以 Hive 通常是用于做批量数据的统计分析,不适合用于时延要求较低的在线服务。
SQL 与 MapReduce
我们可以从原始的 MapReduce 进行数据查询来理解,为什么说原始的分布式查询任务难度高。
假设我们存在三个表:
studens:
student_id | name | age | major |
---|---|---|---|
1 | Alice | 22 | Computer Science |
2 | Bob | 21 | Math |
3 | Charlie | 23 | Physics |
courses: |
course_id | course_name | credits |
---|---|---|
101 | Databases | 4 |
102 | Algorithms | 3 |
103 | Calculus | 4 |
enrollments: |
student_id | course_id |
---|---|
1 | 101 |
2 | 103 |
3 | 102 |
需要注意的是,虽然他们逻辑上和数据库一样是一个表,但本质上可能就是一些结构化的文件,这些表的元数据会存在另外的地方,例如 MySQL 中。一个示例的 Hive 表底层如下
1,Alice,22,Computer Sci
2,Bob,21,Math
3,Charlie,23,Physics
接下来我们使用 Python 代码来代表 map 和 reduce 的逻辑,代码中的 print
表示输出,来实现从 SQL 到 MapReduce 程序的映射。
查询
查询通常较为简单,可以不需要 reduce 阶段
SELECT name, major FROM students WHERE age > 21;
# Mapper
def mapper(line):
parts = line.split(',')
if int(parts[2]) > 21:
print(f"{parts[1]},{parts[3]}")
# Reducer
def reducer(key, values):
for value in values:
print(value)
连接
连接稍微复杂点,在于 map 阶段会读取多个表,输出多种键值对,随后在 reduce 阶段进行连接操作,需要注意的是,在 map 阶段,相同 key 的会被 shuffle 到同一个 reduce 任务中,也就是 reduce 任务中拿到的键值对键都是一样的。所以我们在连接的操作中,只要保证所要求连接的字段作为 key,在 reduce 任务中直接笛卡尔积即可,无需额外的判断。
SELECT students.name, enrollments.student_id
FROM students
JOIN enrollments ON students.student_id = enrollments.student_id
# Mapper
# 以 student_id 为 key
def mapper(table, line):
parts = line.split(',')
if table == "students":
stu_id, stu_name = parts[0], parts[1]
print(f"students,{stu_id},{stu_name}")
elif table == "enrollments":
stu_id, course_id = parts[0], parts[1]
print(f"enrollments,{stu_id},{course_id}")
# Reducer
# values 只包含 key 相同的键值对,也就是 student_id 相同
def reducer(key, values):
students = []
enrollments = []
for value in values:
parts = value.split(',')
if parts[0] == 'students':
students.append([parts[1], parts[2]])
elif parts[0] == 'enrollments':
enrollments.append([parts[1], parts[2]])
for stu in students:
for enroll in entollments:
print(f'{stu[1]},{enroll[1]}')
更多 SQL 操作
可以参考 SQL转化为MapReduce的过程-CSDN博客
Hive 查询结构
在了解了底层的 SQL 转化为 MapReduce 任务的大致过程后,我们就可以开始了解 Hive 的结构,来看他是怎么实现从 SQL 转化成 MapReduce 任务的。
简单来讲,Hive 对于 SQL 解析有几个板块:
- 解析器:该模块实现将 SQL 查询转化为抽象语法树(AST),并进行语义分析,确保查询是有效的,包括检查表和列是否存在,数据类型是否匹配等。
- 编译器:将抽象语法树的表示转化为可以执行的逻辑计划,是一个有向无环图(DAG),该逻辑计划不依赖于底层的执行引擎,是一种中间表示,便于优化器进行优化。具体来讲,逻辑计划可以表示为一个操作树。
- 优化器:对逻辑计划的操作树进行优化、合并冗余查询,提高查询执行的效率,优化器涉及到多种优化技术,主要目标是减少数据的读写操作。
- 执行器:将优化器优化后的执行逻辑落实到各个执行引擎中,例如 MapReduce 或 Spark 等分布式任务引擎,会产生一系列具体的作业发布到集群中,是真正执行任务的地方。
- 元数据库:存储一些表的元数据,例如表名、列名、列数据类型等,提供解析器在语义解析时快速查询相关元数据,这里通常使用 MySQL 来实现元数据库。
由于这部分的内容涉及到编译原理,就不展开描述了,具体可以参考 Hive SQL的编译过程 - 美团技术团队 (meituan.com)
Hive 表结构
Hive 的目录结构分为以下几个部分:
- db:在 HDFS 下的一个目录,代表某一个数据库
- table:db 下的一个目录,代表某一个表
- partition:表现为 table 下的子目录,可根据逻辑嵌套
- bucket:表现为 table 下的文件,根据行 hash 值散列成多个文件
Hive 的表一共有四种类型:
- 内部表:对应传统数据库表
- 外部表:对外引用一个已存在的表
- 分区表:基于逻辑实现索引效果
- 分桶表:基于数据分布实现索引效果
内部表和传统意义上的数据库表是类似的,每个表都有自己的存储目录,而外部表表示指向一个已经存在的数据,除了删除外部表纸删除元数据而不会删除实际数据外,和内部表没有太多区别。
除了内部表和外部表外,还有分区表和桶表,这两个分类一定程度上扮演了索引的职责,每个分区底层都表现为一个目录。
例如我们可以使用国家来进行分区,这样每个国家的文件都会在同一个目录中,还可以进一步对日期和城市进行分区,最后这样就可以实现一个查询路径:
/partitinTable/nation=china/city=Shanghai/ds=20130506/
这样就实现了一个类似索引的效果,不用进行全盘扫描了。
而对于分桶表来说,其则是通过字段的 hash 来实现数据的切分和索引,本质上就是不同 hash 值的数据写到不同的桶中。这样在查询的时候,根据查询的 hash 值,就可以缩小搜索的范围,也算是一种索引。
Hive 的其他概念
视图
Hive 中的视图和数据库中的视图类似,是一种逻辑上的简化,对复杂的 SQL 查询产生一种中间逻辑表。在 Hive 中,视图不会存储数据,只有当查询时,才会实际进行查询,其只会涉及到元数据的更改。所以如果底层表发生更改,可能导致视图查询失败。
数据抽样
当需要加快数据分析时,特别是初期分析时,可以先对数据进行采样,在小样本的数据上验证后再上全量数据,这样可以加快数据分析的开发流程。
其中分为随机抽样、桶表抽样和块抽样。随机抽样即对全量数据进行随机的选取,块抽样根据 hive 表的大小按比例抽取数据,桶表抽样则是随机选取几个桶。
缺点
- 调优困难,在资源管理层面依赖 Hadoop,例如 Yarn,本身无法做到资源管理;其次优化器的优化能力较弱,本身是针对 MapReduce 任务调度来优化的,粒度较粗;
自定义函数
UDF(User-Defined Function),实现一种中间转化,例如字符串的格式化及解析、字段加密等等。输入一个元素,输出一个元素。
UDAF(User-Defined Aggregate Function),实现自定义基于 Group By 的聚合函数。输入多个元素,输出一个元素。
UDTF(User-Defined Table-Generating Function),通常用于复杂的数据转换和生成,例如将一个输入字符串解析成多个字段。输入零个或多个元素,输出多个元素。
数据倾斜
如果 key 的分布存在数据倾斜,会导致 shuffle 阶段产生瓶颈,例如存在大量的 null 值,导致一个 map 节点的负载过重,成为瓶颈。
set hive.groupby.skewindata = true
将一个 MapReduce 拆成两个阶段,第一个阶段的 key 会被打散,尽可能均匀分布在不同的 Reduce 节点中。但是此时 Reduce 节点不一定能拿到同一个 key 的所有 value,所以需要进行第二次的 MapReduce,这次就是正常的流程。由于第一次的 MapReduce 已经做了一次 Reduce 操作,原始的数据倾斜问题会得到一定的改善,能解决大部分数据倾斜的情况。
小文件合并
由于底层 HDFS 一个块的大小是固定的,每个文件至少占用一个块,如果存在大量小文件,则会浪费很多的空间;另一方面,大量小文件会增加 namenode 的元数据,导致 namenode 内存占用上升;最后,MapReduce 的 map 阶段会对单个文件启动一个 map 任务,如果小文件过多,则会创建过多的 map 任务,也会严重影响性能。
通过设置一些监控,当小文件过多时,就会触发文件的合并,将多个小文件合并成一个大文件。
Reference
[1] https://cloud.tencent.com/developer/article/1880494