
Data Sync: How to Submit a FlinkX Job from Local IDEA to a Kerberos Cluster

铁柱 · 2022-01-11 18:27
FlinkX 1.12
1. Prepare the authentication files
krb5.conf, the keytab, flink-conf.yaml, core-site.xml, yarn-site.xml, and similar files.
2. Configure the submission parameters
Execution user: hive
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/be22f9d53c974fd037c5611a00e5f951..png
Job configuration parameters
When submitting a job to a Kerberos environment, the authentication files must be shipped to the job's execution environment; upload the keytab and krb5.conf via confProp.
Relevant parameter:
"pipeline.cached-files\":\"name:krb5.conf,path:/Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/krb5.conf,executable:false;name:hive.keytab,path:/Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/hive.keytab,executable:false\"
The full parameters are as follows:
-mode yarn-per-job -jobType sync -job /Users/wtz/work_place/project_place/flinkx_1.12/flinkx/flinkx-examples/json/stream/stream.json -flinkxDistDir /Users/wtz/work_place/project_place/flinkx_1.12/flinkx/flinkx-dist -flinkConfDir /Users/wtz/work_place/conf_place/flink/kudu1 -hadoopConfDir /Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/conf -flinkLibDir /Users/wtz/work_place/jar_place/flink-1.12.5/lib -confProp {\"yarn.application.queue\":\"default\",\"pipeline.cached-files\":\"name:krb5.conf,path:/Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/krb5.conf,executable:false;name:hive.keytab,path:/Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/hive.keytab,executable:false\",\"flinkx.dirty-data.jdbc.table\":\"flinkx_dirty_data_tiezhu\",\"flinkx.dirty-data.jdbc.url\":\"jdbc:mysql://172.16.100.186:3306/test\",\"flinkx.dirty-data.output-type\":\"log\",\"flinkx.dirty-data.log.print-interval\":1,\"flinkx.dirty-data.jdbc.password\":\"Abc123456\",\"flinkx.dirty-data.max-collect-failed-rows\":4,\"flinkx.dirty-data.jdbc.username\":\"test\",\"flinkx.dirty-data.max-rows\":2}
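For readability, here is the same confProp value with the shell escaping removed (identical content to the command line above):

{
  "yarn.application.queue": "default",
  "pipeline.cached-files": "name:krb5.conf,path:/Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/krb5.conf,executable:false;name:hive.keytab,path:/Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/hive.keytab,executable:false",
  "flinkx.dirty-data.jdbc.table": "flinkx_dirty_data_tiezhu",
  "flinkx.dirty-data.jdbc.url": "jdbc:mysql://172.16.100.186:3306/test",
  "flinkx.dirty-data.output-type": "log",
  "flinkx.dirty-data.log.print-interval": 1,
  "flinkx.dirty-data.jdbc.password": "Abc123456",
  "flinkx.dirty-data.max-collect-failed-rows": 4,
  "flinkx.dirty-data.jdbc.username": "test",
  "flinkx.dirty-data.max-rows": 2
}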

Job JVM options

-Djava.security.krb5.conf=/Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/krb5.conf
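This standard JVM property points the local submitting JVM at the cluster's krb5.conf, so the Kerberos login performed on the developer machine resolves the correct KDC and realm instead of falling back to the system default (e.g. /etc/krb5.conf).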
Job environment variables
HADOOP_USER_NAME=hive
Flink configuration file
The Kerberos authentication settings need to be added to the Flink configuration file (flink-conf.yaml):
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/hive.keytab
security.kerberos.login.principal: hive/cdp01@CDP7DTSTACK.COM
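With keytab and principal set, Flink logs in from the keytab; use-ticket-cache: true additionally allows it to use a ticket cache created locally with kinit. This combination is what worked in this setup.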

With all of this configured, the job can be submitted.
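When running from IDEA, the entry point is com.dtstack.flinkx.client.Launcher (it is visible in the stack trace further below), with the parameters above passed as program arguments.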

FlinkX 1.10
Compared with FlinkX 1.12, only the job submission parameters differ; everything else is the same.
Authentication-related parameters:
-krb5conf /Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/krb5.conf -keytab /Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/hive.keytab -principal hive/cdp01@CDP7DTSTACK.COM
Full parameters:
-mode yarnPer -job /Users/wtz/work_place/job_place/json/1.10/binlog-stream.json -pluginRoot /Users/wtz/work_place/project_place/flinkx_1.10/flinkx/syncplugins -flinkLibJar /Users/wtz/work_place/jar_place/flink-1.10.1/lib -flinkconf /Users/wtz/work_place/conf_place/flink/kudu1 -yarnconf /Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/conf -pluginLoadMode shipfile -queue default -confProp "{\"flink.checkpoint.interval\":30000}" -krb5conf /Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/krb5.conf -keytab /Users/wtz/work_place/conf_place/hadoop2/cdp_liantong/auth/kerberos/hive.keytab -principal hive/cdp01@CDP7DTSTACK.COM

Troubleshooting

NullPointerException on job submission
This problem occurred with FlinkX 1.12 in a CDP environment; it does not necessarily occur in other environments.
The exact error:
2022-01-11 17:56:02,685 - 10821 WARN [main] org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2022-01-11 17:56:04,475 - 12611 WARN [Thread-6] org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer: DataStreamer Exception
java.lang.NullPointerException
    at org.apache.hadoop.crypto.CryptoInputStream.<init>(CryptoInputStream.java:133)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:490)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:299)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:242)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:211)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:183)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1437)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1385)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
Exception in thread "main" org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:491)
    at com.dtstack.flinkx.client.yarn.YarnPerJobClusterClientHelper.submit(YarnPerJobClusterClientHelper.java:97)
    at com.dtstack.flinkx.client.Launcher.main(Launcher.java:126)
Caused by: java.io.IOException: DataStreamer Exception:
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.crypto.CryptoInputStream.<init>(CryptoInputStream.java:133)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:490)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:299)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:242)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:211)
    at org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:183)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1437)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1385)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
After tracing the code, the null value turned out to come from this part of Hadoop:
org.apache.hadoop.crypto.CryptoCodec#getInstance, via its call to getCodecClasses.
The getInstance method:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f9d479a64c728ff3b52399d14680612f..png
The getCodecClasses method:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5dabe2e2f34ad7dcf97cba63586d4daa..png
The value of configName here is:
hadoop.security.crypto.codec.classes.aes.ctr.nopadding
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/90e797be204a1d25e1434607fc289cc2..png
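Since conf.get(configName) returns null when that key is absent, getCodecClasses returns null, getInstance propagates the null, and the CryptoInputStream constructor later dereferences it. Below is a minimal, hypothetical repro sketch (not part of FlinkX) showing the null return; note that newer Hadoop versions hard-code a default for this key and would return the JCE codec instead:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;

// Hypothetical repro: CryptoCodec.getInstance returns null when
// hadoop.security.crypto.codec.classes.aes.ctr.nopadding is missing
// from the effective Configuration.
public class CodecNullRepro {
    public static void main(String[] args) {
        // loadDefaults=false: skip core-default.xml/core-site.xml, so the
        // codec-classes key is unset, mimicking the failing environment.
        Configuration conf = new Configuration(false);
        CryptoCodec codec = CryptoCodec.getInstance(conf, CipherSuite.AES_CTR_NOPADDING);
        // Prints "codec = null" on Hadoop versions without a built-in default
        // for this key; CryptoInputStream's constructor then NPEs on the null.
        System.out.println("codec = " + codec);
    }
}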
Checking the official Hadoop documentation turned up a matching parameter:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ff263127914495dcbcba5ec6b0e12843..png
Its full description reads:
Comma-separated list of crypto codec implementations for AES/CTR/NoPadding. The first implementation will be used if available, others are fallbacks.
Solution:
Add this parameter to core-site.xml.
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4fe641332395c335270c7ad140d6d851..png
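For reference, the entry to add looks like this (the two class names are Hadoop's stock AES/CTR/NoPadding implementations from core-default.xml; verify them against your own Hadoop version):

<property>
    <name>hadoop.security.crypto.codec.classes.aes.ctr.nopadding</name>
    <value>org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec,org.apache.hadoop.crypto.JceAesCtrCryptoCodec</value>
</property>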