hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 sql 查询功能,可以将 sql 语句转换为MapReduce 任务进行运行。
其优点是学习成本低,可以通过类 SQL 语句快速实现简单的 MapReduce 统计,不必开发专门的 MapReduce 应用,十分适合数据仓库的统计分析,但是 Hive 不支持实时查询。
由于 Hive 的元数据可能要面临不断地更新、修改和读取操作,所以它显然不适合使用 Hadoop 文件系统进行存储。目前 Hive 将元数据存储在 RDBMS 中,比如存储在 MySQL、Derby 中。元数据信息包括:存在的表、表的列、权限和更多的其他信息。
HiveSQL -> AST(抽象语法树) -> QB(查询块) -> OperatorTree(操作树)-> 优化后的操作树 -> mapreduce 任务树 -> 优化后的 mapreduce 任务树
过程描述如下:
SQL Parser:Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象语法树 AST Tree;
Semantic Analyzer:遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock;
Logical plan:遍历 QueryBlock,翻译为执行操作树 OperatorTree;
Logical plan optimizer: 逻辑层优化器进行 OperatorTree 变换,合并不必要的 ReduceSinkOperator,减少 shuffle 数据量;
Physical plan:遍历 OperatorTree,翻译为 MapReduce 任务;
Logical plan optimizer:物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划;
如果其中有一张表为小表,直接使用 map 端 join 的方式(map 端加载小表)进行聚合。
如果两张都是大表,例如分别是客户表和订单表 。那么采用联合 key,联合 key 的第一个组成部分是 join on 中的公共字段,第二部分是一个 flag,0 代表表 A,1 代表表 B,由此让 Reduce 区分客户信息和订单信息;在 Mapper 中同时处理两张表的信息,将 join on 公共字段相同的数据划分到同一个分区中,进而传递到一个 Reduce 中,然后在 Reduce 中实现聚合。
order by:会对输入做全局排序,因此只有一个 reducer(多个 reducer 无法保证全局有序)。只有一个 reducer,会导致当输入规模较大时,需要较长的计算时间 。
sort by:分区内有序,不是全局排序,其在数据进入 reducer 前完成排序
distribute by:按照指定的字段对数据进行划分输出到不同的 reduce 中 ,结合 sory by 使用
cluster by:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
split 将字符串转化为数组,即:split('a,b,c,d' , ',') ==> ["a","b","c","d"]
coalesce(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回 NULL。
collect_list 列出该字段所有的值,不去重 => select collect_list(id) from table
Hive 支持三种不同的元存储服务器,分别为:内嵌式元存储服务器、本地元存储服务器、远程元存储服务器,每种存储方式使用不同的配置参数 。
内嵌式元存储主要用于单元测试,在该模式下每次只有一个进程可以连接到元存储,Derby 是内嵌式元存储的默认数据库 。
在本地模式下,每个 Hive 客户端都会打开到数据存储的连接并在该连接上请求 SQL 查询 。
在远程模式下,所有的 Hive 客户端都将打开一个到元数据服务器的连接,该服务器依次查询元数据,元数据服务器和客户端之间使用 Thrift 协议通信 。
内部表
如果 Hive 中没有特别指定,则默认创建的表都是管理表,也称内部表。由Hive负责管理表中的数据,管理表不共享数据。删除管理表时,会删除管理表中的数据和元数据信息 。
外部表
当一份数据需要被共享时,可以创建一个外部表指向这份数据 。
删除该表并不会删除掉原始数据,删除的是表的元数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据 。当表结构或者分区数发生变化时,需要进行一步修复的操作。
默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用(系统自动检查,执行查询时自动解压),但使用 这种方式,压缩后的文件不支持 split,Hive 不会对数据进行切分,从而无法对数据进行并行操作。并且在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比 SequenceFile 高几十倍 。
SequenceFile 是 Hadoop API 提供的一种二进制文件支持,存储方式为行存储,其具有使用方便、可分割、可压缩的特点。
SequenceFile 支持三种压缩选择:NONE,RECORD,BLOCK。Record 压缩率低,一般建议使用 BLOCK 压缩。
优势是文件和 hadoop api 中的 MapFile 是相互兼容的 。
存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:
首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低 ;
其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取;
存储方式:数据按行分块 每块按照列存储。
压缩快、快速列存取。
效率比 rcfile 高,是 rcfile 的改良版本。
总结:
相比 TEXTFILE 和 SEQUENCEFILE,RCFILE 由于列式存储方式,数据加载时性能消耗较大,但是具有较好的压缩比和查询响应。
数据仓库的特点是一次写入、多次读取,因此,整体来看,RCFILE 相比其余两种格式具有较明显的优势。
不是,从 Hive0.10.0 版本开始,对于简单的不需要聚合的类似 SELECT from LIMIT n 语句,不需要起 MapReduce job,直接通过 Fetch task 获取数据。
UDF:单行进入,单行输出
UDAF:多行进入,单行输出
UDTF:单行输入,多行输出
桶表是对数据进行哈希取值,然后放到不同文件中存储 。
数据加载到桶表时,会对字段取 hash 值,然后与桶的数量取模。把数据放到对应的文件中。物理上,每个桶就是表(或分区)目录里的一个文件,一个作业产生的桶(输出文件)和 reduce 任务个数相同 。
桶表专门用于抽样查询,是很专业性的,不是日常用来存储数据的表,需要抽样查询时,才创建和使用桶表。
定位原因:
map 输出数据按 key Hash 的分配到 reduce 中,由于 key 分布不均匀、业务数据本身的特点、建表时考虑不周、某些 SQL 语句本身就有数据倾斜等原因造成的 reduce 上的数据量差异过大。
如何避免:
对于 key 为空产生的数据倾斜,可以对其赋予一个随机值
解决方案:
(1)参数调节:
hive.map.aggr = true
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定位 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;
第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作
(2)SQL语句调节:
① 选用 join key 分布最均匀的表作为驱动表。做好列裁剪和 filter 操作,以达到两表做 join 的时候,数据量相对变小的效果。
② 大小表 Join:使用 map join 让小的维度表(1000 条以下的记录条数)先进内存。在map 端完成 reduce。
③ 大表 Join 大表:把空值的 key 变成一个字符串加上随机数,把倾斜的数据分到不同的 reduce 上,由于 null 值关联不上,处理后并不影响最终结果。
④ count distinct 大量相同特殊值:count distinct 时,将值为空的情况单独处理,如果是计算 count distinct, 可以不用处理,直接过滤,在最后结果中加 1。如果还有其他计算,需要进行 group by,可以先将值为空的记录单独处理,再和其他计算结果进行 union。
1)Rank
(1)RANK() 排序相同时会重复,总数不会变
(2)DENSE_RANK() 排序相同时会重复,总数会减少
(3)ROW_NUMBER() 会根据顺序计算
2) OVER()
指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而 变化
(1)CURRENT ROW:当前行
(2) n PRECEDING:往前 n 行数据
(3) n FOLLOWING:往后 n 行数据
(4) UNBOUNDED :起 点 , UNBOUNDED PRECEDING 表 示 从 前 面 的 起 点 , UNBOUNDED FOLLOWING 表示到后面的终点
(5) LAG(col,n) :往前第 n 行数据
(6) LEAD(col,n):往后第 n 行数据
(7) NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从 1 开始,对于每一行,NTILE 返回此行所属的组的编号。注意:n 必须为 int 类型。
定位原因:
(1)动态分区插入数据,产生大量的小文件,从而导致 map 数量剧增;
(2)reduce 数量越多,小文件也越多(reduce 的个数和输出文件是对应的)
(3)数据源本身就包含大量的小文件。
解决方案:
(1)在 Map 执行前合并小文件,减少 Map 数: CombineHiveInputFormat 具有对小文件 进行合并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。
(2)merge
SET hive.merge.mapfiles = true;
-- 默认 true,在 map-only 任务结束时合并 小文件
SET hive.merge.mapredfiles = true;
-- 默认 false,在 map-reduce 任务结 束时合并小文件
SET hive.merge.size.per.task = 268435456;
-- 默认 256M
SET hive.merge.smallfiles.avgsize = 16777216;
-- 当输出文件的平均大小 小于 16m 该值时,启动一个独立的 map-reduce 任务进行文件 merge
(3)开启 JVM 重用
set mapreduce.job.jvm.numtasks=10
Tez 可以将多个有依赖的作业转换为一个作业,这样只需写一次 HDFS,且中间节点较少,从而大大提升作业的计算性能。
Mr/tez/spark 区别:
Mr 引擎:多 job 串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标 。
Spark 引擎:虽然在 Shuffle 过程中也落盘,但是并不是所有算子都需要 Shuffle,尤其 是多算子过程,中间过程不落盘 DAG 有向无环图。兼顾了可靠性和效率。一般处理天指标。
Tez 引擎:完全基于内存。注意:如果数据量特别大,慎重使用。容易 OOM。一般用于快速出结果,数据量比较小的场景。
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack