博客 elasticsearch数据同步到hive

elasticsearch数据同步到hive

   数栈君   发表于 2023-07-31 11:29  298  0

背景

来自用户的需求: 用户有一部分数据来自 elasticsearch ,我们已经支持了通过 presto 查询 es 数据。但是用户需要将 es 表 和 hive 表做关联查询,而 presto 是不能跨数据源进行 join 查询的。所以需要先把 es 数据导入到 hive 中

用户对数据同步周期的要求并不高 一天1-2次就可以了,所以继续使用我们emr集群中已有的 azkaban 服务进行调度,把 es 数据同步到 hive 的过程写到 azkaban 中,实现了7张表的定期同步

hive 创建 es 外表

参考教程-Elasticsearch-Hive

hive 引入 elasticsearch-hadoop 依赖包

hive 默认不支持创建 es 外表,需要引入 elasticsearch-hadoop 依赖包

修改 hive.aux.jars.path 配置, 多个可以用逗号分隔,如下:

hive.aux.jars.path=file:///opt/modules/hive/auxlib/elasticsearch-hadoop-hive-8.8.0.jar


创建 hive 外表

sql 示例:

CREATE EXTERNAL TABLE temp.es_external_table ( fieldNameA STRING, fieldNameB STRING ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
        'es.resource'='es索引名',
        'es.nodes'='es_host',
        'es.port'='es_port',
        'es.mapping.names'='fieldNameA:fieldNameA,fieldNameB:fieldNameB'
);

这里踩了一个坑: 由于 hive 创建表会忽略大小写,不管 sql 中定义的字段是什么样子,都会统一转成小写。所以导致es 中的驼峰名称字段 会映射失败,最后查出的数据都是 null

类似的坑-创建mongodb 外表时遇到的

因此需要显式地通过 es.mapping.names 配置指定字段名称的关联关系,像示例那样

同步脚本

从 es 表到 hive 表,大致步骤为: 创建 hive 外表,关联 es 数据 => 创建 hive 内表 => 同步外表数据到内表

过程写到脚本中如下: (create_hive_to_es_table.sh)

## 获取指定索引的所有 es 表字段
get_index_field_ret=`curl http://${es_address}/${index_name}?pretty=true`
field_arr=`echo ${get_index_field_ret} | jq -r ".${index_name}.mappings.properties | keys | join(\" \")"`

## 创建 hive 外表
temp_table_name="temp.es_${index_name}"
temp_rename_table_name="${hive_db}.es_${index_name}_bak"
actual_table_name="${hive_db}.es_${index_name}"

create_external_table_sql="CREATE EXTERNAL TABLE ${temp_table_name} ("
for current_field in ${field_arr[@]}
do
create_external_table_sql="${create_external_table_sql} ${current_field} STRING,"
done
create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g'`

### 组装 es.mapping.names
create_external_table_sql="${create_external_table_sql}) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource'='${index_name}','es.nodes'='${es_host}','es.port'='${es_port}','es.mapping.names'='"
for current_field in ${field_arr[@]}
do
create_external_table_sql="${create_external_table_sql}${current_field}:${current_field},"
done
create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g'`

create_external_table_sql="${create_external_table_sql}')"
drop_external_table_sql="drop table if exists ${temp_table_name}"
echo "create external sql: ${create_external_table_sql}"

beeline -n ${hive_user} -u ${hive_server} -e "${drop_external_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_external_table_sql}"

## 创建 hive 临时内表
create_temp_table_sql="CREATE TABLE ${temp_rename_table_name} AS SELECT * FROM ${temp_table_name}"
drop_temp_table_sql="drop table if exists ${temp_rename_table_name}"

echo "create temp table sql: ${create_temp_table_sql}"

beeline -n ${hive_user} -u ${hive_server} -e "${drop_temp_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_temp_table_sql}"

## 重命名表(用于快速重建用户直接用的表)

create_actual_table_sql="ALTER TABLE ${temp_rename_table_name} RENAME TO ${actual_table_name}"
drop_actual_table_sql="drop table if exists ${actual_table_name}"

echo "create actual table sql: ${create_actual_table_sql}"

beeline -n ${hive_user} -u ${hive_server} -e "${drop_actual_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_actual_table_sql}"

azkaban 任务

定义任务流程
需要重建7张表,因此定义成 父任务 -> 7个子任务

# es_to_hive_parent.job
type=command

command=echo "es to hive success!"

dependencies=table1,table2,table3,table4,table5,table6,table7

因为前面具体外表的创建流程 已经写在脚本中了,所以子任务这里直接调用 create_hive_to_es_table.sh 就行

# table1.job
type=flow

job.name=table1
flow.name=ES_TO_HIVE

index.name=es索引名
hive_db=目标 hive 库名

# ES_TO_HIVE.job
type=command

command=sh create_hive_to_es_table.sh ${es.address} ${index.name} ${hive.server} ${hive.user} ${hive.db}


总结

基于目前的资料搜索 这种方案应该是 es数据同步到 hive 比较通用的。但是确实不适合大批量数据同步的场景,也不能直接同步增量数据

想同步增量数据的话 应该需要从数据源头入手了,比如 es 数据是来自 kafka 的,那么需要通过类似 canal 的服务来同步增量数据,架构和这里说到的远远不同

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群