Hive 快速入门

简介

Hive 是基于 Hadoop 的一个数仓工具,基于存储在 HDFS 上的结构化的数据文件提供类似 SQL 的查询功能,本质上是将 SQL 转化为 MapReduce/Spark 任务执行,从而提取到需要的数据。

提出 Hive 的主要原因是:手动编写分布式的查询任务门槛较高,实现复杂,导致基于大数据的业务开发成本高。这一原因源于 MySQL 这类型的单机数据库难以承担大数据时代巨量的数据,所以分布式的系统是必须的。虽然 Hive 进行查询的时延较高,同时难以支持索引,但是其拥有灵活的拓展性,可以很容易地拓展到上千台机器,以此承载的大量数据处理能力是单机数据库难以企及的。

所以 Hive 通常是用于做批量数据的统计分析,不适合用于时延要求较低的在线服务。

SQL 与 MapReduce

我们可以从原始的 MapReduce 进行数据查询来理解,为什么说原始的分布式查询任务难度高。

假设我们存在三个表:
studens:

student_id name age major
1 Alice 22 Computer Science
2 Bob 21 Math
3 Charlie 23 Physics
courses:
course_id course_name credits
101 Databases 4
102 Algorithms 3
103 Calculus 4
enrollments:
student_id course_id
1 101
2 103
3 102

需要注意的是,虽然他们逻辑上和数据库一样是一个表,但本质上可能就是一些结构化的文件,这些表的元数据会存在另外的地方,例如 MySQL 中。一个示例的 Hive 表底层如下

1,Alice,22,Computer Sci
2,Bob,21,Math
3,Charlie,23,Physics

接下来我们使用 Python 代码来代表 map 和 reduce 的逻辑,代码中的 print 表示输出,来实现从 SQL 到 MapReduce 程序的映射。

查询

查询通常较为简单,可以不需要 reduce 阶段

SELECT name, major FROM students WHERE age > 21;
# Mapper
def mapper(line):
    parts = line.split(',')
    if int(parts[2]) > 21:
        print(f"{parts[1]},{parts[3]}")

# Reducer
def reducer(key, values):
    for value in values:
        print(value)

连接

连接稍微复杂点,在于 map 阶段会读取多个表,输出多种键值对,随后在 reduce 阶段进行连接操作,需要注意的是,在 map 阶段,相同 key 的会被 shuffle 到同一个 reduce 任务中,也就是 reduce 任务中拿到的键值对键都是一样的。所以我们在连接的操作中,只要保证所要求连接的字段作为 key,在 reduce 任务中直接笛卡尔积即可,无需额外的判断。

SELECT students.name, enrollments.student_id
FROM students
JOIN enrollments ON students.student_id = enrollments.student_id
# Mapper
# 以 student_id 为 key
def mapper(table, line):
    parts = line.split(',')
    if table == "students":
	    stu_id, stu_name = parts[0], parts[1]
        print(f"students,{stu_id},{stu_name}")
    elif table == "enrollments":
	    stu_id, course_id = parts[0], parts[1]
        print(f"enrollments,{stu_id},{course_id}")
# Reducer
# values 只包含 key 相同的键值对,也就是 student_id 相同
def reducer(key, values):
    students = []
    enrollments = []

    for value in values:
        parts = value.split(',')
        if parts[0] == 'students':
            students.append([parts[1], parts[2]])
        elif parts[0] == 'enrollments':
            enrollments.append([parts[1], parts[2]])

	for stu in students:
		for enroll in entollments:
			print(f'{stu[1]},{enroll[1]}')

更多 SQL 操作

可以参考 SQL转化为MapReduce的过程-CSDN博客

Hive 查询结构

在了解了底层的 SQL 转化为 MapReduce 任务的大致过程后,我们就可以开始了解 Hive 的结构,来看他是怎么实现从 SQL 转化成 MapReduce 任务的。

简单来讲,Hive 对于 SQL 解析有几个板块:

  • 解析器:该模块实现将 SQL 查询转化为抽象语法树(AST),并进行语义分析,确保查询是有效的,包括检查表和列是否存在,数据类型是否匹配等。
  • 编译器:将抽象语法树的表示转化为可以执行的逻辑计划,是一个有向无环图(DAG),该逻辑计划不依赖于底层的执行引擎,是一种中间表示,便于优化器进行优化。具体来讲,逻辑计划可以表示为一个操作树。
  • 优化器:对逻辑计划的操作树进行优化、合并冗余查询,提高查询执行的效率,优化器涉及到多种优化技术,主要目标是减少数据的读写操作。
  • 执行器:将优化器优化后的执行逻辑落实到各个执行引擎中,例如 MapReduce 或 Spark 等分布式任务引擎,会产生一系列具体的作业发布到集群中,是真正执行任务的地方。
  • 元数据库:存储一些表的元数据,例如表名、列名、列数据类型等,提供解析器在语义解析时快速查询相关元数据,这里通常使用 MySQL 来实现元数据库。

由于这部分的内容涉及到编译原理,就不展开描述了,具体可以参考 Hive SQL的编译过程 - 美团技术团队 (meituan.com)

Hive 表结构

Hive 的目录结构分为以下几个部分:

  • db:在 HDFS 下的一个目录,代表某一个数据库
  • table:db 下的一个目录,代表某一个表
  • partition:表现为 table 下的子目录,可根据逻辑嵌套
  • bucket:表现为 table 下的文件,根据行 hash 值散列成多个文件

Hive 的表一共有四种类型:

  • 内部表:对应传统数据库表
  • 外部表:对外引用一个已存在的表
  • 分区表:基于逻辑实现索引效果
  • 分桶表:基于数据分布实现索引效果

内部表和传统意义上的数据库表是类似的,每个表都有自己的存储目录,而外部表表示指向一个已经存在的数据,除了删除外部表纸删除元数据而不会删除实际数据外,和内部表没有太多区别。

除了内部表和外部表外,还有分区表和桶表,这两个分类一定程度上扮演了索引的职责,每个分区底层都表现为一个目录。

例如我们可以使用国家来进行分区,这样每个国家的文件都会在同一个目录中,还可以进一步对日期和城市进行分区,最后这样就可以实现一个查询路径:

/partitinTable/nation=china/city=Shanghai/ds=20130506/

这样就实现了一个类似索引的效果,不用进行全盘扫描了。

而对于分桶表来说,其则是通过字段的 hash 来实现数据的切分和索引,本质上就是不同 hash 值的数据写到不同的桶中。这样在查询的时候,根据查询的 hash 值,就可以缩小搜索的范围,也算是一种索引。

Hive 的其他概念

视图

Hive 中的视图和数据库中的视图类似,是一种逻辑上的简化,对复杂的 SQL 查询产生一种中间逻辑表。在 Hive 中,视图不会存储数据,只有当查询时,才会实际进行查询,其只会涉及到元数据的更改。所以如果底层表发生更改,可能导致视图查询失败。

数据抽样

当需要加快数据分析时,特别是初期分析时,可以先对数据进行采样,在小样本的数据上验证后再上全量数据,这样可以加快数据分析的开发流程。

其中分为随机抽样、桶表抽样和块抽样。随机抽样即对全量数据进行随机的选取,块抽样根据 hive 表的大小按比例抽取数据,桶表抽样则是随机选取几个桶。

缺点

  • 调优困难,在资源管理层面依赖 Hadoop,例如 Yarn,本身无法做到资源管理;其次优化器的优化能力较弱,本身是针对 MapReduce 任务调度来优化的,粒度较粗;

自定义函数

UDF(User-Defined Function),实现一种中间转化,例如字符串的格式化及解析、字段加密等等。输入一个元素,输出一个元素。

UDAF(User-Defined Aggregate Function),实现自定义基于 Group By 的聚合函数。输入多个元素,输出一个元素。

UDTF(User-Defined Table-Generating Function),通常用于复杂的数据转换和生成,例如将一个输入字符串解析成多个字段。输入零个或多个元素,输出多个元素。

数据倾斜

如果 key 的分布存在数据倾斜,会导致 shuffle 阶段产生瓶颈,例如存在大量的 null 值,导致一个 map 节点的负载过重,成为瓶颈。

set hive.groupby.skewindata = true

将一个 MapReduce 拆成两个阶段,第一个阶段的 key 会被打散,尽可能均匀分布在不同的 Reduce 节点中。但是此时 Reduce 节点不一定能拿到同一个 key 的所有 value,所以需要进行第二次的 MapReduce,这次就是正常的流程。由于第一次的 MapReduce 已经做了一次 Reduce 操作,原始的数据倾斜问题会得到一定的改善,能解决大部分数据倾斜的情况。

小文件合并

由于底层 HDFS 一个块的大小是固定的,每个文件至少占用一个块,如果存在大量小文件,则会浪费很多的空间;另一方面,大量小文件会增加 namenode 的元数据,导致 namenode 内存占用上升;最后,MapReduce 的 map 阶段会对单个文件启动一个 map 任务,如果小文件过多,则会创建过多的 map 任务,也会严重影响性能。

通过设置一些监控,当小文件过多时,就会触发文件的合并,将多个小文件合并成一个大文件。

Reference

[1] https://cloud.tencent.com/developer/article/1880494

上一篇 下一篇

评论 | 0条评论