问答 FlinkX mysql数据同步到hive 报错

FlinkX mysql数据同步到hive 报错

   攻城的狮子    发布于 2022-04-25 20:50 最新回复 2022-04-26 10:11  来自于   攻城的狮子  370  2

做项目技术选型验证时,测试环境为:Hadoop 3.2.2、Hive 3、MySQL 5.7。

直接全量从mysql到hive测试OK,说明flinkx能写入hive3
flinkx -job /shell/flinxjson/mysql2hive.json -jobType sync
json文件如下
{
"job": {
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [{
"name": "tenant_id",
"type": "int"
}, {
"name": "tenant_name",
"type": "string"
}, {
"name": "created_at",
"type": "varchar(50)"
}],
"username": "root",
"password": "root",
"connection": [{
"jdbcUrl": ["jdbc:mysql://localhost:3306/bidm?useSSL=false"],
"table": ["tb1"]
}]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "/user/hive/warehouse/myhive.db/tb1",
"defaultFS": "hdfs://nameservice1",
"hadoopConfig": {
"dfs.nameservices": "nameservice1",
"dfs.ha.namenodes.nameservice1": "nn1,nn2",
"dfs.namenode.rpc-address.nameservice1.nn1": "192.168.80.95:9000",
"dfs.namenode.rpc-address.nameservice1.nn2": "192.168.80.95:9000",
"dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"fileType": "text",
"writeMode": "overwrite",
"column": [{
"name": "tenant_id",
"type": "int",
"index": 0
}, {
"name": "tenant_name",
"type": "STRING",
"index": 1
}, {
"name": "created_at",
"type": "STRING",
"index": 2
}]
}
}
}],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}

想通过自定义 SQL 导数,于是把 json 改成下面的形式,调用命令: flinkx -job /shell/flinxjson/a_my2hive.json -jobType sync

{
"job": {
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "tenant_id",
"type": "int"
},
{
"name": "tenant_name",
"type": "string"
},
{
"name": "created_at",
"type": "varchar(50)"
}
],
"username": "root",
"password": "root",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/bidm?useSSL=false"
],
"table": [
"tb1"
],
"column": ["tenant_id", "tenant_name","created_at"],
"customSql": "SELECT tenant_id,tenant_name, created_at from bidm.tb1 where created_at <> '20220423'"
}
]
}
},

"writer": {
"name": "hivewriter",
"parameter": {
"jdbcUrl": "jdbc:hive2://192.168.80.95:10000/myhive",
"fileType": "text",
"writeMode": "append",
"charsetName": "UTF-8",
"fieldDelimiter":"\t",
"tablesColumn": "{\"tb1\":[{\"key\":\"tenant_id\",\"type\":\"int\"},{\"key\":\"tenant_name\",\"type\":\"string\"}]}",
"partition" : "created_at",
"defaultFS": "hdfs://192.168.80.95:9000",
"hadoopConfig": {
"dfs.nameservices": "nameservice1",
"dfs.ha.namenodes.nameservice1": "nn1,nn2",
"dfs.namenode.rpc-address.nameservice1.nn1": "192.168.80.95:9000",
"dfs.namenode.rpc-address.nameservice1.nn2": "192.168.80.95:9000",
"dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

}
}
}

}],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}

报错:

2022-04-25 20:47:14.423 [Legacy Source Thread - Source: mysqlsourcefactory -> Sink: hivesinkfactory (1/1)#0] ERROR com.dtstack.flinkx.connector.hive.util.HiveUtil  - 
com.dtstack.flinkx.throwable.FlinkxRuntimeException: execute sql:alter table tb1 add if not exists partition (created_at=20220425), errorMessage:[Error while compiling statement: FAILED: ValidationFailureSemanticException myhive.tb1 table is not partitioned but partition spec exists: {created_at=20220425}]
.......
2022-04-25 20:47:14.434 [Legacy Source Thread - Source: mysqlsourcefactory -> Sink: hivesinkfactory (1/1)#0] ERROR com.dtstack.flinkx.source.DtInputFormatSourceFunction - Exception happened, start to close format
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
........
Caused by: com.dtstack.flinkx.throwable.FlinkxRuntimeException: execute sql:alter table tb1 add if not exists partition (created_at=20220425), errorMessage:[Error while compiling statement: FAILED: ValidationFailureSemanticException myhive.tb1 table is not partitioned but partition spec exists: {created_at=20220425}]
at com.dtstack.flinkx.connector.hive.util.HiveDbUtil.executeSqlWithoutResultSet(HiveDbUtil.java:333)
at com.dtstack.flinkx.connector.hive.util.HiveUtil.createPartition(HiveUtil.java:112)

看起来像是需要把 Hive 表建成分区表?怎么才能通过自定义 SQL 实现从 MySQL 导数到 Hive?




2条回答
攻城的狮子
回复于 2022-04-26 10:11

Hive 表也改成了分区表,用的是上面的 json,依然报错(空指针异常)。

0 0
攻城的狮子
回复于 2022-04-26 09:15

改成这样 

{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "root",
"connection": [{
"jdbcUrl": ["jdbc:mysql://192.168.80.95:3306/bidm?useUnicode=true&characterEncoding=utf8"],
"table": ["tb1"]
}],
"column": ["tenant_id", "tenant_name","created_at"],
"customSql": "SELECT tenant_id,tenant_name, created_at from bidm.tb1 where created_at <> '20220423'",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
// hivewriter必填
"name": "hivewriter",
"parameter": {
"defaultFS": "hdfs://192.168.80.95:9000",
"table-name":"myhive.tb1",
"hadoopConfig": {
// namenode节点名,逗号分割
"dfs.ha.namenodes.ns1" : "192.168.80.95",
// 访问hdfs路径
"fs.defaultFS" : "hdfs://192.168.80.95:9000",
// rpc远程调用的节点:端口
"dfs.namenode.rpc-address.ns1.nn1" : "192.168.80.95:9000",
"dfs.client.failover.proxy.provider.ns1" : "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
// nameservice的节点名
"dfs.nameservices" : "192.168.80.95",
"fs.hdfs.impl" : "org.apache.hadoop.hdfs.DistributedFileSystem",
"fs.hdfs.impl.disable.cache" : "true"

},
"partitionType":"DAY",
"partition":"pt",
"writeMode":"append",
"encoding": "utf-8",
"fileType": "text",
// 表名:[{key:字段名,type:字段类型},{}] **无表名则新建表再插入
"tablesColumn": "{\"tb1\":[{\"key\":\"tenant_id\",\"type\":\"int\"},{\"key\":\"tenant_name\",\"type\":\"string\"},{\"key\":\"created_at\",\"type\":\"string\"}]}",
// jdbc:hive2://hive所在IP地址:10000/库名
"jdbcUrl": "jdbc:hive2://192.168.80.95:10000/myhive",
"username" : "root",
"password" : "",
"charsetName" : "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
},
"errorLimit": {
"record": 100
},
"restore": {
"maxRowNumForCheckpoint": 0,
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
},
"log" : {
"isLogger": false,
"level" : "debug",
"path" : "",
"pattern":""
}
}
}
}

还是报错 

2022-04-25 22:23:33.562 [Legacy Source Thread - Source: mysqlsourcefactory -> Sink: hivesinkfactory (1/1)#0] INFO  com.dtstack.flinkx.connector.mysql.source.MysqlInputFormat  - Executing sql is: 'SELECT * FROM (SELECT tenant_id,tenant_name, created_at from bidm.tb1 where created_at <> '20220423') flinkx_tmp WHERE  1=1 '
2022-04-25 22:23:33.572 [Legacy Source Thread - Source: mysqlsourcefactory -> Sink: hivesinkfactory (1/1)#0] ERROR com.dtstack.flinkx.source.DtInputFormatSourceFunction - Exception happened, start to close format
java.lang.NullPointerException: null
at com.dtstack.flinkx.connector.mysql.converter.MysqlRawTypeConverter.apply(MysqlRawTypeConverter.java:22)
at com.dtstack.flinkx.util.TableUtil.createRowType(TableUtil.java:107)
at com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat.openInternal(JdbcInputFormat.java:140)
at com.dtstack.flinkx.source.format.BaseRichInputFormat.open(BaseRichInputFormat.java:156)
at com.dtstack.flinkx.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:126)


0 0
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群