Spark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大 学伯克利分校 AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,2014 年 2 月成为 Apache 顶级项目。目前,Spark 生态系统已经发展成为一个包含多个 子项目的集合,其中包含 SparkSQL、Spark Streaming、GraphX、MLlib 等子项目, Spark 是基于内存计算的大数据并行计算框架。Spark 基于内存计算,提高了在大 数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将 Spark 部署在大量廉价硬件之上,形成集群。Spark 得到了众多大数据公司的支持, 这些公司包括 Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的 Spark 已应用于凤巢、大搜索、直达 号、百度大数据等业务;阿里利用 GraphX 构建了大规模的图计算和图挖掘系统, 实现了很多生产系统的推荐算法;腾讯 Spark 集群达到 8000 台的规模,是当前 已知的世界上最大的 Spark 集群。
Spark 是一个开源的类似于 Hadoop MapReduce 的通用的并行计算框架,Spark 基于 map reduce 算法实现的分布式计算,拥有 Hadoop MapReduce 所具有的优 点;但不同于 MapReduce 的是 Spark 中的 Job 中间输出和结果可以保存在内存中, 从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需 要迭代的 map reduce 的算法。
Spark 是 MapReduce 的替代方案,而且兼容 HDFS、Hive,可融入 Hadoop 的生态 系统,以弥补 MapReduce 的不足。
Spark 特点
快
与 Hadoop 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基 于硬盘的运算也要快 10 倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过 基于内存来高效处理数据流。
易用
Spark 支持 Java、Python 和 Scala 的 API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 shell,可 以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题的方法。
通用
Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询 (Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图 计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark 统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的 问题,减少开发和维护的人力成本和部署平台的物力成本。
兼容性
Spark 可以非常方便地与其他的开源产品进行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器,器,并且可以处 理所有 Hadoop 支持的数据,包括 HDFS、HBase 和 Cassandra 等。这对于已经部 署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。Spark 也可以不依赖于第三方的资源管理和调度器,它实现了 Standalone 作为其内置的资源管理和调度框架,这样进一步降低了 Spark 的使用 门槛,使得所有人都可以非常容易地部署和使用 Spark。此外,Spark 还提供了在 EC2 上部署 Standalone 的 Spark 集群的工具。
搭建一个 Spark 集群
下载 spark 安装包 下载地址: spark 官网
这里我们使用 spark-2.0.2-bin-hadoop2.7 版本.
下载好之后上传, 解压
tar -zxvf spark-2.0.2-bin-hadoop2.7.tgz
1
重命名
mv spark-2.0.2-bin-hadoop2.7 spark
1
修改配置文件
#配置文件目录在 /opt/bigdata/spark/conf
#vi spark-env.sh 修改文件(先把 spark-env.sh.template 重命名 为 spark-env.sh)
#配置java环境变量
export JAVA_HOME=/export/server/jdk1.8.0_65
#指定spark老大Master的IP
#export SPARK_MASTER_HOST=node-1
#指定spark老大Master的端口
export SPARK_MASTER_PORT=7077
#通过zookeeper搭建高可用spark集群
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node-1:2181,node-2:2181,node-3:2181 -Dspark.deploy.zookeeper.dir=/spark"
#指定HDFS配置文件目录
export HADOOP_CONF_DIR=/export/server/hadoop-2.7.4/etc/hadoop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
参数说明 spark.deploy.recoveryMode:恢复模式(Master 重新启动的模式) 有三种:
(1)ZooKeeper
(2) FileSystem
(3)NONE
spark.deploy.zookeeper.url:ZooKeeper 的 Server 地址 spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。 包括 Worker,Driver 和 Application。
注意:在普通模式下启动 spark 集群,只需要在主机上面执行 start-all.sh 就可以了。 在高可用模式下启动 spark 集群,先需要在任意一台节点上启动 start-all.sh 命令。 然后在另外一台节点上单独启动 master。
命令
start-master.sh
1
#修改文件(先把 slaves.template 重命名为 slaves)
vi slaves
1
2
拷贝配置到其他主机
#通过 scp 命令将 spark 的安装目录拷贝到其他机器上
scp -r /export/server/spark hdp-node-02:/export/server/
scp -r /export/server/spark hdp-node-03:/export/server/
1
2
3
配置 spark 环境变量
vi /etc/profile
#添加
export SPARK_HOME=/export/server/spark
export PATH=${SPARK_HOME}/bin:${SPARK_HOME}/sbin:$PATH
1
2
3
4
注意最后 source /etc/profile 刷新配置
高可用部署说明
Spark Standalone 集群是 Master-Slaves 架构的集群模式,和大部分的 Master-Slaves 结构集群一样,存在着 Master 单点故障的问题。如何解决这个 单点故障的问题,Spark 提供了两种方案:
(1)基 于 文 件 系 统 的 单 点 恢 复 (Single-Node Recovery with Local File System)。主要用于开发或测试环境。当 spark 提供目录保存 spark Application 和 worker 的注册信息,并将他们的恢复状态写入该目录中,这时,一旦 Master 发生故障,就可以通过重新启动 Master 进程(sbin/start-master.sh),恢复 已运行的 spark Application 和 worker 的注册信息。
(2)基于 zookeeper 的 Standby Masters(Standby Masters with ZooKeeper)。 用于生产模式。其基本原理是通过 zookeeper 来选举一个 Master,其他 的 Master 处于 Standby 状态。将 spark 集群连接到同一个 ZooKeeper 实例并启 动多个 Master,利用 zookeeper 提供的选举和状态保存功能,可以使一个 Master 被选举成活着的 master,而其他 Master 处于 Standby 状态。如果现任 Master死去,另一个 Master 会通过选举产生,并恢复到旧的 Master 状态,然后恢复调 度。整个恢复过程可能要 1-2 分钟。
#在主节点上启动 spark
spark/sbin/start-all.sh
#在从节点上启动master
start-master.sh
#在主节点上停止 spark 集群
spark/sbin/stop-all.sh
#在从节点上停止master
stop-master.sh
1
2
3
4
5
6
7
8
正常启动 spark 集群后,可以通过访问 http://node-01:8080,查看 spark 的 web 界面, 查看相关信息。
Spark 角色介绍
Spark 是基于内存计算的大数据并行计算框架。因为其基于内存计算,比 Hadoop 中 MapReduce 计算框架具有更高的实时性,同时保证了高效容错性和可伸缩性。从 2009 年诞生于 AMPLab 到现在已经成为 Apache 顶级开源项目,并成 功应用于商业集群中,学习 Spark 就需要了解其架构。 Spark 架构图如下:
Spark 架构使用了分布式计算中 master-slave 模型,master 是集群中含 有 master 进程的节点,slave 是集群中含有 worker 进程的节点。
Driver Program :运⾏main 函数并且新建 SparkContext 的程序。
Application:基于 Spark 的应用程序,包含了 driver 程序和集群上的 executor。
Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型
(1)Standalone: spark 原生的资源管理,由 Master 负责资源的分配
(2)Apache Mesos:与 hadoop MR 兼容性良好的一种资源调度框架
(3)Hadoop Yarn: 主要是指 Yarn 中的 ResourceManager
Worker Node:集群中任何可以运行 Application 代码的节点,在 Standalone 模式中指的是通过 slaves 文件配置的 Worker 节点,在 Spark on Yarn 模式 下就是 NodeManager 节点
Executor:是在一个 worker node 上为某应⽤启动的⼀个进程,该进程负责 运⾏行任务,并且负责将数据存在内存或者磁盘上。每个应⽤都有各自独立 的 executor。
Task:被送到某个 executor 上的工作单元。
编写简单的 Spark 应用程序
执行第一个 spark 程序
#普通模式提交任务
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://node-01:7077 --executor-memory 1G --total-executor-cores 2 examples/jars/spark-examples_2.11-2.0.2.jar 10
1
2
该算法是利用蒙特·卡罗算法求圆周率 PI,通过计算机模拟大量的随机数, 最终会计算出比较精确的π。
在高可用模式下,因为涉及到多个 Master,所以对于应用程序的提交就有了 一点变化,因为应用程序需要知道当前的 Master 的 IP 地址和端口。这种 HA 方 案处理这种情况很简单,只需要在SparkContext指向一个Master列表就可以了, 如 spark://host1:port1,host2:port2,host3:port3,应用程序会轮询列表,找 到活着的 Master。
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://node-1:7077,node-2:7077 --executor-memory 1G --total-executor-cores 1 examples/jars/spark-examples_2.11-2.0.2.jar 10
1
[root@node-1 spark]# bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://node-1:7077,node-2:7077 --executor-memory 1G --total-executor-cores 1 examples/jars/spark-examples_2.11-2.0.2.jar 10
20/09/30 09:52:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/09/30 09:52:32 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
[Stage 0:> (0 + 0) / 10]20/09/30 09:52:48 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
20/09/30 09:53:03 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
20/09/30 09:53:18 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
[Stage 0:> (0 + 0) / 10]20/09/30 09:53:33 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
20/09/30 09:53:48 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Pi is roughly 3.1456431456431457
1
2
3
4
5
6
7
8
9
Spark-Shell
spark-shell 是 Spark 自带的交互式 Shell 程序,方便用户进行交互式编程,用 户可以在该命令行下用 scala 编写 spark 程序。
运行 spark-shell --master local[N] 读取本地文件
单机模式:通过本地 N 个线程跑任务,只运行一个 SparkSubmit 进程。
读取本地文件,实现文件内的单词计数。本地文件 words.txt 内容如下:
hello me
hello you
hello her
1
2
3
运行 spark-shell --master local[2]
观察启动的进程
编写 scala 代码
sc.textFile("file:///root///words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
1
代码说明:
sc:Spark-Shell 中已经默认将 SparkContext 类初始化为对象 sc。用户代码如 果需要用到,则直接应用 sc 即可。
textFile:读取数据文件
flatMap:对文件中的每一行数据进行压平切分,这里按照空格分隔。 map:对出现的每一个单词记为 1(word,1)
reduceByKey:对相同的单词出现的次数进行累加
collect:触发任务执行,收集结果数据。
观察结果:
运行 spark-shell --master local[N] 读取 HDFS 上数据
整合 spark 和 HDFS,修改配置文件 在 spark-env.sh ,添加 HADOOP_CONF_DIR 配置,指明了 hadoop 的配置文件 后,默认它就是使用的 hdfs 上的文件
export HADOOP_CONF_DIR=/export/server/hadoop-2.7.4/etc/hadoop
1
再启动启动 hdfs,然后重启 spark 集群 , 向 hdfs 上传一个文件到 hdfs://node-1:9000/words.txt
在 spark shell 中用 scala 语言编写 spark 程序
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
1
运行 spark-shell 指定具体的 master 地址
spark-shell 运行时指定具体的 master 地址,读取 HDFS 上的数据,做单词 计数,然后将结果保存在 HDFS 上。
spark-shell --master spark://node-1:7077 --executor-memory 1g --total-executor-cores 1
1
参数说明:
–master spark://hdp-node-01:7077 指定 Master 的地址
–executor-memory 1g 指定每个 worker 可用内存为 1g
–total-executor-cores 2 指定整个集群使用的 cup 核数为 2 个
注意: 如果启动 spark shell 时没有指定 master 地址,但是也可以正常启动 spark shell 和执行 spark shell 中的程序,其实是启动了 spark 的 local 模式,该模式仅在本机 启动一个进程,没有与集群建立联系。
编写 scala 代码
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/wc")
1
saveAsTextFile:保存结果数据到文件中
查看 hdfs 上结果
在 IDEA 中编写 WordCount 程序
spark-shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常 会在 IDEA 中编写程序,然后打成 jar 包,最后提交到集群。最常用的是创建一个 Maven 项目,利用 Maven 来管理 jar 包的依赖。
创建一个项目
选择 Maven 项目,然后点击 next
填写项目名称, maven 的 GAV,然后点击 完成
配置 Maven 的 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.chuang</groupId>
<artifactId>spark_test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.4</hadoop.version>
<spark.version>2.0.2</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
修改 src/main/scala 和 src/test/scala,与 pom.xml 中的配置保持一致
新建一个 scala class,类型为 Object
创建包
new 新文件没有scala菜单解决方法
点击上图+号添加scala
编写 spark 程序
package com.chuang
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//设置spark的配置文件
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount")
//构建sparkcontext上下文对象, 它是程序的入口, 所有计算的源头
val sc: SparkContext = new SparkContext(sparkConf)
//读取文件, 第一个参数是文件地址
val file: RDD[String] = sc.textFile(args(0))
//对文件中每一行单词进行压平切分
val word: RDD[String] = file.flatMap(_.split(" "))
//对每一个单词计数为1转化为(单词,1)
val wordAndOne: RDD[(String, Int)] = word.map(x=>(x,1))
//相同的单词进行汇总, 前一个下划线表示累加数据, 后一个下划线表示新数据
val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//保存数据到Hdfs
result.saveAsTextFile(args(1))
sc.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
使用 Maven 打包
点击 idea 右侧的 Maven选项
点击 Lifecycle,点击 package
选择编译成功的 jar 包,并将该 jar 上传到 Spark 集群中的某个节点上
启动 hdfs 和 Spark 集群
使用 spark-submit 命令提交 Spark 应用
注意参数的顺序
spark-submit --class com.chuang.WordCount --master spark://node-1:7077 --executor-memory 1g --total-executor-cores 1 /export/software/original-spark_test-1.0-SNAPSHOT.jar /input/words.txt /spark_out
1
这里通过 spark-submit 提交任务到集群上。用的是 spark 的 Standalone 模式 Standalone 模式是 Spark 内部默认实现的一种集群管理模式,这种模式是通过 集群中的 Master 来统一管理资源。
[root@node-1 ~]# spark-submit --class com.chuang.WordCount --master spark://node-1:7077 --executor-memory 1g --total-executor-cores 1 /export/software/original-spark_test-1.0-SNAPSHOT.jar /input/words.txt /spark_out
20/09/30 15:47:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1
2
查看 Spark 的 web 管理界面, 地址: 192.168.136.129:8080
查看hdfs输出目录
使用 java 语言编写 spark wordcount 程序
package com.chuang;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.tools.nsc.doc.model.Public;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
//todo:需求:利用java语言开发spark Wordcount程序
public class WordCount_java {
public static void main(String[] args) {
//创建sparkconf对象
SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");
//创建sparkcontext对象
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
//读取数据文件
JavaRDD<String> dataJavaRDD = sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\hbase-site.xml");
//切分每一行
JavaRDD<String> wordsJavaRDD = dataJavaRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String line) throws Exception{
String[] words = line.split(" ");
return Arrays.asList(words).iterator();
}
});
//每个单词记为1
JavaPairRDD<String, Integer> wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
//相同单词出现的次数累加
JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
//根据单词出现的次数进行降序排列, 思想:先把(单词,次数)位置颠倒(次数,单词)
JavaPairRDD<Integer, String> reverseJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
}
});
//按照单词出现的次数进行降序排列,调用sortbykey方法,然后将(次数,单词)颠倒为(单词,次数)
JavaPairRDD<String, Integer> sortJavaPairRDD = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return new Tuple2<String, Integer>(integerStringTuple2._2, integerStringTuple2._1);
}
});
//搜集结果数据
List<Tuple2<String, Integer>> finalResult = sortJavaPairRDD.collect();
//循环打印结果数据
for (Tuple2<String, Integer> tuple:finalResult){
System.out.println(tuple._1+"出现的次数"+tuple._2);
}
//关闭sparkcontext
sparkContext.stop();
}
}
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack