一二, Spark概述和快速入门
一, Spark概述1.1 什么是SparkSpark是一种基于内存的快速,通用,可扩展的大数据分析计算引擎;1.2 Spark && HadoopSParkHadoop1.Scala开发, 快速通用,扩展的大数据分析引擎1. Java开发, 在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架2. Spark Core 提供了Spark最基础,核心的内容2. HDFS基
·
一, Spark概述
1.1 什么是Spark
- Spark是一种基于内存的快速,通用,可扩展的
大数据分析计算引擎; - “Apache Spark” is a unified analytics engine for large-scale data processing.
1.2 Spark的四大特点
1.2.1 Spark 速度快
- Spark运行速度相对于hadoop提高100倍;
- Apache Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流, 实现批量和流式数据的高性能;

Spark处理数据相比于MapReduce处理数据:
- Spark处理数据, 可以将中间处理结果存储到内存中;
- Spark提供了非常丰富的算子(API), 可以做到复杂程序在一个Spark程序中完成;
☆☆☆:
Spark 比Hadoop快的原因(面试!)
详见文章: 点我
1.2.2 Spark 使用简单
- 可以通过各种不同语言快速编写Spark程序(Scala, Java, Python, R, SQL等)

可以看到spark读取数据和使用数据的方式还是比较易于理解的.
1.2.3 Spark 通用性强
- Spark框架不再是一个简单的框架, 可以把Spark理解成一个Spark生态系统, 它的内部包含了很多模块, 基于不同的应用场景可以选择对应的模块去使用:
- SparkSQL: 通过SQL去开发Spark程序做一些
离线分析; - SparkStreaming:
实时计算, 去处理一些流式数据; - MLib: 封装了一些
机器学习的算法库; - GraphX:
图计算;
- SparkSQL: 通过SQL去开发Spark程序做一些

1.2.4 Spark 兼容性高
spark程序就是一个计算逻辑程序,这个任务要运行就需要计算资源(内存、cpu、磁盘),哪里可以给当前这个任务提供计算资源,就可以把spark程序提交到哪里去运行。目前主要的运行方式是下面的standAlone和yarn。
- standAlone:它是spark自带的独立运行模式,整个任务的资源分配由spark集群的老大Master负责
- yarn:可以把spark程序提交到yarn中运行,整个任务的资源分配由yarn中的老大ResourceManager负责
- mesos:它也是apache开源的一个类似于yarn的资源调度平台

1.3 Spark vs Hadoop
| SPark | Hadoop |
|---|---|
| 1.Scala开发, 快速通用,扩展的大数据分析引擎 | 1. Java开发, 在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架 |
| 2. Spark Core 提供了Spark最基础,核心的内容 | 2. HDFS基于GFS理论, 分布式存储数据 |
| 3. SpaekSQL 是Spark用来操作结构化数据的组件, 通过Spark SQL, 用户可使用SQL或者HQL来查询数据 | 3. MapReduce基于Goole MapReduce, 分布式计算 |
| 4. SparkStreaming是Spark平台上针对实时数据进行了流式计算的组件, 提供了丰富的处理数据流的API | 5. Hbase基于Bigtable, 分布式数据库, 擅长实时的随机读写超大规模数据集 |
- 一次性数据计算:
框架在处理数据的时候, 会从存储设备中读取数据, 进行逻辑操作, 然后将处理的结果重新存储到介质中。
二, Spark 快速上手
- 简单的WordCount程序
- 必须的环境:
- IDEA装好
Scala插件: 参考本文
- IDEA装好
2.1 创建Maven项目
-
new project ->选则合适的project jdk(jdk 1.8) ->下一步->填好合适的gav
-
在 settings -> plugins
安装好scala插件, 并给参考本文 -
在 project structure -> global Libraries中
添加 scala的sdk
-
在maven项目中新建新的module, 对项目进行
分类
-
pom文件中, 添加Spark 3.0 的
依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
- 配置log4j, 更好的跟踪程序执行日志, 即在maven项目的resources目录创建log4j.properties文件, 并添加日志配置信息如下:
- 控制日志级别. 只有ERROR才会显示
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd
HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell,
the
# log level for this class is used to overwrite the root logger's log level, so
that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent
UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

2.2 Spark’s WordCount
- 让我们先用Scala的思路写一遍wordcount流程
不熟悉Scala集合函数的可以翻看笔者的文章: 五-2, Scala集合常用函数全总结
object SparkWordCountDemo {
def main(args: Array[String]): Unit = {
/////spark api 操作
//2. 创建配置对象, 并设置好必须的参数
var conf = new SparkConf
conf.setMaster("local[*]") //本地模式
conf.setAppName("WordCountEasy")
//1.创建Spark上下文,
var sc: SparkContext = new SparkContext(conf)
//3. 操作
//////3.1 读取文件数据
val line: RDD[String] = sc.textFile("spark_demo_data/input")
/////3.2 把每行的数据按空格切分, 然后每个单词单独存储起来
val words: RDD[String] = line.flatMap(x => x.split(" ")) /// ? 产生的是怎样的数据
/////3.3 对单词进行分组. 记住, 组名作为key, 每一组的元素是一个value
val groupedWords: RDD[(String, Iterable[String])] = words.groupBy(words => words)
////3.4 转换单词组为 (word, words集合) => (wrods, String的一个迭代器) => (words, words.size)
val resRDD: RDD[(String, Int)] = groupedWords.map(x => (x._1, x._2.size))
////3.5 将转换结果采集到控制台
val res: Array[(String, Int)] = resRDD.collect()
println("=====")
println(res.mkString("Array(",",",")"))
//sc.groupBy
//4. 关闭连接
sc.stop()
}
}
- 既然我们已经在学习Spark了, 就要尝试下用spark的聚合计算方法 ‘reduceByKey’
object WordCount {
def main(args: Array[String]): Unit = {
//2. 创建Spark的配置文件对象, 并设置一些必要的配置
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("StandardWordCount")
//1. 创建Spark上下文,
//2.1 向上下文对象中传入配置对象
val sc = new SparkContext(conf)
//3. 读取文件的每一行
val line: RDD[String] = sc.textFile("spark_demo_data/input")
//4. 格式化每一行, 分词
val words: RDD[String] = line.flatMap(_.split(" "))
//5. 给每个单词都加上1标识, 即 (word, 1)
val wordsWithOne: RDD[(String, Int)] = words.map(x => (x, 1))
//6. 规约
val resRDD: RDD[(String, Int)] = wordsWithOne.reduceByKey(_ + _)
//7. 从内存中把结果取出来, 并输出
val resArray: Array[(String, Int)] = resRDD.collect()
println(resArray.mkString("Array(",",",")") + "\n")
//8. 关闭资源连接
sc.stop()
}
}
- 沿着上面的这个思路, 我们还可以利用Scala集合的高级函数实现wordcount
package cn.cyy.spark.core.wordcountdemo.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//2. 创建Spark的配置文件对象, 并设置一些必要的配置
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("StandardWordCount")
//1. 创建Spark上下文,
//2.1 向上下文对象中传入配置对象
val sc = new SparkContext(conf)
//3. 读取文件的每一行
val line: RDD[String] = sc.textFile("spark_demo_data/input")
//4. 格式化每一行, 分词
val words: RDD[String] = line.flatMap(_.split(" "))
//5. 给每个单词都加上1标识, 即 (word, 1)
val wordsWithOne: RDD[(String, Int)] = words.map(x => (x, 1))
//5.1 分组
val groupedWordWithOne: RDD[(String, Iterable[(String, Int)])] = wordsWithOne.groupBy(tuple => tuple._1)
//5.2 先reduce, 之后map
val resRDD: RDD[(String, Int)] = groupedWordWithOne.map {
case (word, list) => {
list.reduce(
(x, y) => {
(x._1, x._2 + y._2)
}
)
}
}
// //6. 规约
// val resRDD: RDD[(String, Int)] = wordsWithOne.reduceByKey(_ + _)
//7. 从内存中把结果取出来, 并输出
val resArray: Array[(String, Int)] = resRDD.collect()
println(resArray.mkString("Array(",",",")"))
//8. 关闭资源连接
sc.stop()
}
}
更多推荐
所有评论(0)