博客 SpringBoot 使用 Spark

SpringBoot 使用 Spark

   数栈君   发表于 2023-09-08 11:49  824  0

前提:


    1.SpringBoot 已经接入 Spark

    2.已配置 JavaSparkContext

    3.已配置 SparkSession

@Resource
private SparkSession sparkSession;

@Resource
private JavaSparkContext javaSparkContext;

读取 txt 文件

测试文件 word.txt
 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c1c26c8fbbd99818141ce2b7494fd635..png
  

java 代码

    ●textFile:获取文件内容,返回 JavaRDD

    ●flatMap:过滤数据

    ●mapToPair:把每个元素都转换成一个类型的对象,如 <123,1>,<456,1>

    ●reduceByKey:对相同key的数据集进行预聚合

public void testSparkText() {
    String file = "D:\\TEMP\\word.txt";
    JavaRDD fileRDD = javaSparkContext.textFile(file);

    JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
    JavaPairRDD wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
    JavaPairRDD wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);

    //输出结果
    List> result = wordAndCountRDD.collect();
    result.forEach(System.out::println);
}

结果得出,123 有 3 个,456 有 2 个,789 有 1 个

 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/76b7855ec798235b01443dbf08e39f66..png
  


读取 csv 文件

测试文件 testcsv.csv
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4be33abbec843c5c2ee1519666446418..png
  

java 代码

public void testSparkCsv() {
    String file = "D:\\TEMP\\testcsv.csv";
    JavaRDD fileRDD = javaSparkContext.textFile(file);
    JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());

    //输出结果
    System.out.println(wordsRDD.collect());
}

输出结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9016163ec6ffe554173475586d10613d..png
  


读取 MySQL 数据库表

    ●format:获取数据库建议是 jdbc

    ●option.url:添加 MySQL 连接 url

    ●option.user:MySQL 用户名

    ●option.password:MySQL 用户密码

    ●option.dbtable:sql 语句

    ●option.driver:数据库 driver,MySQL 使用 com.mysql.cj.jdbc.Driver

public void testSparkMysql() throws IOException {

    Dataset jdbcDF = sparkSession.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://192.168.140.1:3306/user?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
                .option("dbtable", "(SELECT * FROM xxxtable) tmp")
                .option("user", "root")
                .option("password", "xxxxxxxxxx*k")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .load();

    jdbcDF.printSchema();
    jdbcDF.show();

    //转化为RDD
    JavaRDD rowJavaRDD = jdbcDF.javaRDD();
    System.out.println(rowJavaRDD.collect());
}

也可以把表内容输出到文件,添加以下代码

List list = rowJavaRDD.collect();
BufferedWriter bw;
bw = new BufferedWriter(new FileWriter("d:/test.txt"));
for (int j = 0; j < list.size(); j++) {
    bw.write(list.get(j).toString());
    bw.newLine();
    bw.flush();
}
bw.close();

结果输出

 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c67701f6faac4c8104ac60ae2dd23f8d..png
  


读取 Json 文件
测试文件 testjson.json,内容如下

[{
    "name": "name1",
    "age": "1"
}, {
    "name": "name2",
    "age": "2"
}, {
    "name": "name3",
    "age": "3"
}, {
    "name": "name4",
    "age": "4"
}]

注意:testjson.json 文件的内容不能带格式,需要进行压缩
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/bd6d4b51af0f158067477404000dcdc1..png
  

java 代码

    ●createOrReplaceTempView:读取 json 数据后,创建数据表 t

    ●sparkSession.sql:使用 sql 对 t 进行查询,输出 age 大于 3 的数据

public void testSparkJson() {
    Dataset df = sparkSession.read().json("D:\\TEMP\\testjson.json");
    df.printSchema();

    df.createOrReplaceTempView("t");
    Dataset row = sparkSession.sql("select age,name from t where age > 3");

    JavaRDD rowJavaRDD = row.javaRDD();
    System.out.println(rowJavaRDD.collect());
}

输出结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0cb17611aeddaa50747e381d90d2d848..png
  


中文输出乱码

测试文件 testcsv.csv
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5bc51763a271d6f5c952b82491c6d667..png
  

public void testSparkCsv() {
    String file = "D:\\TEMP\\testcsv.csv";
    JavaRDD fileRDD = javaSparkContext.textFile(file);
    JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());

    //输出结果
    System.out.println(wordsRDD.collect());
}

输出结果,发现中文乱码,可恶
 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b314294a5f0c07daefa248dcb923ab50..png
  
原因:textFile 读取文件没有解决乱码问题,但 sparkSession.read() 却不会乱码
解决办法:获取文件方式由 textFile 改成 hadoopFile,由 hadoopFile 指定具体编码

    public void testSparkCsv() {
        String file = "D:\\TEMP\\testcsv.csv";
        String code = "gbk";

        JavaRDD gbkRDD = javaSparkContext.hadoopFile(file, TextInputFormat.class, LongWritable.class, Text.class).map(p -> new String(p._2.getBytes(), 0, p._2.getLength(), code));
        JavaRDD gbkWordsRDD = gbkRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());

        //输出结果
        System.out.println(gbkWordsRDD.collect());
}

输出结果
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d02f5d4613d6c0dd95a88ee1da36bd7f..png
  




免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群